Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/apps/foo/pom.xml b/apps/foo/pom.xml
index 1a62f73..6109263 100644
--- a/apps/foo/pom.xml
+++ b/apps/foo/pom.xml
@@ -24,6 +24,11 @@
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
+ <artifactId>onlab-osgi</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onlab.onos</groupId>
<artifactId>onlab-nio</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/NettyEchoHandler.java b/apps/foo/src/main/java/org/onlab/onos/foo/NettyEchoHandler.java
new file mode 100644
index 0000000..1049a6d
--- /dev/null
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/NettyEchoHandler.java
@@ -0,0 +1,23 @@
+package org.onlab.onos.foo;
+
+import java.io.IOException;
+
+import org.onlab.netty.Message;
+import org.onlab.netty.MessageHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Message handler that echos the message back to the sender.
+ */
+public class NettyEchoHandler implements MessageHandler {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public void handle(Message message) throws IOException {
+ //log.info("Received message. Echoing it back to the sender.");
+ message.respond(message.payload());
+ }
+}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/NettyLoggingHandler.java b/apps/foo/src/main/java/org/onlab/onos/foo/NettyLoggingHandler.java
new file mode 100644
index 0000000..b35a46f
--- /dev/null
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/NettyLoggingHandler.java
@@ -0,0 +1,19 @@
+package org.onlab.onos.foo;
+
+import org.onlab.netty.Message;
+import org.onlab.netty.MessageHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A MessageHandler that simply logs the information.
+ */
+public class NettyLoggingHandler implements MessageHandler {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public void handle(Message message) {
+ //log.info("Received message. Payload has {} bytes", message.payload().length);
+ }
+}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
index 8f94ec1..5e3f8f7 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
@@ -2,7 +2,6 @@
import java.io.IOException;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onlab.metrics.MetricsComponent;
@@ -11,12 +10,17 @@
import org.onlab.netty.Endpoint;
import org.onlab.netty.NettyMessagingService;
import org.onlab.netty.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
// FIXME: Should be move out to test or app
public final class SimpleNettyClient {
- private SimpleNettyClient() {
+
+private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
+
+ private SimpleNettyClient() {
}
public static void main(String[] args)
@@ -30,33 +34,46 @@
System.exit(0);
}
- public static void startStandalone(String... args) throws Exception {
+ public static void startStandalone(String[] args) throws Exception {
+ String host = args.length > 0 ? args[0] : "localhost";
+ int port = args.length > 1 ? Integer.parseInt(args[1]) : 8081;
+ int warmup = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
+ int iterations = args.length > 3 ? Integer.parseInt(args[3]) : 50 * 100000;
NettyMessagingService messaging = new TestNettyMessagingService(9081);
MetricsManager metrics = new MetricsManager();
+ Endpoint endpoint = new Endpoint(host, port);
messaging.activate();
metrics.activate();
- MetricsFeature feature = new MetricsFeature("timers");
+ MetricsFeature feature = new MetricsFeature("latency");
MetricsComponent component = metrics.registerComponent("NettyMessaging");
- Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
- final int warmup = 100;
+ log.info("connecting " + host + ":" + port + " warmup:" + warmup + " iterations:" + iterations);
+
for (int i = 0; i < warmup; i++) {
+ messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
+ Response response = messaging
+ .sendAndReceive(endpoint, "echo",
+ "Hello World".getBytes());
+ }
+
+ log.info("measuring async sender");
+ Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
+
+ for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAsyncTimer.time();
- messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
+ messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
context.stop();
}
- metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
- final int iterations = 1000000;
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
Response response = messaging
- .sendAndReceive(new Endpoint("localhost", 8080), "echo",
+ .sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
- System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
+ // System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
}
- metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
+ metrics.deactivate();
}
public static class TestNettyMessagingService extends NettyMessagingService {
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
index 8a603e9..b63f9c1 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
@@ -13,33 +13,29 @@
description = "Starts the simple Netty client")
public class SimpleNettyClientCommand extends AbstractShellCommand {
- @Argument(index = 0, name = "serverIp", description = "Server IP address",
+ //FIXME: replace these arguments with proper ones needed for the test.
+ @Argument(index = 0, name = "hostname", description = "Server Hostname",
required = false, multiValued = false)
- String serverIp = "127.0.0.1";
+ String hostname = "localhost";
- @Argument(index = 1, name = "workers", description = "IO workers",
+ @Argument(index = 1, name = "port", description = "Port",
required = false, multiValued = false)
- String workers = "6";
+ String port = "8081";
- @Argument(index = 2, name = "messageCount", description = "Message count",
+ @Argument(index = 2, name = "warmupCount", description = "Warm-up count",
required = false, multiValued = false)
- String messageCount = "1000000";
+ String warmupCount = "1000";
- @Argument(index = 3, name = "messageLength", description = "Message length (bytes)",
+ @Argument(index = 3, name = "messageCount", description = "Message count",
required = false, multiValued = false)
- String messageLength = "128";
-
- @Argument(index = 4, name = "timeoutSecs", description = "Test timeout (seconds)",
- required = false, multiValued = false)
- String timeoutSecs = "60";
+ String messageCount = "100000";
@Override
protected void execute() {
try {
- startStandalone(new String[]{serverIp, workers, messageCount, messageLength, timeoutSecs});
+ startStandalone(new String[]{hostname, port, warmupCount, messageCount});
} catch (Exception e) {
error("Unable to start client %s", e);
}
}
-
}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyServer.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyServer.java
index 25ac59b..5578fcd 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyServer.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyServer.java
@@ -1,6 +1,5 @@
package org.onlab.onos.foo;
-import org.onlab.netty.EchoHandler;
import org.onlab.netty.NettyMessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -9,7 +8,7 @@
* Test to measure Messaging performance.
*/
public final class SimpleNettyServer {
- private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
+ private static Logger log = LoggerFactory.getLogger(SimpleNettyServer.class);
private SimpleNettyServer() {}
@@ -19,10 +18,10 @@
}
public static void startStandalone(String[] args) throws Exception {
- NettyMessagingService server = new NettyMessagingService(8080);
+ NettyMessagingService server = new NettyMessagingService(8081);
server.activate();
- server.registerHandler("simple", new org.onlab.netty.LoggingHandler());
- server.registerHandler("echo", new EchoHandler());
+ server.registerHandler("simple", new NettyLoggingHandler());
+ server.registerHandler("echo", new NettyEchoHandler());
}
}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyServerCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyServerCommand.java
index 2f82da0..17b2586 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyServerCommand.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyServerCommand.java
@@ -9,10 +9,11 @@
/**
* Starts the Simple Netty server.
*/
-@Command(scope = "onos", name = "test-netty-server",
+@Command(scope = "onos", name = "simple-netty-server",
description = "Starts the simple netty server")
public class SimpleNettyServerCommand extends AbstractShellCommand {
+ //FIXME: Replace these with parameters for
@Argument(index = 0, name = "serverIp", description = "Server IP address",
required = false, multiValued = false)
String serverIp = "127.0.0.1";
diff --git a/apps/foo/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/apps/foo/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 93bd020..f8ee2b8 100644
--- a/apps/foo/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/apps/foo/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -7,6 +7,12 @@
<command>
<action class="org.onlab.onos.foo.TestIOServerCommand"/>
</command>
+ <command>
+ <action class="org.onlab.onos.foo.SimpleNettyServerCommand"/>
+ </command>
+ <command>
+ <action class="org.onlab.onos.foo.SimpleNettyClientCommand"/>
+ </command>
</command-bundle>
</blueprint>
diff --git a/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java b/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java
index b3e03b3..e843770 100644
--- a/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java
@@ -1,6 +1,7 @@
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Command;
+import org.onlab.onos.CoreService;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.FlowRuleService;
@@ -21,7 +22,8 @@
protected void execute() {
TopologyService topologyService = get(TopologyService.class);
Topology topology = topologyService.currentTopology();
- print("nodes=%d, devices=%d, links=%d, hosts=%d, clusters=%s, paths=%d, flows=%d, intents=%d",
+ print("version=%s, nodes=%d, devices=%d, links=%d, hosts=%d, clusters=%s, paths=%d, flows=%d, intents=%d",
+ get(CoreService.class).version().toString(),
get(ClusterService.class).getNodes().size(),
get(DeviceService.class).getDeviceCount(),
get(LinkService.class).getLinkCount(),
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java
index be2e964..837a0a7 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java
@@ -27,7 +27,7 @@
required = true, multiValued = false)
String two = null;
- private static long id = 1;
+ private static long id = 0x7870001;
@Override
protected void execute() {
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java
new file mode 100644
index 0000000..89ec7f6
--- /dev/null
+++ b/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java
@@ -0,0 +1,93 @@
+package org.onlab.onos.cli.net;
+
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.onlab.onos.cli.AbstractShellCommand;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.flow.DefaultTrafficSelector;
+import org.onlab.onos.net.flow.DefaultTrafficTreatment;
+import org.onlab.onos.net.flow.TrafficSelector;
+import org.onlab.onos.net.flow.TrafficTreatment;
+import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.IntentService;
+import org.onlab.onos.net.intent.PointToPointIntent;
+import org.onlab.packet.Ethernet;
+
+/**
+ * Installs point-to-point connectivity intents.
+ */
+@Command(scope = "onos", name = "add-point-intent",
+ description = "Installs point-to-point connectivity intent")
+public class AddPointToPointIntentCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "ingressDevice",
+ description = "Ingress Device/Port Description",
+ required = true, multiValued = false)
+ String ingressDeviceString = null;
+
+ @Argument(index = 1, name = "egressDevice",
+ description = "Egress Device/Port Description",
+ required = true, multiValued = false)
+ String egressDeviceString = null;
+
+ private static long id = 0x7470001;
+
+ @Override
+ protected void execute() {
+ IntentService service = get(IntentService.class);
+
+ DeviceId ingressDeviceId = DeviceId.deviceId(getDeviceId(ingressDeviceString));
+ PortNumber ingressPortNumber =
+ PortNumber.portNumber(getPortNumber(ingressDeviceString));
+ ConnectPoint ingress = new ConnectPoint(ingressDeviceId, ingressPortNumber);
+
+ DeviceId egressDeviceId = DeviceId.deviceId(getDeviceId(egressDeviceString));
+ PortNumber egressPortNumber =
+ PortNumber.portNumber(getPortNumber(egressDeviceString));
+ ConnectPoint egress = new ConnectPoint(egressDeviceId, egressPortNumber);
+
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .build();
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
+
+ Intent intent =
+ new PointToPointIntent(new IntentId(id++),
+ selector,
+ treatment,
+ ingress,
+ egress);
+ service.submit(intent);
+ }
+
+ /**
+ * Extracts the port number portion of the ConnectPoint.
+ *
+ * @param deviceString string representing the device/port
+ * @return port number as a string, empty string if the port is not found
+ */
+ private String getPortNumber(String deviceString) {
+ int slash = deviceString.indexOf('/');
+ if (slash <= 0) {
+ return "";
+ }
+ return deviceString.substring(slash + 1, deviceString.length());
+ }
+
+ /**
+ * Extracts the device ID portion of the ConnectPoint.
+ *
+ * @param deviceString string representing the device/port
+ * @return device ID string
+ */
+ private String getDeviceId(String deviceString) {
+ int slash = deviceString.indexOf('/');
+ if (slash <= 0) {
+ return "";
+ }
+ return deviceString.substring(0, slash);
+ }
+}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/ConnectPointCompleter.java b/cli/src/main/java/org/onlab/onos/cli/net/ConnectPointCompleter.java
new file mode 100644
index 0000000..29994c7
--- /dev/null
+++ b/cli/src/main/java/org/onlab/onos/cli/net/ConnectPointCompleter.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.cli.net;
+
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.karaf.shell.console.Completer;
+import org.apache.karaf.shell.console.completer.StringsCompleter;
+import org.onlab.onos.cli.AbstractShellCommand;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.device.DeviceService;
+
+/**
+ * ConnectPoint completer.
+ */
+public class ConnectPointCompleter implements Completer {
+ @Override
+ public int complete(String buffer, int cursor, List<String> candidates) {
+ // Delegate string completer
+ StringsCompleter delegate = new StringsCompleter();
+
+ // Fetch our service and feed it's offerings to the string completer
+ DeviceService service = AbstractShellCommand.get(DeviceService.class);
+
+ // Generate the device ID/port number identifiers
+ for (Device device : service.getDevices()) {
+ SortedSet<String> strings = delegate.getStrings();
+
+ for (Port port : service.getPorts(device.id())) {
+ strings.add(device.id().toString() + "/" + port.number());
+ }
+ }
+
+ // Now let the completer do the work for figuring out what to offer.
+ return delegate.complete(buffer, cursor, candidates);
+ }
+
+}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/WipeOutCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/WipeOutCommand.java
index e62f850..fe18ba0 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/WipeOutCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/WipeOutCommand.java
@@ -13,40 +13,41 @@
import org.onlab.onos.net.intent.IntentState;
/**
- * Wipes-out the entire network information base, i.e. devices, links, hosts.
+ * Wipes-out the entire network information base, i.e. devices, links, hosts, intents.
*/
@Command(scope = "onos", name = "wipe-out",
description = "Wipes-out the entire network information base, i.e. devices, links, hosts")
public class WipeOutCommand extends ClustersListCommand {
-
- private static final String DISCLAIMER = "Yes, I know it will delete everything!";
+ private static final String DISCLAIMER = "Delete everything please.";
@Argument(index = 0, name = "disclaimer", description = "Device ID",
- required = true, multiValued = false)
+ required = false, multiValued = false)
String disclaimer = null;
@Override
protected void execute() {
- if (!disclaimer.equals(DISCLAIMER)) {
- print("I'm afraid I can't do that...");
- print("You have to acknowledge by: " + DISCLAIMER);
+ if (disclaimer == null || !disclaimer.equals(DISCLAIMER)) {
+ print("I'm afraid I can't do that!\nPlease acknowledge with phrase: '%s'",
+ DISCLAIMER);
return;
}
- print("Good bye...");
+ print("Wiping devices");
DeviceAdminService deviceAdminService = get(DeviceAdminService.class);
DeviceService deviceService = get(DeviceService.class);
for (Device device : deviceService.getDevices()) {
deviceAdminService.removeDevice(device.id());
}
+ print("Wiping hosts");
HostAdminService hostAdminService = get(HostAdminService.class);
HostService hostService = get(HostService.class);
for (Host host : hostService.getHosts()) {
hostAdminService.removeHost(host.id());
}
+ print("Wiping intents");
IntentService intentService = get(IntentService.class);
for (Intent intent : intentService.getIntents()) {
if (intentService.getIntentState(intent.id()) == IntentState.INSTALLED) {
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 92f26fd..6120d30 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -75,6 +75,13 @@
<ref component-id="hostIdCompleter"/>
</completers>
</command>
+ <command>
+ <action class="org.onlab.onos.cli.net.AddPointToPointIntentCommand"/>
+ <completers>
+ <ref component-id="connectPointCompleter"/>
+ <ref component-id="connectPointCompleter"/>
+ </completers>
+ </command>
<command>
<action class="org.onlab.onos.cli.net.ClustersListCommand"/>
@@ -116,5 +123,6 @@
<bean id="hostIdCompleter" class="org.onlab.onos.cli.net.HostIdCompleter"/>
<bean id="intentIdCompleter" class="org.onlab.onos.cli.net.IntentIdCompleter"/>
<bean id="flowRuleStatusCompleter" class="org.onlab.onos.cli.net.FlowRuleStatusCompleter"/>
+ <bean id="connectPointCompleter" class="org.onlab.onos.cli.net.ConnectPointCompleter"/>
</blueprint>
diff --git a/core/api/src/main/java/org/onlab/onos/CoreService.java b/core/api/src/main/java/org/onlab/onos/CoreService.java
new file mode 100644
index 0000000..32c36c5
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/CoreService.java
@@ -0,0 +1,15 @@
+package org.onlab.onos;
+
+/**
+ * Service for interacting with the core system of the controller.
+ */
+public interface CoreService {
+
+ /**
+ * Returns the product version.
+ *
+ * @return product version
+ */
+ Version version();
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/Version.java b/core/api/src/main/java/org/onlab/onos/Version.java
new file mode 100644
index 0000000..5d071b7
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/Version.java
@@ -0,0 +1,113 @@
+package org.onlab.onos;
+
+import java.util.Objects;
+
+import static java.lang.Integer.parseInt;
+
+/**
+ * Representation of the product version.
+ */
+public final class Version {
+
+ public static final String FORMAT = "%d.%d.%d.%s";
+
+ private final int major;
+ private final int minor;
+ private final int patch;
+ private final String build;
+
+ private final String format;
+
+ // Creates a new version descriptor
+ private Version(int major, int minor, int patch, String build) {
+ this.major = major;
+ this.minor = minor;
+ this.patch = patch;
+ this.build = build;
+ this.format = String.format(FORMAT, major, minor, patch, build);
+ }
+
+
+ /**
+ * Creates a new version from the specified constituent numbers.
+ *
+ * @param major major version number
+ * @param minor minod version number
+ * @param patch version patch number
+ * @param build build string
+ * @return version descriptor
+ */
+ public static Version version(int major, int minor, int patch, String build) {
+ return new Version(major, minor, patch, build);
+ }
+
+ /**
+ * Creates a new version by parsing the specified string.
+ *
+ * @param string version string
+ * @return version descriptor
+ */
+ public static Version version(String string) {
+ String[] fields = string.split("[.-]");
+ return new Version(parseInt(fields[0]), parseInt(fields[1]),
+ parseInt(fields[2]), fields[3]);
+ }
+
+ /**
+ * Returns the major version number.
+ *
+ * @return major version number
+ */
+ public int major() {
+ return major;
+ }
+
+ /**
+ * Returns the minor version number.
+ *
+ * @return minor version number
+ */
+ public int minor() {
+ return minor;
+ }
+
+ /**
+ * Returns the version patch number.
+ *
+ * @return patch number
+ */
+ public int patch() {
+ return patch;
+ }
+
+ /**
+ * Returns the version build string.
+ *
+ * @return build string
+ */
+ public String build() {
+ return build;
+ }
+
+ @Override
+ public String toString() {
+ return format;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(format);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof Version) {
+ final Version other = (Version) obj;
+ return Objects.equals(this.format, other.format);
+ }
+ return false;
+ }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java b/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
index b05aa62..51b6f6a 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
@@ -34,7 +34,7 @@
/**
* Abandons mastership of the specified device on the local node thus
* forcing selection of a new master. If the local node is not a master
- * for this device, no action will be taken.
+ * for this device, no master selection will occur.
*
* @param deviceId the identifier of the device
*/
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java b/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
index bedc5e9..dc5603f 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
@@ -66,12 +66,25 @@
MastershipTerm getTermFor(DeviceId deviceId);
/**
- * Revokes a controller instance's mastership over a device and hands
- * over mastership to another controller instance.
+ * Sets a controller instance's mastership role to STANDBY for a device.
+ * If the role is MASTER, another controller instance will be selected
+ * as a candidate master.
*
* @param nodeId the controller instance identifier
- * @param deviceId device to revoke mastership for
+ * @param deviceId device to revoke mastership role for
* @return a mastership event
*/
- MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId);
+ MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId);
+
+ /**
+ * Allows a controller instance to give up its current role for a device.
+ * If the role is MASTER, another controller instance will be selected
+ * as a candidate master.
+ *
+ * @param nodeId the controller instance identifier
+ * @param deviceId device to revoke mastership role for
+ * @return a mastership event
+ */
+ MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId);
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/DefaultEdgeLink.java b/core/api/src/main/java/org/onlab/onos/net/DefaultEdgeLink.java
index 74991c8..46a582a 100644
--- a/core/api/src/main/java/org/onlab/onos/net/DefaultEdgeLink.java
+++ b/core/api/src/main/java/org/onlab/onos/net/DefaultEdgeLink.java
@@ -3,6 +3,7 @@
import org.onlab.onos.net.provider.ProviderId;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* Default edge link model implementation.
@@ -52,10 +53,14 @@
* for network-to-host direction
* @return new phantom edge link
*/
- public static DefaultEdgeLink createEdgeLink(HostLocation edgePort,
+ public static DefaultEdgeLink createEdgeLink(ConnectPoint edgePort,
boolean isIngress) {
+ checkNotNull(edgePort, "Edge port cannot be null");
+ HostLocation location = (edgePort instanceof HostLocation) ?
+ (HostLocation) edgePort : new HostLocation(edgePort, 0);
return new DefaultEdgeLink(ProviderId.NONE,
new ConnectPoint(HostId.NONE, PortNumber.P0),
- edgePort, isIngress);
+ location, isIngress);
}
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/HostLocation.java b/core/api/src/main/java/org/onlab/onos/net/HostLocation.java
index 76e2312..60c5945 100644
--- a/core/api/src/main/java/org/onlab/onos/net/HostLocation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/HostLocation.java
@@ -22,6 +22,17 @@
}
/**
+ * Creates a new host location derived from the supplied connection point.
+ *
+ * @param connectPoint connection point
+ * @param time time when detected, in millis since start of epoch
+ */
+ public HostLocation(ConnectPoint connectPoint, long time) {
+ super(connectPoint.deviceId(), connectPoint.port());
+ this.time = time;
+ }
+
+ /**
* Returns the time when the location was established, given in
* milliseconds since start of epoch.
*
diff --git a/core/api/src/main/java/org/onlab/onos/net/device/DefaultDeviceDescription.java b/core/api/src/main/java/org/onlab/onos/net/device/DefaultDeviceDescription.java
index 788d23a..ede2eb2 100644
--- a/core/api/src/main/java/org/onlab/onos/net/device/DefaultDeviceDescription.java
+++ b/core/api/src/main/java/org/onlab/onos/net/device/DefaultDeviceDescription.java
@@ -96,4 +96,13 @@
.toString();
}
+ // default constructor for serialization
+ private DefaultDeviceDescription() {
+ this.uri = null;
+ this.type = null;
+ this.manufacturer = null;
+ this.hwVersion = null;
+ this.swVersion = null;
+ this.serialNumber = null;
+ }
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java b/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java
index eb75ede..e1dcf9e 100644
--- a/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java
+++ b/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java
@@ -48,4 +48,9 @@
return isEnabled;
}
+ // default constructor for serialization
+ private DefaultPortDescription() {
+ this.number = null;
+ this.isEnabled = false;
+ }
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/device/DeviceService.java b/core/api/src/main/java/org/onlab/onos/net/device/DeviceService.java
index 8364935..54b9d72 100644
--- a/core/api/src/main/java/org/onlab/onos/net/device/DeviceService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/device/DeviceService.java
@@ -42,6 +42,7 @@
* @param deviceId device identifier
* @return designated mastership role
*/
+ //XXX do we want this method here when MastershipService already does?
MastershipRole getRole(DeviceId deviceId);
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
index 738be04..9855498 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
@@ -1,5 +1,9 @@
package org.onlab.onos.net.intent;
+import java.util.concurrent.Future;
+
+import org.onlab.onos.net.flow.CompletedBatchOperation;
+
/**
* Abstraction of entity capable of installing intents to the environment.
*/
@@ -10,7 +14,7 @@
* @param intent intent to be installed
* @throws IntentException if issues are encountered while installing the intent
*/
- void install(T intent);
+ Future<CompletedBatchOperation> install(T intent);
/**
* Uninstalls the specified intent from the environment.
@@ -18,5 +22,5 @@
* @param intent intent to be uninstalled
* @throws IntentException if issues are encountered while uninstalling the intent
*/
- void uninstall(T intent);
+ Future<CompletedBatchOperation> uninstall(T intent);
}
diff --git a/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java b/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java
index 759b62a..a5f81c7 100644
--- a/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java
@@ -12,6 +12,7 @@
/**
* Updates the mastership term for the specified deviceId.
+ *
* @param deviceId device identifier.
* @param term mastership term.
*/
diff --git a/core/api/src/test/java/org/onlab/onos/VersionTest.java b/core/api/src/test/java/org/onlab/onos/VersionTest.java
new file mode 100644
index 0000000..e357f9d
--- /dev/null
+++ b/core/api/src/test/java/org/onlab/onos/VersionTest.java
@@ -0,0 +1,50 @@
+package org.onlab.onos;
+
+import com.google.common.testing.EqualsTester;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.onlab.onos.Version.version;
+
+/**
+ * Tests of the version descriptor.
+ */
+public class VersionTest {
+
+ @Test
+ public void fromParts() {
+ Version v = version(1, 2, 3, "4321");
+ assertEquals("wrong major", 1, v.major());
+ assertEquals("wrong minor", 2, v.minor());
+ assertEquals("wrong patch", 3, v.patch());
+ assertEquals("wrong build", "4321", v.build());
+ }
+
+ @Test
+ public void fromString() {
+ Version v = version("1.2.3.4321");
+ assertEquals("wrong major", 1, v.major());
+ assertEquals("wrong minor", 2, v.minor());
+ assertEquals("wrong patch", 3, v.patch());
+ assertEquals("wrong build", "4321", v.build());
+ }
+
+ @Test
+ public void snapshot() {
+ Version v = version("1.2.3-SNAPSHOT");
+ assertEquals("wrong major", 1, v.major());
+ assertEquals("wrong minor", 2, v.minor());
+ assertEquals("wrong patch", 3, v.patch());
+ assertEquals("wrong build", "SNAPSHOT", v.build());
+ }
+
+ @Test
+ public void testEquals() {
+ new EqualsTester()
+ .addEqualityGroup(version("1.2.3.4321"), version(1, 2, 3, "4321"))
+ .addEqualityGroup(version("1.9.3.4321"), version(1, 9, 3, "4321"))
+ .addEqualityGroup(version("1.2.8.4321"), version(1, 2, 8, "4321"))
+ .addEqualityGroup(version("1.2.3.x"), version(1, 2, 3, "x"))
+ .testEquals();
+ }
+}
\ No newline at end of file
diff --git a/core/api/src/test/java/org/onlab/onos/net/DefaultEdgeLinkTest.java b/core/api/src/test/java/org/onlab/onos/net/DefaultEdgeLinkTest.java
index b3891f1..fd63797 100644
--- a/core/api/src/test/java/org/onlab/onos/net/DefaultEdgeLinkTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/DefaultEdgeLinkTest.java
@@ -5,6 +5,7 @@
import org.onlab.onos.net.provider.ProviderId;
import static org.junit.Assert.assertEquals;
+import static org.onlab.onos.net.DefaultEdgeLink.createEdgeLink;
import static org.onlab.onos.net.DefaultLinkTest.cp;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.HostId.hostId;
@@ -55,4 +56,24 @@
assertEquals("incorrect time", 123L, link.hostLocation().time());
}
+ @Test
+ public void phantomIngress() {
+ HostLocation hostLocation = new HostLocation(DID1, P1, 123L);
+ EdgeLink link = createEdgeLink(hostLocation, true);
+ assertEquals("incorrect dst", hostLocation, link.dst());
+ assertEquals("incorrect type", Link.Type.EDGE, link.type());
+ assertEquals("incorrect connect point", hostLocation, link.hostLocation());
+ assertEquals("incorrect time", 123L, link.hostLocation().time());
+ }
+
+ @Test
+ public void phantomEgress() {
+ ConnectPoint hostLocation = new ConnectPoint(DID1, P1);
+ EdgeLink link = createEdgeLink(hostLocation, false);
+ assertEquals("incorrect src", hostLocation, link.src());
+ assertEquals("incorrect type", Link.Type.EDGE, link.type());
+ assertEquals("incorrect connect point", hostLocation, link.hostLocation());
+ assertEquals("incorrect time", 0L, link.hostLocation().time());
+ }
+
}
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java b/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
index 7eb0e19..163a056 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
@@ -1,17 +1,25 @@
package org.onlab.onos.net.intent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.onlab.onos.net.intent.IntentEvent.Type.FAILED;
+import static org.onlab.onos.net.intent.IntentEvent.Type.INSTALLED;
+import static org.onlab.onos.net.intent.IntentEvent.Type.SUBMITTED;
+import static org.onlab.onos.net.intent.IntentEvent.Type.WITHDRAWN;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Future;
-import static org.junit.Assert.*;
-import static org.onlab.onos.net.intent.IntentEvent.Type.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
/**
* Suite of tests for the intent service contract.
@@ -290,17 +298,19 @@
}
@Override
- public void install(TestInstallableIntent intent) {
+ public Future<CompletedBatchOperation> install(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("install failed by design");
}
+ return null;
}
@Override
- public void uninstall(TestInstallableIntent intent) {
+ public Future<CompletedBatchOperation> uninstall(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("remove failed by design");
}
+ return null;
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/cluster/impl/CoreManager.java b/core/net/src/main/java/org/onlab/onos/cluster/impl/CoreManager.java
new file mode 100644
index 0000000..4b1191f
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/cluster/impl/CoreManager.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.cluster.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.CoreService;
+import org.onlab.onos.Version;
+import org.onlab.util.Tools;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Core service implementation.
+ */
+@Component
+@Service
+public class CoreManager implements CoreService {
+
+ private static final File VERSION_FILE = new File("../VERSION");
+ private static Version version = Version.version("1.0.0-SNAPSHOT");
+
+ // TODO: work in progress
+
+ @Activate
+ public void activate() {
+ List<String> versionLines = Tools.slurp(VERSION_FILE);
+ if (versionLines != null && !versionLines.isEmpty()) {
+ version = Version.version(versionLines.get(0));
+ }
+ }
+
+ @Override
+ public Version version() {
+ return version;
+ }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
index a0da87c..125745b 100644
--- a/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
@@ -82,7 +82,7 @@
if (role.equals(MastershipRole.MASTER)) {
event = store.setMaster(nodeId, deviceId);
} else {
- event = store.unsetMaster(nodeId, deviceId);
+ event = store.setStandby(nodeId, deviceId);
}
if (event != null) {
@@ -98,13 +98,10 @@
@Override
public void relinquishMastership(DeviceId deviceId) {
- MastershipRole role = getLocalRole(deviceId);
- if (!role.equals(MastershipRole.MASTER)) {
- return;
- }
-
- MastershipEvent event = store.unsetMaster(
+ MastershipEvent event = null;
+ event = store.relinquishRole(
clusterService.getLocalNode().id(), deviceId);
+
if (event != null) {
post(event);
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index 3c4a885..8cde5a3 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -18,6 +18,7 @@
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.Device;
@@ -142,8 +143,12 @@
// Applies the specified role to the device; ignores NONE
private void applyRole(DeviceId deviceId, MastershipRole newRole) {
- if (newRole != MastershipRole.NONE) {
+ if (newRole.equals(MastershipRole.NONE)) {
Device device = store.getDevice(deviceId);
+ // FIXME: Device might not be there yet. (eventual consistent)
+ if (device == null) {
+ return;
+ }
DeviceProvider provider = getProvider(device.providerId());
if (provider != null) {
provider.roleChanged(device, newRole);
@@ -193,16 +198,50 @@
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL);
checkValidity();
+
+ log.info("Device {} connected", deviceId);
+ // check my Role
+ MastershipRole role = mastershipService.requestRoleFor(deviceId);
+
+ if (role != MastershipRole.MASTER) {
+ // TODO: Do we need to explicitly tell the Provider that
+ // this instance is no longer the MASTER? probably not
+ return;
+ }
+
+ MastershipTerm term = mastershipService.requestTermService()
+ .getMastershipTerm(deviceId);
+ if (!term.master().equals(clusterService.getLocalNode().id())) {
+ // lost mastership after requestRole told this instance was MASTER.
+ return;
+ }
+ // tell clock provider if this instance is the master
+ clockProviderService.setMastershipTerm(deviceId, term);
+
DeviceEvent event = store.createOrUpdateDevice(provider().id(),
deviceId, deviceDescription);
- // If there was a change of any kind, trigger role selection
- // process.
+ // If there was a change of any kind, tell the provider
+ // that this instance is the master.
+ // Note: event can be null, if mastership was lost between
+ // roleRequest and store update calls.
if (event != null) {
- log.info("Device {} connected", deviceId);
- mastershipService.requestRoleFor(deviceId);
- provider().roleChanged(event.subject(),
- mastershipService.requestRoleFor(deviceId));
+ // TODO: Check switch reconnected case. Is it assured that
+ // event will never be null?
+ // Could there be a situation MastershipService told this
+ // instance is the new Master, but
+ // event returned from the store is null?
+
+ // TODO: Confirm: Mastership could be lost after requestRole
+ // and createOrUpdateDevice call.
+ // In that case STANDBY node can
+ // claim itself to be master against the Device.
+ // Will the Node, chosen by the MastershipService, retry
+ // to get the MASTER role when that happen?
+
+ // FIXME: 1st argument should be deviceId, to allow setting
+ // certain roles even if the store returned null.
+ provider().roleChanged(event.subject(), role);
post(event);
}
}
@@ -211,12 +250,23 @@
public void deviceDisconnected(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
- DeviceEvent event = store.markOffline(deviceId);
- //we're no longer capable of mastership.
+ // FIXME: only the MASTER should be marking off-line in normal cases,
+ // but if I was the last STANDBY connection, etc. and no one else
+ // was there to mark the device offline, this instance may need to
+ // temporarily request for Master Role and mark offline.
+ if (!mastershipService.getLocalRole(deviceId).equals(MastershipRole.MASTER)) {
+ log.debug("Device {} disconnected, but I am not the master", deviceId);
+ //let go of any role anyways
+ mastershipService.relinquishMastership(deviceId);
+ return;
+ }
+ DeviceEvent event = store.markOffline(deviceId);
+ //we're no longer capable of being master or a candidate.
+ mastershipService.relinquishMastership(deviceId);
+
if (event != null) {
log.info("Device {} disconnected", deviceId);
- mastershipService.relinquishMastership(deviceId);
post(event);
}
}
@@ -256,6 +306,9 @@
// FIXME: implement response to this notification
log.warn("Failed to assert role [{}] onto Device {}", role,
deviceId);
+ if (role == MastershipRole.MASTER) {
+ mastershipService.relinquishMastership(deviceId);
+ }
}
}
@@ -267,17 +320,29 @@
}
// Intercepts mastership events
- private class InternalMastershipListener
- implements MastershipListener {
+ private class InternalMastershipListener implements MastershipListener {
+
@Override
public void event(MastershipEvent event) {
- if (event.master().equals(clusterService.getLocalNode().id())) {
- MastershipTerm term = mastershipService.requestTermService()
- .getMastershipTerm(event.subject());
- clockProviderService.setMastershipTerm(event.subject(), term);
- applyRole(event.subject(), MastershipRole.MASTER);
+ final DeviceId did = event.subject();
+ if (isAvailable(did)) {
+ final NodeId myNodeId = clusterService.getLocalNode().id();
+
+ if (myNodeId.equals(event.master())) {
+ MastershipTerm term = termService.getMastershipTerm(did);
+
+ if (term.master().equals(myNodeId)) {
+ // only set the new term if I am the master
+ clockProviderService.setMastershipTerm(did, term);
+ }
+ applyRole(did, MastershipRole.MASTER);
+ } else {
+ applyRole(did, MastershipRole.STANDBY);
+ }
} else {
- applyRole(event.subject(), MastershipRole.STANDBY);
+ //device dead to node, give up
+ mastershipService.relinquishMastership(did);
+ applyRole(did, MastershipRole.STANDBY);
}
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 16b75f2..50f1038 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -13,12 +13,14 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -28,6 +30,7 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.intent.InstallableIntent;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
@@ -44,7 +47,9 @@
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
/**
* An implementation of Intent Manager.
@@ -67,7 +72,8 @@
private final AbstractListenerRegistry<IntentEvent, IntentListener>
listenerRegistry = new AbstractListenerRegistry<>();
- private final ExecutorService executor = newSingleThreadExecutor(namedThreads("onos-intents"));
+ private ExecutorService executor;
+ private ExecutorService monitorExecutor;
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
@@ -86,6 +92,8 @@
store.setDelegate(delegate);
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
+ executor = newSingleThreadExecutor(namedThreads("onos-intents"));
+ monitorExecutor = newSingleThreadExecutor(namedThreads("onos-intent-monitor"));
log.info("Started");
}
@@ -94,6 +102,8 @@
store.unsetDelegate(delegate);
trackerService.unsetDelegate(topoDelegate);
eventDispatcher.removeSink(IntentEvent.class);
+ executor.shutdown();
+ monitorExecutor.shutdown();
log.info("Stopped");
}
@@ -240,14 +250,23 @@
}
}
- // FIXME: To make SDN-IP workable ASAP, only single level compilation is implemented
- // TODO: implement compilation traversing tree structure
+ /**
+ * Compiles an intent recursively.
+ *
+ * @param intent intent
+ * @return result of compilation
+ */
private List<InstallableIntent> compileIntent(Intent intent) {
- List<InstallableIntent> installable = new ArrayList<>();
- for (Intent compiled : getCompiler(intent).compile(intent)) {
- InstallableIntent installableIntent = (InstallableIntent) compiled;
- installable.add(installableIntent);
+ if (intent instanceof InstallableIntent) {
+ return ImmutableList.of((InstallableIntent) intent);
}
+
+ List<InstallableIntent> installable = new ArrayList<>();
+ // TODO do we need to registerSubclassCompiler?
+ for (Intent compiled : getCompiler(intent).compile(intent)) {
+ installable.addAll(compileIntent(compiled));
+ }
+
return installable;
}
@@ -261,6 +280,7 @@
// Indicate that the intent is entering the installing phase.
store.setState(intent, INSTALLING);
+ List<Future<CompletedBatchOperation>> installFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
@@ -268,17 +288,20 @@
registerSubclassInstallerIfNeeded(installable);
trackerService.addTrackedResources(intent.id(),
installable.requiredLinks());
- getInstaller(installable).install(installable);
+ Future<CompletedBatchOperation> future = getInstaller(installable).install(installable);
+ installFutures.add(future);
}
}
- eventDispatcher.post(store.setState(intent, INSTALLED));
-
+ // FIXME we have to wait for the installable intents
+ //eventDispatcher.post(store.setState(intent, INSTALLED));
+ monitorExecutor.execute(new IntentInstallMonitor(intent, installFutures, INSTALLED));
} catch (Exception e) {
log.warn("Unable to install intent {} due to: {}", intent.id(), e);
- uninstallIntent(intent);
+ uninstallIntent(intent, RECOMPILING);
// If compilation failed, kick off the recompiling phase.
- executeRecompilingPhase(intent);
+ // FIXME
+ //executeRecompilingPhase(intent);
}
}
@@ -327,12 +350,14 @@
private void executeWithdrawingPhase(Intent intent) {
// Indicate that the intent is being withdrawn.
store.setState(intent, WITHDRAWING);
- uninstallIntent(intent);
+ uninstallIntent(intent, WITHDRAWN);
// If all went well, disassociate the top-level intent with its
// installable derivatives and mark it as withdrawn.
- store.removeInstalledIntents(intent.id());
- eventDispatcher.post(store.setState(intent, WITHDRAWN));
+ // FIXME need to clean up
+ //store.removeInstalledIntents(intent.id());
+ // FIXME
+ //eventDispatcher.post(store.setState(intent, WITHDRAWN));
}
/**
@@ -340,14 +365,17 @@
*
* @param intent intent to be uninstalled
*/
- private void uninstallIntent(Intent intent) {
+ private void uninstallIntent(Intent intent, IntentState nextState) {
+ List<Future<CompletedBatchOperation>> uninstallFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
for (InstallableIntent installable : installables) {
- getInstaller(installable).uninstall(installable);
+ Future<CompletedBatchOperation> future = getInstaller(installable).uninstall(installable);
+ uninstallFutures.add(future);
}
}
+ monitorExecutor.execute(new IntentInstallMonitor(intent, uninstallFutures, nextState));
} catch (IntentException e) {
log.warn("Unable to uninstall intent {} due to: {}", intent.id(), e);
}
@@ -422,9 +450,10 @@
// Attempt recompilation of the specified intents first.
for (IntentId intentId : intentIds) {
Intent intent = getIntent(intentId);
- uninstallIntent(intent);
+ uninstallIntent(intent, RECOMPILING);
- executeRecompilingPhase(intent);
+ //FIXME
+ //executeRecompilingPhase(intent);
}
if (compileAllFailed) {
@@ -460,4 +489,44 @@
}
}
+ private class IntentInstallMonitor implements Runnable {
+
+ private final Intent intent;
+ private final List<Future<CompletedBatchOperation>> futures;
+ private final IntentState nextState;
+
+ public IntentInstallMonitor(Intent intent,
+ List<Future<CompletedBatchOperation>> futures, IntentState nextState) {
+ this.intent = intent;
+ this.futures = futures;
+ this.nextState = nextState;
+ }
+
+ private void updateIntent(Intent intent) {
+ if (nextState == RECOMPILING) {
+ executor.execute(new IntentTask(nextState, intent));
+ } else if (nextState == INSTALLED || nextState == WITHDRAWN) {
+ eventDispatcher.post(store.setState(intent, nextState));
+ } else {
+ log.warn("Invalid next intent state {} for intent {}", nextState, intent);
+ }
+ }
+
+ @Override
+ public void run() {
+ for (Iterator<Future<CompletedBatchOperation>> i = futures.iterator(); i.hasNext();) {
+ Future<CompletedBatchOperation> future = i.next();
+ if (future.isDone()) {
+ // TODO: we may want to get the future here
+ i.remove();
+ }
+ }
+ if (futures.isEmpty()) {
+ updateIntent(intent);
+ } else {
+ // resubmit ourselves if we are not done yet
+ monitorExecutor.submit(this);
+ }
+ }
+ }
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
index 0ca75c2..56214c6 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
@@ -5,7 +5,7 @@
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -15,6 +15,7 @@
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Link;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
@@ -57,8 +58,26 @@
intentManager.unregisterInstaller(PathIntent.class);
}
+ /**
+ * Apply a list of FlowRules.
+ *
+ * @param rules rules to apply
+ */
+ private Future<CompletedBatchOperation> applyBatch(List<FlowRuleBatchEntry> rules) {
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
+ Future<CompletedBatchOperation> future = flowRuleService.applyBatch(batch);
+ return future;
+// try {
+// //FIXME don't do this here
+// future.get();
+// } catch (InterruptedException | ExecutionException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// }
+ }
+
@Override
- public void install(PathIntent intent) {
+ public Future<CompletedBatchOperation> install(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
@@ -74,20 +93,14 @@
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
- //flowRuleService.applyFlowRules(rule);
prev = link.dst();
}
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
- try {
- flowRuleService.applyBatch(batch).get();
- } catch (InterruptedException | ExecutionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+
+ return applyBatch(rules);
}
@Override
- public void uninstall(PathIntent intent) {
+ public Future<CompletedBatchOperation> uninstall(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
@@ -103,15 +116,131 @@
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
- //flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
- try {
- flowRuleService.applyBatch(batch).get();
- } catch (InterruptedException | ExecutionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ return applyBatch(rules);
+ }
+
+ // TODO refactor below this line... ----------------------------
+
+ /**
+ * Generates the series of MatchActionOperations from the
+ * {@link FlowBatchOperation}.
+ * <p>
+ * FIXME: Currently supporting PacketPathFlow and SingleDstTreeFlow only.
+ * <p>
+ * FIXME: MatchActionOperations should have dependency field to the other
+ * match action operations, and this method should use this.
+ *
+ * @param op the {@link FlowBatchOperation} object
+ * @return the list of {@link MatchActionOperations} objects
+ */
+ /*
+ private List<MatchActionOperations>
+ generateMatchActionOperationsList(FlowBatchOperation op) {
+
+ // MatchAction operations at head (ingress) switches.
+ MatchActionOperations headOps = matchActionService.createOperationsList();
+
+ // MatchAction operations at rest of the switches.
+ MatchActionOperations tailOps = matchActionService.createOperationsList();
+
+ MatchActionOperations removeOps = matchActionService.createOperationsList();
+
+ for (BatchOperationEntry<Operator, ?> e : op.getOperations()) {
+
+ if (e.getOperator() == FlowBatchOperation.Operator.ADD) {
+ generateInstallMatchActionOperations(e, tailOps, headOps);
+ } else if (e.getOperator() == FlowBatchOperation.Operator.REMOVE) {
+ generateRemoveMatchActionOperations(e, removeOps);
+ } else {
+ throw new UnsupportedOperationException(
+ "FlowManager supports ADD and REMOVE operations only.");
+ }
+
+ }
+
+ return Arrays.asList(tailOps, headOps, removeOps);
+ }
+ */
+
+ /**
+ * Generates MatchActionOperations for an INSTALL FlowBatchOperation.
+ * <p/>
+ * FIXME: Currently only supports flows that generate exactly two match
+ * action operation sets.
+ *
+ * @param e Flow BatchOperationEntry
+ * @param tailOps MatchActionOperation set that the tail
+ * MatchActionOperations will be placed in
+ * @param headOps MatchActionOperation set that the head
+ * MatchActionOperations will be placed in
+ */
+ /*
+ private void generateInstallMatchActionOperations(
+ BatchOperationEntry<Operator, ?> e,
+ MatchActionOperations tailOps,
+ MatchActionOperations headOps) {
+
+ if (!(e.getTarget() instanceof Flow)) {
+ throw new IllegalStateException(
+ "The target is not Flow object: " + e.getTarget());
+ }
+
+ // Compile flows to match-actions
+ Flow flow = (Flow) e.getTarget();
+ List<MatchActionOperations> maOps = flow.compile(
+ e.getOperator(), matchActionService);
+ verifyNotNull(maOps, "Could not compile the flow: " + flow);
+ verify(maOps.size() == 2,
+ "The flow generates unspported match-action operations.");
+
+ // Map FlowId to MatchActionIds
+ for (MatchActionOperations maOp : maOps) {
+ for (MatchActionOperationEntry entry : maOp.getOperations()) {
+ flowMatchActionsMap.put(
+ KryoFactory.serialize(flow.getId()),
+ KryoFactory.serialize(entry.getTarget()));
+ }
+ }
+
+ // Merge match-action operations
+ for (MatchActionOperationEntry mae : maOps.get(0).getOperations()) {
+ verify(mae.getOperator() == MatchActionOperations.Operator.INSTALL);
+ tailOps.addOperation(mae);
+ }
+ for (MatchActionOperationEntry mae : maOps.get(1).getOperations()) {
+ verify(mae.getOperator() == MatchActionOperations.Operator.INSTALL);
+ headOps.addOperation(mae);
}
}
+ */
+ /**
+ * Generates MatchActionOperations for a REMOVE FlowBatchOperation.
+ *
+ * @param e Flow BatchOperationEntry
+ * @param removeOps MatchActionOperation set that the remove
+ * MatchActionOperations will be placed in
+ */
+ /*
+ private void generateRemoveMatchActionOperations(
+ BatchOperationEntry<Operator, ?> e,
+ MatchActionOperations removeOps) {
+
+ if (!(e.getTarget() instanceof FlowId)) {
+ throw new IllegalStateException(
+ "The target is not a FlowId object: " + e.getTarget());
+ }
+
+ // Compile flows to match-actions
+ FlowId flowId = (FlowId) e.getTarget();
+
+ for (byte[] matchActionIdBytes :
+ flowMatchActionsMap.remove(KryoFactory.serialize(flowId))) {
+ MatchActionId matchActionId = KryoFactory.deserialize(matchActionIdBytes);
+ removeOps.addOperation(new MatchActionOperationEntry(
+ MatchActionOperations.Operator.REMOVE, matchActionId));
+ }
+ }
+ */
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java
new file mode 100644
index 0000000..0bd1703
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java
@@ -0,0 +1,105 @@
+package org.onlab.onos.net.intent.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultEdgeLink;
+import org.onlab.onos.net.DefaultPath;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.Path;
+import org.onlab.onos.net.host.HostService;
+import org.onlab.onos.net.intent.IdGenerator;
+import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentCompiler;
+import org.onlab.onos.net.intent.IntentExtensionService;
+import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.PathIntent;
+import org.onlab.onos.net.intent.PointToPointIntent;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.net.topology.PathService;
+
+/**
+ * A intent compiler for {@link org.onlab.onos.net.intent.HostToHostIntent}.
+ */
+@Component(immediate = true)
+public class PointToPointIntentCompiler
+ implements IntentCompiler<PointToPointIntent> {
+
+ private static final ProviderId PID = new ProviderId("core", "org.onlab.onos.core", true);
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected IntentExtensionService intentManager;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PathService pathService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected HostService hostService;
+
+ private IdGenerator<IntentId> intentIdGenerator;
+
+ @Activate
+ public void activate() {
+ IdBlockAllocator idBlockAllocator = new DummyIdBlockAllocator();
+ intentIdGenerator = new IdBlockAllocatorBasedIntentIdGenerator(idBlockAllocator);
+ intentManager.registerCompiler(PointToPointIntent.class, this);
+ }
+
+ @Deactivate
+ public void deactivate() {
+ intentManager.unregisterCompiler(PointToPointIntent.class);
+ }
+
+ @Override
+ public List<Intent> compile(PointToPointIntent intent) {
+ Path path = getPath(intent.ingressPoint(), intent.egressPoint());
+
+ List<Link> links = new ArrayList<>();
+ links.add(DefaultEdgeLink.createEdgeLink(intent.ingressPoint(), true));
+ links.addAll(path.links());
+ links.add(DefaultEdgeLink.createEdgeLink(intent.egressPoint(), false));
+
+ return Arrays.asList(createPathIntent(new DefaultPath(PID, links, path.cost() + 2,
+ path.annotations()),
+ intent));
+ }
+
+ /**
+ * Creates a path intent from the specified path and original
+ * connectivity intent.
+ *
+ * @param path path to create an intent for
+ * @param intent original intent
+ */
+ private Intent createPathIntent(Path path,
+ PointToPointIntent intent) {
+
+ return new PathIntent(intentIdGenerator.getNewId(),
+ intent.selector(), intent.treatment(),
+ path.src(), path.dst(), path);
+ }
+
+ /**
+ * Computes a path between two ConnectPoints.
+ *
+ * @param one start of the path
+ * @param two end of the path
+ * @return Path between the two
+ * @throws PathNotFoundException if a path cannot be found
+ */
+ private Path getPath(ConnectPoint one, ConnectPoint two) {
+ Set<Path> paths = pathService.getPaths(one.deviceId(), two.deviceId());
+ if (paths.isEmpty()) {
+ throw new PathNotFoundException("No path from " + one + " to " + two);
+ }
+ // TODO: let's be more intelligent about this eventually
+ return paths.iterator().next();
+ }
+}
diff --git a/core/net/src/test/java/org/onlab/onos/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/device/impl/DeviceManagerTest.java
index 7d09cca..e833b4a 100644
--- a/core/net/src/test/java/org/onlab/onos/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/device/impl/DeviceManagerTest.java
@@ -1,12 +1,20 @@
package org.onlab.onos.net.device.impl;
import com.google.common.collect.Sets;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipServiceAdapter;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.event.Event;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.net.Device;
@@ -27,7 +35,9 @@
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.ClockProviderService;
import org.onlab.onos.store.trivial.impl.SimpleDeviceStore;
+import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
import java.util.Iterator;
@@ -56,6 +66,8 @@
private static final PortNumber P1 = PortNumber.portNumber(1);
private static final PortNumber P2 = PortNumber.portNumber(2);
private static final PortNumber P3 = PortNumber.portNumber(3);
+ private static final NodeId NID_LOCAL = new NodeId("local");
+ private static final IpPrefix LOCALHOST = IpPrefix.valueOf("127.0.0.1");
private DeviceManager mgr;
@@ -75,6 +87,8 @@
mgr.store = new SimpleDeviceStore();
mgr.eventDispatcher = new TestEventDispatcher();
mgr.mastershipService = new TestMastershipService();
+ mgr.clusterService = new TestClusterService();
+ mgr.clockProviderService = new TestClockProviderService();
mgr.activate();
service.addListener(listener);
@@ -258,7 +272,8 @@
}
}
- private static class TestMastershipService extends MastershipServiceAdapter {
+ private static class TestMastershipService
+ extends MastershipServiceAdapter {
@Override
public MastershipRole getLocalRole(DeviceId deviceId) {
return MastershipRole.MASTER;
@@ -273,6 +288,59 @@
public MastershipRole requestRoleFor(DeviceId deviceId) {
return MastershipRole.MASTER;
}
+
+ @Override
+ public MastershipTermService requestTermService() {
+ return new MastershipTermService() {
+ @Override
+ public MastershipTerm getMastershipTerm(DeviceId deviceId) {
+ // FIXME: just returning something not null
+ return MastershipTerm.of(NID_LOCAL, 1);
+ }
+ };
+ }
}
+ // code clone
+ private final class TestClusterService implements ClusterService {
+
+ ControllerNode local = new DefaultControllerNode(NID_LOCAL, LOCALHOST);
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return local;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return null;
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public State getState(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public void addListener(ClusterEventListener listener) {
+ }
+
+ @Override
+ public void removeListener(ClusterEventListener listener) {
+ }
+ }
+
+ private final class TestClockProviderService implements
+ ClockProviderService {
+
+ @Override
+ public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
+ // TODO Auto-generated method stub
+ }
+ }
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java
deleted file mode 100644
index 3f8c0a8..0000000
--- a/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java
+++ /dev/null
@@ -1,346 +0,0 @@
-package org.onlab.onos.net.device.impl;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import com.hazelcast.config.Config;
-import com.hazelcast.core.Hazelcast;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.MastershipServiceAdapter;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.event.Event;
-import org.onlab.onos.event.EventDeliveryService;
-import org.onlab.onos.event.impl.TestEventDispatcher;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.MastershipRole;
-import org.onlab.onos.net.Port;
-import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.device.DefaultDeviceDescription;
-import org.onlab.onos.net.device.DefaultPortDescription;
-import org.onlab.onos.net.device.DeviceAdminService;
-import org.onlab.onos.net.device.DeviceDescription;
-import org.onlab.onos.net.device.DeviceEvent;
-import org.onlab.onos.net.device.DeviceListener;
-import org.onlab.onos.net.device.DeviceProvider;
-import org.onlab.onos.net.device.DeviceProviderRegistry;
-import org.onlab.onos.net.device.DeviceProviderService;
-import org.onlab.onos.net.device.DeviceService;
-import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.provider.AbstractProvider;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.common.StoreManager;
-import org.onlab.onos.store.common.StoreService;
-import org.onlab.onos.store.common.TestStoreManager;
-import org.onlab.onos.store.device.impl.DistributedDeviceStore;
-import org.onlab.packet.IpPrefix;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-import static org.onlab.onos.net.Device.Type.SWITCH;
-import static org.onlab.onos.net.DeviceId.deviceId;
-import static org.onlab.onos.net.device.DeviceEvent.Type.*;
-
-// FIXME This test is slow starting up Hazelcast on each test cases.
-// FIXME DistributedDeviceStore should have it's own test cases.
-
-/**
- * Test codifying the device service & device provider service contracts.
- */
-public class DistributedDeviceManagerTest {
-
- private static final ProviderId PID = new ProviderId("of", "foo");
- private static final DeviceId DID1 = deviceId("of:foo");
- private static final DeviceId DID2 = deviceId("of:bar");
- private static final String MFR = "whitebox";
- private static final String HW = "1.1.x";
- private static final String SW1 = "3.8.1";
- private static final String SW2 = "3.9.5";
- private static final String SN = "43311-12345";
-
- private static final PortNumber P1 = PortNumber.portNumber(1);
- private static final PortNumber P2 = PortNumber.portNumber(2);
- private static final PortNumber P3 = PortNumber.portNumber(3);
-
- private static final DefaultControllerNode SELF
- = new DefaultControllerNode(new NodeId("foobar"),
- IpPrefix.valueOf("127.0.0.1"));
-
-
- private DeviceManager mgr;
-
- protected StoreManager storeManager;
- protected DeviceService service;
- protected DeviceAdminService admin;
- protected DeviceProviderRegistry registry;
- protected DeviceProviderService providerService;
- protected TestProvider provider;
- protected TestListener listener = new TestListener();
- private DistributedDeviceStore dstore;
- private TestMastershipManager masterManager;
- private EventDeliveryService eventService;
-
- @Before
- public void setUp() {
- mgr = new DeviceManager();
- service = mgr;
- admin = mgr;
- registry = mgr;
- // TODO should find a way to clean Hazelcast instance without shutdown.
- Config config = TestStoreManager.getTestConfig();
-
- masterManager = new TestMastershipManager();
-
- storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
- storeManager.activate();
-
- dstore = new TestDistributedDeviceStore(storeManager);
- dstore.activate();
-
- mgr.store = dstore;
- eventService = new TestEventDispatcher();
- mgr.eventDispatcher = eventService;
- mgr.mastershipService = masterManager;
- mgr.activate();
-
- service.addListener(listener);
-
- provider = new TestProvider();
- providerService = registry.register(provider);
- assertTrue("provider should be registered",
- registry.getProviders().contains(provider.id()));
- }
-
- @After
- public void tearDown() {
- registry.unregister(provider);
- assertFalse("provider should not be registered",
- registry.getProviders().contains(provider.id()));
- service.removeListener(listener);
- mgr.deactivate();
-
- dstore.deactivate();
- storeManager.deactivate();
- }
-
- private void connectDevice(DeviceId deviceId, String swVersion) {
- DeviceDescription description =
- new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
- HW, swVersion, SN);
- providerService.deviceConnected(deviceId, description);
- assertNotNull("device should be found", service.getDevice(DID1));
- }
-
- @Test
- public void deviceConnected() {
- assertNull("device should not be found", service.getDevice(DID1));
- connectDevice(DID1, SW1);
- validateEvents(DEVICE_ADDED);
-
- assertEquals("only one device expected", 1, Iterables.size(service.getDevices()));
- Iterator<Device> it = service.getDevices().iterator();
- assertNotNull("one device expected", it.next());
- assertFalse("only one device expected", it.hasNext());
-
- assertEquals("incorrect device count", 1, service.getDeviceCount());
- assertTrue("device should be available", service.isAvailable(DID1));
- }
-
- @Test
- public void deviceDisconnected() {
- connectDevice(DID1, SW1);
- connectDevice(DID2, SW1);
- validateEvents(DEVICE_ADDED, DEVICE_ADDED);
- assertTrue("device should be available", service.isAvailable(DID1));
-
- // Disconnect
- providerService.deviceDisconnected(DID1);
- assertNotNull("device should not be found", service.getDevice(DID1));
- assertFalse("device should not be available", service.isAvailable(DID1));
- validateEvents(DEVICE_AVAILABILITY_CHANGED);
-
- // Reconnect
- connectDevice(DID1, SW1);
- validateEvents(DEVICE_AVAILABILITY_CHANGED);
-
- assertEquals("incorrect device count", 2, service.getDeviceCount());
- }
-
- @Test
- public void deviceUpdated() {
- connectDevice(DID1, SW1);
- validateEvents(DEVICE_ADDED);
-
- connectDevice(DID1, SW2);
- validateEvents(DEVICE_UPDATED);
- }
-
- @Test
- public void getRole() {
- connectDevice(DID1, SW1);
- assertEquals("incorrect role", MastershipRole.MASTER, service.getRole(DID1));
- }
-
- @Test
- public void updatePorts() {
- connectDevice(DID1, SW1);
- List<PortDescription> pds = new ArrayList<>();
- pds.add(new DefaultPortDescription(P1, true));
- pds.add(new DefaultPortDescription(P2, true));
- pds.add(new DefaultPortDescription(P3, true));
- providerService.updatePorts(DID1, pds);
- validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
- pds.clear();
-
- pds.add(new DefaultPortDescription(P1, false));
- pds.add(new DefaultPortDescription(P3, true));
- providerService.updatePorts(DID1, pds);
- validateEvents(PORT_UPDATED, PORT_REMOVED);
- }
-
- @Test
- public void updatePortStatus() {
- connectDevice(DID1, SW1);
- List<PortDescription> pds = new ArrayList<>();
- pds.add(new DefaultPortDescription(P1, true));
- pds.add(new DefaultPortDescription(P2, true));
- providerService.updatePorts(DID1, pds);
- validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
-
- providerService.portStatusChanged(DID1, new DefaultPortDescription(P1, false));
- validateEvents(PORT_UPDATED);
- providerService.portStatusChanged(DID1, new DefaultPortDescription(P1, false));
- assertTrue("no events expected", listener.events.isEmpty());
- }
-
- @Test
- public void getPorts() {
- connectDevice(DID1, SW1);
- List<PortDescription> pds = new ArrayList<>();
- pds.add(new DefaultPortDescription(P1, true));
- pds.add(new DefaultPortDescription(P2, true));
- providerService.updatePorts(DID1, pds);
- validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
- assertEquals("wrong port count", 2, service.getPorts(DID1).size());
-
- Port port = service.getPort(DID1, P1);
- assertEquals("incorrect port", P1, service.getPort(DID1, P1).number());
- assertEquals("incorrect state", true, service.getPort(DID1, P1).isEnabled());
- }
-
- @Test
- public void removeDevice() {
- connectDevice(DID1, SW1);
- connectDevice(DID2, SW2);
- assertEquals("incorrect device count", 2, service.getDeviceCount());
- admin.removeDevice(DID1);
- validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED);
- assertNull("device should not be found", service.getDevice(DID1));
- assertNotNull("device should be found", service.getDevice(DID2));
- assertEquals("incorrect device count", 1, service.getDeviceCount());
- }
-
- protected void validateEvents(Enum... types) {
- for (Enum type : types) {
- try {
- Event event = listener.events.poll(1, TimeUnit.SECONDS);
- assertNotNull("Timed out waiting for " + event, event);
- assertEquals("incorrect event type", type, event.type());
- } catch (InterruptedException e) {
- fail("Unexpected interrupt");
- }
- }
- assertTrue("Unexpected events left", listener.events.isEmpty());
- listener.events.clear();
- }
-
-
- private class TestProvider extends AbstractProvider implements DeviceProvider {
- private Device deviceReceived;
- private MastershipRole roleReceived;
-
- public TestProvider() {
- super(PID);
- }
-
- @Override
- public void triggerProbe(Device device) {
- }
-
- @Override
- public void roleChanged(Device device, MastershipRole newRole) {
- deviceReceived = device;
- roleReceived = newRole;
- }
- }
-
- private static class TestListener implements DeviceListener {
- final BlockingQueue<DeviceEvent> events = new LinkedBlockingQueue<>();
-
- @Override
- public void event(DeviceEvent event) {
- events.add(event);
- }
- }
-
- private class TestDistributedDeviceStore extends DistributedDeviceStore {
-
- public TestDistributedDeviceStore(StoreService storeService) {
- this.storeService = storeService;
- }
- }
-
- private static class TestMastershipManager extends MastershipServiceAdapter {
-
- private ConcurrentMap<DeviceId, NodeId> masters = new ConcurrentHashMap<>();
-
- public TestMastershipManager() {
- // SELF master of all initially
- masters.put(DID1, SELF.id());
- masters.put(DID1, SELF.id());
- }
- @Override
- public MastershipRole getLocalRole(DeviceId deviceId) {
- return MastershipRole.MASTER;
- }
-
- @Override
- public Set<DeviceId> getDevicesOf(NodeId nodeId) {
- HashSet<DeviceId> set = Sets.newHashSet();
- for (Entry<DeviceId, NodeId> e : masters.entrySet()) {
- if (e.getValue().equals(nodeId)) {
- set.add(e.getKey());
- }
- }
- return set;
- }
-
- @Override
- public MastershipRole requestRoleFor(DeviceId deviceId) {
- if (SELF.id().equals(masters.get(deviceId))) {
- return MastershipRole.MASTER;
- } else {
- return MastershipRole.STANDBY;
- }
- }
-
- @Override
- public void relinquishMastership(DeviceId deviceId) {
- masters.remove(deviceId, SELF.id());
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index 2bdf5a0..b74f887 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -2,8 +2,7 @@
import org.onlab.onos.cluster.NodeId;
-// TODO: ClusterMessage should be aware about how to serialize the payload
-// TODO: Should payload type be made generic?
+// TODO: Should payload type be ByteBuffer?
/**
* Base message for cluster-wide communications.
*/
@@ -12,7 +11,6 @@
private final NodeId sender;
private final MessageSubject subject;
private final byte[] payload;
- // TODO: add field specifying Serializer for payload
/**
* Creates a cluster message.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index ee3e789..43df15f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -45,4 +45,9 @@
MessageSubject that = (MessageSubject) obj;
return Objects.equals(this.value, that.value);
}
+
+ // for serializer
+ protected MessageSubject() {
+ this.value = "";
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7b05401..1b11873 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -3,12 +3,9 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -26,8 +23,10 @@
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.MessageSubjectSerializer;
import org.onlab.util.KryoPool;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
@@ -50,8 +49,6 @@
private ClusterService clusterService;
private ClusterNodesDelegate nodesDelegate;
- // FIXME: `members` should go away and should be using ClusterService
- private Map<NodeId, ControllerNode> members = new HashMap<>();
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
@@ -59,11 +56,14 @@
private MessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
- .register(ClusterMessage.class)
+ .register(ClusterMessage.class, new ClusterMessageSerializer())
.register(ClusterMembershipEvent.class)
+ .register(byte[].class)
+ .register(MessageSubject.class, new MessageSubjectSerializer())
.build()
.populate(1);
}
@@ -73,7 +73,15 @@
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
- messagingService = new NettyMessagingService(localNode.tcpPort());
+ NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
+ // FIXME: workaround until it becomes a service.
+ try {
+ netty.activate();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ log.error("NettyMessagingService#activate", e);
+ }
+ messagingService = netty;
log.info("Started");
}
@@ -86,7 +94,7 @@
@Override
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
- for (ControllerNode node : members.values()) {
+ for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
}
@@ -107,11 +115,12 @@
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- ControllerNode node = members.get(toNodeId);
+ ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
+ messagingService.sendAsync(nodeEp,
+ message.subject().value(), SERIALIZER.encode(message));
return true;
} catch (IOException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
@@ -137,7 +146,7 @@
@Override
public void addNode(ControllerNode node) {
- members.put(node.id(), node);
+ //members.put(node.id(), node);
}
@Override
@@ -146,7 +155,7 @@
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
- members.remove(node.id());
+ //members.remove(node.id());
}
// Sends a heart beat to all peers.
@@ -181,7 +190,7 @@
}
}
- private static class InternalClusterMessageHandler implements MessageHandler {
+ private final class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
@@ -191,8 +200,13 @@
@Override
public void handle(Message message) {
- ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
- handler.handle(clusterMessage);
+ try {
+ ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+ handler.handle(clusterMessage);
+ } catch (Exception e) {
+ log.error("Exception caught during ClusterMessageHandler", e);
+ throw e;
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
index 0598d6d..77b0a87 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
@@ -82,7 +82,7 @@
// Default constructor for serialization
@Deprecated
- protected Timestamped() {
+ private Timestamped() {
this.value = null;
this.timestamp = null;
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 2f1e504..f39413b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -42,6 +42,7 @@
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
@@ -113,14 +114,18 @@
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
- .register(InternalDeviceEvent.class)
- .register(InternalPortEvent.class)
- .register(InternalPortStatusEvent.class)
+ .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
+ .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
+ .register(InternalDeviceRemovedEvent.class)
+ .register(InternalPortEvent.class, new InternalPortEventSerializer())
+ .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
+ .register(Timestamp.class)
.register(Timestamped.class)
- .register(MastershipBasedTimestamp.class)
+ .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
.build()
.populate(1);
}
@@ -132,6 +137,10 @@
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
+ clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
@@ -175,7 +184,7 @@
try {
notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
} catch (IOException e) {
- log.error("Failed to notify peers of a device update topology event or providerId: "
+ log.error("Failed to notify peers of a device update topology event for providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
@@ -278,7 +287,18 @@
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
Timestamp timestamp = clockService.getTimestamp(deviceId);
- return markOfflineInternal(deviceId, timestamp);
+ DeviceEvent event = markOfflineInternal(deviceId, timestamp);
+ if (event != null) {
+ log.info("Notifying peers of a device offline topology event for deviceId: {}",
+ deviceId);
+ try {
+ notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
+ deviceId);
+ }
+ }
+ return event;
}
private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
@@ -566,7 +586,16 @@
public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Timestamp timestamp = clockService.getTimestamp(deviceId);
DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
- // TODO: broadcast removal event
+ if (event != null) {
+ log.info("Notifying peers of a device removed topology event for deviceId: {}",
+ deviceId);
+ try {
+ notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
+ deviceId);
+ }
+ }
return event;
}
@@ -809,6 +838,22 @@
clusterCommunicator.broadcast(message);
}
+ private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
+ private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
private void notifyPeers(InternalPortEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
@@ -828,15 +873,46 @@
private class InternalDeviceEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
+
log.info("Received device update event from peer: {}", message.sender());
InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
+
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
+
createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
}
}
+ private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received device offline event from peer: {}", message.sender());
+ InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
+
+ DeviceId deviceId = event.deviceId();
+ Timestamp timestamp = event.timestamp();
+
+ markOfflineInternal(deviceId, timestamp);
+ }
+ }
+
+ private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received device removed event from peer: {}", message.sender());
+ InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
+
+ DeviceId deviceId = event.deviceId();
+ Timestamp timestamp = event.timestamp();
+
+ removeDeviceInternal(deviceId, timestamp);
+ }
+ }
+
private class InternalPortEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
index 58fed70..5272182 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
@@ -3,13 +3,15 @@
import org.onlab.onos.store.cluster.messaging.MessageSubject;
/**
- * MessageSubjects used by GossipDeviceStore.
+ * MessageSubjects used by GossipDeviceStore peer-peer communication.
*/
public final class GossipDeviceStoreMessageSubjects {
private GossipDeviceStoreMessageSubjects() {}
public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
+ public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline");
+ public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed");
public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
index 26f1d7f..4214384 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -35,4 +35,11 @@
public Timestamped<DeviceDescription> deviceDescription() {
return deviceDescription;
}
+
+ // for serializer
+ protected InternalDeviceEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.deviceDescription = null;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
new file mode 100644
index 0000000..0d3d013
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
@@ -0,0 +1,43 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalDeviceEvent}.
+ */
+public class InternalDeviceEventSerializer extends Serializer<InternalDeviceEvent> {
+
+ /**
+ * Creates a serializer for {@link InternalDeviceEvent}.
+ */
+ public InternalDeviceEventSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalDeviceEvent event) {
+ kryo.writeClassAndObject(output, event.providerId());
+ kryo.writeClassAndObject(output, event.deviceId());
+ kryo.writeClassAndObject(output, event.deviceDescription());
+ }
+
+ @Override
+ public InternalDeviceEvent read(Kryo kryo, Input input,
+ Class<InternalDeviceEvent> type) {
+ ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+ DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+ Timestamped<DeviceDescription> deviceDescription
+ = (Timestamped<DeviceDescription>) kryo.readClassAndObject(input);
+
+ return new InternalDeviceEvent(providerId, deviceId, deviceDescription);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
new file mode 100644
index 0000000..d8942d6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a device
+ * going offline.
+ */
+public class InternalDeviceOfflineEvent {
+
+ private final DeviceId deviceId;
+ private final Timestamp timestamp;
+
+ /**
+ * Creates a InternalDeviceOfflineEvent.
+ * @param deviceId identifier of device going offline.
+ * @param timestamp timestamp of when the device went offline.
+ */
+ public InternalDeviceOfflineEvent(DeviceId deviceId, Timestamp timestamp) {
+ this.deviceId = deviceId;
+ this.timestamp = timestamp;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ // for serializer
+ @SuppressWarnings("unused")
+ private InternalDeviceOfflineEvent() {
+ deviceId = null;
+ timestamp = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
new file mode 100644
index 0000000..7059636
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalDeviceOfflineEvent}.
+ */
+public class InternalDeviceOfflineEventSerializer extends Serializer<InternalDeviceOfflineEvent> {
+
+ /**
+ * Creates a serializer for {@link InternalDeviceOfflineEvent}.
+ */
+ public InternalDeviceOfflineEventSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalDeviceOfflineEvent event) {
+ kryo.writeClassAndObject(output, event.deviceId());
+ kryo.writeClassAndObject(output, event.timestamp());
+ }
+
+ @Override
+ public InternalDeviceOfflineEvent read(Kryo kryo, Input input,
+ Class<InternalDeviceOfflineEvent> type) {
+ DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+ Timestamp timestamp = (Timestamp) kryo.readClassAndObject(input);
+
+ return new InternalDeviceOfflineEvent(deviceId, timestamp);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
new file mode 100644
index 0000000..6c8b905
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a device
+ * being administratively removed.
+ */
+public class InternalDeviceRemovedEvent {
+
+ private final DeviceId deviceId;
+ private final Timestamp timestamp;
+
+ /**
+ * Creates a InternalDeviceRemovedEvent.
+ * @param deviceId identifier of the removed device.
+ * @param timestamp timestamp of when the device was administratively removed.
+ */
+ public InternalDeviceRemovedEvent(DeviceId deviceId, Timestamp timestamp) {
+ this.deviceId = deviceId;
+ this.timestamp = timestamp;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ // for serializer
+ @SuppressWarnings("unused")
+ private InternalDeviceRemovedEvent() {
+ deviceId = null;
+ timestamp = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
index 48e3be6..64e77ca 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -37,4 +37,11 @@
public Timestamped<List<PortDescription>> portDescriptions() {
return portDescriptions;
}
+
+ // for serializer
+ protected InternalPortEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.portDescriptions = null;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
new file mode 100644
index 0000000..6fff395
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalPortEvent}.
+ */
+public class InternalPortEventSerializer extends Serializer<InternalPortEvent> {
+
+ /**
+ * Creates a serializer for {@link InternalPortEvent}.
+ */
+ public InternalPortEventSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalPortEvent event) {
+ kryo.writeClassAndObject(output, event.providerId());
+ kryo.writeClassAndObject(output, event.deviceId());
+ kryo.writeClassAndObject(output, event.portDescriptions());
+ }
+
+ @Override
+ public InternalPortEvent read(Kryo kryo, Input input,
+ Class<InternalPortEvent> type) {
+ ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+ DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+ Timestamped<List<PortDescription>> portDescriptions
+ = (Timestamped<List<PortDescription>>) kryo.readClassAndObject(input);
+
+ return new InternalPortEvent(providerId, deviceId, portDescriptions);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
index 0bdfdbf..7d3854b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -35,4 +35,11 @@
public Timestamped<PortDescription> portDescription() {
return portDescription;
}
+
+ // for serializer
+ protected InternalPortStatusEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.portDescription = null;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
new file mode 100644
index 0000000..6ec4122
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
@@ -0,0 +1,42 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalPortStatusEvent}.
+ */
+public class InternalPortStatusEventSerializer extends Serializer<InternalPortStatusEvent> {
+
+ /**
+ * Creates a serializer for {@link InternalPortStatusEvent}.
+ */
+ public InternalPortStatusEventSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalPortStatusEvent event) {
+ kryo.writeClassAndObject(output, event.providerId());
+ kryo.writeClassAndObject(output, event.deviceId());
+ kryo.writeClassAndObject(output, event.portDescription());
+ }
+
+ @Override
+ public InternalPortStatusEvent read(Kryo kryo, Input input,
+ Class<InternalPortStatusEvent> type) {
+ ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+ DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+ Timestamped<PortDescription> portDescription = (Timestamped<PortDescription>) kryo.readClassAndObject(input);
+
+ return new InternalPortStatusEvent(providerId, deviceId, portDescription);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
index f4dadad..c0cefd6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -3,7 +3,6 @@
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
-
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
@@ -11,6 +10,9 @@
public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
+ /**
+ * Creates a serializer for {@link ClusterMessage}.
+ */
public ClusterMessageSerializer() {
// does not accept null
super(false);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
index 9250076..516915e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
@@ -14,7 +14,7 @@
public class MastershipBasedTimestampSerializer extends Serializer<MastershipBasedTimestamp> {
/**
- * Default constructor.
+ * Creates a serializer for {@link MastershipBasedTimestamp}.
*/
public MastershipBasedTimestampSerializer() {
// non-null, immutable
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java
new file mode 100644
index 0000000..bb6b292
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java
@@ -0,0 +1,31 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public final class MessageSubjectSerializer extends Serializer<MessageSubject> {
+
+ /**
+ * Creates a serializer for {@link MessageSubject}.
+ */
+ public MessageSubjectSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+
+ @Override
+ public void write(Kryo kryo, Output output, MessageSubject object) {
+ output.writeString(object.value());
+ }
+
+ @Override
+ public MessageSubject read(Kryo kryo, Input input,
+ Class<MessageSubject> type) {
+ return new MessageSubject(input.readString());
+ }
+}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index aafbe4b..71d42fa 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -1,10 +1,8 @@
package org.onlab.onos.store.cluster.impl;
-import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
@@ -21,17 +19,16 @@
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
-import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.common.AbstractHazelcastStore;
-import org.onlab.onos.store.common.OptionalCacheLoader;
-import com.google.common.base.Optional;
-import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
+import com.hazelcast.core.MultiMap;
/**
- * Distributed implementation of the cluster nodes store.
+ * Distributed implementation of the mastership store. The store is
+ * responsible for the master selection process.
*/
@Component(immediate = true)
@Service
@@ -39,8 +36,21 @@
extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
- private IMap<byte[], byte[]> rawMasters;
- private LoadingCache<DeviceId, Optional<NodeId>> masters;
+ //arbitrary lock name
+ private static final String LOCK = "lock";
+ //initial term/TTL value
+ private static final Integer INIT = 0;
+
+ //devices to masters
+ protected IMap<byte[], byte[]> masters;
+ //devices to terms
+ protected IMap<byte[], Integer> terms;
+
+ //re-election related, disjoint-set structures:
+ //device-nodes multiset of available nodes
+ protected MultiMap<byte[], byte[]> standbys;
+ //device-nodes multiset for nodes that have given up on device
+ protected MultiMap<byte[], byte[]> unusable;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -50,89 +60,263 @@
public void activate() {
super.activate();
- rawMasters = theInstance.getMap("masters");
- OptionalCacheLoader<DeviceId, NodeId> nodeLoader
- = new OptionalCacheLoader<>(serializer, rawMasters);
- masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
- rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
+ masters = theInstance.getMap("masters");
+ terms = theInstance.getMap("terms");
+ standbys = theInstance.getMultiMap("backups");
+ unusable = theInstance.getMultiMap("unusable");
- loadMasters();
+ masters.addEntryListener(new RemoteMasterShipEventHandler(), true);
log.info("Started");
}
- private void loadMasters() {
- for (byte[] keyBytes : rawMasters.keySet()) {
- final DeviceId id = deserialize(keyBytes);
- masters.refresh(id);
- }
- }
-
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
- public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
- synchronized (this) {
- NodeId currentMaster = getMaster(deviceId);
- if (Objects.equals(currentMaster, nodeId)) {
- return null;
- }
+ public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+ byte[] did = serialize(deviceId);
+ byte[] nid = serialize(nodeId);
- // FIXME: for now implementing semantics of setMaster
- rawMasters.put(serialize(deviceId), serialize(nodeId));
- masters.put(deviceId, Optional.of(nodeId));
- return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId);
+ NodeId current = deserialize(masters.get(did));
+ if (current == null) {
+ if (standbys.containsEntry(did, nid)) {
+ //was previously standby, or set to standby from master
+ return MastershipRole.STANDBY;
+ } else {
+ return MastershipRole.NONE;
+ }
+ } else {
+ if (current.equals(nodeId)) {
+ //*should* be in unusable, not always
+ return MastershipRole.MASTER;
+ } else {
+ //may be in backups or unusable from earlier retirement
+ return MastershipRole.STANDBY;
+ }
+ }
+ }
+
+ @Override
+ public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+ byte [] did = serialize(deviceId);
+ byte [] nid = serialize(nodeId);
+
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ //reinforce mastership
+ evict(nid, did);
+ return null;
+ case STANDBY:
+ //make current master standby
+ byte [] current = masters.get(did);
+ if (current != null) {
+ backup(current, did);
+ }
+ //assign specified node as new master
+ masters.put(did, nid);
+ evict(nid, did);
+ updateTerm(did);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+ case NONE:
+ masters.put(did, nid);
+ evict(nid, did);
+ updateTerm(did);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ return null;
+ }
+ } finally {
+ lock.unlock();
}
}
@Override
public NodeId getMaster(DeviceId deviceId) {
- return masters.getUnchecked(deviceId).orNull();
+ return deserialize(masters.get(serialize(deviceId)));
}
@Override
public Set<DeviceId> getDevices(NodeId nodeId) {
ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
- for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
- if (nodeId.equals(entry.getValue().get())) {
- builder.add(entry.getKey());
+
+ for (Map.Entry<byte[], byte[]> entry : masters.entrySet()) {
+ if (nodeId.equals(deserialize(entry.getValue()))) {
+ builder.add((DeviceId) deserialize(entry.getKey()));
}
}
+
return builder.build();
}
@Override
public MastershipRole requestRole(DeviceId deviceId) {
- // FIXME: for now we are 'selecting' as master whoever asks
- setMaster(clusterService.getLocalNode().id(), deviceId);
- return MastershipRole.MASTER;
- }
+ NodeId local = clusterService.getLocalNode().id();
+ byte [] did = serialize(deviceId);
+ byte [] lnid = serialize(local);
- @Override
- public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
- NodeId master = masters.getUnchecked(deviceId).orNull();
- return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(local, deviceId);
+ switch (role) {
+ case MASTER:
+ evict(lnid, did);
+ break;
+ case STANDBY:
+ backup(lnid, did);
+ terms.putIfAbsent(did, INIT);
+ break;
+ case NONE:
+ //claim mastership
+ masters.put(did, lnid);
+ evict(lnid, did);
+ updateTerm(did);
+ role = MastershipRole.MASTER;
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return role;
+ } finally {
+ lock.unlock();
+ }
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
- // TODO Auto-generated method stub
- return null;
+ byte[] did = serialize(deviceId);
+ if ((masters.get(did) == null) ||
+ (terms.get(did) == null)) {
+ return null;
+ }
+ return MastershipTerm.of(
+ (NodeId) deserialize(masters.get(did)), terms.get(did));
}
@Override
- public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
- // TODO Auto-generated method stub
- return null;
+ public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ byte [] did = serialize(deviceId);
+ byte [] nid = serialize(nodeId);
+ MastershipEvent event = null;
+
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ event = reelect(nodeId, deviceId);
+ backup(nid, did);
+ break;
+ case STANDBY:
+ //fall through to reinforce role
+ case NONE:
+ backup(nid, did);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return event;
+ } finally {
+ lock.unlock();
+ }
}
- private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
- public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
- super(cache);
+ @Override
+ public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ byte [] did = serialize(deviceId);
+ byte [] nid = serialize(nodeId);
+ MastershipEvent event = null;
+
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ event = reelect(nodeId, deviceId);
+ evict(nid, did);
+ break;
+ case STANDBY:
+ //fall through to reinforce relinquishment
+ case NONE:
+ evict(nid, did);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return event;
+ } finally {
+ lock.unlock();
}
+ }
+
+ //helper to fetch a new master candidate for a given device.
+ private MastershipEvent reelect(NodeId current, DeviceId deviceId) {
+ byte [] did = serialize(deviceId);
+ byte [] nid = serialize(current);
+
+ //if this is an queue it'd be neater.
+ byte [] backup = null;
+ for (byte [] n : standbys.get(serialize(deviceId))) {
+ if (!current.equals(deserialize(n))) {
+ backup = n;
+ break;
+ }
+ }
+
+ if (backup == null) {
+ masters.remove(did, nid);
+ return null;
+ } else {
+ masters.put(did, backup);
+ evict(backup, did);
+ Integer term = terms.get(did);
+ terms.put(did, ++term);
+ return new MastershipEvent(
+ MASTER_CHANGED, deviceId, (NodeId) deserialize(backup));
+ }
+ }
+
+ //adds node to pool(s) of backups and moves them from unusable.
+ private void backup(byte [] nodeId, byte [] deviceId) {
+ if (!standbys.containsEntry(deviceId, nodeId)) {
+ standbys.put(deviceId, nodeId);
+ }
+ if (unusable.containsEntry(deviceId, nodeId)) {
+ unusable.remove(deviceId, nodeId);
+ }
+ }
+
+ //adds node to unusable and evicts it from backup pool.
+ private void evict(byte [] nodeId, byte [] deviceId) {
+ if (!unusable.containsEntry(deviceId, nodeId)) {
+ unusable.put(deviceId, nodeId);
+ }
+ if (standbys.containsEntry(deviceId, nodeId)) {
+ standbys.remove(deviceId, nodeId);
+ }
+ }
+
+ //adds or updates term information.
+ private void updateTerm(byte [] deviceId) {
+ Integer term = terms.get(deviceId);
+ if (term == null) {
+ terms.put(deviceId, INIT);
+ } else {
+ terms.put(deviceId, ++term);
+ }
+ }
+
+ private class RemoteMasterShipEventHandler extends RemoteEventHandler<DeviceId, NodeId> {
@Override
protected void onAdd(DeviceId deviceId, NodeId nodeId) {
@@ -141,12 +325,13 @@
@Override
protected void onRemove(DeviceId deviceId, NodeId nodeId) {
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
+ //notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
}
@Override
protected void onUpdate(DeviceId deviceId, NodeId oldNodeId, NodeId nodeId) {
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
+ //only addition indicates a change in mastership
+ //notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
}
}
diff --git a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
new file mode 100644
index 0000000..bf1bb38
--- /dev/null
+++ b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
@@ -0,0 +1,333 @@
+package org.onlab.onos.store.cluster.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.onlab.onos.net.MastershipRole.*;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.MastershipEvent;
+import org.onlab.onos.cluster.MastershipEvent.Type;
+import org.onlab.onos.cluster.MastershipStoreDelegate;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.common.StoreManager;
+import org.onlab.onos.store.common.StoreService;
+import org.onlab.onos.store.common.TestStoreManager;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.packet.IpPrefix;
+
+import com.google.common.collect.Sets;
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+
+/**
+ * Test of the Hazelcast-based distributed MastershipStore implementation.
+ */
+public class DistributedMastershipStoreTest {
+
+ private static final DeviceId DID1 = DeviceId.deviceId("of:01");
+ private static final DeviceId DID2 = DeviceId.deviceId("of:02");
+ private static final DeviceId DID3 = DeviceId.deviceId("of:03");
+
+ private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
+
+ private static final NodeId N1 = new NodeId("node1");
+ private static final NodeId N2 = new NodeId("node2");
+
+ private static final ControllerNode CN1 = new DefaultControllerNode(N1, IP);
+ private static final ControllerNode CN2 = new DefaultControllerNode(N2, IP);
+
+ private DistributedMastershipStore dms;
+ private TestDistributedMastershipStore testStore;
+ private KryoSerializer serializationMgr;
+ private StoreManager storeMgr;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ // TODO should find a way to clean Hazelcast instance without shutdown.
+ Config config = TestStoreManager.getTestConfig();
+
+ storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
+ storeMgr.activate();
+
+ serializationMgr = new KryoSerializer();
+
+ dms = new TestDistributedMastershipStore(storeMgr, serializationMgr);
+ dms.clusterService = new TestClusterService();
+ dms.activate();
+
+ testStore = (TestDistributedMastershipStore) dms;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ dms.deactivate();
+
+ storeMgr.deactivate();
+ }
+
+ @Test
+ public void getRole() {
+ assertEquals("wrong role:", NONE, dms.getRole(N1, DID1));
+ testStore.put(DID1, N1, true, false, true);
+ assertEquals("wrong role:", MASTER, dms.getRole(N1, DID1));
+ assertEquals("wrong role:", STANDBY, dms.getRole(N2, DID1));
+ }
+
+ @Test
+ public void getMaster() {
+ assertTrue("wrong store state:", dms.masters.isEmpty());
+
+ testStore.put(DID1, N1, true, false, false);
+ assertEquals("wrong master:", N1, dms.getMaster(DID1));
+ assertNull("wrong master:", dms.getMaster(DID2));
+ }
+
+ @Test
+ public void getDevices() {
+ assertTrue("wrong store state:", dms.masters.isEmpty());
+
+ testStore.put(DID1, N1, true, false, false);
+ testStore.put(DID2, N1, true, false, false);
+ testStore.put(DID3, N2, true, false, false);
+
+ assertEquals("wrong devices",
+ Sets.newHashSet(DID1, DID2), dms.getDevices(N1));
+ }
+
+ @Test
+ public void requestRoleAndTerm() {
+ //CN1 is "local"
+ testStore.setCurrent(CN1);
+
+ //if already MASTER, nothing should happen
+ testStore.put(DID2, N1, true, false, false);
+ assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2));
+
+ //populate maps with DID1, N1 thru NONE case
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertTrue("wrong state for store:", !dms.terms.isEmpty());
+ assertEquals("wrong term",
+ MastershipTerm.of(N1, 0), dms.getTermFor(DID1));
+
+ //CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
+ testStore.setCurrent(CN2);
+ assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+ assertEquals("wrong number of entries:", 2, dms.terms.size());
+
+ //change term and requestRole() again; should persist
+ testStore.increment(DID2);
+ assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+ assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
+ }
+
+ @Test
+ public void setMaster() {
+ //populate maps with DID1, N1 as MASTER thru NONE case
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertNull("wrong event:", dms.setMaster(N1, DID1));
+
+ //switch over to N2
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
+ assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID1));
+
+ //orphan switch - should be rare case
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID2).type());
+ assertEquals("wrong term", MastershipTerm.of(N2, 0), dms.getTermFor(DID2));
+ //disconnect and reconnect - sign of failing re-election or single-instance channel
+ testStore.reset(true, false, false);
+ dms.setMaster(N2, DID2);
+ assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID2));
+ }
+
+ @Test
+ public void relinquishRole() {
+ //populate maps with DID1, N1 as MASTER thru NONE case
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ //no backup, no new MASTER/event
+ assertNull("wrong event:", dms.relinquishRole(N1, DID1));
+
+ dms.requestRole(DID1);
+
+ //add backup CN2, get it elected MASTER by relinquishing
+ testStore.setCurrent(CN2);
+ assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.relinquishRole(N1, DID1).type());
+ assertEquals("wrong master", N2, dms.getMaster(DID1));
+
+ //STANDBY - nothing here, either
+ assertNull("wrong event:", dms.relinquishRole(N1, DID1));
+ assertEquals("wrong role for node:", STANDBY, dms.getRole(N1, DID1));
+
+ //all nodes "give up" on device, which goes back to NONE.
+ assertNull("wrong event:", dms.relinquishRole(N2, DID1));
+ assertEquals("wrong role for node:", NONE, dms.getRole(N2, DID1));
+ assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID1));
+
+ assertEquals("wrong number of retired nodes", 2, dms.unusable.size());
+
+ //bring nodes back
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+ assertEquals("wrong number of backup nodes", 1, dms.standbys.size());
+
+ //NONE - nothing happens
+ assertNull("wrong event:", dms.relinquishRole(N1, DID2));
+ assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
+
+ }
+
+ @Ignore("Ignore until Delegate spec. is clear.")
+ @Test
+ public void testEvents() throws InterruptedException {
+ //shamelessly copy other distributed store tests
+ final CountDownLatch addLatch = new CountDownLatch(1);
+
+ MastershipStoreDelegate checkAdd = new MastershipStoreDelegate() {
+ @Override
+ public void notify(MastershipEvent event) {
+ assertEquals("wrong event:", Type.MASTER_CHANGED, event.type());
+ assertEquals("wrong subject", DID1, event.subject());
+ assertEquals("wrong subject", N1, event.master());
+ addLatch.countDown();
+ }
+ };
+
+ dms.setDelegate(checkAdd);
+ dms.setMaster(N1, DID1);
+ //this will fail until we do something about single-instance-ness
+ assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ private class TestDistributedMastershipStore extends
+ DistributedMastershipStore {
+ public TestDistributedMastershipStore(StoreService storeService,
+ KryoSerializer kryoSerialization) {
+ this.storeService = storeService;
+ this.serializer = kryoSerialization;
+ }
+
+ //helper to populate master/backup structures
+ public void put(DeviceId dev, NodeId node,
+ boolean master, boolean backup, boolean term) {
+ byte [] n = serialize(node);
+ byte [] d = serialize(dev);
+
+ if (master) {
+ dms.masters.put(d, n);
+ dms.unusable.put(d, n);
+ dms.standbys.remove(d, n);
+ }
+ if (backup) {
+ dms.standbys.put(d, n);
+ dms.masters.remove(d, n);
+ dms.unusable.remove(d, n);
+ }
+ if (term) {
+ dms.terms.put(d, 0);
+ }
+ }
+
+ //a dumb utility function.
+ public void dump() {
+ System.out.println("standbys");
+ for (Map.Entry<byte [], byte []> e : standbys.entrySet()) {
+ System.out.println(deserialize(e.getKey()) + ":" + deserialize(e.getValue()));
+ }
+ System.out.println("unusable");
+ for (Map.Entry<byte [], byte []> e : unusable.entrySet()) {
+ System.out.println(deserialize(e.getKey()) + ":" + deserialize(e.getValue()));
+ }
+ }
+
+ //clears structures
+ public void reset(boolean store, boolean backup, boolean term) {
+ if (store) {
+ dms.masters.clear();
+ dms.unusable.clear();
+ }
+ if (backup) {
+ dms.standbys.clear();
+ }
+ if (term) {
+ dms.terms.clear();
+ }
+ }
+
+ //increment term for a device
+ public void increment(DeviceId dev) {
+ Integer t = dms.terms.get(serialize(dev));
+ if (t != null) {
+ dms.terms.put(serialize(dev), ++t);
+ }
+ }
+
+ //sets the "local" node
+ public void setCurrent(ControllerNode node) {
+ ((TestClusterService) clusterService).current = node;
+ }
+ }
+
+ private class TestClusterService implements ClusterService {
+
+ protected ControllerNode current;
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return current;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return Sets.newHashSet(CN1, CN2);
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public State getState(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public void addListener(ClusterEventListener listener) {
+ }
+
+ @Override
+ public void removeListener(ClusterEventListener listener) {
+ }
+
+ }
+
+}
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
index ff9f43a..a22dd89 100644
--- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
@@ -16,7 +16,7 @@
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.serializers.Serializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -34,7 +34,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
- protected Serializer serializer;
+ protected StoreSerializer serializer;
protected HazelcastInstance theInstance;
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
index 6631594..d5fc380 100644
--- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
@@ -2,7 +2,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
-import org.onlab.onos.store.serializers.Serializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
@@ -18,7 +18,7 @@
public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
- private final Serializer serializer;
+ private final StoreSerializer serializer;
private IMap<byte[], byte[]> rawMap;
/**
@@ -27,7 +27,7 @@
* @param serializer to use for serialization
* @param rawMap underlying IMap
*/
- public OptionalCacheLoader(Serializer serializer, IMap<byte[], byte[]> rawMap) {
+ public OptionalCacheLoader(StoreSerializer serializer, IMap<byte[], byte[]> rawMap) {
this.serializer = checkNotNull(serializer);
this.rawMap = checkNotNull(rawMap);
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
index 93ee854..738086e 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
@@ -7,9 +7,9 @@
import java.nio.ByteBuffer;
/**
- * Serializer implementation using Kryo.
+ * StoreSerializer implementation using Kryo.
*/
-public class KryoSerializer implements Serializer {
+public class KryoSerializer implements StoreSerializer {
private final Logger log = LoggerFactory.getLogger(getClass());
protected KryoPool serializerPool;
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/Serializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java
similarity index 96%
rename from core/store/serializers/src/main/java/org/onlab/onos/store/serializers/Serializer.java
rename to core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java
index 12cf3bc..6c43a1b 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/Serializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java
@@ -6,7 +6,7 @@
/**
* Service to serialize Objects into byte array.
*/
-public interface Serializer {
+public interface StoreSerializer {
/**
* Serializes the specified object into bytes.
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java
index 0439d79..e8096ea 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java
@@ -174,7 +174,7 @@
}
@Override
- public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
+ public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
synchronized (this) {
switch (role) {
@@ -214,4 +214,9 @@
return backup;
}
+ @Override
+ public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ return setStandby(nodeId, deviceId);
+ }
+
}
diff --git a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java
index 32d3d68..1e8e5c7 100644
--- a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java
@@ -129,22 +129,22 @@
public void unsetMaster() {
//NONE - record backup but take no other action
put(DID1, N1, false, false);
- sms.unsetMaster(N1, DID1);
+ sms.setStandby(N1, DID1);
assertTrue("not backed up", sms.backups.contains(N1));
sms.termMap.clear();
- sms.unsetMaster(N1, DID1);
+ sms.setStandby(N1, DID1);
assertTrue("term not set", sms.termMap.containsKey(DID1));
//no backup, MASTER
put(DID1, N1, true, true);
- assertNull("wrong event", sms.unsetMaster(N1, DID1));
+ assertNull("wrong event", sms.setStandby(N1, DID1));
assertNull("wrong node", sms.masterMap.get(DID1));
//backup, switch
sms.masterMap.clear();
put(DID1, N1, true, true);
put(DID2, N2, true, true);
- assertEquals("wrong event", MASTER_CHANGED, sms.unsetMaster(N1, DID1).type());
+ assertEquals("wrong event", MASTER_CHANGED, sms.setStandby(N1, DID1).type());
}
//helper to populate master/backup structures
diff --git a/features/features.xml b/features/features.xml
index 0a56d9a..b240917 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -153,6 +153,7 @@
description="ONOS sample playground application">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-app-foo/1.0.0-SNAPSHOT</bundle>
+ <bundle>mvn:org.onlab.onos/onlab-netty/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-app-config" version="1.0.0"
diff --git a/features/old-features.xml b/features/old-features.xml
deleted file mode 100644
index 46e68ca..0000000
--- a/features/old-features.xml
+++ /dev/null
@@ -1,31 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<!--
- ~ Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
- ~
- ~ This program and the accompanying materials are made available under the
- ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
- ~ and is available at http://www.eclipse.org/legal/epl-v10.html
- -->
-
-<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0"
- name="net.onrc.onos-1.0.0">
- <repository>mvn:net.onrc.onos/onos-features/1.0.0-SNAPSHOT/xml/features</repository>
-
- <feature name="thirdparty" version="1.0.0"
- description="ONOS 3rd party dependencies">
- <bundle>mvn:com.google.code.findbugs/annotations/2.0.2</bundle>
- <bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
- <bundle>mvn:com.google.guava/guava/17.0</bundle>
- <bundle>mvn:com.google.guava/guava/15.0</bundle>
-
- </feature>
-
- <feature name="base" version="1.0.0"
- description="ONOS Base">
- <feature>scr</feature>
- <feature>thirdparty</feature>
- <bundle>mvn:net.onrc.onos.sb/onos-sb/0.0.1</bundle>
- <bundle>mvn:org.projectfloodlight/openflowj/0.3.6-SNAPSHOT</bundle>
- </feature>
-
-</features>
diff --git a/openflow/api/pom.xml b/openflow/api/pom.xml
index 2c58e47..afc2faf 100644
--- a/openflow/api/pom.xml
+++ b/openflow/api/pom.xml
@@ -17,14 +17,11 @@
<description>ONOS OpenFlow controller subsystem API</description>
<repositories>
- <!-- FIXME: for Loxigen. Decide how to use Loxigen before release. -->
+ <!-- FIXME: for Loxigen + optical experimenter. Decide how to use Loxigen before release. -->
<repository>
- <id>sonatype-oss-snapshot</id>
- <name>Sonatype OSS snapshot repository</name>
- <url>https://oss.sonatype.org/content/repositories/snapshots</url>
- <releases>
- <enabled>false</enabled>
- </releases>
+ <id>onlab-temp</id>
+ <name>ON.lab temporary repository</name>
+ <url>http://mavenrepo.onlab.us:8081/nexus/content/repositories/releases</url>
</repository>
</repositories>
@@ -32,7 +29,8 @@
<dependency>
<groupId>org.projectfloodlight</groupId>
<artifactId>openflowj</artifactId>
- <version>0.3.8-SNAPSHOT</version>
+ <!-- FIXME once experimenter gets merged to upstream -->
+ <version>0.3.8-optical_experimenter</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
index 178041d..5be7c69 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
@@ -981,11 +981,13 @@
// switch was a duplicate-dpid, calling the method below would clear
// all state for the original switch (with the same dpid),
// which we obviously don't want.
+ log.info("{}:removal called");
sw.removeConnectedSwitch();
} else {
// A duplicate was disconnected on this ChannelHandler,
// this is the same switch reconnecting, but the original state was
// not cleaned up - XXX check liveness of original ChannelHandler
+ log.info("{}:duplicate found");
duplicateDpidFound = Boolean.FALSE;
}
} else {
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
index e8ebcd1..716f7ec 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -307,9 +307,11 @@
connectedSwitches.remove(dpid);
OpenFlowSwitch sw = activeMasterSwitches.remove(dpid);
if (sw == null) {
+ log.warn("sw was null for {}", dpid);
sw = activeEqualSwitches.remove(dpid);
}
for (OpenFlowSwitchListener l : ofSwitchListener) {
+ log.warn("removal for {}", dpid);
l.switchRemoved(dpid);
}
}
diff --git a/tools/build/envDefaults b/tools/build/envDefaults
index 473095c..cbc6577 100644
--- a/tools/build/envDefaults
+++ b/tools/build/envDefaults
@@ -9,10 +9,14 @@
export KARAF_TAR=${KARAF_TAR:-~/Downloads/apache-karaf-3.0.1.tar.gz}
export KARAF_DIST=$(basename $KARAF_ZIP .zip)
+# Fallback build number us derived from from the user name & time
+export BUILD_NUMBER=${BUILD_NUMBER:-$(id -un)~$(date +'%Y/%m/%d@%H:%M')}
+
# ONOS Version and onos.tar.gz staging environment
-export ONOS_VERSION=${ONOS_VERSION:-1.0.0-SNAPSHOT}
+export ONOS_POM_VERSION="1.0.0-SNAPSHOT"
+export ONOS_VERSION=${ONOS_VERSION:-1.0.0.$BUILD_NUMBER}
+export ONOS_BITS=onos-${ONOS_VERSION%~*}
export ONOS_STAGE_ROOT=${ONOS_STAGE_ROOT:-/tmp}
-export ONOS_BITS=onos-$ONOS_VERSION
export ONOS_STAGE=$ONOS_STAGE_ROOT/$ONOS_BITS
export ONOS_TAR=$ONOS_STAGE.tar.gz
diff --git a/tools/build/onos-package b/tools/build/onos-package
index 2d6b954..a55a613 100755
--- a/tools/build/onos-package
+++ b/tools/build/onos-package
@@ -49,7 +49,7 @@
# ONOS Patching ----------------------------------------------------------------
# Patch the Apache Karaf distribution file to add ONOS features repository
-perl -pi.old -e "s|^(featuresRepositories=.*)|\1,mvn:org.onlab.onos/onos-features/$ONOS_VERSION/xml/features|" \
+perl -pi.old -e "s|^(featuresRepositories=.*)|\1,mvn:org.onlab.onos/onos-features/$ONOS_POM_VERSION/xml/features|" \
$ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
# Patch the Apache Karaf distribution file to load ONOS features
@@ -57,10 +57,14 @@
$ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
# Patch the Apache Karaf distribution with ONOS branding bundle
-cp $M2_REPO/org/onlab/onos/onos-branding/$ONOS_VERSION/onos-branding-*.jar \
+cp $M2_REPO/org/onlab/onos/onos-branding/$ONOS_POM_VERSION/onos-branding-*.jar \
$ONOS_STAGE/$KARAF_DIST/lib
+# Patch in the ONOS version file
+echo $ONOS_VERSION > $ONOS_STAGE/VERSION
+
# Now package up the ONOS tar file
cd $ONOS_STAGE_ROOT
COPYFILE_DISABLE=1 tar zcf $ONOS_TAR $ONOS_BITS
ls -l $ONOS_TAR >&2
+rm -r $ONOS_STAGE
diff --git a/tools/dev/bash_profile b/tools/dev/bash_profile
index f4ed3c3..6c1444f 100644
--- a/tools/dev/bash_profile
+++ b/tools/dev/bash_profile
@@ -33,6 +33,7 @@
alias op='onos-package'
alias ot='onos-test'
alias ol='onos-log'
+alias go='ob && ot && onos -w'
alias pub='onos-push-update-bundle'
# Short-hand for tailing the ONOS (karaf) log
diff --git a/tools/package/bin/onos-service b/tools/package/bin/onos-service
index a5de5bd..c030887 100755
--- a/tools/package/bin/onos-service
+++ b/tools/package/bin/onos-service
@@ -4,6 +4,7 @@
#-------------------------------------------------------------------------------
export JAVA_HOME=${JAVA_HOME:-/usr/lib/jvm/java-7-openjdk-amd64/}
+export JAVA_OPTS="-Xms256M -Xmx2048M"
cd /opt/onos
/opt/onos/apache-karaf-3.0.1/bin/karaf "$@"
diff --git a/tools/test/bin/onos-push-update-bundle b/tools/test/bin/onos-push-update-bundle
index 6073bac..4f8ca7d 100755
--- a/tools/test/bin/onos-push-update-bundle
+++ b/tools/test/bin/onos-push-update-bundle
@@ -15,7 +15,7 @@
nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
for node in $nodes; do
+ scp -q $jar $ONOS_USER@$node:.m2/repository/$jar
scp -q $jar $ONOS_USER@$node:$ONOS_INSTALL_DIR/$KARAF_DIST/system/$jar
- ssh $ONOS_USER@$node "ls -l $ONOS_INSTALL_DIR/$KARAF_DIST/system/$jar"
ssh $ONOS_USER@$node "$ONOS_INSTALL_DIR/bin/onos \"bundle:update -f $bundle\"" 2>/dev/null
done
diff --git a/tools/test/topos/tt.py b/tools/test/topos/tt.py
new file mode 100644
index 0000000..b74d446
--- /dev/null
+++ b/tools/test/topos/tt.py
@@ -0,0 +1,5 @@
+#!/usr/bin/python
+# Launches mininet with Tower topology configuration.
+import sys, tower
+net = tower.Tower(cip=sys.argv[1])
+net.run()
diff --git a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
index 357791e..8494d4e 100644
--- a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
+++ b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
@@ -1,7 +1,5 @@
package org.onlab.metrics;
-import java.io.File;
-import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -10,9 +8,11 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Counter;
-import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
@@ -56,6 +56,7 @@
@Component(immediate = true)
public final class MetricsManager implements MetricsService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
/**
* Registry to hold the Components defined in the system.
*/
@@ -69,15 +70,20 @@
/**
* Default Reporter for this metrics manager.
*/
- private final CsvReporter reporter;
+ //private final Slf4jReporter reporter;
+ private final ConsoleReporter reporter;
public MetricsManager() {
this.metricsRegistry = new MetricRegistry();
- this.reporter = CsvReporter.forRegistry(metricsRegistry)
- .formatFor(Locale.US)
+// this.reporter = Slf4jReporter.forRegistry(this.metricsRegistry)
+// .outputTo(log)
+// .convertRatesTo(TimeUnit.SECONDS)
+// .convertDurationsTo(TimeUnit.MICROSECONDS)
+// .build();
+ this.reporter = ConsoleReporter.forRegistry(this.metricsRegistry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MICROSECONDS)
- .build(new File("/var/onos/log/metrics/"));
+ .build();
}
@Activate
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index c5162f6..5643098 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -4,6 +4,12 @@
import com.google.common.primitives.UnsignedLongs;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ThreadFactory;
public abstract class Tools {
@@ -66,4 +72,24 @@
}
}
+ /**
+ * Slurps the contents of a file into a list of strings, one per line.
+ *
+ * @param path file path
+ * @return file contents
+ */
+ public static List<String> slurp(File path) {
+ try (BufferedReader br = new BufferedReader(new FileReader(path))) {
+ List<String> lines = new ArrayList<>();
+ String line;
+ while ((line = br.readLine()) != null) {
+ lines.add(line);
+ }
+ return lines;
+
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index d4832e5..b494ce9 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -8,11 +8,16 @@
import java.util.Arrays;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Decoder for inbound messages.
*/
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
private final NettyMessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer();
@@ -57,4 +62,10 @@
checkState(false, "Must not be here");
}
}
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
+ context.close();
+ }
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 716efb9..d026dec 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -1,5 +1,8 @@
package org.onlab.netty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@@ -11,6 +14,8 @@
@Sharable
public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
public static final int HEADER_VERSION = 1;
@@ -31,11 +36,6 @@
// write preamble
out.writeBytes(PREAMBLE);
- try {
- SERIALIZER.encode(message);
- } catch (Exception e) {
- e.printStackTrace();
- }
byte[] payload = SERIALIZER.encode(message);
// write payload length
@@ -47,4 +47,10 @@
// write payload.
out.writeBytes(payload);
}
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
+ context.close();
+ }
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 48aeb30..5a51ad4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -248,6 +248,7 @@
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
deleted file mode 100644
index 3869948..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.onlab.netty;
-
-import java.util.concurrent.TimeUnit;
-
-import org.onlab.metrics.MetricsComponent;
-import org.onlab.metrics.MetricsFeature;
-import org.onlab.metrics.MetricsManager;
-
-import com.codahale.metrics.Timer;
-
-// FIXME: Should be move out to test or app
-public final class SimpleClient {
- private SimpleClient() {
- }
-
- public static void main(String... args) throws Exception {
- NettyMessagingService messaging = new TestNettyMessagingService(9081);
- MetricsManager metrics = new MetricsManager();
- messaging.activate();
- metrics.activate();
- MetricsFeature feature = new MetricsFeature("timers");
- MetricsComponent component = metrics.registerComponent("NettyMessaging");
- Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
- final int warmup = 100;
- for (int i = 0; i < warmup; i++) {
- Timer.Context context = sendAsyncTimer.time();
- messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
- context.stop();
- }
- metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
-
- Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
- final int iterations = 1000000;
- for (int i = 0; i < iterations; i++) {
- Timer.Context context = sendAndReceiveTimer.time();
- Response response = messaging
- .sendAndReceive(new Endpoint("localhost", 8080), "echo",
- "Hello World".getBytes());
- System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
- context.stop();
- }
- metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
- }
-
- public static class TestNettyMessagingService extends NettyMessagingService {
- public TestNettyMessagingService(int port) throws Exception {
- super(port);
- }
- }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
deleted file mode 100644
index b8ae5b0..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.onlab.netty;
-
-//FIXME: Should be move out to test or app
-public final class SimpleServer {
- private SimpleServer() {}
-
- public static void main(String... args) throws Exception {
- NettyMessagingService server = new NettyMessagingService(8080);
- server.activate();
- server.registerHandler("simple", new LoggingHandler());
- server.registerHandler("echo", new EchoHandler());
- }
-}