Merge remote-tracking branch 'origin/master'
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
new file mode 100644
index 0000000..bde752e
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -0,0 +1,6 @@
+package org.onlab.onos.net.flow;
+
+public class CompletedBatchOperation {
+ // Placeholder result type: FlowRuleService.applyBatch returns a Future of this class.
+ // It carries no state yet.
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
index 8b30c74..410aed4 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
@@ -2,12 +2,13 @@
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.intent.BatchOperationTarget;
/**
* Represents a generalized match & action pair to be applied to
* an infrastucture device.
*/
-public interface FlowRule {
+public interface FlowRule extends BatchOperationTarget {
static final int MAX_TIMEOUT = 60;
static final int MIN_PRIORITY = 0;
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java
new file mode 100644
index 0000000..d5a1472
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java
@@ -0,0 +1,20 @@
+package org.onlab.onos.net.flow;
+
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.intent.BatchOperationEntry;
+
+
+public class FlowRuleBatchEntry
+ extends BatchOperationEntry<FlowRuleOperation, FlowRule> {
+ // One entry of a flow rule batch: an operator paired with the FlowRule it targets.
+ public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target) {
+ super(operator, target);
+ }
+ // Operation kinds that a batch entry can carry.
+ public enum FlowRuleOperation {
+ ADD,
+ REMOVE,
+ MODIFY
+ }
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java
new file mode 100644
index 0000000..74ef165
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java
@@ -0,0 +1,13 @@
+package org.onlab.onos.net.flow;
+
+import java.util.Collection;
+
+import org.onlab.onos.net.intent.BatchOperation;
+
+public class FlowRuleBatchOperation
+ extends BatchOperation<FlowRuleBatchEntry> {
+ // A BatchOperation whose entries are FlowRuleBatchEntry items; construction is delegated to the superclass.
+ public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
+ super(operations);
+ }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index c4e2f92..68762ac 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -1,6 +1,9 @@
package org.onlab.onos.net.flow;
+import java.util.concurrent.Future;
+
import org.onlab.onos.ApplicationId;
+import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.Provider;
/**
@@ -34,4 +37,6 @@
*/
void removeRulesById(ApplicationId id, FlowRule... flowRules);
+ Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
index 8600c54..6d04810 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
@@ -1,5 +1,7 @@
package org.onlab.onos.net.flow;
+import java.util.concurrent.Future;
+
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
@@ -66,7 +68,12 @@
*/
Iterable<FlowRule> getFlowRulesById(ApplicationId id);
- //Future<CompletedBatchOperation> applyBatch(BatchOperation<FlowRuleBatchEntry>)
+ /**
+ * Applies a batch operation of FlowRules.
+ *
+ * @return future indicating the state of the batch operation
+ */
+ Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
/**
* Adds the specified flow rule listener.
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java
index 5d0cbb8..72a9847 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java
@@ -1,12 +1,13 @@
package org.onlab.onos.net.intent;
//TODO is this the right package?
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* A list of BatchOperationEntry.
*
@@ -15,7 +16,7 @@
*/
public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
- private List<T> ops;
+ private final List<T> ops;
/**
* Creates new {@link BatchOperation} object.
@@ -30,7 +31,7 @@
*
* @param batchOperations the list of batch operation entries.
*/
- public BatchOperation(List<T> batchOperations) {
+ public BatchOperation(Collection<T> batchOperations) {
ops = new LinkedList<>(checkNotNull(batchOperations));
}
@@ -61,6 +62,10 @@
/**
* Adds an operation.
+ * FIXME: Brian promises that the Intent Framework
+ * will not modify the batch operation after it has submitted it.
+ * Ali would prefer immutability, but trusts Brian for better or
+ * for worse.
*
* @param entry the operation to be added
* @return this object if succeeded, null otherwise
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java
index b5dfa88..4e57d33 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java
@@ -15,14 +15,7 @@
private final T operator;
private final U target;
- /**
- * Default constructor for serializer.
- */
- @Deprecated
- protected BatchOperationEntry() {
- this.operator = null;
- this.target = null;
- }
+
/**
* Constructs new instance for the entry of the BatchOperation.
diff --git a/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java b/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java
index 759b62a..a5f81c7 100644
--- a/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/ClockProviderService.java
@@ -12,6 +12,7 @@
/**
* Updates the mastership term for the specified deviceId.
+ *
* @param deviceId device identifier.
* @param term mastership term.
*/
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index 3c4a885..00fd24d 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -144,6 +144,10 @@
private void applyRole(DeviceId deviceId, MastershipRole newRole) {
if (newRole != MastershipRole.NONE) {
Device device = store.getDevice(deviceId);
+ // FIXME: Device might not be there yet. (eventually consistent)
+ if (device == null) {
+ return;
+ }
DeviceProvider provider = getProvider(device.providerId());
if (provider != null) {
provider.roleChanged(device, newRole);
@@ -193,16 +197,38 @@
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL);
checkValidity();
+
+ log.info("Device {} connected", deviceId);
+ // check my Role
+ MastershipRole role = mastershipService.requestRoleFor(deviceId);
+
+ if (role != MastershipRole.MASTER) {
+ // TODO: Do we need to tell the Provider that
+ // I am no longer the MASTER?
+ return;
+ }
+
+ // Master:
+ MastershipTerm term = mastershipService.requestTermService()
+ .getMastershipTerm(deviceId);
+ if (!term.master().equals(clusterService.getLocalNode().id())) {
+ // I've lost mastership after I thought I was MASTER.
+ return;
+ }
+ clockProviderService.setMastershipTerm(deviceId, term);
+
DeviceEvent event = store.createOrUpdateDevice(provider().id(),
deviceId, deviceDescription);
- // If there was a change of any kind, trigger role selection
- // process.
+ // If there was a change of any kind, tell the provider
+ // I am the master.
+ // Note: can be null, if mastership was lost.
if (event != null) {
- log.info("Device {} connected", deviceId);
- mastershipService.requestRoleFor(deviceId);
- provider().roleChanged(event.subject(),
- mastershipService.requestRoleFor(deviceId));
+ // TODO: Check switch reconnected case, is it assured that
+ // event will not be null?
+
+ // FIXME: 1st argument should be deviceId
+ provider().roleChanged(event.subject(), role);
post(event);
}
}
@@ -211,6 +237,10 @@
public void deviceDisconnected(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
+ if (!mastershipService.getLocalRole(deviceId).equals(MastershipRole.MASTER)) {
+ log.debug("Device {} disconnected, but I am not the master", deviceId);
+ return;
+ }
DeviceEvent event = store.markOffline(deviceId);
//we're no longer capable of mastership.
@@ -272,9 +302,15 @@
@Override
public void event(MastershipEvent event) {
if (event.master().equals(clusterService.getLocalNode().id())) {
+
MastershipTerm term = mastershipService.requestTermService()
.getMastershipTerm(event.subject());
- clockProviderService.setMastershipTerm(event.subject(), term);
+
+ if (term.master().equals(clusterService.getLocalNode().id())) {
+ // only set if I am the master
+ clockProviderService.setMastershipTerm(event.subject(), term);
+ }
+
applyRole(event.subject(), MastershipRole.MASTER);
} else {
applyRole(event.subject(), MastershipRole.STANDBY);
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index ce11cea..a9eddd8 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,6 +5,10 @@
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -18,8 +22,11 @@
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -32,7 +39,9 @@
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
/**
* Provides implementation of the flow NB & SB APIs.
@@ -131,6 +140,38 @@
}
@Override
+ public Future<CompletedBatchOperation> applyBatch(
+ FlowRuleBatchOperation batch) {
+ Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
+ ArrayListMultimap.create();
+ List<Future<Void>> futures = Lists.newArrayList(); // one future per provider
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ final FlowRule f = fbe.getTarget();
+ final Device device = deviceService.getDevice(f.deviceId());
+ final FlowRuleProvider frp = getProvider(device.providerId()); // NOTE(review): device is dereferenced without a null check
+ batches.put(frp, fbe); // group entries by the provider serving the rule's device
+ switch (fbe.getOperator()) {
+ case ADD:
+ store.storeFlowRule(f);
+ break;
+ case REMOVE:
+ store.deleteFlowRule(f);
+ break;
+ case MODIFY: // not handled here: falls through to the error log, though the entry was already queued above
+ default:
+ log.error("Batch operation type {} unsupported.", fbe.getOperator());
+ }
+ }
+ for (FlowRuleProvider provider : batches.keySet()) {
+ FlowRuleBatchOperation b =
+ new FlowRuleBatchOperation(batches.get(provider)); // per-provider sub-batch
+ Future<Void> future = provider.executeBatch(b);
+ futures.add(future);
+ }
+ return new FlowRuleBatchFuture(futures);
+ }
+
+ @Override
public void addListener(FlowRuleListener listener) {
listenerRegistry.addListener(listener);
}
@@ -296,4 +337,63 @@
eventDispatcher.post(event);
}
}
+
+ private class FlowRuleBatchFuture
+ implements Future<CompletedBatchOperation> {
+ // Combines the futures returned by each provider's executeBatch into a single future.
+ private final List<Future<Void>> futures;
+
+ public FlowRuleBatchFuture(List<Future<Void>> futures) {
+ this.futures = futures;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ // TODO: cancellation is not implemented; the underlying futures are left untouched.
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ // TODO: cancellation is not implemented, so this always reports false.
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ boolean isDone = true;
+ for (Future<Void> future : futures) {
+ isDone &= future.isDone();
+ }
+ return isDone;
+ }
+
+ @Override
+ public CompletedBatchOperation get() throws InterruptedException,
+ ExecutionException {
+ // Blocks on each underlying future in list order; the first failure propagates to the caller.
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ return new CompletedBatchOperation();
+ }
+
+ @Override
+ public CompletedBatchOperation get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException,
+ TimeoutException {
+ // Waits on each future in turn, giving it only the time left until the overall deadline.
+ long start = System.nanoTime();
+ long end = start + unit.toNanos(timeout);
+ for (Future<Void> future : futures) {
+ long now = System.nanoTime();
+ long thisTimeout = end - now; // zero or negative once the deadline has passed
+ future.get(thisTimeout, TimeUnit.NANOSECONDS);
+ }
+ return new CompletedBatchOperation();
+ }
+
+ }
+
+
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
index a0995e4..0ca75c2 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
@@ -4,6 +4,8 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -16,6 +18,9 @@
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
@@ -24,6 +29,8 @@
import org.onlab.onos.net.intent.PathIntent;
import org.slf4j.Logger;
+import com.google.common.collect.Lists;
+
/**
* Installer for {@link PathIntent path connectivity intents}.
*/
@@ -56,19 +63,27 @@
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
-
+ List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
while (links.hasNext()) {
builder.matchInport(prev.port());
Link link = links.next();
TrafficTreatment treatment = builder()
.setOutput(link.src().port()).build();
+
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
- flowRuleService.applyFlowRules(rule);
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
+ //flowRuleService.applyFlowRules(rule);
prev = link.dst();
}
-
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
+ try {
+ flowRuleService.applyBatch(batch).get();
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO: log via the class logger and restore the interrupt flag instead of printing the trace
+ e.printStackTrace();
+ }
}
@Override
@@ -77,6 +92,7 @@
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
+ List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
while (links.hasNext()) {
builder.matchInport(prev.port());
@@ -86,9 +102,16 @@
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
-
- flowRuleService.removeFlowRules(rule);
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
+ //flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
+ try {
+ flowRuleService.applyBatch(batch).get();
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO: log via the class logger and restore the interrupt flag instead of printing the trace
+ e.printStackTrace();
+ }
}
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 7463671..86f3ddc 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
@@ -32,6 +33,7 @@
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -42,6 +44,7 @@
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instruction;
+import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
@@ -404,6 +407,13 @@
public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
}
+ @Override
+ public Future<Void> executeBatch(
+ BatchOperation<FlowRuleBatchEntry> batch) {
+ // Test stub: performs no work and returns null rather than a Future.
+ return null;
+ }
+
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index ee3e789..43df15f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -45,4 +45,9 @@
MessageSubject that = (MessageSubject) obj;
return Objects.equals(this.value, that.value);
}
+
+ // No-argument constructor for the serializer; value starts as the empty string.
+ protected MessageSubject() {
+ this.value = "";
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7b05401..df0f169 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -3,11 +3,11 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -26,6 +26,7 @@
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
@@ -34,6 +35,7 @@
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
+import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +52,6 @@
private ClusterService clusterService;
private ClusterNodesDelegate nodesDelegate;
- // FIXME: `members` should go away and should be using ClusterService
- private Map<NodeId, ControllerNode> members = new HashMap<>();
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
@@ -59,11 +59,14 @@
private MessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
- .register(ClusterMessage.class)
+ .register(ClusterMessage.class, new ClusterMessageSerializer())
.register(ClusterMembershipEvent.class)
+ .register(byte[].class)
+ .register(MessageSubject.class)
.build()
.populate(1);
}
@@ -73,7 +76,15 @@
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
- messagingService = new NettyMessagingService(localNode.tcpPort());
+ NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
+ // FIXME: workaround until it becomes a service.
+ try {
+ netty.activate();
+ } catch (Exception e) {
+ // TODO: messagingService is still assigned below even when activation fails
+ log.error("NettyMessagingService#activate", e);
+ }
+ messagingService = netty;
log.info("Started");
}
@@ -86,7 +97,7 @@
@Override
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
- for (ControllerNode node : members.values()) {
+ for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
}
@@ -107,13 +118,16 @@
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- ControllerNode node = members.get(toNodeId);
+ ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
+ log.info("sending...");
+ Response resp = messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
+ resp.get(1, TimeUnit.SECONDS);
+ log.info("sent...");
return true;
- } catch (IOException e) {
+ } catch (IOException | TimeoutException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
}
@@ -137,7 +151,7 @@
@Override
public void addNode(ControllerNode node) {
- members.put(node.id(), node);
+ //members.put(node.id(), node);
}
@Override
@@ -146,7 +160,7 @@
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
- members.remove(node.id());
+ //members.remove(node.id());
}
// Sends a heart beat to all peers.
@@ -181,7 +195,8 @@
}
}
- private static class InternalClusterMessageHandler implements MessageHandler {
+ // FIXME: revert static
+ private class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
@@ -191,8 +206,18 @@
@Override
public void handle(Message message) {
- ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
- handler.handle(clusterMessage);
+ // FIXME: remove me
+ log.info("InternalClusterMessageHandler.handle({})", message);
+ try {
+ log.info("before decode");
+ ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+ log.info("Subject:({}), Sender:({})", clusterMessage.subject(), clusterMessage.sender());
+ handler.handle(clusterMessage);
+ message.respond("ACK".getBytes());
+ } catch (Exception e) {
+ // TODO: on failure no response is sent, so a sender waiting on the reply will time out
+ log.error("failed", e);
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 2f1e504..31df710 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -113,6 +113,7 @@
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
index 26f1d7f..4214384 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -35,4 +35,11 @@
public Timestamped<DeviceDescription> deviceDescription() {
return deviceDescription;
}
+
+ // No-argument constructor for the serializer; every field is left null.
+ protected InternalDeviceEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.deviceDescription = null;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
index 48e3be6..64e77ca 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -37,4 +37,11 @@
public Timestamped<List<PortDescription>> portDescriptions() {
return portDescriptions;
}
+
+ // No-argument constructor for the serializer; every field is left null.
+ protected InternalPortEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.portDescriptions = null;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
index 0bdfdbf..7d3854b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -35,4 +35,11 @@
public Timestamped<PortDescription> portDescription() {
return portDescription;
}
+
+ // No-argument constructor for the serializer; every field is left null.
+ protected InternalPortStatusEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.portDescription = null;
+ }
}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index aafbe4b..fafd450 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -119,8 +119,8 @@
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
- // TODO Auto-generated method stub
- return null;
+ // FIXME: implement this
+ return MastershipTerm.of(getMaster(deviceId), 1);
}
@Override
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index ade651e..78f5874 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -68,7 +68,7 @@
this.cookie = flowRule.id();
}
- public OFFlowMod buildFlowMod() {
+ public OFFlowMod buildFlowAdd() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
@@ -86,6 +86,24 @@
}
+ public OFFlowMod buildFlowMod() {
+ Match match = buildMatch();
+ List<OFAction> actions = buildActions();
+ // Assemble a flow-modify message carrying this builder's match, actions, cookie and priority.
+ //TODO: what to do without bufferid? do we assume that there will be a pktout as well?
+ OFFlowMod fm = factory.buildFlowModify()
+ .setCookie(U64.of(cookie.value()))
+ .setBufferId(OFBufferId.NO_BUFFER)
+ .setActions(actions)
+ .setMatch(match)
+ .setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
+ .setPriority(priority)
+ .build();
+
+ return fm;
+
+ }
+
public OFFlowMod buildFlowDel() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index eac3c18..0aca754 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -2,8 +2,17 @@
import static org.slf4j.LoggerFactory.getLogger;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -14,9 +23,11 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
+import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.TopologyService;
@@ -27,6 +38,8 @@
import org.onlab.onos.openflow.controller.OpenFlowSwitchListener;
import org.onlab.onos.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFActionType;
+import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
+import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
@@ -42,9 +55,11 @@
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
+import org.projectfloodlight.openflow.types.U32;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -70,6 +85,9 @@
private final InternalFlowProvider listener = new InternalFlowProvider();
+ private final Map<Long, InstallationFuture> pendingFutures =
+ new ConcurrentHashMap<Long, InstallationFuture>();
+
/**
* Creates an OpenFlow host provider.
*/
@@ -101,7 +119,7 @@
private void applyRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
- sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
}
@@ -154,6 +172,7 @@
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
+ InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
//TODO: make this better
@@ -166,7 +185,17 @@
pushFlowMetrics(dpid, (OFStatsReply) msg);
break;
case BARRIER_REPLY:
+ future = pendingFutures.get(msg.getXid());
+ if (future != null) {
+ future.satisfyRequirement(dpid);
+ }
+ break;
case ERROR:
+ future = pendingFutures.get(msg.getXid());
+ if (future != null) {
+ future.fail((OFErrorMsg) msg, dpid);
+ }
+ break;
default:
log.debug("Unhandled message type: {}", msg.getType());
}
@@ -226,6 +255,144 @@
}
+ /**
+  * Sends every operation of the batch to the switch owning its flow rule
+  * and returns a future tied to barrier requests sent to those switches.
+  *
+  * Each entry is translated into a flow add, delete or modify message
+  * according to its operator. Afterwards an InstallationFuture covering
+  * all touched switches is stored in pendingFutures under the batch hash
+  * code, and a barrier request carrying that same value as its xid is
+  * sent to each switch.
+  *
+  * NOTE(review): the result of controller.getSwitch(...) is dereferenced
+  * without a null check - confirm the lookup cannot return null here.
+  * NOTE(review): nothing in the code visible here removes the entry from
+  * pendingFutures again - confirm it is cleaned up elsewhere.
+  *
+  * @param batch flow rule operations to execute
+  * @return future whose get() waits for the barrier replies
+  */
+ @Override
+ public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+ // Switches touched by this batch; each one is sent a barrier request below.
+ final Set<Dpid> sws = new HashSet<Dpid>();
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ FlowRule flowRule = fbe.getTarget();
+ OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
+ sws.add(new Dpid(sw.getId()));
+ switch (fbe.getOperator()) {
+ case ADD:
+ //TODO: Track XID for each flowmod
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
+ break;
+ case REMOVE:
+ //TODO: Track XID for each flowmod
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
+ break;
+ case MODIFY:
+ //TODO: Track XID for each flowmod
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
+ break;
+ default:
+ log.error("Unsupported batch operation {}", fbe.getOperator());
+ }
+ }
+ InstallationFuture installation = new InstallationFuture(sws);
+ // Register the future before the barrier requests go out so that the
+ // BARRIER_REPLY / ERROR handling can find it by xid.
+ // presumably U32.f widens the hash code to the unsigned form reported by getXid() - TODO confirm
+ pendingFutures.put(U32.f(batch.hashCode()), installation);
+ installation.verify(batch.hashCode());
+ return installation;
+ }
+
+ /**
+  * Future tracking the completion of a flow rule batch on a set of switches.
+  *
+  * A barrier request is sent to every switch of the batch by
+  * {@link #verify(Integer)}; the future is done once a barrier reply has been
+  * recorded for each of those switches through
+  * {@link #satisfyRequirement(Dpid)}. Errors reported through
+  * {@link #fail(OFErrorMsg, Dpid)} are only flagged for now and do not change
+  * what {@code get()} returns.
+  */
+ private class InstallationFuture implements Future<Void> {
+
+     // Switches that are sent a barrier request; only read, never mutated here.
+     private final Set<Dpid> sws;
+     // Switches whose barrier reply is still outstanding. Kept separate from
+     // sws and thread-safe because replies are recorded by the message
+     // handler while verify() may still be iterating over sws.
+     private final Map<Dpid, Boolean> pending = new ConcurrentHashMap<Dpid, Boolean>();
+     private final AtomicBoolean ok = new AtomicBoolean(true);
+     private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
+
+     private final CountDownLatch countDownLatch;
+
+     /**
+      * Creates a future waiting for one barrier reply per given switch.
+      *
+      * @param sws switches taking part in the batch
+      */
+     public InstallationFuture(Set<Dpid> sws) {
+         this.sws = sws;
+         for (Dpid dpid : sws) {
+             pending.put(dpid, Boolean.TRUE);
+         }
+         countDownLatch = new CountDownLatch(sws.size());
+     }
+
+     /**
+      * Flags the installation as failed.
+      *
+      * @param msg error message received from the switch
+      * @param dpid switch which reported the error
+      */
+     public void fail(OFErrorMsg msg, Dpid dpid) {
+         ok.set(false);
+         //TODO add reason to flowentry and record it in offendingFlowMods
+         //TODO handle specific error msgs
+         switch (msg.getErrType()) {
+             case BAD_ACTION:
+             case BAD_INSTRUCTION:
+             case BAD_MATCH:
+             case BAD_REQUEST:
+             case EXPERIMENTER:
+             case FLOW_MOD_FAILED:
+             case GROUP_MOD_FAILED:
+             case HELLO_FAILED:
+             case METER_MOD_FAILED:
+             case PORT_MOD_FAILED:
+             case QUEUE_OP_FAILED:
+             case ROLE_REQUEST_FAILED:
+             case SWITCH_CONFIG_FAILED:
+             case TABLE_FEATURES_FAILED:
+             case TABLE_MOD_FAILED:
+             default:
+                 // No per-type handling yet.
+                 break;
+         }
+     }
+
+     /**
+      * Records the barrier reply of the given switch.
+      *
+      * @param dpid switch which answered the barrier request
+      */
+     public void satisfyRequirement(Dpid dpid) {
+         log.warn("Satisfaction from switch {}", dpid);
+         // Remove by dpid: the previous code removed an OpenFlowSwitch from
+         // a Set<Dpid>, which never matched anything. Only the first reply
+         // of a switch that is still outstanding releases the latch.
+         if (pending.remove(dpid) != null) {
+             countDownLatch.countDown();
+         }
+     }
+
+     /**
+      * Sends a barrier request with the given transaction id to every
+      * switch of this installation.
+      *
+      * @param id value used as xid of the barrier requests
+      */
+     public void verify(Integer id) {
+         for (Dpid dpid : sws) {
+             OpenFlowSwitch sw = controller.getSwitch(dpid);
+             OFBarrierRequest.Builder builder = sw.factory()
+                     .buildBarrierRequest()
+                     .setXid(id);
+             sw.sendMsg(builder.build());
+         }
+     }
+
+     /**
+      * Cancellation is not supported.
+      *
+      * @return always false
+      */
+     @Override
+     public boolean cancel(boolean mayInterruptIfRunning) {
+         return false;
+     }
+
+     /**
+      * Cancellation is not supported.
+      *
+      * @return always false
+      */
+     @Override
+     public boolean isCancelled() {
+         return false;
+     }
+
+     /**
+      * Tells whether all expected barrier replies have been recorded.
+      *
+      * @return true once the latch has been fully counted down
+      */
+     @Override
+     public boolean isDone() {
+         return countDownLatch.getCount() == 0;
+     }
+
+     /**
+      * Waits until all expected barrier replies have been recorded.
+      *
+      * @return always null
+      * @throws InterruptedException if the waiting thread is interrupted
+      */
+     @Override
+     public Void get() throws InterruptedException, ExecutionException {
+         countDownLatch.await();
+         //TODO report offendingFlowMods to the caller
+         return null;
+     }
+
+     /**
+      * Waits at most the given time for all expected barrier replies.
+      *
+      * @return always null
+      * @throws InterruptedException if the waiting thread is interrupted
+      * @throws TimeoutException if replies are still missing after the timeout
+      */
+     @Override
+     public Void get(long timeout, TimeUnit unit)
+             throws InterruptedException, ExecutionException,
+             TimeoutException {
+         // await() returns false when the time ran out before the latch
+         // reached zero; that must surface as a timeout, not as success.
+         if (!countDownLatch.await(timeout, unit)) {
+             throw new TimeoutException("Timed out waiting for barrier replies from "
+                     + pending.size() + " switch(es)");
+         }
+         //TODO report offendingFlowMods to the caller
+         return null;
+     }
+
+ }
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index d4832e5..b494ce9 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -8,11 +8,16 @@
import java.util.Arrays;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Decoder for inbound messages.
*/
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
private final NettyMessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer();
@@ -57,4 +62,10 @@
checkState(false, "Must not be here");
}
}
+
+ /**
+  * Logs the reported exception at error level and closes the handler context.
+  *
+  * @param context handler context the exception was reported on
+  * @param cause the reported exception
+  */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
+ context.close();
+ }
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 716efb9..c0a84df 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -1,5 +1,8 @@
package org.onlab.netty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@@ -11,6 +14,8 @@
@Sharable
public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
public static final int HEADER_VERSION = 1;
@@ -47,4 +52,10 @@
// write payload.
out.writeBytes(payload);
}
+
+ /**
+  * Logs the reported exception at error level and closes the handler context.
+  *
+  * @param context handler context the exception was reported on
+  * @param cause the reported exception
+  */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
+ context.close();
+ }
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 48aeb30..5a51ad4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -248,6 +248,7 @@
+ /**
+  * Logs the reported exception at error level and closes the handler context.
+  *
+  * @param context handler context the exception was reported on
+  * @param cause the reported exception
+  */
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
deleted file mode 100644
index 3869948..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.onlab.netty;
-
-import java.util.concurrent.TimeUnit;
-
-import org.onlab.metrics.MetricsComponent;
-import org.onlab.metrics.MetricsFeature;
-import org.onlab.metrics.MetricsManager;
-
-import com.codahale.metrics.Timer;
-
-// FIXME: Should be move out to test or app
-public final class SimpleClient {
- private SimpleClient() {
- }
-
- public static void main(String... args) throws Exception {
- NettyMessagingService messaging = new TestNettyMessagingService(9081);
- MetricsManager metrics = new MetricsManager();
- messaging.activate();
- metrics.activate();
- MetricsFeature feature = new MetricsFeature("timers");
- MetricsComponent component = metrics.registerComponent("NettyMessaging");
- Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
- final int warmup = 100;
- for (int i = 0; i < warmup; i++) {
- Timer.Context context = sendAsyncTimer.time();
- messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
- context.stop();
- }
- metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
-
- Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
- final int iterations = 1000000;
- for (int i = 0; i < iterations; i++) {
- Timer.Context context = sendAndReceiveTimer.time();
- Response response = messaging
- .sendAndReceive(new Endpoint("localhost", 8080), "echo",
- "Hello World".getBytes());
- System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
- context.stop();
- }
- metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
- }
-
- public static class TestNettyMessagingService extends NettyMessagingService {
- public TestNettyMessagingService(int port) throws Exception {
- super(port);
- }
- }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
deleted file mode 100644
index b8ae5b0..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.onlab.netty;
-
-//FIXME: Should be move out to test or app
-public final class SimpleServer {
- private SimpleServer() {}
-
- public static void main(String... args) throws Exception {
- NettyMessagingService server = new NettyMessagingService(8080);
- server.activate();
- server.registerHandler("simple", new LoggingHandler());
- server.registerHandler("echo", new EchoHandler());
- }
-}