Change for the way to learn IP and MAC binding.
ProxyArpManager has not done unit tests because the codes will be changed soon by another Jono's fix.

Change-Id: I0cf45aa328871497db522c531dcef999156c2d55
diff --git a/src/main/java/net/onrc/onos/apps/proxyarp/ArpCache.java b/src/main/java/net/onrc/onos/apps/proxyarp/ArpCache.java
index f8c1589..38cd41f 100644
--- a/src/main/java/net/onrc/onos/apps/proxyarp/ArpCache.java
+++ b/src/main/java/net/onrc/onos/apps/proxyarp/ArpCache.java
@@ -2,22 +2,18 @@
 
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import net.floodlightcontroller.util.MACAddress;
+import net.onrc.onos.core.datastore.KVArpCache;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/*
- * TODO clean out old ARP entries out of the cache periodically. We currently
- * don't do this which means the cache memory size will never decrease. We
- * already have a periodic thread that can be used to do this in
- * ProxyArpManager.
- */
-
 /**
  * Implements a basic ARP cache which maps IPv4 addresses to MAC addresses.
  * Mappings time out after a short period of time (currently 1 min). We don't
@@ -26,11 +22,11 @@
  */
 class ArpCache {
     private static final Logger log = LoggerFactory.getLogger(ArpCache.class);
-
-    private static final long ARP_ENTRY_TIMEOUT = 60000; // ms (1 min)
+    private static long arpEntryTimeoutConfig = 60000; // ms (1 min)
+    private final KVArpCache kvArpCache;
 
     // Protected by locking on the ArpCache object (this)
-    private final Map<InetAddress, ArpCacheEntry> arpCache;
+    private final ConcurrentMap<InetAddress, ArpCacheEntry> arpCache;
 
     /**
      * Represents a MAC address entry with a timestamp in the ARP cache.
@@ -75,17 +71,30 @@
          * @return true if the entry has timed out.
          */
         public boolean isExpired() {
-            return System.currentTimeMillis() - timeLastSeen > ARP_ENTRY_TIMEOUT;
+            return System.currentTimeMillis() - timeLastSeen > arpEntryTimeoutConfig;
         }
     }
 
     /**
-     * Class constructor.
+     * Class constructors.
      */
