blob: 97a2c7179c8b804a656d213b901802bb7fe383ef [file] [log] [blame]
Madan Jampani86940d92015-05-06 11:47:57 -07001 /*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Madan Jampani86940d92015-05-06 11:47:57 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
Brian O'Connora3e5cd52015-12-05 15:59:19 -080018 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.collect.Iterables;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import com.google.common.util.concurrent.Futures;
24 import org.apache.felix.scr.annotations.Activate;
25 import org.apache.felix.scr.annotations.Component;
26 import org.apache.felix.scr.annotations.Deactivate;
27 import org.apache.felix.scr.annotations.Modified;
28 import org.apache.felix.scr.annotations.Property;
29 import org.apache.felix.scr.annotations.Reference;
30 import org.apache.felix.scr.annotations.ReferenceCardinality;
31 import org.apache.felix.scr.annotations.Service;
32 import org.onlab.util.KryoNamespace;
33 import org.onlab.util.Tools;
34 import org.onosproject.cfg.ComponentConfigService;
35 import org.onosproject.cluster.ClusterService;
36 import org.onosproject.cluster.NodeId;
37 import org.onosproject.core.CoreService;
38 import org.onosproject.core.IdGenerator;
39 import org.onosproject.mastership.MastershipService;
40 import org.onosproject.net.DeviceId;
41 import org.onosproject.net.device.DeviceService;
42 import org.onosproject.net.flow.CompletedBatchOperation;
43 import org.onosproject.net.flow.DefaultFlowEntry;
44 import org.onosproject.net.flow.FlowEntry;
45 import org.onosproject.net.flow.FlowEntry.FlowEntryState;
46 import org.onosproject.net.flow.FlowId;
47 import org.onosproject.net.flow.FlowRule;
48 import org.onosproject.net.flow.FlowRuleBatchEntry;
49 import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
50 import org.onosproject.net.flow.FlowRuleBatchEvent;
51 import org.onosproject.net.flow.FlowRuleBatchOperation;
52 import org.onosproject.net.flow.FlowRuleBatchRequest;
53 import org.onosproject.net.flow.FlowRuleEvent;
54 import org.onosproject.net.flow.FlowRuleEvent.Type;
55 import org.onosproject.net.flow.FlowRuleService;
56 import org.onosproject.net.flow.FlowRuleStore;
57 import org.onosproject.net.flow.FlowRuleStoreDelegate;
58 import org.onosproject.net.flow.StoredFlowEntry;
59 import org.onosproject.net.flow.TableStatisticsEntry;
60 import org.onosproject.persistence.PersistenceService;
61 import org.onosproject.store.AbstractStore;
62 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
63 import org.onosproject.store.cluster.messaging.ClusterMessage;
64 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
65 import org.onosproject.store.flow.ReplicaInfoEvent;
66 import org.onosproject.store.flow.ReplicaInfoEventListener;
67 import org.onosproject.store.flow.ReplicaInfoService;
68 import org.onosproject.store.impl.MastershipBasedTimestamp;
69 import org.onosproject.store.serializers.KryoNamespaces;
70 import org.onosproject.store.serializers.KryoSerializer;
71 import org.onosproject.store.serializers.StoreSerializer;
72 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
73 import org.onosproject.store.service.EventuallyConsistentMap;
74 import org.onosproject.store.service.EventuallyConsistentMapEvent;
75 import org.onosproject.store.service.EventuallyConsistentMapListener;
76 import org.onosproject.store.service.Serializer;
77 import org.onosproject.store.service.StorageService;
78 import org.onosproject.store.service.WallClockTimestamp;
79 import org.osgi.service.component.ComponentContext;
80 import org.slf4j.Logger;
Madan Jampani86940d92015-05-06 11:47:57 -070081
Brian O'Connora3e5cd52015-12-05 15:59:19 -080082 import java.util.Collections;
83 import java.util.Dictionary;
84 import java.util.HashSet;
85 import java.util.List;
86 import java.util.Map;
Sho SHIMIZU828bc162016-01-13 23:10:43 -080087 import java.util.Objects;
Brian O'Connora3e5cd52015-12-05 15:59:19 -080088 import java.util.Set;
89 import java.util.concurrent.ExecutorService;
90 import java.util.concurrent.Executors;
91 import java.util.concurrent.ScheduledExecutorService;
92 import java.util.concurrent.ScheduledFuture;
93 import java.util.concurrent.TimeUnit;
94 import java.util.concurrent.atomic.AtomicInteger;
95 import java.util.concurrent.atomic.AtomicReference;
96 import java.util.stream.Collectors;
Madan Jampani86940d92015-05-06 11:47:57 -070097
Brian O'Connora3e5cd52015-12-05 15:59:19 -080098 import static com.google.common.base.Strings.isNullOrEmpty;
99 import static org.onlab.util.Tools.get;
100 import static org.onlab.util.Tools.groupedThreads;
101 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
102 import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
103 import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani86940d92015-05-06 11:47:57 -0700104
105/**
106 * Manages inventory of flow rules using a distributed state management protocol.
107 */
108@Component(immediate = true, enabled = true)
109@Service
110public class NewDistributedFlowRuleStore
111 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
112 implements FlowRuleStore {
113
114 private final Logger log = getLogger(getClass());
115
116 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
117 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700118 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700119 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Madan Jampani86940d92015-05-06 11:47:57 -0700120 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampaniadea8902015-06-04 17:39:45 -0700121 // number of devices whose flow entries will be backed up in one communication round
122 private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
Madan Jampani86940d92015-05-06 11:47:57 -0700123
124 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
125 label = "Number of threads in the message handler pool")
126 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
127
128 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
129 label = "Indicates whether backups are enabled or not")
130 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
131
Madan Jampani08bf17b2015-05-06 16:25:26 -0700132 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
133 label = "Delay in ms between successive backup runs")
134 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700135 @Property(name = "persistenceEnabled", boolValue = false,
136 label = "Indicates whether or not changes in the flow table should be persisted to disk.")
137 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700138
Madan Jampani86940d92015-05-06 11:47:57 -0700139 private InternalFlowTable flowTable = new InternalFlowTable();
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected ReplicaInfoService replicaInfoManager;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected ClusterCommunicationService clusterCommunicator;
146
147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected ClusterService clusterService;
149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
151 protected DeviceService deviceService;
152
153 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
154 protected CoreService coreService;
155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
157 protected ComponentConfigService configService;
158
159 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
160 protected MastershipService mastershipService;
161
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700162 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
163 protected PersistenceService persistenceService;
164
Madan Jampani86940d92015-05-06 11:47:57 -0700165 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
166 private ExecutorService messageHandlingExecutor;
167
Madan Jampani08bf17b2015-05-06 16:25:26 -0700168 private ScheduledFuture<?> backupTask;
Madan Jampani86940d92015-05-06 11:47:57 -0700169 private final ScheduledExecutorService backupSenderExecutor =
170 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
171
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700172 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
173 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
174 new InternalTableStatsListener();
175
176 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
177 protected StorageService storageService;
178
Madan Jampani86940d92015-05-06 11:47:57 -0700179 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
180 @Override
181 protected void setupKryoPool() {
182 serializerPool = KryoNamespace.newBuilder()
183 .register(DistributedStoreSerializers.STORE_COMMON)
184 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Madan Jampani86940d92015-05-06 11:47:57 -0700185 .build();
186 }
187 };
188
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700189 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
190 .register(KryoNamespaces.API)
191 .register(MastershipBasedTimestamp.class);
192
193
Madan Jampani86940d92015-05-06 11:47:57 -0700194 private IdGenerator idGenerator;
195 private NodeId local;
196
197 @Activate
198 public void activate(ComponentContext context) {
199 configService.registerProperties(getClass());
200
201 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
202
203 local = clusterService.getLocalNode().id();
204
205 messageHandlingExecutor = Executors.newFixedThreadPool(
206 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
207
208 registerMessageHandlers(messageHandlingExecutor);
209
Madan Jampani08bf17b2015-05-06 16:25:26 -0700210 if (backupEnabled) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700211 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700212 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
213 flowTable::backup,
214 0,
215 backupPeriod,
216 TimeUnit.MILLISECONDS);
217 }
Madan Jampani86940d92015-05-06 11:47:57 -0700218
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700219 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
220 .withName("onos-flow-table-stats")
221 .withSerializer(SERIALIZER_BUILDER)
222 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
223 .withTimestampProvider((k, v) -> new WallClockTimestamp())
224 .withTombstonesDisabled()
225 .build();
226 deviceTableStats.addListener(tableStatsListener);
227
Madan Jampani86940d92015-05-06 11:47:57 -0700228 logConfig("Started");
229 }
230
231 @Deactivate
232 public void deactivate(ComponentContext context) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700233 if (backupEnabled) {
234 replicaInfoManager.removeListener(flowTable);
235 backupTask.cancel(true);
236 }
Madan Jampani86940d92015-05-06 11:47:57 -0700237 configService.unregisterProperties(getClass(), false);
238 unregisterMessageHandlers();
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700239 deviceTableStats.removeListener(tableStatsListener);
240 deviceTableStats.destroy();
Madan Jampani86940d92015-05-06 11:47:57 -0700241 messageHandlingExecutor.shutdownNow();
242 backupSenderExecutor.shutdownNow();
243 log.info("Stopped");
244 }
245
246 @SuppressWarnings("rawtypes")
247 @Modified
248 public void modified(ComponentContext context) {
249 if (context == null) {
250 backupEnabled = DEFAULT_BACKUP_ENABLED;
251 logConfig("Default config");
252 return;
253 }
254
255 Dictionary properties = context.getProperties();
256 int newPoolSize;
257 boolean newBackupEnabled;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700258 int newBackupPeriod;
Madan Jampani86940d92015-05-06 11:47:57 -0700259 try {
260 String s = get(properties, "msgHandlerPoolSize");
261 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
262
263 s = get(properties, "backupEnabled");
264 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
265
Madan Jampani08bf17b2015-05-06 16:25:26 -0700266 s = get(properties, "backupPeriod");
267 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
268
Madan Jampani86940d92015-05-06 11:47:57 -0700269 } catch (NumberFormatException | ClassCastException e) {
270 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
271 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700272 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Madan Jampani86940d92015-05-06 11:47:57 -0700273 }
274
Madan Jampani08bf17b2015-05-06 16:25:26 -0700275 boolean restartBackupTask = false;
Madan Jampani86940d92015-05-06 11:47:57 -0700276 if (newBackupEnabled != backupEnabled) {
277 backupEnabled = newBackupEnabled;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700278 if (!backupEnabled) {
279 replicaInfoManager.removeListener(flowTable);
280 if (backupTask != null) {
281 backupTask.cancel(false);
282 backupTask = null;
283 }
284 } else {
285 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700286 }
287 restartBackupTask = backupEnabled;
288 }
289 if (newBackupPeriod != backupPeriod) {
290 backupPeriod = newBackupPeriod;
291 restartBackupTask = backupEnabled;
292 }
293 if (restartBackupTask) {
294 if (backupTask != null) {
295 // cancel previously running task
296 backupTask.cancel(false);
297 }
298 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
299 flowTable::backup,
300 0,
301 backupPeriod,
302 TimeUnit.MILLISECONDS);
Madan Jampani86940d92015-05-06 11:47:57 -0700303 }
304 if (newPoolSize != msgHandlerPoolSize) {
305 msgHandlerPoolSize = newPoolSize;
306 ExecutorService oldMsgHandler = messageHandlingExecutor;
307 messageHandlingExecutor = Executors.newFixedThreadPool(
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800308 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Madan Jampani86940d92015-05-06 11:47:57 -0700309
310 // replace previously registered handlers.
311 registerMessageHandlers(messageHandlingExecutor);
312 oldMsgHandler.shutdown();
313 }
314 logConfig("Reconfigured");
315 }
316
317 private void registerMessageHandlers(ExecutorService executor) {
318
319 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
320 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
321 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
322 clusterCommunicator.addSubscriber(
323 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
324 clusterCommunicator.addSubscriber(
325 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
326 clusterCommunicator.addSubscriber(
327 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
328 clusterCommunicator.addSubscriber(
329 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
330 clusterCommunicator.addSubscriber(
Madan Jampani654b58a2015-05-22 11:28:11 -0700331 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
Madan Jampani86940d92015-05-06 11:47:57 -0700332 }
333
334 private void unregisterMessageHandlers() {
335 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
336 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
337 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
338 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
339 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
340 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
341 }
342
343 private void logConfig(String prefix) {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700344 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
345 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
Madan Jampani86940d92015-05-06 11:47:57 -0700346 }
347
348 // This is not a efficient operation on a distributed sharded
349 // flow store. We need to revisit the need for this operation or at least
350 // make it device specific.
351 @Override
352 public int getFlowRuleCount() {
353 AtomicInteger sum = new AtomicInteger(0);
354 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
355 return sum.get();
356 }
357
358 @Override
359 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700360 NodeId master = mastershipService.getMasterFor(rule.deviceId());
Madan Jampani86940d92015-05-06 11:47:57 -0700361
362 if (master == null) {
Thomas Vachuska88fd6902015-08-04 10:08:34 -0700363 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
Madan Jampani86940d92015-05-06 11:47:57 -0700364 return null;
365 }
366
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800367 if (Objects.equals(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700368 return flowTable.getFlowEntry(rule);
369 }
370
371 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
372 master, rule.deviceId());
373
374 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
375 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
376 SERIALIZER::encode,
377 SERIALIZER::decode,
378 master),
379 FLOW_RULE_STORE_TIMEOUT_MILLIS,
380 TimeUnit.MILLISECONDS,
381 null);
382 }
383
384 @Override
385 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700386 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700387
388 if (master == null) {
Thomas Vachuska88fd6902015-08-04 10:08:34 -0700389 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700390 return Collections.emptyList();
391 }
392
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800393 if (Objects.equals(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700394 return flowTable.getFlowEntries(deviceId);
395 }
396
397 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
398 master, deviceId);
399
400 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
401 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
402 SERIALIZER::encode,
403 SERIALIZER::decode,
404 master),
405 FLOW_RULE_STORE_TIMEOUT_MILLIS,
406 TimeUnit.MILLISECONDS,
407 Collections.emptyList());
408 }
409
410 @Override
411 public void storeFlowRule(FlowRule rule) {
412 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700413 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700414 rule.deviceId(), idGenerator.getNewId()));
415 }
416
417 @Override
418 public void storeBatch(FlowRuleBatchOperation operation) {
419 if (operation.getOperations().isEmpty()) {
420 notifyDelegate(FlowRuleBatchEvent.completed(
421 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
422 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
423 return;
424 }
425
426 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700427 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700428
429 if (master == null) {
430 log.warn("No master for {} : flows will be marked for removal", deviceId);
431
432 updateStoreInternal(operation);
433
434 notifyDelegate(FlowRuleBatchEvent.completed(
435 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
436 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
437 return;
438 }
439
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800440 if (Objects.equals(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700441 storeBatchInternal(operation);
442 return;
443 }
444
445 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
446 master, deviceId);
447
Madan Jampani175e8fd2015-05-20 14:10:45 -0700448 clusterCommunicator.unicast(operation,
449 APPLY_BATCH_FLOWS,
450 SERIALIZER::encode,
451 master)
452 .whenComplete((result, error) -> {
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700453 if (error != null) {
Madan Jampani15f1bc42015-05-28 10:51:52 -0700454 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Madan Jampani86940d92015-05-06 11:47:57 -0700455
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700456 Set<FlowRule> allFailures = operation.getOperations()
457 .stream()
458 .map(op -> op.target())
459 .collect(Collectors.toSet());
Madan Jampani86940d92015-05-06 11:47:57 -0700460
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700461 notifyDelegate(FlowRuleBatchEvent.completed(
462 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
463 new CompletedBatchOperation(false, allFailures, deviceId)));
464 }
Madan Jampani175e8fd2015-05-20 14:10:45 -0700465 });
Madan Jampani86940d92015-05-06 11:47:57 -0700466 }
467
468 private void storeBatchInternal(FlowRuleBatchOperation operation) {
469
470 final DeviceId did = operation.deviceId();
471 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
472 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
473 if (currentOps.isEmpty()) {
474 batchOperationComplete(FlowRuleBatchEvent.completed(
475 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
476 new CompletedBatchOperation(true, Collections.emptySet(), did)));
477 return;
478 }
479
480 notifyDelegate(FlowRuleBatchEvent.requested(new
481 FlowRuleBatchRequest(operation.id(),
482 currentOps), operation.deviceId()));
483 }
484
485 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
486 return operation.getOperations().stream().map(
487 op -> {
488 StoredFlowEntry entry;
489 switch (op.operator()) {
490 case ADD:
491 entry = new DefaultFlowEntry(op.target());
492 // always add requested FlowRule
493 // Note: 2 equal FlowEntry may have different treatment
494 flowTable.remove(entry.deviceId(), entry);
495 flowTable.add(entry);
496
497 return op;
498 case REMOVE:
499 entry = flowTable.getFlowEntry(op.target());
500 if (entry != null) {
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800501 //FIXME modification of "stored" flow entry outside of flow table
Madan Jampani86940d92015-05-06 11:47:57 -0700502 entry.setState(FlowEntryState.PENDING_REMOVE);
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800503 log.debug("Setting state of rule to pending remove: {}", entry);
Madan Jampani86940d92015-05-06 11:47:57 -0700504 return op;
505 }
506 break;
507 case MODIFY:
508 //TODO: figure this out at some point
509 break;
510 default:
511 log.warn("Unknown flow operation operator: {}", op.operator());
512 }
513 return null;
514 }
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800515 ).filter(Objects::nonNull).collect(Collectors.toSet());
Madan Jampani86940d92015-05-06 11:47:57 -0700516 }
517
518 @Override
519 public void deleteFlowRule(FlowRule rule) {
520 storeBatch(
521 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700522 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700523 new FlowRuleBatchEntry(
524 FlowRuleOperation.REMOVE,
525 rule)), rule.deviceId(), idGenerator.getNewId()));
526 }
527
528 @Override
Charles Chan93fa7272016-01-26 22:27:02 -0800529 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
530 if (mastershipService.isLocalMaster(rule.deviceId())) {
531 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
532 if (stored != null &&
533 stored.state() != FlowEntryState.PENDING_ADD) {
534 stored.setState(FlowEntryState.PENDING_ADD);
535 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
536 }
537 }
538 return null;
539 }
540
541 @Override
Madan Jampani86940d92015-05-06 11:47:57 -0700542 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700543 NodeId master = mastershipService.getMasterFor(rule.deviceId());
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800544 if (Objects.equals(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700545 return addOrUpdateFlowRuleInternal(rule);
546 }
547
548 log.warn("Tried to update FlowRule {} state,"
549 + " while the Node was not the master.", rule);
550 return null;
551 }
552
553 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
554 // check if this new rule is an update to an existing entry
555 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
556 if (stored != null) {
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800557 //FIXME modification of "stored" flow entry outside of flow table
Madan Jampani86940d92015-05-06 11:47:57 -0700558 stored.setBytes(rule.bytes());
559 stored.setLife(rule.life());
560 stored.setPackets(rule.packets());
Jonathan Hart89e981f2016-01-04 13:59:55 -0800561 stored.setLastSeen();
Madan Jampani86940d92015-05-06 11:47:57 -0700562 if (stored.state() == FlowEntryState.PENDING_ADD) {
563 stored.setState(FlowEntryState.ADDED);
564 return new FlowRuleEvent(Type.RULE_ADDED, rule);
565 }
566 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
567 }
568
569 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
570 // TODO: also update backup if the behavior is correct.
571 flowTable.add(rule);
572 return null;
573 }
574
575 @Override
576 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
577 final DeviceId deviceId = rule.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700578 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700579
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800580 if (Objects.equals(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700581 // bypass and handle it locally
582 return removeFlowRuleInternal(rule);
583 }
584
585 if (master == null) {
586 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
587 // TODO: revisit if this should be null (="no-op") or Exception
588 return null;
589 }
590
591 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
592 master, deviceId);
593
594 return Futures.get(clusterCommunicator.sendAndReceive(
595 rule,
596 REMOVE_FLOW_ENTRY,
597 SERIALIZER::encode,
598 SERIALIZER::decode,
599 master),
600 FLOW_RULE_STORE_TIMEOUT_MILLIS,
601 TimeUnit.MILLISECONDS,
602 RuntimeException.class);
603 }
604
605 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
606 final DeviceId deviceId = rule.deviceId();
607 // This is where one could mark a rule as removed and still keep it in the store.
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800608 final FlowEntry removed = flowTable.remove(deviceId, rule);
609 // rule may be partial rule that is missing treatment, we should use rule from store instead
610 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
Madan Jampani86940d92015-05-06 11:47:57 -0700611 }
612
613 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800614 public void purgeFlowRule(DeviceId deviceId) {
615 flowTable.purgeFlowRule(deviceId);
616 }
617
618 @Override
Madan Jampani86940d92015-05-06 11:47:57 -0700619 public void batchOperationComplete(FlowRuleBatchEvent event) {
620 //FIXME: need a per device pending response
621 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
622 if (nodeId == null) {
623 notifyDelegate(event);
624 } else {
625 // TODO check unicast return value
626 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
627 //error log: log.warn("Failed to respond to peer for batch operation result");
628 }
629 }
630
631 private final class OnStoreBatch implements ClusterMessageHandler {
632
633 @Override
634 public void handle(final ClusterMessage message) {
635 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
636 log.debug("received batch request {}", operation);
637
638 final DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700639 NodeId master = mastershipService.getMasterFor(deviceId);
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800640 if (!Objects.equals(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700641 Set<FlowRule> failures = new HashSet<>(operation.size());
642 for (FlowRuleBatchEntry op : operation.getOperations()) {
643 failures.add(op.target());
644 }
645 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
646 // This node is no longer the master, respond as all failed.
647 // TODO: we might want to wrap response in envelope
648 // to distinguish sw programming failure and hand over
649 // it make sense in the latter case to retry immediately.
650 message.respond(SERIALIZER.encode(allFailed));
651 return;
652 }
653
654 pendingResponses.put(operation.id(), message.sender());
655 storeBatchInternal(operation);
656 }
657 }
658
Madan Jampanif7536ab2015-05-07 23:23:23 -0700659 private class InternalFlowTable implements ReplicaInfoEventListener {
Madan Jampani86940d92015-05-06 11:47:57 -0700660
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800661 //TODO replace the Map<V,V> with ExtendedSet
662 private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
Madan Jampani5c3766c2015-06-02 15:54:41 -0700663 flowEntries = Maps.newConcurrentMap();
Madan Jampani86940d92015-05-06 11:47:57 -0700664
665 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
666 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
667 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
668
Madan Jampanif7536ab2015-05-07 23:23:23 -0700669 @Override
670 public void event(ReplicaInfoEvent event) {
Madan Jampania98bf932015-06-02 12:01:36 -0700671 if (!backupEnabled) {
672 return;
673 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700674 if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
675 DeviceId deviceId = event.subject();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700676 NodeId master = mastershipService.getMasterFor(deviceId);
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800677 if (!Objects.equals(local, master)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700678 // ignore since this event is for a device this node does not manage.
679 return;
680 }
Madan Jampani7267c552015-05-20 22:39:17 -0700681 NodeId newBackupNode = getBackupNode(deviceId);
682 NodeId currentBackupNode = lastBackupNodes.get(deviceId);
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800683 if (Objects.equals(newBackupNode, currentBackupNode)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700684 // ignore since backup location hasn't changed.
685 return;
686 }
Madan Jampani7267c552015-05-20 22:39:17 -0700687 if (currentBackupNode != null && newBackupNode == null) {
688 // Current backup node is most likely down and no alternate backup node
689 // has been chosen. Clear current backup location so that we can resume
690 // backups when either current backup comes online or a different backup node
691 // is chosen.
692 log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
693 + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
694 lastBackupNodes.remove(deviceId);
695 lastBackupTimes.remove(deviceId);
696 return;
697 // TODO: Pick any available node as backup and ensure hand-off occurs when
698 // a new master is elected.
699 }
Madan Jampani44839b82015-06-12 13:57:41 -0700700 log.debug("Backup location for {} has changed from {} to {}.",
Madan Jampani7267c552015-05-20 22:39:17 -0700701 deviceId, currentBackupNode, newBackupNode);
702 backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
Madan Jampani03062682015-05-19 17:57:51 -0700703 0,
704 TimeUnit.SECONDS);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700705 }
706 }
707
Madan Jampaniadea8902015-06-04 17:39:45 -0700708 private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
709 // split up the devices into smaller batches and send them separately.
710 Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
711 .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
712 }
713
Madan Jampanif7536ab2015-05-07 23:23:23 -0700714 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
Madan Jampaniadea8902015-06-04 17:39:45 -0700715 if (deviceIds.isEmpty()) {
716 return;
717 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700718 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800719 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
720 deviceFlowEntries = Maps.newConcurrentMap();
Madan Jampani85a9b0d2015-06-03 17:15:44 -0700721 deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800722 clusterCommunicator.<Map<DeviceId,
723 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>,
724 Set<DeviceId>>
725 sendAndReceive(deviceFlowEntries,
726 FLOW_TABLE_BACKUP,
727 SERIALIZER::encode,
728 SERIALIZER::decode,
729 nodeId)
730 .whenComplete((backedupDevices, error) -> {
731 Set<DeviceId> devicesNotBackedup = error != null ?
732 deviceFlowEntries.keySet() :
733 Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
734 if (devicesNotBackedup.size() > 0) {
735 log.warn("Failed to backup devices: {}. Reason: {}",
736 devicesNotBackedup, error.getMessage());
737 }
738 if (backedupDevices != null) {
739 backedupDevices.forEach(id -> {
740 lastBackupTimes.put(id, System.currentTimeMillis());
741 lastBackupNodes.put(id, nodeId);
742 });
743 }
744 });
Madan Jampanif7536ab2015-05-07 23:23:23 -0700745 }
746
Madan Jampani86940d92015-05-06 11:47:57 -0700747 /**
748 * Returns the flow table for specified device.
749 *
750 * @param deviceId identifier of the device
751 * @return Map representing Flow Table of given device.
752 */
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800753 private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700754 if (persistenceEnabled) {
755 return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800756 .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700757 .withName("FlowTable:" + deviceId.toString())
758 .withSerializer(new Serializer() {
759 @Override
760 public <T> byte[] encode(T object) {
761 return SERIALIZER.encode(object);
762 }
763
764 @Override
765 public <T> T decode(byte[] bytes) {
766 return SERIALIZER.decode(bytes);
767 }
768 })
769 .build());
770 } else {
771 return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
772 }
Madan Jampani86940d92015-05-06 11:47:57 -0700773 }
774
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800775 private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
776 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
Madan Jampani86940d92015-05-06 11:47:57 -0700777 }
778
779 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800780 return getFlowEntriesInternal(rule.deviceId(), rule.id()).get(rule);
Madan Jampani86940d92015-05-06 11:47:57 -0700781 }
782
783 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800784 return getFlowTable(deviceId).values().stream()
785 .flatMap(m -> m.values().stream())
786 .collect(Collectors.toSet());
Madan Jampani86940d92015-05-06 11:47:57 -0700787 }
788
789 public StoredFlowEntry getFlowEntry(FlowRule rule) {
790 return getFlowEntryInternal(rule);
791 }
792
793 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
794 return getFlowEntriesInternal(deviceId);
795 }
796
797 public void add(FlowEntry rule) {
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800798 getFlowEntriesInternal(rule.deviceId(), rule.id())
799 .compute((StoredFlowEntry) rule, (k, stored) -> {
800 //TODO compare stored and rule timestamps
801 //TODO the key is not updated
802 return (StoredFlowEntry) rule;
803 });
Madan Jampani86940d92015-05-06 11:47:57 -0700804 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
805 }
806
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800807 public FlowEntry remove(DeviceId deviceId, FlowEntry rule) {
808 final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
809 getFlowEntriesInternal(rule.deviceId(), rule.id())
810 .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
811 if (rule instanceof DefaultFlowEntry) {
812 DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
813 if (stored instanceof DefaultFlowEntry) {
814 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
815 if (toRemove.created() < storedEntry.created()) {
816 log.debug("Trying to remove more recent flow entry {} (stored: {})",
817 toRemove, stored);
818 // the key is not updated, removedRule remains null
819 return stored;
820 }
821 }
822 }
823 removedRule.set(stored);
824 return null;
825 });
826
827 if (removedRule.get() != null) {
Madan Jampani86940d92015-05-06 11:47:57 -0700828 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800829 return removedRule.get();
830 } else {
831 return null;
Madan Jampani86940d92015-05-06 11:47:57 -0700832 }
833 }
834
Charles Chan0c7c43b2016-01-14 17:39:20 -0800835 public void purgeFlowRule(DeviceId deviceId) {
836 flowEntries.remove(deviceId);
837 }
838
Madan Jampani86940d92015-05-06 11:47:57 -0700839 private NodeId getBackupNode(DeviceId deviceId) {
840 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
841 // pick the standby which is most likely to become next master
842 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
843 }
844
845 private void backup() {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700846 if (!backupEnabled) {
847 return;
848 }
Madan Jampani86940d92015-05-06 11:47:57 -0700849 try {
850 // determine the set of devices that we need to backup during this run.
851 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
852 .stream()
853 .filter(deviceId -> {
854 Long lastBackupTime = lastBackupTimes.get(deviceId);
855 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
856 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
Madan Jampani7267c552015-05-20 22:39:17 -0700857 NodeId newBackupNode = getBackupNode(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700858 return lastBackupTime == null
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800859 || !Objects.equals(lastBackupNode, newBackupNode)
Madan Jampani86940d92015-05-06 11:47:57 -0700860 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
861 })
862 .collect(Collectors.toSet());
863
864 // compute a mapping from node to the set of devices whose flow entries it should backup
865 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
866 devicesToBackup.forEach(deviceId -> {
867 NodeId backupLocation = getBackupNode(deviceId);
868 if (backupLocation != null) {
869 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
870 .add(deviceId);
871 }
872 });
Madan Jampani86940d92015-05-06 11:47:57 -0700873 // send the device flow entries to their respective backup nodes
Madan Jampaniadea8902015-06-04 17:39:45 -0700874 devicesToBackupByNode.forEach(this::sendBackups);
Madan Jampani86940d92015-05-06 11:47:57 -0700875 } catch (Exception e) {
876 log.error("Backup failed.", e);
877 }
878 }
879
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800880 private Set<DeviceId> onBackupReceipt(Map<DeviceId,
881 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> flowTables) {
Madan Jampani7267c552015-05-20 22:39:17 -0700882 log.debug("Received flowEntries for {} to backup", flowTables.keySet());
Madan Jampani654b58a2015-05-22 11:28:11 -0700883 Set<DeviceId> backedupDevices = Sets.newHashSet();
884 try {
Madan Jampania98bf932015-06-02 12:01:36 -0700885 flowTables.forEach((deviceId, deviceFlowTable) -> {
886 // Only process those devices are that not managed by the local node.
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800887 if (!Objects.equals(local, mastershipService.getMasterFor(deviceId))) {
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800888 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
889 getFlowTable(deviceId);
Madan Jampania98bf932015-06-02 12:01:36 -0700890 backupFlowTable.clear();
891 backupFlowTable.putAll(deviceFlowTable);
892 backedupDevices.add(deviceId);
893 }
Madan Jampani08bf17b2015-05-06 16:25:26 -0700894 });
Madan Jampani654b58a2015-05-22 11:28:11 -0700895 } catch (Exception e) {
896 log.warn("Failure processing backup request", e);
897 }
898 return backedupDevices;
Madan Jampani86940d92015-05-06 11:47:57 -0700899 }
900 }
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700901
902 @Override
903 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
904 List<TableStatisticsEntry> tableStats) {
905 deviceTableStats.put(deviceId, tableStats);
906 return null;
907 }
908
909 @Override
910 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
911 NodeId master = mastershipService.getMasterFor(deviceId);
912
913 if (master == null) {
914 log.debug("Failed to getTableStats: No master for {}", deviceId);
915 return Collections.emptyList();
916 }
917
918 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
919 if (tableStats == null) {
920 return Collections.emptyList();
921 }
922 return ImmutableList.copyOf(tableStats);
923 }
924
925 private class InternalTableStatsListener
926 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
927 @Override
928 public void event(EventuallyConsistentMapEvent<DeviceId,
929 List<TableStatisticsEntry>> event) {
930 //TODO: Generate an event to listeners (do we need?)
931 }
932 }
Madan Jampani86940d92015-05-06 11:47:57 -0700933}