Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationAdminService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationAdminService.java
new file mode 100644
index 0000000..22ed9de
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationAdminService.java
@@ -0,0 +1,36 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.onlab.onos.cluster.DefaultControllerNode;
+
+/**
+ * Service for administering the cluster communications manager.
+ */
+public interface ClusterCommunicationAdminService {
+
+ /**
+ * Adds the node to the list of monitored nodes.
+ *
+ * @param node node to be added
+ */
+ void addNode(DefaultControllerNode node);
+
+ /**
+ * Removes the node from the list of monitored nodes.
+ *
+ * @param node node to be removed
+ */
+ void removeNode(DefaultControllerNode node);
+
+ /**
+ * Starts up the communications engine.
+ *
+ * @param localNode local controller node
+ * @param delegate nodes delegate
+ */
+ void startUp(DefaultControllerNode localNode, ClusterNodesDelegate delegate);
+
+ /**
+ * Clears all nodes and streams as part of leaving the cluster.
+ */
+ void clearAllNodesAndStreams();
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
new file mode 100644
index 0000000..bac32e9
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
@@ -0,0 +1,339 @@
+package org.onlab.onos.store.cluster.impl;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
+import org.onlab.onos.store.cluster.messaging.HelloMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+import org.onlab.packet.IpPrefix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static java.net.InetAddress.getByAddress;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Implements the cluster communication services for use by other stores.
+ */
+@Component(immediate = true)
+@Service
+public class ClusterCommunicationManager
+ implements ClusterCommunicationService, ClusterCommunicationAdminService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
+ private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
+
+ private static final long START_TIMEOUT = 1000;
+ private static final int WORKERS = 3;
+
+ private ClusterConnectionListener connectionListener;
+ private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
+
+ private DefaultControllerNode localNode;
+ private ClusterNodesDelegate nodesDelegate;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected SerializationService serializationService;
+
+ // Nodes to be monitored to make sure they have a connection.
+ private final Set<DefaultControllerNode> nodes = new HashSet<>();
+
+ // Means to track message streams to other nodes.
+ private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
+
+ // TODO: use something different that won't require synchronization
+ private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
+
+ // Executor pools for listening and managing connections to other nodes.
+ private final ExecutorService listenExecutor =
+ Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
+ private final ExecutorService commExecutors =
+ Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
+ private final ExecutorService heartbeatExecutor =
+ Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
+
+ private final Timer timer = new Timer("onos-comm-initiator");
+ private final TimerTask connectionCustodian = new ConnectionCustodian();
+ private GoodbyeSubscriber goodbyeSubscriber = new GoodbyeSubscriber();
+
+ @Activate
+ public void activate() {
+ addSubscriber(MessageSubject.GOODBYE, goodbyeSubscriber);
+ log.info("Activated but waiting for delegate");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ connectionCustodian.cancel();
+ if (connectionListener != null) {
+ connectionListener.shutdown();
+ for (ClusterIOWorker worker : workers) {
+ worker.shutdown();
+ }
+ }
+ log.info("Stopped");
+ }
+
+ @Override
+ public boolean send(ClusterMessage message) {
+ boolean ok = true;
+ for (DefaultControllerNode node : nodes) {
+ if (!node.equals(localNode)) {
+ ok = send(message, node.id()) && ok;
+ }
+ }
+ return ok;
+ }
+
+ @Override
+ public boolean send(ClusterMessage message, NodeId toNodeId) {
+ ClusterMessageStream stream = streams.get(toNodeId);
+ if (stream != null && !toNodeId.equals(localNode.id())) {
+ try {
+ stream.write(message);
+ return true;
+ } catch (IOException e) {
+ log.warn("Unable to send message {} to node {}",
+ message.subject(), toNodeId);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized void addSubscriber(MessageSubject subject,
+ MessageSubscriber subscriber) {
+ subscribers.put(subject, subscriber);
+ }
+
+ @Override
+ public synchronized void removeSubscriber(MessageSubject subject,
+ MessageSubscriber subscriber) {
+ subscribers.remove(subject, subscriber);
+ }
+
+ @Override
+ public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
+ return ImmutableSet.copyOf(subscribers.get(subject));
+ }
+
+ @Override
+ public void addNode(DefaultControllerNode node) {
+ nodes.add(node);
+ }
+
+ @Override
+ public void removeNode(DefaultControllerNode node) {
+ send(new GoodbyeMessage(node.id()));
+ nodes.remove(node);
+ ClusterMessageStream stream = streams.remove(node.id());
+ if (stream != null) {
+ stream.close();
+ }
+ }
+
+ @Override
+ public void startUp(DefaultControllerNode localNode,
+ ClusterNodesDelegate delegate) {
+ this.localNode = localNode;
+ this.nodesDelegate = delegate;
+
+ startCommunications();
+ startListening();
+ startInitiatingConnections();
+ log.info("Started");
+ }
+
+ @Override
+ public void clearAllNodesAndStreams() {
+ nodes.clear();
+ send(new GoodbyeMessage(localNode.id()));
+ for (ClusterMessageStream stream : streams.values()) {
+ stream.close();
+ }
+ streams.clear();
+ }
+
+ /**
+ * Dispatches the specified message to all subscribers to its subject.
+ *
+ * @param message message to dispatch
+ * @param fromNodeId node from which the message was received
+ */
+ void dispatch(ClusterMessage message, NodeId fromNodeId) {
+ Set<MessageSubscriber> set = getSubscribers(message.subject());
+ if (set != null) {
+ for (MessageSubscriber subscriber : set) {
+ subscriber.receive(message, fromNodeId);
+ }
+ }
+ }
+
+ /**
+ * Adds the stream for a newly detected node and binds that node to it.
+ *
+ * @param nodeId newly detected cluster node id
+ * @param ip node IP listen address
+ * @param tcpPort node TCP listen port
+ * @return controller node bound to the stream
+ */
+ DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
+ ClusterMessageStream stream) {
+ DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
+ stream.setNode(node);
+ streams.put(node.id(), stream);
+ return node;
+ }
+
+ /**
+ * Removes the stream associated with the specified node.
+ *
+ * @param node node whose stream to remove
+ */
+ void removeNodeStream(DefaultControllerNode node) {
+ nodesDelegate.nodeVanished(node.id());
+ streams.remove(node.id());
+ }
+
+ /**
+ * Finds the least utilized IO worker.
+ *
+ * @return IO worker
+ */
+ ClusterIOWorker findWorker() {
+ ClusterIOWorker leastUtilized = null;
+ int minCount = Integer.MAX_VALUE;
+ for (ClusterIOWorker worker : workers) {
+ int count = worker.streamCount();
+ if (count == 0) {
+ return worker;
+ }
+
+ if (count < minCount) {
+ leastUtilized = worker;
+ minCount = count;
+ }
+ }
+ return leastUtilized;
+ }
+
+ /**
+ * Kicks off the IO loops and waits for them to start up.
+ */
+ private void startCommunications() {
+ HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
+ localNode.tcpPort());
+ for (int i = 0; i < WORKERS; i++) {
+ try {
+ ClusterIOWorker worker =
+ new ClusterIOWorker(this, serializationService, hello);
+ workers.add(worker);
+ commExecutors.execute(worker);
+ } catch (IOException e) {
+ log.warn("Unable to start communication worker", e);
+ }
+ }
+
+ // Wait for the IO loops to start
+ for (ClusterIOWorker loop : workers) {
+ if (!loop.awaitStart(START_TIMEOUT)) {
+ log.warn("Comm loop did not start on-time; moving on...");
+ }
+ }
+ }
+
+ /**
+ * Starts listening for connections from peer cluster members.
+ */
+ private void startListening() {
+ try {
+ connectionListener =
+ new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
+ listenExecutor.execute(connectionListener);
+ if (!connectionListener.awaitStart(START_TIMEOUT)) {
+ log.warn("Listener did not start on-time; moving on...");
+ }
+ } catch (IOException e) {
+ log.error("Unable to listen for cluster connections", e);
+ }
+ }
+
+ /**
+ * Schedules the periodic task that connects to any nodes lacking a connection.
+ */
+ private void startInitiatingConnections() {
+ timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
+ CONNECTION_CUSTODIAN_FREQUENCY);
+ }
+
+ /**
+ * Initiates open connection request and registers the pending socket
+ * channel with the given IO worker.
+ *
+ * @param worker IO worker with which the channel should be registered
+ * @throws java.io.IOException if the socket could not be open or connected
+ */
+ private void initiateConnection(DefaultControllerNode node,
+ ClusterIOWorker worker) throws IOException {
+ SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
+ SocketChannel ch = SocketChannel.open();
+ ch.configureBlocking(false);
+ ch.connect(sa);
+ worker.connectStream(ch);
+ }
+
+ // Sweeps through all controller nodes and attempts to open connection to
+ // those that presently do not have one.
+ private class ConnectionCustodian extends TimerTask {
+ @Override
+ public void run() {
+ for (DefaultControllerNode node : nodes) {
+ if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
+ try {
+ initiateConnection(node, findWorker());
+ } catch (IOException e) {
+ log.debug("Unable to connect", e);
+ }
+ }
+ }
+ }
+ }
+
+ private class GoodbyeSubscriber implements MessageSubscriber {
+ @Override
+ public void receive(ClusterMessage message, NodeId fromNodeId) {
+ log.info("Received goodbye message from {}", fromNodeId);
+ nodesDelegate.nodeRemoved(fromNodeId);
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java
index ae4a76f..36d5ab4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java
@@ -23,12 +23,12 @@
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
- private final WorkerFinder workerFinder;
+ private final ClusterCommunicationManager manager;
- ClusterConnectionListener(IpPrefix ip, int tcpPort,
- WorkerFinder workerFinder) throws IOException {
+ ClusterConnectionListener(ClusterCommunicationManager manager,
+ IpPrefix ip, int tcpPort) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
- this.workerFinder = workerFinder;
+ this.manager = manager;
}
@Override
@@ -41,7 +41,7 @@
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
- workerFinder.findWorker().acceptStream(sc);
+ manager.findWorker().acceptStream(sc);
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
index 0e93985..d442cc8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
@@ -3,8 +3,9 @@
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
+import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,27 +30,23 @@
private static final long SELECT_TIMEOUT = 50;
- private final ConnectionManager connectionManager;
- private final CommunicationsDelegate commsDelegate;
+ private final ClusterCommunicationManager manager;
private final SerializationService serializationService;
private final ClusterMessage helloMessage;
/**
* Creates a new cluster IO worker.
*
- * @param connectionManager parent connection manager
- * @param commsDelegate communications delegate for dispatching
+ * @param manager parent comms manager
* @param serializationService serialization service for encode/decode
* @param helloMessage hello message for greeting peers
* @throws IOException if errors occur during IO loop ignition
*/
- ClusterIOWorker(ConnectionManager connectionManager,
- CommunicationsDelegate commsDelegate,
+ ClusterIOWorker(ClusterCommunicationManager manager,
SerializationService serializationService,
ClusterMessage helloMessage) throws IOException {
super(SELECT_TIMEOUT);
- this.connectionManager = connectionManager;
- this.commsDelegate = commsDelegate;
+ this.manager = manager;
this.serializationService = serializationService;
this.helloMessage = helloMessage;
}
@@ -61,11 +58,27 @@
@Override
protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
+ NodeId nodeId = getNodeId(messages, (ClusterMessageStream) stream);
for (ClusterMessage message : messages) {
- commsDelegate.dispatch(message);
+ manager.dispatch(message, nodeId);
}
}
+ // Retrieves the node from the stream. If one is not bound, it attempts
+ // to bind it using the knowledge that the first message must be a hello.
+ private NodeId getNodeId(List<ClusterMessage> messages, ClusterMessageStream stream) {
+ DefaultControllerNode node = stream.node();
+ if (node == null && !messages.isEmpty()) {
+ ClusterMessage firstMessage = messages.get(0);
+ if (firstMessage instanceof HelloMessage) {
+ HelloMessage hello = (HelloMessage) firstMessage;
+ node = manager.addNodeStream(hello.nodeId(), hello.ipAddress(),
+ hello.tcpPort(), stream);
+ }
+ }
+ return node != null ? node.id() : null;
+ }
+
@Override
public ClusterMessageStream acceptStream(SocketChannel channel) {
ClusterMessageStream stream = super.acceptStream(channel);
@@ -99,7 +112,7 @@
DefaultControllerNode node = ((ClusterMessageStream) stream).node();
if (node != null) {
log.info("Closed connection to node {}", node.id());
- connectionManager.removeNodeStream(node);
+ manager.removeNodeStream(node);
}
super.removeStream(stream);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
similarity index 91%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
index 0970726..d182aa1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
@@ -1,8 +1,10 @@
-package org.onlab.onos.store.cluster.messaging;
+package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
index b822304..b82a835 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
@@ -1,6 +1,8 @@
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
/**
* Simple back interface through which connection manager can interact with
@@ -9,17 +11,27 @@
public interface ClusterNodesDelegate {
/**
- * Notifies about a new cluster node being detected.
+ * Notifies about cluster node coming online.
*
- * @param node newly detected cluster node
+ * @param nodeId newly detected cluster node id
+ * @param ip node IP listen address
+ * @param tcpPort node TCP listen port
+ * @return the controller node
*/
- void nodeDetected(DefaultControllerNode node);
+ DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort);
/**
* Notifies about cluster node going offline.
*
- * @param node cluster node that vanished
+ * @param nodeId identifier of the cluster node that vanished
*/
- void nodeVanished(DefaultControllerNode node);
+ void nodeVanished(NodeId nodeId);
+
+ /**
+ * Notifies about a remote request to remove a node from the cluster.
+ *
+ * @param nodeId identifier of the cluster node that was removed
+ */
+ void nodeRemoved(NodeId nodeId);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/CommunicationsDelegate.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/CommunicationsDelegate.java
deleted file mode 100644
index e74d14b..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/CommunicationsDelegate.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-
-/**
- * Simple back interface for interacting with the communications service.
- */
-public interface CommunicationsDelegate {
-
- /**
- * Dispatches the specified message to all registered subscribers.
- *
- * @param message message to be dispatched
- */
- void dispatch(ClusterMessage message);
-
- /**
- * Sets the sender.
- *
- * @param messageSender message sender
- */
- void setSender(MessageSender messageSender);
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ConnectionManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ConnectionManager.java
deleted file mode 100644
index fac3c21..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ConnectionManager.java
+++ /dev/null
@@ -1,255 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
-import org.onlab.onos.store.cluster.messaging.HelloMessage;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static java.net.InetAddress.getByAddress;
-import static org.onlab.util.Tools.namedThreads;
-
-/**
- * Manages connections to other controller cluster nodes.
- */
-public class ConnectionManager implements MessageSender {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
- private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
-
- private static final long START_TIMEOUT = 1000;
- private static final int WORKERS = 3;
-
- private ClusterConnectionListener connectionListener;
- private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
-
- private final DefaultControllerNode localNode;
- private final ClusterNodesDelegate nodesDelegate;
- private final CommunicationsDelegate commsDelegate;
- private final SerializationService serializationService;
-
- // Nodes to be monitored to make sure they have a connection.
- private final Set<DefaultControllerNode> nodes = new HashSet<>();
-
- // Means to track message streams to other nodes.
- private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
-
- // Executor pools for listening and managing connections to other nodes.
- private final ExecutorService listenExecutor =
- Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
- private final ExecutorService commExecutors =
- Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
- private final ExecutorService heartbeatExecutor =
- Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
-
- private final Timer timer = new Timer("onos-comm-initiator");
- private final TimerTask connectionCustodian = new ConnectionCustodian();
-
- private final WorkerFinder workerFinder = new LeastUtilitiedWorkerFinder();
-
-
- /**
- * Creates a new connection manager.
- */
- ConnectionManager(DefaultControllerNode localNode,
- ClusterNodesDelegate nodesDelegate,
- CommunicationsDelegate commsDelegate,
- SerializationService serializationService) {
- this.localNode = localNode;
- this.nodesDelegate = nodesDelegate;
- this.commsDelegate = commsDelegate;
- this.serializationService = serializationService;
-
- commsDelegate.setSender(this);
- startCommunications();
- startListening();
- startInitiating();
- log.info("Started");
- }
-
- /**
- * Shuts down the connection manager.
- */
- void shutdown() {
- connectionListener.shutdown();
- for (ClusterIOWorker worker : workers) {
- worker.shutdown();
- }
- log.info("Stopped");
- }
-
- /**
- * Adds the node to the list of monitored nodes.
- *
- * @param node node to be added
- */
- void addNode(DefaultControllerNode node) {
- nodes.add(node);
- }
-
- /**
- * Removes the node from the list of monitored nodes.
- *
- * @param node node to be removed
- */
- void removeNode(DefaultControllerNode node) {
- nodes.remove(node);
- ClusterMessageStream stream = streams.remove(node.id());
- if (stream != null) {
- stream.close();
- }
- }
-
- /**
- * Removes the stream associated with the specified node.
- *
- * @param node node whose stream to remove
- */
- void removeNodeStream(DefaultControllerNode node) {
- nodesDelegate.nodeVanished(node);
- streams.remove(node.id());
- }
-
- @Override
- public boolean send(NodeId nodeId, ClusterMessage message) {
- ClusterMessageStream stream = streams.get(nodeId);
- if (stream != null) {
- try {
- stream.write(message);
- return true;
- } catch (IOException e) {
- log.warn("Unable to send a message about {} to node {}",
- message.subject(), nodeId);
- }
- }
- return false;
- }
-
- /**
- * Kicks off the IO loops and waits for them to startup.
- */
- private void startCommunications() {
- HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
- localNode.tcpPort());
- for (int i = 0; i < WORKERS; i++) {
- try {
- ClusterIOWorker worker =
- new ClusterIOWorker(this, commsDelegate,
- serializationService, hello);
- workers.add(worker);
- commExecutors.execute(worker);
- } catch (IOException e) {
- log.warn("Unable to start communication worker", e);
- }
- }
-
- // Wait for the IO loops to start
- for (ClusterIOWorker loop : workers) {
- if (!loop.awaitStart(START_TIMEOUT)) {
- log.warn("Comm loop did not start on-time; moving on...");
- }
- }
- }
-
- /**
- * Starts listening for connections from peer cluster members.
- */
- private void startListening() {
- try {
- connectionListener =
- new ClusterConnectionListener(localNode.ip(), localNode.tcpPort(),
- workerFinder);
- listenExecutor.execute(connectionListener);
- if (!connectionListener.awaitStart(START_TIMEOUT)) {
- log.warn("Listener did not start on-time; moving on...");
- }
- } catch (IOException e) {
- log.error("Unable to listen for cluster connections", e);
- }
- }
-
- /**
- * Initiates open connection request and registers the pending socket
- * channel with the given IO loop.
- *
- * @param loop loop with which the channel should be registered
- * @throws java.io.IOException if the socket could not be open or connected
- */
- private void initiateConnection(DefaultControllerNode node,
- ClusterIOWorker loop) throws IOException {
- SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
- SocketChannel ch = SocketChannel.open();
- ch.configureBlocking(false);
- ch.connect(sa);
- loop.connectStream(ch);
- }
-
-
- /**
- * Attempts to connect to any nodes that do not have an associated connection.
- */
- private void startInitiating() {
- timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
- CONNECTION_CUSTODIAN_FREQUENCY);
- }
-
- // Sweeps through all controller nodes and attempts to open connection to
- // those that presently do not have one.
- private class ConnectionCustodian extends TimerTask {
- @Override
- public void run() {
- for (DefaultControllerNode node : nodes) {
- if (node != localNode && !streams.containsKey(node.id())) {
- try {
- initiateConnection(node, workerFinder.findWorker());
- } catch (IOException e) {
- log.debug("Unable to connect", e);
- }
- }
- }
- }
- }
-
- // Finds the least utilitied IO loop.
- private class LeastUtilitiedWorkerFinder implements WorkerFinder {
-
- @Override
- public ClusterIOWorker findWorker() {
- ClusterIOWorker leastUtilized = null;
- int minCount = Integer.MAX_VALUE;
- for (ClusterIOWorker worker : workers) {
- int count = worker.streamCount();
- if (count == 0) {
- return worker;
- }
-
- if (count < minCount) {
- leastUtilized = worker;
- minCount = count;
- }
- }
- return leastUtilized;
- }
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index ae04226..647690e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -14,7 +14,6 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,20 +42,20 @@
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private CommunicationsDelegate commsDelegate;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private SerializationService serializationService;
+ private ClusterCommunicationAdminService communicationAdminService;
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
- private ConnectionManager connectionManager;
@Activate
public void activate() {
loadClusterDefinition();
establishSelfIdentity();
- connectionManager = new ConnectionManager(localNode, nodesDelegate,
- commsDelegate, serializationService);
+
+ // Start-up the comm service and prime it with the loaded nodes.
+ communicationAdminService.startUp(localNode, nodesDelegate);
+ for (DefaultControllerNode node : nodes.values()) {
+ communicationAdminService.addNode(node);
+ }
log.info("Started");
}
@@ -92,8 +91,8 @@
if (localNode == null) {
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(localNode.id(), localNode);
- states.put(localNode.id(), State.ACTIVE);
}
+ states.put(localNode.id(), State.ACTIVE);
}
@Override
@@ -122,29 +121,46 @@
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
- connectionManager.addNode(node);
+ communicationAdminService.addNode(node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
- DefaultControllerNode node = nodes.remove(nodeId);
- if (node != null) {
- connectionManager.removeNode(node);
+ if (nodeId.equals(localNode.id())) {
+ // FIXME: this is still broken
+ // We are being ejected from the cluster, so remove all other nodes.
+ communicationAdminService.clearAllNodesAndStreams();
+ nodes.clear();
+ } else {
+ // Remove the other node.
+ DefaultControllerNode node = nodes.remove(nodeId);
+ if (node != null) {
+ communicationAdminService.removeNode(node);
+ }
}
}
// Entity to handle back calls from the connection manager.
private class InnerNodesDelegate implements ClusterNodesDelegate {
@Override
- public void nodeDetected(DefaultControllerNode node) {
- nodes.put(node.id(), node);
- states.put(node.id(), State.ACTIVE);
+ public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
+ DefaultControllerNode node = nodes.get(nodeId);
+ if (node == null) {
+ node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
+ }
+ states.put(nodeId, State.ACTIVE);
+ return node;
+ }
+ @Override
+ public void nodeVanished(NodeId nodeId) {
+ states.put(nodeId, State.INACTIVE);
}
@Override
- public void nodeVanished(DefaultControllerNode node) {
- states.put(node.id(), State.INACTIVE);
+ public void nodeRemoved(NodeId nodeId) {
+ removeNode(nodeId);
}
}
+
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSender.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSender.java
deleted file mode 100644
index 55f868c..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSender.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-
-/**
- * Created by tom on 9/29/14.
- */
-public interface MessageSender {
-
- /**
- * Sends the specified message to the given cluster node.
- *
- * @param nodeId node identifier
- * @param message mesage to send
- * @return true if the message was sent sucessfully; false if there is
- * no stream or if there was an error
- */
- boolean send(NodeId nodeId, ClusterMessage message);
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
new file mode 100644
index 0000000..260d2b2
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
@@ -0,0 +1,164 @@
+package org.onlab.onos.store.cluster.impl;
+
+import de.javakaffee.kryoserializers.URISerializer;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultLink;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Element;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.EchoMessage;
+import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
+import org.onlab.onos.store.cluster.messaging.HelloMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+import org.onlab.onos.store.serializers.ConnectPointSerializer;
+import org.onlab.onos.store.serializers.DefaultLinkSerializer;
+import org.onlab.onos.store.serializers.DefaultPortSerializer;
+import org.onlab.onos.store.serializers.DeviceIdSerializer;
+import org.onlab.onos.store.serializers.IpPrefixSerializer;
+import org.onlab.onos.store.serializers.LinkKeySerializer;
+import org.onlab.onos.store.serializers.NodeIdSerializer;
+import org.onlab.onos.store.serializers.PortNumberSerializer;
+import org.onlab.onos.store.serializers.ProviderIdSerializer;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Factory for parsing messages sent between cluster members.
+ */
+@Component(immediate = true)
+@Service
+public class MessageSerializer implements SerializationService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final int METADATA_LENGTH = 12; // 8 + 4
+ private static final int LENGTH_OFFSET = 8;
+
+ private static final long MARKER = 0xfeedcafebeaddeadL;
+
+ private KryoPool serializerPool;
+
+ @Activate
+ public void activate() {
+ setupKryoPool();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ /**
+ * Sets up the common serializers pool.
+ */
+ protected void setupKryoPool() {
+ // FIXME Slice out types used in common to separate pool/namespace.
+ serializerPool = KryoPool.newBuilder()
+ .register(ArrayList.class,
+ HashMap.class,
+
+ ControllerNode.State.class,
+ Device.Type.class,
+
+ DefaultControllerNode.class,
+ DefaultDevice.class,
+ MastershipRole.class,
+ Port.class,
+ Element.class,
+
+ Link.Type.class,
+
+ MessageSubject.class,
+ HelloMessage.class,
+ GoodbyeMessage.class,
+ EchoMessage.class
+ )
+ .register(IpPrefix.class, new IpPrefixSerializer())
+ .register(URI.class, new URISerializer())
+ .register(NodeId.class, new NodeIdSerializer())
+ .register(ProviderId.class, new ProviderIdSerializer())
+ .register(DeviceId.class, new DeviceIdSerializer())
+ .register(PortNumber.class, new PortNumberSerializer())
+ .register(DefaultPort.class, new DefaultPortSerializer())
+ .register(LinkKey.class, new LinkKeySerializer())
+ .register(ConnectPoint.class, new ConnectPointSerializer())
+ .register(DefaultLink.class, new DefaultLinkSerializer())
+ .build()
+ .populate(1);
+ }
+
+
+    @Override
+    public ClusterMessage decode(ByteBuffer buffer) {
+        try {
+            // Do we have enough bytes to read the header? If not, bail.
+            if (buffer.remaining() < METADATA_LENGTH) {
+                return null;
+            }
+
+            // Peek at the length and if we have enough to read the entire message
+            // go ahead, otherwise bail.
+            int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
+            if (buffer.remaining() < length) {
+                return null;
+            }
+
+            // At this point, we have enough data to read a complete message.
+            long marker = buffer.getLong();
+            checkState(marker == MARKER, "Incorrect message marker");
+            length = buffer.getInt();
+
+            // The advertised length includes the header, so it can never be smaller than it.
+            checkState(length >= METADATA_LENGTH, "Invalid message length: %s", length);
+            byte[] data = new byte[length - METADATA_LENGTH];
+            buffer.get(data);
+            return (ClusterMessage) serializerPool.deserialize(data);
+        } catch (Exception e) {
+            // TODO: recover from exceptions by forwarding stream to next marker
+            log.warn("Unable to decode message due to: " + e);
+        }
+        return null;
+    }
+
+    @Override
+    public void encode(ClusterMessage message, ByteBuffer buffer) {
+        try {
+            byte[] data = serializerPool.serialize(message);
+            int length = data.length + METADATA_LENGTH;
+            // Check capacity before writing so an overflow cannot leave a partial frame in the buffer.
+            checkState(buffer.remaining() >= length, "Insufficient buffer space for message");
+            buffer.putLong(MARKER).putInt(length).put(data);
+        } catch (Exception e) {
+            // TODO: recover from exceptions by forwarding stream to next marker
+            log.warn("Unable to encode message due to: " + e);
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/WorkerFinder.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/WorkerFinder.java
deleted file mode 100644
index 06f4f8a..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/WorkerFinder.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-/**
- * Provides means to find a worker IO loop.
- */
-public interface WorkerFinder {
-
- /**
- * Finds a suitable worker.
- *
- * @return available worker
- */
- ClusterIOWorker findWorker();
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
new file mode 100644
index 0000000..f4b9710
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Distributed cluster store and messaging subsystem implementation.
+ */
+package org.onlab.onos.store.cluster.impl;
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
index 87ed221..fe7fcd3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -10,6 +10,15 @@
public interface ClusterCommunicationService {
/**
+ * Sends a message to all controller nodes.
+ *
+ * @param message message to send
+ * @return true if the message was sent successfully to all nodes; false
+ * if there is no stream or if there was an error for some node
+ */
+ boolean send(ClusterMessage message);
+
+ /**
* Sends a message to the specified controller node.
*
* @param message message to send
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/GoodbyeMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/GoodbyeMessage.java
new file mode 100644
index 0000000..e9326f3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/GoodbyeMessage.java
@@ -0,0 +1,37 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+
+/**
+ * Goodbye message that nodes use to leave the cluster for good.
+ */
+public class GoodbyeMessage extends ClusterMessage {
+
+    private NodeId nodeId;
+
+    // No-arg constructor for serialization; nodeId starts out null.
+    private GoodbyeMessage() {
+        super(MessageSubject.GOODBYE);
+        nodeId = null;
+    }
+
+    /**
+     * Creates a new goodbye message tagged with the GOODBYE subject.
+     *
+     * @param nodeId sending node identification
+     */
+    public GoodbyeMessage(NodeId nodeId) {
+        super(MessageSubject.GOODBYE);
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * Returns the sending node identifier.
+     *
+     * @return node identifier
+     */
+    public NodeId nodeId() {
+        return nodeId;
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
index ddc79d3..923e21e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
@@ -29,9 +29,9 @@
*/
public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.HELLO);
- nodeId = nodeId;
- ipAddress = ipAddress;
- tcpPort = tcpPort;
+ this.nodeId = nodeId;
+ this.ipAddress = ipAddress;
+ this.tcpPort = tcpPort;
}
/**
@@ -60,4 +60,5 @@
public int tcpPort() {
return tcpPort;
}
+
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index 3b888b3..bf86c5b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -8,6 +8,9 @@
/** Represents a first greeting message. */
HELLO,
+ /** Signifies node's intent to leave the cluster. */
+ GOODBYE,
+
/** Signifies a heart-beat message. */
ECHO
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
index 6b78fec..68cd83c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
@@ -1,5 +1,7 @@
package org.onlab.onos.store.cluster.messaging;
+import org.onlab.onos.cluster.NodeId;
+
/**
* Represents a message consumer.
*/
@@ -8,8 +10,9 @@
/**
* Receives the specified cluster message.
*
- * @param message message to be received
+ * @param message message to be received
+ * @param fromNodeId node from which the message was received
*/
- void receive(ClusterMessage message);
+ void receive(ClusterMessage message, NodeId fromNodeId);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
index 79e054b..7521630 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
@@ -3,12 +3,12 @@
import java.nio.ByteBuffer;
/**
- * Service for serializing/deserializing intra-cluster messages.
+ * Service for encoding & decoding intra-cluster messages.
*/
public interface SerializationService {
/**
- * Decodes the specified byte buffer to obtain a message within.
+ * Decodes the specified byte buffer to obtain the message within.
*
* @param buffer byte buffer with message(s)
* @return parsed message
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
deleted file mode 100644
index bafb2c3..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
-import org.onlab.onos.store.cluster.impl.MessageSender;
-import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
-
-import java.util.Set;
-
-/**
- * Implements the cluster communication services to use by other stores.
- */
-@Component(immediate = true)
-@Service
-public class ClusterCommunicationManager
- implements ClusterCommunicationService, CommunicationsDelegate {
-
- // TODO: use something different that won't require synchronization
- private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
- private MessageSender messageSender;
-
- @Override
- public boolean send(ClusterMessage message, NodeId toNodeId) {
- return messageSender.send(toNodeId, message);
- }
-
- @Override
- public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
- subscribers.put(subject, subscriber);
- }
-
- @Override
- public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
- subscribers.remove(subject, subscriber);
- }
-
- @Override
- public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
- return ImmutableSet.copyOf(subscribers.get(subject));
- }
-
- @Override
- public void dispatch(ClusterMessage message) {
- Set<MessageSubscriber> set = getSubscribers(message.subject());
- if (set != null) {
- for (MessageSubscriber subscriber : set) {
- subscriber.receive(message);
- }
- }
- }
-
- @Override
- public void setSender(MessageSender messageSender) {
- this.messageSender = messageSender;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
deleted file mode 100644
index 93c8310..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-
-import java.nio.ByteBuffer;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Factory for parsing messages sent between cluster members.
- */
-public class MessageSerializer implements SerializationService {
-
- private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
- private static final int LENGTH_OFFSET = 12;
-
- private static final long MARKER = 0xfeedcafebeaddeadL;
-
- @Override
- public ClusterMessage decode(ByteBuffer buffer) {
- try {
- // Do we have enough bytes to read the header? If not, bail.
- if (buffer.remaining() < METADATA_LENGTH) {
- return null;
- }
-
- // Peek at the length and if we have enough to read the entire message
- // go ahead, otherwise bail.
- int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
- if (buffer.remaining() < length) {
- return null;
- }
-
- // At this point, we have enough data to read a complete message.
- long marker = buffer.getLong();
- checkState(marker == MARKER, "Incorrect message marker");
-
- int subjectOrdinal = buffer.getInt();
- MessageSubject subject = MessageSubject.values()[subjectOrdinal];
- length = buffer.getInt();
-
- // TODO: sanity checking for length
- byte[] data = new byte[length - METADATA_LENGTH];
- buffer.get(data);
-
- // TODO: add deserialization hook here; for now this hack
- return null; // actually deserialize
-
- } catch (Exception e) {
- // TODO: recover from exceptions by forwarding stream to next marker
- e.printStackTrace();
- }
- return null;
- }
-
- @Override
- public void encode(ClusterMessage message, ByteBuffer buffer) {
- try {
- int i = 0;
- // Type based lookup for proper encoder
- } catch (Exception e) {
- // TODO: recover from exceptions by forwarding stream to next marker
- e.printStackTrace();
- }
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java
new file mode 100644
index 0000000..5276b0b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Cluster messaging APIs for use by the various distributed stores.
+ */
+package org.onlab.onos.store.cluster.messaging;
\ No newline at end of file
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
new file mode 100644
index 0000000..6ae334b
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
@@ -0,0 +1,124 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests of the cluster communication manager.
+ */
+public class ClusterCommunicationManagerTest {
+
+ private static final NodeId N1 = new NodeId("n1");
+ private static final NodeId N2 = new NodeId("n2");
+
+ private static final int P1 = 9881;
+ private static final int P2 = 9882;
+
+ private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
+
+ private ClusterCommunicationManager ccm1;
+ private ClusterCommunicationManager ccm2;
+
+ private TestDelegate cnd1 = new TestDelegate();
+ private TestDelegate cnd2 = new TestDelegate();
+
+ private DefaultControllerNode node1 = new DefaultControllerNode(N1, IP, P1);
+ private DefaultControllerNode node2 = new DefaultControllerNode(N2, IP, P2);
+
+ @Before
+ public void setUp() {
+ MessageSerializer messageSerializer = new MessageSerializer();
+ messageSerializer.activate();
+
+ ccm1 = new ClusterCommunicationManager();
+ ccm1.serializationService = messageSerializer;
+ ccm1.activate();
+
+ ccm2 = new ClusterCommunicationManager();
+ ccm2.serializationService = messageSerializer;
+ ccm2.activate();
+
+ ccm1.startUp(node1, cnd1);
+ ccm2.startUp(node2, cnd2);
+ }
+
+ @After
+ public void tearDown() {
+ ccm1.deactivate();
+ ccm2.deactivate();
+ }
+
+ @Test
+ public void connect() throws Exception {
+ cnd1.latch = new CountDownLatch(1);
+ cnd2.latch = new CountDownLatch(1);
+
+ ccm1.addNode(node2);
+ validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
+ validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
+ }
+
+ @Test
+ public void disconnect() throws Exception {
+ cnd1.latch = new CountDownLatch(1);
+ cnd2.latch = new CountDownLatch(1);
+
+ ccm1.addNode(node2);
+ validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
+ validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
+
+ cnd1.latch = new CountDownLatch(1);
+ cnd2.latch = new CountDownLatch(1);
+ ccm1.deactivate();
+//
+// validateDelegateEvent(cnd2, Op.VANISHED, node1.id());
+ }
+
+ private void validateDelegateEvent(TestDelegate delegate, Op op, NodeId nodeId)
+ throws InterruptedException {
+ assertTrue("did not connect in time", delegate.latch.await(2500, TimeUnit.MILLISECONDS));
+ assertEquals("incorrect event", op, delegate.op);
+ assertEquals("incorrect event node", nodeId, delegate.nodeId);
+ }
+
+ enum Op { DETECTED, VANISHED, REMOVED };
+
+ private class TestDelegate implements ClusterNodesDelegate {
+
+ Op op;
+ CountDownLatch latch;
+ NodeId nodeId;
+
+ @Override
+ public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
+ latch(nodeId, Op.DETECTED);
+ return new DefaultControllerNode(nodeId, ip, tcpPort);
+ }
+
+ @Override
+ public void nodeVanished(NodeId nodeId) {
+ latch(nodeId, Op.VANISHED);
+ }
+
+ @Override
+ public void nodeRemoved(NodeId nodeId) {
+ latch(nodeId, Op.REMOVED);
+ }
+
+ private void latch(NodeId nodeId, Op op) {
+ this.op = op;
+ this.nodeId = nodeId;
+ latch.countDown();
+ }
+ }
+}
\ No newline at end of file
diff --git a/tools/test/bin/onos b/tools/test/bin/onos
index 74bf1cf..76b5c15 100755
--- a/tools/test/bin/onos
+++ b/tools/test/bin/onos
@@ -3,5 +3,7 @@
# ONOS remote command-line client.
#-------------------------------------------------------------------------------
+[ "$1" = "-w" ] && shift && onos-wait-for-start $1
+
[ -n "$1" ] && OCI=$1 && shift
-client -h $OCI "$@"
+client -h $OCI "$@" 2>/dev/null
diff --git a/tools/test/bin/onos-kill b/tools/test/bin/onos-kill
new file mode 100755
index 0000000..6b849d8
--- /dev/null
+++ b/tools/test/bin/onos-kill
@@ -0,0 +1,9 @@
+#!/bin/bash
+#-------------------------------------------------------------------------------
+# Remotely kills the ONOS service on the specified node (defaults to $OCI).
+#-------------------------------------------------------------------------------
+
+[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
+. "$ONOS_ROOT/tools/build/envDefaults"
+# Take the PID from the second ps field; fixed character columns shift with user name and PID width.
+ssh $ONOS_USER@${1:-$OCI} "kill -9 \$(ps -ef | grep karaf.jar | grep -v grep | awk '{print \$2}')"
\ No newline at end of file
diff --git a/tools/test/cells/tom b/tools/test/cells/tom
new file mode 100644
index 0000000..2eb0523
--- /dev/null
+++ b/tools/test/cells/tom
@@ -0,0 +1,10 @@
+# Default virtual box ONOS instances 1,2 & ONOS mininet box
+
+export ONOS_NIC=192.168.56.*
+
+export OC1="192.168.56.11"
+export OC2="192.168.56.12"
+
+export OCN="192.168.56.7"
+
+