Add a distributed map implementation of FlowBatchMap.

- Added SharedFlowBatchMap as an implementation of FlowBatchMap.
- SharedFlowBatchMap is a distributed map implementation based on ISharedCollectionsService.
- This task is a part of ONOS-1842.

Change-Id: I480b3adecc493f82edbd6245e2f8aaf702c65d4f
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMap.java b/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMap.java
index 6886d68..153bcf3 100644
--- a/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMap.java
+++ b/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMap.java
@@ -41,7 +41,7 @@
      * Gets all {@link FlowBatchOperation} objects existing in the map.
      * <p>
      * The changes to the returned set does not affect the original map.
-
+     *
      * @return a set of {@link FlowBatchOperation} objects
      */
     Set<FlowBatchOperation> getAll();
@@ -84,9 +84,10 @@
     void removeListener(FlowBatchMapEventListener listener);
 
     /**
-     * Checks if this instance is a leader of the map.
+     * Checks if the specified flow batch operation is stored in local storage.
      *
-     * @return true if it is leader, false otherwise
+     * @param id the ID of the batch operation
+     * @return true if the specified batch operation is stored in local storage
      */
-    boolean isLeader();
+    boolean isLocal(FlowBatchId id);
 }
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMapEventListener.java b/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMapEventListener.java
index 93a0bdf..345568f 100644
--- a/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMapEventListener.java
+++ b/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchMapEventListener.java
@@ -28,7 +28,7 @@
      * @param oldState the old state of the {@link FlowBatchOperation}
      * @param currentState the current state of the {@link FlowBatchOperation}
      */
