Cleanup Hazelcast event notification framework.

Removed the custom leftover notification channels and replace them
with the newer (generic) notification framework channels.

NOTE: For now, all transient entry/events (PacketOutNotification and
ArpReplyNotification) are added by replacing the original (key, dummyByte)
tuple by (key, key) tuple. The reason is because with the new notification
framework, the entryAdded() upcall contains only the value.
The (key, key) tuple is a hack that needs to be removed when the
corresponding modules are refactored and fixed.

Change-Id: I3b1d31fba8d65400a5ed4b45dd42cef9da0bfc48
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java
deleted file mode 100644
index 98fcf0c..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package net.onrc.onos.ofcontroller.proxyarp;
-
-/**
- * Listener interface for ARP reply event callbacks.
- */
-public interface IArpReplyEventHandler {
-    /**
-     * An ARP reply has been received.
-     * @param arpReply data about the received ARP reply
-     */
-    public void arpReplyEvent(ArpReplyNotification arpReply);
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java
deleted file mode 100644
index ce98703..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package net.onrc.onos.ofcontroller.proxyarp;
-
-/**
- * Classes may implement this interface if they wish to subscribe to packet out
- * notifications from the datagrid service. Packet out notifications are used to
- * direct other ONOS instances to send packets out particular ports under their
- * control.
- *
- */
-public interface IPacketOutEventHandler {
-
-    /**
-     * Notify the packet out event handler that an packet out notification has
-     * been received.
-     *
-     * @param packetOutNotification An object describing the notification
-     */
-    public void packetOutNotification(
-            PacketOutNotification packetOutNotification);
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
index 33d4d7d..d235c37 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
@@ -22,6 +22,8 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.datagrid.IEventChannel;
+import net.onrc.onos.datagrid.IEventChannelListener;
 import net.onrc.onos.ofcontroller.bgproute.Interface;
 import net.onrc.onos.ofcontroller.core.config.IConfigInfoService;
 import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
@@ -46,7 +48,7 @@
 import com.google.common.collect.SetMultimap;
 
 public class ProxyArpManager implements IProxyArpService, IOFMessageListener,
-        IPacketOutEventHandler, IArpReplyEventHandler, IFloodlightModule {
+					IFloodlightModule {
     private static final Logger log = LoggerFactory
             .getLogger(ProxyArpManager.class);
 
@@ -56,6 +58,15 @@
 
     private IFloodlightProviderService floodlightProvider;
     private IDatagridService datagrid;
+    private IEventChannel<PacketOutNotification, PacketOutNotification> packetOutEventChannel;
+    private IEventChannel<ArpReplyNotification, ArpReplyNotification> arpReplyEventChannel;
+    private static final String PACKET_OUT_CHANNEL_NAME = "onos.packet_out";
+    private static final String ARP_REPLY_CHANNEL_NAME = "onos.arp_reply";
+    private PacketOutEventHandler packetOutEventHandler =
+	new PacketOutEventHandler();
+    private ArpReplyEventHandler arpReplyEventHandler =
+	new ArpReplyEventHandler();
+
     private IConfigInfoService configService;
     private IRestApiService restApi;
     private IFlowPusherService flowPusher;
@@ -65,6 +76,87 @@
 
     private SetMultimap<InetAddress, ArpRequest> arpRequests;
 
+    //
+    // TODO: Using PacketOutNotification as both the key and the
+    // value is a hack that should be removed when this module is
+    // refactored.
+    //
+    private class PacketOutEventHandler implements
+	IEventChannelListener<PacketOutNotification, PacketOutNotification> {
+	@Override
+	public void entryAdded(PacketOutNotification packetOutNotification) {
+	    if (packetOutNotification instanceof SinglePacketOutNotification) {
+		SinglePacketOutNotification notification =
+		    (SinglePacketOutNotification) packetOutNotification;
+		sendArpRequestOutPort(notification.packet,
+				      notification.getOutSwitch(),
+				      notification.getOutPort());
+
+		// set timestamp
+		InetAddress addr = notification.getTargetAddress();
+		if (addr != null) {
+		    for (ArpRequest request : arpRequests.get(addr)) {
+			request.setRequestTime();
+		    }
+		}
+	    } else if (packetOutNotification instanceof BroadcastPacketOutNotification) {
+		BroadcastPacketOutNotification notification =
+		    (BroadcastPacketOutNotification) packetOutNotification;
+		broadcastArpRequestOutMyEdge(notification.packet,
+					     notification.getInSwitch(),
+					     notification.getInPort());
+
+		// set timestamp
+		InetAddress addr = notification.getTargetAddress();
+		if (addr != null) {
+		    for (ArpRequest request : arpRequests.get(addr)) {
+			request.setRequestTime();
+		    }
+		}
+	    } else {
+		log.warn("Unknown packet out notification received");
+	    }
+	}
+
+	@Override
+	public void entryUpdated(PacketOutNotification packetOutNotification) {
+	    // TODO: For now, entryUpdated() is processed as entryAdded()
+	    entryAdded(packetOutNotification);
+	}
+
+	@Override
+	public void entryRemoved(PacketOutNotification packetOutNotification) {
+	    // TODO: Not implemented. Revisit when this module is refactored
+	}
+    }
+
+    //
+    // TODO: Using ArpReplyNotification as both the key and the
+    // value is a hack that should be removed when this module is
+    // refactored.
+    //
+    private class ArpReplyEventHandler implements
+	IEventChannelListener<ArpReplyNotification, ArpReplyNotification> {
+	@Override
+	public void entryAdded(ArpReplyNotification arpReply) {
+	    log.debug("Received ARP reply notification for {}",
+		      arpReply.getTargetAddress());
+	    sendArpReplyToWaitingRequesters(arpReply.getTargetAddress(),
+					    arpReply.getTargetMacAddress());
+	}
+
+	@Override
+	public void entryUpdated(ArpReplyNotification arpReply) {
+	    // TODO: For now, entryUpdated() is processed as entryAdded()
+	    entryAdded(arpReply);
+	}
+
+	@Override
+	public void entryRemoved(ArpReplyNotification arpReply) {
+	    // TODO: Not implemented. Revisit when this module is refactored
+	}
+    }
+
     private static class ArpRequest {
         private final IArpRequester requester;
         private final boolean retry;
@@ -171,8 +263,22 @@
         restApi.addRestletRoutable(new ArpWebRoutable());
         floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 
-        datagrid.registerPacketOutEventHandler(this);
-        datagrid.registerArpReplyEventHandler(this);
+	//
+	// Event notification setup: channels and event handlers
+	//
+	//
+	// TODO: Using PacketOutNotification or ArpReplyNotification as both
+	// the key and the value is a hack that should be removed when this
+	// module is refactored.
+	//
+	packetOutEventChannel = datagrid.addListener(PACKET_OUT_CHANNEL_NAME,
+						     packetOutEventHandler,
+						     PacketOutNotification.class,
+						     PacketOutNotification.class);
+	arpReplyEventChannel = datagrid.addListener(ARP_REPLY_CHANNEL_NAME,
+						    arpReplyEventHandler,
+						    ArpReplyNotification.class,
+						    ArpReplyNotification.class);
 
         Timer arpTimer = new Timer("arp-processing");
         arpTimer.scheduleAtFixedRate(new TimerTask() {
@@ -348,9 +454,11 @@
         	}
 
         	// We don't know the device so broadcast the request out
-        	datagrid.sendPacketOutNotification(
-        			new BroadcastPacketOutNotification(eth.serialize(),
-        					target, sw.getId(), pi.getInPort()));
+		PacketOutNotification key =
+		    new BroadcastPacketOutNotification(eth.serialize(),
+        					target, sw.getId(),
+						pi.getInPort());
+        	packetOutEventChannel.addTransientEntry(key, key);
         }
         else {
         	// Even if the device exists in our database, we do not reply to
@@ -374,10 +482,11 @@
         			log.trace("Device {} exists but is not connected to any ports" +
         					" - broadcasting", macAddress);
         		}
-
-        		datagrid.sendPacketOutNotification(
-        				new BroadcastPacketOutNotification(eth.serialize(),
-        						target, sw.getId(), pi.getInPort()));
+			PacketOutNotification key =
+			    new BroadcastPacketOutNotification(eth.serialize(),
+					target, sw.getId(),
+					pi.getInPort());
+			packetOutEventChannel.addTransientEntry(key, key);
         	}
         	else {
         		for (IPortObject portObject : outPorts) {
@@ -401,9 +510,10 @@
         						HexString.toHexString(outSwitch), outPort});
         			}
 
-        			datagrid.sendPacketOutNotification(
-        					new SinglePacketOutNotification(eth.serialize(),
-        							target, outSwitch, outPort));
+				PacketOutNotification key =
+        				new SinglePacketOutNotification(eth.serialize(),
+						target, outSwitch, outPort);
+				packetOutEventChannel.addTransientEntry(key, key);
         		}
         	}
         }
