Merge "Add constraint for waypoints"
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
index f7f6478..117a9a0 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
@@ -23,6 +23,10 @@
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipListener;
+import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
@@ -50,15 +54,20 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService intentService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
private final ClusterEventListener clusterListener = new InnerClusterListener();
private final DeviceListener deviceListener = new InnerDeviceListener();
private final IntentListener intentListener = new InnerIntentListener();
+ private final MastershipListener mastershipListener = new InnerMastershipListener();
@Activate
public void activate() {
clusterService.addListener(clusterListener);
deviceService.addListener(deviceListener);
intentService.addListener(intentListener);
+ mastershipService.addListener(mastershipListener);
log.info("Started");
}
@@ -67,6 +76,7 @@
clusterService.removeListener(clusterListener);
deviceService.removeListener(deviceListener);
intentService.removeListener(intentListener);
+ mastershipService.removeListener(mastershipListener);
log.info("Stopped");
}
@@ -100,6 +110,18 @@
log.info(message, event.subject());
}
}
+
+ /**
+ * Logs every mastership event from the point of view of the local node.
+ */
+ private class InnerMastershipListener implements MastershipListener {
+ @Override
+ public void event(MastershipEvent event) {
+ final NodeId myId = clusterService.getLocalNode().id();
+ if (event.roleInfo().master() == null) {
+ // An event may describe a device that currently has no master
+ // (presumably reported as a null master - confirm against the store);
+ // do not attribute control to some other node in that case.
+ log.info("nobody has control {}", event);
+ } else if (myId.equals(event.roleInfo().master())) {
+ log.info("I have control/I wish you luck {}", event);
+ } else {
+ log.info("you have control {}", event);
+ }
+ }
+ }
}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java
index 7d3bfc2..13f3000 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java
@@ -15,14 +15,16 @@
*/
package org.onlab.onos.cli.net;
+import java.util.List;
+
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
-import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
+import org.onlab.onos.net.intent.Constraint;
import org.onlab.onos.net.intent.HostToHostIntent;
import org.onlab.onos.net.intent.IntentService;
@@ -31,7 +33,7 @@
*/
@Command(scope = "onos", name = "add-host-intent",
description = "Installs host-to-host connectivity intent")
-public class AddHostToHostIntentCommand extends AbstractShellCommand {
+public class AddHostToHostIntentCommand extends ConnectivityIntentCommand {
@Argument(index = 0, name = "one", description = "One host ID",
required = true, multiValued = false)
@@ -50,9 +52,11 @@
TrafficSelector selector = DefaultTrafficSelector.builder().build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
+ List<Constraint> constraints = buildConstraints();
HostToHostIntent intent = new HostToHostIntent(appId(), oneId, twoId,
- selector, treatment);
+ selector, treatment,
+ constraints);
service.submit(intent);
}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/AddMultiPointToSinglePointIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/AddMultiPointToSinglePointIntentCommand.java
index 9e8e2fc..1cd695e 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/AddMultiPointToSinglePointIntentCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/AddMultiPointToSinglePointIntentCommand.java
@@ -23,11 +23,13 @@
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
+import org.onlab.onos.net.intent.Constraint;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import static org.onlab.onos.net.DeviceId.deviceId;
@@ -69,9 +71,11 @@
TrafficSelector selector = buildTrafficSelector();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
+ List<Constraint> constraints = buildConstraints();
Intent intent = new MultiPointToSinglePointIntent(appId(), selector, treatment,
- ingressPoints, egress);
+ ingressPoints, egress,
+ constraints);
service.submit(intent);
}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java
index ed98c7e..26bb1c0 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java
@@ -15,6 +15,8 @@
*/
package org.onlab.onos.cli.net;
+import java.util.List;
+
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.net.ConnectPoint;
@@ -22,6 +24,7 @@
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
+import org.onlab.onos.net.intent.Constraint;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.PointToPointIntent;
@@ -63,8 +66,10 @@
TrafficSelector selector = buildTrafficSelector();
TrafficTreatment treatment = builder().build();
+ List<Constraint> constraints = buildConstraints();
+
Intent intent = new PointToPointIntent(appId(), selector, treatment,
- ingress, egress);
+ ingress, egress, constraints);
service.submit(intent);
}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/ConnectivityIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/ConnectivityIntentCommand.java
index d8ec3c7..e4fc5aa 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/ConnectivityIntentCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/ConnectivityIntentCommand.java
@@ -15,14 +15,21 @@
*/
package org.onlab.onos.cli.net;
+import java.util.LinkedList;
+import java.util.List;
+
import org.apache.karaf.shell.commands.Option;
import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.TrafficSelector;
+import org.onlab.onos.net.intent.Constraint;
+import org.onlab.onos.net.intent.constraint.BandwidthConstraint;
+import org.onlab.onos.net.intent.constraint.LambdaConstraint;
+import org.onlab.onos.net.resource.Bandwidth;
import org.onlab.packet.Ethernet;
import org.onlab.packet.MacAddress;
-import com.google.common.base.Strings;
+import static com.google.common.base.Strings.isNullOrEmpty;
/**
* Base class for command line operations for connectivity based intents.
@@ -41,6 +48,14 @@
required = false, multiValued = false)
private String ethTypeString = "";
+ @Option(name = "-b", aliases = "--bandwidth", description = "Bandwidth",
+ required = false, multiValued = false)
+ private String bandwidthString = "";
+
+ @Option(name = "-l", aliases = "--lambda", description = "Lambda",
+ required = false, multiValued = false)
+ private boolean lambda = false;
+
/**
* Constructs a traffic selector based on the command line arguments
* presented to the command.
@@ -50,21 +65,43 @@
TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder();
Short ethType = Ethernet.TYPE_IPV4;
- if (!Strings.isNullOrEmpty(ethTypeString)) {
+ if (!isNullOrEmpty(ethTypeString)) {
EthType ethTypeParameter = EthType.valueOf(ethTypeString);
ethType = ethTypeParameter.value();
}
selectorBuilder.matchEthType(ethType);
- if (!Strings.isNullOrEmpty(srcMacString)) {
+ if (!isNullOrEmpty(srcMacString)) {
selectorBuilder.matchEthSrc(MacAddress.valueOf(srcMacString));
}
- if (!Strings.isNullOrEmpty(dstMacString)) {
+ if (!isNullOrEmpty(dstMacString)) {
selectorBuilder.matchEthDst(MacAddress.valueOf(dstMacString));
}
return selectorBuilder.build();
}
+ /**
+ * Builds the constraint list for this command based on the command line
+ * parameters.
+ * <p>
+ * A bandwidth constraint is added when the bandwidth option is non-empty and
+ * a lambda constraint is added when the lambda flag is set.
+ *
+ * @return List of constraint objects describing the constraints requested;
+ * empty when no constraint option was given
+ * @throws NumberFormatException if the bandwidth option is not a parsable number
+ */
+ protected List<Constraint> buildConstraints() {
+ final List<Constraint> constraints = new LinkedList<>();
+
+ // Check for a bandwidth specification
+ if (!isNullOrEmpty(bandwidthString)) {
+ final double bandwidthValue = parseBandwidth(bandwidthString);
+ constraints.add(new BandwidthConstraint(Bandwidth.valueOf(bandwidthValue)));
+ }
+
+ // Check for a lambda specification
+ if (lambda) {
+ // NOTE(review): a null lambda presumably means "any lambda" - confirm
+ constraints.add(new LambdaConstraint(null));
+ }
+
+ return constraints;
+ }
+
+ /**
+ * Parses the text of the bandwidth option.
+ *
+ * @param text raw option value
+ * @return parsed bandwidth value
+ * @throws NumberFormatException naming the option and the offending text
+ */
+ private static double parseBandwidth(String text) {
+ try {
+ return Double.parseDouble(text);
+ } catch (NumberFormatException e) {
+ // Same exception type as before, but the message now tells the user
+ // which option was wrong; the original failure is kept as the cause.
+ final NumberFormatException described =
+ new NumberFormatException("Invalid bandwidth value '" + text + "': expected a number");
+ described.initCause(e);
+ throw described;
+ }
+ }
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java
index 3fad93d..893270a 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java
@@ -105,6 +105,7 @@
.add("appId", appId())
.add("selector", selector())
.add("treatment", treatment())
+ .add("constraints", constraints())
.add("one", one)
.add("two", two)
.toString();
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java
index 0c47aad..90907fb 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java
@@ -22,6 +22,7 @@
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
+import java.util.List;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
@@ -65,6 +66,38 @@
}
/**
+ * Creates a new multi-to-single point connectivity intent for the specified
+ * traffic selector, treatment and constraints.
+ *
+ * @param appId application identifier
+ * @param selector traffic selector
+ * @param treatment treatment
+ * @param ingressPoints set of ports from which ingress traffic originates
+ * @param egressPoint port to which traffic will egress
+ * @param constraints constraints to apply to the intent
+ * @throws NullPointerException if {@code ingressPoints} or
+ * {@code egressPoint} is null.
+ * @throws IllegalArgumentException if {@code ingressPoints} is empty
+ */
+ public MultiPointToSinglePointIntent(ApplicationId appId,
+ TrafficSelector selector,
+ TrafficTreatment treatment,
+ Set<ConnectPoint> ingressPoints,
+ ConnectPoint egressPoint,
+ List<Constraint> constraints) {
+ // The constraints take part in the id so that two intents differing only
+ // in their constraints do not end up with the same intent id.
+ super(id(MultiPointToSinglePointIntent.class, selector, treatment,
+ ingressPoints, egressPoint, constraints), appId, null, selector, treatment,
+ constraints);
+
+ checkNotNull(ingressPoints);
+ checkArgument(!ingressPoints.isEmpty(), "Ingress point set cannot be empty");
+
+ // Copy the set so the intent does not share the caller's collection.
+ this.ingressPoints = Sets.newHashSet(ingressPoints);
+ this.egressPoint = checkNotNull(egressPoint);
+ }
+
+ /**
* Constructor for serializer.
*/
protected MultiPointToSinglePointIntent() {
@@ -101,6 +134,7 @@
.add("treatment", treatment())
.add("ingress", ingressPoints())
.add("egress", egressPoint())
+ .add("constraints", constraints())
.toString();
}
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java
index 9189bae..9f8816d 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java
@@ -15,6 +15,8 @@
*/
package org.onlab.onos.net.intent;
+import java.util.List;
+
import com.google.common.base.MoreObjects;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.net.Path;
@@ -46,6 +48,24 @@
}
/**
+ * Creates a new path intent that uses the specified explicit path and
+ * carries the supplied constraints.
+ *
+ * @param appId application identifier
+ * @param selector traffic selector
+ * @param treatment treatment
+ * @param path traversed links
+ * @param constraints optional list of constraints
+ * @throws NullPointerException if {@code path} is null
+ */
+ public PathIntent(ApplicationId appId, TrafficSelector selector,
+ TrafficTreatment treatment, Path path, List<Constraint> constraints) {
+ super(id(PathIntent.class, selector, treatment, path, constraints), appId,
+ resources(path.links()), selector, treatment, constraints);
+ this.path = path;
+ }
+
+ /**
* Constructor for serializer.
*/
protected PathIntent() {
@@ -75,6 +95,7 @@
.add("appId", appId())
.add("selector", selector())
.add("treatment", treatment())
+ .add("constraints", constraints())
.add("path", path)
.toString();
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ConnectivityIntentCompiler.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ConnectivityIntentCompiler.java
index 4cf1830..8a31900 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ConnectivityIntentCompiler.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ConnectivityIntentCompiler.java
@@ -118,13 +118,14 @@
@Override
public double weight(TopologyEdge edge) {
- if (constraints == null) {
+ if (constraints == null || !constraints.iterator().hasNext()) {
return 1.0;
}
// iterate over all constraints in order and return the weight of
// the first one with fast fail over the first failure
Iterator<Constraint> it = constraints.iterator();
+
double cost = it.next().cost(edge.link(), resourceService);
while (it.hasNext() && cost > 0) {
if (it.next().cost(edge.link(), resourceService) < 0) {
@@ -132,6 +133,7 @@
}
}
return cost;
+
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java
index 605d3c7..4d989ac 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java
@@ -70,7 +70,8 @@
HostToHostIntent intent) {
TrafficSelector selector = builder(intent.selector())
.matchEthSrc(src.mac()).matchEthDst(dst.mac()).build();
- return new PathIntent(intent.appId(), selector, intent.treatment(), path);
+ return new PathIntent(intent.appId(), selector, intent.treatment(),
+ path, intent.constraints());
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 31df87a..af2af24 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -77,7 +77,7 @@
@Service
public class IntentManager
implements IntentService, IntentExtensionService {
- private final Logger log = getLogger(getClass());
+ private static final Logger log = getLogger(IntentManager.class);
public static final String INTENT_NULL = "Intent cannot be null";
public static final String INTENT_ID_NULL = "Intent ID cannot be null";
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java
index c32c8ee..5c8eb94 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java
@@ -77,7 +77,8 @@
private Intent createPathIntent(Path path,
PointToPointIntent intent) {
return new PathIntent(intent.appId(),
- intent.selector(), intent.treatment(), path);
+ intent.selector(), intent.treatment(), path,
+ intent.constraints());
}
}
diff --git a/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java
index ddd805f..33961e4 100644
--- a/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java
@@ -28,6 +28,7 @@
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.mastership.MastershipService;
+import org.onlab.onos.mastership.MastershipStore;
import org.onlab.onos.mastership.MastershipTermService;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.trivial.impl.SimpleMastershipStore;
@@ -57,9 +58,9 @@
public void setUp() {
mgr = new MastershipManager();
service = mgr;
- mgr.store = new SimpleMastershipStore();
mgr.eventDispatcher = new TestEventDispatcher();
mgr.clusterService = new TestClusterService();
+ mgr.store = new TestSimpleMastershipStore(mgr.clusterService);
mgr.activate();
}
@@ -74,7 +75,8 @@
@Test
public void setRole() {
mgr.setRole(NID_OTHER, DEV_MASTER, MASTER);
- assertEquals("wrong local role:", STANDBY, mgr.getLocalRole(DEV_MASTER));
+ assertEquals("wrong local role:", NONE, mgr.getLocalRole(DEV_MASTER));
+ assertEquals("wrong obtained role:", STANDBY, mgr.requestRoleFor(DEV_MASTER));
//set to master
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
@@ -182,4 +184,12 @@
}
}
+
+ /**
+ * SimpleMastershipStore variant for this test whose cluster service is
+ * supplied through the constructor.
+ */
+ private final class TestSimpleMastershipStore extends SimpleMastershipStore
+ implements MastershipStore {
+
+ /**
+ * Creates the store and assigns the inherited cluster service field.
+ *
+ * @param clusterService cluster service the store should use
+ */
+ public TestSimpleMastershipStore(ClusterService clusterService) {
+ super.clusterService = clusterService;
+ }
+ }
}
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 248e763..f0f37c7 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -64,6 +64,12 @@
-->
<dependency>
+ <groupId>org.mapdb</groupId>
+ <artifactId>mapdb</artifactId>
+ <version>1.0.6</version>
+ </dependency>
+
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
index 42e0799..6c2ad6a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -283,16 +283,15 @@
case MASTER:
NodeId newMaster = reelect(nodeId, deviceId, rv);
rv.reassign(nodeId, NONE, STANDBY);
+ updateTerm(deviceId);
if (newMaster != null) {
- updateTerm(deviceId);
roleMap.put(deviceId, rv);
return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
} else {
// no master candidate
roleMap.put(deviceId, rv);
- // FIXME: Should there be new event type?
- // or should we issue null Master event?
- return null;
+ // TODO: Should there be a new event type for the no-MASTER case?
+ return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
}
case STANDBY:
return null;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java
index a22464a..9f7af4a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java
@@ -1,5 +1,9 @@
package org.onlab.onos.store.service;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+
import com.google.common.base.MoreObjects;
/**
@@ -10,9 +14,21 @@
private final String tableName;
private final String key;
+ /**
+ * Creates a read request
+ * which will retrieve the specified key from the table.
+ *
+ * @param tableName name of the table
+ * @param key key in the table
+ * @return new read request for the given table and key
+ * @throws NullPointerException if {@code tableName} or {@code key} is null
+ */
+ public static ReadRequest get(String tableName, String key) {
+ return new ReadRequest(tableName, key);
+ }
+
public ReadRequest(String tableName, String key) {
- this.tableName = tableName;
- this.key = key;
+ this.tableName = checkNotNull(tableName);
+ this.key = checkNotNull(key);
}
/**
@@ -38,4 +54,26 @@
.add("key", key)
.toString();
}
+
+ @Override
+ public int hashCode() {
+ // Hashes the same two fields that equals() compares.
+ return Objects.hash(key, tableName);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ // Only an object of exactly this class can be equal.
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final ReadRequest that = (ReadRequest) obj;
+ if (!Objects.equals(this.key, that.key)) {
+ return false;
+ }
+ return Objects.equals(this.tableName, that.tableName);
+ }
+
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java
index 852fb07..ae6969c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java
@@ -38,6 +38,28 @@
return version;
}
+ /**
+ * Returns a copy of the given VersionedValue whose value array is not
+ * shared with the original.
+ *
+ * @param original VersionedValue to copy, may be null
+ * @return {@code original} itself when it is null or its value is null,
+ * otherwise a new VersionedValue holding a copy of the value array
+ * and the same version
+ */
+ public static VersionedValue copy(VersionedValue original) {
+ if (original == null || original.value == null) {
+ // null stays null; without a value array there is nothing to duplicate
+ return original;
+ }
+ return new VersionedValue(
+ Arrays.copyOf(original.value, original.value.length),
+ original.version);
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 0c2aacd..582e00c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -132,8 +132,8 @@
} catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) ||
message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) {
- log.warn("Request to {} failed. Will retry "
- + "in {} ms", remoteNode, RETRY_INTERVAL_MILLIS);
+ log.warn("{} Request to {} failed. Will retry in {} ms",
+ message.subject(), remoteNode, RETRY_INTERVAL_MILLIS);
THREAD_POOL.schedule(
this,
RETRY_INTERVAL_MILLIS,
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index 7d94847..b3eaeb4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -3,12 +3,17 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
@@ -57,37 +62,37 @@
public void handle(ClusterMessage message) {
T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
if (request.getClass().equals(PingRequest.class)) {
- handler.ping((PingRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to ping request", e);
- }
- });
+ handler.ping((PingRequest) request).whenComplete(new PostExecutionTask<PingResponse>(message));
} else if (request.getClass().equals(PollRequest.class)) {
- handler.poll((PollRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to poll request", e);
- }
- });
+ handler.poll((PollRequest) request).whenComplete(new PostExecutionTask<PollResponse>(message));
} else if (request.getClass().equals(SyncRequest.class)) {
- handler.sync((SyncRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to sync request", e);
- }
- });
+ handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask<SyncResponse>(message));
} else if (request.getClass().equals(SubmitRequest.class)) {
- handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
+ handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask<SubmitResponse>(message));
+ } else {
+ throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
+ }
+ }
+
+ /**
+ * Completion callback that sends the encoded response back on the
+ * originating cluster message, or logs the failure when processing
+ * completed exceptionally.
+ *
+ * @param <R> response type produced by the request handler
+ */
+ private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
+
+ private final ClusterMessage message;
+
+ public PostExecutionTask(ClusterMessage message) {
+ this.message = message;
+ }
+
+ @Override
+ public void accept(R response, Throwable t) {
+ if (t != null) {
+ // NOTE(review): nothing is sent back in this branch - confirm the
+ // requester is expected to time out on its own.
+ log.error("Processing for " + message.subject() + " failed.", t);
+ } else {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
- log.error("Failed to respond to submit request", e);
+ // response may be null; describe it without dereferencing it so the
+ // original failure is reported instead of a NullPointerException
+ final String what = response == null
+ ? String.valueOf(message.subject()) : response.getClass().getName();
+ log.error("Failed to respond to " + what, e);
}
- });
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 2822f25..9662976 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -5,8 +5,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-
import net.kuujo.copycat.Command;
import net.kuujo.copycat.Query;
import net.kuujo.copycat.StateMachine;
@@ -20,6 +18,7 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
/**
@@ -64,8 +63,8 @@
}
@Query
- public Set<String> listTables() {
- return state.getTables().keySet();
+ public List<String> listTables() {
+ return ImmutableList.copyOf(state.getTables().keySet());
}
@Query
@@ -77,7 +76,7 @@
results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
continue;
}
- VersionedValue value = table.get(request.key());
+ VersionedValue value = VersionedValue.copy(table.get(request.key()));
results.add(new InternalReadResult(
InternalReadResult.Status.OK,
new ReadResult(
@@ -90,6 +89,8 @@
@Command
public List<InternalWriteResult> write(List<WriteRequest> requests) {
+
+ // applicability check
boolean abort = false;
List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
for (WriteRequest request : requests) {
@@ -133,8 +134,13 @@
return results;
}
+ // apply changes
for (WriteRequest request : requests) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+ // FIXME: If this method can be called by multiple threads,
+ // the synchronization scope is wrong.
+ // The whole function, including the applicability check, needs to be protected.
+ // Confirm copycat's thread-safety requirements for StateMachine.
synchronized (table) {
VersionedValue previousValue =
table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
@@ -166,8 +172,8 @@
try {
return SERIALIZER.encode(state);
} catch (Exception e) {
- log.error("Snapshot serialization error", e);
- return null;
+ log.error("Failed to take snapshot", e);
+ throw new SnapshotException(e);
}
}
@@ -176,7 +182,8 @@
try {
this.state = SERIALIZER.decode(data);
} catch (Exception e) {
- log.error("Snapshot deserialization error", e);
+ log.error("Failed to install from snapshot", e);
+ throw new SnapshotException(e);
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
new file mode 100644
index 0000000..893c311
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
@@ -0,0 +1,280 @@
+package org.onlab.onos.store.service.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+import net.kuujo.copycat.log.Entry;
+import net.kuujo.copycat.log.Log;
+import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
+
+import org.mapdb.Atomic;
+import org.mapdb.BTreeMap;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.TxBlock;
+import org.mapdb.TxMaker;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * MapDB based log implementation.
+ */
+public class MapDBLog implements Log {
+
+ private final File dbFile;
+ private TxMaker txMaker;
+ private final StoreSerializer serializer;
+ private static final String LOG_NAME = "log";
+ private static final String SIZE_FIELD_NAME = "size";
+
+ public MapDBLog(File dbFile, StoreSerializer serializer) {
+ this.dbFile = dbFile;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public void open() throws IOException {
+ txMaker = DBMaker
+ .newFileDB(dbFile)
+ .makeTxMaker();
+ }
+
+ @Override
+ public void close() throws IOException {
+ assertIsOpen();
+ txMaker.close();
+ txMaker = null;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return txMaker != null;
+ }
+
+ protected void assertIsOpen() {
+ checkState(isOpen(), "The log is not currently open.");
+ }
+
+ @Override
+ public long appendEntry(Entry entry) {
+ checkArgument(entry != null, "expecting non-null entry");
+ return appendEntries(entry).get(0);
+ }
+
+ @Override
+ public List<Long> appendEntries(Entry... entries) {
+ checkArgument(entries != null, "expecting non-null entries");
+ return appendEntries(Arrays.asList(entries));
+ }
+
+    @Override
+    public List<Long> appendEntries(List<Entry> entries) {
+        assertIsOpen();
+        checkArgument(entries != null, "expecting non-null entries");
+        final List<Long> indices = Lists.newArrayList();
+        // Appends all entries in one transaction, numbering them consecutively after the last key.
+        txMaker.execute(new TxBlock() {
+            @Override
+            public void tx(DB db) {
+                indices.clear(); // the block may be re-run after a rollback; drop stale indices
+                BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+                long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
+                for (Entry entry : entries) {
+                    byte[] entryBytes = serializer.encode(entry);
+                    log.put(nextIndex, entryBytes);
+                    size.addAndGet(entryBytes.length);
+                    indices.add(nextIndex);
+                    nextIndex++;
+                }
+            }
+        });
+
+        return indices;
+    }
+
+ @Override
+ public boolean containsEntry(long index) {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.containsKey(index);
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public void delete() throws IOException {
+ assertIsOpen();
+ txMaker.execute(new TxBlock() {
+ @Override
+ public void tx(DB db) {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ log.clear();
+ size.set(0);
+ }
+ });
+ }
+
+ @Override
+ public <T extends Entry> T firstEntry() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? null : serializer.decode(log.firstEntry().getValue());
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public long firstIndex() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? 0 : log.firstKey(); // 0 is returned when the log holds no entries
+ } finally {
+ db.close(); // always close the DB handle obtained from makeTx()
+ }
+ }
+
+ @Override
+ public <T extends Entry> List<T> getEntries(long from, long to) {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ if (log.isEmpty()) { // an empty log has no valid range at all
+ throw new LogIndexOutOfBoundsException("Log is empty");
+ } else if (from < log.firstKey()) {
+ throw new LogIndexOutOfBoundsException("From index out of bounds.");
+ } else if (to > log.lastKey()) {
+ throw new LogIndexOutOfBoundsException("To index out of bounds.");
+ }
+ List<T> entries = new ArrayList<>((int) (to - from + 1)); // NOTE(review): negative when from > to + 1 - confirm
+ for (long i = from; i <= to; i++) { // both bounds are inclusive
+ T entry = serializer.decode(log.get(i));
+ entries.add(entry);
+ }
+ return entries;
+ } finally {
+ db.close(); // always close the DB handle obtained from makeTx()
+ }
+ }
+
+ @Override
+ public <T extends Entry> T getEntry(long index) {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ byte[] entryBytes = log.get(index);
+ return entryBytes == null ? null : serializer.decode(entryBytes);
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty();
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public <T extends Entry> T lastEntry() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? null : serializer.decode(log.lastEntry().getValue());
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public long lastIndex() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? 0 : log.lastKey(); // 0 is returned when the log holds no entries
+ } finally {
+ db.close(); // always close the DB handle obtained from makeTx()
+ }
+ }
+
+    @Override
+    public void removeAfter(long index) {
+        assertIsOpen();
+        txMaker.execute(new TxBlock() {
+            @Override
+            public void tx(DB db) {
+                BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+                long startIndex = index + 1;
+                long endIndex = log.isEmpty() ? index : log.lastKey(); // empty log: nothing to remove
+                for (long i = startIndex; i <= endIndex; ++i) {
+                    byte[] entryBytes = log.remove(i); // null when no entry is stored at i
+                    size.addAndGet(entryBytes == null ? 0 : -1L * entryBytes.length);
+                }
+            }
+        });
+    }
+
+ @Override
+ public long size() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ return size.get();
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public void sync() throws IOException {
+ assertIsOpen();
+ }
+
+    @Override
+    public void compact(long index, Entry entry) throws IOException {
+        // Drops every entry stored before index, then stores the given entry at index.
+        assertIsOpen();
+        txMaker.execute(new TxBlock() {
+            @Override
+            public void tx(DB db) {
+                BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+                ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
+                long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
+                size.addAndGet(-1 * deletedBytes);
+                byte[] entryBytes = serializer.encode(entry);
+                byte[] existingEntry = log.put(index, entryBytes); // null when index held no entry
+                size.addAndGet(entryBytes.length - (existingEntry == null ? 0 : existingEntry.length));
+                db.compact();
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java
new file mode 100644
index 0000000..4cfc13b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java
@@ -0,0 +1,13 @@
+package org.onlab.onos.store.service.impl;
+
+import org.onlab.onos.store.service.DatabaseException;
+
+/**
+ * Exception that indicates a problem with the state machine snapshotting.
+ */
+@SuppressWarnings("serial")
+public class SnapshotException extends DatabaseException {
+ public SnapshotException(Throwable t) {
+ super(t);
+ }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
new file mode 100644
index 0000000..75beefd
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
@@ -0,0 +1,193 @@
+package org.onlab.onos.store.service.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+
+import net.kuujo.copycat.internal.log.OperationEntry;
+import net.kuujo.copycat.log.Entry;
+import net.kuujo.copycat.log.Log;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.testing.EqualsTester;
+
+/**
+ * Test the MapDBLog implementation.
+ */
+public class MapDBLogTest {
+
+ private static final String DB_FILE_NAME = "mapdbTest";
+ private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.SERIALIZER;
+ private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1");
+ private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12");
+ private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123");
+ private static final Entry TEST_ENTRY4 = new OperationEntry(4, "test1234");
+
+ private static final Entry TEST_SNAPSHOT_ENTRY = new OperationEntry(5, "snapshot");
+
+ private static final long TEST_ENTRY1_SIZE = SERIALIZER.encode(TEST_ENTRY1).length;
+ private static final long TEST_ENTRY2_SIZE = SERIALIZER.encode(TEST_ENTRY2).length;
+ private static final long TEST_ENTRY3_SIZE = SERIALIZER.encode(TEST_ENTRY3).length;
+ private static final long TEST_ENTRY4_SIZE = SERIALIZER.encode(TEST_ENTRY4).length;
+
+ private static final long TEST_SNAPSHOT_ENTRY_SIZE = SERIALIZER.encode(TEST_SNAPSHOT_ENTRY).length;
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Files.deleteIfExists(new File(DB_FILE_NAME).toPath());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testAssertOpen() {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.size();
+ }
+
+ @Test
+ public void testAppendEntry() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntry(TEST_ENTRY1);
+ OperationEntry first = log.firstEntry();
+ OperationEntry last = log.lastEntry();
+ new EqualsTester()
+ .addEqualityGroup(first, last, TEST_ENTRY1)
+ .testEquals();
+ Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
+ Assert.assertEquals(1, log.firstIndex());
+ Assert.assertEquals(1, log.lastIndex());
+ }
+
+    @Test
+    public void testAppendEntries() throws IOException {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        log.open();
+        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3);
+        OperationEntry first = log.firstEntry();
+        OperationEntry last = log.lastEntry();
+        new EqualsTester()
+                .addEqualityGroup(first, TEST_ENTRY1)
+                .addEqualityGroup(last, TEST_ENTRY3)
+                .testEquals();
+        Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY2_SIZE + TEST_ENTRY3_SIZE, log.size()); // sum of all three
+        Assert.assertEquals(1, log.firstIndex());
+        Assert.assertEquals(3, log.lastIndex());
+        Assert.assertTrue(log.containsEntry(1));
+        Assert.assertTrue(log.containsEntry(2));
+    }
+
+ @Test
+ public void testDelete() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2);
+ log.delete();
+ Assert.assertEquals(0, log.size());
+ Assert.assertTrue(log.isEmpty());
+ Assert.assertEquals(0, log.firstIndex());
+ Assert.assertNull(log.firstEntry());
+ Assert.assertEquals(0, log.lastIndex());
+ Assert.assertNull(log.lastEntry());
+ }
+
+ @Test
+ public void testGetEntries() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+ Assert.assertEquals(
+ TEST_ENTRY1_SIZE +
+ TEST_ENTRY2_SIZE +
+ TEST_ENTRY3_SIZE +
+ TEST_ENTRY4_SIZE, log.size());
+
+ List<Entry> entries = log.getEntries(2, 3);
+ new EqualsTester()
+ .addEqualityGroup(log.getEntry(4), TEST_ENTRY4)
+ .addEqualityGroup(entries.get(0), TEST_ENTRY2)
+ .addEqualityGroup(entries.get(1), TEST_ENTRY3)
+ .testEquals();
+ }
+
+ @Test
+ public void testRemoveAfter() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+ log.removeAfter(1);
+ Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
+ new EqualsTester()
+ .addEqualityGroup(log.firstEntry(), log.lastEntry(), TEST_ENTRY1)
+ .testEquals();
+ }
+
+ @Test
+ public void testAddAfterRemove() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+ log.removeAfter(1);
+ log.appendEntry(TEST_ENTRY4);
+ Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE, log.size());
+ new EqualsTester()
+ .addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
+ .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
+ .addEqualityGroup(log.size(), TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE)
+ .testEquals();
+ }
+
+ @Test
+ public void testClose() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ Assert.assertFalse(log.isOpen());
+ log.open();
+ Assert.assertTrue(log.isOpen());
+ log.close();
+ Assert.assertFalse(log.isOpen());
+ }
+
+ @Test
+ public void testReopen() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+ log.close();
+ log.open();
+
+ new EqualsTester()
+ .addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
+ .addEqualityGroup(log.getEntry(2), TEST_ENTRY2)
+ .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
+ .addEqualityGroup(log.size(),
+ TEST_ENTRY1_SIZE +
+ TEST_ENTRY2_SIZE +
+ TEST_ENTRY3_SIZE +
+ TEST_ENTRY4_SIZE)
+ .testEquals();
+ }
+
+ @Test
+ public void testCompact() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+ log.compact(3, TEST_SNAPSHOT_ENTRY);
+ new EqualsTester()
+ .addEqualityGroup(log.firstEntry(), TEST_SNAPSHOT_ENTRY)
+ .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
+ .addEqualityGroup(log.size(),
+ TEST_SNAPSHOT_ENTRY_SIZE +
+ TEST_ENTRY4_SIZE)
+ .testEquals();
+ }
+}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java
index 0f36393..62c084e 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStore.java
@@ -29,8 +29,13 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.cluster.RoleInfo;
@@ -44,7 +49,8 @@
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import static org.onlab.onos.mastership.MastershipEvent.Type.*;
@@ -60,23 +66,65 @@
private final Logger log = getLogger(getClass());
- public static final IpAddress LOCALHOST = IpAddress.valueOf("127.0.0.1");
-
private static final int NOTHING = 0;
private static final int INIT = 1;
- private ControllerNode instance =
- new DefaultControllerNode(new NodeId("local"), LOCALHOST);
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
//devices mapped to their masters, to emulate multiple nodes
protected final Map<DeviceId, NodeId> masterMap = new HashMap<>();
//emulate backups with pile of nodes
- protected final Set<NodeId> backups = new HashSet<>();
+ protected final Map<DeviceId, List<NodeId>> backups = new HashMap<>();
//terms
protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>();
@Activate
public void activate() {
+ if (clusterService == null) {
+ // just for ease of unit test
+ final ControllerNode instance =
+ new DefaultControllerNode(new NodeId("local"),
+ IpAddress.valueOf("127.0.0.1"));
+
+ clusterService = new ClusterService() {
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return instance;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return ImmutableSet.of(instance);
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ if (instance.id().equals(nodeId)) {
+ return instance;
+ }
+ return null;
+ }
+
+ @Override
+ public State getState(NodeId nodeId) {
+ if (instance.id().equals(nodeId)) {
+ return State.ACTIVE;
+ } else {
+ return State.INACTIVE;
+ }
+ }
+
+ @Override
+ public void addListener(ClusterEventListener listener) {
+ }
+
+ @Override
+ public void removeListener(ClusterEventListener listener) {
+ }
+ };
+ }
log.info("Started");
}
@@ -86,31 +134,27 @@
}
@Override
- public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
- MastershipRole role = getRole(nodeId, deviceId);
+ public synchronized MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
- synchronized (this) {
- switch (role) {
- case MASTER:
- return null;
- case STANDBY:
- masterMap.put(deviceId, nodeId);
- termMap.get(deviceId).incrementAndGet();
- backups.add(nodeId);
- break;
- case NONE:
- masterMap.put(deviceId, nodeId);
- termMap.put(deviceId, new AtomicInteger(INIT));
- backups.add(nodeId);
- break;
- default:
- log.warn("unknown Mastership Role {}", role);
- return null;
- }
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ // no-op
+ return null;
+ case STANDBY:
+ case NONE:
+ NodeId prevMaster = masterMap.put(deviceId, nodeId);
+ incrementTerm(deviceId);
+ removeFromBackups(deviceId, nodeId);
+ addToBackup(deviceId, prevMaster);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ return null;
}
return new MastershipEvent(MASTER_CHANGED, deviceId,
- new RoleInfo(nodeId, Lists.newLinkedList(backups)));
+ getNodes(deviceId));
}
@Override
@@ -118,12 +162,11 @@
return masterMap.get(deviceId);
}
+ // synchronized for atomic read
@Override
- public RoleInfo getNodes(DeviceId deviceId) {
- List<NodeId> nodes = new ArrayList<>();
- nodes.addAll(backups);
-
- return new RoleInfo(masterMap.get(deviceId), nodes);
+ public synchronized RoleInfo getNodes(DeviceId deviceId) {
+ return new RoleInfo(masterMap.get(deviceId),
+ backups.getOrDefault(deviceId, ImmutableList.of()));
}
@Override
@@ -134,69 +177,97 @@
ids.add(d.getKey());
}
}
- return Collections.unmodifiableSet(ids);
+ return ids;
}
@Override
- public MastershipRole requestRole(DeviceId deviceId) {
+ public synchronized MastershipRole requestRole(DeviceId deviceId) {
//query+possible reelection
- NodeId node = instance.id();
+ NodeId node = clusterService.getLocalNode().id();
MastershipRole role = getRole(node, deviceId);
switch (role) {
case MASTER:
- break;
+ return MastershipRole.MASTER;
case STANDBY:
- synchronized (this) {
- //try to "re-elect", since we're really not distributed
- NodeId rel = reelect(node);
- if (rel == null) {
- masterMap.put(deviceId, node);
- termMap.put(deviceId, new AtomicInteger(INIT));
- role = MastershipRole.MASTER;
- }
- backups.add(node);
- }
- break;
- case NONE:
- //first to get to it, say we are master
- synchronized (this) {
+ if (getMaster(deviceId) == null) {
+ // no master => become master
masterMap.put(deviceId, node);
- termMap.put(deviceId, new AtomicInteger(INIT));
- backups.add(node);
- role = MastershipRole.MASTER;
+ incrementTerm(deviceId);
+ // remove from backup list
+ removeFromBackups(deviceId, node);
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
+ getNodes(deviceId)));
+ return MastershipRole.MASTER;
}
- break;
+ return MastershipRole.STANDBY;
+ case NONE:
+ if (getMaster(deviceId) == null) {
+ // no master => become master
+ masterMap.put(deviceId, node);
+ incrementTerm(deviceId);
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
+ getNodes(deviceId)));
+ return MastershipRole.MASTER;
+ }
+ // add to backup list
+ if (addToBackup(deviceId, node)) {
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId,
+ getNodes(deviceId)));
+ }
+ return MastershipRole.STANDBY;
default:
log.warn("unknown Mastership Role {}", role);
}
return role;
}
+ // add to backup if not there already, silently ignores null node
+ private synchronized boolean addToBackup(DeviceId deviceId, NodeId nodeId) {
+ boolean modified = false;
+ List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>());
+ if (nodeId != null && !stbys.contains(nodeId)) {
+ stbys.add(nodeId);
+ modified = true;
+ }
+ backups.put(deviceId, stbys);
+ return modified;
+ }
+
+ private synchronized boolean removeFromBackups(DeviceId deviceId, NodeId node) {
+ List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>());
+ boolean modified = stbys.remove(node);
+ backups.put(deviceId, stbys);
+ return modified;
+ }
+
+ private synchronized void incrementTerm(DeviceId deviceId) {
+ AtomicInteger term = termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING));
+ term.incrementAndGet();
+ termMap.put(deviceId, term);
+ }
+
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
//just query
NodeId current = masterMap.get(deviceId);
MastershipRole role;
- if (current == null) {
- if (backups.contains(nodeId)) {
- role = MastershipRole.STANDBY;
- } else {
- role = MastershipRole.NONE;
- }
+ if (current != null && current.equals(nodeId)) {
+ return MastershipRole.MASTER;
+ }
+
+ if (backups.getOrDefault(deviceId, Collections.emptyList()).contains(nodeId)) {
+ role = MastershipRole.STANDBY;
} else {
- if (current.equals(nodeId)) {
- role = MastershipRole.MASTER;
- } else {
- role = MastershipRole.STANDBY;
- }
+ role = MastershipRole.NONE;
}
return role;
}
+ // synchronized for atomic read
@Override
- public MastershipTerm getTermFor(DeviceId deviceId) {
+ public synchronized MastershipTerm getTermFor(DeviceId deviceId) {
if ((termMap.get(deviceId) == null)) {
return MastershipTerm.of(masterMap.get(deviceId), NOTHING);
}
@@ -205,72 +276,71 @@
}
@Override
- public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ public synchronized MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
- synchronized (this) {
- switch (role) {
- case MASTER:
- NodeId backup = reelect(nodeId);
- if (backup == null) {
- masterMap.remove(deviceId);
- } else {
- masterMap.put(deviceId, backup);
- termMap.get(deviceId).incrementAndGet();
- return new MastershipEvent(MASTER_CHANGED, deviceId,
- new RoleInfo(backup, Lists.newLinkedList(backups)));
- }
- case STANDBY:
- case NONE:
- if (!termMap.containsKey(deviceId)) {
- termMap.put(deviceId, new AtomicInteger(INIT));
- }
- backups.add(nodeId);
- break;
- default:
- log.warn("unknown Mastership Role {}", role);
+ switch (role) {
+ case MASTER:
+ NodeId backup = reelect(deviceId, nodeId);
+ if (backup == null) {
+ // no master alternative
+ masterMap.remove(deviceId);
+ // TODO: Should there be new event type for no MASTER?
+ return new MastershipEvent(MASTER_CHANGED, deviceId,
+ getNodes(deviceId));
+ } else {
+ NodeId prevMaster = masterMap.put(deviceId, backup);
+ incrementTerm(deviceId);
+ addToBackup(deviceId, prevMaster);
+ return new MastershipEvent(MASTER_CHANGED, deviceId,
+ getNodes(deviceId));
}
+ case STANDBY:
+ case NONE:
+ if (addToBackup(deviceId, nodeId)) {
+ return new MastershipEvent(BACKUPS_CHANGED, deviceId,
+ getNodes(deviceId));
+ }
+ return null; // already a backup: nothing changed, so no event and no "unknown role" warning
+ default:
+ log.warn("unknown Mastership Role {}", role);
}
return null;
}
//dumbly selects next-available node that's not the current one
//emulate leader election
- private NodeId reelect(NodeId nodeId) {
+ private synchronized NodeId reelect(DeviceId did, NodeId nodeId) {
+ List<NodeId> stbys = backups.getOrDefault(did, Collections.emptyList());
NodeId backup = null;
- for (NodeId n : backups) {
+ for (NodeId n : stbys) {
if (!n.equals(nodeId)) {
backup = n;
break;
}
}
- backups.remove(backup);
+ stbys.remove(backup);
return backup;
}
@Override
- public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ public synchronized MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
- synchronized (this) {
- switch (role) {
- case MASTER:
- NodeId backup = reelect(nodeId);
- backups.remove(nodeId);
- if (backup == null) {
- masterMap.remove(deviceId);
- } else {
- masterMap.put(deviceId, backup);
- termMap.get(deviceId).incrementAndGet();
- return new MastershipEvent(MASTER_CHANGED, deviceId,
- new RoleInfo(backup, Lists.newLinkedList(backups)));
- }
- case STANDBY:
- backups.remove(nodeId);
- case NONE:
- default:
- log.warn("unknown Mastership Role {}", role);
+ switch (role) {
+ case MASTER:
+ NodeId backup = reelect(deviceId, nodeId);
+ masterMap.put(deviceId, backup);
+ incrementTerm(deviceId);
+ return new MastershipEvent(MASTER_CHANGED, deviceId,
+ getNodes(deviceId));
+ case STANDBY:
+ if (removeFromBackups(deviceId, nodeId)) {
+ return new MastershipEvent(BACKUPS_CHANGED, deviceId,
+ getNodes(deviceId));
}
+ case NONE:
+ default:
+ log.warn("unknown Mastership Role {}", role);
}
return null;
}
-
}
diff --git a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java
index 711e366..0998e0a 100644
--- a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleMastershipStoreTest.java
@@ -15,6 +15,8 @@
*/
package org.onlab.onos.store.trivial.impl;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -22,6 +24,7 @@
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.mastership.MastershipEvent;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.DeviceId;
@@ -74,6 +77,7 @@
assertEquals("wrong role", MASTER, sms.getRole(N2, DID3));
//N2 is master but N1 is only in backups set
+ put(DID4, N1, false, true);
put(DID4, N2, true, false);
assertEquals("wrong role", STANDBY, sms.getRole(N1, DID4));
}
@@ -127,12 +131,12 @@
put(DID1, N1, false, false);
assertEquals("wrong role", MASTER, sms.requestRole(DID1));
- //STANDBY without backup - become MASTER
+ //was STANDBY - become MASTER
put(DID2, N1, false, true);
assertEquals("wrong role", MASTER, sms.requestRole(DID2));
- //STANDBY with backup - stay STANDBY
- put(DID3, N2, false, true);
+ //other MASTER - stay STANDBY
+ put(DID3, N2, true, false);
assertEquals("wrong role", STANDBY, sms.requestRole(DID3));
//local (N1) is MASTER - stay MASTER
@@ -145,30 +149,34 @@
//NONE - record backup but take no other action
put(DID1, N1, false, false);
sms.setStandby(N1, DID1);
- assertTrue("not backed up", sms.backups.contains(N1));
- sms.termMap.clear();
+ assertTrue("not backed up", sms.backups.get(DID1).contains(N1));
+ int prev = sms.termMap.get(DID1).get();
sms.setStandby(N1, DID1);
- assertTrue("term not set", sms.termMap.containsKey(DID1));
+ assertEquals("term should not change", prev, sms.termMap.get(DID1).get());
//no backup, MASTER
- put(DID1, N1, true, true);
- assertNull("wrong event", sms.setStandby(N1, DID1));
+ put(DID1, N1, true, false);
+ assertNull("expect no MASTER event", sms.setStandby(N1, DID1).roleInfo().master());
assertNull("wrong node", sms.masterMap.get(DID1));
//backup, switch
sms.masterMap.clear();
put(DID1, N1, true, true);
+ put(DID1, N2, false, true);
put(DID2, N2, true, true);
- assertEquals("wrong event", MASTER_CHANGED, sms.setStandby(N1, DID1).type());
+ MastershipEvent event = sms.setStandby(N1, DID1);
+ assertEquals("wrong event", MASTER_CHANGED, event.type());
+ assertEquals("wrong master", N2, event.roleInfo().master());
}
//helper to populate master/backup structures
- private void put(DeviceId dev, NodeId node, boolean store, boolean backup) {
- if (store) {
+ private void put(DeviceId dev, NodeId node, boolean master, boolean backup) {
+ if (master) {
sms.masterMap.put(dev, node);
- }
- if (backup) {
- sms.backups.add(node);
+ } else if (backup) {
+ List<NodeId> stbys = sms.backups.getOrDefault(dev, new ArrayList<>());
+ stbys.add(node);
+ sms.backups.put(dev, stbys);
}
sms.termMap.put(dev, new AtomicInteger());
}
diff --git a/features/features.xml b/features/features.xml
index 0e7dfbf..b8ef86f 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -56,6 +56,9 @@
<bundle>mvn:org.codehaus.jackson/jackson-mapper-asl/1.9.13</bundle>
<bundle>mvn:org.onlab.onos/onlab-thirdparty/1.0.0-SNAPSHOT</bundle>
+
+ <bundle>mvn:org.mapdb/mapdb/1.0.6</bundle>
+
<!-- FIXME: resolce Chronicle's dependency issue
<bundle>mvn:net.openhft/lang/6.4.6</bundle>
<bundle>mvn:net.openhft/affinity/2.1.1</bundle>