[SONA] syncs ICMP session information among cluster nodes when SONA is working with reactive ICMP proxy mode.

Change-Id: I5addb2acf6929fc962b36045397fe831282e2b1c
diff --git a/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java b/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java
index ab64063..d833c41 100644
--- a/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java
+++ b/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java
@@ -16,7 +16,6 @@
 package org.onosproject.openstacknetworking.impl;
 
 import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -27,6 +26,7 @@
 import org.onlab.packet.IPv4;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.DeviceId;
@@ -45,6 +45,10 @@
 import org.onosproject.openstacknetworking.api.OpenstackRouterService;
 import org.onosproject.openstacknode.api.OpenstackNode;
 import org.onosproject.openstacknode.api.OpenstackNodeService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.openstack4j.model.network.ExternalGateway;
 import org.openstack4j.model.network.IP;
 import org.openstack4j.model.network.Port;
@@ -55,13 +59,13 @@
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.openstacknetworking.api.Constants.DEFAULT_EXTERNAL_ROUTER_MAC;
@@ -84,6 +88,7 @@
     protected final Logger log = getLogger(getClass());
 
     private static final String ERR_REQ = "Failed to handle ICMP request: ";
+    private static final String ERR_DUPLICATE = " already exists";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
@@ -92,6 +97,9 @@
     protected PacketService packetService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackNodeService osNodeService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -106,7 +114,13 @@
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
     private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
-    private final Map<String, InstancePort> icmpInfoMap = Maps.newHashMap();
+    private ConsistentMap<String, InstancePort> icmpInfoMap;
+
+    private static final KryoNamespace SERIALIZER_ICMP_MAP = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(InstancePort.class)
+            .register(HostBasedInstancePort.class)
+            .build();
 
     private ApplicationId appId;
 
@@ -115,6 +129,12 @@
         appId = coreService.registerApplication(OPENSTACK_NETWORKING_APP_ID);
         packetService.addProcessor(packetProcessor, PacketProcessor.director(1));
 
+        icmpInfoMap = storageService.<String, InstancePort>consistentMapBuilder()
+                .withSerializer(Serializer.using(SERIALIZER_ICMP_MAP))
+                .withName("openstack-icmpmap")
+                .withApplicationId(appId)
+                .build();
+
         log.info("Started");
     }
 
@@ -188,7 +208,10 @@
             String icmpInfoKey = String.valueOf(getIcmpId(icmp))
                     .concat(String.valueOf(externalIp.getIp4Address().toInt()))
                     .concat(String.valueOf(ipPacket.getDestinationAddress()));
-            icmpInfoMap.putIfAbsent(icmpInfoKey, instPort);
+            icmpInfoMap.compute(icmpInfoKey, (id, existing) -> {
+                checkArgument(existing == null, ERR_DUPLICATE);
+                return instPort;
+            });
         }
     }
 
@@ -197,8 +220,12 @@
                 .concat(String.valueOf(ipPacket.getDestinationAddress()))
                 .concat(String.valueOf(ipPacket.getSourceAddress()));
 
-        processReplyFromExternal(ipPacket, icmpInfoMap.get(icmpInfoKey));
-        icmpInfoMap.remove(icmpInfoKey);
+        if (icmpInfoMap.get(icmpInfoKey) != null) {
+            processReplyFromExternal(ipPacket, icmpInfoMap.get(icmpInfoKey).value());
+            icmpInfoMap.remove(icmpInfoKey);
+        } else {
+            log.warn("No ICMP Info for ICMP packet");
+        }
     }
 
     private Subnet getSourceSubnet(InstancePort instance, IpAddress srcIp) {