blob: 1695e5ff576cd503305fd4d1dfdaad0d242e14e6 [file] [log] [blame]
Madan Jampani86940d92015-05-06 11:47:57 -07001 /*
2 * Copyright 2014-2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import com.google.common.base.Objects;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -070019import com.google.common.collect.ImmutableList;
Madan Jampani85a9b0d2015-06-03 17:15:44 -070020import com.google.common.collect.ImmutableMap;
Madan Jampani86940d92015-05-06 11:47:57 -070021import com.google.common.collect.Iterables;
22import com.google.common.collect.Maps;
23import com.google.common.collect.Sets;
24import com.google.common.util.concurrent.Futures;
25
26import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Modified;
30import org.apache.felix.scr.annotations.Property;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
34import org.onlab.util.KryoNamespace;
Madan Jampani86940d92015-05-06 11:47:57 -070035import org.onlab.util.Tools;
36import org.onosproject.cfg.ComponentConfigService;
37import org.onosproject.cluster.ClusterService;
38import org.onosproject.cluster.NodeId;
39import org.onosproject.core.CoreService;
40import org.onosproject.core.IdGenerator;
41import org.onosproject.mastership.MastershipService;
42import org.onosproject.net.DeviceId;
43import org.onosproject.net.device.DeviceService;
44import org.onosproject.net.flow.CompletedBatchOperation;
45import org.onosproject.net.flow.DefaultFlowEntry;
46import org.onosproject.net.flow.FlowEntry;
47import org.onosproject.net.flow.FlowEntry.FlowEntryState;
48import org.onosproject.net.flow.FlowId;
49import org.onosproject.net.flow.FlowRule;
50import org.onosproject.net.flow.FlowRuleBatchEntry;
51import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
52import org.onosproject.net.flow.FlowRuleBatchEvent;
53import org.onosproject.net.flow.FlowRuleBatchOperation;
54import org.onosproject.net.flow.FlowRuleBatchRequest;
55import org.onosproject.net.flow.FlowRuleEvent;
56import org.onosproject.net.flow.FlowRuleEvent.Type;
57import org.onosproject.net.flow.FlowRuleService;
58import org.onosproject.net.flow.FlowRuleStore;
59import org.onosproject.net.flow.FlowRuleStoreDelegate;
60import org.onosproject.net.flow.StoredFlowEntry;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -070061import org.onosproject.net.flow.TableStatisticsEntry;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -070062import org.onosproject.persistence.PersistenceService;
Madan Jampani86940d92015-05-06 11:47:57 -070063import org.onosproject.store.AbstractStore;
64import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
65import org.onosproject.store.cluster.messaging.ClusterMessage;
66import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanif7536ab2015-05-07 23:23:23 -070067import org.onosproject.store.flow.ReplicaInfoEvent;
68import org.onosproject.store.flow.ReplicaInfoEventListener;
Madan Jampani86940d92015-05-06 11:47:57 -070069import org.onosproject.store.flow.ReplicaInfoService;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -070070import org.onosproject.store.impl.MastershipBasedTimestamp;
71import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani86940d92015-05-06 11:47:57 -070072import org.onosproject.store.serializers.KryoSerializer;
73import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070074import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -070075import org.onosproject.store.service.EventuallyConsistentMap;
76import org.onosproject.store.service.EventuallyConsistentMapEvent;
77import org.onosproject.store.service.EventuallyConsistentMapListener;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -070078import org.onosproject.store.service.Serializer;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -070079import org.onosproject.store.service.StorageService;
80import org.onosproject.store.service.WallClockTimestamp;
Madan Jampani86940d92015-05-06 11:47:57 -070081import org.osgi.service.component.ComponentContext;
82import org.slf4j.Logger;
83
Madan Jampani86940d92015-05-06 11:47:57 -070084import java.util.Collections;
85import java.util.Dictionary;
86import java.util.HashSet;
87import java.util.List;
88import java.util.Map;
89import java.util.Set;
Madan Jampani86940d92015-05-06 11:47:57 -070090import java.util.concurrent.ExecutorService;
91import java.util.concurrent.Executors;
92import java.util.concurrent.ScheduledExecutorService;
Madan Jampani08bf17b2015-05-06 16:25:26 -070093import java.util.concurrent.ScheduledFuture;
Madan Jampani86940d92015-05-06 11:47:57 -070094import java.util.concurrent.TimeUnit;
95import java.util.concurrent.atomic.AtomicInteger;
96import java.util.stream.Collectors;
97
Madan Jampani86940d92015-05-06 11:47:57 -070098import static com.google.common.base.Strings.isNullOrEmpty;
99import static org.onlab.util.Tools.get;
100import static org.onlab.util.Tools.groupedThreads;
101import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
102import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
103import static org.slf4j.LoggerFactory.getLogger;
104
105/**
106 * Manages inventory of flow rules using a distributed state management protocol.
107 */
108@Component(immediate = true, enabled = true)
109@Service
110public class NewDistributedFlowRuleStore
111 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
112 implements FlowRuleStore {
113
114 private final Logger log = getLogger(getClass());
115
116 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
117 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700118 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700119 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Madan Jampani86940d92015-05-06 11:47:57 -0700120 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampaniadea8902015-06-04 17:39:45 -0700121 // number of devices whose flow entries will be backed up in one communication round
122 private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
Madan Jampani86940d92015-05-06 11:47:57 -0700123
124 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
125 label = "Number of threads in the message handler pool")
126 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
127
128 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
129 label = "Indicates whether backups are enabled or not")
130 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
131
Madan Jampani08bf17b2015-05-06 16:25:26 -0700132 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
133 label = "Delay in ms between successive backup runs")
134 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700135 @Property(name = "persistenceEnabled", boolValue = false,
136 label = "Indicates whether or not changes in the flow table should be persisted to disk.")
137 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700138
Madan Jampani86940d92015-05-06 11:47:57 -0700139 private InternalFlowTable flowTable = new InternalFlowTable();
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected ReplicaInfoService replicaInfoManager;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected ClusterCommunicationService clusterCommunicator;
146
147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected ClusterService clusterService;
149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
151 protected DeviceService deviceService;
152
153 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
154 protected CoreService coreService;
155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
157 protected ComponentConfigService configService;
158
159 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
160 protected MastershipService mastershipService;
161
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700162 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
163 protected PersistenceService persistenceService;
164
Madan Jampani86940d92015-05-06 11:47:57 -0700165 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
166 private ExecutorService messageHandlingExecutor;
167
Madan Jampani08bf17b2015-05-06 16:25:26 -0700168 private ScheduledFuture<?> backupTask;
Madan Jampani86940d92015-05-06 11:47:57 -0700169 private final ScheduledExecutorService backupSenderExecutor =
170 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
171
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700172 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
173 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
174 new InternalTableStatsListener();
175
176 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
177 protected StorageService storageService;
178
Madan Jampani86940d92015-05-06 11:47:57 -0700179 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
180 @Override
181 protected void setupKryoPool() {
182 serializerPool = KryoNamespace.newBuilder()
183 .register(DistributedStoreSerializers.STORE_COMMON)
184 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Madan Jampani86940d92015-05-06 11:47:57 -0700185 .build();
186 }
187 };
188
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700189 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
190 .register(KryoNamespaces.API)
191 .register(MastershipBasedTimestamp.class);
192
193
Madan Jampani86940d92015-05-06 11:47:57 -0700194 private IdGenerator idGenerator;
195 private NodeId local;
196
197 @Activate
198 public void activate(ComponentContext context) {
199 configService.registerProperties(getClass());
200
201 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
202
203 local = clusterService.getLocalNode().id();
204
205 messageHandlingExecutor = Executors.newFixedThreadPool(
206 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
207
208 registerMessageHandlers(messageHandlingExecutor);
209
Madan Jampani08bf17b2015-05-06 16:25:26 -0700210 if (backupEnabled) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700211 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700212 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
213 flowTable::backup,
214 0,
215 backupPeriod,
216 TimeUnit.MILLISECONDS);
217 }
Madan Jampani86940d92015-05-06 11:47:57 -0700218
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700219 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
220 .withName("onos-flow-table-stats")
221 .withSerializer(SERIALIZER_BUILDER)
222 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
223 .withTimestampProvider((k, v) -> new WallClockTimestamp())
224 .withTombstonesDisabled()
225 .build();
226 deviceTableStats.addListener(tableStatsListener);
227
Madan Jampani86940d92015-05-06 11:47:57 -0700228 logConfig("Started");
229 }
230
231 @Deactivate
232 public void deactivate(ComponentContext context) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700233 if (backupEnabled) {
234 replicaInfoManager.removeListener(flowTable);
235 backupTask.cancel(true);
236 }
Madan Jampani86940d92015-05-06 11:47:57 -0700237 configService.unregisterProperties(getClass(), false);
238 unregisterMessageHandlers();
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700239 deviceTableStats.removeListener(tableStatsListener);
240 deviceTableStats.destroy();
Madan Jampani86940d92015-05-06 11:47:57 -0700241 messageHandlingExecutor.shutdownNow();
242 backupSenderExecutor.shutdownNow();
243 log.info("Stopped");
244 }
245
246 @SuppressWarnings("rawtypes")
247 @Modified
248 public void modified(ComponentContext context) {
249 if (context == null) {
250 backupEnabled = DEFAULT_BACKUP_ENABLED;
251 logConfig("Default config");
252 return;
253 }
254
255 Dictionary properties = context.getProperties();
256 int newPoolSize;
257 boolean newBackupEnabled;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700258 int newBackupPeriod;
Madan Jampani86940d92015-05-06 11:47:57 -0700259 try {
260 String s = get(properties, "msgHandlerPoolSize");
261 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
262
263 s = get(properties, "backupEnabled");
264 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
265
Madan Jampani08bf17b2015-05-06 16:25:26 -0700266 s = get(properties, "backupPeriod");
267 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
268
Madan Jampani86940d92015-05-06 11:47:57 -0700269 } catch (NumberFormatException | ClassCastException e) {
270 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
271 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700272 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Madan Jampani86940d92015-05-06 11:47:57 -0700273 }
274
Madan Jampani08bf17b2015-05-06 16:25:26 -0700275 boolean restartBackupTask = false;
Madan Jampani86940d92015-05-06 11:47:57 -0700276 if (newBackupEnabled != backupEnabled) {
277 backupEnabled = newBackupEnabled;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700278 if (!backupEnabled) {
279 replicaInfoManager.removeListener(flowTable);
280 if (backupTask != null) {
281 backupTask.cancel(false);
282 backupTask = null;
283 }
284 } else {
285 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700286 }
287 restartBackupTask = backupEnabled;
288 }
289 if (newBackupPeriod != backupPeriod) {
290 backupPeriod = newBackupPeriod;
291 restartBackupTask = backupEnabled;
292 }
293 if (restartBackupTask) {
294 if (backupTask != null) {
295 // cancel previously running task
296 backupTask.cancel(false);
297 }
298 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
299 flowTable::backup,
300 0,
301 backupPeriod,
302 TimeUnit.MILLISECONDS);
Madan Jampani86940d92015-05-06 11:47:57 -0700303 }
304 if (newPoolSize != msgHandlerPoolSize) {
305 msgHandlerPoolSize = newPoolSize;
306 ExecutorService oldMsgHandler = messageHandlingExecutor;
307 messageHandlingExecutor = Executors.newFixedThreadPool(
308 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
309
310 // replace previously registered handlers.
311 registerMessageHandlers(messageHandlingExecutor);
312 oldMsgHandler.shutdown();
313 }
314 logConfig("Reconfigured");
315 }
316
317 private void registerMessageHandlers(ExecutorService executor) {
318
319 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
320 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
321 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
322 clusterCommunicator.addSubscriber(
323 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
324 clusterCommunicator.addSubscriber(
325 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
326 clusterCommunicator.addSubscriber(
327 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
328 clusterCommunicator.addSubscriber(
329 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
330 clusterCommunicator.addSubscriber(
Madan Jampani654b58a2015-05-22 11:28:11 -0700331 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
Madan Jampani86940d92015-05-06 11:47:57 -0700332 }
333
334 private void unregisterMessageHandlers() {
335 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
336 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
337 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
338 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
339 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
340 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
341 }
342
343 private void logConfig(String prefix) {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700344 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
345 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
Madan Jampani86940d92015-05-06 11:47:57 -0700346 }
347
348 // This is not a efficient operation on a distributed sharded
349 // flow store. We need to revisit the need for this operation or at least
350 // make it device specific.
351 @Override
352 public int getFlowRuleCount() {
353 AtomicInteger sum = new AtomicInteger(0);
354 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
355 return sum.get();
356 }
357
358 @Override
359 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700360 NodeId master = mastershipService.getMasterFor(rule.deviceId());
Madan Jampani86940d92015-05-06 11:47:57 -0700361
362 if (master == null) {
Thomas Vachuska88fd6902015-08-04 10:08:34 -0700363 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
Madan Jampani86940d92015-05-06 11:47:57 -0700364 return null;
365 }
366
367 if (Objects.equal(local, master)) {
368 return flowTable.getFlowEntry(rule);
369 }
370
371 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
372 master, rule.deviceId());
373
374 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
375 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
376 SERIALIZER::encode,
377 SERIALIZER::decode,
378 master),
379 FLOW_RULE_STORE_TIMEOUT_MILLIS,
380 TimeUnit.MILLISECONDS,
381 null);
382 }
383
384 @Override
385 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700386 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700387
388 if (master == null) {
Thomas Vachuska88fd6902015-08-04 10:08:34 -0700389 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700390 return Collections.emptyList();
391 }
392
393 if (Objects.equal(local, master)) {
394 return flowTable.getFlowEntries(deviceId);
395 }
396
397 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
398 master, deviceId);
399
400 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
401 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
402 SERIALIZER::encode,
403 SERIALIZER::decode,
404 master),
405 FLOW_RULE_STORE_TIMEOUT_MILLIS,
406 TimeUnit.MILLISECONDS,
407 Collections.emptyList());
408 }
409
410 @Override
411 public void storeFlowRule(FlowRule rule) {
412 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700413 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700414 rule.deviceId(), idGenerator.getNewId()));
415 }
416
417 @Override
418 public void storeBatch(FlowRuleBatchOperation operation) {
419 if (operation.getOperations().isEmpty()) {
420 notifyDelegate(FlowRuleBatchEvent.completed(
421 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
422 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
423 return;
424 }
425
426 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700427 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700428
429 if (master == null) {
430 log.warn("No master for {} : flows will be marked for removal", deviceId);
431
432 updateStoreInternal(operation);
433
434 notifyDelegate(FlowRuleBatchEvent.completed(
435 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
436 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
437 return;
438 }
439
440 if (Objects.equal(local, master)) {
441 storeBatchInternal(operation);
442 return;
443 }
444
445 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
446 master, deviceId);
447
Madan Jampani175e8fd2015-05-20 14:10:45 -0700448 clusterCommunicator.unicast(operation,
449 APPLY_BATCH_FLOWS,
450 SERIALIZER::encode,
451 master)
452 .whenComplete((result, error) -> {
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700453 if (error != null) {
Madan Jampani15f1bc42015-05-28 10:51:52 -0700454 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Madan Jampani86940d92015-05-06 11:47:57 -0700455
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700456 Set<FlowRule> allFailures = operation.getOperations()
457 .stream()
458 .map(op -> op.target())
459 .collect(Collectors.toSet());
Madan Jampani86940d92015-05-06 11:47:57 -0700460
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700461 notifyDelegate(FlowRuleBatchEvent.completed(
462 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
463 new CompletedBatchOperation(false, allFailures, deviceId)));
464 }
Madan Jampani175e8fd2015-05-20 14:10:45 -0700465 });
Madan Jampani86940d92015-05-06 11:47:57 -0700466 }
467
468 private void storeBatchInternal(FlowRuleBatchOperation operation) {
469
470 final DeviceId did = operation.deviceId();
471 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
472 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
473 if (currentOps.isEmpty()) {
474 batchOperationComplete(FlowRuleBatchEvent.completed(
475 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
476 new CompletedBatchOperation(true, Collections.emptySet(), did)));
477 return;
478 }
479
480 notifyDelegate(FlowRuleBatchEvent.requested(new
481 FlowRuleBatchRequest(operation.id(),
482 currentOps), operation.deviceId()));
483 }
484
485 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
486 return operation.getOperations().stream().map(
487 op -> {
488 StoredFlowEntry entry;
489 switch (op.operator()) {
490 case ADD:
491 entry = new DefaultFlowEntry(op.target());
492 // always add requested FlowRule
493 // Note: 2 equal FlowEntry may have different treatment
494 flowTable.remove(entry.deviceId(), entry);
495 flowTable.add(entry);
496
497 return op;
498 case REMOVE:
499 entry = flowTable.getFlowEntry(op.target());
500 if (entry != null) {
501 entry.setState(FlowEntryState.PENDING_REMOVE);
502 return op;
503 }
504 break;
505 case MODIFY:
506 //TODO: figure this out at some point
507 break;
508 default:
509 log.warn("Unknown flow operation operator: {}", op.operator());
510 }
511 return null;
512 }
513 ).filter(op -> op != null).collect(Collectors.toSet());
514 }
515
516 @Override
517 public void deleteFlowRule(FlowRule rule) {
518 storeBatch(
519 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700520 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700521 new FlowRuleBatchEntry(
522 FlowRuleOperation.REMOVE,
523 rule)), rule.deviceId(), idGenerator.getNewId()));
524 }
525
526 @Override
527 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700528 NodeId master = mastershipService.getMasterFor(rule.deviceId());
529 if (Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700530 return addOrUpdateFlowRuleInternal(rule);
531 }
532
533 log.warn("Tried to update FlowRule {} state,"
534 + " while the Node was not the master.", rule);
535 return null;
536 }
537
538 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
539 // check if this new rule is an update to an existing entry
540 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
541 if (stored != null) {
542 stored.setBytes(rule.bytes());
543 stored.setLife(rule.life());
544 stored.setPackets(rule.packets());
545 if (stored.state() == FlowEntryState.PENDING_ADD) {
546 stored.setState(FlowEntryState.ADDED);
547 return new FlowRuleEvent(Type.RULE_ADDED, rule);
548 }
549 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
550 }
551
552 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
553 // TODO: also update backup if the behavior is correct.
554 flowTable.add(rule);
555 return null;
556 }
557
558 @Override
559 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
560 final DeviceId deviceId = rule.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700561 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700562
563 if (Objects.equal(local, master)) {
564 // bypass and handle it locally
565 return removeFlowRuleInternal(rule);
566 }
567
568 if (master == null) {
569 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
570 // TODO: revisit if this should be null (="no-op") or Exception
571 return null;
572 }
573
574 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
575 master, deviceId);
576
577 return Futures.get(clusterCommunicator.sendAndReceive(
578 rule,
579 REMOVE_FLOW_ENTRY,
580 SERIALIZER::encode,
581 SERIALIZER::decode,
582 master),
583 FLOW_RULE_STORE_TIMEOUT_MILLIS,
584 TimeUnit.MILLISECONDS,
585 RuntimeException.class);
586 }
587
588 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
589 final DeviceId deviceId = rule.deviceId();
590 // This is where one could mark a rule as removed and still keep it in the store.
591 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
592 return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
593 }
594
595 @Override
596 public void batchOperationComplete(FlowRuleBatchEvent event) {
597 //FIXME: need a per device pending response
598 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
599 if (nodeId == null) {
600 notifyDelegate(event);
601 } else {
602 // TODO check unicast return value
603 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
604 //error log: log.warn("Failed to respond to peer for batch operation result");
605 }
606 }
607
608 private final class OnStoreBatch implements ClusterMessageHandler {
609
610 @Override
611 public void handle(final ClusterMessage message) {
612 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
613 log.debug("received batch request {}", operation);
614
615 final DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700616 NodeId master = mastershipService.getMasterFor(deviceId);
617 if (!Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700618 Set<FlowRule> failures = new HashSet<>(operation.size());
619 for (FlowRuleBatchEntry op : operation.getOperations()) {
620 failures.add(op.target());
621 }
622 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
623 // This node is no longer the master, respond as all failed.
624 // TODO: we might want to wrap response in envelope
625 // to distinguish sw programming failure and hand over
626 // it make sense in the latter case to retry immediately.
627 message.respond(SERIALIZER.encode(allFailed));
628 return;
629 }
630
631 pendingResponses.put(operation.id(), message.sender());
632 storeBatchInternal(operation);
633 }
634 }
635
Madan Jampanif7536ab2015-05-07 23:23:23 -0700636 private class InternalFlowTable implements ReplicaInfoEventListener {
Madan Jampani86940d92015-05-06 11:47:57 -0700637
Madan Jampani5c3766c2015-06-02 15:54:41 -0700638 private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
639 flowEntries = Maps.newConcurrentMap();
Madan Jampani86940d92015-05-06 11:47:57 -0700640
641 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
642 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
643 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
644
Madan Jampanif7536ab2015-05-07 23:23:23 -0700645 @Override
646 public void event(ReplicaInfoEvent event) {
Madan Jampania98bf932015-06-02 12:01:36 -0700647 if (!backupEnabled) {
648 return;
649 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700650 if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
651 DeviceId deviceId = event.subject();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700652 NodeId master = mastershipService.getMasterFor(deviceId);
653 if (!Objects.equal(local, master)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700654 // ignore since this event is for a device this node does not manage.
655 return;
656 }
Madan Jampani7267c552015-05-20 22:39:17 -0700657 NodeId newBackupNode = getBackupNode(deviceId);
658 NodeId currentBackupNode = lastBackupNodes.get(deviceId);
659 if (Objects.equal(newBackupNode, currentBackupNode)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700660 // ignore since backup location hasn't changed.
661 return;
662 }
Madan Jampani7267c552015-05-20 22:39:17 -0700663 if (currentBackupNode != null && newBackupNode == null) {
664 // Current backup node is most likely down and no alternate backup node
665 // has been chosen. Clear current backup location so that we can resume
666 // backups when either current backup comes online or a different backup node
667 // is chosen.
668 log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
669 + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
670 lastBackupNodes.remove(deviceId);
671 lastBackupTimes.remove(deviceId);
672 return;
673 // TODO: Pick any available node as backup and ensure hand-off occurs when
674 // a new master is elected.
675 }
Madan Jampani44839b82015-06-12 13:57:41 -0700676 log.debug("Backup location for {} has changed from {} to {}.",
Madan Jampani7267c552015-05-20 22:39:17 -0700677 deviceId, currentBackupNode, newBackupNode);
678 backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
Madan Jampani03062682015-05-19 17:57:51 -0700679 0,
680 TimeUnit.SECONDS);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700681 }
682 }
683
Madan Jampaniadea8902015-06-04 17:39:45 -0700684 private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
685 // split up the devices into smaller batches and send them separately.
686 Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
687 .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
688 }
689
Madan Jampanif7536ab2015-05-07 23:23:23 -0700690 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
Madan Jampaniadea8902015-06-04 17:39:45 -0700691 if (deviceIds.isEmpty()) {
692 return;
693 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700694 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
Madan Jampani654b58a2015-05-22 11:28:11 -0700695 Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Madan Jampanif7536ab2015-05-07 23:23:23 -0700696 Maps.newConcurrentMap();
Madan Jampani85a9b0d2015-06-03 17:15:44 -0700697 deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
Madan Jampani654b58a2015-05-22 11:28:11 -0700698 clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
699 deviceFlowEntries,
700 FLOW_TABLE_BACKUP,
701 SERIALIZER::encode,
702 SERIALIZER::decode,
703 nodeId)
704 .whenComplete((backedupDevices, error) -> {
705 Set<DeviceId> devicesNotBackedup = error != null ?
706 deviceFlowEntries.keySet() :
707 Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
708 if (devicesNotBackedup.size() > 0) {
Madan Jampani85a9b0d2015-06-03 17:15:44 -0700709 log.warn("Failed to backup devices: {}. Reason: {}",
710 devicesNotBackedup, error.getMessage());
Madan Jampani654b58a2015-05-22 11:28:11 -0700711 }
712 if (backedupDevices != null) {
713 backedupDevices.forEach(id -> {
714 lastBackupTimes.put(id, System.currentTimeMillis());
715 lastBackupNodes.put(id, nodeId);
716 });
717 }
718 });
Madan Jampanif7536ab2015-05-07 23:23:23 -0700719 }
720
Madan Jampani86940d92015-05-06 11:47:57 -0700721 /**
722 * Returns the flow table for specified device.
723 *
724 * @param deviceId identifier of the device
725 * @return Map representing Flow Table of given device.
726 */
Madan Jampani5c3766c2015-06-02 15:54:41 -0700727 private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700728 if (persistenceEnabled) {
729 return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
730 .<FlowId, Set<StoredFlowEntry>>persistentMapBuilder()
731 .withName("FlowTable:" + deviceId.toString())
732 .withSerializer(new Serializer() {
733 @Override
734 public <T> byte[] encode(T object) {
735 return SERIALIZER.encode(object);
736 }
737
738 @Override
739 public <T> T decode(byte[] bytes) {
740 return SERIALIZER.decode(bytes);
741 }
742 })
743 .build());
744 } else {
745 return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
746 }
Madan Jampani86940d92015-05-06 11:47:57 -0700747 }
748
749 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
750 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
751 }
752
753 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
754 Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
755 return flowEntries.stream()
756 .filter(entry -> Objects.equal(entry, rule))
757 .findAny()
758 .orElse(null);
759 }
760
761 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
762 Set<FlowEntry> result = Sets.newHashSet();
763 getFlowTable(deviceId).values().forEach(result::addAll);
764 return result;
765 }
766
767 public StoredFlowEntry getFlowEntry(FlowRule rule) {
768 return getFlowEntryInternal(rule);
769 }
770
771 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
772 return getFlowEntriesInternal(deviceId);
773 }
774
775 public void add(FlowEntry rule) {
776 getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
777 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
778 }
779
780 public boolean remove(DeviceId deviceId, FlowEntry rule) {
781 try {
782 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
783 } finally {
784 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
785 }
786 }
787
788 private NodeId getBackupNode(DeviceId deviceId) {
789 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
790 // pick the standby which is most likely to become next master
791 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
792 }
793
794 private void backup() {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700795 if (!backupEnabled) {
796 return;
797 }
Madan Jampani86940d92015-05-06 11:47:57 -0700798 try {
799 // determine the set of devices that we need to backup during this run.
800 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
801 .stream()
802 .filter(deviceId -> {
803 Long lastBackupTime = lastBackupTimes.get(deviceId);
804 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
805 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
Madan Jampani7267c552015-05-20 22:39:17 -0700806 NodeId newBackupNode = getBackupNode(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700807 return lastBackupTime == null
Madan Jampani7267c552015-05-20 22:39:17 -0700808 || !Objects.equal(lastBackupNode, newBackupNode)
Madan Jampani86940d92015-05-06 11:47:57 -0700809 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
810 })
811 .collect(Collectors.toSet());
812
813 // compute a mapping from node to the set of devices whose flow entries it should backup
814 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
815 devicesToBackup.forEach(deviceId -> {
816 NodeId backupLocation = getBackupNode(deviceId);
817 if (backupLocation != null) {
818 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
819 .add(deviceId);
820 }
821 });
Madan Jampani86940d92015-05-06 11:47:57 -0700822 // send the device flow entries to their respective backup nodes
Madan Jampaniadea8902015-06-04 17:39:45 -0700823 devicesToBackupByNode.forEach(this::sendBackups);
Madan Jampani86940d92015-05-06 11:47:57 -0700824 } catch (Exception e) {
825 log.error("Backup failed.", e);
826 }
827 }
828
Madan Jampani654b58a2015-05-22 11:28:11 -0700829 private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
Madan Jampani7267c552015-05-20 22:39:17 -0700830 log.debug("Received flowEntries for {} to backup", flowTables.keySet());
Madan Jampani654b58a2015-05-22 11:28:11 -0700831 Set<DeviceId> backedupDevices = Sets.newHashSet();
832 try {
Madan Jampania98bf932015-06-02 12:01:36 -0700833 flowTables.forEach((deviceId, deviceFlowTable) -> {
834 // Only process those devices are that not managed by the local node.
835 if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) {
836 Map<FlowId, Set<StoredFlowEntry>> backupFlowTable = getFlowTable(deviceId);
837 backupFlowTable.clear();
838 backupFlowTable.putAll(deviceFlowTable);
839 backedupDevices.add(deviceId);
840 }
Madan Jampani08bf17b2015-05-06 16:25:26 -0700841 });
Madan Jampani654b58a2015-05-22 11:28:11 -0700842 } catch (Exception e) {
843 log.warn("Failure processing backup request", e);
844 }
845 return backedupDevices;
Madan Jampani86940d92015-05-06 11:47:57 -0700846 }
847 }
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700848
849 @Override
850 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
851 List<TableStatisticsEntry> tableStats) {
852 deviceTableStats.put(deviceId, tableStats);
853 return null;
854 }
855
856 @Override
857 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
858 NodeId master = mastershipService.getMasterFor(deviceId);
859
860 if (master == null) {
861 log.debug("Failed to getTableStats: No master for {}", deviceId);
862 return Collections.emptyList();
863 }
864
865 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
866 if (tableStats == null) {
867 return Collections.emptyList();
868 }
869 return ImmutableList.copyOf(tableStats);
870 }
871
872 private class InternalTableStatsListener
873 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
874 @Override
875 public void event(EventuallyConsistentMapEvent<DeviceId,
876 List<TableStatisticsEntry>> event) {
877 //TODO: Generate an event to listeners (do we need?)
878 }
879 }
Madan Jampani86940d92015-05-06 11:47:57 -0700880}