Change the way IP-to-MAC bindings are learned.
ProxyArpManager has no unit tests yet because its code will be changed soon by another fix from Jono.

Change-Id: I0cf45aa328871497db522c531dcef999156c2d55
diff --git a/src/main/java/net/onrc/onos/apps/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/apps/proxyarp/ProxyArpManager.java
index 6b84e45..503bf30 100644
--- a/src/main/java/net/onrc/onos/apps/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/apps/proxyarp/ProxyArpManager.java
@@ -61,21 +61,24 @@
     private static final Logger log = LoggerFactory
             .getLogger(ProxyArpManager.class);
 
-    private static final long ARP_TIMER_PERIOD = 100; // ms
-
-    private static final int ARP_REQUEST_TIMEOUT = 2000; // ms
+    private static long arpTimerPeriodConfig = 100; // ms
+    private static int arpRequestTimeoutConfig = 2000; // ms
+    private long arpCleaningTimerPeriodConfig = 60 * 1000; // ms (1 min)
 
     private IFloodlightProviderService floodlightProvider;
     private IDatagridService datagrid;
     private IEventChannel<Long, ArpReplyNotification> arpReplyEventChannel;
     private IEventChannel<Long, BroadcastPacketOutNotification> broadcastPacketOutEventChannel;
     private IEventChannel<Long, SinglePacketOutNotification> singlePacketOutEventChannel;
+    private IEventChannel<String, ArpCacheNotification> arpCacheEventChannel;
     private static final String ARP_REPLY_CHANNEL_NAME = "onos.arp_reply";
     private static final String BROADCAST_PACKET_OUT_CHANNEL_NAME = "onos.broadcast_packet_out";
     private static final String SINGLE_PACKET_OUT_CHANNEL_NAME = "onos.single_packet_out";
-    private ArpReplyEventHandler arpReplyEventHandler = new ArpReplyEventHandler();
-    private BroadcastPacketOutEventHandler broadcastPacketOutEventHandler = new BroadcastPacketOutEventHandler();
-    private SinglePacketOutEventHandler singlePacketOutEventHandler = new SinglePacketOutEventHandler();
+    private static final String ARP_CACHE_CHANNEL_NAME = "onos.arp_cache";
+    private final ArpReplyEventHandler arpReplyEventHandler = new ArpReplyEventHandler();
+    private final BroadcastPacketOutEventHandler broadcastPacketOutEventHandler = new BroadcastPacketOutEventHandler();
+    private final SinglePacketOutEventHandler singlePacketOutEventHandler = new SinglePacketOutEventHandler();
+    private final ArpCacheEventHandler arpCacheEventHandler = new ArpCacheEventHandler();
 
     private IConfigInfoService configService;
     private IRestApiService restApi;
@@ -90,20 +93,24 @@
 
     private SetMultimap<InetAddress, ArpRequest> arpRequests;
 
