blob: 04c6ecf0dada9f80324197553862cebfd84d70ab [file] [log] [blame]
Jordan Halterman281dbf32018-06-15 17:46:28 -07001/*
2* Copyright 2014-present Open Networking Foundation
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070020import com.google.common.collect.Streams;
Jordan Haltermanb81fdc12019-03-04 18:12:20 -080021import org.apache.commons.lang3.tuple.ImmutablePair;
22import org.apache.commons.lang3.tuple.Pair;
Jon Hallfa132292017-10-24 11:11:24 -070023import org.onlab.util.KryoNamespace;
Jordan Haltermanaeda2752019-02-22 12:31:25 -080024import org.onlab.util.OrderedExecutor;
Jon Hallfa132292017-10-24 11:11:24 -070025import org.onlab.util.Tools;
26import org.onosproject.cfg.ComponentConfigService;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.NodeId;
29import org.onosproject.core.CoreService;
30import org.onosproject.core.IdGenerator;
Jordan Halterman281dbf32018-06-15 17:46:28 -070031import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070032import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.DeviceId;
Jordan Halterman281dbf32018-06-15 17:46:28 -070034import org.onosproject.net.device.DeviceEvent;
35import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070036import org.onosproject.net.device.DeviceService;
37import org.onosproject.net.flow.CompletedBatchOperation;
38import org.onosproject.net.flow.DefaultFlowEntry;
39import org.onosproject.net.flow.FlowEntry;
40import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070041import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070042import org.onosproject.net.flow.FlowRuleEvent;
43import org.onosproject.net.flow.FlowRuleEvent.Type;
44import org.onosproject.net.flow.FlowRuleService;
45import org.onosproject.net.flow.FlowRuleStore;
46import org.onosproject.net.flow.FlowRuleStoreDelegate;
47import org.onosproject.net.flow.StoredFlowEntry;
48import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070049import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
50import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
51import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
52import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
53import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070054import org.onosproject.persistence.PersistenceService;
55import org.onosproject.store.AbstractStore;
56import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
57import org.onosproject.store.cluster.messaging.ClusterMessage;
58import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jordan Halterman281dbf32018-06-15 17:46:28 -070059import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070060import org.onosproject.store.flow.ReplicaInfoEvent;
61import org.onosproject.store.flow.ReplicaInfoEventListener;
62import org.onosproject.store.flow.ReplicaInfoService;
63import org.onosproject.store.impl.MastershipBasedTimestamp;
64import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman281dbf32018-06-15 17:46:28 -070065import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070066import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapEvent;
68import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman281dbf32018-06-15 17:46:28 -070069import org.onosproject.store.service.MapEvent;
70import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070071import org.onosproject.store.service.Serializer;
72import org.onosproject.store.service.StorageService;
73import org.onosproject.store.service.WallClockTimestamp;
74import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070075import org.osgi.service.component.annotations.Activate;
76import org.osgi.service.component.annotations.Component;
77import org.osgi.service.component.annotations.Deactivate;
78import org.osgi.service.component.annotations.Modified;
79import org.osgi.service.component.annotations.Reference;
80import org.osgi.service.component.annotations.ReferenceCardinality;
Jon Hallfa132292017-10-24 11:11:24 -070081import org.slf4j.Logger;
82
Ray Milkeyd84f89b2018-08-17 14:54:17 -070083import java.util.Collections;
84import java.util.Dictionary;
85import java.util.HashSet;
86import java.util.Iterator;
87import java.util.List;
88import java.util.Map;
89import java.util.Objects;
90import java.util.Set;
91import java.util.concurrent.ExecutorService;
92import java.util.concurrent.Executors;
93import java.util.concurrent.ScheduledExecutorService;
94import java.util.concurrent.TimeUnit;
95import java.util.function.Function;
96import java.util.stream.Collectors;
97
Jon Hallfa132292017-10-24 11:11:24 -070098import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Haltermanaeda2752019-02-22 12:31:25 -080099import static java.lang.Math.max;
100import static java.lang.Math.min;
Jon Hallfa132292017-10-24 11:11:24 -0700101import static org.onlab.util.Tools.get;
102import static org.onlab.util.Tools.groupedThreads;
103import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -0700104import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
105import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700106import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700107import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
108import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
109import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
110import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
111import static org.slf4j.LoggerFactory.getLogger;
112
Ray Milkeyb5646e62018-10-16 11:42:18 -0700113import static org.onosproject.store.OsgiPropertyConstants.*;
114
Jon Hallfa132292017-10-24 11:11:24 -0700115/**
116 * Manages inventory of flow rules using a distributed state management protocol.
117 */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700118@Component(
119 immediate = true,
120 service = FlowRuleStore.class,
121 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -0700122 MESSAGE_HANDLER_THREAD_POOL_SIZE + ":Integer=" + MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT,
123 BACKUP_PERIOD_MILLIS + ":Integer=" + BACKUP_PERIOD_MILLIS_DEFAULT,
124 ANTI_ENTROPY_PERIOD_MILLIS + ":Integer=" + ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT,
125 EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED + ":Boolean=" + EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT,
126 MAX_BACKUP_COUNT + ":Integer=" + MAX_BACKUP_COUNT_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -0700127 }
128)
Jon Hallfa132292017-10-24 11:11:24 -0700129public class ECFlowRuleStore
Jordan Halterman281dbf32018-06-15 17:46:28 -0700130 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
131 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700132
133 private final Logger log = getLogger(getClass());
134
Jon Hallfa132292017-10-24 11:11:24 -0700135 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700136
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700137 /** Number of threads in the message handler pool. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700138 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700139
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700140 /** Delay in ms between successive backup runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700141 private int backupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700142
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700143 /** Delay in ms between anti-entropy runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700144 private int antiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700145
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700146 /** Indicates whether or not changes in the flow table should be persisted to disk. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700147 private boolean persistenceEnabled = EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700148
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700149 /** Max number of backup copies for each device. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700150 private volatile int backupCount = MAX_BACKUP_COUNT_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700151
152 private InternalFlowTable flowTable = new InternalFlowTable();
153
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700154 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700155 protected ReplicaInfoService replicaInfoManager;
156
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700157 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700158 protected ClusterCommunicationService clusterCommunicator;
159
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700160 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700161 protected ClusterService clusterService;
162
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700163 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700164 protected DeviceService deviceService;
165
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700166 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700167 protected CoreService coreService;
168
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700169 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700170 protected ComponentConfigService configService;
171
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700172 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700173 protected MastershipService mastershipService;
174
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700175 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700176 protected PersistenceService persistenceService;
177
178 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
179 private ExecutorService messageHandlingExecutor;
180 private ExecutorService eventHandler;
181
Ray Milkeyd1092d62019-04-02 09:12:00 -0700182 private ScheduledExecutorService backupScheduler;
183 private ExecutorService backupExecutor;
Jon Hallfa132292017-10-24 11:11:24 -0700184
185 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
186 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700187 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700188
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700189 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700190 protected StorageService storageService;
191
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700192 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
193 .register(KryoNamespaces.API)
Jordan Haltermana765d222018-06-01 00:40:56 -0700194 .register(BucketId.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700195 .register(FlowBucket.class)
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800196 .register(ImmutablePair.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700197 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700198
199 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermana765d222018-06-01 00:40:56 -0700200 .register(KryoNamespaces.API)
201 .register(BucketId.class)
202 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700203
Jordan Halterman281dbf32018-06-15 17:46:28 -0700204 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700205
206 private IdGenerator idGenerator;
207 private NodeId local;
208
209 @Activate
210 public void activate(ComponentContext context) {
211 configService.registerProperties(getClass());
212
Ray Milkeyd1092d62019-04-02 09:12:00 -0700213 backupScheduler = Executors.newSingleThreadScheduledExecutor(
214 groupedThreads("onos/flow", "backup-scheduler", log));
215 backupExecutor = Executors.newFixedThreadPool(
216 max(min(Runtime.getRuntime().availableProcessors() * 2, 16), 4),
217 groupedThreads("onos/flow", "backup-%d", log));
218
Jon Hallfa132292017-10-24 11:11:24 -0700219 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
220
221 local = clusterService.getLocalNode().id();
222
223 eventHandler = Executors.newSingleThreadExecutor(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700224 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700225 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700226 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700227
228 registerMessageHandlers(messageHandlingExecutor);
229
Jordan Halterman281dbf32018-06-15 17:46:28 -0700230 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
231 .withName("onos-flow-store-terms")
232 .withSerializer(serializer)
233 .buildAsyncMap();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800234
Jon Hallfa132292017-10-24 11:11:24 -0700235 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700236 .withName("onos-flow-table-stats")
237 .withSerializer(serializerBuilder)
238 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
239 .withTimestampProvider((k, v) -> new WallClockTimestamp())
240 .withTombstonesDisabled()
241 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700242 deviceTableStats.addListener(tableStatsListener);
243
Jordan Halterman281dbf32018-06-15 17:46:28 -0700244 deviceService.addListener(flowTable);
245 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
246
Jon Hallfa132292017-10-24 11:11:24 -0700247 logConfig("Started");
248 }
249
250 @Deactivate
251 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700252 configService.unregisterProperties(getClass(), false);
253 unregisterMessageHandlers();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700254 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700255 deviceTableStats.removeListener(tableStatsListener);
256 deviceTableStats.destroy();
257 eventHandler.shutdownNow();
258 messageHandlingExecutor.shutdownNow();
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800259 backupScheduler.shutdownNow();
260 backupExecutor.shutdownNow();
Ray Milkeyd1092d62019-04-02 09:12:00 -0700261 backupScheduler = null;
262 backupExecutor = null;
Jon Hallfa132292017-10-24 11:11:24 -0700263 log.info("Stopped");
264 }
265
266 @SuppressWarnings("rawtypes")
267 @Modified
268 public void modified(ComponentContext context) {
269 if (context == null) {
270 logConfig("Default config");
271 return;
272 }
273
274 Dictionary properties = context.getProperties();
275 int newPoolSize;
276 int newBackupPeriod;
277 int newBackupCount;
Jordan Halterman5259b332018-06-12 15:34:19 -0700278 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700279 try {
280 String s = get(properties, "msgHandlerPoolSize");
281 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
282
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700283 s = get(properties, BACKUP_PERIOD_MILLIS);
Jon Hallfa132292017-10-24 11:11:24 -0700284 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
285
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700286 s = get(properties, MAX_BACKUP_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700287 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman5259b332018-06-12 15:34:19 -0700288
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700289 s = get(properties, ANTI_ENTROPY_PERIOD_MILLIS);
Jordan Halterman5259b332018-06-12 15:34:19 -0700290 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700291 } catch (NumberFormatException | ClassCastException e) {
Ray Milkeyb5646e62018-10-16 11:42:18 -0700292 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
293 newBackupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
294 newBackupCount = MAX_BACKUP_COUNT_DEFAULT;
295 newAntiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700296 }
297
Jon Hallfa132292017-10-24 11:11:24 -0700298 if (newBackupPeriod != backupPeriod) {
299 backupPeriod = newBackupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700300 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700301 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700302
303 if (newAntiEntropyPeriod != antiEntropyPeriod) {
304 antiEntropyPeriod = newAntiEntropyPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700305 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jordan Halterman5259b332018-06-12 15:34:19 -0700306 }
307
Jon Hallfa132292017-10-24 11:11:24 -0700308 if (newPoolSize != msgHandlerPoolSize) {
309 msgHandlerPoolSize = newPoolSize;
310 ExecutorService oldMsgHandler = messageHandlingExecutor;
311 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700312 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700313
314 // replace previously registered handlers.
315 registerMessageHandlers(messageHandlingExecutor);
316 oldMsgHandler.shutdown();
317 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700318
Jon Hallfa132292017-10-24 11:11:24 -0700319 if (backupCount != newBackupCount) {
320 backupCount = newBackupCount;
321 }
322 logConfig("Reconfigured");
323 }
324
325 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700326 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
327 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700328 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700329 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700330 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700331 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700332 GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800333 clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
334 GET_DEVICE_FLOW_COUNT,
335 serializer::decode,
336 p -> flowTable.getFlowRuleCount(p.getLeft(), p.getRight()),
337 serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700338 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700339 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700340 }
341
342 private void unregisterMessageHandlers() {
343 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
344 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700345 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700346 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
347 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
348 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
349 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
350 }
351
352 private void logConfig(String prefix) {
353 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700354 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700355 }
356
Jon Hallfa132292017-10-24 11:11:24 -0700357 @Override
358 public int getFlowRuleCount() {
359 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700360 .mapToInt(device -> getFlowRuleCount(device.id()))
361 .sum();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800362 }
363
364 @Override
365 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800366 return getFlowRuleCount(deviceId, null);
367 }
368
369 @Override
370 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700371 NodeId master = mastershipService.getMasterFor(deviceId);
372 if (master == null) {
373 log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
374 return 0;
375 }
376
377 if (Objects.equals(local, master)) {
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800378 return flowTable.getFlowRuleCount(deviceId, state);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700379 }
380
381 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
382 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800383 Pair.of(deviceId, state),
384 GET_DEVICE_FLOW_COUNT,
385 serializer::encode,
386 serializer::decode,
387 master),
388 FLOW_RULE_STORE_TIMEOUT_MILLIS,
389 TimeUnit.MILLISECONDS,
390 0);
Jon Hallfa132292017-10-24 11:11:24 -0700391 }
392
393 @Override
394 public FlowEntry getFlowEntry(FlowRule rule) {
395 NodeId master = mastershipService.getMasterFor(rule.deviceId());
396
397 if (master == null) {
398 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
399 return null;
400 }
401
402 if (Objects.equals(local, master)) {
403 return flowTable.getFlowEntry(rule);
404 }
405
406 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700407 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700408
409 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700410 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
411 serializer::encode,
412 serializer::decode,
413 master),
414 FLOW_RULE_STORE_TIMEOUT_MILLIS,
415 TimeUnit.MILLISECONDS,
416 null);
Jon Hallfa132292017-10-24 11:11:24 -0700417 }
418
419 @Override
420 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
421 NodeId master = mastershipService.getMasterFor(deviceId);
422
423 if (master == null) {
424 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
425 return Collections.emptyList();
426 }
427
428 if (Objects.equals(local, master)) {
429 return flowTable.getFlowEntries(deviceId);
430 }
431
432 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700433 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700434
435 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700436 ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
437 serializer::encode,
438 serializer::decode,
439 master),
440 FLOW_RULE_STORE_TIMEOUT_MILLIS,
441 TimeUnit.MILLISECONDS,
442 Collections.emptyList());
Jon Hallfa132292017-10-24 11:11:24 -0700443 }
444
445 @Override
446 public void storeFlowRule(FlowRule rule) {
447 storeBatch(new FlowRuleBatchOperation(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700448 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
449 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700450 }
451
452 @Override
453 public void storeBatch(FlowRuleBatchOperation operation) {
454 if (operation.getOperations().isEmpty()) {
455 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700456 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
457 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700458 return;
459 }
460
461 DeviceId deviceId = operation.deviceId();
462 NodeId master = mastershipService.getMasterFor(deviceId);
463
464 if (master == null) {
465 log.warn("No master for {} ", deviceId);
466
Jordan Halterman281dbf32018-06-15 17:46:28 -0700467 Set<FlowRule> allFailures = operation.getOperations()
468 .stream()
469 .map(op -> op.target())
470 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700471 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700472 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
473 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700474 return;
475 }
476
477 if (Objects.equals(local, master)) {
478 storeBatchInternal(operation);
479 return;
480 }
481
482 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700483 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700484
485 clusterCommunicator.unicast(operation,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700486 APPLY_BATCH_FLOWS,
487 serializer::encode,
488 master)
489 .whenComplete((result, error) -> {
490 if (error != null) {
491 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700492
Jordan Halterman281dbf32018-06-15 17:46:28 -0700493 Set<FlowRule> allFailures = operation.getOperations()
494 .stream()
495 .map(op -> op.target())
496 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700497
Jordan Halterman281dbf32018-06-15 17:46:28 -0700498 notifyDelegate(FlowRuleBatchEvent.completed(
499 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
500 new CompletedBatchOperation(false, allFailures, deviceId)));
501 }
502 });
Jon Hallfa132292017-10-24 11:11:24 -0700503 }
504
505 private void storeBatchInternal(FlowRuleBatchOperation operation) {
506
507 final DeviceId did = operation.deviceId();
508 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
509 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
510 if (currentOps.isEmpty()) {
511 batchOperationComplete(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700512 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
513 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700514 return;
515 }
516
517 notifyDelegate(FlowRuleBatchEvent.requested(new
Jordan Halterman281dbf32018-06-15 17:46:28 -0700518 FlowRuleBatchRequest(operation.id(),
519 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700520 }
521
522 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
523 return operation.getOperations().stream().map(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700524 op -> {
525 StoredFlowEntry entry;
526 switch (op.operator()) {
527 case ADD:
528 entry = new DefaultFlowEntry(op.target());
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800529 log.debug("Adding flow rule: {}", entry);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700530 flowTable.add(entry);
531 return op;
532 case MODIFY:
533 entry = new DefaultFlowEntry(op.target());
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800534 log.debug("Updating flow rule: {}", entry);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700535 flowTable.update(entry);
536 return op;
537 case REMOVE:
538 return flowTable.update(op.target(), stored -> {
539 stored.setState(FlowEntryState.PENDING_REMOVE);
540 log.debug("Setting state of rule to pending remove: {}", stored);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800541 return op;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700542 });
543 default:
544 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700545 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700546 return null;
547 }
Jon Hallfa132292017-10-24 11:11:24 -0700548 ).filter(Objects::nonNull).collect(Collectors.toSet());
549 }
550
551 @Override
552 public void deleteFlowRule(FlowRule rule) {
553 storeBatch(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700554 new FlowRuleBatchOperation(
555 Collections.singletonList(
556 new FlowRuleBatchEntry(
557 FlowRuleOperation.REMOVE,
558 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700559 }
560
561 @Override
562 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
563 if (mastershipService.isLocalMaster(rule.deviceId())) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700564 return flowTable.update(rule, stored -> {
565 if (stored.state() == FlowEntryState.PENDING_ADD) {
566 stored.setState(FlowEntryState.PENDING_ADD);
567 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
568 }
569 return null;
570 });
Jon Hallfa132292017-10-24 11:11:24 -0700571 }
572 return null;
573 }
574
575 @Override
576 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
577 NodeId master = mastershipService.getMasterFor(rule.deviceId());
578 if (Objects.equals(local, master)) {
579 return addOrUpdateFlowRuleInternal(rule);
580 }
581
582 log.warn("Tried to update FlowRule {} state,"
Jordan Halterman281dbf32018-06-15 17:46:28 -0700583 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700584 return null;
585 }
586
587 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700588 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700589 stored.setBytes(rule.bytes());
590 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
591 stored.setLiveType(rule.liveType());
592 stored.setPackets(rule.packets());
593 stored.setLastSeen();
594 if (stored.state() == FlowEntryState.PENDING_ADD) {
595 stored.setState(FlowEntryState.ADDED);
596 return new FlowRuleEvent(Type.RULE_ADDED, rule);
597 }
598 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700599 });
600 if (event != null) {
601 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700602 }
603
604 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
605 // TODO: also update backup if the behavior is correct.
606 flowTable.add(rule);
607 return null;
608 }
609
610 @Override
611 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
612 final DeviceId deviceId = rule.deviceId();
613 NodeId master = mastershipService.getMasterFor(deviceId);
614
615 if (Objects.equals(local, master)) {
616 // bypass and handle it locally
617 return removeFlowRuleInternal(rule);
618 }
619
620 if (master == null) {
621 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
622 // TODO: revisit if this should be null (="no-op") or Exception
623 return null;
624 }
625
626 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700627 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700628
Jordan Halterman281dbf32018-06-15 17:46:28 -0700629 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
630 rule,
631 REMOVE_FLOW_ENTRY,
632 serializer::encode,
633 serializer::decode,
634 master),
635 FLOW_RULE_STORE_TIMEOUT_MILLIS,
636 TimeUnit.MILLISECONDS,
637 null);
Jon Hallfa132292017-10-24 11:11:24 -0700638 }
639
640 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700641 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800642 final FlowEntry removed = flowTable.remove(rule);
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800643 log.debug("Removed flow rule: {}", removed);
Jon Hallfa132292017-10-24 11:11:24 -0700644 // rule may be partial rule that is missing treatment, we should use rule from store instead
645 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
646 }
647
648 @Override
649 public void purgeFlowRule(DeviceId deviceId) {
650 flowTable.purgeFlowRule(deviceId);
651 }
652
653 @Override
654 public void purgeFlowRules() {
655 flowTable.purgeFlowRules();
656 }
657
658 @Override
659 public void batchOperationComplete(FlowRuleBatchEvent event) {
660 //FIXME: need a per device pending response
661 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
662 if (nodeId == null) {
663 notifyDelegate(event);
664 } else {
665 // TODO check unicast return value
666 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
667 //error log: log.warn("Failed to respond to peer for batch operation result");
668 }
669 }
670
671 private final class OnStoreBatch implements ClusterMessageHandler {
672
673 @Override
674 public void handle(final ClusterMessage message) {
675 FlowRuleBatchOperation operation = serializer.decode(message.payload());
676 log.debug("received batch request {}", operation);
677
678 final DeviceId deviceId = operation.deviceId();
679 NodeId master = mastershipService.getMasterFor(deviceId);
680 if (!Objects.equals(local, master)) {
681 Set<FlowRule> failures = new HashSet<>(operation.size());
682 for (FlowRuleBatchEntry op : operation.getOperations()) {
683 failures.add(op.target());
684 }
685 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
686 // This node is no longer the master, respond as all failed.
687 // TODO: we might want to wrap response in envelope
688 // to distinguish sw programming failure and hand over
689 // it make sense in the latter case to retry immediately.
690 message.respond(serializer.encode(allFailed));
691 return;
692 }
693
694 pendingResponses.put(operation.id(), message.sender());
695 storeBatchInternal(operation);
696 }
697 }
698
Jordan Halterman281dbf32018-06-15 17:46:28 -0700699 private class InternalFlowTable implements DeviceListener {
700 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700701
702 @Override
Jordan Halterman281dbf32018-06-15 17:46:28 -0700703 public void event(DeviceEvent event) {
704 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
705 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700706 }
707 }
708
Jordan Halterman281dbf32018-06-15 17:46:28 -0700709 /**
710 * Adds the given device to the flow table.
711 *
712 * @param deviceId the device to add to the table
713 */
714 public void addDevice(DeviceId deviceId) {
715 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
716 id,
717 clusterService,
718 clusterCommunicator,
719 new InternalLifecycleManager(id),
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800720 backupScheduler,
721 new OrderedExecutor(backupExecutor),
Jordan Halterman281dbf32018-06-15 17:46:28 -0700722 backupPeriod,
723 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700724 }
725
Jordan Halterman281dbf32018-06-15 17:46:28 -0700726 /**
727 * Sets the flow table backup period.
728 *
729 * @param backupPeriod the flow table backup period
730 */
731 void setBackupPeriod(int backupPeriod) {
732 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700733 }
734
Jordan Halterman281dbf32018-06-15 17:46:28 -0700735 /**
736 * Sets the flow table anti-entropy period.
737 *
738 * @param antiEntropyPeriod the flow table anti-entropy period
739 */
740 void setAntiEntropyPeriod(int antiEntropyPeriod) {
741 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700742 }
743
Jordan Halterman281dbf32018-06-15 17:46:28 -0700744 /**
745 * Returns the flow table for a specific device.
746 *
747 * @param deviceId the device identifier
748 * @return the flow table for the given device
749 */
750 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
751 DeviceFlowTable flowTable = flowTables.get(deviceId);
752 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
753 deviceId,
754 clusterService,
755 clusterCommunicator,
756 new InternalLifecycleManager(deviceId),
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800757 backupScheduler,
758 new OrderedExecutor(backupExecutor),
Jordan Halterman281dbf32018-06-15 17:46:28 -0700759 backupPeriod,
760 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700761 }
762
Jordan Halterman281dbf32018-06-15 17:46:28 -0700763 /**
764 * Returns the flow rule count for the given device.
765 *
766 * @param deviceId the device for which to return the flow rule count
767 * @return the flow rule count for the given device
768 */
769 public int getFlowRuleCount(DeviceId deviceId) {
770 return getFlowTable(deviceId).count();
771 }
772
773 /**
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800774 * Returns the count of flow rules in the given state for the given device.
775 *
776 * @param deviceId the device for which to return the flow rule count
777 * @return the flow rule count for the given device
778 */
779 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
780 if (state == null) {
781 return getFlowRuleCount(deviceId);
782 }
783 return (int) getFlowTable(deviceId)
784 .getFlowEntries()
785 .stream()
786 .filter(rule -> rule.state() == state)
787 .count();
788 }
789
790 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -0700791 * Returns the flow entry for the given rule.
792 *
793 * @param rule the rule for which to return the flow entry
794 * @return the flow entry for the given rule
795 */
Jon Hallfa132292017-10-24 11:11:24 -0700796 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700797 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700798 }
799
Jordan Halterman281dbf32018-06-15 17:46:28 -0700800 /**
801 * Returns the set of flow entries for the given device.
802 *
803 * @param deviceId the device for which to lookup flow entries
804 * @return the set of flow entries for the given device
805 */
Jon Hallfa132292017-10-24 11:11:24 -0700806 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700807 return getFlowTable(deviceId).getFlowEntries();
Jon Hallfa132292017-10-24 11:11:24 -0700808 }
809
Jordan Halterman281dbf32018-06-15 17:46:28 -0700810 /**
811 * Adds the given flow rule.
812 *
813 * @param rule the rule to add
814 */
Jon Hallfa132292017-10-24 11:11:24 -0700815 public void add(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700816 Tools.futureGetOrElse(
817 getFlowTable(rule.deviceId()).add(rule),
818 FLOW_RULE_STORE_TIMEOUT_MILLIS,
819 TimeUnit.MILLISECONDS,
820 null);
Jon Hallfa132292017-10-24 11:11:24 -0700821 }
822
Jordan Halterman281dbf32018-06-15 17:46:28 -0700823 /**
824 * Updates the given flow rule.
825 *
826 * @param rule the rule to update
827 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800828 public void update(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700829 Tools.futureGetOrElse(
830 getFlowTable(rule.deviceId()).update(rule),
831 FLOW_RULE_STORE_TIMEOUT_MILLIS,
832 TimeUnit.MILLISECONDS,
833 null);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800834 }
835
Jordan Halterman281dbf32018-06-15 17:46:28 -0700836 /**
837 * Applies the given update function to the rule.
838 *
839 * @param function the update function to apply
840 * @return a future to be completed with the update event or {@code null} if the rule was not updated
841 */
842 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
843 return Tools.futureGetOrElse(
844 getFlowTable(rule.deviceId()).update(rule, function),
845 FLOW_RULE_STORE_TIMEOUT_MILLIS,
846 TimeUnit.MILLISECONDS,
847 null);
848 }
849
850 /**
851 * Removes the given flow rule.
852 *
853 * @param rule the rule to remove
854 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800855 public FlowEntry remove(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700856 return Tools.futureGetOrElse(
857 getFlowTable(rule.deviceId()).remove(rule),
858 FLOW_RULE_STORE_TIMEOUT_MILLIS,
859 TimeUnit.MILLISECONDS,
860 null);
Jon Hallfa132292017-10-24 11:11:24 -0700861 }
862
Jordan Halterman281dbf32018-06-15 17:46:28 -0700863 /**
864 * Purges flow rules for the given device.
865 *
866 * @param deviceId the device for which to purge flow rules
867 */
Jon Hallfa132292017-10-24 11:11:24 -0700868 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanda3b9f02018-10-05 13:28:51 -0700869 // If the device is still present in the store, purge the underlying DeviceFlowTable.
870 // Otherwise, remove the DeviceFlowTable and unregister message handlers.
871 if (deviceService.getDevice(deviceId) != null) {
872 DeviceFlowTable flowTable = flowTables.get(deviceId);
873 if (flowTable != null) {
874 flowTable.purge();
875 }
876 } else {
877 DeviceFlowTable flowTable = flowTables.remove(deviceId);
878 if (flowTable != null) {
879 flowTable.close();
880 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700881 }
Jon Hallfa132292017-10-24 11:11:24 -0700882 }
883
Jordan Halterman281dbf32018-06-15 17:46:28 -0700884 /**
885 * Purges all flow rules from the table.
886 */
Jon Hallfa132292017-10-24 11:11:24 -0700887 public void purgeFlowRules() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700888 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
889 while (iterator.hasNext()) {
890 iterator.next().close();
891 iterator.remove();
Jordan Halterman8f90d6d2018-06-12 11:23:33 -0700892 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700893 }
Jon Hallfa132292017-10-24 11:11:24 -0700894 }
895
896 @Override
Jordan Halterman5259b332018-06-12 15:34:19 -0700897 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700898 deviceTableStats.put(deviceId, tableStats);
899 return null;
900 }
901
902 @Override
903 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
904 NodeId master = mastershipService.getMasterFor(deviceId);
905
906 if (master == null) {
907 log.debug("Failed to getTableStats: No master for {}", deviceId);
908 return Collections.emptyList();
909 }
910
911 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
912 if (tableStats == null) {
913 return Collections.emptyList();
914 }
915 return ImmutableList.copyOf(tableStats);
916 }
917
918 @Override
919 public long getActiveFlowRuleCount(DeviceId deviceId) {
920 return Streams.stream(getTableStatistics(deviceId))
Jordan Halterman281dbf32018-06-15 17:46:28 -0700921 .mapToLong(TableStatisticsEntry::activeFlowEntries)
922 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700923 }
924
925 private class InternalTableStatsListener
926 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
927 @Override
928 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700929 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700930 //TODO: Generate an event to listeners (do we need?)
931 }
932 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700933
934 /**
935 * Device lifecycle manager implementation.
936 */
937 private final class InternalLifecycleManager
938 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
939 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
940
941 private final DeviceId deviceId;
942
943 private volatile DeviceReplicaInfo replicaInfo;
944
945 InternalLifecycleManager(DeviceId deviceId) {
946 this.deviceId = deviceId;
947 replicaInfoManager.addListener(this);
948 mastershipTermLifecycles.addListener(this);
949 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
950 }
951
952 @Override
953 public DeviceReplicaInfo getReplicaInfo() {
954 return replicaInfo;
955 }
956
957 @Override
958 public void activate(long term) {
959 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
960 if (replicaInfo != null && replicaInfo.term() == term) {
961 mastershipTermLifecycles.put(deviceId, term);
962 }
963 }
964
965 @Override
966 public void event(ReplicaInfoEvent event) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700967 if (event.subject().equals(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700968 onReplicaInfoChange(event.replicaInfo());
969 }
970 }
971
972 @Override
973 public void event(MapEvent<DeviceId, Long> event) {
974 if (event.key().equals(deviceId) && event.newValue() != null) {
975 onActivate(event.newValue().value());
976 }
977 }
978
979 /**
980 * Handles a term activation event.
981 *
982 * @param term the term that was activated
983 */
984 private void onActivate(long term) {
985 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
986 if (replicaInfo != null && replicaInfo.term() == term) {
987 NodeId master = replicaInfo.master().orElse(null);
988 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800989 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700990 listenerRegistry.process(new LifecycleEvent(
991 LifecycleEvent.Type.TERM_ACTIVE,
992 new DeviceReplicaInfo(term, master, backups)));
993 }
994 }
995
996 /**
997 * Handles a replica info change event.
998 *
999 * @param replicaInfo the updated replica info
1000 */
1001 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
1002 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
1003 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
1004 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
1005 if (oldReplicaInfo != null) {
1006 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
1007 }
1008 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
Jordan Haltermane4bf8562018-06-22 16:58:08 -07001009 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
1010 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
Jordan Halterman281dbf32018-06-15 17:46:28 -07001011 }
1012 }
1013
1014 /**
1015 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
1016 *
1017 * @param replicaInfo the replica info to convert
1018 * @return the converted replica info
1019 */
1020 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
1021 NodeId master = replicaInfo.master().orElse(null);
1022 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanaeda2752019-02-22 12:31:25 -08001023 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman281dbf32018-06-15 17:46:28 -07001024 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
1025 }
1026
1027 @Override
1028 public void close() {
1029 replicaInfoManager.removeListener(this);
1030 mastershipTermLifecycles.removeListener(this);
1031 }
1032 }
Jordan Haltermanb81fdc12019-03-04 18:12:20 -08001033
1034 private static class CountMessage {
1035 private final DeviceId deviceId;
1036 private final FlowEntryState state;
1037
1038 CountMessage(DeviceId deviceId, FlowEntryState state) {
1039 this.deviceId = deviceId;
1040 this.state = state;
1041 }
1042 }
Jordan Halterman5259b332018-06-12 15:34:19 -07001043}