DistributedFlowRuleStore: remote batch support
Change-Id: I373a942697624440e025a8022a13394396058a71
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
index 4ba3366..d0d1820 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
@@ -30,7 +30,7 @@
* @param request batch operation request.
* @return event.
*/
- public static FlowRuleBatchEvent create(FlowRuleBatchRequest request) {
+ public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request) {
FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
return event;
}
@@ -41,7 +41,7 @@
* @param result completed batch operation result.
* @return event.
*/
- public static FlowRuleBatchEvent create(FlowRuleBatchRequest request, CompletedBatchOperation result) {
+ public static FlowRuleBatchEvent completed(FlowRuleBatchRequest request, CompletedBatchOperation result) {
FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_COMPLETED, request, result);
return event;
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
index 0414fcb..34e3d31 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
@@ -9,10 +9,12 @@
public class FlowRuleBatchRequest {
+ private final int batchId;
private final List<FlowEntry> toAdd;
private final List<FlowEntry> toRemove;
- public FlowRuleBatchRequest(List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
+ public FlowRuleBatchRequest(int batchId, List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
+ this.batchId = batchId;
this.toAdd = Collections.unmodifiableList(toAdd);
this.toRemove = Collections.unmodifiableList(toRemove);
}
@@ -35,4 +37,8 @@
}
return new FlowRuleBatchOperation(entries);
}
+
+ public int batchId() {
+ return batchId;
+ }
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 3ef9fc8..e91bb89 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -2,12 +2,14 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.util.Tools.namedThreads;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -74,6 +76,8 @@
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
+ private final ExecutorService futureListeners = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleStore store;
@@ -92,6 +96,8 @@
@Deactivate
public void deactivate() {
+ futureListeners.shutdownNow();
+
store.unsetDelegate(delegate);
eventDispatcher.removeSink(FlowRuleEvent.class);
log.info("Stopped");
@@ -398,9 +404,9 @@
result.addListener(new Runnable() {
@Override
public void run() {
- store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
+ store.batchOperationComplete(FlowRuleBatchEvent.completed(request, Futures.getUnchecked(result)));
}
- }, Executors.newCachedThreadPool());
+ }, futureListeners);
break;
case BATCH_OPERATION_COMPLETED:
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 85f928a..e5aa3e8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -3,15 +3,20 @@
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
+import static org.onlab.util.Tools.namedThreads;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
@@ -22,7 +27,9 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
@@ -52,8 +59,12 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
/**
* Manages inventory of flow rules using a distributed state management protocol.
@@ -74,13 +85,26 @@
ArrayListMultimap.<Short, FlowRule>create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ReplicaInfoService replicaInfoManager;
+ protected ReplicaInfoService replicaInfoManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterCommunicationService clusterCommunicator;
+ protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterService clusterService;
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ private final AtomicInteger localBatchIdGen = new AtomicInteger();
+
+
+ // FIXME switch to expiraing map/Cache?
+ private Map<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = Maps.newConcurrentMap();
+
+ private final ExecutorService futureListeners =
+ Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
+
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
@@ -97,36 +121,26 @@
@Activate
public void activate() {
- clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
+ clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
@Override
- public void handle(ClusterMessage message) {
- FlowRule rule = SERIALIZER.decode(message.payload());
- log.info("received add request for {}", rule);
- storeFlowRule(rule);
- // FIXME what to respond.
- try {
- message.respond(SERIALIZER.encode("ACK"));
- } catch (IOException e) {
- log.error("Failed to respond back", e);
- }
- }
- });
+ public void handle(final ClusterMessage message) {
+ FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
+ log.info("received batch request {}", operation);
+ final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
- clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
+ f.addListener(new Runnable() {
- @Override
- public void handle(ClusterMessage message) {
- FlowRule rule = SERIALIZER.decode(message.payload());
- log.info("received delete request for {}", rule);
- deleteFlowRule(rule);
- // FIXME what to respond.
- try {
- message.respond(SERIALIZER.encode("ACK"));
- } catch (IOException e) {
- log.error("Failed to respond back", e);
- }
-
+ @Override
+ public void run() {
+ CompletedBatchOperation result = Futures.getUnchecked(f);
+ try {
+ message.respond(SERIALIZER.encode(result));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ }, futureListeners);
}
});
@@ -159,7 +173,13 @@
// make it device specific.
@Override
public int getFlowRuleCount() {
- return flowEntries.size();
+ // implementing in-efficient operation for debugging purpose.
+ int sum = 0;
+ for (Device device : deviceService.getDevices()) {
+ final DeviceId did = device.id();
+ sum += Iterables.size(getFlowEntries(did));
+ }
+ return sum;
}
@Override
@@ -218,6 +238,7 @@
storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
}
+ @Override
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
@@ -236,7 +257,7 @@
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.STORE_FLOW_RULE,
+ APPLY_BATCH_FLOWS,
SERIALIZER.encode(operation));
try {
@@ -250,7 +271,7 @@
return null;
}
- private Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
+ private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
List<FlowEntry> toRemove = new ArrayList<>();
List<FlowEntry> toAdd = new ArrayList<>();
// TODO: backup changes to hazelcast map
@@ -261,8 +282,8 @@
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry != null) {
entry.setState(FlowEntryState.PENDING_REMOVE);
+ toRemove.add(entry);
}
- toRemove.add(entry);
} else if (op.equals(FlowRuleOperation.ADD)) {
StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
DeviceId deviceId = flowRule.deviceId();
@@ -276,9 +297,13 @@
if (toAdd.isEmpty() && toRemove.isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
}
- notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
- // TODO: imlpement this.
- return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
+
+ SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
+ final int batchId = localBatchIdGen.incrementAndGet();
+
+ pendingFutures.put(batchId, r);
+ notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+ return r;
}
@Override
@@ -293,18 +318,9 @@
return addOrUpdateFlowRuleInternal(rule);
}
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
- SERIALIZER.encode(rule));
-
- try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
- // FIXME: throw a FlowStoreException
- throw new RuntimeException(e);
- }
+ log.error("Tried to update FlowRule {} state,"
+ + " while the Node was not the master.", rule);
+ return null;
}
private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
@@ -338,18 +354,9 @@
return removeFlowRuleInternal(rule);
}
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
- SERIALIZER.encode(rule));
-
- try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
- // FIXME: throw a FlowStoreException
- throw new RuntimeException(e);
- }
+ log.error("Tried to remove FlowRule {},"
+ + " while the Node was not the master.", rule);
+ return null;
}
private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -364,6 +371,11 @@
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
+ SettableFuture<CompletedBatchOperation> future
+ = pendingFutures.get(event.subject().batchId());
+ if (future != null) {
+ future.set(event.result());
+ }
notifyDelegate(event);
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
index ca833b8..ef68b55 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -7,10 +7,10 @@
*/
public final class FlowStoreMessageSubjects {
private FlowStoreMessageSubjects() {}
- public static final MessageSubject STORE_FLOW_RULE = new MessageSubject("peer-forward-store-flow-rule");
- public static final MessageSubject DELETE_FLOW_RULE = new MessageSubject("peer-forward-delete-flow-rule");
- public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
- new MessageSubject("peer-forward-add-or-update-flow-rule");
- public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
- public static final MessageSubject GET_FLOW_ENTRY = new MessageSubject("peer-forward-get-flow-entry");
+
+ public static final MessageSubject APPLY_BATCH_FLOWS
+ = new MessageSubject("peer-forward-apply-batch");
+
+ public static final MessageSubject GET_FLOW_ENTRY
+ = new MessageSubject("peer-forward-get-flow-entry");
}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
index 316a3b4..f9352fe 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -58,7 +58,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
- @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
@Activate
public void activate() {
@@ -76,9 +75,9 @@
}
};
- roleMap = new SMap(theInstance.getMap("nodeRoles"), this.serializer);
+ roleMap = new SMap<>(theInstance.<byte[], byte[]>getMap("nodeRoles"), this.serializer);
roleMap.addEntryListener((new RemoteMasterShipEventHandler()), true);
- terms = new SMap(theInstance.getMap("terms"), this.serializer);
+ terms = new SMap<>(theInstance.<byte[], byte[]>getMap("terms"), this.serializer);
clusterSize = theInstance.getAtomicLong("clustersize");
log.info("Started");
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index 7fddb01..0e9e19c 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -5,6 +5,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
@@ -27,12 +28,15 @@
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowId;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.StoredFlowEntry;
import org.onlab.onos.net.flow.criteria.Criteria;
import org.onlab.onos.net.flow.criteria.Criterion;
@@ -80,6 +84,7 @@
Arrays.asList().getClass(),
HashMap.class,
HashSet.class,
+ LinkedList.class,
//
//
ControllerNode.State.class,
@@ -118,7 +123,11 @@
DefaultTrafficTreatment.class,
Instructions.DropInstruction.class,
Instructions.OutputInstruction.class,
- RoleInfo.class
+ RoleInfo.class,
+ FlowRuleBatchOperation.class,
+ CompletedBatchOperation.class,
+ FlowRuleBatchEntry.class,
+ FlowRuleBatchEntry.FlowRuleOperation.class
)
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index bbfc263..8210a2f 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -176,8 +176,8 @@
}
// new flow rule added
existing.add(f);
- notifyDelegate(FlowRuleBatchEvent.create(
- new FlowRuleBatchRequest(
+ notifyDelegate(FlowRuleBatchEvent.requested(
+ new FlowRuleBatchRequest( 1, /* FIXME generate something */
Arrays.<FlowEntry>asList(f),
Collections.<FlowEntry>emptyList())));
}
@@ -194,8 +194,8 @@
synchronized (entry) {
entry.setState(FlowEntryState.PENDING_REMOVE);
// TODO: Should we notify only if it's "remote" event?
- notifyDelegate(FlowRuleBatchEvent.create(
- new FlowRuleBatchRequest(
+ notifyDelegate(FlowRuleBatchEvent.requested(
+ new FlowRuleBatchRequest(1, /* FIXME generate something */
Collections.<FlowEntry>emptyList(),
Arrays.<FlowEntry>asList(entry))));
}