blob: 06a60d5048f3dd0059b4c278f11abc663612445c [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.device.impl;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070017
Yuta HIGUCHI47c40882014-10-10 18:44:37 -070018import com.google.common.base.Function;
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -080019import com.google.common.base.Predicate;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070020import com.google.common.collect.FluentIterable;
21import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070022import com.google.common.collect.Maps;
23import com.google.common.collect.Sets;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070024import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -080031import org.onlab.packet.ChassisId;
32import org.onlab.util.KryoNamespace;
33import org.onlab.util.NewConcurrentHashMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.cluster.NodeId;
37import org.onosproject.mastership.MastershipService;
38import org.onosproject.mastership.MastershipTerm;
39import org.onosproject.mastership.MastershipTermService;
40import org.onosproject.net.AnnotationsUtil;
41import org.onosproject.net.DefaultAnnotations;
42import org.onosproject.net.DefaultDevice;
43import org.onosproject.net.DefaultPort;
44import org.onosproject.net.Device;
45import org.onosproject.net.Device.Type;
46import org.onosproject.net.DeviceId;
47import org.onosproject.net.MastershipRole;
48import org.onosproject.net.Port;
49import org.onosproject.net.PortNumber;
50import org.onosproject.net.device.DeviceClockService;
51import org.onosproject.net.device.DeviceDescription;
52import org.onosproject.net.device.DeviceEvent;
53import org.onosproject.net.device.DeviceStore;
54import org.onosproject.net.device.DeviceStoreDelegate;
55import org.onosproject.net.device.PortDescription;
56import org.onosproject.net.provider.ProviderId;
57import org.onosproject.store.AbstractStore;
58import org.onosproject.store.Timestamp;
59import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.cluster.messaging.ClusterMessage;
61import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62import org.onosproject.store.cluster.messaging.MessageSubject;
63import org.onosproject.store.impl.Timestamped;
64import org.onosproject.store.serializers.KryoSerializer;
65import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070066import org.slf4j.Logger;
67
Madan Jampani47c93732014-10-06 20:46:08 -070068import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070069import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070070import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070071import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070072import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070073import java.util.HashSet;
74import java.util.Iterator;
75import java.util.List;
76import java.util.Map;
77import java.util.Map.Entry;
78import java.util.Objects;
79import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070080import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI80d56592014-11-25 15:11:13 -080081import java.util.concurrent.ExecutorService;
82import java.util.concurrent.Executors;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070083import java.util.concurrent.ScheduledExecutorService;
84import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070085
86import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070087import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070088import static com.google.common.base.Verify.verify;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -080089import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
90import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080091import static org.onlab.util.Tools.groupedThreads;
Yuta HIGUCHI06586272014-11-25 14:27:03 -080092import static org.onlab.util.Tools.minPriority;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -080093import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
94import static org.onosproject.net.DefaultAnnotations.merge;
95import static org.onosproject.net.device.DeviceEvent.Type.*;
96import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
97import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
98import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070099
/**
 * Manages inventory of infrastructure devices using gossip protocol to distribute
 * information.
 */
104@Component(immediate = true)
105@Service
106public class GossipDeviceStore
107 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
108 implements DeviceStore {
109
110 private final Logger log = getLogger(getClass());
111
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700112 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800113 // Timeout in milliseconds to process device or ports on remote master node
114 private static final int REMOTE_MASTER_TIMEOUT = 1000;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700115
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700116 // innerMap is used to lock a Device, thus instance should never be replaced.
117 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700118 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700119 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700120
121 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700122 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
123 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
124
125 // to be updated under Device lock
126 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
127 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700128
129 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700130 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700133 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700134
Madan Jampani47c93732014-10-06 20:46:08 -0700135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136 protected ClusterCommunicationService clusterCommunicator;
137
Madan Jampani53e44e62014-10-07 12:39:51 -0700138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected ClusterService clusterService;
140
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected MastershipService mastershipService;
143
Yuta HIGUCHIbcac4992014-11-22 19:27:57 -0800144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected MastershipTermService termService;
146
147
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700148 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700149 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700150 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700151 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800152 .register(DistributedStoreSerializers.STORE_COMMON)
153 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
154 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
155 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700156 .register(InternalDeviceRemovedEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800157 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
158 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700159 .register(DeviceAntiEntropyAdvertisement.class)
160 .register(DeviceFragmentId.class)
161 .register(PortFragmentId.class)
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800162 .register(DeviceInjectedEvent.class)
163 .register(PortInjectedEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800164 .build();
Madan Jampani53e44e62014-10-07 12:39:51 -0700165 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700166 };
167
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800168 private ExecutorService executor;
169
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800170 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700171
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800172 // TODO make these anti-entropy parameters configurable
173 private long initialDelaySec = 5;
174 private long periodSec = 5;
175
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800176
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700177 @Activate
178 public void activate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700179
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800180 executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800181
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800182 backgroundExecutor =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800183 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700184
Madan Jampani2af244a2015-02-22 13:12:01 -0800185 clusterCommunicator.addSubscriber(
186 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
187 clusterCommunicator.addSubscriber(
188 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
189 new InternalDeviceOfflineEventListener(),
190 executor);
191 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
192 new InternalRemoveRequestListener(),
193 executor);
194 clusterCommunicator.addSubscriber(
195 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
196 clusterCommunicator.addSubscriber(
197 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
198 clusterCommunicator.addSubscriber(
199 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
200 clusterCommunicator.addSubscriber(
201 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
202 new InternalDeviceAdvertisementListener(),
203 backgroundExecutor);
204 clusterCommunicator.addSubscriber(
205 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
206 clusterCommunicator.addSubscriber(
207 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
208
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700209 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800210 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700211 initialDelaySec, periodSec, TimeUnit.SECONDS);
212
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700213 log.info("Started");
214 }
215
216 @Deactivate
217 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700218
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800219 executor.shutdownNow();
220
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800221 backgroundExecutor.shutdownNow();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700222 try {
Yuta HIGUCHIc5783592014-12-05 11:13:29 -0800223 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700224 log.error("Timeout during executor shutdown");
225 }
226 } catch (InterruptedException e) {
227 log.error("Error during executor shutdown", e);
228 }
229
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700230 deviceDescs.clear();
231 devices.clear();
232 devicePorts.clear();
233 availableDevices.clear();
234 log.info("Stopped");
235 }
236
237 @Override
238 public int getDeviceCount() {
239 return devices.size();
240 }
241
242 @Override
243 public Iterable<Device> getDevices() {
244 return Collections.unmodifiableCollection(devices.values());
245 }
246
247 @Override
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -0800248 public Iterable<Device> getAvailableDevices() {
249 return FluentIterable.from(getDevices())
250 .filter(new Predicate<Device>() {
251
252 @Override
253 public boolean apply(Device input) {
254 return isAvailable(input.id());
255 }
256 });
257 }
258
259 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700260 public Device getDevice(DeviceId deviceId) {
261 return devices.get(deviceId);
262 }
263
264 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700265 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
266 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700267 DeviceDescription deviceDescription) {
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800268 NodeId localNode = clusterService.getLocalNode().id();
269 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
270
271 // Process device update only if we're the master,
272 // otherwise signal the actual master.
273 DeviceEvent deviceEvent = null;
274 if (localNode.equals(deviceNode)) {
275
276 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
277 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
278 final Timestamped<DeviceDescription> mergedDesc;
279 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
280
281 synchronized (device) {
282 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
283 mergedDesc = device.get(providerId).getDeviceDesc();
284 }
285
286 if (deviceEvent != null) {
287 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
288 providerId, deviceId);
289 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
290 }
291
292 } else {
HIGUCHI Yutadc2e7c22015-02-24 12:19:47 -0800293 // FIXME Temporary hack for NPE (ONOS-1171).
294 // Proper fix is to implement forwarding to master on ConfigProvider
295 // redo ONOS-490
296 if (deviceNode == null) {
297 // silently ignore
298 return null;
299 }
300
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800301
302 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
303 providerId, deviceId, deviceDescription);
304 ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
305 SERIALIZER.encode(deviceInjectedEvent));
306
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800307 // TODO check unicast return value
308 clusterCommunicator.unicast(clusterMessage, deviceNode);
309 /* error log:
310 log.warn("Failed to process injected device id: {} desc: {} " +
311 "(cluster messaging failed: {})",
312 deviceId, deviceDescription, e);
313 */
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700314 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800315
316 return deviceEvent;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700317 }
318
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700319 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
320 DeviceId deviceId,
321 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700322
323 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800324 Map<ProviderId, DeviceDescriptions> device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700325 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700326
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800327 synchronized (device) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700328 // locking per device
329
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700330 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
331 log.debug("Ignoring outdated event: {}", deltaDesc);
332 return null;
333 }
334
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800335 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700336
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700337 final Device oldDevice = devices.get(deviceId);
338 final Device newDevice;
339
340 if (deltaDesc == descs.getDeviceDesc() ||
341 deltaDesc.isNewer(descs.getDeviceDesc())) {
342 // on new device or valid update
343 descs.putDeviceDesc(deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800344 newDevice = composeDevice(deviceId, device);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700345 } else {
346 // outdated event, ignored.
347 return null;
348 }
349 if (oldDevice == null) {
350 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700351 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700352 } else {
353 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700354 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700355 }
356 }
357 }
358
359 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700360 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700361 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700362 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700363
364 // update composed device cache
365 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
366 verify(oldDevice == null,
367 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
368 providerId, oldDevice, newDevice);
369
370 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700371 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700372 }
373
374 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
375 }
376
377 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700378 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700379 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700380 Device oldDevice,
381 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700382 // We allow only certain attributes to trigger update
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700383 boolean propertiesChanged =
384 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
385 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion());
386 boolean annotationsChanged =
387 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700388
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700389 // Primary providers can respond to all changes, but ancillary ones
390 // should respond only to annotation changes.
391 if ((providerId.isAncillary() && annotationsChanged) ||
392 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700393 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
394 if (!replaced) {
395 verify(replaced,
396 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
397 providerId, oldDevice, devices.get(newDevice.id())
398 , newDevice);
399 }
400 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700401 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700402 }
403 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
404 }
405
406 // Otherwise merely attempt to change availability if primary provider
407 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700408 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700409 return !added ? null :
410 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
411 }
412 return null;
413 }
414
415 @Override
416 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700417 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700418 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700419 if (event != null) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700420 log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
421 deviceId, timestamp);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800422 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -0700423 }
424 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700425 }
426
427 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
428
429 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700430 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700431
432 // locking device
433 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700434
435 // accept off-line if given timestamp is newer than
436 // the latest Timestamp from Primary provider
437 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
438 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
439 if (timestamp.compareTo(lastTimestamp) <= 0) {
440 // outdated event ignore
441 return null;
442 }
443
444 offline.put(deviceId, timestamp);
445
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700446 Device device = devices.get(deviceId);
447 if (device == null) {
448 return null;
449 }
450 boolean removed = availableDevices.remove(deviceId);
451 if (removed) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700452 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700453 }
454 return null;
455 }
456 }
457
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700458 /**
459 * Marks the device as available if the given timestamp is not outdated,
460 * compared to the time the device has been marked offline.
461 *
462 * @param deviceId identifier of the device
463 * @param timestamp of the event triggering this change.
464 * @return true if availability change request was accepted and changed the state
465 */
466 // Guarded by deviceDescs value (=Device lock)
467 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
468 // accept on-line if given timestamp is newer than
469 // the latest offline request Timestamp
470 Timestamp offlineTimestamp = offline.get(deviceId);
471 if (offlineTimestamp == null ||
472 offlineTimestamp.compareTo(timestamp) < 0) {
473
474 offline.remove(deviceId);
475 return availableDevices.add(deviceId);
476 }
477 return false;
478 }
479
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700480 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700481 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
482 DeviceId deviceId,
483 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700484
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800485 NodeId localNode = clusterService.getLocalNode().id();
486 // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
487 // since it will trigger distributed store read.
488 // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
489 // outside Device subsystem. so that we don't have to modify both Device and Link stores.
490 // If we don't care much about topology performance, then it might be OK.
491 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700492
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800493 // Process port update only if we're the master of the device,
494 // otherwise signal the actual master.
495 List<DeviceEvent> deviceEvents = null;
496 if (localNode.equals(deviceNode)) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700497
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800498 final Timestamp newTimestamp;
499 try {
500 newTimestamp = deviceClockService.getTimestamp(deviceId);
501 } catch (IllegalStateException e) {
502 log.info("Timestamp was not available for device {}", deviceId);
503 log.debug(" discarding {}", portDescriptions);
504 // Failed to generate timestamp.
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700505
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800506 // Possible situation:
507 // Device connected and became master for short period of time,
508 // but lost mastership before this instance had the chance to
509 // retrieve term information.
510
511 // Information dropped here is expected to be recoverable by
512 // device probing after mastership change
513
514 return Collections.emptyList();
515 }
516 log.debug("timestamp for {} {}", deviceId, newTimestamp);
517
518 final Timestamped<List<PortDescription>> timestampedInput
519 = new Timestamped<>(portDescriptions, newTimestamp);
520 final Timestamped<List<PortDescription>> merged;
521
522 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
523
524 synchronized (device) {
525 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
526 final DeviceDescriptions descs = device.get(providerId);
527 List<PortDescription> mergedList =
528 FluentIterable.from(portDescriptions)
529 .transform(new Function<PortDescription, PortDescription>() {
530 @Override
531 public PortDescription apply(PortDescription input) {
532 // lookup merged port description
533 return descs.getPortDesc(input.portNumber()).value();
534 }
535 }).toList();
536 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
537 }
538
539 if (!deviceEvents.isEmpty()) {
540 log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
541 providerId, deviceId);
542 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
543 }
544
545 } else {
HIGUCHI Yutadc2e7c22015-02-24 12:19:47 -0800546 // FIXME Temporary hack for NPE (ONOS-1171).
547 // Proper fix is to implement forwarding to master on ConfigProvider
548 // redo ONOS-490
549 if (deviceNode == null) {
550 // silently ignore
Ayaka Koshibeeeb95102015-02-26 16:31:49 -0800551 return Collections.emptyList();
HIGUCHI Yutadc2e7c22015-02-24 12:19:47 -0800552 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800553
554 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
555 ClusterMessage clusterMessage = new ClusterMessage(
556 localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800557
558 //TODO check unicast return value
559 clusterCommunicator.unicast(clusterMessage, deviceNode);
560 /* error log:
561 log.warn("Failed to process injected ports of device id: {} " +
562 "(cluster messaging failed: {})",
563 deviceId, e);
564 */
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700565 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700566
Ayaka Koshibeeeb95102015-02-26 16:31:49 -0800567 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700568 }
569
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700570 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
571 DeviceId deviceId,
572 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700573
574 Device device = devices.get(deviceId);
575 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
576
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700577 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700578 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
579
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700580 List<DeviceEvent> events = new ArrayList<>();
581 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700582
583 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
584 log.debug("Ignoring outdated events: {}", portDescriptions);
585 return null;
586 }
587
588 DeviceDescriptions descs = descsMap.get(providerId);
589 // every provider must provide DeviceDescription.
590 checkArgument(descs != null,
591 "Device description for Device ID %s from Provider %s was not found",
592 deviceId, providerId);
593
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700594 Map<PortNumber, Port> ports = getPortMap(deviceId);
595
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700596 final Timestamp newTimestamp = portDescriptions.timestamp();
597
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700598 // Add new ports
599 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700600 for (PortDescription portDescription : portDescriptions.value()) {
601 final PortNumber number = portDescription.portNumber();
602 processed.add(number);
603
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700604 final Port oldPort = ports.get(number);
605 final Port newPort;
606
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700607
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700608 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
609 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700610 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700611 // on new port or valid update
612 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700613 descs.putPortDesc(new Timestamped<>(portDescription,
614 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700615 newPort = composePort(device, number, descsMap);
616 } else {
617 // outdated event, ignored.
618 continue;
619 }
620
621 events.add(oldPort == null ?
622 createPort(device, newPort, ports) :
623 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700624 }
625
626 events.addAll(pruneOldPorts(device, ports, processed));
627 }
628 return FluentIterable.from(events).filter(notNull()).toList();
629 }
630
631 // Creates a new port based on the port description adds it to the map and
632 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700633 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700634 private DeviceEvent createPort(Device device, Port newPort,
635 Map<PortNumber, Port> ports) {
636 ports.put(newPort.number(), newPort);
637 return new DeviceEvent(PORT_ADDED, device, newPort);
638 }
639
640 // Checks if the specified port requires update and if so, it replaces the
641 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700642 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700643 private DeviceEvent updatePort(Device device, Port oldPort,
644 Port newPort,
645 Map<PortNumber, Port> ports) {
646 if (oldPort.isEnabled() != newPort.isEnabled() ||
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700647 oldPort.type() != newPort.type() ||
648 oldPort.portSpeed() != newPort.portSpeed() ||
649 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700650 ports.put(oldPort.number(), newPort);
651 return new DeviceEvent(PORT_UPDATED, device, newPort);
652 }
653 return null;
654 }
655
656 // Prunes the specified list of ports based on which ports are in the
657 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700658 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700659 private List<DeviceEvent> pruneOldPorts(Device device,
660 Map<PortNumber, Port> ports,
661 Set<PortNumber> processed) {
662 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700663 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700664 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700665 Entry<PortNumber, Port> e = iterator.next();
666 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700667 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700668 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700669 iterator.remove();
670 }
671 }
672 return events;
673 }
674
675 // Gets the map of ports for the specified device; if one does not already
676 // exist, it creates and registers a new one.
677 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
678 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700679 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
680 }
681
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700682 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700683 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700684 Map<ProviderId, DeviceDescriptions> r;
685 r = deviceDescs.get(deviceId);
686 if (r == null) {
687 r = new HashMap<ProviderId, DeviceDescriptions>();
688 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
689 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
690 if (concurrentlyAdded != null) {
691 r = concurrentlyAdded;
692 }
693 }
694 return r;
695 }
696
697 // Guarded by deviceDescs value (=Device lock)
698 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
699 Map<ProviderId, DeviceDescriptions> device,
700 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
701
702 synchronized (device) {
703 DeviceDescriptions r = device.get(providerId);
704 if (r == null) {
705 r = new DeviceDescriptions(deltaDesc);
706 device.put(providerId, r);
707 }
708 return r;
709 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700710 }
711
712 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700713 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
714 DeviceId deviceId,
715 PortDescription portDescription) {
716
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700717 final Timestamp newTimestamp;
718 try {
719 newTimestamp = deviceClockService.getTimestamp(deviceId);
720 } catch (IllegalStateException e) {
721 log.info("Timestamp was not available for device {}", deviceId);
722 log.debug(" discarding {}", portDescription);
723 // Failed to generate timestamp. Ignoring.
724 // See updatePorts comment
725 return null;
726 }
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700727 final Timestamped<PortDescription> deltaDesc
728 = new Timestamped<>(portDescription, newTimestamp);
729 final DeviceEvent event;
730 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800731 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
732 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700733 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800734 mergedDesc = device.get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700735 .getPortDesc(portDescription.portNumber());
736 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700737 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700738 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
739 providerId, deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800740 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700741 }
742 return event;
743 }
744
745 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
746 Timestamped<PortDescription> deltaDesc) {
747
748 Device device = devices.get(deviceId);
749 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
750
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700751 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700752 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
753
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700754 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700755
756 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
757 log.debug("Ignoring outdated event: {}", deltaDesc);
758 return null;
759 }
760
761 DeviceDescriptions descs = descsMap.get(providerId);
762 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700763 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700764 "Device description for Device ID %s from Provider %s was not found",
765 deviceId, providerId);
766
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700767 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
768 final PortNumber number = deltaDesc.value().portNumber();
769 final Port oldPort = ports.get(number);
770 final Port newPort;
771
772 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
773 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700774 deltaDesc.isNewer(existingPortDesc)) {
775 // on new port or valid update
776 // update description
777 descs.putPortDesc(deltaDesc);
778 newPort = composePort(device, number, descsMap);
779 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700780 // same or outdated event, ignored.
781 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700782 return null;
783 }
784
785 if (oldPort == null) {
786 return createPort(device, newPort, ports);
787 } else {
788 return updatePort(device, oldPort, newPort, ports);
789 }
790 }
791 }
792
793 @Override
794 public List<Port> getPorts(DeviceId deviceId) {
795 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
796 if (ports == null) {
797 return Collections.emptyList();
798 }
799 return ImmutableList.copyOf(ports.values());
800 }
801
802 @Override
803 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
804 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
805 return ports == null ? null : ports.get(portNumber);
806 }
807
808 @Override
809 public boolean isAvailable(DeviceId deviceId) {
810 return availableDevices.contains(deviceId);
811 }
812
813 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700814 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800815 final NodeId myId = clusterService.getLocalNode().id();
816 NodeId master = mastershipService.getMasterFor(deviceId);
817
818 // if there exist a master, forward
819 // if there is no master, try to become one and process
820
821 boolean relinquishAtEnd = false;
822 if (master == null) {
823 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
824 if (myRole != MastershipRole.NONE) {
825 relinquishAtEnd = true;
826 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800827 log.debug("Temporarily requesting role for {} to remove", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800828 mastershipService.requestRoleFor(deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800829 MastershipTerm term = termService.getMastershipTerm(deviceId);
830 if (myId.equals(term.master())) {
831 master = myId;
832 }
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700833 }
834
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800835 if (!myId.equals(master)) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800836 log.debug("{} has control of {}, forwarding remove request",
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800837 master, deviceId);
838
839 ClusterMessage message = new ClusterMessage(
840 myId,
841 DEVICE_REMOVE_REQ,
842 SERIALIZER.encode(deviceId));
843
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800844 // TODO check unicast return value
845 clusterCommunicator.unicast(message, master);
846 /* error log:
847 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
848 */
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800849
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800850 // event will be triggered after master processes it.
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800851 return null;
852 }
853
854 // I have control..
855
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700856 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700857 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700858 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800859 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700860 deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800861 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700862 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800863 if (relinquishAtEnd) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800864 log.debug("Relinquishing temporary role acquired for {}", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800865 mastershipService.relinquishMastership(deviceId);
866 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700867 return event;
868 }
869
870 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
871 Timestamp timestamp) {
872
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700873 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700874 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700875 // accept removal request if given timestamp is newer than
876 // the latest Timestamp from Primary provider
877 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
878 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
879 if (timestamp.compareTo(lastTimestamp) <= 0) {
880 // outdated event ignore
881 return null;
882 }
883 removalRequest.put(deviceId, timestamp);
884
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700885 Device device = devices.remove(deviceId);
886 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700887 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
888 if (ports != null) {
889 ports.clear();
890 }
891 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700892 descs.clear();
893 return device == null ? null :
894 new DeviceEvent(DEVICE_REMOVED, device, null);
895 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700896 }
897
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700898 /**
899 * Checks if given timestamp is superseded by removal request
900 * with more recent timestamp.
901 *
902 * @param deviceId identifier of a device
903 * @param timestampToCheck timestamp of an event to check
904 * @return true if device is already removed
905 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700906 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
907 Timestamp removalTimestamp = removalRequest.get(deviceId);
908 if (removalTimestamp != null &&
909 removalTimestamp.compareTo(timestampToCheck) >= 0) {
910 // removalRequest is more recent
911 return true;
912 }
913 return false;
914 }
915
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700916 /**
917 * Returns a Device, merging description given from multiple Providers.
918 *
919 * @param deviceId device identifier
920 * @param providerDescs Collection of Descriptions from multiple providers
921 * @return Device instance
922 */
923 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700924 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700925
Thomas Vachuska444eda62014-10-28 13:09:42 -0700926 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700927
928 ProviderId primary = pickPrimaryPID(providerDescs);
929
930 DeviceDescriptions desc = providerDescs.get(primary);
931
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700932 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700933 Type type = base.type();
934 String manufacturer = base.manufacturer();
935 String hwVersion = base.hwVersion();
936 String swVersion = base.swVersion();
937 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700938 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700939 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
940 annotations = merge(annotations, base.annotations());
941
942 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
943 if (e.getKey().equals(primary)) {
944 continue;
945 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800946 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700947 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700948 // Currently assuming there will never be a key conflict between
949 // providers
950
951 // annotation merging. not so efficient, should revisit later
952 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
953 }
954
955 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700956 hwVersion, swVersion, serialNumber,
957 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700958 }
959
960 /**
961 * Returns a Port, merging description given from multiple Providers.
962 *
963 * @param device device the port is on
964 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700965 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700966 * @return Port instance
967 */
968 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700969 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700970
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700971 ProviderId primary = pickPrimaryPID(descsMap);
972 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700973 // if no primary, assume not enabled
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700974 boolean isEnabled = false;
975 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
976
977 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
978 if (portDesc != null) {
979 isEnabled = portDesc.value().isEnabled();
980 annotations = merge(annotations, portDesc.value().annotations());
981 }
982
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700983 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700984 if (e.getKey().equals(primary)) {
985 continue;
986 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800987 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700988 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700989 // Currently assuming there will never be a key conflict between
990 // providers
991
992 // annotation merging. not so efficient, should revisit later
993 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
994 if (otherPortDesc != null) {
995 annotations = merge(annotations, otherPortDesc.value().annotations());
996 }
997 }
998
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700999 return portDesc == null ?
1000 new DefaultPort(device, number, false, annotations) :
1001 new DefaultPort(device, number, isEnabled, portDesc.value().type(),
1002 portDesc.value().portSpeed(), annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001003 }
1004
1005 /**
1006 * @return primary ProviderID, or randomly chosen one if none exists
1007 */
1008 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001009 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001010 ProviderId fallBackPrimary = null;
1011 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1012 if (!e.getKey().isAncillary()) {
1013 return e.getKey();
1014 } else if (fallBackPrimary == null) {
1015 // pick randomly as a fallback in case there is no primary
1016 fallBackPrimary = e.getKey();
1017 }
1018 }
1019 return fallBackPrimary;
1020 }
1021
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001022 private DeviceDescriptions getPrimaryDescriptions(
1023 Map<ProviderId, DeviceDescriptions> providerDescs) {
1024 ProviderId pid = pickPrimaryPID(providerDescs);
1025 return providerDescs.get(pid);
1026 }
1027
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001028 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
1029 ClusterMessage message = new ClusterMessage(
1030 clusterService.getLocalNode().id(),
1031 subject,
1032 SERIALIZER.encode(event));
1033 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001034 }
1035
Jonathan Hart7d656f42015-01-27 14:07:23 -08001036 private void broadcastMessage(MessageSubject subject, Object event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001037 ClusterMessage message = new ClusterMessage(
1038 clusterService.getLocalNode().id(),
1039 subject,
1040 SERIALIZER.encode(event));
1041 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001042 }
Madan Jampani47c93732014-10-06 20:46:08 -07001043
Jonathan Hart7d656f42015-01-27 14:07:23 -08001044 private void notifyPeers(InternalDeviceEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001045 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -07001046 }
1047
Jonathan Hart7d656f42015-01-27 14:07:23 -08001048 private void notifyPeers(InternalDeviceOfflineEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001049 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -07001050 }
1051
Jonathan Hart7d656f42015-01-27 14:07:23 -08001052 private void notifyPeers(InternalDeviceRemovedEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001053 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001054 }
1055
Jonathan Hart7d656f42015-01-27 14:07:23 -08001056 private void notifyPeers(InternalPortEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001057 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -07001058 }
1059
Jonathan Hart7d656f42015-01-27 14:07:23 -08001060 private void notifyPeers(InternalPortStatusEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001061 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1062 }
1063
1064 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1065 try {
1066 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1067 } catch (IOException e) {
1068 log.error("Failed to send" + event + " to " + recipient, e);
1069 }
1070 }
1071
1072 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1073 try {
1074 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1075 } catch (IOException e) {
1076 log.error("Failed to send" + event + " to " + recipient, e);
1077 }
1078 }
1079
1080 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1081 try {
1082 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1083 } catch (IOException e) {
1084 log.error("Failed to send" + event + " to " + recipient, e);
1085 }
1086 }
1087
1088 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1089 try {
1090 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1091 } catch (IOException e) {
1092 log.error("Failed to send" + event + " to " + recipient, e);
1093 }
1094 }
1095
1096 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1097 try {
1098 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1099 } catch (IOException e) {
1100 log.error("Failed to send" + event + " to " + recipient, e);
1101 }
1102 }
1103
1104 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1105 final NodeId self = clusterService.getLocalNode().id();
1106
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001107 final int numDevices = deviceDescs.size();
1108 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1109 final int portsPerDevice = 8; // random factor to minimize reallocation
1110 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1111 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001112
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001113 deviceDescs.forEach((deviceId, devDescs) -> {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001114
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001115 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001116 synchronized (devDescs) {
1117
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001118 // send device offline timestamp
1119 Timestamp lOffline = this.offline.get(deviceId);
1120 if (lOffline != null) {
1121 adOffline.put(deviceId, lOffline);
1122 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001123
1124 for (Entry<ProviderId, DeviceDescriptions>
1125 prov : devDescs.entrySet()) {
1126
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001127 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001128 final ProviderId provId = prov.getKey();
1129 final DeviceDescriptions descs = prov.getValue();
1130
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001131 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001132 descs.getDeviceDesc().timestamp());
1133
1134 for (Entry<PortNumber, Timestamped<PortDescription>>
1135 portDesc : descs.getPortDescs().entrySet()) {
1136
1137 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001138 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001139 portDesc.getValue().timestamp());
1140 }
1141 }
1142 }
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001143 });
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001144
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001145 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001146 }
1147
1148 /**
1149 * Responds to anti-entropy advertisement message.
1150 * <P>
1151 * Notify sender about out-dated information using regular replication message.
1152 * Send back advertisement to sender if not in sync.
1153 *
1154 * @param advertisement to respond to
1155 */
1156 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1157
1158 final NodeId sender = advertisement.sender();
1159
1160 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1161 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1162 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1163
1164 // Fragments to request
1165 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1166 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1167
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001168 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001169 final DeviceId deviceId = de.getKey();
1170 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1171
1172 synchronized (lDevice) {
1173 // latestTimestamp across provider
1174 // Note: can be null initially
1175 Timestamp localLatest = offline.get(deviceId);
1176
1177 // handle device Ads
1178 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1179 final ProviderId provId = prov.getKey();
1180 final DeviceDescriptions lDeviceDescs = prov.getValue();
1181
1182 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1183
1184
1185 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1186 Timestamp advDevTimestamp = devAds.get(devFragId);
1187
Jonathan Hart403ea932015-02-20 16:23:00 -08001188 if (advDevTimestamp == null || lProvDevice.isNewerThan(
1189 advDevTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001190 // remote does not have it or outdated, suggest
1191 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1192 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1193 // local is outdated, request
1194 reqDevices.add(devFragId);
1195 }
1196
1197 // handle port Ads
1198 for (Entry<PortNumber, Timestamped<PortDescription>>
1199 pe : lDeviceDescs.getPortDescs().entrySet()) {
1200
1201 final PortNumber num = pe.getKey();
1202 final Timestamped<PortDescription> lPort = pe.getValue();
1203
1204 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1205
1206 Timestamp advPortTimestamp = portAds.get(portFragId);
Jonathan Hart403ea932015-02-20 16:23:00 -08001207 if (advPortTimestamp == null || lPort.isNewerThan(
1208 advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001209 // remote does not have it or outdated, suggest
1210 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1211 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1212 // local is outdated, request
1213 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1214 reqPorts.add(portFragId);
1215 }
1216
1217 // remove port Ad already processed
1218 portAds.remove(portFragId);
1219 } // end local port loop
1220
1221 // remove device Ad already processed
1222 devAds.remove(devFragId);
1223
1224 // find latest and update
1225 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1226 if (localLatest == null ||
1227 providerLatest.compareTo(localLatest) > 0) {
1228 localLatest = providerLatest;
1229 }
1230 } // end local provider loop
1231
1232 // checking if remote timestamp is more recent.
1233 Timestamp rOffline = offlineAds.get(deviceId);
1234 if (rOffline != null &&
1235 rOffline.compareTo(localLatest) > 0) {
1236 // remote offline timestamp suggests that the
1237 // device is off-line
1238 markOfflineInternal(deviceId, rOffline);
1239 }
1240
1241 Timestamp lOffline = offline.get(deviceId);
1242 if (lOffline != null && rOffline == null) {
1243 // locally offline, but remote is online, suggest offline
1244 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1245 }
1246
1247 // remove device offline Ad already processed
1248 offlineAds.remove(deviceId);
1249 } // end local device loop
1250 } // device lock
1251
1252 // If there is any Ads left, request them
1253 log.trace("Ads left {}, {}", devAds, portAds);
1254 reqDevices.addAll(devAds.keySet());
1255 reqPorts.addAll(portAds.keySet());
1256
1257 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1258 log.trace("Nothing to request to remote peer {}", sender);
1259 return;
1260 }
1261
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001262 log.debug("Need to sync {} {}", reqDevices, reqPorts);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001263
1264 // 2-way Anti-Entropy for now
1265 try {
1266 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1267 } catch (IOException e) {
1268 log.error("Failed to send response advertisement to " + sender, e);
1269 }
1270
1271// Sketch of 3-way Anti-Entropy
1272// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1273// ClusterMessage message = new ClusterMessage(
1274// clusterService.getLocalNode().id(),
1275// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1276// SERIALIZER.encode(request));
1277//
1278// try {
1279// clusterCommunicator.unicast(message, advertisement.sender());
1280// } catch (IOException e) {
1281// log.error("Failed to send advertisement reply to "
1282// + advertisement.sender(), e);
1283// }
Madan Jampani47c93732014-10-06 20:46:08 -07001284 }
1285
Madan Jampani255a58b2014-10-09 12:08:20 -07001286 private void notifyDelegateIfNotNull(DeviceEvent event) {
1287 if (event != null) {
1288 notifyDelegate(event);
1289 }
1290 }
1291
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001292 private final class SendAdvertisementTask implements Runnable {
1293
1294 @Override
1295 public void run() {
1296 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001297 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001298 return;
1299 }
1300
1301 try {
1302 final NodeId self = clusterService.getLocalNode().id();
1303 Set<ControllerNode> nodes = clusterService.getNodes();
1304
1305 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1306 .transform(toNodeId())
1307 .toList();
1308
1309 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001310 log.trace("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001311 return;
1312 }
1313
1314 NodeId peer;
1315 do {
1316 int idx = RandomUtils.nextInt(0, nodeIds.size());
1317 peer = nodeIds.get(idx);
1318 } while (peer.equals(self));
1319
1320 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1321
1322 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001323 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001324 return;
1325 }
1326
1327 try {
1328 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1329 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001330 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001331 return;
1332 }
1333 } catch (Exception e) {
1334 // catch all Exception to avoid Scheduled task being suppressed.
1335 log.error("Exception thrown while sending advertisement", e);
1336 }
1337 }
1338 }
1339
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001340 private final class InternalDeviceEventListener
1341 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001342 @Override
1343 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001344
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001345 log.debug("Received device update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001346 InternalDeviceEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001347
Madan Jampani47c93732014-10-06 20:46:08 -07001348 ProviderId providerId = event.providerId();
1349 DeviceId deviceId = event.deviceId();
1350 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001351
Madan Jampani2af244a2015-02-22 13:12:01 -08001352 try {
1353 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1354 } catch (Exception e) {
1355 log.warn("Exception thrown handling device update", e);
1356 }
Madan Jampani47c93732014-10-06 20:46:08 -07001357 }
1358 }
1359
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001360 private final class InternalDeviceOfflineEventListener
1361 implements ClusterMessageHandler {
Madan Jampani25322532014-10-08 11:20:38 -07001362 @Override
1363 public void handle(ClusterMessage message) {
1364
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001365 log.debug("Received device offline event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001366 InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001367
1368 DeviceId deviceId = event.deviceId();
1369 Timestamp timestamp = event.timestamp();
1370
Madan Jampani2af244a2015-02-22 13:12:01 -08001371 try {
1372 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1373 } catch (Exception e) {
1374 log.warn("Exception thrown handling device offline", e);
1375 }
Madan Jampani25322532014-10-08 11:20:38 -07001376 }
1377 }
1378
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001379 private final class InternalRemoveRequestListener
1380 implements ClusterMessageHandler {
1381 @Override
1382 public void handle(ClusterMessage message) {
1383 log.debug("Received device remove request from peer: {}", message.sender());
1384 DeviceId did = SERIALIZER.decode(message.payload());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001385
Madan Jampani2af244a2015-02-22 13:12:01 -08001386 try {
1387 removeDevice(did);
1388 } catch (Exception e) {
1389 log.warn("Exception thrown handling device remove", e);
1390 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001391 }
1392 }
1393
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001394 private final class InternalDeviceRemovedEventListener
1395 implements ClusterMessageHandler {
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001396 @Override
1397 public void handle(ClusterMessage message) {
1398
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001399 log.debug("Received device removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001400 InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001401
1402 DeviceId deviceId = event.deviceId();
1403 Timestamp timestamp = event.timestamp();
1404
Madan Jampani2af244a2015-02-22 13:12:01 -08001405 try {
1406 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1407 } catch (Exception e) {
1408 log.warn("Exception thrown handling device removed", e);
1409 }
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001410 }
1411 }
1412
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001413 private final class InternalPortEventListener
1414 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001415 @Override
1416 public void handle(ClusterMessage message) {
1417
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001418 log.debug("Received port update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001419 InternalPortEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001420
1421 ProviderId providerId = event.providerId();
1422 DeviceId deviceId = event.deviceId();
1423 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1424
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001425 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001426 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001427 // Note: dropped information will be recovered by anti-entropy
1428 return;
1429 }
1430
Madan Jampani2af244a2015-02-22 13:12:01 -08001431 try {
1432 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1433 } catch (Exception e) {
1434 log.warn("Exception thrown handling port update", e);
1435 }
Madan Jampani47c93732014-10-06 20:46:08 -07001436 }
1437 }
1438
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001439 private final class InternalPortStatusEventListener
1440 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001441 @Override
1442 public void handle(ClusterMessage message) {
1443
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001444 log.debug("Received port status update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001445 InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001446
1447 ProviderId providerId = event.providerId();
1448 DeviceId deviceId = event.deviceId();
1449 Timestamped<PortDescription> portDescription = event.portDescription();
1450
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001451 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001452 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001453 // Note: dropped information will be recovered by anti-entropy
1454 return;
1455 }
1456
Madan Jampani2af244a2015-02-22 13:12:01 -08001457 try {
1458 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1459 } catch (Exception e) {
1460 log.warn("Exception thrown handling port update", e);
1461 }
Madan Jampani47c93732014-10-06 20:46:08 -07001462 }
1463 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001464
1465 private final class InternalDeviceAdvertisementListener
1466 implements ClusterMessageHandler {
1467
1468 @Override
1469 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001470 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001471 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -08001472 try {
1473 handleAdvertisement(advertisement);
1474 } catch (Exception e) {
1475 log.warn("Exception thrown handling Device advertisements.", e);
1476 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001477 }
1478 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001479
1480 private final class DeviceInjectedEventListener
1481 implements ClusterMessageHandler {
1482 @Override
1483 public void handle(ClusterMessage message) {
1484
1485 log.debug("Received injected device event from peer: {}", message.sender());
1486 DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1487
1488 ProviderId providerId = event.providerId();
1489 DeviceId deviceId = event.deviceId();
1490 DeviceDescription deviceDescription = event.deviceDescription();
1491
Madan Jampani2af244a2015-02-22 13:12:01 -08001492 try {
1493 createOrUpdateDevice(providerId, deviceId, deviceDescription);
1494 } catch (Exception e) {
1495 log.warn("Exception thrown handling device injected event.", e);
1496 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001497 }
1498 }
1499
1500 private final class PortInjectedEventListener
1501 implements ClusterMessageHandler {
1502 @Override
1503 public void handle(ClusterMessage message) {
1504
1505 log.debug("Received injected port event from peer: {}", message.sender());
1506 PortInjectedEvent event = SERIALIZER.decode(message.payload());
1507
1508 ProviderId providerId = event.providerId();
1509 DeviceId deviceId = event.deviceId();
1510 List<PortDescription> portDescriptions = event.portDescriptions();
1511
Madan Jampani2af244a2015-02-22 13:12:01 -08001512 try {
1513 updatePorts(providerId, deviceId, portDescriptions);
1514 } catch (Exception e) {
1515 log.warn("Exception thrown handling port injected event.", e);
1516 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001517 }
1518 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001519}