Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java b/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java
index b3e03b3..e843770 100644
--- a/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java
@@ -1,6 +1,7 @@
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Command;
+import org.onlab.onos.CoreService;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.FlowRuleService;
@@ -21,7 +22,8 @@
protected void execute() {
TopologyService topologyService = get(TopologyService.class);
Topology topology = topologyService.currentTopology();
- print("nodes=%d, devices=%d, links=%d, hosts=%d, clusters=%s, paths=%d, flows=%d, intents=%d",
+ print("version=%s, nodes=%d, devices=%d, links=%d, hosts=%d, clusters=%s, paths=%d, flows=%d, intents=%d",
+ get(CoreService.class).version().toString(),
get(ClusterService.class).getNodes().size(),
get(DeviceService.class).getDeviceCount(),
get(LinkService.class).getLinkCount(),
diff --git a/core/api/src/main/java/org/onlab/onos/CoreService.java b/core/api/src/main/java/org/onlab/onos/CoreService.java
new file mode 100644
index 0000000..32c36c5
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/CoreService.java
@@ -0,0 +1,15 @@
+package org.onlab.onos;
+
+/**
+ * Service for interacting with the core system of the controller.
+ */
+public interface CoreService {
+
+ /**
+ * Returns the product version.
+ *
+ * @return product version
+ */
+ Version version();
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/Version.java b/core/api/src/main/java/org/onlab/onos/Version.java
new file mode 100644
index 0000000..5d071b7
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/Version.java
@@ -0,0 +1,113 @@
+package org.onlab.onos;
+
+import java.util.Objects;
+
+import static java.lang.Integer.parseInt;
+
+/**
+ * Representation of the product version.
+ */
+public final class Version {
+
+ public static final String FORMAT = "%d.%d.%d.%s";
+
+ private final int major;
+ private final int minor;
+ private final int patch;
+ private final String build;
+
+ private final String format;
+
+ // Creates a new version descriptor
+ private Version(int major, int minor, int patch, String build) {
+ this.major = major;
+ this.minor = minor;
+ this.patch = patch;
+ this.build = build;
+ this.format = String.format(FORMAT, major, minor, patch, build);
+ }
+
+
+ /**
+ * Creates a new version from the specified constituent numbers.
+ *
+ * @param major major version number
+ * @param minor minor version number
+ * @param patch version patch number
+ * @param build build string
+ * @return version descriptor
+ */
+ public static Version version(int major, int minor, int patch, String build) {
+ return new Version(major, minor, patch, build);
+ }
+
+ /**
+ * Creates a new version by parsing the specified string.
+ *
+ * @param string version string
+ * @return version descriptor
+ */
+ public static Version version(String string) {
+ String[] fields = string.split("[.-]");
+ return new Version(parseInt(fields[0]), parseInt(fields[1]),
+ parseInt(fields[2]), fields[3]);
+ }
+
+ /**
+ * Returns the major version number.
+ *
+ * @return major version number
+ */
+ public int major() {
+ return major;
+ }
+
+ /**
+ * Returns the minor version number.
+ *
+ * @return minor version number
+ */
+ public int minor() {
+ return minor;
+ }
+
+ /**
+ * Returns the version patch number.
+ *
+ * @return patch number
+ */
+ public int patch() {
+ return patch;
+ }
+
+ /**
+ * Returns the version build string.
+ *
+ * @return build string
+ */
+ public String build() {
+ return build;
+ }
+
+ @Override
+ public String toString() {
+ return format;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(format);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof Version) {
+ final Version other = (Version) obj;
+ return Objects.equals(this.format, other.format);
+ }
+ return false;
+ }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
new file mode 100644
index 0000000..bde752e
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -0,0 +1,6 @@
+package org.onlab.onos.net.flow;
+
+public class CompletedBatchOperation {
+
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
index 8b30c74..410aed4 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
@@ -2,12 +2,13 @@
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.intent.BatchOperationTarget;
/**
* Represents a generalized match & action pair to be applied to
* an infrastucture device.
*/
-public interface FlowRule {
+public interface FlowRule extends BatchOperationTarget {
static final int MAX_TIMEOUT = 60;
static final int MIN_PRIORITY = 0;
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java
new file mode 100644
index 0000000..d5a1472
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java
@@ -0,0 +1,20 @@
+package org.onlab.onos.net.flow;
+
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.intent.BatchOperationEntry;
+
+
+public class FlowRuleBatchEntry
+ extends BatchOperationEntry<FlowRuleOperation, FlowRule> {
+
+ public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target) {
+ super(operator, target);
+ }
+
+ public enum FlowRuleOperation {
+ ADD,
+ REMOVE,
+ MODIFY
+ }
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java
new file mode 100644
index 0000000..74ef165
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java
@@ -0,0 +1,13 @@
+package org.onlab.onos.net.flow;
+
+import java.util.Collection;
+
+import org.onlab.onos.net.intent.BatchOperation;
+
+public class FlowRuleBatchOperation
+ extends BatchOperation<FlowRuleBatchEntry> {
+
+ public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
+ super(operations);
+ }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index c4e2f92..68762ac 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -1,6 +1,9 @@
package org.onlab.onos.net.flow;
+import java.util.concurrent.Future;
+
import org.onlab.onos.ApplicationId;
+import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.Provider;
/**
@@ -34,4 +37,6 @@
*/
void removeRulesById(ApplicationId id, FlowRule... flowRules);
+ Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
index 8600c54..6d04810 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
@@ -1,5 +1,7 @@
package org.onlab.onos.net.flow;
+import java.util.concurrent.Future;
+
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
@@ -66,7 +68,12 @@
*/
Iterable<FlowRule> getFlowRulesById(ApplicationId id);
- //Future<CompletedBatchOperation> applyBatch(BatchOperation<FlowRuleBatchEntry>)
+ /**
+ * Applies a batch operation of FlowRules.
+ *
+ * @return future indicating the state of the batch operation
+ */
+ Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
/**
* Adds the specified flow rule listener.
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java
index 5d0cbb8..72a9847 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java
@@ -1,12 +1,13 @@
package org.onlab.onos.net.intent;
//TODO is this the right package?
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* A list of BatchOperationEntry.
*
@@ -15,7 +16,7 @@
*/
public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
- private List<T> ops;
+ private final List<T> ops;
/**
* Creates new {@link BatchOperation} object.
@@ -30,7 +31,7 @@
*
* @param batchOperations the list of batch operation entries.
*/
- public BatchOperation(List<T> batchOperations) {
+ public BatchOperation(Collection<T> batchOperations) {
ops = new LinkedList<>(checkNotNull(batchOperations));
}
@@ -61,6 +62,10 @@
/**
* Adds an operation.
+ * FIXME: Brian promises that the Intent Framework
+ * will not modify the batch operation after it has submitted it.
+ * Ali would prefer immutability, but trusts Brian for better or
+ * for worse.
*
* @param entry the operation to be added
* @return this object if succeeded, null otherwise
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java
index b5dfa88..4e57d33 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java
@@ -15,14 +15,7 @@
private final T operator;
private final U target;
- /**
- * Default constructor for serializer.
- */
- @Deprecated
- protected BatchOperationEntry() {
- this.operator = null;
- this.target = null;
- }
+
/**
* Constructs new instance for the entry of the BatchOperation.
diff --git a/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java b/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java
index 759b62a..a5f81c7 100644
--- a/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java
@@ -12,6 +12,7 @@
/**
* Updates the mastership term for the specified deviceId.
+ *
* @param deviceId device identifier.
* @param term mastership term.
*/
diff --git a/core/api/src/test/java/org/onlab/onos/VersionTest.java b/core/api/src/test/java/org/onlab/onos/VersionTest.java
new file mode 100644
index 0000000..e357f9d
--- /dev/null
+++ b/core/api/src/test/java/org/onlab/onos/VersionTest.java
@@ -0,0 +1,50 @@
+package org.onlab.onos;
+
+import com.google.common.testing.EqualsTester;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.onlab.onos.Version.version;
+
+/**
+ * Tests of the version descriptor.
+ */
+public class VersionTest {
+
+ @Test
+ public void fromParts() {
+ Version v = version(1, 2, 3, "4321");
+ assertEquals("wrong major", 1, v.major());
+ assertEquals("wrong minor", 2, v.minor());
+ assertEquals("wrong patch", 3, v.patch());
+ assertEquals("wrong build", "4321", v.build());
+ }
+
+ @Test
+ public void fromString() {
+ Version v = version("1.2.3.4321");
+ assertEquals("wrong major", 1, v.major());
+ assertEquals("wrong minor", 2, v.minor());
+ assertEquals("wrong patch", 3, v.patch());
+ assertEquals("wrong build", "4321", v.build());
+ }
+
+ @Test
+ public void snapshot() {
+ Version v = version("1.2.3-SNAPSHOT");
+ assertEquals("wrong major", 1, v.major());
+ assertEquals("wrong minor", 2, v.minor());
+ assertEquals("wrong patch", 3, v.patch());
+ assertEquals("wrong build", "SNAPSHOT", v.build());
+ }
+
+ @Test
+ public void testEquals() {
+ new EqualsTester()
+ .addEqualityGroup(version("1.2.3.4321"), version(1, 2, 3, "4321"))
+ .addEqualityGroup(version("1.9.3.4321"), version(1, 9, 3, "4321"))
+ .addEqualityGroup(version("1.2.8.4321"), version(1, 2, 8, "4321"))
+ .addEqualityGroup(version("1.2.3.x"), version(1, 2, 3, "x"))
+ .testEquals();
+ }
+}
\ No newline at end of file
diff --git a/core/net/src/main/java/org/onlab/onos/cluster/impl/CoreManager.java b/core/net/src/main/java/org/onlab/onos/cluster/impl/CoreManager.java
new file mode 100644
index 0000000..4b1191f
--- /dev/null
+++ b/core/net/src/main/java/org/onlab/onos/cluster/impl/CoreManager.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.cluster.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.CoreService;
+import org.onlab.onos.Version;
+import org.onlab.util.Tools;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Core service implementation.
+ */
+@Component
+@Service
+public class CoreManager implements CoreService {
+
+ private static final File VERSION_FILE = new File("../VERSION");
+ private static Version version = Version.version("1.0.0-SNAPSHOT");
+
+ // TODO: work in progress
+
+ @Activate
+ public void activate() {
+ List<String> versionLines = Tools.slurp(VERSION_FILE);
+ if (versionLines != null && !versionLines.isEmpty()) {
+ version = Version.version(versionLines.get(0));
+ }
+ }
+
+ @Override
+ public Version version() {
+ return version;
+ }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index 3c4a885..00fd24d 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -144,6 +144,10 @@
private void applyRole(DeviceId deviceId, MastershipRole newRole) {
if (newRole != MastershipRole.NONE) {
Device device = store.getDevice(deviceId);
+ // FIXME: Device might not be there yet. (eventually consistent)
+ if (device == null) {
+ return;
+ }
DeviceProvider provider = getProvider(device.providerId());
if (provider != null) {
provider.roleChanged(device, newRole);
@@ -193,16 +197,38 @@
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL);
checkValidity();
+
+ log.info("Device {} connected", deviceId);
+ // check my Role
+ MastershipRole role = mastershipService.requestRoleFor(deviceId);
+
+ if (role != MastershipRole.MASTER) {
+ // TODO: Do we need to tell the Provider that
+ // I am no longer the MASTER?
+ return;
+ }
+
+ // Master:
+ MastershipTerm term = mastershipService.requestTermService()
+ .getMastershipTerm(deviceId);
+ if (!term.master().equals(clusterService.getLocalNode().id())) {
+ // I've lost mastership after I thought I was MASTER.
+ return;
+ }
+ clockProviderService.setMastershipTerm(deviceId, term);
+
DeviceEvent event = store.createOrUpdateDevice(provider().id(),
deviceId, deviceDescription);
- // If there was a change of any kind, trigger role selection
- // process.
+ // If there was a change of any kind, tell the provider
+ // I am the master.
+ // Note: can be null, if mastership was lost.
if (event != null) {
- log.info("Device {} connected", deviceId);
- mastershipService.requestRoleFor(deviceId);
- provider().roleChanged(event.subject(),
- mastershipService.requestRoleFor(deviceId));
+ // TODO: Check switch reconnected case, is it assured that
+ // event will not be null?
+
+ // FIXME: 1st argument should be deviceId
+ provider().roleChanged(event.subject(), role);
post(event);
}
}
@@ -211,6 +237,10 @@
public void deviceDisconnected(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
+ if (!mastershipService.getLocalRole(deviceId).equals(MastershipRole.MASTER)) {
+ log.debug("Device {} disconnected, but I am not the master", deviceId);
+ return;
+ }
DeviceEvent event = store.markOffline(deviceId);
//we're no longer capable of mastership.
@@ -272,9 +302,15 @@
@Override
public void event(MastershipEvent event) {
if (event.master().equals(clusterService.getLocalNode().id())) {
+
MastershipTerm term = mastershipService.requestTermService()
.getMastershipTerm(event.subject());
- clockProviderService.setMastershipTerm(event.subject(), term);
+
+ if (term.master().equals(clusterService.getLocalNode().id())) {
+ // only set if I am the master
+ clockProviderService.setMastershipTerm(event.subject(), term);
+ }
+
applyRole(event.subject(), MastershipRole.MASTER);
} else {
applyRole(event.subject(), MastershipRole.STANDBY);
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index ce11cea..a9eddd8 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,6 +5,10 @@
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -18,8 +22,11 @@
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -32,7 +39,9 @@
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
/**
* Provides implementation of the flow NB & SB APIs.
@@ -131,6 +140,38 @@
}
@Override
+ public Future<CompletedBatchOperation> applyBatch(
+ FlowRuleBatchOperation batch) {
+ Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
+ ArrayListMultimap.create();
+ List<Future<Void>> futures = Lists.newArrayList();
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ final FlowRule f = fbe.getTarget();
+ final Device device = deviceService.getDevice(f.deviceId());
+ final FlowRuleProvider frp = getProvider(device.providerId());
+ batches.put(frp, fbe);
+ switch (fbe.getOperator()) {
+ case ADD:
+ store.storeFlowRule(f);
+ break;
+ case REMOVE:
+ store.deleteFlowRule(f);
+ break;
+ case MODIFY:
+ default:
+ log.error("Batch operation type {} unsupported.", fbe.getOperator());
+ }
+ }
+ for (FlowRuleProvider provider : batches.keySet()) {
+ FlowRuleBatchOperation b =
+ new FlowRuleBatchOperation(batches.get(provider));
+ Future<Void> future = provider.executeBatch(b);
+ futures.add(future);
+ }
+ return new FlowRuleBatchFuture(futures);
+ }
+
+ @Override
public void addListener(FlowRuleListener listener) {
listenerRegistry.addListener(listener);
}
@@ -296,4 +337,63 @@
eventDispatcher.post(event);
}
}
+
+ private class FlowRuleBatchFuture
+ implements Future<CompletedBatchOperation> {
+
+ private final List<Future<Void>> futures;
+
+ public FlowRuleBatchFuture(List<Future<Void>> futures) {
+ this.futures = futures;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ boolean isDone = true;
+ for (Future<Void> future : futures) {
+ isDone &= future.isDone();
+ }
+ return isDone;
+ }
+
+ @Override
+ public CompletedBatchOperation get() throws InterruptedException,
+ ExecutionException {
+ // TODO Auto-generated method stub
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ return new CompletedBatchOperation();
+ }
+
+ @Override
+ public CompletedBatchOperation get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException,
+ TimeoutException {
+ // Each future waits only for the time remaining until the overall deadline.
+ long start = System.nanoTime();
+ long end = start + unit.toNanos(timeout);
+ for (Future<Void> future : futures) {
+ long now = System.nanoTime();
+ long thisTimeout = end - now;
+ future.get(thisTimeout, TimeUnit.NANOSECONDS);
+ }
+ return new CompletedBatchOperation();
+ }
+
+ }
+
+
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
index a0995e4..0ca75c2 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
@@ -4,6 +4,8 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -16,6 +18,9 @@
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
@@ -24,6 +29,8 @@
import org.onlab.onos.net.intent.PathIntent;
import org.slf4j.Logger;
+import com.google.common.collect.Lists;
+
/**
* Installer for {@link PathIntent path connectivity intents}.
*/
@@ -56,19 +63,27 @@
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
-
+ List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
while (links.hasNext()) {
builder.matchInport(prev.port());
Link link = links.next();
TrafficTreatment treatment = builder()
.setOutput(link.src().port()).build();
+
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
- flowRuleService.applyFlowRules(rule);
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
+ //flowRuleService.applyFlowRules(rule);
prev = link.dst();
}
-
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
+ try {
+ flowRuleService.applyBatch(batch).get();
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
@Override
@@ -77,6 +92,7 @@
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
+ List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
while (links.hasNext()) {
builder.matchInport(prev.port());
@@ -86,9 +102,16 @@
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
-
- flowRuleService.removeFlowRules(rule);
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
+ //flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
+ try {
+ flowRuleService.applyBatch(batch).get();
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 7463671..86f3ddc 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
@@ -32,6 +33,7 @@
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -42,6 +44,7 @@
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instruction;
+import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
@@ -404,6 +407,13 @@
public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
}
+ @Override
+ public Future<Void> executeBatch(
+ BatchOperation<FlowRuleBatchEntry> batch) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index ee3e789..43df15f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -45,4 +45,9 @@
MessageSubject that = (MessageSubject) obj;
return Objects.equals(this.value, that.value);
}
+
+ // for serializer
+ protected MessageSubject() {
+ this.value = "";
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7b05401..c732c16 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -3,11 +3,11 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -26,14 +26,17 @@
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.MessageSubjectSerializer;
import org.onlab.util.KryoPool;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
+import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +53,6 @@
private ClusterService clusterService;
private ClusterNodesDelegate nodesDelegate;
- // FIXME: `members` should go away and should be using ClusterService
- private Map<NodeId, ControllerNode> members = new HashMap<>();
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
@@ -59,11 +60,14 @@
private MessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
- .register(ClusterMessage.class)
+ .register(ClusterMessage.class, new ClusterMessageSerializer())
.register(ClusterMembershipEvent.class)
+ .register(byte[].class)
+ .register(MessageSubject.class, new MessageSubjectSerializer())
.build()
.populate(1);
}
@@ -73,7 +77,15 @@
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
- messagingService = new NettyMessagingService(localNode.tcpPort());
+ NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
+ // FIXME: workaround until it becomes a service.
+ try {
+ netty.activate();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ log.error("NettyMessagingService#activate", e);
+ }
+ messagingService = netty;
log.info("Started");
}
@@ -86,7 +98,7 @@
@Override
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
- for (ControllerNode node : members.values()) {
+ for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
}
@@ -107,13 +119,17 @@
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- ControllerNode node = members.get(toNodeId);
+ ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
+ log.info("sending...");
+ Response resp = messagingService.sendAndReceive(nodeEp,
+ message.subject().value(), SERIALIZER.encode(message));
+ resp.get(1, TimeUnit.SECONDS);
+ log.info("sent...");
return true;
- } catch (IOException e) {
+ } catch (IOException | TimeoutException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
}
@@ -137,7 +153,7 @@
@Override
public void addNode(ControllerNode node) {
- members.put(node.id(), node);
+ //members.put(node.id(), node);
}
@Override
@@ -146,7 +162,7 @@
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
- members.remove(node.id());
+ //members.remove(node.id());
}
// Sends a heart beat to all peers.
@@ -181,7 +197,8 @@
}
}
- private static class InternalClusterMessageHandler implements MessageHandler {
+ // FIXME: revert static
+ private class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
@@ -191,8 +208,18 @@
@Override
public void handle(Message message) {
- ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
- handler.handle(clusterMessage);
+ // FIXME: remove me
+ log.info("InternalClusterMessageHandler.handle({})", message);
+ try {
+ log.info("before decode");
+ ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+ log.info("Subject:({}), Sender:({})", clusterMessage.subject(), clusterMessage.sender());
+ handler.handle(clusterMessage);
+ message.respond("ACK".getBytes());
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ log.error("failed", e);
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 2f1e504..31df710 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -113,6 +113,7 @@
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
index 26f1d7f..4214384 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -35,4 +35,11 @@
public Timestamped<DeviceDescription> deviceDescription() {
return deviceDescription;
}
+
+ // No-argument constructor provided for the serializer; it sets providerId,
+ // deviceId and deviceDescription to null.
+ protected InternalDeviceEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.deviceDescription = null;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
index 48e3be6..64e77ca 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -37,4 +37,11 @@
public Timestamped<List<PortDescription>> portDescriptions() {
return portDescriptions;
}
+
+ // No-argument constructor provided for the serializer; it sets providerId,
+ // deviceId and portDescriptions to null.
+ protected InternalPortEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.portDescriptions = null;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
index 0bdfdbf..7d3854b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -35,4 +35,11 @@
public Timestamped<PortDescription> portDescription() {
return portDescription;
}
+
+ // No-argument constructor provided for the serializer; it sets providerId,
+ // deviceId and portDescription to null.
+ protected InternalPortStatusEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.portDescription = null;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
index f4dadad..c0cefd6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -3,7 +3,6 @@
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
-
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
@@ -11,6 +10,9 @@
public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
+ /**
+ * Creates a serializer for {@link ClusterMessage}.
+ */
public ClusterMessageSerializer() {
// does not accept null
super(false);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
index 9250076..516915e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
@@ -14,7 +14,7 @@
public class MastershipBasedTimestampSerializer extends Serializer<MastershipBasedTimestamp> {
/**
- * Default constructor.
+ * Creates a serializer for {@link MastershipBasedTimestamp}.
*/
public MastershipBasedTimestampSerializer() {
// non-null, immutable
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java
new file mode 100644
index 0000000..bb6b292
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java
@@ -0,0 +1,31 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo serializer for {@link MessageSubject}; a subject is written as, and
+ * rebuilt from, its string value.
+ */
+public final class MessageSubjectSerializer extends Serializer<MessageSubject> {
+
+    /**
+     * Creates a serializer for {@link MessageSubject}.
+     */
+    public MessageSubjectSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, MessageSubject object) {
+        final String subjectValue = object.value();
+        output.writeString(subjectValue);
+    }
+
+    @Override
+    public MessageSubject read(Kryo kryo, Input input, Class<MessageSubject> type) {
+        final String subjectValue = input.readString();
+        return new MessageSubject(subjectValue);
+    }
+}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index aafbe4b..fafd450 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -119,8 +119,8 @@
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
- // TODO Auto-generated method stub
- return null;
+ // FIXME: implement this
+ return MastershipTerm.of(getMaster(deviceId), 1);
}
@Override
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index ade651e..78f5874 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -68,7 +68,7 @@
this.cookie = flowRule.id();
}
- public OFFlowMod buildFlowMod() {
+ public OFFlowMod buildFlowAdd() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
@@ -86,6 +86,24 @@
}
+    /**
+     * Builds a flow-modify message carrying this builder's cookie, match,
+     * actions and priority, with no buffer id and the SEND_FLOW_REM flag set.
+     *
+     * @return the flow-modify message
+     */
+    public OFFlowMod buildFlowMod() {
+        final Match flowMatch = buildMatch();
+        final List<OFAction> flowActions = buildActions();
+
+        //TODO: what to do without bufferid? do we assume that there will be a pktout as well?
+        return factory.buildFlowModify()
+                .setCookie(U64.of(cookie.value()))
+                .setBufferId(OFBufferId.NO_BUFFER)
+                .setActions(flowActions)
+                .setMatch(flowMatch)
+                .setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
+                .setPriority(priority)
+                .build();
+    }
+
public OFFlowMod buildFlowDel() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index eac3c18..0aca754 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -2,8 +2,17 @@
import static org.slf4j.LoggerFactory.getLogger;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -14,9 +23,11 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
+import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.TopologyService;
@@ -27,6 +38,8 @@
import org.onlab.onos.openflow.controller.OpenFlowSwitchListener;
import org.onlab.onos.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFActionType;
+import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
+import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
@@ -42,9 +55,11 @@
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
+import org.projectfloodlight.openflow.types.U32;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -70,6 +85,9 @@
private final InternalFlowProvider listener = new InternalFlowProvider();
+ private final Map<Long, InstallationFuture> pendingFutures =
+ new ConcurrentHashMap<Long, InstallationFuture>();
+
/**
* Creates an OpenFlow host provider.
*/
@@ -101,7 +119,7 @@
private void applyRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
- sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
}
@@ -154,6 +172,7 @@
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
+ InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
//TODO: make this better
@@ -166,7 +185,17 @@
pushFlowMetrics(dpid, (OFStatsReply) msg);
break;
case BARRIER_REPLY:
+ future = pendingFutures.get(msg.getXid());
+ if (future != null) {
+ future.satisfyRequirement(dpid);
+ }
+ break;
case ERROR:
+ future = pendingFutures.get(msg.getXid());
+ if (future != null) {
+ future.fail((OFErrorMsg) msg, dpid);
+ }
+ break;
default:
log.debug("Unhandled message type: {}", msg.getType());
}
@@ -226,6 +255,144 @@
}
+    /**
+     * Sends one flow-mod per batch entry to the switch identified by the
+     * entry's flow rule, then registers an {@link InstallationFuture} for the
+     * switches involved and asks it to send them barrier requests.
+     * <p>
+     * The batch's hash code is used both as the key under which the future is
+     * registered and as the xid of the barrier requests.
+     *
+     * @param batch flow rule operations to execute
+     * @return future tracking the barrier replies of the switches involved
+     */
+    @Override
+    public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+        final Set<Dpid> sws = new HashSet<Dpid>();
+        for (FlowRuleBatchEntry entry : batch.getOperations()) {
+            final FlowRule rule = entry.getTarget();
+            final OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(rule.deviceId().uri()));
+            sws.add(new Dpid(sw.getId()));
+            switch (entry.getOperator()) {
+                case ADD:
+                    //TODO: Track XID for each flowmod
+                    sw.sendMsg(new FlowModBuilder(rule, sw.factory()).buildFlowAdd());
+                    break;
+                case REMOVE:
+                    //TODO: Track XID for each flowmod
+                    sw.sendMsg(new FlowModBuilder(rule, sw.factory()).buildFlowDel());
+                    break;
+                case MODIFY:
+                    //TODO: Track XID for each flowmod
+                    sw.sendMsg(new FlowModBuilder(rule, sw.factory()).buildFlowMod());
+                    break;
+                default:
+                    log.error("Unsupported batch operation {}", entry.getOperator());
+            }
+        }
+
+        final InstallationFuture installation = new InstallationFuture(sws);
+        final int xid = batch.hashCode();
+        pendingFutures.put(U32.f(xid), installation);
+        installation.verify(xid);
+        return installation;
+    }
+
+    /**
+     * Future tracking the barrier replies for one batch of flow-mods.
+     * <p>
+     * {@link #verify(Integer)} sends one barrier request to every switch of the
+     * batch; each barrier reply is reported through
+     * {@link #satisfyRequirement(Dpid)}. The future is done once every switch
+     * has replied, at which point it also removes itself from the pending
+     * futures map.
+     * <p>
+     * Errors reported through {@link #fail(OFErrorMsg, Dpid)} are only
+     * recorded in a flag for now; neither {@code get} variant reports them.
+     */
+    private class InstallationFuture implements Future<Void> {
+
+        // Switches that have not yet answered the barrier request. Kept in a
+        // concurrent set because replies remove entries while verify() may
+        // still be iterating (replies may arrive on another thread).
+        private final Set<Dpid> sws;
+        private final AtomicBoolean ok = new AtomicBoolean(true);
+        private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
+
+        private final CountDownLatch countDownLatch;
+
+        /**
+         * Creates a future waiting on the given switches.
+         *
+         * @param sws switches whose barrier reply is awaited; the set is copied
+         */
+        public InstallationFuture(Set<Dpid> sws) {
+            // fully qualified: the enclosing file's import list is not known to include Collections
+            this.sws = java.util.Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>());
+            this.sws.addAll(sws);
+            countDownLatch = new CountDownLatch(this.sws.size());
+        }
+
+        /**
+         * Records that a switch answered with an error.
+         *
+         * @param msg  error message received from the switch
+         * @param dpid switch that reported the error
+         */
+        public void fail(OFErrorMsg msg, Dpid dpid) {
+            ok.set(false);
+            //TODO add reason to flowentry and record it in offendingFlowMods
+            //TODO handle specific error msgs (switch on msg.getErrType())
+        }
+
+        /**
+         * Records the barrier reply of a switch.
+         *
+         * @param dpid switch that answered the barrier request
+         */
+        public void satisfyRequirement(Dpid dpid) {
+            log.warn("Satisfaction from switch {}", dpid);
+            // The set holds Dpids, so remove by dpid (removing the switch object
+            // never matched). Only a switch that was still pending counts
+            // towards completion, so a duplicate reply cannot over-count.
+            if (sws.remove(dpid)) {
+                countDownLatch.countDown();
+                if (sws.isEmpty()) {
+                    // All switches answered: stop tracking this future so the
+                    // pending map does not grow without bound.
+                    pendingFutures.values().remove(this);
+                }
+            }
+        }
+
+        /**
+         * Sends a barrier request with the given xid to every pending switch.
+         *
+         * @param id xid to put on the barrier requests
+         */
+        public void verify(Integer id) {
+            for (Dpid dpid : sws) {
+                OpenFlowSwitch sw = controller.getSwitch(dpid);
+                OFBarrierRequest.Builder builder = sw.factory()
+                        .buildBarrierRequest()
+                        .setXid(id);
+                sw.sendMsg(builder.build());
+            }
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            // cancellation is not implemented
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            // cancellation is not implemented
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return sws.isEmpty();
+        }
+
+        @Override
+        public Void get() throws InterruptedException, ExecutionException {
+            countDownLatch.await();
+            //return offendingFlowMods;
+            return null;
+        }
+
+        @Override
+        public Void get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException,
+                TimeoutException {
+            // await() returns false when the wait elapsed before every switch
+            // replied; surface that as a timeout instead of reporting success.
+            if (!countDownLatch.await(timeout, unit)) {
+                throw new TimeoutException("Timed out waiting for barrier replies from " + sws);
+            }
+            //return offendingFlowMods;
+            return null;
+        }
+
+    }
}
diff --git a/tools/build/onos-package b/tools/build/onos-package
index 2d6b954..ac5b52d 100755
--- a/tools/build/onos-package
+++ b/tools/build/onos-package
@@ -60,6 +60,13 @@
cp $M2_REPO/org/onlab/onos/onos-branding/$ONOS_VERSION/onos-branding-*.jar \
$ONOS_STAGE/$KARAF_DIST/lib
+# Patch in the ONOS version file; use the build number or the user name as
+# the build postfix in place of the SNAPSHOT postfix.
+build=${BUILD_NUMBER:-$(id -un)}
+grep '<version>' $ONOS_ROOT/pom.xml | head -n1 | \
+ sed 's:.*<version>::g;s:</version>.*::g' | sed "s/SNAPSHOT/$build/g" \
+ >> $ONOS_STAGE/VERSION
+
# Now package up the ONOS tar file
cd $ONOS_STAGE_ROOT
COPYFILE_DISABLE=1 tar zcf $ONOS_TAR $ONOS_BITS
diff --git a/tools/package/bin/onos-service b/tools/package/bin/onos-service
index a5de5bd..c030887 100755
--- a/tools/package/bin/onos-service
+++ b/tools/package/bin/onos-service
@@ -4,6 +4,7 @@
#-------------------------------------------------------------------------------
export JAVA_HOME=${JAVA_HOME:-/usr/lib/jvm/java-7-openjdk-amd64/}
+# Default JVM heap settings; a JAVA_OPTS value already set in the environment
+# takes precedence, mirroring how JAVA_HOME is defaulted above.
+export JAVA_OPTS="${JAVA_OPTS:--Xms256M -Xmx2048M}"
cd /opt/onos
/opt/onos/apache-karaf-3.0.1/bin/karaf "$@"
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index c5162f6..5643098 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -4,6 +4,12 @@
import com.google.common.primitives.UnsignedLongs;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ThreadFactory;
public abstract class Tools {
@@ -66,4 +72,24 @@
}
}
+ /**
+ * Slurps the contents of a file into a list of strings, one per line.
+ * <p>
+ * The file is read through a {@link FileReader}, so it is decoded with the
+ * platform default charset. Any {@link IOException} raised while opening or
+ * reading the file is swallowed and reported as a {@code null} result.
+ *
+ * @param path file path
+ * @return file contents, one element per line, or {@code null} if the file
+ * could not be read
+ */
+ public static List<String> slurp(File path) {
+ try (BufferedReader br = new BufferedReader(new FileReader(path))) {
+ List<String> lines = new ArrayList<>();
+ String line;
+ while ((line = br.readLine()) != null) {
+ lines.add(line);
+ }
+ return lines;
+
+ } catch (IOException e) {
+ // failure is signalled to the caller by the null return value
+ return null;
+ }
+ }
+
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index d4832e5..b494ce9 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -8,11 +8,16 @@
import java.util.Arrays;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Decoder for inbound messages.
*/
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
private final NettyMessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer();
@@ -57,4 +62,10 @@
checkState(false, "Must not be here");
}
}
+
+ /**
+ * Logs the exception raised in the channel handling pipeline at error
+ * level and then calls {@code close()} on the handler context.
+ *
+ * @param context channel handler context
+ * @param cause exception that was caught
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
+ context.close();
+ }
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 716efb9..d026dec 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -1,5 +1,8 @@
package org.onlab.netty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@@ -11,6 +14,8 @@
@Sharable
public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
public static final int HEADER_VERSION = 1;
@@ -31,11 +36,6 @@
// write preamble
out.writeBytes(PREAMBLE);
- try {
- SERIALIZER.encode(message);
- } catch (Exception e) {
- e.printStackTrace();
- }
byte[] payload = SERIALIZER.encode(message);
// write payload length
@@ -47,4 +47,10 @@
// write payload.
out.writeBytes(payload);
}
+
+ /**
+ * Logs the exception raised in the channel handling pipeline at error
+ * level and then calls {@code close()} on the handler context.
+ *
+ * @param context channel handler context
+ * @param cause exception that was caught
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
+ context.close();
+ }
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 48aeb30..5a51ad4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -248,6 +248,7 @@
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
deleted file mode 100644
index 3869948..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.onlab.netty;
-
-import java.util.concurrent.TimeUnit;
-
-import org.onlab.metrics.MetricsComponent;
-import org.onlab.metrics.MetricsFeature;
-import org.onlab.metrics.MetricsManager;
-
-import com.codahale.metrics.Timer;
-
-// FIXME: Should be move out to test or app
-public final class SimpleClient {
- private SimpleClient() {
- }
-
- public static void main(String... args) throws Exception {
- NettyMessagingService messaging = new TestNettyMessagingService(9081);
- MetricsManager metrics = new MetricsManager();
- messaging.activate();
- metrics.activate();
- MetricsFeature feature = new MetricsFeature("timers");
- MetricsComponent component = metrics.registerComponent("NettyMessaging");
- Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
- final int warmup = 100;
- for (int i = 0; i < warmup; i++) {
- Timer.Context context = sendAsyncTimer.time();
- messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
- context.stop();
- }
- metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
-
- Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
- final int iterations = 1000000;
- for (int i = 0; i < iterations; i++) {
- Timer.Context context = sendAndReceiveTimer.time();
- Response response = messaging
- .sendAndReceive(new Endpoint("localhost", 8080), "echo",
- "Hello World".getBytes());
- System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
- context.stop();
- }
- metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
- }
-
- public static class TestNettyMessagingService extends NettyMessagingService {
- public TestNettyMessagingService(int port) throws Exception {
- super(port);
- }
- }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
deleted file mode 100644
index b8ae5b0..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.onlab.netty;
-
-//FIXME: Should be move out to test or app
-public final class SimpleServer {
- private SimpleServer() {}
-
- public static void main(String... args) throws Exception {
- NettyMessagingService server = new NettyMessagingService(8080);
- server.activate();
- server.registerHandler("simple", new LoggingHandler());
- server.registerHandler("echo", new EchoHandler());
- }
-}