Distribute FPM connection state amongst the cluster
Change-Id: I7b02a630e33107c124d9445f2fefbf4fd31ffc45
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 9019960..c85cd9c 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -37,8 +37,11 @@
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.incubator.net.routing.Route;
import org.onosproject.incubator.net.routing.RouteAdminService;
import org.onosproject.routing.fpm.protocol.FpmHeader;
@@ -48,15 +51,22 @@
import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
import org.onosproject.routing.fpm.protocol.RtNetlink;
import org.onosproject.routing.fpm.protocol.RtProtocol;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.Dictionary;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static java.util.concurrent.Executors.newCachedThreadPool;
@@ -78,11 +88,17 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected RouteAdminService routeService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
private ServerBootstrap serverBootstrap;
private Channel serverChannel;
private ChannelGroup allChannels = new DefaultChannelGroup();
- private Map<FpmPeer, Long> peers = new ConcurrentHashMap<>();
+ private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
@@ -97,6 +113,17 @@
"distributed", "true");
componentConfigService.registerProperties(getClass());
+
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(FpmPeer.class)
+ .register(FpmConnectionInfo.class)
+ .build();
+ peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
+ .withName("fpm-connections")
+ .withSerializer(Serializer.using(serializer))
+ .build();
+
modified(context);
startServer();
log.info("Started");
@@ -258,8 +285,10 @@
}
@Override
- public Map<FpmPeer, Long> peers() {
- return ImmutableMap.copyOf(peers);
+ public Map<FpmPeer, Collection<FpmConnectionInfo>> peers() {
+ return ImmutableMap.<FpmPeer, Collection<FpmConnectionInfo>>builder()
+ .putAll(peers.asJavaMap())
+ .build();
}
private class InternalFpmListener implements FpmListener {
@@ -274,7 +303,16 @@
return false;
}
- peers.put(peer, System.currentTimeMillis());
+ NodeId localNode = clusterService.getLocalNode().id();
+ peers.compute(peer, (p, infos) -> {
+ if (infos == null) {
+ infos = new HashSet<>();
+ }
+
+ infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
+ return infos;
+ });
+
fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
return true;
}
@@ -287,7 +325,18 @@
clearRoutes(peer);
}
- peers.remove(peer);
+ peers.compute(peer, (p, infos) -> {
+ infos.stream()
+ .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
+ .findAny()
+ .ifPresent(i -> infos.remove(i));
+
+ if (infos.isEmpty()) {
+ return null;
+ }
+
+ return infos;
+ });
}
}