Add a distributed map implementation of FlowMap.
- Added SharedFlowMap as an implementation of FlowMap.
- SharedFlowMap is a distributed map implementation based on ISharedCollectionsService.
- This task is a part of ONOS-1688 and ONOS-1736.
Change-Id: I57961c415636b480bbfb4b6c117530afc34b9b94
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMap.java b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMap.java
new file mode 100644
index 0000000..8fd0744
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMap.java
@@ -0,0 +1,120 @@
+package net.onrc.onos.core.flowmanager;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.onrc.onos.api.flowmanager.Flow;
+import net.onrc.onos.api.flowmanager.FlowId;
+import net.onrc.onos.api.flowmanager.FlowState;
+import net.onrc.onos.core.datagrid.ISharedCollectionsService;
+import net.onrc.onos.core.util.serializers.KryoFactory;
+
+import com.hazelcast.core.IMap;
+
+/**
+ * Manages the distributed shared flow map.
+ */
+class SharedFlowMap implements FlowMap {
+ private static final String FLOWMAP_NAME = "flow_map";
+ private static final String FLOWSTATEMAP_NAME = "flowstate_map";
+
+ private final IMap<String, byte[]> flowMap;
+ private final IMap<String, byte[]> flowStateMap;
+ private final SharedFlowMapEventDispatcher dispatcher;
+
+ /**
+ * Creates instance using {@link ISharedCollectionsService} service.
+ *
+ * @param service the {@link ISharedCollectionsService} service
+ */
+ SharedFlowMap(ISharedCollectionsService service) {
+ this.flowMap = checkNotNull(service.getConcurrentMap(
+ FLOWMAP_NAME, String.class, byte[].class));
+ this.flowStateMap = checkNotNull(service.getConcurrentMap(
+ FLOWSTATEMAP_NAME, String.class, byte[].class));
+ this.dispatcher = new SharedFlowMapEventDispatcher(flowMap, flowStateMap);
+ }
+
+ @Override
+ public Flow get(FlowId id) {
+ byte[] buf = flowMap.get(checkNotNull(id).toString());
+ if (buf == null) {
+ return null;
+ }
+ return KryoFactory.deserialize(buf);
+ }
+
+ @Override
+ public boolean put(Flow flow) {
+ checkNotNull(flow);
+ byte[] buf = KryoFactory.serialize(flow);
+ flowMap.set(flow.getId().toString(), buf);
+ return true;
+ }
+
+ @Override
+ public Flow remove(FlowId id) {
+ String flowIdStr = checkNotNull(id).toString();
+ byte[] buf = flowMap.remove(flowIdStr);
+ if (buf == null) {
+ return null;
+ }
+ flowStateMap.remove(flowIdStr);
+ return KryoFactory.deserialize(buf);
+ }
+
+ @Override
+ public Set<Flow> getAll() {
+ Set<Flow> flows = new HashSet<>();
+ for (Entry<String, byte[]> entry : flowMap.entrySet()) {
+ flows.add((Flow) KryoFactory.deserialize(entry.getValue()));
+ }
+ return flows;
+ }
+
+ @Override
+ public boolean setState(FlowId id, FlowState state, FlowState expectedState) {
+ final String key = checkNotNull(id, "FlowId is not specified.").toString();
+ final byte[] oldValue = KryoFactory.serialize(expectedState);
+ final byte[] newValue = KryoFactory.serialize(state);
+
+ if (!flowMap.containsKey(key)) {
+ return false;
+ }
+
+ if (expectedState == FlowState.SUBMITTED) {
+ // The absence of the key means SUBMITTED state.
+ return flowStateMap.putIfAbsent(key, newValue) == null;
+ }
+
+ return flowStateMap.replace(key, oldValue, newValue);
+ };
+
+ @Override
+ public FlowState getState(FlowId id) {
+ final String key = checkNotNull(id, "FlowId is not specified.").toString();
+ if (!flowMap.containsKey(key)) {
+ return null;
+ }
+
+ final byte[] buf = flowStateMap.get(key);
+ if (buf == null) {
+ // The absence of the key means SUBMITTED state.
+ return FlowState.SUBMITTED;
+ }
+ return KryoFactory.deserialize(buf);
+ }
+
+ @Override
+ public void addListener(FlowMapEventListener listener) {
+ dispatcher.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(FlowMapEventListener listener) {
+ dispatcher.removeListener(listener);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMapEventDispatcher.java b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMapEventDispatcher.java
new file mode 100644
index 0000000..cc41ce0
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/SharedFlowMapEventDispatcher.java
@@ -0,0 +1,125 @@
+package net.onrc.onos.core.flowmanager;
+
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import net.onrc.onos.api.flowmanager.Flow;
+import net.onrc.onos.api.flowmanager.FlowId;
+import net.onrc.onos.api.flowmanager.FlowState;
+import net.onrc.onos.core.util.serializers.KryoFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IMap;
+
+/**
+ * This class is used for managing listeners of the {@link SharedFlowMap}.
+ */
+class SharedFlowMapEventDispatcher implements EntryListener<String, byte[]> {
+ private CopyOnWriteArraySet<FlowMapEventListener> listeners;
+ private static final Logger log = LoggerFactory
+ .getLogger(SharedFlowMapEventDispatcher.class);
+
+ /**
+ * Creates dispatcher using flow map objects.
+ *
+ * @param flowMap the flow map object
+ * @param flowStateMap the flow state map object
+ */
+ SharedFlowMapEventDispatcher(IMap<String, byte[]> flowMap,
+ IMap<String, byte[]> flowStateMap) {
+ listeners = new CopyOnWriteArraySet<>();
+ flowMap.addEntryListener(this, true);
+ flowStateMap.addEntryListener(this, true);
+ }
+
+ /**
+ * Adds a listener for listening events related to the map.
+ *
+ * @param listener the {@link FlowMapEventListener} to be added
+ */
+ void addListener(FlowMapEventListener listener) {
+ listeners.add(listener);
+ }
+
+ /**
+ * Removes a listener for listening events related to the map.
+ *
+ * @param listener the {@link FlowMapEventListener} to be removed
+ */
+ void removeListener(FlowMapEventListener listener) {
+ listeners.remove(listener);
+ }
+
+ @Override
+ public void entryAdded(EntryEvent<String, byte[]> event) {
+ final Object value = KryoFactory.deserialize(event.getValue());
+ if (value instanceof Flow) {
+ // Handles events from flowMap.
+ final Flow flow = (Flow) value;
+ log.trace("Flow {} was added", flow);
+ for (FlowMapEventListener e : listeners) {
+ e.flowAdded(flow.getId(), flow);
+ e.flowStateChanged(flow.getId(), null, FlowState.SUBMITTED);
+ }
+ } else if (value instanceof FlowState) {
+ // Handles events from flowStateMap.
+ final FlowState state = (FlowState) value;
+ final FlowId id = new FlowId(Long.parseLong(event.getKey()));
+ log.trace("FlowState of FlowId {} was set to {}", id, state);
+ for (FlowMapEventListener e : listeners) {
+ e.flowStateChanged(id, FlowState.SUBMITTED, state);
+ }
+ } else {
+ throw new IllegalStateException("Added illegal value: " + value.toString());
+ }
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<String, byte[]> event) {
+ final Object value = KryoFactory.deserialize(event.getValue());
+ if (value instanceof Flow) {
+ // Handles events from flowMap.
+ final Flow flow = (Flow) value;
+ log.trace("Flow {} was removed", flow);
+ for (FlowMapEventListener e : listeners) {
+ e.flowRemoved(flow.getId());
+ }
+ } else if (value instanceof FlowState) {
+ // Handles events from flowStateMap.
+ log.trace("FlowState {} of FlowId {} was removed", value, event.getKey());
+ } else {
+ throw new IllegalStateException("Removed illegal value: " + value.toString());
+ }
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<String, byte[]> event) {
+ final Object value = KryoFactory.deserialize(event.getValue());
+ if (value instanceof Flow) {
+ // Handles events from flowMap.
+ log.trace("Flow Updated by {}", value);
+ } else if (value instanceof FlowState) {
+ // Handles events from flowStateMap.
+ Object oldValue = KryoFactory.deserialize(event.getOldValue());
+ final FlowState state = (FlowState) value;
+ final FlowState oldState = (FlowState) oldValue;
+ final FlowId id = new FlowId(Long.parseLong(event.getKey()));
+ log.trace("FlowState of FlowId {} was updated from {} to {}",
+ id, oldState, state);
+ for (FlowMapEventListener e : listeners) {
+ e.flowStateChanged(id, oldState, state);
+ }
+ } else {
+ throw new IllegalStateException("Updated illegal value: " + value.toString());
+ }
+ }
+
+ @Override
+ public void entryEvicted(EntryEvent<String, byte[]> event) {
+ // do nothing.
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
index 5fec5b2..c9c3c7b 100644
--- a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
@@ -18,6 +18,7 @@
import net.onrc.onos.api.batchoperation.BatchOperationEntry;
import net.onrc.onos.api.flowmanager.FlowId;
import net.onrc.onos.api.flowmanager.FlowLink;
+import net.onrc.onos.api.flowmanager.FlowState;
import net.onrc.onos.api.flowmanager.OpticalPathFlow;
import net.onrc.onos.api.flowmanager.PacketPathFlow;
import net.onrc.onos.api.flowmanager.SingleDstTreeFlow;
@@ -252,6 +253,7 @@
// New flow manager related classes
kryo.register(FlowId.class);
+ kryo.register(FlowState.class);
kryo.register(net.onrc.onos.api.flowmanager.Path.class);
kryo.register(Tree.class);
kryo.register(FlowLink.class);
diff --git a/src/test/java/net/onrc/onos/core/flowmanager/SharedFlowMapTest.java b/src/test/java/net/onrc/onos/core/flowmanager/SharedFlowMapTest.java
new file mode 100644
index 0000000..a910969
--- /dev/null
+++ b/src/test/java/net/onrc/onos/core/flowmanager/SharedFlowMapTest.java
@@ -0,0 +1,134 @@
+package net.onrc.onos.core.flowmanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import net.floodlightcontroller.util.MACAddress;
+import net.onrc.onos.api.flowmanager.Flow;
+import net.onrc.onos.api.flowmanager.FlowId;
+import net.onrc.onos.api.flowmanager.FlowLink;
+import net.onrc.onos.api.flowmanager.FlowState;
+import net.onrc.onos.api.flowmanager.PacketPathFlow;
+import net.onrc.onos.api.flowmanager.Path;
+import net.onrc.onos.core.datagrid.ISharedCollectionsService;
+import net.onrc.onos.core.datastore.hazelcast.DummySharedCollectionsService;
+import net.onrc.onos.core.matchaction.action.Action;
+import net.onrc.onos.core.matchaction.action.ModifyDstMacAction;
+import net.onrc.onos.core.matchaction.action.OutputAction;
+import net.onrc.onos.core.matchaction.match.PacketMatch;
+import net.onrc.onos.core.matchaction.match.PacketMatchBuilder;
+import net.onrc.onos.core.util.PortNumber;
+import net.onrc.onos.core.util.SwitchPort;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link SharedFlowMap}.
+ */
+public class SharedFlowMapTest {
+ private ISharedCollectionsService scs;
+ private Path path;
+ private PacketMatch match;
+ private List<Action> actions;
+ private Flow flow;
+
+ @Before
+ public void setUp() throws Exception {
+ scs = new DummySharedCollectionsService();
+
+ path = new Path();
+ path.add(new FlowLink(
+ new SwitchPort(1, (short) 10), new SwitchPort(2, (short) 11)));
+
+ PacketMatchBuilder builder = new PacketMatchBuilder();
+ builder.setDstMac(MACAddress.valueOf(54321));
+ match = builder.build();
+
+ actions = Arrays.asList(
+ new ModifyDstMacAction(MACAddress.valueOf(12345)),
+ new OutputAction(PortNumber.uint16((short) 101)));
+
+ flow = new PacketPathFlow(new FlowId(1L), match, PortNumber.uint32(12345), path,
+ actions, 0, 0);
+ }
+
+ /**
+ * Tests if the constructor initializes its field correctly.
+ */
+ @Test
+ public void testConstructor() {
+ SharedFlowMap map = new SharedFlowMap(scs);
+ Set<Flow> flows = map.getAll();
+ assertNotNull(flows);
+ assertTrue(flows.isEmpty());
+ }
+
+ /**
+ * Tests the basic functionality of the flow map. This test puts, gets and
+ * removes a flow and checks these operations are executed properly.
+ */
+ @Test
+ public void testAddGetRemoveFlow() {
+ SharedFlowMap map = new SharedFlowMap(scs);
+
+ // Check if the stored flow can be retrieved its ID
+ assertTrue(map.put(flow));
+ final Flow obtainedFlow = map.get(flow.getId());
+ assertNotNull(obtainedFlow);
+ assertEquals(flow.getId(), obtainedFlow.getId());
+ assertEquals(match, obtainedFlow.getMatch());
+
+ // Check if it will not return flow with nonexistent ID
+ FlowId nonexistentId = new FlowId(99999L);
+ assertFalse(nonexistentId.equals(flow.getId()));
+ assertNull(map.get(nonexistentId));
+
+ // Check if it will remove the flow and it will not return the flow
+ // after the removal
+ final Flow removedFlow = map.remove(flow.getId());
+ assertNotNull(removedFlow);
+ assertEquals(flow.getId(), removedFlow.getId());
+ final Flow nullFlow = map.get(flow.getId());
+ assertNull(nullFlow);
+ }
+
+ /**
+ * Tests the basic functionality of the flow state on the map. This test put
+ * the flow and changes state of it.
+ */
+ @Test
+ public void testStateChangeOfFlow() {
+ SharedFlowMap map = new SharedFlowMap(scs);
+
+ // Check if the state of nonexistent flow is not exist
+ assertNull(map.getState(flow.getId()));
+
+ // Check if it won't change the state of nonexistent flow
+ assertFalse(map.setState(flow.getId(), FlowState.COMPILED, FlowState.SUBMITTED));
+ assertNull(map.getState(flow.getId()));
+
+ // Check if the initial state is SUBMITTED
+ assertTrue(map.put(flow));
+ assertEquals(FlowState.SUBMITTED, map.getState(flow.getId()));
+
+ // Check if it won't change the state if the expected state was wrong
+ assertFalse(map.setState(flow.getId(), FlowState.INSTALLED, FlowState.COMPILED));
+ assertEquals(FlowState.SUBMITTED, map.getState(flow.getId()));
+
+ // Check if it changes the state if the expected state was correct
+ assertTrue(map.setState(flow.getId(), FlowState.COMPILED, FlowState.SUBMITTED));
+ assertEquals(FlowState.COMPILED, map.getState(flow.getId()));
+
+ // Check if it won't return the state if the flow was removed
+ assertEquals(flow, map.remove(flow.getId()));
+ assertNull(map.getState(flow.getId()));
+ }
+}