blob: 0a962521e383347eb9f81abdfc7ee6db6cdbbef1 [file] [log] [blame]
Jordan Halterman281dbf32018-06-15 17:46:28 -07001/*
2* Copyright 2014-present Open Networking Foundation
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070018import java.util.Collections;
19import java.util.Dictionary;
20import java.util.HashSet;
21import java.util.Iterator;
22import java.util.List;
23import java.util.Map;
24import java.util.Objects;
25import java.util.Set;
26import java.util.concurrent.ExecutionException;
27import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Executors;
29import java.util.concurrent.ScheduledExecutorService;
30import java.util.concurrent.TimeUnit;
31import java.util.concurrent.TimeoutException;
32import java.util.function.Function;
33import java.util.stream.Collectors;
34import java.util.stream.StreamSupport;
35
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070036import com.google.common.collect.ImmutableList;
37import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070038import com.google.common.collect.Streams;
Jordan Haltermanb81fdc12019-03-04 18:12:20 -080039import org.apache.commons.lang3.tuple.ImmutablePair;
40import org.apache.commons.lang3.tuple.Pair;
Jon Hallfa132292017-10-24 11:11:24 -070041import org.onlab.util.KryoNamespace;
Jordan Haltermanaeda2752019-02-22 12:31:25 -080042import org.onlab.util.OrderedExecutor;
Jon Hallfa132292017-10-24 11:11:24 -070043import org.onlab.util.Tools;
44import org.onosproject.cfg.ComponentConfigService;
45import org.onosproject.cluster.ClusterService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.core.CoreService;
48import org.onosproject.core.IdGenerator;
Jordan Halterman281dbf32018-06-15 17:46:28 -070049import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070050import org.onosproject.mastership.MastershipService;
51import org.onosproject.net.DeviceId;
Jordan Halterman281dbf32018-06-15 17:46:28 -070052import org.onosproject.net.device.DeviceEvent;
53import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070054import org.onosproject.net.device.DeviceService;
55import org.onosproject.net.flow.CompletedBatchOperation;
56import org.onosproject.net.flow.DefaultFlowEntry;
pierventre019b07292021-05-21 12:39:09 +020057import org.onosproject.net.flow.DefaultFlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070058import org.onosproject.net.flow.FlowEntry;
59import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070060import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070061import org.onosproject.net.flow.FlowRuleEvent;
62import org.onosproject.net.flow.FlowRuleEvent.Type;
63import org.onosproject.net.flow.FlowRuleService;
64import org.onosproject.net.flow.FlowRuleStore;
65import org.onosproject.net.flow.FlowRuleStoreDelegate;
66import org.onosproject.net.flow.StoredFlowEntry;
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070067import org.onosproject.net.flow.FlowRuleStoreException;
Jon Hallfa132292017-10-24 11:11:24 -070068import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070069import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
70import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
71import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
72import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
73import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070074import org.onosproject.persistence.PersistenceService;
75import org.onosproject.store.AbstractStore;
76import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
77import org.onosproject.store.cluster.messaging.ClusterMessage;
78import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jordan Halterman281dbf32018-06-15 17:46:28 -070079import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070080import org.onosproject.store.flow.ReplicaInfoEvent;
81import org.onosproject.store.flow.ReplicaInfoEventListener;
82import org.onosproject.store.flow.ReplicaInfoService;
83import org.onosproject.store.impl.MastershipBasedTimestamp;
84import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman281dbf32018-06-15 17:46:28 -070085import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070086import org.onosproject.store.service.EventuallyConsistentMap;
87import org.onosproject.store.service.EventuallyConsistentMapEvent;
88import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman281dbf32018-06-15 17:46:28 -070089import org.onosproject.store.service.MapEvent;
90import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070091import org.onosproject.store.service.Serializer;
92import org.onosproject.store.service.StorageService;
93import org.onosproject.store.service.WallClockTimestamp;
94import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070095import org.osgi.service.component.annotations.Activate;
96import org.osgi.service.component.annotations.Component;
97import org.osgi.service.component.annotations.Deactivate;
98import org.osgi.service.component.annotations.Modified;
99import org.osgi.service.component.annotations.Reference;
100import org.osgi.service.component.annotations.ReferenceCardinality;
Jon Hallfa132292017-10-24 11:11:24 -0700101import org.slf4j.Logger;
102
Jon Hallfa132292017-10-24 11:11:24 -0700103import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800104import static java.lang.Math.max;
105import static java.lang.Math.min;
Jon Hallfa132292017-10-24 11:11:24 -0700106import static org.onlab.util.Tools.get;
107import static org.onlab.util.Tools.groupedThreads;
108import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -0700109import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
110import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700111import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700112import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
113import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
114import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
115import static org.slf4j.LoggerFactory.getLogger;
116
Ray Milkeyb5646e62018-10-16 11:42:18 -0700117import static org.onosproject.store.OsgiPropertyConstants.*;
118
Jon Hallfa132292017-10-24 11:11:24 -0700119/**
120 * Manages inventory of flow rules using a distributed state management protocol.
121 */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700122@Component(
123 immediate = true,
124 service = FlowRuleStore.class,
125 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -0700126 MESSAGE_HANDLER_THREAD_POOL_SIZE + ":Integer=" + MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT,
127 BACKUP_PERIOD_MILLIS + ":Integer=" + BACKUP_PERIOD_MILLIS_DEFAULT,
128 ANTI_ENTROPY_PERIOD_MILLIS + ":Integer=" + ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT,
129 EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED + ":Boolean=" + EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT,
130 MAX_BACKUP_COUNT + ":Integer=" + MAX_BACKUP_COUNT_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -0700131 }
132)
Jon Hallfa132292017-10-24 11:11:24 -0700133public class ECFlowRuleStore
Jordan Halterman281dbf32018-06-15 17:46:28 -0700134 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
135 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700136
137 private final Logger log = getLogger(getClass());
138
Jon Hallfa132292017-10-24 11:11:24 -0700139 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jordan Halterman43300782019-05-21 11:27:50 -0700140 private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
Jon Hallfa132292017-10-24 11:11:24 -0700141
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700142 /** Number of threads in the message handler pool. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700143 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700144
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700145 /** Delay in ms between successive backup runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700146 private int backupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700147
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700148 /** Delay in ms between anti-entropy runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700149 private int antiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700150
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700151 /** Indicates whether or not changes in the flow table should be persisted to disk. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700152 private boolean persistenceEnabled = EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700153
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700154 /** Max number of backup copies for each device. */
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100155 protected static volatile int backupCount = MAX_BACKUP_COUNT_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700156
157 private InternalFlowTable flowTable = new InternalFlowTable();
158
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700160 protected ReplicaInfoService replicaInfoManager;
161
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700163 protected ClusterCommunicationService clusterCommunicator;
164
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700166 protected ClusterService clusterService;
167
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700168 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700169 protected DeviceService deviceService;
170
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700171 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700172 protected CoreService coreService;
173
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700174 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700175 protected ComponentConfigService configService;
176
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700177 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700178 protected MastershipService mastershipService;
179
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700180 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700181 protected PersistenceService persistenceService;
182
183 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
184 private ExecutorService messageHandlingExecutor;
185 private ExecutorService eventHandler;
186
Ray Milkeyd1092d62019-04-02 09:12:00 -0700187 private ScheduledExecutorService backupScheduler;
188 private ExecutorService backupExecutor;
Jon Hallfa132292017-10-24 11:11:24 -0700189
190 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
191 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700192 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700193
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700194 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700195 protected StorageService storageService;
196
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700197 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
198 .register(KryoNamespaces.API)
Jordan Haltermana765d222018-06-01 00:40:56 -0700199 .register(BucketId.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700200 .register(FlowBucket.class)
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800201 .register(ImmutablePair.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700202 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700203
204 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermana765d222018-06-01 00:40:56 -0700205 .register(KryoNamespaces.API)
206 .register(BucketId.class)
207 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700208
Jordan Halterman281dbf32018-06-15 17:46:28 -0700209 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700210
211 private IdGenerator idGenerator;
212 private NodeId local;
213
214 @Activate
215 public void activate(ComponentContext context) {
216 configService.registerProperties(getClass());
217
Ray Milkeyd1092d62019-04-02 09:12:00 -0700218 backupScheduler = Executors.newSingleThreadScheduledExecutor(
219 groupedThreads("onos/flow", "backup-scheduler", log));
220 backupExecutor = Executors.newFixedThreadPool(
221 max(min(Runtime.getRuntime().availableProcessors() * 2, 16), 4),
222 groupedThreads("onos/flow", "backup-%d", log));
223
Jon Hallfa132292017-10-24 11:11:24 -0700224 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
225
226 local = clusterService.getLocalNode().id();
227
228 eventHandler = Executors.newSingleThreadExecutor(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700229 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700230 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700231 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700232
233 registerMessageHandlers(messageHandlingExecutor);
234
Jordan Halterman281dbf32018-06-15 17:46:28 -0700235 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
236 .withName("onos-flow-store-terms")
237 .withSerializer(serializer)
238 .buildAsyncMap();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800239
Jon Hallfa132292017-10-24 11:11:24 -0700240 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700241 .withName("onos-flow-table-stats")
242 .withSerializer(serializerBuilder)
243 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
244 .withTimestampProvider((k, v) -> new WallClockTimestamp())
245 .withTombstonesDisabled()
246 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700247 deviceTableStats.addListener(tableStatsListener);
248
Jordan Halterman281dbf32018-06-15 17:46:28 -0700249 deviceService.addListener(flowTable);
250 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
251
Jon Hallfa132292017-10-24 11:11:24 -0700252 logConfig("Started");
253 }
254
255 @Deactivate
256 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700257 configService.unregisterProperties(getClass(), false);
258 unregisterMessageHandlers();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700259 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700260 deviceTableStats.removeListener(tableStatsListener);
261 deviceTableStats.destroy();
262 eventHandler.shutdownNow();
263 messageHandlingExecutor.shutdownNow();
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800264 backupScheduler.shutdownNow();
265 backupExecutor.shutdownNow();
Ray Milkeyd1092d62019-04-02 09:12:00 -0700266 backupScheduler = null;
267 backupExecutor = null;
Jon Hallfa132292017-10-24 11:11:24 -0700268 log.info("Stopped");
269 }
270
271 @SuppressWarnings("rawtypes")
272 @Modified
273 public void modified(ComponentContext context) {
274 if (context == null) {
275 logConfig("Default config");
276 return;
277 }
278
279 Dictionary properties = context.getProperties();
280 int newPoolSize;
281 int newBackupPeriod;
282 int newBackupCount;
Jordan Halterman5259b332018-06-12 15:34:19 -0700283 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700284 try {
285 String s = get(properties, "msgHandlerPoolSize");
286 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
287
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700288 s = get(properties, BACKUP_PERIOD_MILLIS);
Jon Hallfa132292017-10-24 11:11:24 -0700289 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
290
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700291 s = get(properties, MAX_BACKUP_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700292 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman5259b332018-06-12 15:34:19 -0700293
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700294 s = get(properties, ANTI_ENTROPY_PERIOD_MILLIS);
Jordan Halterman5259b332018-06-12 15:34:19 -0700295 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700296 } catch (NumberFormatException | ClassCastException e) {
Ray Milkeyb5646e62018-10-16 11:42:18 -0700297 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
298 newBackupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
299 newBackupCount = MAX_BACKUP_COUNT_DEFAULT;
300 newAntiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700301 }
302
Jon Hallfa132292017-10-24 11:11:24 -0700303 if (newBackupPeriod != backupPeriod) {
304 backupPeriod = newBackupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700305 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700306 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700307
308 if (newAntiEntropyPeriod != antiEntropyPeriod) {
309 antiEntropyPeriod = newAntiEntropyPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700310 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jordan Halterman5259b332018-06-12 15:34:19 -0700311 }
312
Jon Hallfa132292017-10-24 11:11:24 -0700313 if (newPoolSize != msgHandlerPoolSize) {
314 msgHandlerPoolSize = newPoolSize;
315 ExecutorService oldMsgHandler = messageHandlingExecutor;
316 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700317 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700318
319 // replace previously registered handlers.
320 registerMessageHandlers(messageHandlingExecutor);
321 oldMsgHandler.shutdown();
322 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700323
Jon Hallfa132292017-10-24 11:11:24 -0700324 if (backupCount != newBackupCount) {
325 backupCount = newBackupCount;
326 }
327 logConfig("Reconfigured");
328 }
329
330 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700331 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
332 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700333 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700334 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700335 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800336 clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
337 GET_DEVICE_FLOW_COUNT,
338 serializer::decode,
339 p -> flowTable.getFlowRuleCount(p.getLeft(), p.getRight()),
340 serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700341 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700342 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700343 }
344
345 private void unregisterMessageHandlers() {
346 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700347 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700348 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
349 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
350 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
351 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
352 }
353
354 private void logConfig(String prefix) {
355 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700356 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700357 }
358
Jon Hallfa132292017-10-24 11:11:24 -0700359 @Override
360 public int getFlowRuleCount() {
361 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700362 .mapToInt(device -> getFlowRuleCount(device.id()))
363 .sum();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800364 }
365
366 @Override
367 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800368 return getFlowRuleCount(deviceId, null);
369 }
370
371 @Override
372 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700373 NodeId master = mastershipService.getMasterFor(deviceId);
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100374 if (master == null && deviceService.isAvailable(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700375 log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
376 return 0;
377 }
378
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100379 if (Objects.equals(local, master) || master == null) {
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800380 return flowTable.getFlowRuleCount(deviceId, state);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700381 }
382
383 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
384 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800385 Pair.of(deviceId, state),
386 GET_DEVICE_FLOW_COUNT,
387 serializer::encode,
388 serializer::decode,
389 master),
390 FLOW_RULE_STORE_TIMEOUT_MILLIS,
391 TimeUnit.MILLISECONDS,
392 0);
Jon Hallfa132292017-10-24 11:11:24 -0700393 }
394
395 @Override
396 public FlowEntry getFlowEntry(FlowRule rule) {
397 NodeId master = mastershipService.getMasterFor(rule.deviceId());
398
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100399 if (master == null && deviceService.isAvailable(rule.deviceId())) {
Jon Hallfa132292017-10-24 11:11:24 -0700400 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
401 return null;
402 }
403
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100404 if (Objects.equals(local, master) || master == null) {
Jon Hallfa132292017-10-24 11:11:24 -0700405 return flowTable.getFlowEntry(rule);
406 }
407
408 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700409 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700410
411 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700412 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
413 serializer::encode,
414 serializer::decode,
415 master),
416 FLOW_RULE_STORE_TIMEOUT_MILLIS,
417 TimeUnit.MILLISECONDS,
418 null);
Jon Hallfa132292017-10-24 11:11:24 -0700419 }
420
421 @Override
422 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700423 return flowTable.getFlowEntries(deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700424 }
425
426 @Override
427 public void storeFlowRule(FlowRule rule) {
428 storeBatch(new FlowRuleBatchOperation(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700429 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
430 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700431 }
432
433 @Override
434 public void storeBatch(FlowRuleBatchOperation operation) {
435 if (operation.getOperations().isEmpty()) {
436 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700437 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
438 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700439 return;
440 }
441
442 DeviceId deviceId = operation.deviceId();
443 NodeId master = mastershipService.getMasterFor(deviceId);
444
445 if (master == null) {
446 log.warn("No master for {} ", deviceId);
447
Jordan Halterman281dbf32018-06-15 17:46:28 -0700448 Set<FlowRule> allFailures = operation.getOperations()
449 .stream()
450 .map(op -> op.target())
451 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700452 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700453 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
454 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700455 return;
456 }
457
458 if (Objects.equals(local, master)) {
459 storeBatchInternal(operation);
460 return;
461 }
462
463 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700464 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700465
466 clusterCommunicator.unicast(operation,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700467 APPLY_BATCH_FLOWS,
468 serializer::encode,
469 master)
470 .whenComplete((result, error) -> {
471 if (error != null) {
472 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700473
Jordan Halterman281dbf32018-06-15 17:46:28 -0700474 Set<FlowRule> allFailures = operation.getOperations()
475 .stream()
476 .map(op -> op.target())
477 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700478
Jordan Halterman281dbf32018-06-15 17:46:28 -0700479 notifyDelegate(FlowRuleBatchEvent.completed(
480 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
481 new CompletedBatchOperation(false, allFailures, deviceId)));
482 }
483 });
Jon Hallfa132292017-10-24 11:11:24 -0700484 }
485
486 private void storeBatchInternal(FlowRuleBatchOperation operation) {
487
488 final DeviceId did = operation.deviceId();
489 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
490 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
491 if (currentOps.isEmpty()) {
492 batchOperationComplete(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700493 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
494 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700495 return;
496 }
497
498 notifyDelegate(FlowRuleBatchEvent.requested(new
Jordan Halterman281dbf32018-06-15 17:46:28 -0700499 FlowRuleBatchRequest(operation.id(),
500 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700501 }
502
503 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
504 return operation.getOperations().stream().map(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700505 op -> {
506 StoredFlowEntry entry;
507 switch (op.operator()) {
508 case ADD:
509 entry = new DefaultFlowEntry(op.target());
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800510 log.debug("Adding flow rule: {}", entry);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700511 flowTable.add(entry);
512 return op;
513 case MODIFY:
514 entry = new DefaultFlowEntry(op.target());
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800515 log.debug("Updating flow rule: {}", entry);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700516 flowTable.update(entry);
517 return op;
518 case REMOVE:
519 return flowTable.update(op.target(), stored -> {
520 stored.setState(FlowEntryState.PENDING_REMOVE);
521 log.debug("Setting state of rule to pending remove: {}", stored);
pierventre019b07292021-05-21 12:39:09 +0200522 // Using the stored value instead of op to allow the removal
523 // of flows that do not specify actions or have empty treatment
524 return new FlowRuleBatchEntry(op.operator(), new DefaultFlowRule(stored));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700525 });
526 default:
527 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700528 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700529 return null;
530 }
Jon Hallfa132292017-10-24 11:11:24 -0700531 ).filter(Objects::nonNull).collect(Collectors.toSet());
532 }
533
534 @Override
535 public void deleteFlowRule(FlowRule rule) {
536 storeBatch(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700537 new FlowRuleBatchOperation(
538 Collections.singletonList(
539 new FlowRuleBatchEntry(
540 FlowRuleOperation.REMOVE,
541 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700542 }
543
544 @Override
545 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
546 if (mastershipService.isLocalMaster(rule.deviceId())) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700547 return flowTable.update(rule, stored -> {
548 if (stored.state() == FlowEntryState.PENDING_ADD) {
549 stored.setState(FlowEntryState.PENDING_ADD);
550 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
551 }
552 return null;
553 });
Jon Hallfa132292017-10-24 11:11:24 -0700554 }
555 return null;
556 }
557
558 @Override
559 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
560 NodeId master = mastershipService.getMasterFor(rule.deviceId());
561 if (Objects.equals(local, master)) {
562 return addOrUpdateFlowRuleInternal(rule);
563 }
564
565 log.warn("Tried to update FlowRule {} state,"
Jordan Halterman281dbf32018-06-15 17:46:28 -0700566 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700567 return null;
568 }
569
570 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700571 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700572 stored.setBytes(rule.bytes());
573 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
574 stored.setLiveType(rule.liveType());
575 stored.setPackets(rule.packets());
576 stored.setLastSeen();
577 if (stored.state() == FlowEntryState.PENDING_ADD) {
578 stored.setState(FlowEntryState.ADDED);
579 return new FlowRuleEvent(Type.RULE_ADDED, rule);
580 }
581 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700582 });
583 if (event != null) {
584 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700585 }
586
587 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
588 // TODO: also update backup if the behavior is correct.
589 flowTable.add(rule);
590 return null;
591 }
592
593 @Override
594 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
595 final DeviceId deviceId = rule.deviceId();
596 NodeId master = mastershipService.getMasterFor(deviceId);
597
598 if (Objects.equals(local, master)) {
599 // bypass and handle it locally
600 return removeFlowRuleInternal(rule);
601 }
602
603 if (master == null) {
604 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
605 // TODO: revisit if this should be null (="no-op") or Exception
606 return null;
607 }
608
609 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700610 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700611
Jordan Halterman281dbf32018-06-15 17:46:28 -0700612 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
613 rule,
614 REMOVE_FLOW_ENTRY,
615 serializer::encode,
616 serializer::decode,
617 master),
618 FLOW_RULE_STORE_TIMEOUT_MILLIS,
619 TimeUnit.MILLISECONDS,
620 null);
Jon Hallfa132292017-10-24 11:11:24 -0700621 }
622
623 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700624 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800625 final FlowEntry removed = flowTable.remove(rule);
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800626 log.debug("Removed flow rule: {}", removed);
Jon Hallfa132292017-10-24 11:11:24 -0700627 // rule may be partial rule that is missing treatment, we should use rule from store instead
628 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
629 }
630
631 @Override
632 public void purgeFlowRule(DeviceId deviceId) {
633 flowTable.purgeFlowRule(deviceId);
634 }
635
636 @Override
637 public void purgeFlowRules() {
638 flowTable.purgeFlowRules();
639 }
640
641 @Override
642 public void batchOperationComplete(FlowRuleBatchEvent event) {
643 //FIXME: need a per device pending response
644 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
645 if (nodeId == null) {
646 notifyDelegate(event);
647 } else {
648 // TODO check unicast return value
649 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
650 //error log: log.warn("Failed to respond to peer for batch operation result");
651 }
652 }
653
654 private final class OnStoreBatch implements ClusterMessageHandler {
655
656 @Override
657 public void handle(final ClusterMessage message) {
658 FlowRuleBatchOperation operation = serializer.decode(message.payload());
659 log.debug("received batch request {}", operation);
660
661 final DeviceId deviceId = operation.deviceId();
662 NodeId master = mastershipService.getMasterFor(deviceId);
663 if (!Objects.equals(local, master)) {
664 Set<FlowRule> failures = new HashSet<>(operation.size());
665 for (FlowRuleBatchEntry op : operation.getOperations()) {
666 failures.add(op.target());
667 }
668 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
669 // This node is no longer the master, respond as all failed.
670 // TODO: we might want to wrap response in envelope
671 // to distinguish sw programming failure and hand over
672 // it make sense in the latter case to retry immediately.
673 message.respond(serializer.encode(allFailed));
674 return;
675 }
676
677 pendingResponses.put(operation.id(), message.sender());
678 storeBatchInternal(operation);
679 }
680 }
681
Jordan Halterman281dbf32018-06-15 17:46:28 -0700682 private class InternalFlowTable implements DeviceListener {
683 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700684
685 @Override
Jordan Halterman281dbf32018-06-15 17:46:28 -0700686 public void event(DeviceEvent event) {
687 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
688 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700689 }
690 }
691
Jordan Halterman281dbf32018-06-15 17:46:28 -0700692 /**
693 * Adds the given device to the flow table.
694 *
695 * @param deviceId the device to add to the table
696 */
697 public void addDevice(DeviceId deviceId) {
698 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
699 id,
700 clusterService,
701 clusterCommunicator,
702 new InternalLifecycleManager(id),
Pier Luigi Ventred8a923c2020-02-20 11:25:31 +0000703 deviceService,
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800704 backupScheduler,
705 new OrderedExecutor(backupExecutor),
Jordan Halterman281dbf32018-06-15 17:46:28 -0700706 backupPeriod,
707 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700708 }
709
Jordan Halterman281dbf32018-06-15 17:46:28 -0700710 /**
711 * Sets the flow table backup period.
712 *
713 * @param backupPeriod the flow table backup period
714 */
715 void setBackupPeriod(int backupPeriod) {
716 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700717 }
718
Jordan Halterman281dbf32018-06-15 17:46:28 -0700719 /**
720 * Sets the flow table anti-entropy period.
721 *
722 * @param antiEntropyPeriod the flow table anti-entropy period
723 */
724 void setAntiEntropyPeriod(int antiEntropyPeriod) {
725 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700726 }
727
Jordan Halterman281dbf32018-06-15 17:46:28 -0700728 /**
729 * Returns the flow table for a specific device.
730 *
731 * @param deviceId the device identifier
732 * @return the flow table for the given device
733 */
734 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
735 DeviceFlowTable flowTable = flowTables.get(deviceId);
736 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
737 deviceId,
738 clusterService,
739 clusterCommunicator,
740 new InternalLifecycleManager(deviceId),
Pier Luigi Ventred8a923c2020-02-20 11:25:31 +0000741 deviceService,
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800742 backupScheduler,
743 new OrderedExecutor(backupExecutor),
Jordan Halterman281dbf32018-06-15 17:46:28 -0700744 backupPeriod,
745 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700746 }
747
Jordan Halterman281dbf32018-06-15 17:46:28 -0700748 /**
749 * Returns the flow rule count for the given device.
750 *
751 * @param deviceId the device for which to return the flow rule count
752 * @return the flow rule count for the given device
753 */
754 public int getFlowRuleCount(DeviceId deviceId) {
755 return getFlowTable(deviceId).count();
756 }
757
758 /**
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800759 * Returns the count of flow rules in the given state for the given device.
760 *
761 * @param deviceId the device for which to return the flow rule count
762 * @return the flow rule count for the given device
763 */
764 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
765 if (state == null) {
766 return getFlowRuleCount(deviceId);
767 }
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700768 return (int) StreamSupport.stream(getFlowEntries(deviceId).spliterator(), false)
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800769 .filter(rule -> rule.state() == state)
770 .count();
771 }
772
773 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -0700774 * Returns the flow entry for the given rule.
775 *
776 * @param rule the rule for which to return the flow entry
777 * @return the flow entry for the given rule
778 */
Jon Hallfa132292017-10-24 11:11:24 -0700779 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700780 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700781 }
782
Jordan Halterman281dbf32018-06-15 17:46:28 -0700783 /**
784 * Returns the set of flow entries for the given device.
785 *
786 * @param deviceId the device for which to lookup flow entries
787 * @return the set of flow entries for the given device
788 */
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700789 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
790 try {
791 return getFlowTable(deviceId).getFlowEntries()
Jordan Halterman43300782019-05-21 11:27:50 -0700792 .get(GET_FLOW_ENTRIES_TIMEOUT, TimeUnit.SECONDS);
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700793 } catch (ExecutionException e) {
794 throw new FlowRuleStoreException(e.getCause());
795 } catch (TimeoutException e) {
796 throw new FlowRuleStoreException.Timeout();
797 } catch (InterruptedException e) {
798 throw new FlowRuleStoreException.Interrupted();
799 }
Jon Hallfa132292017-10-24 11:11:24 -0700800 }
801
Jordan Halterman281dbf32018-06-15 17:46:28 -0700802 /**
803 * Adds the given flow rule.
804 *
805 * @param rule the rule to add
806 */
Jon Hallfa132292017-10-24 11:11:24 -0700807 public void add(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700808 Tools.futureGetOrElse(
809 getFlowTable(rule.deviceId()).add(rule),
810 FLOW_RULE_STORE_TIMEOUT_MILLIS,
811 TimeUnit.MILLISECONDS,
812 null);
Jon Hallfa132292017-10-24 11:11:24 -0700813 }
814
Jordan Halterman281dbf32018-06-15 17:46:28 -0700815 /**
816 * Updates the given flow rule.
817 *
818 * @param rule the rule to update
819 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800820 public void update(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700821 Tools.futureGetOrElse(
822 getFlowTable(rule.deviceId()).update(rule),
823 FLOW_RULE_STORE_TIMEOUT_MILLIS,
824 TimeUnit.MILLISECONDS,
825 null);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800826 }
827
Jordan Halterman281dbf32018-06-15 17:46:28 -0700828 /**
829 * Applies the given update function to the rule.
830 *
831 * @param function the update function to apply
832 * @return a future to be completed with the update event or {@code null} if the rule was not updated
833 */
834 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
835 return Tools.futureGetOrElse(
836 getFlowTable(rule.deviceId()).update(rule, function),
837 FLOW_RULE_STORE_TIMEOUT_MILLIS,
838 TimeUnit.MILLISECONDS,
839 null);
840 }
841
842 /**
843 * Removes the given flow rule.
844 *
845 * @param rule the rule to remove
846 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800847 public FlowEntry remove(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700848 return Tools.futureGetOrElse(
849 getFlowTable(rule.deviceId()).remove(rule),
850 FLOW_RULE_STORE_TIMEOUT_MILLIS,
851 TimeUnit.MILLISECONDS,
852 null);
Jon Hallfa132292017-10-24 11:11:24 -0700853 }
854
Jordan Halterman281dbf32018-06-15 17:46:28 -0700855 /**
856 * Purges flow rules for the given device.
857 *
858 * @param deviceId the device for which to purge flow rules
859 */
Jon Hallfa132292017-10-24 11:11:24 -0700860 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanda3b9f02018-10-05 13:28:51 -0700861 // If the device is still present in the store, purge the underlying DeviceFlowTable.
862 // Otherwise, remove the DeviceFlowTable and unregister message handlers.
863 if (deviceService.getDevice(deviceId) != null) {
864 DeviceFlowTable flowTable = flowTables.get(deviceId);
865 if (flowTable != null) {
866 flowTable.purge();
867 }
868 } else {
869 DeviceFlowTable flowTable = flowTables.remove(deviceId);
870 if (flowTable != null) {
871 flowTable.close();
872 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700873 }
Jon Hallfa132292017-10-24 11:11:24 -0700874 }
875
Jordan Halterman281dbf32018-06-15 17:46:28 -0700876 /**
877 * Purges all flow rules from the table.
878 */
Jon Hallfa132292017-10-24 11:11:24 -0700879 public void purgeFlowRules() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700880 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
881 while (iterator.hasNext()) {
882 iterator.next().close();
883 iterator.remove();
Jordan Halterman8f90d6d2018-06-12 11:23:33 -0700884 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700885 }
Jon Hallfa132292017-10-24 11:11:24 -0700886 }
887
888 @Override
Jordan Halterman5259b332018-06-12 15:34:19 -0700889 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700890 deviceTableStats.put(deviceId, tableStats);
891 return null;
892 }
893
894 @Override
895 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
896 NodeId master = mastershipService.getMasterFor(deviceId);
897
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100898 if (master == null && deviceService.isAvailable(deviceId)) {
Jon Hallfa132292017-10-24 11:11:24 -0700899 log.debug("Failed to getTableStats: No master for {}", deviceId);
900 return Collections.emptyList();
901 }
902
903 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
904 if (tableStats == null) {
905 return Collections.emptyList();
906 }
907 return ImmutableList.copyOf(tableStats);
908 }
909
910 @Override
911 public long getActiveFlowRuleCount(DeviceId deviceId) {
912 return Streams.stream(getTableStatistics(deviceId))
Jordan Halterman281dbf32018-06-15 17:46:28 -0700913 .mapToLong(TableStatisticsEntry::activeFlowEntries)
914 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700915 }
916
917 private class InternalTableStatsListener
918 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
919 @Override
920 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700921 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700922 //TODO: Generate an event to listeners (do we need?)
923 }
924 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700925
926 /**
927 * Device lifecycle manager implementation.
928 */
929 private final class InternalLifecycleManager
930 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
931 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
932
933 private final DeviceId deviceId;
934
935 private volatile DeviceReplicaInfo replicaInfo;
936
937 InternalLifecycleManager(DeviceId deviceId) {
938 this.deviceId = deviceId;
939 replicaInfoManager.addListener(this);
940 mastershipTermLifecycles.addListener(this);
941 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
942 }
943
944 @Override
945 public DeviceReplicaInfo getReplicaInfo() {
946 return replicaInfo;
947 }
948
949 @Override
950 public void activate(long term) {
951 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
952 if (replicaInfo != null && replicaInfo.term() == term) {
953 mastershipTermLifecycles.put(deviceId, term);
954 }
955 }
956
957 @Override
958 public void event(ReplicaInfoEvent event) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700959 if (event.subject().equals(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700960 onReplicaInfoChange(event.replicaInfo());
961 }
962 }
963
964 @Override
965 public void event(MapEvent<DeviceId, Long> event) {
966 if (event.key().equals(deviceId) && event.newValue() != null) {
967 onActivate(event.newValue().value());
968 }
969 }
970
971 /**
972 * Handles a term activation event.
973 *
974 * @param term the term that was activated
975 */
976 private void onActivate(long term) {
977 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
978 if (replicaInfo != null && replicaInfo.term() == term) {
979 NodeId master = replicaInfo.master().orElse(null);
980 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800981 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700982 listenerRegistry.process(new LifecycleEvent(
983 LifecycleEvent.Type.TERM_ACTIVE,
984 new DeviceReplicaInfo(term, master, backups)));
985 }
986 }
987
988 /**
989 * Handles a replica info change event.
990 *
991 * @param replicaInfo the updated replica info
992 */
993 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
994 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
995 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
996 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
997 if (oldReplicaInfo != null) {
998 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
999 }
1000 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
Jordan Haltermane4bf8562018-06-22 16:58:08 -07001001 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
1002 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
Jordan Halterman281dbf32018-06-15 17:46:28 -07001003 }
1004 }
1005
1006 /**
1007 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
1008 *
1009 * @param replicaInfo the replica info to convert
1010 * @return the converted replica info
1011 */
1012 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
1013 NodeId master = replicaInfo.master().orElse(null);
1014 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanaeda2752019-02-22 12:31:25 -08001015 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman281dbf32018-06-15 17:46:28 -07001016 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
1017 }
1018
1019 @Override
1020 public void close() {
1021 replicaInfoManager.removeListener(this);
1022 mastershipTermLifecycles.removeListener(this);
1023 }
1024 }
Jordan Haltermanb81fdc12019-03-04 18:12:20 -08001025
1026 private static class CountMessage {
1027 private final DeviceId deviceId;
1028 private final FlowEntryState state;
1029
1030 CountMessage(DeviceId deviceId, FlowEntryState state) {
1031 this.deviceId = deviceId;
1032 this.state = state;
1033 }
1034 }
Jordan Halterman5259b332018-06-12 15:34:19 -07001035}