Distribute FPM connection state amongst the cluster

Change-Id: I7b02a630e33107c124d9445f2fefbf4fd31ffc45
diff --git a/apps/routing/fpm/BUCK b/apps/routing/fpm/BUCK
index d80d428..a4396f7 100644
--- a/apps/routing/fpm/BUCK
+++ b/apps/routing/fpm/BUCK
@@ -4,6 +4,7 @@
     '//cli:onos-cli',
     '//incubator/api:onos-incubator-api',
     '//apps/routing-api:onos-apps-routing-api',
+    '//core/store/serializers:onos-core-serializers',
 ]
 
 TEST_DEPS = [
diff --git a/apps/routing/fpm/pom.xml b/apps/routing/fpm/pom.xml
index bb68a79..c296927 100644
--- a/apps/routing/fpm/pom.xml
+++ b/apps/routing/fpm/pom.xml
@@ -43,6 +43,12 @@
             <groupId>org.apache.karaf.shell</groupId>
             <artifactId>org.apache.karaf.shell.console</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmConnectionInfo.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmConnectionInfo.java
new file mode 100644
index 0000000..db0ac36fe
--- /dev/null
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmConnectionInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routing.fpm;
+
+import org.onosproject.cluster.NodeId;
+
+/**
+ * Information about an FPM connection.
+ */
+public class FpmConnectionInfo {
+
+    private final NodeId connectedTo;
+    private final long connectTime;
+    private final FpmPeer peer;
+
+    /**
+     * Creates a new connection info.
+     *
+     * @param connectedTo ONOS node the FPM peer is connected to
+     * @param peer FPM peer
+     * @param connectTime time the connection was made
+     */
+    public FpmConnectionInfo(NodeId connectedTo, FpmPeer peer, long connectTime) {
+        this.connectedTo = connectedTo;
+        this.peer = peer;
+        this.connectTime = connectTime;
+    }
+
+    /**
+     * Returns the node the FPM peers is connected to.
+     *
+     * @return ONOS node
+     */
+    public NodeId connectedTo() {
+        return connectedTo;
+    }
+
+    /**
+     * Returns the FPM peer.
+     *
+     * @return FPM peer
+     */
+    public FpmPeer peer() {
+        return peer;
+    }
+
+    /**
+     * Returns the time the connection was made.
+     *
+     * @return connect time
+     */
+    public long connectTime() {
+        return connectTime;
+    }
+}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
index cd24cc6..8fe35bb 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.routing.fpm;
 
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -28,5 +29,5 @@
      *
      * @return a map of FPM peer to connection time.
      */
-    Map<FpmPeer, Long> peers();
+    Map<FpmPeer, Collection<FpmConnectionInfo>> peers();
 }
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 9019960..c85cd9c 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -37,8 +37,11 @@
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.incubator.net.routing.Route;
 import org.onosproject.incubator.net.routing.RouteAdminService;
 import org.onosproject.routing.fpm.protocol.FpmHeader;
@@ -48,15 +51,22 @@
 import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
 import org.onosproject.routing.fpm.protocol.RtNetlink;
 import org.onosproject.routing.fpm.protocol.RtProtocol;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static java.util.concurrent.Executors.newCachedThreadPool;
@@ -78,11 +88,17 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected RouteAdminService routeService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
     private ServerBootstrap serverBootstrap;
     private Channel serverChannel;
     private ChannelGroup allChannels = new DefaultChannelGroup();
 
-    private Map<FpmPeer, Long> peers = new ConcurrentHashMap<>();
+    private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
 
     private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
 
@@ -97,6 +113,17 @@
                 "distributed", "true");
 
         componentConfigService.registerProperties(getClass());
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(FpmPeer.class)
+                .register(FpmConnectionInfo.class)
+                .build();
+        peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
+                .withName("fpm-connections")
+                .withSerializer(Serializer.using(serializer))
+                .build();
+
         modified(context);
         startServer();
         log.info("Started");
@@ -258,8 +285,10 @@
     }
 
     @Override
-    public Map<FpmPeer, Long> peers() {
-        return ImmutableMap.copyOf(peers);
+    public Map<FpmPeer, Collection<FpmConnectionInfo>> peers() {
+        return ImmutableMap.<FpmPeer, Collection<FpmConnectionInfo>>builder()
+                .putAll(peers.asJavaMap())
+                .build();
     }
 
     private class InternalFpmListener implements FpmListener {
@@ -274,7 +303,16 @@
                 return false;
             }
 
-            peers.put(peer, System.currentTimeMillis());
+            NodeId localNode = clusterService.getLocalNode().id();
+            peers.compute(peer, (p, infos) -> {
+                if (infos == null) {
+                    infos = new HashSet<>();
+                }
+
+                infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
+                return infos;
+            });
+
             fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
             return true;
         }
@@ -287,7 +325,18 @@
                 clearRoutes(peer);
             }
 
-            peers.remove(peer);
+            peers.compute(peer, (p, infos) -> {
+                infos.stream()
+                        .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
+                        .findAny()
+                        .ifPresent(i -> infos.remove(i));
+
+                if (infos.isEmpty()) {
+                    return null;
+                }
+
+                return infos;
+            });
         }
     }
 
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
index 5a398b8..8516ed2 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
@@ -17,10 +17,15 @@
 package org.onosproject.routing.fpm.cli;
 
 import org.apache.karaf.shell.commands.Command;
+import org.onlab.packet.IpAddress;
 import org.onlab.util.Tools;
 import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.routing.fpm.FpmConnectionInfo;
 import org.onosproject.routing.fpm.FpmInfoService;
 
+import java.util.Comparator;
+
 /**
  * Displays the current FPM connections.
  */
@@ -28,14 +33,19 @@
         description = "Displays the current FPM connections")
 public class FpmConnectionsList extends AbstractShellCommand {
 
-    private static final String FORMAT = "%s:%s connected since %s";
+    private static final String FORMAT = "peer %s:%s connected to %s since %s %s";
 
     @Override
     protected void execute() {
-        FpmInfoService fpmInfo = AbstractShellCommand.get(FpmInfoService.class);
+        FpmInfoService fpmInfo = get(FpmInfoService.class);
+        ClusterService clusterService = get(ClusterService.class);
 
-        fpmInfo.peers().forEach((peer, timestamp) -> {
-            print(FORMAT, peer.address(), peer.port(), Tools.timeAgo(timestamp));
-        });
+        fpmInfo.peers().values().stream()
+                .flatMap(v -> v.stream())
+                .sorted(Comparator.<FpmConnectionInfo, IpAddress>comparing(i -> i.peer().address())
+                        .thenComparing(i -> i.peer().port()))
+                .forEach(info -> print(FORMAT, info.peer().address(), info.peer().port(),
+                        info.connectedTo(), Tools.timeAgo(info.connectTime()),
+                        info.connectedTo().equals(clusterService.getLocalNode().id()) ? "*" : ""));
     }
 }