blob: 89758c430f94b9ed4592bca62dc048309908a0d0 [file] [log] [blame]
/*
 * Copyright 2014-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070020import com.google.common.collect.Streams;
Jon Hallfa132292017-10-24 11:11:24 -070021import org.onlab.util.KryoNamespace;
22import org.onlab.util.Tools;
23import org.onosproject.cfg.ComponentConfigService;
24import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.NodeId;
26import org.onosproject.core.CoreService;
27import org.onosproject.core.IdGenerator;
Jordan Halterman281dbf32018-06-15 17:46:28 -070028import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070029import org.onosproject.mastership.MastershipService;
30import org.onosproject.net.DeviceId;
Jordan Halterman281dbf32018-06-15 17:46:28 -070031import org.onosproject.net.device.DeviceEvent;
32import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070033import org.onosproject.net.device.DeviceService;
34import org.onosproject.net.flow.CompletedBatchOperation;
35import org.onosproject.net.flow.DefaultFlowEntry;
36import org.onosproject.net.flow.FlowEntry;
37import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070038import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070039import org.onosproject.net.flow.FlowRuleEvent;
40import org.onosproject.net.flow.FlowRuleEvent.Type;
41import org.onosproject.net.flow.FlowRuleService;
42import org.onosproject.net.flow.FlowRuleStore;
43import org.onosproject.net.flow.FlowRuleStoreDelegate;
44import org.onosproject.net.flow.StoredFlowEntry;
45import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070046import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
47import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
48import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
49import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
50import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070051import org.onosproject.persistence.PersistenceService;
52import org.onosproject.store.AbstractStore;
53import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
54import org.onosproject.store.cluster.messaging.ClusterMessage;
55import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jordan Halterman281dbf32018-06-15 17:46:28 -070056import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070057import org.onosproject.store.flow.ReplicaInfoEvent;
58import org.onosproject.store.flow.ReplicaInfoEventListener;
59import org.onosproject.store.flow.ReplicaInfoService;
60import org.onosproject.store.impl.MastershipBasedTimestamp;
61import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman281dbf32018-06-15 17:46:28 -070062import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070063import org.onosproject.store.service.EventuallyConsistentMap;
64import org.onosproject.store.service.EventuallyConsistentMapEvent;
65import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman281dbf32018-06-15 17:46:28 -070066import org.onosproject.store.service.MapEvent;
67import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070068import org.onosproject.store.service.Serializer;
69import org.onosproject.store.service.StorageService;
70import org.onosproject.store.service.WallClockTimestamp;
71import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070072import org.osgi.service.component.annotations.Activate;
73import org.osgi.service.component.annotations.Component;
74import org.osgi.service.component.annotations.Deactivate;
75import org.osgi.service.component.annotations.Modified;
76import org.osgi.service.component.annotations.Reference;
77import org.osgi.service.component.annotations.ReferenceCardinality;
Jon Hallfa132292017-10-24 11:11:24 -070078import org.slf4j.Logger;
79
Ray Milkeyd84f89b2018-08-17 14:54:17 -070080import java.util.Collections;
81import java.util.Dictionary;
82import java.util.HashSet;
83import java.util.Iterator;
84import java.util.List;
85import java.util.Map;
86import java.util.Objects;
87import java.util.Set;
88import java.util.concurrent.ExecutorService;
89import java.util.concurrent.Executors;
90import java.util.concurrent.ScheduledExecutorService;
91import java.util.concurrent.TimeUnit;
92import java.util.function.Function;
93import java.util.stream.Collectors;
94
Jon Hallfa132292017-10-24 11:11:24 -070095import static com.google.common.base.Strings.isNullOrEmpty;
96import static org.onlab.util.Tools.get;
97import static org.onlab.util.Tools.groupedThreads;
98import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -070099import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
100import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700101import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700102import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
103import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
104import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
105import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
106import static org.slf4j.LoggerFactory.getLogger;
107
Ray Milkeyb5646e62018-10-16 11:42:18 -0700108import static org.onosproject.store.OsgiPropertyConstants.*;
109
/**
 * Manages inventory of flow rules using a distributed state management protocol.
 */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700113@Component(
114 immediate = true,
115 service = FlowRuleStore.class,
116 property = {
117 MESSAGE_HANDLER_THREAD_POOL_SIZE + "=" + MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT,
118 BACKUP_PERIOD_MILLIS + "=" + BACKUP_PERIOD_MILLIS_DEFAULT,
119 ANTI_ENTROPY_PERIOD_MILLIS + "=" + ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT,
120 EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED + "=" + EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT,
121 MAX_BACKUP_COUNT + "=" + MAX_BACKUP_COUNT_DEFAULT
122 }
123)
Jon Hallfa132292017-10-24 11:11:24 -0700124public class ECFlowRuleStore
Jordan Halterman281dbf32018-06-15 17:46:28 -0700125 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
126 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700127
128 private final Logger log = getLogger(getClass());
129
Jon Hallfa132292017-10-24 11:11:24 -0700130 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700131
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700132 //@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
133 // label = "Number of threads in the message handler pool")
Ray Milkeyb5646e62018-10-16 11:42:18 -0700134 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700135
Ray Milkeyb5646e62018-10-16 11:42:18 -0700136 //@Property(name = "backupPeriod", intValue = BACKUP_PERIOD_MILLIS,
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700137 // label = "Delay in ms between successive backup runs")
Ray Milkeyb5646e62018-10-16 11:42:18 -0700138 private int backupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700139
Ray Milkeyb5646e62018-10-16 11:42:18 -0700140 //@Property(name = "antiEntropyPeriod", intValue = ANTI_ENTROPY_PERIOD_MILLIS,
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700141 // label = "Delay in ms between anti-entropy runs")
Ray Milkeyb5646e62018-10-16 11:42:18 -0700142 private int antiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700143
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700144 //@Property(name = "persistenceEnabled", boolValue = false,
145 // label = "Indicates whether or not changes in the flow table should be persisted to disk.")
Ray Milkeyb5646e62018-10-16 11:42:18 -0700146 private boolean persistenceEnabled = EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700147
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700148 //@Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
149 // label = "Max number of backup copies for each device")
Ray Milkeyb5646e62018-10-16 11:42:18 -0700150 private volatile int backupCount = MAX_BACKUP_COUNT_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700151
152 private InternalFlowTable flowTable = new InternalFlowTable();
153
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700154 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700155 protected ReplicaInfoService replicaInfoManager;
156
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700157 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700158 protected ClusterCommunicationService clusterCommunicator;
159
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700160 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700161 protected ClusterService clusterService;
162
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700163 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700164 protected DeviceService deviceService;
165
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700166 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700167 protected CoreService coreService;
168
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700169 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700170 protected ComponentConfigService configService;
171
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700172 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700173 protected MastershipService mastershipService;
174
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700175 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700176 protected PersistenceService persistenceService;
177
178 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
179 private ExecutorService messageHandlingExecutor;
180 private ExecutorService eventHandler;
181
Jon Hallfa132292017-10-24 11:11:24 -0700182 private final ScheduledExecutorService backupSenderExecutor =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700183 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
Jon Hallfa132292017-10-24 11:11:24 -0700184
185 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
186 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700187 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700188
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700189 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700190 protected StorageService storageService;
191
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700192 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
193 .register(KryoNamespaces.API)
Jordan Haltermana765d222018-06-01 00:40:56 -0700194 .register(BucketId.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700195 .register(FlowBucket.class)
196 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700197
198 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermana765d222018-06-01 00:40:56 -0700199 .register(KryoNamespaces.API)
200 .register(BucketId.class)
201 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700202
Jordan Halterman281dbf32018-06-15 17:46:28 -0700203 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700204
205 private IdGenerator idGenerator;
206 private NodeId local;
207
208 @Activate
209 public void activate(ComponentContext context) {
210 configService.registerProperties(getClass());
211
212 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
213
214 local = clusterService.getLocalNode().id();
215
216 eventHandler = Executors.newSingleThreadExecutor(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700217 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700218 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700219 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700220
221 registerMessageHandlers(messageHandlingExecutor);
222
Jordan Halterman281dbf32018-06-15 17:46:28 -0700223 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
224 .withName("onos-flow-store-terms")
225 .withSerializer(serializer)
226 .buildAsyncMap();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800227
Jon Hallfa132292017-10-24 11:11:24 -0700228 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700229 .withName("onos-flow-table-stats")
230 .withSerializer(serializerBuilder)
231 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
232 .withTimestampProvider((k, v) -> new WallClockTimestamp())
233 .withTombstonesDisabled()
234 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700235 deviceTableStats.addListener(tableStatsListener);
236
Jordan Halterman281dbf32018-06-15 17:46:28 -0700237 deviceService.addListener(flowTable);
238 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
239
Jon Hallfa132292017-10-24 11:11:24 -0700240 logConfig("Started");
241 }
242
243 @Deactivate
244 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700245 configService.unregisterProperties(getClass(), false);
246 unregisterMessageHandlers();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700247 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700248 deviceTableStats.removeListener(tableStatsListener);
249 deviceTableStats.destroy();
250 eventHandler.shutdownNow();
251 messageHandlingExecutor.shutdownNow();
252 backupSenderExecutor.shutdownNow();
253 log.info("Stopped");
254 }
255
256 @SuppressWarnings("rawtypes")
257 @Modified
258 public void modified(ComponentContext context) {
259 if (context == null) {
260 logConfig("Default config");
261 return;
262 }
263
264 Dictionary properties = context.getProperties();
265 int newPoolSize;
266 int newBackupPeriod;
267 int newBackupCount;
Jordan Halterman5259b332018-06-12 15:34:19 -0700268 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700269 try {
270 String s = get(properties, "msgHandlerPoolSize");
271 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
272
273 s = get(properties, "backupPeriod");
274 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
275
276 s = get(properties, "backupCount");
277 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman5259b332018-06-12 15:34:19 -0700278
279 s = get(properties, "antiEntropyPeriod");
280 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700281 } catch (NumberFormatException | ClassCastException e) {
Ray Milkeyb5646e62018-10-16 11:42:18 -0700282 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
283 newBackupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
284 newBackupCount = MAX_BACKUP_COUNT_DEFAULT;
285 newAntiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700286 }
287
Jon Hallfa132292017-10-24 11:11:24 -0700288 if (newBackupPeriod != backupPeriod) {
289 backupPeriod = newBackupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700290 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700291 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700292
293 if (newAntiEntropyPeriod != antiEntropyPeriod) {
294 antiEntropyPeriod = newAntiEntropyPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700295 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jordan Halterman5259b332018-06-12 15:34:19 -0700296 }
297
Jon Hallfa132292017-10-24 11:11:24 -0700298 if (newPoolSize != msgHandlerPoolSize) {
299 msgHandlerPoolSize = newPoolSize;
300 ExecutorService oldMsgHandler = messageHandlingExecutor;
301 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700302 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700303
304 // replace previously registered handlers.
305 registerMessageHandlers(messageHandlingExecutor);
306 oldMsgHandler.shutdown();
307 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700308
Jon Hallfa132292017-10-24 11:11:24 -0700309 if (backupCount != newBackupCount) {
310 backupCount = newBackupCount;
311 }
312 logConfig("Reconfigured");
313 }
314
315 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700316 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
317 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700318 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700319 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700320 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700321 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700322 GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700323 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700324 GET_DEVICE_FLOW_COUNT, serializer::decode, flowTable::getFlowRuleCount, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700325 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700326 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700327 }
328
329 private void unregisterMessageHandlers() {
330 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
331 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700332 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700333 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
334 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
335 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
336 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
337 }
338
339 private void logConfig(String prefix) {
340 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700341 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700342 }
343
Jon Hallfa132292017-10-24 11:11:24 -0700344 @Override
345 public int getFlowRuleCount() {
346 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700347 .mapToInt(device -> getFlowRuleCount(device.id()))
348 .sum();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800349 }
350
351 @Override
352 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700353 NodeId master = mastershipService.getMasterFor(deviceId);
354 if (master == null) {
355 log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
356 return 0;
357 }
358
359 if (Objects.equals(local, master)) {
360 return flowTable.getFlowRuleCount(deviceId);
361 }
362
363 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
364 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
365 deviceId,
366 GET_DEVICE_FLOW_COUNT,
367 serializer::encode,
368 serializer::decode,
369 master),
370 FLOW_RULE_STORE_TIMEOUT_MILLIS,
371 TimeUnit.MILLISECONDS,
372 0);
Jon Hallfa132292017-10-24 11:11:24 -0700373 }
374
375 @Override
376 public FlowEntry getFlowEntry(FlowRule rule) {
377 NodeId master = mastershipService.getMasterFor(rule.deviceId());
378
379 if (master == null) {
380 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
381 return null;
382 }
383
384 if (Objects.equals(local, master)) {
385 return flowTable.getFlowEntry(rule);
386 }
387
388 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700389 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700390
391 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700392 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
393 serializer::encode,
394 serializer::decode,
395 master),
396 FLOW_RULE_STORE_TIMEOUT_MILLIS,
397 TimeUnit.MILLISECONDS,
398 null);
Jon Hallfa132292017-10-24 11:11:24 -0700399 }
400
401 @Override
402 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
403 NodeId master = mastershipService.getMasterFor(deviceId);
404
405 if (master == null) {
406 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
407 return Collections.emptyList();
408 }
409
410 if (Objects.equals(local, master)) {
411 return flowTable.getFlowEntries(deviceId);
412 }
413
414 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700415 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700416
417 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700418 ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
419 serializer::encode,
420 serializer::decode,
421 master),
422 FLOW_RULE_STORE_TIMEOUT_MILLIS,
423 TimeUnit.MILLISECONDS,
424 Collections.emptyList());
Jon Hallfa132292017-10-24 11:11:24 -0700425 }
426
427 @Override
428 public void storeFlowRule(FlowRule rule) {
429 storeBatch(new FlowRuleBatchOperation(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700430 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
431 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700432 }
433
434 @Override
435 public void storeBatch(FlowRuleBatchOperation operation) {
436 if (operation.getOperations().isEmpty()) {
437 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700438 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
439 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700440 return;
441 }
442
443 DeviceId deviceId = operation.deviceId();
444 NodeId master = mastershipService.getMasterFor(deviceId);
445
446 if (master == null) {
447 log.warn("No master for {} ", deviceId);
448
Jordan Halterman281dbf32018-06-15 17:46:28 -0700449 Set<FlowRule> allFailures = operation.getOperations()
450 .stream()
451 .map(op -> op.target())
452 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700453 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700454 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
455 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700456 return;
457 }
458
459 if (Objects.equals(local, master)) {
460 storeBatchInternal(operation);
461 return;
462 }
463
464 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700465 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700466
467 clusterCommunicator.unicast(operation,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700468 APPLY_BATCH_FLOWS,
469 serializer::encode,
470 master)
471 .whenComplete((result, error) -> {
472 if (error != null) {
473 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700474
Jordan Halterman281dbf32018-06-15 17:46:28 -0700475 Set<FlowRule> allFailures = operation.getOperations()
476 .stream()
477 .map(op -> op.target())
478 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700479
Jordan Halterman281dbf32018-06-15 17:46:28 -0700480 notifyDelegate(FlowRuleBatchEvent.completed(
481 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
482 new CompletedBatchOperation(false, allFailures, deviceId)));
483 }
484 });
Jon Hallfa132292017-10-24 11:11:24 -0700485 }
486
487 private void storeBatchInternal(FlowRuleBatchOperation operation) {
488
489 final DeviceId did = operation.deviceId();
490 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
491 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
492 if (currentOps.isEmpty()) {
493 batchOperationComplete(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700494 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
495 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700496 return;
497 }
498
499 notifyDelegate(FlowRuleBatchEvent.requested(new
Jordan Halterman281dbf32018-06-15 17:46:28 -0700500 FlowRuleBatchRequest(operation.id(),
501 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700502 }
503
504 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
505 return operation.getOperations().stream().map(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700506 op -> {
507 StoredFlowEntry entry;
508 switch (op.operator()) {
509 case ADD:
510 entry = new DefaultFlowEntry(op.target());
511 flowTable.add(entry);
512 return op;
513 case MODIFY:
514 entry = new DefaultFlowEntry(op.target());
515 flowTable.update(entry);
516 return op;
517 case REMOVE:
518 return flowTable.update(op.target(), stored -> {
519 stored.setState(FlowEntryState.PENDING_REMOVE);
520 log.debug("Setting state of rule to pending remove: {}", stored);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800521 return op;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700522 });
523 default:
524 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700525 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700526 return null;
527 }
Jon Hallfa132292017-10-24 11:11:24 -0700528 ).filter(Objects::nonNull).collect(Collectors.toSet());
529 }
530
531 @Override
532 public void deleteFlowRule(FlowRule rule) {
533 storeBatch(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700534 new FlowRuleBatchOperation(
535 Collections.singletonList(
536 new FlowRuleBatchEntry(
537 FlowRuleOperation.REMOVE,
538 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700539 }
540
541 @Override
542 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
543 if (mastershipService.isLocalMaster(rule.deviceId())) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700544 return flowTable.update(rule, stored -> {
545 if (stored.state() == FlowEntryState.PENDING_ADD) {
546 stored.setState(FlowEntryState.PENDING_ADD);
547 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
548 }
549 return null;
550 });
Jon Hallfa132292017-10-24 11:11:24 -0700551 }
552 return null;
553 }
554
555 @Override
556 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
557 NodeId master = mastershipService.getMasterFor(rule.deviceId());
558 if (Objects.equals(local, master)) {
559 return addOrUpdateFlowRuleInternal(rule);
560 }
561
562 log.warn("Tried to update FlowRule {} state,"
Jordan Halterman281dbf32018-06-15 17:46:28 -0700563 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700564 return null;
565 }
566
567 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700568 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700569 stored.setBytes(rule.bytes());
570 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
571 stored.setLiveType(rule.liveType());
572 stored.setPackets(rule.packets());
573 stored.setLastSeen();
574 if (stored.state() == FlowEntryState.PENDING_ADD) {
575 stored.setState(FlowEntryState.ADDED);
576 return new FlowRuleEvent(Type.RULE_ADDED, rule);
577 }
578 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700579 });
580 if (event != null) {
581 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700582 }
583
584 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
585 // TODO: also update backup if the behavior is correct.
586 flowTable.add(rule);
587 return null;
588 }
589
590 @Override
591 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
592 final DeviceId deviceId = rule.deviceId();
593 NodeId master = mastershipService.getMasterFor(deviceId);
594
595 if (Objects.equals(local, master)) {
596 // bypass and handle it locally
597 return removeFlowRuleInternal(rule);
598 }
599
600 if (master == null) {
601 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
602 // TODO: revisit if this should be null (="no-op") or Exception
603 return null;
604 }
605
606 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700607 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700608
Jordan Halterman281dbf32018-06-15 17:46:28 -0700609 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
610 rule,
611 REMOVE_FLOW_ENTRY,
612 serializer::encode,
613 serializer::decode,
614 master),
615 FLOW_RULE_STORE_TIMEOUT_MILLIS,
616 TimeUnit.MILLISECONDS,
617 null);
Jon Hallfa132292017-10-24 11:11:24 -0700618 }
619
620 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700621 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800622 final FlowEntry removed = flowTable.remove(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700623 // rule may be partial rule that is missing treatment, we should use rule from store instead
624 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
625 }
626
627 @Override
628 public void purgeFlowRule(DeviceId deviceId) {
629 flowTable.purgeFlowRule(deviceId);
630 }
631
632 @Override
633 public void purgeFlowRules() {
634 flowTable.purgeFlowRules();
635 }
636
637 @Override
638 public void batchOperationComplete(FlowRuleBatchEvent event) {
639 //FIXME: need a per device pending response
640 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
641 if (nodeId == null) {
642 notifyDelegate(event);
643 } else {
644 // TODO check unicast return value
645 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
646 //error log: log.warn("Failed to respond to peer for batch operation result");
647 }
648 }
649
650 private final class OnStoreBatch implements ClusterMessageHandler {
651
652 @Override
653 public void handle(final ClusterMessage message) {
654 FlowRuleBatchOperation operation = serializer.decode(message.payload());
655 log.debug("received batch request {}", operation);
656
657 final DeviceId deviceId = operation.deviceId();
658 NodeId master = mastershipService.getMasterFor(deviceId);
659 if (!Objects.equals(local, master)) {
660 Set<FlowRule> failures = new HashSet<>(operation.size());
661 for (FlowRuleBatchEntry op : operation.getOperations()) {
662 failures.add(op.target());
663 }
664 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
665 // This node is no longer the master, respond as all failed.
666 // TODO: we might want to wrap response in envelope
667 // to distinguish sw programming failure and hand over
668 // it make sense in the latter case to retry immediately.
669 message.respond(serializer.encode(allFailed));
670 return;
671 }
672
673 pendingResponses.put(operation.id(), message.sender());
674 storeBatchInternal(operation);
675 }
676 }
677
Jordan Halterman281dbf32018-06-15 17:46:28 -0700678 private class InternalFlowTable implements DeviceListener {
679 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700680
681 @Override
Jordan Halterman281dbf32018-06-15 17:46:28 -0700682 public void event(DeviceEvent event) {
683 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
684 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700685 }
686 }
687
Jordan Halterman281dbf32018-06-15 17:46:28 -0700688 /**
689 * Adds the given device to the flow table.
690 *
691 * @param deviceId the device to add to the table
692 */
693 public void addDevice(DeviceId deviceId) {
694 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
695 id,
696 clusterService,
697 clusterCommunicator,
698 new InternalLifecycleManager(id),
699 backupSenderExecutor,
700 backupPeriod,
701 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700702 }
703
Jordan Halterman281dbf32018-06-15 17:46:28 -0700704 /**
705 * Sets the flow table backup period.
706 *
707 * @param backupPeriod the flow table backup period
708 */
709 void setBackupPeriod(int backupPeriod) {
710 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700711 }
712
Jordan Halterman281dbf32018-06-15 17:46:28 -0700713 /**
714 * Sets the flow table anti-entropy period.
715 *
716 * @param antiEntropyPeriod the flow table anti-entropy period
717 */
718 void setAntiEntropyPeriod(int antiEntropyPeriod) {
719 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700720 }
721
Jordan Halterman281dbf32018-06-15 17:46:28 -0700722 /**
723 * Returns the flow table for a specific device.
724 *
725 * @param deviceId the device identifier
726 * @return the flow table for the given device
727 */
728 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
729 DeviceFlowTable flowTable = flowTables.get(deviceId);
730 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
731 deviceId,
732 clusterService,
733 clusterCommunicator,
734 new InternalLifecycleManager(deviceId),
735 backupSenderExecutor,
736 backupPeriod,
737 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700738 }
739
Jordan Halterman281dbf32018-06-15 17:46:28 -0700740 /**
741 * Returns the flow rule count for the given device.
742 *
743 * @param deviceId the device for which to return the flow rule count
744 * @return the flow rule count for the given device
745 */
746 public int getFlowRuleCount(DeviceId deviceId) {
747 return getFlowTable(deviceId).count();
748 }
749
750 /**
751 * Returns the flow entry for the given rule.
752 *
753 * @param rule the rule for which to return the flow entry
754 * @return the flow entry for the given rule
755 */
Jon Hallfa132292017-10-24 11:11:24 -0700756 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700757 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700758 }
759
Jordan Halterman281dbf32018-06-15 17:46:28 -0700760 /**
761 * Returns the set of flow entries for the given device.
762 *
763 * @param deviceId the device for which to lookup flow entries
764 * @return the set of flow entries for the given device
765 */
Jon Hallfa132292017-10-24 11:11:24 -0700766 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700767 return getFlowTable(deviceId).getFlowEntries();
Jon Hallfa132292017-10-24 11:11:24 -0700768 }
769
Jordan Halterman281dbf32018-06-15 17:46:28 -0700770 /**
771 * Adds the given flow rule.
772 *
773 * @param rule the rule to add
774 */
Jon Hallfa132292017-10-24 11:11:24 -0700775 public void add(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700776 Tools.futureGetOrElse(
777 getFlowTable(rule.deviceId()).add(rule),
778 FLOW_RULE_STORE_TIMEOUT_MILLIS,
779 TimeUnit.MILLISECONDS,
780 null);
Jon Hallfa132292017-10-24 11:11:24 -0700781 }
782
Jordan Halterman281dbf32018-06-15 17:46:28 -0700783 /**
784 * Updates the given flow rule.
785 *
786 * @param rule the rule to update
787 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800788 public void update(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700789 Tools.futureGetOrElse(
790 getFlowTable(rule.deviceId()).update(rule),
791 FLOW_RULE_STORE_TIMEOUT_MILLIS,
792 TimeUnit.MILLISECONDS,
793 null);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800794 }
795
Jordan Halterman281dbf32018-06-15 17:46:28 -0700796 /**
797 * Applies the given update function to the rule.
798 *
799 * @param function the update function to apply
800 * @return a future to be completed with the update event or {@code null} if the rule was not updated
801 */
802 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
803 return Tools.futureGetOrElse(
804 getFlowTable(rule.deviceId()).update(rule, function),
805 FLOW_RULE_STORE_TIMEOUT_MILLIS,
806 TimeUnit.MILLISECONDS,
807 null);
808 }
809
810 /**
811 * Removes the given flow rule.
812 *
813 * @param rule the rule to remove
814 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800815 public FlowEntry remove(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700816 return Tools.futureGetOrElse(
817 getFlowTable(rule.deviceId()).remove(rule),
818 FLOW_RULE_STORE_TIMEOUT_MILLIS,
819 TimeUnit.MILLISECONDS,
820 null);
Jon Hallfa132292017-10-24 11:11:24 -0700821 }
822
Jordan Halterman281dbf32018-06-15 17:46:28 -0700823 /**
824 * Purges flow rules for the given device.
825 *
826 * @param deviceId the device for which to purge flow rules
827 */
Jon Hallfa132292017-10-24 11:11:24 -0700828 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanda3b9f02018-10-05 13:28:51 -0700829 // If the device is still present in the store, purge the underlying DeviceFlowTable.
830 // Otherwise, remove the DeviceFlowTable and unregister message handlers.
831 if (deviceService.getDevice(deviceId) != null) {
832 DeviceFlowTable flowTable = flowTables.get(deviceId);
833 if (flowTable != null) {
834 flowTable.purge();
835 }
836 } else {
837 DeviceFlowTable flowTable = flowTables.remove(deviceId);
838 if (flowTable != null) {
839 flowTable.close();
840 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700841 }
Jon Hallfa132292017-10-24 11:11:24 -0700842 }
843
Jordan Halterman281dbf32018-06-15 17:46:28 -0700844 /**
845 * Purges all flow rules from the table.
846 */
Jon Hallfa132292017-10-24 11:11:24 -0700847 public void purgeFlowRules() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700848 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
849 while (iterator.hasNext()) {
850 iterator.next().close();
851 iterator.remove();
Jordan Halterman8f90d6d2018-06-12 11:23:33 -0700852 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700853 }
Jon Hallfa132292017-10-24 11:11:24 -0700854 }
855
856 @Override
Jordan Halterman5259b332018-06-12 15:34:19 -0700857 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700858 deviceTableStats.put(deviceId, tableStats);
859 return null;
860 }
861
862 @Override
863 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
864 NodeId master = mastershipService.getMasterFor(deviceId);
865
866 if (master == null) {
867 log.debug("Failed to getTableStats: No master for {}", deviceId);
868 return Collections.emptyList();
869 }
870
871 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
872 if (tableStats == null) {
873 return Collections.emptyList();
874 }
875 return ImmutableList.copyOf(tableStats);
876 }
877
878 @Override
879 public long getActiveFlowRuleCount(DeviceId deviceId) {
880 return Streams.stream(getTableStatistics(deviceId))
Jordan Halterman281dbf32018-06-15 17:46:28 -0700881 .mapToLong(TableStatisticsEntry::activeFlowEntries)
882 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700883 }
884
885 private class InternalTableStatsListener
886 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
887 @Override
888 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700889 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700890 //TODO: Generate an event to listeners (do we need?)
891 }
892 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700893
894 /**
895 * Device lifecycle manager implementation.
896 */
897 private final class InternalLifecycleManager
898 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
899 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
900
901 private final DeviceId deviceId;
902
903 private volatile DeviceReplicaInfo replicaInfo;
904
905 InternalLifecycleManager(DeviceId deviceId) {
906 this.deviceId = deviceId;
907 replicaInfoManager.addListener(this);
908 mastershipTermLifecycles.addListener(this);
909 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
910 }
911
912 @Override
913 public DeviceReplicaInfo getReplicaInfo() {
914 return replicaInfo;
915 }
916
917 @Override
918 public void activate(long term) {
919 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
920 if (replicaInfo != null && replicaInfo.term() == term) {
921 mastershipTermLifecycles.put(deviceId, term);
922 }
923 }
924
925 @Override
926 public void event(ReplicaInfoEvent event) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700927 if (event.subject().equals(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700928 onReplicaInfoChange(event.replicaInfo());
929 }
930 }
931
932 @Override
933 public void event(MapEvent<DeviceId, Long> event) {
934 if (event.key().equals(deviceId) && event.newValue() != null) {
935 onActivate(event.newValue().value());
936 }
937 }
938
939 /**
940 * Handles a term activation event.
941 *
942 * @param term the term that was activated
943 */
944 private void onActivate(long term) {
945 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
946 if (replicaInfo != null && replicaInfo.term() == term) {
947 NodeId master = replicaInfo.master().orElse(null);
948 List<NodeId> backups = replicaInfo.backups()
949 .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
950 listenerRegistry.process(new LifecycleEvent(
951 LifecycleEvent.Type.TERM_ACTIVE,
952 new DeviceReplicaInfo(term, master, backups)));
953 }
954 }
955
956 /**
957 * Handles a replica info change event.
958 *
959 * @param replicaInfo the updated replica info
960 */
961 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
962 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
963 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
964 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
965 if (oldReplicaInfo != null) {
966 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
967 }
968 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700969 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
970 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700971 }
972 }
973
974 /**
975 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
976 *
977 * @param replicaInfo the replica info to convert
978 * @return the converted replica info
979 */
980 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
981 NodeId master = replicaInfo.master().orElse(null);
982 List<NodeId> backups = replicaInfo.backups()
983 .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
984 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
985 }
986
987 @Override
988 public void close() {
989 replicaInfoManager.removeListener(this);
990 mastershipTermLifecycles.removeListener(this);
991 }
992 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700993}