blob: 8cd63e7dd8811152e12a92c2cd42a0a51d805719 [file] [log] [blame]
Madan Jampani86940d92015-05-06 11:47:57 -07001 /*
2 * Copyright 2014-2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import com.google.common.base.Objects;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -070019import com.google.common.collect.ImmutableList;
Madan Jampani85a9b0d2015-06-03 17:15:44 -070020import com.google.common.collect.ImmutableMap;
Madan Jampani86940d92015-05-06 11:47:57 -070021import com.google.common.collect.Iterables;
22import com.google.common.collect.Maps;
23import com.google.common.collect.Sets;
24import com.google.common.util.concurrent.Futures;
25
26import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Modified;
30import org.apache.felix.scr.annotations.Property;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
34import org.onlab.util.KryoNamespace;
Madan Jampani86940d92015-05-06 11:47:57 -070035import org.onlab.util.Tools;
36import org.onosproject.cfg.ComponentConfigService;
37import org.onosproject.cluster.ClusterService;
38import org.onosproject.cluster.NodeId;
39import org.onosproject.core.CoreService;
40import org.onosproject.core.IdGenerator;
41import org.onosproject.mastership.MastershipService;
42import org.onosproject.net.DeviceId;
43import org.onosproject.net.device.DeviceService;
44import org.onosproject.net.flow.CompletedBatchOperation;
45import org.onosproject.net.flow.DefaultFlowEntry;
46import org.onosproject.net.flow.FlowEntry;
47import org.onosproject.net.flow.FlowEntry.FlowEntryState;
48import org.onosproject.net.flow.FlowId;
49import org.onosproject.net.flow.FlowRule;
50import org.onosproject.net.flow.FlowRuleBatchEntry;
51import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
52import org.onosproject.net.flow.FlowRuleBatchEvent;
53import org.onosproject.net.flow.FlowRuleBatchOperation;
54import org.onosproject.net.flow.FlowRuleBatchRequest;
55import org.onosproject.net.flow.FlowRuleEvent;
56import org.onosproject.net.flow.FlowRuleEvent.Type;
57import org.onosproject.net.flow.FlowRuleService;
58import org.onosproject.net.flow.FlowRuleStore;
59import org.onosproject.net.flow.FlowRuleStoreDelegate;
60import org.onosproject.net.flow.StoredFlowEntry;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -070061import org.onosproject.net.flow.TableStatisticsEntry;
Madan Jampani86940d92015-05-06 11:47:57 -070062import org.onosproject.store.AbstractStore;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.cluster.messaging.ClusterMessage;
65import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanif7536ab2015-05-07 23:23:23 -070066import org.onosproject.store.flow.ReplicaInfoEvent;
67import org.onosproject.store.flow.ReplicaInfoEventListener;
Madan Jampani86940d92015-05-06 11:47:57 -070068import org.onosproject.store.flow.ReplicaInfoService;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -070069import org.onosproject.store.impl.MastershipBasedTimestamp;
70import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani86940d92015-05-06 11:47:57 -070071import org.onosproject.store.serializers.KryoSerializer;
72import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070073import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -070074import org.onosproject.store.service.EventuallyConsistentMap;
75import org.onosproject.store.service.EventuallyConsistentMapEvent;
76import org.onosproject.store.service.EventuallyConsistentMapListener;
77import org.onosproject.store.service.StorageService;
78import org.onosproject.store.service.WallClockTimestamp;
Madan Jampani86940d92015-05-06 11:47:57 -070079import org.osgi.service.component.ComponentContext;
80import org.slf4j.Logger;
81
Madan Jampani86940d92015-05-06 11:47:57 -070082import java.util.Collections;
83import java.util.Dictionary;
84import java.util.HashSet;
85import java.util.List;
86import java.util.Map;
87import java.util.Set;
Madan Jampani86940d92015-05-06 11:47:57 -070088import java.util.concurrent.ExecutorService;
89import java.util.concurrent.Executors;
90import java.util.concurrent.ScheduledExecutorService;
Madan Jampani08bf17b2015-05-06 16:25:26 -070091import java.util.concurrent.ScheduledFuture;
Madan Jampani86940d92015-05-06 11:47:57 -070092import java.util.concurrent.TimeUnit;
93import java.util.concurrent.atomic.AtomicInteger;
94import java.util.stream.Collectors;
95
Madan Jampani86940d92015-05-06 11:47:57 -070096import static com.google.common.base.Strings.isNullOrEmpty;
97import static org.onlab.util.Tools.get;
98import static org.onlab.util.Tools.groupedThreads;
99import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
100import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
101import static org.slf4j.LoggerFactory.getLogger;
102
103/**
104 * Manages inventory of flow rules using a distributed state management protocol.
105 */
106@Component(immediate = true, enabled = true)
107@Service
108public class NewDistributedFlowRuleStore
109 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
110 implements FlowRuleStore {
111
112 private final Logger log = getLogger(getClass());
113
114 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
115 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700116 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Madan Jampani86940d92015-05-06 11:47:57 -0700117 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampaniadea8902015-06-04 17:39:45 -0700118 // number of devices whose flow entries will be backed up in one communication round
119 private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
Madan Jampani86940d92015-05-06 11:47:57 -0700120
121 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
122 label = "Number of threads in the message handler pool")
123 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
124
125 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
126 label = "Indicates whether backups are enabled or not")
127 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
128
Madan Jampani08bf17b2015-05-06 16:25:26 -0700129 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
130 label = "Delay in ms between successive backup runs")
131 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
132
Madan Jampani86940d92015-05-06 11:47:57 -0700133 private InternalFlowTable flowTable = new InternalFlowTable();
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136 protected ReplicaInfoService replicaInfoManager;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected ClusterCommunicationService clusterCommunicator;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected ClusterService clusterService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected DeviceService deviceService;
146
147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected CoreService coreService;
149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
151 protected ComponentConfigService configService;
152
153 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
154 protected MastershipService mastershipService;
155
156 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
157 private ExecutorService messageHandlingExecutor;
158
Madan Jampani08bf17b2015-05-06 16:25:26 -0700159 private ScheduledFuture<?> backupTask;
Madan Jampani86940d92015-05-06 11:47:57 -0700160 private final ScheduledExecutorService backupSenderExecutor =
161 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
162
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700163 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
164 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
165 new InternalTableStatsListener();
166
167 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
168 protected StorageService storageService;
169
Madan Jampani86940d92015-05-06 11:47:57 -0700170 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
171 @Override
172 protected void setupKryoPool() {
173 serializerPool = KryoNamespace.newBuilder()
174 .register(DistributedStoreSerializers.STORE_COMMON)
175 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Madan Jampani86940d92015-05-06 11:47:57 -0700176 .build();
177 }
178 };
179
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700180 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
181 .register(KryoNamespaces.API)
182 .register(MastershipBasedTimestamp.class);
183
184
Madan Jampani86940d92015-05-06 11:47:57 -0700185 private IdGenerator idGenerator;
186 private NodeId local;
187
188 @Activate
189 public void activate(ComponentContext context) {
190 configService.registerProperties(getClass());
191
192 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
193
194 local = clusterService.getLocalNode().id();
195
196 messageHandlingExecutor = Executors.newFixedThreadPool(
197 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
198
199 registerMessageHandlers(messageHandlingExecutor);
200
Madan Jampani08bf17b2015-05-06 16:25:26 -0700201 if (backupEnabled) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700202 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700203 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
204 flowTable::backup,
205 0,
206 backupPeriod,
207 TimeUnit.MILLISECONDS);
208 }
Madan Jampani86940d92015-05-06 11:47:57 -0700209
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700210 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
211 .withName("onos-flow-table-stats")
212 .withSerializer(SERIALIZER_BUILDER)
213 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
214 .withTimestampProvider((k, v) -> new WallClockTimestamp())
215 .withTombstonesDisabled()
216 .build();
217 deviceTableStats.addListener(tableStatsListener);
218
Madan Jampani86940d92015-05-06 11:47:57 -0700219 logConfig("Started");
220 }
221
222 @Deactivate
223 public void deactivate(ComponentContext context) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700224 if (backupEnabled) {
225 replicaInfoManager.removeListener(flowTable);
226 backupTask.cancel(true);
227 }
Madan Jampani86940d92015-05-06 11:47:57 -0700228 configService.unregisterProperties(getClass(), false);
229 unregisterMessageHandlers();
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700230 deviceTableStats.removeListener(tableStatsListener);
231 deviceTableStats.destroy();
Madan Jampani86940d92015-05-06 11:47:57 -0700232 messageHandlingExecutor.shutdownNow();
233 backupSenderExecutor.shutdownNow();
234 log.info("Stopped");
235 }
236
237 @SuppressWarnings("rawtypes")
238 @Modified
239 public void modified(ComponentContext context) {
240 if (context == null) {
241 backupEnabled = DEFAULT_BACKUP_ENABLED;
242 logConfig("Default config");
243 return;
244 }
245
246 Dictionary properties = context.getProperties();
247 int newPoolSize;
248 boolean newBackupEnabled;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700249 int newBackupPeriod;
Madan Jampani86940d92015-05-06 11:47:57 -0700250 try {
251 String s = get(properties, "msgHandlerPoolSize");
252 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
253
254 s = get(properties, "backupEnabled");
255 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
256
Madan Jampani08bf17b2015-05-06 16:25:26 -0700257 s = get(properties, "backupPeriod");
258 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
259
Madan Jampani86940d92015-05-06 11:47:57 -0700260 } catch (NumberFormatException | ClassCastException e) {
261 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
262 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700263 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Madan Jampani86940d92015-05-06 11:47:57 -0700264 }
265
Madan Jampani08bf17b2015-05-06 16:25:26 -0700266 boolean restartBackupTask = false;
Madan Jampani86940d92015-05-06 11:47:57 -0700267 if (newBackupEnabled != backupEnabled) {
268 backupEnabled = newBackupEnabled;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700269 if (!backupEnabled) {
270 replicaInfoManager.removeListener(flowTable);
271 if (backupTask != null) {
272 backupTask.cancel(false);
273 backupTask = null;
274 }
275 } else {
276 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700277 }
278 restartBackupTask = backupEnabled;
279 }
280 if (newBackupPeriod != backupPeriod) {
281 backupPeriod = newBackupPeriod;
282 restartBackupTask = backupEnabled;
283 }
284 if (restartBackupTask) {
285 if (backupTask != null) {
286 // cancel previously running task
287 backupTask.cancel(false);
288 }
289 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
290 flowTable::backup,
291 0,
292 backupPeriod,
293 TimeUnit.MILLISECONDS);
Madan Jampani86940d92015-05-06 11:47:57 -0700294 }
295 if (newPoolSize != msgHandlerPoolSize) {
296 msgHandlerPoolSize = newPoolSize;
297 ExecutorService oldMsgHandler = messageHandlingExecutor;
298 messageHandlingExecutor = Executors.newFixedThreadPool(
299 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
300
301 // replace previously registered handlers.
302 registerMessageHandlers(messageHandlingExecutor);
303 oldMsgHandler.shutdown();
304 }
305 logConfig("Reconfigured");
306 }
307
308 private void registerMessageHandlers(ExecutorService executor) {
309
310 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
311 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
312 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
313 clusterCommunicator.addSubscriber(
314 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
315 clusterCommunicator.addSubscriber(
316 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
317 clusterCommunicator.addSubscriber(
318 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
319 clusterCommunicator.addSubscriber(
320 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
321 clusterCommunicator.addSubscriber(
Madan Jampani654b58a2015-05-22 11:28:11 -0700322 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
Madan Jampani86940d92015-05-06 11:47:57 -0700323 }
324
325 private void unregisterMessageHandlers() {
326 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
327 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
328 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
329 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
330 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
331 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
332 }
333
334 private void logConfig(String prefix) {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700335 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
336 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
Madan Jampani86940d92015-05-06 11:47:57 -0700337 }
338
339 // This is not a efficient operation on a distributed sharded
340 // flow store. We need to revisit the need for this operation or at least
341 // make it device specific.
342 @Override
343 public int getFlowRuleCount() {
344 AtomicInteger sum = new AtomicInteger(0);
345 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
346 return sum.get();
347 }
348
349 @Override
350 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700351 NodeId master = mastershipService.getMasterFor(rule.deviceId());
Madan Jampani86940d92015-05-06 11:47:57 -0700352
353 if (master == null) {
Thomas Vachuska88fd6902015-08-04 10:08:34 -0700354 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
Madan Jampani86940d92015-05-06 11:47:57 -0700355 return null;
356 }
357
358 if (Objects.equal(local, master)) {
359 return flowTable.getFlowEntry(rule);
360 }
361
362 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
363 master, rule.deviceId());
364
365 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
366 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
367 SERIALIZER::encode,
368 SERIALIZER::decode,
369 master),
370 FLOW_RULE_STORE_TIMEOUT_MILLIS,
371 TimeUnit.MILLISECONDS,
372 null);
373 }
374
375 @Override
376 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700377 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700378
379 if (master == null) {
Thomas Vachuska88fd6902015-08-04 10:08:34 -0700380 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700381 return Collections.emptyList();
382 }
383
384 if (Objects.equal(local, master)) {
385 return flowTable.getFlowEntries(deviceId);
386 }
387
388 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
389 master, deviceId);
390
391 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
392 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
393 SERIALIZER::encode,
394 SERIALIZER::decode,
395 master),
396 FLOW_RULE_STORE_TIMEOUT_MILLIS,
397 TimeUnit.MILLISECONDS,
398 Collections.emptyList());
399 }
400
401 @Override
402 public void storeFlowRule(FlowRule rule) {
403 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700404 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700405 rule.deviceId(), idGenerator.getNewId()));
406 }
407
408 @Override
409 public void storeBatch(FlowRuleBatchOperation operation) {
410 if (operation.getOperations().isEmpty()) {
411 notifyDelegate(FlowRuleBatchEvent.completed(
412 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
413 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
414 return;
415 }
416
417 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700418 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700419
420 if (master == null) {
421 log.warn("No master for {} : flows will be marked for removal", deviceId);
422
423 updateStoreInternal(operation);
424
425 notifyDelegate(FlowRuleBatchEvent.completed(
426 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
427 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
428 return;
429 }
430
431 if (Objects.equal(local, master)) {
432 storeBatchInternal(operation);
433 return;
434 }
435
436 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
437 master, deviceId);
438
Madan Jampani175e8fd2015-05-20 14:10:45 -0700439 clusterCommunicator.unicast(operation,
440 APPLY_BATCH_FLOWS,
441 SERIALIZER::encode,
442 master)
443 .whenComplete((result, error) -> {
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700444 if (error != null) {
Madan Jampani15f1bc42015-05-28 10:51:52 -0700445 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Madan Jampani86940d92015-05-06 11:47:57 -0700446
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700447 Set<FlowRule> allFailures = operation.getOperations()
448 .stream()
449 .map(op -> op.target())
450 .collect(Collectors.toSet());
Madan Jampani86940d92015-05-06 11:47:57 -0700451
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700452 notifyDelegate(FlowRuleBatchEvent.completed(
453 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
454 new CompletedBatchOperation(false, allFailures, deviceId)));
455 }
Madan Jampani175e8fd2015-05-20 14:10:45 -0700456 });
Madan Jampani86940d92015-05-06 11:47:57 -0700457 }
458
459 private void storeBatchInternal(FlowRuleBatchOperation operation) {
460
461 final DeviceId did = operation.deviceId();
462 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
463 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
464 if (currentOps.isEmpty()) {
465 batchOperationComplete(FlowRuleBatchEvent.completed(
466 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
467 new CompletedBatchOperation(true, Collections.emptySet(), did)));
468 return;
469 }
470
471 notifyDelegate(FlowRuleBatchEvent.requested(new
472 FlowRuleBatchRequest(operation.id(),
473 currentOps), operation.deviceId()));
474 }
475
476 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
477 return operation.getOperations().stream().map(
478 op -> {
479 StoredFlowEntry entry;
480 switch (op.operator()) {
481 case ADD:
482 entry = new DefaultFlowEntry(op.target());
483 // always add requested FlowRule
484 // Note: 2 equal FlowEntry may have different treatment
485 flowTable.remove(entry.deviceId(), entry);
486 flowTable.add(entry);
487
488 return op;
489 case REMOVE:
490 entry = flowTable.getFlowEntry(op.target());
491 if (entry != null) {
492 entry.setState(FlowEntryState.PENDING_REMOVE);
493 return op;
494 }
495 break;
496 case MODIFY:
497 //TODO: figure this out at some point
498 break;
499 default:
500 log.warn("Unknown flow operation operator: {}", op.operator());
501 }
502 return null;
503 }
504 ).filter(op -> op != null).collect(Collectors.toSet());
505 }
506
507 @Override
508 public void deleteFlowRule(FlowRule rule) {
509 storeBatch(
510 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700511 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700512 new FlowRuleBatchEntry(
513 FlowRuleOperation.REMOVE,
514 rule)), rule.deviceId(), idGenerator.getNewId()));
515 }
516
517 @Override
518 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700519 NodeId master = mastershipService.getMasterFor(rule.deviceId());
520 if (Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700521 return addOrUpdateFlowRuleInternal(rule);
522 }
523
524 log.warn("Tried to update FlowRule {} state,"
525 + " while the Node was not the master.", rule);
526 return null;
527 }
528
529 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
530 // check if this new rule is an update to an existing entry
531 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
532 if (stored != null) {
533 stored.setBytes(rule.bytes());
534 stored.setLife(rule.life());
535 stored.setPackets(rule.packets());
536 if (stored.state() == FlowEntryState.PENDING_ADD) {
537 stored.setState(FlowEntryState.ADDED);
538 return new FlowRuleEvent(Type.RULE_ADDED, rule);
539 }
540 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
541 }
542
543 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
544 // TODO: also update backup if the behavior is correct.
545 flowTable.add(rule);
546 return null;
547 }
548
549 @Override
550 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
551 final DeviceId deviceId = rule.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700552 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700553
554 if (Objects.equal(local, master)) {
555 // bypass and handle it locally
556 return removeFlowRuleInternal(rule);
557 }
558
559 if (master == null) {
560 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
561 // TODO: revisit if this should be null (="no-op") or Exception
562 return null;
563 }
564
565 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
566 master, deviceId);
567
568 return Futures.get(clusterCommunicator.sendAndReceive(
569 rule,
570 REMOVE_FLOW_ENTRY,
571 SERIALIZER::encode,
572 SERIALIZER::decode,
573 master),
574 FLOW_RULE_STORE_TIMEOUT_MILLIS,
575 TimeUnit.MILLISECONDS,
576 RuntimeException.class);
577 }
578
579 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
580 final DeviceId deviceId = rule.deviceId();
581 // This is where one could mark a rule as removed and still keep it in the store.
582 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
583 return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
584 }
585
586 @Override
587 public void batchOperationComplete(FlowRuleBatchEvent event) {
588 //FIXME: need a per device pending response
589 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
590 if (nodeId == null) {
591 notifyDelegate(event);
592 } else {
593 // TODO check unicast return value
594 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
595 //error log: log.warn("Failed to respond to peer for batch operation result");
596 }
597 }
598
599 private final class OnStoreBatch implements ClusterMessageHandler {
600
601 @Override
602 public void handle(final ClusterMessage message) {
603 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
604 log.debug("received batch request {}", operation);
605
606 final DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700607 NodeId master = mastershipService.getMasterFor(deviceId);
608 if (!Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700609 Set<FlowRule> failures = new HashSet<>(operation.size());
610 for (FlowRuleBatchEntry op : operation.getOperations()) {
611 failures.add(op.target());
612 }
613 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
614 // This node is no longer the master, respond as all failed.
615 // TODO: we might want to wrap response in envelope
616 // to distinguish sw programming failure and hand over
617 // it make sense in the latter case to retry immediately.
618 message.respond(SERIALIZER.encode(allFailed));
619 return;
620 }
621
622 pendingResponses.put(operation.id(), message.sender());
623 storeBatchInternal(operation);
624 }
625 }
626
Madan Jampanif7536ab2015-05-07 23:23:23 -0700627 private class InternalFlowTable implements ReplicaInfoEventListener {
Madan Jampani86940d92015-05-06 11:47:57 -0700628
Madan Jampani5c3766c2015-06-02 15:54:41 -0700629 private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
630 flowEntries = Maps.newConcurrentMap();
Madan Jampani86940d92015-05-06 11:47:57 -0700631
632 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
633 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
634 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
635
Madan Jampanif7536ab2015-05-07 23:23:23 -0700636 @Override
637 public void event(ReplicaInfoEvent event) {
Madan Jampania98bf932015-06-02 12:01:36 -0700638 if (!backupEnabled) {
639 return;
640 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700641 if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
642 DeviceId deviceId = event.subject();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700643 NodeId master = mastershipService.getMasterFor(deviceId);
644 if (!Objects.equal(local, master)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700645 // ignore since this event is for a device this node does not manage.
646 return;
647 }
Madan Jampani7267c552015-05-20 22:39:17 -0700648 NodeId newBackupNode = getBackupNode(deviceId);
649 NodeId currentBackupNode = lastBackupNodes.get(deviceId);
650 if (Objects.equal(newBackupNode, currentBackupNode)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700651 // ignore since backup location hasn't changed.
652 return;
653 }
Madan Jampani7267c552015-05-20 22:39:17 -0700654 if (currentBackupNode != null && newBackupNode == null) {
655 // Current backup node is most likely down and no alternate backup node
656 // has been chosen. Clear current backup location so that we can resume
657 // backups when either current backup comes online or a different backup node
658 // is chosen.
659 log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
660 + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
661 lastBackupNodes.remove(deviceId);
662 lastBackupTimes.remove(deviceId);
663 return;
664 // TODO: Pick any available node as backup and ensure hand-off occurs when
665 // a new master is elected.
666 }
Madan Jampani44839b82015-06-12 13:57:41 -0700667 log.debug("Backup location for {} has changed from {} to {}.",
Madan Jampani7267c552015-05-20 22:39:17 -0700668 deviceId, currentBackupNode, newBackupNode);
669 backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
Madan Jampani03062682015-05-19 17:57:51 -0700670 0,
671 TimeUnit.SECONDS);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700672 }
673 }
674
Madan Jampaniadea8902015-06-04 17:39:45 -0700675 private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
676 // split up the devices into smaller batches and send them separately.
677 Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
678 .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
679 }
680
Madan Jampanif7536ab2015-05-07 23:23:23 -0700681 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
Madan Jampaniadea8902015-06-04 17:39:45 -0700682 if (deviceIds.isEmpty()) {
683 return;
684 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700685 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
Madan Jampani654b58a2015-05-22 11:28:11 -0700686 Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Madan Jampanif7536ab2015-05-07 23:23:23 -0700687 Maps.newConcurrentMap();
Madan Jampani85a9b0d2015-06-03 17:15:44 -0700688 deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
Madan Jampani654b58a2015-05-22 11:28:11 -0700689 clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
690 deviceFlowEntries,
691 FLOW_TABLE_BACKUP,
692 SERIALIZER::encode,
693 SERIALIZER::decode,
694 nodeId)
695 .whenComplete((backedupDevices, error) -> {
696 Set<DeviceId> devicesNotBackedup = error != null ?
697 deviceFlowEntries.keySet() :
698 Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
699 if (devicesNotBackedup.size() > 0) {
Madan Jampani85a9b0d2015-06-03 17:15:44 -0700700 log.warn("Failed to backup devices: {}. Reason: {}",
701 devicesNotBackedup, error.getMessage());
Madan Jampani654b58a2015-05-22 11:28:11 -0700702 }
703 if (backedupDevices != null) {
704 backedupDevices.forEach(id -> {
705 lastBackupTimes.put(id, System.currentTimeMillis());
706 lastBackupNodes.put(id, nodeId);
707 });
708 }
709 });
Madan Jampanif7536ab2015-05-07 23:23:23 -0700710 }
711
Madan Jampani86940d92015-05-06 11:47:57 -0700712 /**
713 * Returns the flow table for specified device.
714 *
715 * @param deviceId identifier of the device
716 * @return Map representing Flow Table of given device.
717 */
Madan Jampani5c3766c2015-06-02 15:54:41 -0700718 private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
719 return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
Madan Jampani86940d92015-05-06 11:47:57 -0700720 }
721
722 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
723 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
724 }
725
726 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
727 Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
728 return flowEntries.stream()
729 .filter(entry -> Objects.equal(entry, rule))
730 .findAny()
731 .orElse(null);
732 }
733
734 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
735 Set<FlowEntry> result = Sets.newHashSet();
736 getFlowTable(deviceId).values().forEach(result::addAll);
737 return result;
738 }
739
740 public StoredFlowEntry getFlowEntry(FlowRule rule) {
741 return getFlowEntryInternal(rule);
742 }
743
744 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
745 return getFlowEntriesInternal(deviceId);
746 }
747
748 public void add(FlowEntry rule) {
749 getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
750 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
751 }
752
753 public boolean remove(DeviceId deviceId, FlowEntry rule) {
754 try {
755 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
756 } finally {
757 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
758 }
759 }
760
761 private NodeId getBackupNode(DeviceId deviceId) {
762 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
763 // pick the standby which is most likely to become next master
764 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
765 }
766
767 private void backup() {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700768 if (!backupEnabled) {
769 return;
770 }
Madan Jampani86940d92015-05-06 11:47:57 -0700771 try {
772 // determine the set of devices that we need to backup during this run.
773 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
774 .stream()
775 .filter(deviceId -> {
776 Long lastBackupTime = lastBackupTimes.get(deviceId);
777 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
778 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
Madan Jampani7267c552015-05-20 22:39:17 -0700779 NodeId newBackupNode = getBackupNode(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700780 return lastBackupTime == null
Madan Jampani7267c552015-05-20 22:39:17 -0700781 || !Objects.equal(lastBackupNode, newBackupNode)
Madan Jampani86940d92015-05-06 11:47:57 -0700782 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
783 })
784 .collect(Collectors.toSet());
785
786 // compute a mapping from node to the set of devices whose flow entries it should backup
787 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
788 devicesToBackup.forEach(deviceId -> {
789 NodeId backupLocation = getBackupNode(deviceId);
790 if (backupLocation != null) {
791 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
792 .add(deviceId);
793 }
794 });
Madan Jampani86940d92015-05-06 11:47:57 -0700795 // send the device flow entries to their respective backup nodes
Madan Jampaniadea8902015-06-04 17:39:45 -0700796 devicesToBackupByNode.forEach(this::sendBackups);
Madan Jampani86940d92015-05-06 11:47:57 -0700797 } catch (Exception e) {
798 log.error("Backup failed.", e);
799 }
800 }
801
Madan Jampani654b58a2015-05-22 11:28:11 -0700802 private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
Madan Jampani7267c552015-05-20 22:39:17 -0700803 log.debug("Received flowEntries for {} to backup", flowTables.keySet());
Madan Jampani654b58a2015-05-22 11:28:11 -0700804 Set<DeviceId> backedupDevices = Sets.newHashSet();
805 try {
Madan Jampania98bf932015-06-02 12:01:36 -0700806 flowTables.forEach((deviceId, deviceFlowTable) -> {
807 // Only process those devices are that not managed by the local node.
808 if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) {
809 Map<FlowId, Set<StoredFlowEntry>> backupFlowTable = getFlowTable(deviceId);
810 backupFlowTable.clear();
811 backupFlowTable.putAll(deviceFlowTable);
812 backedupDevices.add(deviceId);
813 }
Madan Jampani08bf17b2015-05-06 16:25:26 -0700814 });
Madan Jampani654b58a2015-05-22 11:28:11 -0700815 } catch (Exception e) {
816 log.warn("Failure processing backup request", e);
817 }
818 return backedupDevices;
Madan Jampani86940d92015-05-06 11:47:57 -0700819 }
820 }
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700821
822 @Override
823 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
824 List<TableStatisticsEntry> tableStats) {
825 deviceTableStats.put(deviceId, tableStats);
826 return null;
827 }
828
829 @Override
830 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
831 NodeId master = mastershipService.getMasterFor(deviceId);
832
833 if (master == null) {
834 log.debug("Failed to getTableStats: No master for {}", deviceId);
835 return Collections.emptyList();
836 }
837
838 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
839 if (tableStats == null) {
840 return Collections.emptyList();
841 }
842 return ImmutableList.copyOf(tableStats);
843 }
844
845 private class InternalTableStatsListener
846 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
847 @Override
848 public void event(EventuallyConsistentMapEvent<DeviceId,
849 List<TableStatisticsEntry>> event) {
850 //TODO: Generate an event to listeners (do we need?)
851 }
852 }
Madan Jampani86940d92015-05-06 11:47:57 -0700853}