/*
 * Copyright 2014-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.device.impl;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070017
Ray Milkey2bf5ea72017-06-01 09:03:34 -070018import java.io.IOException;
19import java.util.ArrayList;
20import java.util.Collection;
21import java.util.Collections;
22import java.util.HashMap;
23import java.util.HashSet;
24import java.util.Iterator;
25import java.util.List;
26import java.util.Map;
27import java.util.Map.Entry;
28import java.util.Objects;
29import java.util.Optional;
30import java.util.Set;
31import java.util.concurrent.ConcurrentHashMap;
32import java.util.concurrent.ConcurrentMap;
33import java.util.concurrent.ExecutorService;
34import java.util.concurrent.ScheduledExecutorService;
35import java.util.concurrent.TimeUnit;
36import java.util.function.Consumer;
37import java.util.stream.Stream;
38
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070039import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070040import org.apache.felix.scr.annotations.Activate;
41import org.apache.felix.scr.annotations.Component;
42import org.apache.felix.scr.annotations.Deactivate;
43import org.apache.felix.scr.annotations.Reference;
44import org.apache.felix.scr.annotations.ReferenceCardinality;
45import org.apache.felix.scr.annotations.Service;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -080046import org.onlab.packet.ChassisId;
47import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080048import org.onosproject.cluster.ClusterService;
49import org.onosproject.cluster.ControllerNode;
50import org.onosproject.cluster.NodeId;
51import org.onosproject.mastership.MastershipService;
52import org.onosproject.mastership.MastershipTerm;
53import org.onosproject.mastership.MastershipTermService;
Marc De Leenheer88194c32015-05-29 22:10:59 -070054import org.onosproject.net.Annotations;
Brian O'Connorabafb502014-12-02 22:26:20 -080055import org.onosproject.net.AnnotationsUtil;
56import org.onosproject.net.DefaultAnnotations;
57import org.onosproject.net.DefaultDevice;
58import org.onosproject.net.DefaultPort;
59import org.onosproject.net.Device;
60import org.onosproject.net.Device.Type;
61import org.onosproject.net.DeviceId;
62import org.onosproject.net.MastershipRole;
63import org.onosproject.net.Port;
64import org.onosproject.net.PortNumber;
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -070065import org.onosproject.net.device.DefaultPortStatistics;
Brian O'Connorabafb502014-12-02 22:26:20 -080066import org.onosproject.net.device.DeviceClockService;
67import org.onosproject.net.device.DeviceDescription;
68import org.onosproject.net.device.DeviceEvent;
69import org.onosproject.net.device.DeviceStore;
70import org.onosproject.net.device.DeviceStoreDelegate;
71import org.onosproject.net.device.PortDescription;
sangho538108b2015-04-08 14:29:20 -070072import org.onosproject.net.device.PortStatistics;
Brian O'Connorabafb502014-12-02 22:26:20 -080073import org.onosproject.net.provider.ProviderId;
74import org.onosproject.store.AbstractStore;
75import org.onosproject.store.Timestamp;
76import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Brian O'Connorabafb502014-12-02 22:26:20 -080077import org.onosproject.store.cluster.messaging.MessageSubject;
78import org.onosproject.store.impl.Timestamped;
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -070079import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070080import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070081import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -070082import org.onosproject.store.service.EventuallyConsistentMap;
83import org.onosproject.store.service.EventuallyConsistentMapEvent;
84import org.onosproject.store.service.EventuallyConsistentMapListener;
85import org.onosproject.store.service.MultiValuedTimestamp;
86import org.onosproject.store.service.StorageService;
87import org.onosproject.store.service.WallClockTimestamp;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070088import org.slf4j.Logger;
89
Ray Milkey2bf5ea72017-06-01 09:03:34 -070090import com.google.common.collect.FluentIterable;
91import com.google.common.collect.ImmutableList;
92import com.google.common.collect.Maps;
93import com.google.common.collect.Sets;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070094
95import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070096import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070097import static com.google.common.base.Verify.verify;
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -080098import static java.util.concurrent.Executors.newCachedThreadPool;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -080099import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800100import static org.onlab.util.Tools.groupedThreads;
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800101import static org.onlab.util.Tools.minPriority;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800102import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
103import static org.onosproject.net.DefaultAnnotations.merge;
Ray Milkey9ef22232016-07-14 12:42:37 -0700104import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
105import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
106import static org.onosproject.net.device.DeviceEvent.Type.PORT_REMOVED;
107import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
108import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
109import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
Jonathan Hart46ab5cc2016-09-15 15:42:39 -0700110import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE;
111import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVED;
Ray Milkey9ef22232016-07-14 12:42:37 -0700112import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
Jonathan Hart46ab5cc2016-09-15 15:42:39 -0700113import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_UPDATE;
Jonathan Hart46ab5cc2016-09-15 15:42:39 -0700114import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE;
115import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_UPDATE;
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700116import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800117import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700118
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700119/**
120 * Manages inventory of infrastructure devices using gossip protocol to distribute
121 * information.
122 */
123@Component(immediate = true)
124@Service
125public class GossipDeviceStore
126 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
127 implements DeviceStore {
128
129 private final Logger log = getLogger(getClass());
130
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700131 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800132 // Timeout in milliseconds to process device or ports on remote master node
133 private static final int REMOTE_MASTER_TIMEOUT = 1000;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700134
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700135 // innerMap is used to lock a Device, thus instance should never be replaced.
136 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700137 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700138 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700139
140 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700141 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
142 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700143
144 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200145 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700146 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
147 portStatsListener = new InternalPortStatsListener();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700148
149 // to be updated under Device lock
150 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
151 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700152
153 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700154 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700157 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700158
Madan Jampani47c93732014-10-06 20:46:08 -0700159 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700160 protected StorageService storageService;
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani47c93732014-10-06 20:46:08 -0700163 protected ClusterCommunicationService clusterCommunicator;
164
Madan Jampani53e44e62014-10-07 12:39:51 -0700165 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
166 protected ClusterService clusterService;
167
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700168 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
169 protected MastershipService mastershipService;
170
Yuta HIGUCHIbcac4992014-11-22 19:27:57 -0800171 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
172 protected MastershipTermService termService;
173
174
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700175 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800176 .register(DistributedStoreSerializers.STORE_COMMON)
177 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
178 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
179 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700180 .register(InternalDeviceRemovedEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800181 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
182 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700183 .register(DeviceAntiEntropyAdvertisement.class)
184 .register(DeviceFragmentId.class)
185 .register(PortFragmentId.class)
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700186 .build("GossipDevice"));
Madan Jampani53e44e62014-10-07 12:39:51 -0700187
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800188 private ExecutorService executor;
189
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800190 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700191
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800192 // TODO make these anti-entropy parameters configurable
193 private long initialDelaySec = 5;
194 private long periodSec = 5;
195
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700196 @Activate
197 public void activate() {
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800198 executor = newCachedThreadPool(groupedThreads("onos/device", "fg-%d", log));
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800199
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800200 backgroundExecutor =
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800201 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d", log)));
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700202
Jonathan Hart46ab5cc2016-09-15 15:42:39 -0700203 addSubscriber(DEVICE_UPDATE, this::handleDeviceEvent);
204 addSubscriber(DEVICE_OFFLINE, this::handleDeviceOfflineEvent);
205 addSubscriber(DEVICE_REMOVE_REQ, this::handleRemoveRequest);
206 addSubscriber(DEVICE_REMOVED, this::handleDeviceRemovedEvent);
207 addSubscriber(PORT_UPDATE, this::handlePortEvent);
208 addSubscriber(PORT_STATUS_UPDATE, this::handlePortStatusEvent);
209 addSubscriber(DEVICE_ADVERTISE, this::handleDeviceAdvertisement);
Madan Jampani2af244a2015-02-22 13:12:01 -0800210
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700211 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800212 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700213 initialDelaySec, periodSec, TimeUnit.SECONDS);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700214
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700215 // Create a distributed map for port stats.
216 KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
217 .register(KryoNamespaces.API)
HIGUCHI Yuta03666a32016-05-18 11:49:09 -0700218 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
219 .register(MultiValuedTimestamp.class);
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700220
221 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
222 .withName("port-stats")
223 .withSerializer(deviceDataSerializer)
224 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700225 .withTimestampProvider((k, v) -> new WallClockTimestamp())
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700226 .withTombstonesDisabled()
227 .build();
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200228 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
229 eventuallyConsistentMapBuilder()
230 .withName("port-stats-delta")
231 .withSerializer(deviceDataSerializer)
232 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
233 .withTimestampProvider((k, v) -> new WallClockTimestamp())
234 .withTombstonesDisabled()
235 .build();
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700236 devicePortStats.addListener(portStatsListener);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700237 log.info("Started");
238 }
239
Jonathan Hart46ab5cc2016-09-15 15:42:39 -0700240 private <M> void addSubscriber(MessageSubject subject, Consumer<M> handler) {
241 clusterCommunicator.addSubscriber(subject, SERIALIZER::decode, handler, executor);
242 }
243
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700244 @Deactivate
245 public void deactivate() {
Frank Wange0eb5ce2016-07-01 18:21:25 +0800246 devicePortStats.removeListener(portStatsListener);
Madan Jampani632f16b2015-08-11 12:42:59 -0700247 devicePortStats.destroy();
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200248 devicePortDeltaStats.destroy();
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800249 executor.shutdownNow();
250
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800251 backgroundExecutor.shutdownNow();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700252 try {
Yuta HIGUCHIc5783592014-12-05 11:13:29 -0800253 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700254 log.error("Timeout during executor shutdown");
255 }
256 } catch (InterruptedException e) {
257 log.error("Error during executor shutdown", e);
258 }
259
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700260 deviceDescs.clear();
261 devices.clear();
262 devicePorts.clear();
263 availableDevices.clear();
Jonathan Hart46ab5cc2016-09-15 15:42:39 -0700264 clusterCommunicator.removeSubscriber(DEVICE_UPDATE);
265 clusterCommunicator.removeSubscriber(DEVICE_OFFLINE);
266 clusterCommunicator.removeSubscriber(DEVICE_REMOVE_REQ);
267 clusterCommunicator.removeSubscriber(DEVICE_REMOVED);
268 clusterCommunicator.removeSubscriber(PORT_UPDATE);
269 clusterCommunicator.removeSubscriber(PORT_STATUS_UPDATE);
270 clusterCommunicator.removeSubscriber(DEVICE_ADVERTISE);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700271 log.info("Stopped");
272 }
273
274 @Override
275 public int getDeviceCount() {
276 return devices.size();
277 }
278
279 @Override
280 public Iterable<Device> getDevices() {
281 return Collections.unmodifiableCollection(devices.values());
282 }
283
284 @Override
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -0800285 public Iterable<Device> getAvailableDevices() {
286 return FluentIterable.from(getDevices())
Sho SHIMIZU06a6c9f2015-06-12 14:49:06 -0700287 .filter(input -> isAvailable(input.id()));
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -0800288 }
289
290 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700291 public Device getDevice(DeviceId deviceId) {
292 return devices.get(deviceId);
293 }
294
295 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700296 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700297 DeviceId deviceId,
298 DeviceDescription deviceDescription) {
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800299 NodeId localNode = clusterService.getLocalNode().id();
300 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
301
302 // Process device update only if we're the master,
303 // otherwise signal the actual master.
304 DeviceEvent deviceEvent = null;
305 if (localNode.equals(deviceNode)) {
306
307 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
308 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
309 final Timestamped<DeviceDescription> mergedDesc;
310 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
311
312 synchronized (device) {
313 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
314 mergedDesc = device.get(providerId).getDeviceDesc();
315 }
316
317 if (deviceEvent != null) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700318 log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
helenyrwufd296b62016-06-22 17:43:02 -0700319 providerId, deviceId);
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800320 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
321 }
322
323 } else {
Ray Milkey2bf5ea72017-06-01 09:03:34 -0700324 return null;
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700325 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800326
327 return deviceEvent;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700328 }
329
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700330 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700331 DeviceId deviceId,
332 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700333
334 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800335 Map<ProviderId, DeviceDescriptions> device
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700336 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700337
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800338 synchronized (device) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700339 // locking per device
340
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700341 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
342 log.debug("Ignoring outdated event: {}", deltaDesc);
343 return null;
344 }
345
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800346 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700347
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700348 final Device oldDevice = devices.get(deviceId);
349 final Device newDevice;
350
351 if (deltaDesc == descs.getDeviceDesc() ||
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700352 deltaDesc.isNewer(descs.getDeviceDesc())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700353 // on new device or valid update
354 descs.putDeviceDesc(deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800355 newDevice = composeDevice(deviceId, device);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700356 } else {
357 // outdated event, ignored.
358 return null;
359 }
360 if (oldDevice == null) {
helenyrwufd296b62016-06-22 17:43:02 -0700361 // REGISTER
362 if (!deltaDesc.value().isDefaultAvailable()) {
Jordan Halterman5b78dc82017-04-19 07:55:48 -0700363 return registerDevice(providerId, newDevice, deltaDesc.timestamp());
helenyrwufd296b62016-06-22 17:43:02 -0700364 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700365 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700366 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700367 } else {
368 // UPDATE or ignore (no change or stale)
helenyrwufd296b62016-06-22 17:43:02 -0700369 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp(),
370 deltaDesc.value().isDefaultAvailable());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700371 }
372 }
373 }
374
375 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700376 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700377 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700378 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700379
380 // update composed device cache
381 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
382 verify(oldDevice == null,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700383 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
384 providerId, oldDevice, newDevice);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700385
386 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700387 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700388 }
389
390 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
391 }
392
393 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700394 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700395 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700396 Device oldDevice,
helenyrwufd296b62016-06-22 17:43:02 -0700397 Device newDevice, Timestamp newTimestamp,
398 boolean forceAvailable) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700399 // We allow only certain attributes to trigger update
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700400 boolean propertiesChanged =
401 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
Thomas Vachuskadaaa42d2015-04-21 16:21:37 -0700402 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
403 !Objects.equals(oldDevice.providerId(), newDevice.providerId());
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700404 boolean annotationsChanged =
405 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700406
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700407 // Primary providers can respond to all changes, but ancillary ones
408 // should respond only to annotation changes.
alshabibdc5d8bd2015-11-02 15:41:29 -0800409 DeviceEvent event = null;
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700410 if ((providerId.isAncillary() && annotationsChanged) ||
411 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700412 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
413 if (!replaced) {
414 verify(replaced,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700415 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
Jian Li68c4fc42016-01-11 16:07:03 -0800416 providerId, oldDevice, devices.get(newDevice.id()), newDevice);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700417 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700418
alshabibdc5d8bd2015-11-02 15:41:29 -0800419 event = new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700420 }
alshabibdc5d8bd2015-11-02 15:41:29 -0800421
helenyrwufd296b62016-06-22 17:43:02 -0700422 if (!providerId.isAncillary() && forceAvailable) {
alshabibdc5d8bd2015-11-02 15:41:29 -0800423 boolean wasOnline = availableDevices.contains(newDevice.id());
424 markOnline(newDevice.id(), newTimestamp);
425 if (!wasOnline) {
426 notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
427 }
428 }
429 return event;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700430 }
431
Jordan Halterman5b78dc82017-04-19 07:55:48 -0700432 private DeviceEvent registerDevice(ProviderId providerId, Device newDevice, Timestamp newTimestamp) {
helenyrwufd296b62016-06-22 17:43:02 -0700433 // update composed device cache
434 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
435 verify(oldDevice == null,
436 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
437 providerId, oldDevice, newDevice);
438
439 if (!providerId.isAncillary()) {
Jordan Halterman5b78dc82017-04-19 07:55:48 -0700440 markOffline(newDevice.id(), newTimestamp);
helenyrwufd296b62016-06-22 17:43:02 -0700441 }
442
443 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
444 }
445
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700446 @Override
447 public DeviceEvent markOffline(DeviceId deviceId) {
Jordan Halterman5b78dc82017-04-19 07:55:48 -0700448 return markOffline(deviceId, deviceClockService.getTimestamp(deviceId));
449 }
450
451 private DeviceEvent markOffline(DeviceId deviceId, Timestamp timestamp) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700452 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700453 if (event != null) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700454 log.debug("Notifying peers of a device offline topology event for deviceId: {} {}",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700455 deviceId, timestamp);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800456 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -0700457 }
458 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700459 }
460
461 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700462 Map<ProviderId, DeviceDescriptions> providerDescs
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700463 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700464
465 // locking device
466 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700467 // accept off-line if given timestamp is newer than
468 // the latest Timestamp from Primary provider
469 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
Thomas Vachuska4bd10b92016-12-15 10:13:38 -0800470 if (primDescs == null) {
471 return null;
472 }
473
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700474 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
475 if (timestamp.compareTo(lastTimestamp) <= 0) {
476 // outdated event ignore
477 return null;
478 }
479
480 offline.put(deviceId, timestamp);
481
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700482 Device device = devices.get(deviceId);
483 if (device == null) {
484 return null;
485 }
486 boolean removed = availableDevices.remove(deviceId);
487 if (removed) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700488 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700489 }
490 return null;
491 }
492 }
493
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700494 @Override
helenyrwufd296b62016-06-22 17:43:02 -0700495 public boolean markOnline(DeviceId deviceId) {
496 if (devices.containsKey(deviceId)) {
497 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
498 Map<?, ?> deviceLock = getOrCreateDeviceDescriptionsMap(deviceId);
499 synchronized (deviceLock) {
500 if (markOnline(deviceId, timestamp)) {
501 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, getDevice(deviceId), null));
502 return true;
503 } else {
504 return false;
505 }
506 }
507 }
508 log.warn("Device {} does not exist in store", deviceId);
509 return false;
510
511 }
512
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700513 /**
514 * Marks the device as available if the given timestamp is not outdated,
515 * compared to the time the device has been marked offline.
516 *
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700517 * @param deviceId identifier of the device
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700518 * @param timestamp of the event triggering this change.
519 * @return true if availability change request was accepted and changed the state
520 */
521 // Guarded by deviceDescs value (=Device lock)
522 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
523 // accept on-line if given timestamp is newer than
524 // the latest offline request Timestamp
525 Timestamp offlineTimestamp = offline.get(deviceId);
526 if (offlineTimestamp == null ||
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700527 offlineTimestamp.compareTo(timestamp) < 0) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700528
529 offline.remove(deviceId);
530 return availableDevices.add(deviceId);
531 }
532 return false;
533 }
534
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700535 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700536 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700537 DeviceId deviceId,
538 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700539
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800540 NodeId localNode = clusterService.getLocalNode().id();
541 // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
542 // since it will trigger distributed store read.
543 // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
544 // outside Device subsystem. so that we don't have to modify both Device and Link stores.
545 // If we don't care much about topology performance, then it might be OK.
546 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700547
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800548 // Process port update only if we're the master of the device,
549 // otherwise signal the actual master.
550 List<DeviceEvent> deviceEvents = null;
551 if (localNode.equals(deviceNode)) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700552
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800553 final Timestamp newTimestamp;
554 try {
555 newTimestamp = deviceClockService.getTimestamp(deviceId);
556 } catch (IllegalStateException e) {
557 log.info("Timestamp was not available for device {}", deviceId);
558 log.debug(" discarding {}", portDescriptions);
559 // Failed to generate timestamp.
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700560
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800561 // Possible situation:
562 // Device connected and became master for short period of time,
563 // but lost mastership before this instance had the chance to
564 // retrieve term information.
565
566 // Information dropped here is expected to be recoverable by
567 // device probing after mastership change
568
569 return Collections.emptyList();
570 }
571 log.debug("timestamp for {} {}", deviceId, newTimestamp);
572
573 final Timestamped<List<PortDescription>> timestampedInput
574 = new Timestamped<>(portDescriptions, newTimestamp);
575 final Timestamped<List<PortDescription>> merged;
576
577 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
578
579 synchronized (device) {
580 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
581 final DeviceDescriptions descs = device.get(providerId);
582 List<PortDescription> mergedList =
583 FluentIterable.from(portDescriptions)
Sho SHIMIZU74626412015-09-11 11:46:27 -0700584 .transform(input ->
585 // lookup merged port description
586 descs.getPortDesc(input.portNumber()).value()
587 ).toList();
Sho SHIMIZUa0fda212015-06-10 19:15:38 -0700588 merged = new Timestamped<>(mergedList, newTimestamp);
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800589 }
590
591 if (!deviceEvents.isEmpty()) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700592 log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700593 providerId, deviceId);
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800594 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
595 }
596
597 } else {
Ray Milkey2bf5ea72017-06-01 09:03:34 -0700598 return Collections.emptyList();
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700599 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700600
Ayaka Koshibeeeb95102015-02-26 16:31:49 -0800601 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700602 }
603
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700604 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700605 DeviceId deviceId,
606 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700607
608 Device device = devices.get(deviceId);
Ray Milkey9ef22232016-07-14 12:42:37 -0700609 if (device == null) {
610 log.debug("Device is no longer valid: {}", deviceId);
611 return Collections.emptyList();
612 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700613
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700614 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700615 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
616
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700617 List<DeviceEvent> events = new ArrayList<>();
618 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700619
620 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
621 log.debug("Ignoring outdated events: {}", portDescriptions);
Sho SHIMIZU7b7eabc2015-06-10 20:30:19 -0700622 return Collections.emptyList();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700623 }
624
625 DeviceDescriptions descs = descsMap.get(providerId);
626 // every provider must provide DeviceDescription.
627 checkArgument(descs != null,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700628 "Device description for Device ID %s from Provider %s was not found",
629 deviceId, providerId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700630
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700631 Map<PortNumber, Port> ports = getPortMap(deviceId);
632
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700633 final Timestamp newTimestamp = portDescriptions.timestamp();
634
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700635 // Add new ports
636 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700637 for (PortDescription portDescription : portDescriptions.value()) {
638 final PortNumber number = portDescription.portNumber();
639 processed.add(number);
640
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700641 final Port oldPort = ports.get(number);
642 final Port newPort;
643
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700644
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700645 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
646 if (existingPortDesc == null ||
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700647 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700648 // on new port or valid update
649 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700650 descs.putPortDesc(new Timestamped<>(portDescription,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700651 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700652 newPort = composePort(device, number, descsMap);
653 } else {
654 // outdated event, ignored.
655 continue;
656 }
657
658 events.add(oldPort == null ?
659 createPort(device, newPort, ports) :
660 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700661 }
662
663 events.addAll(pruneOldPorts(device, ports, processed));
664 }
665 return FluentIterable.from(events).filter(notNull()).toList();
666 }
667
668 // Creates a new port based on the port description adds it to the map and
669 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700670 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700671 private DeviceEvent createPort(Device device, Port newPort,
672 Map<PortNumber, Port> ports) {
673 ports.put(newPort.number(), newPort);
674 return new DeviceEvent(PORT_ADDED, device, newPort);
675 }
676
677 // Checks if the specified port requires update and if so, it replaces the
678 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700679 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700680 private DeviceEvent updatePort(Device device, Port oldPort,
681 Port newPort,
682 Map<PortNumber, Port> ports) {
Michal Machce774332017-01-25 11:02:55 +0100683
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700684 if (oldPort.isEnabled() != newPort.isEnabled() ||
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700685 oldPort.type() != newPort.type() ||
686 oldPort.portSpeed() != newPort.portSpeed() ||
687 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700688 ports.put(oldPort.number(), newPort);
689 return new DeviceEvent(PORT_UPDATED, device, newPort);
690 }
691 return null;
692 }
693
Michal Machce774332017-01-25 11:02:55 +0100694 private DeviceEvent removePort(DeviceId deviceId, PortNumber portNumber) {
695
696 log.info("Deleted port: " + deviceId.toString() + "/" + portNumber.toString());
697 Port deletedPort = devicePorts.get(deviceId).remove(portNumber);
698
699 return new DeviceEvent(PORT_REMOVED, getDevice(deviceId), deletedPort);
700 }
701
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700702 // Prunes the specified list of ports based on which ports are in the
703 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700704 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700705 private List<DeviceEvent> pruneOldPorts(Device device,
706 Map<PortNumber, Port> ports,
707 Set<PortNumber> processed) {
708 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700709 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700710 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700711 Entry<PortNumber, Port> e = iterator.next();
712 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700713 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700714 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700715 iterator.remove();
716 }
717 }
718 return events;
719 }
720
721 // Gets the map of ports for the specified device; if one does not already
722 // exist, it creates and registers a new one.
723 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
Yuta HIGUCHIc2e68152016-08-16 13:47:36 -0700724 return devicePorts.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700725 }
726
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700727 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700728 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700729 Map<ProviderId, DeviceDescriptions> r;
730 r = deviceDescs.get(deviceId);
731 if (r == null) {
Sho SHIMIZUa0fda212015-06-10 19:15:38 -0700732 r = new HashMap<>();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700733 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
734 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
735 if (concurrentlyAdded != null) {
736 r = concurrentlyAdded;
737 }
738 }
739 return r;
740 }
741
742 // Guarded by deviceDescs value (=Device lock)
743 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
744 Map<ProviderId, DeviceDescriptions> device,
745 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700746 synchronized (device) {
747 DeviceDescriptions r = device.get(providerId);
748 if (r == null) {
749 r = new DeviceDescriptions(deltaDesc);
750 device.put(providerId, r);
751 }
752 return r;
753 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700754 }
755
756 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700757 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
758 DeviceId deviceId,
759 PortDescription portDescription) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700760 final Timestamp newTimestamp;
761 try {
762 newTimestamp = deviceClockService.getTimestamp(deviceId);
763 } catch (IllegalStateException e) {
764 log.info("Timestamp was not available for device {}", deviceId);
765 log.debug(" discarding {}", portDescription);
766 // Failed to generate timestamp. Ignoring.
767 // See updatePorts comment
768 return null;
769 }
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700770 final Timestamped<PortDescription> deltaDesc
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700771 = new Timestamped<>(portDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700772 final DeviceEvent event;
773 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800774 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
775 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700776 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800777 mergedDesc = device.get(providerId)
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700778 .getPortDesc(portDescription.portNumber());
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700779 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700780 if (event != null) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700781 log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700782 providerId, deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800783 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700784 }
785 return event;
786 }
787
788 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700789 Timestamped<PortDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700790 Device device = devices.get(deviceId);
791 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
792
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700793 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700794 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
795
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700796 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700797
798 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
799 log.debug("Ignoring outdated event: {}", deltaDesc);
800 return null;
801 }
802
803 DeviceDescriptions descs = descsMap.get(providerId);
804 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700805 verify(descs != null,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700806 "Device description for Device ID %s from Provider %s was not found",
807 deviceId, providerId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700808
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700809 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
810 final PortNumber number = deltaDesc.value().portNumber();
811 final Port oldPort = ports.get(number);
812 final Port newPort;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700813 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
Michal Machce774332017-01-25 11:02:55 +0100814 boolean toDelete = false;
815
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700816 if (existingPortDesc == null ||
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700817 deltaDesc.isNewer(existingPortDesc)) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700818 // on new port or valid update
819 // update description
820 descs.putPortDesc(deltaDesc);
821 newPort = composePort(device, number, descsMap);
Michal Machce774332017-01-25 11:02:55 +0100822 toDelete = deltaDesc.value().isRemoved();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700823 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700824 // same or outdated event, ignored.
825 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700826 return null;
827 }
828
829 if (oldPort == null) {
830 return createPort(device, newPort, ports);
831 } else {
Michal Machce774332017-01-25 11:02:55 +0100832 return toDelete ? removePort(deviceId, number) : updatePort(device, oldPort, newPort, ports);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700833 }
834 }
835 }
836
837 @Override
838 public List<Port> getPorts(DeviceId deviceId) {
839 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
840 if (ports == null) {
841 return Collections.emptyList();
842 }
843 return ImmutableList.copyOf(ports.values());
844 }
845
846 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700847 public Stream<PortDescription> getPortDescriptions(ProviderId pid,
848 DeviceId deviceId) {
849 Map<ProviderId, DeviceDescriptions> descs = this.deviceDescs.get(deviceId);
850 if (descs == null) {
851 return null;
852 }
853 // inner-Map(=descs) is HashMap, thus requires synchronization even for reads
854 final Optional<DeviceDescriptions> devDescs;
855 synchronized (descs) {
856 devDescs = Optional.ofNullable(descs.get(pid));
857 }
858 // DeviceDescriptions is concurrent access-safe
859 return devDescs
860 .map(dd -> dd.getPortDescs().values().stream()
861 .map(Timestamped::value))
862 .orElse(Stream.empty());
863 }
864
865 @Override
sangho538108b2015-04-08 14:29:20 -0700866 public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200867 Collection<PortStatistics> newStatsCollection) {
sangho538108b2015-04-08 14:29:20 -0700868
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200869 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
870 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
871 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
872
873 if (prvStatsMap != null) {
874 for (PortStatistics newStats : newStatsCollection) {
875 PortNumber port = PortNumber.portNumber(newStats.port());
876 PortStatistics prvStats = prvStatsMap.get(port);
877 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
878 PortStatistics deltaStats = builder.build();
879 if (prvStats != null) {
880 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
881 }
882 deltaStatsMap.put(port, deltaStats);
883 newStatsMap.put(port, newStats);
884 }
885 } else {
886 for (PortStatistics newStats : newStatsCollection) {
887 PortNumber port = PortNumber.portNumber(newStats.port());
888 newStatsMap.put(port, newStats);
889 }
sangho538108b2015-04-08 14:29:20 -0700890 }
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200891 devicePortDeltaStats.put(deviceId, deltaStatsMap);
892 devicePortStats.put(deviceId, newStatsMap);
Dusan Pajin517e2232015-08-24 16:50:11 +0200893 // DeviceEvent returns null because of InternalPortStatsListener usage
894 return null;
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200895 }
896
897 /**
898 * Calculate delta statistics by subtracting previous from new statistics.
899 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700900 * @param deviceId device identifier
901 * @param prvStats previous port statistics
902 * @param newStats new port statistics
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200903 * @return PortStatistics
904 */
905 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
906 // calculate time difference
907 long deltaStatsSec, deltaStatsNano;
908 if (newStats.durationNano() < prvStats.durationNano()) {
909 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
910 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
911 } else {
912 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
913 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
914 }
915 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
916 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
917 .setPort(newStats.port())
918 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
919 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
920 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
921 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
922 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
923 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
924 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
925 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
926 .setDurationSec(deltaStatsSec)
927 .setDurationNano(deltaStatsNano)
928 .build();
929 return deltaStats;
sangho538108b2015-04-08 14:29:20 -0700930 }
931
932 @Override
933 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
sangho538108b2015-04-08 14:29:20 -0700934 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
935 if (portStats == null) {
936 return Collections.emptyList();
937 }
938 return ImmutableList.copyOf(portStats.values());
939 }
940
941 @Override
Viswanath KSP22774cd2016-08-20 20:06:30 +0530942 public PortStatistics getStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
943 Map<PortNumber, PortStatistics> portStatsMap = devicePortStats.get(deviceId);
944 if (portStatsMap == null) {
945 return null;
946 }
947 PortStatistics portStats = portStatsMap.get(portNumber);
948 return portStats;
949 }
950
951 @Override
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200952 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
953 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
954 if (portStats == null) {
955 return Collections.emptyList();
956 }
957 return ImmutableList.copyOf(portStats.values());
958 }
959
960 @Override
Viswanath KSP22774cd2016-08-20 20:06:30 +0530961 public PortStatistics getDeltaStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
962 Map<PortNumber, PortStatistics> portStatsMap = devicePortDeltaStats.get(deviceId);
963 if (portStatsMap == null) {
964 return null;
965 }
966 PortStatistics portStats = portStatsMap.get(portNumber);
967 return portStats;
968 }
969
970 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700971 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
972 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
973 return ports == null ? null : ports.get(portNumber);
974 }
975
976 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700977 public PortDescription getPortDescription(ProviderId pid,
978 DeviceId deviceId,
979 PortNumber portNumber) {
980 Map<ProviderId, DeviceDescriptions> descs = this.deviceDescs.get(deviceId);
981 if (descs == null) {
982 return null;
983 }
984 // inner-Map(=descs) is HashMap, thus requires synchronization even for reads
985 final Optional<DeviceDescriptions> devDescs;
986 synchronized (descs) {
987 devDescs = Optional.ofNullable(descs.get(pid));
988 }
989 // DeviceDescriptions is concurrent access-safe
990 return devDescs
991 .map(deviceDescriptions -> deviceDescriptions.getPortDesc(portNumber))
992 .map(Timestamped::value)
993 .orElse(null);
994 }
995
996 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700997 public boolean isAvailable(DeviceId deviceId) {
998 return availableDevices.contains(deviceId);
999 }
1000
1001 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001002 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001003 final NodeId myId = clusterService.getLocalNode().id();
1004 NodeId master = mastershipService.getMasterFor(deviceId);
1005
1006 // if there exist a master, forward
1007 // if there is no master, try to become one and process
1008
1009 boolean relinquishAtEnd = false;
1010 if (master == null) {
1011 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
1012 if (myRole != MastershipRole.NONE) {
1013 relinquishAtEnd = true;
1014 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001015 log.debug("Temporarily requesting role for {} to remove", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001016 mastershipService.requestRoleFor(deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001017 MastershipTerm term = termService.getMastershipTerm(deviceId);
Madan Jampani7cdf3f12015-05-12 23:18:05 -07001018 if (term != null && myId.equals(term.master())) {
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001019 master = myId;
1020 }
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -07001021 }
1022
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001023 if (!myId.equals(master)) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001024 log.debug("{} has control of {}, forwarding remove request",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001025 master, deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001026
Brian O'Connor5eb77c82015-03-02 18:09:39 -08001027 // TODO check unicast return value
Madan Jampani2bfa94c2015-04-11 05:03:49 -07001028 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
Brian O'Connor5eb77c82015-03-02 18:09:39 -08001029 /* error log:
1030 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
1031 */
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001032
Brian O'Connor5eb77c82015-03-02 18:09:39 -08001033 // event will be triggered after master processes it.
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001034 return null;
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001035 }
1036
1037 // I have control..
1038
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -07001039 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001040 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001041 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001042 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001043 deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -08001044 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001045 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001046 if (relinquishAtEnd) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001047 log.debug("Relinquishing temporary role acquired for {}", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001048 mastershipService.relinquishMastership(deviceId);
1049 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001050 return event;
1051 }
1052
1053 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
1054 Timestamp timestamp) {
1055
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001056 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -07001057 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001058 // accept removal request if given timestamp is newer than
1059 // the latest Timestamp from Primary provider
1060 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
Thomas Vachuska710293f2015-11-13 12:29:31 -08001061 if (primDescs == null) {
1062 return null;
1063 }
1064
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001065 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
1066 if (timestamp.compareTo(lastTimestamp) <= 0) {
1067 // outdated event ignore
1068 return null;
1069 }
1070 removalRequest.put(deviceId, timestamp);
1071
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -07001072 Device device = devices.remove(deviceId);
1073 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001074 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
1075 if (ports != null) {
1076 ports.clear();
1077 }
1078 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -07001079 descs.clear();
1080 return device == null ? null :
Dusan Pajin11ff4a82015-08-20 18:03:05 +02001081 new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -07001082 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001083 }
1084
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001085 /**
1086 * Checks if given timestamp is superseded by removal request
1087 * with more recent timestamp.
1088 *
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001089 * @param deviceId identifier of a device
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001090 * @param timestampToCheck timestamp of an event to check
1091 * @return true if device is already removed
1092 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001093 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
1094 Timestamp removalTimestamp = removalRequest.get(deviceId);
1095 if (removalTimestamp != null &&
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001096 removalTimestamp.compareTo(timestampToCheck) >= 0) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001097 // removalRequest is more recent
1098 return true;
1099 }
1100 return false;
1101 }
1102
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001103 /**
1104 * Returns a Device, merging description given from multiple Providers.
1105 *
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001106 * @param deviceId device identifier
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001107 * @param providerDescs Collection of Descriptions from multiple providers
1108 * @return Device instance
1109 */
1110 private Device composeDevice(DeviceId deviceId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001111 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001112
Thomas Vachuska444eda62014-10-28 13:09:42 -07001113 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001114
Jonathan Hartd9df7bd2015-11-10 17:10:25 -08001115 ProviderId primary = pickPrimaryPid(providerDescs);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001116
1117 DeviceDescriptions desc = providerDescs.get(primary);
1118
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001119 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001120 Type type = base.type();
1121 String manufacturer = base.manufacturer();
1122 String hwVersion = base.hwVersion();
1123 String swVersion = base.swVersion();
1124 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -07001125 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001126 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1127 annotations = merge(annotations, base.annotations());
1128
1129 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1130 if (e.getKey().equals(primary)) {
1131 continue;
1132 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -08001133 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001134 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001135 // Currently assuming there will never be a key conflict between
1136 // providers
1137
1138 // annotation merging. not so efficient, should revisit later
1139 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
1140 }
1141
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001142 return new DefaultDevice(primary, deviceId, type, manufacturer,
1143 hwVersion, swVersion, serialNumber,
1144 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001145 }
1146
Marc De Leenheer88194c32015-05-29 22:10:59 -07001147 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
1148 PortDescription description, Annotations annotations) {
Marc De Leenheer88194c32015-05-29 22:10:59 -07001149 return new DefaultPort(device, number, isEnabled, description.type(),
1150 description.portSpeed(), annotations);
Marc De Leenheer88194c32015-05-29 22:10:59 -07001151 }
1152
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001153 /**
1154 * Returns a Port, merging description given from multiple Providers.
1155 *
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001156 * @param device device the port is on
1157 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001158 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001159 * @return Port instance
1160 */
1161 private Port composePort(Device device, PortNumber number,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001162 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001163
Jonathan Hartd9df7bd2015-11-10 17:10:25 -08001164 ProviderId primary = pickPrimaryPid(descsMap);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001165 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001166 // if no primary, assume not enabled
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001167 boolean isEnabled = false;
1168 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
Ayaka Koshibeae541732015-05-19 13:37:27 -07001169 Timestamp newest = null;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001170 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
1171 if (portDesc != null) {
1172 isEnabled = portDesc.value().isEnabled();
1173 annotations = merge(annotations, portDesc.value().annotations());
Ayaka Koshibeae541732015-05-19 13:37:27 -07001174 newest = portDesc.timestamp();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001175 }
Ayaka Koshibeae541732015-05-19 13:37:27 -07001176 Port updated = null;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001177 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001178 if (e.getKey().equals(primary)) {
1179 continue;
1180 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -08001181 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001182 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001183 // Currently assuming there will never be a key conflict between
1184 // providers
1185
1186 // annotation merging. not so efficient, should revisit later
1187 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
1188 if (otherPortDesc != null) {
Ayaka Koshibeae541732015-05-19 13:37:27 -07001189 if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) {
1190 continue;
1191 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001192 annotations = merge(annotations, otherPortDesc.value().annotations());
Ayaka Koshibe74b55272015-05-28 15:16:04 -07001193 PortDescription other = otherPortDesc.value();
Marc De Leenheer88194c32015-05-29 22:10:59 -07001194 updated = buildTypedPort(device, number, isEnabled, other, annotations);
Ayaka Koshibeae541732015-05-19 13:37:27 -07001195 newest = otherPortDesc.timestamp();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001196 }
1197 }
Marc De Leenheer4b18a232015-04-30 11:58:20 -07001198 if (portDesc == null) {
Ayaka Koshibeae541732015-05-19 13:37:27 -07001199 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
Marc De Leenheer4b18a232015-04-30 11:58:20 -07001200 }
Ayaka Koshibe74b55272015-05-28 15:16:04 -07001201 PortDescription current = portDesc.value();
1202 return updated == null
Marc De Leenheer88194c32015-05-29 22:10:59 -07001203 ? buildTypedPort(device, number, isEnabled, current, annotations)
Ayaka Koshibe74b55272015-05-28 15:16:04 -07001204 : updated;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001205 }
1206
1207 /**
1208 * @return primary ProviderID, or randomly chosen one if none exists
1209 */
Jonathan Hartd9df7bd2015-11-10 17:10:25 -08001210 private ProviderId pickPrimaryPid(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001211 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001212 ProviderId fallBackPrimary = null;
1213 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1214 if (!e.getKey().isAncillary()) {
1215 return e.getKey();
1216 } else if (fallBackPrimary == null) {
1217 // pick randomly as a fallback in case there is no primary
1218 fallBackPrimary = e.getKey();
1219 }
1220 }
1221 return fallBackPrimary;
1222 }
1223
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001224 private DeviceDescriptions getPrimaryDescriptions(
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001225 Map<ProviderId, DeviceDescriptions> providerDescs) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -08001226 ProviderId pid = pickPrimaryPid(providerDescs);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001227 return providerDescs.get(pid);
1228 }
1229
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001230 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
Madan Jampani2bfa94c2015-04-11 05:03:49 -07001231 clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001232 }
1233
Jonathan Hart7d656f42015-01-27 14:07:23 -08001234 private void broadcastMessage(MessageSubject subject, Object event) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -07001235 clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001236 }
Madan Jampani47c93732014-10-06 20:46:08 -07001237
Jonathan Hart7d656f42015-01-27 14:07:23 -08001238 private void notifyPeers(InternalDeviceEvent event) {
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001239 broadcastMessage(DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -07001240 }
1241
Jonathan Hart7d656f42015-01-27 14:07:23 -08001242 private void notifyPeers(InternalDeviceOfflineEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001243 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -07001244 }
1245
Jonathan Hart7d656f42015-01-27 14:07:23 -08001246 private void notifyPeers(InternalDeviceRemovedEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001247 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001248 }
1249
Jonathan Hart7d656f42015-01-27 14:07:23 -08001250 private void notifyPeers(InternalPortEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001251 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -07001252 }
1253
Jonathan Hart7d656f42015-01-27 14:07:23 -08001254 private void notifyPeers(InternalPortStatusEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001255 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1256 }
1257
1258 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1259 try {
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001260 unicastMessage(recipient, DEVICE_UPDATE, event);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001261 } catch (IOException e) {
1262 log.error("Failed to send" + event + " to " + recipient, e);
1263 }
1264 }
1265
1266 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1267 try {
1268 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1269 } catch (IOException e) {
1270 log.error("Failed to send" + event + " to " + recipient, e);
1271 }
1272 }
1273
1274 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1275 try {
1276 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1277 } catch (IOException e) {
1278 log.error("Failed to send" + event + " to " + recipient, e);
1279 }
1280 }
1281
1282 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1283 try {
1284 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1285 } catch (IOException e) {
1286 log.error("Failed to send" + event + " to " + recipient, e);
1287 }
1288 }
1289
1290 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1291 try {
1292 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1293 } catch (IOException e) {
1294 log.error("Failed to send" + event + " to " + recipient, e);
1295 }
1296 }
1297
1298 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1299 final NodeId self = clusterService.getLocalNode().id();
1300
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001301 final int numDevices = deviceDescs.size();
1302 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1303 final int portsPerDevice = 8; // random factor to minimize reallocation
1304 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1305 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001306
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001307 deviceDescs.forEach((deviceId, devDescs) -> {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001308
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001309 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001310 synchronized (devDescs) {
1311
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001312 // send device offline timestamp
1313 Timestamp lOffline = this.offline.get(deviceId);
1314 if (lOffline != null) {
1315 adOffline.put(deviceId, lOffline);
1316 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001317
1318 for (Entry<ProviderId, DeviceDescriptions>
1319 prov : devDescs.entrySet()) {
1320
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001321 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001322 final ProviderId provId = prov.getKey();
1323 final DeviceDescriptions descs = prov.getValue();
1324
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001325 adDevices.put(new DeviceFragmentId(deviceId, provId),
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001326 descs.getDeviceDesc().timestamp());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001327
1328 for (Entry<PortNumber, Timestamped<PortDescription>>
1329 portDesc : descs.getPortDescs().entrySet()) {
1330
1331 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001332 adPorts.put(new PortFragmentId(deviceId, provId, number),
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001333 portDesc.getValue().timestamp());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001334 }
1335 }
1336 }
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001337 });
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001338
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001339 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001340 }
1341
1342 /**
1343 * Responds to anti-entropy advertisement message.
HIGUCHI Yuta67023a22016-03-28 13:35:44 -07001344 * <p>
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001345 * Notify sender about out-dated information using regular replication message.
1346 * Send back advertisement to sender if not in sync.
1347 *
1348 * @param advertisement to respond to
1349 */
1350 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1351
1352 final NodeId sender = advertisement.sender();
1353
1354 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1355 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1356 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1357
1358 // Fragments to request
1359 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1360 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1361
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001362 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001363 final DeviceId deviceId = de.getKey();
1364 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1365
1366 synchronized (lDevice) {
1367 // latestTimestamp across provider
1368 // Note: can be null initially
1369 Timestamp localLatest = offline.get(deviceId);
1370
1371 // handle device Ads
1372 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1373 final ProviderId provId = prov.getKey();
1374 final DeviceDescriptions lDeviceDescs = prov.getValue();
1375
1376 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1377
1378
1379 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1380 Timestamp advDevTimestamp = devAds.get(devFragId);
1381
Jonathan Hart403ea932015-02-20 16:23:00 -08001382 if (advDevTimestamp == null || lProvDevice.isNewerThan(
1383 advDevTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001384 // remote does not have it or outdated, suggest
1385 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1386 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1387 // local is outdated, request
1388 reqDevices.add(devFragId);
1389 }
1390
1391 // handle port Ads
1392 for (Entry<PortNumber, Timestamped<PortDescription>>
1393 pe : lDeviceDescs.getPortDescs().entrySet()) {
1394
1395 final PortNumber num = pe.getKey();
1396 final Timestamped<PortDescription> lPort = pe.getValue();
1397
1398 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1399
1400 Timestamp advPortTimestamp = portAds.get(portFragId);
Jonathan Hart403ea932015-02-20 16:23:00 -08001401 if (advPortTimestamp == null || lPort.isNewerThan(
1402 advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001403 // remote does not have it or outdated, suggest
1404 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1405 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1406 // local is outdated, request
1407 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1408 reqPorts.add(portFragId);
1409 }
1410
1411 // remove port Ad already processed
1412 portAds.remove(portFragId);
1413 } // end local port loop
1414
1415 // remove device Ad already processed
1416 devAds.remove(devFragId);
1417
1418 // find latest and update
1419 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1420 if (localLatest == null ||
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001421 providerLatest.compareTo(localLatest) > 0) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001422 localLatest = providerLatest;
1423 }
1424 } // end local provider loop
1425
1426 // checking if remote timestamp is more recent.
1427 Timestamp rOffline = offlineAds.get(deviceId);
1428 if (rOffline != null &&
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001429 rOffline.compareTo(localLatest) > 0) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001430 // remote offline timestamp suggests that the
1431 // device is off-line
1432 markOfflineInternal(deviceId, rOffline);
1433 }
1434
1435 Timestamp lOffline = offline.get(deviceId);
1436 if (lOffline != null && rOffline == null) {
1437 // locally offline, but remote is online, suggest offline
1438 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1439 }
1440
1441 // remove device offline Ad already processed
1442 offlineAds.remove(deviceId);
1443 } // end local device loop
1444 } // device lock
1445
1446 // If there is any Ads left, request them
1447 log.trace("Ads left {}, {}", devAds, portAds);
1448 reqDevices.addAll(devAds.keySet());
1449 reqPorts.addAll(portAds.keySet());
1450
1451 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1452 log.trace("Nothing to request to remote peer {}", sender);
1453 return;
1454 }
1455
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001456 log.debug("Need to sync {} {}", reqDevices, reqPorts);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001457
1458 // 2-way Anti-Entropy for now
1459 try {
1460 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1461 } catch (IOException e) {
1462 log.error("Failed to send response advertisement to " + sender, e);
1463 }
1464
1465// Sketch of 3-way Anti-Entropy
1466// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1467// ClusterMessage message = new ClusterMessage(
1468// clusterService.getLocalNode().id(),
1469// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1470// SERIALIZER.encode(request));
1471//
1472// try {
1473// clusterCommunicator.unicast(message, advertisement.sender());
1474// } catch (IOException e) {
1475// log.error("Failed to send advertisement reply to "
1476// + advertisement.sender(), e);
1477// }
Madan Jampani47c93732014-10-06 20:46:08 -07001478 }
1479
Madan Jampani255a58b2014-10-09 12:08:20 -07001480 private void notifyDelegateIfNotNull(DeviceEvent event) {
1481 if (event != null) {
1482 notifyDelegate(event);
1483 }
1484 }
1485
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001486 private final class SendAdvertisementTask implements Runnable {
1487
1488 @Override
1489 public void run() {
1490 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001491 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001492 return;
1493 }
1494
1495 try {
1496 final NodeId self = clusterService.getLocalNode().id();
1497 Set<ControllerNode> nodes = clusterService.getNodes();
1498
1499 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1500 .transform(toNodeId())
1501 .toList();
1502
1503 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001504 log.trace("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001505 return;
1506 }
1507
1508 NodeId peer;
1509 do {
1510 int idx = RandomUtils.nextInt(0, nodeIds.size());
1511 peer = nodeIds.get(idx);
1512 } while (peer.equals(self));
1513
1514 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1515
1516 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001517 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001518 return;
1519 }
1520
1521 try {
1522 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1523 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001524 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001525 return;
1526 }
1527 } catch (Exception e) {
1528 // catch all Exception to avoid Scheduled task being suppressed.
1529 log.error("Exception thrown while sending advertisement", e);
1530 }
1531 }
1532 }
1533
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001534 private void handleDeviceEvent(InternalDeviceEvent event) {
1535 ProviderId providerId = event.providerId();
1536 DeviceId deviceId = event.deviceId();
1537 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001538
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001539 try {
1540 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId,
1541 deviceDescription));
1542 } catch (Exception e) {
1543 log.warn("Exception thrown handling device update", e);
Madan Jampani47c93732014-10-06 20:46:08 -07001544 }
1545 }
1546
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001547 private void handleDeviceOfflineEvent(InternalDeviceOfflineEvent event) {
1548 DeviceId deviceId = event.deviceId();
1549 Timestamp timestamp = event.timestamp();
Madan Jampani25322532014-10-08 11:20:38 -07001550
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001551 try {
1552 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1553 } catch (Exception e) {
1554 log.warn("Exception thrown handling device offline", e);
Madan Jampani25322532014-10-08 11:20:38 -07001555 }
1556 }
1557
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001558 private void handleRemoveRequest(DeviceId did) {
1559 try {
1560 removeDevice(did);
1561 } catch (Exception e) {
1562 log.warn("Exception thrown handling device remove", e);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001563 }
1564 }
1565
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001566 private void handleDeviceRemovedEvent(InternalDeviceRemovedEvent event) {
1567 DeviceId deviceId = event.deviceId();
1568 Timestamp timestamp = event.timestamp();
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001569
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001570 try {
1571 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1572 } catch (Exception e) {
1573 log.warn("Exception thrown handling device removed", e);
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001574 }
1575 }
1576
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001577 private void handlePortEvent(InternalPortEvent event) {
1578 ProviderId providerId = event.providerId();
1579 DeviceId deviceId = event.deviceId();
1580 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
Madan Jampani47c93732014-10-06 20:46:08 -07001581
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001582 if (getDevice(deviceId) == null) {
1583 log.debug("{} not found on this node yet, ignoring.", deviceId);
1584 // Note: dropped information will be recovered by anti-entropy
1585 return;
1586 }
Madan Jampani47c93732014-10-06 20:46:08 -07001587
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001588 try {
1589 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1590 } catch (Exception e) {
1591 log.warn("Exception thrown handling port update", e);
Madan Jampani47c93732014-10-06 20:46:08 -07001592 }
1593 }
1594
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001595 private void handlePortStatusEvent(InternalPortStatusEvent event) {
1596 ProviderId providerId = event.providerId();
1597 DeviceId deviceId = event.deviceId();
1598 Timestamped<PortDescription> portDescription = event.portDescription();
Madan Jampani47c93732014-10-06 20:46:08 -07001599
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001600 if (getDevice(deviceId) == null) {
1601 log.debug("{} not found on this node yet, ignoring.", deviceId);
1602 // Note: dropped information will be recovered by anti-entropy
1603 return;
1604 }
Madan Jampani47c93732014-10-06 20:46:08 -07001605
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001606 try {
1607 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1608 } catch (Exception e) {
1609 log.warn("Exception thrown handling port update", e);
Madan Jampani47c93732014-10-06 20:46:08 -07001610 }
1611 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001612
Jonathan Hart46ab5cc2016-09-15 15:42:39 -07001613 private void handleDeviceAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1614 try {
1615 handleAdvertisement(advertisement);
1616 } catch (Exception e) {
1617 log.warn("Exception thrown handling Device advertisements.", e);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001618 }
1619 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001620
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001621 private class InternalPortStatsListener
1622 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
1623 @Override
1624 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
1625 if (event.type() == PUT) {
1626 Device device = devices.get(event.key());
1627 if (device != null) {
Thomas Vachuskad4955ae2016-08-23 14:56:37 -07001628 notifyDelegate(new DeviceEvent(PORT_STATS_UPDATED, device));
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001629 }
1630 }
1631 }
1632 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001633}