blob: bde57c622fa5b5cc428ca49d8a69356eac0b0327 [file] [log] [blame]
alshabib339a3d92014-09-26 17:54:32 -07001package org.onlab.onos.store.flow.impl;
2
alshabib339a3d92014-09-26 17:54:32 -07003import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -07005import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
alshabib339a3d92014-09-26 17:54:32 -07006
Madan Jampani38b250d2014-10-17 11:02:38 -07007import java.io.IOException;
alshabib339a3d92014-09-26 17:54:32 -07008import java.util.Collection;
9import java.util.Collections;
Madan Jampani38b250d2014-10-17 11:02:38 -070010import java.util.concurrent.TimeUnit;
11import java.util.concurrent.TimeoutException;
alshabib339a3d92014-09-26 17:54:32 -070012
13import org.apache.felix.scr.annotations.Activate;
14import org.apache.felix.scr.annotations.Component;
15import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070016import org.apache.felix.scr.annotations.Reference;
17import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070018import org.apache.felix.scr.annotations.Service;
19import org.onlab.onos.ApplicationId;
Madan Jampani38b250d2014-10-17 11:02:38 -070020import org.onlab.onos.cluster.ClusterService;
alshabib339a3d92014-09-26 17:54:32 -070021import org.onlab.onos.net.DeviceId;
alshabib1c319ff2014-10-04 20:29:09 -070022import org.onlab.onos.net.flow.DefaultFlowEntry;
23import org.onlab.onos.net.flow.FlowEntry;
24import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
alshabib339a3d92014-09-26 17:54:32 -070025import org.onlab.onos.net.flow.FlowRule;
alshabib339a3d92014-09-26 17:54:32 -070026import org.onlab.onos.net.flow.FlowRuleEvent;
27import org.onlab.onos.net.flow.FlowRuleEvent.Type;
28import org.onlab.onos.net.flow.FlowRuleStore;
29import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070030import org.onlab.onos.net.flow.StoredFlowEntry;
alshabib339a3d92014-09-26 17:54:32 -070031import org.onlab.onos.store.AbstractStore;
Madan Jampani38b250d2014-10-17 11:02:38 -070032import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
33import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070034import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070035import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
36import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070037import org.onlab.onos.store.flow.ReplicaInfoService;
Madan Jampani38b250d2014-10-17 11:02:38 -070038import org.onlab.onos.store.serializers.DistributedStoreSerializers;
39import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070040import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070041import org.slf4j.Logger;
42
43import com.google.common.collect.ArrayListMultimap;
44import com.google.common.collect.ImmutableSet;
45import com.google.common.collect.Multimap;
46
47/**
Madan Jampani38b250d2014-10-17 11:02:38 -070048 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070049 */
alshabib339a3d92014-09-26 17:54:32 -070050@Component(immediate = true)
51@Service
52public class DistributedFlowRuleStore
alshabib1c319ff2014-10-04 20:29:09 -070053 extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
54 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -070055
56 private final Logger log = getLogger(getClass());
57
58 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070059 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
60 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -070061
alshabib92c65ad2014-10-08 21:56:05 -070062 private final Multimap<Short, FlowRule> flowEntriesById =
63 ArrayListMultimap.<Short, FlowRule>create();
alshabib339a3d92014-09-26 17:54:32 -070064
Madan Jampani38b250d2014-10-17 11:02:38 -070065 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070066 private ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -070067
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 private ClusterCommunicationService clusterCommunicator;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 private ClusterService clusterService;
73
74 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
75 @Override
76 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070077 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -070078 .register(DistributedStoreSerializers.COMMON)
79 .build()
80 .populate(1);
81 }
82 };
83
84 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070085 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -070086
alshabib339a3d92014-09-26 17:54:32 -070087 @Activate
88 public void activate() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070089 clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
90
91 @Override
92 public void handle(ClusterMessage message) {
93 FlowRule rule = SERIALIZER.decode(message.payload());
94 log.info("received add request for {}", rule);
95 storeFlowEntryInternal(rule);
96 // FIXME what to respond.
97 try {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070098 message.respond(SERIALIZER.encode("ACK"));
99 } catch (IOException e) {
100 log.error("Failed to respond back", e);
101 }
102 }
103 });
104
105 clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
106
107 @Override
108 public void handle(ClusterMessage message) {
109 FlowRule rule = SERIALIZER.decode(message.payload());
110 log.info("received delete request for {}", rule);
111 deleteFlowRuleInternal(rule);
112 // FIXME what to respond.
113 try {
114 message.respond(SERIALIZER.encode("ACK"));
115 } catch (IOException e) {
116 log.error("Failed to respond back", e);
117 }
118
119 }
120 });
alshabib339a3d92014-09-26 17:54:32 -0700121 log.info("Started");
122 }
123
124 @Deactivate
125 public void deactivate() {
126 log.info("Stopped");
127 }
128
129
130 @Override
tom9b4030d2014-10-06 10:39:03 -0700131 public int getFlowRuleCount() {
132 return flowEntries.size();
133 }
134
135 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700136 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700137 return getFlowEntryInternal(rule);
138 }
139
140 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
141 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700142 if (f.equals(rule)) {
143 return f;
144 }
145 }
146 return null;
147 }
148
149 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700150 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700151 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700152 if (rules == null) {
153 return Collections.emptyList();
154 }
155 return ImmutableSet.copyOf(rules);
156 }
157
158 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700159 public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
alshabib92c65ad2014-10-08 21:56:05 -0700160 Collection<FlowRule> rules = flowEntriesById.get(appId.id());
alshabib339a3d92014-09-26 17:54:32 -0700161 if (rules == null) {
162 return Collections.emptyList();
163 }
164 return ImmutableSet.copyOf(rules);
165 }
166
167 @Override
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700168 public boolean storeFlowRule(FlowRule rule) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700169 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700170 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700171 return storeFlowEntryInternal(rule);
alshabib339a3d92014-09-26 17:54:32 -0700172 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700173
Madan Jampani6f065272014-10-21 22:02:16 -0700174 log.info("Forwarding storeFlowRule to {}, which is the primary (master) for device {}",
175 replicaInfo.master().orNull(), rule.deviceId());
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700176
Madan Jampani38b250d2014-10-17 11:02:38 -0700177 ClusterMessage message = new ClusterMessage(
178 clusterService.getLocalNode().id(),
179 FlowStoreMessageSubjects.STORE_FLOW_RULE,
180 SERIALIZER.encode(rule));
181
182 try {
183 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
184 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
185 } catch (IOException | TimeoutException e) {
186 // FIXME: throw a FlowStoreException
187 throw new RuntimeException(e);
188 }
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700189 return false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700190 }
191
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700192 private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700193 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
Madan Jampani38b250d2014-10-17 11:02:38 -0700194 DeviceId deviceId = flowRule.deviceId();
195 // write to local copy.
196 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
197 flowEntries.put(deviceId, flowEntry);
198 flowEntriesById.put(flowRule.appId(), flowEntry);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700199 notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
200 return true;
Madan Jampani38b250d2014-10-17 11:02:38 -0700201 }
202 // write to backup.
203 // TODO: write to a hazelcast map.
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700204 return false;
alshabib339a3d92014-09-26 17:54:32 -0700205 }
206
207 @Override
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700208 public synchronized boolean deleteFlowRule(FlowRule rule) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700209 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700210 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700211 return deleteFlowRuleInternal(rule);
Madan Jampani38b250d2014-10-17 11:02:38 -0700212 }
213
214 ClusterMessage message = new ClusterMessage(
215 clusterService.getLocalNode().id(),
216 FlowStoreMessageSubjects.DELETE_FLOW_RULE,
217 SERIALIZER.encode(rule));
218
219 try {
220 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
221 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
222 } catch (IOException | TimeoutException e) {
223 // FIXME: throw a FlowStoreException
224 throw new RuntimeException(e);
225 }
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700226 return false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700227 }
228
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700229 private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700230 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
alshabib1c319ff2014-10-04 20:29:09 -0700231 if (entry == null) {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700232 return false;
alshabib339a3d92014-09-26 17:54:32 -0700233 }
alshabib1c319ff2014-10-04 20:29:09 -0700234 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700235
Madan Jampani38b250d2014-10-17 11:02:38 -0700236 // TODO: also update backup.
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700237
238 notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
239
240 return true;
alshabib339a3d92014-09-26 17:54:32 -0700241 }
242
243 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700244 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
245 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700246 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700247 return addOrUpdateFlowRuleInternal(rule);
248 }
249
250 ClusterMessage message = new ClusterMessage(
251 clusterService.getLocalNode().id(),
252 FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
253 SERIALIZER.encode(rule));
254
255 try {
256 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
257 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
258 } catch (IOException | TimeoutException e) {
259 // FIXME: throw a FlowStoreException
260 throw new RuntimeException(e);
261 }
262 }
263
264 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
alshabib339a3d92014-09-26 17:54:32 -0700265 DeviceId did = rule.deviceId();
266
267 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700268 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700269 if (stored != null) {
270 stored.setBytes(rule.bytes());
271 stored.setLife(rule.life());
272 stored.setPackets(rule.packets());
273 if (stored.state() == FlowEntryState.PENDING_ADD) {
274 stored.setState(FlowEntryState.ADDED);
275 return new FlowRuleEvent(Type.RULE_ADDED, rule);
276 }
alshabib339a3d92014-09-26 17:54:32 -0700277 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
278 }
279
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700280 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
281 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700282 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700283
284 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700285 }
286
287 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700288 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
289 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700290 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700291 // bypass and handle it locally
292 return removeFlowRuleInternal(rule);
293 }
294
295 ClusterMessage message = new ClusterMessage(
296 clusterService.getLocalNode().id(),
297 FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
298 SERIALIZER.encode(rule));
299
300 try {
301 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
302 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
303 } catch (IOException | TimeoutException e) {
304 // FIXME: throw a FlowStoreException
305 throw new RuntimeException(e);
306 }
307 }
308
309 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
alshabib1c319ff2014-10-04 20:29:09 -0700310 // This is where one could mark a rule as removed and still keep it in the store.
alshabib339a3d92014-09-26 17:54:32 -0700311 if (flowEntries.remove(rule.deviceId(), rule)) {
312 return new FlowRuleEvent(RULE_REMOVED, rule);
313 } else {
314 return null;
315 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700316 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700317 }
alshabib339a3d92014-09-26 17:54:32 -0700318}