Clean up Hazelcast event notification framework.

Remove the custom leftover notification channels and replace them
with the newer (generic) notification framework channels.

NOTE: For now, all transient entries/events (PacketOutNotification and
ArpReplyNotification) are added by replacing the original (key, dummyByte)
tuple with a (key, key) tuple. The reason is that with the new notification
framework, the entryAdded() upcall contains only the value.
The (key, key) tuple is a hack that needs to be removed when the
corresponding modules are refactored and fixed.

Change-Id: I3b1d31fba8d65400a5ed4b45dd42cef9da0bfc48
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
index 33d4d7d..d235c37 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
@@ -22,6 +22,8 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.datagrid.IEventChannel;
+import net.onrc.onos.datagrid.IEventChannelListener;
 import net.onrc.onos.ofcontroller.bgproute.Interface;
 import net.onrc.onos.ofcontroller.core.config.IConfigInfoService;
 import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
@@ -46,7 +48,7 @@
 import com.google.common.collect.SetMultimap;
 
 public class ProxyArpManager implements IProxyArpService, IOFMessageListener,
-        IPacketOutEventHandler, IArpReplyEventHandler, IFloodlightModule {
+					IFloodlightModule {
     private static final Logger log = LoggerFactory
             .getLogger(ProxyArpManager.class);
 
@@ -56,6 +58,15 @@
 
     private IFloodlightProviderService floodlightProvider;
     private IDatagridService datagrid;
+    private IEventChannel<PacketOutNotification, PacketOutNotification> packetOutEventChannel;
+    private IEventChannel<ArpReplyNotification, ArpReplyNotification> arpReplyEventChannel;
+    private static final String PACKET_OUT_CHANNEL_NAME = "onos.packet_out";
+    private static final String ARP_REPLY_CHANNEL_NAME = "onos.arp_reply";
+    private PacketOutEventHandler packetOutEventHandler =
+	new PacketOutEventHandler();
+    private ArpReplyEventHandler arpReplyEventHandler =
+	new ArpReplyEventHandler();
+
     private IConfigInfoService configService;
     private IRestApiService restApi;
     private IFlowPusherService flowPusher;
@@ -65,6 +76,87 @@
 
     private SetMultimap<InetAddress, ArpRequest> arpRequests;
 
+    //
+    // Event channel listener that processes PacketOutNotification entries.
+    // TODO: Using PacketOutNotification as both the key and the value is a
+    // hack that should be removed when this module is refactored.
+    //
+    private class PacketOutEventHandler implements
+	IEventChannelListener<PacketOutNotification, PacketOutNotification> {
+	@Override
+	public void entryAdded(PacketOutNotification packetOutNotification) {
+	    if (packetOutNotification instanceof SinglePacketOutNotification) {
+		SinglePacketOutNotification notification =
+		    (SinglePacketOutNotification) packetOutNotification;
+		sendArpRequestOutPort(notification.packet,
+				      notification.getOutSwitch(),
+				      notification.getOutPort());
+
+		// Update the request time of each ArpRequest stored for the target address
+		InetAddress addr = notification.getTargetAddress();
+		if (addr != null) {
+		    for (ArpRequest request : arpRequests.get(addr)) {
+			request.setRequestTime();
+		    }
+		}
+	    } else if (packetOutNotification instanceof BroadcastPacketOutNotification) {
+		BroadcastPacketOutNotification notification =
+		    (BroadcastPacketOutNotification) packetOutNotification;
+		broadcastArpRequestOutMyEdge(notification.packet,
+					     notification.getInSwitch(),
+					     notification.getInPort());
+
+		// Update the request time of each ArpRequest stored for the target address
+		InetAddress addr = notification.getTargetAddress();
+		if (addr != null) {
+		    for (ArpRequest request : arpRequests.get(addr)) {
+			request.setRequestTime();
+		    }
+		}
+	    } else {
+		log.warn("Unknown packet out notification received");
+	    }
+	}
+
+	@Override
+	public void entryUpdated(PacketOutNotification packetOutNotification) {
+	    // TODO: For now, an updated entry is handled exactly like an added one
+	    entryAdded(packetOutNotification);
+	}
+
+	@Override
+	public void entryRemoved(PacketOutNotification packetOutNotification) {
+	    // TODO: Not implemented (no-op). Revisit when this module is refactored
+	}
+    }
+
+    //
+    // Event channel listener that processes ArpReplyNotification entries.
+    // TODO: Using ArpReplyNotification as both the key and the value is a
+    // hack that should be removed when this module is refactored.
+    //
+    private class ArpReplyEventHandler implements
+	IEventChannelListener<ArpReplyNotification, ArpReplyNotification> {
+	@Override
+	public void entryAdded(ArpReplyNotification arpReply) {
+	    log.debug("Received ARP reply notification for {}",
+		      arpReply.getTargetAddress());
+	    sendArpReplyToWaitingRequesters(arpReply.getTargetAddress(),
+					    arpReply.getTargetMacAddress());
+	}
+
+	@Override
+	public void entryUpdated(ArpReplyNotification arpReply) {
+	    // TODO: For now, an updated entry is handled exactly like an added one
+	    entryAdded(arpReply);
+	}
+
+	@Override
+	public void entryRemoved(ArpReplyNotification arpReply) {
+	    // TODO: Not implemented (no-op). Revisit when this module is refactored
+	}
+    }
+
     private static class ArpRequest {
         private final IArpRequester requester;
         private final boolean retry;
@@ -171,8 +263,22 @@
         restApi.addRestletRoutable(new ArpWebRoutable());
         floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 
-        datagrid.registerPacketOutEventHandler(this);
-        datagrid.registerArpReplyEventHandler(this);
+	//
+	// Event notification setup: channels and event handlers
+	//
+	//
+	// TODO: Using PacketOutNotification or ArpReplyNotification as both
+	// the key and the value is a hack that should be removed when this
+	// module is refactored.
+	//
+	packetOutEventChannel = datagrid.addListener(PACKET_OUT_CHANNEL_NAME,
+						     packetOutEventHandler,
+						     PacketOutNotification.class,
+						     PacketOutNotification.class);
+	arpReplyEventChannel = datagrid.addListener(ARP_REPLY_CHANNEL_NAME,
+						    arpReplyEventHandler,
+						    ArpReplyNotification.class,
+						    ArpReplyNotification.class);
 
         Timer arpTimer = new Timer("arp-processing");
         arpTimer.scheduleAtFixedRate(new TimerTask() {
@@ -348,9 +454,11 @@
         	}
 
         	// We don't know the device so broadcast the request out
-        	datagrid.sendPacketOutNotification(
-        			new BroadcastPacketOutNotification(eth.serialize(),
-        					target, sw.getId(), pi.getInPort()));
+		PacketOutNotification key =
+		    new BroadcastPacketOutNotification(eth.serialize(),
+        					target, sw.getId(),
+						pi.getInPort());
+        	packetOutEventChannel.addTransientEntry(key, key);
         }
         else {
         	// Even if the device exists in our database, we do not reply to
@@ -374,10 +482,11 @@
         			log.trace("Device {} exists but is not connected to any ports" +
         					" - broadcasting", macAddress);
         		}
-
-        		datagrid.sendPacketOutNotification(
-        				new BroadcastPacketOutNotification(eth.serialize(),
-        						target, sw.getId(), pi.getInPort()));
+			PacketOutNotification key =
+			    new BroadcastPacketOutNotification(eth.serialize(),
+					target, sw.getId(),
+					pi.getInPort());
+			packetOutEventChannel.addTransientEntry(key, key);
         	}
         	else {
         		for (IPortObject portObject : outPorts) {
@@ -401,9 +510,10 @@
         						HexString.toHexString(outSwitch), outPort});
         			}
 
-        			datagrid.sendPacketOutNotification(
-        					new SinglePacketOutNotification(eth.serialize(),
-        							target, outSwitch, outPort));
+				PacketOutNotification key =
+        				new SinglePacketOutNotification(eth.serialize(),
+						target, outSwitch, outPort);
+				packetOutEventChannel.addTransientEntry(key, key);
         		}
         	}
         }