@@ -503,8 +613,10 @@
         }
 
         // sendArpRequestToSwitches(ipAddress, eth.serialize());
-        datagrid.sendPacketOutNotification(new SinglePacketOutNotification(eth
-                .serialize(), ipAddress, intf.getDpid(), intf.getPort()));
+	PacketOutNotification key =
+	    new SinglePacketOutNotification(eth.serialize(), ipAddress,
+					    intf.getDpid(), intf.getPort());
+	packetOutEventChannel.addTransientEntry(key, key);
     }
 
     private void sendArpRequestToSwitches(InetAddress dstAddress,
@@ -560,8 +672,9 @@
 
         MACAddress mac = new MACAddress(arp.getSenderHardwareAddress());
 
-        datagrid.sendArpReplyNotification(new ArpReplyNotification(
-                targetAddress, mac));
+	ArpReplyNotification key =
+	    new ArpReplyNotification(targetAddress, mac);
+	arpReplyEventChannel.addTransientEntry(key, key);
     }
 
     private void broadcastArpRequestOutMyEdge(byte[] arpRequest, long inSwitch,
@@ -799,45 +912,4 @@
             request.dispatchReply(address, mac);
         }
     }
-
-    @Override
-    public void arpReplyEvent(ArpReplyNotification arpReply) {
-        log.debug("Received ARP reply notification for {}",
-                arpReply.getTargetAddress());
-        sendArpReplyToWaitingRequesters(arpReply.getTargetAddress(),
-                arpReply.getTargetMacAddress());
-    }
-
-    @Override
-    public void packetOutNotification(
-            PacketOutNotification packetOutNotification) {
-
-        if (packetOutNotification instanceof SinglePacketOutNotification) {
-            SinglePacketOutNotification notification = (SinglePacketOutNotification) packetOutNotification;
-            sendArpRequestOutPort(notification.packet,
-                    notification.getOutSwitch(), notification.getOutPort());
-
-            // set timestamp
-            InetAddress addr = notification.getTargetAddress();
-            if (addr != null) {
-                for (ArpRequest request : arpRequests.get(addr)) {
-                    request.setRequestTime();
-                }
-            }
-        } else if (packetOutNotification instanceof BroadcastPacketOutNotification) {
-            BroadcastPacketOutNotification notification = (BroadcastPacketOutNotification) packetOutNotification;
-            broadcastArpRequestOutMyEdge(notification.packet,
-                    notification.getInSwitch(), notification.getInPort());
-
-            // set timestamp
-            InetAddress addr = notification.getTargetAddress();
-            if (addr != null) {
-                for (ArpRequest request : arpRequests.get(addr)) {
-                    request.setRequestTime();
-                }
-            }
-        } else {
-            log.warn("Unknown packet out notification received");
-        }
-    }
 }