-    ArpCache() {
-        arpCache = new HashMap<InetAddress, ArpCacheEntry>();
+    public ArpCache() {
+        arpCache = new ConcurrentHashMap<InetAddress, ArpCacheEntry>();
+        kvArpCache = new KVArpCache();
     }
 
+    public void setArpEntryTimeoutConfig(long arpEntryTimeout) {
+        if (arpEntryTimeout <= 0) {
+            throw new IllegalArgumentException(Long.toString(arpEntryTimeout));
+        }
+        arpEntryTimeoutConfig(arpEntryTimeout);
+   }
+
+    private static void arpEntryTimeoutConfig(long arpEntryTimeout) {
+        ArpCache.arpEntryTimeoutConfig = arpEntryTimeout;
+        log.debug("Set arpEntryTimeoutConfig {}", ArpCache.arpEntryTimeoutConfig);
+   }
+
     /**
      * Get the MAC address that is mapped to an IP address in the ARP cache.
      *
@@ -124,8 +133,27 @@
 
         if (arpEntry != null && arpEntry.getMacAddress().equals(macAddress)) {
             arpEntry.setTimeLastSeen(System.currentTimeMillis());
+            log.debug("The same ArpCache, ip {}, mac {}. Update local cache last seen time only.", ipAddress, macAddress);
         } else {
             arpCache.put(ipAddress, new ArpCacheEntry(macAddress));
+            kvArpCache.forceCreate(ipAddress, macAddress.toBytes());
+            log.debug("Create/Update ip {}, mac {} in ArpCache.", ipAddress, macAddress);
+        }
+    }
+
+    /**
+     * Remove an entry in the ARP cache.
+     *
+     * @param ipAddress  the IP address that will be mapped in the cache
+     */
+    synchronized void remove(InetAddress ipAddress) {
+        ArpCacheEntry entry = arpCache.remove(ipAddress);
+
+        if (entry == null) {
+            log.debug("ArpCache doesn't have the ip key {}.", ipAddress);
+        } else {
+            kvArpCache.forceDelete(ipAddress);
+            log.debug("Remove it in ArpCache and DB, ip {}", ipAddress);
         }
     }
 
@@ -134,7 +162,7 @@
      *
      * @return list of all ARP mappings, formatted as a human-readable string
      */
-    synchronized List<String> getMappings() {
+    List<String> getMappings() {
         List<String> result = new ArrayList<String>(arpCache.size());
 
         for (Map.Entry<InetAddress, ArpCacheEntry> entry : arpCache.entrySet()) {
@@ -146,4 +174,22 @@
 
         return result;
     }
+
+    /**
+     * Retrieve a list of all expired IPs in the ARP cache.
+     *
+     * @return list of all expired IPs
+     */
+    List<InetAddress> getExpiredArpCacheIps() {
+        List<InetAddress> result = new ArrayList<InetAddress>();
+
+        for (Entry<InetAddress, ArpCacheEntry> entry : arpCache.entrySet()) {
+            if (entry.getValue().isExpired()) {
+                log.debug("add to the expired ip list, ip {}", entry.getKey());
+                result.add(entry.getKey());
+            }
+        }
+
+        return result;
+    }
 }
diff --git a/src/main/java/net/onrc/onos/apps/proxyarp/ArpCacheNotification.java b/src/main/java/net/onrc/onos/apps/proxyarp/ArpCacheNotification.java
new file mode 100644
index 0000000..3eef171
--- /dev/null
+++ b/src/main/java/net/onrc/onos/apps/proxyarp/ArpCacheNotification.java
@@ -0,0 +1,47 @@
+package net.onrc.onos.apps.proxyarp;
+
+
+/**
+ * Inter-instance notification that an ARP Cache status has been changed.
+ */
+public class ArpCacheNotification {
+
+    private byte[] targetAddress;
+    private byte[] targetMacAddress;
+
+    private ArpCacheNotification() {
+        targetAddress = null;
+        targetMacAddress = null;
+    }
+
+    /**
+     * Class constructor.
+     *
+     * @param targetAddress    IP address
+     * @param targetMacAddress MAC address
+     */
+    public ArpCacheNotification(byte[] targetAddress,
+                                byte[] targetMacAddress) {
+        this.targetAddress = (byte[]) targetAddress.clone();
+        this.targetMacAddress = (byte[]) targetMacAddress.clone();
+    }
+
+    /**
+     * Returns the IP address.
+     *
+     * @return the IP address
+     */
+    public byte[] getTargetAddress() {
+        return (byte[]) targetAddress.clone();
+    }
+
+    /**
+     * Returns the MAC address.
+     *
+     * @return the MAC address
+     */
+    public byte[] getTargetMacAddress() {
+        return (byte[]) targetMacAddress.clone();
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/apps/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/apps/proxyarp/ProxyArpManager.java
index 6b84e45..503bf30 100644
--- a/src/main/java/net/onrc/onos/apps/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/apps/proxyarp/ProxyArpManager.java
@@ -61,21 +61,24 @@
     private static final Logger log = LoggerFactory
             .getLogger(ProxyArpManager.class);
 
-    private static final long ARP_TIMER_PERIOD = 100; // ms
-
-    private static final int ARP_REQUEST_TIMEOUT = 2000; // ms
+    private static long arpTimerPeriodConfig = 100; // ms
+    private static int arpRequestTimeoutConfig = 2000; // ms
+    private long arpCleaningTimerPeriodConfig = 60 * 1000; // ms (1 min)
 
     private IFloodlightProviderService floodlightProvider;
     private IDatagridService datagrid;
     private IEventChannel<Long, ArpReplyNotification> arpReplyEventChannel;
     private IEventChannel<Long, BroadcastPacketOutNotification> broadcastPacketOutEventChannel;
     private IEventChannel<Long, SinglePacketOutNotification> singlePacketOutEventChannel;
+    private IEventChannel<String, ArpCacheNotification> arpCacheEventChannel;
     private static final String ARP_REPLY_CHANNEL_NAME = "onos.arp_reply";
     private static final String BROADCAST_PACKET_OUT_CHANNEL_NAME = "onos.broadcast_packet_out";
     private static final String SINGLE_PACKET_OUT_CHANNEL_NAME = "onos.single_packet_out";
-    private ArpReplyEventHandler arpReplyEventHandler = new ArpReplyEventHandler();
-    private BroadcastPacketOutEventHandler broadcastPacketOutEventHandler = new BroadcastPacketOutEventHandler();
-    private SinglePacketOutEventHandler singlePacketOutEventHandler = new SinglePacketOutEventHandler();
+    private static final String ARP_CACHE_CHANNEL_NAME = "onos.arp_cache";
+    private final ArpReplyEventHandler arpReplyEventHandler = new ArpReplyEventHandler();
+    private final BroadcastPacketOutEventHandler broadcastPacketOutEventHandler = new BroadcastPacketOutEventHandler();
+    private final SinglePacketOutEventHandler singlePacketOutEventHandler = new SinglePacketOutEventHandler();
+    private final ArpCacheEventHandler arpCacheEventHandler = new ArpCacheEventHandler();
 
     private IConfigInfoService configService;
     private IRestApiService restApi;
@@ -90,20 +93,24 @@
 
     private SetMultimap<InetAddress, ArpRequest> arpRequests;
 
+    private ArpCache arpCache;
+
     private class BroadcastPacketOutEventHandler implements
             IEventChannelListener<Long, BroadcastPacketOutNotification> {
 
         @Override
         public void entryAdded(BroadcastPacketOutNotification value) {
             if (log.isTraceEnabled()) {
-                log.trace("entryAdded ip{}, sw {}, port {}, packet {}", value.getTargetAddress(), value.getInSwitch(), value.getInPort(), value.getPacketData().length);
+                log.trace("entryAdded for BroadcastPacketOutEventHandler, ip{}, sw {}, port {}",
+                        value.getTargetAddress(), value.getInSwitch(), value.getInPort());
             }
-            BroadcastPacketOutNotification notification = (BroadcastPacketOutNotification) value;
+            BroadcastPacketOutNotification notification = value;
             broadcastArpRequestOutMyEdge(notification.getPacketData(),
                     notification.getInSwitch(),
                     notification.getInPort());
 
             // set timestamp
+            //This 4 means ipv4 addr size. Need to change it in the future.
             ByteBuffer buffer = ByteBuffer.allocate(4);
             buffer.putInt(notification.getTargetAddress());
             InetAddress addr = null;
@@ -122,15 +129,13 @@
 
         @Override
         public void entryUpdated(BroadcastPacketOutNotification value) {
-            log.debug("entryUpdated");
-            // TODO: For now, entryUpdated() is processed as entryAdded()
+            log.debug("entryUpdated for BroadcastPacketOutEventHandler");
             entryAdded(value);
         }
 
         @Override
         public void entryRemoved(BroadcastPacketOutNotification value) {
-            log.debug("entryRemoved");
-            // TODO: Not implemented. Revisit when this module is refactored
+            //Not implemented. BroadcastPacketOutNotification is used only for remote messaging.
         }
     }
 
@@ -138,14 +143,15 @@
             IEventChannelListener<Long, SinglePacketOutNotification> {
         @Override
         public void entryAdded(SinglePacketOutNotification packetOutNotification) {
-            log.debug("entryAdded");
+            log.debug("entryAdded for SinglePacketOutEventHandler");
             SinglePacketOutNotification notification =
-                    (SinglePacketOutNotification) packetOutNotification;
+                    packetOutNotification;
             sendArpRequestOutPort(notification.getPacketData(),
                     notification.getOutSwitch(),
                     notification.getOutPort());
 
             // set timestamp
+            //This 4 means ipv4 addr size. Need to change it in the future.
             ByteBuffer buffer = ByteBuffer.allocate(4);
             buffer.putInt(notification.getTargetAddress());
             InetAddress addr = null;
@@ -164,15 +170,13 @@
 
         @Override
         public void entryUpdated(SinglePacketOutNotification packetOutNotification) {
-            log.debug("entryUpdated");
-            // TODO: For now, entryUpdated() is processed as entryAdded()
+            log.debug("entryUpdated for SinglePacketOutEventHandler");
             entryAdded(packetOutNotification);
         }
 
         @Override
         public void entryRemoved(SinglePacketOutNotification packetOutNotification) {
-            log.debug("entryRemoved");
-            // TODO: Not implemented. Revisit when this module is refactored
+            //Not implemented. SinglePacketOutNotification is used only for remote messaging.
         }
     }
 
@@ -183,6 +187,7 @@
         public void entryAdded(ArpReplyNotification arpReply) {
             log.debug("Received ARP reply notification for ip {}, mac {}",
                     arpReply.getTargetAddress(), arpReply.getTargetMacAddress());
+            //This 4 means ipv4 addr size. Need to change it in the future.
             ByteBuffer buffer = ByteBuffer.allocate(4);
             buffer.putInt(arpReply.getTargetAddress());
             InetAddress addr = null;
@@ -200,13 +205,66 @@
 
         @Override
         public void entryUpdated(ArpReplyNotification arpReply) {
-            // TODO: For now, entryUpdated() is processed as entryAdded()
             entryAdded(arpReply);
         }
 
         @Override
         public void entryRemoved(ArpReplyNotification arpReply) {
-            // TODO: Not implemented. Revisit when this module is refactored
+            //Not implemented. ArpReplyEventHandler is used only for remote messaging.
+        }
+    }
+
+    private class ArpCacheEventHandler implements
+    IEventChannelListener<String, ArpCacheNotification> {
+
+        /**
+         * Startup processing.
+         */
+        private void startUp() {
+            //
+            // TODO: Read all state from the database:
+            // For now, as a shortcut we read it from the datagrid
+            //
+            Collection<ArpCacheNotification> arpCacheEvents =
+                    arpCacheEventChannel.getAllEntries();
+
+            for (ArpCacheNotification arpCacheEvent : arpCacheEvents) {
+                entryAdded(arpCacheEvent);
+            }
+        }
+
+        @Override
+        public void entryAdded(ArpCacheNotification value) {
+
+            try {
+                log.debug("Received entryAdded for ARP cache notification for ip {}, mac {}",
+                    InetAddress.getByAddress(value.getTargetAddress()), value.getTargetMacAddress());
+                arpCache.update(InetAddress.getByAddress(value.getTargetAddress()), MACAddress.valueOf(value.getTargetMacAddress()));
+            } catch (UnknownHostException e) {
+                log.error("Exception : ", e);
+            }
+        }
+
+        @Override
+        public void entryRemoved(ArpCacheNotification value) {
+            log.debug("Received entryRemoved for ARP cache notification for ip {}, mac {}",
+                    value.getTargetAddress(), value.getTargetMacAddress());
+            try {
+                arpCache.remove(InetAddress.getByAddress(value.getTargetAddress()));
+            } catch (UnknownHostException e) {
+                log.error("Exception : ", e);
+            }
+        }
+
+        @Override
+        public void entryUpdated(ArpCacheNotification value) {
+            try {
+                log.debug("Received entryUpdated for ARP cache notification for ip {}, mac {}",
+                        InetAddress.getByAddress(value.getTargetAddress()), value.getTargetMacAddress());
+                arpCache.update(InetAddress.getByAddress(value.getTargetAddress()), MACAddress.valueOf(value.getTargetMacAddress()));
+            } catch (UnknownHostException e) {
+                log.error("Exception : ", e);
+            }
         }
     }
 
@@ -228,7 +286,7 @@
 
         public boolean isExpired() {
             return sent
-                    && ((System.currentTimeMillis() - requestTime) > ARP_REQUEST_TIMEOUT);
+                    && ((System.currentTimeMillis() - requestTime) > arpRequestTimeoutConfig);
         }
 
         public boolean shouldRetry() {
@@ -308,11 +366,27 @@
         this.networkGraphService = context.getServiceImpl(INetworkGraphService.class);
         this.onosDeviceService = context.getServiceImpl(IOnosDeviceService.class);
 
-        // arpCache = new ArpCache();
+        Map<String, String> configOptions = context.getConfigParams(this);
+        Long agingmsec = null;
+        try {
+            agingmsec = Long.parseLong(configOptions.get("agingmsec"));
+        } catch (NumberFormatException e) {
+            log.debug("ArpEntryTimeout related config options were not set. Use default.");
+        }
+
+        arpCache = new ArpCache();
+        if (agingmsec != null) {
+            arpCache.setArpEntryTimeoutConfig(agingmsec);
+        }
+
+        try {
+            arpCleaningTimerPeriodConfig = Long.parseLong(configOptions.get("cleanupmsec"));
+        } catch (NumberFormatException e) {
+            log.debug("ArpCleaningTimerPeriod related config options were not set. Use default.");
+        }
 
         arpRequests = Multimaps.synchronizedSetMultimap(HashMultimap
                 .<InetAddress, ArpRequest>create());
-
     }
 
     @Override
@@ -342,13 +416,27 @@
                 Long.class,
                 ArpReplyNotification.class);
 
+        arpCacheEventChannel = datagrid.addListener(ARP_CACHE_CHANNEL_NAME,
+                arpCacheEventHandler,
+                String.class,
+                ArpCacheNotification.class);
+        arpCacheEventHandler.startUp();
+
         Timer arpTimer = new Timer("arp-processing");
         arpTimer.scheduleAtFixedRate(new TimerTask() {
             @Override
             public void run() {
                 doPeriodicArpProcessing();
             }
-        }, 0, ARP_TIMER_PERIOD);
+        }, 0, arpTimerPeriodConfig);
+
+        Timer arpCacheTimer = new Timer("arp-clearning");
+        arpCacheTimer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                doPeriodicArpCleaning();
+            }
+        }, 0, arpCleaningTimerPeriodConfig);
     }
 
     /*
@@ -376,10 +464,11 @@
                             .getKey().getHostAddress());
 
                     // If the ARP request is expired and then delete the device
-                    // TODO check whether this is OK from this thread
                     HostArpRequester requester = (HostArpRequester) request.requester;
                     ARP req = requester.getArpRequest();
+                    networkGraph.acquireReadLock();
                     Device targetDev = networkGraph.getDeviceByMac(MACAddress.valueOf(req.getTargetHardwareAddress()));
+                    networkGraph.releaseReadLock();
                     if (targetDev != null) {
                         onosDeviceService.deleteOnosDeviceByMac(MACAddress.valueOf(req.getTargetHardwareAddress()));
                         if (log.isDebugEnabled()) {
@@ -440,22 +529,24 @@
             return Command.CONTINUE;
         }
 
-        OFPacketIn pi = (OFPacketIn) msg;
-
         Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
                 IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
 
+        return classifyPacket(sw, msg, eth);
+    }
+
+    protected Command classifyPacket(IOFSwitch sw, OFMessage msg, Ethernet eth) {
+        OFPacketIn pi = (OFPacketIn) msg;
+
         if (eth.getEtherType() == Ethernet.TYPE_ARP) {
             ARP arp = (ARP) eth.getPayload();
+            learnArp(arp);
             if (arp.getOpCode() == ARP.OP_REQUEST) {
                 handleArpRequest(sw, pi, arp, eth);
             } else if (arp.getOpCode() == ARP.OP_REPLY) {
                 // For replies we simply send a notification via Hazelcast
                 sendArpReplyNotification(eth, pi);
-
-                // handleArpReply(sw, pi, arp);
             }
-
             // Stop ARP packets here
             return Command.STOP;
         }
@@ -464,6 +555,18 @@
         return Command.CONTINUE;
     }
 
+    private void learnArp(ARP arp) {
+        ArpCacheNotification arpCacheNotification = null;
+
+        arpCacheNotification = new ArpCacheNotification(arp.getSenderProtocolAddress(), arp.getSenderHardwareAddress());
+
+        try {
+            arpCacheEventChannel.addEntry(InetAddress.getByAddress(arp.getSenderProtocolAddress()).toString(), arpCacheNotification);
+        } catch (UnknownHostException e) {
+            log.error("Exception : ", e);
+        }
+    }
+
     private void handleArpRequest(IOFSwitch sw, OFPacketIn pi, ARP arp,
                                   Ethernet eth) {
         if (log.isTraceEnabled()) {
@@ -495,12 +598,13 @@
             return;
         }
 
-        // MACAddress macAddress = arpCache.lookup(target);
+        //MACAddress mac = arpCache.lookup(target);
 
-        arpRequests.put(target, new ArpRequest(
-                new HostArpRequester(arp, sw.getId(), pi.getInPort()), false));
+        arpRequests.put(target, new ArpRequest(new HostArpRequester(arp, sw.getId(), pi.getInPort()), false));
 
+        networkGraph.acquireReadLock();
         Device targetDevice = networkGraph.getDeviceByMac(MACAddress.valueOf(arp.getTargetHardwareAddress()));
+        networkGraph.releaseReadLock();
 
         if (targetDevice == null) {
             if (log.isTraceEnabled()) {
@@ -509,12 +613,12 @@
             }
 
             // We don't know the device so broadcast the request out
-            BroadcastPacketOutNotification key =
+            BroadcastPacketOutNotification value =
                     new BroadcastPacketOutNotification(eth.serialize(),
                             ByteBuffer.wrap(arp.getTargetProtocolAddress()).getInt(), sw.getId(), pi.getInPort());
             log.debug("broadcastPacketOutEventChannel mac {}, ip {}, dpid {}, port {}, paket {}", eth.getSourceMAC().toLong(),
                     ByteBuffer.wrap(arp.getTargetProtocolAddress()).getInt(), sw.getId(), pi.getInPort(), eth.serialize().length);
-            broadcastPacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), key);
+            broadcastPacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), value);
         } else {
             // Even if the device exists in our database, we do not reply to
             // the request directly, but check whether the device is still valid
@@ -538,14 +642,12 @@
                             " - broadcasting", macAddress);
                 }
 
-//                              BroadcastPacketOutNotification key =
-//                                              new BroadcastPacketOutNotification(eth.serialize(),
-//                                                              target, sw.getId(), pi.getInPort());
-//                              broadcastPacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), key);
+                BroadcastPacketOutNotification value =
+                        new BroadcastPacketOutNotification(eth.serialize(),
+                                ByteBuffer.wrap(arp.getTargetProtocolAddress()).getInt(), sw.getId(), pi.getInPort());
+                broadcastPacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), value);
             } else {
                 for (net.onrc.onos.core.topology.Port portObject : outPorts) {
-                    //long outSwitch = 0;
-                    //short outPort = 0;
 
                     if (portObject.getOutgoingLink() != null || portObject.getIncomingLink() != null) {
                         continue;
@@ -561,59 +663,15 @@
                                         HexString.toHexString(outSwitch), outPort});
                     }
 
-                    SinglePacketOutNotification key =
+                    SinglePacketOutNotification value =
                             new SinglePacketOutNotification(eth.serialize(),
                                     ByteBuffer.wrap(target.getAddress()).getInt(), outSwitch, outPort);
-                    singlePacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), key);
+                    singlePacketOutEventChannel.addTransientEntry(eth.getDestinationMAC().toLong(), value);
                 }
             }
         }
     }
 
-    // Not used because device manager currently updates the database
-    // for ARP replies. May be useful in the future.
-    private void handleArpReply(IOFSwitch sw, OFPacketIn pi, ARP arp) {
-        if (log.isTraceEnabled()) {
-            log.trace("ARP reply recieved: {} => {}, on {}/{}", new Object[]{
-                    inetAddressToString(arp.getSenderProtocolAddress()),
-                    HexString.toHexString(arp.getSenderHardwareAddress()),
-                    HexString.toHexString(sw.getId()), pi.getInPort()});
-        }
-
-        InetAddress senderIpAddress;
-        try {
-            senderIpAddress = InetAddress.getByAddress(arp
-                    .getSenderProtocolAddress());
-        } catch (UnknownHostException e) {
-            log.debug("Invalid address in ARP reply", e);
-            return;
-        }
-
-        MACAddress senderMacAddress = MACAddress.valueOf(arp
-                .getSenderHardwareAddress());
-
-        // See if anyone's waiting for this ARP reply
-        Set<ArpRequest> requests = arpRequests.get(senderIpAddress);
-
-        // Synchronize on the Multimap while using an iterator for one of the
-        // sets
-        List<ArpRequest> requestsToSend = new ArrayList<ArpRequest>(
-                requests.size());
-        synchronized (arpRequests) {
-            Iterator<ArpRequest> it = requests.iterator();
-            while (it.hasNext()) {
-                ArpRequest request = it.next();
-                it.remove();
-                requestsToSend.add(request);
-            }
-        }
-
-        // Don't hold an ARP lock while dispatching requests
-        for (ArpRequest request : requestsToSend) {
-            request.dispatchReply(senderIpAddress, senderMacAddress);
-        }
-    }
-
     private void sendArpRequestForAddress(InetAddress ipAddress) {
         // TODO what should the sender IP address and MAC address be if no
         // IP addresses are configured? Will there ever be a need to send
@@ -638,7 +696,7 @@
                 .setTargetProtocolAddress(ipAddress.getAddress());
 
         MACAddress routerMacAddress = configService.getRouterMacAddress();
-        // TODO hack for now as it's unclear what the MAC address should be
+        // As for now, it's unclear what the MAC address should be
         byte[] senderMacAddress = genericNonZeroMac;
         if (routerMacAddress != null) {
             senderMacAddress = routerMacAddress.toBytes();
@@ -670,44 +728,40 @@
         }
 
         // sendArpRequestToSwitches(ipAddress, eth.serialize());
-        SinglePacketOutNotification key =
+        SinglePacketOutNotification value =
                 new SinglePacketOutNotification(eth.serialize(), ByteBuffer.wrap(ipAddress.getAddress()).getInt(),
                         intf.getDpid(), intf.getPort());
-        singlePacketOutEventChannel.addTransientEntry(MACAddress.valueOf(senderMacAddress).toLong(), key);
+
+        singlePacketOutEventChannel.addTransientEntry(MACAddress.valueOf(senderMacAddress).toLong(), value);
     }
 
+    //Please leave it for now because this code is needed for SDN-IP. It will be removed soon.
     /*
     private void sendArpRequestToSwitches(InetAddress dstAddress, byte[] arpRequest) {
-        sendArpRequestToSwitches(dstAddress, arpRequest, 0,
-                OFPort.OFPP_NONE.getValue());
+        sendArpRequestToSwitches(dstAddress, arpRequest,
+                0, OFPort.OFPP_NONE.getValue());
     }
-    */
 
