blob: b24414b5d914671e452675958fa0e83d5acd6f38 [file] [log] [blame]
/*
 * Copyright 2014-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070020import com.google.common.collect.Streams;
Jon Hallfa132292017-10-24 11:11:24 -070021import org.onlab.util.KryoNamespace;
22import org.onlab.util.Tools;
23import org.onosproject.cfg.ComponentConfigService;
24import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.NodeId;
26import org.onosproject.core.CoreService;
27import org.onosproject.core.IdGenerator;
Jordan Halterman281dbf32018-06-15 17:46:28 -070028import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070029import org.onosproject.mastership.MastershipService;
30import org.onosproject.net.DeviceId;
Jordan Halterman281dbf32018-06-15 17:46:28 -070031import org.onosproject.net.device.DeviceEvent;
32import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070033import org.onosproject.net.device.DeviceService;
34import org.onosproject.net.flow.CompletedBatchOperation;
35import org.onosproject.net.flow.DefaultFlowEntry;
36import org.onosproject.net.flow.FlowEntry;
37import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070038import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070039import org.onosproject.net.flow.FlowRuleEvent;
40import org.onosproject.net.flow.FlowRuleEvent.Type;
41import org.onosproject.net.flow.FlowRuleService;
42import org.onosproject.net.flow.FlowRuleStore;
43import org.onosproject.net.flow.FlowRuleStoreDelegate;
44import org.onosproject.net.flow.StoredFlowEntry;
45import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070046import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
47import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
48import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
49import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
50import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070051import org.onosproject.persistence.PersistenceService;
52import org.onosproject.store.AbstractStore;
53import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
54import org.onosproject.store.cluster.messaging.ClusterMessage;
55import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jordan Halterman281dbf32018-06-15 17:46:28 -070056import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070057import org.onosproject.store.flow.ReplicaInfoEvent;
58import org.onosproject.store.flow.ReplicaInfoEventListener;
59import org.onosproject.store.flow.ReplicaInfoService;
60import org.onosproject.store.impl.MastershipBasedTimestamp;
61import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman281dbf32018-06-15 17:46:28 -070062import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070063import org.onosproject.store.service.EventuallyConsistentMap;
64import org.onosproject.store.service.EventuallyConsistentMapEvent;
65import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman281dbf32018-06-15 17:46:28 -070066import org.onosproject.store.service.MapEvent;
67import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070068import org.onosproject.store.service.Serializer;
69import org.onosproject.store.service.StorageService;
70import org.onosproject.store.service.WallClockTimestamp;
71import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070072import org.osgi.service.component.annotations.Activate;
73import org.osgi.service.component.annotations.Component;
74import org.osgi.service.component.annotations.Deactivate;
75import org.osgi.service.component.annotations.Modified;
76import org.osgi.service.component.annotations.Reference;
77import org.osgi.service.component.annotations.ReferenceCardinality;
Jon Hallfa132292017-10-24 11:11:24 -070078import org.slf4j.Logger;
79
Ray Milkeyd84f89b2018-08-17 14:54:17 -070080import java.util.Collections;
81import java.util.Dictionary;
82import java.util.HashSet;
83import java.util.Iterator;
84import java.util.List;
85import java.util.Map;
86import java.util.Objects;
87import java.util.Set;
88import java.util.concurrent.ExecutorService;
89import java.util.concurrent.Executors;
90import java.util.concurrent.ScheduledExecutorService;
91import java.util.concurrent.TimeUnit;
92import java.util.function.Function;
93import java.util.stream.Collectors;
94
Jon Hallfa132292017-10-24 11:11:24 -070095import static com.google.common.base.Strings.isNullOrEmpty;
96import static org.onlab.util.Tools.get;
97import static org.onlab.util.Tools.groupedThreads;
98import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -070099import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
100import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700101import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700102import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
103import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
104import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
105import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
106import static org.slf4j.LoggerFactory.getLogger;
107
/**
 * Manages inventory of flow rules using a distributed state management protocol.
 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700111@Component(immediate = true, service = FlowRuleStore.class)
Jon Hallfa132292017-10-24 11:11:24 -0700112public class ECFlowRuleStore
Jordan Halterman281dbf32018-06-15 17:46:28 -0700113 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
114 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700115
116 private final Logger log = getLogger(getClass());
117
118 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
119 private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
120 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
121 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Jordan Halterman5259b332018-06-12 15:34:19 -0700122 private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700123 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700124
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700125 //@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
126 // label = "Number of threads in the message handler pool")
Jon Hallfa132292017-10-24 11:11:24 -0700127 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
128
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700129 //@Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
130 // label = "Delay in ms between successive backup runs")
Jon Hallfa132292017-10-24 11:11:24 -0700131 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Jordan Halterman5259b332018-06-12 15:34:19 -0700132
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700133 //@Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
134 // label = "Delay in ms between anti-entropy runs")
Jordan Halterman5259b332018-06-12 15:34:19 -0700135 private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
136
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700137 //@Property(name = "persistenceEnabled", boolValue = false,
138 // label = "Indicates whether or not changes in the flow table should be persisted to disk.")
Jon Hallfa132292017-10-24 11:11:24 -0700139 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
140
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700141 //@Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
142 // label = "Max number of backup copies for each device")
Jon Hallfa132292017-10-24 11:11:24 -0700143 private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
144
145 private InternalFlowTable flowTable = new InternalFlowTable();
146
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700147 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700148 protected ReplicaInfoService replicaInfoManager;
149
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700150 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700151 protected ClusterCommunicationService clusterCommunicator;
152
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700153 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700154 protected ClusterService clusterService;
155
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700156 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700157 protected DeviceService deviceService;
158
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700160 protected CoreService coreService;
161
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700163 protected ComponentConfigService configService;
164
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700166 protected MastershipService mastershipService;
167
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700168 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700169 protected PersistenceService persistenceService;
170
171 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
172 private ExecutorService messageHandlingExecutor;
173 private ExecutorService eventHandler;
174
Jon Hallfa132292017-10-24 11:11:24 -0700175 private final ScheduledExecutorService backupSenderExecutor =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700176 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
Jon Hallfa132292017-10-24 11:11:24 -0700177
178 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
179 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700180 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700181
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700182 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700183 protected StorageService storageService;
184
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700185 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
186 .register(KryoNamespaces.API)
Jordan Haltermana765d222018-06-01 00:40:56 -0700187 .register(BucketId.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700188 .register(FlowBucket.class)
189 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700190
191 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermana765d222018-06-01 00:40:56 -0700192 .register(KryoNamespaces.API)
193 .register(BucketId.class)
194 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700195
Jordan Halterman281dbf32018-06-15 17:46:28 -0700196 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700197
198 private IdGenerator idGenerator;
199 private NodeId local;
200
201 @Activate
202 public void activate(ComponentContext context) {
203 configService.registerProperties(getClass());
204
205 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
206
207 local = clusterService.getLocalNode().id();
208
209 eventHandler = Executors.newSingleThreadExecutor(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700210 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700211 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700212 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700213
214 registerMessageHandlers(messageHandlingExecutor);
215
Jordan Halterman281dbf32018-06-15 17:46:28 -0700216 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
217 .withName("onos-flow-store-terms")
218 .withSerializer(serializer)
219 .buildAsyncMap();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800220
Jon Hallfa132292017-10-24 11:11:24 -0700221 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700222 .withName("onos-flow-table-stats")
223 .withSerializer(serializerBuilder)
224 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
225 .withTimestampProvider((k, v) -> new WallClockTimestamp())
226 .withTombstonesDisabled()
227 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700228 deviceTableStats.addListener(tableStatsListener);
229
Jordan Halterman281dbf32018-06-15 17:46:28 -0700230 deviceService.addListener(flowTable);
231 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
232
Jon Hallfa132292017-10-24 11:11:24 -0700233 logConfig("Started");
234 }
235
236 @Deactivate
237 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700238 configService.unregisterProperties(getClass(), false);
239 unregisterMessageHandlers();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700240 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700241 deviceTableStats.removeListener(tableStatsListener);
242 deviceTableStats.destroy();
243 eventHandler.shutdownNow();
244 messageHandlingExecutor.shutdownNow();
245 backupSenderExecutor.shutdownNow();
246 log.info("Stopped");
247 }
248
249 @SuppressWarnings("rawtypes")
250 @Modified
251 public void modified(ComponentContext context) {
252 if (context == null) {
253 logConfig("Default config");
254 return;
255 }
256
257 Dictionary properties = context.getProperties();
258 int newPoolSize;
259 int newBackupPeriod;
260 int newBackupCount;
Jordan Halterman5259b332018-06-12 15:34:19 -0700261 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700262 try {
263 String s = get(properties, "msgHandlerPoolSize");
264 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
265
266 s = get(properties, "backupPeriod");
267 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
268
269 s = get(properties, "backupCount");
270 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman5259b332018-06-12 15:34:19 -0700271
272 s = get(properties, "antiEntropyPeriod");
273 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700274 } catch (NumberFormatException | ClassCastException e) {
275 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
276 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
277 newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700278 newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
Jon Hallfa132292017-10-24 11:11:24 -0700279 }
280
Jon Hallfa132292017-10-24 11:11:24 -0700281 if (newBackupPeriod != backupPeriod) {
282 backupPeriod = newBackupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700283 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700284 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700285
286 if (newAntiEntropyPeriod != antiEntropyPeriod) {
287 antiEntropyPeriod = newAntiEntropyPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700288 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jordan Halterman5259b332018-06-12 15:34:19 -0700289 }
290
Jon Hallfa132292017-10-24 11:11:24 -0700291 if (newPoolSize != msgHandlerPoolSize) {
292 msgHandlerPoolSize = newPoolSize;
293 ExecutorService oldMsgHandler = messageHandlingExecutor;
294 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700295 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700296
297 // replace previously registered handlers.
298 registerMessageHandlers(messageHandlingExecutor);
299 oldMsgHandler.shutdown();
300 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700301
Jon Hallfa132292017-10-24 11:11:24 -0700302 if (backupCount != newBackupCount) {
303 backupCount = newBackupCount;
304 }
305 logConfig("Reconfigured");
306 }
307
308 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700309 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
310 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700311 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700312 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700313 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700314 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700315 GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700316 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700317 GET_DEVICE_FLOW_COUNT, serializer::decode, flowTable::getFlowRuleCount, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700318 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700319 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700320 }
321
322 private void unregisterMessageHandlers() {
323 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
324 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700325 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700326 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
327 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
328 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
329 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
330 }
331
332 private void logConfig(String prefix) {
333 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700334 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700335 }
336
Jon Hallfa132292017-10-24 11:11:24 -0700337 @Override
338 public int getFlowRuleCount() {
339 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700340 .mapToInt(device -> getFlowRuleCount(device.id()))
341 .sum();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800342 }
343
344 @Override
345 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700346 NodeId master = mastershipService.getMasterFor(deviceId);
347 if (master == null) {
348 log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
349 return 0;
350 }
351
352 if (Objects.equals(local, master)) {
353 return flowTable.getFlowRuleCount(deviceId);
354 }
355
356 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
357 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
358 deviceId,
359 GET_DEVICE_FLOW_COUNT,
360 serializer::encode,
361 serializer::decode,
362 master),
363 FLOW_RULE_STORE_TIMEOUT_MILLIS,
364 TimeUnit.MILLISECONDS,
365 0);
Jon Hallfa132292017-10-24 11:11:24 -0700366 }
367
368 @Override
369 public FlowEntry getFlowEntry(FlowRule rule) {
370 NodeId master = mastershipService.getMasterFor(rule.deviceId());
371
372 if (master == null) {
373 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
374 return null;
375 }
376
377 if (Objects.equals(local, master)) {
378 return flowTable.getFlowEntry(rule);
379 }
380
381 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700382 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700383
384 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700385 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
386 serializer::encode,
387 serializer::decode,
388 master),
389 FLOW_RULE_STORE_TIMEOUT_MILLIS,
390 TimeUnit.MILLISECONDS,
391 null);
Jon Hallfa132292017-10-24 11:11:24 -0700392 }
393
394 @Override
395 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
396 NodeId master = mastershipService.getMasterFor(deviceId);
397
398 if (master == null) {
399 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
400 return Collections.emptyList();
401 }
402
403 if (Objects.equals(local, master)) {
404 return flowTable.getFlowEntries(deviceId);
405 }
406
407 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700408 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700409
410 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700411 ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
412 serializer::encode,
413 serializer::decode,
414 master),
415 FLOW_RULE_STORE_TIMEOUT_MILLIS,
416 TimeUnit.MILLISECONDS,
417 Collections.emptyList());
Jon Hallfa132292017-10-24 11:11:24 -0700418 }
419
420 @Override
421 public void storeFlowRule(FlowRule rule) {
422 storeBatch(new FlowRuleBatchOperation(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700423 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
424 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700425 }
426
427 @Override
428 public void storeBatch(FlowRuleBatchOperation operation) {
429 if (operation.getOperations().isEmpty()) {
430 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700431 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
432 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700433 return;
434 }
435
436 DeviceId deviceId = operation.deviceId();
437 NodeId master = mastershipService.getMasterFor(deviceId);
438
439 if (master == null) {
440 log.warn("No master for {} ", deviceId);
441
Jordan Halterman281dbf32018-06-15 17:46:28 -0700442 Set<FlowRule> allFailures = operation.getOperations()
443 .stream()
444 .map(op -> op.target())
445 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700446 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700447 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
448 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700449 return;
450 }
451
452 if (Objects.equals(local, master)) {
453 storeBatchInternal(operation);
454 return;
455 }
456
457 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700458 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700459
460 clusterCommunicator.unicast(operation,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700461 APPLY_BATCH_FLOWS,
462 serializer::encode,
463 master)
464 .whenComplete((result, error) -> {
465 if (error != null) {
466 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700467
Jordan Halterman281dbf32018-06-15 17:46:28 -0700468 Set<FlowRule> allFailures = operation.getOperations()
469 .stream()
470 .map(op -> op.target())
471 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700472
Jordan Halterman281dbf32018-06-15 17:46:28 -0700473 notifyDelegate(FlowRuleBatchEvent.completed(
474 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
475 new CompletedBatchOperation(false, allFailures, deviceId)));
476 }
477 });
Jon Hallfa132292017-10-24 11:11:24 -0700478 }
479
480 private void storeBatchInternal(FlowRuleBatchOperation operation) {
481
482 final DeviceId did = operation.deviceId();
483 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
484 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
485 if (currentOps.isEmpty()) {
486 batchOperationComplete(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700487 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
488 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700489 return;
490 }
491
492 notifyDelegate(FlowRuleBatchEvent.requested(new
Jordan Halterman281dbf32018-06-15 17:46:28 -0700493 FlowRuleBatchRequest(operation.id(),
494 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700495 }
496
497 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
498 return operation.getOperations().stream().map(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700499 op -> {
500 StoredFlowEntry entry;
501 switch (op.operator()) {
502 case ADD:
503 entry = new DefaultFlowEntry(op.target());
504 flowTable.add(entry);
505 return op;
506 case MODIFY:
507 entry = new DefaultFlowEntry(op.target());
508 flowTable.update(entry);
509 return op;
510 case REMOVE:
511 return flowTable.update(op.target(), stored -> {
512 stored.setState(FlowEntryState.PENDING_REMOVE);
513 log.debug("Setting state of rule to pending remove: {}", stored);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800514 return op;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700515 });
516 default:
517 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700518 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700519 return null;
520 }
Jon Hallfa132292017-10-24 11:11:24 -0700521 ).filter(Objects::nonNull).collect(Collectors.toSet());
522 }
523
524 @Override
525 public void deleteFlowRule(FlowRule rule) {
526 storeBatch(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700527 new FlowRuleBatchOperation(
528 Collections.singletonList(
529 new FlowRuleBatchEntry(
530 FlowRuleOperation.REMOVE,
531 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700532 }
533
534 @Override
535 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
536 if (mastershipService.isLocalMaster(rule.deviceId())) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700537 return flowTable.update(rule, stored -> {
538 if (stored.state() == FlowEntryState.PENDING_ADD) {
539 stored.setState(FlowEntryState.PENDING_ADD);
540 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
541 }
542 return null;
543 });
Jon Hallfa132292017-10-24 11:11:24 -0700544 }
545 return null;
546 }
547
548 @Override
549 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
550 NodeId master = mastershipService.getMasterFor(rule.deviceId());
551 if (Objects.equals(local, master)) {
552 return addOrUpdateFlowRuleInternal(rule);
553 }
554
555 log.warn("Tried to update FlowRule {} state,"
Jordan Halterman281dbf32018-06-15 17:46:28 -0700556 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700557 return null;
558 }
559
560 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700561 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700562 stored.setBytes(rule.bytes());
563 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
564 stored.setLiveType(rule.liveType());
565 stored.setPackets(rule.packets());
566 stored.setLastSeen();
567 if (stored.state() == FlowEntryState.PENDING_ADD) {
568 stored.setState(FlowEntryState.ADDED);
569 return new FlowRuleEvent(Type.RULE_ADDED, rule);
570 }
571 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700572 });
573 if (event != null) {
574 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700575 }
576
577 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
578 // TODO: also update backup if the behavior is correct.
579 flowTable.add(rule);
580 return null;
581 }
582
583 @Override
584 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
585 final DeviceId deviceId = rule.deviceId();
586 NodeId master = mastershipService.getMasterFor(deviceId);
587
588 if (Objects.equals(local, master)) {
589 // bypass and handle it locally
590 return removeFlowRuleInternal(rule);
591 }
592
593 if (master == null) {
594 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
595 // TODO: revisit if this should be null (="no-op") or Exception
596 return null;
597 }
598
599 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700600 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700601
Jordan Halterman281dbf32018-06-15 17:46:28 -0700602 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
603 rule,
604 REMOVE_FLOW_ENTRY,
605 serializer::encode,
606 serializer::decode,
607 master),
608 FLOW_RULE_STORE_TIMEOUT_MILLIS,
609 TimeUnit.MILLISECONDS,
610 null);
Jon Hallfa132292017-10-24 11:11:24 -0700611 }
612
613 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700614 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800615 final FlowEntry removed = flowTable.remove(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700616 // rule may be partial rule that is missing treatment, we should use rule from store instead
617 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
618 }
619
620 @Override
621 public void purgeFlowRule(DeviceId deviceId) {
622 flowTable.purgeFlowRule(deviceId);
623 }
624
625 @Override
626 public void purgeFlowRules() {
627 flowTable.purgeFlowRules();
628 }
629
630 @Override
631 public void batchOperationComplete(FlowRuleBatchEvent event) {
632 //FIXME: need a per device pending response
633 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
634 if (nodeId == null) {
635 notifyDelegate(event);
636 } else {
637 // TODO check unicast return value
638 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
639 //error log: log.warn("Failed to respond to peer for batch operation result");
640 }
641 }
642
643 private final class OnStoreBatch implements ClusterMessageHandler {
644
645 @Override
646 public void handle(final ClusterMessage message) {
647 FlowRuleBatchOperation operation = serializer.decode(message.payload());
648 log.debug("received batch request {}", operation);
649
650 final DeviceId deviceId = operation.deviceId();
651 NodeId master = mastershipService.getMasterFor(deviceId);
652 if (!Objects.equals(local, master)) {
653 Set<FlowRule> failures = new HashSet<>(operation.size());
654 for (FlowRuleBatchEntry op : operation.getOperations()) {
655 failures.add(op.target());
656 }
657 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
658 // This node is no longer the master, respond as all failed.
659 // TODO: we might want to wrap response in envelope
660 // to distinguish sw programming failure and hand over
661 // it make sense in the latter case to retry immediately.
662 message.respond(serializer.encode(allFailed));
663 return;
664 }
665
666 pendingResponses.put(operation.id(), message.sender());
667 storeBatchInternal(operation);
668 }
669 }
670
Jordan Halterman281dbf32018-06-15 17:46:28 -0700671 private class InternalFlowTable implements DeviceListener {
672 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700673
674 @Override
Jordan Halterman281dbf32018-06-15 17:46:28 -0700675 public void event(DeviceEvent event) {
676 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
677 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700678 }
679 }
680
Jordan Halterman281dbf32018-06-15 17:46:28 -0700681 /**
682 * Adds the given device to the flow table.
683 *
684 * @param deviceId the device to add to the table
685 */
686 public void addDevice(DeviceId deviceId) {
687 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
688 id,
689 clusterService,
690 clusterCommunicator,
691 new InternalLifecycleManager(id),
692 backupSenderExecutor,
693 backupPeriod,
694 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700695 }
696
Jordan Halterman281dbf32018-06-15 17:46:28 -0700697 /**
698 * Sets the flow table backup period.
699 *
700 * @param backupPeriod the flow table backup period
701 */
702 void setBackupPeriod(int backupPeriod) {
703 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700704 }
705
Jordan Halterman281dbf32018-06-15 17:46:28 -0700706 /**
707 * Sets the flow table anti-entropy period.
708 *
709 * @param antiEntropyPeriod the flow table anti-entropy period
710 */
711 void setAntiEntropyPeriod(int antiEntropyPeriod) {
712 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700713 }
714
Jordan Halterman281dbf32018-06-15 17:46:28 -0700715 /**
716 * Returns the flow table for a specific device.
717 *
718 * @param deviceId the device identifier
719 * @return the flow table for the given device
720 */
721 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
722 DeviceFlowTable flowTable = flowTables.get(deviceId);
723 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
724 deviceId,
725 clusterService,
726 clusterCommunicator,
727 new InternalLifecycleManager(deviceId),
728 backupSenderExecutor,
729 backupPeriod,
730 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700731 }
732
Jordan Halterman281dbf32018-06-15 17:46:28 -0700733 /**
734 * Returns the flow rule count for the given device.
735 *
736 * @param deviceId the device for which to return the flow rule count
737 * @return the flow rule count for the given device
738 */
739 public int getFlowRuleCount(DeviceId deviceId) {
740 return getFlowTable(deviceId).count();
741 }
742
743 /**
744 * Returns the flow entry for the given rule.
745 *
746 * @param rule the rule for which to return the flow entry
747 * @return the flow entry for the given rule
748 */
Jon Hallfa132292017-10-24 11:11:24 -0700749 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700750 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700751 }
752
Jordan Halterman281dbf32018-06-15 17:46:28 -0700753 /**
754 * Returns the set of flow entries for the given device.
755 *
756 * @param deviceId the device for which to lookup flow entries
757 * @return the set of flow entries for the given device
758 */
Jon Hallfa132292017-10-24 11:11:24 -0700759 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700760 return getFlowTable(deviceId).getFlowEntries();
Jon Hallfa132292017-10-24 11:11:24 -0700761 }
762
Jordan Halterman281dbf32018-06-15 17:46:28 -0700763 /**
764 * Adds the given flow rule.
765 *
766 * @param rule the rule to add
767 */
Jon Hallfa132292017-10-24 11:11:24 -0700768 public void add(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700769 Tools.futureGetOrElse(
770 getFlowTable(rule.deviceId()).add(rule),
771 FLOW_RULE_STORE_TIMEOUT_MILLIS,
772 TimeUnit.MILLISECONDS,
773 null);
Jon Hallfa132292017-10-24 11:11:24 -0700774 }
775
Jordan Halterman281dbf32018-06-15 17:46:28 -0700776 /**
777 * Updates the given flow rule.
778 *
779 * @param rule the rule to update
780 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800781 public void update(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700782 Tools.futureGetOrElse(
783 getFlowTable(rule.deviceId()).update(rule),
784 FLOW_RULE_STORE_TIMEOUT_MILLIS,
785 TimeUnit.MILLISECONDS,
786 null);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800787 }
788
Jordan Halterman281dbf32018-06-15 17:46:28 -0700789 /**
790 * Applies the given update function to the rule.
791 *
792 * @param function the update function to apply
793 * @return a future to be completed with the update event or {@code null} if the rule was not updated
794 */
795 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
796 return Tools.futureGetOrElse(
797 getFlowTable(rule.deviceId()).update(rule, function),
798 FLOW_RULE_STORE_TIMEOUT_MILLIS,
799 TimeUnit.MILLISECONDS,
800 null);
801 }
802
803 /**
804 * Removes the given flow rule.
805 *
806 * @param rule the rule to remove
807 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800808 public FlowEntry remove(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700809 return Tools.futureGetOrElse(
810 getFlowTable(rule.deviceId()).remove(rule),
811 FLOW_RULE_STORE_TIMEOUT_MILLIS,
812 TimeUnit.MILLISECONDS,
813 null);
Jon Hallfa132292017-10-24 11:11:24 -0700814 }
815
Jordan Halterman281dbf32018-06-15 17:46:28 -0700816 /**
817 * Purges flow rules for the given device.
818 *
819 * @param deviceId the device for which to purge flow rules
820 */
Jon Hallfa132292017-10-24 11:11:24 -0700821 public void purgeFlowRule(DeviceId deviceId) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700822 DeviceFlowTable flowTable = flowTables.remove(deviceId);
823 if (flowTable != null) {
824 flowTable.close();
825 }
Jon Hallfa132292017-10-24 11:11:24 -0700826 }
827
Jordan Halterman281dbf32018-06-15 17:46:28 -0700828 /**
829 * Purges all flow rules from the table.
830 */
Jon Hallfa132292017-10-24 11:11:24 -0700831 public void purgeFlowRules() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700832 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
833 while (iterator.hasNext()) {
834 iterator.next().close();
835 iterator.remove();
Jordan Halterman8f90d6d2018-06-12 11:23:33 -0700836 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700837 }
Jon Hallfa132292017-10-24 11:11:24 -0700838 }
839
840 @Override
Jordan Halterman5259b332018-06-12 15:34:19 -0700841 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700842 deviceTableStats.put(deviceId, tableStats);
843 return null;
844 }
845
846 @Override
847 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
848 NodeId master = mastershipService.getMasterFor(deviceId);
849
850 if (master == null) {
851 log.debug("Failed to getTableStats: No master for {}", deviceId);
852 return Collections.emptyList();
853 }
854
855 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
856 if (tableStats == null) {
857 return Collections.emptyList();
858 }
859 return ImmutableList.copyOf(tableStats);
860 }
861
862 @Override
863 public long getActiveFlowRuleCount(DeviceId deviceId) {
864 return Streams.stream(getTableStatistics(deviceId))
Jordan Halterman281dbf32018-06-15 17:46:28 -0700865 .mapToLong(TableStatisticsEntry::activeFlowEntries)
866 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700867 }
868
869 private class InternalTableStatsListener
870 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
871 @Override
872 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700873 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700874 //TODO: Generate an event to listeners (do we need?)
875 }
876 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700877
878 /**
879 * Device lifecycle manager implementation.
880 */
881 private final class InternalLifecycleManager
882 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
883 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
884
885 private final DeviceId deviceId;
886
887 private volatile DeviceReplicaInfo replicaInfo;
888
889 InternalLifecycleManager(DeviceId deviceId) {
890 this.deviceId = deviceId;
891 replicaInfoManager.addListener(this);
892 mastershipTermLifecycles.addListener(this);
893 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
894 }
895
896 @Override
897 public DeviceReplicaInfo getReplicaInfo() {
898 return replicaInfo;
899 }
900
901 @Override
902 public void activate(long term) {
903 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
904 if (replicaInfo != null && replicaInfo.term() == term) {
905 mastershipTermLifecycles.put(deviceId, term);
906 }
907 }
908
909 @Override
910 public void event(ReplicaInfoEvent event) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700911 if (event.subject().equals(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700912 onReplicaInfoChange(event.replicaInfo());
913 }
914 }
915
916 @Override
917 public void event(MapEvent<DeviceId, Long> event) {
918 if (event.key().equals(deviceId) && event.newValue() != null) {
919 onActivate(event.newValue().value());
920 }
921 }
922
923 /**
924 * Handles a term activation event.
925 *
926 * @param term the term that was activated
927 */
928 private void onActivate(long term) {
929 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
930 if (replicaInfo != null && replicaInfo.term() == term) {
931 NodeId master = replicaInfo.master().orElse(null);
932 List<NodeId> backups = replicaInfo.backups()
933 .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
934 listenerRegistry.process(new LifecycleEvent(
935 LifecycleEvent.Type.TERM_ACTIVE,
936 new DeviceReplicaInfo(term, master, backups)));
937 }
938 }
939
940 /**
941 * Handles a replica info change event.
942 *
943 * @param replicaInfo the updated replica info
944 */
945 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
946 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
947 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
948 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
949 if (oldReplicaInfo != null) {
950 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
951 }
952 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700953 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
954 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700955 }
956 }
957
958 /**
959 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
960 *
961 * @param replicaInfo the replica info to convert
962 * @return the converted replica info
963 */
964 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
965 NodeId master = replicaInfo.master().orElse(null);
966 List<NodeId> backups = replicaInfo.backups()
967 .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
968 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
969 }
970
971 @Override
972 public void close() {
973 replicaInfoManager.removeListener(this);
974 mastershipTermLifecycles.removeListener(this);
975 }
976 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700977}