Distribute FPM connection state amongst the cluster
Change-Id: I7b02a630e33107c124d9445f2fefbf4fd31ffc45
diff --git a/apps/routing/fpm/BUCK b/apps/routing/fpm/BUCK
index d80d428..a4396f7 100644
--- a/apps/routing/fpm/BUCK
+++ b/apps/routing/fpm/BUCK
@@ -4,6 +4,7 @@
'//cli:onos-cli',
'//incubator/api:onos-incubator-api',
'//apps/routing-api:onos-apps-routing-api',
+ '//core/store/serializers:onos-core-serializers',
]
TEST_DEPS = [
diff --git a/apps/routing/fpm/pom.xml b/apps/routing/fpm/pom.xml
index bb68a79..c296927 100644
--- a/apps/routing/fpm/pom.xml
+++ b/apps/routing/fpm/pom.xml
@@ -43,6 +43,12 @@
<groupId>org.apache.karaf.shell</groupId>
<artifactId>org.apache.karaf.shell.console</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-serializers</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmConnectionInfo.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmConnectionInfo.java
new file mode 100644
index 0000000..db0ac36fe
--- /dev/null
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmConnectionInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routing.fpm;
+
+import org.onosproject.cluster.NodeId;
+
+/**
+ * Information about an FPM connection.
+ */
+public class FpmConnectionInfo {
+
+ private final NodeId connectedTo;
+ private final long connectTime;
+ private final FpmPeer peer;
+
+ /**
+ * Creates a new connection info.
+ *
+ * @param connectedTo ONOS node the FPM peer is connected to
+ * @param peer FPM peer
+ * @param connectTime time the connection was made
+ */
+ public FpmConnectionInfo(NodeId connectedTo, FpmPeer peer, long connectTime) {
+ this.connectedTo = connectedTo;
+ this.peer = peer;
+ this.connectTime = connectTime;
+ }
+
+ /**
+ * Returns the node the FPM peers is connected to.
+ *
+ * @return ONOS node
+ */
+ public NodeId connectedTo() {
+ return connectedTo;
+ }
+
+ /**
+ * Returns the FPM peer.
+ *
+ * @return FPM peer
+ */
+ public FpmPeer peer() {
+ return peer;
+ }
+
+ /**
+ * Returns the time the connection was made.
+ *
+ * @return connect time
+ */
+ public long connectTime() {
+ return connectTime;
+ }
+}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
index cd24cc6..8fe35bb 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
@@ -16,6 +16,7 @@
package org.onosproject.routing.fpm;
+import java.util.Collection;
import java.util.Map;
/**
@@ -28,5 +29,5 @@
*
* @return a map of FPM peer to connection time.
*/
- Map<FpmPeer, Long> peers();
+ Map<FpmPeer, Collection<FpmConnectionInfo>> peers();
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 9019960..c85cd9c 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -37,8 +37,11 @@
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.incubator.net.routing.Route;
import org.onosproject.incubator.net.routing.RouteAdminService;
import org.onosproject.routing.fpm.protocol.FpmHeader;
@@ -48,15 +51,22 @@
import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
import org.onosproject.routing.fpm.protocol.RtNetlink;
import org.onosproject.routing.fpm.protocol.RtProtocol;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.Dictionary;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static java.util.concurrent.Executors.newCachedThreadPool;
@@ -78,11 +88,17 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected RouteAdminService routeService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
private ServerBootstrap serverBootstrap;
private Channel serverChannel;
private ChannelGroup allChannels = new DefaultChannelGroup();
- private Map<FpmPeer, Long> peers = new ConcurrentHashMap<>();
+ private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
@@ -97,6 +113,17 @@
"distributed", "true");
componentConfigService.registerProperties(getClass());
+
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(FpmPeer.class)
+ .register(FpmConnectionInfo.class)
+ .build();
+ peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
+ .withName("fpm-connections")
+ .withSerializer(Serializer.using(serializer))
+ .build();
+
modified(context);
startServer();
log.info("Started");
@@ -258,8 +285,10 @@
}
@Override
- public Map<FpmPeer, Long> peers() {
- return ImmutableMap.copyOf(peers);
+ public Map<FpmPeer, Collection<FpmConnectionInfo>> peers() {
+ return ImmutableMap.<FpmPeer, Collection<FpmConnectionInfo>>builder()
+ .putAll(peers.asJavaMap())
+ .build();
}
private class InternalFpmListener implements FpmListener {
@@ -274,7 +303,16 @@
return false;
}
- peers.put(peer, System.currentTimeMillis());
+ NodeId localNode = clusterService.getLocalNode().id();
+ peers.compute(peer, (p, infos) -> {
+ if (infos == null) {
+ infos = new HashSet<>();
+ }
+
+ infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
+ return infos;
+ });
+
fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
return true;
}
@@ -287,7 +325,18 @@
clearRoutes(peer);
}
- peers.remove(peer);
+ peers.compute(peer, (p, infos) -> {
+ infos.stream()
+ .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
+ .findAny()
+ .ifPresent(i -> infos.remove(i));
+
+ if (infos.isEmpty()) {
+ return null;
+ }
+
+ return infos;
+ });
}
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
index 5a398b8..8516ed2 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmConnectionsList.java
@@ -17,10 +17,15 @@
package org.onosproject.routing.fpm.cli;
import org.apache.karaf.shell.commands.Command;
+import org.onlab.packet.IpAddress;
import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.routing.fpm.FpmConnectionInfo;
import org.onosproject.routing.fpm.FpmInfoService;
+import java.util.Comparator;
+
/**
* Displays the current FPM connections.
*/
@@ -28,14 +33,19 @@
description = "Displays the current FPM connections")
public class FpmConnectionsList extends AbstractShellCommand {
- private static final String FORMAT = "%s:%s connected since %s";
+ private static final String FORMAT = "peer %s:%s connected to %s since %s %s";
@Override
protected void execute() {
- FpmInfoService fpmInfo = AbstractShellCommand.get(FpmInfoService.class);
+ FpmInfoService fpmInfo = get(FpmInfoService.class);
+ ClusterService clusterService = get(ClusterService.class);
- fpmInfo.peers().forEach((peer, timestamp) -> {
- print(FORMAT, peer.address(), peer.port(), Tools.timeAgo(timestamp));
- });
+ fpmInfo.peers().values().stream()
+ .flatMap(v -> v.stream())
+ .sorted(Comparator.<FpmConnectionInfo, IpAddress>comparing(i -> i.peer().address())
+ .thenComparing(i -> i.peer().port()))
+ .forEach(info -> print(FORMAT, info.peer().address(), info.peer().port(),
+ info.connectedTo(), Tools.timeAgo(info.connectTime()),
+ info.connectedTo().equals(clusterService.getLocalNode().id()) ? "*" : ""));
}
}