Add a distributed map implementation of FlowMap.
- Added SharedFlowMap as an implementation of FlowMap.
- SharedFlowMap is a distributed map implementation based on ISharedCollectionsService.
- This task is a part of ONOS-1688 and ONOS-1736.
Change-Id: I57961c415636b480bbfb4b6c117530afc34b9b94
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMap.java b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMap.java
new file mode 100644
index 0000000..8fd0744
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMap.java
@@ -0,0 +1,190 @@
+package net.onrc.onos.core.flowmanager;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.onrc.onos.api.flowmanager.Flow;
+import net.onrc.onos.api.flowmanager.FlowId;
+import net.onrc.onos.api.flowmanager.FlowState;
+import net.onrc.onos.core.datagrid.ISharedCollectionsService;
+import net.onrc.onos.core.util.serializers.KryoFactory;
+
+import com.hazelcast.core.IMap;
+
+/**
+ * Manages the distributed shared flow map.
+ * <p>
+ * Flows and flow states are kept in two separate shared maps, both keyed by
+ * the string form of the {@link FlowId}; values are serialized by
+ * {@link KryoFactory}. A flow without an entry in the state map is treated
+ * as being in the {@link FlowState#SUBMITTED} state.
+ */
+class SharedFlowMap implements FlowMap {
+    private static final String FLOWMAP_NAME = "flow_map";
+    private static final String FLOWSTATEMAP_NAME = "flowstate_map";
+    private static final String FLOWID_NOT_SPECIFIED = "FlowId is not specified.";
+
+    private final IMap<String, byte[]> flowMap;
+    private final IMap<String, byte[]> flowStateMap;
+    private final SharedFlowMapEventDispatcher dispatcher;
+
+    /**
+     * Creates instance using {@link ISharedCollectionsService} service.
+     *
+     * @param service the {@link ISharedCollectionsService} service
+     * @throws NullPointerException if service or a map it returns is null
+     */
+    SharedFlowMap(ISharedCollectionsService service) {
+        checkNotNull(service, "ISharedCollectionsService is not specified.");
+        this.flowMap = checkNotNull(service.getConcurrentMap(
+                FLOWMAP_NAME, String.class, byte[].class));
+        this.flowStateMap = checkNotNull(service.getConcurrentMap(
+                FLOWSTATEMAP_NAME, String.class, byte[].class));
+        this.dispatcher = new SharedFlowMapEventDispatcher(flowMap, flowStateMap);
+    }
+
+    /**
+     * Gets the flow stored under the given ID.
+     *
+     * @param id the ID of the flow to look up
+     * @return the deserialized flow, or null if no flow is stored under the ID
+     * @throws NullPointerException if id is null
+     */
+    @Override
+    public Flow get(FlowId id) {
+        byte[] buf = flowMap.get(checkNotNull(id, FLOWID_NOT_SPECIFIED).toString());
+        if (buf == null) {
+            return null;
+        }
+        return KryoFactory.deserialize(buf);
+    }
+
+    /**
+     * Stores the flow under its ID, overwriting any flow already stored there.
+     *
+     * @param flow the flow to store
+     * @return always true
+     * @throws NullPointerException if flow is null
+     */
+    @Override
+    public boolean put(Flow flow) {
+        checkNotNull(flow, "Flow is not specified.");
+        byte[] buf = KryoFactory.serialize(flow);
+        // The previous value is not needed, so set() is used instead of put().
+        flowMap.set(flow.getId().toString(), buf);
+        return true;
+    }
+
+    /**
+     * Removes the flow with the given ID together with its state entry.
+     * <p>
+     * The state entry is only removed when a flow entry was actually removed.
+     *
+     * @param id the ID of the flow to remove
+     * @return the removed flow, or null if no flow was stored under the ID
+     * @throws NullPointerException if id is null
+     */
+    @Override
+    public Flow remove(FlowId id) {
+        String flowIdStr = checkNotNull(id, FLOWID_NOT_SPECIFIED).toString();
+        byte[] buf = flowMap.remove(flowIdStr);
+        if (buf == null) {
+            return null;
+        }
+        flowStateMap.remove(flowIdStr);
+        return KryoFactory.deserialize(buf);
+    }
+
+    /**
+     * Gets all flows currently stored in the flow map.
+     *
+     * @return a newly created set holding every deserialized flow
+     */
+    @Override
+    public Set<Flow> getAll() {
+        Set<Flow> flows = new HashSet<>();
+        for (Entry<String, byte[]> entry : flowMap.entrySet()) {
+            flows.add((Flow) KryoFactory.deserialize(entry.getValue()));
+        }
+        return flows;
+    }
+
+    /**
+     * Changes the state of the flow if its current state is the expected one.
+     * <p>
+     * The existence check on the flow map and the update of the state map are
+     * separate map operations; no lock spanning both is taken here.
+     *
+     * @param id the ID of the flow whose state is updated
+     * @param state the new state
+     * @param expectedState the state the flow is expected to be in
+     * @return true if the state was updated, false if the flow does not exist
+     *         or its current state differs from the expected state
+     * @throws NullPointerException if id is null
+     */
+    @Override
+    public boolean setState(FlowId id, FlowState state, FlowState expectedState) {
+        final String key = checkNotNull(id, FLOWID_NOT_SPECIFIED).toString();
+        if (!flowMap.containsKey(key)) {
+            return false;
+        }
+
+        // Serialize only after the flow is known to exist.
+        final byte[] newValue = KryoFactory.serialize(state);
+        if (expectedState == FlowState.SUBMITTED
+                && flowStateMap.putIfAbsent(key, newValue) == null) {
+            // The absence of the key means SUBMITTED state.
+            return true;
+        }
+
+        // Also covers a SUBMITTED state that is stored explicitly.
+        final byte[] oldValue = KryoFactory.serialize(expectedState);
+        return flowStateMap.replace(key, oldValue, newValue);
+    }
+
+    /**
+     * Gets the state of the flow with the given ID.
+     *
+     * @param id the ID of the flow
+     * @return the state of the flow, {@link FlowState#SUBMITTED} if the flow has
+     *         no state entry, or null if the flow does not exist
+     * @throws NullPointerException if id is null
+     */
+    @Override
+    public FlowState getState(FlowId id) {
+        final String key = checkNotNull(id, FLOWID_NOT_SPECIFIED).toString();
+        if (!flowMap.containsKey(key)) {
+            return null;
+        }
+
+        final byte[] buf = flowStateMap.get(key);
+        if (buf == null) {
+            // The absence of the key means SUBMITTED state.
+            return FlowState.SUBMITTED;
+        }
+        return KryoFactory.deserialize(buf);
+    }
+
+    /**
+     * Registers the listener with the event dispatcher of this map.
+     *
+     * @param listener the listener to add
+     */
+    @Override
+    public void addListener(FlowMapEventListener listener) {
+        dispatcher.addListener(listener);
+    }
+
+    /**
+     * Unregisters the listener from the event dispatcher of this map.
+     *
+     * @param listener the listener to remove
+     */
+    @Override
+    public void removeListener(FlowMapEventListener listener) {
+        dispatcher.removeListener(listener);
+    }
+}