Towards a distributed flow rule store
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index bde57c6..85f928a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -5,10 +5,14 @@
import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.List;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -19,11 +23,17 @@
import org.onlab.onos.ApplicationId;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
import org.onlab.onos.net.flow.FlowRuleEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
@@ -43,6 +53,7 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
/**
* Manages inventory of flow rules using a distributed state management protocol.
@@ -50,7 +61,7 @@
@Component(immediate = true)
@Service
public class DistributedFlowRuleStore
- extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+ extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
private final Logger log = getLogger(getClass());
@@ -92,7 +103,7 @@
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.info("received add request for {}", rule);
- storeFlowEntryInternal(rule);
+ storeFlowRule(rule);
// FIXME what to respond.
try {
message.respond(SERIALIZER.encode("ACK"));
@@ -108,7 +119,7 @@
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.info("received delete request for {}", rule);
- deleteFlowRuleInternal(rule);
+ deleteFlowRule(rule);
// FIXME what to respond.
try {
message.respond(SERIALIZER.encode("ACK"));
@@ -118,6 +129,22 @@
}
});
+
+ clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ FlowRule rule = SERIALIZER.decode(message.payload());
+ log.info("received get flow entry request for {}", rule);
+ FlowEntry flowEntry = getFlowEntryInternal(rule);
+ try {
+ message.respond(SERIALIZER.encode(flowEntry));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ });
+
log.info("Started");
}
@@ -127,6 +154,9 @@
}
+ // TODO: This is not an efficient operation on a distributed sharded
+ // flow store. We need to revisit the need for this operation or at least
+ // make it device specific.
@Override
public int getFlowRuleCount() {
return flowEntries.size();
@@ -134,7 +164,26 @@
@Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
- return getFlowEntryInternal(rule);
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ return getFlowEntryInternal(rule);
+ }
+
+ log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
+ replicaInfo.master().orNull(), rule.deviceId());
+
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ FlowStoreMessageSubjects.GET_FLOW_ENTRY,
+ SERIALIZER.encode(rule));
+
+ try {
+ ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException e) {
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException(e);
+ }
}
private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
@@ -165,19 +214,30 @@
}
@Override
- public boolean storeFlowRule(FlowRule rule) {
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return storeFlowEntryInternal(rule);
+ public void storeFlowRule(FlowRule rule) {
+ storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
+ }
+
+ public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
+ if (operation.getOperations().isEmpty()) {
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
}
- log.info("Forwarding storeFlowRule to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), rule.deviceId());
+ DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
+
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ return storeBatchInternal(operation);
+ }
+
+ log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
+ replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
FlowStoreMessageSubjects.STORE_FLOW_RULE,
- SERIALIZER.encode(rule));
+ SERIALIZER.encode(operation));
try {
ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
@@ -186,58 +246,44 @@
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
- return false;
+
+ return null;
}
- private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
- StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
- DeviceId deviceId = flowRule.deviceId();
- // write to local copy.
- if (!flowEntries.containsEntry(deviceId, flowEntry)) {
- flowEntries.put(deviceId, flowEntry);
- flowEntriesById.put(flowRule.appId(), flowEntry);
- notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
- return true;
+ private Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
+ List<FlowEntry> toRemove = new ArrayList<>();
+ List<FlowEntry> toAdd = new ArrayList<>();
+ // TODO: backup changes to hazelcast map
+ for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
+ FlowRule flowRule = batchEntry.getTarget();
+ FlowRuleOperation op = batchEntry.getOperator();
+ if (op.equals(FlowRuleOperation.REMOVE)) {
+ StoredFlowEntry entry = getFlowEntryInternal(flowRule);
+ if (entry != null) {
+ entry.setState(FlowEntryState.PENDING_REMOVE);
+ toRemove.add(entry); // queue only entries that exist; never pass null on
+ }
+ } else if (op.equals(FlowRuleOperation.ADD)) {
+ StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
+ DeviceId deviceId = flowRule.deviceId();
+ if (!flowEntries.containsEntry(deviceId, flowEntry)) { // skip rules already stored
+ flowEntries.put(deviceId, flowEntry);
+ flowEntriesById.put(flowRule.appId(), flowEntry);
+ toAdd.add(flowEntry);
+ }
+ }
}
- // write to backup.
- // TODO: write to a hazelcast map.
- return false;
+ if (toAdd.isEmpty() && toRemove.isEmpty()) { // no local change, so nothing to notify
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+ }
+ notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
+ // TODO: implement this.
+ return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
}
@Override
- public synchronized boolean deleteFlowRule(FlowRule rule) {
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return deleteFlowRuleInternal(rule);
- }
-
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.DELETE_FLOW_RULE,
- SERIALIZER.encode(rule));
-
- try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (IOException | TimeoutException e) {
- // FIXME: throw a FlowStoreException
- throw new RuntimeException(e);
- }
- return false;
- }
-
- private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
- StoredFlowEntry entry = getFlowEntryInternal(flowRule);
- if (entry == null) {
- return false;
- }
- entry.setState(FlowEntryState.PENDING_REMOVE);
-
- // TODO: also update backup.
-
- notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
-
- return true;
+ public void deleteFlowRule(FlowRule rule) {
+ storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
}
@Override
@@ -315,4 +361,9 @@
}
// TODO: also update backup.
}
+
+ @Override
+ public void batchOperationComplete(FlowRuleBatchEvent event) {
+ notifyDelegate(event);
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
index a43dad6..ca833b8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -12,4 +12,5 @@
public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
new MessageSubject("peer-forward-add-or-update-flow-rule");
public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
+ public static final MessageSubject GET_FLOW_ENTRY = new MessageSubject("peer-forward-get-flow-entry");
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index 9de6d8c..e6259d8 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -26,10 +26,12 @@
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.FlowId;
+import org.onlab.onos.net.flow.StoredFlowEntry;
import org.onlab.onos.net.flow.criteria.Criteria;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instructions;
@@ -93,6 +95,8 @@
HostId.class,
HostDescription.class,
DefaultHostDescription.class,
+ DefaultFlowEntry.class,
+ StoredFlowEntry.class,
DefaultFlowRule.class,
FlowId.class,
DefaultTrafficSelector.class,
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index a96cacb..3d10d3d 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -3,6 +3,8 @@
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -10,6 +12,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -17,11 +20,17 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowId;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
@@ -33,6 +42,7 @@
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
+import com.google.common.util.concurrent.Futures;
/**
* Manages inventory of flow rules using trivial in-memory implementation.
@@ -40,7 +50,7 @@
@Component(immediate = true)
@Service
public class SimpleFlowRuleStore
- extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+ extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
private final Logger log = getLogger(getClass());
@@ -148,12 +158,11 @@
}
@Override
- public boolean storeFlowRule(FlowRule rule) {
- final boolean added = storeFlowRuleInternal(rule);
- return added;
+ public void storeFlowRule(FlowRule rule) {
+ storeFlowRuleInternal(rule);
}
- private boolean storeFlowRuleInternal(FlowRule rule) {
+ private void storeFlowRuleInternal(FlowRule rule) {
StoredFlowEntry f = new DefaultFlowEntry(rule);
final DeviceId did = f.deviceId();
final FlowId fid = f.id();
@@ -162,19 +171,20 @@
for (StoredFlowEntry fe : existing) {
if (fe.equals(rule)) {
// was already there? ignore
- return false;
+ return;
}
}
// new flow rule added
existing.add(f);
- // TODO: Should we notify only if it's "remote" event?
- //notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, rule));
- return true;
+ notifyDelegate(FlowRuleBatchEvent.create(
+ new FlowRuleBatchRequest(
+ Arrays.<FlowEntry>asList(f),
+ Collections.<FlowEntry>emptyList())));
}
}
@Override
- public boolean deleteFlowRule(FlowRule rule) {
+ public void deleteFlowRule(FlowRule rule) {
List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id());
synchronized (entries) {
@@ -184,13 +194,11 @@
entry.setState(FlowEntryState.PENDING_REMOVE);
// TODO: Should we notify only if it's "remote" event?
//notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, rule));
- return true;
}
}
}
}
//log.warn("Cannot find rule {}", rule);
- return false;
}
@Override
@@ -236,4 +244,24 @@
}
return null;
}
+
+ @Override
+ public Future<CompletedBatchOperation> storeBatch(
+ FlowRuleBatchOperation batchOperation) {
+ for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
+ if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
+ storeFlowRule(entry.getTarget());
+ } else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
+ deleteFlowRule(entry.getTarget());
+ } else {
+ throw new UnsupportedOperationException("Unsupported operation type");
+ }
+ }
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+ }
+
+ @Override
+ public void batchOperationComplete(FlowRuleBatchEvent event) {
+ notifyDelegate(event);
+ }
}