Clean up the Hazelcast event notification framework.

Remove the leftover custom notification channels and replace them
with the newer (generic) notification framework channels.

NOTE: For now, all transient entries/events (PacketOutNotification and
ArpReplyNotification) are added by replacing the original (key, dummyByte)
tuple with a (key, key) tuple. The reason is that, with the new
notification framework, the entryAdded() upcall contains only the value.
The (key, key) tuple is a hack that needs to be removed when the
corresponding modules are refactored and fixed.

Change-Id: I3b1d31fba8d65400a5ed4b45dd42cef9da0bfc48
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 9bd838b..e204e45 100755
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -4,11 +4,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -17,30 +13,15 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.onrc.onos.datagrid.web.DatagridWebRoutable;
-import net.onrc.onos.ofcontroller.devicemanager.IDeviceEventHandler;
-import net.onrc.onos.ofcontroller.devicemanager.OnosDevice;
-import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
-import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
-import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
-import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
-import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.hazelcast.config.Config;
 import com.hazelcast.config.FileSystemXmlConfig;
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.EntryListener;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IList;
-import com.hazelcast.core.IMap;
 import com.hazelcast.instance.GroupProperties;
-import net.onrc.onos.intent.Intent;
 
 /**
  * A datagrid service that uses Hazelcast as a datagrid.
@@ -48,8 +29,6 @@
  * appropriate in a multi-node cluster.
  */
 public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
-    private static final int MAX_BUFFER_SIZE = 64 * 1024;
-
     static final Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
     private IRestApiService restApi;
 
@@ -57,8 +36,6 @@
     private HazelcastInstance hazelcastInstance;
     private Config hazelcastConfig;
 
-    private final KryoFactory kryoFactory = new KryoFactory();
-
     //
     // NOTE: eventChannels is kept thread safe by using explicit "synchronized"
     // blocks below. Those are needed to protect the integrity of each entry
@@ -66,167 +43,13 @@
     //
     private final Map<String, IEventChannel<?, ?>> eventChannels = new HashMap<>();
 