-    void flowStateChanged(FlowBatchId id,
+    void flowBatchOperationStateChanged(FlowBatchId id,
             FlowBatchState oldState, FlowBatchState currentState);
 
 }
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMap.java b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMap.java
new file mode 100644
index 0000000..368f88d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMap.java
@@ -0,0 +1,132 @@
+package net.onrc.onos.core.flowmanager;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.onrc.onos.api.flowmanager.FlowBatchId;
+import net.onrc.onos.api.flowmanager.FlowBatchOperation;
+import net.onrc.onos.api.flowmanager.FlowBatchState;
+import net.onrc.onos.core.datagrid.ISharedCollectionsService;
+import net.onrc.onos.core.util.serializers.KryoFactory;
+
+import com.hazelcast.core.IMap;
+
+/**
+ * An implementation of shared distributed map of {@link FlowBatchMap}.
+ * <p>
+ * This class's implementation is almost the same to {@link SharedFlowMap}. The
+ * base class for them should be implemented.
+ */
+public class SharedFlowBatchMap implements FlowBatchMap {
+    private static final String FLOWBATCHMAP_NAME = "flowbatch_map";
+    private static final String FLOWBATCHSTATEMAP_NAME = "flowbatchstate_map";
+
+    private final IMap<String, byte[]> flowBatchMap;
+    private final IMap<String, byte[]> flowBatchStateMap;
+    private final SharedFlowBatchMapEventDispatcher dispatcher;
+
+    /**
+     * Creates instance using {@link ISharedCollectionsService} service.
+     *
+     * @param service the {@link ISharedCollectionsService} service
+     */
+    SharedFlowBatchMap(ISharedCollectionsService service) {
+        this.flowBatchMap = checkNotNull(service.getConcurrentMap(
+                FLOWBATCHMAP_NAME, String.class, byte[].class));
+        this.flowBatchStateMap = checkNotNull(service.getConcurrentMap(
+                FLOWBATCHSTATEMAP_NAME, String.class, byte[].class));
+        this.dispatcher = new SharedFlowBatchMapEventDispatcher(
+                flowBatchMap, flowBatchStateMap);
+    }
+
+    @Override
+    public FlowBatchOperation get(FlowBatchId id) {
+        byte[] buf = flowBatchMap.get(checkNotNull(id).toString());
+        if (buf == null) {
+            return null;
+        }
+        return KryoFactory.deserialize(buf);
+    }
+
+    @Override
+    public boolean put(FlowBatchId id, FlowBatchOperation flowOp) {
+        byte[] buf = KryoFactory.serialize(checkNotNull(flowOp));
+        flowBatchMap.set(id.toString(), checkNotNull(buf));
+        return true;
+    }
+
+    @Override
+    public FlowBatchOperation remove(FlowBatchId id) {
+        String flowBatchIdStr = checkNotNull(id).toString();
+        byte[] buf = flowBatchMap.remove(flowBatchIdStr);
+        if (buf == null) {
+            return null;
+        }
+        flowBatchStateMap.remove(flowBatchIdStr);
+        return KryoFactory.deserialize(buf);
+    }
+
+    @Override
+    public Set<FlowBatchOperation> getAll() {
+        Set<FlowBatchOperation> flowBatchs = new HashSet<>();
+        for (Entry<String, byte[]> entry : flowBatchMap.entrySet()) {
+            flowBatchs.add((FlowBatchOperation)
+                    KryoFactory.deserialize(entry.getValue()));
+        }
+        return flowBatchs;
+    }
+
+    @Override
+    public boolean setState(FlowBatchId id, FlowBatchState state,
+            FlowBatchState expectedState) {
+        final String key = checkNotNull(
+                id, "FlowBatchId is not specified.").toString();
+        final byte[] oldValue = KryoFactory.serialize(expectedState);
+        final byte[] newValue = KryoFactory.serialize(state);
+
+        if (!flowBatchMap.containsKey(key)) {
+            return false;
+        }
+
+        if (expectedState == FlowBatchState.SUBMITTED) {
+            // The absence of the key means SUBMITTED state.
+            return flowBatchStateMap.putIfAbsent(key, newValue) == null;
+        }
+
+        return flowBatchStateMap.replace(key, oldValue, newValue);
+    }
+
+    @Override
+    public FlowBatchState getState(FlowBatchId id) {
+        final String key = checkNotNull(
+                id, "FlowBatchId is not specified.").toString();
+        if (!flowBatchMap.containsKey(key)) {
+            return null;
+        }
+
+        final byte[] buf = flowBatchStateMap.get(key);
+        if (buf == null) {
+            // The absence of the key means SUBMITTED state.
+            return FlowBatchState.SUBMITTED;
+        }
+        return KryoFactory.deserialize(buf);
+    }
+
+    @Override
+    public void addListener(FlowBatchMapEventListener listener) {
+        dispatcher.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(FlowBatchMapEventListener listener) {
+        dispatcher.removeListener(listener);
+    }
+
+    @Override
+    public boolean isLocal(FlowBatchId id) {
+        return flowBatchMap.localKeySet().contains(checkNotNull(id).toString());
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapEventDispatcher.java b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapEventDispatcher.java
new file mode 100644
index 0000000..7107be8
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapEventDispatcher.java
@@ -0,0 +1,129 @@
+package net.onrc.onos.core.flowmanager;
+
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import net.onrc.onos.api.flowmanager.FlowBatchId;
+import net.onrc.onos.api.flowmanager.FlowBatchOperation;
+import net.onrc.onos.api.flowmanager.FlowBatchState;
+import net.onrc.onos.core.util.serializers.KryoFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IMap;
+
+/**
+ * This class is used for managing listeners of the {@link SharedFlowBatchMap}.
+ */
+public class SharedFlowBatchMapEventDispatcher implements EntryListener<String, byte[]> {
+    private CopyOnWriteArraySet<FlowBatchMapEventListener> listeners;
+    private static final Logger log = LoggerFactory
+            .getLogger(SharedFlowBatchMapEventDispatcher.class);
+
+    /**
+     * Creates dispatcher using flow batch map objects.
+     *
+     * @param flowBatchMap the flow batch map object
+     * @param flowBatchStateMap the flow batch state map object
+     */
+    public SharedFlowBatchMapEventDispatcher(IMap<String, byte[]> flowBatchMap,
+            IMap<String, byte[]> flowBatchStateMap) {
+        listeners = new CopyOnWriteArraySet<>();
+        flowBatchMap.addEntryListener(this, true);
+        flowBatchStateMap.addEntryListener(this, true);
+    }
+
+    /**
+     * Adds a listener for listening events related to the map.
+     *
+     * @param listener the {@link FlowBatchMapEventListener} to be added
+     */
+    public void addListener(FlowBatchMapEventListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Removes a listener for listening events related to the map.
+     *
+     * @param listener the {@link FlowBatchMapEventListener} to be removed
+     */
+    public void removeListener(FlowBatchMapEventListener listener) {
+        listeners.remove(listener);
+    }
+
+    @Override
+    public void entryAdded(EntryEvent<String, byte[]> event) {
+        final Object value = KryoFactory.deserialize(event.getValue());
+        if (value instanceof FlowBatchOperation) {
+            // Handles events from flowBatchMap.
+            final FlowBatchOperation flowOp = (FlowBatchOperation) value;
+            final FlowBatchId id = new FlowBatchId(Long.parseLong(event.getKey()));
+            log.trace("Flow batch operation ID:{}, {} was added", id, flowOp);
+            for (FlowBatchMapEventListener e : listeners) {
+                FlowBatchOperation copiedFlowOp =
+                        new FlowBatchOperation(flowOp.getOperations());
+                e.flowBatchOperationAdded(id, copiedFlowOp);
+                e.flowBatchOperationStateChanged(id, null, FlowBatchState.SUBMITTED);
+            }
+        } else if (value instanceof FlowBatchState) {
+            // Handles events from flowBatchStateMap.
+            final FlowBatchState state = (FlowBatchState) value;
+            final FlowBatchId id = new FlowBatchId(Long.parseLong(event.getKey()));
+            log.trace("FlowState of FlowId {} was set to {}", id, state);
+            for (FlowBatchMapEventListener e : listeners) {
+                e.flowBatchOperationStateChanged(id, FlowBatchState.SUBMITTED, state);
+            }
+        } else {
+            throw new IllegalStateException("Added illegal value: " + value);
+        }
+    }
+
+    @Override
+    public void entryRemoved(EntryEvent<String, byte[]> event) {
+        final Object value = KryoFactory.deserialize(event.getValue());
+        if (value instanceof FlowBatchOperation) {
+            // Handles events from flowBatchMap.
+            final FlowBatchOperation flowOp = (FlowBatchOperation) value;
+            final FlowBatchId id = new FlowBatchId(Long.parseLong(event.getKey()));
+            log.trace("Flow batch operation ID:{}, {} was removed", id, flowOp);
+            for (FlowBatchMapEventListener e : listeners) {
+                e.flowBatchOperationRemoved(id);
+            }
+        } else if (value instanceof FlowBatchState) {
+            // Handles events from flowBatchStateMap.
+            log.trace("Flow batch state {} of ID:{} was removed", value, event.getKey());
+        } else {
+            throw new IllegalStateException("Removed illegal value: " + value);
+        }
+    }
+
+    @Override
+    public void entryUpdated(EntryEvent<String, byte[]> event) {
+        final Object value = KryoFactory.deserialize(event.getValue());
+        if (value instanceof FlowBatchOperation) {
+            // Handles events from flowBatchMap.
+            log.trace("Flow batch operation ID:{} updated by {}", event.getKey(), value);
+        } else if (value instanceof FlowBatchState) {
+            // Handles events from flowBatchStateMap.
+            Object oldValue = KryoFactory.deserialize(event.getOldValue());
+            final FlowBatchState currentState = (FlowBatchState) value;
+            final FlowBatchState oldState = (FlowBatchState) oldValue;
+            final FlowBatchId id = new FlowBatchId(Long.parseLong(event.getKey()));
+            log.trace("Flow batch state of ID:{} was updated from {} to {}",
+                    id, oldState, currentState);
+            for (FlowBatchMapEventListener e : listeners) {
+                e.flowBatchOperationStateChanged(id, oldState, currentState);
+            }
+        } else {
+            throw new IllegalStateException("Updated illegal value: " + value);
+        }
+    }
+
+    @Override
+    public void entryEvicted(EntryEvent<String, byte[]> event) {
+        // do nothing.
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
index c9c3c7b..f5bf01d 100644
--- a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
@@ -16,6 +16,8 @@
 import net.floodlightcontroller.core.IFloodlightProviderService.Role;
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.api.batchoperation.BatchOperationEntry;
+import net.onrc.onos.api.flowmanager.FlowBatchOperation;
+import net.onrc.onos.api.flowmanager.FlowBatchState;
 import net.onrc.onos.api.flowmanager.FlowId;
 import net.onrc.onos.api.flowmanager.FlowLink;
 import net.onrc.onos.api.flowmanager.FlowState;
@@ -254,6 +256,9 @@
         // New flow manager related classes
         kryo.register(FlowId.class);
         kryo.register(FlowState.class);
+        kryo.register(FlowBatchOperation.class);
+        kryo.register(FlowBatchOperation.Operator.class);
+        kryo.register(FlowBatchState.class);
         kryo.register(net.onrc.onos.api.flowmanager.Path.class);
         kryo.register(Tree.class);
         kryo.register(FlowLink.class);
diff --git a/src/test/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapTest.java b/src/test/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapTest.java
new file mode 100644
index 0000000..fae7086
--- /dev/null
+++ b/src/test/java/net/onrc/onos/core/flowmanager/SharedFlowBatchMapTest.java
@@ -0,0 +1,143 @@
+package net.onrc.onos.core.flowmanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import net.floodlightcontroller.util.MACAddress;
+import net.onrc.onos.api.flowmanager.FlowBatchId;
+import net.onrc.onos.api.flowmanager.FlowBatchOperation;
+import net.onrc.onos.api.flowmanager.FlowBatchState;
+import net.onrc.onos.api.flowmanager.FlowId;
+import net.onrc.onos.api.flowmanager.FlowLink;
+import net.onrc.onos.api.flowmanager.PacketPathFlow;
+import net.onrc.onos.api.flowmanager.Path;
+import net.onrc.onos.core.datagrid.ISharedCollectionsService;
+import net.onrc.onos.core.datastore.hazelcast.DummySharedCollectionsService;
+import net.onrc.onos.core.matchaction.action.Action;
+import net.onrc.onos.core.matchaction.action.ModifyDstMacAction;
+import net.onrc.onos.core.matchaction.action.OutputAction;
+import net.onrc.onos.core.matchaction.match.PacketMatch;
+import net.onrc.onos.core.matchaction.match.PacketMatchBuilder;
+import net.onrc.onos.core.util.PortNumber;
+import net.onrc.onos.core.util.SwitchPort;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class SharedFlowBatchMapTest {
+    private ISharedCollectionsService scs;
+    private FlowBatchOperation flowOp;
+
+    @Before
+    public void setUp() throws Exception {
+        scs = new DummySharedCollectionsService();
+
+        Path path = new Path();
+        path.add(new FlowLink(
+                new SwitchPort(1, (short) 10), new SwitchPort(2, (short) 11)));
+
+        PacketMatchBuilder builder = new PacketMatchBuilder();
+        builder.setDstMac(MACAddress.valueOf(54321));
+        PacketMatch match1 = builder.build();
+
+        builder = new PacketMatchBuilder();
+        builder.setDstMac(MACAddress.valueOf(12345));
+        PacketMatch match2 = builder.build();
+
+        List<Action> actions1 = Arrays.asList(
+                new ModifyDstMacAction(MACAddress.valueOf(12345)),
+                new OutputAction(PortNumber.uint16((short) 101)));
+
+        List<Action> actions2 = Arrays.asList(
+                new ModifyDstMacAction(MACAddress.valueOf(54321)),
+                new OutputAction(PortNumber.uint16((short) 101)));
+
+        PacketPathFlow flow1 = new PacketPathFlow(new FlowId(1L), match1,
+                PortNumber.uint32(12345), path, actions1, 0, 0);
+
+        PacketPathFlow flow2 = new PacketPathFlow(new FlowId(2L), match2,
+                PortNumber.uint32(54321), path, actions2, 0, 0);
+
+        flowOp = new FlowBatchOperation();
+        flowOp.addAddFlowOperation(flow1);
+        flowOp.addAddFlowOperation(flow2);
+    }
+
+    /**
+     * Tests if the constructor initializes its field correctly.
+     */
+    @Test
+    public void testConstructor() {
+        SharedFlowBatchMap map = new SharedFlowBatchMap(scs);
+        Set<FlowBatchOperation> flowOps = map.getAll();
+        assertNotNull(flowOps);
+        assertTrue(flowOps.isEmpty());
+    }
+
+    /**
+     * Tests the basic functionality of the flow batch map. This test puts gets
+     * and removes a batch operation and checks these operations are executed
+     * properly.
+     */
+    @Test
+    public void testAddGetRemoveFlowOp() {
+        SharedFlowBatchMap map = new SharedFlowBatchMap(scs);
+        final FlowBatchId id = new FlowBatchId(100L);
+
+        // Check if the stored flow batch operation can be retrieved its ID
+        assertTrue(map.put(id, flowOp));
+        final FlowBatchOperation obtainedFlowOp = map.get(id);
+        assertNotNull(obtainedFlowOp);
+        assertEquals(2, obtainedFlowOp.size());
+
+        // Check if it will not return flow with nonexistent ID
+        FlowBatchId nonexistentId = new FlowBatchId(99999L);
+        assertFalse(nonexistentId.equals(id));
+        assertNull(map.get(nonexistentId));
+
+        // Check if it will remove the operation and it will not return the
+        // operation after the removal
+        final FlowBatchOperation removedFlowOp = map.remove(id);
+        assertNotNull(removedFlowOp);
+        assertEquals(2, removedFlowOp.size());
+        final FlowBatchOperation nullFlowOp = map.get(id);
+        assertNull(nullFlowOp);
+    }
+
+    @Test
+    public void testChangeStateOfFlowOp() {
+        SharedFlowBatchMap map = new SharedFlowBatchMap(scs);
+
+        final FlowBatchId id = new FlowBatchId(100L);
+
+        // Check if the state of nonexistent flow is not exist
+        assertNull(map.getState(id));
+
+        // Check if it won't change the state of nonexistent flow
+        assertFalse(map.setState(id, FlowBatchState.EXECUTING, FlowBatchState.SUBMITTED));
+        assertNull(map.getState(id));
+
+        // Check if the initial state is SUBMITTED
+        assertTrue(map.put(id, flowOp));
+        assertEquals(FlowBatchState.SUBMITTED, map.getState(id));
+
+        // Check if it won't change the state if the expected state was wrong
+        assertFalse(map.setState(id, FlowBatchState.COMPLETED, FlowBatchState.EXECUTING));
+        assertEquals(FlowBatchState.SUBMITTED, map.getState(id));
+
+        // Check if it changes the state if the expected state was correct
+        assertTrue(map.setState(id, FlowBatchState.EXECUTING, FlowBatchState.SUBMITTED));
+        assertEquals(FlowBatchState.EXECUTING, map.getState(id));
+
+        // Check if it won't return the state if the flow was removed
+        assertEquals(flowOp, map.remove(id));
+        assertNull(map.getState(id));
+    }
+}