-    /*
-    private void sendArpRequestToSwitches(InetAddress dstAddress,
-                                          byte[] arpRequest, long inSwitch, short inPort) {
+    private void sendArpRequestToSwitches(InetAddress dstAddress, byte[] arpRequest,
+            long inSwitch, short inPort) {
 
         if (configService.hasLayer3Configuration()) {
             Interface intf = configService.getOutgoingInterface(dstAddress);
-            if (intf == null) {
-                // TODO here it should be broadcast out all non-interface edge
-                // ports.
-                // I think we can assume that if it's not a request for an
-                // external
-                // network, it's an ARP for a host in our own network. So we
-                // want to
-                // send it out all edge ports that don't have an interface
-                // configured
-                // to ensure it reaches all hosts in our network.
+            if (intf != null) {
+                sendArpRequestOutPort(arpRequest, intf.getDpid(), intf.getPort());
+            }
+            else {
+                //TODO here it should be broadcast out all non-interface edge ports.
+                //I think we can assume that if it's not a request for an external
+                //network, it's an ARP for a host in our own network. So we want to
+                //send it out all edge ports that don't have an interface configured
+                //to ensure it reaches all hosts in our network.
                 log.debug("No interface found to send ARP request for {}",
                         dstAddress.getHostAddress());
-            } else {
-                sendArpRequestOutPort(arpRequest, intf.getDpid(),
-                        intf.getPort());
             }
-        } else {
-            // broadcastArpRequestOutEdge(arpRequest, inSwitch, inPort);
-            broadcastArpRequestOutMyEdge(arpRequest, inSwitch, inPort);
+        }
+        else {
+            broadcastArpRequestOutEdge(arpRequest, inSwitch, inPort);
         }
     }
     */
