package net.onrc.onos.datagrid;

import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
import net.onrc.onos.datagrid.web.DatagridWebRoutable;
import net.onrc.onos.ofcontroller.devicemanager.IDeviceEventHandler;
import net.onrc.onos.ofcontroller.devicemanager.OnosDevice;
import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.Pair;
import net.onrc.onos.ofcontroller.util.PerformanceMonitor;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IList;
import com.hazelcast.core.IMap;
import com.hazelcast.instance.GroupProperties;
import net.onrc.onos.intent.Intent;

/**
 * A datagrid service that uses Hazelcast as a datagrid.
 * The relevant data is stored in the Hazelcast datagrid and shared as
 * appropriate in a multi-node cluster.
 */
public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
    // Buffer size constant (64 KiB).
    // NOTE(review): not referenced in the portion of the file reviewed here;
    // presumably used for serialization buffers further down -- confirm.
    private static final int MAX_BUFFER_SIZE = 64 * 1024;

    static final Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
    // REST API service; obtained from the module context in init()
    private IRestApiService restApi;

    // Name of the module configuration parameter whose value is the
    // Hazelcast XML configuration filename
    static final String HAZELCAST_CONFIG_FILE = "datagridConfig";
    // Hazelcast instance; created in startUp()
    private HazelcastInstance hazelcastInstance;
    // Hazelcast configuration; loaded from the XML file in init(String)
    private Config hazelcastConfig;

    // Factory used to obtain and release Kryo (de)serializer instances
    private final KryoFactory kryoFactory = new KryoFactory();
    // Receiver of the Flow-related notifications decoded by the map listeners
    private IFlowEventHandlerService flowEventHandlerService;

    // Event channels indexed by channel name.
    // Entries are added by createChannelImpl(), which is synchronized.
    private final Map<String, IEventChannel<?, ?>> eventChannels = new HashMap<>();

    // State related to the Flow map
    // (Key: Flow ID, Value: serialized FlowPath)
    private static final String MAP_FLOW_NAME = "mapFlow";
    private IMap<Long, byte[]> mapFlow;
    private MapFlowListener mapFlowListener;
    // ID returned by mapFlow.addEntryListener()
    private String mapFlowListenerId;

    // State related to the Flow Entry map
    // (Key: FlowEntry ID, Value: serialized FlowEntry)
    private static final String MAP_FLOW_ENTRY_NAME = "mapFlowEntry";
    private IMap<Long, byte[]> mapFlowEntry;
    private MapFlowEntryListener mapFlowEntryListener;
    // ID returned by mapFlowEntry.addEntryListener()
    private String mapFlowEntryListenerId;

    // State related to the Flow ID map
    // (Key: FlowId, Value: serialized switch Dpid)
    private static final String MAP_FLOW_ID_NAME = "mapFlowId";
    private IMap<Long, byte[]> mapFlowId;
    private MapFlowIdListener mapFlowIdListener;
    // ID returned by mapFlowId.addEntryListener()
    private String mapFlowIdListenerId;

    // State related to the Flow Entry ID map
    // (Key: FlowEntryId, Value: serialized switch Dpid)
    private static final String MAP_FLOW_ENTRY_ID_NAME = "mapFlowEntryId";
    private IMap<Long, byte[]> mapFlowEntryId;
    private MapFlowEntryIdListener mapFlowEntryIdListener;
    // NOTE(review): presumably the ID returned by
    // mapFlowEntryId.addEntryListener(); the assignment is outside the
    // reviewed portion of the file -- confirm.
    private String mapFlowEntryIdListenerId;

    // State related to the Network Topology map
    // (Key: TopologyElement ID, Value: serialized TopologyElement)
    private static final String MAP_TOPOLOGY_NAME = "mapTopology";
    private IMap<String, byte[]> mapTopology;
    private MapTopologyListener mapTopologyListener;
    // NOTE(review): presumably the ID returned by
    // mapTopology.addEntryListener(); the assignment is outside the
    // reviewed portion of the file -- confirm.
    private String mapTopologyListenerId;

    // State related to the packet out map
    // (Key: packet-out to send, Value: dummy value)
    private static final String PACKET_OUT_MAP_NAME = "packetOutMap";
    private IMap<PacketOutNotification, byte[]> packetOutMap;
    // Handlers invoked by PacketOutMapListener for each added entry
    private final List<IPacketOutEventHandler> packetOutEventHandlers = new ArrayList<>();

    // Single-byte placeholder.
    // NOTE(review): presumably stored as the value in the maps where only
    // the key carries information (packet-out, ARP reply) -- confirm against
    // the senders, which are outside the reviewed portion of the file.
    private final byte[] dummyByte = {0};

    // State related to the ARP reply map
    private static final String ARP_REPLY_MAP_NAME = "arpReplyMap";
    private IMap<ArpReplyNotification, byte[]> arpReplyMap;
    // Handlers invoked by ArpReplyMapListener for each added or updated entry
    private final List<IArpReplyEventHandler> arpReplyEventHandlers = new ArrayList<>();


    // State related to the Intent list; the list is obtained in startUp()
    private static final String INTENT_LIST_NAME = "intentList";
    private IList<Intent> intentList;

    /**
     * Register a collection of Intents.
     * <p/>
     * All of the supplied Intents are appended to the Hazelcast-backed
     * Intent list.
     * NOTE: The Intent list is obtained in startUp(), hence this method
     * must not be called before the module is started.
     *
     * @param intents the Intents to add to the Intent list.
     */
    @Override
    public void registerIntent(Collection<Intent> intents) {
        intentList.addAll(intents);
    }


    // State related to the Network Device map
    // (Key: Long, Value: OnosDevice stored as-is rather than as byte[])
    private static final String MAP_DEVICE_NAME = "mapDevice";
    private IMap<Long, OnosDevice> mapDevice;
    // Handlers invoked by MapDeviceListener for each add/remove/update event
    private final List<IDeviceEventHandler> deviceEventHandlers = new ArrayList<>();

    /**
     * Class for receiving notifications for Flow state.
     * <p/>
     * The datagrid map is:
     * - Key : Flow ID (Long)
     * - Value : Serialized FlowPath (byte[])
     * <p/>
     * For each add/remove/update event the value is decoded with Kryo and
     * the resulting FlowPath is delivered to the Flow Event Handler Service.
     */
    class MapFlowListener implements EntryListener<Long, byte[]> {
        /**
         * Receive a notification that an entry is added.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryAdded(EntryEvent<Long, byte[]> event) {
            FlowPath flowPath = decodeFlowPath(event.getValue());
            flowEventHandlerService.notificationRecvFlowAdded(flowPath);
        }

        /**
         * Receive a notification that an entry is removed.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryRemoved(EntryEvent<Long, byte[]> event) {
            FlowPath flowPath = decodeFlowPath(event.getValue());
            flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
        }

        /**
         * Receive a notification that an entry is updated.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryUpdated(EntryEvent<Long, byte[]> event) {
            FlowPath flowPath = decodeFlowPath(event.getValue());
            flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
        }

        /**
         * Receive a notification that an entry is evicted.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryEvicted(EntryEvent<Long, byte[]> event) {
            // NOTE: We don't use eviction for this map
        }

        /**
         * Decode a serialized FlowPath.
         *
         * @param valueBytes the serialized FlowPath.
         * @return the decoded FlowPath.
         */
        private FlowPath decodeFlowPath(byte[] valueBytes) {
            Kryo kryo = kryoFactory.newKryo();
            try {
                Input input = new Input(valueBytes);
                return kryo.readObject(input, FlowPath.class);
            } finally {
                // Release the Kryo instance even if the decoding fails
                kryoFactory.deleteKryo(kryo);
            }
        }
    }

    /**
     * Class for receiving notifications for FlowEntry state.
     * <p/>
     * The datagrid map is:
     * - Key : FlowEntry ID (Long)
     * - Value : Serialized FlowEntry (byte[])
     * <p/>
     * For each add/remove/update event the value is decoded with Kryo and
     * the resulting FlowEntry is delivered to the Flow Event Handler Service.
     */
    class MapFlowEntryListener implements EntryListener<Long, byte[]> {
        /**
         * Receive a notification that an entry is added.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryAdded(EntryEvent<Long, byte[]> event) {
            FlowEntry flowEntry = decodeFlowEntry(event.getValue());
            flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
        }

        /**
         * Receive a notification that an entry is removed.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryRemoved(EntryEvent<Long, byte[]> event) {
            FlowEntry flowEntry = decodeFlowEntry(event.getValue());
            flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
        }

        /**
         * Receive a notification that an entry is updated.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryUpdated(EntryEvent<Long, byte[]> event) {
            FlowEntry flowEntry = decodeFlowEntry(event.getValue());
            flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
        }

        /**
         * Receive a notification that an entry is evicted.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryEvicted(EntryEvent<Long, byte[]> event) {
            // NOTE: We don't use eviction for this map
        }

        /**
         * Decode a serialized FlowEntry.
         *
         * @param valueBytes the serialized FlowEntry.
         * @return the decoded FlowEntry.
         */
        private FlowEntry decodeFlowEntry(byte[] valueBytes) {
            Kryo kryo = kryoFactory.newKryo();
            try {
                Input input = new Input(valueBytes);
                return kryo.readObject(input, FlowEntry.class);
            } finally {
                // Release the Kryo instance even if the decoding fails
                kryoFactory.deleteKryo(kryo);
            }
        }
    }

    /**
     * Class for receiving notifications for FlowId state.
     * <p/>
     * The datagrid map is:
     * - Key : FlowId (Long)
     * - Value : Serialized Switch Dpid (byte[])
     */
    class MapFlowIdListener implements EntryListener<Long, byte[]> {
        /**
         * Receive a notification that an entry is added.
         *
         * @param event the notification event for the entry.
         */
        public void entryAdded(EntryEvent<Long, byte[]> event) {
            Long keyLong = event.getKey();
            FlowId flowId = new FlowId(keyLong);

            byte[] valueBytes = event.getValue();

            //
            // Decode the value and deliver the notification
            //
            Kryo kryo = kryoFactory.newKryo();
            Input input = new Input(valueBytes);
            Dpid dpid = kryo.readObject(input, Dpid.class);
            kryoFactory.deleteKryo(kryo);
            flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
        }

        /**
         * Receive a notification that an entry is removed.
         *
         * @param event the notification event for the entry.
         */
        public void entryRemoved(EntryEvent<Long, byte[]> event) {
            Long keyLong = event.getKey();
            FlowId flowId = new FlowId(keyLong);

            byte[] valueBytes = event.getValue();

            //
            // Decode the value and deliver the notification
            //
            Kryo kryo = kryoFactory.newKryo();
            Input input = new Input(valueBytes);
            Dpid dpid = kryo.readObject(input, Dpid.class);
            kryoFactory.deleteKryo(kryo);
            flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
        }

        /**
         * Receive a notification that an entry is updated.
         *
         * @param event the notification event for the entry.
         */
        public void entryUpdated(EntryEvent<Long, byte[]> event) {
            Long keyLong = event.getKey();
            FlowId flowId = new FlowId(keyLong);

            byte[] valueBytes = event.getValue();

            //
            // Decode the value and deliver the notification
            //
            Kryo kryo = kryoFactory.newKryo();
            Input input = new Input(valueBytes);
            Dpid dpid = kryo.readObject(input, Dpid.class);
            kryoFactory.deleteKryo(kryo);
            flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
        }

        /**
         * Receive a notification that an entry is evicted.
         *
         * @param event the notification event for the entry.
         */
        public void entryEvicted(EntryEvent<Long, byte[]> event) {
            // NOTE: We don't use eviction for this map
        }
    }

    /**
     * Class for receiving notifications for FlowEntryId state.
     * <p/>
     * The datagrid map is:
     * - Key : FlowEntryId (Long)
     * - Value : Serialized Switch Dpid (byte[])
     */
    class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
        /**
         * Receive a notification that an entry is added.
         *
         * @param event the notification event for the entry.
         */
        public void entryAdded(EntryEvent<Long, byte[]> event) {
            Long keyLong = event.getKey();
            FlowEntryId flowEntryId = new FlowEntryId(keyLong);

            byte[] valueBytes = event.getValue();

            //
            // Decode the value and deliver the notification
            //
            Kryo kryo = kryoFactory.newKryo();
            Input input = new Input(valueBytes);
            Dpid dpid = kryo.readObject(input, Dpid.class);
            kryoFactory.deleteKryo(kryo);
            flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
        }

        /**
         * Receive a notification that an entry is removed.
         *
         * @param event the notification event for the entry.
         */
        public void entryRemoved(EntryEvent<Long, byte[]> event) {
            Long keyLong = event.getKey();
            FlowEntryId flowEntryId = new FlowEntryId(keyLong);

            byte[] valueBytes = event.getValue();

            //
            // Decode the value and deliver the notification
            //
            Kryo kryo = kryoFactory.newKryo();
            Input input = new Input(valueBytes);
            Dpid dpid = kryo.readObject(input, Dpid.class);
            kryoFactory.deleteKryo(kryo);
            flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
        }

        /**
         * Receive a notification that an entry is updated.
         *
         * @param event the notification event for the entry.
         */
        public void entryUpdated(EntryEvent<Long, byte[]> event) {
            Long keyLong = event.getKey();
            FlowEntryId flowEntryId = new FlowEntryId(keyLong);

            byte[] valueBytes = event.getValue();

            //
            // Decode the value and deliver the notification
            //
            Kryo kryo = kryoFactory.newKryo();
            Input input = new Input(valueBytes);
            Dpid dpid = kryo.readObject(input, Dpid.class);
            kryoFactory.deleteKryo(kryo);
            flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
        }

        /**
         * Receive a notification that an entry is evicted.
         *
         * @param event the notification event for the entry.
         */
        public void entryEvicted(EntryEvent<Long, byte[]> event) {
            // NOTE: We don't use eviction for this map
        }
    }

    /**
     * Class for receiving notifications for Network Topology state.
     * <p/>
     * The datagrid map is:
     * - Key: TopologyElement ID (String)
     * - Value: Serialized TopologyElement (byte[])
     * <p/>
     * For each add/remove/update event the value is decoded with Kryo and
     * the resulting TopologyElement is delivered to the Flow Event Handler
     * Service.
     */
    class MapTopologyListener implements EntryListener<String, byte[]> {
        /**
         * Receive a notification that an entry is added.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryAdded(EntryEvent<String, byte[]> event) {
            TopologyElement topologyElement =
                    decodeTopologyElement(event.getValue());
            flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
        }

        /**
         * Receive a notification that an entry is removed.
         * <p/>
         * The decoding and the delivery of the notification are measured
         * with the PerformanceMonitor.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryRemoved(EntryEvent<String, byte[]> event) {
            String tag = "TopologyEntryRemoved.NotificationReceived";
            PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
            TopologyElement topologyElement =
                    decodeTopologyElement(event.getValue());
            flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
            m.stop();
        }

        /**
         * Receive a notification that an entry is updated.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryUpdated(EntryEvent<String, byte[]> event) {
            TopologyElement topologyElement =
                    decodeTopologyElement(event.getValue());
            flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
        }

        /**
         * Receive a notification that an entry is evicted.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryEvicted(EntryEvent<String, byte[]> event) {
            // NOTE: We don't use eviction for this map
        }

        /**
         * Decode a serialized TopologyElement.
         *
         * @param valueBytes the serialized TopologyElement.
         * @return the decoded TopologyElement.
         */
        private TopologyElement decodeTopologyElement(byte[] valueBytes) {
            Kryo kryo = kryoFactory.newKryo();
            try {
                Input input = new Input(valueBytes);
                return kryo.readObject(input, TopologyElement.class);
            } finally {
                // Release the Kryo instance even if the decoding fails
                kryoFactory.deleteKryo(kryo);
            }
        }
    }

    /**
     * Class for receiving notifications for Network Device state.
     * <p/>
     * The datagrid map is:
     * - Key: Long
     * - Value: OnosDevice
     * <p/>
     * Each add/remove/update event is passed on, key and value, to every
     * handler in the list of device event handlers.
     */
    class MapDeviceListener implements EntryListener<Long, OnosDevice> {

        /**
         * Receive a notification that an entry is added.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryAdded(EntryEvent<Long, OnosDevice> event) {
            for (IDeviceEventHandler handler : deviceEventHandlers) {
                handler.addDeviceEvent(event.getKey(), event.getValue());
            }
        }

        /**
         * Receive a notification that an entry is removed.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryRemoved(EntryEvent<Long, OnosDevice> event) {
            for (IDeviceEventHandler handler : deviceEventHandlers) {
                handler.deleteDeviceEvent(event.getKey(), event.getValue());
            }
        }

        /**
         * Receive a notification that an entry is updated.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryUpdated(EntryEvent<Long, OnosDevice> event) {
            for (IDeviceEventHandler handler : deviceEventHandlers) {
                handler.updateDeviceEvent(event.getKey(), event.getValue());
            }
        }

        /**
         * Receive a notification that an entry is evicted.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryEvicted(EntryEvent<Long, OnosDevice> event) {
            // Not used.
        }
    }

    /**
     * Class for receiving notifications for sending packet-outs.
     * <p/>
     * The datagrid map is:
     * - Key: Packet-out to send (PacketOutNotification)
     * - Value: dummy value (we only need the key) (byte[])
     * <p/>
     * Only the "entry added" event is acted upon: the key is passed on to
     * every handler in the list of packet-out event handlers.
     */
    class PacketOutMapListener implements EntryListener<PacketOutNotification, byte[]> {
        /**
         * Receive a notification that an entry is added.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryAdded(EntryEvent<PacketOutNotification, byte[]> event) {
            for (IPacketOutEventHandler handler : packetOutEventHandlers) {
                handler.packetOutNotification(event.getKey());
            }
        }

        /**
         * Receive a notification that an entry is removed.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryRemoved(EntryEvent<PacketOutNotification, byte[]> event) {
            // Not used for packet-outs
        }

        /**
         * Receive a notification that an entry is updated.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryUpdated(EntryEvent<PacketOutNotification, byte[]> event) {
            // Not used for packet-outs
        }

        /**
         * Receive a notification that an entry is evicted.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryEvicted(EntryEvent<PacketOutNotification, byte[]> event) {
            // Not used for packet-outs
        }
    }

    /**
     * Class for receiving notifications for ARP replies.
     * <p/>
     * The datagrid map is:
     * - Key: ARP reply notification (ArpReplyNotification)
     * - Value: byte[] (not read by this listener; only the key is used)
     * <p/>
     * Both the "entry added" and the "entry updated" events are passed on
     * to every handler in the list of ARP reply event handlers.
     */
    class ArpReplyMapListener implements EntryListener<ArpReplyNotification, byte[]> {
        /**
         * Receive a notification that an entry is added.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryAdded(EntryEvent<ArpReplyNotification, byte[]> event) {
            dispatch(event.getKey());
        }

        /**
         * Receive a notification that an entry is updated.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryUpdated(EntryEvent<ArpReplyNotification, byte[]> event) {
            dispatch(event.getKey());
        }

        /**
         * Receive a notification that an entry is removed.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryRemoved(EntryEvent<ArpReplyNotification, byte[]> event) {
            // Not used for ARP replies
        }

        /**
         * Receive a notification that an entry is evicted.
         *
         * @param event the notification event for the entry.
         */
        @Override
        public void entryEvicted(EntryEvent<ArpReplyNotification, byte[]> event) {
            // Not used for ARP replies
        }

        /**
         * Deliver an ARP reply notification to each of the ARP reply
         * event handlers.
         *
         * @param arpReply the ARP reply notification to deliver.
         */
        private void dispatch(ArpReplyNotification arpReply) {
            for (IArpReplyEventHandler handler : arpReplyEventHandlers) {
                handler.arpReplyEvent(arpReply);
            }
        }
    }

    /**
     * Initialize the Hazelcast Datagrid operation.
     * <p/>
     * Loads the Hazelcast configuration from the given XML file and then
     * overrides the event queue capacity and the socket receive/send buffer
     * sizes.
     *
     * @param configFilename the configuration filename.
     * @throws IllegalStateException if the configuration file cannot be
     * found. Previously this case was only logged and the method then
     * failed with a NullPointerException on the first use of the
     * (missing) configuration.
     */
    public void init(String configFilename) {
        // Init from configuration file
        try {
            hazelcastConfig = new FileSystemXmlConfig(configFilename);
        } catch (FileNotFoundException e) {
            log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
            // Fail fast with a descriptive error instead of dereferencing
            // a configuration that was never loaded.
            throw new IllegalStateException(
                    "Hazelcast XML configuration file not found: " + configFilename, e);
        }

        // Tuning overrides applied on top of the XML configuration
        hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
        hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
        hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
    }

    /**
     * Shutdown the Hazelcast Datagrid operation when this object is
     * finalized.
     * <p/>
     * Delegates to close().
     * NOTE(review): close() calls Hazelcast.shutdownAll(), hence the
     * finalization of any instance of this class affects more than the
     * Hazelcast instance created by it -- confirm this is intended.
     */
    @Override
    protected void finalize() {
        close();
    }

    /**
     * Shutdown the Hazelcast Datagrid operation.
     * <p/>
     * NOTE: This calls Hazelcast.shutdownAll() rather than shutting down
     * only the Hazelcast instance created in startUp().
     */
    public void close() {
        Hazelcast.shutdownAll();
    }

    /**
     * Get the collection of offered module services.
     * <p/>
     * This module offers a single service: IDatagridService.
     *
     * @return a new collection with the offered module services.
     */
    @Override
    public Collection<Class<? extends IFloodlightService>> getModuleServices() {
        Collection<Class<? extends IFloodlightService>> offeredServices =
                new ArrayList<>();
        offeredServices.add(IDatagridService.class);
        return offeredServices;
    }

    /**
     * Get the collection of implemented services.
     * <p/>
     * This object itself is the implementation of IDatagridService.
     *
     * @return a new map from each implemented service to its implementation.
     */
    @Override
    public Map<Class<? extends IFloodlightService>, IFloodlightService>
    getServiceImpls() {
        Map<Class<? extends IFloodlightService>, IFloodlightService> serviceImpls =
                new HashMap<>();
        serviceImpls.put(IDatagridService.class, this);
        return serviceImpls;
    }

    /**
     * Get the collection of modules this module depends on.
     * <p/>
     * The dependencies are the Floodlight Provider service and the REST API
     * service.
     *
     * @return a new collection with the services this module depends on.
     */
    @Override
    public Collection<Class<? extends IFloodlightService>>
    getModuleDependencies() {
        Collection<Class<? extends IFloodlightService>> dependencies =
                new ArrayList<>();
        dependencies.add(IFloodlightProviderService.class);
        dependencies.add(IRestApiService.class);
        return dependencies;
    }

    /**
     * Initialize the module.
     * <p/>
     * Fetches the REST API service and configures the Datagrid from the
     * Hazelcast XML configuration file named by the "datagridConfig" module
     * configuration parameter.
     *
     * @param context the module context to use for the initialization.
     * @throws FloodlightModuleException if the configuration parameter with
     * the Hazelcast configuration filename is missing. Previously a missing
     * parameter resulted in a null filename being passed on, which failed
     * with a NullPointerException.
     */
    @Override
    public void init(FloodlightModuleContext context)
            throws FloodlightModuleException {
        restApi = context.getServiceImpl(IRestApiService.class);

        // Get the configuration file name and configure the Datagrid
        Map<String, String> configMap = context.getConfigParams(this);
        String configFilename = null;
        if (configMap != null) {
            configFilename = configMap.get(HAZELCAST_CONFIG_FILE);
        }
        if (configFilename == null) {
            // Report the misconfiguration explicitly instead of passing
            // a null filename to the Hazelcast configuration loader.
            throw new FloodlightModuleException(
                    "Missing module configuration parameter: " + HAZELCAST_CONFIG_FILE);
        }
        this.init(configFilename);
    }

    /**
     * Startup module operation.
     * <p/>
     * Creates the Hazelcast instance from the configuration prepared during
     * the initialization, registers the Datagrid REST routes, and obtains
     * the packet-out map, the ARP reply map, the Intent list and the device
     * map. An entry listener is attached to each of the three maps.
     *
     * @param context the module context to use for the startup.
     */
    @Override
    public void startUp(FloodlightModuleContext context) {
        // The Hazelcast instance must exist before any map or list lookup
        this.hazelcastInstance = Hazelcast.newHazelcastInstance(this.hazelcastConfig);

        DatagridWebRoutable webRoutable = new DatagridWebRoutable();
        this.restApi.addRestletRoutable(webRoutable);

        // Packet-out notifications
        this.packetOutMap = this.hazelcastInstance.getMap(PACKET_OUT_MAP_NAME);
        PacketOutMapListener packetOutListener = new PacketOutMapListener();
        this.packetOutMap.addEntryListener(packetOutListener, true);

        // ARP reply notifications
        this.arpReplyMap = this.hazelcastInstance.getMap(ARP_REPLY_MAP_NAME);
        ArpReplyMapListener arpReplyListener = new ArpReplyMapListener();
        this.arpReplyMap.addEntryListener(arpReplyListener, true);

        // Intents (no listener is attached here)
        this.intentList = this.hazelcastInstance.getList(INTENT_LIST_NAME);

        // Network Device notifications
        this.mapDevice = this.hazelcastInstance.getMap(MAP_DEVICE_NAME);
        MapDeviceListener deviceListener = new MapDeviceListener();
        this.mapDevice.addEntryListener(deviceListener, true);
    }

    /**
     * Create an event channel.
     * <p/>
     * The channel is looked up by name; if no such channel exists yet, it
     * is created.
     * NOTE: startup() is called on the channel before it is returned.
     *
     * @param channelName the event channel name.
     * @param <K>         the type of the Key in the Key-Value store.
     * @param <V>         the type of the Value in the Key-Value store.
     * @param typeK       the type of the Key in the Key-Value store.
     * @param typeV       the type of the Value in the Key-Value store.
     * @return the event channel for the channel name.
     */
    @Override
    public <K, V> IEventChannel<K, V> createChannel(String channelName,
                                                    Class<K> typeK, Class<V> typeV) {
        IEventChannel<K, V> channel = createChannelImpl(channelName, typeK, typeV);
        channel.startup();
        return channel;
    }

    /**
     * Create an event channel implementation.
     * <p/>
     * If a channel with the same name already exists, it is returned after
     * verifying that its key and value types match the requested ones.
     * Otherwise, a new HazelcastEventChannel is created and recorded under
     * the channel name.
     * NOTE: The caller must call IEventChannel.startup() to startup the
     * channel operation.
     *
     * @param channelName the event channel name.
     * @param <K>         the type of the Key in the Key-Value store.
     * @param <V>         the type of the Value in the Key-Value store.
     * @param typeK       the type of the Key in the Key-Value store.
     * @param typeV       the type of the Value in the Key-Value store.
     * @return the event channel for the channel name.
     * @throws ClassCastException if the channel exists, but with different
     * key or value types.
     */
    private synchronized <K, V> IEventChannel<K, V> createChannelImpl(
            String channelName,
            Class<K> typeK, Class<V> typeV) {
        IEventChannel<?, ?> existingChannel = eventChannels.get(channelName);

        if (existingChannel != null) {
            //
            // TODO: Find if we can use Java internal support to check for
            // type mismatch.
            //
            if (existingChannel.verifyKeyValueTypes(typeK, typeV)) {
                return (IEventChannel<K, V>) existingChannel;
            }
            throw new ClassCastException("Key-value type mismatch for event channel " + channelName);
        }

        // First request for this channel name: create and record the channel
        IEventChannel<K, V> newChannel =
                new HazelcastEventChannel<K, V>(hazelcastInstance,
                        channelName, typeK, typeV);
        eventChannels.put(channelName, newChannel);
        return newChannel;
    }

    /**
     * Add event channel listener.
     * <p/>
     * The channel is looked up by name and is created if it does not exist
     * yet.
     * NOTE: startup() is called on the channel right after the listener
     * is added.
     *
     * @param channelName the event channel name.
     * @param listener    the listener to add.
     * @param <K>         the type of the Key in the Key-Value store.
     * @param <V>         the type of the Value in the Key-Value store.
     * @param typeK       the type of the Key in the Key-Value store.
     * @param typeV       the type of the Value in the Key-Value store.
     * @return the event channel for the channel name.
     */
    @Override
    public <K, V> IEventChannel<K, V> addListener(String channelName,
                                                  IEventChannelListener<K, V> listener,
                                                  Class<K> typeK, Class<V> typeV) {
        IEventChannel<K, V> channel = createChannelImpl(channelName, typeK, typeV);

        // The listener is attached first, and then the channel is started
        channel.addListener(listener);
        channel.startup();
        return channel;
    }

    /**
     * Remove event channel listener.
     * <p/>
     * If there is no channel with the given name, nothing is done.
     *
     * @param <K>         the type of the Key in the Key-Value store.
     * @param <V>         the type of the Value in the Key-Value store.
     * @param channelName the event channel name.
     * @param listener    the listener to remove.
     */
    @Override
    public <K, V> void removeListener(String channelName,
                                      IEventChannelListener<K, V> listener) {
        IEventChannel<?, ?> genericEventChannel;

        //
        // The map with the event channels is a plain HashMap that is
        // modified by the synchronized createChannelImpl(), hence the
        // lookup must hold the same lock. The lock is released before
        // calling into the channel.
        //
        synchronized (this) {
            genericEventChannel = eventChannels.get(channelName);
        }
        if (genericEventChannel == null) {
            return;
        }

        //
        // TODO: Find if we can use Java internal support to check for
        // type mismatch.
        // NOTE: Using "ClassCastException" exception below doesn't work.
        //
        IEventChannel<K, V> castedEventChannel =
                (IEventChannel<K, V>) genericEventChannel;
        castedEventChannel.removeListener(listener);
    }

    /**
     * Register Flow Event Handler Service for receiving Flow-related
     * notifications.
     * <p/>
     * Stores the service and then, for each of the Flow, FlowEntry, FlowId,
     * FlowEntryId and Topology maps, creates a listener, looks the map up
     * on the Hazelcast instance and attaches the listener to it, keeping
     * the returned listener ID for later removal.
     * <p/>
     * NOTE: Only a single Flow Event Handler Service can be registered.
     * This method does not check whether one is already registered; it
     * overwrites the stored service, maps, listeners and listener IDs.
     *
     * @param flowEventHandlerServiceToRegister the Flow Event Handler Service to register.
     */
    @Override
    public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerServiceToRegister) {
        // Store the service before any listener is attached.
        // NOTE(review): presumably the listeners forward events to this
        // service, so it must be set first - confirm against the listener
        // classes.
        this.flowEventHandlerService = flowEventHandlerServiceToRegister;

        // Initialize the Flow-related map state
        // NOTE(review): the "true" argument looks like Hazelcast's
        // includeValue flag (entry events carry the value) - this assumes
        // the map fields are Hazelcast IMaps; the same applies to every
        // addEntryListener() call below.
        mapFlowListener = new MapFlowListener();
        mapFlow = hazelcastInstance.getMap(MAP_FLOW_NAME);
        mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);

        // Initialize the FlowEntry-related map state
        mapFlowEntryListener = new MapFlowEntryListener();
        mapFlowEntry = hazelcastInstance.getMap(MAP_FLOW_ENTRY_NAME);
        mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);

        // Initialize the FlowId-related map state
        mapFlowIdListener = new MapFlowIdListener();
        mapFlowId = hazelcastInstance.getMap(MAP_FLOW_ID_NAME);
        mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);

        // Initialize the FlowEntryId-related map state
        mapFlowEntryIdListener = new MapFlowEntryIdListener();
        mapFlowEntryId = hazelcastInstance.getMap(MAP_FLOW_ENTRY_ID_NAME);
        mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);

        // Initialize the Topology-related map state
        mapTopologyListener = new MapTopologyListener();
        mapTopology = hazelcastInstance.getMap(MAP_TOPOLOGY_NAME);
        mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
    }

    /**
     * De-register Flow Event Handler Service for receiving Flow-related
     * notifications.
     * <p/>
     * Removes the entry listeners from the Flow, FlowEntry, FlowId,
     * FlowEntryId and Topology maps, and clears the map, listener and
     * service references.
     * <p/>
     * Calling this method when no service is registered (e.g., before
     * registration, or a second time in a row) is a no-op rather than a
     * NullPointerException.
     * <p/>
     * NOTE: Only a single Flow Event Handler Service can be registered.
     * The argument is not consulted: whichever service is currently
     * registered is de-registered.
     *
     * @param flowEventHandlerServiceToDeregister the Flow Event Handler Service to
     *                                de-register.
     */
    @Override
    public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerServiceToDeregister) {
        // Clear the Flow-related map state
        if (mapFlow != null) {
            mapFlow.removeEntryListener(mapFlowListenerId);
            mapFlow = null;
        }
        mapFlowListener = null;

        // Clear the FlowEntry-related map state
        if (mapFlowEntry != null) {
            mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
            mapFlowEntry = null;
        }
        mapFlowEntryListener = null;

        // Clear the FlowId-related map state
        if (mapFlowId != null) {
            mapFlowId.removeEntryListener(mapFlowIdListenerId);
            mapFlowId = null;
        }
        mapFlowIdListener = null;

        // Clear the FlowEntryId-related map state
        if (mapFlowEntryId != null) {
            mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
            mapFlowEntryId = null;
        }
        mapFlowEntryIdListener = null;

        // Clear the Topology-related map state
        if (mapTopology != null) {
            mapTopology.removeEntryListener(mapTopologyListenerId);
            mapTopology = null;
        }
        mapTopologyListener = null;

        this.flowEventHandlerService = null;
    }

    /**
     * Register a handler for Packet-Out events.
     *
     * A null handler is silently ignored.
     *
     * @param packetOutEventHandler the handler to register.
     */
    @Override
    public void registerPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
        if (packetOutEventHandler == null) {
            return;
        }
        packetOutEventHandlers.add(packetOutEventHandler);
    }

    /**
     * De-register a handler for Packet-Out events.
     *
     * Removes the handler from packetOutEventHandlers; the result of the
     * removal is not checked.
     *
     * @param packetOutEventHandler the handler to de-register.
     */
    @Override
    public void deregisterPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
        packetOutEventHandlers.remove(packetOutEventHandler);
    }

    /**
     * Register a handler for ARP Reply events.
     *
     * A null handler is silently ignored.
     *
     * @param arpReplyEventHandler the handler to register.
     */
    @Override
    public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
        if (arpReplyEventHandler == null) {
            return;
        }
        arpReplyEventHandlers.add(arpReplyEventHandler);
    }

    /**
     * De-register a handler for ARP Reply events.
     *
     * Removes the handler from arpReplyEventHandlers; the result of the
     * removal is not checked.
     *
     * @param arpReplyEventHandler the handler to de-register.
     */
    @Override
    public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
        arpReplyEventHandlers.remove(arpReplyEventHandler);
    }

    /**
     * Register a handler for Device events.
     *
     * A null handler is silently ignored.
     *
     * @param deviceEventHandler the handler to register.
     */
    @Override
    public void registerMapDeviceEventHandler(IDeviceEventHandler deviceEventHandler) {
        if (deviceEventHandler == null) {
            return;
        }
        deviceEventHandlers.add(deviceEventHandler);
    }

    /**
     * De-register a handler for Device events.
     *
     * Removes the handler from deviceEventHandlers; the result of the
     * removal is not checked.
     *
     * @param deviceEventHandler the handler to de-register.
     */
    @Override
    public void deregisterMapDeviceEventHandler(IDeviceEventHandler deviceEventHandler) {
        deviceEventHandlers.remove(deviceEventHandler);
    }

    /**
     * Get all Flows that are currently in the datagrid.
     *
     * Every value stored in the Flow map is decoded with Kryo into a
     * FlowPath.
     *
     * @return all Flows that are currently in the datagrid.
     */
    @Override
    public Collection<FlowPath> getAllFlows() {
        Collection<FlowPath> allFlows = new LinkedList<FlowPath>();

        //
        // Get all current entries
        //
        Collection<byte[]> values = mapFlow.values();
        Kryo kryo = kryoFactory.newKryo();
        try {
            for (byte[] valueBytes : values) {
                //
                // Decode the value
                //
                Input input = new Input(valueBytes);
                FlowPath flowPath = kryo.readObject(input, FlowPath.class);
                allFlows.add(flowPath);
            }
        } finally {
            // Release the Kryo instance even if decoding throws
            kryoFactory.deleteKryo(kryo);
        }

        return allFlows;
    }

    /**
     * Get a Flow for a given Flow ID.
     *
     * Looks up the serialized Flow in the Flow map by the Flow ID value
     * and decodes it with Kryo.
     *
     * @param flowId the Flow ID of the Flow to get.
     * @return the Flow if found, otherwise null.
     */
    @Override
    public FlowPath getFlow(FlowId flowId) {
        byte[] valueBytes = mapFlow.get(flowId.value());
        if (valueBytes == null) {
            return null;
        }

        //
        // Decode the value
        //
        Kryo kryo = kryoFactory.newKryo();
        try {
            Input input = new Input(valueBytes);
            return kryo.readObject(input, FlowPath.class);
        } finally {
            // Release the Kryo instance even if decoding throws
            kryoFactory.deleteKryo(kryo);
        }
    }

    /**
     * Send a notification that a Flow is added.
     *
     * The Flow is serialized with Kryo and asynchronously put into the
     * Flow map, keyed by its Flow ID value.
     *
     * @param flowPath the Flow that is added.
     */
    @Override
    public void notificationSendFlowAdded(FlowPath flowPath) {
        //
        // Encode the value
        //
        byte[] valueBytes;
        Kryo kryo = kryoFactory.newKryo();
        try {
            byte[] buffer = new byte[MAX_BUFFER_SIZE];
            // -1: no upper bound on the Kryo output buffer size
            Output output = new Output(buffer, -1);
            kryo.writeObject(output, flowPath);
            valueBytes = output.toBytes();
        } finally {
            // Release the Kryo instance even if encoding throws
            kryoFactory.deleteKryo(kryo);
        }

        //
        // Put the entry:
        //  - Key : Flow ID (Long)
        //  - Value : Serialized Flow (byte[])
        //
        mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
    }

    /**
     * Send a notification that a Flow is removed.
     *
     * Issues an asynchronous removal of the Flow map entry keyed by the
     * Flow ID value; the result of the asynchronous call is not awaited.
     *
     * @param flowId the Flow ID of the Flow that is removed.
     */
    @Override
    public void notificationSendFlowRemoved(FlowId flowId) {
        //
        // Remove the entry:
        //  - Key : Flow ID (Long)
        //  - Value : Serialized Flow (byte[])
        //
        mapFlow.removeAsync(flowId.value());
    }

    /**
     * Send a notification that a Flow is updated.
     *
     * Delegates to notificationSendFlowAdded(): an update is sent as a put
     * under the same key.
     *
     * @param flowPath the Flow that is updated.
     */
    @Override
    public void notificationSendFlowUpdated(FlowPath flowPath) {
        // NOTE: Adding an entry with an existing key automatically updates it
        notificationSendFlowAdded(flowPath);
    }

    /**
     * Send a notification that all Flows are removed.
     *
     * Every key currently in the Flow map is removed asynchronously.
     */
    @Override
    public void notificationSendAllFlowsRemoved() {
        //
        // NOTE: mapFlow.clear() is deliberately not used: the entries are
        // removed one-by-one so the per-entry notifications will be
        // delivered.
        //
        for (Long flowKey : mapFlow.keySet()) {
            mapFlow.removeAsync(flowKey);
        }
    }

    /**
     * Get all Flow Entries that are currently in the datagrid.
     *
     * Every value stored in the FlowEntry map is decoded with Kryo into a
     * FlowEntry.
     *
     * @return all Flow Entries that are currently in the datagrid.
     */
    @Override
    public Collection<FlowEntry> getAllFlowEntries() {
        Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();

        //
        // Get all current entries
        //
        Collection<byte[]> values = mapFlowEntry.values();
        Kryo kryo = kryoFactory.newKryo();
        try {
            for (byte[] valueBytes : values) {
                //
                // Decode the value
                //
                Input input = new Input(valueBytes);
                FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
                allFlowEntries.add(flowEntry);
            }
        } finally {
            // Release the Kryo instance even if decoding throws
            kryoFactory.deleteKryo(kryo);
        }

        return allFlowEntries;
    }

    /**
     * Get a Flow Entry for a given Flow Entry ID.
     *
     * Looks up the serialized Flow Entry in the FlowEntry map by the
     * Flow Entry ID value and decodes it with Kryo.
     *
     * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
     * @return the Flow Entry if found, otherwise null.
     */
    @Override
    public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
        byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
        if (valueBytes == null) {
            return null;
        }

        //
        // Decode the value
        //
        Kryo kryo = kryoFactory.newKryo();
        try {
            Input input = new Input(valueBytes);
            return kryo.readObject(input, FlowEntry.class);
        } finally {
            // Release the Kryo instance even if decoding throws
            kryoFactory.deleteKryo(kryo);
        }
    }

    /**
     * Send a notification that a FlowEntry is added.
     *
     * The FlowEntry is serialized with Kryo and asynchronously put into
     * the FlowEntry map, keyed by its FlowEntry ID value.
     *
     * @param flowEntry the FlowEntry that is added.
     */
    @Override
    public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
        //
        // Encode the value
        //
        byte[] valueBytes;
        Kryo kryo = kryoFactory.newKryo();
        try {
            byte[] buffer = new byte[MAX_BUFFER_SIZE];
            // -1: no upper bound on the Kryo output buffer size
            Output output = new Output(buffer, -1);
            kryo.writeObject(output, flowEntry);
            valueBytes = output.toBytes();
        } finally {
            // Release the Kryo instance even if encoding throws
            kryoFactory.deleteKryo(kryo);
        }

        //
        // Put the entry:
        //  - Key : FlowEntry ID (Long)
        //  - Value : Serialized FlowEntry (byte[])
        //
        mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
    }

    /**
     * Send a notification that a FlowEntry is removed.
     *
     * Issues an asynchronous removal of the FlowEntry map entry keyed by
     * the FlowEntry ID value; the result of the asynchronous call is not
     * awaited.
     *
     * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
     */
    @Override
    public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
        //
        // Remove the entry:
        //  - Key : FlowEntry ID (Long)
        //  - Value : Serialized FlowEntry (byte[])
        //
        mapFlowEntry.removeAsync(flowEntryId.value());
    }

    /**
     * Send a notification that a FlowEntry is updated.
     *
     * Delegates to notificationSendFlowEntryAdded(): an update is sent as
     * a put under the same key.
     *
     * @param flowEntry the FlowEntry that is updated.
     */
    @Override
    public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
        // NOTE: Adding an entry with an existing key automatically updates it
        notificationSendFlowEntryAdded(flowEntry);
    }

    /**
     * Send a notification that all Flow Entries are removed.
     *
     * Every key currently in the FlowEntry map is removed asynchronously.
     */
    @Override
    public void notificationSendAllFlowEntriesRemoved() {
        //
        // NOTE: mapFlowEntry.clear() is deliberately not used: the entries
        // are removed one-by-one so the per-entry notifications will be
        // delivered.
        //
        for (Long flowEntryKey : mapFlowEntry.keySet()) {
            mapFlowEntry.removeAsync(flowEntryKey);
        }
    }

    /**
     * Get all Flow IDs that are currently in the datagrid.
     *
     * Each entry of the FlowId map yields one pair: the key becomes the
     * FlowId and the value is decoded with Kryo into a Dpid.
     *
     * @return all Flow IDs that are currently in the datagrid, each paired
     * with the Dpid stored for it.
     */
    @Override
    public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
        Collection<Pair<FlowId, Dpid>> allFlowIds =
                new LinkedList<Pair<FlowId, Dpid>>();

        //
        // Get all current entries
        //
        Kryo kryo = kryoFactory.newKryo();
        try {
            for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
                Long key = entry.getKey();
                byte[] valueBytes = entry.getValue();

                FlowId flowId = new FlowId(key);

                //
                // Decode the value
                //
                Input input = new Input(valueBytes);
                Dpid dpid = kryo.readObject(input, Dpid.class);

                // Explicit type arguments instead of the raw Pair type
                Pair<FlowId, Dpid> pair = new Pair<FlowId, Dpid>(flowId, dpid);
                allFlowIds.add(pair);
            }
        } finally {
            // Release the Kryo instance even if decoding throws
            kryoFactory.deleteKryo(kryo);
        }

        return allFlowIds;
    }

    /**
     * Get all Flow Entry IDs that are currently in the datagrid.
     *
     * Each entry of the FlowEntryId map yields one pair: the key becomes
     * the FlowEntryId and the value is decoded with Kryo into a Dpid.
     *
     * @return all Flow Entry IDs that are currently in the datagrid, each
     * paired with the Dpid stored for it.
     */
    @Override
    public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
        Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
                new LinkedList<Pair<FlowEntryId, Dpid>>();

        //
        // Get all current entries
        //
        Kryo kryo = kryoFactory.newKryo();
        try {
            for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
                Long key = entry.getKey();
                byte[] valueBytes = entry.getValue();

                FlowEntryId flowEntryId = new FlowEntryId(key);

                //
                // Decode the value
                //
                Input input = new Input(valueBytes);
                Dpid dpid = kryo.readObject(input, Dpid.class);

                // Explicit type arguments instead of the raw Pair type
                Pair<FlowEntryId, Dpid> pair =
                        new Pair<FlowEntryId, Dpid>(flowEntryId, dpid);
                allFlowEntryIds.add(pair);
            }
        } finally {
            // Release the Kryo instance even if decoding throws
            kryoFactory.deleteKryo(kryo);
        }

        return allFlowEntryIds;
    }

    /**
     * Send a notification that a FlowId is added.
     *
     * The Dpid is serialized with Kryo and asynchronously put into the
     * FlowId map, keyed by the FlowId value.
     *
     * @param flowId the FlowId that is added.
     * @param dpid   the Source Switch Dpid.
     */
    @Override
    public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
        //
        // Encode the value
        //
        byte[] valueBytes;
        Kryo kryo = kryoFactory.newKryo();
        try {
            byte[] buffer = new byte[MAX_BUFFER_SIZE];
            // -1: no upper bound on the Kryo output buffer size
            Output output = new Output(buffer, -1);
            kryo.writeObject(output, dpid);
            valueBytes = output.toBytes();
        } finally {
            // Release the Kryo instance even if encoding throws
            kryoFactory.deleteKryo(kryo);
        }

        //
        // Put the entry:
        //  - Key : FlowId (Long)
        //  - Value : Serialized Switch Dpid (byte[])
        //
        mapFlowId.putAsync(flowId.value(), valueBytes);
    }

    /**
     * Send a notification that a FlowId is removed.
     *
     * Issues an asynchronous removal of the FlowId map entry keyed by the
     * FlowId value; the result of the asynchronous call is not awaited.
     *
     * @param flowId the FlowId that is removed.
     */
    @Override
    public void notificationSendFlowIdRemoved(FlowId flowId) {
        //
        // Remove the entry:
        //  - Key : FlowId (Long)
        //  - Value : Serialized Switch Dpid (byte[])
        //
        mapFlowId.removeAsync(flowId.value());
    }

    /**
     * Send a notification that a FlowId is updated.
     *
     * Delegates to notificationSendFlowIdAdded(): an update is sent as a
     * put under the same key.
     *
     * @param flowId the FlowId that is updated.
     * @param dpid   the Source Switch Dpid.
     */
    @Override
    public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
        // NOTE: Adding an entry with an existing key automatically updates it
        notificationSendFlowIdAdded(flowId, dpid);
    }

    /**
     * Send a notification that all Flow IDs are removed.
     *
     * Every key currently in the FlowId map is removed asynchronously.
     */
    @Override
    public void notificationSendAllFlowIdsRemoved() {
        //
        // NOTE: mapFlowId.clear() is deliberately not used: the entries
        // are removed one-by-one so the per-entry notifications will be
        // delivered.
        //
        for (Long flowIdKey : mapFlowId.keySet()) {
            mapFlowId.removeAsync(flowIdKey);
        }
    }

    /**
     * Send a notification that a FlowEntryId is added.
     *
     * The Dpid is serialized with Kryo and asynchronously put into the
     * FlowEntryId map, keyed by the FlowEntryId value.
     *
     * @param flowEntryId the FlowEntryId that is added.
     * @param dpid        the Switch Dpid.
     */
    @Override
    public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
                                                 Dpid dpid) {
        //
        // Encode the value
        //
        byte[] valueBytes;
        Kryo kryo = kryoFactory.newKryo();
        try {
            byte[] buffer = new byte[MAX_BUFFER_SIZE];
            // -1: no upper bound on the Kryo output buffer size
            Output output = new Output(buffer, -1);
            kryo.writeObject(output, dpid);
            valueBytes = output.toBytes();
        } finally {
            // Release the Kryo instance even if encoding throws
            kryoFactory.deleteKryo(kryo);
        }

        //
        // Put the entry:
        //  - Key : FlowEntryId (Long)
        //  - Value : Serialized Switch Dpid (byte[])
        //
        mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
    }

    /**
     * Send a notification that a FlowEntryId is removed.
     *
     * Issues an asynchronous removal of the FlowEntryId map entry keyed by
     * the FlowEntryId value; the result of the asynchronous call is not
     * awaited.
     *
     * @param flowEntryId the FlowEntryId that is removed.
     */
    @Override
    public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
        //
        // Remove the entry:
        //  - Key : FlowEntryId (Long)
        //  - Value : Serialized Switch Dpid (byte[])
        //
        mapFlowEntryId.removeAsync(flowEntryId.value());
    }

    /**
     * Send a notification that a FlowEntryId is updated.
     *
     * Delegates to notificationSendFlowEntryIdAdded(): an update is sent
     * as a put under the same key.
     *
     * @param flowEntryId the FlowEntryId that is updated.
     * @param dpid        the Switch Dpid.
     */
    @Override
    public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
                                                   Dpid dpid) {
        // NOTE: Adding an entry with an existing key automatically updates it
        notificationSendFlowEntryIdAdded(flowEntryId, dpid);
    }

    /**
     * Send a notification that all Flow Entry IDs are removed.
     *
     * Every key currently in the FlowEntryId map is removed asynchronously.
     */
    @Override
    public void notificationSendAllFlowEntryIdsRemoved() {
        //
        // NOTE: mapFlowEntryId.clear() is deliberately not used: the
        // entries are removed one-by-one so the per-entry notifications
        // will be delivered.
        //
        for (Long flowEntryIdKey : mapFlowEntryId.keySet()) {
            mapFlowEntryId.removeAsync(flowEntryIdKey);
        }
    }

    /**
     * Get all Topology Elements that are currently in the datagrid.
     *
     * Every value stored in the Topology map is decoded with Kryo into a
     * TopologyElement.
     *
     * @return all Topology Elements that are currently in the datagrid.
     */
    @Override
    public Collection<TopologyElement> getAllTopologyElements() {
        Collection<TopologyElement> allTopologyElements =
                new LinkedList<TopologyElement>();

        //
        // Get all current entries
        //
        Collection<byte[]> values = mapTopology.values();
        Kryo kryo = kryoFactory.newKryo();
        try {
            for (byte[] valueBytes : values) {
                //
                // Decode the value
                //
                Input input = new Input(valueBytes);
                TopologyElement topologyElement =
                        kryo.readObject(input, TopologyElement.class);
                allTopologyElements.add(topologyElement);
            }
        } finally {
            // Release the Kryo instance even if decoding throws
            kryoFactory.deleteKryo(kryo);
        }

        return allTopologyElements;
    }

    /**
     * Send a notification that a Topology Element is added.
     *
     * The Topology Element is serialized with Kryo and asynchronously put
     * into the Topology map, keyed by its element ID.
     *
     * @param topologyElement the Topology Element that is added.
     */
    @Override
    public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
        //
        // Encode the value
        //
        byte[] valueBytes;
        Kryo kryo = kryoFactory.newKryo();
        try {
            byte[] buffer = new byte[MAX_BUFFER_SIZE];
            // -1: no upper bound on the Kryo output buffer size
            Output output = new Output(buffer, -1);
            kryo.writeObject(output, topologyElement);
            valueBytes = output.toBytes();
        } finally {
            // Release the Kryo instance even if encoding throws
            kryoFactory.deleteKryo(kryo);
        }

        //
        // Put the entry:
        //  - Key : TopologyElement ID (String)
        //  - Value : Serialized TopologyElement (byte[])
        //
        mapTopology.putAsync(topologyElement.elementId(), valueBytes);
    }

    /**
     * Send a notification that a Topology Element is removed.
     *
     * Issues an asynchronous removal of the Topology map entry keyed by
     * the element ID; the result of the asynchronous call is not awaited.
     * Only the element ID of the argument is used.
     *
     * @param topologyElement the Topology Element that is removed.
     */
    @Override
    public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
        //
        // Remove the entry:
        //  - Key : TopologyElement ID (String)
        //  - Value : Serialized TopologyElement (byte[])
        //
        mapTopology.removeAsync(topologyElement.elementId());
    }

    /**
     * Send a notification that a Topology Element is updated.
     *
     * Delegates to notificationSendTopologyElementAdded(): an update is
     * sent as a put under the same key.
     *
     * @param topologyElement the Topology Element that is updated.
     */
    @Override
    public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
        // NOTE: Adding an entry with an existing key automatically updates it
        notificationSendTopologyElementAdded(topologyElement);
    }

    /**
     * Send a notification that all Topology Elements are removed.
     *
     * Every key currently in the Topology map is removed asynchronously.
     */
    @Override
    public void notificationSendAllTopologyElementsRemoved() {
        //
        // NOTE: mapTopology.clear() is deliberately not used: the entries
        // are removed one-by-one so the per-entry notifications will be
        // delivered.
        //
        for (String elementKey : mapTopology.keySet()) {
            mapTopology.removeAsync(elementKey);
        }
    }

    /**
     * Send a Packet-Out notification through the datagrid.
     *
     * The notification itself is used as the key of an asynchronous put
     * into packetOutMap; the value is the dummyByte placeholder.
     * NOTE(review): the trailing (1L, MILLISECONDS) arguments look like
     * Hazelcast's time-to-live for the entry, i.e. the entry is meant to
     * expire almost immediately - confirm against the type of packetOutMap.
     *
     * @param packetOutNotification the Packet-Out notification to send.
     */
    @Override
    public void sendPacketOutNotification(PacketOutNotification packetOutNotification) {
        packetOutMap.putAsync(packetOutNotification, dummyByte, 1L, TimeUnit.MILLISECONDS);
    }

    /**
     * Send an ARP Reply notification through the datagrid.
     *
     * The notification itself is used as the key of an asynchronous put
     * into arpReplyMap; the value is the dummyByte placeholder.
     * NOTE(review): the trailing (1L, MILLISECONDS) arguments look like
     * Hazelcast's time-to-live for the entry, i.e. the entry is meant to
     * expire almost immediately - confirm against the type of arpReplyMap.
     *
     * @param arpReply the ARP Reply notification to send.
     */
    @Override
    public void sendArpReplyNotification(ArpReplyNotification arpReply) {
        arpReplyMap.putAsync(arpReply, dummyByte, 1L, TimeUnit.MILLISECONDS);
    }

    /**
     * Send a notification that a Device is added.
     *
     * Asynchronously puts the device into mapDevice under the given key.
     * The debug log prints the MAC address read from the device itself,
     * not the mac argument.
     *
     * @param mac the key under which the device is stored.
     * @param dev the device that is added.
     */
    @Override
    public void sendNotificationDeviceAdded(Long mac, OnosDevice dev) {
        log.debug("DeviceAdded in datagrid. mac {}", dev.getMacAddress());
        mapDevice.putAsync(mac, dev);
    }

    /**
     * Send a notification that a Device is deleted.
     *
     * The key is the device's MAC address converted to a long. The entry
     * is removed asynchronously, and only when mapDevice currently
     * contains that key; otherwise nothing is logged or removed.
     *
     * @param dev the device that is deleted.
     */
    @Override
    public void sendNotificationDeviceDeleted(OnosDevice dev) {
        long macKey = dev.getMacAddress().toLong();
        if (!mapDevice.containsKey(macKey)) {
            return;
        }

        log.debug("DeviceDeleted in datagrid. mac {}", dev.getMacAddress());
        mapDevice.removeAsync(macKey);
    }
}