@@ -503,8 +613,10 @@
         }
 
         // sendArpRequestToSwitches(ipAddress, eth.serialize());
-        datagrid.sendPacketOutNotification(new SinglePacketOutNotification(eth
-                .serialize(), ipAddress, intf.getDpid(), intf.getPort()));
+	PacketOutNotification key =
+	    new SinglePacketOutNotification(eth.serialize(), ipAddress,
+					    intf.getDpid(), intf.getPort());
+	packetOutEventChannel.addTransientEntry(key, key);
     }
 
     private void sendArpRequestToSwitches(InetAddress dstAddress,
@@ -560,8 +672,9 @@
 
         MACAddress mac = new MACAddress(arp.getSenderHardwareAddress());
 
-        datagrid.sendArpReplyNotification(new ArpReplyNotification(
-                targetAddress, mac));
+	ArpReplyNotification key =
+	    new ArpReplyNotification(targetAddress, mac);
+	arpReplyEventChannel.addTransientEntry(key, key);
     }
 
     private void broadcastArpRequestOutMyEdge(byte[] arpRequest, long inSwitch,
@@ -799,45 +912,4 @@
             request.dispatchReply(address, mac);
         }
     }
-
-    @Override
-    public void arpReplyEvent(ArpReplyNotification arpReply) {
-        log.debug("Received ARP reply notification for {}",
-                arpReply.getTargetAddress());
-        sendArpReplyToWaitingRequesters(arpReply.getTargetAddress(),
-                arpReply.getTargetMacAddress());
-    }
-
-    @Override
-    public void packetOutNotification(
-            PacketOutNotification packetOutNotification) {
-
-        if (packetOutNotification instanceof SinglePacketOutNotification) {
-            SinglePacketOutNotification notification = (SinglePacketOutNotification) packetOutNotification;
-            sendArpRequestOutPort(notification.packet,
-                    notification.getOutSwitch(), notification.getOutPort());
-
-            // set timestamp
-            InetAddress addr = notification.getTargetAddress();
-            if (addr != null) {
-                for (ArpRequest request : arpRequests.get(addr)) {
-                    request.setRequestTime();
-                }
-            }
-        } else if (packetOutNotification instanceof BroadcastPacketOutNotification) {
-            BroadcastPacketOutNotification notification = (BroadcastPacketOutNotification) packetOutNotification;
-            broadcastArpRequestOutMyEdge(notification.packet,
-                    notification.getInSwitch(), notification.getInPort());
-
-            // set timestamp
-            InetAddress addr = notification.getTargetAddress();
-            if (addr != null) {
-                for (ArpRequest request : arpRequests.get(addr)) {
-                    request.setRequestTime();
-                }
-            }
-        } else {
-            log.warn("Unknown packet out notification received");
-        }
-    }
 }