blob: a7378684556936b9858ce33ee3d66e00ffbfadba [file] [log] [blame]
alshabib339a3d92014-09-26 17:54:32 -07001package org.onlab.onos.store.flow.impl;
2
alshabib339a3d92014-09-26 17:54:32 -07003import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -07005import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -07006import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -07007
Madan Jampani38b250d2014-10-17 11:02:38 -07008import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -07009import java.util.ArrayList;
10import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070011import java.util.Collection;
12import java.util.Collections;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070013import java.util.Map;
Madan Jampanif5fdef02014-10-23 21:58:10 -070014import java.util.Set;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070015import java.util.concurrent.ExecutorService;
16import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070017import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070018import java.util.concurrent.TimeUnit;
19import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070020import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani117aaae2014-10-23 10:04:05 -070021import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070022
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070028import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070029import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070030import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070031import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070032import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070033import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070034import org.onlab.onos.net.flow.DefaultFlowEntry;
35import org.onlab.onos.net.flow.FlowEntry;
36import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
alshabib339a3d92014-09-26 17:54:32 -070037import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070038import org.onlab.onos.net.flow.FlowRuleBatchEntry;
39import org.onlab.onos.net.flow.FlowRuleBatchEvent;
40import org.onlab.onos.net.flow.FlowRuleBatchOperation;
41import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070042import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070043import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070044import org.onlab.onos.net.flow.FlowRuleEvent.Type;
45import org.onlab.onos.net.flow.FlowRuleStore;
46import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070047import org.onlab.onos.net.flow.StoredFlowEntry;
alshabib339a3d92014-09-26 17:54:32 -070048import org.onlab.onos.store.AbstractStore;
Madan Jampani38b250d2014-10-17 11:02:38 -070049import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
50import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070051import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070052import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
53import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070054import org.onlab.onos.store.flow.ReplicaInfoService;
Madan Jampani38b250d2014-10-17 11:02:38 -070055import org.onlab.onos.store.serializers.DistributedStoreSerializers;
56import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070057import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070058import org.slf4j.Logger;
59
60import com.google.common.collect.ArrayListMultimap;
61import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070062import com.google.common.collect.Iterables;
63import com.google.common.collect.Maps;
alshabib339a3d92014-09-26 17:54:32 -070064import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070065import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070066import com.google.common.util.concurrent.ListenableFuture;
67import com.google.common.util.concurrent.SettableFuture;
alshabib339a3d92014-09-26 17:54:32 -070068
69/**
Madan Jampani38b250d2014-10-17 11:02:38 -070070 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070071 */
alshabib339a3d92014-09-26 17:54:32 -070072@Component(immediate = true)
73@Service
74public class DistributedFlowRuleStore
Madan Jampani117aaae2014-10-23 10:04:05 -070075 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -070076 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -070077
78 private final Logger log = getLogger(getClass());
79
80 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070081 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
82 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -070083
alshabib92c65ad2014-10-08 21:56:05 -070084 private final Multimap<Short, FlowRule> flowEntriesById =
85 ArrayListMultimap.<Short, FlowRule>create();
alshabib339a3d92014-09-26 17:54:32 -070086
Madan Jampani38b250d2014-10-17 11:02:38 -070087 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070088 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -070089
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070091 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -070092
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070094 protected ClusterService clusterService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected DeviceService deviceService;
98
99 private final AtomicInteger localBatchIdGen = new AtomicInteger();
100
101
102 // FIXME switch to expiraing map/Cache?
103 private Map<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = Maps.newConcurrentMap();
104
105 private final ExecutorService futureListeners =
106 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
107
Madan Jampani38b250d2014-10-17 11:02:38 -0700108
109 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
110 @Override
111 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700112 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700113 .register(DistributedStoreSerializers.COMMON)
114 .build()
115 .populate(1);
116 }
117 };
118
119 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700120 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700121
alshabib339a3d92014-09-26 17:54:32 -0700122 @Activate
123 public void activate() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700124 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700125
126 @Override
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700127 public void handle(final ClusterMessage message) {
128 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
129 log.info("received batch request {}", operation);
130 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700131
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700132 f.addListener(new Runnable() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700133
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700134 @Override
135 public void run() {
136 CompletedBatchOperation result = Futures.getUnchecked(f);
137 try {
138 message.respond(SERIALIZER.encode(result));
139 } catch (IOException e) {
140 log.error("Failed to respond back", e);
141 }
142 }
143 }, futureListeners);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700144 }
145 });
Madan Jampani117aaae2014-10-23 10:04:05 -0700146
147 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
148
149 @Override
150 public void handle(ClusterMessage message) {
151 FlowRule rule = SERIALIZER.decode(message.payload());
152 log.info("received get flow entry request for {}", rule);
153 FlowEntry flowEntry = getFlowEntryInternal(rule);
154 try {
155 message.respond(SERIALIZER.encode(flowEntry));
156 } catch (IOException e) {
157 log.error("Failed to respond back", e);
158 }
159 }
160 });
161
Madan Jampanif5fdef02014-10-23 21:58:10 -0700162 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
163
164 @Override
165 public void handle(ClusterMessage message) {
166 DeviceId deviceId = SERIALIZER.decode(message.payload());
167 log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
168 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
169 try {
170 message.respond(SERIALIZER.encode(flowEntries));
171 } catch (IOException e) {
172 log.error("Failed to respond to peer's getFlowEntries request", e);
173 }
174 }
175 });
176
alshabib339a3d92014-09-26 17:54:32 -0700177 log.info("Started");
178 }
179
180 @Deactivate
181 public void deactivate() {
182 log.info("Stopped");
183 }
184
185
Madan Jampani117aaae2014-10-23 10:04:05 -0700186 // TODO: This is not a efficient operation on a distributed sharded
187 // flow store. We need to revisit the need for this operation or at least
188 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700189 @Override
tom9b4030d2014-10-06 10:39:03 -0700190 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700191 // implementing in-efficient operation for debugging purpose.
192 int sum = 0;
193 for (Device device : deviceService.getDevices()) {
194 final DeviceId did = device.id();
195 sum += Iterables.size(getFlowEntries(did));
196 }
197 return sum;
tom9b4030d2014-10-06 10:39:03 -0700198 }
199
200 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700201 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700202 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
203 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
204 return getFlowEntryInternal(rule);
205 }
206
207 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
208 replicaInfo.master().orNull(), rule.deviceId());
209
210 ClusterMessage message = new ClusterMessage(
211 clusterService.getLocalNode().id(),
212 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
213 SERIALIZER.encode(rule));
214
215 try {
216 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
217 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
218 } catch (IOException | TimeoutException e) {
219 // FIXME: throw a FlowStoreException
220 throw new RuntimeException(e);
221 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700222 }
223
224 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
225 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700226 if (f.equals(rule)) {
227 return f;
228 }
229 }
230 return null;
231 }
232
233 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700234 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700235
236 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
237 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
238 return getFlowEntriesInternal(deviceId);
239 }
240
241 log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
242 replicaInfo.master().orNull(), deviceId);
243
244 ClusterMessage message = new ClusterMessage(
245 clusterService.getLocalNode().id(),
246 GET_DEVICE_FLOW_ENTRIES,
247 SERIALIZER.encode(deviceId));
248
249 try {
250 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
251 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
252 } catch (IOException | TimeoutException e) {
253 // FIXME: throw a FlowStoreException
254 throw new RuntimeException(e);
255 }
256 }
257
258 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700259 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700260 if (rules == null) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700261 return Collections.emptySet();
alshabib339a3d92014-09-26 17:54:32 -0700262 }
263 return ImmutableSet.copyOf(rules);
264 }
265
266 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700267 public void storeFlowRule(FlowRule rule) {
268 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
269 }
270
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700271 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700272 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
273 if (operation.getOperations().isEmpty()) {
274 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700275 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700276
Madan Jampani117aaae2014-10-23 10:04:05 -0700277 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
278
279 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
280
281 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
282 return storeBatchInternal(operation);
283 }
284
285 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
286 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700287
Madan Jampani38b250d2014-10-17 11:02:38 -0700288 ClusterMessage message = new ClusterMessage(
289 clusterService.getLocalNode().id(),
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700290 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700291 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700292
293 try {
294 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
295 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
296 } catch (IOException | TimeoutException e) {
297 // FIXME: throw a FlowStoreException
298 throw new RuntimeException(e);
299 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700300
301 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700302 }
303
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700304 private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700305 List<FlowEntry> toRemove = new ArrayList<>();
306 List<FlowEntry> toAdd = new ArrayList<>();
307 // TODO: backup changes to hazelcast map
308 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
309 FlowRule flowRule = batchEntry.getTarget();
310 FlowRuleOperation op = batchEntry.getOperator();
311 if (op.equals(FlowRuleOperation.REMOVE)) {
312 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
313 if (entry != null) {
314 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700315 toRemove.add(entry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700316 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700317 } else if (op.equals(FlowRuleOperation.ADD)) {
318 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
319 DeviceId deviceId = flowRule.deviceId();
320 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
321 flowEntries.put(deviceId, flowEntry);
322 flowEntriesById.put(flowRule.appId(), flowEntry);
323 toAdd.add(flowEntry);
324 }
325 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700326 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700327 if (toAdd.isEmpty() && toRemove.isEmpty()) {
328 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
329 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700330
331 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
332 final int batchId = localBatchIdGen.incrementAndGet();
333
334 pendingFutures.put(batchId, r);
335 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
336 return r;
alshabib339a3d92014-09-26 17:54:32 -0700337 }
338
339 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700340 public void deleteFlowRule(FlowRule rule) {
341 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700342 }
343
344 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700345 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
346 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700347 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700348 return addOrUpdateFlowRuleInternal(rule);
349 }
350
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700351 log.error("Tried to update FlowRule {} state,"
352 + " while the Node was not the master.", rule);
353 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700354 }
355
356 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
alshabib339a3d92014-09-26 17:54:32 -0700357 DeviceId did = rule.deviceId();
358
359 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700360 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700361 if (stored != null) {
362 stored.setBytes(rule.bytes());
363 stored.setLife(rule.life());
364 stored.setPackets(rule.packets());
365 if (stored.state() == FlowEntryState.PENDING_ADD) {
366 stored.setState(FlowEntryState.ADDED);
367 return new FlowRuleEvent(Type.RULE_ADDED, rule);
368 }
alshabib339a3d92014-09-26 17:54:32 -0700369 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
370 }
371
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700372 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
373 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700374 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700375
376 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700377 }
378
379 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700380 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
381 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700382 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700383 // bypass and handle it locally
384 return removeFlowRuleInternal(rule);
385 }
386
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700387 log.error("Tried to remove FlowRule {},"
388 + " while the Node was not the master.", rule);
389 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700390 }
391
392 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
alshabib1c319ff2014-10-04 20:29:09 -0700393 // This is where one could mark a rule as removed and still keep it in the store.
alshabib339a3d92014-09-26 17:54:32 -0700394 if (flowEntries.remove(rule.deviceId(), rule)) {
395 return new FlowRuleEvent(RULE_REMOVED, rule);
396 } else {
397 return null;
398 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700399 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700400 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700401
402 @Override
403 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700404 SettableFuture<CompletedBatchOperation> future
405 = pendingFutures.get(event.subject().batchId());
406 if (future != null) {
407 future.set(event.result());
408 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700409 notifyDelegate(event);
410 }
alshabib339a3d92014-09-26 17:54:32 -0700411}