blob: dad9351b950819803714768d790d678654fff32b [file] [log] [blame]
Jordan Halterman281dbf32018-06-15 17:46:28 -07001/*
2* Copyright 2014-present Open Networking Foundation
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070020import com.google.common.collect.Streams;
Jon Hallfa132292017-10-24 11:11:24 -070021import org.onlab.util.KryoNamespace;
22import org.onlab.util.Tools;
23import org.onosproject.cfg.ComponentConfigService;
24import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.NodeId;
26import org.onosproject.core.CoreService;
27import org.onosproject.core.IdGenerator;
Jordan Halterman281dbf32018-06-15 17:46:28 -070028import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070029import org.onosproject.mastership.MastershipService;
30import org.onosproject.net.DeviceId;
Jordan Halterman281dbf32018-06-15 17:46:28 -070031import org.onosproject.net.device.DeviceEvent;
32import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070033import org.onosproject.net.device.DeviceService;
34import org.onosproject.net.flow.CompletedBatchOperation;
35import org.onosproject.net.flow.DefaultFlowEntry;
36import org.onosproject.net.flow.FlowEntry;
37import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070038import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070039import org.onosproject.net.flow.FlowRuleEvent;
40import org.onosproject.net.flow.FlowRuleEvent.Type;
41import org.onosproject.net.flow.FlowRuleService;
42import org.onosproject.net.flow.FlowRuleStore;
43import org.onosproject.net.flow.FlowRuleStoreDelegate;
44import org.onosproject.net.flow.StoredFlowEntry;
45import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070046import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
47import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
48import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
49import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
50import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070051import org.onosproject.persistence.PersistenceService;
52import org.onosproject.store.AbstractStore;
53import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
54import org.onosproject.store.cluster.messaging.ClusterMessage;
55import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jordan Halterman281dbf32018-06-15 17:46:28 -070056import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070057import org.onosproject.store.flow.ReplicaInfoEvent;
58import org.onosproject.store.flow.ReplicaInfoEventListener;
59import org.onosproject.store.flow.ReplicaInfoService;
60import org.onosproject.store.impl.MastershipBasedTimestamp;
61import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman281dbf32018-06-15 17:46:28 -070062import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070063import org.onosproject.store.service.EventuallyConsistentMap;
64import org.onosproject.store.service.EventuallyConsistentMapEvent;
65import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman281dbf32018-06-15 17:46:28 -070066import org.onosproject.store.service.MapEvent;
67import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070068import org.onosproject.store.service.Serializer;
69import org.onosproject.store.service.StorageService;
70import org.onosproject.store.service.WallClockTimestamp;
71import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070072import org.osgi.service.component.annotations.Activate;
73import org.osgi.service.component.annotations.Component;
74import org.osgi.service.component.annotations.Deactivate;
75import org.osgi.service.component.annotations.Modified;
76import org.osgi.service.component.annotations.Reference;
77import org.osgi.service.component.annotations.ReferenceCardinality;
Jon Hallfa132292017-10-24 11:11:24 -070078import org.slf4j.Logger;
79
Ray Milkeyd84f89b2018-08-17 14:54:17 -070080import java.util.Collections;
81import java.util.Dictionary;
82import java.util.HashSet;
83import java.util.Iterator;
84import java.util.List;
85import java.util.Map;
86import java.util.Objects;
87import java.util.Set;
88import java.util.concurrent.ExecutorService;
89import java.util.concurrent.Executors;
90import java.util.concurrent.ScheduledExecutorService;
91import java.util.concurrent.TimeUnit;
92import java.util.function.Function;
93import java.util.stream.Collectors;
94
Jon Hallfa132292017-10-24 11:11:24 -070095import static com.google.common.base.Strings.isNullOrEmpty;
96import static org.onlab.util.Tools.get;
97import static org.onlab.util.Tools.groupedThreads;
98import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -070099import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
100import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700101import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700102import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
103import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
104import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
105import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
106import static org.slf4j.LoggerFactory.getLogger;
107
Ray Milkeyb5646e62018-10-16 11:42:18 -0700108import static org.onosproject.store.OsgiPropertyConstants.*;
109
Jon Hallfa132292017-10-24 11:11:24 -0700110/**
111 * Manages inventory of flow rules using a distributed state management protocol.
112 */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700113@Component(
114 immediate = true,
115 service = FlowRuleStore.class,
116 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -0700117 MESSAGE_HANDLER_THREAD_POOL_SIZE + ":Integer=" + MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT,
118 BACKUP_PERIOD_MILLIS + ":Integer=" + BACKUP_PERIOD_MILLIS_DEFAULT,
119 ANTI_ENTROPY_PERIOD_MILLIS + ":Integer=" + ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT,
120 EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED + ":Boolean=" + EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT,
121 MAX_BACKUP_COUNT + ":Integer=" + MAX_BACKUP_COUNT_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -0700122 }
123)
Jon Hallfa132292017-10-24 11:11:24 -0700124public class ECFlowRuleStore
Jordan Halterman281dbf32018-06-15 17:46:28 -0700125 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
126 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700127
128 private final Logger log = getLogger(getClass());
129
Jon Hallfa132292017-10-24 11:11:24 -0700130 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700131
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700132 /** Number of threads in the message handler pool. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700133 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700134
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700135 /** Delay in ms between successive backup runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700136 private int backupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700137
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700138 /** Delay in ms between anti-entropy runs. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700139 private int antiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700140
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700141 /** Indicates whether or not changes in the flow table should be persisted to disk. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700142 private boolean persistenceEnabled = EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700143
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700144 /** Max number of backup copies for each device. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700145 private volatile int backupCount = MAX_BACKUP_COUNT_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700146
147 private InternalFlowTable flowTable = new InternalFlowTable();
148
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700150 protected ReplicaInfoService replicaInfoManager;
151
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700153 protected ClusterCommunicationService clusterCommunicator;
154
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700155 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700156 protected ClusterService clusterService;
157
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700159 protected DeviceService deviceService;
160
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700161 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700162 protected CoreService coreService;
163
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700164 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700165 protected ComponentConfigService configService;
166
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700167 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700168 protected MastershipService mastershipService;
169
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700170 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700171 protected PersistenceService persistenceService;
172
173 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
174 private ExecutorService messageHandlingExecutor;
175 private ExecutorService eventHandler;
176
Jon Hallfa132292017-10-24 11:11:24 -0700177 private final ScheduledExecutorService backupSenderExecutor =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700178 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
Jon Hallfa132292017-10-24 11:11:24 -0700179
180 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
181 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700182 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700183
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700184 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jon Hallfa132292017-10-24 11:11:24 -0700185 protected StorageService storageService;
186
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700187 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
188 .register(KryoNamespaces.API)
Jordan Haltermana765d222018-06-01 00:40:56 -0700189 .register(BucketId.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700190 .register(FlowBucket.class)
191 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700192
193 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermana765d222018-06-01 00:40:56 -0700194 .register(KryoNamespaces.API)
195 .register(BucketId.class)
196 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700197
Jordan Halterman281dbf32018-06-15 17:46:28 -0700198 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700199
200 private IdGenerator idGenerator;
201 private NodeId local;
202
203 @Activate
204 public void activate(ComponentContext context) {
205 configService.registerProperties(getClass());
206
207 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
208
209 local = clusterService.getLocalNode().id();
210
211 eventHandler = Executors.newSingleThreadExecutor(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700212 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700213 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700214 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700215
216 registerMessageHandlers(messageHandlingExecutor);
217
Jordan Halterman281dbf32018-06-15 17:46:28 -0700218 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
219 .withName("onos-flow-store-terms")
220 .withSerializer(serializer)
221 .buildAsyncMap();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800222
Jon Hallfa132292017-10-24 11:11:24 -0700223 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700224 .withName("onos-flow-table-stats")
225 .withSerializer(serializerBuilder)
226 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
227 .withTimestampProvider((k, v) -> new WallClockTimestamp())
228 .withTombstonesDisabled()
229 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700230 deviceTableStats.addListener(tableStatsListener);
231
Jordan Halterman281dbf32018-06-15 17:46:28 -0700232 deviceService.addListener(flowTable);
233 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
234
Jon Hallfa132292017-10-24 11:11:24 -0700235 logConfig("Started");
236 }
237
238 @Deactivate
239 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700240 configService.unregisterProperties(getClass(), false);
241 unregisterMessageHandlers();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700242 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700243 deviceTableStats.removeListener(tableStatsListener);
244 deviceTableStats.destroy();
245 eventHandler.shutdownNow();
246 messageHandlingExecutor.shutdownNow();
247 backupSenderExecutor.shutdownNow();
248 log.info("Stopped");
249 }
250
251 @SuppressWarnings("rawtypes")
252 @Modified
253 public void modified(ComponentContext context) {
254 if (context == null) {
255 logConfig("Default config");
256 return;
257 }
258
259 Dictionary properties = context.getProperties();
260 int newPoolSize;
261 int newBackupPeriod;
262 int newBackupCount;
Jordan Halterman5259b332018-06-12 15:34:19 -0700263 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700264 try {
265 String s = get(properties, "msgHandlerPoolSize");
266 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
267
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700268 s = get(properties, BACKUP_PERIOD_MILLIS);
Jon Hallfa132292017-10-24 11:11:24 -0700269 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
270
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700271 s = get(properties, MAX_BACKUP_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700272 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman5259b332018-06-12 15:34:19 -0700273
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700274 s = get(properties, ANTI_ENTROPY_PERIOD_MILLIS);
Jordan Halterman5259b332018-06-12 15:34:19 -0700275 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700276 } catch (NumberFormatException | ClassCastException e) {
Ray Milkeyb5646e62018-10-16 11:42:18 -0700277 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
278 newBackupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
279 newBackupCount = MAX_BACKUP_COUNT_DEFAULT;
280 newAntiEntropyPeriod = ANTI_ENTROPY_PERIOD_MILLIS_DEFAULT;
Jon Hallfa132292017-10-24 11:11:24 -0700281 }
282
Jon Hallfa132292017-10-24 11:11:24 -0700283 if (newBackupPeriod != backupPeriod) {
284 backupPeriod = newBackupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700285 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700286 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700287
288 if (newAntiEntropyPeriod != antiEntropyPeriod) {
289 antiEntropyPeriod = newAntiEntropyPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700290 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jordan Halterman5259b332018-06-12 15:34:19 -0700291 }
292
Jon Hallfa132292017-10-24 11:11:24 -0700293 if (newPoolSize != msgHandlerPoolSize) {
294 msgHandlerPoolSize = newPoolSize;
295 ExecutorService oldMsgHandler = messageHandlingExecutor;
296 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700297 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700298
299 // replace previously registered handlers.
300 registerMessageHandlers(messageHandlingExecutor);
301 oldMsgHandler.shutdown();
302 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700303
Jon Hallfa132292017-10-24 11:11:24 -0700304 if (backupCount != newBackupCount) {
305 backupCount = newBackupCount;
306 }
307 logConfig("Reconfigured");
308 }
309
310 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700311 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
312 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700313 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700314 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700315 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700316 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700317 GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700318 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700319 GET_DEVICE_FLOW_COUNT, serializer::decode, flowTable::getFlowRuleCount, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700320 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700321 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700322 }
323
324 private void unregisterMessageHandlers() {
325 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
326 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700327 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700328 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
329 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
330 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
331 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
332 }
333
334 private void logConfig(String prefix) {
335 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700336 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700337 }
338
Jon Hallfa132292017-10-24 11:11:24 -0700339 @Override
340 public int getFlowRuleCount() {
341 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700342 .mapToInt(device -> getFlowRuleCount(device.id()))
343 .sum();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800344 }
345
346 @Override
347 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700348 NodeId master = mastershipService.getMasterFor(deviceId);
349 if (master == null) {
350 log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
351 return 0;
352 }
353
354 if (Objects.equals(local, master)) {
355 return flowTable.getFlowRuleCount(deviceId);
356 }
357
358 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
359 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
360 deviceId,
361 GET_DEVICE_FLOW_COUNT,
362 serializer::encode,
363 serializer::decode,
364 master),
365 FLOW_RULE_STORE_TIMEOUT_MILLIS,
366 TimeUnit.MILLISECONDS,
367 0);
Jon Hallfa132292017-10-24 11:11:24 -0700368 }
369
370 @Override
371 public FlowEntry getFlowEntry(FlowRule rule) {
372 NodeId master = mastershipService.getMasterFor(rule.deviceId());
373
374 if (master == null) {
375 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
376 return null;
377 }
378
379 if (Objects.equals(local, master)) {
380 return flowTable.getFlowEntry(rule);
381 }
382
383 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700384 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700385
386 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700387 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
388 serializer::encode,
389 serializer::decode,
390 master),
391 FLOW_RULE_STORE_TIMEOUT_MILLIS,
392 TimeUnit.MILLISECONDS,
393 null);
Jon Hallfa132292017-10-24 11:11:24 -0700394 }
395
396 @Override
397 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
398 NodeId master = mastershipService.getMasterFor(deviceId);
399
400 if (master == null) {
401 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
402 return Collections.emptyList();
403 }
404
405 if (Objects.equals(local, master)) {
406 return flowTable.getFlowEntries(deviceId);
407 }
408
409 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700410 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700411
412 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700413 ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
414 serializer::encode,
415 serializer::decode,
416 master),
417 FLOW_RULE_STORE_TIMEOUT_MILLIS,
418 TimeUnit.MILLISECONDS,
419 Collections.emptyList());
Jon Hallfa132292017-10-24 11:11:24 -0700420 }
421
422 @Override
423 public void storeFlowRule(FlowRule rule) {
424 storeBatch(new FlowRuleBatchOperation(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700425 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
426 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700427 }
428
429 @Override
430 public void storeBatch(FlowRuleBatchOperation operation) {
431 if (operation.getOperations().isEmpty()) {
432 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700433 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
434 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700435 return;
436 }
437
438 DeviceId deviceId = operation.deviceId();
439 NodeId master = mastershipService.getMasterFor(deviceId);
440
441 if (master == null) {
442 log.warn("No master for {} ", deviceId);
443
Jordan Halterman281dbf32018-06-15 17:46:28 -0700444 Set<FlowRule> allFailures = operation.getOperations()
445 .stream()
446 .map(op -> op.target())
447 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700448 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700449 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
450 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700451 return;
452 }
453
454 if (Objects.equals(local, master)) {
455 storeBatchInternal(operation);
456 return;
457 }
458
459 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700460 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700461
462 clusterCommunicator.unicast(operation,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700463 APPLY_BATCH_FLOWS,
464 serializer::encode,
465 master)
466 .whenComplete((result, error) -> {
467 if (error != null) {
468 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700469
Jordan Halterman281dbf32018-06-15 17:46:28 -0700470 Set<FlowRule> allFailures = operation.getOperations()
471 .stream()
472 .map(op -> op.target())
473 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700474
Jordan Halterman281dbf32018-06-15 17:46:28 -0700475 notifyDelegate(FlowRuleBatchEvent.completed(
476 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
477 new CompletedBatchOperation(false, allFailures, deviceId)));
478 }
479 });
Jon Hallfa132292017-10-24 11:11:24 -0700480 }
481
482 private void storeBatchInternal(FlowRuleBatchOperation operation) {
483
484 final DeviceId did = operation.deviceId();
485 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
486 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
487 if (currentOps.isEmpty()) {
488 batchOperationComplete(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700489 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
490 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700491 return;
492 }
493
494 notifyDelegate(FlowRuleBatchEvent.requested(new
Jordan Halterman281dbf32018-06-15 17:46:28 -0700495 FlowRuleBatchRequest(operation.id(),
496 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700497 }
498
499 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
500 return operation.getOperations().stream().map(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700501 op -> {
502 StoredFlowEntry entry;
503 switch (op.operator()) {
504 case ADD:
505 entry = new DefaultFlowEntry(op.target());
506 flowTable.add(entry);
507 return op;
508 case MODIFY:
509 entry = new DefaultFlowEntry(op.target());
510 flowTable.update(entry);
511 return op;
512 case REMOVE:
513 return flowTable.update(op.target(), stored -> {
514 stored.setState(FlowEntryState.PENDING_REMOVE);
515 log.debug("Setting state of rule to pending remove: {}", stored);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800516 return op;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700517 });
518 default:
519 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700520 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700521 return null;
522 }
Jon Hallfa132292017-10-24 11:11:24 -0700523 ).filter(Objects::nonNull).collect(Collectors.toSet());
524 }
525
526 @Override
527 public void deleteFlowRule(FlowRule rule) {
528 storeBatch(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700529 new FlowRuleBatchOperation(
530 Collections.singletonList(
531 new FlowRuleBatchEntry(
532 FlowRuleOperation.REMOVE,
533 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700534 }
535
536 @Override
537 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
538 if (mastershipService.isLocalMaster(rule.deviceId())) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700539 return flowTable.update(rule, stored -> {
540 if (stored.state() == FlowEntryState.PENDING_ADD) {
541 stored.setState(FlowEntryState.PENDING_ADD);
542 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
543 }
544 return null;
545 });
Jon Hallfa132292017-10-24 11:11:24 -0700546 }
547 return null;
548 }
549
550 @Override
551 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
552 NodeId master = mastershipService.getMasterFor(rule.deviceId());
553 if (Objects.equals(local, master)) {
554 return addOrUpdateFlowRuleInternal(rule);
555 }
556
557 log.warn("Tried to update FlowRule {} state,"
Jordan Halterman281dbf32018-06-15 17:46:28 -0700558 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700559 return null;
560 }
561
562 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700563 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700564 stored.setBytes(rule.bytes());
565 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
566 stored.setLiveType(rule.liveType());
567 stored.setPackets(rule.packets());
568 stored.setLastSeen();
569 if (stored.state() == FlowEntryState.PENDING_ADD) {
570 stored.setState(FlowEntryState.ADDED);
571 return new FlowRuleEvent(Type.RULE_ADDED, rule);
572 }
573 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700574 });
575 if (event != null) {
576 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700577 }
578
579 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
580 // TODO: also update backup if the behavior is correct.
581 flowTable.add(rule);
582 return null;
583 }
584
585 @Override
586 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
587 final DeviceId deviceId = rule.deviceId();
588 NodeId master = mastershipService.getMasterFor(deviceId);
589
590 if (Objects.equals(local, master)) {
591 // bypass and handle it locally
592 return removeFlowRuleInternal(rule);
593 }
594
595 if (master == null) {
596 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
597 // TODO: revisit if this should be null (="no-op") or Exception
598 return null;
599 }
600
601 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700602 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700603
Jordan Halterman281dbf32018-06-15 17:46:28 -0700604 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
605 rule,
606 REMOVE_FLOW_ENTRY,
607 serializer::encode,
608 serializer::decode,
609 master),
610 FLOW_RULE_STORE_TIMEOUT_MILLIS,
611 TimeUnit.MILLISECONDS,
612 null);
Jon Hallfa132292017-10-24 11:11:24 -0700613 }
614
615 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700616 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800617 final FlowEntry removed = flowTable.remove(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700618 // rule may be partial rule that is missing treatment, we should use rule from store instead
619 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
620 }
621
622 @Override
623 public void purgeFlowRule(DeviceId deviceId) {
624 flowTable.purgeFlowRule(deviceId);
625 }
626
627 @Override
628 public void purgeFlowRules() {
629 flowTable.purgeFlowRules();
630 }
631
632 @Override
633 public void batchOperationComplete(FlowRuleBatchEvent event) {
634 //FIXME: need a per device pending response
635 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
636 if (nodeId == null) {
637 notifyDelegate(event);
638 } else {
639 // TODO check unicast return value
640 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
641 //error log: log.warn("Failed to respond to peer for batch operation result");
642 }
643 }
644
645 private final class OnStoreBatch implements ClusterMessageHandler {
646
647 @Override
648 public void handle(final ClusterMessage message) {
649 FlowRuleBatchOperation operation = serializer.decode(message.payload());
650 log.debug("received batch request {}", operation);
651
652 final DeviceId deviceId = operation.deviceId();
653 NodeId master = mastershipService.getMasterFor(deviceId);
654 if (!Objects.equals(local, master)) {
655 Set<FlowRule> failures = new HashSet<>(operation.size());
656 for (FlowRuleBatchEntry op : operation.getOperations()) {
657 failures.add(op.target());
658 }
659 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
660 // This node is no longer the master, respond as all failed.
661 // TODO: we might want to wrap response in envelope
662 // to distinguish sw programming failure and hand over
663 // it make sense in the latter case to retry immediately.
664 message.respond(serializer.encode(allFailed));
665 return;
666 }
667
668 pendingResponses.put(operation.id(), message.sender());
669 storeBatchInternal(operation);
670 }
671 }
672
Jordan Halterman281dbf32018-06-15 17:46:28 -0700673 private class InternalFlowTable implements DeviceListener {
674 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700675
676 @Override
Jordan Halterman281dbf32018-06-15 17:46:28 -0700677 public void event(DeviceEvent event) {
678 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
679 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700680 }
681 }
682
Jordan Halterman281dbf32018-06-15 17:46:28 -0700683 /**
684 * Adds the given device to the flow table.
685 *
686 * @param deviceId the device to add to the table
687 */
688 public void addDevice(DeviceId deviceId) {
689 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
690 id,
691 clusterService,
692 clusterCommunicator,
693 new InternalLifecycleManager(id),
694 backupSenderExecutor,
695 backupPeriod,
696 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700697 }
698
Jordan Halterman281dbf32018-06-15 17:46:28 -0700699 /**
700 * Sets the flow table backup period.
701 *
702 * @param backupPeriod the flow table backup period
703 */
704 void setBackupPeriod(int backupPeriod) {
705 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700706 }
707
Jordan Halterman281dbf32018-06-15 17:46:28 -0700708 /**
709 * Sets the flow table anti-entropy period.
710 *
711 * @param antiEntropyPeriod the flow table anti-entropy period
712 */
713 void setAntiEntropyPeriod(int antiEntropyPeriod) {
714 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700715 }
716
Jordan Halterman281dbf32018-06-15 17:46:28 -0700717 /**
718 * Returns the flow table for a specific device.
719 *
720 * @param deviceId the device identifier
721 * @return the flow table for the given device
722 */
723 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
724 DeviceFlowTable flowTable = flowTables.get(deviceId);
725 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
726 deviceId,
727 clusterService,
728 clusterCommunicator,
729 new InternalLifecycleManager(deviceId),
730 backupSenderExecutor,
731 backupPeriod,
732 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700733 }
734
Jordan Halterman281dbf32018-06-15 17:46:28 -0700735 /**
736 * Returns the flow rule count for the given device.
737 *
738 * @param deviceId the device for which to return the flow rule count
739 * @return the flow rule count for the given device
740 */
741 public int getFlowRuleCount(DeviceId deviceId) {
742 return getFlowTable(deviceId).count();
743 }
744
745 /**
746 * Returns the flow entry for the given rule.
747 *
748 * @param rule the rule for which to return the flow entry
749 * @return the flow entry for the given rule
750 */
Jon Hallfa132292017-10-24 11:11:24 -0700751 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700752 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700753 }
754
Jordan Halterman281dbf32018-06-15 17:46:28 -0700755 /**
756 * Returns the set of flow entries for the given device.
757 *
758 * @param deviceId the device for which to lookup flow entries
759 * @return the set of flow entries for the given device
760 */
Jon Hallfa132292017-10-24 11:11:24 -0700761 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700762 return getFlowTable(deviceId).getFlowEntries();
Jon Hallfa132292017-10-24 11:11:24 -0700763 }
764
Jordan Halterman281dbf32018-06-15 17:46:28 -0700765 /**
766 * Adds the given flow rule.
767 *
768 * @param rule the rule to add
769 */
Jon Hallfa132292017-10-24 11:11:24 -0700770 public void add(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700771 Tools.futureGetOrElse(
772 getFlowTable(rule.deviceId()).add(rule),
773 FLOW_RULE_STORE_TIMEOUT_MILLIS,
774 TimeUnit.MILLISECONDS,
775 null);
Jon Hallfa132292017-10-24 11:11:24 -0700776 }
777
Jordan Halterman281dbf32018-06-15 17:46:28 -0700778 /**
779 * Updates the given flow rule.
780 *
781 * @param rule the rule to update
782 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800783 public void update(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700784 Tools.futureGetOrElse(
785 getFlowTable(rule.deviceId()).update(rule),
786 FLOW_RULE_STORE_TIMEOUT_MILLIS,
787 TimeUnit.MILLISECONDS,
788 null);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800789 }
790
Jordan Halterman281dbf32018-06-15 17:46:28 -0700791 /**
792 * Applies the given update function to the rule.
793 *
794 * @param function the update function to apply
795 * @return a future to be completed with the update event or {@code null} if the rule was not updated
796 */
797 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
798 return Tools.futureGetOrElse(
799 getFlowTable(rule.deviceId()).update(rule, function),
800 FLOW_RULE_STORE_TIMEOUT_MILLIS,
801 TimeUnit.MILLISECONDS,
802 null);
803 }
804
805 /**
806 * Removes the given flow rule.
807 *
808 * @param rule the rule to remove
809 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800810 public FlowEntry remove(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700811 return Tools.futureGetOrElse(
812 getFlowTable(rule.deviceId()).remove(rule),
813 FLOW_RULE_STORE_TIMEOUT_MILLIS,
814 TimeUnit.MILLISECONDS,
815 null);
Jon Hallfa132292017-10-24 11:11:24 -0700816 }
817
Jordan Halterman281dbf32018-06-15 17:46:28 -0700818 /**
819 * Purges flow rules for the given device.
820 *
821 * @param deviceId the device for which to purge flow rules
822 */
Jon Hallfa132292017-10-24 11:11:24 -0700823 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanda3b9f02018-10-05 13:28:51 -0700824 // If the device is still present in the store, purge the underlying DeviceFlowTable.
825 // Otherwise, remove the DeviceFlowTable and unregister message handlers.
826 if (deviceService.getDevice(deviceId) != null) {
827 DeviceFlowTable flowTable = flowTables.get(deviceId);
828 if (flowTable != null) {
829 flowTable.purge();
830 }
831 } else {
832 DeviceFlowTable flowTable = flowTables.remove(deviceId);
833 if (flowTable != null) {
834 flowTable.close();
835 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700836 }
Jon Hallfa132292017-10-24 11:11:24 -0700837 }
838
Jordan Halterman281dbf32018-06-15 17:46:28 -0700839 /**
840 * Purges all flow rules from the table.
841 */
Jon Hallfa132292017-10-24 11:11:24 -0700842 public void purgeFlowRules() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700843 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
844 while (iterator.hasNext()) {
845 iterator.next().close();
846 iterator.remove();
Jordan Halterman8f90d6d2018-06-12 11:23:33 -0700847 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700848 }
Jon Hallfa132292017-10-24 11:11:24 -0700849 }
850
851 @Override
Jordan Halterman5259b332018-06-12 15:34:19 -0700852 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700853 deviceTableStats.put(deviceId, tableStats);
854 return null;
855 }
856
857 @Override
858 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
859 NodeId master = mastershipService.getMasterFor(deviceId);
860
861 if (master == null) {
862 log.debug("Failed to getTableStats: No master for {}", deviceId);
863 return Collections.emptyList();
864 }
865
866 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
867 if (tableStats == null) {
868 return Collections.emptyList();
869 }
870 return ImmutableList.copyOf(tableStats);
871 }
872
873 @Override
874 public long getActiveFlowRuleCount(DeviceId deviceId) {
875 return Streams.stream(getTableStatistics(deviceId))
Jordan Halterman281dbf32018-06-15 17:46:28 -0700876 .mapToLong(TableStatisticsEntry::activeFlowEntries)
877 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700878 }
879
880 private class InternalTableStatsListener
881 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
882 @Override
883 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700884 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700885 //TODO: Generate an event to listeners (do we need?)
886 }
887 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700888
889 /**
890 * Device lifecycle manager implementation.
891 */
892 private final class InternalLifecycleManager
893 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
894 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
895
896 private final DeviceId deviceId;
897
898 private volatile DeviceReplicaInfo replicaInfo;
899
900 InternalLifecycleManager(DeviceId deviceId) {
901 this.deviceId = deviceId;
902 replicaInfoManager.addListener(this);
903 mastershipTermLifecycles.addListener(this);
904 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
905 }
906
907 @Override
908 public DeviceReplicaInfo getReplicaInfo() {
909 return replicaInfo;
910 }
911
912 @Override
913 public void activate(long term) {
914 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
915 if (replicaInfo != null && replicaInfo.term() == term) {
916 mastershipTermLifecycles.put(deviceId, term);
917 }
918 }
919
920 @Override
921 public void event(ReplicaInfoEvent event) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700922 if (event.subject().equals(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700923 onReplicaInfoChange(event.replicaInfo());
924 }
925 }
926
927 @Override
928 public void event(MapEvent<DeviceId, Long> event) {
929 if (event.key().equals(deviceId) && event.newValue() != null) {
930 onActivate(event.newValue().value());
931 }
932 }
933
934 /**
935 * Handles a term activation event.
936 *
937 * @param term the term that was activated
938 */
939 private void onActivate(long term) {
940 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
941 if (replicaInfo != null && replicaInfo.term() == term) {
942 NodeId master = replicaInfo.master().orElse(null);
943 List<NodeId> backups = replicaInfo.backups()
944 .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
945 listenerRegistry.process(new LifecycleEvent(
946 LifecycleEvent.Type.TERM_ACTIVE,
947 new DeviceReplicaInfo(term, master, backups)));
948 }
949 }
950
951 /**
952 * Handles a replica info change event.
953 *
954 * @param replicaInfo the updated replica info
955 */
956 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
957 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
958 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
959 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
960 if (oldReplicaInfo != null) {
961 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
962 }
963 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700964 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
965 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700966 }
967 }
968
969 /**
970 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
971 *
972 * @param replicaInfo the replica info to convert
973 * @return the converted replica info
974 */
975 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
976 NodeId master = replicaInfo.master().orElse(null);
977 List<NodeId> backups = replicaInfo.backups()
978 .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
979 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
980 }
981
982 @Override
983 public void close() {
984 replicaInfoManager.removeListener(this);
985 mastershipTermLifecycles.removeListener(this);
986 }
987 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700988}