blob: 30b9008cd5299a4c83d90e26762064c012032e41 [file] [log] [blame]
alshabib339a3d92014-09-26 17:54:32 -07001package org.onlab.onos.store.flow.impl;
2
alshabib339a3d92014-09-26 17:54:32 -07003import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -07005import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -07006import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -07007
Madan Jampani38b250d2014-10-17 11:02:38 -07008import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -07009import java.util.ArrayList;
10import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070011import java.util.Collection;
12import java.util.Collections;
Madan Jampanif5fdef02014-10-23 21:58:10 -070013import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070014import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070015import java.util.concurrent.ExecutorService;
16import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070017import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070018import java.util.concurrent.TimeUnit;
19import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070020import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani117aaae2014-10-23 10:04:05 -070021import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070022
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070028import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070029import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070030import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070031import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070032import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070033import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070034import org.onlab.onos.net.flow.DefaultFlowEntry;
35import org.onlab.onos.net.flow.FlowEntry;
36import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
alshabib339a3d92014-09-26 17:54:32 -070037import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070038import org.onlab.onos.net.flow.FlowRuleBatchEntry;
39import org.onlab.onos.net.flow.FlowRuleBatchEvent;
40import org.onlab.onos.net.flow.FlowRuleBatchOperation;
41import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070042import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070043import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070044import org.onlab.onos.net.flow.FlowRuleEvent.Type;
45import org.onlab.onos.net.flow.FlowRuleStore;
46import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070047import org.onlab.onos.net.flow.StoredFlowEntry;
alshabib339a3d92014-09-26 17:54:32 -070048import org.onlab.onos.store.AbstractStore;
Madan Jampani38b250d2014-10-17 11:02:38 -070049import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
50import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070051import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070052import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070053import org.onlab.onos.store.flow.ReplicaInfoService;
Madan Jampani38b250d2014-10-17 11:02:38 -070054import org.onlab.onos.store.serializers.DistributedStoreSerializers;
55import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070056import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070057import org.slf4j.Logger;
58
Madan Jampani24f9efb2014-10-24 18:56:23 -070059import com.google.common.base.Function;
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070060import com.google.common.cache.Cache;
61import com.google.common.cache.CacheBuilder;
alshabib339a3d92014-09-26 17:54:32 -070062import com.google.common.collect.ArrayListMultimap;
63import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070064import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070065import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070066import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070067import com.google.common.util.concurrent.ListenableFuture;
68import com.google.common.util.concurrent.SettableFuture;
alshabib339a3d92014-09-26 17:54:32 -070069
70/**
Madan Jampani38b250d2014-10-17 11:02:38 -070071 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070072 */
alshabib339a3d92014-09-26 17:54:32 -070073@Component(immediate = true)
74@Service
75public class DistributedFlowRuleStore
Madan Jampani117aaae2014-10-23 10:04:05 -070076 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -070077 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -070078
79 private final Logger log = getLogger(getClass());
80
81 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070082 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
83 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -070084
alshabib92c65ad2014-10-08 21:56:05 -070085 private final Multimap<Short, FlowRule> flowEntriesById =
86 ArrayListMultimap.<Short, FlowRule>create();
alshabib339a3d92014-09-26 17:54:32 -070087
Madan Jampani38b250d2014-10-17 11:02:38 -070088 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070089 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -070090
91 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070092 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -070093
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070095 protected ClusterService clusterService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected DeviceService deviceService;
99
100 private final AtomicInteger localBatchIdGen = new AtomicInteger();
101
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700102 // TODO: make this configurable
103 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700104
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700105 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
106 CacheBuilder.newBuilder()
107 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
108 // TODO Explicitly fail the future if expired?
109 //.removalListener(listener)
110 .build();
111
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700112
113 private final ExecutorService futureListeners =
114 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
115
Madan Jampani38b250d2014-10-17 11:02:38 -0700116
117 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
118 @Override
119 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700120 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700121 .register(DistributedStoreSerializers.COMMON)
122 .build()
123 .populate(1);
124 }
125 };
126
127 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700128 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700129
alshabib339a3d92014-09-26 17:54:32 -0700130 @Activate
131 public void activate() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700132 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700133
134 @Override
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700135 public void handle(final ClusterMessage message) {
136 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
137 log.info("received batch request {}", operation);
138 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700139
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700140 f.addListener(new Runnable() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700141
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700142 @Override
143 public void run() {
144 CompletedBatchOperation result = Futures.getUnchecked(f);
145 try {
146 message.respond(SERIALIZER.encode(result));
147 } catch (IOException e) {
148 log.error("Failed to respond back", e);
149 }
150 }
151 }, futureListeners);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700152 }
153 });
Madan Jampani117aaae2014-10-23 10:04:05 -0700154
155 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
156
157 @Override
158 public void handle(ClusterMessage message) {
159 FlowRule rule = SERIALIZER.decode(message.payload());
160 log.info("received get flow entry request for {}", rule);
161 FlowEntry flowEntry = getFlowEntryInternal(rule);
162 try {
163 message.respond(SERIALIZER.encode(flowEntry));
164 } catch (IOException e) {
165 log.error("Failed to respond back", e);
166 }
167 }
168 });
169
Madan Jampanif5fdef02014-10-23 21:58:10 -0700170 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
171
172 @Override
173 public void handle(ClusterMessage message) {
174 DeviceId deviceId = SERIALIZER.decode(message.payload());
175 log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
176 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
177 try {
178 message.respond(SERIALIZER.encode(flowEntries));
179 } catch (IOException e) {
180 log.error("Failed to respond to peer's getFlowEntries request", e);
181 }
182 }
183 });
184
alshabib339a3d92014-09-26 17:54:32 -0700185 log.info("Started");
186 }
187
188 @Deactivate
189 public void deactivate() {
190 log.info("Stopped");
191 }
192
193
Madan Jampani117aaae2014-10-23 10:04:05 -0700194 // TODO: This is not a efficient operation on a distributed sharded
195 // flow store. We need to revisit the need for this operation or at least
196 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700197 @Override
tom9b4030d2014-10-06 10:39:03 -0700198 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700199 // implementing in-efficient operation for debugging purpose.
200 int sum = 0;
201 for (Device device : deviceService.getDevices()) {
202 final DeviceId did = device.id();
203 sum += Iterables.size(getFlowEntries(did));
204 }
205 return sum;
tom9b4030d2014-10-06 10:39:03 -0700206 }
207
208 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700209 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700210 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
211 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
212 return getFlowEntryInternal(rule);
213 }
214
215 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
216 replicaInfo.master().orNull(), rule.deviceId());
217
218 ClusterMessage message = new ClusterMessage(
219 clusterService.getLocalNode().id(),
220 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
221 SERIALIZER.encode(rule));
222
223 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700224 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
225 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
226 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700227 // FIXME: throw a FlowStoreException
228 throw new RuntimeException(e);
229 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700230 }
231
232 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
233 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700234 if (f.equals(rule)) {
235 return f;
236 }
237 }
238 return null;
239 }
240
241 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700242 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700243
244 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
245 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
246 return getFlowEntriesInternal(deviceId);
247 }
248
249 log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
250 replicaInfo.master().orNull(), deviceId);
251
252 ClusterMessage message = new ClusterMessage(
253 clusterService.getLocalNode().id(),
254 GET_DEVICE_FLOW_ENTRIES,
255 SERIALIZER.encode(deviceId));
256
257 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700258 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
259 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
260 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700261 // FIXME: throw a FlowStoreException
262 throw new RuntimeException(e);
263 }
264 }
265
266 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700267 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700268 if (rules == null) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700269 return Collections.emptySet();
alshabib339a3d92014-09-26 17:54:32 -0700270 }
271 return ImmutableSet.copyOf(rules);
272 }
273
274 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700275 public void storeFlowRule(FlowRule rule) {
276 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
277 }
278
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700279 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700280 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
281 if (operation.getOperations().isEmpty()) {
282 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700283 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700284
Madan Jampani117aaae2014-10-23 10:04:05 -0700285 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
286
287 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
288
289 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
290 return storeBatchInternal(operation);
291 }
292
293 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
294 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700295
Madan Jampani38b250d2014-10-17 11:02:38 -0700296 ClusterMessage message = new ClusterMessage(
297 clusterService.getLocalNode().id(),
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700298 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700299 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700300
301 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700302 ListenableFuture<byte[]> responseFuture =
303 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
304 return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
305 @Override
306 public CompletedBatchOperation apply(byte[] input) {
307 return SERIALIZER.decode(input);
308 }
309 });
310 } catch (IOException e) {
311 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700312 }
313 }
314
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700315 private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700316 List<FlowEntry> toRemove = new ArrayList<>();
317 List<FlowEntry> toAdd = new ArrayList<>();
318 // TODO: backup changes to hazelcast map
319 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
320 FlowRule flowRule = batchEntry.getTarget();
321 FlowRuleOperation op = batchEntry.getOperator();
322 if (op.equals(FlowRuleOperation.REMOVE)) {
323 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
324 if (entry != null) {
325 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700326 toRemove.add(entry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700327 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700328 } else if (op.equals(FlowRuleOperation.ADD)) {
329 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
330 DeviceId deviceId = flowRule.deviceId();
331 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
332 flowEntries.put(deviceId, flowEntry);
333 flowEntriesById.put(flowRule.appId(), flowEntry);
334 toAdd.add(flowEntry);
335 }
336 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700337 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700338 if (toAdd.isEmpty() && toRemove.isEmpty()) {
339 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
340 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700341
342 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
343 final int batchId = localBatchIdGen.incrementAndGet();
344
345 pendingFutures.put(batchId, r);
346 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
347 return r;
alshabib339a3d92014-09-26 17:54:32 -0700348 }
349
350 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700351 public void deleteFlowRule(FlowRule rule) {
352 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700353 }
354
355 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700356 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
357 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700358 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700359 return addOrUpdateFlowRuleInternal(rule);
360 }
361
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700362 log.error("Tried to update FlowRule {} state,"
363 + " while the Node was not the master.", rule);
364 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700365 }
366
367 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
alshabib339a3d92014-09-26 17:54:32 -0700368 DeviceId did = rule.deviceId();
369
370 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700371 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700372 if (stored != null) {
373 stored.setBytes(rule.bytes());
374 stored.setLife(rule.life());
375 stored.setPackets(rule.packets());
376 if (stored.state() == FlowEntryState.PENDING_ADD) {
377 stored.setState(FlowEntryState.ADDED);
378 return new FlowRuleEvent(Type.RULE_ADDED, rule);
379 }
alshabib339a3d92014-09-26 17:54:32 -0700380 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
381 }
382
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700383 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
384 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700385 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700386
387 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700388 }
389
390 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700391 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
392 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700393 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700394 // bypass and handle it locally
395 return removeFlowRuleInternal(rule);
396 }
397
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700398 log.error("Tried to remove FlowRule {},"
399 + " while the Node was not the master.", rule);
400 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700401 }
402
403 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
alshabib1c319ff2014-10-04 20:29:09 -0700404 // This is where one could mark a rule as removed and still keep it in the store.
alshabib339a3d92014-09-26 17:54:32 -0700405 if (flowEntries.remove(rule.deviceId(), rule)) {
406 return new FlowRuleEvent(RULE_REMOVED, rule);
407 } else {
408 return null;
409 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700410 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700411 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700412
413 @Override
414 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700415 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700416 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700417 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700418 if (future != null) {
419 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700420 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700421 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700422 notifyDelegate(event);
423 }
alshabib339a3d92014-09-26 17:54:32 -0700424}