+    private ArpCache arpCache;
+
     private class BroadcastPacketOutEventHandler implements
             IEventChannelListener<Long, BroadcastPacketOutNotification> {
 
         @Override
         public void entryAdded(BroadcastPacketOutNotification value) {
             if (log.isTraceEnabled()) {
-                log.trace("entryAdded ip{}, sw {}, port {}, packet {}", value.getTargetAddress(), value.getInSwitch(), value.getInPort(), value.getPacketData().length);
+                log.trace("entryAdded for BroadcastPacketOutEventHandler, ip{}, sw {}, port {}",
+                        value.getTargetAddress(), value.getInSwitch(), value.getInPort());
             }
-            BroadcastPacketOutNotification notification = (BroadcastPacketOutNotification) value;
+            BroadcastPacketOutNotification notification = value;
             broadcastArpRequestOutMyEdge(notification.getPacketData(),
                     notification.getInSwitch(),
                     notification.getInPort());
 
             // set timestamp
+            // 4 is the size of an IPv4 address in bytes. This needs to change in the future.
             ByteBuffer buffer = ByteBuffer.allocate(4);
             buffer.putInt(notification.getTargetAddress());
             InetAddress addr = null;
@@ -122,15 +129,13 @@
 
         @Override
         public void entryUpdated(BroadcastPacketOutNotification value) {
-            log.debug("entryUpdated");
-            // TODO: For now, entryUpdated() is processed as entryAdded()
+            log.debug("entryUpdated for BroadcastPacketOutEventHandler");
             entryAdded(value);
         }
 
         @Override
         public void entryRemoved(BroadcastPacketOutNotification value) {
-            log.debug("entryRemoved");
-            // TODO: Not implemented. Revisit when this module is refactored
+            //Not implemented. BroadcastPacketOutNotification is used only for remote messaging.
         }
     }
 
@@ -138,14 +143,15 @@
             IEventChannelListener<Long, SinglePacketOutNotification> {
         @Override
         public void entryAdded(SinglePacketOutNotification packetOutNotification) {
-            log.debug("entryAdded");
+            log.debug("entryAdded for SinglePacketOutEventHandler");
             SinglePacketOutNotification notification =
-                    (SinglePacketOutNotification) packetOutNotification;
+                    packetOutNotification;
             sendArpRequestOutPort(notification.getPacketData(),
                     notification.getOutSwitch(),
                     notification.getOutPort());
 
             // set timestamp
+            // 4 is the size of an IPv4 address in bytes. This needs to change in the future.
             ByteBuffer buffer = ByteBuffer.allocate(4);
             buffer.putInt(notification.getTargetAddress());
             InetAddress addr = null;
@@ -164,15 +170,13 @@
 
         @Override
         public void entryUpdated(SinglePacketOutNotification packetOutNotification) {
-            log.debug("entryUpdated");
-            // TODO: For now, entryUpdated() is processed as entryAdded()
+            log.debug("entryUpdated for SinglePacketOutEventHandler");
             entryAdded(packetOutNotification);
         }
 
         @Override
         public void entryRemoved(SinglePacketOutNotification packetOutNotification) {
-            log.debug("entryRemoved");
-            // TODO: Not implemented. Revisit when this module is refactored
+            //Not implemented. SinglePacketOutNotification is used only for remote messaging.
         }
     }
 
@@ -183,6 +187,7 @@
         public void entryAdded(ArpReplyNotification arpReply) {
             log.debug("Received ARP reply notification for ip {}, mac {}",
                     arpReply.getTargetAddress(), arpReply.getTargetMacAddress());
+            // 4 is the size of an IPv4 address in bytes. This needs to change in the future.
             ByteBuffer buffer = ByteBuffer.allocate(4);
             buffer.putInt(arpReply.getTargetAddress());
             InetAddress addr = null;
@@ -200,13 +205,66 @@
 
         @Override
         public void entryUpdated(ArpReplyNotification arpReply) {
-            // TODO: For now, entryUpdated() is processed as entryAdded()
             entryAdded(arpReply);
         }
 
         @Override
         public void entryRemoved(ArpReplyNotification arpReply) {
-            // TODO: Not implemented. Revisit when this module is refactored
+            //Not implemented. ArpReplyEventHandler is used only for remote messaging.
+        }
+    }
+
+    private class ArpCacheEventHandler implements
+    IEventChannelListener<String, ArpCacheNotification> {
+
+        /**
+         * Startup processing.
+         */
+        private void startUp() {
+            //
+            // TODO: Read all state from the database:
+            // For now, as a shortcut we read it from the datagrid
+            //
+            Collection<ArpCacheNotification> arpCacheEvents =
+                    arpCacheEventChannel.getAllEntries();
+
+            for (ArpCacheNotification arpCacheEvent : arpCacheEvents) {
+                entryAdded(arpCacheEvent);
+            }
+        }
+
+        @Override
+        public void entryAdded(ArpCacheNotification value) {
+
+            try {
+                log.debug("Received entryAdded for ARP cache notification for ip {}, mac {}",
+                    InetAddress.getByAddress(value.getTargetAddress()), value.getTargetMacAddress());
+                arpCache.update(InetAddress.getByAddress(value.getTargetAddress()), MACAddress.valueOf(value.getTargetMacAddress()));
+            } catch (UnknownHostException e) {
+                log.error("Exception : ", e);
+            }
+        }
+
+        @Override
+        public void entryRemoved(ArpCacheNotification value) {
+            log.debug("Received entryRemoved for ARP cache notification for ip {}, mac {}",
+                    value.getTargetAddress(), value.getTargetMacAddress());
+            try {
+                arpCache.remove(InetAddress.getByAddress(value.getTargetAddress()));
+            } catch (UnknownHostException e) {
+                log.error("Exception : ", e);
+            }
+        }
+
+        @Override
+        public void entryUpdated(ArpCacheNotification value) {
+            try {
+                log.debug("Received entryUpdated for ARP cache notification for ip {}, mac {}",
+                        InetAddress.getByAddress(value.getTargetAddress()), value.getTargetMacAddress());
+                arpCache.update(InetAddress.getByAddress(value.getTargetAddress()), MACAddress.valueOf(value.getTargetMacAddress()));
+            } catch (UnknownHostException e) {
+                log.error("Exception : ", e);
+            }
         }
     }
 
@@ -228,7 +286,7 @@
 
         public boolean isExpired() {
             return sent
-                    && ((System.currentTimeMillis() - requestTime) > ARP_REQUEST_TIMEOUT);
+                    && ((System.currentTimeMillis() - requestTime) > arpRequestTimeoutConfig);
         }
 
         public boolean shouldRetry() {
@@ -308,11 +366,27 @@
         this.networkGraphService = context.getServiceImpl(INetworkGraphService.class);
         this.onosDeviceService = context.getServiceImpl(IOnosDeviceService.class);
 
-        // arpCache = new ArpCache();
+        Map<String, String> configOptions = context.getConfigParams(this);
+        Long agingmsec = null;
+        try {
+            agingmsec = Long.parseLong(configOptions.get("agingmsec"));
+        } catch (NumberFormatException e) {
+            log.debug("ArpEntryTimeout related config options were not set. Use default.");
+        }
+
+        arpCache = new ArpCache();
+        if (agingmsec != null) {
+            arpCache.setArpEntryTimeoutConfig(agingmsec);
+        }
+
+        try {
+            arpCleaningTimerPeriodConfig = Long.parseLong(configOptions.get("cleanupmsec"));
+        } catch (NumberFormatException e) {
+            log.debug("ArpCleaningTimerPeriod related config options were not set. Use default.");
+        }
 
         arpRequests = Multimaps.synchronizedSetMultimap(HashMultimap
                 .<InetAddress, ArpRequest>create());
-
     }
 
     @Override
@@ -342,13 +416,27 @@
                 Long.class,
                 ArpReplyNotification.class);
 
+        arpCacheEventChannel = datagrid.addListener(ARP_CACHE_CHANNEL_NAME,
+                arpCacheEventHandler,
+                String.class,
+                ArpCacheNotification.class);
+        arpCacheEventHandler.startUp();
+
         Timer arpTimer = new Timer("arp-processing");
         arpTimer.scheduleAtFixedRate(new TimerTask() {
             @Override
             public void run() {
                 doPeriodicArpProcessing();
             }
-        }, 0, ARP_TIMER_PERIOD);
+        }, 0, arpTimerPeriodConfig);
+
+        Timer arpCacheTimer = new Timer("arp-clearning");
+        arpCacheTimer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                doPeriodicArpCleaning();
+            }
+        }, 0, arpCleaningTimerPeriodConfig);
     }
 
     /*
@@ -376,10 +464,11 @@
                             .getKey().getHostAddress());
 
                     // If the ARP request is expired and then delete the device
-                    // TODO check whether this is OK from this thread
                     HostArpRequester requester = (HostArpRequester) request.requester;
                     ARP req = requester.getArpRequest();
+                    networkGraph.acquireReadLock();
                     Device targetDev = networkGraph.getDeviceByMac(MACAddress.valueOf(req.getTargetHardwareAddress()));
+                    networkGraph.releaseReadLock();
                     if (targetDev != null) {
                         onosDeviceService.deleteOnosDeviceByMac(MACAddress.valueOf(req.getTargetHardwareAddress()));
                         if (log.isDebugEnabled()) {
@@ -440,22 +529,24 @@
             return Command.CONTINUE;
         }
 
-        OFPacketIn pi = (OFPacketIn) msg;
-
         Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
                 IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
 
+        return classifyPacket(sw, msg, eth);
+    }
+
+    protected Command classifyPacket(IOFSwitch sw, OFMessage msg, Ethernet eth) {
+        OFPacketIn pi = (OFPacketIn) msg;
+
         if (eth.getEtherType() == Ethernet.TYPE_ARP) {
             ARP arp = (ARP) eth.getPayload();
+            learnArp(arp);
             if (arp.getOpCode() == ARP.OP_REQUEST) {
                 handleArpRequest(sw, pi, arp, eth);
             } else if (arp.getOpCode() == ARP.OP_REPLY) {
                 // For replies we simply send a notification via Hazelcast
                 sendArpReplyNotification(eth, pi);
-
-                // handleArpReply(sw, pi, arp);
             }
-
             // Stop ARP packets here
             return Command.STOP;
         }
@@ -464,6 +555,18 @@
         return Command.CONTINUE;
     }
 
+    private void learnArp(ARP arp) {
+        ArpCacheNotification arpCacheNotification = null;
+
+        arpCacheNotification = new ArpCacheNotification(arp.getSenderProtocolAddress(), arp.getSenderHardwareAddress());
+
+        try {
+            arpCacheEventChannel.addEntry(InetAddress.getByAddress(arp.getSenderProtocolAddress()).toString(), arpCacheNotification);
+        } catch (UnknownHostException e) {
+            log.error("Exception : ", e);
+        }
+    }
+
     private void handleArpRequest(IOFSwitch sw, OFPacketIn pi, ARP arp,
                                   Ethernet eth) {
         if (log.isTraceEnabled()) {
@@ -495,12 +598,13 @@
             return;
         }
 
-        // MACAddress macAddress = arpCache.lookup(target);
+        //MACAddress mac = arpCache.lookup(target);
 
-        arpRequests.put(target, new ArpRequest(
-                new HostArpRequester(arp, sw.getId(), pi.getInPort()), false));
+        arpRequests.put(target, new ArpRequest(new HostArpRequester(arp, sw.getId(), pi.getInPort()), false));
 
+        networkGraph.acquireReadLock();
         Device targetDevice = networkGraph.getDeviceByMac(MACAddress.valueOf(arp.getTargetHardwareAddress()));
+        networkGraph.releaseReadLock();
 
         if (targetDevice == null) {
             if (log.isTraceEnabled()) {
@@ -509,12 +613,12 @@
             }
 
             // We don't know the device so broadcast the request out
-            BroadcastPacketOutNotification key =
+            BroadcastPacketOutNotification value =
                     new BroadcastPacketOutNotification(eth.serialize(),
                             ByteBuffer.wrap(arp.getTargetProtocolAddress()).getInt(), sw.getId(), pi.getInPort());
             log.debug("broadcastPacketOutEventChannel mac {}, ip {}, dpid {}, port {}, paket {}", eth.getSourceMAC().toLong(),
                     ByteBuffer.wrap(arp.getTargetProtocolAddress()).getInt(), sw.getId(), pi.getInPort(), eth.serialize().length);
-            broadcastPacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), key);
+            broadcastPacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), value);
         } else {
             // Even if the device exists in our database, we do not reply to
             // the request directly, but check whether the device is still valid
@@ -538,14 +642,12 @@
                             " - broadcasting", macAddress);
                 }
 
-//                              BroadcastPacketOutNotification key =
-//                                              new BroadcastPacketOutNotification(eth.serialize(),
-//                                                              target, sw.getId(), pi.getInPort());
-//                              broadcastPacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), key);
+                BroadcastPacketOutNotification value =
+                        new BroadcastPacketOutNotification(eth.serialize(),
+                                ByteBuffer.wrap(arp.getTargetProtocolAddress()).getInt(), sw.getId(), pi.getInPort());
+                broadcastPacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), value);
             } else {
                 for (net.onrc.onos.core.topology.Port portObject : outPorts) {
-                    //long outSwitch = 0;
-                    //short outPort = 0;
 
                     if (portObject.getOutgoingLink() != null || portObject.getIncomingLink() != null) {
                         continue;
@@ -561,59 +663,15 @@
                                         HexString.toHexString(outSwitch), outPort});
                     }
 
-                    SinglePacketOutNotification key =
+                    SinglePacketOutNotification value =
                             new SinglePacketOutNotification(eth.serialize(),
                                     ByteBuffer.wrap(target.getAddress()).getInt(), outSwitch, outPort);
-                    singlePacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), key);
+                    singlePacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), value);
                 }
             }
         }
     }
 
-    // Not used because device manager currently updates the database
-    // for ARP replies. May be useful in the future.
-    private void handleArpReply(IOFSwitch sw, OFPacketIn pi, ARP arp) {
-        if (log.isTraceEnabled()) {
-            log.trace("ARP reply recieved: {} => {}, on {}/{}", new Object[]{
-                    inetAddressToString(arp.getSenderProtocolAddress()),
-                    HexString.toHexString(arp.getSenderHardwareAddress()),
-                    HexString.toHexString(sw.getId()), pi.getInPort()});
-        }
-
-        InetAddress senderIpAddress;
-        try {
-            senderIpAddress = InetAddress.getByAddress(arp
-                    .getSenderProtocolAddress());
-        } catch (UnknownHostException e) {
-            log.debug("Invalid address in ARP reply", e);
-            return;
-        }
-
-        MACAddress senderMacAddress = MACAddress.valueOf(arp
-                .getSenderHardwareAddress());
-
-        // See if anyone's waiting for this ARP reply
-        Set<ArpRequest> requests = arpRequests.get(senderIpAddress);
-
-        // Synchronize on the Multimap while using an iterator for one of the
-        // sets
-        List<ArpRequest> requestsToSend = new ArrayList<ArpRequest>(
-                requests.size());
-        synchronized (arpRequests) {
-            Iterator<ArpRequest> it = requests.iterator();
-            while (it.hasNext()) {
-                ArpRequest request = it.next();
-                it.remove();
-                requestsToSend.add(request);
-            }
-        }
-
-        // Don't hold an ARP lock while dispatching requests
-        for (ArpRequest request : requestsToSend) {
-            request.dispatchReply(senderIpAddress, senderMacAddress);
-        }
-    }
-
     private void sendArpRequestForAddress(InetAddress ipAddress) {
         // TODO what should the sender IP address and MAC address be if no
         // IP addresses are configured? Will there ever be a need to send
@@ -638,7 +696,7 @@
                 .setTargetProtocolAddress(ipAddress.getAddress());
 
         MACAddress routerMacAddress = configService.getRouterMacAddress();
-        // TODO hack for now as it's unclear what the MAC address should be
+        // For now, it's unclear what the MAC address should be
         byte[] senderMacAddress = genericNonZeroMac;
         if (routerMacAddress != null) {
             senderMacAddress = routerMacAddress.toBytes();
@@ -670,44 +728,40 @@
         }
 
         // sendArpRequestToSwitches(ipAddress, eth.serialize());
-        SinglePacketOutNotification key =
+        SinglePacketOutNotification value =
                 new SinglePacketOutNotification(eth.serialize(), ByteBuffer.wrap(ipAddress.getAddress()).getInt(),
                         intf.getDpid(), intf.getPort());
-        singlePacketOutEventChannel.addTransientEntry(MACAddress.valueOf(senderMacAddress).toLong(), key);
+
+        singlePacketOutEventChannel.addTransientEntry(MACAddress.valueOf(senderMacAddress).toLong(), value);
     }
 
+    // Please leave the commented-out code below for now because it is needed for SDN-IP. It will be removed soon.
     /*
     private void sendArpRequestToSwitches(InetAddress dstAddress, byte[] arpRequest) {
-        sendArpRequestToSwitches(dstAddress, arpRequest, 0,
-                OFPort.OFPP_NONE.getValue());
+        sendArpRequestToSwitches(dstAddress, arpRequest,
+                0, OFPort.OFPP_NONE.getValue());
     }
-    */
 
