Initial DistributedDlowRuleStore
- known bug: responding to ClusterMessage not possible.
Change-Id: Iaa4245c64d2a6219d7c48ed30ddca7d558dbc177
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleEvent.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleEvent.java
index b17449d..97efa5a 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleEvent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleEvent.java
@@ -24,7 +24,18 @@
/**
* Signifies that a rule has been updated.
*/
- RULE_UPDATED
+ RULE_UPDATED,
+
+ // internal event between Manager <-> Store
+
+ /*
+ * Signifies that a request to add flow rule has been added to the store.
+ */
+ RULE_ADD_REQUESTED,
+ /*
+ * Signifies that a request to remove flow rule has been added to the store.
+ */
+ RULE_REMOVE_REQUESTED,
}
/**
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
index 5ce7eb1..abb9a10 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
@@ -44,16 +44,18 @@
* Stores a new flow rule without generating events.
*
* @param rule the flow rule to add
+ * @return true if the rule should be handled locally
*/
- void storeFlowRule(FlowRule rule);
+ boolean storeFlowRule(FlowRule rule);
/**
* Marks a flow rule for deletion. Actual deletion will occur
* when the provider indicates that the flow has been removed.
*
* @param rule the flow rule to delete
+ * @return true if the rule should be handled locally
*/
- void deleteFlowRule(FlowRule rule);
+ boolean deleteFlowRule(FlowRule rule);
/**
* Stores a new flow rule, or updates an existing entry.
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 9ea99c3..525946e 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -104,24 +104,52 @@
public void applyFlowRules(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
FlowRule f = flowRules[i];
- final Device device = deviceService.getDevice(f.deviceId());
- final FlowRuleProvider frp = getProvider(device.providerId());
- store.storeFlowRule(f);
- frp.applyFlowRule(f);
+ boolean local = store.storeFlowRule(f);
+ if (local) {
+ // TODO: aggregate all local rules and push down once?
+ applyFlowRulesToProviders(f);
+ }
+ }
+ }
+
+ private void applyFlowRulesToProviders(FlowRule... flowRules) {
+ DeviceId did = null;
+ FlowRuleProvider frp = null;
+ for (FlowRule f : flowRules) {
+ if (!f.deviceId().equals(did)) {
+ did = f.deviceId();
+ final Device device = deviceService.getDevice(did);
+ frp = getProvider(device.providerId());
+ }
+ if (frp != null) {
+ frp.applyFlowRule(f);
+ }
}
}
@Override
public void removeFlowRules(FlowRule... flowRules) {
FlowRule f;
- FlowRuleProvider frp;
- Device device;
for (int i = 0; i < flowRules.length; i++) {
f = flowRules[i];
- device = deviceService.getDevice(f.deviceId());
- store.deleteFlowRule(f);
- if (device != null) {
+ boolean local = store.deleteFlowRule(f);
+ if (local) {
+ // TODO: aggregate all local rules and push down once?
+ removeFlowRulesFromProviders(f);
+ }
+ }
+ }
+
+ private void removeFlowRulesFromProviders(FlowRule... flowRules) {
+ DeviceId did = null;
+ FlowRuleProvider frp = null;
+ for (FlowRule f : flowRules) {
+ if (!f.deviceId().equals(did)) {
+ did = f.deviceId();
+ final Device device = deviceService.getDevice(did);
frp = getProvider(device.providerId());
+ }
+ if (frp != null) {
frp.removeFlowRule(f);
}
}
@@ -135,8 +163,11 @@
for (FlowRule f : rules) {
store.deleteFlowRule(f);
+ // FIXME: only accept request and push to provider on internal event
device = deviceService.getDevice(f.deviceId());
frp = getProvider(device.providerId());
+ // FIXME: flows removed from store and flows removed from might diverge
+ // get rid of #removeRulesById?
frp.removeRulesById(id, f);
}
}
@@ -352,7 +383,23 @@
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
@Override
public void notify(FlowRuleEvent event) {
- eventDispatcher.post(event);
+ switch (event.type()) {
+ case RULE_ADD_REQUESTED:
+ applyFlowRulesToProviders(event.subject());
+ break;
+ case RULE_REMOVE_REQUESTED:
+ removeFlowRulesFromProviders(event.subject());
+ break;
+
+ case RULE_ADDED:
+ case RULE_REMOVED:
+ case RULE_UPDATED:
+ // only dispatch events related to switch
+ eventDispatcher.post(event);
+ break;
+ default:
+ break;
+ }
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index e5b2ed6..3da5d25 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -2,6 +2,7 @@
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
import java.io.IOException;
import java.util.Collection;
@@ -30,6 +31,7 @@
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
@@ -80,10 +82,44 @@
};
// TODO: make this configurable
- private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000;
+ private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
@Activate
public void activate() {
+ clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ FlowRule rule = SERIALIZER.decode(message.payload());
+ log.info("received add request for {}", rule);
+ storeFlowEntryInternal(rule);
+ // FIXME what to respond.
+ try {
+ // FIXME: #respond() not working. responded message is
+ // handled by this sender node and never goes back.
+ message.respond(SERIALIZER.encode("ACK"));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ });
+
+ clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ FlowRule rule = SERIALIZER.decode(message.payload());
+ log.info("received delete request for {}", rule);
+ deleteFlowRuleInternal(rule);
+ // FIXME what to respond.
+ try {
+ message.respond(SERIALIZER.encode("ACK"));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+
+ }
+ });
log.info("Started");
}
@@ -131,13 +167,14 @@
}
@Override
- public void storeFlowRule(FlowRule rule) {
+ public boolean storeFlowRule(FlowRule rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- storeFlowEntryInternal(rule);
- return;
+ return storeFlowEntryInternal(rule);
}
+ log.warn("Not my flow forwarding to {}", replicaInfo.master().orNull());
+
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
FlowStoreMessageSubjects.STORE_FLOW_RULE,
@@ -150,26 +187,29 @@
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
+ return false;
}
- private synchronized void storeFlowEntryInternal(FlowRule flowRule) {
+ private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
DeviceId deviceId = flowRule.deviceId();
// write to local copy.
if (!flowEntries.containsEntry(deviceId, flowEntry)) {
flowEntries.put(deviceId, flowEntry);
flowEntriesById.put(flowRule.appId(), flowEntry);
+ notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
+ return true;
}
// write to backup.
// TODO: write to a hazelcast map.
+ return false;
}
@Override
- public synchronized void deleteFlowRule(FlowRule rule) {
+ public synchronized boolean deleteFlowRule(FlowRule rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- deleteFlowRuleInternal(rule);
- return;
+ return deleteFlowRuleInternal(rule);
}
ClusterMessage message = new ClusterMessage(
@@ -184,15 +224,21 @@
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
+ return false;
}
- private synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
+ private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry == null) {
- return;
+ return false;
}
entry.setState(FlowEntryState.PENDING_REMOVE);
+
// TODO: also update backup.
+
+ notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
+
+ return true;
}
@Override
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 33ba95b..a96cacb 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -148,8 +148,9 @@
}
@Override
- public void storeFlowRule(FlowRule rule) {
+ public boolean storeFlowRule(FlowRule rule) {
final boolean added = storeFlowRuleInternal(rule);
+ return added;
}
private boolean storeFlowRuleInternal(FlowRule rule) {
@@ -166,13 +167,14 @@
}
// new flow rule added
existing.add(f);
- // TODO: notify through delegate about remote event?
+ // TODO: Should we notify only if it's "remote" event?
+ //notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, rule));
return true;
}
}
@Override
- public void deleteFlowRule(FlowRule rule) {
+ public boolean deleteFlowRule(FlowRule rule) {
List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id());
synchronized (entries) {
@@ -180,12 +182,15 @@
if (entry.equals(rule)) {
synchronized (entry) {
entry.setState(FlowEntryState.PENDING_REMOVE);
- return;
+ // TODO: Should we notify only if it's "remote" event?
+ //notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, rule));
+ return true;
}
}
}
}
//log.warn("Cannot find rule {}", rule);
+ return false;
}
@Override