-    // State related to the packet out map
-    private static final String PACKET_OUT_MAP_NAME = "packetOutMap";
-    private IMap<PacketOutNotification, byte[]> packetOutMap;
-    private final List<IPacketOutEventHandler> packetOutEventHandlers = new ArrayList<>();
-
-    private final byte[] dummyByte = {0};
-
-    // State related to the ARP reply map
-    private static final String ARP_REPLY_MAP_NAME = "arpReplyMap";
-    private IMap<ArpReplyNotification, byte[]> arpReplyMap;
-    private final List<IArpReplyEventHandler> arpReplyEventHandlers = new ArrayList<>();
-
-
-    private static final String INTENT_LIST_NAME = "intentList";
-    private IList<Intent> intentList;
-
-    @Override
-    public void registerIntent(Collection<Intent> intents) {
-        intentList.addAll(intents);
-    }
-
-
-    // State related to the Network Device map
-    private static final String MAP_DEVICE_NAME = "mapDevice";
-    private IMap<Long, OnosDevice> mapDevice;
-    private final List<IDeviceEventHandler> deviceEventHandlers = new ArrayList<>();
-
-    /**
-     * MapDeviceListener - reacts to Device related events.
-     */
-    class MapDeviceListener implements EntryListener<Long, OnosDevice> {
-
-        @Override
-        public void entryAdded(EntryEvent<Long, OnosDevice> event) {
-            for (IDeviceEventHandler deviceEventHandler : deviceEventHandlers) {
-                deviceEventHandler.addDeviceEvent(event.getKey(), event.getValue());
-            }
-        }
-
-        @Override
-        public void entryRemoved(EntryEvent<Long, OnosDevice> event) {
-            for (IDeviceEventHandler deviceEventHandler : deviceEventHandlers) {
-                deviceEventHandler.deleteDeviceEvent(event.getKey(), event.getValue());
-            }
-        }
-
-        @Override
-        public void entryUpdated(EntryEvent<Long, OnosDevice> event) {
-            for (IDeviceEventHandler deviceEventHandler : deviceEventHandlers) {
-                deviceEventHandler.updateDeviceEvent(event.getKey(), event.getValue());
-            }
-        }
-
-        @Override
-        public void entryEvicted(EntryEvent<Long, OnosDevice> arg0) {
-            //Not used.
-        }
-    }
-
-    /**
-     * Class for receiving notifications for sending packet-outs.
-     * <p/>
-     * The datagrid map is:
-     * - Key: Packet-out to send (PacketOutNotification)
-     * - Value: dummy value (we only need the key) (byte[])
-     */
-    class PacketOutMapListener implements EntryListener<PacketOutNotification, byte[]> {
-        /**
-         * Receive a notification that an entry is added.
-         *
-         * @param event the notification event for the entry.
-         */
-        @Override
-        public void entryAdded(EntryEvent<PacketOutNotification, byte[]> event) {
-            for (IPacketOutEventHandler packetOutEventHandler : packetOutEventHandlers) {
-                packetOutEventHandler.packetOutNotification(event.getKey());
-            }
-        }
-
-        /**
-         * Receive a notification that an entry is removed.
-         *
-         * @param event the notification event for the entry.
-         */
-        @Override
-        public void entryRemoved(EntryEvent<PacketOutNotification, byte[]> event) {
-            // Not used
-        }
-
-        /**
-         * Receive a notification that an entry is updated.
-         *
-         * @param event the notification event for the entry.
-         */
-        @Override
-        public void entryUpdated(EntryEvent<PacketOutNotification, byte[]> event) {
-            // Not used
-        }
-
-        /**
-         * Receive a notification that an entry is evicted.
-         *
-         * @param event the notification event for the entry.
-         */
-        @Override
-        public void entryEvicted(EntryEvent<PacketOutNotification, byte[]> event) {
-            // Not used
-        }
-    }
-
-    /**
-     * Class for receiving notifications for sending packet-outs.
-     * <p/>
-     * The datagrid map is:
-     * - Key: Packet-out to send (PacketOutNotification)
-     * - Value: dummy value (we only need the key) (byte[])
-     */
-    class ArpReplyMapListener implements EntryListener<ArpReplyNotification, byte[]> {
-        /**
-         * Receive a notification that an entry is added.
-         *
-         * @param event the notification event for the entry.
-         */
-        @Override
-        public void entryAdded(EntryEvent<ArpReplyNotification, byte[]> event) {
-            triggerEventHandler(event.getKey());
-        }
-
-        @Override
-        public void entryUpdated(EntryEvent<ArpReplyNotification, byte[]> event) {
-            triggerEventHandler(event.getKey());
-        }
-
-        @Override
-        public void entryRemoved(EntryEvent<ArpReplyNotification, byte[]> event) {
-            // Not used for ARP replies
-        }
-
-        @Override
-        public void entryEvicted(EntryEvent<ArpReplyNotification, byte[]> event) {
-            // Not used for ARP replies
-        }
-
-        /**
-         * Handle an event.
-         * @param notification notification
-         */
-        private void triggerEventHandler(ArpReplyNotification notification) {
-            for (IArpReplyEventHandler arpReplyEventHandler : arpReplyEventHandlers) {
-                arpReplyEventHandler.arpReplyEvent(notification);
-            }
-        }
-    }
-
     /**
      * Initialize the Hazelcast Datagrid operation.
      *
      * @param configFilename the configuration filename.
      */
     public void init(String configFilename) {
-    /*
+        /*
         System.setProperty("hazelcast.socket.receive.buffer.size", "32");
         System.setProperty("hazelcast.socket.send.buffer.size", "32");
         */
@@ -335,16 +158,6 @@
         hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
 
         restApi.addRestletRoutable(new DatagridWebRoutable());
-
-        packetOutMap = hazelcastInstance.getMap(PACKET_OUT_MAP_NAME);
-        packetOutMap.addEntryListener(new PacketOutMapListener(), true);
-
-        arpReplyMap = hazelcastInstance.getMap(ARP_REPLY_MAP_NAME);
-        arpReplyMap.addEntryListener(new ArpReplyMapListener(), true);
-        intentList = hazelcastInstance.getList(INTENT_LIST_NAME);
-
-        mapDevice = hazelcastInstance.getMap(MAP_DEVICE_NAME);
-        mapDevice.addEntryListener(new MapDeviceListener(), true);
     }
 
     /**
@@ -469,65 +282,4 @@
             }
         }
     }
-
-    @Override
-    public void registerPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
-        if (packetOutEventHandler != null) {
-            packetOutEventHandlers.add(packetOutEventHandler);
-        }
-    }
-
-    @Override
-    public void deregisterPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
-        packetOutEventHandlers.remove(packetOutEventHandler);
-    }
-
-    @Override
-    public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
-        if (arpReplyEventHandler != null) {
-            arpReplyEventHandlers.add(arpReplyEventHandler);
-        }
-    }
-
-    @Override
-    public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
-        arpReplyEventHandlers.remove(arpReplyEventHandler);
-    }
-
-    @Override
-    public void registerMapDeviceEventHandler(IDeviceEventHandler deviceEventHandler) {
-        if (deviceEventHandler != null) {
-            deviceEventHandlers.add(deviceEventHandler);
-        }
-    }
-
-    @Override
-    public void deregisterMapDeviceEventHandler(IDeviceEventHandler deviceEventHandler) {
-        deviceEventHandlers.remove(deviceEventHandler);
-    }
-
-    @Override
-    public void sendPacketOutNotification(PacketOutNotification packetOutNotification) {
-        packetOutMap.putAsync(packetOutNotification, dummyByte, 1L, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void sendArpReplyNotification(ArpReplyNotification arpReply) {
-        arpReplyMap.putAsync(arpReply, dummyByte, 1L, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void sendNotificationDeviceAdded(Long mac, OnosDevice dev) {
-        log.debug("DeviceAdded in datagrid. mac {}", dev.getMacAddress());
-        mapDevice.putAsync(mac, dev);
-    }
-
-    @Override
-    public void sendNotificationDeviceDeleted(OnosDevice dev) {
-        long mac = dev.getMacAddress().toLong();
-        if (mapDevice.containsKey(mac)) {
-            log.debug("DeviceDeleted in datagrid. mac {}", dev.getMacAddress());
-            mapDevice.removeAsync(mac);
-        }
-    }
 }
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 1045bcd..1015bf2 100755
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -1,15 +1,6 @@
 package net.onrc.onos.datagrid;
 
-import java.util.Collection;
-
 import net.floodlightcontroller.core.module.IFloodlightService;
-import net.onrc.onos.intent.Intent;
-import net.onrc.onos.ofcontroller.devicemanager.IDeviceEventHandler;
-import net.onrc.onos.ofcontroller.devicemanager.OnosDevice;
-import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
-import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
-import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
-import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
 
 /**
  * Interface for providing Datagrid Service to other modules.
@@ -53,65 +44,4 @@
      */
     <K, V> void removeListener(String channelName,
 			      IEventChannelListener<K, V> listener);
