Merge remote-tracking branch 'origin/master'
diff --git a/apps/foo/pom.xml b/apps/foo/pom.xml
index 860d70b..868b992 100644
--- a/apps/foo/pom.xml
+++ b/apps/foo/pom.xml
@@ -28,10 +28,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.livetribe.slp</groupId>
- <artifactId>livetribe-slp</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.karaf.shell</groupId>
<artifactId>org.apache.karaf.shell.console</artifactId>
</dependency>
diff --git a/apps/foo/src/main/java/org/onlab/onos/ccc/DistributedClusterStore.java b/apps/foo/src/main/java/org/onlab/onos/ccc/DistributedClusterStore.java
deleted file mode 100644
index 62eed8e..0000000
--- a/apps/foo/src/main/java/org/onlab/onos/ccc/DistributedClusterStore.java
+++ /dev/null
@@ -1,176 +0,0 @@
-package org.onlab.onos.ccc;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.nio.AcceptorLoop;
-import org.onlab.nio.IOLoop;
-import org.onlab.nio.MessageStream;
-import org.onlab.onos.cluster.ClusterEvent;
-import org.onlab.onos.cluster.ClusterStore;
-import org.onlab.onos.cluster.ClusterStoreDelegate;
-import org.onlab.onos.cluster.ControllerNode;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.AbstractStore;
-import org.onlab.packet.IpPrefix;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.ServerSocketChannel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static java.net.InetAddress.getByAddress;
-import static org.onlab.onos.cluster.ControllerNode.State;
-import static org.onlab.packet.IpPrefix.valueOf;
-import static org.onlab.util.Tools.namedThreads;
-
-/**
- * Distributed implementation of the cluster nodes store.
- */
-@Component(immediate = true)
-@Service
-public class DistributedClusterStore
- extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
- implements ClusterStore {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final long SELECT_TIMEOUT = 50;
- private static final int WORKERS = 3;
- private static final int COMM_BUFFER_SIZE = 16 * 1024;
- private static final int COMM_IDLE_TIME = 500;
-
- private DefaultControllerNode self;
- private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
- private final Map<NodeId, State> states = new ConcurrentHashMap<>();
-
- private final ExecutorService listenExecutor =
- Executors.newSingleThreadExecutor(namedThreads("onos-listen"));
- private final ExecutorService commExecutors =
- Executors.newFixedThreadPool(WORKERS, namedThreads("onos-cluster"));
- private final ExecutorService heartbeatExecutor =
- Executors.newSingleThreadExecutor(namedThreads("onos-heartbeat"));
-
- private ListenLoop listenLoop;
- private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
-
- @Activate
- public void activate() {
- establishIdentity();
- startCommunications();
- startListening();
- log.info("Started");
- }
-
- private void startCommunications() {
- for (int i = 0; i < WORKERS; i++) {
- try {
- CommLoop loop = new CommLoop();
- commLoops.add(loop);
- commExecutors.execute(loop);
- } catch (IOException e) {
- log.warn("Unable to start comm IO loop", e);
- }
- }
- }
-
- // Starts listening for connections from peer cluster members.
- private void startListening() {
- try {
- listenLoop = new ListenLoop(self.ip(), self.tcpPort());
- listenExecutor.execute(listenLoop);
- } catch (IOException e) {
- log.error("Unable to listen for cluster connections", e);
- }
- }
-
- // Establishes the controller's own identity.
- private void establishIdentity() {
- // For now rely on env. variable.
- IpPrefix ip = valueOf(System.getenv("ONOS_NIC"));
- self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
- }
-
- @Deactivate
- public void deactivate() {
- listenLoop.shutdown();
- for (CommLoop loop : commLoops) {
- loop.shutdown();
- }
- log.info("Stopped");
- }
-
- @Override
- public ControllerNode getLocalNode() {
- return self;
- }
-
- @Override
- public Set<ControllerNode> getNodes() {
- ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
- return builder.addAll(nodes.values()).build();
- }
-
- @Override
- public ControllerNode getNode(NodeId nodeId) {
- return nodes.get(nodeId);
- }
-
- @Override
- public State getState(NodeId nodeId) {
- State state = states.get(nodeId);
- return state == null ? State.INACTIVE : state;
- }
-
- @Override
- public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
- DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
- nodes.put(nodeId, node);
- return node;
- }
-
- @Override
- public void removeNode(NodeId nodeId) {
- nodes.remove(nodeId);
- }
-
- // Listens and accepts inbound connections from other cluster nodes.
- private class ListenLoop extends AcceptorLoop {
- ListenLoop(IpPrefix ip, int tcpPort) throws IOException {
- super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
- }
-
- @Override
- protected void acceptConnection(ServerSocketChannel channel) throws IOException {
-
- }
- }
-
- private class CommLoop extends IOLoop<TLVMessage, TLVMessageStream> {
- CommLoop() throws IOException {
- super(SELECT_TIMEOUT);
- }
-
- @Override
- protected TLVMessageStream createStream(ByteChannel byteChannel) {
- return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
- }
-
- @Override
- protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
-
- }
- }
-}
diff --git a/apps/foo/src/main/java/org/onlab/onos/ccc/TLVMessageStream.java b/apps/foo/src/main/java/org/onlab/onos/ccc/TLVMessageStream.java
deleted file mode 100644
index da0973e..0000000
--- a/apps/foo/src/main/java/org/onlab/onos/ccc/TLVMessageStream.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.onlab.onos.ccc;
-
-import org.onlab.nio.IOLoop;
-import org.onlab.nio.MessageStream;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Stream for transferring TLV messages between cluster members.
- */
-public class TLVMessageStream extends MessageStream<TLVMessage> {
-
- private static final long MARKER = 0xfeedcafecafefeedL;
-
- /**
- * Creates a message stream associated with the specified IO loop and
- * backed by the given byte channel.
- *
- * @param loop IO loop
- * @param byteChannel backing byte channel
- * @param bufferSize size of the backing byte buffers
- * @param maxIdleMillis maximum number of millis the stream can be idle
- */
- protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
- int bufferSize, int maxIdleMillis) {
- super(loop, byteChannel, bufferSize, maxIdleMillis);
- }
-
- @Override
- protected TLVMessage read(ByteBuffer buffer) {
- long marker = buffer.getLong();
- checkState(marker == MARKER, "Incorrect message marker");
-
- int type = buffer.getInt();
- int length = buffer.getInt();
-
- // TODO: add deserialization hook here
-
- return new TLVMessage(type, length, null);
- }
-
- @Override
- protected void write(TLVMessage message, ByteBuffer buffer) {
- buffer.putLong(MARKER);
- buffer.putInt(message.type());
- buffer.putInt(message.length());
-
- // TODO: add serialization hook here
- }
-}
diff --git a/cli/src/main/java/org/onlab/onos/cli/NodeAddCommand.java b/cli/src/main/java/org/onlab/onos/cli/NodeAddCommand.java
new file mode 100644
index 0000000..7c9a163
--- /dev/null
+++ b/cli/src/main/java/org/onlab/onos/cli/NodeAddCommand.java
@@ -0,0 +1,34 @@
+package org.onlab.onos.cli;
+
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.onlab.onos.cluster.ClusterAdminService;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
+
+/**
+ * Shell command that registers a controller node with the cluster.
+ */
+@Command(scope = "onos", name = "add-node",
+         description = "Adds a new controller cluster node")
+public class NodeAddCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "nodeId", description = "Node ID",
+              required = true, multiValued = false)
+    String nodeId;
+
+    @Argument(index = 1, name = "ip", description = "Node IP address",
+              required = true, multiValued = false)
+    String ip;
+
+    @Argument(index = 2, name = "tcpPort", description = "Node TCP listen port",
+              required = false, multiValued = false)
+    int tcpPort = 9876;
+
+    @Override
+    protected void execute() {
+        ClusterAdminService clusterAdmin = get(ClusterAdminService.class);
+        clusterAdmin.addNode(new NodeId(nodeId), IpPrefix.valueOf(ip), tcpPort);
+    }
+
+}
diff --git a/cli/src/main/java/org/onlab/onos/cli/NodeRemoveCommand.java b/cli/src/main/java/org/onlab/onos/cli/NodeRemoveCommand.java
new file mode 100644
index 0000000..219c187
--- /dev/null
+++ b/cli/src/main/java/org/onlab/onos/cli/NodeRemoveCommand.java
@@ -0,0 +1,25 @@
+package org.onlab.onos.cli;
+
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.onlab.onos.cluster.ClusterAdminService;
+import org.onlab.onos.cluster.NodeId;
+
+/**
+ * Removes a controller cluster node.
+ */
+@Command(scope = "onos", name = "remove-node",
+         description = "Removes a controller cluster node")
+public class NodeRemoveCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "nodeId", description = "Node ID",
+              required = true, multiValued = false)
+    String nodeId = null;
+
+    @Override
+    protected void execute() {
+        ClusterAdminService service = get(ClusterAdminService.class);
+        service.removeNode(new NodeId(nodeId));
+    }
+
+}
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 30fce6f..16b5672 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -5,6 +5,12 @@
<action class="org.onlab.onos.cli.NodesListCommand"/>
</command>
<command>
+ <action class="org.onlab.onos.cli.NodeAddCommand"/>
+ </command>
+ <command>
+ <action class="org.onlab.onos.cli.NodeRemoveCommand"/>
+ </command>
+ <command>
<action class="org.onlab.onos.cli.MastersListCommand"/>
<completers>
<ref component-id="clusterIdCompleter"/>
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 900a2ff..577376a 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -26,6 +26,23 @@
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
+
+
+ <dependency>
+ <groupId>org.onlab.onos</groupId>
+ <artifactId>onlab-nio</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java
new file mode 100644
index 0000000..4dc67d4
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java
@@ -0,0 +1,83 @@
+package org.onlab.onos.store.cluster.impl;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Allows for reading and writing cluster definition as a JSON file.
+ */
+public class ClusterDefinitionStore {
+
+    private final File file;
+
+    /**
+     * Creates a reader/writer of the cluster definition file.
+     *
+     * @param filePath location of the definition file
+     */
+    public ClusterDefinitionStore(String filePath) {
+        file = new File(filePath);
+    }
+
+    /**
+     * Returns set of the controller nodes, including self.
+     *
+     * @return set of controller nodes
+     * @throws IOException if the definition file cannot be read or parsed
+     */
+    public Set<DefaultControllerNode> read() throws IOException {
+        Set<DefaultControllerNode> nodes = new HashSet<>();
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file);
+        Iterator<JsonNode> it = ((ArrayNode) clusterNodeDef.get("nodes")).elements();
+        while (it.hasNext()) {
+            ObjectNode nodeDef = (ObjectNode) it.next();
+            // path() yields a missing node (never null) when "tcpPort" is
+            // absent, so the 9876 default applies instead of an NPE.
+            nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
+                                                IpPrefix.valueOf(nodeDef.get("ip").asText()),
+                                                nodeDef.path("tcpPort").asInt(9876)));
+        }
+        return nodes;
+    }
+
+    /**
+     * Writes the given set of the controller nodes.
+     *
+     * @param nodes set of controller nodes
+     * @throws IOException if the definition file cannot be written
+     */
+    public void write(Set<DefaultControllerNode> nodes) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode clusterNodeDef = mapper.createObjectNode();
+        ArrayNode nodeDefs = mapper.createArrayNode();
+        clusterNodeDef.set("nodes", nodeDefs);
+        for (DefaultControllerNode node : nodes) {
+            ObjectNode nodeDef = mapper.createObjectNode();
+            nodeDef.put("id", node.id().toString())
+                    .put("ip", node.ip().toString())
+                    .put("tcpPort", node.tcpPort());
+            nodeDefs.add(nodeDef);
+        }
+        // Close the generator explicitly; writeTree() does not close the
+        // generator it is handed, which would leak the file handle.
+        try (JsonGenerator generator = new JsonFactory().createGenerator(file, JsonEncoding.UTF8)) {
+            mapper.writeTree(generator, clusterNodeDef);
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
new file mode 100644
index 0000000..08a182b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -0,0 +1,394 @@
+package org.onlab.onos.store.cluster.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.nio.AcceptorLoop;
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterStore;
+import org.onlab.onos.cluster.ClusterStoreDelegate;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.AbstractStore;
+import org.onlab.packet.IpPrefix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static java.net.InetAddress.getByAddress;
+import static org.onlab.onos.cluster.ControllerNode.State;
+import static org.onlab.packet.IpPrefix.valueOf;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Distributed implementation of the cluster nodes store.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedClusterStore
+        extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
+        implements ClusterStore {
+
+    private static final int HELLO_MSG = 1;
+    private static final int ECHO_MSG = 2;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
+    private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
+
+    private static final long START_TIMEOUT = 1000;
+    private static final long SELECT_TIMEOUT = 50;
+    private static final int WORKERS = 3;
+    private static final int COMM_BUFFER_SIZE = 32 * 1024;
+    private static final int COMM_IDLE_TIME = 500;
+
+    private static final boolean SO_NO_DELAY = false;
+    private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
+    private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
+
+    private DefaultControllerNode self;
+    private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
+    private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+
+    // Means to track message streams to other nodes.
+    private final Map<NodeId, TLVMessageStream> streams = new ConcurrentHashMap<>();
+    private final Map<SocketChannel, DefaultControllerNode> nodesByChannel = new ConcurrentHashMap<>();
+
+    // Executor pools for listening and managing connections to other nodes.
+    private final ExecutorService listenExecutor =
+            Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
+    private final ExecutorService commExecutors =
+            Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
+    private final ExecutorService heartbeatExecutor =
+            Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
+
+    private final Timer timer = new Timer("onos-comm-initiator");
+    private final TimerTask connectionCustodian = new ConnectionCustodian();
+
+    private ListenLoop listenLoop;
+    private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
+
+    @Activate
+    public void activate() {
+        loadClusterDefinition();
+        startCommunications();
+        startListening();
+        startInitiating();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        // Stop the custodian first so it cannot open new connections while
+        // the loops are being torn down.
+        timer.cancel();
+        // The listen loop is absent if binding failed during activation.
+        if (listenLoop != null) {
+            listenLoop.shutdown();
+        }
+        for (CommLoop loop : commLoops) {
+            loop.shutdown();
+        }
+        listenExecutor.shutdown();
+        commExecutors.shutdown();
+        heartbeatExecutor.shutdown();
+        log.info("Stopped");
+    }
+
+    // Loads the cluster definition file
+    private void loadClusterDefinition() {
+//        ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
+//        try {
+//            Set<DefaultControllerNode> storedNodes = cds.read();
+//            for (DefaultControllerNode node : storedNodes) {
+//                nodes.put(node.id(), node);
+//            }
+//        } catch (IOException e) {
+//            log.error("Unable to read cluster definitions", e);
+//        }
+
+        // Establishes the controller's own identity.
+        IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
+        self = nodes.get(new NodeId(ip.toString()));
+
+        // As a fall-back, let's make sure we at least know who we are.
+        if (self == null) {
+            self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
+            nodes.put(self.id(), self);
+        }
+    }
+
+    // Kicks off the IO loops.
+    private void startCommunications() {
+        for (int i = 0; i < WORKERS; i++) {
+            try {
+                CommLoop loop = new CommLoop();
+                commLoops.add(loop);
+                commExecutors.execute(loop);
+            } catch (IOException e) {
+                log.warn("Unable to start comm IO loop", e);
+            }
+        }
+
+        // Wait for the IO loops to start
+        for (CommLoop loop : commLoops) {
+            if (!loop.awaitStart(START_TIMEOUT)) {
+                log.warn("Comm loop did not start on-time; moving on...");
+            }
+        }
+    }
+
+    // Starts listening for connections from peer cluster members.
+    private void startListening() {
+        try {
+            listenLoop = new ListenLoop(self.ip(), self.tcpPort());
+            listenExecutor.execute(listenLoop);
+            if (!listenLoop.awaitStart(START_TIMEOUT)) {
+                log.warn("Listen loop did not start on-time; moving on...");
+            }
+        } catch (IOException e) {
+            log.error("Unable to listen for cluster connections", e);
+        }
+    }
+
+    /**
+     * Initiates open connection request and registers the pending socket
+     * channel with the given IO loop.
+     *
+     * @param node node to which the connection should be opened
+     * @param loop loop with which the channel should be registered
+     * @throws java.io.IOException if the socket could not be open or connected
+     */
+    private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException {
+        SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
+        SocketChannel ch = SocketChannel.open();
+        nodesByChannel.put(ch, node);
+        try {
+            ch.configureBlocking(false);
+            ch.connect(sa);
+            loop.connectStream(ch);
+        } catch (IOException e) {
+            // Do not leak the channel or its bookkeeping entry on failure.
+            nodesByChannel.remove(ch);
+            ch.close();
+            throw e;
+        }
+    }
+
+
+    // Attempts to connect to any nodes that do not have an associated connection.
+    private void startInitiating() {
+        timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY);
+    }
+
+    @Override
+    public ControllerNode getLocalNode() {
+        return self;
+    }
+
+    @Override
+    public Set<ControllerNode> getNodes() {
+        ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
+        return builder.addAll(nodes.values()).build();
+    }
+
+    @Override
+    public ControllerNode getNode(NodeId nodeId) {
+        return nodes.get(nodeId);
+    }
+
+    @Override
+    public State getState(NodeId nodeId) {
+        State state = states.get(nodeId);
+        return state == null ? State.INACTIVE : state;
+    }
+
+    @Override
+    public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
+        DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
+        nodes.put(nodeId, node);
+        return node;
+    }
+
+    @Override
+    public void removeNode(NodeId nodeId) {
+        nodes.remove(nodeId);
+        streams.remove(nodeId);
+    }
+
+    // Listens and accepts inbound connections from other cluster nodes.
+    private class ListenLoop extends AcceptorLoop {
+        ListenLoop(IpPrefix ip, int tcpPort) throws IOException {
+            super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
+        }
+
+        @Override
+        protected void acceptConnection(ServerSocketChannel channel) throws IOException {
+            SocketChannel sc = channel.accept();
+            sc.configureBlocking(false);
+
+            Socket so = sc.socket();
+            so.setTcpNoDelay(SO_NO_DELAY);
+            so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
+            so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
+
+            findLeastUtilizedLoop().acceptStream(sc);
+        }
+    }
+
+    private class CommLoop extends IOLoop<TLVMessage, TLVMessageStream> {
+        CommLoop() throws IOException {
+            super(SELECT_TIMEOUT);
+        }
+
+        @Override
+        protected TLVMessageStream createStream(ByteChannel byteChannel) {
+            return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
+        }
+
+        @Override
+        protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
+            TLVMessageStream tlvStream = (TLVMessageStream) stream;
+            for (TLVMessage message : messages) {
+                // TODO: add type-based dispatching here...
+                log.info("Got message {}", message.type());
+
+                // FIXME: hack to get going
+                if (message.type() == HELLO_MSG) {
+                    processHello(message, tlvStream);
+                }
+            }
+        }
+
+        @Override
+        public TLVMessageStream acceptStream(SocketChannel channel) {
+            TLVMessageStream stream = super.acceptStream(channel);
+            try {
+                InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
+                log.info("Accepted a new connection from node {}", IpPrefix.valueOf(sa.getAddress().getAddress()));
+                stream.write(createHello(self));
+
+            } catch (IOException e) {
+                log.warn("Unable to accept connection from an unknown end-point", e);
+            }
+            return stream;
+        }
+
+        @Override
+        public TLVMessageStream connectStream(SocketChannel channel) {
+            TLVMessageStream stream = super.connectStream(channel);
+            DefaultControllerNode node = nodesByChannel.get(channel);
+            if (node != null) {
+                log.info("Opened connection to node {}", node.id());
+                nodesByChannel.remove(channel);
+            }
+            return stream;
+        }
+
+        @Override
+        protected void connect(SelectionKey key) {
+            super.connect(key);
+            TLVMessageStream stream = (TLVMessageStream) key.attachment();
+            send(stream, createHello(self));
+        }
+    }
+
+    // FIXME: pure hack for now
+    // Hello payload is "<id>:<ip>:<tcpPort>", as produced by createHello().
+    private void processHello(TLVMessage message, TLVMessageStream stream) {
+        String data = new String(message.data(), StandardCharsets.UTF_8);
+        log.info("Processing hello with data [{}]", data);
+        String[] fields = data.split(":");
+        if (fields.length < 3) {
+            // A truncated hello would otherwise surface as an
+            // ArrayIndexOutOfBoundsException inside the IO loop.
+            log.warn("Ignoring malformed hello [{}]", data);
+            return;
+        }
+        DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]),
+                                                               IpPrefix.valueOf(fields[1]),
+                                                               Integer.parseInt(fields[2]));
+        stream.setNode(node);
+        nodes.put(node.id(), node);
+        streams.put(node.id(), stream);
+    }
+
+    // Sends message to the specified stream.
+    private void send(TLVMessageStream stream, TLVMessage message) {
+        try {
+            stream.write(message);
+        } catch (IOException e) {
+            // The stream is not bound to a node until its hello is processed.
+            DefaultControllerNode peer = stream.node();
+            log.warn("Unable to send message to {}", peer != null ? peer.id() : "unbound stream");
+        }
+    }
+
+    // Encodes the hello as "<id>:<ip>:<tcpPort>"; parsed by processHello().
+    private TLVMessage createHello(DefaultControllerNode self) {
+        String hello = self.id() + ":" + self.ip() + ":" + self.tcpPort();
+        return new TLVMessage(HELLO_MSG, hello.getBytes(StandardCharsets.UTF_8));
+    }
+
+    // Sweeps through all controller nodes and attempts to open connection to
+    // those that presently do not have one.
+    private class ConnectionCustodian extends TimerTask {
+        @Override
+        public void run() {
+            for (DefaultControllerNode node : nodes.values()) {
+                // Compare by id; the map entry for the local node may be a
+                // different instance than 'self' (e.g. after addNode).
+                boolean isSelf = node.id().equals(self.id());
+                if (!isSelf && !streams.containsKey(node.id())) {
+                    try {
+                        openConnection(node, findLeastUtilizedLoop());
+                    } catch (IOException e) {
+                        log.warn("Unable to connect", e);
+                    }
+                }
+            }
+        }
+    }
+
+    // Finds the least utilized IO loop.
+    private CommLoop findLeastUtilizedLoop() {
+        CommLoop leastUtilized = null;
+        int minCount = Integer.MAX_VALUE;
+        for (CommLoop loop : commLoops) {
+            int count = loop.streamCount();
+            if (count == 0) {
+                return loop;
+            }
+
+            if (count < minCount) {
+                leastUtilized = loop;
+                minCount = count;
+            }
+        }
+        return leastUtilized;
+    }
+}
diff --git a/apps/foo/src/main/java/org/onlab/onos/ccc/TLVMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessage.java
similarity index 81%
rename from apps/foo/src/main/java/org/onlab/onos/ccc/TLVMessage.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessage.java
index 33295a5..246f8ee 100644
--- a/apps/foo/src/main/java/org/onlab/onos/ccc/TLVMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessage.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.ccc;
+package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AbstractMessage;
@@ -12,17 +12,16 @@
public class TLVMessage extends AbstractMessage {
private final int type;
- private final Object data;
+ private final byte[] data;
/**
* Creates an immutable TLV message.
*
* @param type message type
- * @param length message length
- * @param data message data
+ * @param data message data bytes
*/
- public TLVMessage(int type, int length, Object data) {
- this.length = length;
+ public TLVMessage(int type, byte[] data) {
+ this.length = data.length + TLVMessageStream.METADATA_LENGTH;
this.type = type;
this.data = data;
}
@@ -37,11 +36,11 @@
}
/**
- * Returns the data object.
+ * Returns the data bytes.
*
* @return message data
*/
- public Object data() {
+ public byte[] data() {
return data;
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessageStream.java
new file mode 100644
index 0000000..b003945
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessageStream.java
@@ -0,0 +1,99 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.onos.cluster.DefaultControllerNode;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Stream for transferring TLV messages between cluster members.
+ */
+public class TLVMessageStream extends MessageStream<TLVMessage> {
+
+    public static final int METADATA_LENGTH = 16; // 8 + 4 + 4
+
+    private static final int LENGTH_OFFSET = 12;
+    private static final long MARKER = 0xfeedcafecafefeedL;
+
+    private DefaultControllerNode node;
+
+    /**
+     * Creates a message stream associated with the specified IO loop and
+     * backed by the given byte channel.
+     *
+     * @param loop          IO loop
+     * @param byteChannel   backing byte channel
+     * @param bufferSize    size of the backing byte buffers
+     * @param maxIdleMillis maximum number of millis the stream can be idle
+     */
+    protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
+                               int bufferSize, int maxIdleMillis) {
+        super(loop, byteChannel, bufferSize, maxIdleMillis);
+    }
+
+    /**
+     * Returns the node with which this stream is associated.
+     *
+     * @return controller node
+     */
+    DefaultControllerNode node() {
+        return node;
+    }
+
+    /**
+     * Sets the node with which this stream is affiliated.
+     *
+     * @param node controller node
+     */
+    void setNode(DefaultControllerNode node) {
+        checkState(this.node == null, "Stream is already bound to a node");
+        this.node = node;
+    }
+
+    @Override
+    protected TLVMessage read(ByteBuffer buffer) {
+        // Do we have enough bytes to read the header? If not, bail.
+        if (buffer.remaining() < METADATA_LENGTH) {
+            return null;
+        }
+
+        // Peek at the length and if we have enough to read the entire message
+        // go ahead, otherwise bail.
+        int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
+        if (buffer.remaining() < length) {
+            return null;
+        }
+
+        // At this point, we have enough data to read a complete message.
+        long marker = buffer.getLong();
+        checkState(marker == MARKER, "Incorrect message marker");
+
+        int type = buffer.getInt();
+        length = buffer.getInt();
+
+        // The length covers the header too; anything smaller is corrupt and
+        // would otherwise yield a negative payload array size.
+        checkState(length >= METADATA_LENGTH, "Incorrect message length");
+
+        // TODO: add deserialization hook here
+        byte[] data = new byte[length - METADATA_LENGTH];
+        buffer.get(data);
+
+        return new TLVMessage(type, data);
+    }
+
+    @Override
+    protected void write(TLVMessage message, ByteBuffer buffer) {
+        buffer.putLong(MARKER);
+        buffer.putInt(message.type());
+        buffer.putInt(message.length());
+
+        // TODO: add serialization hook here
+        buffer.put(message.data());
+    }
+
+}
diff --git a/features/features.xml b/features/features.xml
index 739e6b6..c49fd3c 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -9,7 +9,6 @@
<bundle>mvn:org.apache.commons/commons-lang3/3.3.2</bundle>
<bundle>mvn:com.google.guava/guava/18.0</bundle>
<bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
- <bundle>mvn:org.livetribe.slp/livetribe-slp-osgi/2.2.1</bundle>
<bundle>mvn:com.hazelcast/hazelcast/3.3</bundle>
<bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle>
@@ -49,14 +48,11 @@
description="ONOS core components">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
- <bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle>
- <bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
- <bundle>mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT</bundle>
- <bundle>mvn:org.onlab.onos/onos-core-hz-net/1.0.0-SNAPSHOT</bundle>
+ <bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
</feature>
- <feature name="onos-core-dist" version="1.0.0"
- description="ONOS core components">
+ <feature name="onos-core-hazelcast" version="1.0.0"
+ description="ONOS core components built on hazelcast">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle>
diff --git a/tools/package/debian/onos.conf b/tools/package/debian/onos.conf
index 6d80502..842a937 100644
--- a/tools/package/debian/onos.conf
+++ b/tools/package/debian/onos.conf
@@ -15,7 +15,7 @@
pre-stop script
/opt/onos/bin/onos halt 2>/opt/onos/var/stderr.log
- sleep 3
+ sleep 2
end script
script
diff --git a/tools/test/bin/onos-config b/tools/test/bin/onos-config
index 9f1e3b0..4c4f7e1 100755
--- a/tools/test/bin/onos-config
+++ b/tools/test/bin/onos-config
@@ -8,7 +8,21 @@
remote=$ONOS_USER@${1:-$OCI}
+# Generate a cluster.json from the OC* environment variables; OC2..OC9 get a trailing comma, OC1 closes the list
+CDEF_FILE=/tmp/cluster.json
+echo "{ \"nodes\":[" > $CDEF_FILE
+for node in $(env | sort | egrep "^OC[2-9]+=" | cut -d= -f2); do
+ echo " { \"id\": \"$node\", \"ip\": \"$node\", \"tcpPort\": 9876 }," >> $CDEF_FILE
+done
+echo " { \"id\": \"$OC1\", \"ip\": \"$OC1\", \"tcpPort\": 9876 }" >> $CDEF_FILE
+echo "]}" >> $CDEF_FILE
+
ssh $remote "
sudo perl -pi.bak -e \"s/ <interface>.*</ <interface>${ONOS_NIC:-192.168.56.*}</g\" \
$ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml
-"
\ No newline at end of file
+
+ echo \"onos.ip = \$(ifconfig | grep '${ONOS_NIC:-192.168.56.*}' | cut -d: -f2 | cut -d\\ -f1)\" \
+ >> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties
+"
+
+scp -q $CDEF_FILE $remote:$ONOS_INSTALL_DIR/config/
\ No newline at end of file
diff --git a/tools/test/bin/onos-install b/tools/test/bin/onos-install
index d594105..a87ff17 100755
--- a/tools/test/bin/onos-install
+++ b/tools/test/bin/onos-install
@@ -24,6 +24,7 @@
# Make a link to the log file directory and make a home for auxiliaries
ln -s $ONOS_INSTALL_DIR/$KARAF_DIST/data/log /opt/onos/log
mkdir $ONOS_INSTALL_DIR/var
+ mkdir $ONOS_INSTALL_DIR/config
# Install the upstart configuration file and setup options for debugging
sudo cp $ONOS_INSTALL_DIR/debian/onos.conf /etc/init/onos.conf
diff --git a/tools/test/cells/local b/tools/test/cells/local
index b04a5e3..6b9fea5 100644
--- a/tools/test/cells/local
+++ b/tools/test/cells/local
@@ -1,6 +1,8 @@
# Default virtual box ONOS instances 1,2 & ONOS mininet box
. $ONOS_ROOT/tools/test/cells/.reset
+export ONOS_NIC="192.168.56.*"
+
export OC1="192.168.56.101"
export OC2="192.168.56.102"
diff --git a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
index 1309330..805b58a 100644
--- a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
+++ b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
@@ -54,6 +54,15 @@
}
/**
+ * Returns the number of message streams in custody of the loop.
+ *
+ * @return number of message streams
+ */
+ public int streamCount() {
+ return streams.size();
+ }
+
+ /**
* Creates a new message stream backed by the specified socket channel.
*
* @param byteChannel backing byte channel
@@ -182,9 +191,10 @@
* with a pending accept operation.
*
* @param channel backing socket channel
+ * @return newly accepted message stream
*/
- public void acceptStream(SocketChannel channel) {
- createAndAdmit(channel, SelectionKey.OP_READ);
+ public S acceptStream(SocketChannel channel) {
+ return createAndAdmit(channel, SelectionKey.OP_READ);
}
@@ -193,9 +203,10 @@
* with a pending connect operation.
*
* @param channel backing socket channel
+ * @return newly connected message stream
*/
- public void connectStream(SocketChannel channel) {
- createAndAdmit(channel, SelectionKey.OP_CONNECT);
+ public S connectStream(SocketChannel channel) {
+ return createAndAdmit(channel, SelectionKey.OP_CONNECT);
}
/**
@@ -205,12 +216,14 @@
* @param channel socket channel
* @param op pending operations mask to be applied to the selection
* key as a set of initial interestedOps
+ * @return newly created message stream
*/
- private synchronized void createAndAdmit(SocketChannel channel, int op) {
+ private synchronized S createAndAdmit(SocketChannel channel, int op) {
S stream = createStream(channel);
streams.add(stream);
newStreamRequests.add(new NewStreamRequest(stream, channel, op));
selector.wakeup();
+ return stream;
}
/**
diff --git a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
index 89107bf..a7416e9 100644
--- a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
+++ b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
@@ -170,7 +170,7 @@
}
/**
- * Reads, withouth blocking, a list of messages from the stream.
+ * Reads, without blocking, a list of messages from the stream.
* The list will be empty if there were not messages pending.
*
* @return list of messages or null if backing channel has been closed