blob: af5b8e86a3284fcf67ca8e946565b40756bf27da [file] [log] [blame]
/*
 * Copyright 2014-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
18import java.util.Collections;
19import java.util.Dictionary;
20import java.util.HashSet;
Nikunj Desai31f277e2018-11-27 18:41:24 -050021import java.util.Iterator;
Jon Hallfa132292017-10-24 11:11:24 -070022import java.util.List;
23import java.util.Map;
24import java.util.Objects;
25import java.util.Set;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
28import java.util.concurrent.ScheduledExecutorService;
Jon Hallfa132292017-10-24 11:11:24 -070029import java.util.concurrent.TimeUnit;
Nikunj Desai31f277e2018-11-27 18:41:24 -050030import java.util.function.Function;
Jon Hallfa132292017-10-24 11:11:24 -070031import java.util.stream.Collectors;
32
Nikunj Desai31f277e2018-11-27 18:41:24 -050033import com.google.common.collect.ImmutableList;
34import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070035import com.google.common.collect.Streams;
36import org.apache.felix.scr.annotations.Activate;
37import org.apache.felix.scr.annotations.Component;
38import org.apache.felix.scr.annotations.Deactivate;
39import org.apache.felix.scr.annotations.Modified;
40import org.apache.felix.scr.annotations.Property;
41import org.apache.felix.scr.annotations.Reference;
42import org.apache.felix.scr.annotations.ReferenceCardinality;
43import org.apache.felix.scr.annotations.Service;
44import org.onlab.util.KryoNamespace;
Jordan Halterman1e1a5a22019-02-22 12:31:25 -080045import org.onlab.util.OrderedExecutor;
Jon Hallfa132292017-10-24 11:11:24 -070046import org.onlab.util.Tools;
47import org.onosproject.cfg.ComponentConfigService;
48import org.onosproject.cluster.ClusterService;
49import org.onosproject.cluster.NodeId;
50import org.onosproject.core.CoreService;
51import org.onosproject.core.IdGenerator;
Nikunj Desai31f277e2018-11-27 18:41:24 -050052import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070053import org.onosproject.mastership.MastershipService;
54import org.onosproject.net.DeviceId;
Nikunj Desai31f277e2018-11-27 18:41:24 -050055import org.onosproject.net.device.DeviceEvent;
56import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070057import org.onosproject.net.device.DeviceService;
58import org.onosproject.net.flow.CompletedBatchOperation;
59import org.onosproject.net.flow.DefaultFlowEntry;
60import org.onosproject.net.flow.FlowEntry;
61import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070062import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070063import org.onosproject.net.flow.FlowRuleEvent;
64import org.onosproject.net.flow.FlowRuleEvent.Type;
65import org.onosproject.net.flow.FlowRuleService;
66import org.onosproject.net.flow.FlowRuleStore;
67import org.onosproject.net.flow.FlowRuleStoreDelegate;
68import org.onosproject.net.flow.StoredFlowEntry;
69import org.onosproject.net.flow.TableStatisticsEntry;
Nikunj Desai31f277e2018-11-27 18:41:24 -050070import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
71import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
72import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
73import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
74import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070075import org.onosproject.persistence.PersistenceService;
76import org.onosproject.store.AbstractStore;
77import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
78import org.onosproject.store.cluster.messaging.ClusterMessage;
79import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Nikunj Desai31f277e2018-11-27 18:41:24 -050080import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070081import org.onosproject.store.flow.ReplicaInfoEvent;
82import org.onosproject.store.flow.ReplicaInfoEventListener;
83import org.onosproject.store.flow.ReplicaInfoService;
84import org.onosproject.store.impl.MastershipBasedTimestamp;
85import org.onosproject.store.serializers.KryoNamespaces;
Nikunj Desai31f277e2018-11-27 18:41:24 -050086import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070087import org.onosproject.store.service.EventuallyConsistentMap;
88import org.onosproject.store.service.EventuallyConsistentMapEvent;
89import org.onosproject.store.service.EventuallyConsistentMapListener;
Nikunj Desai31f277e2018-11-27 18:41:24 -050090import org.onosproject.store.service.MapEvent;
91import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070092import org.onosproject.store.service.Serializer;
93import org.onosproject.store.service.StorageService;
94import org.onosproject.store.service.WallClockTimestamp;
95import org.osgi.service.component.ComponentContext;
96import org.slf4j.Logger;
97
Jon Hallfa132292017-10-24 11:11:24 -070098import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Halterman1e1a5a22019-02-22 12:31:25 -080099import static java.lang.Math.max;
100import static java.lang.Math.min;
Jon Hallfa132292017-10-24 11:11:24 -0700101import static org.onlab.util.Tools.get;
102import static org.onlab.util.Tools.groupedThreads;
103import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -0700104import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
105import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Nikunj Desai31f277e2018-11-27 18:41:24 -0500106import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700107import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
108import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
109import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
110import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
111import static org.slf4j.LoggerFactory.getLogger;
112
/**
 * Manages inventory of flow rules using a distributed state management protocol.
 */