-
-    /*
-     * register all the intents as one batch
-     */
-    void registerIntent(Collection<Intent> intents);
-
-    /**
-     * Register event handler for packet-out events.
-     * 
-     * @param packetOutEventHandler The packet-out event handler to register.
-     */
-    public void registerPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler);
-    
-    /**
-     * Deregister event handler service for packet-out events.
-     * 
-     * @param packetOutEventHandler The packet-out event handler to deregister.
-     */
-    public void deregisterPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler);
-    
-    /**
-     * Register event handler for ARP reply events.
-     * 
-     * @param packetOutEventHandler The ARP reply event handler to register.
-     */
-    public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler);
-    
-    /**
-     * Deregister event handler service for ARP reply events.
-     * 
-     * @param packetOutEventHandler The ARP reply event handler to deregister.
-     */
-    public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler);
-
-    /**
-     * Send a packet-out notification to other ONOS instances. This informs
-     * other instances that they should send this packet out some of the ports
-     * they control. Not all notifications are applicable to all instances 
-     * (i.e. some notifications specify a single port to send the packet out),
-     * so each instance must determine whether it needs to take action when it
-     * receives the notification.
-     * 
-     * @param packetOutNotification The packet notification to send
-     */
-    public void sendPacketOutNotification(PacketOutNotification packetOutNotification);
-    
-    /**
-     * Send notification to other ONOS instances that an ARP reply has been 
-     * received.
-     * @param arpReply The notification of the ARP reply
-     */
-    public void sendArpReplyNotification(ArpReplyNotification arpReply);
-
-	void sendNotificationDeviceAdded(Long mac, OnosDevice dev);
-
-	void sendNotificationDeviceDeleted(OnosDevice dev);
-
-	void registerMapDeviceEventHandler(IDeviceEventHandler deviceEventHandler);
-
-	void deregisterMapDeviceEventHandler(IDeviceEventHandler deviceEventHandler);
-
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/devicemanager/IDeviceEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/devicemanager/IDeviceEventHandler.java
deleted file mode 100644
index 81bc09f..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/devicemanager/IDeviceEventHandler.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package net.onrc.onos.ofcontroller.devicemanager;
-
-public interface IDeviceEventHandler {
-	public void addDeviceEvent(Long key, OnosDevice value);
-	public void deleteDeviceEvent(Long key, OnosDevice value);
-	public void updateDeviceEvent(Long key, OnosDevice value);
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/devicemanager/OnosDeviceManager.java b/src/main/java/net/onrc/onos/ofcontroller/devicemanager/OnosDeviceManager.java
index d6dffac..b7e9697 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/devicemanager/OnosDeviceManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/devicemanager/OnosDeviceManager.java
@@ -25,6 +25,8 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.datagrid.IEventChannel;
+import net.onrc.onos.datagrid.IEventChannelListener;
 import net.onrc.onos.packet.ARP;
 import net.onrc.onos.packet.DHCP;
 import net.onrc.onos.packet.Ethernet;
@@ -39,8 +41,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class OnosDeviceManager implements IFloodlightModule, IOFMessageListener,
-										IOnosDeviceService {
+public class OnosDeviceManager implements IFloodlightModule,
+					  IOFMessageListener,
+					  IOnosDeviceService,
+					  IEventChannelListener<Long, OnosDevice> {
 	protected final static Logger log = LoggerFactory.getLogger(OnosDeviceManager.class);
 	private static final int CLEANUP_SECOND = 60*60;
 	private static final int AGEING_MILLSEC = 60*60*1000;
@@ -50,6 +54,8 @@
 	private final static ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
 
 	private IDatagridService datagrid;
+	private IEventChannel<Long, OnosDevice> eventChannel;
+	private static final String DEVICE_CHANNEL_NAME = "onos.device";
 	private Map<Long, OnosDevice> mapDevice = new ConcurrentHashMap<Long, OnosDevice>();
 	private INetworkGraphService networkGraphService;
 	private NetworkGraph networkGraph;
@@ -297,13 +303,16 @@
 	@Override
 	public void startUp(FloodlightModuleContext context) {
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
-		datagrid.registerMapDeviceEventHandler(new MapDevListener());
+		eventChannel = datagrid.addListener(DEVICE_CHANNEL_NAME, this,
+						    Long.class,
+						    OnosDevice.class);
 	}
 
     @Override
 	public void deleteOnosDevice(OnosDevice dev) {
-		datagrid.sendNotificationDeviceDeleted(dev);
-		floodlightProvider.publishUpdate(new OnosDeviceUpdate(dev, OnosDeviceUpdateType.DELETE));
+	    Long mac = dev.getMacAddress().toLong();
+	    eventChannel.removeEntry(mac);
+	    floodlightProvider.publishUpdate(new OnosDeviceUpdate(dev, OnosDeviceUpdateType.DELETE));
 	}
     
     @Override
@@ -314,31 +323,30 @@
 	
 	@Override
 	public void addOnosDevice(Long mac, OnosDevice dev) {
-        datagrid.sendNotificationDeviceAdded(mac, dev);
-        floodlightProvider.publishUpdate(new OnosDeviceUpdate(dev, OnosDeviceUpdateType.ADD));
+	    eventChannel.addEntry(mac, dev);
+	    floodlightProvider.publishUpdate(new OnosDeviceUpdate(dev, OnosDeviceUpdateType.ADD));
 	}
 
-	//This is listener for datagrid mapDevice change.
-    class MapDevListener implements IDeviceEventHandler {
+	@Override
+	public void entryAdded(OnosDevice dev) {
+	    Long mac = dev.getMacAddress().toLong();
+	    mapDevice.put(mac, dev);
+	    log.debug("Device added: device mac {}", mac);
+	}
 
-		@Override
-		public void addDeviceEvent(Long mac, OnosDevice dev) {
-			mapDevice.put(mac, dev);
-			log.debug("addDeviceMap: device mac {}", mac);
-		}
+	@Override
+	public void entryRemoved(OnosDevice dev) {
+	    Long mac = dev.getMacAddress().toLong();
+	    mapDevice.remove(mac);
+	    log.debug("Device removed: device mac {}", mac);
+	}
 
-		@Override
-		public void deleteDeviceEvent(Long mac, OnosDevice dev) {
-			mapDevice.remove(mac);
-			log.debug("deleteDeviceMap: device mac {}", mac);
-		}
-
-		@Override
-		public void updateDeviceEvent(Long mac, OnosDevice dev) {
-			mapDevice.put(mac, dev);
-			log.debug("updateDeviceMap: device mac {}", mac);
-		}
-    }
+	@Override
+	public void entryUpdated(OnosDevice dev) {
+	    Long mac = dev.getMacAddress().toLong();
+	    mapDevice.put(mac, dev);
+	    log.debug("Device updated: device mac {}", mac);
+	}
 
 	@Override
 	public void addOnosDeviceListener(IOnosDeviceListener listener) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 0ee4466..294e1a5 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -19,10 +19,12 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.datagrid.IEventChannel;
 import net.onrc.onos.ofcontroller.devicemanager.IOnosDeviceService;
 import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
 import net.onrc.onos.ofcontroller.proxyarp.BroadcastPacketOutNotification;
 import net.onrc.onos.ofcontroller.proxyarp.IProxyArpService;
+import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
 import net.onrc.onos.ofcontroller.util.CallerId;
 import net.onrc.onos.ofcontroller.util.DataPath;
 import net.onrc.onos.ofcontroller.util.Dpid;
@@ -67,6 +69,14 @@
 	private IFloodlightProviderService floodlightProvider;
 	private IFlowPusherService flowPusher;
 	private IDatagridService datagrid;
+	//
+	// TODO: Using PacketOutNotification as both the key and the
+	// value is a hack that should be removed when this module is
+	// refactored.
+	//
+	private IEventChannel<PacketOutNotification, PacketOutNotification> eventChannel;
+	private static final String PACKET_OUT_CHANNEL_NAME = "onos.packet_out";
+
 	private IControllerRegistryService controllerRegistryService;
 	
 	// TODO it seems there is a Guava collection that will time out entries.
@@ -176,7 +186,14 @@
 	
 	@Override
 	public void startUp(FloodlightModuleContext context) {
-		// no-op
+		//
+		// TODO: Using PacketOutNotification as both the key and the
+		// value is a hack that should be removed when this module is
+		// refactored.
+		//
+		eventChannel = datagrid.createChannel(PACKET_OUT_CHANNEL_NAME,
+						      PacketOutNotification.class,
+						      PacketOutNotification.class);
 	}
 
 	@Override
@@ -231,8 +248,17 @@
 			log.trace("Sending broadcast packet to other ONOS instances");
 		}
 
-		 datagrid.sendPacketOutNotification(new BroadcastPacketOutNotification(
-				 eth.serialize(), null, sw.getId(), pi.getInPort()));
+		PacketOutNotification key =
+		    new BroadcastPacketOutNotification(
+						       eth.serialize(),
+						       null, sw.getId(),
+						       pi.getInPort());
+		//
+		// TODO: Using PacketOutNotification as both the key and the
+		// value is a hack that should be removed when this module is
+		// refactored.
+		//
+		eventChannel.addTransientEntry(key, key);
 	}
 	
 	private void handlePacketIn(IOFSwitch sw, OFPacketIn pi, Ethernet eth){
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java
deleted file mode 100644
index 98fcf0c..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package net.onrc.onos.ofcontroller.proxyarp;
-
-/**
- * Listener interface for ARP reply event callbacks.
- */
-public interface IArpReplyEventHandler {
-    /**
-     * An ARP reply has been received.
-     * @param arpReply data about the received ARP reply
-     */
-    public void arpReplyEvent(ArpReplyNotification arpReply);
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java
deleted file mode 100644
index ce98703..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package net.onrc.onos.ofcontroller.proxyarp;
-
-/**
- * Classes may implement this interface if they wish to subscribe to packet out
- * notifications from the datagrid service. Packet out notifications are used to
- * direct other ONOS instances to send packets out particular ports under their
- * control.
- *
- */
-public interface IPacketOutEventHandler {
-
-    /**
-     * Notify the packet out event handler that an packet out notification has
-     * been received.
-     *
-     * @param packetOutNotification An object describing the notification
-     */
-    public void packetOutNotification(
-            PacketOutNotification packetOutNotification);
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
index 33d4d7d..d235c37 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
@@ -22,6 +22,8 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.datagrid.IEventChannel;
+import net.onrc.onos.datagrid.IEventChannelListener;
 import net.onrc.onos.ofcontroller.bgproute.Interface;
 import net.onrc.onos.ofcontroller.core.config.IConfigInfoService;
 import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
@@ -46,7 +48,7 @@
 import com.google.common.collect.SetMultimap;
 
 public class ProxyArpManager implements IProxyArpService, IOFMessageListener,
-        IPacketOutEventHandler, IArpReplyEventHandler, IFloodlightModule {
+					IFloodlightModule {
     private static final Logger log = LoggerFactory
             .getLogger(ProxyArpManager.class);
 
@@ -56,6 +58,15 @@
 
     private IFloodlightProviderService floodlightProvider;
     private IDatagridService datagrid;
+    private IEventChannel<PacketOutNotification, PacketOutNotification> packetOutEventChannel;
+    private IEventChannel<ArpReplyNotification, ArpReplyNotification> arpReplyEventChannel;
+    private static final String PACKET_OUT_CHANNEL_NAME = "onos.packet_out";
+    private static final String ARP_REPLY_CHANNEL_NAME = "onos.arp_reply";
+    private PacketOutEventHandler packetOutEventHandler =
+	new PacketOutEventHandler();
+    private ArpReplyEventHandler arpReplyEventHandler =
+	new ArpReplyEventHandler();
+
     private IConfigInfoService configService;
     private IRestApiService restApi;
     private IFlowPusherService flowPusher;
@@ -65,6 +76,87 @@
 
     private SetMultimap<InetAddress, ArpRequest> arpRequests;
 
+    //
+    // TODO: Using PacketOutNotification as both the key and the
+    // value is a hack that should be removed when this module is
+    // refactored.
+    //
+    private class PacketOutEventHandler implements
+	IEventChannelListener<PacketOutNotification, PacketOutNotification> {
+	@Override
+	public void entryAdded(PacketOutNotification packetOutNotification) {
+	    if (packetOutNotification instanceof SinglePacketOutNotification) {
+		SinglePacketOutNotification notification =
+		    (SinglePacketOutNotification) packetOutNotification;
+		sendArpRequestOutPort(notification.packet,
+				      notification.getOutSwitch(),
+				      notification.getOutPort());
+
+		// set timestamp
+		InetAddress addr = notification.getTargetAddress();
+		if (addr != null) {
+		    for (ArpRequest request : arpRequests.get(addr)) {
+			request.setRequestTime();
+		    }
+		}
+	    } else if (packetOutNotification instanceof BroadcastPacketOutNotification) {
+		BroadcastPacketOutNotification notification =
+		    (BroadcastPacketOutNotification) packetOutNotification;
+		broadcastArpRequestOutMyEdge(notification.packet,
+					     notification.getInSwitch(),
+					     notification.getInPort());
+
+		// set timestamp
+		InetAddress addr = notification.getTargetAddress();
+		if (addr != null) {
+		    for (ArpRequest request : arpRequests.get(addr)) {
+			request.setRequestTime();
+		    }
+		}
+	    } else {
+		log.warn("Unknown packet out notification received");
+	    }
+	}
+
+	@Override
+	public void entryUpdated(PacketOutNotification packetOutNotification) {
+	    // TODO: For now, entryUpdated() is processed as entryAdded()
+	    entryAdded(packetOutNotification);
+	}
+
+	@Override
+	public void entryRemoved(PacketOutNotification packetOutNotification) {
+	    // TODO: Not implemented. Revisit when this module is refactored
+	}
+    }
+
+    //
+    // TODO: Using ArpReplyNotification as both the key and the
+    // value is a hack that should be removed when this module is
+    // refactored.
+    //
+    private class ArpReplyEventHandler implements
+	IEventChannelListener<ArpReplyNotification, ArpReplyNotification> {
+	@Override
+	public void entryAdded(ArpReplyNotification arpReply) {
+	    log.debug("Received ARP reply notification for {}",
+		      arpReply.getTargetAddress());
+	    sendArpReplyToWaitingRequesters(arpReply.getTargetAddress(),
+					    arpReply.getTargetMacAddress());
+	}
+
+	@Override
+	public void entryUpdated(ArpReplyNotification arpReply) {
+	    // TODO: For now, entryUpdated() is processed as entryAdded()
+	    entryAdded(arpReply);
+	}
+
+	@Override
+	public void entryRemoved(ArpReplyNotification arpReply) {
+	    // TODO: Not implemented. Revisit when this module is refactored
+	}
+    }
+
     private static class ArpRequest {
         private final IArpRequester requester;
         private final boolean retry;
@@ -171,8 +263,22 @@
         restApi.addRestletRoutable(new ArpWebRoutable());
         floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 
-        datagrid.registerPacketOutEventHandler(this);
-        datagrid.registerArpReplyEventHandler(this);
+	//
+	// Event notification setup: channels and event handlers
+	//
+	//
+	// TODO: Using PacketOutNotification or ArpReplyNotification as both
+	// the key and the value is a hack that should be removed when this
+	// module is refactored.
+	//
+	packetOutEventChannel = datagrid.addListener(PACKET_OUT_CHANNEL_NAME,
+						     packetOutEventHandler,
+						     PacketOutNotification.class,
+						     PacketOutNotification.class);
+	arpReplyEventChannel = datagrid.addListener(ARP_REPLY_CHANNEL_NAME,
+						    arpReplyEventHandler,
+						    ArpReplyNotification.class,
+						    ArpReplyNotification.class);
 
         Timer arpTimer = new Timer("arp-processing");
         arpTimer.scheduleAtFixedRate(new TimerTask() {
@@ -348,9 +454,11 @@
         	}
 
         	// We don't know the device so broadcast the request out
-        	datagrid.sendPacketOutNotification(
-        			new BroadcastPacketOutNotification(eth.serialize(),
-        					target, sw.getId(), pi.getInPort()));
+		PacketOutNotification key =
+		    new BroadcastPacketOutNotification(eth.serialize(),
+        					target, sw.getId(),
+						pi.getInPort());
+        	packetOutEventChannel.addTransientEntry(key, key);
         }
         else {
         	// Even if the device exists in our database, we do not reply to
@@ -374,10 +482,11 @@
         			log.trace("Device {} exists but is not connected to any ports" +
         					" - broadcasting", macAddress);
         		}
-
-        		datagrid.sendPacketOutNotification(
-        				new BroadcastPacketOutNotification(eth.serialize(),
-        						target, sw.getId(), pi.getInPort()));
+			PacketOutNotification key =
+			    new BroadcastPacketOutNotification(eth.serialize(),
+					target, sw.getId(),
+					pi.getInPort());
+			packetOutEventChannel.addTransientEntry(key, key);
         	}
         	else {
         		for (IPortObject portObject : outPorts) {
@@ -401,9 +510,10 @@
         						HexString.toHexString(outSwitch), outPort});
         			}
 
-        			datagrid.sendPacketOutNotification(
-        					new SinglePacketOutNotification(eth.serialize(),
-        							target, outSwitch, outPort));
+				PacketOutNotification key =
+        				new SinglePacketOutNotification(eth.serialize(),
+						target, outSwitch, outPort);
+				packetOutEventChannel.addTransientEntry(key, key);
         		}
         	}
         }
@@ -503,8 +613,10 @@
         }
 
         // sendArpRequestToSwitches(ipAddress, eth.serialize());
-        datagrid.sendPacketOutNotification(new SinglePacketOutNotification(eth
-                .serialize(), ipAddress, intf.getDpid(), intf.getPort()));
+	PacketOutNotification key =
+	    new SinglePacketOutNotification(eth.serialize(), ipAddress,
+					    intf.getDpid(), intf.getPort());
+	packetOutEventChannel.addTransientEntry(key, key);
     }
 
     private void sendArpRequestToSwitches(InetAddress dstAddress,
@@ -560,8 +672,9 @@
 
         MACAddress mac = new MACAddress(arp.getSenderHardwareAddress());
 
-        datagrid.sendArpReplyNotification(new ArpReplyNotification(
-                targetAddress, mac));
+	ArpReplyNotification key =
+	    new ArpReplyNotification(targetAddress, mac);
+	arpReplyEventChannel.addTransientEntry(key, key);
     }
 
     private void broadcastArpRequestOutMyEdge(byte[] arpRequest, long inSwitch,
@@ -799,45 +912,4 @@
             request.dispatchReply(address, mac);
         }
     }
