blob: 602f4d31a59b93da5b6cf527977367e5d70ecb51 [file] [log] [blame]
Jordan Halterman281dbf32018-06-15 17:46:28 -07001/*
2* Copyright 2014-present Open Networking Foundation
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070018import java.util.Collections;
19import java.util.Dictionary;
20import java.util.HashSet;
21import java.util.Iterator;
22import java.util.List;
23import java.util.Map;
24import java.util.Objects;
25import java.util.Set;
26import java.util.concurrent.ExecutionException;
27import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Executors;
29import java.util.concurrent.ScheduledExecutorService;
30import java.util.concurrent.TimeUnit;
31import java.util.concurrent.TimeoutException;
32import java.util.function.Function;
33import java.util.stream.Collectors;
34import java.util.stream.StreamSupport;
35
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070036import com.google.common.collect.ImmutableList;
37import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070038import com.google.common.collect.Streams;
Jordan Haltermanb81fdc12019-03-04 18:12:20 -080039import org.apache.commons.lang3.tuple.ImmutablePair;
40import org.apache.commons.lang3.tuple.Pair;
Jon Hallfa132292017-10-24 11:11:24 -070041import org.onlab.util.KryoNamespace;
Jordan Haltermanaeda2752019-02-22 12:31:25 -080042import org.onlab.util.OrderedExecutor;
Jon Hallfa132292017-10-24 11:11:24 -070043import org.onlab.util.Tools;
44import org.onosproject.cfg.ComponentConfigService;
45import org.onosproject.cluster.ClusterService;
46import org.onosproject.cluster.NodeId;
Daniele Moro43ac2892021-07-15 17:02:59 +020047import org.onosproject.core.ApplicationId;
Jon Hallfa132292017-10-24 11:11:24 -070048import org.onosproject.core.CoreService;
49import org.onosproject.core.IdGenerator;
Jordan Halterman281dbf32018-06-15 17:46:28 -070050import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070051import org.onosproject.mastership.MastershipService;
52import org.onosproject.net.DeviceId;
Jordan Halterman281dbf32018-06-15 17:46:28 -070053import org.onosproject.net.device.DeviceEvent;
54import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070055import org.onosproject.net.device.DeviceService;
56import org.onosproject.net.flow.CompletedBatchOperation;
57import org.onosproject.net.flow.DefaultFlowEntry;
pierventre019b07292021-05-21 12:39:09 +020058import org.onosproject.net.flow.DefaultFlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070059import org.onosproject.net.flow.FlowEntry;
60import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070061import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070062import org.onosproject.net.flow.FlowRuleEvent;
63import org.onosproject.net.flow.FlowRuleEvent.Type;
64import org.onosproject.net.flow.FlowRuleService;
65import org.onosproject.net.flow.FlowRuleStore;
66import org.onosproject.net.flow.FlowRuleStoreDelegate;
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070067import org.onosproject.net.flow.FlowRuleStoreException;
Daniele Moro43ac2892021-07-15 17:02:59 +020068import org.onosproject.net.flow.StoredFlowEntry;
Jon Hallfa132292017-10-24 11:11:24 -070069import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070070import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
71import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
72import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
73import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
74import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070075import org.onosproject.persistence.PersistenceService;
76import org.onosproject.store.AbstractStore;
77import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
78import org.onosproject.store.cluster.messaging.ClusterMessage;
79import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jordan Halterman281dbf32018-06-15 17:46:28 -070080import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070081import org.onosproject.store.flow.ReplicaInfoEvent;
82import org.onosproject.store.flow.ReplicaInfoEventListener;
83import org.onosproject.store.flow.ReplicaInfoService;
84import org.onosproject.store.impl.MastershipBasedTimestamp;
85import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman281dbf32018-06-15 17:46:28 -070086import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070087import org.onosproject.store.service.EventuallyConsistentMap;
88import org.onosproject.store.service.EventuallyConsistentMapEvent;
89import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman281dbf32018-06-15 17:46:28 -070090import org.onosproject.store.service.MapEvent;
91import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070092import org.onosproject.store.service.Serializer;
93import org.onosproject.store.service.StorageService;
94import org.onosproject.store.service.WallClockTimestamp;
95import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070096import org.osgi.service.component.annotations.Activate;
97import org.osgi.service.component.annotations.Component;
98import org.osgi.service.component.annotations.Deactivate;
99import org.osgi.service.component.annotations.Modified;
100import org.osgi.service.component.annotations.Reference;
101import org.osgi.service.component.annotations.ReferenceCardinality;
Jon Hallfa132292017-10-24 11:11:24 -0700102import org.slf4j.Logger;
103
Jon Hallfa132292017-10-24 11:11:24 -0700104import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800105import static java.lang.Math.max;
106import static java.lang.Math.min;
Jon Hallfa132292017-10-24 11:11:24 -0700107import static org.onlab.util.Tools.get;
108import static org.onlab.util.Tools.groupedThreads;
109import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -0700110import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
111import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700112import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700113import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
Daniele Moro43ac2892021-07-15 17:02:59 +0200114import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.PURGE_FLOW_RULES;
Jon Hallfa132292017-10-24 11:11:24 -0700115import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
116import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
117import static org.slf4j.LoggerFactory.getLogger;
118
Ray Milkeyb5646e62018-10-16 11:42:18 -0700119import static org.onosproject.store.OsgiPropertyConstants.*;
120
Jon Hallfa132292017-10-24 11:11:24 -0700121/**
122 * Manages inventory of flow rules using a distributed state management protocol.
123 */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700124@Component(
125 immediate = true,
126 service = FlowRuleStore.class,
127 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -0700128 MESSAGE_HANDLER_THREAD_POOL_SIZE + ":Integer=" + MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT,
129 BACKUP_PERIOD_MILLIS + ":Integer=" + BACKUP_PERIOD_MILLIS_DEFAULT,
130 ANTI_ENTROPY_PERIOD_MILLIS + ":Integer=" + ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT,
131 EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED + ":Boolean=" + EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT,
132 MAX_BACKUP_COUNT + ":Integer=" + MAX_BACKUP_COUNT_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -0700133 }
134)
Jon Hallfa132292017-10-24 11:11:24 -0700135public class ECFlowRuleStore
Jordan Halterman281dbf32018-06-15 17:46:28 -0700136 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
137 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700138
139 private final Logger log = getLogger(getClass());
140
Jon Hallfa132292017-10-24 11:11:24 -0700141 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Daniele Moro43ac2892021-07-15 17:02:59 +0200142 private static final long PURGE_TIMEOUT_MILLIS = 30000;
Jordan Halterman43300782019-05-21 11:27:50 -0700143 private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
Jon Hallfa132292017-10-24 11:11:24 -0700144
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700145 /** Number of threads in the message handler pool. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700146 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700147
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700148 /** Delay in ms between successive backup runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700149 private int backupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700150
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700151 /** Delay in ms between anti-entropy runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700152 private int antiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700153
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700154 /** Indicates whether or not changes in the flow table should be persisted to disk. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700155 private boolean persistenceEnabled = EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700156
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700157 /** Max number of backup copies for each device. */
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100158 protected static volatile int backupCount = MAX_BACKUP_COUNT_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700159
160 private InternalFlowTable flowTable = new InternalFlowTable();
161
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700163 protected ReplicaInfoService replicaInfoManager;
164
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700166 protected ClusterCommunicationService clusterCommunicator;
167
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700168 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700169 protected ClusterService clusterService;
170
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700171 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700172 protected DeviceService deviceService;
173
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700174 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700175 protected CoreService coreService;
176
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700177 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700178 protected ComponentConfigService configService;
179
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700180 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700181 protected MastershipService mastershipService;
182
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700183 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700184 protected PersistenceService persistenceService;
185
186 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
187 private ExecutorService messageHandlingExecutor;
188 private ExecutorService eventHandler;
189
Ray Milkeyd1092d62019-04-02 09:12:00 -0700190 private ScheduledExecutorService backupScheduler;
191 private ExecutorService backupExecutor;
Jon Hallfa132292017-10-24 11:11:24 -0700192
193 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
194 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700195 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700196
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700197 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700198 protected StorageService storageService;
199
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700200 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
201 .register(KryoNamespaces.API)
Jordan Haltermana765d222018-06-01 00:40:56 -0700202 .register(BucketId.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700203 .register(FlowBucket.class)
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800204 .register(ImmutablePair.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700205 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700206
207 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermana765d222018-06-01 00:40:56 -0700208 .register(KryoNamespaces.API)
209 .register(BucketId.class)
210 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700211
Jordan Halterman281dbf32018-06-15 17:46:28 -0700212 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700213
214 private IdGenerator idGenerator;
215 private NodeId local;
216
217 @Activate
218 public void activate(ComponentContext context) {
219 configService.registerProperties(getClass());
220
Ray Milkeyd1092d62019-04-02 09:12:00 -0700221 backupScheduler = Executors.newSingleThreadScheduledExecutor(
222 groupedThreads("onos/flow", "backup-scheduler", log));
223 backupExecutor = Executors.newFixedThreadPool(
224 max(min(Runtime.getRuntime().availableProcessors() * 2, 16), 4),
225 groupedThreads("onos/flow", "backup-%d", log));
226
Jon Hallfa132292017-10-24 11:11:24 -0700227 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
228
229 local = clusterService.getLocalNode().id();
230
231 eventHandler = Executors.newSingleThreadExecutor(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700232 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700233 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700234 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700235
236 registerMessageHandlers(messageHandlingExecutor);
237
Jordan Halterman281dbf32018-06-15 17:46:28 -0700238 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
239 .withName("onos-flow-store-terms")
240 .withSerializer(serializer)
241 .buildAsyncMap();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800242
Jon Hallfa132292017-10-24 11:11:24 -0700243 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700244 .withName("onos-flow-table-stats")
245 .withSerializer(serializerBuilder)
246 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
247 .withTimestampProvider((k, v) -> new WallClockTimestamp())
248 .withTombstonesDisabled()
249 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700250 deviceTableStats.addListener(tableStatsListener);
251
Jordan Halterman281dbf32018-06-15 17:46:28 -0700252 deviceService.addListener(flowTable);
253 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
254
Jon Hallfa132292017-10-24 11:11:24 -0700255 logConfig("Started");
256 }
257
258 @Deactivate
259 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700260 configService.unregisterProperties(getClass(), false);
261 unregisterMessageHandlers();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700262 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700263 deviceTableStats.removeListener(tableStatsListener);
264 deviceTableStats.destroy();
265 eventHandler.shutdownNow();
266 messageHandlingExecutor.shutdownNow();
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800267 backupScheduler.shutdownNow();
268 backupExecutor.shutdownNow();
Ray Milkeyd1092d62019-04-02 09:12:00 -0700269 backupScheduler = null;
270 backupExecutor = null;
Jon Hallfa132292017-10-24 11:11:24 -0700271 log.info("Stopped");
272 }
273
274 @SuppressWarnings("rawtypes")
275 @Modified
276 public void modified(ComponentContext context) {
277 if (context == null) {
278 logConfig("Default config");
279 return;
280 }
281
282 Dictionary properties = context.getProperties();
283 int newPoolSize;
284 int newBackupPeriod;
285 int newBackupCount;
Jordan Halterman5259b332018-06-12 15:34:19 -0700286 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700287 try {
288 String s = get(properties, "msgHandlerPoolSize");
289 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
290
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700291 s = get(properties, BACKUP_PERIOD_MILLIS);
Jon Hallfa132292017-10-24 11:11:24 -0700292 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
293
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700294 s = get(properties, MAX_BACKUP_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700295 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman5259b332018-06-12 15:34:19 -0700296
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700297 s = get(properties, ANTI_ENTROPY_PERIOD_MILLIS);
Jordan Halterman5259b332018-06-12 15:34:19 -0700298 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700299 } catch (NumberFormatException | ClassCastException e) {
Ray Milkeyb5646e62018-10-16 11:42:18 -0700300 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
301 newBackupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
302 newBackupCount = MAX_BACKUP_COUNT_DEFAULT;
303 newAntiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700304 }
305
Jon Hallfa132292017-10-24 11:11:24 -0700306 if (newBackupPeriod != backupPeriod) {
307 backupPeriod = newBackupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700308 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700309 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700310
311 if (newAntiEntropyPeriod != antiEntropyPeriod) {
312 antiEntropyPeriod = newAntiEntropyPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700313 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jordan Halterman5259b332018-06-12 15:34:19 -0700314 }
315
Jon Hallfa132292017-10-24 11:11:24 -0700316 if (newPoolSize != msgHandlerPoolSize) {
317 msgHandlerPoolSize = newPoolSize;
318 ExecutorService oldMsgHandler = messageHandlingExecutor;
319 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700320 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700321
322 // replace previously registered handlers.
323 registerMessageHandlers(messageHandlingExecutor);
324 oldMsgHandler.shutdown();
325 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700326
Jon Hallfa132292017-10-24 11:11:24 -0700327 if (backupCount != newBackupCount) {
328 backupCount = newBackupCount;
329 }
330 logConfig("Reconfigured");
331 }
332
333 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700334 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
335 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700336 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700337 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700338 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800339 clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
340 GET_DEVICE_FLOW_COUNT,
341 serializer::decode,
342 p -> flowTable.getFlowRuleCount(p.getLeft(), p.getRight()),
343 serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700344 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700345 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Daniele Moro43ac2892021-07-15 17:02:59 +0200346 clusterCommunicator.<Pair<DeviceId, ApplicationId>, Boolean>addSubscriber(
347 PURGE_FLOW_RULES,
348 serializer::decode,
349 p -> flowTable.purgeFlowRules(p.getLeft(), p.getRight()),
350 serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700351 }
352
353 private void unregisterMessageHandlers() {
354 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700355 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700356 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
357 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
358 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
359 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
360 }
361
362 private void logConfig(String prefix) {
363 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700364 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700365 }
366
Jon Hallfa132292017-10-24 11:11:24 -0700367 @Override
368 public int getFlowRuleCount() {
369 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700370 .mapToInt(device -> getFlowRuleCount(device.id()))
371 .sum();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800372 }
373
374 @Override
375 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800376 return getFlowRuleCount(deviceId, null);
377 }
378
379 @Override
380 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700381 NodeId master = mastershipService.getMasterFor(deviceId);
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100382 if (master == null && deviceService.isAvailable(deviceId)) {
pierventre7768d7f2022-01-04 19:08:46 +0100383 log.warn("Failed to getFlowRuleCount: No master for {}", deviceId);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700384 return 0;
385 }
386
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100387 if (Objects.equals(local, master) || master == null) {
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800388 return flowTable.getFlowRuleCount(deviceId, state);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700389 }
390
391 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
392 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800393 Pair.of(deviceId, state),
394 GET_DEVICE_FLOW_COUNT,
395 serializer::encode,
396 serializer::decode,
397 master),
398 FLOW_RULE_STORE_TIMEOUT_MILLIS,
399 TimeUnit.MILLISECONDS,
400 0);
Jon Hallfa132292017-10-24 11:11:24 -0700401 }
402
403 @Override
404 public FlowEntry getFlowEntry(FlowRule rule) {
405 NodeId master = mastershipService.getMasterFor(rule.deviceId());
406
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100407 if (master == null && deviceService.isAvailable(rule.deviceId())) {
pierventre7768d7f2022-01-04 19:08:46 +0100408 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700409 return null;
410 }
411
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100412 if (Objects.equals(local, master) || master == null) {
Jon Hallfa132292017-10-24 11:11:24 -0700413 return flowTable.getFlowEntry(rule);
414 }
415
416 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700417 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700418
419 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700420 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
421 serializer::encode,
422 serializer::decode,
423 master),
424 FLOW_RULE_STORE_TIMEOUT_MILLIS,
425 TimeUnit.MILLISECONDS,
426 null);
Jon Hallfa132292017-10-24 11:11:24 -0700427 }
428
429 @Override
430 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700431 return flowTable.getFlowEntries(deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700432 }
433
434 @Override
435 public void storeFlowRule(FlowRule rule) {
436 storeBatch(new FlowRuleBatchOperation(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700437 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
438 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700439 }
440
441 @Override
442 public void storeBatch(FlowRuleBatchOperation operation) {
443 if (operation.getOperations().isEmpty()) {
444 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700445 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
446 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700447 return;
448 }
449
450 DeviceId deviceId = operation.deviceId();
451 NodeId master = mastershipService.getMasterFor(deviceId);
452
453 if (master == null) {
pierventre7768d7f2022-01-04 19:08:46 +0100454 log.warn("Failed to storeBatch: No master for {}", deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700455
Jordan Halterman281dbf32018-06-15 17:46:28 -0700456 Set<FlowRule> allFailures = operation.getOperations()
457 .stream()
458 .map(op -> op.target())
459 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700460 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700461 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
462 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700463 return;
464 }
465
466 if (Objects.equals(local, master)) {
467 storeBatchInternal(operation);
468 return;
469 }
470
471 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700472 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700473
474 clusterCommunicator.unicast(operation,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700475 APPLY_BATCH_FLOWS,
476 serializer::encode,
477 master)
478 .whenComplete((result, error) -> {
479 if (error != null) {
480 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700481
Jordan Halterman281dbf32018-06-15 17:46:28 -0700482 Set<FlowRule> allFailures = operation.getOperations()
483 .stream()
484 .map(op -> op.target())
485 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700486
Jordan Halterman281dbf32018-06-15 17:46:28 -0700487 notifyDelegate(FlowRuleBatchEvent.completed(
488 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
489 new CompletedBatchOperation(false, allFailures, deviceId)));
490 }
491 });
Jon Hallfa132292017-10-24 11:11:24 -0700492 }
493
494 private void storeBatchInternal(FlowRuleBatchOperation operation) {
495
496 final DeviceId did = operation.deviceId();
497 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
498 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
499 if (currentOps.isEmpty()) {
500 batchOperationComplete(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700501 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
502 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700503 return;
504 }
505
506 notifyDelegate(FlowRuleBatchEvent.requested(new
Jordan Halterman281dbf32018-06-15 17:46:28 -0700507 FlowRuleBatchRequest(operation.id(),
508 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700509 }
510
511 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
512 return operation.getOperations().stream().map(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700513 op -> {
514 StoredFlowEntry entry;
515 switch (op.operator()) {
516 case ADD:
517 entry = new DefaultFlowEntry(op.target());
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800518 log.debug("Adding flow rule: {}", entry);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700519 flowTable.add(entry);
520 return op;
521 case MODIFY:
522 entry = new DefaultFlowEntry(op.target());
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800523 log.debug("Updating flow rule: {}", entry);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700524 flowTable.update(entry);
525 return op;
526 case REMOVE:
527 return flowTable.update(op.target(), stored -> {
528 stored.setState(FlowEntryState.PENDING_REMOVE);
529 log.debug("Setting state of rule to pending remove: {}", stored);
pierventre019b07292021-05-21 12:39:09 +0200530 // Using the stored value instead of op to allow the removal
531 // of flows that do not specify actions or have empty treatment
532 return new FlowRuleBatchEntry(op.operator(), new DefaultFlowRule(stored));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700533 });
534 default:
535 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700536 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700537 return null;
538 }
Jon Hallfa132292017-10-24 11:11:24 -0700539 ).filter(Objects::nonNull).collect(Collectors.toSet());
540 }
541
542 @Override
543 public void deleteFlowRule(FlowRule rule) {
544 storeBatch(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700545 new FlowRuleBatchOperation(
546 Collections.singletonList(
547 new FlowRuleBatchEntry(
548 FlowRuleOperation.REMOVE,
549 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700550 }
551
552 @Override
553 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
554 if (mastershipService.isLocalMaster(rule.deviceId())) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700555 return flowTable.update(rule, stored -> {
556 if (stored.state() == FlowEntryState.PENDING_ADD) {
557 stored.setState(FlowEntryState.PENDING_ADD);
558 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
559 }
560 return null;
561 });
Jon Hallfa132292017-10-24 11:11:24 -0700562 }
563 return null;
564 }
565
566 @Override
567 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
568 NodeId master = mastershipService.getMasterFor(rule.deviceId());
569 if (Objects.equals(local, master)) {
570 return addOrUpdateFlowRuleInternal(rule);
571 }
572
573 log.warn("Tried to update FlowRule {} state,"
Jordan Halterman281dbf32018-06-15 17:46:28 -0700574 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700575 return null;
576 }
577
578 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700579 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700580 stored.setBytes(rule.bytes());
581 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
582 stored.setLiveType(rule.liveType());
583 stored.setPackets(rule.packets());
584 stored.setLastSeen();
585 if (stored.state() == FlowEntryState.PENDING_ADD) {
586 stored.setState(FlowEntryState.ADDED);
587 return new FlowRuleEvent(Type.RULE_ADDED, rule);
588 }
589 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700590 });
591 if (event != null) {
592 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700593 }
594
595 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
596 // TODO: also update backup if the behavior is correct.
597 flowTable.add(rule);
598 return null;
599 }
600
601 @Override
602 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
603 final DeviceId deviceId = rule.deviceId();
604 NodeId master = mastershipService.getMasterFor(deviceId);
605
606 if (Objects.equals(local, master)) {
607 // bypass and handle it locally
608 return removeFlowRuleInternal(rule);
609 }
610
611 if (master == null) {
612 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
613 // TODO: revisit if this should be null (="no-op") or Exception
614 return null;
615 }
616
617 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700618 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700619
Jordan Halterman281dbf32018-06-15 17:46:28 -0700620 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
621 rule,
622 REMOVE_FLOW_ENTRY,
623 serializer::encode,
624 serializer::decode,
625 master),
626 FLOW_RULE_STORE_TIMEOUT_MILLIS,
627 TimeUnit.MILLISECONDS,
628 null);
Jon Hallfa132292017-10-24 11:11:24 -0700629 }
630
631 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700632 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800633 final FlowEntry removed = flowTable.remove(rule);
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800634 log.debug("Removed flow rule: {}", removed);
Jon Hallfa132292017-10-24 11:11:24 -0700635 // rule may be partial rule that is missing treatment, we should use rule from store instead
636 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
637 }
638
639 @Override
640 public void purgeFlowRule(DeviceId deviceId) {
641 flowTable.purgeFlowRule(deviceId);
642 }
643
644 @Override
Daniele Moro43ac2892021-07-15 17:02:59 +0200645 public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
646 NodeId master = mastershipService.getMasterFor(deviceId);
647
648 if (Objects.equals(local, master)) {
649 // bypass and handle it locally
650 return flowTable.purgeFlowRules(deviceId, appId);
651 }
652
653 if (master == null) {
654 log.warn("Failed to purgeFlowRules: No master for {}", deviceId);
655 return false;
656 }
657
658 log.trace("Forwarding purgeFlowRules to {}, which is the master for device {}",
659 master, deviceId);
660
661 return Tools.futureGetOrElse(
662 clusterCommunicator.sendAndReceive(
663 Pair.of(deviceId, appId),
664 PURGE_FLOW_RULES,
665 serializer::encode,
666 serializer::decode,
667 master),
668 FLOW_RULE_STORE_TIMEOUT_MILLIS,
669 TimeUnit.MILLISECONDS,
670 false);
671 }
672
673 @Override
Jon Hallfa132292017-10-24 11:11:24 -0700674 public void purgeFlowRules() {
675 flowTable.purgeFlowRules();
676 }
677
678 @Override
679 public void batchOperationComplete(FlowRuleBatchEvent event) {
680 //FIXME: need a per device pending response
681 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
682 if (nodeId == null) {
683 notifyDelegate(event);
684 } else {
685 // TODO check unicast return value
686 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
687 //error log: log.warn("Failed to respond to peer for batch operation result");
688 }
689 }
690
691 private final class OnStoreBatch implements ClusterMessageHandler {
692
693 @Override
694 public void handle(final ClusterMessage message) {
695 FlowRuleBatchOperation operation = serializer.decode(message.payload());
696 log.debug("received batch request {}", operation);
697
698 final DeviceId deviceId = operation.deviceId();
699 NodeId master = mastershipService.getMasterFor(deviceId);
700 if (!Objects.equals(local, master)) {
701 Set<FlowRule> failures = new HashSet<>(operation.size());
702 for (FlowRuleBatchEntry op : operation.getOperations()) {
703 failures.add(op.target());
704 }
705 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
706 // This node is no longer the master, respond as all failed.
707 // TODO: we might want to wrap response in envelope
708 // to distinguish sw programming failure and hand over
709 // it make sense in the latter case to retry immediately.
710 message.respond(serializer.encode(allFailed));
711 return;
712 }
713
714 pendingResponses.put(operation.id(), message.sender());
715 storeBatchInternal(operation);
716 }
717 }
718
Jordan Halterman281dbf32018-06-15 17:46:28 -0700719 private class InternalFlowTable implements DeviceListener {
720 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700721
722 @Override
Jordan Halterman281dbf32018-06-15 17:46:28 -0700723 public void event(DeviceEvent event) {
724 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
725 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700726 }
727 }
728
Jordan Halterman281dbf32018-06-15 17:46:28 -0700729 /**
730 * Adds the given device to the flow table.
731 *
732 * @param deviceId the device to add to the table
733 */
734 public void addDevice(DeviceId deviceId) {
735 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
736 id,
737 clusterService,
738 clusterCommunicator,
739 new InternalLifecycleManager(id),
Pier Luigi Ventred8a923c2020-02-20 11:25:31 +0000740 deviceService,
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800741 backupScheduler,
742 new OrderedExecutor(backupExecutor),
Jordan Halterman281dbf32018-06-15 17:46:28 -0700743 backupPeriod,
744 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700745 }
746
Jordan Halterman281dbf32018-06-15 17:46:28 -0700747 /**
748 * Sets the flow table backup period.
749 *
750 * @param backupPeriod the flow table backup period
751 */
752 void setBackupPeriod(int backupPeriod) {
753 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700754 }
755
Jordan Halterman281dbf32018-06-15 17:46:28 -0700756 /**
757 * Sets the flow table anti-entropy period.
758 *
759 * @param antiEntropyPeriod the flow table anti-entropy period
760 */
761 void setAntiEntropyPeriod(int antiEntropyPeriod) {
762 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700763 }
764
Jordan Halterman281dbf32018-06-15 17:46:28 -0700765 /**
766 * Returns the flow table for a specific device.
767 *
768 * @param deviceId the device identifier
769 * @return the flow table for the given device
770 */
771 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
772 DeviceFlowTable flowTable = flowTables.get(deviceId);
773 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
774 deviceId,
775 clusterService,
776 clusterCommunicator,
777 new InternalLifecycleManager(deviceId),
Pier Luigi Ventred8a923c2020-02-20 11:25:31 +0000778 deviceService,
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800779 backupScheduler,
780 new OrderedExecutor(backupExecutor),
Jordan Halterman281dbf32018-06-15 17:46:28 -0700781 backupPeriod,
782 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700783 }
784
Jordan Halterman281dbf32018-06-15 17:46:28 -0700785 /**
786 * Returns the flow rule count for the given device.
787 *
788 * @param deviceId the device for which to return the flow rule count
789 * @return the flow rule count for the given device
790 */
791 public int getFlowRuleCount(DeviceId deviceId) {
792 return getFlowTable(deviceId).count();
793 }
794
795 /**
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800796 * Returns the count of flow rules in the given state for the given device.
797 *
798 * @param deviceId the device for which to return the flow rule count
799 * @return the flow rule count for the given device
800 */
801 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
802 if (state == null) {
803 return getFlowRuleCount(deviceId);
804 }
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700805 return (int) StreamSupport.stream(getFlowEntries(deviceId).spliterator(), false)
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800806 .filter(rule -> rule.state() == state)
807 .count();
808 }
809
810 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -0700811 * Returns the flow entry for the given rule.
812 *
813 * @param rule the rule for which to return the flow entry
814 * @return the flow entry for the given rule
815 */
Jon Hallfa132292017-10-24 11:11:24 -0700816 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700817 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700818 }
819
Jordan Halterman281dbf32018-06-15 17:46:28 -0700820 /**
821 * Returns the set of flow entries for the given device.
822 *
823 * @param deviceId the device for which to lookup flow entries
824 * @return the set of flow entries for the given device
825 */
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700826 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
827 try {
828 return getFlowTable(deviceId).getFlowEntries()
Jordan Halterman43300782019-05-21 11:27:50 -0700829 .get(GET_FLOW_ENTRIES_TIMEOUT, TimeUnit.SECONDS);
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700830 } catch (ExecutionException e) {
831 throw new FlowRuleStoreException(e.getCause());
832 } catch (TimeoutException e) {
833 throw new FlowRuleStoreException.Timeout();
834 } catch (InterruptedException e) {
835 throw new FlowRuleStoreException.Interrupted();
836 }
Jon Hallfa132292017-10-24 11:11:24 -0700837 }
838
Jordan Halterman281dbf32018-06-15 17:46:28 -0700839 /**
840 * Adds the given flow rule.
841 *
842 * @param rule the rule to add
843 */
Jon Hallfa132292017-10-24 11:11:24 -0700844 public void add(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700845 Tools.futureGetOrElse(
846 getFlowTable(rule.deviceId()).add(rule),
847 FLOW_RULE_STORE_TIMEOUT_MILLIS,
848 TimeUnit.MILLISECONDS,
849 null);
Jon Hallfa132292017-10-24 11:11:24 -0700850 }
851
Jordan Halterman281dbf32018-06-15 17:46:28 -0700852 /**
853 * Updates the given flow rule.
854 *
855 * @param rule the rule to update
856 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800857 public void update(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700858 Tools.futureGetOrElse(
859 getFlowTable(rule.deviceId()).update(rule),
860 FLOW_RULE_STORE_TIMEOUT_MILLIS,
861 TimeUnit.MILLISECONDS,
862 null);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800863 }
864
Jordan Halterman281dbf32018-06-15 17:46:28 -0700865 /**
866 * Applies the given update function to the rule.
867 *
868 * @param function the update function to apply
869 * @return a future to be completed with the update event or {@code null} if the rule was not updated
870 */
871 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
872 return Tools.futureGetOrElse(
873 getFlowTable(rule.deviceId()).update(rule, function),
874 FLOW_RULE_STORE_TIMEOUT_MILLIS,
875 TimeUnit.MILLISECONDS,
876 null);
877 }
878
879 /**
880 * Removes the given flow rule.
881 *
882 * @param rule the rule to remove
883 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800884 public FlowEntry remove(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700885 return Tools.futureGetOrElse(
886 getFlowTable(rule.deviceId()).remove(rule),
887 FLOW_RULE_STORE_TIMEOUT_MILLIS,
888 TimeUnit.MILLISECONDS,
889 null);
Jon Hallfa132292017-10-24 11:11:24 -0700890 }
891
Jordan Halterman281dbf32018-06-15 17:46:28 -0700892 /**
893 * Purges flow rules for the given device.
894 *
895 * @param deviceId the device for which to purge flow rules
896 */
Jon Hallfa132292017-10-24 11:11:24 -0700897 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanda3b9f02018-10-05 13:28:51 -0700898 // If the device is still present in the store, purge the underlying DeviceFlowTable.
899 // Otherwise, remove the DeviceFlowTable and unregister message handlers.
900 if (deviceService.getDevice(deviceId) != null) {
901 DeviceFlowTable flowTable = flowTables.get(deviceId);
902 if (flowTable != null) {
903 flowTable.purge();
904 }
905 } else {
906 DeviceFlowTable flowTable = flowTables.remove(deviceId);
907 if (flowTable != null) {
908 flowTable.close();
909 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700910 }
Jon Hallfa132292017-10-24 11:11:24 -0700911 }
912
Jordan Halterman281dbf32018-06-15 17:46:28 -0700913 /**
Daniele Moro43ac2892021-07-15 17:02:59 +0200914 * Purges flow rules for the given device and application id.
915 *
916 * @param deviceId the device for which to purge flow rules
917 * @param appId the application id for with to purge flow rules
918 * @return true if purge is successful, false otherwise
919 */
920 public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
921 DeviceFlowTable flowTable = flowTables.get(deviceId);
922 if (flowTable != null) {
923 // flowTable.purge() returns a CompletableFuture<Void>, we want
924 // to return true when the completable future returns correctly
925 // within the timeout, otherwise return false.
926 try {
927 // Use higher timeout, purge(appId) may require more time
928 // than normal operations because it's applying the purge
929 // operation on every single flow table bucket.
930 flowTable.purge(appId).get(PURGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
931 return true;
932 } catch (InterruptedException e) {
933 Thread.currentThread().interrupt();
934 } catch (ExecutionException | TimeoutException ignored) {
935 }
936 }
937 return false;
938 }
939
940 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -0700941 * Purges all flow rules from the table.
942 */
Jon Hallfa132292017-10-24 11:11:24 -0700943 public void purgeFlowRules() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700944 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
945 while (iterator.hasNext()) {
946 iterator.next().close();
947 iterator.remove();
Jordan Halterman8f90d6d2018-06-12 11:23:33 -0700948 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700949 }
Jon Hallfa132292017-10-24 11:11:24 -0700950 }
951
952 @Override
Jordan Halterman5259b332018-06-12 15:34:19 -0700953 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700954 deviceTableStats.put(deviceId, tableStats);
955 return null;
956 }
957
958 @Override
959 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
960 NodeId master = mastershipService.getMasterFor(deviceId);
961
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100962 if (master == null && deviceService.isAvailable(deviceId)) {
pierventre7768d7f2022-01-04 19:08:46 +0100963 log.warn("Failed to getTableStats: No master for {}", deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700964 return Collections.emptyList();
965 }
966
967 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
968 if (tableStats == null) {
969 return Collections.emptyList();
970 }
971 return ImmutableList.copyOf(tableStats);
972 }
973
974 @Override
975 public long getActiveFlowRuleCount(DeviceId deviceId) {
976 return Streams.stream(getTableStatistics(deviceId))
Jordan Halterman281dbf32018-06-15 17:46:28 -0700977 .mapToLong(TableStatisticsEntry::activeFlowEntries)
978 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700979 }
980
981 private class InternalTableStatsListener
982 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
983 @Override
984 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700985 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700986 //TODO: Generate an event to listeners (do we need?)
987 }
988 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700989
990 /**
991 * Device lifecycle manager implementation.
992 */
993 private final class InternalLifecycleManager
994 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
995 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
996
997 private final DeviceId deviceId;
998
999 private volatile DeviceReplicaInfo replicaInfo;
1000
1001 InternalLifecycleManager(DeviceId deviceId) {
1002 this.deviceId = deviceId;
1003 replicaInfoManager.addListener(this);
1004 mastershipTermLifecycles.addListener(this);
1005 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
1006 }
1007
1008 @Override
1009 public DeviceReplicaInfo getReplicaInfo() {
1010 return replicaInfo;
1011 }
1012
1013 @Override
1014 public void activate(long term) {
1015 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
1016 if (replicaInfo != null && replicaInfo.term() == term) {
1017 mastershipTermLifecycles.put(deviceId, term);
1018 }
1019 }
1020
1021 @Override
1022 public void event(ReplicaInfoEvent event) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -07001023 if (event.subject().equals(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -07001024 onReplicaInfoChange(event.replicaInfo());
1025 }
1026 }
1027
1028 @Override
1029 public void event(MapEvent<DeviceId, Long> event) {
1030 if (event.key().equals(deviceId) && event.newValue() != null) {
1031 onActivate(event.newValue().value());
1032 }
1033 }
1034
1035 /**
1036 * Handles a term activation event.
1037 *
1038 * @param term the term that was activated
1039 */
1040 private void onActivate(long term) {
1041 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
1042 if (replicaInfo != null && replicaInfo.term() == term) {
1043 NodeId master = replicaInfo.master().orElse(null);
1044 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanaeda2752019-02-22 12:31:25 -08001045 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman281dbf32018-06-15 17:46:28 -07001046 listenerRegistry.process(new LifecycleEvent(
1047 LifecycleEvent.Type.TERM_ACTIVE,
1048 new DeviceReplicaInfo(term, master, backups)));
1049 }
1050 }
1051
1052 /**
1053 * Handles a replica info change event.
1054 *
1055 * @param replicaInfo the updated replica info
1056 */
1057 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
1058 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
1059 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
1060 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
1061 if (oldReplicaInfo != null) {
1062 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
1063 }
1064 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
Jordan Haltermane4bf8562018-06-22 16:58:08 -07001065 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
1066 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
Jordan Halterman281dbf32018-06-15 17:46:28 -07001067 }
1068 }
1069
1070 /**
1071 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
1072 *
1073 * @param replicaInfo the replica info to convert
1074 * @return the converted replica info
1075 */
1076 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
1077 NodeId master = replicaInfo.master().orElse(null);
1078 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanaeda2752019-02-22 12:31:25 -08001079 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman281dbf32018-06-15 17:46:28 -07001080 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
1081 }
1082
1083 @Override
1084 public void close() {
1085 replicaInfoManager.removeListener(this);
1086 mastershipTermLifecycles.removeListener(this);
1087 }
1088 }
Jordan Haltermanb81fdc12019-03-04 18:12:20 -08001089
1090 private static class CountMessage {
1091 private final DeviceId deviceId;
1092 private final FlowEntryState state;
1093
1094 CountMessage(DeviceId deviceId, FlowEntryState state) {
1095 this.deviceId = deviceId;
1096 this.state = state;
1097 }
1098 }
Jordan Halterman5259b332018-06-12 15:34:19 -07001099}