DistributedFlowRuleStore: remote batch support
Change-Id: I373a942697624440e025a8022a13394396058a71
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 85f928a..e5aa3e8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -3,15 +3,20 @@
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
+import static org.onlab.util.Tools.namedThreads;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
@@ -22,7 +27,9 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
@@ -52,8 +59,12 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
/**
* Manages inventory of flow rules using a distributed state management protocol.
@@ -74,13 +85,26 @@
ArrayListMultimap.<Short, FlowRule>create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ReplicaInfoService replicaInfoManager;
+ protected ReplicaInfoService replicaInfoManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterCommunicationService clusterCommunicator;
+ protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterService clusterService;
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ private final AtomicInteger localBatchIdGen = new AtomicInteger();
+
+
+ // FIXME switch to expiring map/Cache?
+ private Map<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = Maps.newConcurrentMap();
+
+ private final ExecutorService futureListeners =
+ Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
+
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
@@ -97,36 +121,26 @@
@Activate
public void activate() {
- clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
+ clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
@Override
- public void handle(ClusterMessage message) {
- FlowRule rule = SERIALIZER.decode(message.payload());
- log.info("received add request for {}", rule);
- storeFlowRule(rule);
- // FIXME what to respond.
- try {
- message.respond(SERIALIZER.encode("ACK"));
- } catch (IOException e) {
- log.error("Failed to respond back", e);
- }
- }
- });
+ public void handle(final ClusterMessage message) {
+ FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
+ log.info("received batch request {}", operation);
+ final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
- clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
+ f.addListener(new Runnable() {
- @Override
- public void handle(ClusterMessage message) {
- FlowRule rule = SERIALIZER.decode(message.payload());
- log.info("received delete request for {}", rule);
- deleteFlowRule(rule);
- // FIXME what to respond.
- try {
- message.respond(SERIALIZER.encode("ACK"));
- } catch (IOException e) {
- log.error("Failed to respond back", e);
- }
-
+ @Override
+ public void run() {
+ CompletedBatchOperation result = Futures.getUnchecked(f);
+ try {
+ message.respond(SERIALIZER.encode(result));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ }, futureListeners);
}
});
@@ -159,7 +173,13 @@
// make it device specific.
@Override
public int getFlowRuleCount() {
- return flowEntries.size();
+ // Deliberately inefficient (debugging aid): walks every device from deviceService and sums its flow entry count.
+ int sum = 0;
+ for (Device device : deviceService.getDevices()) {
+ final DeviceId did = device.id();
+ sum += Iterables.size(getFlowEntries(did));
+ }
+ return sum;
}
@Override
@@ -218,6 +238,7 @@
storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
}
+ @Override
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
@@ -236,7 +257,7 @@
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.STORE_FLOW_RULE,
+ APPLY_BATCH_FLOWS,
SERIALIZER.encode(operation));
try {
@@ -250,7 +271,7 @@
return null;
}
- private Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
+ private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
List<FlowEntry> toRemove = new ArrayList<>();
List<FlowEntry> toAdd = new ArrayList<>();
// TODO: backup changes to hazelcast map
@@ -261,8 +282,8 @@
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry != null) {
entry.setState(FlowEntryState.PENDING_REMOVE);
+ toRemove.add(entry);
}
- toRemove.add(entry);
} else if (op.equals(FlowRuleOperation.ADD)) {
StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
DeviceId deviceId = flowRule.deviceId();
@@ -276,9 +297,13 @@
if (toAdd.isEmpty() && toRemove.isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
}
- notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
- // TODO: imlpement this.
- return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
+
+ SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
+ final int batchId = localBatchIdGen.incrementAndGet();
+
+ pendingFutures.put(batchId, r);
+ notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+ return r;
}
@Override
@@ -293,18 +318,9 @@
return addOrUpdateFlowRuleInternal(rule);
}
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
- SERIALIZER.encode(rule));
-
- try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
- // FIXME: throw a FlowStoreException
- throw new RuntimeException(e);
- }
+ log.error("Tried to update FlowRule {} state,"
+ + " while the Node was not the master.", rule);
+ return null;
}
private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
@@ -338,18 +354,9 @@
return removeFlowRuleInternal(rule);
}
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
- SERIALIZER.encode(rule));
-
- try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
- // FIXME: throw a FlowStoreException
- throw new RuntimeException(e);
- }
+ log.error("Tried to remove FlowRule {},"
+ + " while the Node was not the master.", rule);
+ return null;
}
private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -364,6 +371,14 @@
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
+ // Complete the future stored under this event's batch id, if there is one.
+ // remove() rather than get(): a finished batch must not stay in
+ // pendingFutures, otherwise the map gains one entry per batch and never shrinks.
+ SettableFuture<CompletedBatchOperation> future
+ = pendingFutures.remove(event.subject().batchId());
+ if (future != null) {
+ future.set(event.result());
+ }
notifyDelegate(event);
}
}