Added Netty-based messaging. Updated cluster management to use Netty-based messaging.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationAdminService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationAdminService.java
deleted file mode 100644
index 22ed9de..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationAdminService.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import org.onlab.onos.cluster.DefaultControllerNode;
-
-/**
- * Service for administering communications manager.
- */
-public interface ClusterCommunicationAdminService {
-
- /**
- * Adds the node to the list of monitored nodes.
- *
- * @param node node to be added
- */
- void addNode(DefaultControllerNode node);
-
- /**
- * Removes the node from the list of monitored nodes.
- *
- * @param node node to be removed
- */
- void removeNode(DefaultControllerNode node);
-
- /**
- * Starts-up the communications engine.
- *
- * @param localNode local controller node
- * @param delegate nodes delegate
- */
- void startUp(DefaultControllerNode localNode, ClusterNodesDelegate delegate);
-
- /**
- * Clears all nodes and streams as part of leaving the cluster.
- */
- void clearAllNodesAndStreams();
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
deleted file mode 100644
index 2e2887c..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
+++ /dev/null
@@ -1,354 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
-import org.onlab.onos.store.cluster.messaging.ClusterMembershipMessage;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.HelloMessage;
-import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
-import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-import org.onlab.packet.IpPrefix;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static java.net.InetAddress.getByAddress;
-import static org.onlab.util.Tools.namedThreads;
-
-/**
- * Implements the cluster communication services to use by other stores.
- */
-@Component(immediate = true)
-@Service
-public class ClusterCommunicationManager
- implements ClusterCommunicationService, ClusterCommunicationAdminService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
- private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
-
- private static final long START_TIMEOUT = 1000;
- private static final int WORKERS = 3;
-
- private ClusterConnectionListener connectionListener;
- private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
-
- private DefaultControllerNode localNode;
- private ClusterNodesDelegate nodesDelegate;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected SerializationService serializationService;
-
- // Nodes to be monitored to make sure they have a connection.
- private final Set<DefaultControllerNode> nodes = new HashSet<>();
-
- // Means to track message streams to other nodes.
- private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
-
- // TODO: use something different that won't require synchronization
- private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
-
- // Executor pools for listening and managing connections to other nodes.
- private final ExecutorService listenExecutor =
- Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
- private final ExecutorService commExecutors =
- Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
- private final ExecutorService heartbeatExecutor =
- Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
-
- private final Timer timer = new Timer("onos-comm-initiator");
- private final TimerTask connectionCustodian = new ConnectionCustodian();
- private MembershipSubscriber membershipSubscriber = new MembershipSubscriber();
-
- @Activate
- public void activate() {
- addSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
- addSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
- log.info("Activated but waiting for delegate");
- }
-
- @Deactivate
- public void deactivate() {
- removeSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
- removeSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
-
- connectionCustodian.cancel();
- if (connectionListener != null) {
- connectionListener.shutdown();
- for (ClusterIOWorker worker : workers) {
- worker.shutdown();
- }
- }
- log.info("Stopped");
- }
-
- @Override
- public boolean send(ClusterMessage message) {
- boolean ok = true;
- for (DefaultControllerNode node : nodes) {
- if (!node.equals(localNode)) {
- ok = send(message, node.id()) && ok;
- }
- }
- return ok;
- }
-
- @Override
- public boolean send(ClusterMessage message, NodeId toNodeId) {
- ClusterMessageStream stream = streams.get(toNodeId);
- if (stream != null && !toNodeId.equals(localNode.id())) {
- try {
- stream.write(message);
- return true;
- } catch (IOException e) {
- log.warn("Unable to send message {} to node {}",
- message.subject(), toNodeId);
- }
- }
- return false;
- }
-
- @Override
- public synchronized void addSubscriber(MessageSubject subject,
- MessageSubscriber subscriber) {
- subscribers.put(subject, subscriber);
- }
-
- @Override
- public synchronized void removeSubscriber(MessageSubject subject,
- MessageSubscriber subscriber) {
- subscribers.remove(subject, subscriber);
- }
-
- @Override
- public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
- return ImmutableSet.copyOf(subscribers.get(subject));
- }
-
- @Override
- public void addNode(DefaultControllerNode node) {
- nodes.add(node);
- }
-
- @Override
- public void removeNode(DefaultControllerNode node) {
- send(new LeavingMemberMessage(node.id()));
- nodes.remove(node);
- ClusterMessageStream stream = streams.remove(node.id());
- if (stream != null) {
- stream.close();
- }
- }
-
- @Override
- public void startUp(DefaultControllerNode localNode,
- ClusterNodesDelegate delegate) {
- this.localNode = localNode;
- this.nodesDelegate = delegate;
-
- startCommunications();
- startListening();
- startInitiatingConnections();
- log.info("Started");
- }
-
- @Override
- public void clearAllNodesAndStreams() {
- nodes.clear();
- send(new LeavingMemberMessage(localNode.id()));
- for (ClusterMessageStream stream : streams.values()) {
- stream.close();
- }
- streams.clear();
- }
-
- /**
- * Dispatches the specified message to all subscribers to its subject.
- *
- * @param message message to dispatch
- * @param fromNodeId node from which the message was received
- */
- void dispatch(ClusterMessage message, NodeId fromNodeId) {
- Set<MessageSubscriber> set = getSubscribers(message.subject());
- if (set != null) {
- for (MessageSubscriber subscriber : set) {
- subscriber.receive(message, fromNodeId);
- }
- }
- }
-
- /**
- * Adds the stream associated with the specified node.
- *
- * @param nodeId newly detected cluster node id
- * @param ip node IP listen address
- * @param tcpPort node TCP listen port
- * @return controller node bound to the stream
- */
- DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
- ClusterMessageStream stream) {
- DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
- stream.setNode(node);
- streams.put(node.id(), stream);
- send(new NewMemberMessage(node.id(), node.ip(), node.tcpPort()));
- return node;
- }
-
- /**
- * Removes the stream associated with the specified node.
- *
- * @param node node whose stream to remove
- */
- void removeNodeStream(DefaultControllerNode node) {
- nodesDelegate.nodeVanished(node.id());
- streams.remove(node.id());
- }
-
- /**
- * Finds the least utilized IO worker.
- *
- * @return IO worker
- */
- ClusterIOWorker findWorker() {
- ClusterIOWorker leastUtilized = null;
- int minCount = Integer.MAX_VALUE;
- for (ClusterIOWorker worker : workers) {
- int count = worker.streamCount();
- if (count == 0) {
- return worker;
- }
-
- if (count < minCount) {
- leastUtilized = worker;
- minCount = count;
- }
- }
- return leastUtilized;
- }
-
- /**
- * Kicks off the IO loops and waits for them to startup.
- */
- private void startCommunications() {
- HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
- localNode.tcpPort());
- for (int i = 0; i < WORKERS; i++) {
- try {
- ClusterIOWorker worker =
- new ClusterIOWorker(this, serializationService, hello);
- workers.add(worker);
- commExecutors.execute(worker);
- } catch (IOException e) {
- log.warn("Unable to start communication worker", e);
- }
- }
-
- // Wait for the IO loops to start
- for (ClusterIOWorker loop : workers) {
- if (!loop.awaitStart(START_TIMEOUT)) {
- log.warn("Comm loop did not start on-time; moving on...");
- }
- }
- }
-
- /**
- * Starts listening for connections from peer cluster members.
- */
- private void startListening() {
- try {
- connectionListener =
- new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
- listenExecutor.execute(connectionListener);
- if (!connectionListener.awaitStart(START_TIMEOUT)) {
- log.warn("Listener did not start on-time; moving on...");
- }
- } catch (IOException e) {
- log.error("Unable to listen for cluster connections", e);
- }
- }
-
- /**
- * Attempts to connect to any nodes that do not have an associated connection.
- */
- private void startInitiatingConnections() {
- timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
- CONNECTION_CUSTODIAN_FREQUENCY);
- }
-
- /**
- * Initiates open connection request and registers the pending socket
- * channel with the given IO worker.
- *
- * @param worker loop with which the channel should be registered
- * @throws java.io.IOException if the socket could not be open or connected
- */
- private void initiateConnection(DefaultControllerNode node,
- ClusterIOWorker worker) throws IOException {
- SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
- SocketChannel ch = SocketChannel.open();
- ch.configureBlocking(false);
- ch.connect(sa);
- worker.connectStream(ch);
- }
-
- // Sweeps through all controller nodes and attempts to open connection to
- // those that presently do not have one.
- private class ConnectionCustodian extends TimerTask {
- @Override
- public void run() {
- for (DefaultControllerNode node : nodes) {
- if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
- try {
- initiateConnection(node, findWorker());
- } catch (IOException e) {
- log.debug("Unable to connect", e);
- }
- }
- }
- }
- }
-
- private class MembershipSubscriber implements MessageSubscriber {
- @Override
- public void receive(ClusterMessage message, NodeId fromNodeId) {
- MessageSubject subject = message.subject();
- ClusterMembershipMessage cmm = (ClusterMembershipMessage) message;
- if (message.subject() == MessageSubject.NEW_MEMBER) {
- log.info("Node {} arrived", cmm.nodeId());
- nodesDelegate.nodeDetected(cmm.nodeId(), cmm.ipAddress(), cmm.tcpPort());
-
- } else if (subject == MessageSubject.LEAVING_MEMBER) {
- log.info("Node {} is leaving", cmm.nodeId());
- nodesDelegate.nodeRemoved(cmm.nodeId());
- }
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java
deleted file mode 100644
index 36d5ab4..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import org.onlab.nio.AcceptorLoop;
-import org.onlab.packet.IpPrefix;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-import static java.net.InetAddress.getByAddress;
-
-/**
- * Listens to inbound connection requests and accepts them.
- */
-public class ClusterConnectionListener extends AcceptorLoop {
-
- private static final long SELECT_TIMEOUT = 50;
- private static final int COMM_BUFFER_SIZE = 32 * 1024;
-
- private static final boolean SO_NO_DELAY = false;
- private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
- private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
-
- private final ClusterCommunicationManager manager;
-
- ClusterConnectionListener(ClusterCommunicationManager manager,
- IpPrefix ip, int tcpPort) throws IOException {
- super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
- this.manager = manager;
- }
-
- @Override
- protected void acceptConnection(ServerSocketChannel channel) throws IOException {
- SocketChannel sc = channel.accept();
- sc.configureBlocking(false);
-
- Socket so = sc.socket();
- so.setTcpNoDelay(SO_NO_DELAY);
- so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
- so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
-
- manager.findWorker().acceptStream(sc);
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
deleted file mode 100644
index d442cc8..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import org.onlab.nio.IOLoop;
-import org.onlab.nio.MessageStream;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.HelloMessage;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.List;
-import java.util.Objects;
-
-import static org.onlab.packet.IpPrefix.valueOf;
-
-/**
- * Performs the IO operations related to a cluster-wide communications.
- */
-public class ClusterIOWorker extends
- IOLoop<ClusterMessage, ClusterMessageStream> {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final long SELECT_TIMEOUT = 50;
-
- private final ClusterCommunicationManager manager;
- private final SerializationService serializationService;
- private final ClusterMessage helloMessage;
-
- /**
- * Creates a new cluster IO worker.
- *
- * @param manager parent comms manager
- * @param serializationService serialization service for encode/decode
- * @param helloMessage hello message for greeting peers
- * @throws IOException if errors occur during IO loop ignition
- */
- ClusterIOWorker(ClusterCommunicationManager manager,
- SerializationService serializationService,
- ClusterMessage helloMessage) throws IOException {
- super(SELECT_TIMEOUT);
- this.manager = manager;
- this.serializationService = serializationService;
- this.helloMessage = helloMessage;
- }
-
- @Override
- protected ClusterMessageStream createStream(ByteChannel byteChannel) {
- return new ClusterMessageStream(serializationService, this, byteChannel);
- }
-
- @Override
- protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
- NodeId nodeId = getNodeId(messages, (ClusterMessageStream) stream);
- for (ClusterMessage message : messages) {
- manager.dispatch(message, nodeId);
- }
- }
-
- // Retrieves the node from the stream. If one is not bound, it attempts
- // to bind it using the knowledge that the first message must be a hello.
- private NodeId getNodeId(List<ClusterMessage> messages, ClusterMessageStream stream) {
- DefaultControllerNode node = stream.node();
- if (node == null && !messages.isEmpty()) {
- ClusterMessage firstMessage = messages.get(0);
- if (firstMessage instanceof HelloMessage) {
- HelloMessage hello = (HelloMessage) firstMessage;
- node = manager.addNodeStream(hello.nodeId(), hello.ipAddress(),
- hello.tcpPort(), stream);
- }
- }
- return node != null ? node.id() : null;
- }
-
- @Override
- public ClusterMessageStream acceptStream(SocketChannel channel) {
- ClusterMessageStream stream = super.acceptStream(channel);
- try {
- InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
- log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
- stream.write(helloMessage);
-
- } catch (IOException e) {
- log.warn("Unable to accept connection from an unknown end-point", e);
- }
- return stream;
- }
-
- @Override
- protected void connect(SelectionKey key) throws IOException {
- try {
- super.connect(key);
- ClusterMessageStream stream = (ClusterMessageStream) key.attachment();
- stream.write(helloMessage);
-
- } catch (IOException e) {
- if (!Objects.equals(e.getMessage(), "Connection refused")) {
- throw e;
- }
- }
- }
-
- @Override
- protected void removeStream(MessageStream<ClusterMessage> stream) {
- DefaultControllerNode node = ((ClusterMessageStream) stream).node();
- if (node != null) {
- log.info("Closed connection to node {}", node.id());
- manager.removeNodeStream(node);
- }
- super.removeStream(stream);
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
deleted file mode 100644
index d182aa1..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import org.onlab.nio.IOLoop;
-import org.onlab.nio.MessageStream;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Stream for transferring messages between two cluster members.
- */
-public class ClusterMessageStream extends MessageStream<ClusterMessage> {
-
- private static final int COMM_BUFFER_SIZE = 32 * 1024;
- private static final int COMM_IDLE_TIME = 500;
-
- private DefaultControllerNode node;
- private SerializationService serializationService;
-
- /**
- * Creates a message stream associated with the specified IO loop and
- * backed by the given byte channel.
- *
- * @param serializationService service for encoding/decoding messages
- * @param loop IO loop
- * @param byteChannel backing byte channel
- */
- public ClusterMessageStream(SerializationService serializationService,
- IOLoop<ClusterMessage, ?> loop,
- ByteChannel byteChannel) {
- super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
- this.serializationService = serializationService;
- }
-
- /**
- * Returns the node with which this stream is associated.
- *
- * @return controller node
- */
- public DefaultControllerNode node() {
- return node;
- }
-
- /**
- * Sets the node with which this stream is affiliated.
- *
- * @param node controller node
- */
- public void setNode(DefaultControllerNode node) {
- checkState(this.node == null, "Stream is already bound to a node");
- this.node = node;
- }
-
- @Override
- protected ClusterMessage read(ByteBuffer buffer) {
- return serializationService.decode(buffer);
- }
-
- @Override
- protected void write(ClusterMessage message, ByteBuffer buffer) {
- serializationService.encode(message, buffer);
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index d4b7289..e25c964 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -1,6 +1,11 @@
package org.onlab.onos.store.cluster.impl;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -14,6 +19,8 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
+import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import static org.onlab.onos.cluster.ControllerNode.State;
import static org.onlab.packet.IpPrefix.valueOf;
@@ -40,21 +48,25 @@
private DefaultControllerNode localNode;
private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+ private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .expireAfterWrite(OnosClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
+ .removalListener(new LivenessCacheRemovalListener()).build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterCommunicationAdminService communicationAdminService;
+ private ClusterCommunicationAdminService clusterCommunicationAdminService;
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
@Activate
- public void activate() {
+ public void activate() throws IOException {
loadClusterDefinition();
establishSelfIdentity();
// Start-up the comm service and prime it with the loaded nodes.
- communicationAdminService.startUp(localNode, nodesDelegate);
+ clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
for (DefaultControllerNode node : nodes.values()) {
- communicationAdminService.addNode(node);
+ clusterCommunicationAdminService.addNode(node);
}
log.info("Started");
}
@@ -121,15 +133,13 @@
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
- communicationAdminService.addNode(node);
+ clusterCommunicationAdminService.addNode(node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
if (nodeId.equals(localNode.id())) {
- // We are being ejected from the cluster, so remove all other nodes.
- communicationAdminService.clearAllNodesAndStreams();
nodes.clear();
nodes.put(localNode.id(), localNode);
@@ -137,7 +147,7 @@
// Remove the other node.
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
- communicationAdminService.removeNode(node);
+ clusterCommunicationAdminService.removeNode(node);
}
}
}
@@ -151,6 +161,7 @@
node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
}
states.put(nodeId, State.ACTIVE);
+ livenessCache.put(nodeId, node);
return node;
}
@@ -165,4 +176,13 @@
}
}
+    /**
+     * Reacts to entries leaving the liveness cache by declaring the
+     * corresponding controller node vanished.
+     * <p>
+     * A Guava removal listener is notified for every removal cause, including
+     * the {@code REPLACED} notification issued whenever an existing entry is
+     * refreshed through {@code livenessCache.put}. Only evictions (the entry
+     * expired or was pushed out by the size bound) mean the node is no longer
+     * tracked as alive, so replacements and explicit removals are ignored.
+     */
+    private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
+
+        /**
+         * Marks the node as vanished when its liveness entry was evicted.
+         *
+         * @param entry notification describing the removed cache entry
+         */
+        @Override
+        public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
+            // Refreshing an entry replaces the cached value; that must not be
+            // mistaken for a missed heartbeat.
+            if (!entry.wasEvicted()) {
+                return;
+            }
+            NodeId nodeId = entry.getKey();
+            log.warn("Failed to receive heartbeats from controller: {}", nodeId);
+            nodesDelegate.nodeVanished(nodeId);
+        }
+    }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
index c6ebca9..98e80f7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
@@ -21,12 +21,7 @@
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.EchoMessage;
-import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
-import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
@@ -43,12 +38,9 @@
import org.slf4j.LoggerFactory;
import java.net.URI;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import static com.google.common.base.Preconditions.checkState;
-
/**
* Factory for parsing messages sent between cluster members.
*/
@@ -96,11 +88,7 @@
Link.Type.class,
- MessageSubject.class,
- HelloMessage.class,
- NewMemberMessage.class,
- LeavingMemberMessage.class,
- EchoMessage.class
+ MessageSubject.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
@@ -118,49 +106,12 @@
@Override
- public ClusterMessage decode(ByteBuffer buffer) {
- try {
- // Do we have enough bytes to read the header? If not, bail.
- if (buffer.remaining() < METADATA_LENGTH) {
- return null;
- }
-
- // Peek at the length and if we have enough to read the entire message
- // go ahead, otherwise bail.
- int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
- if (buffer.remaining() < length) {
- return null;
- }
-
- // At this point, we have enough data to read a complete message.
- long marker = buffer.getLong();
- checkState(marker == MARKER, "Incorrect message marker");
- length = buffer.getInt();
-
- // TODO: sanity checking for length
- byte[] data = new byte[length - METADATA_LENGTH];
- buffer.get(data);
- return (ClusterMessage) serializerPool.deserialize(data);
-
- } catch (Exception e) {
- // TODO: recover from exceptions by forwarding stream to next marker
- log.warn("Unable to decode message due to: " + e);
- }
- return null;
+ public Object decode(byte[] data) {
+ return serializerPool.deserialize(data);
}
@Override
- public void encode(ClusterMessage message, ByteBuffer buffer) {
- try {
- byte[] data = serializerPool.serialize(message);
- buffer.putLong(MARKER);
- buffer.putInt(data.length + METADATA_LENGTH);
- buffer.put(data);
-
- } catch (Exception e) {
- // TODO: recover from exceptions by forwarding stream to next marker
- log.warn("Unable to encode message due to: " + e);
- }
+ public byte[] encode(Object payload) {
+ return serializerPool.serialize(payload);
}
-
-}
+}
\ No newline at end of file