blob: 85f928a8e1b1c15c2cf0c1302622703ac3b94b3f [file] [log] [blame]
alshabib339a3d92014-09-26 17:54:32 -07001package org.onlab.onos.store.flow.impl;
2
alshabib339a3d92014-09-26 17:54:32 -07003import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -07005import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
alshabib339a3d92014-09-26 17:54:32 -07006
Madan Jampani38b250d2014-10-17 11:02:38 -07007import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -07008import java.util.ArrayList;
9import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070010import java.util.Collection;
11import java.util.Collections;
Madan Jampani117aaae2014-10-23 10:04:05 -070012import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070013import java.util.concurrent.TimeUnit;
14import java.util.concurrent.TimeoutException;
Madan Jampani117aaae2014-10-23 10:04:05 -070015import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070016
17import org.apache.felix.scr.annotations.Activate;
18import org.apache.felix.scr.annotations.Component;
19import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070020import org.apache.felix.scr.annotations.Reference;
21import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070022import org.apache.felix.scr.annotations.Service;
23import org.onlab.onos.ApplicationId;
Madan Jampani38b250d2014-10-17 11:02:38 -070024import org.onlab.onos.cluster.ClusterService;
alshabib339a3d92014-09-26 17:54:32 -070025import org.onlab.onos.net.DeviceId;
Madan Jampani117aaae2014-10-23 10:04:05 -070026import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070027import org.onlab.onos.net.flow.DefaultFlowEntry;
28import org.onlab.onos.net.flow.FlowEntry;
29import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
alshabib339a3d92014-09-26 17:54:32 -070030import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070031import org.onlab.onos.net.flow.FlowRuleBatchEntry;
32import org.onlab.onos.net.flow.FlowRuleBatchEvent;
33import org.onlab.onos.net.flow.FlowRuleBatchOperation;
34import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070035import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070036import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070037import org.onlab.onos.net.flow.FlowRuleEvent.Type;
38import org.onlab.onos.net.flow.FlowRuleStore;
39import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070040import org.onlab.onos.net.flow.StoredFlowEntry;
alshabib339a3d92014-09-26 17:54:32 -070041import org.onlab.onos.store.AbstractStore;
Madan Jampani38b250d2014-10-17 11:02:38 -070042import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
43import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070044import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070045import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
46import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070047import org.onlab.onos.store.flow.ReplicaInfoService;
Madan Jampani38b250d2014-10-17 11:02:38 -070048import org.onlab.onos.store.serializers.DistributedStoreSerializers;
49import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070050import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070051import org.slf4j.Logger;
52
53import com.google.common.collect.ArrayListMultimap;
54import com.google.common.collect.ImmutableSet;
55import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070056import com.google.common.util.concurrent.Futures;
alshabib339a3d92014-09-26 17:54:32 -070057
58/**
Madan Jampani38b250d2014-10-17 11:02:38 -070059 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070060 */
alshabib339a3d92014-09-26 17:54:32 -070061@Component(immediate = true)
62@Service
63public class DistributedFlowRuleStore
Madan Jampani117aaae2014-10-23 10:04:05 -070064 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -070065 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -070066
67 private final Logger log = getLogger(getClass());
68
69 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070070 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
71 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -070072
alshabib92c65ad2014-10-08 21:56:05 -070073 private final Multimap<Short, FlowRule> flowEntriesById =
74 ArrayListMultimap.<Short, FlowRule>create();
alshabib339a3d92014-09-26 17:54:32 -070075
Madan Jampani38b250d2014-10-17 11:02:38 -070076 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070077 private ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -070078
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 private ClusterCommunicationService clusterCommunicator;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 private ClusterService clusterService;
84
85 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
86 @Override
87 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070088 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -070089 .register(DistributedStoreSerializers.COMMON)
90 .build()
91 .populate(1);
92 }
93 };
94
95 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070096 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -070097
alshabib339a3d92014-09-26 17:54:32 -070098 @Activate
99 public void activate() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700100 clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
101
102 @Override
103 public void handle(ClusterMessage message) {
104 FlowRule rule = SERIALIZER.decode(message.payload());
105 log.info("received add request for {}", rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700106 storeFlowRule(rule);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700107 // FIXME what to respond.
108 try {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700109 message.respond(SERIALIZER.encode("ACK"));
110 } catch (IOException e) {
111 log.error("Failed to respond back", e);
112 }
113 }
114 });
115
116 clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
117
118 @Override
119 public void handle(ClusterMessage message) {
120 FlowRule rule = SERIALIZER.decode(message.payload());
121 log.info("received delete request for {}", rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700122 deleteFlowRule(rule);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700123 // FIXME what to respond.
124 try {
125 message.respond(SERIALIZER.encode("ACK"));
126 } catch (IOException e) {
127 log.error("Failed to respond back", e);
128 }
129
130 }
131 });
Madan Jampani117aaae2014-10-23 10:04:05 -0700132
133 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
134
135 @Override
136 public void handle(ClusterMessage message) {
137 FlowRule rule = SERIALIZER.decode(message.payload());
138 log.info("received get flow entry request for {}", rule);
139 FlowEntry flowEntry = getFlowEntryInternal(rule);
140 try {
141 message.respond(SERIALIZER.encode(flowEntry));
142 } catch (IOException e) {
143 log.error("Failed to respond back", e);
144 }
145 }
146 });
147
alshabib339a3d92014-09-26 17:54:32 -0700148 log.info("Started");
149 }
150
151 @Deactivate
152 public void deactivate() {
153 log.info("Stopped");
154 }
155
156
Madan Jampani117aaae2014-10-23 10:04:05 -0700157 // TODO: This is not a efficient operation on a distributed sharded
158 // flow store. We need to revisit the need for this operation or at least
159 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700160 @Override
tom9b4030d2014-10-06 10:39:03 -0700161 public int getFlowRuleCount() {
162 return flowEntries.size();
163 }
164
165 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700166 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700167 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
168 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
169 return getFlowEntryInternal(rule);
170 }
171
172 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
173 replicaInfo.master().orNull(), rule.deviceId());
174
175 ClusterMessage message = new ClusterMessage(
176 clusterService.getLocalNode().id(),
177 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
178 SERIALIZER.encode(rule));
179
180 try {
181 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
182 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
183 } catch (IOException | TimeoutException e) {
184 // FIXME: throw a FlowStoreException
185 throw new RuntimeException(e);
186 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700187 }
188
189 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
190 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700191 if (f.equals(rule)) {
192 return f;
193 }
194 }
195 return null;
196 }
197
198 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700199 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700200 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700201 if (rules == null) {
202 return Collections.emptyList();
203 }
204 return ImmutableSet.copyOf(rules);
205 }
206
207 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700208 public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
alshabib92c65ad2014-10-08 21:56:05 -0700209 Collection<FlowRule> rules = flowEntriesById.get(appId.id());
alshabib339a3d92014-09-26 17:54:32 -0700210 if (rules == null) {
211 return Collections.emptyList();
212 }
213 return ImmutableSet.copyOf(rules);
214 }
215
216 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700217 public void storeFlowRule(FlowRule rule) {
218 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
219 }
220
221 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
222 if (operation.getOperations().isEmpty()) {
223 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700224 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700225
Madan Jampani117aaae2014-10-23 10:04:05 -0700226 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
227
228 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
229
230 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
231 return storeBatchInternal(operation);
232 }
233
234 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
235 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700236
Madan Jampani38b250d2014-10-17 11:02:38 -0700237 ClusterMessage message = new ClusterMessage(
238 clusterService.getLocalNode().id(),
239 FlowStoreMessageSubjects.STORE_FLOW_RULE,
Madan Jampani117aaae2014-10-23 10:04:05 -0700240 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700241
242 try {
243 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
244 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
245 } catch (IOException | TimeoutException e) {
246 // FIXME: throw a FlowStoreException
247 throw new RuntimeException(e);
248 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700249
250 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700251 }
252
Madan Jampani117aaae2014-10-23 10:04:05 -0700253 private Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
254 List<FlowEntry> toRemove = new ArrayList<>();
255 List<FlowEntry> toAdd = new ArrayList<>();
256 // TODO: backup changes to hazelcast map
257 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
258 FlowRule flowRule = batchEntry.getTarget();
259 FlowRuleOperation op = batchEntry.getOperator();
260 if (op.equals(FlowRuleOperation.REMOVE)) {
261 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
262 if (entry != null) {
263 entry.setState(FlowEntryState.PENDING_REMOVE);
264 }
265 toRemove.add(entry);
266 } else if (op.equals(FlowRuleOperation.ADD)) {
267 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
268 DeviceId deviceId = flowRule.deviceId();
269 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
270 flowEntries.put(deviceId, flowEntry);
271 flowEntriesById.put(flowRule.appId(), flowEntry);
272 toAdd.add(flowEntry);
273 }
274 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700275 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700276 if (toAdd.isEmpty() && toRemove.isEmpty()) {
277 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
278 }
279 notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
280 // TODO: imlpement this.
281 return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
alshabib339a3d92014-09-26 17:54:32 -0700282 }
283
284 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700285 public void deleteFlowRule(FlowRule rule) {
286 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700287 }
288
289 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700290 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
291 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700292 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700293 return addOrUpdateFlowRuleInternal(rule);
294 }
295
296 ClusterMessage message = new ClusterMessage(
297 clusterService.getLocalNode().id(),
298 FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
299 SERIALIZER.encode(rule));
300
301 try {
302 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
303 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
304 } catch (IOException | TimeoutException e) {
305 // FIXME: throw a FlowStoreException
306 throw new RuntimeException(e);
307 }
308 }
309
310 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
alshabib339a3d92014-09-26 17:54:32 -0700311 DeviceId did = rule.deviceId();
312
313 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700314 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700315 if (stored != null) {
316 stored.setBytes(rule.bytes());
317 stored.setLife(rule.life());
318 stored.setPackets(rule.packets());
319 if (stored.state() == FlowEntryState.PENDING_ADD) {
320 stored.setState(FlowEntryState.ADDED);
321 return new FlowRuleEvent(Type.RULE_ADDED, rule);
322 }
alshabib339a3d92014-09-26 17:54:32 -0700323 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
324 }
325
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700326 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
327 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700328 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700329
330 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700331 }
332
333 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700334 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
335 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700336 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700337 // bypass and handle it locally
338 return removeFlowRuleInternal(rule);
339 }
340
341 ClusterMessage message = new ClusterMessage(
342 clusterService.getLocalNode().id(),
343 FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
344 SERIALIZER.encode(rule));
345
346 try {
347 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
348 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
349 } catch (IOException | TimeoutException e) {
350 // FIXME: throw a FlowStoreException
351 throw new RuntimeException(e);
352 }
353 }
354
355 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
alshabib1c319ff2014-10-04 20:29:09 -0700356 // This is where one could mark a rule as removed and still keep it in the store.
alshabib339a3d92014-09-26 17:54:32 -0700357 if (flowEntries.remove(rule.deviceId(), rule)) {
358 return new FlowRuleEvent(RULE_REMOVED, rule);
359 } else {
360 return null;
361 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700362 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700363 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700364
365 @Override
366 public void batchOperationComplete(FlowRuleBatchEvent event) {
367 notifyDelegate(event);
368 }
alshabib339a3d92014-09-26 17:54:32 -0700369}