Add a distributed map implementation of FlowBatchMap.

- Added SharedFlowBatchMap as an implementation of FlowBatchMap.
- SharedFlowBatchMap is a distributed map implementation based on ISharedCollectionsService.
- This task is a part of ONOS-1842.

Change-Id: I480b3adecc493f82edbd6245e2f8aaf702c65d4f
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapEventDispatcher.java b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapEventDispatcher.java
new file mode 100644
index 0000000..7107be8
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapEventDispatcher.java
@@ -0,0 +1,186 @@
+package net.onrc.onos.core.flowmanager;
+
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import net.onrc.onos.api.flowmanager.FlowBatchId;
+import net.onrc.onos.api.flowmanager.FlowBatchOperation;
+import net.onrc.onos.api.flowmanager.FlowBatchState;
+import net.onrc.onos.core.util.serializers.KryoFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IMap;
+
+/**
+ * This class is used for managing listeners of the {@link SharedFlowBatchMap}.
+ * <p>
+ * The dispatcher registers itself as an entry listener on both Hazelcast maps
+ * given to the constructor. Every event value is deserialized with
+ * {@link KryoFactory}, and the type of the result ({@link FlowBatchOperation}
+ * or {@link FlowBatchState}) determines how the event is dispatched.
+ */
+public class SharedFlowBatchMapEventDispatcher implements EntryListener<String, byte[]> {
+    private static final Logger log = LoggerFactory
+            .getLogger(SharedFlowBatchMapEventDispatcher.class);
+
+    private final CopyOnWriteArraySet<FlowBatchMapEventListener> listeners =
+            new CopyOnWriteArraySet<>();
+
+    /**
+     * Creates dispatcher using flow batch map objects.
+     *
+     * @param flowBatchMap the flow batch map object
+     * @param flowBatchStateMap the flow batch state map object
+     */
+    public SharedFlowBatchMapEventDispatcher(IMap<String, byte[]> flowBatchMap,
+            IMap<String, byte[]> flowBatchStateMap) {
+        // Passing true asks Hazelcast to include entry values in the events.
+        flowBatchMap.addEntryListener(this, true);
+        flowBatchStateMap.addEntryListener(this, true);
+    }
+
+    /**
+     * Adds a listener for listening events related to the map.
+     *
+     * @param listener the {@link FlowBatchMapEventListener} to be added
+     */
+    public void addListener(FlowBatchMapEventListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Removes a listener for listening events related to the map.
+     *
+     * @param listener the {@link FlowBatchMapEventListener} to be removed
+     */
+    public void removeListener(FlowBatchMapEventListener listener) {
+        listeners.remove(listener);
+    }
+
+    /**
+     * Dispatches an entry added to either map.
+     * <p>
+     * An added {@link FlowBatchOperation} is reported to each listener as an
+     * added operation followed by a state change from {@code null} to
+     * {@link FlowBatchState#SUBMITTED}. An added {@link FlowBatchState} is
+     * reported as a state change from {@link FlowBatchState#SUBMITTED}.
+     *
+     * @param event the entry event
+     * @throws IllegalStateException if the value has an unexpected type
+     */
+    @Override
+    public void entryAdded(EntryEvent<String, byte[]> event) {
+        final Object value = KryoFactory.deserialize(event.getValue());
+        if (value instanceof FlowBatchOperation) {
+            // Handles events from flowBatchMap.
+            final FlowBatchOperation flowOp = (FlowBatchOperation) value;
+            final FlowBatchId id = toFlowBatchId(event.getKey());
+            log.trace("Flow batch operation ID:{}, {} was added", id, flowOp);
+            for (FlowBatchMapEventListener e : listeners) {
+                // Each listener receives its own copy of the operation.
+                FlowBatchOperation copiedFlowOp =
+                        new FlowBatchOperation(flowOp.getOperations());
+                e.flowBatchOperationAdded(id, copiedFlowOp);
+                e.flowBatchOperationStateChanged(id, null, FlowBatchState.SUBMITTED);
+            }
+        } else if (value instanceof FlowBatchState) {
+            // Handles events from flowBatchStateMap.
+            final FlowBatchState state = (FlowBatchState) value;
+            final FlowBatchId id = toFlowBatchId(event.getKey());
+            log.trace("Flow batch state of ID:{} was set to {}", id, state);
+            for (FlowBatchMapEventListener e : listeners) {
+                e.flowBatchOperationStateChanged(id, FlowBatchState.SUBMITTED, state);
+            }
+        } else {
+            throw new IllegalStateException("Added illegal value: " + value);
+        }
+    }
+
+    /**
+     * Dispatches an entry removed from either map.
+     * <p>
+     * Listeners are notified only for a removed {@link FlowBatchOperation};
+     * a removed {@link FlowBatchState} is just logged.
+     *
+     * @param event the entry event
+     * @throws IllegalStateException if the value has an unexpected type
+     */
+    @Override
+    public void entryRemoved(EntryEvent<String, byte[]> event) {
+        final Object value = KryoFactory.deserialize(event.getValue());
+        if (value instanceof FlowBatchOperation) {
+            // Handles events from flowBatchMap.
+            final FlowBatchId id = toFlowBatchId(event.getKey());
+            log.trace("Flow batch operation ID:{}, {} was removed", id, value);
+            for (FlowBatchMapEventListener e : listeners) {
+                e.flowBatchOperationRemoved(id);
+            }
+        } else if (value instanceof FlowBatchState) {
+            // Handles events from flowBatchStateMap.
+            log.trace("Flow batch state {} of ID:{} was removed", value, event.getKey());
+        } else {
+            throw new IllegalStateException("Removed illegal value: " + value);
+        }
+    }
+
+    /**
+     * Dispatches an entry updated in either map.
+     * <p>
+     * Listeners are notified only for an updated {@link FlowBatchState};
+     * an updated {@link FlowBatchOperation} is just logged.
+     *
+     * @param event the entry event
+     * @throws IllegalStateException if the new or old value has an
+     *         unexpected type
+     */
+    @Override
+    public void entryUpdated(EntryEvent<String, byte[]> event) {
+        final Object value = KryoFactory.deserialize(event.getValue());
+        if (value instanceof FlowBatchOperation) {
+            // Handles events from flowBatchMap.
+            log.trace("Flow batch operation ID:{} updated by {}", event.getKey(), value);
+        } else if (value instanceof FlowBatchState) {
+            // Handles events from flowBatchStateMap.
+            final Object oldValue = KryoFactory.deserialize(event.getOldValue());
+            if (oldValue != null && !(oldValue instanceof FlowBatchState)) {
+                // Fail like the other illegal-value paths instead of with a
+                // ClassCastException from the cast below.
+                throw new IllegalStateException("Updated illegal old value: " + oldValue);
+            }
+            final FlowBatchState currentState = (FlowBatchState) value;
+            final FlowBatchState oldState = (FlowBatchState) oldValue;
+            final FlowBatchId id = toFlowBatchId(event.getKey());
+            log.trace("Flow batch state of ID:{} was updated from {} to {}",
+                    id, oldState, currentState);
+            for (FlowBatchMapEventListener e : listeners) {
+                e.flowBatchOperationStateChanged(id, oldState, currentState);
+            }
+        } else {
+            throw new IllegalStateException("Updated illegal value: " + value);
+        }
+    }
+
+    /**
+     * Ignores evicted entries.
+     *
+     * @param event the entry event
+     */
+    @Override
+    public void entryEvicted(EntryEvent<String, byte[]> event) {
+        // do nothing.
+    }
+
+    /**
+     * Converts a map key to a flow batch ID.
+     *
+     * @param key the map key, parsed as a decimal long
+     * @return the flow batch ID built from the parsed value
+     * @throws NumberFormatException if the key is not a parsable long
+     */
+    private static FlowBatchId toFlowBatchId(String key) {
+        return new FlowBatchId(Long.parseLong(key));
+    }
+}