Add a distributed map implementation of FlowMap.

- Added SharedFlowMap as an implementation of FlowMap.
- SharedFlowMap is a distributed map implementation based on ISharedCollectionsService.
- This task is a part of ONOS-1688 and ONOS-1736.

Change-Id: I57961c415636b480bbfb4b6c117530afc34b9b94
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMap.java b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMap.java
new file mode 100644
index 0000000..8fd0744
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMap.java
@@ -0,0 +1,120 @@
+package net.onrc.onos.core.flowmanager;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.onrc.onos.api.flowmanager.Flow;
+import net.onrc.onos.api.flowmanager.FlowId;
+import net.onrc.onos.api.flowmanager.FlowState;
+import net.onrc.onos.core.datagrid.ISharedCollectionsService;
+import net.onrc.onos.core.util.serializers.KryoFactory;
+
+import com.hazelcast.core.IMap;
+
+/**
+ * Manages the distributed shared flow map.
+ */
+class SharedFlowMap implements FlowMap {
+    private static final String FLOWMAP_NAME = "flow_map";
+    private static final String FLOWSTATEMAP_NAME = "flowstate_map";
+
+    private final IMap<String, byte[]> flowMap;
+    private final IMap<String, byte[]> flowStateMap;
+    private final SharedFlowMapEventDispatcher dispatcher;
+
+    /**
+     * Creates instance using {@link ISharedCollectionsService} service.
+     *
+     * @param service the {@link ISharedCollectionsService} service
+     */
+    SharedFlowMap(ISharedCollectionsService service) {
+        this.flowMap = checkNotNull(service.getConcurrentMap(
+                FLOWMAP_NAME, String.class, byte[].class));
+        this.flowStateMap = checkNotNull(service.getConcurrentMap(
+                FLOWSTATEMAP_NAME, String.class, byte[].class));
+        this.dispatcher = new SharedFlowMapEventDispatcher(flowMap, flowStateMap);
+    }
+
+    @Override
+    public Flow get(FlowId id) {
+        byte[] buf = flowMap.get(checkNotNull(id).toString());
+        if (buf == null) {
+            return null;
+        }
+        return KryoFactory.deserialize(buf);
+    }
+
+    @Override
+    public boolean put(Flow flow) {
+        checkNotNull(flow);
+        byte[] buf = KryoFactory.serialize(flow);
+        flowMap.set(flow.getId().toString(), buf);
+        return true;
+    }
+
+    @Override
+    public Flow remove(FlowId id) {
+        String flowIdStr = checkNotNull(id).toString();
+        byte[] buf = flowMap.remove(flowIdStr);
+        if (buf == null) {
+            return null;
+        }
+        flowStateMap.remove(flowIdStr);
+        return KryoFactory.deserialize(buf);
+    }
+
+    @Override
+    public Set<Flow> getAll() {
+        Set<Flow> flows = new HashSet<>();
+        for (Entry<String, byte[]> entry : flowMap.entrySet()) {
+            flows.add((Flow) KryoFactory.deserialize(entry.getValue()));
+        }
+        return flows;
+    }
+
+    @Override
+    public boolean setState(FlowId id, FlowState state, FlowState expectedState) {
+        final String key = checkNotNull(id, "FlowId is not specified.").toString();
+        final byte[] oldValue = KryoFactory.serialize(expectedState);
+        final byte[] newValue = KryoFactory.serialize(state);
+
+        if (!flowMap.containsKey(key)) {
+            return false;
+        }
+
+        if (expectedState == FlowState.SUBMITTED) {
+            // The absence of the key means SUBMITTED state.
+            return flowStateMap.putIfAbsent(key, newValue) == null;
+        }
+
+        return flowStateMap.replace(key, oldValue, newValue);
+    };
+
+    @Override
+    public FlowState getState(FlowId id) {
+        final String key = checkNotNull(id, "FlowId is not specified.").toString();
+        if (!flowMap.containsKey(key)) {
+            return null;
+        }
+
+        final byte[] buf = flowStateMap.get(key);
+        if (buf == null) {
+            // The absence of the key means SUBMITTED state.
+            return FlowState.SUBMITTED;
+        }
+        return KryoFactory.deserialize(buf);
+    }
+
+    @Override
+    public void addListener(FlowMapEventListener listener) {
+        dispatcher.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(FlowMapEventListener listener) {
+        dispatcher.removeListener(listener);
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMapEventDispatcher.java b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMapEventDispatcher.java
new file mode 100644
index 0000000..cc41ce0
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMapEventDispatcher.java
@@ -0,0 +1,125 @@
+package net.onrc.onos.core.flowmanager;
+
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import net.onrc.onos.api.flowmanager.Flow;
+import net.onrc.onos.api.flowmanager.FlowId;
+import net.onrc.onos.api.flowmanager.FlowState;
+import net.onrc.onos.core.util.serializers.KryoFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IMap;
+
+/**
+ * This class is used for managing listeners of the {@link SharedFlowMap}.
+ */
+class SharedFlowMapEventDispatcher implements EntryListener<String, byte[]> {
+    private CopyOnWriteArraySet<FlowMapEventListener> listeners;
+    private static final Logger log = LoggerFactory
+            .getLogger(SharedFlowMapEventDispatcher.class);
+
+    /**
+     * Creates dispatcher using flow map objects.
+     *
+     * @param flowMap the flow map object
+     * @param flowStateMap the flow state map object
+     */
+    SharedFlowMapEventDispatcher(IMap<String, byte[]> flowMap,
+            IMap<String, byte[]> flowStateMap) {
+        listeners = new CopyOnWriteArraySet<>();
+        flowMap.addEntryListener(this, true);
+        flowStateMap.addEntryListener(this, true);
+    }
+
+    /**
+     * Adds a listener for listening events related to the map.
+     *
+     * @param listener the {@link FlowMapEventListener} to be added
+     */
+    void addListener(FlowMapEventListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Removes a listener for listening events related to the map.
+     *
+     * @param listener the {@link FlowMapEventListener} to be removed
+     */
+    void removeListener(FlowMapEventListener listener) {
+        listeners.remove(listener);
+    }
+
+    @Override
+    public void entryAdded(EntryEvent<String, byte[]> event) {
+        final Object value = KryoFactory.deserialize(event.getValue());
+        if (value instanceof Flow) {
+            // Handles events from flowMap.
+            final Flow flow = (Flow) value;
+            log.trace("Flow {} was added", flow);
+            for (FlowMapEventListener e : listeners) {
+                e.flowAdded(flow.getId(), flow);
+                e.flowStateChanged(flow.getId(), null, FlowState.SUBMITTED);
+            }
+        } else if (value instanceof FlowState) {
+            // Handles events from flowStateMap.
+            final FlowState state = (FlowState) value;
+            final FlowId id = new FlowId(Long.parseLong(event.getKey()));
+            log.trace("FlowState of FlowId {} was set to {}", id, state);
+            for (FlowMapEventListener e : listeners) {
+                e.flowStateChanged(id, FlowState.SUBMITTED, state);
+            }
+        } else {
+            throw new IllegalStateException("Added illegal value: " + value.toString());
+        }
+    }
+
+    @Override
+    public void entryRemoved(EntryEvent<String, byte[]> event) {
+        final Object value = KryoFactory.deserialize(event.getValue());
+        if (value instanceof Flow) {
+            // Handles events from flowMap.
+            final Flow flow = (Flow) value;
+            log.trace("Flow {} was removed", flow);
+            for (FlowMapEventListener e : listeners) {
+                e.flowRemoved(flow.getId());
+            }
+        } else if (value instanceof FlowState) {
+            // Handles events from flowStateMap.
+            log.trace("FlowState {} of FlowId {} was removed", value, event.getKey());
+        } else {
+            throw new IllegalStateException("Removed illegal value: " + value.toString());
+        }
+    }
+
+    @Override
+    public void entryUpdated(EntryEvent<String, byte[]> event) {
+        final Object value = KryoFactory.deserialize(event.getValue());
+        if (value instanceof Flow) {
+            // Handles events from flowMap.
+            log.trace("Flow Updated by {}", value);
+        } else if (value instanceof FlowState) {
+            // Handles events from flowStateMap.
+            Object oldValue = KryoFactory.deserialize(event.getOldValue());
+            final FlowState state = (FlowState) value;
+            final FlowState oldState = (FlowState) oldValue;
+            final FlowId id = new FlowId(Long.parseLong(event.getKey()));
+            log.trace("FlowState of FlowId {} was updated from {} to {}",
+                    id, oldState, state);
+            for (FlowMapEventListener e : listeners) {
+                e.flowStateChanged(id, oldState, state);
+            }
+        } else {
+            throw new IllegalStateException("Updated illegal value: " + value.toString());
+        }
+    }
+
+    @Override
+    public void entryEvicted(EntryEvent<String, byte[]> event) {
+        // do nothing.
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
index 5fec5b2..c9c3c7b 100644
--- a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
@@ -18,6 +18,7 @@
 import net.onrc.onos.api.batchoperation.BatchOperationEntry;
 import net.onrc.onos.api.flowmanager.FlowId;
 import net.onrc.onos.api.flowmanager.FlowLink;
+import net.onrc.onos.api.flowmanager.FlowState;
 import net.onrc.onos.api.flowmanager.OpticalPathFlow;
 import net.onrc.onos.api.flowmanager.PacketPathFlow;
 import net.onrc.onos.api.flowmanager.SingleDstTreeFlow;
@@ -252,6 +253,7 @@
 
         // New flow manager related classes
         kryo.register(FlowId.class);
+        kryo.register(FlowState.class);
         kryo.register(net.onrc.onos.api.flowmanager.Path.class);
         kryo.register(Tree.class);
         kryo.register(FlowLink.class);