blob: 884b74b064f6fcdf8531c5a0d2368ee90e9844a5 [file] [log] [blame]
Jordan Halterman356cda52018-06-15 17:46:28 -07001/*
2* Copyright 2014-present Open Networking Foundation
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
18import java.util.Collections;
19import java.util.Dictionary;
20import java.util.HashSet;
Jordan Halterman356cda52018-06-15 17:46:28 -070021import java.util.Iterator;
Jon Hallfa132292017-10-24 11:11:24 -070022import java.util.List;
23import java.util.Map;
24import java.util.Objects;
25import java.util.Set;
Jordan Halterman9b49e342019-04-17 11:05:16 -070026import java.util.concurrent.ExecutionException;
Jon Hallfa132292017-10-24 11:11:24 -070027import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Executors;
29import java.util.concurrent.ScheduledExecutorService;
Jon Hallfa132292017-10-24 11:11:24 -070030import java.util.concurrent.TimeUnit;
Jordan Halterman9b49e342019-04-17 11:05:16 -070031import java.util.concurrent.TimeoutException;
Jordan Halterman356cda52018-06-15 17:46:28 -070032import java.util.function.Function;
Jon Hallfa132292017-10-24 11:11:24 -070033import java.util.stream.Collectors;
Jordan Halterman9b49e342019-04-17 11:05:16 -070034import java.util.stream.StreamSupport;
Jon Hallfa132292017-10-24 11:11:24 -070035
Jordan Halterman7ae81b82018-06-12 11:23:33 -070036import com.google.common.collect.ImmutableList;
37import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070038import com.google.common.collect.Streams;
Jordan Halterman7a9bfb82019-03-04 18:12:20 -080039import org.apache.commons.lang3.tuple.ImmutablePair;
40import org.apache.commons.lang3.tuple.Pair;
Jon Hallfa132292017-10-24 11:11:24 -070041import org.apache.felix.scr.annotations.Activate;
42import org.apache.felix.scr.annotations.Component;
43import org.apache.felix.scr.annotations.Deactivate;
44import org.apache.felix.scr.annotations.Modified;
45import org.apache.felix.scr.annotations.Property;
46import org.apache.felix.scr.annotations.Reference;
47import org.apache.felix.scr.annotations.ReferenceCardinality;
48import org.apache.felix.scr.annotations.Service;
49import org.onlab.util.KryoNamespace;
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -080050import org.onlab.util.OrderedExecutor;
Jon Hallfa132292017-10-24 11:11:24 -070051import org.onlab.util.Tools;
52import org.onosproject.cfg.ComponentConfigService;
53import org.onosproject.cluster.ClusterService;
54import org.onosproject.cluster.NodeId;
55import org.onosproject.core.CoreService;
56import org.onosproject.core.IdGenerator;
Jordan Halterman356cda52018-06-15 17:46:28 -070057import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070058import org.onosproject.mastership.MastershipService;
59import org.onosproject.net.DeviceId;
Jordan Halterman356cda52018-06-15 17:46:28 -070060import org.onosproject.net.device.DeviceEvent;
61import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070062import org.onosproject.net.device.DeviceService;
63import org.onosproject.net.flow.CompletedBatchOperation;
64import org.onosproject.net.flow.DefaultFlowEntry;
65import org.onosproject.net.flow.FlowEntry;
66import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070067import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070068import org.onosproject.net.flow.FlowRuleEvent;
69import org.onosproject.net.flow.FlowRuleEvent.Type;
70import org.onosproject.net.flow.FlowRuleService;
71import org.onosproject.net.flow.FlowRuleStore;
72import org.onosproject.net.flow.FlowRuleStoreDelegate;
73import org.onosproject.net.flow.StoredFlowEntry;
74import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman7ae81b82018-06-12 11:23:33 -070075import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
76import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
77import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
78import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
79import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070080import org.onosproject.persistence.PersistenceService;
81import org.onosproject.store.AbstractStore;
82import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
83import org.onosproject.store.cluster.messaging.ClusterMessage;
84import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jordan Halterman356cda52018-06-15 17:46:28 -070085import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070086import org.onosproject.store.flow.ReplicaInfoEvent;
87import org.onosproject.store.flow.ReplicaInfoEventListener;
88import org.onosproject.store.flow.ReplicaInfoService;
89import org.onosproject.store.impl.MastershipBasedTimestamp;
90import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman356cda52018-06-15 17:46:28 -070091import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070092import org.onosproject.store.service.EventuallyConsistentMap;
93import org.onosproject.store.service.EventuallyConsistentMapEvent;
94import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman356cda52018-06-15 17:46:28 -070095import org.onosproject.store.service.MapEvent;
96import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070097import org.onosproject.store.service.Serializer;
98import org.onosproject.store.service.StorageService;
99import org.onosproject.store.service.WallClockTimestamp;
100import org.osgi.service.component.ComponentContext;
101import org.slf4j.Logger;
102
Jon Hallfa132292017-10-24 11:11:24 -0700103import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800104import static java.lang.Math.max;
105import static java.lang.Math.min;
Jon Hallfa132292017-10-24 11:11:24 -0700106import static org.onlab.util.Tools.get;
107import static org.onlab.util.Tools.groupedThreads;
108import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -0700109import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
110import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Jordan Halterman356cda52018-06-15 17:46:28 -0700111import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700112import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
113import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
114import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
115import static org.slf4j.LoggerFactory.getLogger;
116
117/**
118 * Manages inventory of flow rules using a distributed state management protocol.
119 */
Thomas Vachuska71026b22018-01-05 16:01:44 -0800120@Component(immediate = true)
Jon Hallfa132292017-10-24 11:11:24 -0700121@Service
122public class ECFlowRuleStore
Jordan Halterman356cda52018-06-15 17:46:28 -0700123 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
124 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700125
126 private final Logger log = getLogger(getClass());
127
128 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
129 private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
130 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
131 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Jordan Halterman000e4102018-06-12 15:34:19 -0700132 private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700133 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700134
135 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
Jordan Halterman356cda52018-06-15 17:46:28 -0700136 label = "Number of threads in the message handler pool")
Jon Hallfa132292017-10-24 11:11:24 -0700137 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
138
139 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
Jordan Halterman356cda52018-06-15 17:46:28 -0700140 label = "Delay in ms between successive backup runs")
Jon Hallfa132292017-10-24 11:11:24 -0700141 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Jordan Halterman000e4102018-06-12 15:34:19 -0700142
143 @Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
Jordan Halterman356cda52018-06-15 17:46:28 -0700144 label = "Delay in ms between anti-entropy runs")
Jordan Halterman000e4102018-06-12 15:34:19 -0700145 private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
146
Jon Hallfa132292017-10-24 11:11:24 -0700147 @Property(name = "persistenceEnabled", boolValue = false,
Jordan Halterman356cda52018-06-15 17:46:28 -0700148 label = "Indicates whether or not changes in the flow table should be persisted to disk.")
Jon Hallfa132292017-10-24 11:11:24 -0700149 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
150
151 @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
Jordan Halterman356cda52018-06-15 17:46:28 -0700152 label = "Max number of backup copies for each device")
Jon Hallfa132292017-10-24 11:11:24 -0700153 private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
154
155 private InternalFlowTable flowTable = new InternalFlowTable();
156
157 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
158 protected ReplicaInfoService replicaInfoManager;
159
160 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
161 protected ClusterCommunicationService clusterCommunicator;
162
163 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
164 protected ClusterService clusterService;
165
166 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
167 protected DeviceService deviceService;
168
169 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
170 protected CoreService coreService;
171
172 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
173 protected ComponentConfigService configService;
174
175 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
176 protected MastershipService mastershipService;
177
178 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
179 protected PersistenceService persistenceService;
180
181 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
182 private ExecutorService messageHandlingExecutor;
183 private ExecutorService eventHandler;
184
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800185 private final ScheduledExecutorService backupScheduler = Executors.newSingleThreadScheduledExecutor(
186 groupedThreads("onos/flow", "backup-scheduler", log));
187 private final ExecutorService backupExecutor = Executors.newFixedThreadPool(
188 max(min(Runtime.getRuntime().availableProcessors() * 2, 16), 4),
189 groupedThreads("onos/flow", "backup-%d", log));
Jon Hallfa132292017-10-24 11:11:24 -0700190
191 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
192 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Jordan Halterman356cda52018-06-15 17:46:28 -0700193 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700194
195 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
196 protected StorageService storageService;
197
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700198 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
199 .register(KryoNamespaces.API)
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700200 .register(BucketId.class)
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700201 .register(FlowBucket.class)
Jordan Halterman7a9bfb82019-03-04 18:12:20 -0800202 .register(ImmutablePair.class)
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700203 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700204
205 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700206 .register(KryoNamespaces.API)
207 .register(BucketId.class)
208 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700209
Jordan Halterman356cda52018-06-15 17:46:28 -0700210 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700211
212 private IdGenerator idGenerator;
213 private NodeId local;
214
215 @Activate
216 public void activate(ComponentContext context) {
217 configService.registerProperties(getClass());
218
219 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
220
221 local = clusterService.getLocalNode().id();
222
223 eventHandler = Executors.newSingleThreadExecutor(
Jordan Halterman356cda52018-06-15 17:46:28 -0700224 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700225 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman356cda52018-06-15 17:46:28 -0700226 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700227
228 registerMessageHandlers(messageHandlingExecutor);
229
Jordan Halterman356cda52018-06-15 17:46:28 -0700230 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
231 .withName("onos-flow-store-terms")
232 .withSerializer(serializer)
233 .buildAsyncMap();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800234
Jon Hallfa132292017-10-24 11:11:24 -0700235 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Jordan Halterman356cda52018-06-15 17:46:28 -0700236 .withName("onos-flow-table-stats")
237 .withSerializer(serializerBuilder)
238 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
239 .withTimestampProvider((k, v) -> new WallClockTimestamp())
240 .withTombstonesDisabled()
241 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700242 deviceTableStats.addListener(tableStatsListener);
243
Jordan Halterman356cda52018-06-15 17:46:28 -0700244 deviceService.addListener(flowTable);
245 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
246
Jon Hallfa132292017-10-24 11:11:24 -0700247 logConfig("Started");
248 }
249
250 @Deactivate
251 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700252 configService.unregisterProperties(getClass(), false);
253 unregisterMessageHandlers();
Jordan Halterman356cda52018-06-15 17:46:28 -0700254 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700255 deviceTableStats.removeListener(tableStatsListener);
256 deviceTableStats.destroy();
257 eventHandler.shutdownNow();
258 messageHandlingExecutor.shutdownNow();
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800259 backupScheduler.shutdownNow();
260 backupExecutor.shutdownNow();
Jon Hallfa132292017-10-24 11:11:24 -0700261 log.info("Stopped");
262 }
263
264 @SuppressWarnings("rawtypes")
265 @Modified
266 public void modified(ComponentContext context) {
267 if (context == null) {
268 logConfig("Default config");
269 return;
270 }
271
272 Dictionary properties = context.getProperties();
273 int newPoolSize;
274 int newBackupPeriod;
275 int newBackupCount;
Jordan Halterman000e4102018-06-12 15:34:19 -0700276 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700277 try {
278 String s = get(properties, "msgHandlerPoolSize");
279 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
280
281 s = get(properties, "backupPeriod");
282 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
283
284 s = get(properties, "backupCount");
285 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman000e4102018-06-12 15:34:19 -0700286
287 s = get(properties, "antiEntropyPeriod");
288 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700289 } catch (NumberFormatException | ClassCastException e) {
290 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
291 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
292 newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
Jordan Halterman000e4102018-06-12 15:34:19 -0700293 newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
Jon Hallfa132292017-10-24 11:11:24 -0700294 }
295
Jon Hallfa132292017-10-24 11:11:24 -0700296 if (newBackupPeriod != backupPeriod) {
297 backupPeriod = newBackupPeriod;
Jordan Halterman356cda52018-06-15 17:46:28 -0700298 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700299 }
Jordan Halterman000e4102018-06-12 15:34:19 -0700300
301 if (newAntiEntropyPeriod != antiEntropyPeriod) {
302 antiEntropyPeriod = newAntiEntropyPeriod;
Jordan Halterman356cda52018-06-15 17:46:28 -0700303 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jordan Halterman000e4102018-06-12 15:34:19 -0700304 }
305
Jon Hallfa132292017-10-24 11:11:24 -0700306 if (newPoolSize != msgHandlerPoolSize) {
307 msgHandlerPoolSize = newPoolSize;
308 ExecutorService oldMsgHandler = messageHandlingExecutor;
309 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman356cda52018-06-15 17:46:28 -0700310 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700311
312 // replace previously registered handlers.
313 registerMessageHandlers(messageHandlingExecutor);
314 oldMsgHandler.shutdown();
315 }
Jordan Halterman000e4102018-06-12 15:34:19 -0700316
Jon Hallfa132292017-10-24 11:11:24 -0700317 if (backupCount != newBackupCount) {
318 backupCount = newBackupCount;
319 }
320 logConfig("Reconfigured");
321 }
322
323 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700324 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
325 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Jordan Halterman356cda52018-06-15 17:46:28 -0700326 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700327 clusterCommunicator.addSubscriber(
Jordan Halterman356cda52018-06-15 17:46:28 -0700328 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jordan Halterman7a9bfb82019-03-04 18:12:20 -0800329 clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
330 GET_DEVICE_FLOW_COUNT,
331 serializer::decode,
332 p -> flowTable.getFlowRuleCount(p.getLeft(), p.getRight()),
333 serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700334 clusterCommunicator.addSubscriber(
Jordan Halterman356cda52018-06-15 17:46:28 -0700335 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700336 }
337
338 private void unregisterMessageHandlers() {
339 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
Jordan Halterman356cda52018-06-15 17:46:28 -0700340 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700341 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
342 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
343 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
344 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
345 }
346
347 private void logConfig(String prefix) {
348 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Jordan Halterman356cda52018-06-15 17:46:28 -0700349 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700350 }
351
Jon Hallfa132292017-10-24 11:11:24 -0700352 @Override
353 public int getFlowRuleCount() {
354 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Halterman356cda52018-06-15 17:46:28 -0700355 .mapToInt(device -> getFlowRuleCount(device.id()))
356 .sum();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800357 }
358
359 @Override
360 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Halterman7a9bfb82019-03-04 18:12:20 -0800361 return getFlowRuleCount(deviceId, null);
362 }
363
364 @Override
365 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
Jordan Halterman356cda52018-06-15 17:46:28 -0700366 NodeId master = mastershipService.getMasterFor(deviceId);
367 if (master == null) {
368 log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
369 return 0;
370 }
371
372 if (Objects.equals(local, master)) {
Jordan Halterman7a9bfb82019-03-04 18:12:20 -0800373 return flowTable.getFlowRuleCount(deviceId, state);
Jordan Halterman356cda52018-06-15 17:46:28 -0700374 }
375
376 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
377 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
Jordan Halterman7a9bfb82019-03-04 18:12:20 -0800378 Pair.of(deviceId, state),
379 GET_DEVICE_FLOW_COUNT,
380 serializer::encode,
381 serializer::decode,
382 master),
383 FLOW_RULE_STORE_TIMEOUT_MILLIS,
384 TimeUnit.MILLISECONDS,
385 0);
Jon Hallfa132292017-10-24 11:11:24 -0700386 }
387
388 @Override
389 public FlowEntry getFlowEntry(FlowRule rule) {
390 NodeId master = mastershipService.getMasterFor(rule.deviceId());
391
392 if (master == null) {
393 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
394 return null;
395 }
396
397 if (Objects.equals(local, master)) {
398 return flowTable.getFlowEntry(rule);
399 }
400
401 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Jordan Halterman356cda52018-06-15 17:46:28 -0700402 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700403
404 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Jordan Halterman356cda52018-06-15 17:46:28 -0700405 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
406 serializer::encode,
407 serializer::decode,
408 master),
409 FLOW_RULE_STORE_TIMEOUT_MILLIS,
410 TimeUnit.MILLISECONDS,
411 null);
Jon Hallfa132292017-10-24 11:11:24 -0700412 }
413
414 @Override
415 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Halterman9b49e342019-04-17 11:05:16 -0700416 return flowTable.getFlowEntries(deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700417 }
418
419 @Override
420 public void storeFlowRule(FlowRule rule) {
421 storeBatch(new FlowRuleBatchOperation(
Jordan Halterman356cda52018-06-15 17:46:28 -0700422 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
423 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700424 }
425
426 @Override
427 public void storeBatch(FlowRuleBatchOperation operation) {
428 if (operation.getOperations().isEmpty()) {
429 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman356cda52018-06-15 17:46:28 -0700430 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
431 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700432 return;
433 }
434
435 DeviceId deviceId = operation.deviceId();
436 NodeId master = mastershipService.getMasterFor(deviceId);
437
438 if (master == null) {
439 log.warn("No master for {} ", deviceId);
440
Jordan Halterman356cda52018-06-15 17:46:28 -0700441 Set<FlowRule> allFailures = operation.getOperations()
442 .stream()
443 .map(op -> op.target())
444 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700445 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman356cda52018-06-15 17:46:28 -0700446 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
447 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700448 return;
449 }
450
451 if (Objects.equals(local, master)) {
452 storeBatchInternal(operation);
453 return;
454 }
455
456 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Jordan Halterman356cda52018-06-15 17:46:28 -0700457 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700458
459 clusterCommunicator.unicast(operation,
Jordan Halterman356cda52018-06-15 17:46:28 -0700460 APPLY_BATCH_FLOWS,
461 serializer::encode,
462 master)
463 .whenComplete((result, error) -> {
464 if (error != null) {
465 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700466
Jordan Halterman356cda52018-06-15 17:46:28 -0700467 Set<FlowRule> allFailures = operation.getOperations()
468 .stream()
469 .map(op -> op.target())
470 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700471
Jordan Halterman356cda52018-06-15 17:46:28 -0700472 notifyDelegate(FlowRuleBatchEvent.completed(
473 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
474 new CompletedBatchOperation(false, allFailures, deviceId)));
475 }
476 });
Jon Hallfa132292017-10-24 11:11:24 -0700477 }
478
479 private void storeBatchInternal(FlowRuleBatchOperation operation) {
480
481 final DeviceId did = operation.deviceId();
482 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
483 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
484 if (currentOps.isEmpty()) {
485 batchOperationComplete(FlowRuleBatchEvent.completed(
Jordan Halterman356cda52018-06-15 17:46:28 -0700486 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
487 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700488 return;
489 }
490
491 notifyDelegate(FlowRuleBatchEvent.requested(new
Jordan Halterman356cda52018-06-15 17:46:28 -0700492 FlowRuleBatchRequest(operation.id(),
493 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700494 }
495
496 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
497 return operation.getOperations().stream().map(
Jordan Halterman356cda52018-06-15 17:46:28 -0700498 op -> {
499 StoredFlowEntry entry;
500 switch (op.operator()) {
501 case ADD:
502 entry = new DefaultFlowEntry(op.target());
503 flowTable.add(entry);
504 return op;
505 case MODIFY:
506 entry = new DefaultFlowEntry(op.target());
507 flowTable.update(entry);
508 return op;
509 case REMOVE:
510 return flowTable.update(op.target(), stored -> {
511 stored.setState(FlowEntryState.PENDING_REMOVE);
512 log.debug("Setting state of rule to pending remove: {}", stored);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800513 return op;
Jordan Halterman356cda52018-06-15 17:46:28 -0700514 });
515 default:
516 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700517 }
Jordan Halterman356cda52018-06-15 17:46:28 -0700518 return null;
519 }
Jon Hallfa132292017-10-24 11:11:24 -0700520 ).filter(Objects::nonNull).collect(Collectors.toSet());
521 }
522
523 @Override
524 public void deleteFlowRule(FlowRule rule) {
525 storeBatch(
Jordan Halterman356cda52018-06-15 17:46:28 -0700526 new FlowRuleBatchOperation(
527 Collections.singletonList(
528 new FlowRuleBatchEntry(
529 FlowRuleOperation.REMOVE,
530 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700531 }
532
533 @Override
534 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
535 if (mastershipService.isLocalMaster(rule.deviceId())) {
Jordan Halterman356cda52018-06-15 17:46:28 -0700536 return flowTable.update(rule, stored -> {
537 if (stored.state() == FlowEntryState.PENDING_ADD) {
538 stored.setState(FlowEntryState.PENDING_ADD);
539 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
540 }
541 return null;
542 });
Jon Hallfa132292017-10-24 11:11:24 -0700543 }
544 return null;
545 }
546
547 @Override
548 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
549 NodeId master = mastershipService.getMasterFor(rule.deviceId());
550 if (Objects.equals(local, master)) {
551 return addOrUpdateFlowRuleInternal(rule);
552 }
553
554 log.warn("Tried to update FlowRule {} state,"
Jordan Halterman356cda52018-06-15 17:46:28 -0700555 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700556 return null;
557 }
558
559 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Jordan Halterman356cda52018-06-15 17:46:28 -0700560 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700561 stored.setBytes(rule.bytes());
562 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
563 stored.setLiveType(rule.liveType());
564 stored.setPackets(rule.packets());
565 stored.setLastSeen();
566 if (stored.state() == FlowEntryState.PENDING_ADD) {
567 stored.setState(FlowEntryState.ADDED);
568 return new FlowRuleEvent(Type.RULE_ADDED, rule);
569 }
570 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Jordan Halterman356cda52018-06-15 17:46:28 -0700571 });
572 if (event != null) {
573 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700574 }
575
576 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
577 // TODO: also update backup if the behavior is correct.
578 flowTable.add(rule);
579 return null;
580 }
581
582 @Override
583 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
584 final DeviceId deviceId = rule.deviceId();
585 NodeId master = mastershipService.getMasterFor(deviceId);
586
587 if (Objects.equals(local, master)) {
588 // bypass and handle it locally
589 return removeFlowRuleInternal(rule);
590 }
591
592 if (master == null) {
593 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
594 // TODO: revisit if this should be null (="no-op") or Exception
595 return null;
596 }
597
598 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Jordan Halterman356cda52018-06-15 17:46:28 -0700599 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700600
Jordan Halterman356cda52018-06-15 17:46:28 -0700601 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
602 rule,
603 REMOVE_FLOW_ENTRY,
604 serializer::encode,
605 serializer::decode,
606 master),
607 FLOW_RULE_STORE_TIMEOUT_MILLIS,
608 TimeUnit.MILLISECONDS,
609 null);
Jon Hallfa132292017-10-24 11:11:24 -0700610 }
611
612 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700613 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800614 final FlowEntry removed = flowTable.remove(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700615 // rule may be partial rule that is missing treatment, we should use rule from store instead
616 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
617 }
618
619 @Override
620 public void purgeFlowRule(DeviceId deviceId) {
621 flowTable.purgeFlowRule(deviceId);
622 }
623
624 @Override
625 public void purgeFlowRules() {
626 flowTable.purgeFlowRules();
627 }
628
629 @Override
630 public void batchOperationComplete(FlowRuleBatchEvent event) {
631 //FIXME: need a per device pending response
632 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
633 if (nodeId == null) {
634 notifyDelegate(event);
635 } else {
636 // TODO check unicast return value
637 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
638 //error log: log.warn("Failed to respond to peer for batch operation result");
639 }
640 }
641
642 private final class OnStoreBatch implements ClusterMessageHandler {
643
644 @Override
645 public void handle(final ClusterMessage message) {
646 FlowRuleBatchOperation operation = serializer.decode(message.payload());
647 log.debug("received batch request {}", operation);
648
649 final DeviceId deviceId = operation.deviceId();
650 NodeId master = mastershipService.getMasterFor(deviceId);
651 if (!Objects.equals(local, master)) {
652 Set<FlowRule> failures = new HashSet<>(operation.size());
653 for (FlowRuleBatchEntry op : operation.getOperations()) {
654 failures.add(op.target());
655 }
656 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
657 // This node is no longer the master, respond as all failed.
658 // TODO: we might want to wrap response in envelope
659 // to distinguish sw programming failure and hand over
660 // it make sense in the latter case to retry immediately.
661 message.respond(serializer.encode(allFailed));
662 return;
663 }
664
665 pendingResponses.put(operation.id(), message.sender());
666 storeBatchInternal(operation);
667 }
668 }
669
Jordan Halterman356cda52018-06-15 17:46:28 -0700670 private class InternalFlowTable implements DeviceListener {
671 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700672
673 @Override
Jordan Halterman356cda52018-06-15 17:46:28 -0700674 public void event(DeviceEvent event) {
675 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
676 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700677 }
678 }
679
Jordan Halterman356cda52018-06-15 17:46:28 -0700680 /**
681 * Adds the given device to the flow table.
682 *
683 * @param deviceId the device to add to the table
684 */
685 public void addDevice(DeviceId deviceId) {
686 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
687 id,
688 clusterService,
689 clusterCommunicator,
690 new InternalLifecycleManager(id),
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800691 backupScheduler,
692 new OrderedExecutor(backupExecutor),
Jordan Halterman356cda52018-06-15 17:46:28 -0700693 backupPeriod,
694 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700695 }
696
Jordan Halterman356cda52018-06-15 17:46:28 -0700697 /**
698 * Sets the flow table backup period.
699 *
700 * @param backupPeriod the flow table backup period
701 */
702 void setBackupPeriod(int backupPeriod) {
703 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700704 }
705
Jordan Halterman356cda52018-06-15 17:46:28 -0700706 /**
707 * Sets the flow table anti-entropy period.
708 *
709 * @param antiEntropyPeriod the flow table anti-entropy period
710 */
711 void setAntiEntropyPeriod(int antiEntropyPeriod) {
712 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700713 }
714
Jordan Halterman356cda52018-06-15 17:46:28 -0700715 /**
716 * Returns the flow table for a specific device.
717 *
718 * @param deviceId the device identifier
719 * @return the flow table for the given device
720 */
721 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
722 DeviceFlowTable flowTable = flowTables.get(deviceId);
723 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
724 deviceId,
725 clusterService,
726 clusterCommunicator,
727 new InternalLifecycleManager(deviceId),
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800728 backupScheduler,
729 new OrderedExecutor(backupExecutor),
Jordan Halterman356cda52018-06-15 17:46:28 -0700730 backupPeriod,
731 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700732 }
733
Jordan Halterman356cda52018-06-15 17:46:28 -0700734 /**
735 * Returns the flow rule count for the given device.
736 *
737 * @param deviceId the device for which to return the flow rule count
738 * @return the flow rule count for the given device
739 */
740 public int getFlowRuleCount(DeviceId deviceId) {
741 return getFlowTable(deviceId).count();
742 }
743
744 /**
Jordan Halterman7a9bfb82019-03-04 18:12:20 -0800745 * Returns the count of flow rules in the given state for the given device.
746 *
747 * @param deviceId the device for which to return the flow rule count
748 * @return the flow rule count for the given device
749 */
750 public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
751 if (state == null) {
752 return getFlowRuleCount(deviceId);
753 }
Jordan Halterman9b49e342019-04-17 11:05:16 -0700754 return (int) StreamSupport.stream(getFlowEntries(deviceId).spliterator(), false)
Jordan Halterman7a9bfb82019-03-04 18:12:20 -0800755 .filter(rule -> rule.state() == state)
756 .count();
757 }
758
759 /**
Jordan Halterman356cda52018-06-15 17:46:28 -0700760 * Returns the flow entry for the given rule.
761 *
762 * @param rule the rule for which to return the flow entry
763 * @return the flow entry for the given rule
764 */
Jon Hallfa132292017-10-24 11:11:24 -0700765 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Jordan Halterman356cda52018-06-15 17:46:28 -0700766 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700767 }
768
Jordan Halterman356cda52018-06-15 17:46:28 -0700769 /**
770 * Returns the set of flow entries for the given device.
771 *
772 * @param deviceId the device for which to lookup flow entries
773 * @return the set of flow entries for the given device
774 */
Jordan Halterman9b49e342019-04-17 11:05:16 -0700775 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
776 try {
777 return getFlowTable(deviceId).getFlowEntries()
778 .get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
779 } catch (ExecutionException e) {
780 throw new RuntimeException(e.getCause());
781 } catch (TimeoutException | InterruptedException e) {
782 throw new RuntimeException(e);
783 }
Jon Hallfa132292017-10-24 11:11:24 -0700784 }
785
Jordan Halterman356cda52018-06-15 17:46:28 -0700786 /**
787 * Adds the given flow rule.
788 *
789 * @param rule the rule to add
790 */
Jon Hallfa132292017-10-24 11:11:24 -0700791 public void add(FlowEntry rule) {
Jordan Halterman356cda52018-06-15 17:46:28 -0700792 Tools.futureGetOrElse(
793 getFlowTable(rule.deviceId()).add(rule),
794 FLOW_RULE_STORE_TIMEOUT_MILLIS,
795 TimeUnit.MILLISECONDS,
796 null);
Jon Hallfa132292017-10-24 11:11:24 -0700797 }
798
Jordan Halterman356cda52018-06-15 17:46:28 -0700799 /**
800 * Updates the given flow rule.
801 *
802 * @param rule the rule to update
803 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800804 public void update(FlowEntry rule) {
Jordan Halterman356cda52018-06-15 17:46:28 -0700805 Tools.futureGetOrElse(
806 getFlowTable(rule.deviceId()).update(rule),
807 FLOW_RULE_STORE_TIMEOUT_MILLIS,
808 TimeUnit.MILLISECONDS,
809 null);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800810 }
811
Jordan Halterman356cda52018-06-15 17:46:28 -0700812 /**
813 * Applies the given update function to the rule.
814 *
815 * @param function the update function to apply
816 * @return a future to be completed with the update event or {@code null} if the rule was not updated
817 */
818 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
819 return Tools.futureGetOrElse(
820 getFlowTable(rule.deviceId()).update(rule, function),
821 FLOW_RULE_STORE_TIMEOUT_MILLIS,
822 TimeUnit.MILLISECONDS,
823 null);
824 }
825
826 /**
827 * Removes the given flow rule.
828 *
829 * @param rule the rule to remove
830 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800831 public FlowEntry remove(FlowEntry rule) {
Jordan Halterman356cda52018-06-15 17:46:28 -0700832 return Tools.futureGetOrElse(
833 getFlowTable(rule.deviceId()).remove(rule),
834 FLOW_RULE_STORE_TIMEOUT_MILLIS,
835 TimeUnit.MILLISECONDS,
836 null);
Jon Hallfa132292017-10-24 11:11:24 -0700837 }
838
Jordan Halterman356cda52018-06-15 17:46:28 -0700839 /**
840 * Purges flow rules for the given device.
841 *
842 * @param deviceId the device for which to purge flow rules
843 */
Jon Hallfa132292017-10-24 11:11:24 -0700844 public void purgeFlowRule(DeviceId deviceId) {
Jordan Halterman356cda52018-06-15 17:46:28 -0700845 DeviceFlowTable flowTable = flowTables.remove(deviceId);
846 if (flowTable != null) {
847 flowTable.close();
848 }
Jon Hallfa132292017-10-24 11:11:24 -0700849 }
850
Jordan Halterman356cda52018-06-15 17:46:28 -0700851 /**
852 * Purges all flow rules from the table.
853 */
Jon Hallfa132292017-10-24 11:11:24 -0700854 public void purgeFlowRules() {
Jordan Halterman356cda52018-06-15 17:46:28 -0700855 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
856 while (iterator.hasNext()) {
857 iterator.next().close();
858 iterator.remove();
Jordan Halterman7ae81b82018-06-12 11:23:33 -0700859 }
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700860 }
Jon Hallfa132292017-10-24 11:11:24 -0700861 }
862
863 @Override
Jordan Halterman000e4102018-06-12 15:34:19 -0700864 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700865 deviceTableStats.put(deviceId, tableStats);
866 return null;
867 }
868
869 @Override
870 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
871 NodeId master = mastershipService.getMasterFor(deviceId);
872
873 if (master == null) {
874 log.debug("Failed to getTableStats: No master for {}", deviceId);
875 return Collections.emptyList();
876 }
877
878 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
879 if (tableStats == null) {
880 return Collections.emptyList();
881 }
882 return ImmutableList.copyOf(tableStats);
883 }
884
885 @Override
886 public long getActiveFlowRuleCount(DeviceId deviceId) {
887 return Streams.stream(getTableStatistics(deviceId))
Jordan Halterman356cda52018-06-15 17:46:28 -0700888 .mapToLong(TableStatisticsEntry::activeFlowEntries)
889 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700890 }
891
892 private class InternalTableStatsListener
893 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
894 @Override
895 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Halterman356cda52018-06-15 17:46:28 -0700896 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700897 //TODO: Generate an event to listeners (do we need?)
898 }
899 }
Jordan Halterman356cda52018-06-15 17:46:28 -0700900
901 /**
902 * Device lifecycle manager implementation.
903 */
904 private final class InternalLifecycleManager
905 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
906 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
907
908 private final DeviceId deviceId;
909
910 private volatile DeviceReplicaInfo replicaInfo;
911
912 InternalLifecycleManager(DeviceId deviceId) {
913 this.deviceId = deviceId;
914 replicaInfoManager.addListener(this);
915 mastershipTermLifecycles.addListener(this);
916 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
917 }
918
919 @Override
920 public DeviceReplicaInfo getReplicaInfo() {
921 return replicaInfo;
922 }
923
924 @Override
925 public void activate(long term) {
926 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
927 if (replicaInfo != null && replicaInfo.term() == term) {
928 mastershipTermLifecycles.put(deviceId, term);
929 }
930 }
931
932 @Override
933 public void event(ReplicaInfoEvent event) {
Jordan Halterman97cd2722018-06-22 16:58:08 -0700934 if (event.subject().equals(deviceId)) {
Jordan Halterman356cda52018-06-15 17:46:28 -0700935 onReplicaInfoChange(event.replicaInfo());
936 }
937 }
938
939 @Override
940 public void event(MapEvent<DeviceId, Long> event) {
941 if (event.key().equals(deviceId) && event.newValue() != null) {
942 onActivate(event.newValue().value());
943 }
944 }
945
946 /**
947 * Handles a term activation event.
948 *
949 * @param term the term that was activated
950 */
951 private void onActivate(long term) {
952 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
953 if (replicaInfo != null && replicaInfo.term() == term) {
954 NodeId master = replicaInfo.master().orElse(null);
955 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800956 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman356cda52018-06-15 17:46:28 -0700957 listenerRegistry.process(new LifecycleEvent(
958 LifecycleEvent.Type.TERM_ACTIVE,
959 new DeviceReplicaInfo(term, master, backups)));
960 }
961 }
962
963 /**
964 * Handles a replica info change event.
965 *
966 * @param replicaInfo the updated replica info
967 */
968 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
969 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
970 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
971 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
972 if (oldReplicaInfo != null) {
973 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
974 }
975 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
Jordan Halterman97cd2722018-06-22 16:58:08 -0700976 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
977 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
Jordan Halterman356cda52018-06-15 17:46:28 -0700978 }
979 }
980
981 /**
982 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
983 *
984 * @param replicaInfo the replica info to convert
985 * @return the converted replica info
986 */
987 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
988 NodeId master = replicaInfo.master().orElse(null);
989 List<NodeId> backups = replicaInfo.backups()
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800990 .subList(0, min(replicaInfo.backups().size(), backupCount));
Jordan Halterman356cda52018-06-15 17:46:28 -0700991 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
992 }
993
994 @Override
995 public void close() {
996 replicaInfoManager.removeListener(this);
997 mastershipTermLifecycles.removeListener(this);
998 }
999 }
Jordan Halterman7a9bfb82019-03-04 18:12:20 -08001000
1001 private static class CountMessage {
1002 private final DeviceId deviceId;
1003 private final FlowEntryState state;
1004
1005 CountMessage(DeviceId deviceId, FlowEntryState state) {
1006 this.deviceId = deviceId;
1007 this.state = state;
1008 }
1009 }
Jordan Halterman000e4102018-06-12 15:34:19 -07001010}