Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/NettyLoggingHandler.java b/apps/foo/src/main/java/org/onlab/onos/foo/NettyLoggingHandler.java
index bc10c82..b35a46f 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/NettyLoggingHandler.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/NettyLoggingHandler.java
@@ -14,6 +14,6 @@
@Override
public void handle(Message message) {
- log.info("Received message. Payload has {} bytes", message.payload().length);
+ //log.info("Received message. Payload has {} bytes", message.payload().length);
}
}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
index dbdb3db..5e3f8f7 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
@@ -10,12 +10,17 @@
import org.onlab.netty.Endpoint;
import org.onlab.netty.NettyMessagingService;
import org.onlab.netty.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
// FIXME: Should be move out to test or app
public final class SimpleNettyClient {
- private SimpleNettyClient() {
+
+private static final Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
+
+ private SimpleNettyClient() {
}
public static void main(String[] args)
@@ -29,30 +34,33 @@
System.exit(0);
}
- public static void startStandalone(String... args) throws Exception {
+ public static void startStandalone(String[] args) throws Exception {
String host = args.length > 0 ? args[0] : "localhost";
int port = args.length > 1 ? Integer.parseInt(args[1]) : 8081;
int warmup = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
int iterations = args.length > 3 ? Integer.parseInt(args[3]) : 50 * 100000;
NettyMessagingService messaging = new TestNettyMessagingService(9081);
MetricsManager metrics = new MetricsManager();
+ Endpoint endpoint = new Endpoint(host, port);
messaging.activate();
metrics.activate();
MetricsFeature feature = new MetricsFeature("latency");
MetricsComponent component = metrics.registerComponent("NettyMessaging");
+ log.info("connecting " + host + ":" + port + " warmup:" + warmup + " iterations:" + iterations);
for (int i = 0; i < warmup; i++) {
- messaging.sendAsync(new Endpoint(host, port), "simple", "Hello World".getBytes());
+ messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
Response response = messaging
- .sendAndReceive(new Endpoint(host, port), "echo",
+ .sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
}
+ log.info("measuring async sender");
Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAsyncTimer.time();
- messaging.sendAsync(new Endpoint(host, port), "simple", "Hello World".getBytes());
+ messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
context.stop();
}
@@ -60,11 +68,12 @@
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
Response response = messaging
- .sendAndReceive(new Endpoint(host, port), "echo",
+ .sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
// System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
}
+ metrics.deactivate();
}
public static class TestNettyMessagingService extends NettyMessagingService {
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
index 2af0c13..b63f9c1 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
@@ -16,27 +16,26 @@
//FIXME: replace these arguments with proper ones needed for the test.
@Argument(index = 0, name = "hostname", description = "Server Hostname",
required = false, multiValued = false)
- String host = "localhost";
+ String hostname = "localhost";
- @Argument(index = 3, name = "port", description = "Port",
+ @Argument(index = 1, name = "port", description = "Port",
required = false, multiValued = false)
String port = "8081";
- @Argument(index = 1, name = "warmupCount", description = "Warm-up count",
+ @Argument(index = 2, name = "warmupCount", description = "Warm-up count",
required = false, multiValued = false)
- String warmup = "1000";
+ String warmupCount = "1000";
- @Argument(index = 2, name = "messageCount", description = "Message count",
+ @Argument(index = 3, name = "messageCount", description = "Message count",
required = false, multiValued = false)
- String messageCount = "5000000";
+ String messageCount = "100000";
@Override
protected void execute() {
try {
- startStandalone(new String[]{host, port, warmup, messageCount});
+ startStandalone(new String[]{hostname, port, warmupCount, messageCount});
} catch (Exception e) {
error("Unable to start client %s", e);
}
}
-
}
diff --git a/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java b/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java
index 1accddb..e5eac73 100644
--- a/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java
+++ b/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java
@@ -10,6 +10,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
+import org.onlab.onos.CoreService;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.Path;
@@ -53,13 +54,16 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
private ReactivePacketProcessor processor = new ReactivePacketProcessor();
private ApplicationId appId;
@Activate
public void activate() {
- appId = ApplicationId.getAppId();
+ appId = coreService.registerApplication("org.onlab.onos.fwd");
packetService.addProcessor(processor, PacketProcessor.ADVISOR_MAX + 2);
log.info("Started with Application ID {}", appId.id());
}
diff --git a/apps/mobility/src/main/java/org/onlab/onos/mobility/HostMobility.java b/apps/mobility/src/main/java/org/onlab/onos/mobility/HostMobility.java
index 7958f99..88b3a5c 100644
--- a/apps/mobility/src/main/java/org/onlab/onos/mobility/HostMobility.java
+++ b/apps/mobility/src/main/java/org/onlab/onos/mobility/HostMobility.java
@@ -10,6 +10,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
+import org.onlab.onos.CoreService;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.device.DeviceService;
@@ -44,11 +45,14 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
private ApplicationId appId;
@Activate
public void activate() {
- appId = ApplicationId.getAppId();
+ appId = coreService.registerApplication("org.onlab.onos.mobility");
hostService.addListener(new InternalHostListener());
log.info("Started with Application ID {}", appId.id());
}
diff --git a/apps/proxyarp/src/main/java/org/onlab/onos/proxyarp/ProxyArp.java b/apps/proxyarp/src/main/java/org/onlab/onos/proxyarp/ProxyArp.java
index a06470f..dc231ce 100644
--- a/apps/proxyarp/src/main/java/org/onlab/onos/proxyarp/ProxyArp.java
+++ b/apps/proxyarp/src/main/java/org/onlab/onos/proxyarp/ProxyArp.java
@@ -8,6 +8,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
+import org.onlab.onos.CoreService;
import org.onlab.onos.net.packet.PacketContext;
import org.onlab.onos.net.packet.PacketProcessor;
import org.onlab.onos.net.packet.PacketService;
@@ -31,11 +32,14 @@
private ProxyArpProcessor processor = new ProxyArpProcessor();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
private ApplicationId appId;
@Activate
public void activate() {
- appId = ApplicationId.getAppId();
+ appId = coreService.registerApplication("org.onlab.onos.proxyarp");
packetService.addProcessor(processor, PacketProcessor.ADVISOR_MAX + 1);
log.info("Started with Application ID {}", appId.id());
}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java
index be2e964..837a0a7 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java
@@ -27,7 +27,7 @@
required = true, multiValued = false)
String two = null;
- private static long id = 1;
+ private static long id = 0x7870001;
@Override
protected void execute() {
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java
index 15cf176..89ec7f6 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java
@@ -14,6 +14,7 @@
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.PointToPointIntent;
+import org.onlab.packet.Ethernet;
/**
* Installs point-to-point connectivity intents.
@@ -32,7 +33,7 @@
required = true, multiValued = false)
String egressDeviceString = null;
- private static long id = 1;
+ private static long id = 0x7470001;
@Override
protected void execute() {
@@ -48,7 +49,9 @@
PortNumber.portNumber(getPortNumber(egressDeviceString));
ConnectPoint egress = new ConnectPoint(egressDeviceId, egressPortNumber);
- TrafficSelector selector = DefaultTrafficSelector.builder().build();
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
Intent intent =
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/FlowsListCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/FlowsListCommand.java
index 902b27b..28309c5 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/FlowsListCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/FlowsListCommand.java
@@ -1,14 +1,9 @@
package org.onlab.onos.cli.net;
-import static com.google.common.collect.Lists.newArrayList;
-import static org.onlab.onos.cli.net.DevicesListCommand.getSortedDevices;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
+import org.onlab.onos.CoreService;
import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.cli.Comparators;
import org.onlab.onos.net.Device;
@@ -18,37 +13,43 @@
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRuleService;
-import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.onlab.onos.cli.net.DevicesListCommand.getSortedDevices;
/**
* Lists all currently-known hosts.
*/
@Command(scope = "onos", name = "flows",
-description = "Lists all currently-known flows.")
+ description = "Lists all currently-known flows.")
public class FlowsListCommand extends AbstractShellCommand {
public static final String ANY = "any";
private static final String FMT =
- " id=%s, state=%s, bytes=%s, packets=%s, duration=%s, priority=%s";
+ " id=%s, state=%s, bytes=%s, packets=%s, duration=%s, priority=%s, appId=%s";
private static final String TFMT = " treatment=%s";
private static final String SFMT = " selector=%s";
@Argument(index = 1, name = "uri", description = "Device ID",
- required = false, multiValued = false)
+ required = false, multiValued = false)
String uri = null;
@Argument(index = 0, name = "state", description = "Flow Rule state",
- required = false, multiValued = false)
+ required = false, multiValued = false)
String state = null;
@Override
protected void execute() {
+ CoreService coreService = get(CoreService.class);
DeviceService deviceService = get(DeviceService.class);
FlowRuleService service = get(FlowRuleService.class);
Map<Device, List<FlowEntry>> flows = getSortedFlows(deviceService, service);
for (Device d : getSortedDevices(deviceService)) {
- printFlows(d, flows.get(d));
+ printFlows(d, flows.get(d), coreService);
}
}
@@ -67,7 +68,7 @@
s = FlowEntryState.valueOf(state.toUpperCase());
}
Iterable<Device> devices = uri == null ? deviceService.getDevices() :
- Collections.singletonList(deviceService.getDevice(DeviceId.deviceId(uri)));
+ Collections.singletonList(deviceService.getDevice(DeviceId.deviceId(uri)));
for (Device d : devices) {
if (s == null) {
rules = newArrayList(service.getFlowEntries(d.id()));
@@ -87,16 +88,19 @@
/**
* Prints flows.
- * @param d the device
+ *
+ * @param d the device
* @param flows the set of flows for that device.
*/
- protected void printFlows(Device d, List<FlowEntry> flows) {
+ protected void printFlows(Device d, List<FlowEntry> flows,
+ CoreService coreService) {
boolean empty = flows == null || flows.isEmpty();
print("deviceId=%s, flowRuleCount=%d", d.id(), empty ? 0 : flows.size());
if (!empty) {
for (FlowEntry f : flows) {
- print(FMT, Long.toHexString(f.id().value()), f.state(), f.bytes(),
- f.packets(), f.life(), f.priority());
+ print(FMT, Long.toHexString(f.id().value()), f.state(),
+ f.bytes(), f.packets(), f.life(), f.priority(),
+ coreService.getAppId(f.appId()).name());
print(SFMT, f.selector().criteria());
print(TFMT, f.treatment().instructions());
}
diff --git a/core/api/src/main/java/org/onlab/onos/ApplicationId.java b/core/api/src/main/java/org/onlab/onos/ApplicationId.java
index 433265e..3fab53f 100644
--- a/core/api/src/main/java/org/onlab/onos/ApplicationId.java
+++ b/core/api/src/main/java/org/onlab/onos/ApplicationId.java
@@ -1,56 +1,21 @@
package org.onlab.onos;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
/**
- * Application id generator class.
+ * Application identifier.
*/
-public final class ApplicationId {
-
- private static final AtomicInteger ID_DISPENCER = new AtomicInteger(1);
- private final Integer id;
-
- // Ban public construction
- private ApplicationId(Integer id) {
- this.id = id;
- }
-
- public Integer id() {
- return id;
- }
-
- public static ApplicationId valueOf(Integer id) {
- return new ApplicationId(id);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof ApplicationId)) {
- return false;
- }
- ApplicationId other = (ApplicationId) obj;
- return Objects.equals(this.id, other.id);
- }
+public interface ApplicationId {
/**
- * Returns a new application id.
- *
- * @return app id
+ * Returns the numeric application id.
+ * @return the id as a short value
*/
- public static ApplicationId getAppId() {
- return new ApplicationId(ApplicationId.ID_DISPENCER.getAndIncrement());
- }
+ short id();
+
+ /**
+ * Returns the application's supplied identifier.
+ * @return a string identifier
+ */
+ String name();
}
diff --git a/core/api/src/main/java/org/onlab/onos/CoreService.java b/core/api/src/main/java/org/onlab/onos/CoreService.java
index 32c36c5..3302888 100644
--- a/core/api/src/main/java/org/onlab/onos/CoreService.java
+++ b/core/api/src/main/java/org/onlab/onos/CoreService.java
@@ -12,4 +12,21 @@
*/
Version version();
+ /**
+ * Registers a new application by its name, which is expected
+ * to follow the reverse DNS convention, e.g.
+ * {@code org.flying.circus.app}
+ *
+ * @param identifier string identifier
+ * @return the application id
+ */
+ ApplicationId registerApplication(String identifier);
+
+ /**
+ * Returns an existing application id from a given id.
+ * @param id the short value of the id
+ * @return an application id
+ */
+ ApplicationId getAppId(Short id);
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java b/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
index b05aa62..51b6f6a 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
@@ -34,7 +34,7 @@
/**
* Abandons mastership of the specified device on the local node thus
* forcing selection of a new master. If the local node is not a master
- * for this device, no action will be taken.
+ * for this device, no master selection will occur.
*
* @param deviceId the identifier of the device
*/
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java b/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
index bedc5e9..dc5603f 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
@@ -66,12 +66,25 @@
MastershipTerm getTermFor(DeviceId deviceId);
/**
- * Revokes a controller instance's mastership over a device and hands
- * over mastership to another controller instance.
+ * Sets a controller instance's mastership role to STANDBY for a device.
+ * If the role is MASTER, another controller instance will be selected
+ * as a candidate master.
*
* @param nodeId the controller instance identifier
- * @param deviceId device to revoke mastership for
+ * @param deviceId device to revoke mastership role for
* @return a mastership event
*/
- MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId);
+ MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId);
+
+ /**
+ * Allows a controller instance to give up its current role for a device.
+ * If the role is MASTER, another controller instance will be selected
+ * as a candidate master.
+ *
+ * @param nodeId the controller instance identifier
+ * @param deviceId device to revoke mastership role for
+ * @return a mastership event
+ */
+ MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId);
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/device/DeviceService.java b/core/api/src/main/java/org/onlab/onos/net/device/DeviceService.java
index 8364935..54b9d72 100644
--- a/core/api/src/main/java/org/onlab/onos/net/device/DeviceService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/device/DeviceService.java
@@ -42,6 +42,7 @@
* @param deviceId device identifier
* @return designated mastership role
*/
+ //XXX do we want this method here when MastershipService already does?
MastershipRole getRole(DeviceId deviceId);
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java b/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java
new file mode 100644
index 0000000..43fd694
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java
@@ -0,0 +1,23 @@
+package org.onlab.onos.net.flow;
+
+import java.util.List;
+
+/**
+ * Interface capturing the result of a batch operation.
+ * @param <T> type of the items that can be reported as failed
+ */
+public interface BatchOperationResult<T> {
+
+ /**
+ * Returns whether the operation was successful.
+ * @return true if successful, false otherwise
+ */
+ boolean isSuccess();
+
+ /**
+ * Obtains a list of items which failed.
+ * @return a list of failures
+ */
+ List<T> failedItems();
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
index bde752e..e9889cd 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -1,6 +1,29 @@
package org.onlab.onos.net.flow;
-public class CompletedBatchOperation {
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
+
+ // Outcome of a batch operation: an overall success flag plus the flow entries that failed.
+ private final boolean success;
+ private final List<FlowEntry> failures;
+
+ public CompletedBatchOperation(boolean success, List<FlowEntry> failures) {
+ this.success = success;
+ this.failures = ImmutableList.copyOf(failures); // immutable snapshot of the caller's list
+ }
+
+ @Override
+ public boolean isSuccess() {
+ return success;
+ }
+
+ @Override
+ public List<FlowEntry> failedItems() {
+ return failures; // already immutable, so it is returned without copying
+ }
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
index 5a0f55b..d4657d2 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
@@ -17,6 +17,10 @@
private long lastSeen = -1;
+ private final int errType;
+
+ private final int errCode;
+
public DefaultFlowEntry(DeviceId deviceId, TrafficSelector selector,
TrafficTreatment treatment, int priority, FlowEntryState state,
@@ -27,6 +31,8 @@
this.life = life;
this.packets = packets;
this.bytes = bytes;
+ this.errCode = -1;
+ this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
@@ -37,6 +43,8 @@
this.life = life;
this.packets = packets;
this.bytes = bytes;
+ this.errCode = -1;
+ this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
@@ -46,9 +54,18 @@
this.life = 0;
this.packets = 0;
this.bytes = 0;
+ this.errCode = -1;
+ this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
+ /**
+ * Creates a flow entry in the FAILED state for the given rule.
+ *
+ * @param rule the flow rule this entry is based on
+ * @param errType the error type to report through errType()
+ * @param errCode the error code to report through errCode()
+ */
+ public DefaultFlowEntry(FlowRule rule, int errType, int errCode) {
+ super(rule);
+ this.state = FlowEntryState.FAILED;
+ this.errType = errType;
+ this.errCode = errCode;
+ // Stamp the entry like the other constructors do, instead of leaving lastSeen at -1.
+ this.lastSeen = System.currentTimeMillis();
+ }
+
@Override
public long life() {
return life;
@@ -100,6 +117,16 @@
}
@Override
+ public int errType() {
+ return this.errType;
+ }
+
+ @Override
+ public int errCode() {
+ return this.errCode;
+ }
+
+ @Override
public String toString() {
return toStringHelper(this)
.add("rule", super.toString())
@@ -108,4 +135,6 @@
}
+
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
index 47e9fed..e5504db 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
@@ -21,7 +21,7 @@
private final FlowId id;
- private final ApplicationId appId;
+ private final short appId;
private final int timeout;
@@ -36,7 +36,7 @@
this.timeout = timeout;
this.created = System.currentTimeMillis();
- this.appId = ApplicationId.valueOf((int) (flowId >> 32));
+ this.appId = (short) (flowId >>> 48);
this.id = FlowId.valueOf(flowId);
}
@@ -52,11 +52,11 @@
this.priority = priority;
this.selector = selector;
this.treatment = treatement;
- this.appId = appId;
+ this.appId = appId.id();
this.timeout = timeout;
this.created = System.currentTimeMillis();
- this.id = FlowId.valueOf((((long) appId().id()) << 32) | (this.hash() & 0xffffffffL));
+ this.id = FlowId.valueOf((((long) this.appId) << 48) | (this.hash() & 0x0000ffffffffL));
}
public DefaultFlowRule(FlowRule rule) {
@@ -78,7 +78,7 @@
}
@Override
- public ApplicationId appId() {
+ public short appId() {
return appId;
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
index 31c53a8..a388b48 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
@@ -140,6 +140,16 @@
}
@Override
+ public Builder matchTcpSrc(Short tcpPort) {
+ return add(Criteria.matchTcpSrc(tcpPort));
+ }
+
+ @Override
+ public Builder matchTcpDst(Short tcpPort) {
+ return add(Criteria.matchTcpDst(tcpPort));
+ }
+
+ @Override
public TrafficSelector build() {
return new DefaultTrafficSelector(ImmutableSet.copyOf(selector.values()));
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
index 5b5f89b..882c9df 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
@@ -29,7 +29,12 @@
/**
* Flow has been removed from flow table and can be purged.
*/
- REMOVED
+ REMOVED,
+
+ /**
+ * Indicates that the installation of this flow has failed.
+ */
+ FAILED
}
/**
@@ -95,4 +100,16 @@
*/
void setBytes(long bytes);
+ /**
+ * Indicates the error type.
+ * @return an integer value of the error type
+ */
+ int errType();
+
+ /**
+ * Indicates the error code.
+ * @return an integer value of the error code
+ */
+ int errCode();
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
index 410aed4..c63f247 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
@@ -1,6 +1,5 @@
package org.onlab.onos.net.flow;
-import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.intent.BatchOperationTarget;
@@ -26,7 +25,7 @@
*
* @return an applicationId
*/
- ApplicationId appId();
+ short appId();
/**
* Returns the flow rule priority given in natural order; higher numbers
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index 68762ac..3592e39 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -37,6 +37,12 @@
*/
void removeRulesById(ApplicationId id, FlowRule... flowRules);
- Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+ /**
+ * Installs a batch of flow rules. Each flow rule is associated with an
+ * operation which results in either addition, removal or modification.
+ * @param batch a batch of flow rules
+ * @return a future that yields the completed batch operation result
+ */
+ Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java b/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
index c704c8f..41bceb8 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
@@ -98,6 +98,20 @@
public Builder matchIPDst(IpPrefix ip);
/**
+ * Matches a TCP source port number.
+ * @param tcpPort a TCP source port number
+ * @return a selection builder
+ */
+ public Builder matchTcpSrc(Short tcpPort);
+
+ /**
+ * Matches a TCP destination port number.
+ * @param tcpPort a TCP destination port number
+ * @return a selection builder
+ */
+ public Builder matchTcpDst(Short tcpPort);
+
+ /**
* Builds an immutable traffic selector.
*
* @return traffic selector
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
index a819bd3..8bd0960 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
@@ -113,6 +113,25 @@
return new IPCriterion(ip, Type.IPV4_DST);
}
+ /**
+ * Creates a match on TCP source port field using the specified value.
+ *
+ * @param tcpPort TCP source port number to match
+ * @return match criterion
+ */
+ public static Criterion matchTcpSrc(Short tcpPort) {
+ return new TcpPortCriterion(tcpPort, Type.TCP_SRC);
+ }
+
+ /**
+ * Creates a match on TCP destination port field using the specified value.
+ *
+ * @param tcpPort TCP destination port number to match
+ * @return match criterion
+ */
+ public static Criterion matchTcpDst(Short tcpPort) {
+ return new TcpPortCriterion(tcpPort, Type.TCP_DST);
+ }
/*
* Implementations of criteria.
@@ -437,4 +456,49 @@
}
+ public static final class TcpPortCriterion implements Criterion {
+ // Criterion holding a TCP port value; the type says which port field (source or destination).
+ private final Short tcpPort;
+ private final Type type;
+
+ public TcpPortCriterion(Short tcpPort, Type type) {
+ this.tcpPort = tcpPort;
+ this.type = type;
+ }
+
+ @Override
+ public Type type() {
+ return this.type;
+ }
+
+ public Short tcpPort() {
+ return this.tcpPort;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(type().toString()) // labelled with the criterion type, not the class name
+ .add("tcpPort", tcpPort).toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tcpPort, type);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof TcpPortCriterion) {
+ TcpPortCriterion that = (TcpPortCriterion) obj;
+ return Objects.equals(tcpPort, that.tcpPort) &&
+ Objects.equals(type, that.type);
+
+
+ }
+ return false;
+ }
+ }
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
index 738be04..9855498 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
@@ -1,5 +1,9 @@
package org.onlab.onos.net.intent;
+import java.util.concurrent.Future;
+
+import org.onlab.onos.net.flow.CompletedBatchOperation;
+
/**
* Abstraction of entity capable of installing intents to the environment.
*/
@@ -10,7 +14,7 @@
* @param intent intent to be installed
* @throws IntentException if issues are encountered while installing the intent
*/
- void install(T intent);
+ Future<CompletedBatchOperation> install(T intent);
/**
* Uninstalls the specified intent from the environment.
@@ -18,5 +22,5 @@
* @param intent intent to be uninstalled
* @throws IntentException if issues are encountered while uninstalling the intent
*/
- void uninstall(T intent);
+ Future<CompletedBatchOperation> uninstall(T intent);
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
index fc023bb..d693c9b 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
@@ -33,6 +33,8 @@
/**
* Returns the number of intents in the store.
+ *
+ * @return the number of intents in the store
*/
long getIntentCount();
@@ -44,7 +46,7 @@
Iterable<Intent> getIntents();
/**
- * Returns the intent with the specified identifer.
+ * Returns the intent with the specified identifier.
*
* @param intentId intent identification
* @return intent or null if not found
@@ -94,7 +96,6 @@
* specified original intent.
*
* @param intentId original intent identifier
- * @return compiled state transition event
*/
void removeInstalledIntents(IntentId intentId);
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/package-info.java b/core/api/src/main/java/org/onlab/onos/net/intent/package-info.java
index ff97f5b..3e5e46f 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/package-info.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/package-info.java
@@ -53,4 +53,4 @@
* while the system determines where to perform the compilation or while it
* performs global recomputation/optimization across all prior intents.
*/
-package org.onlab.onos.net.intent;
\ No newline at end of file
+package org.onlab.onos.net.intent;
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java b/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
index 7eb0e19..163a056 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
@@ -1,17 +1,25 @@
package org.onlab.onos.net.intent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.onlab.onos.net.intent.IntentEvent.Type.FAILED;
+import static org.onlab.onos.net.intent.IntentEvent.Type.INSTALLED;
+import static org.onlab.onos.net.intent.IntentEvent.Type.SUBMITTED;
+import static org.onlab.onos.net.intent.IntentEvent.Type.WITHDRAWN;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Future;
-import static org.junit.Assert.*;
-import static org.onlab.onos.net.intent.IntentEvent.Type.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
/**
* Suite of tests for the intent service contract.
@@ -290,17 +298,19 @@
}
@Override
- public void install(TestInstallableIntent intent) {
+ public Future<CompletedBatchOperation> install(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("install failed by design");
}
+ return null;
}
@Override
- public void uninstall(TestInstallableIntent intent) {
+ public Future<CompletedBatchOperation> uninstall(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("remove failed by design");
}
+ return null;
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/cluster/impl/CoreManager.java b/core/net/src/main/java/org/onlab/onos/cluster/impl/CoreManager.java
deleted file mode 100644
index 4b1191f..0000000
--- a/core/net/src/main/java/org/onlab/onos/cluster/impl/CoreManager.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.onlab.onos.cluster.impl;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.CoreService;
-import org.onlab.onos.Version;
-import org.onlab.util.Tools;
-
-import java.io.File;
-import java.util.List;
-
-/**
- * Core service implementation.
- */
-@Component
-@Service
-public class CoreManager implements CoreService {
-
- private static final File VERSION_FILE = new File("../VERSION");
- private static Version version = Version.version("1.0.0-SNAPSHOT");
-
- // TODO: work in progress
-
- @Activate
- public void activate() {
- List<String> versionLines = Tools.slurp(VERSION_FILE);
- if (versionLines != null && !versionLines.isEmpty()) {
- version = Version.version(versionLines.get(0));
- }
- }
-
- @Override
- public Version version() {
- return version;
- }
-
-}
diff --git a/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
index a0da87c..125745b 100644
--- a/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
@@ -82,7 +82,7 @@
if (role.equals(MastershipRole.MASTER)) {
event = store.setMaster(nodeId, deviceId);
} else {
- event = store.unsetMaster(nodeId, deviceId);
+ event = store.setStandby(nodeId, deviceId);
}
if (event != null) {
@@ -98,13 +98,10 @@
@Override
public void relinquishMastership(DeviceId deviceId) {
- MastershipRole role = getLocalRole(deviceId);
- if (!role.equals(MastershipRole.MASTER)) {
- return;
- }
-
- MastershipEvent event = store.unsetMaster(
+ MastershipEvent event = null;
+ event = store.relinquishRole(
clusterService.getLocalNode().id(), deviceId);
+
if (event != null) {
post(event);
}
diff --git a/core/net/src/main/java/org/onlab/onos/impl/CoreManager.java b/core/net/src/main/java/org/onlab/onos/impl/CoreManager.java
new file mode 100644
index 0000000..edfc080
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/impl/CoreManager.java
@@ -0,0 +1,59 @@
+package org.onlab.onos.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.ApplicationId;
+import org.onlab.onos.CoreService;
+import org.onlab.onos.Version;
+import org.onlab.util.Tools;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Core service implementation.
+ */
+@Component
+@Service
+public class CoreManager implements CoreService {
+
+ private static final AtomicInteger ID_DISPENSER = new AtomicInteger(1);
+
+ private static final File VERSION_FILE = new File("../VERSION");
+ private static Version version = Version.version("1.0.0-SNAPSHOT");
+
+ private final Map<Short, DefaultApplicationId> appIds = new ConcurrentHashMap<>();
+
+ // TODO: work in progress
+
+ @Activate
+ public void activate() {
+ List<String> versionLines = Tools.slurp(VERSION_FILE);
+ if (versionLines != null && !versionLines.isEmpty()) {
+ version = Version.version(versionLines.get(0));
+ }
+ }
+
+ @Override
+ public Version version() {
+ return version;
+ }
+
+ @Override
+ public ApplicationId getAppId(Short id) {
+ return appIds.get(id);
+ }
+
+ @Override
+ public ApplicationId registerApplication(String name) {
+ short id = (short) ID_DISPENSER.getAndIncrement();
+ DefaultApplicationId appId = new DefaultApplicationId(id, name);
+ appIds.put(id, appId);
+ return appId;
+ }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/impl/DefaultApplicationId.java b/core/net/src/main/java/org/onlab/onos/impl/DefaultApplicationId.java
new file mode 100644
index 0000000..eed5fb0
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/impl/DefaultApplicationId.java
@@ -0,0 +1,55 @@
+package org.onlab.onos.impl;
+
+import org.onlab.onos.ApplicationId;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Default implementation of an application identifier.
+ */
+public class DefaultApplicationId implements ApplicationId {
+
+ private final short id;
+ private final String name;
+
+ // Ban public construction
+ protected DefaultApplicationId(Short id, String identifier) {
+ this.id = id;
+ this.name = identifier;
+ }
+
+ @Override
+ public short id() {
+ return id;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof DefaultApplicationId) {
+ DefaultApplicationId other = (DefaultApplicationId) obj;
+ return Objects.equals(this.id, other.id);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("id", id).add("name", name).toString();
+ }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/impl/package-info.java b/core/net/src/main/java/org/onlab/onos/impl/package-info.java
new file mode 100644
index 0000000..bbe539f
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Implementation of the core service and application identifiers.
+ */
+package org.onlab.onos.impl;
\ No newline at end of file
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index 425adca..8cde5a3 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -143,7 +143,7 @@
// Applies the specified role to the device; ignores NONE
private void applyRole(DeviceId deviceId, MastershipRole newRole) {
- if (newRole != MastershipRole.NONE) {
+ if (!MastershipRole.NONE.equals(newRole)) {
Device device = store.getDevice(deviceId);
// FIXME: Device might not be there yet. (eventual consistent)
if (device == null) {
@@ -257,13 +257,14 @@
// temporarily request for Master Role and mark offline.
if (!mastershipService.getLocalRole(deviceId).equals(MastershipRole.MASTER)) {
log.debug("Device {} disconnected, but I am not the master", deviceId);
+ //let go of any role anyways
+ mastershipService.relinquishMastership(deviceId);
return;
}
DeviceEvent event = store.markOffline(deviceId);
-
+ //we're no longer capable of being master or a candidate.
mastershipService.relinquishMastership(deviceId);
- //we're no longer capable of mastership.
if (event != null) {
log.info("Device {} disconnected", deviceId);
post(event);
@@ -319,24 +320,29 @@
}
// Intercepts mastership events
- private class InternalMastershipListener
- implements MastershipListener {
+ private class InternalMastershipListener implements MastershipListener {
+
@Override
public void event(MastershipEvent event) {
- final NodeId myNodeId = clusterService.getLocalNode().id();
- if (myNodeId.equals(event.master())) {
+ final DeviceId did = event.subject();
+ if (isAvailable(did)) {
+ final NodeId myNodeId = clusterService.getLocalNode().id();
- MastershipTerm term = mastershipService.requestTermService()
- .getMastershipTerm(event.subject());
+ if (myNodeId.equals(event.master())) {
+ MastershipTerm term = termService.getMastershipTerm(did);
- if (term.master().equals(myNodeId)) {
- // only set the new term if I am the master
- clockProviderService.setMastershipTerm(event.subject(), term);
+ if (term.master().equals(myNodeId)) {
+ // only set the new term if I am the master
+ clockProviderService.setMastershipTerm(did, term);
+ }
+ applyRole(did, MastershipRole.MASTER);
+ } else {
+ applyRole(did, MastershipRole.STANDBY);
}
-
- applyRole(event.subject(), MastershipRole.MASTER);
} else {
- applyRole(event.subject(), MastershipRole.STANDBY);
+ //device dead to node, give up
+ mastershipService.relinquishMastership(did);
+ applyRole(did, MastershipRole.STANDBY);
}
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index a9eddd8..ac8d607 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,10 +5,12 @@
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -26,6 +28,7 @@
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
@@ -52,6 +55,8 @@
extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
implements FlowRuleService, FlowRuleProviderRegistry {
+ enum BatchState { STARTED, FINISHED, CANCELLED };
+
public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
private final Logger log = getLogger(getClass());
@@ -144,7 +149,7 @@
FlowRuleBatchOperation batch) {
Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
ArrayListMultimap.create();
- List<Future<Void>> futures = Lists.newArrayList();
+ List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
final FlowRule f = fbe.getTarget();
final Device device = deviceService.getDevice(f.deviceId());
@@ -165,10 +170,10 @@
for (FlowRuleProvider provider : batches.keySet()) {
FlowRuleBatchOperation b =
new FlowRuleBatchOperation(batches.get(provider));
- Future<Void> future = provider.executeBatch(b);
+ Future<CompletedBatchOperation> future = provider.executeBatch(b);
futures.add(future);
}
- return new FlowRuleBatchFuture(futures);
+ return new FlowRuleBatchFuture(futures, batches);
}
@Override
@@ -341,59 +346,140 @@
private class FlowRuleBatchFuture
implements Future<CompletedBatchOperation> {
- private final List<Future<Void>> futures;
+ private final List<Future<CompletedBatchOperation>> futures;
+ private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
+ private final AtomicReference<BatchState> state;
+ private CompletedBatchOperation overall;
- public FlowRuleBatchFuture(List<Future<Void>> futures) {
+
+
+ public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
+ Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
this.futures = futures;
+ this.batches = batches;
+ state = new AtomicReference<FlowRuleManager.BatchState>();
+ state.set(BatchState.STARTED);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- // TODO Auto-generated method stub
- return false;
+ if (state.get() == BatchState.FINISHED) {
+ return false;
+ }
+ if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
+ return false;
+ }
+ cleanUpBatch();
+ for (Future<CompletedBatchOperation> f : futures) {
+ f.cancel(mayInterruptIfRunning);
+ }
+ return true;
}
@Override
public boolean isCancelled() {
- // TODO Auto-generated method stub
- return false;
+ return state.get() == BatchState.CANCELLED;
}
@Override
public boolean isDone() {
- boolean isDone = true;
- for (Future<Void> future : futures) {
- isDone &= future.isDone();
- }
- return isDone;
+ return state.get() == BatchState.FINISHED;
}
+
@Override
public CompletedBatchOperation get() throws InterruptedException,
- ExecutionException {
- // TODO Auto-generated method stub
- for (Future<Void> future : futures) {
- future.get();
+ ExecutionException {
+ // Return the cached result once finished; otherwise wait on every sub-batch.
+ if (isDone()) {
+ return overall;
}
- return new CompletedBatchOperation();
+ // AND-accumulate: a later successful sub-batch must not mask an earlier failure.
+ boolean success = true;
+ List<FlowEntry> failed = Lists.newLinkedList();
+ CompletedBatchOperation completed;
+ for (Future<CompletedBatchOperation> future : futures) {
+ completed = future.get();
+ success &= validateBatchOperation(failed, completed, future);
+ }
+
+ return finalizeBatchOperation(success, failed);
+
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
- // TODO we should decrement the timeout
+ // Return the cached result once finished; otherwise wait within the overall deadline.
+ if (isDone()) {
+ return overall;
+ }
+ boolean success = true;
+ List<FlowEntry> failed = Lists.newLinkedList();
+ CompletedBatchOperation completed;
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);
- for (Future<Void> future : futures) {
+ // Each sub-batch only gets the time remaining until the shared deadline.
+ for (Future<CompletedBatchOperation> future : futures) {
long now = System.nanoTime();
long thisTimeout = end - now;
- future.get(thisTimeout, TimeUnit.NANOSECONDS);
+ completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
+ success &= validateBatchOperation(failed, completed, future);
}
- return new CompletedBatchOperation();
+ return finalizeBatchOperation(success, failed);
}
+ private boolean validateBatchOperation(List<FlowEntry> failed,
+ CompletedBatchOperation completed,
+ Future<CompletedBatchOperation> future) {
+
+ if (isCancelled()) {
+ throw new CancellationException();
+ }
+ if (!completed.isSuccess()) {
+ failed.addAll(completed.failedItems());
+ cleanUpBatch();
+ cancelAllSubBatches();
+ return false;
+ }
+ return true;
+ }
+
+ private void cancelAllSubBatches() {
+ for (Future<CompletedBatchOperation> f : futures) {
+ f.cancel(true);
+ }
+ }
+
+ private CompletedBatchOperation finalizeBatchOperation(boolean success,
+ List<FlowEntry> failed) {
+ synchronized (this) {
+ if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
+ if (state.get() == BatchState.FINISHED) {
+ return overall;
+ }
+ throw new CancellationException();
+ }
+ overall = new CompletedBatchOperation(success, failed);
+ return overall;
+ }
+ }
+
+ private void cleanUpBatch() {
+ for (FlowRuleBatchEntry fbe : batches.values()) {
+ if (fbe.getOperator() == FlowRuleOperation.ADD ||
+ fbe.getOperator() == FlowRuleOperation.MODIFY) {
+ store.deleteFlowRule(fbe.getTarget());
+ } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
+ store.storeFlowRule(fbe.getTarget());
+ }
+ }
+
+ }
}
+
+
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 16b75f2..5824996 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -13,12 +13,17 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -28,6 +33,7 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.intent.InstallableIntent;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
@@ -44,7 +50,9 @@
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
/**
* An implementation of Intent Manager.
@@ -67,7 +75,8 @@
private final AbstractListenerRegistry<IntentEvent, IntentListener>
listenerRegistry = new AbstractListenerRegistry<>();
- private final ExecutorService executor = newSingleThreadExecutor(namedThreads("onos-intents"));
+ private ExecutorService executor;
+ private ExecutorService monitorExecutor;
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
@@ -86,6 +95,8 @@
store.setDelegate(delegate);
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
+ executor = newSingleThreadExecutor(namedThreads("onos-intents"));
+ monitorExecutor = newSingleThreadExecutor(namedThreads("onos-intent-monitor"));
log.info("Started");
}
@@ -94,6 +105,8 @@
store.unsetDelegate(delegate);
trackerService.unsetDelegate(topoDelegate);
eventDispatcher.removeSink(IntentEvent.class);
+ executor.shutdown();
+ monitorExecutor.shutdown();
log.info("Stopped");
}
@@ -240,14 +253,23 @@
}
}
- // FIXME: To make SDN-IP workable ASAP, only single level compilation is implemented
- // TODO: implement compilation traversing tree structure
+ /**
+ * Compiles an intent recursively.
+ *
+ * @param intent intent
+ * @return result of compilation
+ */
private List<InstallableIntent> compileIntent(Intent intent) {
- List<InstallableIntent> installable = new ArrayList<>();
- for (Intent compiled : getCompiler(intent).compile(intent)) {
- InstallableIntent installableIntent = (InstallableIntent) compiled;
- installable.add(installableIntent);
+ if (intent instanceof InstallableIntent) {
+ return ImmutableList.of((InstallableIntent) intent);
}
+
+ List<InstallableIntent> installable = new ArrayList<>();
+ // TODO do we need to registerSubclassCompiler?
+ for (Intent compiled : getCompiler(intent).compile(intent)) {
+ installable.addAll(compileIntent(compiled));
+ }
+
return installable;
}
@@ -261,6 +283,7 @@
// Indicate that the intent is entering the installing phase.
store.setState(intent, INSTALLING);
+ List<Future<CompletedBatchOperation>> installFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
@@ -268,17 +291,20 @@
registerSubclassInstallerIfNeeded(installable);
trackerService.addTrackedResources(intent.id(),
installable.requiredLinks());
- getInstaller(installable).install(installable);
+ Future<CompletedBatchOperation> future = getInstaller(installable).install(installable);
+ installFutures.add(future);
}
}
- eventDispatcher.post(store.setState(intent, INSTALLED));
-
+ // FIXME we have to wait for the installable intents
+ //eventDispatcher.post(store.setState(intent, INSTALLED));
+ monitorExecutor.execute(new IntentInstallMonitor(intent, installFutures, INSTALLED));
} catch (Exception e) {
log.warn("Unable to install intent {} due to: {}", intent.id(), e);
- uninstallIntent(intent);
+ uninstallIntent(intent, RECOMPILING);
// If compilation failed, kick off the recompiling phase.
- executeRecompilingPhase(intent);
+ // FIXME
+ //executeRecompilingPhase(intent);
}
}
@@ -327,12 +353,14 @@
private void executeWithdrawingPhase(Intent intent) {
// Indicate that the intent is being withdrawn.
store.setState(intent, WITHDRAWING);
- uninstallIntent(intent);
+ uninstallIntent(intent, WITHDRAWN);
// If all went well, disassociate the top-level intent with its
// installable derivatives and mark it as withdrawn.
- store.removeInstalledIntents(intent.id());
- eventDispatcher.post(store.setState(intent, WITHDRAWN));
+ // FIXME need to clean up
+ //store.removeInstalledIntents(intent.id());
+ // FIXME
+ //eventDispatcher.post(store.setState(intent, WITHDRAWN));
}
/**
@@ -340,14 +368,17 @@
*
* @param intent intent to be uninstalled
*/
- private void uninstallIntent(Intent intent) {
+ private void uninstallIntent(Intent intent, IntentState nextState) {
+ List<Future<CompletedBatchOperation>> uninstallFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
for (InstallableIntent installable : installables) {
- getInstaller(installable).uninstall(installable);
+ Future<CompletedBatchOperation> future = getInstaller(installable).uninstall(installable);
+ uninstallFutures.add(future);
}
}
+ monitorExecutor.execute(new IntentInstallMonitor(intent, uninstallFutures, nextState));
} catch (IntentException e) {
log.warn("Unable to uninstall intent {} due to: {}", intent.id(), e);
}
@@ -422,9 +453,10 @@
// Attempt recompilation of the specified intents first.
for (IntentId intentId : intentIds) {
Intent intent = getIntent(intentId);
- uninstallIntent(intent);
+ uninstallIntent(intent, RECOMPILING);
- executeRecompilingPhase(intent);
+ //FIXME
+ //executeRecompilingPhase(intent);
}
if (compileAllFailed) {
@@ -460,4 +492,49 @@
}
}
+ private class IntentInstallMonitor implements Runnable {
+
+ private final Intent intent;
+ private final List<Future<CompletedBatchOperation>> futures;
+ private final IntentState nextState;
+
+ public IntentInstallMonitor(Intent intent,
+ List<Future<CompletedBatchOperation>> futures, IntentState nextState) {
+ this.intent = intent;
+ this.futures = futures;
+ this.nextState = nextState;
+ }
+
+ private void updateIntent(Intent intent) {
+ if (nextState == RECOMPILING) {
+ executor.execute(new IntentTask(nextState, intent));
+ } else if (nextState == INSTALLED || nextState == WITHDRAWN) {
+ eventDispatcher.post(store.setState(intent, nextState));
+ } else {
+ log.warn("Invalid next intent state {} for intent {}", nextState, intent);
+ }
+ }
+
+ @Override
+ public void run() {
+ for (Iterator<Future<CompletedBatchOperation>> i = futures.iterator(); i.hasNext();) {
+ Future<CompletedBatchOperation> future = i.next();
+ try {
+ // TODO: we may want to get the future here and go back to the future.
+ CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
+ // TODO check if future succeeded and if not report fail items
+ i.remove();
+
+ } catch (TimeoutException | InterruptedException | ExecutionException te) {
+ log.debug("Intallations of intent {} is still pending", intent);
+ }
+ }
+ if (futures.isEmpty()) {
+ updateIntent(intent);
+ } else {
+ // resubmit ourselves if we are not done yet
+ monitorExecutor.submit(this);
+ }
+ }
+ }
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
index 0ca75c2..8111681 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
@@ -5,7 +5,7 @@
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -13,8 +13,10 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
+import org.onlab.onos.CoreService;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Link;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
@@ -45,10 +47,14 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
- private final ApplicationId appId = ApplicationId.getAppId();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ private ApplicationId appId;
@Activate
public void activate() {
+ appId = coreService.registerApplication("org.onlab.onos.net.intent");
intentManager.registerInstaller(PathIntent.class, this);
}
@@ -57,8 +63,26 @@
intentManager.unregisterInstaller(PathIntent.class);
}
+ /**
+ * Apply a list of FlowRules.
+ *
+ * @param rules rules to apply
+ */
+ private Future<CompletedBatchOperation> applyBatch(List<FlowRuleBatchEntry> rules) {
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
+ Future<CompletedBatchOperation> future = flowRuleService.applyBatch(batch);
+ return future;
+// try {
+// //FIXME don't do this here
+// future.get();
+// } catch (InterruptedException | ExecutionException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// }
+ }
+
@Override
- public void install(PathIntent intent) {
+ public Future<CompletedBatchOperation> install(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
@@ -74,20 +98,14 @@
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
- //flowRuleService.applyFlowRules(rule);
prev = link.dst();
}
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
- try {
- flowRuleService.applyBatch(batch).get();
- } catch (InterruptedException | ExecutionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+
+ return applyBatch(rules);
}
@Override
- public void uninstall(PathIntent intent) {
+ public Future<CompletedBatchOperation> uninstall(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
@@ -103,15 +121,131 @@
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
- //flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
- try {
- flowRuleService.applyBatch(batch).get();
- } catch (InterruptedException | ExecutionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ return applyBatch(rules);
+ }
+
+ // TODO refactor below this line... ----------------------------
+
+ /**
+ * Generates the series of MatchActionOperations from the
+ * {@link FlowBatchOperation}.
+ * <p>
+ * FIXME: Currently supporting PacketPathFlow and SingleDstTreeFlow only.
+ * <p>
+ * FIXME: MatchActionOperations should have dependency field to the other
+ * match action operations, and this method should use this.
+ *
+ * @param op the {@link FlowBatchOperation} object
+ * @return the list of {@link MatchActionOperations} objects
+ */
+ /*
+ private List<MatchActionOperations>
+ generateMatchActionOperationsList(FlowBatchOperation op) {
+
+ // MatchAction operations at head (ingress) switches.
+ MatchActionOperations headOps = matchActionService.createOperationsList();
+
+ // MatchAction operations at rest of the switches.
+ MatchActionOperations tailOps = matchActionService.createOperationsList();
+
+ MatchActionOperations removeOps = matchActionService.createOperationsList();
+
+ for (BatchOperationEntry<Operator, ?> e : op.getOperations()) {
+
+ if (e.getOperator() == FlowBatchOperation.Operator.ADD) {
+ generateInstallMatchActionOperations(e, tailOps, headOps);
+ } else if (e.getOperator() == FlowBatchOperation.Operator.REMOVE) {
+ generateRemoveMatchActionOperations(e, removeOps);
+ } else {
+ throw new UnsupportedOperationException(
+ "FlowManager supports ADD and REMOVE operations only.");
+ }
+
+ }
+
+ return Arrays.asList(tailOps, headOps, removeOps);
+ }
+ */
+
+ /**
+ * Generates MatchActionOperations for an INSTALL FlowBatchOperation.
+ * <p>
+ * FIXME: Currently only supports flows that generate exactly two match
+ * action operation sets.
+ *
+ * @param e Flow BatchOperationEntry
+ * @param tailOps MatchActionOperation set that the tail
+ * MatchActionOperations will be placed in
+ * @param headOps MatchActionOperation set that the head
+ * MatchActionOperations will be placed in
+ */
+ /*
+ private void generateInstallMatchActionOperations(
+ BatchOperationEntry<Operator, ?> e,
+ MatchActionOperations tailOps,
+ MatchActionOperations headOps) {
+
+ if (!(e.getTarget() instanceof Flow)) {
+ throw new IllegalStateException(
+ "The target is not Flow object: " + e.getTarget());
+ }
+
+ // Compile flows to match-actions
+ Flow flow = (Flow) e.getTarget();
+ List<MatchActionOperations> maOps = flow.compile(
+ e.getOperator(), matchActionService);
+ verifyNotNull(maOps, "Could not compile the flow: " + flow);
+ verify(maOps.size() == 2,
+ "The flow generates unsupported match-action operations.");
+
+ // Map FlowId to MatchActionIds
+ for (MatchActionOperations maOp : maOps) {
+ for (MatchActionOperationEntry entry : maOp.getOperations()) {
+ flowMatchActionsMap.put(
+ KryoFactory.serialize(flow.getId()),
+ KryoFactory.serialize(entry.getTarget()));
+ }
+ }
+
+ // Merge match-action operations
+ for (MatchActionOperationEntry mae : maOps.get(0).getOperations()) {
+ verify(mae.getOperator() == MatchActionOperations.Operator.INSTALL);
+ tailOps.addOperation(mae);
+ }
+ for (MatchActionOperationEntry mae : maOps.get(1).getOperations()) {
+ verify(mae.getOperator() == MatchActionOperations.Operator.INSTALL);
+ headOps.addOperation(mae);
}
}
+ */
+ /**
+ * Generates MatchActionOperations for a REMOVE FlowBatchOperation.
+ *
+ * @param e Flow BatchOperationEntry
+ * @param removeOps MatchActionOperation set that the remove
+ * MatchActionOperations will be placed in
+ */
+ /*
+ private void generateRemoveMatchActionOperations(
+ BatchOperationEntry<Operator, ?> e,
+ MatchActionOperations removeOps) {
+
+ if (!(e.getTarget() instanceof FlowId)) {
+ throw new IllegalStateException(
+ "The target is not a FlowId object: " + e.getTarget());
+ }
+
+ // Compile flows to match-actions
+ FlowId flowId = (FlowId) e.getTarget();
+
+ for (byte[] matchActionIdBytes :
+ flowMatchActionsMap.remove(KryoFactory.serialize(flowId))) {
+ MatchActionId matchActionId = KryoFactory.deserialize(matchActionIdBytes);
+ removeOps.addOperation(new MatchActionOperationEntry(
+ MatchActionOperations.Operator.REMOVE, matchActionId));
+ }
+ }
+ */
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java b/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java
index 4933322..ac10384 100644
--- a/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java
@@ -55,6 +55,7 @@
private static final String REQUEST_NULL = "Arp request cannot be null.";
private static final String REQUEST_NOT_ARP = "Ethernet frame does not contain ARP request.";
private static final String NOT_ARP_REQUEST = "ARP is not a request.";
+ private static final String NOT_ARP_REPLY = "ARP is not a reply.";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@@ -141,7 +142,7 @@
checkArgument(eth.getEtherType() == Ethernet.TYPE_ARP,
REQUEST_NOT_ARP);
ARP arp = (ARP) eth.getPayload();
- checkArgument(arp.getOpCode() == ARP.OP_REPLY, NOT_ARP_REQUEST);
+ checkArgument(arp.getOpCode() == ARP.OP_REPLY, NOT_ARP_REPLY);
Host h = hostService.getHost(HostId.hostId(eth.getDestinationMAC(),
VlanId.vlanId(eth.getVlanID())));
diff --git a/core/net/src/test/java/org/onlab/onos/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/device/impl/DeviceManagerTest.java
index 2cd6919..e833b4a 100644
--- a/core/net/src/test/java/org/onlab/onos/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/device/impl/DeviceManagerTest.java
@@ -272,7 +272,8 @@
}
}
- private static class TestMastershipService extends MastershipServiceAdapter {
+ private static class TestMastershipService
+ extends MastershipServiceAdapter {
@Override
public MastershipRole getLocalRole(DeviceId deviceId) {
return MastershipRole.MASTER;
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 86f3ddc..fb579ea 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -19,6 +19,7 @@
import org.junit.Test;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.event.impl.TestEventDispatcher;
+import org.onlab.onos.impl.DefaultApplicationId;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Device.Type;
@@ -28,6 +29,7 @@
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.FlowEntry;
@@ -58,6 +60,8 @@
*/
public class FlowRuleManagerTest {
+
+
private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID = DeviceId.deviceId("of:001");
private static final int TIMEOUT = 10;
@@ -86,7 +90,7 @@
mgr.addListener(listener);
provider = new TestProvider(PID);
providerService = registry.register(provider);
- appId = ApplicationId.getAppId();
+ appId = new TestApplicationId((short) 0, "FlowRuleManagerTest");
assertTrue("provider should be registered",
registry.getProviders().contains(provider.id()));
}
@@ -408,7 +412,7 @@
}
@Override
- public Future<Void> executeBatch(
+ public Future<CompletedBatchOperation> executeBatch(
BatchOperation<FlowRuleBatchEntry> batch) {
// TODO Auto-generated method stub
return null;
@@ -474,4 +478,11 @@
}
+ // Test-only ApplicationId; static so it holds no reference to the enclosing test instance.
+ public static class TestApplicationId extends DefaultApplicationId {
+ public TestApplicationId(short id, String name) {
+ super(id, name);
+ }
+ }
+
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManagerTest.java
new file mode 100644
index 0000000..ddd4827
--- /dev/null
+++ b/core/net/src/test/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManagerTest.java
@@ -0,0 +1,442 @@
+package org.onlab.onos.net.proxyarp.impl;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultHost;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Host;
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.HostLocation;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.device.DeviceListener;
+import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.instructions.Instruction;
+import org.onlab.onos.net.flow.instructions.Instructions.OutputInstruction;
+import org.onlab.onos.net.host.HostService;
+import org.onlab.onos.net.link.LinkListener;
+import org.onlab.onos.net.link.LinkService;
+import org.onlab.onos.net.packet.OutboundPacket;
+import org.onlab.onos.net.packet.PacketProcessor;
+import org.onlab.onos.net.packet.PacketService;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.packet.ARP;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Tests for the {@link ProxyArpManager} class.
+ */
+public class ProxyArpManagerTest {
+
+ private static final int NUM_DEVICES = 4;
+ private static final int NUM_PORTS_PER_DEVICE = 3;
+ private static final int NUM_FLOOD_PORTS = 4;
+
+ private static final IpPrefix IP1 = IpPrefix.valueOf("10.0.0.1/24");
+ private static final IpPrefix IP2 = IpPrefix.valueOf("10.0.0.2/24");
+
+ private static final ProviderId PID = new ProviderId("of", "foo");
+
+ private static final VlanId VLAN1 = VlanId.vlanId((short) 1);
+ private static final VlanId VLAN2 = VlanId.vlanId((short) 2);
+ private static final MacAddress MAC1 = MacAddress.valueOf("00:00:11:00:00:01");
+ private static final MacAddress MAC2 = MacAddress.valueOf("00:00:22:00:00:02");
+ private static final HostId HID1 = HostId.hostId(MAC1, VLAN1);
+ private static final HostId HID2 = HostId.hostId(MAC2, VLAN1);
+
+ private static final DeviceId DID1 = getDeviceId(1);
+ private static final DeviceId DID2 = getDeviceId(2);
+ private static final PortNumber P1 = PortNumber.portNumber(1);
+ private static final HostLocation LOC1 = new HostLocation(DID1, P1, 123L);
+ private static final HostLocation LOC2 = new HostLocation(DID2, P1, 123L);
+
+ private ProxyArpManager proxyArp;
+
+ private TestPacketService packetService;
+
+ private DeviceService deviceService;
+ private LinkService linkService;
+ private HostService hostService;
+
+ @Before
+ public void setUp() throws Exception {
+ proxyArp = new ProxyArpManager();
+ packetService = new TestPacketService();
+ proxyArp.packetService = packetService;
+
+ // Create a host service mock here. Must be replayed by tests once the
+ // expectations have been set up
+ hostService = createMock(HostService.class);
+ proxyArp.hostService = hostService;
+
+ createTopology();
+ proxyArp.deviceService = deviceService;
+ proxyArp.linkService = linkService;
+
+ proxyArp.activate();
+ }
+
+ /**
+ * Creates a fake topology to feed into the ARP module.
+ * <p/>
+ * The default topology is a unidirectional ring topology. Each switch has
+ * 3 ports. Ports 2 and 3 have the links to neighbor switches, and port 1
+ * is free (edge port).
+ */
+ private void createTopology() {
+ deviceService = createMock(DeviceService.class);
+ linkService = createMock(LinkService.class);
+
+ deviceService.addListener(anyObject(DeviceListener.class));
+ linkService.addListener(anyObject(LinkListener.class));
+
+ createDevices(NUM_DEVICES, NUM_PORTS_PER_DEVICE);
+ createLinks(NUM_DEVICES);
+ }
+
+ /**
+ * Creates the devices for the fake topology.
+ */
+ private void createDevices(int numDevices, int numPorts) {
+ List<Device> devices = new ArrayList<>();
+
+ for (int i = 1; i <= numDevices; i++) {
+ DeviceId devId = getDeviceId(i);
+ Device device = createMock(Device.class);
+ expect(device.id()).andReturn(devId).anyTimes();
+ replay(device);
+
+ devices.add(device);
+
+ List<Port> ports = new ArrayList<>();
+ for (int j = 1; j <= numPorts; j++) {
+ Port port = createMock(Port.class);
+ expect(port.number()).andReturn(PortNumber.portNumber(j)).anyTimes();
+ replay(port);
+ ports.add(port);
+ }
+
+ expect(deviceService.getPorts(devId)).andReturn(ports);
+ }
+
+ expect(deviceService.getDevices()).andReturn(devices);
+ replay(deviceService);
+ }
+
+ /**
+ * Builds the ring of mock links and registers it with the link service mock.
+ * Device i's port 2 is linked to port 3 of the next device (the last device
+ * wraps around to device 1); links are one-way, just enough to occupy ports.
+ */
+ private void createLinks(int numDevices) {
+ List<Link> ring = new ArrayList<>();
+
+ for (int from = 1; from <= numDevices; from++) {
+ // the last device wraps around to the first one
+ int to = (from == numDevices) ? 1 : from + 1;
+ ConnectPoint egress = new ConnectPoint(
+ getDeviceId(from), PortNumber.portNumber(2));
+ ConnectPoint ingress = new ConnectPoint(
+ getDeviceId(to), PortNumber.portNumber(3));
+
+ Link link = createMock(Link.class);
+ expect(link.src()).andReturn(egress).anyTimes();
+ expect(link.dst()).andReturn(ingress).anyTimes();
+ replay(link);
+
+ ring.add(link);
+ }
+
+ expect(linkService.getLinks()).andReturn(ring).anyTimes();
+ replay(linkService);
+ }
+
+ /**
+ * Tests {@link ProxyArpManager#known(IpPrefix)} in the case where the
+ * IP address is not known.
+ * Verifies the method returns false.
+ */
+ @Test
+ public void testNotKnown() {
+ expect(hostService.getHostsByIp(IP1)).andReturn(Collections.<Host>emptySet());
+ replay(hostService);
+
+ assertFalse(proxyArp.known(IP1));
+ }
+
+ /**
+ * Tests {@link ProxyArpManager#known(IpPrefix)} in the case where the
+ * IP address is known.
+ * Verifies the method returns true.
+ */
+ @Test
+ public void testKnown() {
+ Host host1 = createMock(Host.class);
+ Host host2 = createMock(Host.class);
+
+ expect(hostService.getHostsByIp(IP1))
+ .andReturn(Sets.newHashSet(host1, host2));
+ replay(hostService);
+
+ assertTrue(proxyArp.known(IP1));
+ }
+
+ /**
+ * Tests {@link ProxyArpManager#reply(Ethernet)} in the case where the
+ * destination host is known.
+ * Verifies the correct ARP reply is sent out the correct port.
+ */
+ @Test
+ public void testReplyKnown() {
+ Host replyer = new DefaultHost(PID, HID1, MAC1, VLAN1, LOC2,
+ Collections.singleton(IP1));
+
+ Host requestor = new DefaultHost(PID, HID2, MAC2, VLAN1, LOC1,
+ Collections.singleton(IP2));
+
+ expect(hostService.getHostsByIp(IpPrefix.valueOf(IP1.toOctets())))
+ .andReturn(Collections.singleton(replyer));
+ expect(hostService.getHost(HID2)).andReturn(requestor);
+
+ replay(hostService);
+
+ Ethernet arpRequest = buildArp(ARP.OP_REQUEST, MAC2, null, IP2, IP1);
+
+ proxyArp.reply(arpRequest);
+
+ assertEquals(1, packetService.packets.size());
+ Ethernet arpReply = buildArp(ARP.OP_REPLY, MAC1, MAC2, IP1, IP2);
+ verifyPacketOut(arpReply, LOC1, packetService.packets.get(0));
+ }
+
+ /**
+ * Tests {@link ProxyArpManager#reply(Ethernet)} in the case where the
+ * destination host is not known.
+ * Verifies the ARP request is flooded out the correct edge ports.
+ */
+ @Test
+ public void testReplyUnknown() {
+ Host requestor = new DefaultHost(PID, HID2, MAC2, VLAN1, LOC1,
+ Collections.singleton(IP2));
+
+ expect(hostService.getHostsByIp(IpPrefix.valueOf(IP1.toOctets())))
+ .andReturn(Collections.<Host>emptySet());
+ expect(hostService.getHost(HID2)).andReturn(requestor);
+
+ replay(hostService);
+
+ Ethernet arpRequest = buildArp(ARP.OP_REQUEST, MAC2, null, IP2, IP1);
+
+ proxyArp.reply(arpRequest);
+
+ verifyFlood(arpRequest);
+ }
+
+ /**
+ * Tests {@link ProxyArpManager#reply(Ethernet)} in the case where the
+ * destination host is known for that IP address, but is not on the same
+ * VLAN as the source host.
+ * Verifies the ARP request is flooded out the correct edge ports.
+ */
+ @Test
+ public void testReplyDifferentVlan() {
+ Host replyer = new DefaultHost(PID, HID1, MAC1, VLAN2, LOC2,
+ Collections.singleton(IP1));
+
+ Host requestor = new DefaultHost(PID, HID2, MAC2, VLAN1, LOC1,
+ Collections.singleton(IP2));
+
+ expect(hostService.getHostsByIp(IpPrefix.valueOf(IP1.toOctets())))
+ .andReturn(Collections.singleton(replyer));
+ expect(hostService.getHost(HID2)).andReturn(requestor);
+
+ replay(hostService);
+
+ Ethernet arpRequest = buildArp(ARP.OP_REQUEST, MAC2, null, IP2, IP1);
+
+ proxyArp.reply(arpRequest);
+
+ verifyFlood(arpRequest);
+ }
+
+ /**
+ * Tests {@link ProxyArpManager#forward(Ethernet)} in the case where the
+ * destination host is known.
+ * Verifies the ARP reply is sent unchanged out of that host's port.
+ */
+ @Test
+ public void testForwardToHost() {
+ Host destination = new DefaultHost(PID, HID1, MAC1, VLAN1, LOC1,
+ Collections.singleton(IP1));
+
+ expect(hostService.getHost(HID1)).andReturn(destination);
+ replay(hostService);
+
+ // the packet under test is an ARP reply (OP_REPLY) addressed to MAC1
+ Ethernet arpReply = buildArp(ARP.OP_REPLY, MAC2, MAC1, IP2, IP1);
+ proxyArp.forward(arpReply);
+
+ assertEquals(1, packetService.packets.size());
+ OutboundPacket sent = packetService.packets.get(0);
+
+ verifyPacketOut(arpReply, LOC1, sent);
+ }
+
+ /**
+ * Tests {@link ProxyArpManager#forward(Ethernet)} in the case where the
+ * destination host is not known.
+ * Verifies the ARP reply is flooded out of every edge port instead.
+ */
+ @Test
+ public void testForwardFlood() {
+ // no host is registered for HID1, so there is no single port to use
+ expect(hostService.getHost(HID1)).andReturn(null);
+ replay(hostService);
+
+ Ethernet arpReply = buildArp(ARP.OP_REPLY, MAC2, MAC1, IP2, IP1);
+ proxyArp.forward(arpReply);
+
+ verifyFlood(arpReply);
+ }
+
+ /**
+ * Verifies that the given packet was flooded out all available edge ports.
+ *
+ * @param packet the packet that was expected to be flooded
+ */
+ private void verifyFlood(Ethernet packet) {
+ assertEquals(NUM_FLOOD_PORTS, packetService.packets.size());
+
+ Collections.sort(packetService.packets,
+ new Comparator<OutboundPacket>() {
+ @Override
+ public int compare(OutboundPacket o1, OutboundPacket o2) {
+ return o1.sendThrough().uri().compareTo(o2.sendThrough().uri());
+ }
+ });
+
+ for (int i = 0; i < NUM_FLOOD_PORTS; i++) {
+ ConnectPoint cp = new ConnectPoint(getDeviceId(i + 1), PortNumber.portNumber(1));
+
+ OutboundPacket outboundPacket = packetService.packets.get(i);
+ verifyPacketOut(packet, cp, outboundPacket);
+ }
+ }
+
+ /**
+ * Verifies the given packet was sent out the given port.
+ *
+ * @param expected the packet that was expected to be sent
+ * @param outPort the port the packet was expected to be sent out
+ * @param actual the actual OutboundPacket to verify
+ */
+ private void verifyPacketOut(Ethernet expected, ConnectPoint outPort,
+ OutboundPacket actual) {
+ assertTrue(Arrays.equals(expected.serialize(), actual.data().array()));
+ assertEquals(1, actual.treatment().instructions().size());
+ assertEquals(outPort.deviceId(), actual.sendThrough());
+ Instruction instruction = actual.treatment().instructions().get(0);
+ assertTrue(instruction instanceof OutputInstruction);
+ assertEquals(outPort.port(), ((OutputInstruction) instruction).port());
+ }
+
+ /**
+ * Builds the ID of the ith device from the decimal form of its index.
+ *
+ * @param i index of the device
+ * @return device ID created from the decimal string of {@code i}
+ */
+ private static DeviceId getDeviceId(int i) {
+ return DeviceId.deviceId(Integer.toString(i));
+ }
+
+ /**
+ * Builds an ARP packet with the given parameters.
+ *
+ * @param opcode opcode of the ARP packet
+ * @param srcMac source MAC address
+ * @param dstMac destination MAC address, or null if this is a request
+ * @param srcIp source IP address
+ * @param dstIp destination IP address
+ * @return the ARP packet
+ */
+ private Ethernet buildArp(short opcode, MacAddress srcMac, MacAddress dstMac,
+ IpPrefix srcIp, IpPrefix dstIp) {
+ Ethernet eth = new Ethernet();
+
+ if (dstMac == null) {
+ eth.setDestinationMACAddress(MacAddress.BROADCAST_MAC);
+ } else {
+ eth.setDestinationMACAddress(dstMac.getAddress());
+ }
+
+ eth.setSourceMACAddress(srcMac.getAddress());
+ eth.setEtherType(Ethernet.TYPE_ARP);
+ eth.setVlanID(VLAN1.toShort());
+
+ ARP arp = new ARP();
+ arp.setOpCode(opcode);
+ arp.setProtocolType(ARP.PROTO_TYPE_IP);
+ arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
+
+ arp.setProtocolAddressLength((byte) IpPrefix.INET_LEN);
+ arp.setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH);
+ arp.setSenderHardwareAddress(srcMac.getAddress());
+
+ if (dstMac == null) {
+ arp.setTargetHardwareAddress(MacAddress.ZERO_MAC_ADDRESS);
+ } else {
+ arp.setTargetHardwareAddress(dstMac.getAddress());
+ }
+
+ arp.setSenderProtocolAddress(srcIp.toOctets());
+ arp.setTargetProtocolAddress(dstIp.toOctets());
+
+ eth.setPayload(arp);
+ return eth;
+ }
+
+ /**
+ * Test PacketService implementation that simply stores OutboundPackets
+ * passed to {@link #emit(OutboundPacket)} for later verification.
+ */
+ class TestPacketService implements PacketService {
+
+ List<OutboundPacket> packets = new ArrayList<>();
+
+ @Override
+ public void addProcessor(PacketProcessor processor, int priority) {
+ }
+
+ @Override
+ public void removeProcessor(PacketProcessor processor) {
+ }
+
+ @Override
+ public void emit(OutboundPacket packet) {
+ packets.add(packet);
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index d49e00b..084435f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -43,8 +43,8 @@
private final Multimap<DeviceId, FlowEntry> flowEntries =
ArrayListMultimap.<DeviceId, FlowEntry>create();
- private final Multimap<ApplicationId, FlowRule> flowEntriesById =
- ArrayListMultimap.<ApplicationId, FlowRule>create();
+ private final Multimap<Short, FlowRule> flowEntriesById =
+ ArrayListMultimap.<Short, FlowRule>create();
@Activate
public void activate() {
@@ -83,7 +83,7 @@
@Override
public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
- Collection<FlowRule> rules = flowEntriesById.get(appId);
+ Collection<FlowRule> rules = flowEntriesById.get(appId.id());
if (rules == null) {
return Collections.emptyList();
}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index bd7864a..71d42fa 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -1,10 +1,8 @@
package org.onlab.onos.store.cluster.impl;
-import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
@@ -21,17 +19,16 @@
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
-import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.common.AbstractHazelcastStore;
-import org.onlab.onos.store.common.OptionalCacheLoader;
-import com.google.common.base.Optional;
-import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
+import com.hazelcast.core.MultiMap;
/**
- * Distributed implementation of the cluster nodes store.
+ * Distributed implementation of the mastership store. The store is
+ * responsible for the master selection process.
*/
@Component(immediate = true)
@Service
@@ -39,8 +36,21 @@
extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
- private IMap<byte[], byte[]> rawMasters;
- private LoadingCache<DeviceId, Optional<NodeId>> masters;
+ //arbitrary lock name
+ private static final String LOCK = "lock";
+ //initial term/TTL value
+ private static final Integer INIT = 0;
+
+ //devices to masters
+ protected IMap<byte[], byte[]> masters;
+ //devices to terms
+ protected IMap<byte[], Integer> terms;
+
+ //re-election related, disjoint-set structures:
+ //device-nodes multiset of available nodes
+ protected MultiMap<byte[], byte[]> standbys;
+ //device-nodes multiset for nodes that have given up on device
+ protected MultiMap<byte[], byte[]> unusable;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -50,99 +60,263 @@
public void activate() {
super.activate();
- rawMasters = theInstance.getMap("masters");
- OptionalCacheLoader<DeviceId, NodeId> nodeLoader
- = new OptionalCacheLoader<>(serializer, rawMasters);
- masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
- rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
+ masters = theInstance.getMap("masters");
+ terms = theInstance.getMap("terms");
+ standbys = theInstance.getMultiMap("backups");
+ unusable = theInstance.getMultiMap("unusable");
- loadMasters();
+ masters.addEntryListener(new RemoteMasterShipEventHandler(), true);
log.info("Started");
}
- private void loadMasters() {
- for (byte[] keyBytes : rawMasters.keySet()) {
- final DeviceId id = deserialize(keyBytes);
- masters.refresh(id);
- }
- }
-
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
- public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
- synchronized (this) {
- NodeId currentMaster = getMaster(deviceId);
- if (Objects.equals(currentMaster, nodeId)) {
- return null;
- }
+ public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+ byte[] did = serialize(deviceId);
+ byte[] nid = serialize(nodeId);
- // FIXME: for now implementing semantics of setMaster
- rawMasters.put(serialize(deviceId), serialize(nodeId));
- masters.put(deviceId, Optional.of(nodeId));
- return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId);
+ NodeId current = deserialize(masters.get(did));
+ if (current == null) {
+ if (standbys.containsEntry(did, nid)) {
+ //was previously standby, or set to standby from master
+ return MastershipRole.STANDBY;
+ } else {
+ return MastershipRole.NONE;
+ }
+ } else {
+ if (current.equals(nodeId)) {
+ //*should* be in unusable, not always
+ return MastershipRole.MASTER;
+ } else {
+ //may be in backups or unusable from earlier retirement
+ return MastershipRole.STANDBY;
+ }
+ }
+ }
+
+ @Override
+ public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+ byte [] did = serialize(deviceId);
+ byte [] nid = serialize(nodeId);
+
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ //reinforce mastership
+ evict(nid, did);
+ return null;
+ case STANDBY:
+ //make current master standby
+ byte [] current = masters.get(did);
+ if (current != null) {
+ backup(current, did);
+ }
+ //assign specified node as new master
+ masters.put(did, nid);
+ evict(nid, did);
+ updateTerm(did);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+ case NONE:
+ masters.put(did, nid);
+ evict(nid, did);
+ updateTerm(did);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ return null;
+ }
+ } finally {
+ lock.unlock();
}
}
@Override
public NodeId getMaster(DeviceId deviceId) {
- return masters.getUnchecked(deviceId).orNull();
+ return deserialize(masters.get(serialize(deviceId)));
}
@Override
public Set<DeviceId> getDevices(NodeId nodeId) {
ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
- for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
- if (nodeId.equals(entry.getValue().get())) {
- builder.add(entry.getKey());
+
+ for (Map.Entry<byte[], byte[]> entry : masters.entrySet()) {
+ if (nodeId.equals(deserialize(entry.getValue()))) {
+ builder.add((DeviceId) deserialize(entry.getKey()));
}
}
+
return builder.build();
}
@Override
public MastershipRole requestRole(DeviceId deviceId) {
- // FIXME: for now we are 'selecting' as master whoever asks
- setMaster(clusterService.getLocalNode().id(), deviceId);
- return MastershipRole.MASTER;
- }
+ NodeId local = clusterService.getLocalNode().id();
+ byte [] did = serialize(deviceId);
+ byte [] lnid = serialize(local);
- @Override
- public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
- NodeId master = masters.getUnchecked(deviceId).orNull();
- return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(local, deviceId);
+ switch (role) {
+ case MASTER:
+ evict(lnid, did);
+ break;
+ case STANDBY:
+ backup(lnid, did);
+ terms.putIfAbsent(did, INIT);
+ break;
+ case NONE:
+ //claim mastership
+ masters.put(did, lnid);
+ evict(lnid, did);
+ updateTerm(did);
+ role = MastershipRole.MASTER;
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return role;
+ } finally {
+ lock.unlock();
+ }
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
- // FIXME: implement this properly
- return MastershipTerm.of(getMaster(deviceId), 1);
+ byte[] did = serialize(deviceId);
+ if ((masters.get(did) == null) ||
+ (terms.get(did) == null)) {
+ return null;
+ }
+ return MastershipTerm.of(
+ (NodeId) deserialize(masters.get(did)), terms.get(did));
}
@Override
- public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
- boolean removed = rawMasters.remove(serialize(deviceId), serialize(nodeId));
- masters.invalidate(deviceId);
- if (!removed) {
- return null;
- }
- Optional<NodeId> newMaster = masters.getUnchecked(deviceId);
- if (newMaster.isPresent()) {
- return new MastershipEvent(MASTER_CHANGED, deviceId, newMaster.get());
- } else {
- // FIXME: probably need to express NO_MASTER somehow.
- return null;
+ public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ byte [] did = serialize(deviceId);
+ byte [] nid = serialize(nodeId);
+ MastershipEvent event = null;
+
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ event = reelect(nodeId, deviceId);
+ backup(nid, did);
+ break;
+ case STANDBY:
+ //fall through to reinforce role
+ case NONE:
+ backup(nid, did);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return event;
+ } finally {
+ lock.unlock();
}
}
- private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
- public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
- super(cache);
+ @Override
+ public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ byte [] did = serialize(deviceId);
+ byte [] nid = serialize(nodeId);
+ MastershipEvent event = null;
+
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ event = reelect(nodeId, deviceId);
+ evict(nid, did);
+ break;
+ case STANDBY:
+ //fall through to reinforce relinquishment
+ case NONE:
+ evict(nid, did);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return event;
+ } finally {
+ lock.unlock();
}
+ }
+
+ //replaces the current master with the first standby that is a different node.
+ //returns MASTER_CHANGED for the new master, or null if no such standby exists.
+ private MastershipEvent reelect(NodeId current, DeviceId deviceId) {
+ byte [] did = serialize(deviceId);
+ byte [] nid = serialize(current);
+
+ //if this were a queue it'd be neater.
+ byte [] backup = null;
+ for (byte [] n : standbys.get(did)) {
+ if (!current.equals(deserialize(n))) {
+ backup = n;
+ break;
+ }
+ }
+
+ if (backup == null) {
+ masters.remove(did, nid);
+ return null;
+ }
+ masters.put(did, backup);
+ evict(backup, did);
+ //updateTerm() tolerates a missing term entry, where a bare ++term would NPE.
+ updateTerm(did);
+ return new MastershipEvent(
+ MASTER_CHANGED, deviceId, (NodeId) deserialize(backup));
+ }
+
+ //adds the node to the device's standby pool and drops it from the unusable set.
+ private void backup(byte [] nodeId, byte [] deviceId) {
+ if (!standbys.containsEntry(deviceId, nodeId)) {
+ standbys.put(deviceId, nodeId);
+ }
+ if (unusable.containsEntry(deviceId, nodeId)) {
+ unusable.remove(deviceId, nodeId);
+ }
+ }
+
+ //marks the node unusable for the device and drops it from the standby pool.
+ private void evict(byte [] nodeId, byte [] deviceId) {
+ if (!unusable.containsEntry(deviceId, nodeId)) {
+ unusable.put(deviceId, nodeId);
+ }
+ if (standbys.containsEntry(deviceId, nodeId)) {
+ standbys.remove(deviceId, nodeId);
+ }
+ }
+
+ //starts the device's term at INIT if it has none, otherwise increments it by one.
+ private void updateTerm(byte [] deviceId) {
+ Integer term = terms.get(deviceId);
+ if (term == null) {
+ terms.put(deviceId, INIT);
+ } else {
+ terms.put(deviceId, ++term);
+ }
+ }
+
+ private class RemoteMasterShipEventHandler extends RemoteEventHandler<DeviceId, NodeId> {
@Override
protected void onAdd(DeviceId deviceId, NodeId nodeId) {
@@ -151,12 +325,13 @@
@Override
protected void onRemove(DeviceId deviceId, NodeId nodeId) {
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
+ //notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
}
@Override
protected void onUpdate(DeviceId deviceId, NodeId oldNodeId, NodeId nodeId) {
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
+ //only addition indicates a change in mastership
+ //notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
}
}
diff --git a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
new file mode 100644
index 0000000..bf1bb38
--- /dev/null
+++ b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
@@ -0,0 +1,333 @@
+package org.onlab.onos.store.cluster.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.onlab.onos.net.MastershipRole.*;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.MastershipEvent;
+import org.onlab.onos.cluster.MastershipEvent.Type;
+import org.onlab.onos.cluster.MastershipStoreDelegate;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.common.StoreManager;
+import org.onlab.onos.store.common.StoreService;
+import org.onlab.onos.store.common.TestStoreManager;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.packet.IpPrefix;
+
+import com.google.common.collect.Sets;
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+
+/**
+ * Test of the Hazelcast-based distributed MastershipStore implementation.
+ */
+public class DistributedMastershipStoreTest {
+
+ private static final DeviceId DID1 = DeviceId.deviceId("of:01");
+ private static final DeviceId DID2 = DeviceId.deviceId("of:02");
+ private static final DeviceId DID3 = DeviceId.deviceId("of:03");
+
+ private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
+
+ private static final NodeId N1 = new NodeId("node1");
+ private static final NodeId N2 = new NodeId("node2");
+
+ private static final ControllerNode CN1 = new DefaultControllerNode(N1, IP);
+ private static final ControllerNode CN2 = new DefaultControllerNode(N2, IP);
+
+ private DistributedMastershipStore dms;
+ private TestDistributedMastershipStore testStore;
+ private KryoSerializer serializationMgr;
+ private StoreManager storeMgr;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ /**
+ * Brings up a Hazelcast-backed store manager and the store under test,
+ * wired to a stub cluster service, before every test.
+ */
+ @Before
+ public void setUp() throws Exception {
+ // TODO should find a way to clean Hazelcast instance without shutdown.
+ Config hzConfig = TestStoreManager.getTestConfig();
+ storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(hzConfig));
+ storeMgr.activate();
+
+ serializationMgr = new KryoSerializer();
+
+ // create through the subclass reference so no downcast is needed,
+ // then expose the same instance under both fields
+ testStore = new TestDistributedMastershipStore(storeMgr, serializationMgr);
+ dms = testStore;
+ dms.clusterService = new TestClusterService();
+ dms.activate();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ dms.deactivate();
+
+ storeMgr.deactivate();
+ }
+
+ @Test
+ public void getRole() {
+ assertEquals("wrong role:", NONE, dms.getRole(N1, DID1));
+ testStore.put(DID1, N1, true, false, true);
+ assertEquals("wrong role:", MASTER, dms.getRole(N1, DID1));
+ assertEquals("wrong role:", STANDBY, dms.getRole(N2, DID1));
+ }
+
+ @Test
+ public void getMaster() {
+ assertTrue("wrong store state:", dms.masters.isEmpty());
+
+ testStore.put(DID1, N1, true, false, false);
+ assertEquals("wrong master:", N1, dms.getMaster(DID1));
+ assertNull("wrong master:", dms.getMaster(DID2));
+ }
+
+ @Test
+ public void getDevices() {
+ assertTrue("wrong store state:", dms.masters.isEmpty());
+
+ testStore.put(DID1, N1, true, false, false);
+ testStore.put(DID2, N1, true, false, false);
+ testStore.put(DID3, N2, true, false, false);
+
+ assertEquals("wrong devices",
+ Sets.newHashSet(DID1, DID2), dms.getDevices(N1));
+ }
+
+ @Test
+ public void requestRoleAndTerm() {
+ //CN1 is "local"
+ testStore.setCurrent(CN1);
+
+ //if already MASTER, nothing should happen
+ testStore.put(DID2, N1, true, false, false);
+ assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2));
+
+ //populate maps with DID1, N1 thru NONE case
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertTrue("wrong state for store:", !dms.terms.isEmpty());
+ assertEquals("wrong term",
+ MastershipTerm.of(N1, 0), dms.getTermFor(DID1));
+
+ //CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
+ testStore.setCurrent(CN2);
+ assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+ assertEquals("wrong number of entries:", 2, dms.terms.size());
+
+ //change term and requestRole() again; should persist
+ testStore.increment(DID2);
+ assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+ assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
+ }
+
+ @Test
+ public void setMaster() {
+ //populate maps with DID1, N1 as MASTER thru NONE case
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertNull("wrong event:", dms.setMaster(N1, DID1));
+
+ //switch over to N2
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
+ assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID1));
+
+ //orphan switch - should be rare case
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID2).type());
+ assertEquals("wrong term", MastershipTerm.of(N2, 0), dms.getTermFor(DID2));
+ //disconnect and reconnect - sign of failing re-election or single-instance channel
+ testStore.reset(true, false, false);
+ dms.setMaster(N2, DID2);
+ assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID2));
+ }
+
+ @Test
+ public void relinquishRole() {
+ //populate maps with DID1, N1 as MASTER thru NONE case
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ //no backup, no new MASTER/event
+ assertNull("wrong event:", dms.relinquishRole(N1, DID1));
+
+ dms.requestRole(DID1);
+
+ //add backup CN2, get it elected MASTER by relinquishing
+ testStore.setCurrent(CN2);
+ assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.relinquishRole(N1, DID1).type());
+ assertEquals("wrong master", N2, dms.getMaster(DID1));
+
+ //STANDBY - nothing here, either
+ assertNull("wrong event:", dms.relinquishRole(N1, DID1));
+ assertEquals("wrong role for node:", STANDBY, dms.getRole(N1, DID1));
+
+ //all nodes "give up" on device, which goes back to NONE.
+ assertNull("wrong event:", dms.relinquishRole(N2, DID1));
+ assertEquals("wrong role for node:", NONE, dms.getRole(N2, DID1));
+ assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID1));
+
+ assertEquals("wrong number of retired nodes", 2, dms.unusable.size());
+
+ //bring nodes back
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+ assertEquals("wrong number of backup nodes", 1, dms.standbys.size());
+
+ //NONE - nothing happens
+ assertNull("wrong event:", dms.relinquishRole(N1, DID2));
+ assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
+
+ }
+
+ /**
+ * Checks that setting a master notifies the store delegate with a
+ * MASTER_CHANGED event carrying the expected device and master node.
+ * Currently ignored; see the annotation for the reason.
+ */
+ @Ignore("Ignore until Delegate spec. is clear.")
+ @Test
+ public void testEvents() throws InterruptedException {
+ //shamelessly copy other distributed store tests
+ final CountDownLatch addLatch = new CountDownLatch(1);
+
+ MastershipStoreDelegate checkAdd = new MastershipStoreDelegate() {
+ @Override
+ public void notify(MastershipEvent event) {
+ assertEquals("wrong event:", Type.MASTER_CHANGED, event.type());
+ assertEquals("wrong subject", DID1, event.subject());
+ // this check is about the master node, so label it as such
+ assertEquals("wrong master", N1, event.master());
+ addLatch.countDown();
+ }
+ };
+
+ dms.setDelegate(checkAdd);
+ dms.setMaster(N1, DID1);
+ //this will fail until we do something about single-instance-ness
+ assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Store subclass that lets the tests seed, clear and inspect the store's
+ * internal structures directly. All helpers act on this instance's own
+ * inherited collections rather than reaching through the enclosing
+ * test's {@code dms} field, so they stay correct regardless of which
+ * instance that field currently points at.
+ */
+ private class TestDistributedMastershipStore extends
+ DistributedMastershipStore {
+ public TestDistributedMastershipStore(StoreService storeService,
+ KryoSerializer kryoSerialization) {
+ this.storeService = storeService;
+ this.serializer = kryoSerialization;
+ }
+
+ /**
+ * Seeds the master/backup/term structures for a device-node pair.
+ *
+ * @param dev device to seed
+ * @param node node to seed
+ * @param master if true, record node as master (and as unusable, not standby)
+ * @param backup if true, record node as standby (and not master/unusable)
+ * @param term if true, set the device's term to 0
+ */
+ public void put(DeviceId dev, NodeId node,
+ boolean master, boolean backup, boolean term) {
+ byte [] n = serialize(node);
+ byte [] d = serialize(dev);
+
+ if (master) {
+ masters.put(d, n);
+ unusable.put(d, n);
+ standbys.remove(d, n);
+ }
+ if (backup) {
+ standbys.put(d, n);
+ masters.remove(d, n);
+ unusable.remove(d, n);
+ }
+ if (term) {
+ terms.put(d, 0);
+ }
+ }
+
+ //a dumb utility function: prints the standby and unusable pairs.
+ public void dump() {
+ System.out.println("standbys");
+ for (Map.Entry<byte [], byte []> e : standbys.entrySet()) {
+ System.out.println(deserialize(e.getKey()) + ":" + deserialize(e.getValue()));
+ }
+ System.out.println("unusable");
+ for (Map.Entry<byte [], byte []> e : unusable.entrySet()) {
+ System.out.println(deserialize(e.getKey()) + ":" + deserialize(e.getValue()));
+ }
+ }
+
+ /**
+ * Clears the selected structures.
+ *
+ * @param store if true, clear masters and unusable
+ * @param backup if true, clear standbys
+ * @param term if true, clear terms
+ */
+ public void reset(boolean store, boolean backup, boolean term) {
+ if (store) {
+ masters.clear();
+ unusable.clear();
+ }
+ if (backup) {
+ standbys.clear();
+ }
+ if (term) {
+ terms.clear();
+ }
+ }
+
+ //increment term for a device; does nothing when the device has no term yet
+ public void increment(DeviceId dev) {
+ // serialize once and reuse the key for both the read and the write
+ byte [] d = serialize(dev);
+ Integer t = terms.get(d);
+ if (t != null) {
+ terms.put(d, t + 1);
+ }
+ }
+
+ //sets the "local" node
+ public void setCurrent(ControllerNode node) {
+ ((TestClusterService) clusterService).current = node;
+ }
+ }
+
+ /**
+ * Minimal ClusterService stub for these tests. The "local" node is
+ * whatever was last assigned to {@code current}; the node set is fixed to
+ * CN1 and CN2; lookups return null and listener registration is a no-op.
+ */
+ private class TestClusterService implements ClusterService {
+
+ // node returned by getLocalNode(); stays null until a test assigns it
+ protected ControllerNode current;
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return current;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return Sets.newHashSet(CN1, CN2);
+ }
+
+ // not needed by the store under test; always null
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ return null;
+ }
+
+ // not needed by the store under test; always null
+ @Override
+ public State getState(NodeId nodeId) {
+ return null;
+ }
+
+ // listeners are ignored by this stub
+ @Override
+ public void addListener(ClusterEventListener listener) {
+ }
+
+ // listeners are ignored by this stub
+ @Override
+ public void removeListener(ClusterEventListener listener) {
+ }
+
+ }
+
+}
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index d49e00b..084435f 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -43,8 +43,8 @@
private final Multimap<DeviceId, FlowEntry> flowEntries =
ArrayListMultimap.<DeviceId, FlowEntry>create();
- private final Multimap<ApplicationId, FlowRule> flowEntriesById =
- ArrayListMultimap.<ApplicationId, FlowRule>create();
+ private final Multimap<Short, FlowRule> flowEntriesById =
+ ArrayListMultimap.<Short, FlowRule>create();
@Activate
public void activate() {
@@ -83,7 +83,7 @@
@Override
public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
- Collection<FlowRule> rules = flowEntriesById.get(appId);
+ Collection<FlowRule> rules = flowEntriesById.get(appId.id());
if (rules == null) {
return Collections.emptyList();
}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 7ff797c..2d50851 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -42,8 +42,8 @@
private final Multimap<DeviceId, FlowEntry> flowEntries =
ArrayListMultimap.<DeviceId, FlowEntry>create();
- private final Multimap<ApplicationId, FlowRule> flowEntriesById =
- ArrayListMultimap.<ApplicationId, FlowRule>create();
+ private final Multimap<Short, FlowRule> flowEntriesById =
+ ArrayListMultimap.<Short, FlowRule>create();
@Activate
public void activate() {
@@ -82,7 +82,7 @@
@Override
public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
- Collection<FlowRule> rules = flowEntriesById.get(appId);
+ Collection<FlowRule> rules = flowEntriesById.get(appId.id());
if (rules == null) {
return Collections.emptyList();
}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java
index 0439d79..e8096ea 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java
@@ -174,7 +174,7 @@
}
@Override
- public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
+ public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
synchronized (this) {
switch (role) {
@@ -214,4 +214,9 @@
return backup;
}
+ @Override
+ public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ // this store handles relinquishing a role exactly like a step-down to
+ // standby, so it simply delegates and returns that call's event (if any)
+ return setStandby(nodeId, deviceId);
+ }
+
}
diff --git a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java
index 32d3d68..1e8e5c7 100644
--- a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java
@@ -129,22 +129,22 @@
public void unsetMaster() {
//NONE - record backup but take no other action
put(DID1, N1, false, false);
- sms.unsetMaster(N1, DID1);
+ sms.setStandby(N1, DID1);
assertTrue("not backed up", sms.backups.contains(N1));
sms.termMap.clear();
- sms.unsetMaster(N1, DID1);
+ sms.setStandby(N1, DID1);
assertTrue("term not set", sms.termMap.containsKey(DID1));
//no backup, MASTER
put(DID1, N1, true, true);
- assertNull("wrong event", sms.unsetMaster(N1, DID1));
+ assertNull("wrong event", sms.setStandby(N1, DID1));
assertNull("wrong node", sms.masterMap.get(DID1));
//backup, switch
sms.masterMap.clear();
put(DID1, N1, true, true);
put(DID2, N2, true, true);
- assertEquals("wrong event", MASTER_CHANGED, sms.unsetMaster(N1, DID1).type());
+ assertEquals("wrong event", MASTER_CHANGED, sms.setStandby(N1, DID1).type());
}
//helper to populate master/backup structures
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
index 178041d..5be7c69 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
@@ -981,11 +981,13 @@
// switch was a duplicate-dpid, calling the method below would clear
// all state for the original switch (with the same dpid),
// which we obviously don't want.
+ // the "{}" placeholder needs an argument, otherwise a literal "{}" is logged
+ log.info("{}:removal called", sw);
sw.removeConnectedSwitch();
} else {
// A duplicate was disconnected on this ChannelHandler,
// this is the same switch reconnecting, but the original state was
// not cleaned up - XXX check liveness of original ChannelHandler
+ // the "{}" placeholder needs an argument, otherwise a literal "{}" is logged
+ log.info("{}:duplicate found", sw);
duplicateDpidFound = Boolean.FALSE;
}
} else {
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
index e8ebcd1..716f7ec 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -307,9 +307,11 @@
connectedSwitches.remove(dpid);
OpenFlowSwitch sw = activeMasterSwitches.remove(dpid);
if (sw == null) {
+ log.warn("sw was null for {}", dpid);
sw = activeEqualSwitches.remove(dpid);
}
for (OpenFlowSwitchListener l : ofSwitchListener) {
+ log.warn("removal for {}", dpid);
l.switchRemoved(dpid);
}
}
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index 78f5874..9568f1f 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -27,6 +27,8 @@
import org.onlab.onos.net.flow.instructions.L3ModificationInstruction;
import org.onlab.onos.net.flow.instructions.L3ModificationInstruction.ModIPInstruction;
import org.projectfloodlight.openflow.protocol.OFFactory;
+import org.projectfloodlight.openflow.protocol.OFFlowAdd;
+import org.projectfloodlight.openflow.protocol.OFFlowDelete;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowModFlags;
import org.projectfloodlight.openflow.protocol.action.OFAction;
@@ -68,12 +70,13 @@
this.cookie = flowRule.id();
}
- public OFFlowMod buildFlowAdd() {
+ public OFFlowAdd buildFlowAdd() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
- OFFlowMod fm = factory.buildFlowAdd()
+ OFFlowAdd fm = factory.buildFlowAdd()
+ .setXid(cookie.value())
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
@@ -92,6 +95,7 @@
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory.buildFlowModify()
+ .setXid(cookie.value())
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
@@ -104,11 +108,12 @@
}
- public OFFlowMod buildFlowDel() {
+ public OFFlowDelete buildFlowDel() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
- OFFlowMod fm = factory.buildFlowDelete()
+ OFFlowDelete fm = factory.buildFlowDelete()
+ .setXid(cookie.value())
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 0aca754..ac0bb61 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -2,6 +2,7 @@
import static org.slf4j.LoggerFactory.getLogger;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -21,9 +22,12 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
@@ -40,6 +44,7 @@
import org.projectfloodlight.openflow.protocol.OFActionType;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
@@ -52,6 +57,11 @@
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.protocol.action.OFAction;
import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
@@ -70,6 +80,8 @@
@Component(immediate = true)
public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
+ enum BatchState { STARTED, FINISHED, CANCELLED };
+
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -88,6 +100,9 @@
private final Map<Long, InstallationFuture> pendingFutures =
new ConcurrentHashMap<Long, InstallationFuture>();
+ private final Map<Long, InstallationFuture> pendingFMs =
+ new ConcurrentHashMap<Long, InstallationFuture>();
+
/**
* Creates an OpenFlow host provider.
*/
@@ -143,9 +158,47 @@
removeFlowRule(flowRules);
}
+ /**
+ * Sends one flow-mod per batch entry to the switch that owns the rule and
+ * returns a future tracking the batch. Each sent flow-mod is indexed by
+ * its xid so that error replies can be matched back to the entry; the
+ * future is also registered under the batch hash, which is the xid used
+ * for the barrier requests issued by {@code verify}.
+ *
+ * @param batch operations to push to the switches
+ * @return future yielding the outcome of the batch
+ */
+ @Override
+ public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+ final Set<Dpid> sws = new HashSet<Dpid>();
+ final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ FlowRule flowRule = fbe.getTarget();
+ OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
+ sws.add(new Dpid(sw.getId()));
+ FlowModBuilder builder = new FlowModBuilder(flowRule, sw.factory());
+ // Scoped to one iteration: if the operator is unsupported, mod must
+ // stay null instead of still holding the previous entry's flow-mod,
+ // which would otherwise be sent and registered a second time.
+ OFFlowMod mod = null;
+ switch (fbe.getOperator()) {
+ case ADD:
+ mod = builder.buildFlowAdd();
+ break;
+ case REMOVE:
+ mod = builder.buildFlowDel();
+ break;
+ case MODIFY:
+ mod = builder.buildFlowMod();
+ break;
+ default:
+ log.error("Unsupported batch operation {}", fbe.getOperator());
+ }
+ if (mod != null) {
+ sw.sendMsg(mod);
+ fmXids.put(mod.getXid(), fbe);
+ } else {
+ log.error("Conversion of flowrule {} failed.", flowRule);
+ }
- //TODO: InternalFlowRuleProvider listening to stats and error and flowremoved.
- // possibly barriers as well. May not be internal at all...
+ }
+ InstallationFuture installation = new InstallationFuture(sws, fmXids);
+ for (Long xid : fmXids.keySet()) {
+ pendingFMs.put(xid, installation);
+ }
+ pendingFutures.put(U32.f(batch.hashCode()), installation);
+ installation.verify(batch.hashCode());
+ return installation;
+ }
+
+
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
@@ -175,7 +228,6 @@
InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
- //TODO: make this better
OFFlowRemoved removed = (OFFlowRemoved) msg;
FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
@@ -191,7 +243,7 @@
}
break;
case ERROR:
- future = pendingFutures.get(msg.getXid());
+ future = pendingFMs.get(msg.getXid());
if (future != null) {
future.fail((OFErrorMsg) msg, dpid);
}
@@ -203,10 +255,7 @@
}
@Override
- public void roleAssertFailed(Dpid dpid, RoleState role) {
- // TODO Auto-generated method stub
-
- }
+ public void roleAssertFailed(Dpid dpid, RoleState role) {}
private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
if (stats.getStatsType() != OFStatsType.FLOW) {
@@ -230,7 +279,6 @@
}
private boolean tableMissRule(Dpid dpid, OFFlowStatsEntry reply) {
- // TODO NEED TO FIND A BETTER WAY TO AVOID DOING THIS
if (reply.getVersion().equals(OFVersion.OF_10) ||
reply.getMatch().getMatchFields().iterator().hasNext()) {
return false;
@@ -251,104 +299,91 @@
}
return false;
}
-
}
-
- @Override
- public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
- final Set<Dpid> sws = new HashSet<Dpid>();
-
- for (FlowRuleBatchEntry fbe : batch.getOperations()) {
- FlowRule flowRule = fbe.getTarget();
- OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
- sws.add(new Dpid(sw.getId()));
- switch (fbe.getOperator()) {
- case ADD:
- //TODO: Track XID for each flowmod
- sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
- break;
- case REMOVE:
- //TODO: Track XID for each flowmod
- sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
- break;
- case MODIFY:
- //TODO: Track XID for each flowmod
- sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
- break;
- default:
- log.error("Unsupported batch operation {}", fbe.getOperator());
- }
- }
- InstallationFuture installation = new InstallationFuture(sws);
- pendingFutures.put(U32.f(batch.hashCode()), installation);
- installation.verify(batch.hashCode());
- return installation;
- }
-
- private class InstallationFuture implements Future<Void> {
+ private class InstallationFuture implements Future<CompletedBatchOperation> {
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
+ private final Map<Long, FlowRuleBatchEntry> fms;
+
private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
private final CountDownLatch countDownLatch;
+ private Integer pendingXid;
+ private BatchState state;
- public InstallationFuture(Set<Dpid> sws) {
+ public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
+ this.state = BatchState.STARTED;
this.sws = sws;
+ this.fms = fmXids;
countDownLatch = new CountDownLatch(sws.size());
}
public void fail(OFErrorMsg msg, Dpid dpid) {
ok.set(false);
- //TODO add reason to flowentry
+ FlowEntry fe = null;
+ FlowRuleBatchEntry fbe = fms.get(msg.getXid());
+ FlowRule offending = fbe.getTarget();
//TODO handle specific error msgs
- //offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
switch (msg.getErrType()) {
case BAD_ACTION:
+ OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
+ fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
+ bad.getCode().ordinal());
break;
case BAD_INSTRUCTION:
+ OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
+ fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
+ badins.getCode().ordinal());
break;
case BAD_MATCH:
+ OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
+ fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
+ badMatch.getCode().ordinal());
break;
case BAD_REQUEST:
- break;
- case EXPERIMENTER:
+ OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
+ fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
+ badReq.getCode().ordinal());
break;
case FLOW_MOD_FAILED:
+ OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
+ fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
+ fmFail.getCode().ordinal());
break;
+ case EXPERIMENTER:
case GROUP_MOD_FAILED:
- break;
case HELLO_FAILED:
- break;
case METER_MOD_FAILED:
- break;
case PORT_MOD_FAILED:
- break;
case QUEUE_OP_FAILED:
- break;
case ROLE_REQUEST_FAILED:
- break;
case SWITCH_CONFIG_FAILED:
- break;
case TABLE_FEATURES_FAILED:
- break;
case TABLE_MOD_FAILED:
+ fe = new DefaultFlowEntry(offending, msg.getErrType().ordinal(), 0);
break;
default:
- break;
+ log.error("Unknown error type {}", msg.getErrType());
}
+ offendingFlowMods.add(fe);
}
+
public void satisfyRequirement(Dpid dpid) {
log.warn("Satisfaction from switch {}", dpid);
- sws.remove(controller.getSwitch(dpid));
+ sws.remove(dpid);
countDownLatch.countDown();
+ cleanUp();
+
}
+
public void verify(Integer id) {
+ pendingXid = id;
for (Dpid dpid : sws) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
OFBarrierRequest.Builder builder = sw.factory()
@@ -356,41 +391,59 @@
.setXid(id);
sw.sendMsg(builder.build());
}
-
-
}
+ /**
+ * Cancels the batch: marks it CANCELLED, attempts clean-up, then issues
+ * the inverse of every entry (removeFlowRule for ADD/MODIFY, applyRule
+ * for REMOVE).
+ *
+ * NOTE(review): cleanUp() only purges the pending maps once {@code sws}
+ * is empty, and nothing here releases the latch that get() waits on;
+ * confirm whether cancelling before all barrier replies is expected to
+ * leave entries behind / leave get() blocked.
+ *
+ * @param mayInterruptIfRunning not used by this implementation
+ * @return result of isCancelled(), i.e. true once the state has been set
+ */
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- // TODO Auto-generated method stub
- return false;
+ this.state = BatchState.CANCELLED;
+ cleanUp();
+ for (FlowRuleBatchEntry fbe : fms.values()) {
+ if (fbe.getOperator() == FlowRuleOperation.ADD ||
+ fbe.getOperator() == FlowRuleOperation.MODIFY) {
+ removeFlowRule(fbe.getTarget());
+ } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
+ applyRule(fbe.getTarget());
+ }
+
+ }
+ return isCancelled();
}
@Override
public boolean isCancelled() {
- // TODO Auto-generated method stub
- return false;
+ return this.state == BatchState.CANCELLED;
}
@Override
public boolean isDone() {
- return sws.isEmpty();
+ return this.state == BatchState.FINISHED;
}
@Override
- public Void get() throws InterruptedException, ExecutionException {
+ public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
countDownLatch.await();
- //return offendingFlowMods;
- return null;
+ this.state = BatchState.FINISHED;
+ return new CompletedBatchOperation(ok.get(), offendingFlowMods);
}
@Override
- public Void get(long timeout, TimeUnit unit)
+ public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
- countDownLatch.await(timeout, unit);
- //return offendingFlowMods;
- return null;
+ if (countDownLatch.await(timeout, unit)) {
+ this.state = BatchState.FINISHED;
+ return new CompletedBatchOperation(ok.get(), offendingFlowMods);
+ }
+ throw new TimeoutException();
+ }
+
+ /**
+ * Removes this future from the provider's pending maps once no switch is
+ * left in {@code sws}; does nothing while switches are still outstanding.
+ */
+ private void cleanUp() {
+ if (sws.isEmpty()) {
+ // pendingFutures is keyed by Long (registered via U32.f(...)), so
+ // removing with the Integer id would never match and the entry
+ // would stay behind; convert the id the same way before removing.
+ // The null check covers a clean-up that runs before verify() has
+ // recorded the id.
+ if (pendingXid != null) {
+ pendingFutures.remove(U32.f(pendingXid));
+ }
+ for (Long xid : fms.keySet()) {
+ pendingFMs.remove(xid);
+ }
+ }
+ }
}
diff --git a/tools/dev/bash_profile b/tools/dev/bash_profile
index 6c1444f..a17ae2a 100644
--- a/tools/dev/bash_profile
+++ b/tools/dev/bash_profile
@@ -33,6 +33,7 @@
alias op='onos-package'
alias ot='onos-test'
alias ol='onos-log'
+alias ow='onos-watch'
alias go='ob && ot && onos -w'
alias pub='onos-push-update-bundle'
diff --git a/tools/dev/bin/onos-build-selective-hook b/tools/dev/bin/onos-build-selective-hook
index 233ab1a..0b39713 100755
--- a/tools/dev/bin/onos-build-selective-hook
+++ b/tools/dev/bin/onos-build-selective-hook
@@ -1,6 +1,6 @@
#------------------------------------------------------------------------------
-# Echoes project-level directory if a Java file within is newer than its
-# class file counterpart
+# Echoes project-level directory if a Java file within is newer than the
+# target directory.
#------------------------------------------------------------------------------
javaFile=${1#*\/src\/*\/java/}
@@ -10,9 +10,7 @@
src=${1/$javaFile/}
project=${src/src*/}
-classFile=${javaFile/.java/.class}
+target=$project/target
-[ ${project}target/classes/$classFile -nt ${src}$javaFile -o \
- ${project}target/test-classes/$classFile -nt ${src}$javaFile ] \
- || echo ${src/src*/}
+[ $target -nt ${src}$javaFile ] || echo ${src/src*/}
diff --git a/tools/test/bin/onos-push-update-bundle b/tools/test/bin/onos-push-update-bundle
index 4f8ca7d..d700027 100755
--- a/tools/test/bin/onos-push-update-bundle
+++ b/tools/test/bin/onos-push-update-bundle
@@ -7,7 +7,7 @@
. $ONOS_ROOT/tools/build/envDefaults
cd ~/.m2/repository
-jar=$(find org/onlab -type f -name '*.jar' | grep $1 | grep -v -e -tests | head -n 1)
+jar=$(find org/onlab -type f -name '*.jar' | grep -e $1 | grep -v -e -tests | head -n 1)
[ -z "$jar" ] && echo "No bundle $1 found for" && exit 1
diff --git a/tools/test/bin/onos-watch b/tools/test/bin/onos-watch
new file mode 100755
index 0000000..a9eb0e3
--- /dev/null
+++ b/tools/test/bin/onos-watch
@@ -0,0 +1,17 @@
+#!/bin/bash
+#-------------------------------------------------------------------------------
+# Monitors a selected set of ONOS commands using the system watch command.
+# Usage: onos-watch [node] [comma-separated commands] [watch options]
+#-------------------------------------------------------------------------------
+[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
+. "$ONOS_ROOT/tools/build/envDefaults"
+
+node=${1:-$OCI}
+commands="${2:-summary,intents,flows,hosts}"
+
+# One command per line goes into a private temp file used as onos' stdin.
+aux=$(mktemp /tmp/onos-watch.XXXXXX) || exit 1
+trap 'rm -f "$aux"' EXIT
+
+echo "$commands" | tr ',' '\n' > "$aux"
+watch $3 "onos $node -b <$aux 2>/dev/null"
diff --git a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
index b098513..8494d4e 100644
--- a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
+++ b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
@@ -11,6 +11,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
@@ -18,7 +19,6 @@
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Slf4jReporter;
import com.codahale.metrics.Timer;
/**
@@ -70,16 +70,20 @@
/**
* Default Reporter for this metrics manager.
*/
- private final Slf4jReporter reporter;
+ //private final Slf4jReporter reporter;
+ private final ConsoleReporter reporter;
public MetricsManager() {
this.metricsRegistry = new MetricRegistry();
- this.reporter = Slf4jReporter.forRegistry(this.metricsRegistry)
- .outputTo(log)
+// this.reporter = Slf4jReporter.forRegistry(this.metricsRegistry)
+// .outputTo(log)
+// .convertRatesTo(TimeUnit.SECONDS)
+// .convertDurationsTo(TimeUnit.MICROSECONDS)
+// .build();
+ this.reporter = ConsoleReporter.forRegistry(this.metricsRegistry)
.convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.NANOSECONDS)
+ .convertDurationsTo(TimeUnit.MICROSECONDS)
.build();
- reporter.start(1, TimeUnit.MINUTES);
}
@Activate