@@ -732,10 +786,10 @@
 
         MACAddress mac = new MACAddress(arp.getSenderHardwareAddress());
 
-        ArpReplyNotification key =
+        ArpReplyNotification value =
                 new ArpReplyNotification(ByteBuffer.wrap(targetAddress.getAddress()).getInt(), mac);
         log.debug("ArpReplyNotification ip {}, mac{}", ByteBuffer.wrap(targetAddress.getAddress()).getInt(), mac);
-        arpReplyEventChannel.addTransientEntry(mac.toLong(), key);
+        arpReplyEventChannel.addTransientEntry(mac.toLong(), value);
     }
 
     private void broadcastArpRequestOutMyEdge(byte[] arpRequest, long inSwitch,
@@ -750,7 +804,10 @@
 
             List<OFAction> actions = new ArrayList<OFAction>();
 
+            networkGraph.acquireReadLock();
             Switch graphSw = networkGraph.getSwitch(sw.getId());
+            networkGraph.releaseReadLock();
+
             Collection<net.onrc.onos.core.topology.Port> ports = graphSw.getPorts();
 
             if (ports == null) {
@@ -891,8 +948,7 @@
 
     @Override
     public MACAddress getMacAddress(InetAddress ipAddress) {
-        // return arpCache.lookup(ipAddress);
-        return null;
+        return arpCache.lookup(ipAddress);
     }
 
     @Override
@@ -932,18 +988,17 @@
             }
         }
 
