blob: 450201695c3cc270f2e4487c0374a294789d6d6d [file] [log] [blame]
Jordan Halterman281dbf32018-06-15 17:46:28 -07001/*
2* Copyright 2014-present Open Networking Foundation
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
Jon Hallfa132292017-10-24 11:11:24 -070016package org.onosproject.store.flow.impl;
17
18import java.util.Collections;
19import java.util.Dictionary;
20import java.util.HashSet;
Jordan Halterman281dbf32018-06-15 17:46:28 -070021import java.util.Iterator;
Jon Hallfa132292017-10-24 11:11:24 -070022import java.util.List;
23import java.util.Map;
24import java.util.Objects;
25import java.util.Set;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
28import java.util.concurrent.ScheduledExecutorService;
Jon Hallfa132292017-10-24 11:11:24 -070029import java.util.concurrent.TimeUnit;
Jordan Halterman281dbf32018-06-15 17:46:28 -070030import java.util.function.Function;
Jon Hallfa132292017-10-24 11:11:24 -070031import java.util.stream.Collectors;
32
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070033import com.google.common.collect.ImmutableList;
34import com.google.common.collect.Maps;
Jon Hallfa132292017-10-24 11:11:24 -070035import com.google.common.collect.Streams;
36import org.apache.felix.scr.annotations.Activate;
37import org.apache.felix.scr.annotations.Component;
38import org.apache.felix.scr.annotations.Deactivate;
39import org.apache.felix.scr.annotations.Modified;
40import org.apache.felix.scr.annotations.Property;
41import org.apache.felix.scr.annotations.Reference;
42import org.apache.felix.scr.annotations.ReferenceCardinality;
43import org.apache.felix.scr.annotations.Service;
44import org.onlab.util.KryoNamespace;
45import org.onlab.util.Tools;
46import org.onosproject.cfg.ComponentConfigService;
47import org.onosproject.cluster.ClusterService;
48import org.onosproject.cluster.NodeId;
49import org.onosproject.core.CoreService;
50import org.onosproject.core.IdGenerator;
Jordan Halterman281dbf32018-06-15 17:46:28 -070051import org.onosproject.event.AbstractListenerManager;
Jon Hallfa132292017-10-24 11:11:24 -070052import org.onosproject.mastership.MastershipService;
53import org.onosproject.net.DeviceId;
Jordan Halterman281dbf32018-06-15 17:46:28 -070054import org.onosproject.net.device.DeviceEvent;
55import org.onosproject.net.device.DeviceListener;
Jon Hallfa132292017-10-24 11:11:24 -070056import org.onosproject.net.device.DeviceService;
57import org.onosproject.net.flow.CompletedBatchOperation;
58import org.onosproject.net.flow.DefaultFlowEntry;
59import org.onosproject.net.flow.FlowEntry;
60import org.onosproject.net.flow.FlowEntry.FlowEntryState;
Jon Hallfa132292017-10-24 11:11:24 -070061import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070062import org.onosproject.net.flow.FlowRuleEvent;
63import org.onosproject.net.flow.FlowRuleEvent.Type;
64import org.onosproject.net.flow.FlowRuleService;
65import org.onosproject.net.flow.FlowRuleStore;
66import org.onosproject.net.flow.FlowRuleStoreDelegate;
67import org.onosproject.net.flow.StoredFlowEntry;
68import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070069import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
70import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
71import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
72import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
73import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070074import org.onosproject.persistence.PersistenceService;
75import org.onosproject.store.AbstractStore;
76import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
77import org.onosproject.store.cluster.messaging.ClusterMessage;
78import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Jordan Halterman281dbf32018-06-15 17:46:28 -070079import org.onosproject.store.flow.ReplicaInfo;
Jon Hallfa132292017-10-24 11:11:24 -070080import org.onosproject.store.flow.ReplicaInfoEvent;
81import org.onosproject.store.flow.ReplicaInfoEventListener;
82import org.onosproject.store.flow.ReplicaInfoService;
83import org.onosproject.store.impl.MastershipBasedTimestamp;
84import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman281dbf32018-06-15 17:46:28 -070085import org.onosproject.store.service.AsyncConsistentMap;
Jon Hallfa132292017-10-24 11:11:24 -070086import org.onosproject.store.service.EventuallyConsistentMap;
87import org.onosproject.store.service.EventuallyConsistentMapEvent;
88import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman281dbf32018-06-15 17:46:28 -070089import org.onosproject.store.service.MapEvent;
90import org.onosproject.store.service.MapEventListener;
Jon Hallfa132292017-10-24 11:11:24 -070091import org.onosproject.store.service.Serializer;
92import org.onosproject.store.service.StorageService;
93import org.onosproject.store.service.WallClockTimestamp;
94import org.osgi.service.component.ComponentContext;
95import org.slf4j.Logger;
96
Jon Hallfa132292017-10-24 11:11:24 -070097import static com.google.common.base.Strings.isNullOrEmpty;
98import static org.onlab.util.Tools.get;
99import static org.onlab.util.Tools.groupedThreads;
100import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -0700101import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
102import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700103import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
Jon Hallfa132292017-10-24 11:11:24 -0700104import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
105import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
106import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
107import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
108import static org.slf4j.LoggerFactory.getLogger;
109
110/**
111 * Manages inventory of flow rules using a distributed state management protocol.
112 */
Thomas Vachuska71026b22018-01-05 16:01:44 -0800113@Component(immediate = true)
Jon Hallfa132292017-10-24 11:11:24 -0700114@Service
115public class ECFlowRuleStore
Jordan Halterman281dbf32018-06-15 17:46:28 -0700116 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
117 implements FlowRuleStore {
Jon Hallfa132292017-10-24 11:11:24 -0700118
119 private final Logger log = getLogger(getClass());
120
121 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
122 private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
123 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
124 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Jordan Halterman5259b332018-06-12 15:34:19 -0700125 private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700126 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700127
128 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700129 label = "Number of threads in the message handler pool")
Jon Hallfa132292017-10-24 11:11:24 -0700130 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
131
132 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700133 label = "Delay in ms between successive backup runs")
Jon Hallfa132292017-10-24 11:11:24 -0700134 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Jordan Halterman5259b332018-06-12 15:34:19 -0700135
136 @Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700137 label = "Delay in ms between anti-entropy runs")
Jordan Halterman5259b332018-06-12 15:34:19 -0700138 private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
139
Jon Hallfa132292017-10-24 11:11:24 -0700140 @Property(name = "persistenceEnabled", boolValue = false,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700141 label = "Indicates whether or not changes in the flow table should be persisted to disk.")
Jon Hallfa132292017-10-24 11:11:24 -0700142 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
143
144 @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700145 label = "Max number of backup copies for each device")
Jon Hallfa132292017-10-24 11:11:24 -0700146 private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
147
148 private InternalFlowTable flowTable = new InternalFlowTable();
149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
151 protected ReplicaInfoService replicaInfoManager;
152
153 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
154 protected ClusterCommunicationService clusterCommunicator;
155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
157 protected ClusterService clusterService;
158
159 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
160 protected DeviceService deviceService;
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
163 protected CoreService coreService;
164
165 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
166 protected ComponentConfigService configService;
167
168 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
169 protected MastershipService mastershipService;
170
171 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
172 protected PersistenceService persistenceService;
173
174 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
175 private ExecutorService messageHandlingExecutor;
176 private ExecutorService eventHandler;
177
Jon Hallfa132292017-10-24 11:11:24 -0700178 private final ScheduledExecutorService backupSenderExecutor =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700179 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
Jon Hallfa132292017-10-24 11:11:24 -0700180
181 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
182 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
Jordan Halterman281dbf32018-06-15 17:46:28 -0700183 new InternalTableStatsListener();
Jon Hallfa132292017-10-24 11:11:24 -0700184
185 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
186 protected StorageService storageService;
187
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700188 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
189 .register(KryoNamespaces.API)
Jordan Haltermana765d222018-06-01 00:40:56 -0700190 .register(BucketId.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700191 .register(FlowBucket.class)
192 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700193
194 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermana765d222018-06-01 00:40:56 -0700195 .register(KryoNamespaces.API)
196 .register(BucketId.class)
197 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700198
Jordan Halterman281dbf32018-06-15 17:46:28 -0700199 protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
Jon Hallfa132292017-10-24 11:11:24 -0700200
201 private IdGenerator idGenerator;
202 private NodeId local;
203
204 @Activate
205 public void activate(ComponentContext context) {
206 configService.registerProperties(getClass());
207
208 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
209
210 local = clusterService.getLocalNode().id();
211
212 eventHandler = Executors.newSingleThreadExecutor(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700213 groupedThreads("onos/flow", "event-handler", log));
Jon Hallfa132292017-10-24 11:11:24 -0700214 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700215 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700216
217 registerMessageHandlers(messageHandlingExecutor);
218
Jordan Halterman281dbf32018-06-15 17:46:28 -0700219 mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
220 .withName("onos-flow-store-terms")
221 .withSerializer(serializer)
222 .buildAsyncMap();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800223
Jon Hallfa132292017-10-24 11:11:24 -0700224 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700225 .withName("onos-flow-table-stats")
226 .withSerializer(serializerBuilder)
227 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
228 .withTimestampProvider((k, v) -> new WallClockTimestamp())
229 .withTombstonesDisabled()
230 .build();
Jon Hallfa132292017-10-24 11:11:24 -0700231 deviceTableStats.addListener(tableStatsListener);
232
Jordan Halterman281dbf32018-06-15 17:46:28 -0700233 deviceService.addListener(flowTable);
234 deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
235
Jon Hallfa132292017-10-24 11:11:24 -0700236 logConfig("Started");
237 }
238
239 @Deactivate
240 public void deactivate(ComponentContext context) {
Jon Hallfa132292017-10-24 11:11:24 -0700241 configService.unregisterProperties(getClass(), false);
242 unregisterMessageHandlers();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700243 deviceService.removeListener(flowTable);
Jon Hallfa132292017-10-24 11:11:24 -0700244 deviceTableStats.removeListener(tableStatsListener);
245 deviceTableStats.destroy();
246 eventHandler.shutdownNow();
247 messageHandlingExecutor.shutdownNow();
248 backupSenderExecutor.shutdownNow();
249 log.info("Stopped");
250 }
251
252 @SuppressWarnings("rawtypes")
253 @Modified
254 public void modified(ComponentContext context) {
255 if (context == null) {
256 logConfig("Default config");
257 return;
258 }
259
260 Dictionary properties = context.getProperties();
261 int newPoolSize;
262 int newBackupPeriod;
263 int newBackupCount;
Jordan Halterman5259b332018-06-12 15:34:19 -0700264 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700265 try {
266 String s = get(properties, "msgHandlerPoolSize");
267 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
268
269 s = get(properties, "backupPeriod");
270 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
271
272 s = get(properties, "backupCount");
273 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman5259b332018-06-12 15:34:19 -0700274
275 s = get(properties, "antiEntropyPeriod");
276 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700277 } catch (NumberFormatException | ClassCastException e) {
278 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
279 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
280 newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700281 newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
Jon Hallfa132292017-10-24 11:11:24 -0700282 }
283
Jon Hallfa132292017-10-24 11:11:24 -0700284 if (newBackupPeriod != backupPeriod) {
285 backupPeriod = newBackupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700286 flowTable.setBackupPeriod(newBackupPeriod);
Jon Hallfa132292017-10-24 11:11:24 -0700287 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700288
289 if (newAntiEntropyPeriod != antiEntropyPeriod) {
290 antiEntropyPeriod = newAntiEntropyPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700291 flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
Jordan Halterman5259b332018-06-12 15:34:19 -0700292 }
293
Jon Hallfa132292017-10-24 11:11:24 -0700294 if (newPoolSize != msgHandlerPoolSize) {
295 msgHandlerPoolSize = newPoolSize;
296 ExecutorService oldMsgHandler = messageHandlingExecutor;
297 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700298 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
Jon Hallfa132292017-10-24 11:11:24 -0700299
300 // replace previously registered handlers.
301 registerMessageHandlers(messageHandlingExecutor);
302 oldMsgHandler.shutdown();
303 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700304
Jon Hallfa132292017-10-24 11:11:24 -0700305 if (backupCount != newBackupCount) {
306 backupCount = newBackupCount;
307 }
308 logConfig("Reconfigured");
309 }
310
311 private void registerMessageHandlers(ExecutorService executor) {
Jon Hallfa132292017-10-24 11:11:24 -0700312 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
313 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700314 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700315 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700316 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700317 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700318 GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700319 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700320 GET_DEVICE_FLOW_COUNT, serializer::decode, flowTable::getFlowRuleCount, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700321 clusterCommunicator.addSubscriber(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700322 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700323 }
324
325 private void unregisterMessageHandlers() {
326 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
327 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700328 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
Jon Hallfa132292017-10-24 11:11:24 -0700329 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
330 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
331 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
332 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
333 }
334
335 private void logConfig(String prefix) {
336 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700337 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
Jon Hallfa132292017-10-24 11:11:24 -0700338 }
339
Jon Hallfa132292017-10-24 11:11:24 -0700340 @Override
341 public int getFlowRuleCount() {
342 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Halterman281dbf32018-06-15 17:46:28 -0700343 .mapToInt(device -> getFlowRuleCount(device.id()))
344 .sum();
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800345 }
346
347 @Override
348 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700349 NodeId master = mastershipService.getMasterFor(deviceId);
350 if (master == null) {
351 log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
352 return 0;
353 }
354
355 if (Objects.equals(local, master)) {
356 return flowTable.getFlowRuleCount(deviceId);
357 }
358
359 log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
360 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
361 deviceId,
362 GET_DEVICE_FLOW_COUNT,
363 serializer::encode,
364 serializer::decode,
365 master),
366 FLOW_RULE_STORE_TIMEOUT_MILLIS,
367 TimeUnit.MILLISECONDS,
368 0);
Jon Hallfa132292017-10-24 11:11:24 -0700369 }
370
371 @Override
372 public FlowEntry getFlowEntry(FlowRule rule) {
373 NodeId master = mastershipService.getMasterFor(rule.deviceId());
374
375 if (master == null) {
376 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
377 return null;
378 }
379
380 if (Objects.equals(local, master)) {
381 return flowTable.getFlowEntry(rule);
382 }
383
384 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700385 master, rule.deviceId());
Jon Hallfa132292017-10-24 11:11:24 -0700386
387 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700388 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
389 serializer::encode,
390 serializer::decode,
391 master),
392 FLOW_RULE_STORE_TIMEOUT_MILLIS,
393 TimeUnit.MILLISECONDS,
394 null);
Jon Hallfa132292017-10-24 11:11:24 -0700395 }
396
397 @Override
398 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
399 NodeId master = mastershipService.getMasterFor(deviceId);
400
401 if (master == null) {
402 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
403 return Collections.emptyList();
404 }
405
406 if (Objects.equals(local, master)) {
407 return flowTable.getFlowEntries(deviceId);
408 }
409
410 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700411 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700412
413 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700414 ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
415 serializer::encode,
416 serializer::decode,
417 master),
418 FLOW_RULE_STORE_TIMEOUT_MILLIS,
419 TimeUnit.MILLISECONDS,
420 Collections.emptyList());
Jon Hallfa132292017-10-24 11:11:24 -0700421 }
422
423 @Override
424 public void storeFlowRule(FlowRule rule) {
425 storeBatch(new FlowRuleBatchOperation(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700426 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
427 rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700428 }
429
430 @Override
431 public void storeBatch(FlowRuleBatchOperation operation) {
432 if (operation.getOperations().isEmpty()) {
433 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700434 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
435 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jon Hallfa132292017-10-24 11:11:24 -0700436 return;
437 }
438
439 DeviceId deviceId = operation.deviceId();
440 NodeId master = mastershipService.getMasterFor(deviceId);
441
442 if (master == null) {
443 log.warn("No master for {} ", deviceId);
444
Jordan Halterman281dbf32018-06-15 17:46:28 -0700445 Set<FlowRule> allFailures = operation.getOperations()
446 .stream()
447 .map(op -> op.target())
448 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700449 notifyDelegate(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700450 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
451 new CompletedBatchOperation(false, allFailures, deviceId)));
Jon Hallfa132292017-10-24 11:11:24 -0700452 return;
453 }
454
455 if (Objects.equals(local, master)) {
456 storeBatchInternal(operation);
457 return;
458 }
459
460 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700461 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700462
463 clusterCommunicator.unicast(operation,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700464 APPLY_BATCH_FLOWS,
465 serializer::encode,
466 master)
467 .whenComplete((result, error) -> {
468 if (error != null) {
469 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Jon Hallfa132292017-10-24 11:11:24 -0700470
Jordan Halterman281dbf32018-06-15 17:46:28 -0700471 Set<FlowRule> allFailures = operation.getOperations()
472 .stream()
473 .map(op -> op.target())
474 .collect(Collectors.toSet());
Jon Hallfa132292017-10-24 11:11:24 -0700475
Jordan Halterman281dbf32018-06-15 17:46:28 -0700476 notifyDelegate(FlowRuleBatchEvent.completed(
477 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
478 new CompletedBatchOperation(false, allFailures, deviceId)));
479 }
480 });
Jon Hallfa132292017-10-24 11:11:24 -0700481 }
482
483 private void storeBatchInternal(FlowRuleBatchOperation operation) {
484
485 final DeviceId did = operation.deviceId();
486 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
487 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
488 if (currentOps.isEmpty()) {
489 batchOperationComplete(FlowRuleBatchEvent.completed(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700490 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
491 new CompletedBatchOperation(true, Collections.emptySet(), did)));
Jon Hallfa132292017-10-24 11:11:24 -0700492 return;
493 }
494
495 notifyDelegate(FlowRuleBatchEvent.requested(new
Jordan Halterman281dbf32018-06-15 17:46:28 -0700496 FlowRuleBatchRequest(operation.id(),
497 currentOps), operation.deviceId()));
Jon Hallfa132292017-10-24 11:11:24 -0700498 }
499
500 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
501 return operation.getOperations().stream().map(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700502 op -> {
503 StoredFlowEntry entry;
504 switch (op.operator()) {
505 case ADD:
506 entry = new DefaultFlowEntry(op.target());
507 flowTable.add(entry);
508 return op;
509 case MODIFY:
510 entry = new DefaultFlowEntry(op.target());
511 flowTable.update(entry);
512 return op;
513 case REMOVE:
514 return flowTable.update(op.target(), stored -> {
515 stored.setState(FlowEntryState.PENDING_REMOVE);
516 log.debug("Setting state of rule to pending remove: {}", stored);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800517 return op;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700518 });
519 default:
520 log.warn("Unknown flow operation operator: {}", op.operator());
Jon Hallfa132292017-10-24 11:11:24 -0700521 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700522 return null;
523 }
Jon Hallfa132292017-10-24 11:11:24 -0700524 ).filter(Objects::nonNull).collect(Collectors.toSet());
525 }
526
527 @Override
528 public void deleteFlowRule(FlowRule rule) {
529 storeBatch(
Jordan Halterman281dbf32018-06-15 17:46:28 -0700530 new FlowRuleBatchOperation(
531 Collections.singletonList(
532 new FlowRuleBatchEntry(
533 FlowRuleOperation.REMOVE,
534 rule)), rule.deviceId(), idGenerator.getNewId()));
Jon Hallfa132292017-10-24 11:11:24 -0700535 }
536
537 @Override
538 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
539 if (mastershipService.isLocalMaster(rule.deviceId())) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700540 return flowTable.update(rule, stored -> {
541 if (stored.state() == FlowEntryState.PENDING_ADD) {
542 stored.setState(FlowEntryState.PENDING_ADD);
543 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
544 }
545 return null;
546 });
Jon Hallfa132292017-10-24 11:11:24 -0700547 }
548 return null;
549 }
550
551 @Override
552 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
553 NodeId master = mastershipService.getMasterFor(rule.deviceId());
554 if (Objects.equals(local, master)) {
555 return addOrUpdateFlowRuleInternal(rule);
556 }
557
558 log.warn("Tried to update FlowRule {} state,"
Jordan Halterman281dbf32018-06-15 17:46:28 -0700559 + " while the Node was not the master.", rule);
Jon Hallfa132292017-10-24 11:11:24 -0700560 return null;
561 }
562
563 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700564 FlowRuleEvent event = flowTable.update(rule, stored -> {
Jon Hallfa132292017-10-24 11:11:24 -0700565 stored.setBytes(rule.bytes());
566 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
567 stored.setLiveType(rule.liveType());
568 stored.setPackets(rule.packets());
569 stored.setLastSeen();
570 if (stored.state() == FlowEntryState.PENDING_ADD) {
571 stored.setState(FlowEntryState.ADDED);
572 return new FlowRuleEvent(Type.RULE_ADDED, rule);
573 }
574 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700575 });
576 if (event != null) {
577 return event;
Jon Hallfa132292017-10-24 11:11:24 -0700578 }
579
580 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
581 // TODO: also update backup if the behavior is correct.
582 flowTable.add(rule);
583 return null;
584 }
585
586 @Override
587 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
588 final DeviceId deviceId = rule.deviceId();
589 NodeId master = mastershipService.getMasterFor(deviceId);
590
591 if (Objects.equals(local, master)) {
592 // bypass and handle it locally
593 return removeFlowRuleInternal(rule);
594 }
595
596 if (master == null) {
597 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
598 // TODO: revisit if this should be null (="no-op") or Exception
599 return null;
600 }
601
602 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
Jordan Halterman281dbf32018-06-15 17:46:28 -0700603 master, deviceId);
Jon Hallfa132292017-10-24 11:11:24 -0700604
Jordan Halterman281dbf32018-06-15 17:46:28 -0700605 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
606 rule,
607 REMOVE_FLOW_ENTRY,
608 serializer::encode,
609 serializer::decode,
610 master),
611 FLOW_RULE_STORE_TIMEOUT_MILLIS,
612 TimeUnit.MILLISECONDS,
613 null);
Jon Hallfa132292017-10-24 11:11:24 -0700614 }
615
616 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700617 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800618 final FlowEntry removed = flowTable.remove(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700619 // rule may be partial rule that is missing treatment, we should use rule from store instead
620 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
621 }
622
623 @Override
624 public void purgeFlowRule(DeviceId deviceId) {
625 flowTable.purgeFlowRule(deviceId);
626 }
627
628 @Override
629 public void purgeFlowRules() {
630 flowTable.purgeFlowRules();
631 }
632
633 @Override
634 public void batchOperationComplete(FlowRuleBatchEvent event) {
635 //FIXME: need a per device pending response
636 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
637 if (nodeId == null) {
638 notifyDelegate(event);
639 } else {
640 // TODO check unicast return value
641 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
642 //error log: log.warn("Failed to respond to peer for batch operation result");
643 }
644 }
645
646 private final class OnStoreBatch implements ClusterMessageHandler {
647
648 @Override
649 public void handle(final ClusterMessage message) {
650 FlowRuleBatchOperation operation = serializer.decode(message.payload());
651 log.debug("received batch request {}", operation);
652
653 final DeviceId deviceId = operation.deviceId();
654 NodeId master = mastershipService.getMasterFor(deviceId);
655 if (!Objects.equals(local, master)) {
656 Set<FlowRule> failures = new HashSet<>(operation.size());
657 for (FlowRuleBatchEntry op : operation.getOperations()) {
658 failures.add(op.target());
659 }
660 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
661 // This node is no longer the master, respond as all failed.
662 // TODO: we might want to wrap response in envelope
663 // to distinguish sw programming failure and hand over
664 // it make sense in the latter case to retry immediately.
665 message.respond(serializer.encode(allFailed));
666 return;
667 }
668
669 pendingResponses.put(operation.id(), message.sender());
670 storeBatchInternal(operation);
671 }
672 }
673
Jordan Halterman281dbf32018-06-15 17:46:28 -0700674 private class InternalFlowTable implements DeviceListener {
675 private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
Jon Hallfa132292017-10-24 11:11:24 -0700676
677 @Override
Jordan Halterman281dbf32018-06-15 17:46:28 -0700678 public void event(DeviceEvent event) {
679 if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
680 addDevice(event.subject().id());
Jon Hallfa132292017-10-24 11:11:24 -0700681 }
682 }
683
Jordan Halterman281dbf32018-06-15 17:46:28 -0700684 /**
685 * Adds the given device to the flow table.
686 *
687 * @param deviceId the device to add to the table
688 */
689 public void addDevice(DeviceId deviceId) {
690 flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
691 id,
692 clusterService,
693 clusterCommunicator,
694 new InternalLifecycleManager(id),
695 backupSenderExecutor,
696 backupPeriod,
697 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700698 }
699
Jordan Halterman281dbf32018-06-15 17:46:28 -0700700 /**
701 * Sets the flow table backup period.
702 *
703 * @param backupPeriod the flow table backup period
704 */
705 void setBackupPeriod(int backupPeriod) {
706 flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700707 }
708
Jordan Halterman281dbf32018-06-15 17:46:28 -0700709 /**
710 * Sets the flow table anti-entropy period.
711 *
712 * @param antiEntropyPeriod the flow table anti-entropy period
713 */
714 void setAntiEntropyPeriod(int antiEntropyPeriod) {
715 flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700716 }
717
Jordan Halterman281dbf32018-06-15 17:46:28 -0700718 /**
719 * Returns the flow table for a specific device.
720 *
721 * @param deviceId the device identifier
722 * @return the flow table for the given device
723 */
724 private DeviceFlowTable getFlowTable(DeviceId deviceId) {
725 DeviceFlowTable flowTable = flowTables.get(deviceId);
726 return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
727 deviceId,
728 clusterService,
729 clusterCommunicator,
730 new InternalLifecycleManager(deviceId),
731 backupSenderExecutor,
732 backupPeriod,
733 antiEntropyPeriod));
Jon Hallfa132292017-10-24 11:11:24 -0700734 }
735
Jordan Halterman281dbf32018-06-15 17:46:28 -0700736 /**
737 * Returns the flow rule count for the given device.
738 *
739 * @param deviceId the device for which to return the flow rule count
740 * @return the flow rule count for the given device
741 */
742 public int getFlowRuleCount(DeviceId deviceId) {
743 return getFlowTable(deviceId).count();
744 }
745
746 /**
747 * Returns the flow entry for the given rule.
748 *
749 * @param rule the rule for which to return the flow entry
750 * @return the flow entry for the given rule
751 */
Jon Hallfa132292017-10-24 11:11:24 -0700752 public StoredFlowEntry getFlowEntry(FlowRule rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700753 return getFlowTable(rule.deviceId()).getFlowEntry(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700754 }
755
Jordan Halterman281dbf32018-06-15 17:46:28 -0700756 /**
757 * Returns the set of flow entries for the given device.
758 *
759 * @param deviceId the device for which to lookup flow entries
760 * @return the set of flow entries for the given device
761 */
Jon Hallfa132292017-10-24 11:11:24 -0700762 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700763 return getFlowTable(deviceId).getFlowEntries();
Jon Hallfa132292017-10-24 11:11:24 -0700764 }
765
Jordan Halterman281dbf32018-06-15 17:46:28 -0700766 /**
767 * Adds the given flow rule.
768 *
769 * @param rule the rule to add
770 */
Jon Hallfa132292017-10-24 11:11:24 -0700771 public void add(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700772 Tools.futureGetOrElse(
773 getFlowTable(rule.deviceId()).add(rule),
774 FLOW_RULE_STORE_TIMEOUT_MILLIS,
775 TimeUnit.MILLISECONDS,
776 null);
Jon Hallfa132292017-10-24 11:11:24 -0700777 }
778
Jordan Halterman281dbf32018-06-15 17:46:28 -0700779 /**
780 * Updates the given flow rule.
781 *
782 * @param rule the rule to update
783 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800784 public void update(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700785 Tools.futureGetOrElse(
786 getFlowTable(rule.deviceId()).update(rule),
787 FLOW_RULE_STORE_TIMEOUT_MILLIS,
788 TimeUnit.MILLISECONDS,
789 null);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800790 }
791
Jordan Halterman281dbf32018-06-15 17:46:28 -0700792 /**
793 * Applies the given update function to the rule.
794 *
795 * @param function the update function to apply
796 * @return a future to be completed with the update event or {@code null} if the rule was not updated
797 */
798 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
799 return Tools.futureGetOrElse(
800 getFlowTable(rule.deviceId()).update(rule, function),
801 FLOW_RULE_STORE_TIMEOUT_MILLIS,
802 TimeUnit.MILLISECONDS,
803 null);
804 }
805
806 /**
807 * Removes the given flow rule.
808 *
809 * @param rule the rule to remove
810 */
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800811 public FlowEntry remove(FlowEntry rule) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700812 return Tools.futureGetOrElse(
813 getFlowTable(rule.deviceId()).remove(rule),
814 FLOW_RULE_STORE_TIMEOUT_MILLIS,
815 TimeUnit.MILLISECONDS,
816 null);
Jon Hallfa132292017-10-24 11:11:24 -0700817 }
818
Jordan Halterman281dbf32018-06-15 17:46:28 -0700819 /**
820 * Purges flow rules for the given device.
821 *
822 * @param deviceId the device for which to purge flow rules
823 */
Jon Hallfa132292017-10-24 11:11:24 -0700824 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanda3b9f02018-10-05 13:28:51 -0700825 // If the device is still present in the store, purge the underlying DeviceFlowTable.
826 // Otherwise, remove the DeviceFlowTable and unregister message handlers.
827 if (deviceService.getDevice(deviceId) != null) {
828 DeviceFlowTable flowTable = flowTables.get(deviceId);
829 if (flowTable != null) {
830 flowTable.purge();
831 }
832 } else {
833 DeviceFlowTable flowTable = flowTables.remove(deviceId);
834 if (flowTable != null) {
835 flowTable.close();
836 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700837 }
Jon Hallfa132292017-10-24 11:11:24 -0700838 }
839
Jordan Halterman281dbf32018-06-15 17:46:28 -0700840 /**
841 * Purges all flow rules from the table.
842 */
Jon Hallfa132292017-10-24 11:11:24 -0700843 public void purgeFlowRules() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700844 Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
845 while (iterator.hasNext()) {
846 iterator.next().close();
847 iterator.remove();
Jordan Halterman8f90d6d2018-06-12 11:23:33 -0700848 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700849 }
Jon Hallfa132292017-10-24 11:11:24 -0700850 }
851
852 @Override
Jordan Halterman5259b332018-06-12 15:34:19 -0700853 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -0700854 deviceTableStats.put(deviceId, tableStats);
855 return null;
856 }
857
858 @Override
859 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
860 NodeId master = mastershipService.getMasterFor(deviceId);
861
862 if (master == null) {
863 log.debug("Failed to getTableStats: No master for {}", deviceId);
864 return Collections.emptyList();
865 }
866
867 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
868 if (tableStats == null) {
869 return Collections.emptyList();
870 }
871 return ImmutableList.copyOf(tableStats);
872 }
873
874 @Override
875 public long getActiveFlowRuleCount(DeviceId deviceId) {
876 return Streams.stream(getTableStatistics(deviceId))
Jordan Halterman281dbf32018-06-15 17:46:28 -0700877 .mapToLong(TableStatisticsEntry::activeFlowEntries)
878 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700879 }
880
881 private class InternalTableStatsListener
882 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
883 @Override
884 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Halterman281dbf32018-06-15 17:46:28 -0700885 List<TableStatisticsEntry>> event) {
Jon Hallfa132292017-10-24 11:11:24 -0700886 //TODO: Generate an event to listeners (do we need?)
887 }
888 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700889
890 /**
891 * Device lifecycle manager implementation.
892 */
893 private final class InternalLifecycleManager
894 extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
895 implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
896
897 private final DeviceId deviceId;
898
899 private volatile DeviceReplicaInfo replicaInfo;
900
901 InternalLifecycleManager(DeviceId deviceId) {
902 this.deviceId = deviceId;
903 replicaInfoManager.addListener(this);
904 mastershipTermLifecycles.addListener(this);
905 replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
906 }
907
908 @Override
909 public DeviceReplicaInfo getReplicaInfo() {
910 return replicaInfo;
911 }
912
913 @Override
914 public void activate(long term) {
915 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
916 if (replicaInfo != null && replicaInfo.term() == term) {
917 mastershipTermLifecycles.put(deviceId, term);
918 }
919 }
920
921 @Override
922 public void event(ReplicaInfoEvent event) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700923 if (event.subject().equals(deviceId)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700924 onReplicaInfoChange(event.replicaInfo());
925 }
926 }
927
928 @Override
929 public void event(MapEvent<DeviceId, Long> event) {
930 if (event.key().equals(deviceId) && event.newValue() != null) {
931 onActivate(event.newValue().value());
932 }
933 }
934
935 /**
936 * Handles a term activation event.
937 *
938 * @param term the term that was activated
939 */
940 private void onActivate(long term) {
941 final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
942 if (replicaInfo != null && replicaInfo.term() == term) {
943 NodeId master = replicaInfo.master().orElse(null);
944 List<NodeId> backups = replicaInfo.backups()
945 .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
946 listenerRegistry.process(new LifecycleEvent(
947 LifecycleEvent.Type.TERM_ACTIVE,
948 new DeviceReplicaInfo(term, master, backups)));
949 }
950 }
951
952 /**
953 * Handles a replica info change event.
954 *
955 * @param replicaInfo the updated replica info
956 */
957 private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
958 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
959 this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
960 if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
961 if (oldReplicaInfo != null) {
962 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
963 }
964 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700965 } else if (oldReplicaInfo.term() == replicaInfo.term()) {
966 listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700967 }
968 }
969
970 /**
971 * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
972 *
973 * @param replicaInfo the replica info to convert
974 * @return the converted replica info
975 */
976 private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
977 NodeId master = replicaInfo.master().orElse(null);
978 List<NodeId> backups = replicaInfo.backups()
979 .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
980 return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
981 }
982
983 @Override
984 public void close() {
985 replicaInfoManager.removeListener(this);
986 mastershipTermLifecycles.removeListener(this);
987 }
988 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700989}