blob: cac31c2db7899f86047633cbe477014760cfeefe [file] [log] [blame]
alshabib339a3d92014-09-26 17:54:32 -07001package org.onlab.onos.store.flow.impl;
2
alshabib339a3d92014-09-26 17:54:32 -07003import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -07005import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -07006import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -07007
Madan Jampani38b250d2014-10-17 11:02:38 -07008import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -07009import java.util.ArrayList;
10import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070011import java.util.Collection;
12import java.util.Collections;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070013import java.util.Map;
Madan Jampanif5fdef02014-10-23 21:58:10 -070014import java.util.Set;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070015import java.util.concurrent.ExecutorService;
16import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070017import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070018import java.util.concurrent.TimeUnit;
19import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070020import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani117aaae2014-10-23 10:04:05 -070021import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070022
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070028import org.apache.felix.scr.annotations.Service;
29import org.onlab.onos.ApplicationId;
Madan Jampani38b250d2014-10-17 11:02:38 -070030import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070031import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070032import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070033import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070034import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070035import org.onlab.onos.net.flow.DefaultFlowEntry;
36import org.onlab.onos.net.flow.FlowEntry;
37import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
alshabib339a3d92014-09-26 17:54:32 -070038import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070039import org.onlab.onos.net.flow.FlowRuleBatchEntry;
40import org.onlab.onos.net.flow.FlowRuleBatchEvent;
41import org.onlab.onos.net.flow.FlowRuleBatchOperation;
42import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070043import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070044import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070045import org.onlab.onos.net.flow.FlowRuleEvent.Type;
46import org.onlab.onos.net.flow.FlowRuleStore;
47import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070048import org.onlab.onos.net.flow.StoredFlowEntry;
alshabib339a3d92014-09-26 17:54:32 -070049import org.onlab.onos.store.AbstractStore;
Madan Jampani38b250d2014-10-17 11:02:38 -070050import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
51import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070052import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070053import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
54import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070055import org.onlab.onos.store.flow.ReplicaInfoService;
Madan Jampani38b250d2014-10-17 11:02:38 -070056import org.onlab.onos.store.serializers.DistributedStoreSerializers;
57import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070058import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070059import org.slf4j.Logger;
60
61import com.google.common.collect.ArrayListMultimap;
62import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070063import com.google.common.collect.Iterables;
64import com.google.common.collect.Maps;
alshabib339a3d92014-09-26 17:54:32 -070065import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070066import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070067import com.google.common.util.concurrent.ListenableFuture;
68import com.google.common.util.concurrent.SettableFuture;
alshabib339a3d92014-09-26 17:54:32 -070069
70/**
Madan Jampani38b250d2014-10-17 11:02:38 -070071 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070072 */
alshabib339a3d92014-09-26 17:54:32 -070073@Component(immediate = true)
74@Service
75public class DistributedFlowRuleStore
Madan Jampani117aaae2014-10-23 10:04:05 -070076 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -070077 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -070078
79 private final Logger log = getLogger(getClass());
80
81 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070082 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
83 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -070084
alshabib92c65ad2014-10-08 21:56:05 -070085 private final Multimap<Short, FlowRule> flowEntriesById =
86 ArrayListMultimap.<Short, FlowRule>create();
alshabib339a3d92014-09-26 17:54:32 -070087
Madan Jampani38b250d2014-10-17 11:02:38 -070088 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070089 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -070090
91 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070092 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -070093
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070095 protected ClusterService clusterService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected DeviceService deviceService;
99
100 private final AtomicInteger localBatchIdGen = new AtomicInteger();
101
102
103 // FIXME switch to expiraing map/Cache?
104 private Map<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = Maps.newConcurrentMap();
105
106 private final ExecutorService futureListeners =
107 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
108
Madan Jampani38b250d2014-10-17 11:02:38 -0700109
110 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
111 @Override
112 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700113 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700114 .register(DistributedStoreSerializers.COMMON)
115 .build()
116 .populate(1);
117 }
118 };
119
120 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700121 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700122
alshabib339a3d92014-09-26 17:54:32 -0700123 @Activate
124 public void activate() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700125 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700126
127 @Override
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700128 public void handle(final ClusterMessage message) {
129 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
130 log.info("received batch request {}", operation);
131 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700132
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700133 f.addListener(new Runnable() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700134
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700135 @Override
136 public void run() {
137 CompletedBatchOperation result = Futures.getUnchecked(f);
138 try {
139 message.respond(SERIALIZER.encode(result));
140 } catch (IOException e) {
141 log.error("Failed to respond back", e);
142 }
143 }
144 }, futureListeners);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700145 }
146 });
Madan Jampani117aaae2014-10-23 10:04:05 -0700147
148 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
149
150 @Override
151 public void handle(ClusterMessage message) {
152 FlowRule rule = SERIALIZER.decode(message.payload());
153 log.info("received get flow entry request for {}", rule);
154 FlowEntry flowEntry = getFlowEntryInternal(rule);
155 try {
156 message.respond(SERIALIZER.encode(flowEntry));
157 } catch (IOException e) {
158 log.error("Failed to respond back", e);
159 }
160 }
161 });
162
Madan Jampanif5fdef02014-10-23 21:58:10 -0700163 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
164
165 @Override
166 public void handle(ClusterMessage message) {
167 DeviceId deviceId = SERIALIZER.decode(message.payload());
168 log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
169 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
170 try {
171 message.respond(SERIALIZER.encode(flowEntries));
172 } catch (IOException e) {
173 log.error("Failed to respond to peer's getFlowEntries request", e);
174 }
175 }
176 });
177
alshabib339a3d92014-09-26 17:54:32 -0700178 log.info("Started");
179 }
180
181 @Deactivate
182 public void deactivate() {
183 log.info("Stopped");
184 }
185
186
Madan Jampani117aaae2014-10-23 10:04:05 -0700187 // TODO: This is not a efficient operation on a distributed sharded
188 // flow store. We need to revisit the need for this operation or at least
189 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700190 @Override
tom9b4030d2014-10-06 10:39:03 -0700191 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700192 // implementing in-efficient operation for debugging purpose.
193 int sum = 0;
194 for (Device device : deviceService.getDevices()) {
195 final DeviceId did = device.id();
196 sum += Iterables.size(getFlowEntries(did));
197 }
198 return sum;
tom9b4030d2014-10-06 10:39:03 -0700199 }
200
201 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700202 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700203 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
204 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
205 return getFlowEntryInternal(rule);
206 }
207
208 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
209 replicaInfo.master().orNull(), rule.deviceId());
210
211 ClusterMessage message = new ClusterMessage(
212 clusterService.getLocalNode().id(),
213 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
214 SERIALIZER.encode(rule));
215
216 try {
217 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
218 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
219 } catch (IOException | TimeoutException e) {
220 // FIXME: throw a FlowStoreException
221 throw new RuntimeException(e);
222 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700223 }
224
225 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
226 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700227 if (f.equals(rule)) {
228 return f;
229 }
230 }
231 return null;
232 }
233
234 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700235 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700236
237 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
238 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
239 return getFlowEntriesInternal(deviceId);
240 }
241
242 log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
243 replicaInfo.master().orNull(), deviceId);
244
245 ClusterMessage message = new ClusterMessage(
246 clusterService.getLocalNode().id(),
247 GET_DEVICE_FLOW_ENTRIES,
248 SERIALIZER.encode(deviceId));
249
250 try {
251 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
252 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
253 } catch (IOException | TimeoutException e) {
254 // FIXME: throw a FlowStoreException
255 throw new RuntimeException(e);
256 }
257 }
258
259 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700260 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700261 if (rules == null) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700262 return Collections.emptySet();
alshabib339a3d92014-09-26 17:54:32 -0700263 }
264 return ImmutableSet.copyOf(rules);
265 }
266
267 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700268 public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
alshabib92c65ad2014-10-08 21:56:05 -0700269 Collection<FlowRule> rules = flowEntriesById.get(appId.id());
alshabib339a3d92014-09-26 17:54:32 -0700270 if (rules == null) {
271 return Collections.emptyList();
272 }
273 return ImmutableSet.copyOf(rules);
274 }
275
276 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700277 public void storeFlowRule(FlowRule rule) {
278 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
279 }
280
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700281 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700282 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
283 if (operation.getOperations().isEmpty()) {
284 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700285 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700286
Madan Jampani117aaae2014-10-23 10:04:05 -0700287 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
288
289 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
290
291 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
292 return storeBatchInternal(operation);
293 }
294
295 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
296 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700297
Madan Jampani38b250d2014-10-17 11:02:38 -0700298 ClusterMessage message = new ClusterMessage(
299 clusterService.getLocalNode().id(),
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700300 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700301 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700302
303 try {
304 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
305 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
306 } catch (IOException | TimeoutException e) {
307 // FIXME: throw a FlowStoreException
308 throw new RuntimeException(e);
309 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700310
311 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700312 }
313
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700314 private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700315 List<FlowEntry> toRemove = new ArrayList<>();
316 List<FlowEntry> toAdd = new ArrayList<>();
317 // TODO: backup changes to hazelcast map
318 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
319 FlowRule flowRule = batchEntry.getTarget();
320 FlowRuleOperation op = batchEntry.getOperator();
321 if (op.equals(FlowRuleOperation.REMOVE)) {
322 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
323 if (entry != null) {
324 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700325 toRemove.add(entry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700326 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700327 } else if (op.equals(FlowRuleOperation.ADD)) {
328 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
329 DeviceId deviceId = flowRule.deviceId();
330 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
331 flowEntries.put(deviceId, flowEntry);
332 flowEntriesById.put(flowRule.appId(), flowEntry);
333 toAdd.add(flowEntry);
334 }
335 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700336 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700337 if (toAdd.isEmpty() && toRemove.isEmpty()) {
338 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
339 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700340
341 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
342 final int batchId = localBatchIdGen.incrementAndGet();
343
344 pendingFutures.put(batchId, r);
345 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
346 return r;
alshabib339a3d92014-09-26 17:54:32 -0700347 }
348
349 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700350 public void deleteFlowRule(FlowRule rule) {
351 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700352 }
353
354 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700355 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
356 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700357 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700358 return addOrUpdateFlowRuleInternal(rule);
359 }
360
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700361 log.error("Tried to update FlowRule {} state,"
362 + " while the Node was not the master.", rule);
363 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700364 }
365
366 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
alshabib339a3d92014-09-26 17:54:32 -0700367 DeviceId did = rule.deviceId();
368
369 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700370 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700371 if (stored != null) {
372 stored.setBytes(rule.bytes());
373 stored.setLife(rule.life());
374 stored.setPackets(rule.packets());
375 if (stored.state() == FlowEntryState.PENDING_ADD) {
376 stored.setState(FlowEntryState.ADDED);
377 return new FlowRuleEvent(Type.RULE_ADDED, rule);
378 }
alshabib339a3d92014-09-26 17:54:32 -0700379 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
380 }
381
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700382 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
383 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700384 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700385
386 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700387 }
388
389 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700390 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
391 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700392 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700393 // bypass and handle it locally
394 return removeFlowRuleInternal(rule);
395 }
396
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700397 log.error("Tried to remove FlowRule {},"
398 + " while the Node was not the master.", rule);
399 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700400 }
401
402 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
alshabib1c319ff2014-10-04 20:29:09 -0700403 // This is where one could mark a rule as removed and still keep it in the store.
alshabib339a3d92014-09-26 17:54:32 -0700404 if (flowEntries.remove(rule.deviceId(), rule)) {
405 return new FlowRuleEvent(RULE_REMOVED, rule);
406 } else {
407 return null;
408 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700409 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700410 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700411
412 @Override
413 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700414 SettableFuture<CompletedBatchOperation> future
415 = pendingFutures.get(event.subject().batchId());
416 if (future != null) {
417 future.set(event.result());
418 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700419 notifyDelegate(event);
420 }
alshabib339a3d92014-09-26 17:54:32 -0700421}