blob: acc8ff4d4c836785eb957ff0e446ee62deba75a2 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2014-present Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.device.impl;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070017
18import com.google.common.collect.FluentIterable;
19import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070020import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
Dusan Pajin11ff4a82015-08-20 18:03:05 +020022
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070023import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070024import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.apache.felix.scr.annotations.Service;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -080030import org.onlab.packet.ChassisId;
31import org.onlab.util.KryoNamespace;
32import org.onlab.util.NewConcurrentHashMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.ControllerNode;
35import org.onosproject.cluster.NodeId;
36import org.onosproject.mastership.MastershipService;
37import org.onosproject.mastership.MastershipTerm;
38import org.onosproject.mastership.MastershipTermService;
Marc De Leenheer88194c32015-05-29 22:10:59 -070039import org.onosproject.net.Annotations;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.net.AnnotationsUtil;
41import org.onosproject.net.DefaultAnnotations;
42import org.onosproject.net.DefaultDevice;
43import org.onosproject.net.DefaultPort;
44import org.onosproject.net.Device;
45import org.onosproject.net.Device.Type;
46import org.onosproject.net.DeviceId;
47import org.onosproject.net.MastershipRole;
Marc De Leenheer4b18a232015-04-30 11:58:20 -070048import org.onosproject.net.OchPort;
49import org.onosproject.net.OduCltPort;
50import org.onosproject.net.OmsPort;
Rimon Ashkenazy8ebfff02016-02-01 11:56:36 +020051import org.onosproject.net.OtuPort;
Brian O'Connorabafb502014-12-02 22:26:20 -080052import org.onosproject.net.Port;
53import org.onosproject.net.PortNumber;
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -070054import org.onosproject.net.device.DefaultPortStatistics;
Brian O'Connorabafb502014-12-02 22:26:20 -080055import org.onosproject.net.device.DeviceClockService;
56import org.onosproject.net.device.DeviceDescription;
57import org.onosproject.net.device.DeviceEvent;
58import org.onosproject.net.device.DeviceStore;
59import org.onosproject.net.device.DeviceStoreDelegate;
Marc De Leenheer4b18a232015-04-30 11:58:20 -070060import org.onosproject.net.device.OchPortDescription;
61import org.onosproject.net.device.OduCltPortDescription;
62import org.onosproject.net.device.OmsPortDescription;
Rimon Ashkenazy8ebfff02016-02-01 11:56:36 +020063import org.onosproject.net.device.OtuPortDescription;
Brian O'Connorabafb502014-12-02 22:26:20 -080064import org.onosproject.net.device.PortDescription;
sangho538108b2015-04-08 14:29:20 -070065import org.onosproject.net.device.PortStatistics;
Brian O'Connorabafb502014-12-02 22:26:20 -080066import org.onosproject.net.provider.ProviderId;
67import org.onosproject.store.AbstractStore;
68import org.onosproject.store.Timestamp;
69import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
70import org.onosproject.store.cluster.messaging.ClusterMessage;
71import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
72import org.onosproject.store.cluster.messaging.MessageSubject;
73import org.onosproject.store.impl.Timestamped;
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -070074import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070075import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070076import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -070077import org.onosproject.store.service.EventuallyConsistentMap;
78import org.onosproject.store.service.EventuallyConsistentMapEvent;
79import org.onosproject.store.service.EventuallyConsistentMapListener;
80import org.onosproject.store.service.MultiValuedTimestamp;
81import org.onosproject.store.service.StorageService;
82import org.onosproject.store.service.WallClockTimestamp;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070083import org.slf4j.Logger;
84
Madan Jampani47c93732014-10-06 20:46:08 -070085import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070086import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070087import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070088import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070089import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070090import java.util.HashSet;
91import java.util.Iterator;
92import java.util.List;
93import java.util.Map;
94import java.util.Map.Entry;
95import java.util.Objects;
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -070096import java.util.Optional;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070097import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070098import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI80d56592014-11-25 15:11:13 -080099import java.util.concurrent.ExecutorService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700100import java.util.concurrent.ScheduledExecutorService;
101import java.util.concurrent.TimeUnit;
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700102import java.util.stream.Stream;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700103
104import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700105import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700106import static com.google.common.base.Verify.verify;
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800107import static java.util.concurrent.Executors.newCachedThreadPool;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800108import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
109import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800110import static org.onlab.util.Tools.groupedThreads;
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800111import static org.onlab.util.Tools.minPriority;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800112import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
113import static org.onosproject.net.DefaultAnnotations.merge;
114import static org.onosproject.net.device.DeviceEvent.Type.*;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800115import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700116import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800117import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700118
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700119/**
120 * Manages inventory of infrastructure devices using gossip protocol to distribute
121 * information.
122 */
123@Component(immediate = true)
124@Service
125public class GossipDeviceStore
126 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
127 implements DeviceStore {
128
129 private final Logger log = getLogger(getClass());
130
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700131 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800132 // Timeout in milliseconds to process device or ports on remote master node
133 private static final int REMOTE_MASTER_TIMEOUT = 1000;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700134
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700135 // innerMap is used to lock a Device, thus instance should never be replaced.
136 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700137 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700138 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700139
140 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700141 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
142 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700143
144 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200145 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700146 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
147 portStatsListener = new InternalPortStatsListener();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700148
149 // to be updated under Device lock
150 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
151 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700152
153 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700154 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700157 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700158
Madan Jampani47c93732014-10-06 20:46:08 -0700159 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700160 protected StorageService storageService;
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani47c93732014-10-06 20:46:08 -0700163 protected ClusterCommunicationService clusterCommunicator;
164
Madan Jampani53e44e62014-10-07 12:39:51 -0700165 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
166 protected ClusterService clusterService;
167
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700168 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
169 protected MastershipService mastershipService;
170
Yuta HIGUCHIbcac4992014-11-22 19:27:57 -0800171 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
172 protected MastershipTermService termService;
173
174
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700175 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800176 .register(DistributedStoreSerializers.STORE_COMMON)
177 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
178 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
179 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700180 .register(InternalDeviceRemovedEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800181 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
182 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700183 .register(DeviceAntiEntropyAdvertisement.class)
184 .register(DeviceFragmentId.class)
185 .register(PortFragmentId.class)
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800186 .register(DeviceInjectedEvent.class)
187 .register(PortInjectedEvent.class)
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700188 .build("GossipDevice"));
Madan Jampani53e44e62014-10-07 12:39:51 -0700189
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800190 private ExecutorService executor;
191
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800192 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700193
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800194 // TODO make these anti-entropy parameters configurable
195 private long initialDelaySec = 5;
196 private long periodSec = 5;
197
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700198 @Activate
199 public void activate() {
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800200 executor = newCachedThreadPool(groupedThreads("onos/device", "fg-%d", log));
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800201
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800202 backgroundExecutor =
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800203 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d", log)));
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700204
Madan Jampani2af244a2015-02-22 13:12:01 -0800205 clusterCommunicator.addSubscriber(
206 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
207 clusterCommunicator.addSubscriber(
208 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
209 new InternalDeviceOfflineEventListener(),
210 executor);
211 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700212 new InternalRemoveRequestListener(),
213 executor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800214 clusterCommunicator.addSubscriber(
215 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
216 clusterCommunicator.addSubscriber(
217 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
218 clusterCommunicator.addSubscriber(
219 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
220 clusterCommunicator.addSubscriber(
221 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
222 new InternalDeviceAdvertisementListener(),
223 backgroundExecutor);
224 clusterCommunicator.addSubscriber(
225 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
226 clusterCommunicator.addSubscriber(
227 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
228
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700229 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800230 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700231 initialDelaySec, periodSec, TimeUnit.SECONDS);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700232
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700233 // Create a distributed map for port stats.
234 KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
235 .register(KryoNamespaces.API)
HIGUCHI Yuta03666a32016-05-18 11:49:09 -0700236 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
237 .register(MultiValuedTimestamp.class);
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700238
239 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
240 .withName("port-stats")
241 .withSerializer(deviceDataSerializer)
242 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700243 .withTimestampProvider((k, v) -> new WallClockTimestamp())
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700244 .withTombstonesDisabled()
245 .build();
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200246 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
247 eventuallyConsistentMapBuilder()
248 .withName("port-stats-delta")
249 .withSerializer(deviceDataSerializer)
250 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
251 .withTimestampProvider((k, v) -> new WallClockTimestamp())
252 .withTombstonesDisabled()
253 .build();
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700254 devicePortStats.addListener(portStatsListener);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700255 log.info("Started");
256 }
257
258 @Deactivate
259 public void deactivate() {
Madan Jampani632f16b2015-08-11 12:42:59 -0700260 devicePortStats.destroy();
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200261 devicePortDeltaStats.destroy();
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800262 executor.shutdownNow();
263
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800264 backgroundExecutor.shutdownNow();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700265 try {
Yuta HIGUCHIc5783592014-12-05 11:13:29 -0800266 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700267 log.error("Timeout during executor shutdown");
268 }
269 } catch (InterruptedException e) {
270 log.error("Error during executor shutdown", e);
271 }
272
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700273 deviceDescs.clear();
274 devices.clear();
275 devicePorts.clear();
276 availableDevices.clear();
277 log.info("Stopped");
278 }
279
280 @Override
281 public int getDeviceCount() {
282 return devices.size();
283 }
284
285 @Override
286 public Iterable<Device> getDevices() {
287 return Collections.unmodifiableCollection(devices.values());
288 }
289
290 @Override
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -0800291 public Iterable<Device> getAvailableDevices() {
292 return FluentIterable.from(getDevices())
Sho SHIMIZU06a6c9f2015-06-12 14:49:06 -0700293 .filter(input -> isAvailable(input.id()));
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -0800294 }
295
296 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700297 public Device getDevice(DeviceId deviceId) {
298 return devices.get(deviceId);
299 }
300
301 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700302 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700303 DeviceId deviceId,
304 DeviceDescription deviceDescription) {
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800305 NodeId localNode = clusterService.getLocalNode().id();
306 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
307
308 // Process device update only if we're the master,
309 // otherwise signal the actual master.
310 DeviceEvent deviceEvent = null;
311 if (localNode.equals(deviceNode)) {
312
313 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
314 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
315 final Timestamped<DeviceDescription> mergedDesc;
316 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
317
318 synchronized (device) {
319 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
320 mergedDesc = device.get(providerId).getDeviceDesc();
321 }
322
323 if (deviceEvent != null) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700324 log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700325 providerId, deviceId);
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800326 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
327 }
328
329 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800330 // Only forward for ConfigProvider
331 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800332 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800333 return null;
334 }
HIGUCHI Yutadc2e7c22015-02-24 12:19:47 -0800335 // FIXME Temporary hack for NPE (ONOS-1171).
336 // Proper fix is to implement forwarding to master on ConfigProvider
337 // redo ONOS-490
338 if (deviceNode == null) {
339 // silently ignore
340 return null;
341 }
342
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800343
344 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
345 providerId, deviceId, deviceDescription);
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800346
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800347 // TODO check unicast return value
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700348 clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800349 /* error log:
350 log.warn("Failed to process injected device id: {} desc: {} " +
351 "(cluster messaging failed: {})",
352 deviceId, deviceDescription, e);
353 */
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700354 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800355
356 return deviceEvent;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700357 }
358
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700359 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700360 DeviceId deviceId,
361 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700362
363 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800364 Map<ProviderId, DeviceDescriptions> device
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700365 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700366
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800367 synchronized (device) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700368 // locking per device
369
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700370 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
371 log.debug("Ignoring outdated event: {}", deltaDesc);
372 return null;
373 }
374
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800375 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700376
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700377 final Device oldDevice = devices.get(deviceId);
378 final Device newDevice;
379
380 if (deltaDesc == descs.getDeviceDesc() ||
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700381 deltaDesc.isNewer(descs.getDeviceDesc())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700382 // on new device or valid update
383 descs.putDeviceDesc(deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800384 newDevice = composeDevice(deviceId, device);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700385 } else {
386 // outdated event, ignored.
387 return null;
388 }
389 if (oldDevice == null) {
390 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700391 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700392 } else {
393 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700394 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700395 }
396 }
397 }
398
399 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700400 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700401 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700402 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700403
404 // update composed device cache
405 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
406 verify(oldDevice == null,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700407 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
408 providerId, oldDevice, newDevice);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700409
410 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700411 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700412 }
413
414 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
415 }
416
417 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700418 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700419 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700420 Device oldDevice,
421 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700422 // We allow only certain attributes to trigger update
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700423 boolean propertiesChanged =
424 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
Thomas Vachuskadaaa42d2015-04-21 16:21:37 -0700425 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
426 !Objects.equals(oldDevice.providerId(), newDevice.providerId());
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700427 boolean annotationsChanged =
428 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700429
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700430 // Primary providers can respond to all changes, but ancillary ones
431 // should respond only to annotation changes.
alshabibdc5d8bd2015-11-02 15:41:29 -0800432 DeviceEvent event = null;
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700433 if ((providerId.isAncillary() && annotationsChanged) ||
434 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700435 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
436 if (!replaced) {
437 verify(replaced,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700438 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
Jian Li68c4fc42016-01-11 16:07:03 -0800439 providerId, oldDevice, devices.get(newDevice.id()), newDevice);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700440 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700441
alshabibdc5d8bd2015-11-02 15:41:29 -0800442 event = new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700443 }
alshabibdc5d8bd2015-11-02 15:41:29 -0800444
445 if (!providerId.isAncillary()) {
446 boolean wasOnline = availableDevices.contains(newDevice.id());
447 markOnline(newDevice.id(), newTimestamp);
448 if (!wasOnline) {
449 notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
450 }
451 }
452 return event;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700453 }
454
455 @Override
456 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700457 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700458 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700459 if (event != null) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700460 log.debug("Notifying peers of a device offline topology event for deviceId: {} {}",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700461 deviceId, timestamp);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800462 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -0700463 }
464 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700465 }
466
467 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
468
469 Map<ProviderId, DeviceDescriptions> providerDescs
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700470 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700471
472 // locking device
473 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700474
475 // accept off-line if given timestamp is newer than
476 // the latest Timestamp from Primary provider
477 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
478 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
479 if (timestamp.compareTo(lastTimestamp) <= 0) {
480 // outdated event ignore
481 return null;
482 }
483
484 offline.put(deviceId, timestamp);
485
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700486 Device device = devices.get(deviceId);
487 if (device == null) {
488 return null;
489 }
490 boolean removed = availableDevices.remove(deviceId);
491 if (removed) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700492 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700493 }
494 return null;
495 }
496 }
497
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700498 /**
499 * Marks the device as available if the given timestamp is not outdated,
500 * compared to the time the device has been marked offline.
501 *
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700502 * @param deviceId identifier of the device
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700503 * @param timestamp of the event triggering this change.
504 * @return true if availability change request was accepted and changed the state
505 */
506 // Guarded by deviceDescs value (=Device lock)
507 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
508 // accept on-line if given timestamp is newer than
509 // the latest offline request Timestamp
510 Timestamp offlineTimestamp = offline.get(deviceId);
511 if (offlineTimestamp == null ||
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700512 offlineTimestamp.compareTo(timestamp) < 0) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700513
514 offline.remove(deviceId);
515 return availableDevices.add(deviceId);
516 }
517 return false;
518 }
519
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700520 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700521 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700522 DeviceId deviceId,
523 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700524
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800525 NodeId localNode = clusterService.getLocalNode().id();
526 // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
527 // since it will trigger distributed store read.
528 // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
529 // outside Device subsystem. so that we don't have to modify both Device and Link stores.
530 // If we don't care much about topology performance, then it might be OK.
531 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700532
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800533 // Process port update only if we're the master of the device,
534 // otherwise signal the actual master.
535 List<DeviceEvent> deviceEvents = null;
536 if (localNode.equals(deviceNode)) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700537
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800538 final Timestamp newTimestamp;
539 try {
540 newTimestamp = deviceClockService.getTimestamp(deviceId);
541 } catch (IllegalStateException e) {
542 log.info("Timestamp was not available for device {}", deviceId);
543 log.debug(" discarding {}", portDescriptions);
544 // Failed to generate timestamp.
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700545
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800546 // Possible situation:
547 // Device connected and became master for short period of time,
548 // but lost mastership before this instance had the chance to
549 // retrieve term information.
550
551 // Information dropped here is expected to be recoverable by
552 // device probing after mastership change
553
554 return Collections.emptyList();
555 }
556 log.debug("timestamp for {} {}", deviceId, newTimestamp);
557
558 final Timestamped<List<PortDescription>> timestampedInput
559 = new Timestamped<>(portDescriptions, newTimestamp);
560 final Timestamped<List<PortDescription>> merged;
561
562 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
563
564 synchronized (device) {
565 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
566 final DeviceDescriptions descs = device.get(providerId);
567 List<PortDescription> mergedList =
568 FluentIterable.from(portDescriptions)
Sho SHIMIZU74626412015-09-11 11:46:27 -0700569 .transform(input ->
570 // lookup merged port description
571 descs.getPortDesc(input.portNumber()).value()
572 ).toList();
Sho SHIMIZUa0fda212015-06-10 19:15:38 -0700573 merged = new Timestamped<>(mergedList, newTimestamp);
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800574 }
575
576 if (!deviceEvents.isEmpty()) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700577 log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700578 providerId, deviceId);
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800579 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
580 }
581
582 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800583 // Only forward for ConfigProvider
584 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800585 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta89461772016-01-26 12:18:10 -0800586 return Collections.emptyList();
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800587 }
HIGUCHI Yutadc2e7c22015-02-24 12:19:47 -0800588 // FIXME Temporary hack for NPE (ONOS-1171).
589 // Proper fix is to implement forwarding to master on ConfigProvider
590 // redo ONOS-490
591 if (deviceNode == null) {
592 // silently ignore
Ayaka Koshibeeeb95102015-02-26 16:31:49 -0800593 return Collections.emptyList();
HIGUCHI Yutadc2e7c22015-02-24 12:19:47 -0800594 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800595
596 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800597
598 //TODO check unicast return value
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700599 clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800600 /* error log:
601 log.warn("Failed to process injected ports of device id: {} " +
602 "(cluster messaging failed: {})",
603 deviceId, e);
604 */
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700605 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700606
Ayaka Koshibeeeb95102015-02-26 16:31:49 -0800607 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700608 }
609
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700610 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700611 DeviceId deviceId,
612 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700613
614 Device device = devices.get(deviceId);
615 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
616
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700617 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700618 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
619
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700620 List<DeviceEvent> events = new ArrayList<>();
621 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700622
623 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
624 log.debug("Ignoring outdated events: {}", portDescriptions);
Sho SHIMIZU7b7eabc2015-06-10 20:30:19 -0700625 return Collections.emptyList();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700626 }
627
628 DeviceDescriptions descs = descsMap.get(providerId);
629 // every provider must provide DeviceDescription.
630 checkArgument(descs != null,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700631 "Device description for Device ID %s from Provider %s was not found",
632 deviceId, providerId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700633
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700634 Map<PortNumber, Port> ports = getPortMap(deviceId);
635
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700636 final Timestamp newTimestamp = portDescriptions.timestamp();
637
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700638 // Add new ports
639 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700640 for (PortDescription portDescription : portDescriptions.value()) {
641 final PortNumber number = portDescription.portNumber();
642 processed.add(number);
643
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700644 final Port oldPort = ports.get(number);
645 final Port newPort;
646
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700647
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700648 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
649 if (existingPortDesc == null ||
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700650 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700651 // on new port or valid update
652 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700653 descs.putPortDesc(new Timestamped<>(portDescription,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700654 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700655 newPort = composePort(device, number, descsMap);
656 } else {
657 // outdated event, ignored.
658 continue;
659 }
660
661 events.add(oldPort == null ?
662 createPort(device, newPort, ports) :
663 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700664 }
665
666 events.addAll(pruneOldPorts(device, ports, processed));
667 }
668 return FluentIterable.from(events).filter(notNull()).toList();
669 }
670
671 // Creates a new port based on the port description adds it to the map and
672 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700673 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700674 private DeviceEvent createPort(Device device, Port newPort,
675 Map<PortNumber, Port> ports) {
676 ports.put(newPort.number(), newPort);
677 return new DeviceEvent(PORT_ADDED, device, newPort);
678 }
679
680 // Checks if the specified port requires update and if so, it replaces the
681 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700682 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700683 private DeviceEvent updatePort(Device device, Port oldPort,
684 Port newPort,
685 Map<PortNumber, Port> ports) {
686 if (oldPort.isEnabled() != newPort.isEnabled() ||
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700687 oldPort.type() != newPort.type() ||
688 oldPort.portSpeed() != newPort.portSpeed() ||
689 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700690 ports.put(oldPort.number(), newPort);
691 return new DeviceEvent(PORT_UPDATED, device, newPort);
692 }
693 return null;
694 }
695
696 // Prunes the specified list of ports based on which ports are in the
697 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700698 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700699 private List<DeviceEvent> pruneOldPorts(Device device,
700 Map<PortNumber, Port> ports,
701 Set<PortNumber> processed) {
702 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700703 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700704 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700705 Entry<PortNumber, Port> e = iterator.next();
706 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700707 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700708 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700709 iterator.remove();
710 }
711 }
712 return events;
713 }
714
715 // Gets the map of ports for the specified device; if one does not already
716 // exist, it creates and registers a new one.
717 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
718 return createIfAbsentUnchecked(devicePorts, deviceId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700719 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700720 }
721
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700722 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700723 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700724 Map<ProviderId, DeviceDescriptions> r;
725 r = deviceDescs.get(deviceId);
726 if (r == null) {
Sho SHIMIZUa0fda212015-06-10 19:15:38 -0700727 r = new HashMap<>();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700728 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
729 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
730 if (concurrentlyAdded != null) {
731 r = concurrentlyAdded;
732 }
733 }
734 return r;
735 }
736
737 // Guarded by deviceDescs value (=Device lock)
738 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
739 Map<ProviderId, DeviceDescriptions> device,
740 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700741 synchronized (device) {
742 DeviceDescriptions r = device.get(providerId);
743 if (r == null) {
744 r = new DeviceDescriptions(deltaDesc);
745 device.put(providerId, r);
746 }
747 return r;
748 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700749 }
750
751 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700752 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
753 DeviceId deviceId,
754 PortDescription portDescription) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700755 final Timestamp newTimestamp;
756 try {
757 newTimestamp = deviceClockService.getTimestamp(deviceId);
758 } catch (IllegalStateException e) {
759 log.info("Timestamp was not available for device {}", deviceId);
760 log.debug(" discarding {}", portDescription);
761 // Failed to generate timestamp. Ignoring.
762 // See updatePorts comment
763 return null;
764 }
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700765 final Timestamped<PortDescription> deltaDesc
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700766 = new Timestamped<>(portDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700767 final DeviceEvent event;
768 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800769 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
770 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700771 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800772 mergedDesc = device.get(providerId)
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700773 .getPortDesc(portDescription.portNumber());
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700774 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700775 if (event != null) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700776 log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700777 providerId, deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800778 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700779 }
780 return event;
781 }
782
783 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700784 Timestamped<PortDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700785 Device device = devices.get(deviceId);
786 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
787
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700788 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700789 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
790
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700791 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700792
793 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
794 log.debug("Ignoring outdated event: {}", deltaDesc);
795 return null;
796 }
797
798 DeviceDescriptions descs = descsMap.get(providerId);
799 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700800 verify(descs != null,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700801 "Device description for Device ID %s from Provider %s was not found",
802 deviceId, providerId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700803
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700804 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
805 final PortNumber number = deltaDesc.value().portNumber();
806 final Port oldPort = ports.get(number);
807 final Port newPort;
808
809 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
810 if (existingPortDesc == null ||
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700811 deltaDesc.isNewer(existingPortDesc)) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700812 // on new port or valid update
813 // update description
814 descs.putPortDesc(deltaDesc);
815 newPort = composePort(device, number, descsMap);
816 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700817 // same or outdated event, ignored.
818 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700819 return null;
820 }
821
822 if (oldPort == null) {
823 return createPort(device, newPort, ports);
824 } else {
825 return updatePort(device, oldPort, newPort, ports);
826 }
827 }
828 }
829
830 @Override
831 public List<Port> getPorts(DeviceId deviceId) {
832 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
833 if (ports == null) {
834 return Collections.emptyList();
835 }
836 return ImmutableList.copyOf(ports.values());
837 }
838
839 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700840 public Stream<PortDescription> getPortDescriptions(ProviderId pid,
841 DeviceId deviceId) {
842 Map<ProviderId, DeviceDescriptions> descs = this.deviceDescs.get(deviceId);
843 if (descs == null) {
844 return null;
845 }
846 // inner-Map(=descs) is HashMap, thus requires synchronization even for reads
847 final Optional<DeviceDescriptions> devDescs;
848 synchronized (descs) {
849 devDescs = Optional.ofNullable(descs.get(pid));
850 }
851 // DeviceDescriptions is concurrent access-safe
852 return devDescs
853 .map(dd -> dd.getPortDescs().values().stream()
854 .map(Timestamped::value))
855 .orElse(Stream.empty());
856 }
857
858 @Override
sangho538108b2015-04-08 14:29:20 -0700859 public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200860 Collection<PortStatistics> newStatsCollection) {
sangho538108b2015-04-08 14:29:20 -0700861
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200862 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
863 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
864 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
865
866 if (prvStatsMap != null) {
867 for (PortStatistics newStats : newStatsCollection) {
868 PortNumber port = PortNumber.portNumber(newStats.port());
869 PortStatistics prvStats = prvStatsMap.get(port);
870 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
871 PortStatistics deltaStats = builder.build();
872 if (prvStats != null) {
873 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
874 }
875 deltaStatsMap.put(port, deltaStats);
876 newStatsMap.put(port, newStats);
877 }
878 } else {
879 for (PortStatistics newStats : newStatsCollection) {
880 PortNumber port = PortNumber.portNumber(newStats.port());
881 newStatsMap.put(port, newStats);
882 }
sangho538108b2015-04-08 14:29:20 -0700883 }
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200884 devicePortDeltaStats.put(deviceId, deltaStatsMap);
885 devicePortStats.put(deviceId, newStatsMap);
Dusan Pajin517e2232015-08-24 16:50:11 +0200886 // DeviceEvent returns null because of InternalPortStatsListener usage
887 return null;
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200888 }
889
890 /**
891 * Calculate delta statistics by subtracting previous from new statistics.
892 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700893 * @param deviceId device identifier
894 * @param prvStats previous port statistics
895 * @param newStats new port statistics
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200896 * @return PortStatistics
897 */
898 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
899 // calculate time difference
900 long deltaStatsSec, deltaStatsNano;
901 if (newStats.durationNano() < prvStats.durationNano()) {
902 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
903 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
904 } else {
905 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
906 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
907 }
908 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
909 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
910 .setPort(newStats.port())
911 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
912 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
913 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
914 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
915 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
916 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
917 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
918 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
919 .setDurationSec(deltaStatsSec)
920 .setDurationNano(deltaStatsNano)
921 .build();
922 return deltaStats;
sangho538108b2015-04-08 14:29:20 -0700923 }
924
925 @Override
926 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
sangho538108b2015-04-08 14:29:20 -0700927 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
928 if (portStats == null) {
929 return Collections.emptyList();
930 }
931 return ImmutableList.copyOf(portStats.values());
932 }
933
934 @Override
Dusan Pajin11ff4a82015-08-20 18:03:05 +0200935 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
936 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
937 if (portStats == null) {
938 return Collections.emptyList();
939 }
940 return ImmutableList.copyOf(portStats.values());
941 }
942
943 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700944 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
945 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
946 return ports == null ? null : ports.get(portNumber);
947 }
948
949 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700950 public PortDescription getPortDescription(ProviderId pid,
951 DeviceId deviceId,
952 PortNumber portNumber) {
953 Map<ProviderId, DeviceDescriptions> descs = this.deviceDescs.get(deviceId);
954 if (descs == null) {
955 return null;
956 }
957 // inner-Map(=descs) is HashMap, thus requires synchronization even for reads
958 final Optional<DeviceDescriptions> devDescs;
959 synchronized (descs) {
960 devDescs = Optional.ofNullable(descs.get(pid));
961 }
962 // DeviceDescriptions is concurrent access-safe
963 return devDescs
964 .map(deviceDescriptions -> deviceDescriptions.getPortDesc(portNumber))
965 .map(Timestamped::value)
966 .orElse(null);
967 }
968
969 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700970 public boolean isAvailable(DeviceId deviceId) {
971 return availableDevices.contains(deviceId);
972 }
973
974 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700975 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800976 final NodeId myId = clusterService.getLocalNode().id();
977 NodeId master = mastershipService.getMasterFor(deviceId);
978
979 // if there exist a master, forward
980 // if there is no master, try to become one and process
981
982 boolean relinquishAtEnd = false;
983 if (master == null) {
984 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
985 if (myRole != MastershipRole.NONE) {
986 relinquishAtEnd = true;
987 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800988 log.debug("Temporarily requesting role for {} to remove", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800989 mastershipService.requestRoleFor(deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800990 MastershipTerm term = termService.getMastershipTerm(deviceId);
Madan Jampani7cdf3f12015-05-12 23:18:05 -0700991 if (term != null && myId.equals(term.master())) {
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800992 master = myId;
993 }
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700994 }
995
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800996 if (!myId.equals(master)) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800997 log.debug("{} has control of {}, forwarding remove request",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -0700998 master, deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800999
Brian O'Connor5eb77c82015-03-02 18:09:39 -08001000 // TODO check unicast return value
Madan Jampani2bfa94c2015-04-11 05:03:49 -07001001 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
Brian O'Connor5eb77c82015-03-02 18:09:39 -08001002 /* error log:
1003 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
1004 */
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001005
Brian O'Connor5eb77c82015-03-02 18:09:39 -08001006 // event will be triggered after master processes it.
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001007 return null;
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001008 }
1009
1010 // I have control..
1011
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -07001012 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001013 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001014 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001015 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001016 deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -08001017 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001018 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001019 if (relinquishAtEnd) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001020 log.debug("Relinquishing temporary role acquired for {}", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001021 mastershipService.relinquishMastership(deviceId);
1022 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001023 return event;
1024 }
1025
1026 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
1027 Timestamp timestamp) {
1028
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001029 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -07001030 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001031 // accept removal request if given timestamp is newer than
1032 // the latest Timestamp from Primary provider
1033 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
Thomas Vachuska710293f2015-11-13 12:29:31 -08001034 if (primDescs == null) {
1035 return null;
1036 }
1037
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001038 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
1039 if (timestamp.compareTo(lastTimestamp) <= 0) {
1040 // outdated event ignore
1041 return null;
1042 }
1043 removalRequest.put(deviceId, timestamp);
1044
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -07001045 Device device = devices.remove(deviceId);
1046 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001047 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
1048 if (ports != null) {
1049 ports.clear();
1050 }
1051 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -07001052 descs.clear();
1053 return device == null ? null :
Dusan Pajin11ff4a82015-08-20 18:03:05 +02001054 new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -07001055 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001056 }
1057
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001058 /**
1059 * Checks if given timestamp is superseded by removal request
1060 * with more recent timestamp.
1061 *
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001062 * @param deviceId identifier of a device
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001063 * @param timestampToCheck timestamp of an event to check
1064 * @return true if device is already removed
1065 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001066 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
1067 Timestamp removalTimestamp = removalRequest.get(deviceId);
1068 if (removalTimestamp != null &&
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001069 removalTimestamp.compareTo(timestampToCheck) >= 0) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001070 // removalRequest is more recent
1071 return true;
1072 }
1073 return false;
1074 }
1075
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001076 /**
1077 * Returns a Device, merging description given from multiple Providers.
1078 *
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001079 * @param deviceId device identifier
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001080 * @param providerDescs Collection of Descriptions from multiple providers
1081 * @return Device instance
1082 */
1083 private Device composeDevice(DeviceId deviceId,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001084 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001085
Thomas Vachuska444eda62014-10-28 13:09:42 -07001086 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001087
Jonathan Hartd9df7bd2015-11-10 17:10:25 -08001088 ProviderId primary = pickPrimaryPid(providerDescs);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001089
1090 DeviceDescriptions desc = providerDescs.get(primary);
1091
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001092 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001093 Type type = base.type();
1094 String manufacturer = base.manufacturer();
1095 String hwVersion = base.hwVersion();
1096 String swVersion = base.swVersion();
1097 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -07001098 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001099 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1100 annotations = merge(annotations, base.annotations());
1101
1102 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1103 if (e.getKey().equals(primary)) {
1104 continue;
1105 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -08001106 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001107 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001108 // Currently assuming there will never be a key conflict between
1109 // providers
1110
1111 // annotation merging. not so efficient, should revisit later
1112 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
1113 }
1114
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001115 return new DefaultDevice(primary, deviceId, type, manufacturer,
1116 hwVersion, swVersion, serialNumber,
1117 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001118 }
1119
Marc De Leenheer88194c32015-05-29 22:10:59 -07001120 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
1121 PortDescription description, Annotations annotations) {
HIGUCHI Yuta34a3f692016-01-09 21:08:57 -08001122 // FIXME this switch need to go away once all ports are done.
Marc De Leenheer88194c32015-05-29 22:10:59 -07001123 switch (description.type()) {
1124 case OMS:
HIGUCHI Yuta95d83e82016-04-26 12:13:48 -07001125 if (description instanceof OmsPortDescription) {
1126 // remove if-block once deprecation is complete
1127 OmsPortDescription omsDesc = (OmsPortDescription) description;
1128 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
1129 omsDesc.maxFrequency(), omsDesc.grid(), annotations);
1130 }
1131 // same as default
1132 return new DefaultPort(device, number, isEnabled, description.type(),
1133 description.portSpeed(), annotations);
Marc De Leenheer88194c32015-05-29 22:10:59 -07001134 case OCH:
HIGUCHI Yuta34a3f692016-01-09 21:08:57 -08001135 if (description instanceof OchPortDescription) {
1136 // remove if-block once Och deprecation is complete
1137 OchPortDescription ochDesc = (OchPortDescription) description;
1138 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
1139 ochDesc.isTunable(), ochDesc.lambda(), annotations);
1140 }
1141 return new DefaultPort(device, number, isEnabled, description.type(),
1142 description.portSpeed(), annotations);
Marc De Leenheer88194c32015-05-29 22:10:59 -07001143 case ODUCLT:
HIGUCHI Yuta4c0ef6b2016-05-02 19:45:41 -07001144 if (description instanceof OduCltPortDescription) {
1145 // remove if-block once deprecation is complete
1146 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
1147 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
1148 }
1149 // same as default
1150 return new DefaultPort(device, number, isEnabled, description.type(),
1151 description.portSpeed(), annotations);
Rimon Ashkenazy8ebfff02016-02-01 11:56:36 +02001152 case OTU:
HIGUCHI Yuta5be3e822016-05-03 13:51:42 -07001153 if (description instanceof OtuPortDescription) {
1154 // remove if-block once deprecation is complete
1155 OtuPortDescription otuDesc = (OtuPortDescription) description;
1156 return new OtuPort(device, number, isEnabled, otuDesc.signalType(), annotations);
1157 }
1158 // same as default
1159 return new DefaultPort(device, number, isEnabled, description.type(),
1160 description.portSpeed(), annotations);
Marc De Leenheer88194c32015-05-29 22:10:59 -07001161 default:
1162 return new DefaultPort(device, number, isEnabled, description.type(),
1163 description.portSpeed(), annotations);
1164 }
1165 }
1166
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001167 /**
1168 * Returns a Port, merging description given from multiple Providers.
1169 *
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001170 * @param device device the port is on
1171 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001172 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001173 * @return Port instance
1174 */
1175 private Port composePort(Device device, PortNumber number,
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001176 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001177
Jonathan Hartd9df7bd2015-11-10 17:10:25 -08001178 ProviderId primary = pickPrimaryPid(descsMap);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001179 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001180 // if no primary, assume not enabled
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001181 boolean isEnabled = false;
1182 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
Ayaka Koshibeae541732015-05-19 13:37:27 -07001183 Timestamp newest = null;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001184 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
1185 if (portDesc != null) {
1186 isEnabled = portDesc.value().isEnabled();
1187 annotations = merge(annotations, portDesc.value().annotations());
Ayaka Koshibeae541732015-05-19 13:37:27 -07001188 newest = portDesc.timestamp();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001189 }
Ayaka Koshibeae541732015-05-19 13:37:27 -07001190 Port updated = null;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001191 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001192 if (e.getKey().equals(primary)) {
1193 continue;
1194 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -08001195 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001196 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001197 // Currently assuming there will never be a key conflict between
1198 // providers
1199
1200 // annotation merging. not so efficient, should revisit later
1201 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
1202 if (otherPortDesc != null) {
Ayaka Koshibeae541732015-05-19 13:37:27 -07001203 if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) {
1204 continue;
1205 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001206 annotations = merge(annotations, otherPortDesc.value().annotations());
Ayaka Koshibe74b55272015-05-28 15:16:04 -07001207 PortDescription other = otherPortDesc.value();
Marc De Leenheer88194c32015-05-29 22:10:59 -07001208 updated = buildTypedPort(device, number, isEnabled, other, annotations);
Ayaka Koshibeae541732015-05-19 13:37:27 -07001209 newest = otherPortDesc.timestamp();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001210 }
1211 }
Marc De Leenheer4b18a232015-04-30 11:58:20 -07001212 if (portDesc == null) {
Ayaka Koshibeae541732015-05-19 13:37:27 -07001213 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
Marc De Leenheer4b18a232015-04-30 11:58:20 -07001214 }
Ayaka Koshibe74b55272015-05-28 15:16:04 -07001215 PortDescription current = portDesc.value();
1216 return updated == null
Marc De Leenheer88194c32015-05-29 22:10:59 -07001217 ? buildTypedPort(device, number, isEnabled, current, annotations)
Ayaka Koshibe74b55272015-05-28 15:16:04 -07001218 : updated;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001219 }
1220
1221 /**
1222 * @return primary ProviderID, or randomly chosen one if none exists
1223 */
Jonathan Hartd9df7bd2015-11-10 17:10:25 -08001224 private ProviderId pickPrimaryPid(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001225 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001226 ProviderId fallBackPrimary = null;
1227 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1228 if (!e.getKey().isAncillary()) {
1229 return e.getKey();
1230 } else if (fallBackPrimary == null) {
1231 // pick randomly as a fallback in case there is no primary
1232 fallBackPrimary = e.getKey();
1233 }
1234 }
1235 return fallBackPrimary;
1236 }
1237
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001238 private DeviceDescriptions getPrimaryDescriptions(
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001239 Map<ProviderId, DeviceDescriptions> providerDescs) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -08001240 ProviderId pid = pickPrimaryPid(providerDescs);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001241 return providerDescs.get(pid);
1242 }
1243
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001244 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
Madan Jampani2bfa94c2015-04-11 05:03:49 -07001245 clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001246 }
1247
Jonathan Hart7d656f42015-01-27 14:07:23 -08001248 private void broadcastMessage(MessageSubject subject, Object event) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -07001249 clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001250 }
Madan Jampani47c93732014-10-06 20:46:08 -07001251
Jonathan Hart7d656f42015-01-27 14:07:23 -08001252 private void notifyPeers(InternalDeviceEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001253 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -07001254 }
1255
Jonathan Hart7d656f42015-01-27 14:07:23 -08001256 private void notifyPeers(InternalDeviceOfflineEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001257 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -07001258 }
1259
Jonathan Hart7d656f42015-01-27 14:07:23 -08001260 private void notifyPeers(InternalDeviceRemovedEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001261 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001262 }
1263
Jonathan Hart7d656f42015-01-27 14:07:23 -08001264 private void notifyPeers(InternalPortEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001265 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -07001266 }
1267
Jonathan Hart7d656f42015-01-27 14:07:23 -08001268 private void notifyPeers(InternalPortStatusEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001269 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1270 }
1271
1272 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1273 try {
1274 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1275 } catch (IOException e) {
1276 log.error("Failed to send" + event + " to " + recipient, e);
1277 }
1278 }
1279
1280 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1281 try {
1282 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1283 } catch (IOException e) {
1284 log.error("Failed to send" + event + " to " + recipient, e);
1285 }
1286 }
1287
1288 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1289 try {
1290 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1291 } catch (IOException e) {
1292 log.error("Failed to send" + event + " to " + recipient, e);
1293 }
1294 }
1295
1296 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1297 try {
1298 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1299 } catch (IOException e) {
1300 log.error("Failed to send" + event + " to " + recipient, e);
1301 }
1302 }
1303
1304 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1305 try {
1306 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1307 } catch (IOException e) {
1308 log.error("Failed to send" + event + " to " + recipient, e);
1309 }
1310 }
1311
1312 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1313 final NodeId self = clusterService.getLocalNode().id();
1314
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001315 final int numDevices = deviceDescs.size();
1316 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1317 final int portsPerDevice = 8; // random factor to minimize reallocation
1318 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1319 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001320
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001321 deviceDescs.forEach((deviceId, devDescs) -> {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001322
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001323 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001324 synchronized (devDescs) {
1325
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001326 // send device offline timestamp
1327 Timestamp lOffline = this.offline.get(deviceId);
1328 if (lOffline != null) {
1329 adOffline.put(deviceId, lOffline);
1330 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001331
1332 for (Entry<ProviderId, DeviceDescriptions>
1333 prov : devDescs.entrySet()) {
1334
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001335 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001336 final ProviderId provId = prov.getKey();
1337 final DeviceDescriptions descs = prov.getValue();
1338
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001339 adDevices.put(new DeviceFragmentId(deviceId, provId),
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001340 descs.getDeviceDesc().timestamp());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001341
1342 for (Entry<PortNumber, Timestamped<PortDescription>>
1343 portDesc : descs.getPortDescs().entrySet()) {
1344
1345 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001346 adPorts.put(new PortFragmentId(deviceId, provId, number),
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001347 portDesc.getValue().timestamp());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001348 }
1349 }
1350 }
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001351 });
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001352
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001353 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001354 }
1355
1356 /**
1357 * Responds to anti-entropy advertisement message.
HIGUCHI Yuta67023a22016-03-28 13:35:44 -07001358 * <p>
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001359 * Notify sender about out-dated information using regular replication message.
1360 * Send back advertisement to sender if not in sync.
1361 *
1362 * @param advertisement to respond to
1363 */
1364 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1365
1366 final NodeId sender = advertisement.sender();
1367
1368 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1369 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1370 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1371
1372 // Fragments to request
1373 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1374 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1375
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001376 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001377 final DeviceId deviceId = de.getKey();
1378 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1379
1380 synchronized (lDevice) {
1381 // latestTimestamp across provider
1382 // Note: can be null initially
1383 Timestamp localLatest = offline.get(deviceId);
1384
1385 // handle device Ads
1386 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1387 final ProviderId provId = prov.getKey();
1388 final DeviceDescriptions lDeviceDescs = prov.getValue();
1389
1390 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1391
1392
1393 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1394 Timestamp advDevTimestamp = devAds.get(devFragId);
1395
Jonathan Hart403ea932015-02-20 16:23:00 -08001396 if (advDevTimestamp == null || lProvDevice.isNewerThan(
1397 advDevTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001398 // remote does not have it or outdated, suggest
1399 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1400 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1401 // local is outdated, request
1402 reqDevices.add(devFragId);
1403 }
1404
1405 // handle port Ads
1406 for (Entry<PortNumber, Timestamped<PortDescription>>
1407 pe : lDeviceDescs.getPortDescs().entrySet()) {
1408
1409 final PortNumber num = pe.getKey();
1410 final Timestamped<PortDescription> lPort = pe.getValue();
1411
1412 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1413
1414 Timestamp advPortTimestamp = portAds.get(portFragId);
Jonathan Hart403ea932015-02-20 16:23:00 -08001415 if (advPortTimestamp == null || lPort.isNewerThan(
1416 advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001417 // remote does not have it or outdated, suggest
1418 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1419 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1420 // local is outdated, request
1421 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1422 reqPorts.add(portFragId);
1423 }
1424
1425 // remove port Ad already processed
1426 portAds.remove(portFragId);
1427 } // end local port loop
1428
1429 // remove device Ad already processed
1430 devAds.remove(devFragId);
1431
1432 // find latest and update
1433 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1434 if (localLatest == null ||
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001435 providerLatest.compareTo(localLatest) > 0) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001436 localLatest = providerLatest;
1437 }
1438 } // end local provider loop
1439
1440 // checking if remote timestamp is more recent.
1441 Timestamp rOffline = offlineAds.get(deviceId);
1442 if (rOffline != null &&
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001443 rOffline.compareTo(localLatest) > 0) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001444 // remote offline timestamp suggests that the
1445 // device is off-line
1446 markOfflineInternal(deviceId, rOffline);
1447 }
1448
1449 Timestamp lOffline = offline.get(deviceId);
1450 if (lOffline != null && rOffline == null) {
1451 // locally offline, but remote is online, suggest offline
1452 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1453 }
1454
1455 // remove device offline Ad already processed
1456 offlineAds.remove(deviceId);
1457 } // end local device loop
1458 } // device lock
1459
1460 // If there is any Ads left, request them
1461 log.trace("Ads left {}, {}", devAds, portAds);
1462 reqDevices.addAll(devAds.keySet());
1463 reqPorts.addAll(portAds.keySet());
1464
1465 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1466 log.trace("Nothing to request to remote peer {}", sender);
1467 return;
1468 }
1469
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001470 log.debug("Need to sync {} {}", reqDevices, reqPorts);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001471
1472 // 2-way Anti-Entropy for now
1473 try {
1474 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1475 } catch (IOException e) {
1476 log.error("Failed to send response advertisement to " + sender, e);
1477 }
1478
1479// Sketch of 3-way Anti-Entropy
1480// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1481// ClusterMessage message = new ClusterMessage(
1482// clusterService.getLocalNode().id(),
1483// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1484// SERIALIZER.encode(request));
1485//
1486// try {
1487// clusterCommunicator.unicast(message, advertisement.sender());
1488// } catch (IOException e) {
1489// log.error("Failed to send advertisement reply to "
1490// + advertisement.sender(), e);
1491// }
Madan Jampani47c93732014-10-06 20:46:08 -07001492 }
1493
Madan Jampani255a58b2014-10-09 12:08:20 -07001494 private void notifyDelegateIfNotNull(DeviceEvent event) {
1495 if (event != null) {
1496 notifyDelegate(event);
1497 }
1498 }
1499
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001500 private final class SendAdvertisementTask implements Runnable {
1501
1502 @Override
1503 public void run() {
1504 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001505 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001506 return;
1507 }
1508
1509 try {
1510 final NodeId self = clusterService.getLocalNode().id();
1511 Set<ControllerNode> nodes = clusterService.getNodes();
1512
1513 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1514 .transform(toNodeId())
1515 .toList();
1516
1517 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001518 log.trace("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001519 return;
1520 }
1521
1522 NodeId peer;
1523 do {
1524 int idx = RandomUtils.nextInt(0, nodeIds.size());
1525 peer = nodeIds.get(idx);
1526 } while (peer.equals(self));
1527
1528 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1529
1530 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001531 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001532 return;
1533 }
1534
1535 try {
1536 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1537 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001538 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001539 return;
1540 }
1541 } catch (Exception e) {
1542 // catch all Exception to avoid Scheduled task being suppressed.
1543 log.error("Exception thrown while sending advertisement", e);
1544 }
1545 }
1546 }
1547
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001548 private final class InternalDeviceEventListener
1549 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001550 @Override
1551 public void handle(ClusterMessage message) {
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001552 log.debug("Received device update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001553 InternalDeviceEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001554
Madan Jampani47c93732014-10-06 20:46:08 -07001555 ProviderId providerId = event.providerId();
1556 DeviceId deviceId = event.deviceId();
1557 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001558
Madan Jampani2af244a2015-02-22 13:12:01 -08001559 try {
1560 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1561 } catch (Exception e) {
1562 log.warn("Exception thrown handling device update", e);
1563 }
Madan Jampani47c93732014-10-06 20:46:08 -07001564 }
1565 }
1566
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001567 private final class InternalDeviceOfflineEventListener
1568 implements ClusterMessageHandler {
Madan Jampani25322532014-10-08 11:20:38 -07001569 @Override
1570 public void handle(ClusterMessage message) {
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001571 log.debug("Received device offline event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001572 InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001573
1574 DeviceId deviceId = event.deviceId();
1575 Timestamp timestamp = event.timestamp();
1576
Madan Jampani2af244a2015-02-22 13:12:01 -08001577 try {
1578 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1579 } catch (Exception e) {
1580 log.warn("Exception thrown handling device offline", e);
1581 }
Madan Jampani25322532014-10-08 11:20:38 -07001582 }
1583 }
1584
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001585 private final class InternalRemoveRequestListener
1586 implements ClusterMessageHandler {
1587 @Override
1588 public void handle(ClusterMessage message) {
1589 log.debug("Received device remove request from peer: {}", message.sender());
1590 DeviceId did = SERIALIZER.decode(message.payload());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001591
Madan Jampani2af244a2015-02-22 13:12:01 -08001592 try {
1593 removeDevice(did);
1594 } catch (Exception e) {
1595 log.warn("Exception thrown handling device remove", e);
1596 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001597 }
1598 }
1599
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001600 private final class InternalDeviceRemovedEventListener
1601 implements ClusterMessageHandler {
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001602 @Override
1603 public void handle(ClusterMessage message) {
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001604 log.debug("Received device removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001605 InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001606
1607 DeviceId deviceId = event.deviceId();
1608 Timestamp timestamp = event.timestamp();
1609
Madan Jampani2af244a2015-02-22 13:12:01 -08001610 try {
1611 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1612 } catch (Exception e) {
1613 log.warn("Exception thrown handling device removed", e);
1614 }
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001615 }
1616 }
1617
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001618 private final class InternalPortEventListener
1619 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001620 @Override
1621 public void handle(ClusterMessage message) {
1622
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001623 log.debug("Received port update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001624 InternalPortEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001625
1626 ProviderId providerId = event.providerId();
1627 DeviceId deviceId = event.deviceId();
1628 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1629
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001630 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001631 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001632 // Note: dropped information will be recovered by anti-entropy
1633 return;
1634 }
1635
Madan Jampani2af244a2015-02-22 13:12:01 -08001636 try {
1637 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1638 } catch (Exception e) {
1639 log.warn("Exception thrown handling port update", e);
1640 }
Madan Jampani47c93732014-10-06 20:46:08 -07001641 }
1642 }
1643
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001644 private final class InternalPortStatusEventListener
1645 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001646 @Override
1647 public void handle(ClusterMessage message) {
1648
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001649 log.debug("Received port status update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001650 InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001651
1652 ProviderId providerId = event.providerId();
1653 DeviceId deviceId = event.deviceId();
1654 Timestamped<PortDescription> portDescription = event.portDescription();
1655
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001656 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001657 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001658 // Note: dropped information will be recovered by anti-entropy
1659 return;
1660 }
1661
Madan Jampani2af244a2015-02-22 13:12:01 -08001662 try {
1663 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1664 } catch (Exception e) {
1665 log.warn("Exception thrown handling port update", e);
1666 }
Madan Jampani47c93732014-10-06 20:46:08 -07001667 }
1668 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001669
1670 private final class InternalDeviceAdvertisementListener
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001671 implements ClusterMessageHandler {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001672 @Override
1673 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001674 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001675 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -08001676 try {
1677 handleAdvertisement(advertisement);
1678 } catch (Exception e) {
1679 log.warn("Exception thrown handling Device advertisements.", e);
1680 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001681 }
1682 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001683
1684 private final class DeviceInjectedEventListener
1685 implements ClusterMessageHandler {
1686 @Override
1687 public void handle(ClusterMessage message) {
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001688 log.debug("Received injected device event from peer: {}", message.sender());
1689 DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1690
1691 ProviderId providerId = event.providerId();
1692 DeviceId deviceId = event.deviceId();
1693 DeviceDescription deviceDescription = event.deviceDescription();
HIGUCHI Yuta62334412015-03-13 13:23:05 -07001694 if (!deviceClockService.isTimestampAvailable(deviceId)) {
1695 // workaround for ONOS-1208
1696 log.warn("Not ready to accept update. Dropping {}", deviceDescription);
1697 return;
1698 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001699
Madan Jampani2af244a2015-02-22 13:12:01 -08001700 try {
1701 createOrUpdateDevice(providerId, deviceId, deviceDescription);
1702 } catch (Exception e) {
1703 log.warn("Exception thrown handling device injected event.", e);
1704 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001705 }
1706 }
1707
1708 private final class PortInjectedEventListener
1709 implements ClusterMessageHandler {
1710 @Override
1711 public void handle(ClusterMessage message) {
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001712 log.debug("Received injected port event from peer: {}", message.sender());
1713 PortInjectedEvent event = SERIALIZER.decode(message.payload());
1714
1715 ProviderId providerId = event.providerId();
1716 DeviceId deviceId = event.deviceId();
1717 List<PortDescription> portDescriptions = event.portDescriptions();
HIGUCHI Yuta62334412015-03-13 13:23:05 -07001718 if (!deviceClockService.isTimestampAvailable(deviceId)) {
1719 // workaround for ONOS-1208
1720 log.warn("Not ready to accept update. Dropping {}", portDescriptions);
1721 return;
1722 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001723
Madan Jampani2af244a2015-02-22 13:12:01 -08001724 try {
1725 updatePorts(providerId, deviceId, portDescriptions);
1726 } catch (Exception e) {
1727 log.warn("Exception thrown handling port injected event.", e);
1728 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001729 }
1730 }
Thomas Vachuskafdbc4c22015-05-29 15:53:01 -07001731
1732 private class InternalPortStatsListener
1733 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
1734 @Override
1735 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
1736 if (event.type() == PUT) {
1737 Device device = devices.get(event.key());
1738 if (device != null) {
1739 delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
1740 }
1741 }
1742 }
1743 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001744}