-
-    @Override
-    public void arpReplyEvent(ArpReplyNotification arpReply) {
-        log.debug("Received ARP reply notification for {}",
-                arpReply.getTargetAddress());
-        sendArpReplyToWaitingRequesters(arpReply.getTargetAddress(),
-                arpReply.getTargetMacAddress());
-    }
-
-    @Override
-    public void packetOutNotification(
-            PacketOutNotification packetOutNotification) {
-
-        if (packetOutNotification instanceof SinglePacketOutNotification) {
-            SinglePacketOutNotification notification = (SinglePacketOutNotification) packetOutNotification;
-            sendArpRequestOutPort(notification.packet,
-                    notification.getOutSwitch(), notification.getOutPort());
-
-            // set timestamp
-            InetAddress addr = notification.getTargetAddress();
-            if (addr != null) {
-                for (ArpRequest request : arpRequests.get(addr)) {
-                    request.setRequestTime();
-                }
-            }
-        } else if (packetOutNotification instanceof BroadcastPacketOutNotification) {
-            BroadcastPacketOutNotification notification = (BroadcastPacketOutNotification) packetOutNotification;
-            broadcastArpRequestOutMyEdge(notification.packet,
-                    notification.getInSwitch(), notification.getInPort());
-
-            // set timestamp
-            InetAddress addr = notification.getTargetAddress();
-            if (addr != null) {
-                for (ArpRequest request : arpRequests.get(addr)) {
-                    request.setRequestTime();
-                }
-            }
-        } else {
-            log.warn("Unknown packet out notification received");
-        }
-    }
 }