Fixing flow rule batches
The problem should now be fixed. Hashing on enums last is a bad
idea because the enum value could be 0.
Change-Id: Ib29e90b393b5285be2807729b52e69b121340f09
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 018d6f3..214bde3 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -15,21 +15,11 @@
*/
package org.onlab.onos.provider.of.flow.impl;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ExecutionList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -80,15 +70,23 @@
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
-import org.projectfloodlight.openflow.types.U32;
import org.slf4j.Logger;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Provider which uses an OpenFlow controller to detect network
@@ -124,6 +122,8 @@
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
+ private final AtomicLong xidCounter = new AtomicLong(0);
+
/**
* Creates an OpenFlow host provider.
*/
@@ -154,6 +154,7 @@
log.info("Stopped");
}
+
@Override
public void applyFlowRule(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
@@ -167,7 +168,6 @@
}
-
@Override
public void removeFlowRule(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
@@ -188,11 +188,15 @@
}
@Override
- public ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+ public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws =
Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>());
final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
- OFFlowMod mod = null;
+ /*
+ * Use identity hash map for reference equality as we could have equal
+ * flow mods for different switches.
+ */
+ Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule flowRule = fbe.getTarget();
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
@@ -208,6 +212,7 @@
}
sws.add(new Dpid(sw.getId()));
FlowModBuilder builder = FlowModBuilder.builder(flowRule, sw.factory());
+ OFFlowMod mod = null;
switch (fbe.getOperator()) {
case ADD:
mod = builder.buildFlowAdd();
@@ -222,25 +227,29 @@
log.error("Unsupported batch operation {}", fbe.getOperator());
}
if (mod != null) {
- sw.sendMsg(mod);
- fmXids.put(mod.getXid(), fbe);
+ mods.put(mod, sw);
+ fmXids.put(xidCounter.getAndIncrement(), fbe);
} else {
log.error("Conversion of flowrule {} failed.", flowRule);
}
-
}
InstallationFuture installation = new InstallationFuture(sws, fmXids);
for (Long xid : fmXids.keySet()) {
pendingFMs.put(xid, installation);
}
- pendingFutures.put(U32.f(batch.hashCode()), installation);
- installation.verify(U32.f(batch.hashCode()));
+ pendingFutures.put(installation.xid(), installation);
+ for (Map.Entry<OFFlowMod, OpenFlowSwitch> entry : mods.entrySet()) {
+ OpenFlowSwitch sw = entry.getValue();
+ OFFlowMod mod = entry.getKey();
+ sw.sendMsg(mod);
+ }
+ installation.verify();
return installation;
}
private class InternalFlowProvider
- implements OpenFlowSwitchListener, OpenFlowEventListener {
+ implements OpenFlowSwitchListener, OpenFlowEventListener {
private final Multimap<DeviceId, FlowEntry> completeEntries =
@@ -274,36 +283,36 @@
public void handleMessage(Dpid dpid, OFMessage msg) {
InstallationFuture future = null;
switch (msg.getType()) {
- case FLOW_REMOVED:
- OFFlowRemoved removed = (OFFlowRemoved) msg;
+ case FLOW_REMOVED:
+ OFFlowRemoved removed = (OFFlowRemoved) msg;
- FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
- providerService.flowRemoved(fr);
- break;
- case STATS_REPLY:
- pushFlowMetrics(dpid, (OFStatsReply) msg);
- break;
- case BARRIER_REPLY:
- future = pendingFutures.get(msg.getXid());
- if (future != null) {
- future.satisfyRequirement(dpid);
- }
- break;
- case ERROR:
- future = pendingFMs.get(msg.getXid());
- if (future != null) {
- future.fail((OFErrorMsg) msg, dpid);
- }
- break;
- default:
- log.debug("Unhandled message type: {}", msg.getType());
+ FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
+ providerService.flowRemoved(fr);
+ break;
+ case STATS_REPLY:
+ pushFlowMetrics(dpid, (OFStatsReply) msg);
+ break;
+ case BARRIER_REPLY:
+ future = pendingFutures.get(msg.getXid());
+ if (future != null) {
+ future.satisfyRequirement(dpid);
+ }
+ break;
+ case ERROR:
+ future = pendingFMs.get(msg.getXid());
+ if (future != null) {
+ future.fail((OFErrorMsg) msg, dpid);
+ }
+ break;
+ default:
+ log.debug("Unhandled message type: {}", msg.getType());
}
}
@Override
public void receivedRoleReply(Dpid dpid, RoleState requested,
- RoleState response) {
+ RoleState response) {
// Do nothing here for now.
}
@@ -352,8 +361,9 @@
}
- private class InstallationFuture implements ListenableFuture<CompletedBatchOperation> {
+ private class InstallationFuture implements Future<CompletedBatchOperation> {
+ private final Long xid;
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
private final Map<Long, FlowRuleBatchEntry> fms;
@@ -361,18 +371,22 @@
private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
private final CountDownLatch countDownLatch;
- private Long pendingXid;
private BatchState state;
private final ExecutionList executionList = new ExecutionList();
public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
+ this.xid = xidCounter.getAndIncrement();
this.state = BatchState.STARTED;
this.sws = sws;
this.fms = fmXids;
countDownLatch = new CountDownLatch(sws.size());
}
+ public Long xid() {
+ return xid;
+ }
+
public void fail(OFErrorMsg msg, Dpid dpid) {
ok.set(false);
@@ -385,27 +399,27 @@
case BAD_ACTION:
OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
- bad.getCode().ordinal());
+ bad.getCode().ordinal());
break;
case BAD_INSTRUCTION:
OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
- badins.getCode().ordinal());
+ badins.getCode().ordinal());
break;
case BAD_MATCH:
OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
- badMatch.getCode().ordinal());
+ badMatch.getCode().ordinal());
break;
case BAD_REQUEST:
OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
- badReq.getCode().ordinal());
+ badReq.getCode().ordinal());
break;
case FLOW_MOD_FAILED:
OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
- fmFail.getCode().ordinal());
+ fmFail.getCode().ordinal());
break;
case EXPERIMENTER:
case GROUP_MOD_FAILED:
@@ -434,13 +448,12 @@
}
- public void verify(Long id) {
- pendingXid = id;
+ public void verify() {
for (Dpid dpid : sws) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
OFBarrierRequest.Builder builder = sw.factory()
.buildBarrierRequest()
- .setXid(id);
+ .setXid(xid);
sw.sendMsg(builder.build());
}
}
@@ -462,7 +475,6 @@
}
}
- invokeCallbacks();
return true;
}
@@ -481,6 +493,7 @@
countDownLatch.await();
this.state = BatchState.FINISHED;
CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
+ //FIXME do cleanup here
return result;
}
@@ -491,6 +504,7 @@
if (countDownLatch.await(timeout, unit)) {
this.state = BatchState.FINISHED;
CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
+ // FIXME do cleanup here
return result;
}
throw new TimeoutException();
@@ -498,9 +512,7 @@
private void cleanUp() {
if (isDone() || isCancelled()) {
- if (pendingXid != null) {
- pendingFutures.remove(pendingXid);
- }
+ pendingFutures.remove(xid);
for (Long xid : fms.keySet()) {
pendingFMs.remove(xid);
}
@@ -509,21 +521,10 @@
private void removeRequirement(Dpid dpid) {
countDownLatch.countDown();
- if (countDownLatch.getCount() == 0) {
- invokeCallbacks();
- }
sws.remove(dpid);
+ //FIXME don't do cleanup here
cleanUp();
}
-
- @Override
- public void addListener(Runnable runnable, Executor executor) {
- executionList.add(runnable, executor);
- }
-
- private void invokeCallbacks() {
- executionList.execute();
- }
}
}