blob: 8fd074419811dceca507c7feb2bd1b3a98bda752 [file] [log] [blame]
Toshio Koideb7a578c2014-08-22 18:00:54 -07001package net.onrc.onos.core.flowmanager;
2
3import static com.google.common.base.Preconditions.checkNotNull;
4
5import java.util.HashSet;
6import java.util.Map.Entry;
7import java.util.Set;
8
9import net.onrc.onos.api.flowmanager.Flow;
10import net.onrc.onos.api.flowmanager.FlowId;
11import net.onrc.onos.api.flowmanager.FlowState;
12import net.onrc.onos.core.datagrid.ISharedCollectionsService;
13import net.onrc.onos.core.util.serializers.KryoFactory;
14
15import com.hazelcast.core.IMap;
16
17/**
18 * Manages the distributed shared flow map.
19 */
20class SharedFlowMap implements FlowMap {
21 private static final String FLOWMAP_NAME = "flow_map";
22 private static final String FLOWSTATEMAP_NAME = "flowstate_map";
23
24 private final IMap<String, byte[]> flowMap;
25 private final IMap<String, byte[]> flowStateMap;
26 private final SharedFlowMapEventDispatcher dispatcher;
27
28 /**
29 * Creates instance using {@link ISharedCollectionsService} service.
30 *
31 * @param service the {@link ISharedCollectionsService} service
32 */
33 SharedFlowMap(ISharedCollectionsService service) {
34 this.flowMap = checkNotNull(service.getConcurrentMap(
35 FLOWMAP_NAME, String.class, byte[].class));
36 this.flowStateMap = checkNotNull(service.getConcurrentMap(
37 FLOWSTATEMAP_NAME, String.class, byte[].class));
38 this.dispatcher = new SharedFlowMapEventDispatcher(flowMap, flowStateMap);
39 }
40
41 @Override
42 public Flow get(FlowId id) {
43 byte[] buf = flowMap.get(checkNotNull(id).toString());
44 if (buf == null) {
45 return null;
46 }
47 return KryoFactory.deserialize(buf);
48 }
49
50 @Override
51 public boolean put(Flow flow) {
52 checkNotNull(flow);
53 byte[] buf = KryoFactory.serialize(flow);
54 flowMap.set(flow.getId().toString(), buf);
55 return true;
56 }
57
58 @Override
59 public Flow remove(FlowId id) {
60 String flowIdStr = checkNotNull(id).toString();
61 byte[] buf = flowMap.remove(flowIdStr);
62 if (buf == null) {
63 return null;
64 }
65 flowStateMap.remove(flowIdStr);
66 return KryoFactory.deserialize(buf);
67 }
68
69 @Override
70 public Set<Flow> getAll() {
71 Set<Flow> flows = new HashSet<>();
72 for (Entry<String, byte[]> entry : flowMap.entrySet()) {
73 flows.add((Flow) KryoFactory.deserialize(entry.getValue()));
74 }
75 return flows;
76 }
77
78 @Override
79 public boolean setState(FlowId id, FlowState state, FlowState expectedState) {
80 final String key = checkNotNull(id, "FlowId is not specified.").toString();
81 final byte[] oldValue = KryoFactory.serialize(expectedState);
82 final byte[] newValue = KryoFactory.serialize(state);
83
84 if (!flowMap.containsKey(key)) {
85 return false;
86 }
87
88 if (expectedState == FlowState.SUBMITTED) {
89 // The absence of the key means SUBMITTED state.
90 return flowStateMap.putIfAbsent(key, newValue) == null;
91 }
92
93 return flowStateMap.replace(key, oldValue, newValue);
94 };
95
96 @Override
97 public FlowState getState(FlowId id) {
98 final String key = checkNotNull(id, "FlowId is not specified.").toString();
99 if (!flowMap.containsKey(key)) {
100 return null;
101 }
102
103 final byte[] buf = flowStateMap.get(key);
104 if (buf == null) {
105 // The absence of the key means SUBMITTED state.
106 return FlowState.SUBMITTED;
107 }
108 return KryoFactory.deserialize(buf);
109 }
110
111 @Override
112 public void addListener(FlowMapEventListener listener) {
113 dispatcher.addListener(listener);
114 }
115
116 @Override
117 public void removeListener(FlowMapEventListener listener) {
118 dispatcher.removeListener(listener);
119 }
120}