-        //TODO here, comment outed from long time ago. I will check if we need it later.
-        /*IDeviceObject deviceObject = deviceStorage.getDeviceByIP(
-                InetAddresses.coerceToInteger(address));
-
-        MACAddress mac = MACAddress.valueOf(deviceObject.getMACAddress());
-
-        log.debug("Found {} at {} in network map",
-                address.getHostAddress(), mac);*/
-
         // Don't hold an ARP lock while dispatching requests
         for (ArpRequest request : requestsToSend) {
             request.dispatchReply(address, mac);
         }
     }
+
+    private void doPeriodicArpCleaning() {
+        List<InetAddress> expiredipslist = arpCache.getExpiredArpCacheIps();
+        for (InetAddress expireIp : expiredipslist) {
+            log.debug("call arpCacheEventChannel.removeEntry, ip {}", expireIp);
+            arpCacheEventChannel.removeEntry(expireIp.toString());
+        }
+    }
 }
diff --git a/src/main/java/net/onrc/onos/core/datastore/KVArpCache.java b/src/main/java/net/onrc/onos/core/datastore/KVArpCache.java
new file mode 100644
index 0000000..01e0587
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/KVArpCache.java
@@ -0,0 +1,54 @@
+package net.onrc.onos.core.datastore;
+
+import java.net.InetAddress;
+
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KVArpCache {
+
+    private static final Logger log = LoggerFactory.getLogger(KVArpCache.class);
+
+    private static final String GLOBAL_ARPCACHE_TABLE_NAME = "arp_cache";
+    private final IKVTable table;
+
+    public long getVersionNonexistant() {
+        return table.getVersionNonexistant();
+    }
+
+    public KVArpCache() {
+        table = DataStoreClient.getClient().getTable(GLOBAL_ARPCACHE_TABLE_NAME);
+        log.debug("create table {}", table.getTableId());
+    }
+
+    public void dropArpCache() {
+        DataStoreClient.getClient().dropTable(table);
+        log.debug("drop table {}", table.getTableId());
+    }
+
+    public long create(InetAddress ip, byte[] mac) throws ObjectExistsException {
+        return table.create(ip.getAddress(), mac);
+    }
+
+    public long forceCreate(InetAddress ip, byte[] mac) {
+        return table.forceCreate(ip.getAddress(), mac);
+    }
+
+    public IKVEntry read(InetAddress ip) throws ObjectDoesntExistException {
+        return table.read(ip.getAddress());
+    }
+
+    public long update(InetAddress ip, byte[] mac) throws ObjectDoesntExistException {
+        return table.update(ip.getAddress(), mac);
+    }
+
+    public long forceDelete(InetAddress ip) {
+        return table.forceDelete(ip.getAddress());
+    }
+
+    public Iterable<IKVEntry> getAllEntries() {
+        return table.getAllEntries();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
index e229d30..d1794d1 100644
--- a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
@@ -7,6 +7,7 @@
 import java.util.HashSet;
 
 import net.floodlightcontroller.util.MACAddress;
+import net.onrc.onos.apps.proxyarp.ArpCacheNotification;
 import net.onrc.onos.apps.proxyarp.ArpReplyNotification;
 import net.onrc.onos.core.devicemanager.OnosDevice;
 import net.onrc.onos.core.intent.ConstrainedShortestPathIntent;
@@ -219,6 +220,7 @@
         kryo.register(BroadcastPacketOutNotification.class);
         kryo.register(SinglePacketOutNotification.class);
         kryo.register(ArpReplyNotification.class);
+        kryo.register(ArpCacheNotification.class);
 
         return kryo;
     }
diff --git a/src/test/java/net/onrc/onos/apps/proxyarp/ArpCacheTest.java b/src/test/java/net/onrc/onos/apps/proxyarp/ArpCacheTest.java
new file mode 100644
index 0000000..d12c663
--- /dev/null
+++ b/src/test/java/net/onrc/onos/apps/proxyarp/ArpCacheTest.java
@@ -0,0 +1,90 @@
+package net.onrc.onos.apps.proxyarp;
+
+import static org.junit.Assert.*;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import net.floodlightcontroller.util.MACAddress;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ArpCacheTest {
+    ArpCache arpCache;
+    InetAddress ip, ip2;
+    MACAddress mac, mac2;
+    Map<InetAddress, MACAddress> map;
+
+    @Before
+    public void setUp() throws Exception {
+        arpCache = new ArpCache();
+        arpCache.setArpEntryTimeoutConfig(1000);
+        mac = MACAddress.valueOf("00:01:02:03:04:05");
+        ip = InetAddress.getByAddress(new byte[]{10, 0, 0, 1});
+        mac2 = MACAddress.valueOf("00:01:02:03:04:06");
+        ip2 = InetAddress.getByAddress(new byte[]{10, 0, 0, 2});
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        arpCache = null;
+    }
+
+    @Test
+    public void testArpCache() {
+        assertNotNull(new ArpCache());
+    }
+
+    @Test
+    public void testLookup() {
+        testUpdate();
+        assertEquals(mac2, arpCache.lookup(ip2));
+    }
+
+    @Test
+    public void testUpdate() {
+        map = new HashMap<InetAddress, MACAddress>();
+        arpCache.update(ip, mac);
+        map.put(ip, mac);
+        arpCache.update(ip2, mac2);
+        map.put(ip2, mac2);
+        assertEquals(mac, arpCache.lookup(ip));
+    }
+
+    @Test
+    public void testRemove() {
+        testUpdate();
+        arpCache.remove(ip);
+        assertNull(arpCache.lookup(ip));
+    }
+
+    @Test
+    public void testGetMappings() {
+        testUpdate();
+        for(String macStr :arpCache.getMappings()) {
+            assertNotNull(macStr);
+        }
+    }
+
+    @Test
+    public void testGetExpiredArpCacheIps() {
+        testUpdate();
+        
+        try {
+            Thread.sleep(3000);
+        } catch (InterruptedException e) {
+            fail();
+        }
+        
+        assertNotNull(arpCache.getExpiredArpCacheIps());
+        assertEquals(map.size(), arpCache.getExpiredArpCacheIps().size());
+        for(InetAddress ip : arpCache.getExpiredArpCacheIps()) {
+           assertTrue(map.containsKey(ip));
+        }
+    }
+
+}
diff --git a/src/test/java/net/onrc/onos/core/datastore/KVArpCacheTest.java b/src/test/java/net/onrc/onos/core/datastore/KVArpCacheTest.java
new file mode 100644
index 0000000..9db55f2
--- /dev/null
+++ b/src/test/java/net/onrc/onos/core/datastore/KVArpCacheTest.java
@@ -0,0 +1,130 @@
+package net.onrc.onos.core.datastore;
+
+import static org.junit.Assert.*;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import net.floodlightcontroller.util.MACAddress;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KVArpCacheTest {
+    
+    KVArpCache arpCache = null;
+    InetAddress ip = null;
+    MACAddress mac = null;
+
+    @Before
+    public void setUp() throws Exception {
+        arpCache = new KVArpCache();  
+        try {  
+            mac = MACAddress.valueOf("00:01:02:03:04:05");
+            ip = InetAddress.getLocalHost();  
+        } catch (UnknownHostException e) {
+            fail();
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        arpCache.dropArpCache();
+        arpCache = null;
+    }
+
+    @Test
+    public void testKVArpCache() {
+        assertNotNull(arpCache);
+    }
+
+    @Test
+    public void testCreate() {
+        byte[] byteMac = mac.toBytes();
+        try {
+            long verison = arpCache.create(ip, byteMac);
+            assertNotEquals(arpCache.getVersionNonexistant(), verison);
+        } catch (ObjectExistsException e) {
+            fail();
+        }
+    }
+
+    @Test
+    public void testForceCreate() {
+        byte[] byteMac = mac.toBytes();
+        long version = arpCache.forceCreate(ip, byteMac);
+        assertNotEquals(arpCache.getVersionNonexistant(), version);
+    }
+
+    @Test
+    public void testRead() {
+        byte[] byteMac = mac.toBytes();
+        try {
+            arpCache.create(ip, byteMac);
+            byte[] entry = arpCache.read(ip).getValue();
+            assertEquals(MACAddress.valueOf(byteMac), MACAddress.valueOf(entry));
+        } catch (ObjectDoesntExistException | ObjectExistsException e) {
+            fail();
+        }
+    }
+
+    @Test
+    public void testUpdateInetAddressByteArray() {
+        byte[] byteMac = mac.toBytes();
+        byte[] byteMac2 = MACAddress.valueOf("00:01:02:03:04:06").toBytes();
+        try {
+            arpCache.create(ip, byteMac);
+            arpCache.update(ip, byteMac2);
+            byte[] entry = arpCache.read(ip).getValue();
+            assertEquals(MACAddress.valueOf(byteMac2), MACAddress.valueOf(entry));
+        } catch (ObjectDoesntExistException | ObjectExistsException e) {
+            fail();
+        }
+    }
+
+    @Test
+    public void testForceDelete() {
+        byte[] byteMac = mac.toBytes();
+        long ver = arpCache.forceCreate(ip, byteMac);
+        long deletedVer= arpCache.forceDelete(ip);
+        assertEquals(ver, deletedVer);
+    }
+
+    @Test
+    public void testGetAllEntries() {
+        byte[] ipAddr = new byte[]{10, 0, 0, 1};
+        InetAddress ip2 = null;
+        try {
+            ip2 = InetAddress.getByAddress(ipAddr);
+        } catch (UnknownHostException e) {
+            fail();
+        }
+        
+        byte[] byteMac = mac.toBytes();
+        byte[] byteMac2 = MACAddress.valueOf("00:01:02:03:04:06").toBytes();
+        Map<InetAddress, byte[]> map = new HashMap<InetAddress, byte[]>();
+        try {
+            arpCache.create(ip, byteMac);
+            map.put(ip, byteMac);
+            arpCache.create(ip2, byteMac2);
+            map.put(ip2, byteMac2);
+            for(IKVEntry entry : arpCache.getAllEntries()) {
+                try {
+                    assertTrue(map.containsKey(InetAddress.getByAddress(entry.getKey())));
+                    MACAddress mac1 = MACAddress.valueOf(map.get(InetAddress.getByAddress(entry.getKey())));
+                    MACAddress mac2 = MACAddress.valueOf(entry.getValue());
+                    assertEquals(mac1, mac2);
+                } catch (UnknownHostException e) {
+                    fail();
+                }
+            }
+        } catch (ObjectExistsException e) {
+            fail();
+        }
+    }
+
+}