Distribute FPM connection state amongst the cluster

Change-Id: I7b02a630e33107c124d9445f2fefbf4fd31ffc45
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 9019960..c85cd9c 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -37,8 +37,11 @@
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.incubator.net.routing.Route;
 import org.onosproject.incubator.net.routing.RouteAdminService;
 import org.onosproject.routing.fpm.protocol.FpmHeader;
@@ -48,15 +51,22 @@
 import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
 import org.onosproject.routing.fpm.protocol.RtNetlink;
 import org.onosproject.routing.fpm.protocol.RtProtocol;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static java.util.concurrent.Executors.newCachedThreadPool;
@@ -78,11 +88,17 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected RouteAdminService routeService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
     private ServerBootstrap serverBootstrap;
     private Channel serverChannel;
     private ChannelGroup allChannels = new DefaultChannelGroup();
 
-    private Map<FpmPeer, Long> peers = new ConcurrentHashMap<>();
+    private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
 
     private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
 
@@ -97,6 +113,17 @@
                 "distributed", "true");
 
         componentConfigService.registerProperties(getClass());
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(FpmPeer.class)
+                .register(FpmConnectionInfo.class)
+                .build();
+        peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
+                .withName("fpm-connections")
+                .withSerializer(Serializer.using(serializer))
+                .build();
+
         modified(context);
         startServer();
         log.info("Started");
@@ -258,8 +285,10 @@
     }
 
     @Override
-    public Map<FpmPeer, Long> peers() {
-        return ImmutableMap.copyOf(peers);
+    public Map<FpmPeer, Collection<FpmConnectionInfo>> peers() {
+        return ImmutableMap.<FpmPeer, Collection<FpmConnectionInfo>>builder()
+                .putAll(peers.asJavaMap())
+                .build();
     }
 
     private class InternalFpmListener implements FpmListener {
@@ -274,7 +303,16 @@
                 return false;
             }
 
-            peers.put(peer, System.currentTimeMillis());
+            NodeId localNode = clusterService.getLocalNode().id();
+            peers.compute(peer, (p, infos) -> {
+                if (infos == null) {
+                    infos = new HashSet<>();
+                }
+
+                infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
+                return infos;
+            });
+
             fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
             return true;
         }
@@ -287,7 +325,18 @@
                 clearRoutes(peer);
             }
 
-            peers.remove(peer);
+            peers.compute(peer, (p, infos) -> {
+                infos.stream()
+                        .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
+                        .findAny()
+                        .ifPresent(i -> infos.remove(i));
+
+                if (infos.isEmpty()) {
+                    return null;
+                }
+
+                return infos;
+            });
         }
     }