-    /*
-    private void sendArpRequestToSwitches(InetAddress dstAddress,
-                                          byte[] arpRequest, long inSwitch, short inPort) {
+    private void sendArpRequestToSwitches(InetAddress dstAddress, byte[] arpRequest,
+            long inSwitch, short inPort) {
 
         if (configService.hasLayer3Configuration()) {
             Interface intf = configService.getOutgoingInterface(dstAddress);
-            if (intf == null) {
-                // TODO here it should be broadcast out all non-interface edge
-                // ports.
-                // I think we can assume that if it's not a request for an
-                // external
-                // network, it's an ARP for a host in our own network. So we
-                // want to
-                // send it out all edge ports that don't have an interface
-                // configured
-                // to ensure it reaches all hosts in our network.
+            if (intf != null) {
+                sendArpRequestOutPort(arpRequest, intf.getDpid(), intf.getPort());
+            }
+            else {
+                //TODO here it should be broadcast out all non-interface edge ports.
+                //I think we can assume that if it's not a request for an external
+                //network, it's an ARP for a host in our own network. So we want to
+                //send it out all edge ports that don't have an interface configured
+                //to ensure it reaches all hosts in our network.
                 log.debug("No interface found to send ARP request for {}",
                         dstAddress.getHostAddress());
-            } else {
-                sendArpRequestOutPort(arpRequest, intf.getDpid(),
-                        intf.getPort());
             }
-        } else {
-            // broadcastArpRequestOutEdge(arpRequest, inSwitch, inPort);
-            broadcastArpRequestOutMyEdge(arpRequest, inSwitch, inPort);
+        }
+        else {
+            broadcastArpRequestOutEdge(arpRequest, inSwitch, inPort);
         }
     }
     */