Thomas Vachuska5c851822018-01-05 16:01:44 -0800116@Component(immediate = true)
Jon Hallfa132292017-10-24 11:11:24 -0700117@Service
118public class ECFlowRuleStore
Nikunj Desai31f277e2018-11-27 18:41:24 -0500119 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
120 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700121
122 private final Logger log = getLogger(getClass());
123
124 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
125 private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
126 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
127 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Nikunj Desai31f277e2018-11-27 18:41:24 -0500128 private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700129 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700130
131 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
Nikunj Desai31f277e2018-11-27 18:41:24 -0500132 label = "Number of threads in the message handler pool")
Jon Hallfa132292017-10-24 11:11:24 -0700133 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
134
135 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
Nikunj Desai31f277e2018-11-27 18:41:24 -0500136 label = "Delay in ms between successive backup runs")
Jon Hallfa132292017-10-24 11:11:24 -0700137 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Nikunj Desai31f277e2018-11-27 18:41:24 -0500138
139 @Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
140 label = "Delay in ms between anti-entropy runs")
141 private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
142
Jon Hallfa132292017-10-24 11:11:24 -0700143 @Property(name = "persistenceEnabled", boolValue = false,
Nikunj Desai31f277e2018-11-27 18:41:24 -0500144 label = "Indicates whether or not changes in the flow table should be persisted to disk.")
Jon Hallfa132292017-10-24 11:11:24 -0700145 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
146
147 @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
Nikunj Desai31f277e2018-11-27 18:41:24 -0500148 label = "Max number of backup copies for each device")
Jon Hallfa132292017-10-24 11:11:24 -0700149 private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
150
151 private InternalFlowTable flowTable = new InternalFlowTable();
152
153 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
154 protected ReplicaInfoService replicaInfoManager;
155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
157 protected ClusterCommunicationService clusterCommunicator;
158
159 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
160 protected ClusterService clusterService;
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
163 protected DeviceService deviceService;
164
165 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
166 protected CoreService coreService;
167
168 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
169 protected ComponentConfigService configService;
170
171 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
172 protected MastershipService mastershipService;
173
174 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
175 protected PersistenceService persistenceService;
176
177 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
178 private ExecutorService messageHandlingExecutor;
179 private ExecutorService eventHandler;
180
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800181 private final ScheduledExecutorService backupScheduler = Executors.newSingleThreadScheduledExecutor(
182 groupedThreads("onos/flow", "backup-scheduler", log));
183 private final ExecutorService backupExecutor = Executors.newFixedThreadPool(
184 max(min(Runtime.getRuntime().availableProcessors() * 2, 16), 4),
185 groupedThreads("onos/flow", "backup-%d", log));
Jon Hallfa132292017-10-24 11:11:24 -0700186
187 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
188 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Nikunj Desai31f277e2018-11-27 18:41:24 -0500189 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700190
191 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
192 protected StorageService storageService;
193
Nikunj Desai31f277e2018-11-27 18:41:24 -0500194 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
195 .register(KryoNamespaces.API)
196 .register(BucketId.class)
197 .register(FlowBucket.class)
198 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700199
200 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Nikunj Desai31f277e2018-11-27 18:41:24 -0500201 .register(KryoNamespaces.API)
202 .register(BucketId.class)
203 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700204
Nikunj Desai31f277e2018-11-27 18:41:24 -0500205 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700206
207 private IdGenerator idGenerator;
208 private NodeId local;
209
210 @Activate
211 public void activate(ComponentContext context) {
212 configService.registerProperties(getClass());
213
214 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
215
216 local = clusterService.getLocalNode().id();
217
218 eventHandler = Executors.newSingleThreadExecutor(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500219 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700220 messageHandlingExecutor = Executors.newFixedThreadPool(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500221 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700222
223 registerMessageHandlers(messageHandlingExecutor);
224
Nikunj Desai31f277e2018-11-27 18:41:24 -0500225 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
226 .withName("onos-flow-store-terms")
227 .withSerializer(serializer)
228 .buildAsyncMap();
Thomas Vachuskad821c752018-02-26 11:33:35 -0800229
Jon Hallfa132292017-10-24 11:11:24 -0700230 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Nikunj Desai31f277e2018-11-27 18:41:24 -0500231 .withName("onos-flow-table-stats")
232 .withSerializer(serializerBuilder)
233 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
234 .withTimestampProvider((k, v) -> new WallClockTimestamp())
235 .withTombstonesDisabled()
236 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700237 deviceTableStats.addListener(tableStatsListener);
238
Nikunj Desai31f277e2018-11-27 18:41:24 -0500239 deviceService.addListener(flowTable);
240 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
241
Jon Hallfa132292017-10-24 11:11:24 -0700242 logConfig("Started");
243 }
244
245 @Deactivate
246 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700247 configService.unregisterProperties(getClass(), false);
248 unregisterMessageHandlers();
Nikunj Desai31f277e2018-11-27 18:41:24 -0500249 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700250 deviceTableStats.removeListener(tableStatsListener);
251 deviceTableStats.destroy();
252 eventHandler.shutdownNow();
253 messageHandlingExecutor.shutdownNow();
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800254 backupScheduler.shutdownNow();
255 backupExecutor.shutdownNow();
Jon Hallfa132292017-10-24 11:11:24 -0700256 log.info("Stopped");
257 }
258
259 @SuppressWarnings("rawtypes")
260 @Modified
261 public void modified(ComponentContext context) {
262 if (context == null) {
263 logConfig("Default config");
264 return;
265 }
266
267 Dictionary properties = context.getProperties();
268 int newPoolSize;
269 int newBackupPeriod;
270 int newBackupCount;
Nikunj Desai31f277e2018-11-27 18:41:24 -0500271 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700272 try {
273 String s = get(properties, "msgHandlerPoolSize");
274 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
275
276 s = get(properties, "backupPeriod");
277 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
278
279 s = get(properties, "backupCount");
280 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Nikunj Desai31f277e2018-11-27 18:41:24 -0500281
282 s = get(properties, "antiEntropyPeriod");
283 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700284 } catch (NumberFormatException | ClassCastException e) {
285 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
286 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
287 newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
Nikunj Desai31f277e2018-11-27 18:41:24 -0500288 newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
Jon Hallfa132292017-10-24 11:11:24 -0700289 }
290
Jon Hallfa132292017-10-24 11:11:24 -0700291 if (newBackupPeriod != backupPeriod) {
292 backupPeriod = newBackupPeriod;
Nikunj Desai31f277e2018-11-27 18:41:24 -0500293 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700294 }
Nikunj Desai31f277e2018-11-27 18:41:24 -0500295
296 if (newAntiEntropyPeriod != antiEntropyPeriod) {
297 antiEntropyPeriod = newAntiEntropyPeriod;
298 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700299 }
Nikunj Desai31f277e2018-11-27 18:41:24 -0500300
Jon Hallfa132292017-10-24 11:11:24 -0700301 if (newPoolSize != msgHandlerPoolSize) {
302 msgHandlerPoolSize = newPoolSize;
303 ExecutorService oldMsgHandler = messageHandlingExecutor;
304 messageHandlingExecutor = Executors.newFixedThreadPool(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500305 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700306
307 // replace previously registered handlers.
308 registerMessageHandlers(messageHandlingExecutor);
309 oldMsgHandler.shutdown();
310 }
Nikunj Desai31f277e2018-11-27 18:41:24 -0500311
Jon Hallfa132292017-10-24 11:11:24 -0700312 if (backupCount != newBackupCount) {
313 backupCount = newBackupCount;
314 }
315 logConfig("Reconfigured");
316 }
317
318 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700319 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
320 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500321 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700322 clusterCommunicator.addSubscriber(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500323 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700324 clusterCommunicator.addSubscriber(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500325 GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700326 clusterCommunicator.addSubscriber(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500327 GET_DEVICE_FLOW_COUNT, serializer::decode, flowTable::getFlowRuleCount, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700328 clusterCommunicator.addSubscriber(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500329 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700330 }
331
332 private void unregisterMessageHandlers() {
333 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
334 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500335 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700336 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
337 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
338 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
339 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
340 }
341
342 private void logConfig(String prefix) {
343 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Nikunj Desai31f277e2018-11-27 18:41:24 -0500344 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700345 }
346
Jon Hallfa132292017-10-24 11:11:24 -0700347 @Override
348 public int getFlowRuleCount() {
349 return Streams.stream(deviceService.getDevices()).parallel()
Nikunj Desai31f277e2018-11-27 18:41:24 -0500350 .mapToInt(device -> getFlowRuleCount(device.id()))
351 .sum();
Thomas Vachuskad821c752018-02-26 11:33:35 -0800352 }
353
354 @Override
355 public int getFlowRuleCount(DeviceId deviceId) {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500356 NodeId master = mastershipService.getMasterFor(deviceId);
357 if (master == null) {
358 log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
359 return 0;
360 }
361
362 if (Objects.equals(local, master)) {
363 return flowTable.getFlowRuleCount(deviceId);
364 }
365
366 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
367 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
368 deviceId,
369 GET_DEVICE_FLOW_COUNT,
370 serializer::encode,
371 serializer::decode,
372 master),
373 FLOW_RULE_STORE_TIMEOUT_MILLIS,
374 TimeUnit.MILLISECONDS,
375 0);
Jon Hallfa132292017-10-24 11:11:24 -0700376 }
377
378 @Override
379 public FlowEntry getFlowEntry(FlowRule rule) {
380 NodeId master = mastershipService.getMasterFor(rule.deviceId());
381
382 if (master == null) {
383 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
384 return null;
385 }
386
387 if (Objects.equals(local, master)) {
388 return flowTable.getFlowEntry(rule);
389 }
390
391 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Nikunj Desai31f277e2018-11-27 18:41:24 -0500392 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700393
394 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Nikunj Desai31f277e2018-11-27 18:41:24 -0500395 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
396 serializer::encode,
397 serializer::decode,
398 master),
399 FLOW_RULE_STORE_TIMEOUT_MILLIS,
400 TimeUnit.MILLISECONDS,
401 null);
Jon Hallfa132292017-10-24 11:11:24 -0700402 }
403
404 @Override
405 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
406 NodeId master = mastershipService.getMasterFor(deviceId);
407
408 if (master == null) {
409 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
410 return Collections.emptyList();
411 }
412
413 if (Objects.equals(local, master)) {
414 return flowTable.getFlowEntries(deviceId);
415 }
416
417 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Nikunj Desai31f277e2018-11-27 18:41:24 -0500418 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700419
420 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
Nikunj Desai31f277e2018-11-27 18:41:24 -0500421 ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
422 serializer::encode,
423 serializer::decode,
424 master),
425 FLOW_RULE_STORE_TIMEOUT_MILLIS,
426 TimeUnit.MILLISECONDS,
427 Collections.emptyList());
Jon Hallfa132292017-10-24 11:11:24 -0700428 }
429
430 @Override
431 public void storeFlowRule(FlowRule rule) {
432 storeBatch(new FlowRuleBatchOperation(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500433 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
434 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700435 }
436
437 @Override
438 public void storeBatch(FlowRuleBatchOperation operation) {
439 if (operation.getOperations().isEmpty()) {
440 notifyDelegate(FlowRuleBatchEvent.completed(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500441 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
442 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700443 return;
444 }
445
446 DeviceId deviceId = operation.deviceId();
447 NodeId master = mastershipService.getMasterFor(deviceId);
448
449 if (master == null) {
450 log.warn("No master for {} ", deviceId);
451
Nikunj Desai31f277e2018-11-27 18:41:24 -0500452 Set<FlowRule> allFailures = operation.getOperations()
453 .stream()
454 .map(op -> op.target())
455 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700456 notifyDelegate(FlowRuleBatchEvent.completed(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500457 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
458 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700459 return;
460 }
461
462 if (Objects.equals(local, master)) {
463 storeBatchInternal(operation);
464 return;
465 }
466
467 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Nikunj Desai31f277e2018-11-27 18:41:24 -0500468 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700469
470 clusterCommunicator.unicast(operation,
Nikunj Desai31f277e2018-11-27 18:41:24 -0500471 APPLY_BATCH_FLOWS,
472 serializer::encode,
473 master)
474 .whenComplete((result, error) -> {
475 if (error != null) {
476 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700477
Nikunj Desai31f277e2018-11-27 18:41:24 -0500478 Set<FlowRule> allFailures = operation.getOperations()
479 .stream()
480 .map(op -> op.target())
481 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700482
Nikunj Desai31f277e2018-11-27 18:41:24 -0500483 notifyDelegate(FlowRuleBatchEvent.completed(
484 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
485 new CompletedBatchOperation(false, allFailures, deviceId)));
486 }
487 });
Jon Hallfa132292017-10-24 11:11:24 -0700488 }
489
490 private void storeBatchInternal(FlowRuleBatchOperation operation) {
491
492 final DeviceId did = operation.deviceId();
493 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
494 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
495 if (currentOps.isEmpty()) {
496 batchOperationComplete(FlowRuleBatchEvent.completed(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500497 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
498 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700499 return;
500 }
501
502 notifyDelegate(FlowRuleBatchEvent.requested(new
Nikunj Desai31f277e2018-11-27 18:41:24 -0500503 FlowRuleBatchRequest(operation.id(),
504 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700505 }
506
507 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
508 return operation.getOperations().stream().map(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500509 op -> {
510 StoredFlowEntry entry;
511 switch (op.operator()) {
512 case ADD:
513 entry = new DefaultFlowEntry(op.target());
514 flowTable.add(entry);
515 return op;
516 case MODIFY:
517 entry = new DefaultFlowEntry(op.target());
518 flowTable.update(entry);
519 return op;
520 case REMOVE:
521 return flowTable.update(op.target(), stored -> {
522 stored.setState(FlowEntryState.PENDING_REMOVE);
523 log.debug("Setting state of rule to pending remove: {}", stored);
Jordan Halterman020b53e2018-01-16 14:59:49 -0800524 return op;
Nikunj Desai31f277e2018-11-27 18:41:24 -0500525 });
526 default:
527 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700528 }
Nikunj Desai31f277e2018-11-27 18:41:24 -0500529 return null;
530 }
Jon Hallfa132292017-10-24 11:11:24 -0700531 ).filter(Objects::nonNull).collect(Collectors.toSet());
532 }
533
534 @Override
535 public void deleteFlowRule(FlowRule rule) {
536 storeBatch(
Nikunj Desai31f277e2018-11-27 18:41:24 -0500537 new FlowRuleBatchOperation(
538 Collections.singletonList(
539 new FlowRuleBatchEntry(
540 FlowRuleOperation.REMOVE,
541 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700542 }
543
544 @Override
545 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
546 if (mastershipService.isLocalMaster(rule.deviceId())) {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500547 return flowTable.update(rule, stored -> {
548 if (stored.state() == FlowEntryState.PENDING_ADD) {
549 stored.setState(FlowEntryState.PENDING_ADD);
550 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
551 }
552 return null;
553 });
Jon Hallfa132292017-10-24 11:11:24 -0700554 }
555 return null;
556 }
557
558 @Override
559 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
560 NodeId master = mastershipService.getMasterFor(rule.deviceId());
561 if (Objects.equals(local, master)) {
562 return addOrUpdateFlowRuleInternal(rule);
563 }
564
565 log.warn("Tried to update FlowRule {} state,"
Nikunj Desai31f277e2018-11-27 18:41:24 -0500566 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700567 return null;
568 }
569
570 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500571 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700572 stored.setBytes(rule.bytes());
573 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
574 stored.setLiveType(rule.liveType());
575 stored.setPackets(rule.packets());
576 stored.setLastSeen();
577 if (stored.state() == FlowEntryState.PENDING_ADD) {
578 stored.setState(FlowEntryState.ADDED);
579 return new FlowRuleEvent(Type.RULE_ADDED, rule);
580 }
581 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500582 });
583 if (event != null) {
584 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700585 }
586
587 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
588 // TODO: also update backup if the behavior is correct.
589 flowTable.add(rule);
590 return null;
591 }
592
593 @Override
594 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
595 final DeviceId deviceId = rule.deviceId();
596 NodeId master = mastershipService.getMasterFor(deviceId);
597
598 if (Objects.equals(local, master)) {
599 // bypass and handle it locally
600 return removeFlowRuleInternal(rule);
601 }
602
603 if (master == null) {
604 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
605 // TODO: revisit if this should be null (="no-op") or Exception
606 return null;
607 }
608
609 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Nikunj Desai31f277e2018-11-27 18:41:24 -0500610 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700611
Nikunj Desai31f277e2018-11-27 18:41:24 -0500612 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
613 rule,
614 REMOVE_FLOW_ENTRY,
615 serializer::encode,
616 serializer::decode,
617 master),
618 FLOW_RULE_STORE_TIMEOUT_MILLIS,
619 TimeUnit.MILLISECONDS,
620 null);
Jon Hallfa132292017-10-24 11:11:24 -0700621 }
622
623 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700624 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman020b53e2018-01-16 14:59:49 -0800625 final FlowEntry removed = flowTable.remove(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700626 // rule may be partial rule that is missing treatment, we should use rule from store instead
627 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
628 }
629
630 @Override
631 public void purgeFlowRule(DeviceId deviceId) {
632 flowTable.purgeFlowRule(deviceId);
633 }
634
635 @Override
636 public void purgeFlowRules() {
637 flowTable.purgeFlowRules();
638 }
639
640 @Override
641 public void batchOperationComplete(FlowRuleBatchEvent event) {
642 //FIXME: need a per device pending response
643 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
644 if (nodeId == null) {
645 notifyDelegate(event);
646 } else {
647 // TODO check unicast return value
648 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
649 //error log: log.warn("Failed to respond to peer for batch operation result");
650 }
651 }
652
653 private final class OnStoreBatch implements ClusterMessageHandler {
654
655 @Override
656 public void handle(final ClusterMessage message) {
657 FlowRuleBatchOperation operation = serializer.decode(message.payload());
658 log.debug("received batch request {}", operation);
659
660 final DeviceId deviceId = operation.deviceId();
661 NodeId master = mastershipService.getMasterFor(deviceId);
662 if (!Objects.equals(local, master)) {
663 Set<FlowRule> failures = new HashSet<>(operation.size());
664 for (FlowRuleBatchEntry op : operation.getOperations()) {
665 failures.add(op.target());
666 }
667 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
668 // This node is no longer the master, respond as all failed.
669 // TODO: we might want to wrap response in envelope
670 // to distinguish sw programming failure and hand over
671 // it make sense in the latter case to retry immediately.
672 message.respond(serializer.encode(allFailed));
673 return;
674 }
675
676 pendingResponses.put(operation.id(), message.sender());
677 storeBatchInternal(operation);
678 }
679 }
680
Nikunj Desai31f277e2018-11-27 18:41:24 -0500681 private class InternalFlowTable implements DeviceListener {
682 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700683
684 @Override
Nikunj Desai31f277e2018-11-27 18:41:24 -0500685 public void event(DeviceEvent event) {
686 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
687 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700688 }
689 }
Jon Hallfa132292017-10-24 11:11:24 -0700690
691 /**
Nikunj Desai31f277e2018-11-27 18:41:24 -0500692 * Adds the given device to the flow table.
Jon Hallfa132292017-10-24 11:11:24 -0700693 *
Nikunj Desai31f277e2018-11-27 18:41:24 -0500694 * @param deviceId the device to add to the table
Jon Hallfa132292017-10-24 11:11:24 -0700695 */
Nikunj Desai31f277e2018-11-27 18:41:24 -0500696 public void addDevice(DeviceId deviceId) {
697 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
698 id,
699 clusterService,
700 clusterCommunicator,
701 new InternalLifecycleManager(id),
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800702 backupScheduler,
703 new OrderedExecutor(backupExecutor),
Nikunj Desai31f277e2018-11-27 18:41:24 -0500704 backupPeriod,
705 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700706 }
707
Nikunj Desai31f277e2018-11-27 18:41:24 -0500708 /**
709 * Sets the flow table backup period.
710 *
711 * @param backupPeriod the flow table backup period
712 */
713 void setBackupPeriod(int backupPeriod) {
714 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700715 }
716
Nikunj Desai31f277e2018-11-27 18:41:24 -0500717 /**
718 * Sets the flow table anti-entropy period.
719 *
720 * @param antiEntropyPeriod the flow table anti-entropy period
721 */
722 void setAntiEntropyPeriod(int antiEntropyPeriod) {
723 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700724 }
725
Nikunj Desai31f277e2018-11-27 18:41:24 -0500726 /**
727 * Returns the flow table for a specific device.
728 *
729 * @param deviceId the device identifier
730 * @return the flow table for the given device
731 */
732 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
733 DeviceFlowTable flowTable = flowTables.get(deviceId);
734 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
735 deviceId,
736 clusterService,
737 clusterCommunicator,
738 new InternalLifecycleManager(deviceId),
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800739 backupScheduler,
740 new OrderedExecutor(backupExecutor),
Nikunj Desai31f277e2018-11-27 18:41:24 -0500741 backupPeriod,
742 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700743 }
744
Nikunj Desai31f277e2018-11-27 18:41:24 -0500745 /**
746 * Returns the flow rule count for the given device.
747 *
748 * @param deviceId the device for which to return the flow rule count
749 * @return the flow rule count for the given device
750 */
751 public int getFlowRuleCount(DeviceId deviceId) {
752 return getFlowTable(deviceId).count();
Jon Hallfa132292017-10-24 11:11:24 -0700753 }
754
Nikunj Desai31f277e2018-11-27 18:41:24 -0500755 /**
756 * Returns the flow entry for the given rule.
757 *
758 * @param rule the rule for which to return the flow entry
759 * @return the flow entry for the given rule
760 */
Jon Hallfa132292017-10-24 11:11:24 -0700761 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500762 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700763 }
764
Nikunj Desai31f277e2018-11-27 18:41:24 -0500765 /**
766 * Returns the set of flow entries for the given device.
767 *
768 * @param deviceId the device for which to lookup flow entries
769 * @return the set of flow entries for the given device
770 */
Jon Hallfa132292017-10-24 11:11:24 -0700771 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500772 return getFlowTable(deviceId).getFlowEntries();
Jon Hallfa132292017-10-24 11:11:24 -0700773 }
774
Nikunj Desai31f277e2018-11-27 18:41:24 -0500775 /**
776 * Adds the given flow rule.
777 *
778 * @param rule the rule to add
779 */
Jon Hallfa132292017-10-24 11:11:24 -0700780 public void add(FlowEntry rule) {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500781 Tools.futureGetOrElse(
782 getFlowTable(rule.deviceId()).add(rule),
783 FLOW_RULE_STORE_TIMEOUT_MILLIS,
784 TimeUnit.MILLISECONDS,
785 null);
Jon Hallfa132292017-10-24 11:11:24 -0700786 }
787
Nikunj Desai31f277e2018-11-27 18:41:24 -0500788 /**
789 * Updates the given flow rule.
790 *
791 * @param rule the rule to update
792 */
Jordan Halterman020b53e2018-01-16 14:59:49 -0800793 public void update(FlowEntry rule) {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500794 Tools.futureGetOrElse(
795 getFlowTable(rule.deviceId()).update(rule),
796 FLOW_RULE_STORE_TIMEOUT_MILLIS,
797 TimeUnit.MILLISECONDS,
798 null);
Jordan Halterman020b53e2018-01-16 14:59:49 -0800799 }
800
Nikunj Desai31f277e2018-11-27 18:41:24 -0500801 /**
802 * Applies the given update function to the rule.
803 *
804 * @param function the update function to apply
805 * @return a future to be completed with the update event or {@code null} if the rule was not updated
806 */
807 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
808 return Tools.futureGetOrElse(
809 getFlowTable(rule.deviceId()).update(rule, function),
810 FLOW_RULE_STORE_TIMEOUT_MILLIS,
811 TimeUnit.MILLISECONDS,
812 null);
813 }
814
815 /**
816 * Removes the given flow rule.
817 *
818 * @param rule the rule to remove
819 */
Jordan Halterman020b53e2018-01-16 14:59:49 -0800820 public FlowEntry remove(FlowEntry rule) {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500821 return Tools.futureGetOrElse(
822 getFlowTable(rule.deviceId()).remove(rule),
823 FLOW_RULE_STORE_TIMEOUT_MILLIS,
824 TimeUnit.MILLISECONDS,
825 null);
Jon Hallfa132292017-10-24 11:11:24 -0700826 }
827
Nikunj Desai31f277e2018-11-27 18:41:24 -0500828 /**
829 * Purges flow rules for the given device.
830 *
831 * @param deviceId the device for which to purge flow rules
832 */
Jon Hallfa132292017-10-24 11:11:24 -0700833 public void purgeFlowRule(DeviceId deviceId) {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500834 // If the device is still present in the store, purge the underlying DeviceFlowTable.
835 // Otherwise, remove the DeviceFlowTable and unregister message handlers.
836 if (deviceService.getDevice(deviceId) != null) {
837 DeviceFlowTable flowTable = flowTables.get(deviceId);
838 if (flowTable != null) {
839 flowTable.purge();
840 }
841 } else {
842 DeviceFlowTable flowTable = flowTables.remove(deviceId);
843 if (flowTable != null) {
844 flowTable.close();
845 }
846 }
Jon Hallfa132292017-10-24 11:11:24 -0700847 }
848
Nikunj Desai31f277e2018-11-27 18:41:24 -0500849 /**
850 * Purges all flow rules from the table.
851 */
Jon Hallfa132292017-10-24 11:11:24 -0700852 public void purgeFlowRules() {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500853 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
854 while (iterator.hasNext()) {
855 iterator.next().close();
856 iterator.remove();
Jon Hallfa132292017-10-24 11:11:24 -0700857 }
858 }
Jon Hallfa132292017-10-24 11:11:24 -0700859 }
860
861 @Override
Nikunj Desai31f277e2018-11-27 18:41:24 -0500862 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700863 deviceTableStats.put(deviceId, tableStats);
864 return null;
865 }
866
867 @Override
868 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
869 NodeId master = mastershipService.getMasterFor(deviceId);
870
871 if (master == null) {
872 log.debug("Failed to getTableStats: No master for {}", deviceId);
873 return Collections.emptyList();
874 }
875
876 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
877 if (tableStats == null) {
878 return Collections.emptyList();
879 }
880 return ImmutableList.copyOf(tableStats);
881 }
882
883 @Override
884 public long getActiveFlowRuleCount(DeviceId deviceId) {
885 return Streams.stream(getTableStatistics(deviceId))
Nikunj Desai31f277e2018-11-27 18:41:24 -0500886 .mapToLong(TableStatisticsEntry::activeFlowEntries)
887 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700888 }
889
890 private class InternalTableStatsListener
891 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
892 @Override
893 public void event(EventuallyConsistentMapEvent<DeviceId,
Nikunj Desai31f277e2018-11-27 18:41:24 -0500894 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700895 //TODO: Generate an event to listeners (do we need?)
896 }
897 }
Nikunj Desai31f277e2018-11-27 18:41:24 -0500898
899 /**
900 * Device lifecycle manager implementation.
901 */
902 private final class InternalLifecycleManager
903 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
904 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
905
906 private final DeviceId deviceId;
907
908 private volatile DeviceReplicaInfo replicaInfo;
909
910 InternalLifecycleManager(DeviceId deviceId) {
911 this.deviceId = deviceId;
912 replicaInfoManager.addListener(this);
913 mastershipTermLifecycles.addListener(this);
914 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
915 }
916
917 @Override
918 public DeviceReplicaInfo getReplicaInfo() {
919 return replicaInfo;
920 }
921
922 @Override
923 public void activate(long term) {
924 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
925 if (replicaInfo != null && replicaInfo.term() == term) {
926 mastershipTermLifecycles.put(deviceId, term);
927 }
928 }
929
930 @Override
931 public void event(ReplicaInfoEvent event) {
932 if (event.subject().equals(deviceId)) {
933 onReplicaInfoChange(event.replicaInfo());
934 }
935 }
936
937 @Override
938 public void event(MapEvent<DeviceId, Long> event) {
939 if (event.key().equals(deviceId) && event.newValue() != null) {
940 onActivate(event.newValue().value());
941 }
942 }
943
944 /**
945 * Handles a term activation event.
946 *
947 * @param term the term that was activated
948 */
949 private void onActivate(long term) {
950 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
951 if (replicaInfo != null && replicaInfo.term() == term) {
952 NodeId master = replicaInfo.master().orElse(null);
953 List<NodeId> backups = replicaInfo.backups()
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800954 .subList(0, min(replicaInfo.backups().size(), backupCount));
Nikunj Desai31f277e2018-11-27 18:41:24 -0500955 listenerRegistry.process(new LifecycleEvent(
956 LifecycleEvent.Type.TERM_ACTIVE,
957 new DeviceReplicaInfo(term, master, backups)));
958 }
959 }
960
961 /**
962 * Handles a replica info change event.
963 *
964 * @param replicaInfo the updated replica info
965 */
966 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
967 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
968 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
969 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
970 if (oldReplicaInfo != null) {
971 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
972 }
973 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
974 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
975 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
976 }
977 }
978
979 /**
980 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
981 *
982 * @param replicaInfo the replica info to convert
983 * @return the converted replica info
984 */
985 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
986 NodeId master = replicaInfo.master().orElse(null);
987 List<NodeId> backups = replicaInfo.backups()
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800988 .subList(0, min(replicaInfo.backups().size(), backupCount));
Nikunj Desai31f277e2018-11-27 18:41:24 -0500989 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
990 }
991
992 @Override
993 public void close() {
994 replicaInfoManager.removeListener(this);
995 mastershipTermLifecycles.removeListener(this);
996 }
997 }
998}