Initial implementation for DistributedFlowRuleStore utilizing master/backup replication
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 084435f..5062416 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -3,14 +3,20 @@
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
+import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
@@ -21,6 +27,13 @@
import org.onlab.onos.net.flow.FlowRuleStore;
import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
+import org.onlab.onos.store.flow.ReplicaInfo;
+import org.onlab.onos.store.serializers.DistributedStoreSerializers;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
@@ -28,9 +41,8 @@
import com.google.common.collect.Multimap;
/**
- * Manages inventory of flow rules using trivial in-memory implementation.
+ * Manages inventory of flow rules using a distributed state management protocol.
*/
-//FIXME I LIE. I AIN'T DISTRIBUTED
@Component(immediate = true)
@Service
public class DistributedFlowRuleStore
@@ -46,6 +58,28 @@
private final Multimap<Short, FlowRule> flowEntriesById =
ArrayListMultimap.<Short, FlowRule>create();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ReplicaInfoManager replicaInfoManager;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterService clusterService;
+
+ protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(DistributedStoreSerializers.COMMON)
+ .build()
+ .populate(1);
+ }
+ };
+
+ // TODO: make this configurable
+ private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000;
+
@Activate
public void activate() {
log.info("Started");
@@ -91,26 +125,92 @@
}
@Override
- public synchronized void storeFlowRule(FlowRule rule) {
- FlowEntry f = new DefaultFlowEntry(rule);
- DeviceId did = f.deviceId();
- if (!flowEntries.containsEntry(did, f)) {
- flowEntries.put(did, f);
- flowEntriesById.put(rule.appId(), f);
+ public void storeFlowRule(FlowRule rule) {
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
+ storeFlowEntryInternal(rule);
+ return;
}
+
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ FlowStoreMessageSubjects.STORE_FLOW_RULE,
+ SERIALIZER.encode(rule));
+
+ try {
+ ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (IOException | TimeoutException e) {
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException(e);
+ }
+ }
+
+ public synchronized void storeFlowEntryInternal(FlowRule flowRule) {
+ FlowEntry flowEntry = new DefaultFlowEntry(flowRule);
+ DeviceId deviceId = flowRule.deviceId();
+ // write to local copy.
+ if (!flowEntries.containsEntry(deviceId, flowEntry)) {
+ flowEntries.put(deviceId, flowEntry);
+ flowEntriesById.put(flowRule.appId(), flowEntry);
+ }
+ // write to backup.
+ // TODO: write to a hazelcast map.
}
@Override
public synchronized void deleteFlowRule(FlowRule rule) {
- FlowEntry entry = getFlowEntry(rule);
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
+ deleteFlowRuleInternal(rule);
+ return;
+ }
+
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ FlowStoreMessageSubjects.DELETE_FLOW_RULE,
+ SERIALIZER.encode(rule));
+
+ try {
+ ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (IOException | TimeoutException e) {
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException(e);
+ }
+ }
+
+ public synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
+ FlowEntry entry = getFlowEntry(flowRule);
if (entry == null) {
return;
}
entry.setState(FlowEntryState.PENDING_REMOVE);
+ // TODO: also update backup.
}
@Override
- public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
+ public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
+ return addOrUpdateFlowRuleInternal(rule);
+ }
+
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
+ SERIALIZER.encode(rule));
+
+ try {
+ ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException e) {
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException(e);
+ }
+ }
+
+ private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry
@@ -128,15 +228,39 @@
flowEntries.put(did, rule);
return null;
+
+ // TODO: also update backup.
}
@Override
- public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) {
+ public FlowRuleEvent removeFlowRule(FlowEntry rule) {
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
+ // bypass and handle it locally
+ return removeFlowRuleInternal(rule);
+ }
+
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
+ SERIALIZER.encode(rule));
+
+ try {
+ ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException e) {
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException(e);
+ }
+ }
+
+ private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
// This is where one could mark a rule as removed and still keep it in the store.
if (flowEntries.remove(rule.deviceId(), rule)) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
+ // TODO: also update backup.
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
new file mode 100644
index 0000000..a43dad6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -0,0 +1,15 @@
+package org.onlab.onos.store.flow.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+/**
+ * MessageSubjects used by DistributedFlowRuleStore peer-peer communication.
+ */
+public final class FlowStoreMessageSubjects {
+ private FlowStoreMessageSubjects() {}
+ public static final MessageSubject STORE_FLOW_RULE = new MessageSubject("peer-forward-store-flow-rule");
+ public static final MessageSubject DELETE_FLOW_RULE = new MessageSubject("peer-forward-delete-flow-rule");
+ public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
+ new MessageSubject("peer-forward-add-or-update-flow-rule");
+ public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index b63f844..f17c268 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -26,6 +26,7 @@
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.host.DefaultHostDescription;
import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.link.DefaultLinkDescription;
@@ -86,7 +87,8 @@
Timestamp.class,
HostId.class,
HostDescription.class,
- DefaultHostDescription.class
+ DefaultHostDescription.class,
+ DefaultFlowRule.class
)
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())