@@ -732,10 +786,10 @@
 
         MACAddress mac = new MACAddress(arp.getSenderHardwareAddress());
 
-        ArpReplyNotification key =
+        ArpReplyNotification value =
                 new ArpReplyNotification(ByteBuffer.wrap(targetAddress.getAddress()).getInt(), mac);
         log.debug("ArpReplyNotification ip {}, mac{}", ByteBuffer.wrap(targetAddress.getAddress()).getInt(), mac);
-        arpReplyEventChannel.addTransientEntry(mac.toLong(), key);
+        arpReplyEventChannel.addTransientEntry(mac.toLong(), value);
     }
 
     private void broadcastArpRequestOutMyEdge(byte[] arpRequest, long inSwitch,
@@ -750,7 +804,10 @@
 
             List<OFAction> actions = new ArrayList<OFAction>();
 
+            networkGraph.acquireReadLock();
             Switch graphSw = networkGraph.getSwitch(sw.getId());
+            networkGraph.releaseReadLock();
+
             Collection<net.onrc.onos.core.topology.Port> ports = graphSw.getPorts();
 
             if (ports == null) {
@@ -891,8 +948,7 @@
 
     @Override
     public MACAddress getMacAddress(InetAddress ipAddress) {
-        // return arpCache.lookup(ipAddress);
-        return null;
+        return arpCache.lookup(ipAddress);
     }
 
     @Override
@@ -932,18 +988,17 @@
             }
         }
 
-        //TODO here, comment outed from long time ago. I will check if we need it later.
-        /*IDeviceObject deviceObject = deviceStorage.getDeviceByIP(
-                InetAddresses.coerceToInteger(address));
-
-        MACAddress mac = MACAddress.valueOf(deviceObject.getMACAddress());
-
-        log.debug("Found {} at {} in network map",
-                address.getHostAddress(), mac);*/
-
         // Don't hold an ARP lock while dispatching requests
         for (ArpRequest request : requestsToSend) {
             request.dispatchReply(address, mac);
         }
     }
+
+    private void doPeriodicArpCleaning() {
+        List<InetAddress> expiredipslist = arpCache.getExpiredArpCacheIps();
+        for (InetAddress expireIp : expiredipslist) {
+            log.debug("call arpCacheEventChannel.removeEntry, ip {}", expireIp);
+            arpCacheEventChannel.removeEntry(expireIp.toString());
+        }
+    }
 }