Add a distributed map implementation of FlowBatchMap.
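- Added SharedFlowBatchMap as an implementation of FlowBatchMap.
- SharedFlowBatchMap is a distributed map implementation based on ISharedCollectionsService.
- Replaced FlowBatchMap.isLeader() with isLocal(FlowBatchId).
- Renamed FlowBatchMapEventListener.flowStateChanged() to flowBatchOperationStateChanged().
- Registered FlowBatchOperation, FlowBatchOperation.Operator and FlowBatchState in KryoFactory.
- This task is part of ONOS-1842.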
Change-Id: I480b3adecc493f82edbd6245e2f8aaf702c65d4f
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMap.java b/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMap.java
index 6886d68..153bcf3 100644
--- a/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMap.java
+++ b/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMap.java
@@ -41,7 +41,7 @@
* Gets all {@link FlowBatchOperation} objects existing in the map.
* <p>
* The changes to the returned set does not affect the original map.
-
+ *
* @return a set of {@link FlowBatchOperation} objects
*/
Set<FlowBatchOperation> getAll();
@@ -84,9 +84,10 @@
void removeListener(FlowBatchMapEventListener listener);
/**
- * Checks if this instance is a leader of the map.
+ * Checks if the specified flow batch operation is stored in local storage.
*
- * @return true if it is leader, false otherwise
+ * @param id the ID of the batch operation
+ * @return true if the specified batch operation is stored in local storage
*/
- boolean isLeader();
+ boolean isLocal(FlowBatchId id);
}
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMapEventListener.java b/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMapEventListener.java
index 93a0bdf..345568f 100644
--- a/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMapEventListener.java
+++ b/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMapEventListener.java
@@ -28,7 +28,7 @@
* @param oldState the old state of the {@link FlowBatchOperation}
* @param currentState the current state of the {@link FlowBatchOperation}
*/
- void flowStateChanged(FlowBatchId id,
+ void flowBatchOperationStateChanged(FlowBatchId id,
FlowBatchState oldState, FlowBatchState currentState);
}
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMap.java b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMap.java
new file mode 100644
index 0000000..368f88d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMap.java
@@ -0,0 +1,132 @@
+package net.onrc.onos.core.flowmanager;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.onrc.onos.api.flowmanager.FlowBatchId;
+import net.onrc.onos.api.flowmanager.FlowBatchOperation;
+import net.onrc.onos.api.flowmanager.FlowBatchState;
+import net.onrc.onos.core.datagrid.ISharedCollectionsService;
+import net.onrc.onos.core.util.serializers.KryoFactory;
+
+import com.hazelcast.core.IMap;
+
+/**
+ * An implementation of shared distributed map of {@link FlowBatchMap}.
+ * <p>
+ * This class's implementation is almost the same as {@link SharedFlowMap}. A
+ * common base class for them should be introduced.
+ */
+public class SharedFlowBatchMap implements FlowBatchMap {
+ private static final String FLOWBATCHMAP_NAME = "flowbatch_map"; // shared map name for operations
+ private static final String FLOWBATCHSTATEMAP_NAME = "flowbatchstate_map"; // shared map name for states
+
+ private final IMap<String, byte[]> flowBatchMap; // ID string -> serialized FlowBatchOperation
+ private final IMap<String, byte[]> flowBatchStateMap; // ID string -> serialized FlowBatchState
+ private final SharedFlowBatchMapEventDispatcher dispatcher; // relays map events to listeners
+
+ /**
+  * Builds the map on top of the two shared collections obtained from the service.
+  *
+  * @param service the {@link ISharedCollectionsService} providing the backing maps
+  */
+ SharedFlowBatchMap(ISharedCollectionsService service) {
+     this.flowBatchMap = checkNotNull(
+             service.getConcurrentMap(FLOWBATCHMAP_NAME, String.class, byte[].class));
+     this.flowBatchStateMap = checkNotNull(
+             service.getConcurrentMap(FLOWBATCHSTATEMAP_NAME, String.class, byte[].class));
+     // Both maps feed a single dispatcher, which tells them apart by value type.
+     this.dispatcher = new SharedFlowBatchMapEventDispatcher(flowBatchMap, flowBatchStateMap);
+ }
+
+ @Override
+ public FlowBatchOperation get(FlowBatchId id) {
+     final byte[] serialized = flowBatchMap.get(checkNotNull(id).toString());
+     if (serialized != null) {
+         return KryoFactory.deserialize(serialized);
+     }
+     return null; // nothing is stored under this ID
+ }
+
+ @Override
+ public boolean put(FlowBatchId id, FlowBatchOperation flowOp) {
+     final String key = checkNotNull(id).toString(); // fail on a null ID before serializing
+     flowBatchMap.set(key, checkNotNull(KryoFactory.serialize(checkNotNull(flowOp))));
+     return true; // set() reports no result, so a completed call counts as success
+ }
+
+ @Override
+ public FlowBatchOperation remove(FlowBatchId id) {
+     final String key = checkNotNull(id).toString();
+     final byte[] removed = flowBatchMap.remove(key);
+     if (removed != null) {
+         flowBatchStateMap.remove(key); // the state entry goes with its operation
+         return KryoFactory.deserialize(removed);
+     }
+     return null; // nothing was stored under this ID
+ }
+
+ @Override
+ public Set<FlowBatchOperation> getAll() {
+     final Set<FlowBatchOperation> operations = new HashSet<>(); // detached from the map
+     for (Entry<String, byte[]> stored : flowBatchMap.entrySet()) {
+         final FlowBatchOperation op = KryoFactory.deserialize(stored.getValue());
+         operations.add(op);
+     }
+     return operations;
+ }
+
+ @Override
+ public boolean setState(FlowBatchId id, FlowBatchState state,
+         FlowBatchState expectedState) {
+     final String key = checkNotNull(
+             id, "FlowBatchId is not specified.").toString();
+     final byte[] expectedBytes = KryoFactory.serialize(expectedState);
+     final byte[] updatedBytes = KryoFactory.serialize(state);
+
+     // A state can only be recorded for an operation that is present in the map.
+     if (flowBatchMap.containsKey(key)) {
+         if (expectedState != FlowBatchState.SUBMITTED) {
+             // Swap only if the stored state still equals the expected one.
+             return flowBatchStateMap.replace(key, expectedBytes, updatedBytes);
+         }
+         // SUBMITTED is represented by a missing entry, so only insert if absent.
+         return flowBatchStateMap.putIfAbsent(key, updatedBytes) == null;
+     }
+     return false;
+ }
+
+ @Override
+ public FlowBatchState getState(FlowBatchId id) {
+     final String key = checkNotNull(
+             id, "FlowBatchId is not specified.").toString();
+     // No state is reported for an ID that has no operation in the map.
+     if (!flowBatchMap.containsKey(key)) {
+         return null;
+     }
+
+     final byte[] stateBytes = flowBatchStateMap.get(key);
+     if (stateBytes != null) {
+         return KryoFactory.deserialize(stateBytes);
+     }
+     return FlowBatchState.SUBMITTED; // a missing state entry stands for SUBMITTED
+ }
+
+ @Override
+ public void addListener(FlowBatchMapEventListener listener) {
+ dispatcher.addListener(listener); // registration is delegated to the event dispatcher
+ }
+
+ @Override
+ public void removeListener(FlowBatchMapEventListener listener) {
+ dispatcher.removeListener(listener); // removal is delegated to the event dispatcher
+ }
+
+ @Override
+ public boolean isLocal(FlowBatchId id) {
+ return flowBatchMap.localKeySet().contains(checkNotNull(id).toString()); // membership test on the map's local key set
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapEventDispatcher.java b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapEventDispatcher.java
new file mode 100644
index 0000000..7107be8
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapEventDispatcher.java
@@ -0,0 +1,129 @@
+package net.onrc.onos.core.flowmanager;
+
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import net.onrc.onos.api.flowmanager.FlowBatchId;
+import net.onrc.onos.api.flowmanager.FlowBatchOperation;
+import net.onrc.onos.api.flowmanager.FlowBatchState;
+import net.onrc.onos.core.util.serializers.KryoFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IMap;
+
+/**
+ * This class is used for managing listeners of the {@link SharedFlowBatchMap}.
+ */
+public class SharedFlowBatchMapEventDispatcher implements EntryListener<String, byte[]> {
+ private static final Logger log = LoggerFactory
+         .getLogger(SharedFlowBatchMapEventDispatcher.class);
+ private final CopyOnWriteArraySet<FlowBatchMapEventListener> listeners; // set once in the constructor
+
+ /**
+  * Creates a dispatcher and subscribes it to both flow batch maps.
+  *
+  * @param flowBatchMap the map whose entries hold flow batch operations
+  * @param flowBatchStateMap the map whose entries hold flow batch states
+  */
+ public SharedFlowBatchMapEventDispatcher(IMap<String, byte[]> flowBatchMap,
+         IMap<String, byte[]> flowBatchStateMap) {
+     this.listeners = new CopyOnWriteArraySet<>();
+     flowBatchMap.addEntryListener(this, true); // true: events carry the entry values
+     flowBatchStateMap.addEntryListener(this, true);
+ }
+
+ /**
+  * Registers a listener to be notified of flow batch map events.
+  *
+  * @param listener the {@link FlowBatchMapEventListener} to register
+  */
+ public void addListener(FlowBatchMapEventListener listener) {
+     this.listeners.add(listener);
+ }
+
+ /**
+  * Unregisters a listener so that it no longer receives flow batch map events.
+  *
+  * @param listener the {@link FlowBatchMapEventListener} to unregister
+  */
+ public void removeListener(FlowBatchMapEventListener listener) {
+     this.listeners.remove(listener);
+ }
+
+ @Override
+ public void entryAdded(EntryEvent<String, byte[]> event) {
+     final Object value = KryoFactory.deserialize(event.getValue());
+     if (value instanceof FlowBatchOperation) {
+         // Event from flowBatchMap: a batch operation was stored under a new key.
+         final FlowBatchOperation flowOp = (FlowBatchOperation) value;
+         final FlowBatchId id = new FlowBatchId(Long.parseLong(event.getKey()));
+         log.trace("Flow batch operation ID:{}, {} was added", id, flowOp);
+         for (FlowBatchMapEventListener e : listeners) {
+             // Each listener gets its own copy, then the initial SUBMITTED state.
+             e.flowBatchOperationAdded(id, new FlowBatchOperation(flowOp.getOperations()));
+             e.flowBatchOperationStateChanged(id, null, FlowBatchState.SUBMITTED);
+         }
+     } else if (value instanceof FlowBatchState) {
+         // Event from flowBatchStateMap: no entry existed before, so the previous
+         // state is reported as SUBMITTED.
+         final FlowBatchState state = (FlowBatchState) value;
+         final FlowBatchId id = new FlowBatchId(Long.parseLong(event.getKey()));
+         log.trace("Flow batch state of ID:{} was set to {}", id, state);
+         for (FlowBatchMapEventListener e : listeners) {
+             e.flowBatchOperationStateChanged(id, FlowBatchState.SUBMITTED, state);
+         }
+     } else {
+         throw new IllegalStateException("Added illegal value: " + value);
+     }
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<String, byte[]> event) {
+     final Object removed = KryoFactory.deserialize(event.getValue());
+     if (removed instanceof FlowBatchState) {
+         // State-map removals are only traced; listeners are not notified.
+         log.trace("Flow batch state {} of ID:{} was removed", removed, event.getKey());
+         return;
+     }
+     if (!(removed instanceof FlowBatchOperation)) {
+         throw new IllegalStateException("Removed illegal value: " + removed);
+     }
+     // Operation-map removal: tell every listener which ID disappeared.
+     final FlowBatchId id = new FlowBatchId(Long.parseLong(event.getKey()));
+     log.trace("Flow batch operation ID:{}, {} was removed", id, removed);
+     for (FlowBatchMapEventListener listener : listeners) {
+         listener.flowBatchOperationRemoved(id);
+     }
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<String, byte[]> event) {
+     final Object updated = KryoFactory.deserialize(event.getValue());
+     if (updated instanceof FlowBatchOperation) {
+         // Operation-map updates are only traced; listeners are not notified.
+         log.trace("Flow batch operation ID:{} updated by {}", event.getKey(), updated);
+         return;
+     }
+     if (!(updated instanceof FlowBatchState)) {
+         throw new IllegalStateException("Updated illegal value: " + updated);
+     }
+     // State-map update: report the transition from the old to the new state.
+     final FlowBatchState currentState = (FlowBatchState) updated;
+     final FlowBatchState oldState = (FlowBatchState) KryoFactory.deserialize(event.getOldValue());
+     final FlowBatchId id = new FlowBatchId(Long.parseLong(event.getKey()));
+     log.trace("Flow batch state of ID:{} was updated from {} to {}",
+             id, oldState, currentState);
+     for (FlowBatchMapEventListener listener : listeners) {
+         listener.flowBatchOperationStateChanged(id, oldState, currentState);
+     }
+ }
+
+ @Override
+ public void entryEvicted(EntryEvent<String, byte[]> event) {
+ // Eviction events are ignored; nothing is dispatched to listeners.
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
index c9c3c7b..f5bf01d 100644
--- a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
@@ -16,6 +16,8 @@
import net.floodlightcontroller.core.IFloodlightProviderService.Role;
import net.floodlightcontroller.util.MACAddress;
import net.onrc.onos.api.batchoperation.BatchOperationEntry;
+import net.onrc.onos.api.flowmanager.FlowBatchOperation;
+import net.onrc.onos.api.flowmanager.FlowBatchState;
import net.onrc.onos.api.flowmanager.FlowId;
import net.onrc.onos.api.flowmanager.FlowLink;
import net.onrc.onos.api.flowmanager.FlowState;
@@ -254,6 +256,9 @@
// New flow manager related classes
kryo.register(FlowId.class);
kryo.register(FlowState.class);
+ kryo.register(FlowBatchOperation.class);
+ kryo.register(FlowBatchOperation.Operator.class);
+ kryo.register(FlowBatchState.class);
kryo.register(net.onrc.onos.api.flowmanager.Path.class);
kryo.register(Tree.class);
kryo.register(FlowLink.class);