blob: 60ab307880b7d123fe34aba50e4e45850525a1ef [file] [log] [blame]
tombe988312014-09-19 18:38:47 -07001package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -07002
alshabibbb42cad2014-09-25 11:43:05 -07003import static com.google.common.base.Preconditions.checkNotNull;
4import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -07005import static org.onlab.util.Tools.namedThreads;
alshabibbb42cad2014-09-25 11:43:05 -07006
alshabibbb42cad2014-09-25 11:43:05 -07007import java.util.List;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -07008import java.util.Map;
Madan Jampani117aaae2014-10-23 10:04:05 -07009import java.util.Set;
alshabib193525b2014-10-08 18:58:03 -070010import java.util.concurrent.CancellationException;
alshabib902d41b2014-10-07 16:52:05 -070011import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070012import java.util.concurrent.ExecutorService;
Madan Jampani117aaae2014-10-23 10:04:05 -070013import java.util.concurrent.Executors;
alshabib902d41b2014-10-07 16:52:05 -070014import java.util.concurrent.Future;
15import java.util.concurrent.TimeUnit;
16import java.util.concurrent.TimeoutException;
alshabib193525b2014-10-08 18:58:03 -070017import java.util.concurrent.atomic.AtomicReference;
alshabibbb42cad2014-09-25 11:43:05 -070018
alshabib57044ba2014-09-16 15:58:01 -070019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
alshabiba68eb962014-09-24 20:34:13 -070025import org.onlab.onos.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070026import org.onlab.onos.event.AbstractListenerRegistry;
27import org.onlab.onos.event.EventDeliveryService;
28import org.onlab.onos.net.Device;
29import org.onlab.onos.net.DeviceId;
30import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070031import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabibcf369912014-10-13 14:16:42 -070032import org.onlab.onos.net.flow.DefaultFlowEntry;
alshabib1c319ff2014-10-04 20:29:09 -070033import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070034import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070035import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070036import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070037import org.onlab.onos.net.flow.FlowRuleBatchEvent;
alshabib902d41b2014-10-07 16:52:05 -070038import org.onlab.onos.net.flow.FlowRuleBatchOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070039import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib57044ba2014-09-16 15:58:01 -070040import org.onlab.onos.net.flow.FlowRuleEvent;
41import org.onlab.onos.net.flow.FlowRuleListener;
42import org.onlab.onos.net.flow.FlowRuleProvider;
43import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
44import org.onlab.onos.net.flow.FlowRuleProviderService;
45import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070046import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070047import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070048import org.onlab.onos.net.provider.AbstractProviderRegistry;
49import org.onlab.onos.net.provider.AbstractProviderService;
50import org.slf4j.Logger;
51
alshabib902d41b2014-10-07 16:52:05 -070052import com.google.common.collect.ArrayListMultimap;
alshabibbb42cad2014-09-25 11:43:05 -070053import com.google.common.collect.Lists;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070054import com.google.common.collect.Maps;
alshabib902d41b2014-10-07 16:52:05 -070055import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070056import com.google.common.collect.Sets;
57import com.google.common.util.concurrent.Futures;
58import com.google.common.util.concurrent.ListenableFuture;
alshabiba7f7ca82014-09-22 11:41:23 -070059
tome4729872014-09-23 00:37:37 -070060/**
61 * Provides implementation of the flow NB & SB APIs.
62 */
alshabib57044ba2014-09-16 15:58:01 -070063@Component(immediate = true)
64@Service
tom202175a2014-09-19 19:00:11 -070065public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070066 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
67 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070068
alshabib193525b2014-10-08 18:58:03 -070069 enum BatchState { STARTED, FINISHED, CANCELLED };
70
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070071 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070072 private final Logger log = getLogger(getClass());
73
74 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070075 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070076
alshabibbb42cad2014-09-25 11:43:05 -070077 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070078
Thomas Vachuska8ac922d2014-10-23 16:17:03 -070079 private final ExecutorService futureListeners =
80 Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070081
tombe988312014-09-19 18:38:47 -070082 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070084
alshabib57044ba2014-09-16 15:58:01 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070086 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -070087
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070089 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -070090
91 @Activate
92 public void activate() {
tomc78acee2014-09-24 15:16:55 -070093 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070094 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
95 log.info("Started");
96 }
97
98 @Deactivate
99 public void deactivate() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700100 futureListeners.shutdownNow();
101
tomc78acee2014-09-24 15:16:55 -0700102 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700103 eventDispatcher.removeSink(FlowRuleEvent.class);
104 log.info("Stopped");
105 }
106
107 @Override
tom9b4030d2014-10-06 10:39:03 -0700108 public int getFlowRuleCount() {
109 return store.getFlowRuleCount();
110 }
111
112 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700113 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700114 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700115 }
116
117 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700118 public void applyFlowRules(FlowRule... flowRules) {
alshabib57044ba2014-09-16 15:58:01 -0700119 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700120 FlowRule f = flowRules[i];
Madan Jampani117aaae2014-10-23 10:04:05 -0700121 store.storeFlowRule(f);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700122 }
123 }
124
125 private void applyFlowRulesToProviders(FlowRule... flowRules) {
126 DeviceId did = null;
127 FlowRuleProvider frp = null;
128 for (FlowRule f : flowRules) {
129 if (!f.deviceId().equals(did)) {
130 did = f.deviceId();
131 final Device device = deviceService.getDevice(did);
132 frp = getProvider(device.providerId());
133 }
134 if (frp != null) {
135 frp.applyFlowRule(f);
136 }
alshabib57044ba2014-09-16 15:58:01 -0700137 }
alshabib57044ba2014-09-16 15:58:01 -0700138 }
139
140 @Override
141 public void removeFlowRules(FlowRule... flowRules) {
alshabibbb8b1282014-09-22 17:00:18 -0700142 FlowRule f;
alshabib57044ba2014-09-16 15:58:01 -0700143 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700144 f = flowRules[i];
Madan Jampani117aaae2014-10-23 10:04:05 -0700145 store.deleteFlowRule(f);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700146 }
147 }
148
149 private void removeFlowRulesFromProviders(FlowRule... flowRules) {
150 DeviceId did = null;
151 FlowRuleProvider frp = null;
152 for (FlowRule f : flowRules) {
153 if (!f.deviceId().equals(did)) {
154 did = f.deviceId();
155 final Device device = deviceService.getDevice(did);
tom7951b232014-10-06 13:35:30 -0700156 frp = getProvider(device.providerId());
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700157 }
158 if (frp != null) {
tom7951b232014-10-06 13:35:30 -0700159 frp.removeFlowRule(f);
160 }
alshabib57044ba2014-09-16 15:58:01 -0700161 }
alshabiba68eb962014-09-24 20:34:13 -0700162 }
alshabib57044ba2014-09-16 15:58:01 -0700163
alshabiba68eb962014-09-24 20:34:13 -0700164 @Override
165 public void removeFlowRulesById(ApplicationId id) {
tom9b4030d2014-10-06 10:39:03 -0700166 Iterable<FlowRule> rules = getFlowRulesById(id);
alshabiba68eb962014-09-24 20:34:13 -0700167 FlowRuleProvider frp;
168 Device device;
alshabibbb42cad2014-09-25 11:43:05 -0700169
alshabiba68eb962014-09-24 20:34:13 -0700170 for (FlowRule f : rules) {
171 store.deleteFlowRule(f);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700172 // FIXME: only accept request and push to provider on internal event
alshabiba68eb962014-09-24 20:34:13 -0700173 device = deviceService.getDevice(f.deviceId());
174 frp = getProvider(device.providerId());
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700175 // FIXME: flows removed from store and flows removed from might diverge
176 // get rid of #removeRulesById?
alshabiba68eb962014-09-24 20:34:13 -0700177 frp.removeRulesById(id, f);
178 }
179 }
180
181 @Override
182 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
alshabib1c319ff2014-10-04 20:29:09 -0700183 return store.getFlowRulesByAppId(id);
alshabib57044ba2014-09-16 15:58:01 -0700184 }
185
186 @Override
alshabib902d41b2014-10-07 16:52:05 -0700187 public Future<CompletedBatchOperation> applyBatch(
188 FlowRuleBatchOperation batch) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700189 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
alshabib902d41b2014-10-07 16:52:05 -0700190 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700191 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700192 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
193 final FlowRule f = fbe.getTarget();
Madan Jampani117aaae2014-10-23 10:04:05 -0700194 perDeviceBatches.put(f.deviceId(), fbe);
alshabib902d41b2014-10-07 16:52:05 -0700195 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700196
197 for (DeviceId deviceId : perDeviceBatches.keySet()) {
alshabib902d41b2014-10-07 16:52:05 -0700198 FlowRuleBatchOperation b =
Madan Jampani117aaae2014-10-23 10:04:05 -0700199 new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
200 Future<CompletedBatchOperation> future = store.storeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700201 futures.add(future);
202 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700203 return new FlowRuleBatchFuture(futures, perDeviceBatches);
alshabib902d41b2014-10-07 16:52:05 -0700204 }
205
206 @Override
alshabib57044ba2014-09-16 15:58:01 -0700207 public void addListener(FlowRuleListener listener) {
208 listenerRegistry.addListener(listener);
209 }
210
211 @Override
212 public void removeListener(FlowRuleListener listener) {
213 listenerRegistry.removeListener(listener);
214 }
215
216 @Override
217 protected FlowRuleProviderService createProviderService(
218 FlowRuleProvider provider) {
219 return new InternalFlowRuleProviderService(provider);
220 }
221
222 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700223 extends AbstractProviderService<FlowRuleProvider>
224 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700225
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700226 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
227
alshabib57044ba2014-09-16 15:58:01 -0700228 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
229 super(provider);
230 }
231
232 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700233 public void flowRemoved(FlowEntry flowEntry) {
234 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700235 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700236 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700237 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700238 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700239 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700240 return;
241 }
alshabib1c319ff2014-10-04 20:29:09 -0700242 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700243 FlowRuleProvider frp = getProvider(device.providerId());
244 FlowRuleEvent event = null;
245 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700246 case ADDED:
247 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700248 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700249 break;
250 case PENDING_REMOVE:
251 case REMOVED:
252 event = store.removeFlowRule(stored);
253 break;
254 default:
255 break;
alshabib57044ba2014-09-16 15:58:01 -0700256
alshabiba68eb962014-09-24 20:34:13 -0700257 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700258 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700259 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700260 post(event);
261 }
alshabib57044ba2014-09-16 15:58:01 -0700262 }
263
alshabibba5ac482014-10-02 17:15:20 -0700264
alshabib1c319ff2014-10-04 20:29:09 -0700265 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700266 checkNotNull(flowRule, FLOW_RULE_NULL);
267 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700268 Device device = deviceService.getDevice(flowRule.deviceId());
269 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700270 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700271 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700272 case PENDING_REMOVE:
273 case REMOVED:
274 event = store.removeFlowRule(flowRule);
275 frp.removeFlowRule(flowRule);
276 break;
277 case ADDED:
278 case PENDING_ADD:
279 frp.applyFlowRule(flowRule);
280 break;
281 default:
282 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700283 }
284
alshabibbb42cad2014-09-25 11:43:05 -0700285 if (event != null) {
286 log.debug("Flow {} removed", flowRule);
287 post(event);
288 }
alshabib57044ba2014-09-16 15:58:01 -0700289
290 }
291
alshabibba5ac482014-10-02 17:15:20 -0700292
293 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700294 checkNotNull(flowRule, FLOW_RULE_NULL);
295 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700296 FlowRuleProvider frp = getProvider(flowRule.deviceId());
297 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700298 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700299 }
300
alshabibba5ac482014-10-02 17:15:20 -0700301
alshabib1c319ff2014-10-04 20:29:09 -0700302 private void flowAdded(FlowEntry flowEntry) {
303 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700304 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700305
alshabib1c319ff2014-10-04 20:29:09 -0700306 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700307
alshabib1c319ff2014-10-04 20:29:09 -0700308 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700309 if (event == null) {
310 log.debug("No flow store event generated.");
311 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700312 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700313 post(event);
314 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700315 } else {
Madan Jampani117aaae2014-10-23 10:04:05 -0700316 log.info("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700317 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700318 }
alshabib219ebaa2014-09-22 15:41:24 -0700319
alshabib57044ba2014-09-16 15:58:01 -0700320 }
321
alshabib1c319ff2014-10-04 20:29:09 -0700322 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
323 if (storedRule == null) {
324 return false;
325 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700326 if (storedRule.isPermanent()) {
327 return true;
328 }
329
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700330 final long timeout = storedRule.timeout() * 1000;
331 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700332 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700333 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700334 return true;
335 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700336 if (!lastSeen.containsKey(storedRule)) {
337 // checking for the first time
338 lastSeen.put(storedRule, storedRule.lastSeen());
339 // Use following if lastSeen attr. was removed.
340 //lastSeen.put(storedRule, currentTime);
341 }
342 Long last = lastSeen.get(storedRule);
343 if (last == null) {
344 // concurrently removed? let the liveness check fail
345 return false;
346 }
alshabib85c41972014-10-03 13:48:39 -0700347
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700348 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700349 return true;
350 }
351 return false;
alshabibba5ac482014-10-02 17:15:20 -0700352 }
353
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700354 // Posts the specified event to the local event dispatcher.
355 private void post(FlowRuleEvent event) {
356 if (event != null) {
357 eventDispatcher.post(event);
358 }
359 }
alshabib5c370ff2014-09-18 10:12:14 -0700360
361 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700362 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
363 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700364
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700365 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700366 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700367 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700368 flowAdded(rule);
369 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700370 // the device has a rule the store does not have
371 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700372 }
373 }
alshabib1c319ff2014-10-04 20:29:09 -0700374 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700375 // there are rules in the store that aren't on the switch
376 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700377
alshabiba7f7ca82014-09-22 11:41:23 -0700378 }
alshabib5c370ff2014-09-18 10:12:14 -0700379 }
alshabib57044ba2014-09-16 15:58:01 -0700380 }
381
tomc78acee2014-09-24 15:16:55 -0700382 // Store delegate to re-post events emitted from the store.
383 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Madan Jampani117aaae2014-10-23 10:04:05 -0700384 // TODO: Right now we only dispatch events at individual flowEntry level.
385 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700386 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700387 public void notify(FlowRuleBatchEvent event) {
388 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700389 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700390 case BATCH_OPERATION_REQUESTED:
Madan Jampani31961c12014-10-23 12:06:58 -0700391 for (FlowEntry entry : request.toAdd()) {
392 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
393 }
394 for (FlowEntry entry : request.toRemove()) {
395 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
396 }
397 // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700398
Madan Jampani117aaae2014-10-23 10:04:05 -0700399 FlowRuleBatchOperation batchOperation = request.asBatchOperation();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700400
Madan Jampani117aaae2014-10-23 10:04:05 -0700401 FlowRuleProvider flowRuleProvider =
402 getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
403 final ListenableFuture<CompletedBatchOperation> result =
404 flowRuleProvider.executeBatch(batchOperation);
405 result.addListener(new Runnable() {
406 @Override
407 public void run() {
Thomas Vachuska8ac922d2014-10-23 16:17:03 -0700408 store.batchOperationComplete(FlowRuleBatchEvent.completed(request,
409 Futures.getUnchecked(result)));
Madan Jampani117aaae2014-10-23 10:04:05 -0700410 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700411 }, futureListeners);
Madan Jampani117aaae2014-10-23 10:04:05 -0700412
413 break;
414 case BATCH_OPERATION_COMPLETED:
415 Set<FlowEntry> failedItems = event.result().failedItems();
416 for (FlowEntry entry : request.toAdd()) {
417 if (!failedItems.contains(entry)) {
418 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
419 }
420 }
421 for (FlowEntry entry : request.toRemove()) {
422 if (!failedItems.contains(entry)) {
423 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
424 }
425 }
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700426 break;
427 default:
428 break;
429 }
tomc78acee2014-09-24 15:16:55 -0700430 }
431 }
alshabib902d41b2014-10-07 16:52:05 -0700432
Madan Jampani117aaae2014-10-23 10:04:05 -0700433 private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
alshabib902d41b2014-10-07 16:52:05 -0700434
alshabib193525b2014-10-08 18:58:03 -0700435 private final List<Future<CompletedBatchOperation>> futures;
Madan Jampani117aaae2014-10-23 10:04:05 -0700436 private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
alshabib193525b2014-10-08 18:58:03 -0700437 private final AtomicReference<BatchState> state;
438 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700439
alshabib193525b2014-10-08 18:58:03 -0700440 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Madan Jampani117aaae2014-10-23 10:04:05 -0700441 Multimap<DeviceId, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700442 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700443 this.batches = batches;
444 state = new AtomicReference<FlowRuleManager.BatchState>();
445 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700446 }
447
448 @Override
449 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700450 if (state.get() == BatchState.FINISHED) {
451 return false;
452 }
453 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
454 return false;
455 }
456 cleanUpBatch();
457 for (Future<CompletedBatchOperation> f : futures) {
458 f.cancel(mayInterruptIfRunning);
459 }
460 return true;
alshabib902d41b2014-10-07 16:52:05 -0700461 }
462
463 @Override
464 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700465 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700466 }
467
468 @Override
469 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700470 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700471 }
472
alshabib193525b2014-10-08 18:58:03 -0700473
alshabib902d41b2014-10-07 16:52:05 -0700474 @Override
475 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700476 ExecutionException {
477
478 if (isDone()) {
479 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700480 }
alshabib193525b2014-10-08 18:58:03 -0700481
482 boolean success = true;
Madan Jampani117aaae2014-10-23 10:04:05 -0700483 Set<FlowEntry> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700484 CompletedBatchOperation completed;
485 for (Future<CompletedBatchOperation> future : futures) {
486 completed = future.get();
alshabib3effd042014-10-17 12:00:31 -0700487 success = validateBatchOperation(failed, completed);
alshabib193525b2014-10-08 18:58:03 -0700488 }
489
490 return finalizeBatchOperation(success, failed);
491
alshabib902d41b2014-10-07 16:52:05 -0700492 }
493
494 @Override
495 public CompletedBatchOperation get(long timeout, TimeUnit unit)
496 throws InterruptedException, ExecutionException,
497 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700498
499 if (isDone()) {
500 return overall;
501 }
502 boolean success = true;
Madan Jampani117aaae2014-10-23 10:04:05 -0700503 Set<FlowEntry> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700504 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700505 long start = System.nanoTime();
506 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700507
508 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700509 long now = System.nanoTime();
510 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700511 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
alshabib3effd042014-10-17 12:00:31 -0700512 success = validateBatchOperation(failed, completed);
alshabib902d41b2014-10-07 16:52:05 -0700513 }
alshabib193525b2014-10-08 18:58:03 -0700514 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700515 }
516
Madan Jampani117aaae2014-10-23 10:04:05 -0700517 private boolean validateBatchOperation(Set<FlowEntry> failed,
alshabib3effd042014-10-17 12:00:31 -0700518 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700519
520 if (isCancelled()) {
521 throw new CancellationException();
522 }
523 if (!completed.isSuccess()) {
524 failed.addAll(completed.failedItems());
525 cleanUpBatch();
526 cancelAllSubBatches();
527 return false;
528 }
529 return true;
530 }
531
532 private void cancelAllSubBatches() {
533 for (Future<CompletedBatchOperation> f : futures) {
534 f.cancel(true);
535 }
536 }
537
538 private CompletedBatchOperation finalizeBatchOperation(boolean success,
Madan Jampani117aaae2014-10-23 10:04:05 -0700539 Set<FlowEntry> failed) {
alshabib26834582014-10-08 20:15:46 -0700540 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700541 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
542 if (state.get() == BatchState.FINISHED) {
543 return overall;
544 }
545 throw new CancellationException();
546 }
547 overall = new CompletedBatchOperation(success, failed);
548 return overall;
549 }
550 }
551
552 private void cleanUpBatch() {
553 for (FlowRuleBatchEntry fbe : batches.values()) {
554 if (fbe.getOperator() == FlowRuleOperation.ADD ||
555 fbe.getOperator() == FlowRuleOperation.MODIFY) {
556 store.deleteFlowRule(fbe.getTarget());
557 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700558 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700559 store.storeFlowRule(fbe.getTarget());
560 }
561 }
alshabib193525b2014-10-08 18:58:03 -0700562 }
alshabib902d41b2014-10-07 16:52:05 -0700563 }
alshabib57044ba2014-09-16 15:58:01 -0700564}