blob: 354b494a70e2d07a6cbb8b2c2a51b9d46ad448d9 [file] [log] [blame]
Jordan Halterman281dbf32018-06-15 17:46:28 -07001/*
2* Copyright 2014-present Open Networking Foundation
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070018import java.util.Collections;
19import java.util.Dictionary;
20import java.util.HashSet;
21import java.util.Iterator;
22import java.util.List;
23import java.util.Map;
24import java.util.Objects;
25import java.util.Set;
26import java.util.concurrent.ExecutionException;
27import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Executors;
29import java.util.concurrent.ScheduledExecutorService;
30import java.util.concurrent.TimeUnit;
31import java.util.concurrent.TimeoutException;
32import java.util.function.Function;
33import java.util.stream.Collectors;
34import java.util.stream.StreamSupport;
35
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070036import com.google.common.collect.ImmutableList;
37import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070038import com.google.common.collect.Streams;
Jordan Haltermanb81fdc12019-03-04 18:12:20 -080039import org.apache.commons.lang3.tuple.ImmutablePair;
40import org.apache.commons.lang3.tuple.Pair;
Jon Hallfa132292017-10-24 11:11:24 -070041import org.onlab.util.KryoNamespace;
Jordan Haltermanaeda2752019-02-22 12:31:25 -080042import org.onlab.util.OrderedExecutor;
Jon Hallfa132292017-10-24 11:11:24 -070043import org.onlab.util.Tools;
44import org.onosproject.cfg.ComponentConfigService;
45import org.onosproject.cluster.ClusterService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.core.CoreService;
48import org.onosproject.core.IdGenerator;
Jordan Halterman281dbf32018-06-15 17:46:28 -070049import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070050import org.onosproject.mastership.MastershipService;
51import org.onosproject.net.DeviceId;
Jordan Halterman281dbf32018-06-15 17:46:28 -070052import org.onosproject.net.device.DeviceEvent;
53import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070054import org.onosproject.net.device.DeviceService;
55import org.onosproject.net.flow.CompletedBatchOperation;
56import org.onosproject.net.flow.DefaultFlowEntry;
57import org.onosproject.net.flow.FlowEntry;
58import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070059import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070060import org.onosproject.net.flow.FlowRuleEvent;
61import org.onosproject.net.flow.FlowRuleEvent.Type;
62import org.onosproject.net.flow.FlowRuleService;
63import org.onosproject.net.flow.FlowRuleStore;
64import org.onosproject.net.flow.FlowRuleStoreDelegate;
65import org.onosproject.net.flow.StoredFlowEntry;
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070066import org.onosproject.net.flow.FlowRuleStoreException;
Jon Hallfa132292017-10-24 11:11:24 -070067import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070068import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
69import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
70import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
71import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
72import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070073import org.onosproject.persistence.PersistenceService;
74import org.onosproject.store.AbstractStore;
75import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
76import org.onosproject.store.cluster.messaging.ClusterMessage;
77import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jordan Halterman281dbf32018-06-15 17:46:28 -070078import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070079import org.onosproject.store.flow.ReplicaInfoEvent;
80import org.onosproject.store.flow.ReplicaInfoEventListener;
81import org.onosproject.store.flow.ReplicaInfoService;
82import org.onosproject.store.impl.MastershipBasedTimestamp;
83import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman281dbf32018-06-15 17:46:28 -070084import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070085import org.onosproject.store.service.EventuallyConsistentMap;
86import org.onosproject.store.service.EventuallyConsistentMapEvent;
87import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman281dbf32018-06-15 17:46:28 -070088import org.onosproject.store.service.MapEvent;
89import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070090import org.onosproject.store.service.Serializer;
91import org.onosproject.store.service.StorageService;
92import org.onosproject.store.service.WallClockTimestamp;
93import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070094import org.osgi.service.component.annotations.Activate;
95import org.osgi.service.component.annotations.Component;
96import org.osgi.service.component.annotations.Deactivate;
97import org.osgi.service.component.annotations.Modified;
98import org.osgi.service.component.annotations.Reference;
99import org.osgi.service.component.annotations.ReferenceCardinality;
Jon Hallfa132292017-10-24 11:11:24 -0700100import org.slf4j.Logger;
101
Jon Hallfa132292017-10-24 11:11:24 -0700102import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800103import static java.lang.Math.max;
104import static java.lang.Math.min;
Jon Hallfa132292017-10-24 11:11:24 -0700105import static org.onlab.util.Tools.get;
106import static org.onlab.util.Tools.groupedThreads;
107import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -0700108import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
109import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700110import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700111import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
112import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
113import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
114import static org.slf4j.LoggerFactory.getLogger;
115
Ray Milkeyb5646e62018-10-16 11:42:18 -0700116import static org.onosproject.store.OsgiPropertyConstants.*;
117
Jon Hallfa132292017-10-24 11:11:24 -0700118/**
119 * Manages inventory of flow rules using a distributed state management protocol.
120 */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700121@Component(
122 immediate = true,
123 service = FlowRuleStore.class,
124 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -0700125 MESSAGE_HANDLER_THREAD_POOL_SIZE + ":Integer=" + MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT,
126 BACKUP_PERIOD_MILLIS + ":Integer=" + BACKUP_PERIOD_MILLIS_DEFAULT,
127 ANTI_ENTROPY_PERIOD_MILLIS + ":Integer=" + ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT,
128 EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED + ":Boolean=" + EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT,
129 MAX_BACKUP_COUNT + ":Integer=" + MAX_BACKUP_COUNT_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -0700130 }
131)
Jon Hallfa132292017-10-24 11:11:24 -0700132public class ECFlowRuleStore
Jordan Halterman281dbf32018-06-15 17:46:28 -0700133 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
134 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700135
136 private final Logger log = getLogger(getClass());
137
Jon Hallfa132292017-10-24 11:11:24 -0700138 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jordan Halterman43300782019-05-21 11:27:50 -0700139 private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
Jon Hallfa132292017-10-24 11:11:24 -0700140
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700141 /** Number of threads in the message handler pool. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700142 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700143
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700144 /** Delay in ms between successive backup runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700145 private int backupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700146
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700147 /** Delay in ms between anti-entropy runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700148 private int antiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700149
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700150 /** Indicates whether or not changes in the flow table should be persisted to disk. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700151 private boolean persistenceEnabled = EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700152
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700153 /** Max number of backup copies for each device. */
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100154 protected static volatile int backupCount = MAX_BACKUP_COUNT_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700155
156 private InternalFlowTable flowTable = new InternalFlowTable();
157
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700159 protected ReplicaInfoService replicaInfoManager;
160
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700161 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700162 protected ClusterCommunicationService clusterCommunicator;
163
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700164 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700165 protected ClusterService clusterService;
166
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700167 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700168 protected DeviceService deviceService;
169
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700170 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700171 protected CoreService coreService;
172
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700173 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700174 protected ComponentConfigService configService;
175
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700176 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700177 protected MastershipService mastershipService;
178
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700179 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700180 protected PersistenceService persistenceService;
181
182 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
183 private ExecutorService messageHandlingExecutor;
184 private ExecutorService eventHandler;
185
Ray Milkeyd1092d62019-04-02 09:12:00 -0700186 private ScheduledExecutorService backupScheduler;
187 private ExecutorService backupExecutor;
Jon Hallfa132292017-10-24 11:11:24 -0700188
189 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
190 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700191 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700192
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700193 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700194 protected StorageService storageService;
195
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700196 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
197 .register(KryoNamespaces.API)
Jordan Haltermana765d222018-06-01 00:40:56 -0700198 .register(BucketId.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700199 .register(FlowBucket.class)
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800200 .register(ImmutablePair.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700201 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700202
203 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermana765d222018-06-01 00:40:56 -0700204 .register(KryoNamespaces.API)
205 .register(BucketId.class)
206 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700207
Jordan Halterman281dbf32018-06-15 17:46:28 -0700208 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700209
210 private IdGenerator idGenerator;
211 private NodeId local;
212
213 @Activate
214 public void activate(ComponentContext context) {
215 configService.registerProperties(getClass());
216
Ray Milkeyd1092d62019-04-02 09:12:00 -0700217 backupScheduler = Executors.newSingleThreadScheduledExecutor(
218 groupedThreads("onos/flow", "backup-scheduler", log));
219 backupExecutor = Executors.newFixedThreadPool(
220 max(min(Runtime.getRuntime().availableProcessors() * 2, 16), 4),
221 groupedThreads("onos/flow", "backup-%d", log));
222
Jon Hallfa132292017-10-24 11:11:24 -0700223 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
224
225 local = clusterService.getLocalNode().id();
226
227 eventHandler = Executors.newSingleThreadExecutor(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700228 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700229 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700230 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700231
232 registerMessageHandlers(messageHandlingExecutor);
233
Jordan Halterman281dbf32018-06-15 17:46:28 -0700234 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
235 .withName("onos-flow-store-terms")
236 .withSerializer(serializer)
237 .buildAsyncMap();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800238
Jon Hallfa132292017-10-24 11:11:24 -0700239 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700240 .withName("onos-flow-table-stats")
241 .withSerializer(serializerBuilder)
242 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
243 .withTimestampProvider((k, v) -> new WallClockTimestamp())
244 .withTombstonesDisabled()
245 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700246 deviceTableStats.addListener(tableStatsListener);
247
Jordan Halterman281dbf32018-06-15 17:46:28 -0700248 deviceService.addListener(flowTable);
249 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
250
Jon Hallfa132292017-10-24 11:11:24 -0700251 logConfig("Started");
252 }
253
254 @Deactivate
255 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700256 configService.unregisterProperties(getClass(), false);
257 unregisterMessageHandlers();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700258 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700259 deviceTableStats.removeListener(tableStatsListener);
260 deviceTableStats.destroy();
261 eventHandler.shutdownNow();
262 messageHandlingExecutor.shutdownNow();
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800263 backupScheduler.shutdownNow();
264 backupExecutor.shutdownNow();
Ray Milkeyd1092d62019-04-02 09:12:00 -0700265 backupScheduler = null;
266 backupExecutor = null;
Jon Hallfa132292017-10-24 11:11:24 -0700267 log.info("Stopped");
268 }
269
270 @SuppressWarnings("rawtypes")
271 @Modified
272 public void modified(ComponentContext context) {
273 if (context == null) {
274 logConfig("Default config");
275 return;
276 }
277
278 Dictionary properties = context.getProperties();
279 int newPoolSize;
280 int newBackupPeriod;
281 int newBackupCount;
Jordan Halterman5259b332018-06-12 15:34:19 -0700282 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700283 try {
284 String s = get(properties, "msgHandlerPoolSize");
285 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
286
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700287 s = get(properties, BACKUP_PERIOD_MILLIS);
Jon Hallfa132292017-10-24 11:11:24 -0700288 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
289
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700290 s = get(properties, MAX_BACKUP_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700291 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman5259b332018-06-12 15:34:19 -0700292
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700293 s = get(properties, ANTI_ENTROPY_PERIOD_MILLIS);
Jordan Halterman5259b332018-06-12 15:34:19 -0700294 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700295 } catch (NumberFormatException | ClassCastException e) {
Ray Milkeyb5646e62018-10-16 11:42:18 -0700296 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
297 newBackupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
298 newBackupCount = MAX_BACKUP_COUNT_DEFAULT;
299 newAntiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700300 }
301
Jon Hallfa132292017-10-24 11:11:24 -0700302 if (newBackupPeriod != backupPeriod) {
303 backupPeriod = newBackupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700304 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700305 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700306
307 if (newAntiEntropyPeriod != antiEntropyPeriod) {
308 antiEntropyPeriod = newAntiEntropyPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700309 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jordan Halterman5259b332018-06-12 15:34:19 -0700310 }
311
Jon Hallfa132292017-10-24 11:11:24 -0700312 if (newPoolSize != msgHandlerPoolSize) {
313 msgHandlerPoolSize = newPoolSize;
314 ExecutorService oldMsgHandler = messageHandlingExecutor;
315 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700316 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700317
318 // replace previously registered handlers.
319 registerMessageHandlers(messageHandlingExecutor);
320 oldMsgHandler.shutdown();
321 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700322
Jon Hallfa132292017-10-24 11:11:24 -0700323 if (backupCount != newBackupCount) {
324 backupCount = newBackupCount;
325 }
326 logConfig("Reconfigured");
327 }
328
329 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700330 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
331 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700332 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700333 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700334 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800335 clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
336 GET_DEVICE_FLOW_COUNT,
337 serializer::decode,
338 p -> flowTable.getFlowRuleCount(p.getLeft(), p.getRight()),
339 serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700340 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700341 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700342 }
343
344 private void unregisterMessageHandlers() {
345 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700346 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700347 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
348 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
349 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
350 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
351 }
352
353 private void logConfig(String prefix) {
354 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700355 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700356 }
357
Jon Hallfa132292017-10-24 11:11:24 -0700358 @Override
359 public int getFlowRuleCount() {
360 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700361 .mapToInt(device -> getFlowRuleCount(device.id()))
362 .sum();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800363 }
364
365 @Override
366 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800367 return getFlowRuleCount(deviceId, null);
368 }
369
370 @Override
371 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700372 NodeId master = mastershipService.getMasterFor(deviceId);
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100373 if (master == null && deviceService.isAvailable(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700374 log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
375 return 0;
376 }
377
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100378 if (Objects.equals(local, master) || master == null) {
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800379 return flowTable.getFlowRuleCount(deviceId, state);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700380 }
381
382 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
383 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800384 Pair.of(deviceId, state),
385 GET_DEVICE_FLOW_COUNT,
386 serializer::encode,
387 serializer::decode,
388 master),
389 FLOW_RULE_STORE_TIMEOUT_MILLIS,
390 TimeUnit.MILLISECONDS,
391 0);
Jon Hallfa132292017-10-24 11:11:24 -0700392 }
393
394 @Override
395 public FlowEntry getFlowEntry(FlowRule rule) {
396 NodeId master = mastershipService.getMasterFor(rule.deviceId());
397
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100398 if (master == null && deviceService.isAvailable(rule.deviceId())) {
Jon Hallfa132292017-10-24 11:11:24 -0700399 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
400 return null;
401 }
402
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100403 if (Objects.equals(local, master) || master == null) {
Jon Hallfa132292017-10-24 11:11:24 -0700404 return flowTable.getFlowEntry(rule);
405 }
406
407 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700408 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700409
410 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700411 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
412 serializer::encode,
413 serializer::decode,
414 master),
415 FLOW_RULE_STORE_TIMEOUT_MILLIS,
416 TimeUnit.MILLISECONDS,
417 null);
Jon Hallfa132292017-10-24 11:11:24 -0700418 }
419
420 @Override
421 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700422 return flowTable.getFlowEntries(deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700423 }
424
425 @Override
426 public void storeFlowRule(FlowRule rule) {
427 storeBatch(new FlowRuleBatchOperation(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700428 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
429 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700430 }
431
432 @Override
433 public void storeBatch(FlowRuleBatchOperation operation) {
434 if (operation.getOperations().isEmpty()) {
435 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700436 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
437 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700438 return;
439 }
440
441 DeviceId deviceId = operation.deviceId();
442 NodeId master = mastershipService.getMasterFor(deviceId);
443
444 if (master == null) {
445 log.warn("No master for {} ", deviceId);
446
Jordan Halterman281dbf32018-06-15 17:46:28 -0700447 Set<FlowRule> allFailures = operation.getOperations()
448 .stream()
449 .map(op -> op.target())
450 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700451 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700452 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
453 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700454 return;
455 }
456
457 if (Objects.equals(local, master)) {
458 storeBatchInternal(operation);
459 return;
460 }
461
462 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700463 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700464
465 clusterCommunicator.unicast(operation,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700466 APPLY_BATCH_FLOWS,
467 serializer::encode,
468 master)
469 .whenComplete((result, error) -> {
470 if (error != null) {
471 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700472
Jordan Halterman281dbf32018-06-15 17:46:28 -0700473 Set<FlowRule> allFailures = operation.getOperations()
474 .stream()
475 .map(op -> op.target())
476 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700477
Jordan Halterman281dbf32018-06-15 17:46:28 -0700478 notifyDelegate(FlowRuleBatchEvent.completed(
479 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
480 new CompletedBatchOperation(false, allFailures, deviceId)));
481 }
482 });
Jon Hallfa132292017-10-24 11:11:24 -0700483 }
484
485 private void storeBatchInternal(FlowRuleBatchOperation operation) {
486
487 final DeviceId did = operation.deviceId();
488 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
489 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
490 if (currentOps.isEmpty()) {
491 batchOperationComplete(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700492 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
493 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700494 return;
495 }
496
497 notifyDelegate(FlowRuleBatchEvent.requested(new
Jordan Halterman281dbf32018-06-15 17:46:28 -0700498 FlowRuleBatchRequest(operation.id(),
499 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700500 }
501
502 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
503 return operation.getOperations().stream().map(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700504 op -> {
505 StoredFlowEntry entry;
506 switch (op.operator()) {
507 case ADD:
508 entry = new DefaultFlowEntry(op.target());
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800509 log.debug("Adding flow rule: {}", entry);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700510 flowTable.add(entry);
511 return op;
512 case MODIFY:
513 entry = new DefaultFlowEntry(op.target());
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800514 log.debug("Updating flow rule: {}", entry);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700515 flowTable.update(entry);
516 return op;
517 case REMOVE:
518 return flowTable.update(op.target(), stored -> {
519 stored.setState(FlowEntryState.PENDING_REMOVE);
520 log.debug("Setting state of rule to pending remove: {}", stored);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800521 return op;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700522 });
523 default:
524 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700525 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700526 return null;
527 }
Jon Hallfa132292017-10-24 11:11:24 -0700528 ).filter(Objects::nonNull).collect(Collectors.toSet());
529 }
530
531 @Override
532 public void deleteFlowRule(FlowRule rule) {
533 storeBatch(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700534 new FlowRuleBatchOperation(
535 Collections.singletonList(
536 new FlowRuleBatchEntry(
537 FlowRuleOperation.REMOVE,
538 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700539 }
540
541 @Override
542 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
543 if (mastershipService.isLocalMaster(rule.deviceId())) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700544 return flowTable.update(rule, stored -> {
545 if (stored.state() == FlowEntryState.PENDING_ADD) {
546 stored.setState(FlowEntryState.PENDING_ADD);
547 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
548 }
549 return null;
550 });
Jon Hallfa132292017-10-24 11:11:24 -0700551 }
552 return null;
553 }
554
555 @Override
556 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
557 NodeId master = mastershipService.getMasterFor(rule.deviceId());
558 if (Objects.equals(local, master)) {
559 return addOrUpdateFlowRuleInternal(rule);
560 }
561
562 log.warn("Tried to update FlowRule {} state,"
Jordan Halterman281dbf32018-06-15 17:46:28 -0700563 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700564 return null;
565 }
566
567 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700568 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700569 stored.setBytes(rule.bytes());
570 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
571 stored.setLiveType(rule.liveType());
572 stored.setPackets(rule.packets());
573 stored.setLastSeen();
574 if (stored.state() == FlowEntryState.PENDING_ADD) {
575 stored.setState(FlowEntryState.ADDED);
576 return new FlowRuleEvent(Type.RULE_ADDED, rule);
577 }
578 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700579 });
580 if (event != null) {
581 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700582 }
583
584 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
585 // TODO: also update backup if the behavior is correct.
586 flowTable.add(rule);
587 return null;
588 }
589
590 @Override
591 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
592 final DeviceId deviceId = rule.deviceId();
593 NodeId master = mastershipService.getMasterFor(deviceId);
594
595 if (Objects.equals(local, master)) {
596 // bypass and handle it locally
597 return removeFlowRuleInternal(rule);
598 }
599
600 if (master == null) {
601 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
602 // TODO: revisit if this should be null (="no-op") or Exception
603 return null;
604 }
605
606 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700607 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700608
Jordan Halterman281dbf32018-06-15 17:46:28 -0700609 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
610 rule,
611 REMOVE_FLOW_ENTRY,
612 serializer::encode,
613 serializer::decode,
614 master),
615 FLOW_RULE_STORE_TIMEOUT_MILLIS,
616 TimeUnit.MILLISECONDS,
617 null);
Jon Hallfa132292017-10-24 11:11:24 -0700618 }
619
620 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700621 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800622 final FlowEntry removed = flowTable.remove(rule);
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800623 log.debug("Removed flow rule: {}", removed);
Jon Hallfa132292017-10-24 11:11:24 -0700624 // rule may be partial rule that is missing treatment, we should use rule from store instead
625 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
626 }
627
628 @Override
629 public void purgeFlowRule(DeviceId deviceId) {
630 flowTable.purgeFlowRule(deviceId);
631 }
632
633 @Override
634 public void purgeFlowRules() {
635 flowTable.purgeFlowRules();
636 }
637
638 @Override
639 public void batchOperationComplete(FlowRuleBatchEvent event) {
640 //FIXME: need a per device pending response
641 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
642 if (nodeId == null) {
643 notifyDelegate(event);
644 } else {
645 // TODO check unicast return value
646 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
647 //error log: log.warn("Failed to respond to peer for batch operation result");
648 }
649 }
650
651 private final class OnStoreBatch implements ClusterMessageHandler {
652
653 @Override
654 public void handle(final ClusterMessage message) {
655 FlowRuleBatchOperation operation = serializer.decode(message.payload());
656 log.debug("received batch request {}", operation);
657
658 final DeviceId deviceId = operation.deviceId();
659 NodeId master = mastershipService.getMasterFor(deviceId);
660 if (!Objects.equals(local, master)) {
661 Set<FlowRule> failures = new HashSet<>(operation.size());
662 for (FlowRuleBatchEntry op : operation.getOperations()) {
663 failures.add(op.target());
664 }
665 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
666 // This node is no longer the master, respond as all failed.
667 // TODO: we might want to wrap response in envelope
668 // to distinguish sw programming failure and hand over
669 // it make sense in the latter case to retry immediately.
670 message.respond(serializer.encode(allFailed));
671 return;
672 }
673
674 pendingResponses.put(operation.id(), message.sender());
675 storeBatchInternal(operation);
676 }
677 }
678
Jordan Halterman281dbf32018-06-15 17:46:28 -0700679 private class InternalFlowTable implements DeviceListener {
680 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700681
682 @Override
Jordan Halterman281dbf32018-06-15 17:46:28 -0700683 public void event(DeviceEvent event) {
684 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
685 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700686 }
687 }
688
Jordan Halterman281dbf32018-06-15 17:46:28 -0700689 /**
690 * Adds the given device to the flow table.
691 *
692 * @param deviceId the device to add to the table
693 */
694 public void addDevice(DeviceId deviceId) {
695 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
696 id,
697 clusterService,
698 clusterCommunicator,
699 new InternalLifecycleManager(id),
Pier Luigi Ventred8a923c2020-02-20 11:25:31 +0000700 deviceService,
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800701 backupScheduler,
702 new OrderedExecutor(backupExecutor),
Jordan Halterman281dbf32018-06-15 17:46:28 -0700703 backupPeriod,
704 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700705 }
706
Jordan Halterman281dbf32018-06-15 17:46:28 -0700707 /**
708 * Sets the flow table backup period.
709 *
710 * @param backupPeriod the flow table backup period
711 */
712 void setBackupPeriod(int backupPeriod) {
713 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700714 }
715
Jordan Halterman281dbf32018-06-15 17:46:28 -0700716 /**
717 * Sets the flow table anti-entropy period.
718 *
719 * @param antiEntropyPeriod the flow table anti-entropy period
720 */
721 void setAntiEntropyPeriod(int antiEntropyPeriod) {
722 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700723 }
724
Jordan Halterman281dbf32018-06-15 17:46:28 -0700725 /**
726 * Returns the flow table for a specific device.
727 *
728 * @param deviceId the device identifier
729 * @return the flow table for the given device
730 */
731 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
732 DeviceFlowTable flowTable = flowTables.get(deviceId);
733 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
734 deviceId,
735 clusterService,
736 clusterCommunicator,
737 new InternalLifecycleManager(deviceId),
Pier Luigi Ventred8a923c2020-02-20 11:25:31 +0000738 deviceService,
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800739 backupScheduler,
740 new OrderedExecutor(backupExecutor),
Jordan Halterman281dbf32018-06-15 17:46:28 -0700741 backupPeriod,
742 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700743 }
744
Jordan Halterman281dbf32018-06-15 17:46:28 -0700745 /**
746 * Returns the flow rule count for the given device.
747 *
748 * @param deviceId the device for which to return the flow rule count
749 * @return the flow rule count for the given device
750 */
751 public int getFlowRuleCount(DeviceId deviceId) {
752 return getFlowTable(deviceId).count();
753 }
754
755 /**
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800756 * Returns the count of flow rules in the given state for the given device.
757 *
758 * @param deviceId the device for which to return the flow rule count
759 * @return the flow rule count for the given device
760 */
761 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
762 if (state == null) {
763 return getFlowRuleCount(deviceId);
764 }
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700765 return (int) StreamSupport.stream(getFlowEntries(deviceId).spliterator(), false)
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800766 .filter(rule -> rule.state() == state)
767 .count();
768 }
769
770 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -0700771 * Returns the flow entry for the given rule.
772 *
773 * @param rule the rule for which to return the flow entry
774 * @return the flow entry for the given rule
775 */
Jon Hallfa132292017-10-24 11:11:24 -0700776 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700777 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700778 }
779
Jordan Halterman281dbf32018-06-15 17:46:28 -0700780 /**
781 * Returns the set of flow entries for the given device.
782 *
783 * @param deviceId the device for which to lookup flow entries
784 * @return the set of flow entries for the given device
785 */
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700786 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
787 try {
788 return getFlowTable(deviceId).getFlowEntries()
Jordan Halterman43300782019-05-21 11:27:50 -0700789 .get(GET_FLOW_ENTRIES_TIMEOUT, TimeUnit.SECONDS);
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700790 } catch (ExecutionException e) {
791 throw new FlowRuleStoreException(e.getCause());
792 } catch (TimeoutException e) {
793 throw new FlowRuleStoreException.Timeout();
794 } catch (InterruptedException e) {
795 throw new FlowRuleStoreException.Interrupted();
796 }
Jon Hallfa132292017-10-24 11:11:24 -0700797 }
798
Jordan Halterman281dbf32018-06-15 17:46:28 -0700799 /**
800 * Adds the given flow rule.
801 *
802 * @param rule the rule to add
803 */
Jon Hallfa132292017-10-24 11:11:24 -0700804 public void add(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700805 Tools.futureGetOrElse(
806 getFlowTable(rule.deviceId()).add(rule),
807 FLOW_RULE_STORE_TIMEOUT_MILLIS,
808 TimeUnit.MILLISECONDS,
809 null);
Jon Hallfa132292017-10-24 11:11:24 -0700810 }
811
Jordan Halterman281dbf32018-06-15 17:46:28 -0700812 /**
813 * Updates the given flow rule.
814 *
815 * @param rule the rule to update
816 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800817 public void update(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700818 Tools.futureGetOrElse(
819 getFlowTable(rule.deviceId()).update(rule),
820 FLOW_RULE_STORE_TIMEOUT_MILLIS,
821 TimeUnit.MILLISECONDS,
822 null);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800823 }
824
Jordan Halterman281dbf32018-06-15 17:46:28 -0700825 /**
826 * Applies the given update function to the rule.
827 *
828 * @param function the update function to apply
829 * @return a future to be completed with the update event or {@code null} if the rule was not updated
830 */
831 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
832 return Tools.futureGetOrElse(
833 getFlowTable(rule.deviceId()).update(rule, function),
834 FLOW_RULE_STORE_TIMEOUT_MILLIS,
835 TimeUnit.MILLISECONDS,
836 null);
837 }
838
839 /**
840 * Removes the given flow rule.
841 *
842 * @param rule the rule to remove
843 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800844 public FlowEntry remove(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700845 return Tools.futureGetOrElse(
846 getFlowTable(rule.deviceId()).remove(rule),
847 FLOW_RULE_STORE_TIMEOUT_MILLIS,
848 TimeUnit.MILLISECONDS,
849 null);
Jon Hallfa132292017-10-24 11:11:24 -0700850 }
851
Jordan Halterman281dbf32018-06-15 17:46:28 -0700852 /**
853 * Purges flow rules for the given device.
854 *
855 * @param deviceId the device for which to purge flow rules
856 */
Jon Hallfa132292017-10-24 11:11:24 -0700857 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanda3b9f02018-10-05 13:28:51 -0700858 // If the device is still present in the store, purge the underlying DeviceFlowTable.
859 // Otherwise, remove the DeviceFlowTable and unregister message handlers.
860 if (deviceService.getDevice(deviceId) != null) {
861 DeviceFlowTable flowTable = flowTables.get(deviceId);
862 if (flowTable != null) {
863 flowTable.purge();
864 }
865 } else {
866 DeviceFlowTable flowTable = flowTables.remove(deviceId);
867 if (flowTable != null) {
868 flowTable.close();
869 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700870 }
Jon Hallfa132292017-10-24 11:11:24 -0700871 }
872
Jordan Halterman281dbf32018-06-15 17:46:28 -0700873 /**
874 * Purges all flow rules from the table.
875 */
Jon Hallfa132292017-10-24 11:11:24 -0700876 public void purgeFlowRules() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700877 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
878 while (iterator.hasNext()) {
879 iterator.next().close();
880 iterator.remove();
Jordan Halterman8f90d6d2018-06-12 11:23:33 -0700881 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700882 }
Jon Hallfa132292017-10-24 11:11:24 -0700883 }
884
885 @Override
Jordan Halterman5259b332018-06-12 15:34:19 -0700886 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700887 deviceTableStats.put(deviceId, tableStats);
888 return null;
889 }
890
891 @Override
892 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
893 NodeId master = mastershipService.getMasterFor(deviceId);
894
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100895 if (master == null && deviceService.isAvailable(deviceId)) {
Jon Hallfa132292017-10-24 11:11:24 -0700896 log.debug("Failed to getTableStats: No master for {}", deviceId);
897 return Collections.emptyList();
898 }
899
900 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
901 if (tableStats == null) {
902 return Collections.emptyList();
903 }
904 return ImmutableList.copyOf(tableStats);
905 }
906
907 @Override
908 public long getActiveFlowRuleCount(DeviceId deviceId) {
909 return Streams.stream(getTableStatistics(deviceId))
Jordan Halterman281dbf32018-06-15 17:46:28 -0700910 .mapToLong(TableStatisticsEntry::activeFlowEntries)
911 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700912 }
913
914 private class InternalTableStatsListener
915 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
916 @Override
917 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700918 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700919 //TODO: Generate an event to listeners (do we need?)
920 }
921 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700922
923 /**
924 * Device lifecycle manager implementation.
925 */
926 private final class InternalLifecycleManager
927 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
928 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
929
930 private final DeviceId deviceId;
931
932 private volatile DeviceReplicaInfo replicaInfo;
933
934 InternalLifecycleManager(DeviceId deviceId) {
935 this.deviceId = deviceId;
936 replicaInfoManager.addListener(this);
937 mastershipTermLifecycles.addListener(this);
938 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
939 }
940
941 @Override
942 public DeviceReplicaInfo getReplicaInfo() {
943 return replicaInfo;
944 }
945
946 @Override
947 public void activate(long term) {
948 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
949 if (replicaInfo != null && replicaInfo.term() == term) {
950 mastershipTermLifecycles.put(deviceId, term);
951 }
952 }
953
954 @Override
955 public void event(ReplicaInfoEvent event) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700956 if (event.subject().equals(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700957 onReplicaInfoChange(event.replicaInfo());
958 }
959 }
960
961 @Override
962 public void event(MapEvent<DeviceId, Long> event) {
963 if (event.key().equals(deviceId) && event.newValue() != null) {
964 onActivate(event.newValue().value());
965 }
966 }
967
968 /**
969 * Handles a term activation event.
970 *
971 * @param term the term that was activated
972 */
973 private void onActivate(long term) {
974 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
975 if (replicaInfo != null && replicaInfo.term() == term) {
976 NodeId master = replicaInfo.master().orElse(null);
977 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800978 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700979 listenerRegistry.process(new LifecycleEvent(
980 LifecycleEvent.Type.TERM_ACTIVE,
981 new DeviceReplicaInfo(term, master, backups)));
982 }
983 }
984
985 /**
986 * Handles a replica info change event.
987 *
988 * @param replicaInfo the updated replica info
989 */
990 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
991 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
992 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
993 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
994 if (oldReplicaInfo != null) {
995 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
996 }
997 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700998 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
999 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
Jordan Halterman281dbf32018-06-15 17:46:28 -07001000 }
1001 }
1002
1003 /**
1004 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
1005 *
1006 * @param replicaInfo the replica info to convert
1007 * @return the converted replica info
1008 */
1009 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
1010 NodeId master = replicaInfo.master().orElse(null);
1011 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanaeda2752019-02-22 12:31:25 -08001012 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman281dbf32018-06-15 17:46:28 -07001013 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
1014 }
1015
1016 @Override
1017 public void close() {
1018 replicaInfoManager.removeListener(this);
1019 mastershipTermLifecycles.removeListener(this);
1020 }
1021 }
Jordan Haltermanb81fdc12019-03-04 18:12:20 -08001022
1023 private static class CountMessage {
1024 private final DeviceId deviceId;
1025 private final FlowEntryState state;
1026
1027 CountMessage(DeviceId deviceId, FlowEntryState state) {
1028 this.deviceId = deviceId;
1029 this.state = state;
1030 }
1031 }
Jordan Halterman5259b332018-06-12 15:34:19 -07001032}