blob: 368f88da6e64086348129b234d2d42b65b249860 [file] [log] [blame]
Toshio Koided6cbec32014-08-27 15:23:44 -07001package net.onrc.onos.core.flowmanager;
2
3import static com.google.common.base.Preconditions.checkNotNull;
4
5import java.util.HashSet;
6import java.util.Map.Entry;
7import java.util.Set;
8
9import net.onrc.onos.api.flowmanager.FlowBatchId;
10import net.onrc.onos.api.flowmanager.FlowBatchOperation;
11import net.onrc.onos.api.flowmanager.FlowBatchState;
12import net.onrc.onos.core.datagrid.ISharedCollectionsService;
13import net.onrc.onos.core.util.serializers.KryoFactory;
14
15import com.hazelcast.core.IMap;
16
17/**
18 * An implementation of shared distributed map of {@link FlowBatchMap}.
19 * <p>
20 * This class's implementation is almost the same to {@link SharedFlowMap}. The
21 * base class for them should be implemented.
22 */
23public class SharedFlowBatchMap implements FlowBatchMap {
24 private static final String FLOWBATCHMAP_NAME = "flowbatch_map";
25 private static final String FLOWBATCHSTATEMAP_NAME = "flowbatchstate_map";
26
27 private final IMap<String, byte[]> flowBatchMap;
28 private final IMap<String, byte[]> flowBatchStateMap;
29 private final SharedFlowBatchMapEventDispatcher dispatcher;
30
31 /**
32 * Creates instance using {@link ISharedCollectionsService} service.
33 *
34 * @param service the {@link ISharedCollectionsService} service
35 */
36 SharedFlowBatchMap(ISharedCollectionsService service) {
37 this.flowBatchMap = checkNotNull(service.getConcurrentMap(
38 FLOWBATCHMAP_NAME, String.class, byte[].class));
39 this.flowBatchStateMap = checkNotNull(service.getConcurrentMap(
40 FLOWBATCHSTATEMAP_NAME, String.class, byte[].class));
41 this.dispatcher = new SharedFlowBatchMapEventDispatcher(
42 flowBatchMap, flowBatchStateMap);
43 }
44
45 @Override
46 public FlowBatchOperation get(FlowBatchId id) {
47 byte[] buf = flowBatchMap.get(checkNotNull(id).toString());
48 if (buf == null) {
49 return null;
50 }
51 return KryoFactory.deserialize(buf);
52 }
53
54 @Override
55 public boolean put(FlowBatchId id, FlowBatchOperation flowOp) {
56 byte[] buf = KryoFactory.serialize(checkNotNull(flowOp));
57 flowBatchMap.set(id.toString(), checkNotNull(buf));
58 return true;
59 }
60
61 @Override
62 public FlowBatchOperation remove(FlowBatchId id) {
63 String flowBatchIdStr = checkNotNull(id).toString();
64 byte[] buf = flowBatchMap.remove(flowBatchIdStr);
65 if (buf == null) {
66 return null;
67 }
68 flowBatchStateMap.remove(flowBatchIdStr);
69 return KryoFactory.deserialize(buf);
70 }
71
72 @Override
73 public Set<FlowBatchOperation> getAll() {
74 Set<FlowBatchOperation> flowBatchs = new HashSet<>();
75 for (Entry<String, byte[]> entry : flowBatchMap.entrySet()) {
76 flowBatchs.add((FlowBatchOperation)
77 KryoFactory.deserialize(entry.getValue()));
78 }
79 return flowBatchs;
80 }
81
82 @Override
83 public boolean setState(FlowBatchId id, FlowBatchState state,
84 FlowBatchState expectedState) {
85 final String key = checkNotNull(
86 id, "FlowBatchId is not specified.").toString();
87 final byte[] oldValue = KryoFactory.serialize(expectedState);
88 final byte[] newValue = KryoFactory.serialize(state);
89
90 if (!flowBatchMap.containsKey(key)) {
91 return false;
92 }
93
94 if (expectedState == FlowBatchState.SUBMITTED) {
95 // The absence of the key means SUBMITTED state.
96 return flowBatchStateMap.putIfAbsent(key, newValue) == null;
97 }
98
99 return flowBatchStateMap.replace(key, oldValue, newValue);
100 }
101
102 @Override
103 public FlowBatchState getState(FlowBatchId id) {
104 final String key = checkNotNull(
105 id, "FlowBatchId is not specified.").toString();
106 if (!flowBatchMap.containsKey(key)) {
107 return null;
108 }
109
110 final byte[] buf = flowBatchStateMap.get(key);
111 if (buf == null) {
112 // The absence of the key means SUBMITTED state.
113 return FlowBatchState.SUBMITTED;
114 }
115 return KryoFactory.deserialize(buf);
116 }
117
118 @Override
119 public void addListener(FlowBatchMapEventListener listener) {
120 dispatcher.addListener(listener);
121 }
122
123 @Override
124 public void removeListener(FlowBatchMapEventListener listener) {
125 dispatcher.removeListener(listener);
126 }
127
128 @Override
129 public boolean isLocal(FlowBatchId id) {
130 return flowBatchMap.localKeySet().contains(checkNotNull(id).toString());
131 }
132}