blob: 56d01a0a0f010ec3ed6405286a35e3d2da3007de [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.device.impl;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070017
Yuta HIGUCHI47c40882014-10-10 18:44:37 -070018import com.google.common.base.Function;
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -080019import com.google.common.base.Predicate;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070020import com.google.common.collect.FluentIterable;
21import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070022import com.google.common.collect.Maps;
23import com.google.common.collect.Sets;
Ayaka Koshibeeeb95102015-02-26 16:31:49 -080024
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070025import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -080032import org.onlab.packet.ChassisId;
33import org.onlab.util.KryoNamespace;
34import org.onlab.util.NewConcurrentHashMap;
Brian O'Connorabafb502014-12-02 22:26:20 -080035import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.ControllerNode;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.mastership.MastershipService;
39import org.onosproject.mastership.MastershipTerm;
40import org.onosproject.mastership.MastershipTermService;
41import org.onosproject.net.AnnotationsUtil;
42import org.onosproject.net.DefaultAnnotations;
43import org.onosproject.net.DefaultDevice;
44import org.onosproject.net.DefaultPort;
45import org.onosproject.net.Device;
46import org.onosproject.net.Device.Type;
47import org.onosproject.net.DeviceId;
48import org.onosproject.net.MastershipRole;
49import org.onosproject.net.Port;
50import org.onosproject.net.PortNumber;
51import org.onosproject.net.device.DeviceClockService;
52import org.onosproject.net.device.DeviceDescription;
53import org.onosproject.net.device.DeviceEvent;
54import org.onosproject.net.device.DeviceStore;
55import org.onosproject.net.device.DeviceStoreDelegate;
56import org.onosproject.net.device.PortDescription;
57import org.onosproject.net.provider.ProviderId;
58import org.onosproject.store.AbstractStore;
59import org.onosproject.store.Timestamp;
60import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
61import org.onosproject.store.cluster.messaging.ClusterMessage;
62import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
63import org.onosproject.store.cluster.messaging.MessageSubject;
64import org.onosproject.store.impl.Timestamped;
65import org.onosproject.store.serializers.KryoSerializer;
66import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import org.slf4j.Logger;
68
Madan Jampani47c93732014-10-06 20:46:08 -070069import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070070import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070071import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070072import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070073import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070074import java.util.HashSet;
75import java.util.Iterator;
76import java.util.List;
77import java.util.Map;
78import java.util.Map.Entry;
79import java.util.Objects;
80import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070081import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI80d56592014-11-25 15:11:13 -080082import java.util.concurrent.ExecutorService;
83import java.util.concurrent.Executors;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070084import java.util.concurrent.ScheduledExecutorService;
85import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070086
87import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070088import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070089import static com.google.common.base.Verify.verify;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -080090import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
91import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080092import static org.onlab.util.Tools.groupedThreads;
Yuta HIGUCHI06586272014-11-25 14:27:03 -080093import static org.onlab.util.Tools.minPriority;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -080094import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
95import static org.onosproject.net.DefaultAnnotations.merge;
96import static org.onosproject.net.device.DeviceEvent.Type.*;
97import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
98import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
99import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700100
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700101/**
102 * Manages inventory of infrastructure devices using gossip protocol to distribute
103 * information.
104 */
105@Component(immediate = true)
106@Service
107public class GossipDeviceStore
108 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
109 implements DeviceStore {
110
111 private final Logger log = getLogger(getClass());
112
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700113 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800114 // Timeout in milliseconds to process device or ports on remote master node
115 private static final int REMOTE_MASTER_TIMEOUT = 1000;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700116
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700117 // innerMap is used to lock a Device, thus instance should never be replaced.
118 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700119 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700120 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700121
122 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700123 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
124 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
125
126 // to be updated under Device lock
127 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
128 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700129
130 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700131 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700134 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700135
Madan Jampani47c93732014-10-06 20:46:08 -0700136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected ClusterCommunicationService clusterCommunicator;
138
Madan Jampani53e44e62014-10-07 12:39:51 -0700139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected ClusterService clusterService;
141
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected MastershipService mastershipService;
144
Yuta HIGUCHIbcac4992014-11-22 19:27:57 -0800145 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
146 protected MastershipTermService termService;
147
148
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700149 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700150 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700151 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700152 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800153 .register(DistributedStoreSerializers.STORE_COMMON)
154 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
155 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
156 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700157 .register(InternalDeviceRemovedEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800158 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
159 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700160 .register(DeviceAntiEntropyAdvertisement.class)
161 .register(DeviceFragmentId.class)
162 .register(PortFragmentId.class)
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800163 .register(DeviceInjectedEvent.class)
164 .register(PortInjectedEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800165 .build();
Madan Jampani53e44e62014-10-07 12:39:51 -0700166 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700167 };
168
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800169 private ExecutorService executor;
170
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800171 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700172
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800173 // TODO make these anti-entropy parameters configurable
174 private long initialDelaySec = 5;
175 private long periodSec = 5;
176
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800177
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700178 @Activate
179 public void activate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700180
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800181 executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800182
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800183 backgroundExecutor =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800184 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700185
Madan Jampani2af244a2015-02-22 13:12:01 -0800186 clusterCommunicator.addSubscriber(
187 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
188 clusterCommunicator.addSubscriber(
189 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
190 new InternalDeviceOfflineEventListener(),
191 executor);
192 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
193 new InternalRemoveRequestListener(),
194 executor);
195 clusterCommunicator.addSubscriber(
196 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
197 clusterCommunicator.addSubscriber(
198 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
199 clusterCommunicator.addSubscriber(
200 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
201 clusterCommunicator.addSubscriber(
202 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
203 new InternalDeviceAdvertisementListener(),
204 backgroundExecutor);
205 clusterCommunicator.addSubscriber(
206 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
207 clusterCommunicator.addSubscriber(
208 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
209
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700210 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800211 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700212 initialDelaySec, periodSec, TimeUnit.SECONDS);
213
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700214 log.info("Started");
215 }
216
217 @Deactivate
218 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700219
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800220 executor.shutdownNow();
221
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800222 backgroundExecutor.shutdownNow();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700223 try {
Yuta HIGUCHIc5783592014-12-05 11:13:29 -0800224 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700225 log.error("Timeout during executor shutdown");
226 }
227 } catch (InterruptedException e) {
228 log.error("Error during executor shutdown", e);
229 }
230
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700231 deviceDescs.clear();
232 devices.clear();
233 devicePorts.clear();
234 availableDevices.clear();
235 log.info("Stopped");
236 }
237
238 @Override
239 public int getDeviceCount() {
240 return devices.size();
241 }
242
243 @Override
244 public Iterable<Device> getDevices() {
245 return Collections.unmodifiableCollection(devices.values());
246 }
247
248 @Override
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -0800249 public Iterable<Device> getAvailableDevices() {
250 return FluentIterable.from(getDevices())
251 .filter(new Predicate<Device>() {
252
253 @Override
254 public boolean apply(Device input) {
255 return isAvailable(input.id());
256 }
257 });
258 }
259
260 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700261 public Device getDevice(DeviceId deviceId) {
262 return devices.get(deviceId);
263 }
264
265 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700266 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
267 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700268 DeviceDescription deviceDescription) {
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800269 NodeId localNode = clusterService.getLocalNode().id();
270 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
271
272 // Process device update only if we're the master,
273 // otherwise signal the actual master.
274 DeviceEvent deviceEvent = null;
275 if (localNode.equals(deviceNode)) {
276
277 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
278 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
279 final Timestamped<DeviceDescription> mergedDesc;
280 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
281
282 synchronized (device) {
283 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
284 mergedDesc = device.get(providerId).getDeviceDesc();
285 }
286
287 if (deviceEvent != null) {
288 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
289 providerId, deviceId);
290 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
291 }
292
293 } else {
HIGUCHI Yutadc2e7c22015-02-24 12:19:47 -0800294 // FIXME Temporary hack for NPE (ONOS-1171).
295 // Proper fix is to implement forwarding to master on ConfigProvider
296 // redo ONOS-490
297 if (deviceNode == null) {
298 // silently ignore
299 return null;
300 }
301
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800302
303 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
304 providerId, deviceId, deviceDescription);
305 ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
306 SERIALIZER.encode(deviceInjectedEvent));
307
308 try {
309 clusterCommunicator.unicast(clusterMessage, deviceNode);
310 } catch (IOException e) {
311 log.warn("Failed to process injected device id: {} desc: {} " +
312 "(cluster messaging failed: {})",
313 deviceId, deviceDescription, e);
314 }
315
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700316 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800317
318 return deviceEvent;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700319 }
320
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700321 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
322 DeviceId deviceId,
323 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700324
325 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800326 Map<ProviderId, DeviceDescriptions> device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700327 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700328
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800329 synchronized (device) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700330 // locking per device
331
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700332 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
333 log.debug("Ignoring outdated event: {}", deltaDesc);
334 return null;
335 }
336
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800337 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700338
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700339 final Device oldDevice = devices.get(deviceId);
340 final Device newDevice;
341
342 if (deltaDesc == descs.getDeviceDesc() ||
343 deltaDesc.isNewer(descs.getDeviceDesc())) {
344 // on new device or valid update
345 descs.putDeviceDesc(deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800346 newDevice = composeDevice(deviceId, device);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700347 } else {
348 // outdated event, ignored.
349 return null;
350 }
351 if (oldDevice == null) {
352 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700353 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700354 } else {
355 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700356 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700357 }
358 }
359 }
360
361 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700362 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700363 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700364 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700365
366 // update composed device cache
367 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
368 verify(oldDevice == null,
369 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
370 providerId, oldDevice, newDevice);
371
372 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700373 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700374 }
375
376 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
377 }
378
379 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700380 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700381 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700382 Device oldDevice,
383 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700384 // We allow only certain attributes to trigger update
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700385 boolean propertiesChanged =
386 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
387 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion());
388 boolean annotationsChanged =
389 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700390
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700391 // Primary providers can respond to all changes, but ancillary ones
392 // should respond only to annotation changes.
393 if ((providerId.isAncillary() && annotationsChanged) ||
394 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700395 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
396 if (!replaced) {
397 verify(replaced,
398 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
399 providerId, oldDevice, devices.get(newDevice.id())
400 , newDevice);
401 }
402 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700403 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700404 }
405 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
406 }
407
408 // Otherwise merely attempt to change availability if primary provider
409 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700410 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700411 return !added ? null :
412 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
413 }
414 return null;
415 }
416
417 @Override
418 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700419 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700420 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700421 if (event != null) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700422 log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
423 deviceId, timestamp);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800424 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -0700425 }
426 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700427 }
428
429 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
430
431 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700432 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700433
434 // locking device
435 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700436
437 // accept off-line if given timestamp is newer than
438 // the latest Timestamp from Primary provider
439 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
440 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
441 if (timestamp.compareTo(lastTimestamp) <= 0) {
442 // outdated event ignore
443 return null;
444 }
445
446 offline.put(deviceId, timestamp);
447
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700448 Device device = devices.get(deviceId);
449 if (device == null) {
450 return null;
451 }
452 boolean removed = availableDevices.remove(deviceId);
453 if (removed) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700454 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700455 }
456 return null;
457 }
458 }
459
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700460 /**
461 * Marks the device as available if the given timestamp is not outdated,
462 * compared to the time the device has been marked offline.
463 *
464 * @param deviceId identifier of the device
465 * @param timestamp of the event triggering this change.
466 * @return true if availability change request was accepted and changed the state
467 */
468 // Guarded by deviceDescs value (=Device lock)
469 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
470 // accept on-line if given timestamp is newer than
471 // the latest offline request Timestamp
472 Timestamp offlineTimestamp = offline.get(deviceId);
473 if (offlineTimestamp == null ||
474 offlineTimestamp.compareTo(timestamp) < 0) {
475
476 offline.remove(deviceId);
477 return availableDevices.add(deviceId);
478 }
479 return false;
480 }
481
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700482 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700483 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
484 DeviceId deviceId,
485 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700486
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800487 NodeId localNode = clusterService.getLocalNode().id();
488 // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
489 // since it will trigger distributed store read.
490 // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
491 // outside Device subsystem. so that we don't have to modify both Device and Link stores.
492 // If we don't care much about topology performance, then it might be OK.
493 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700494
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800495 // Process port update only if we're the master of the device,
496 // otherwise signal the actual master.
497 List<DeviceEvent> deviceEvents = null;
498 if (localNode.equals(deviceNode)) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700499
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800500 final Timestamp newTimestamp;
501 try {
502 newTimestamp = deviceClockService.getTimestamp(deviceId);
503 } catch (IllegalStateException e) {
504 log.info("Timestamp was not available for device {}", deviceId);
505 log.debug(" discarding {}", portDescriptions);
506 // Failed to generate timestamp.
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700507
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800508 // Possible situation:
509 // Device connected and became master for short period of time,
510 // but lost mastership before this instance had the chance to
511 // retrieve term information.
512
513 // Information dropped here is expected to be recoverable by
514 // device probing after mastership change
515
516 return Collections.emptyList();
517 }
518 log.debug("timestamp for {} {}", deviceId, newTimestamp);
519
520 final Timestamped<List<PortDescription>> timestampedInput
521 = new Timestamped<>(portDescriptions, newTimestamp);
522 final Timestamped<List<PortDescription>> merged;
523
524 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
525
526 synchronized (device) {
527 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
528 final DeviceDescriptions descs = device.get(providerId);
529 List<PortDescription> mergedList =
530 FluentIterable.from(portDescriptions)
531 .transform(new Function<PortDescription, PortDescription>() {
532 @Override
533 public PortDescription apply(PortDescription input) {
534 // lookup merged port description
535 return descs.getPortDesc(input.portNumber()).value();
536 }
537 }).toList();
538 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
539 }
540
541 if (!deviceEvents.isEmpty()) {
542 log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
543 providerId, deviceId);
544 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
545 }
546
547 } else {
HIGUCHI Yutadc2e7c22015-02-24 12:19:47 -0800548 // FIXME Temporary hack for NPE (ONOS-1171).
549 // Proper fix is to implement forwarding to master on ConfigProvider
550 // redo ONOS-490
551 if (deviceNode == null) {
552 // silently ignore
Ayaka Koshibeeeb95102015-02-26 16:31:49 -0800553 return Collections.emptyList();
HIGUCHI Yutadc2e7c22015-02-24 12:19:47 -0800554 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800555
556 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
557 ClusterMessage clusterMessage = new ClusterMessage(
558 localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
559 try {
560 clusterCommunicator.unicast(clusterMessage, deviceNode);
561 } catch (IOException e) {
562 log.warn("Failed to process injected ports of device id: {} " +
563 "(cluster messaging failed: {})",
564 deviceId, e);
565 }
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700566 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700567
Ayaka Koshibeeeb95102015-02-26 16:31:49 -0800568 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700569 }
570
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700571 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
572 DeviceId deviceId,
573 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700574
575 Device device = devices.get(deviceId);
576 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
577
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700578 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700579 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
580
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700581 List<DeviceEvent> events = new ArrayList<>();
582 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700583
584 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
585 log.debug("Ignoring outdated events: {}", portDescriptions);
586 return null;
587 }
588
589 DeviceDescriptions descs = descsMap.get(providerId);
590 // every provider must provide DeviceDescription.
591 checkArgument(descs != null,
592 "Device description for Device ID %s from Provider %s was not found",
593 deviceId, providerId);
594
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700595 Map<PortNumber, Port> ports = getPortMap(deviceId);
596
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700597 final Timestamp newTimestamp = portDescriptions.timestamp();
598
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700599 // Add new ports
600 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700601 for (PortDescription portDescription : portDescriptions.value()) {
602 final PortNumber number = portDescription.portNumber();
603 processed.add(number);
604
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700605 final Port oldPort = ports.get(number);
606 final Port newPort;
607
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700608
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700609 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
610 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700611 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700612 // on new port or valid update
613 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700614 descs.putPortDesc(new Timestamped<>(portDescription,
615 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700616 newPort = composePort(device, number, descsMap);
617 } else {
618 // outdated event, ignored.
619 continue;
620 }
621
622 events.add(oldPort == null ?
623 createPort(device, newPort, ports) :
624 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700625 }
626
627 events.addAll(pruneOldPorts(device, ports, processed));
628 }
629 return FluentIterable.from(events).filter(notNull()).toList();
630 }
631
632 // Creates a new port based on the port description adds it to the map and
633 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700634 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700635 private DeviceEvent createPort(Device device, Port newPort,
636 Map<PortNumber, Port> ports) {
637 ports.put(newPort.number(), newPort);
638 return new DeviceEvent(PORT_ADDED, device, newPort);
639 }
640
641 // Checks if the specified port requires update and if so, it replaces the
642 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700643 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700644 private DeviceEvent updatePort(Device device, Port oldPort,
645 Port newPort,
646 Map<PortNumber, Port> ports) {
647 if (oldPort.isEnabled() != newPort.isEnabled() ||
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700648 oldPort.type() != newPort.type() ||
649 oldPort.portSpeed() != newPort.portSpeed() ||
650 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700651 ports.put(oldPort.number(), newPort);
652 return new DeviceEvent(PORT_UPDATED, device, newPort);
653 }
654 return null;
655 }
656
657 // Prunes the specified list of ports based on which ports are in the
658 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700659 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700660 private List<DeviceEvent> pruneOldPorts(Device device,
661 Map<PortNumber, Port> ports,
662 Set<PortNumber> processed) {
663 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700664 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700665 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700666 Entry<PortNumber, Port> e = iterator.next();
667 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700668 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700669 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700670 iterator.remove();
671 }
672 }
673 return events;
674 }
675
676 // Gets the map of ports for the specified device; if one does not already
677 // exist, it creates and registers a new one.
678 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
679 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700680 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
681 }
682
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700683 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700684 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700685 Map<ProviderId, DeviceDescriptions> r;
686 r = deviceDescs.get(deviceId);
687 if (r == null) {
688 r = new HashMap<ProviderId, DeviceDescriptions>();
689 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
690 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
691 if (concurrentlyAdded != null) {
692 r = concurrentlyAdded;
693 }
694 }
695 return r;
696 }
697
698 // Guarded by deviceDescs value (=Device lock)
699 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
700 Map<ProviderId, DeviceDescriptions> device,
701 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
702
703 synchronized (device) {
704 DeviceDescriptions r = device.get(providerId);
705 if (r == null) {
706 r = new DeviceDescriptions(deltaDesc);
707 device.put(providerId, r);
708 }
709 return r;
710 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700711 }
712
713 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700714 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
715 DeviceId deviceId,
716 PortDescription portDescription) {
717
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700718 final Timestamp newTimestamp;
719 try {
720 newTimestamp = deviceClockService.getTimestamp(deviceId);
721 } catch (IllegalStateException e) {
722 log.info("Timestamp was not available for device {}", deviceId);
723 log.debug(" discarding {}", portDescription);
724 // Failed to generate timestamp. Ignoring.
725 // See updatePorts comment
726 return null;
727 }
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700728 final Timestamped<PortDescription> deltaDesc
729 = new Timestamped<>(portDescription, newTimestamp);
730 final DeviceEvent event;
731 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800732 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
733 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700734 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800735 mergedDesc = device.get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700736 .getPortDesc(portDescription.portNumber());
737 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700738 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700739 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
740 providerId, deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800741 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700742 }
743 return event;
744 }
745
746 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
747 Timestamped<PortDescription> deltaDesc) {
748
749 Device device = devices.get(deviceId);
750 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
751
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700752 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700753 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
754
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700755 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700756
757 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
758 log.debug("Ignoring outdated event: {}", deltaDesc);
759 return null;
760 }
761
762 DeviceDescriptions descs = descsMap.get(providerId);
763 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700764 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700765 "Device description for Device ID %s from Provider %s was not found",
766 deviceId, providerId);
767
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700768 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
769 final PortNumber number = deltaDesc.value().portNumber();
770 final Port oldPort = ports.get(number);
771 final Port newPort;
772
773 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
774 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700775 deltaDesc.isNewer(existingPortDesc)) {
776 // on new port or valid update
777 // update description
778 descs.putPortDesc(deltaDesc);
779 newPort = composePort(device, number, descsMap);
780 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700781 // same or outdated event, ignored.
782 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700783 return null;
784 }
785
786 if (oldPort == null) {
787 return createPort(device, newPort, ports);
788 } else {
789 return updatePort(device, oldPort, newPort, ports);
790 }
791 }
792 }
793
794 @Override
795 public List<Port> getPorts(DeviceId deviceId) {
796 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
797 if (ports == null) {
798 return Collections.emptyList();
799 }
800 return ImmutableList.copyOf(ports.values());
801 }
802
803 @Override
804 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
805 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
806 return ports == null ? null : ports.get(portNumber);
807 }
808
809 @Override
810 public boolean isAvailable(DeviceId deviceId) {
811 return availableDevices.contains(deviceId);
812 }
813
814 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700815 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800816 final NodeId myId = clusterService.getLocalNode().id();
817 NodeId master = mastershipService.getMasterFor(deviceId);
818
819 // if there exist a master, forward
820 // if there is no master, try to become one and process
821
822 boolean relinquishAtEnd = false;
823 if (master == null) {
824 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
825 if (myRole != MastershipRole.NONE) {
826 relinquishAtEnd = true;
827 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800828 log.debug("Temporarily requesting role for {} to remove", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800829 mastershipService.requestRoleFor(deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800830 MastershipTerm term = termService.getMastershipTerm(deviceId);
831 if (myId.equals(term.master())) {
832 master = myId;
833 }
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700834 }
835
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800836 if (!myId.equals(master)) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800837 log.debug("{} has control of {}, forwarding remove request",
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800838 master, deviceId);
839
840 ClusterMessage message = new ClusterMessage(
841 myId,
842 DEVICE_REMOVE_REQ,
843 SERIALIZER.encode(deviceId));
844
845 try {
846 clusterCommunicator.unicast(message, master);
847 } catch (IOException e) {
848 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
849 }
850
851 // event will be triggered after master processes it.
852 return null;
853 }
854
855 // I have control..
856
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700857 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700858 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700859 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800860 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700861 deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800862 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700863 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800864 if (relinquishAtEnd) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800865 log.debug("Relinquishing temporary role acquired for {}", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800866 mastershipService.relinquishMastership(deviceId);
867 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700868 return event;
869 }
870
871 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
872 Timestamp timestamp) {
873
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700874 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700875 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700876 // accept removal request if given timestamp is newer than
877 // the latest Timestamp from Primary provider
878 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
879 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
880 if (timestamp.compareTo(lastTimestamp) <= 0) {
881 // outdated event ignore
882 return null;
883 }
884 removalRequest.put(deviceId, timestamp);
885
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700886 Device device = devices.remove(deviceId);
887 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700888 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
889 if (ports != null) {
890 ports.clear();
891 }
892 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700893 descs.clear();
894 return device == null ? null :
895 new DeviceEvent(DEVICE_REMOVED, device, null);
896 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700897 }
898
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700899 /**
900 * Checks if given timestamp is superseded by removal request
901 * with more recent timestamp.
902 *
903 * @param deviceId identifier of a device
904 * @param timestampToCheck timestamp of an event to check
905 * @return true if device is already removed
906 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700907 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
908 Timestamp removalTimestamp = removalRequest.get(deviceId);
909 if (removalTimestamp != null &&
910 removalTimestamp.compareTo(timestampToCheck) >= 0) {
911 // removalRequest is more recent
912 return true;
913 }
914 return false;
915 }
916
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700917 /**
918 * Returns a Device, merging description given from multiple Providers.
919 *
920 * @param deviceId device identifier
921 * @param providerDescs Collection of Descriptions from multiple providers
922 * @return Device instance
923 */
924 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700925 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700926
Thomas Vachuska444eda62014-10-28 13:09:42 -0700927 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700928
929 ProviderId primary = pickPrimaryPID(providerDescs);
930
931 DeviceDescriptions desc = providerDescs.get(primary);
932
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700933 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700934 Type type = base.type();
935 String manufacturer = base.manufacturer();
936 String hwVersion = base.hwVersion();
937 String swVersion = base.swVersion();
938 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700939 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700940 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
941 annotations = merge(annotations, base.annotations());
942
943 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
944 if (e.getKey().equals(primary)) {
945 continue;
946 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800947 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700948 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700949 // Currently assuming there will never be a key conflict between
950 // providers
951
952 // annotation merging. not so efficient, should revisit later
953 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
954 }
955
956 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700957 hwVersion, swVersion, serialNumber,
958 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700959 }
960
961 /**
962 * Returns a Port, merging description given from multiple Providers.
963 *
964 * @param device device the port is on
965 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700966 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700967 * @return Port instance
968 */
969 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700970 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700971
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700972 ProviderId primary = pickPrimaryPID(descsMap);
973 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700974 // if no primary, assume not enabled
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700975 boolean isEnabled = false;
976 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
977
978 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
979 if (portDesc != null) {
980 isEnabled = portDesc.value().isEnabled();
981 annotations = merge(annotations, portDesc.value().annotations());
982 }
983
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700984 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700985 if (e.getKey().equals(primary)) {
986 continue;
987 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800988 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700989 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700990 // Currently assuming there will never be a key conflict between
991 // providers
992
993 // annotation merging. not so efficient, should revisit later
994 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
995 if (otherPortDesc != null) {
996 annotations = merge(annotations, otherPortDesc.value().annotations());
997 }
998 }
999
Thomas Vachuskad16ce182014-10-29 17:25:29 -07001000 return portDesc == null ?
1001 new DefaultPort(device, number, false, annotations) :
1002 new DefaultPort(device, number, isEnabled, portDesc.value().type(),
1003 portDesc.value().portSpeed(), annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001004 }
1005
1006 /**
1007 * @return primary ProviderID, or randomly chosen one if none exists
1008 */
1009 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001010 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001011 ProviderId fallBackPrimary = null;
1012 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1013 if (!e.getKey().isAncillary()) {
1014 return e.getKey();
1015 } else if (fallBackPrimary == null) {
1016 // pick randomly as a fallback in case there is no primary
1017 fallBackPrimary = e.getKey();
1018 }
1019 }
1020 return fallBackPrimary;
1021 }
1022
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07001023 private DeviceDescriptions getPrimaryDescriptions(
1024 Map<ProviderId, DeviceDescriptions> providerDescs) {
1025 ProviderId pid = pickPrimaryPID(providerDescs);
1026 return providerDescs.get(pid);
1027 }
1028
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001029 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
1030 ClusterMessage message = new ClusterMessage(
1031 clusterService.getLocalNode().id(),
1032 subject,
1033 SERIALIZER.encode(event));
1034 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001035 }
1036
Jonathan Hart7d656f42015-01-27 14:07:23 -08001037 private void broadcastMessage(MessageSubject subject, Object event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001038 ClusterMessage message = new ClusterMessage(
1039 clusterService.getLocalNode().id(),
1040 subject,
1041 SERIALIZER.encode(event));
1042 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001043 }
Madan Jampani47c93732014-10-06 20:46:08 -07001044
Jonathan Hart7d656f42015-01-27 14:07:23 -08001045 private void notifyPeers(InternalDeviceEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001046 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -07001047 }
1048
Jonathan Hart7d656f42015-01-27 14:07:23 -08001049 private void notifyPeers(InternalDeviceOfflineEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001050 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -07001051 }
1052
Jonathan Hart7d656f42015-01-27 14:07:23 -08001053 private void notifyPeers(InternalDeviceRemovedEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001054 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001055 }
1056
Jonathan Hart7d656f42015-01-27 14:07:23 -08001057 private void notifyPeers(InternalPortEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001058 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -07001059 }
1060
Jonathan Hart7d656f42015-01-27 14:07:23 -08001061 private void notifyPeers(InternalPortStatusEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001062 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1063 }
1064
1065 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1066 try {
1067 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1068 } catch (IOException e) {
1069 log.error("Failed to send" + event + " to " + recipient, e);
1070 }
1071 }
1072
1073 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1074 try {
1075 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1076 } catch (IOException e) {
1077 log.error("Failed to send" + event + " to " + recipient, e);
1078 }
1079 }
1080
1081 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1082 try {
1083 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1084 } catch (IOException e) {
1085 log.error("Failed to send" + event + " to " + recipient, e);
1086 }
1087 }
1088
1089 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1090 try {
1091 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1092 } catch (IOException e) {
1093 log.error("Failed to send" + event + " to " + recipient, e);
1094 }
1095 }
1096
1097 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1098 try {
1099 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1100 } catch (IOException e) {
1101 log.error("Failed to send" + event + " to " + recipient, e);
1102 }
1103 }
1104
1105 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1106 final NodeId self = clusterService.getLocalNode().id();
1107
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001108 final int numDevices = deviceDescs.size();
1109 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1110 final int portsPerDevice = 8; // random factor to minimize reallocation
1111 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1112 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001113
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001114 deviceDescs.forEach((deviceId, devDescs) -> {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001115
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001116 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001117 synchronized (devDescs) {
1118
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001119 // send device offline timestamp
1120 Timestamp lOffline = this.offline.get(deviceId);
1121 if (lOffline != null) {
1122 adOffline.put(deviceId, lOffline);
1123 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001124
1125 for (Entry<ProviderId, DeviceDescriptions>
1126 prov : devDescs.entrySet()) {
1127
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001128 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001129 final ProviderId provId = prov.getKey();
1130 final DeviceDescriptions descs = prov.getValue();
1131
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001132 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001133 descs.getDeviceDesc().timestamp());
1134
1135 for (Entry<PortNumber, Timestamped<PortDescription>>
1136 portDesc : descs.getPortDescs().entrySet()) {
1137
1138 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001139 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001140 portDesc.getValue().timestamp());
1141 }
1142 }
1143 }
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001144 });
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001145
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001146 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001147 }
1148
1149 /**
1150 * Responds to anti-entropy advertisement message.
1151 * <P>
1152 * Notify sender about out-dated information using regular replication message.
1153 * Send back advertisement to sender if not in sync.
1154 *
1155 * @param advertisement to respond to
1156 */
1157 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1158
1159 final NodeId sender = advertisement.sender();
1160
1161 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1162 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1163 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1164
1165 // Fragments to request
1166 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1167 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1168
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001169 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001170 final DeviceId deviceId = de.getKey();
1171 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1172
1173 synchronized (lDevice) {
1174 // latestTimestamp across provider
1175 // Note: can be null initially
1176 Timestamp localLatest = offline.get(deviceId);
1177
1178 // handle device Ads
1179 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1180 final ProviderId provId = prov.getKey();
1181 final DeviceDescriptions lDeviceDescs = prov.getValue();
1182
1183 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1184
1185
1186 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1187 Timestamp advDevTimestamp = devAds.get(devFragId);
1188
Jonathan Hart403ea932015-02-20 16:23:00 -08001189 if (advDevTimestamp == null || lProvDevice.isNewerThan(
1190 advDevTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001191 // remote does not have it or outdated, suggest
1192 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1193 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1194 // local is outdated, request
1195 reqDevices.add(devFragId);
1196 }
1197
1198 // handle port Ads
1199 for (Entry<PortNumber, Timestamped<PortDescription>>
1200 pe : lDeviceDescs.getPortDescs().entrySet()) {
1201
1202 final PortNumber num = pe.getKey();
1203 final Timestamped<PortDescription> lPort = pe.getValue();
1204
1205 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1206
1207 Timestamp advPortTimestamp = portAds.get(portFragId);
Jonathan Hart403ea932015-02-20 16:23:00 -08001208 if (advPortTimestamp == null || lPort.isNewerThan(
1209 advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001210 // remote does not have it or outdated, suggest
1211 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1212 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1213 // local is outdated, request
1214 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1215 reqPorts.add(portFragId);
1216 }
1217
1218 // remove port Ad already processed
1219 portAds.remove(portFragId);
1220 } // end local port loop
1221
1222 // remove device Ad already processed
1223 devAds.remove(devFragId);
1224
1225 // find latest and update
1226 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1227 if (localLatest == null ||
1228 providerLatest.compareTo(localLatest) > 0) {
1229 localLatest = providerLatest;
1230 }
1231 } // end local provider loop
1232
1233 // checking if remote timestamp is more recent.
1234 Timestamp rOffline = offlineAds.get(deviceId);
1235 if (rOffline != null &&
1236 rOffline.compareTo(localLatest) > 0) {
1237 // remote offline timestamp suggests that the
1238 // device is off-line
1239 markOfflineInternal(deviceId, rOffline);
1240 }
1241
1242 Timestamp lOffline = offline.get(deviceId);
1243 if (lOffline != null && rOffline == null) {
1244 // locally offline, but remote is online, suggest offline
1245 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1246 }
1247
1248 // remove device offline Ad already processed
1249 offlineAds.remove(deviceId);
1250 } // end local device loop
1251 } // device lock
1252
1253 // If there is any Ads left, request them
1254 log.trace("Ads left {}, {}", devAds, portAds);
1255 reqDevices.addAll(devAds.keySet());
1256 reqPorts.addAll(portAds.keySet());
1257
1258 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1259 log.trace("Nothing to request to remote peer {}", sender);
1260 return;
1261 }
1262
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001263 log.debug("Need to sync {} {}", reqDevices, reqPorts);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001264
1265 // 2-way Anti-Entropy for now
1266 try {
1267 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1268 } catch (IOException e) {
1269 log.error("Failed to send response advertisement to " + sender, e);
1270 }
1271
1272// Sketch of 3-way Anti-Entropy
1273// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1274// ClusterMessage message = new ClusterMessage(
1275// clusterService.getLocalNode().id(),
1276// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1277// SERIALIZER.encode(request));
1278//
1279// try {
1280// clusterCommunicator.unicast(message, advertisement.sender());
1281// } catch (IOException e) {
1282// log.error("Failed to send advertisement reply to "
1283// + advertisement.sender(), e);
1284// }
Madan Jampani47c93732014-10-06 20:46:08 -07001285 }
1286
Madan Jampani255a58b2014-10-09 12:08:20 -07001287 private void notifyDelegateIfNotNull(DeviceEvent event) {
1288 if (event != null) {
1289 notifyDelegate(event);
1290 }
1291 }
1292
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001293 private final class SendAdvertisementTask implements Runnable {
1294
1295 @Override
1296 public void run() {
1297 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001298 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001299 return;
1300 }
1301
1302 try {
1303 final NodeId self = clusterService.getLocalNode().id();
1304 Set<ControllerNode> nodes = clusterService.getNodes();
1305
1306 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1307 .transform(toNodeId())
1308 .toList();
1309
1310 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001311 log.trace("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001312 return;
1313 }
1314
1315 NodeId peer;
1316 do {
1317 int idx = RandomUtils.nextInt(0, nodeIds.size());
1318 peer = nodeIds.get(idx);
1319 } while (peer.equals(self));
1320
1321 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1322
1323 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001324 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001325 return;
1326 }
1327
1328 try {
1329 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1330 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001331 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001332 return;
1333 }
1334 } catch (Exception e) {
1335 // catch all Exception to avoid Scheduled task being suppressed.
1336 log.error("Exception thrown while sending advertisement", e);
1337 }
1338 }
1339 }
1340
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001341 private final class InternalDeviceEventListener
1342 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001343 @Override
1344 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001345
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001346 log.debug("Received device update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001347 InternalDeviceEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001348
Madan Jampani47c93732014-10-06 20:46:08 -07001349 ProviderId providerId = event.providerId();
1350 DeviceId deviceId = event.deviceId();
1351 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001352
Madan Jampani2af244a2015-02-22 13:12:01 -08001353 try {
1354 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1355 } catch (Exception e) {
1356 log.warn("Exception thrown handling device update", e);
1357 }
Madan Jampani47c93732014-10-06 20:46:08 -07001358 }
1359 }
1360
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001361 private final class InternalDeviceOfflineEventListener
1362 implements ClusterMessageHandler {
Madan Jampani25322532014-10-08 11:20:38 -07001363 @Override
1364 public void handle(ClusterMessage message) {
1365
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001366 log.debug("Received device offline event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001367 InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001368
1369 DeviceId deviceId = event.deviceId();
1370 Timestamp timestamp = event.timestamp();
1371
Madan Jampani2af244a2015-02-22 13:12:01 -08001372 try {
1373 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1374 } catch (Exception e) {
1375 log.warn("Exception thrown handling device offline", e);
1376 }
Madan Jampani25322532014-10-08 11:20:38 -07001377 }
1378 }
1379
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001380 private final class InternalRemoveRequestListener
1381 implements ClusterMessageHandler {
1382 @Override
1383 public void handle(ClusterMessage message) {
1384 log.debug("Received device remove request from peer: {}", message.sender());
1385 DeviceId did = SERIALIZER.decode(message.payload());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001386
Madan Jampani2af244a2015-02-22 13:12:01 -08001387 try {
1388 removeDevice(did);
1389 } catch (Exception e) {
1390 log.warn("Exception thrown handling device remove", e);
1391 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001392 }
1393 }
1394
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001395 private final class InternalDeviceRemovedEventListener
1396 implements ClusterMessageHandler {
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001397 @Override
1398 public void handle(ClusterMessage message) {
1399
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001400 log.debug("Received device removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001401 InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001402
1403 DeviceId deviceId = event.deviceId();
1404 Timestamp timestamp = event.timestamp();
1405
Madan Jampani2af244a2015-02-22 13:12:01 -08001406 try {
1407 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1408 } catch (Exception e) {
1409 log.warn("Exception thrown handling device removed", e);
1410 }
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001411 }
1412 }
1413
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001414 private final class InternalPortEventListener
1415 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001416 @Override
1417 public void handle(ClusterMessage message) {
1418
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001419 log.debug("Received port update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001420 InternalPortEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001421
1422 ProviderId providerId = event.providerId();
1423 DeviceId deviceId = event.deviceId();
1424 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1425
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001426 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001427 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001428 // Note: dropped information will be recovered by anti-entropy
1429 return;
1430 }
1431
Madan Jampani2af244a2015-02-22 13:12:01 -08001432 try {
1433 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1434 } catch (Exception e) {
1435 log.warn("Exception thrown handling port update", e);
1436 }
Madan Jampani47c93732014-10-06 20:46:08 -07001437 }
1438 }
1439
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001440 private final class InternalPortStatusEventListener
1441 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001442 @Override
1443 public void handle(ClusterMessage message) {
1444
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001445 log.debug("Received port status update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001446 InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001447
1448 ProviderId providerId = event.providerId();
1449 DeviceId deviceId = event.deviceId();
1450 Timestamped<PortDescription> portDescription = event.portDescription();
1451
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001452 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001453 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001454 // Note: dropped information will be recovered by anti-entropy
1455 return;
1456 }
1457
Madan Jampani2af244a2015-02-22 13:12:01 -08001458 try {
1459 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1460 } catch (Exception e) {
1461 log.warn("Exception thrown handling port update", e);
1462 }
Madan Jampani47c93732014-10-06 20:46:08 -07001463 }
1464 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001465
1466 private final class InternalDeviceAdvertisementListener
1467 implements ClusterMessageHandler {
1468
1469 @Override
1470 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001471 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001472 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -08001473 try {
1474 handleAdvertisement(advertisement);
1475 } catch (Exception e) {
1476 log.warn("Exception thrown handling Device advertisements.", e);
1477 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001478 }
1479 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001480
1481 private final class DeviceInjectedEventListener
1482 implements ClusterMessageHandler {
1483 @Override
1484 public void handle(ClusterMessage message) {
1485
1486 log.debug("Received injected device event from peer: {}", message.sender());
1487 DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1488
1489 ProviderId providerId = event.providerId();
1490 DeviceId deviceId = event.deviceId();
1491 DeviceDescription deviceDescription = event.deviceDescription();
1492
Madan Jampani2af244a2015-02-22 13:12:01 -08001493 try {
1494 createOrUpdateDevice(providerId, deviceId, deviceDescription);
1495 } catch (Exception e) {
1496 log.warn("Exception thrown handling device injected event.", e);
1497 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001498 }
1499 }
1500
1501 private final class PortInjectedEventListener
1502 implements ClusterMessageHandler {
1503 @Override
1504 public void handle(ClusterMessage message) {
1505
1506 log.debug("Received injected port event from peer: {}", message.sender());
1507 PortInjectedEvent event = SERIALIZER.decode(message.payload());
1508
1509 ProviderId providerId = event.providerId();
1510 DeviceId deviceId = event.deviceId();
1511 List<PortDescription> portDescriptions = event.portDescriptions();
1512
Madan Jampani2af244a2015-02-22 13:12:01 -08001513 try {
1514 updatePorts(providerId, deviceId, portDescriptions);
1515 } catch (Exception e) {
1516 log.warn("Exception thrown handling port injected event.", e);
1517 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -08001518 }
1519 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001520}