blob: 9a683560a6a5bf440df1c08f95dbeb788b8b4408 [file] [log] [blame]
Jordan Halterman281dbf32018-06-15 17:46:28 -07001/*
2* Copyright 2014-present Open Networking Foundation
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
Jordan Halterman78c287d2019-04-17 11:05:16 -070018import java.util.Collections;
19import java.util.Dictionary;
20import java.util.HashSet;
21import java.util.Iterator;
22import java.util.List;
23import java.util.Map;
24import java.util.Objects;
25import java.util.Set;
26import java.util.concurrent.ExecutionException;
27import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Executors;
29import java.util.concurrent.ScheduledExecutorService;
30import java.util.concurrent.TimeUnit;
31import java.util.concurrent.TimeoutException;
32import java.util.function.Function;
33import java.util.stream.Collectors;
34import java.util.stream.StreamSupport;
35
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070036import com.google.common.collect.ImmutableList;
37import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070038import com.google.common.collect.Streams;
Jordan Haltermanb81fdc12019-03-04 18:12:20 -080039import org.apache.commons.lang3.tuple.ImmutablePair;
40import org.apache.commons.lang3.tuple.Pair;
Jon Hallfa132292017-10-24 11:11:24 -070041import org.onlab.util.KryoNamespace;
Jordan Haltermanaeda2752019-02-22 12:31:25 -080042import org.onlab.util.OrderedExecutor;
Jon Hallfa132292017-10-24 11:11:24 -070043import org.onlab.util.Tools;
44import org.onosproject.cfg.ComponentConfigService;
45import org.onosproject.cluster.ClusterService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.core.CoreService;
48import org.onosproject.core.IdGenerator;
Jordan Halterman281dbf32018-06-15 17:46:28 -070049import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070050import org.onosproject.mastership.MastershipService;
51import org.onosproject.net.DeviceId;
Jordan Halterman281dbf32018-06-15 17:46:28 -070052import org.onosproject.net.device.DeviceEvent;
53import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070054import org.onosproject.net.device.DeviceService;
55import org.onosproject.net.flow.CompletedBatchOperation;
56import org.onosproject.net.flow.DefaultFlowEntry;
57import org.onosproject.net.flow.FlowEntry;
58import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070059import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070060import org.onosproject.net.flow.FlowRuleEvent;
61import org.onosproject.net.flow.FlowRuleEvent.Type;
62import org.onosproject.net.flow.FlowRuleService;
63import org.onosproject.net.flow.FlowRuleStore;
64import org.onosproject.net.flow.FlowRuleStoreDelegate;
65import org.onosproject.net.flow.StoredFlowEntry;
Jordan Halterman78c287d2019-04-17 11:05:16 -070066import org.onosproject.net.flow.FlowRuleStoreException;
Jon Hallfa132292017-10-24 11:11:24 -070067import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070068import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
69import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
70import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
71import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
72import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070073import org.onosproject.persistence.PersistenceService;
74import org.onosproject.store.AbstractStore;
75import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
76import org.onosproject.store.cluster.messaging.ClusterMessage;
77import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jordan Halterman281dbf32018-06-15 17:46:28 -070078import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070079import org.onosproject.store.flow.ReplicaInfoEvent;
80import org.onosproject.store.flow.ReplicaInfoEventListener;
81import org.onosproject.store.flow.ReplicaInfoService;
82import org.onosproject.store.impl.MastershipBasedTimestamp;
83import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman281dbf32018-06-15 17:46:28 -070084import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070085import org.onosproject.store.service.EventuallyConsistentMap;
86import org.onosproject.store.service.EventuallyConsistentMapEvent;
87import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman281dbf32018-06-15 17:46:28 -070088import org.onosproject.store.service.MapEvent;
89import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070090import org.onosproject.store.service.Serializer;
91import org.onosproject.store.service.StorageService;
92import org.onosproject.store.service.WallClockTimestamp;
93import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070094import org.osgi.service.component.annotations.Activate;
95import org.osgi.service.component.annotations.Component;
96import org.osgi.service.component.annotations.Deactivate;
97import org.osgi.service.component.annotations.Modified;
98import org.osgi.service.component.annotations.Reference;
99import org.osgi.service.component.annotations.ReferenceCardinality;
Jon Hallfa132292017-10-24 11:11:24 -0700100import org.slf4j.Logger;
101
Jon Hallfa132292017-10-24 11:11:24 -0700102import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800103import static java.lang.Math.max;
104import static java.lang.Math.min;
Jon Hallfa132292017-10-24 11:11:24 -0700105import static org.onlab.util.Tools.get;
106import static org.onlab.util.Tools.groupedThreads;
107import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -0700108import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
109import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700110import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700111import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
112import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
113import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
114import static org.slf4j.LoggerFactory.getLogger;
115
Ray Milkeyb5646e62018-10-16 11:42:18 -0700116import static org.onosproject.store.OsgiPropertyConstants.*;
117
Jon Hallfa132292017-10-24 11:11:24 -0700118/**
119 * Manages inventory of flow rules using a distributed state management protocol.
120 */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700121@Component(
122 immediate = true,
123 service = FlowRuleStore.class,
124 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -0700125 MESSAGE_HANDLER_THREAD_POOL_SIZE + ":Integer=" + MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT,
126 BACKUP_PERIOD_MILLIS + ":Integer=" + BACKUP_PERIOD_MILLIS_DEFAULT,
127 ANTI_ENTROPY_PERIOD_MILLIS + ":Integer=" + ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT,
128 EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED + ":Boolean=" + EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT,
129 MAX_BACKUP_COUNT + ":Integer=" + MAX_BACKUP_COUNT_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -0700130 }
131)
Jon Hallfa132292017-10-24 11:11:24 -0700132public class ECFlowRuleStore
Jordan Halterman281dbf32018-06-15 17:46:28 -0700133 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
134 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700135
136 private final Logger log = getLogger(getClass());
137
Jon Hallfa132292017-10-24 11:11:24 -0700138 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700139
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700140 /** Number of threads in the message handler pool. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700141 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700142
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700143 /** Delay in ms between successive backup runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700144 private int backupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700145
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700146 /** Delay in ms between anti-entropy runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700147 private int antiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700148
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700149 /** Indicates whether or not changes in the flow table should be persisted to disk. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700150 private boolean persistenceEnabled = EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700151
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700152 /** Max number of backup copies for each device. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700153 private volatile int backupCount = MAX_BACKUP_COUNT_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700154
155 private InternalFlowTable flowTable = new InternalFlowTable();
156
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700157 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700158 protected ReplicaInfoService replicaInfoManager;
159
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700160 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700161 protected ClusterCommunicationService clusterCommunicator;
162
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700163 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700164 protected ClusterService clusterService;
165
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700166 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700167 protected DeviceService deviceService;
168
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700169 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700170 protected CoreService coreService;
171
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700172 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700173 protected ComponentConfigService configService;
174
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700175 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700176 protected MastershipService mastershipService;
177
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700178 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700179 protected PersistenceService persistenceService;
180
181 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
182 private ExecutorService messageHandlingExecutor;
183 private ExecutorService eventHandler;
184
Ray Milkeyd1092d62019-04-02 09:12:00 -0700185 private ScheduledExecutorService backupScheduler;
186 private ExecutorService backupExecutor;
Jon Hallfa132292017-10-24 11:11:24 -0700187
188 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
189 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700190 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700191
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700192 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700193 protected StorageService storageService;
194
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700195 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
196 .register(KryoNamespaces.API)
Jordan Haltermana765d222018-06-01 00:40:56 -0700197 .register(BucketId.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700198 .register(FlowBucket.class)
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800199 .register(ImmutablePair.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700200 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700201
202 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermana765d222018-06-01 00:40:56 -0700203 .register(KryoNamespaces.API)
204 .register(BucketId.class)
205 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700206
Jordan Halterman281dbf32018-06-15 17:46:28 -0700207 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700208
209 private IdGenerator idGenerator;
210 private NodeId local;
211
212 @Activate
213 public void activate(ComponentContext context) {
214 configService.registerProperties(getClass());
215
Ray Milkeyd1092d62019-04-02 09:12:00 -0700216 backupScheduler = Executors.newSingleThreadScheduledExecutor(
217 groupedThreads("onos/flow", "backup-scheduler", log));
218 backupExecutor = Executors.newFixedThreadPool(
219 max(min(Runtime.getRuntime().availableProcessors() * 2, 16), 4),
220 groupedThreads("onos/flow", "backup-%d", log));
221
Jon Hallfa132292017-10-24 11:11:24 -0700222 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
223
224 local = clusterService.getLocalNode().id();
225
226 eventHandler = Executors.newSingleThreadExecutor(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700227 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700228 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700229 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700230
231 registerMessageHandlers(messageHandlingExecutor);
232
Jordan Halterman281dbf32018-06-15 17:46:28 -0700233 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
234 .withName("onos-flow-store-terms")
235 .withSerializer(serializer)
236 .buildAsyncMap();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800237
Jon Hallfa132292017-10-24 11:11:24 -0700238 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700239 .withName("onos-flow-table-stats")
240 .withSerializer(serializerBuilder)
241 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
242 .withTimestampProvider((k, v) -> new WallClockTimestamp())
243 .withTombstonesDisabled()
244 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700245 deviceTableStats.addListener(tableStatsListener);
246
Jordan Halterman281dbf32018-06-15 17:46:28 -0700247 deviceService.addListener(flowTable);
248 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
249
Jon Hallfa132292017-10-24 11:11:24 -0700250 logConfig("Started");
251 }
252
253 @Deactivate
254 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700255 configService.unregisterProperties(getClass(), false);
256 unregisterMessageHandlers();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700257 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700258 deviceTableStats.removeListener(tableStatsListener);
259 deviceTableStats.destroy();
260 eventHandler.shutdownNow();
261 messageHandlingExecutor.shutdownNow();
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800262 backupScheduler.shutdownNow();
263 backupExecutor.shutdownNow();
Ray Milkeyd1092d62019-04-02 09:12:00 -0700264 backupScheduler = null;
265 backupExecutor = null;
Jon Hallfa132292017-10-24 11:11:24 -0700266 log.info("Stopped");
267 }
268
269 @SuppressWarnings("rawtypes")
270 @Modified
271 public void modified(ComponentContext context) {
272 if (context == null) {
273 logConfig("Default config");
274 return;
275 }
276
277 Dictionary properties = context.getProperties();
278 int newPoolSize;
279 int newBackupPeriod;
280 int newBackupCount;
Jordan Halterman5259b332018-06-12 15:34:19 -0700281 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700282 try {
283 String s = get(properties, "msgHandlerPoolSize");
284 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
285
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700286 s = get(properties, BACKUP_PERIOD_MILLIS);
Jon Hallfa132292017-10-24 11:11:24 -0700287 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
288
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700289 s = get(properties, MAX_BACKUP_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700290 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman5259b332018-06-12 15:34:19 -0700291
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700292 s = get(properties, ANTI_ENTROPY_PERIOD_MILLIS);
Jordan Halterman5259b332018-06-12 15:34:19 -0700293 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700294 } catch (NumberFormatException | ClassCastException e) {
Ray Milkeyb5646e62018-10-16 11:42:18 -0700295 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
296 newBackupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
297 newBackupCount = MAX_BACKUP_COUNT_DEFAULT;
298 newAntiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700299 }
300
Jon Hallfa132292017-10-24 11:11:24 -0700301 if (newBackupPeriod != backupPeriod) {
302 backupPeriod = newBackupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700303 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700304 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700305
306 if (newAntiEntropyPeriod != antiEntropyPeriod) {
307 antiEntropyPeriod = newAntiEntropyPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700308 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jordan Halterman5259b332018-06-12 15:34:19 -0700309 }
310
Jon Hallfa132292017-10-24 11:11:24 -0700311 if (newPoolSize != msgHandlerPoolSize) {
312 msgHandlerPoolSize = newPoolSize;
313 ExecutorService oldMsgHandler = messageHandlingExecutor;
314 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700315 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700316
317 // replace previously registered handlers.
318 registerMessageHandlers(messageHandlingExecutor);
319 oldMsgHandler.shutdown();
320 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700321
Jon Hallfa132292017-10-24 11:11:24 -0700322 if (backupCount != newBackupCount) {
323 backupCount = newBackupCount;
324 }
325 logConfig("Reconfigured");
326 }
327
328 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700329 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
330 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700331 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700332 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700333 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800334 clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
335 GET_DEVICE_FLOW_COUNT,
336 serializer::decode,
337 p -> flowTable.getFlowRuleCount(p.getLeft(), p.getRight()),
338 serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700339 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700340 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700341 }
342
343 private void unregisterMessageHandlers() {
344 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700345 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700346 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
347 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
348 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
349 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
350 }
351
352 private void logConfig(String prefix) {
353 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700354 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700355 }
356
Jon Hallfa132292017-10-24 11:11:24 -0700357 @Override
358 public int getFlowRuleCount() {
359 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700360 .mapToInt(device -> getFlowRuleCount(device.id()))
361 .sum();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800362 }
363
364 @Override
365 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800366 return getFlowRuleCount(deviceId, null);
367 }
368
369 @Override
370 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700371 NodeId master = mastershipService.getMasterFor(deviceId);
372 if (master == null) {
373 log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
374 return 0;
375 }
376
377 if (Objects.equals(local, master)) {
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800378 return flowTable.getFlowRuleCount(deviceId, state);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700379 }
380
381 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
382 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800383 Pair.of(deviceId, state),
384 GET_DEVICE_FLOW_COUNT,
385 serializer::encode,
386 serializer::decode,
387 master),
388 FLOW_RULE_STORE_TIMEOUT_MILLIS,
389 TimeUnit.MILLISECONDS,
390 0);
Jon Hallfa132292017-10-24 11:11:24 -0700391 }
392
393 @Override
394 public FlowEntry getFlowEntry(FlowRule rule) {
395 NodeId master = mastershipService.getMasterFor(rule.deviceId());
396
397 if (master == null) {
398 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
399 return null;
400 }
401
402 if (Objects.equals(local, master)) {
403 return flowTable.getFlowEntry(rule);
404 }
405
406 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700407 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700408
409 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700410 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
411 serializer::encode,
412 serializer::decode,
413 master),
414 FLOW_RULE_STORE_TIMEOUT_MILLIS,
415 TimeUnit.MILLISECONDS,
416 null);
Jon Hallfa132292017-10-24 11:11:24 -0700417 }
418
419 @Override
420 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Halterman78c287d2019-04-17 11:05:16 -0700421 return flowTable.getFlowEntries(deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700422 }
423
424 @Override
425 public void storeFlowRule(FlowRule rule) {
426 storeBatch(new FlowRuleBatchOperation(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700427 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
428 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700429 }
430
431 @Override
432 public void storeBatch(FlowRuleBatchOperation operation) {
433 if (operation.getOperations().isEmpty()) {
434 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700435 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
436 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700437 return;
438 }
439
440 DeviceId deviceId = operation.deviceId();
441 NodeId master = mastershipService.getMasterFor(deviceId);
442
443 if (master == null) {
444 log.warn("No master for {} ", deviceId);
445
Jordan Halterman281dbf32018-06-15 17:46:28 -0700446 Set<FlowRule> allFailures = operation.getOperations()
447 .stream()
448 .map(op -> op.target())
449 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700450 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700451 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
452 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700453 return;
454 }
455
456 if (Objects.equals(local, master)) {
457 storeBatchInternal(operation);
458 return;
459 }
460
461 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700462 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700463
464 clusterCommunicator.unicast(operation,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700465 APPLY_BATCH_FLOWS,
466 serializer::encode,
467 master)
468 .whenComplete((result, error) -> {
469 if (error != null) {
470 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700471
Jordan Halterman281dbf32018-06-15 17:46:28 -0700472 Set<FlowRule> allFailures = operation.getOperations()
473 .stream()
474 .map(op -> op.target())
475 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700476
Jordan Halterman281dbf32018-06-15 17:46:28 -0700477 notifyDelegate(FlowRuleBatchEvent.completed(
478 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
479 new CompletedBatchOperation(false, allFailures, deviceId)));
480 }
481 });
Jon Hallfa132292017-10-24 11:11:24 -0700482 }
483
484 private void storeBatchInternal(FlowRuleBatchOperation operation) {
485
486 final DeviceId did = operation.deviceId();
487 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
488 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
489 if (currentOps.isEmpty()) {
490 batchOperationComplete(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700491 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
492 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700493 return;
494 }
495
496 notifyDelegate(FlowRuleBatchEvent.requested(new
Jordan Halterman281dbf32018-06-15 17:46:28 -0700497 FlowRuleBatchRequest(operation.id(),
498 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700499 }
500
501 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
502 return operation.getOperations().stream().map(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700503 op -> {
504 StoredFlowEntry entry;
505 switch (op.operator()) {
506 case ADD:
507 entry = new DefaultFlowEntry(op.target());
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800508 log.debug("Adding flow rule: {}", entry);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700509 flowTable.add(entry);
510 return op;
511 case MODIFY:
512 entry = new DefaultFlowEntry(op.target());
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800513 log.debug("Updating flow rule: {}", entry);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700514 flowTable.update(entry);
515 return op;
516 case REMOVE:
517 return flowTable.update(op.target(), stored -> {
518 stored.setState(FlowEntryState.PENDING_REMOVE);
519 log.debug("Setting state of rule to pending remove: {}", stored);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800520 return op;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700521 });
522 default:
523 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700524 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700525 return null;
526 }
Jon Hallfa132292017-10-24 11:11:24 -0700527 ).filter(Objects::nonNull).collect(Collectors.toSet());
528 }
529
530 @Override
531 public void deleteFlowRule(FlowRule rule) {
532 storeBatch(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700533 new FlowRuleBatchOperation(
534 Collections.singletonList(
535 new FlowRuleBatchEntry(
536 FlowRuleOperation.REMOVE,
537 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700538 }
539
540 @Override
541 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
542 if (mastershipService.isLocalMaster(rule.deviceId())) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700543 return flowTable.update(rule, stored -> {
544 if (stored.state() == FlowEntryState.PENDING_ADD) {
545 stored.setState(FlowEntryState.PENDING_ADD);
546 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
547 }
548 return null;
549 });
Jon Hallfa132292017-10-24 11:11:24 -0700550 }
551 return null;
552 }
553
554 @Override
555 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
556 NodeId master = mastershipService.getMasterFor(rule.deviceId());
557 if (Objects.equals(local, master)) {
558 return addOrUpdateFlowRuleInternal(rule);
559 }
560
561 log.warn("Tried to update FlowRule {} state,"
Jordan Halterman281dbf32018-06-15 17:46:28 -0700562 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700563 return null;
564 }
565
566 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700567 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700568 stored.setBytes(rule.bytes());
569 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
570 stored.setLiveType(rule.liveType());
571 stored.setPackets(rule.packets());
572 stored.setLastSeen();
573 if (stored.state() == FlowEntryState.PENDING_ADD) {
574 stored.setState(FlowEntryState.ADDED);
575 return new FlowRuleEvent(Type.RULE_ADDED, rule);
576 }
577 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700578 });
579 if (event != null) {
580 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700581 }
582
583 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
584 // TODO: also update backup if the behavior is correct.
585 flowTable.add(rule);
586 return null;
587 }
588
589 @Override
590 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
591 final DeviceId deviceId = rule.deviceId();
592 NodeId master = mastershipService.getMasterFor(deviceId);
593
594 if (Objects.equals(local, master)) {
595 // bypass and handle it locally
596 return removeFlowRuleInternal(rule);
597 }
598
599 if (master == null) {
600 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
601 // TODO: revisit if this should be null (="no-op") or Exception
602 return null;
603 }
604
605 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700606 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700607
Jordan Halterman281dbf32018-06-15 17:46:28 -0700608 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
609 rule,
610 REMOVE_FLOW_ENTRY,
611 serializer::encode,
612 serializer::decode,
613 master),
614 FLOW_RULE_STORE_TIMEOUT_MILLIS,
615 TimeUnit.MILLISECONDS,
616 null);
Jon Hallfa132292017-10-24 11:11:24 -0700617 }
618
619 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700620 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800621 final FlowEntry removed = flowTable.remove(rule);
Jordan Haltermanc88c2652019-01-14 16:23:31 -0800622 log.debug("Removed flow rule: {}", removed);
Jon Hallfa132292017-10-24 11:11:24 -0700623 // rule may be partial rule that is missing treatment, we should use rule from store instead
624 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
625 }
626
627 @Override
628 public void purgeFlowRule(DeviceId deviceId) {
629 flowTable.purgeFlowRule(deviceId);
630 }
631
632 @Override
633 public void purgeFlowRules() {
634 flowTable.purgeFlowRules();
635 }
636
637 @Override
638 public void batchOperationComplete(FlowRuleBatchEvent event) {
639 //FIXME: need a per device pending response
640 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
641 if (nodeId == null) {
642 notifyDelegate(event);
643 } else {
644 // TODO check unicast return value
645 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
646 //error log: log.warn("Failed to respond to peer for batch operation result");
647 }
648 }
649
650 private final class OnStoreBatch implements ClusterMessageHandler {
651
652 @Override
653 public void handle(final ClusterMessage message) {
654 FlowRuleBatchOperation operation = serializer.decode(message.payload());
655 log.debug("received batch request {}", operation);
656
657 final DeviceId deviceId = operation.deviceId();
658 NodeId master = mastershipService.getMasterFor(deviceId);
659 if (!Objects.equals(local, master)) {
660 Set<FlowRule> failures = new HashSet<>(operation.size());
661 for (FlowRuleBatchEntry op : operation.getOperations()) {
662 failures.add(op.target());
663 }
664 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
665 // This node is no longer the master, respond as all failed.
666 // TODO: we might want to wrap response in envelope
667 // to distinguish sw programming failure and hand over
668 // it make sense in the latter case to retry immediately.
669 message.respond(serializer.encode(allFailed));
670 return;
671 }
672
673 pendingResponses.put(operation.id(), message.sender());
674 storeBatchInternal(operation);
675 }
676 }
677
Jordan Halterman281dbf32018-06-15 17:46:28 -0700678 private class InternalFlowTable implements DeviceListener {
679 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700680
681 @Override
Jordan Halterman281dbf32018-06-15 17:46:28 -0700682 public void event(DeviceEvent event) {
683 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
684 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700685 }
686 }
687
Jordan Halterman281dbf32018-06-15 17:46:28 -0700688 /**
689 * Adds the given device to the flow table.
690 *
691 * @param deviceId the device to add to the table
692 */
693 public void addDevice(DeviceId deviceId) {
694 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
695 id,
696 clusterService,
697 clusterCommunicator,
698 new InternalLifecycleManager(id),
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800699 backupScheduler,
700 new OrderedExecutor(backupExecutor),
Jordan Halterman281dbf32018-06-15 17:46:28 -0700701 backupPeriod,
702 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700703 }
704
Jordan Halterman281dbf32018-06-15 17:46:28 -0700705 /**
706 * Sets the flow table backup period.
707 *
708 * @param backupPeriod the flow table backup period
709 */
710 void setBackupPeriod(int backupPeriod) {
711 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700712 }
713
Jordan Halterman281dbf32018-06-15 17:46:28 -0700714 /**
715 * Sets the flow table anti-entropy period.
716 *
717 * @param antiEntropyPeriod the flow table anti-entropy period
718 */
719 void setAntiEntropyPeriod(int antiEntropyPeriod) {
720 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700721 }
722
Jordan Halterman281dbf32018-06-15 17:46:28 -0700723 /**
724 * Returns the flow table for a specific device.
725 *
726 * @param deviceId the device identifier
727 * @return the flow table for the given device
728 */
729 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
730 DeviceFlowTable flowTable = flowTables.get(deviceId);
731 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
732 deviceId,
733 clusterService,
734 clusterCommunicator,
735 new InternalLifecycleManager(deviceId),
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800736 backupScheduler,
737 new OrderedExecutor(backupExecutor),
Jordan Halterman281dbf32018-06-15 17:46:28 -0700738 backupPeriod,
739 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700740 }
741
Jordan Halterman281dbf32018-06-15 17:46:28 -0700742 /**
743 * Returns the flow rule count for the given device.
744 *
745 * @param deviceId the device for which to return the flow rule count
746 * @return the flow rule count for the given device
747 */
748 public int getFlowRuleCount(DeviceId deviceId) {
749 return getFlowTable(deviceId).count();
750 }
751
752 /**
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800753 * Returns the count of flow rules in the given state for the given device.
754 *
755 * @param deviceId the device for which to return the flow rule count
756 * @return the flow rule count for the given device
757 */
758 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
759 if (state == null) {
760 return getFlowRuleCount(deviceId);
761 }
Jordan Halterman78c287d2019-04-17 11:05:16 -0700762 return (int) StreamSupport.stream(getFlowEntries(deviceId).spliterator(), false)
Jordan Haltermanb81fdc12019-03-04 18:12:20 -0800763 .filter(rule -> rule.state() == state)
764 .count();
765 }
766
767 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -0700768 * Returns the flow entry for the given rule.
769 *
770 * @param rule the rule for which to return the flow entry
771 * @return the flow entry for the given rule
772 */
Jon Hallfa132292017-10-24 11:11:24 -0700773 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700774 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700775 }
776
Jordan Halterman281dbf32018-06-15 17:46:28 -0700777 /**
778 * Returns the set of flow entries for the given device.
779 *
780 * @param deviceId the device for which to lookup flow entries
781 * @return the set of flow entries for the given device
782 */
Jordan Halterman78c287d2019-04-17 11:05:16 -0700783 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
784 try {
785 return getFlowTable(deviceId).getFlowEntries()
786 .get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
787 } catch (ExecutionException e) {
788 throw new FlowRuleStoreException(e.getCause());
789 } catch (TimeoutException e) {
790 throw new FlowRuleStoreException.Timeout();
791 } catch (InterruptedException e) {
792 throw new FlowRuleStoreException.Interrupted();
793 }
Jon Hallfa132292017-10-24 11:11:24 -0700794 }
795
Jordan Halterman281dbf32018-06-15 17:46:28 -0700796 /**
797 * Adds the given flow rule.
798 *
799 * @param rule the rule to add
800 */
Jon Hallfa132292017-10-24 11:11:24 -0700801 public void add(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700802 Tools.futureGetOrElse(
803 getFlowTable(rule.deviceId()).add(rule),
804 FLOW_RULE_STORE_TIMEOUT_MILLIS,
805 TimeUnit.MILLISECONDS,
806 null);
Jon Hallfa132292017-10-24 11:11:24 -0700807 }
808
Jordan Halterman281dbf32018-06-15 17:46:28 -0700809 /**
810 * Updates the given flow rule.
811 *
812 * @param rule the rule to update
813 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800814 public void update(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700815 Tools.futureGetOrElse(
816 getFlowTable(rule.deviceId()).update(rule),
817 FLOW_RULE_STORE_TIMEOUT_MILLIS,
818 TimeUnit.MILLISECONDS,
819 null);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800820 }
821
Jordan Halterman281dbf32018-06-15 17:46:28 -0700822 /**
823 * Applies the given update function to the rule.
824 *
825 * @param function the update function to apply
826 * @return a future to be completed with the update event or {@code null} if the rule was not updated
827 */
828 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
829 return Tools.futureGetOrElse(
830 getFlowTable(rule.deviceId()).update(rule, function),
831 FLOW_RULE_STORE_TIMEOUT_MILLIS,
832 TimeUnit.MILLISECONDS,
833 null);
834 }
835
836 /**
837 * Removes the given flow rule.
838 *
839 * @param rule the rule to remove
840 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800841 public FlowEntry remove(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700842 return Tools.futureGetOrElse(
843 getFlowTable(rule.deviceId()).remove(rule),
844 FLOW_RULE_STORE_TIMEOUT_MILLIS,
845 TimeUnit.MILLISECONDS,
846 null);
Jon Hallfa132292017-10-24 11:11:24 -0700847 }
848
Jordan Halterman281dbf32018-06-15 17:46:28 -0700849 /**
850 * Purges flow rules for the given device.
851 *
852 * @param deviceId the device for which to purge flow rules
853 */
Jon Hallfa132292017-10-24 11:11:24 -0700854 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanda3b9f02018-10-05 13:28:51 -0700855 // If the device is still present in the store, purge the underlying DeviceFlowTable.
856 // Otherwise, remove the DeviceFlowTable and unregister message handlers.
857 if (deviceService.getDevice(deviceId) != null) {
858 DeviceFlowTable flowTable = flowTables.get(deviceId);
859 if (flowTable != null) {
860 flowTable.purge();
861 }
862 } else {
863 DeviceFlowTable flowTable = flowTables.remove(deviceId);
864 if (flowTable != null) {
865 flowTable.close();
866 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700867 }
Jon Hallfa132292017-10-24 11:11:24 -0700868 }
869
Jordan Halterman281dbf32018-06-15 17:46:28 -0700870 /**
871 * Purges all flow rules from the table.
872 */
Jon Hallfa132292017-10-24 11:11:24 -0700873 public void purgeFlowRules() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700874 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
875 while (iterator.hasNext()) {
876 iterator.next().close();
877 iterator.remove();
Jordan Halterman8f90d6d2018-06-12 11:23:33 -0700878 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700879 }
Jon Hallfa132292017-10-24 11:11:24 -0700880 }
881
882 @Override
Jordan Halterman5259b332018-06-12 15:34:19 -0700883 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700884 deviceTableStats.put(deviceId, tableStats);
885 return null;
886 }
887
888 @Override
889 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
890 NodeId master = mastershipService.getMasterFor(deviceId);
891
892 if (master == null) {
893 log.debug("Failed to getTableStats: No master for {}", deviceId);
894 return Collections.emptyList();
895 }
896
897 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
898 if (tableStats == null) {
899 return Collections.emptyList();
900 }
901 return ImmutableList.copyOf(tableStats);
902 }
903
904 @Override
905 public long getActiveFlowRuleCount(DeviceId deviceId) {
906 return Streams.stream(getTableStatistics(deviceId))
Jordan Halterman281dbf32018-06-15 17:46:28 -0700907 .mapToLong(TableStatisticsEntry::activeFlowEntries)
908 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700909 }
910
911 private class InternalTableStatsListener
912 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
913 @Override
914 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700915 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700916 //TODO: Generate an event to listeners (do we need?)
917 }
918 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700919
920 /**
921 * Device lifecycle manager implementation.
922 */
923 private final class InternalLifecycleManager
924 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
925 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
926
927 private final DeviceId deviceId;
928
929 private volatile DeviceReplicaInfo replicaInfo;
930
931 InternalLifecycleManager(DeviceId deviceId) {
932 this.deviceId = deviceId;
933 replicaInfoManager.addListener(this);
934 mastershipTermLifecycles.addListener(this);
935 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
936 }
937
938 @Override
939 public DeviceReplicaInfo getReplicaInfo() {
940 return replicaInfo;
941 }
942
943 @Override
944 public void activate(long term) {
945 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
946 if (replicaInfo != null && replicaInfo.term() == term) {
947 mastershipTermLifecycles.put(deviceId, term);
948 }
949 }
950
951 @Override
952 public void event(ReplicaInfoEvent event) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700953 if (event.subject().equals(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700954 onReplicaInfoChange(event.replicaInfo());
955 }
956 }
957
958 @Override
959 public void event(MapEvent<DeviceId, Long> event) {
960 if (event.key().equals(deviceId) && event.newValue() != null) {
961 onActivate(event.newValue().value());
962 }
963 }
964
965 /**
966 * Handles a term activation event.
967 *
968 * @param term the term that was activated
969 */
970 private void onActivate(long term) {
971 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
972 if (replicaInfo != null && replicaInfo.term() == term) {
973 NodeId master = replicaInfo.master().orElse(null);
974 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800975 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700976 listenerRegistry.process(new LifecycleEvent(
977 LifecycleEvent.Type.TERM_ACTIVE,
978 new DeviceReplicaInfo(term, master, backups)));
979 }
980 }
981
982 /**
983 * Handles a replica info change event.
984 *
985 * @param replicaInfo the updated replica info
986 */
987 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
988 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
989 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
990 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
991 if (oldReplicaInfo != null) {
992 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
993 }
994 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700995 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
996 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700997 }
998 }
999
1000 /**
1001 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
1002 *
1003 * @param replicaInfo the replica info to convert
1004 * @return the converted replica info
1005 */
1006 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
1007 NodeId master = replicaInfo.master().orElse(null);
1008 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanaeda2752019-02-22 12:31:25 -08001009 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman281dbf32018-06-15 17:46:28 -07001010 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
1011 }
1012
1013 @Override
1014 public void close() {
1015 replicaInfoManager.removeListener(this);
1016 mastershipTermLifecycles.removeListener(this);
1017 }
1018 }
Jordan Haltermanb81fdc12019-03-04 18:12:20 -08001019
1020 private static class CountMessage {
1021 private final DeviceId deviceId;
1022 private final FlowEntryState state;
1023
1024 CountMessage(DeviceId deviceId, FlowEntryState state) {
1025 this.deviceId = deviceId;
1026 this.state = state;
1027 }
1028 }
Jordan Halterman5259b332018-06-12 15:34:19 -07001029}