blob: 3a94087f41c4d2f71a199de310fecdf185c79469 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.device.impl;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070017
Yuta HIGUCHI47c40882014-10-10 18:44:37 -070018import com.google.common.base.Function;
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -080019import com.google.common.base.Predicate;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070020import com.google.common.collect.FluentIterable;
21import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070022import com.google.common.collect.Maps;
23import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -070024
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070025import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.ControllerNode;
34import org.onosproject.cluster.NodeId;
35import org.onosproject.mastership.MastershipService;
36import org.onosproject.mastership.MastershipTerm;
37import org.onosproject.mastership.MastershipTermService;
38import org.onosproject.net.AnnotationsUtil;
39import org.onosproject.net.DefaultAnnotations;
40import org.onosproject.net.DefaultDevice;
41import org.onosproject.net.DefaultPort;
42import org.onosproject.net.Device;
43import org.onosproject.net.Device.Type;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.MastershipRole;
46import org.onosproject.net.Port;
47import org.onosproject.net.PortNumber;
48import org.onosproject.net.device.DeviceClockService;
49import org.onosproject.net.device.DeviceDescription;
50import org.onosproject.net.device.DeviceEvent;
51import org.onosproject.net.device.DeviceStore;
52import org.onosproject.net.device.DeviceStoreDelegate;
53import org.onosproject.net.device.PortDescription;
54import org.onosproject.net.provider.ProviderId;
55import org.onosproject.store.AbstractStore;
56import org.onosproject.store.Timestamp;
57import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
58import org.onosproject.store.cluster.messaging.ClusterMessage;
59import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
60import org.onosproject.store.cluster.messaging.MessageSubject;
61import org.onosproject.store.impl.Timestamped;
62import org.onosproject.store.serializers.KryoSerializer;
63import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
alshabib7911a052014-10-16 17:49:37 -070064import org.onlab.packet.ChassisId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070065import org.onlab.util.KryoNamespace;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070066import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import org.slf4j.Logger;
68
Madan Jampani47c93732014-10-06 20:46:08 -070069import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070070import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070071import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070072import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070073import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070074import java.util.HashSet;
75import java.util.Iterator;
76import java.util.List;
77import java.util.Map;
78import java.util.Map.Entry;
79import java.util.Objects;
80import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070081import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI80d56592014-11-25 15:11:13 -080082import java.util.concurrent.ExecutorService;
83import java.util.concurrent.Executors;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070084import java.util.concurrent.ScheduledExecutorService;
85import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070086
87import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070088import static com.google.common.base.Predicates.notNull;
Brian O'Connorabafb502014-12-02 22:26:20 -080089import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
90import static org.onosproject.net.device.DeviceEvent.Type.*;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070091import static org.slf4j.LoggerFactory.getLogger;
92import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Brian O'Connorabafb502014-12-02 22:26:20 -080093import static org.onosproject.net.DefaultAnnotations.merge;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070094import static com.google.common.base.Verify.verify;
Yuta HIGUCHI06586272014-11-25 14:27:03 -080095import static org.onlab.util.Tools.minPriority;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070096import static org.onlab.util.Tools.namedThreads;
97import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorabafb502014-12-02 22:26:20 -080098import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
99import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700100
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700101/**
102 * Manages inventory of infrastructure devices using gossip protocol to distribute
103 * information.
104 */
105@Component(immediate = true)
106@Service
107public class GossipDeviceStore
108 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
109 implements DeviceStore {
110
111 private final Logger log = getLogger(getClass());
112
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700113 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700114
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700115 // innerMap is used to lock a Device, thus instance should never be replaced.
116 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700117 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700118 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700119
120 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700121 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
122 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
123
124 // to be updated under Device lock
125 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
126 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700127
128 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700129 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700132 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700133
Madan Jampani47c93732014-10-06 20:46:08 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected ClusterCommunicationService clusterCommunicator;
136
Madan Jampani53e44e62014-10-07 12:39:51 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected ClusterService clusterService;
139
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected MastershipService mastershipService;
142
Yuta HIGUCHIbcac4992014-11-22 19:27:57 -0800143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
144 protected MastershipTermService termService;
145
146
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700147 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700148 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700149 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700150 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800151 .register(DistributedStoreSerializers.STORE_COMMON)
152 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
153 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
154 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700155 .register(InternalDeviceRemovedEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800156 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
157 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700158 .register(DeviceAntiEntropyAdvertisement.class)
159 .register(DeviceFragmentId.class)
160 .register(PortFragmentId.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800161 .build();
Madan Jampani53e44e62014-10-07 12:39:51 -0700162 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700163 };
164
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800165 private ExecutorService executor;
166
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800167 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700168
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800169 // TODO make these anti-entropy parameters configurable
170 private long initialDelaySec = 5;
171 private long periodSec = 5;
172
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800173
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700174 @Activate
175 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700176 clusterCommunicator.addSubscriber(
177 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
178 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700179 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800180 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, new InternalRemoveRequestListener());
Madan Jampani25322532014-10-08 11:20:38 -0700181 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700182 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
183 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700184 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
185 clusterCommunicator.addSubscriber(
186 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700187 clusterCommunicator.addSubscriber(
188 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
189
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800190 executor = Executors.newCachedThreadPool(namedThreads("onos-device-fg-%d"));
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800191
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800192 backgroundExecutor =
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800193 newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-device-bg-%d")));
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700194
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700195 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800196 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700197 initialDelaySec, periodSec, TimeUnit.SECONDS);
198
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700199 log.info("Started");
200 }
201
202 @Deactivate
203 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700204
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800205 executor.shutdownNow();
206
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800207 backgroundExecutor.shutdownNow();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700208 try {
Yuta HIGUCHIc5783592014-12-05 11:13:29 -0800209 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700210 log.error("Timeout during executor shutdown");
211 }
212 } catch (InterruptedException e) {
213 log.error("Error during executor shutdown", e);
214 }
215
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700216 deviceDescs.clear();
217 devices.clear();
218 devicePorts.clear();
219 availableDevices.clear();
220 log.info("Stopped");
221 }
222
223 @Override
224 public int getDeviceCount() {
225 return devices.size();
226 }
227
228 @Override
229 public Iterable<Device> getDevices() {
230 return Collections.unmodifiableCollection(devices.values());
231 }
232
233 @Override
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -0800234 public Iterable<Device> getAvailableDevices() {
235 return FluentIterable.from(getDevices())
236 .filter(new Predicate<Device>() {
237
238 @Override
239 public boolean apply(Device input) {
240 return isAvailable(input.id());
241 }
242 });
243 }
244
245 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700246 public Device getDevice(DeviceId deviceId) {
247 return devices.get(deviceId);
248 }
249
250 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700251 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
252 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700253 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700254 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700255 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700256 final DeviceEvent event;
257 final Timestamped<DeviceDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800258 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
259 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700260 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800261 mergedDesc = device.get(providerId).getDeviceDesc();
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700262 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700263 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700264 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
265 providerId, deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800266 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700267 }
268 return event;
269 }
270
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700271 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
272 DeviceId deviceId,
273 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700274
275 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800276 Map<ProviderId, DeviceDescriptions> device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700277 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700278
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800279 synchronized (device) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700280 // locking per device
281
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700282 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
283 log.debug("Ignoring outdated event: {}", deltaDesc);
284 return null;
285 }
286
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800287 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700288
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700289 final Device oldDevice = devices.get(deviceId);
290 final Device newDevice;
291
292 if (deltaDesc == descs.getDeviceDesc() ||
293 deltaDesc.isNewer(descs.getDeviceDesc())) {
294 // on new device or valid update
295 descs.putDeviceDesc(deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800296 newDevice = composeDevice(deviceId, device);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700297 } else {
298 // outdated event, ignored.
299 return null;
300 }
301 if (oldDevice == null) {
302 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700303 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700304 } else {
305 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700306 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700307 }
308 }
309 }
310
311 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700312 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700313 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700314 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700315
316 // update composed device cache
317 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
318 verify(oldDevice == null,
319 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
320 providerId, oldDevice, newDevice);
321
322 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700323 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700324 }
325
326 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
327 }
328
329 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700330 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700331 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700332 Device oldDevice,
333 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700334 // We allow only certain attributes to trigger update
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700335 boolean propertiesChanged =
336 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
337 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion());
338 boolean annotationsChanged =
339 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700340
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700341 // Primary providers can respond to all changes, but ancillary ones
342 // should respond only to annotation changes.
343 if ((providerId.isAncillary() && annotationsChanged) ||
344 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700345 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
346 if (!replaced) {
347 verify(replaced,
348 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
349 providerId, oldDevice, devices.get(newDevice.id())
350 , newDevice);
351 }
352 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700353 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700354 }
355 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
356 }
357
358 // Otherwise merely attempt to change availability if primary provider
359 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700360 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700361 return !added ? null :
362 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
363 }
364 return null;
365 }
366
367 @Override
368 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700369 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700370 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700371 if (event != null) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700372 log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
373 deviceId, timestamp);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800374 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -0700375 }
376 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700377 }
378
379 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
380
381 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700382 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700383
384 // locking device
385 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700386
387 // accept off-line if given timestamp is newer than
388 // the latest Timestamp from Primary provider
389 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
390 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
391 if (timestamp.compareTo(lastTimestamp) <= 0) {
392 // outdated event ignore
393 return null;
394 }
395
396 offline.put(deviceId, timestamp);
397
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700398 Device device = devices.get(deviceId);
399 if (device == null) {
400 return null;
401 }
402 boolean removed = availableDevices.remove(deviceId);
403 if (removed) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700404 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700405 }
406 return null;
407 }
408 }
409
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700410 /**
411 * Marks the device as available if the given timestamp is not outdated,
412 * compared to the time the device has been marked offline.
413 *
414 * @param deviceId identifier of the device
415 * @param timestamp of the event triggering this change.
416 * @return true if availability change request was accepted and changed the state
417 */
418 // Guarded by deviceDescs value (=Device lock)
419 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
420 // accept on-line if given timestamp is newer than
421 // the latest offline request Timestamp
422 Timestamp offlineTimestamp = offline.get(deviceId);
423 if (offlineTimestamp == null ||
424 offlineTimestamp.compareTo(timestamp) < 0) {
425
426 offline.remove(deviceId);
427 return availableDevices.add(deviceId);
428 }
429 return false;
430 }
431
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700432 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700433 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
434 DeviceId deviceId,
435 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700436
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700437 final Timestamp newTimestamp;
438 try {
439 newTimestamp = deviceClockService.getTimestamp(deviceId);
440 } catch (IllegalStateException e) {
441 log.info("Timestamp was not available for device {}", deviceId);
442 log.debug(" discarding {}", portDescriptions);
443 // Failed to generate timestamp.
444
445 // Possible situation:
446 // Device connected and became master for short period of time,
447 // but lost mastership before this instance had the chance to
448 // retrieve term information.
449
450 // Information dropped here is expected to be recoverable by
451 // device probing after mastership change
452
453 return Collections.emptyList();
454 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800455 log.debug("timestamp for {} {}", deviceId, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700456
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700457 final Timestamped<List<PortDescription>> timestampedInput
458 = new Timestamped<>(portDescriptions, newTimestamp);
459 final List<DeviceEvent> events;
460 final Timestamped<List<PortDescription>> merged;
461
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800462 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
463 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700464 events = updatePortsInternal(providerId, deviceId, timestampedInput);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800465 final DeviceDescriptions descs = device.get(providerId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700466 List<PortDescription> mergedList =
467 FluentIterable.from(portDescriptions)
468 .transform(new Function<PortDescription, PortDescription>() {
469 @Override
470 public PortDescription apply(PortDescription input) {
471 // lookup merged port description
472 return descs.getPortDesc(input.portNumber()).value();
473 }
474 }).toList();
475 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
476 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700477 if (!events.isEmpty()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800478 log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
Madan Jampani47c93732014-10-06 20:46:08 -0700479 providerId, deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800480 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700481 }
482 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700483 }
484
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700485 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
486 DeviceId deviceId,
487 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700488
489 Device device = devices.get(deviceId);
490 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
491
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700492 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700493 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
494
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700495 List<DeviceEvent> events = new ArrayList<>();
496 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700497
498 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
499 log.debug("Ignoring outdated events: {}", portDescriptions);
500 return null;
501 }
502
503 DeviceDescriptions descs = descsMap.get(providerId);
504 // every provider must provide DeviceDescription.
505 checkArgument(descs != null,
506 "Device description for Device ID %s from Provider %s was not found",
507 deviceId, providerId);
508
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700509 Map<PortNumber, Port> ports = getPortMap(deviceId);
510
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700511 final Timestamp newTimestamp = portDescriptions.timestamp();
512
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700513 // Add new ports
514 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700515 for (PortDescription portDescription : portDescriptions.value()) {
516 final PortNumber number = portDescription.portNumber();
517 processed.add(number);
518
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700519 final Port oldPort = ports.get(number);
520 final Port newPort;
521
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700522
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700523 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
524 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700525 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700526 // on new port or valid update
527 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700528 descs.putPortDesc(new Timestamped<>(portDescription,
529 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700530 newPort = composePort(device, number, descsMap);
531 } else {
532 // outdated event, ignored.
533 continue;
534 }
535
536 events.add(oldPort == null ?
537 createPort(device, newPort, ports) :
538 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700539 }
540
541 events.addAll(pruneOldPorts(device, ports, processed));
542 }
543 return FluentIterable.from(events).filter(notNull()).toList();
544 }
545
546 // Creates a new port based on the port description adds it to the map and
547 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700548 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700549 private DeviceEvent createPort(Device device, Port newPort,
550 Map<PortNumber, Port> ports) {
551 ports.put(newPort.number(), newPort);
552 return new DeviceEvent(PORT_ADDED, device, newPort);
553 }
554
555 // Checks if the specified port requires update and if so, it replaces the
556 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700557 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700558 private DeviceEvent updatePort(Device device, Port oldPort,
559 Port newPort,
560 Map<PortNumber, Port> ports) {
561 if (oldPort.isEnabled() != newPort.isEnabled() ||
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700562 oldPort.type() != newPort.type() ||
563 oldPort.portSpeed() != newPort.portSpeed() ||
564 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700565 ports.put(oldPort.number(), newPort);
566 return new DeviceEvent(PORT_UPDATED, device, newPort);
567 }
568 return null;
569 }
570
571 // Prunes the specified list of ports based on which ports are in the
572 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700573 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700574 private List<DeviceEvent> pruneOldPorts(Device device,
575 Map<PortNumber, Port> ports,
576 Set<PortNumber> processed) {
577 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700578 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700579 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700580 Entry<PortNumber, Port> e = iterator.next();
581 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700582 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700583 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700584 iterator.remove();
585 }
586 }
587 return events;
588 }
589
590 // Gets the map of ports for the specified device; if one does not already
591 // exist, it creates and registers a new one.
592 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
593 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700594 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
595 }
596
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700597 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700598 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700599 Map<ProviderId, DeviceDescriptions> r;
600 r = deviceDescs.get(deviceId);
601 if (r == null) {
602 r = new HashMap<ProviderId, DeviceDescriptions>();
603 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
604 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
605 if (concurrentlyAdded != null) {
606 r = concurrentlyAdded;
607 }
608 }
609 return r;
610 }
611
612 // Guarded by deviceDescs value (=Device lock)
613 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
614 Map<ProviderId, DeviceDescriptions> device,
615 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
616
617 synchronized (device) {
618 DeviceDescriptions r = device.get(providerId);
619 if (r == null) {
620 r = new DeviceDescriptions(deltaDesc);
621 device.put(providerId, r);
622 }
623 return r;
624 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700625 }
626
627 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700628 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
629 DeviceId deviceId,
630 PortDescription portDescription) {
631
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700632 final Timestamp newTimestamp;
633 try {
634 newTimestamp = deviceClockService.getTimestamp(deviceId);
635 } catch (IllegalStateException e) {
636 log.info("Timestamp was not available for device {}", deviceId);
637 log.debug(" discarding {}", portDescription);
638 // Failed to generate timestamp. Ignoring.
639 // See updatePorts comment
640 return null;
641 }
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700642 final Timestamped<PortDescription> deltaDesc
643 = new Timestamped<>(portDescription, newTimestamp);
644 final DeviceEvent event;
645 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800646 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
647 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700648 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800649 mergedDesc = device.get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700650 .getPortDesc(portDescription.portNumber());
651 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700652 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700653 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
654 providerId, deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800655 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700656 }
657 return event;
658 }
659
660 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
661 Timestamped<PortDescription> deltaDesc) {
662
663 Device device = devices.get(deviceId);
664 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
665
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700666 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700667 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
668
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700669 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700670
671 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
672 log.debug("Ignoring outdated event: {}", deltaDesc);
673 return null;
674 }
675
676 DeviceDescriptions descs = descsMap.get(providerId);
677 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700678 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700679 "Device description for Device ID %s from Provider %s was not found",
680 deviceId, providerId);
681
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700682 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
683 final PortNumber number = deltaDesc.value().portNumber();
684 final Port oldPort = ports.get(number);
685 final Port newPort;
686
687 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
688 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700689 deltaDesc.isNewer(existingPortDesc)) {
690 // on new port or valid update
691 // update description
692 descs.putPortDesc(deltaDesc);
693 newPort = composePort(device, number, descsMap);
694 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700695 // same or outdated event, ignored.
696 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700697 return null;
698 }
699
700 if (oldPort == null) {
701 return createPort(device, newPort, ports);
702 } else {
703 return updatePort(device, oldPort, newPort, ports);
704 }
705 }
706 }
707
708 @Override
709 public List<Port> getPorts(DeviceId deviceId) {
710 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
711 if (ports == null) {
712 return Collections.emptyList();
713 }
714 return ImmutableList.copyOf(ports.values());
715 }
716
717 @Override
718 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
719 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
720 return ports == null ? null : ports.get(portNumber);
721 }
722
723 @Override
724 public boolean isAvailable(DeviceId deviceId) {
725 return availableDevices.contains(deviceId);
726 }
727
728 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700729 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800730 final NodeId myId = clusterService.getLocalNode().id();
731 NodeId master = mastershipService.getMasterFor(deviceId);
732
733 // if there exist a master, forward
734 // if there is no master, try to become one and process
735
736 boolean relinquishAtEnd = false;
737 if (master == null) {
738 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
739 if (myRole != MastershipRole.NONE) {
740 relinquishAtEnd = true;
741 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800742 log.debug("Temporarily requesting role for {} to remove", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800743 mastershipService.requestRoleFor(deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800744 MastershipTerm term = termService.getMastershipTerm(deviceId);
745 if (myId.equals(term.master())) {
746 master = myId;
747 }
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700748 }
749
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800750 if (!myId.equals(master)) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800751 log.debug("{} has control of {}, forwarding remove request",
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800752 master, deviceId);
753
754 ClusterMessage message = new ClusterMessage(
755 myId,
756 DEVICE_REMOVE_REQ,
757 SERIALIZER.encode(deviceId));
758
759 try {
760 clusterCommunicator.unicast(message, master);
761 } catch (IOException e) {
762 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
763 }
764
765 // event will be triggered after master processes it.
766 return null;
767 }
768
769 // I have control..
770
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700771 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700772 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700773 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800774 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700775 deviceId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800776 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700777 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800778 if (relinquishAtEnd) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800779 log.debug("Relinquishing temporary role acquired for {}", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800780 mastershipService.relinquishMastership(deviceId);
781 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700782 return event;
783 }
784
785 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
786 Timestamp timestamp) {
787
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700788 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700789 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700790 // accept removal request if given timestamp is newer than
791 // the latest Timestamp from Primary provider
792 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
793 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
794 if (timestamp.compareTo(lastTimestamp) <= 0) {
795 // outdated event ignore
796 return null;
797 }
798 removalRequest.put(deviceId, timestamp);
799
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700800 Device device = devices.remove(deviceId);
801 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700802 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
803 if (ports != null) {
804 ports.clear();
805 }
806 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700807 descs.clear();
808 return device == null ? null :
809 new DeviceEvent(DEVICE_REMOVED, device, null);
810 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700811 }
812
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700813 /**
814 * Checks if given timestamp is superseded by removal request
815 * with more recent timestamp.
816 *
817 * @param deviceId identifier of a device
818 * @param timestampToCheck timestamp of an event to check
819 * @return true if device is already removed
820 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700821 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
822 Timestamp removalTimestamp = removalRequest.get(deviceId);
823 if (removalTimestamp != null &&
824 removalTimestamp.compareTo(timestampToCheck) >= 0) {
825 // removalRequest is more recent
826 return true;
827 }
828 return false;
829 }
830
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700831 /**
832 * Returns a Device, merging description given from multiple Providers.
833 *
834 * @param deviceId device identifier
835 * @param providerDescs Collection of Descriptions from multiple providers
836 * @return Device instance
837 */
838 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700839 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700840
Thomas Vachuska444eda62014-10-28 13:09:42 -0700841 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700842
843 ProviderId primary = pickPrimaryPID(providerDescs);
844
845 DeviceDescriptions desc = providerDescs.get(primary);
846
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700847 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700848 Type type = base.type();
849 String manufacturer = base.manufacturer();
850 String hwVersion = base.hwVersion();
851 String swVersion = base.swVersion();
852 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700853 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700854 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
855 annotations = merge(annotations, base.annotations());
856
857 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
858 if (e.getKey().equals(primary)) {
859 continue;
860 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800861 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700862 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700863 // Currently assuming there will never be a key conflict between
864 // providers
865
866 // annotation merging. not so efficient, should revisit later
867 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
868 }
869
870 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700871 hwVersion, swVersion, serialNumber,
872 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700873 }
874
875 /**
876 * Returns a Port, merging description given from multiple Providers.
877 *
878 * @param device device the port is on
879 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700880 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700881 * @return Port instance
882 */
883 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700884 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700885
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700886 ProviderId primary = pickPrimaryPID(descsMap);
887 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700888 // if no primary, assume not enabled
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700889 boolean isEnabled = false;
890 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
891
892 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
893 if (portDesc != null) {
894 isEnabled = portDesc.value().isEnabled();
895 annotations = merge(annotations, portDesc.value().annotations());
896 }
897
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700898 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700899 if (e.getKey().equals(primary)) {
900 continue;
901 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800902 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700903 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700904 // Currently assuming there will never be a key conflict between
905 // providers
906
907 // annotation merging. not so efficient, should revisit later
908 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
909 if (otherPortDesc != null) {
910 annotations = merge(annotations, otherPortDesc.value().annotations());
911 }
912 }
913
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700914 return portDesc == null ?
915 new DefaultPort(device, number, false, annotations) :
916 new DefaultPort(device, number, isEnabled, portDesc.value().type(),
917 portDesc.value().portSpeed(), annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700918 }
919
920 /**
921 * @return primary ProviderID, or randomly chosen one if none exists
922 */
923 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700924 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700925 ProviderId fallBackPrimary = null;
926 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
927 if (!e.getKey().isAncillary()) {
928 return e.getKey();
929 } else if (fallBackPrimary == null) {
930 // pick randomly as a fallback in case there is no primary
931 fallBackPrimary = e.getKey();
932 }
933 }
934 return fallBackPrimary;
935 }
936
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700937 private DeviceDescriptions getPrimaryDescriptions(
938 Map<ProviderId, DeviceDescriptions> providerDescs) {
939 ProviderId pid = pickPrimaryPID(providerDescs);
940 return providerDescs.get(pid);
941 }
942
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700943 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
944 ClusterMessage message = new ClusterMessage(
945 clusterService.getLocalNode().id(),
946 subject,
947 SERIALIZER.encode(event));
948 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700949 }
950
Jonathan Hart7d656f42015-01-27 14:07:23 -0800951 private void broadcastMessage(MessageSubject subject, Object event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700952 ClusterMessage message = new ClusterMessage(
953 clusterService.getLocalNode().id(),
954 subject,
955 SERIALIZER.encode(event));
956 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700957 }
Madan Jampani47c93732014-10-06 20:46:08 -0700958
Jonathan Hart7d656f42015-01-27 14:07:23 -0800959 private void notifyPeers(InternalDeviceEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700960 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700961 }
962
Jonathan Hart7d656f42015-01-27 14:07:23 -0800963 private void notifyPeers(InternalDeviceOfflineEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700964 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700965 }
966
Jonathan Hart7d656f42015-01-27 14:07:23 -0800967 private void notifyPeers(InternalDeviceRemovedEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700968 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700969 }
970
Jonathan Hart7d656f42015-01-27 14:07:23 -0800971 private void notifyPeers(InternalPortEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700972 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700973 }
974
Jonathan Hart7d656f42015-01-27 14:07:23 -0800975 private void notifyPeers(InternalPortStatusEvent event) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700976 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
977 }
978
979 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
980 try {
981 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
982 } catch (IOException e) {
983 log.error("Failed to send" + event + " to " + recipient, e);
984 }
985 }
986
987 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
988 try {
989 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
990 } catch (IOException e) {
991 log.error("Failed to send" + event + " to " + recipient, e);
992 }
993 }
994
995 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
996 try {
997 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
998 } catch (IOException e) {
999 log.error("Failed to send" + event + " to " + recipient, e);
1000 }
1001 }
1002
1003 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1004 try {
1005 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1006 } catch (IOException e) {
1007 log.error("Failed to send" + event + " to " + recipient, e);
1008 }
1009 }
1010
1011 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1012 try {
1013 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1014 } catch (IOException e) {
1015 log.error("Failed to send" + event + " to " + recipient, e);
1016 }
1017 }
1018
1019 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1020 final NodeId self = clusterService.getLocalNode().id();
1021
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001022 final int numDevices = deviceDescs.size();
1023 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1024 final int portsPerDevice = 8; // random factor to minimize reallocation
1025 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1026 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001027
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001028 deviceDescs.forEach((deviceId, devDescs) -> {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001029
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001030 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001031 synchronized (devDescs) {
1032
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001033 // send device offline timestamp
1034 Timestamp lOffline = this.offline.get(deviceId);
1035 if (lOffline != null) {
1036 adOffline.put(deviceId, lOffline);
1037 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001038
1039 for (Entry<ProviderId, DeviceDescriptions>
1040 prov : devDescs.entrySet()) {
1041
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001042 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001043 final ProviderId provId = prov.getKey();
1044 final DeviceDescriptions descs = prov.getValue();
1045
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001046 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001047 descs.getDeviceDesc().timestamp());
1048
1049 for (Entry<PortNumber, Timestamped<PortDescription>>
1050 portDesc : descs.getPortDescs().entrySet()) {
1051
1052 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001053 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001054 portDesc.getValue().timestamp());
1055 }
1056 }
1057 }
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001058 });
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001059
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001060 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001061 }
1062
1063 /**
1064 * Responds to anti-entropy advertisement message.
1065 * <P>
1066 * Notify sender about out-dated information using regular replication message.
1067 * Send back advertisement to sender if not in sync.
1068 *
1069 * @param advertisement to respond to
1070 */
1071 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1072
1073 final NodeId sender = advertisement.sender();
1074
1075 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1076 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1077 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1078
1079 // Fragments to request
1080 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1081 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1082
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001083 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001084 final DeviceId deviceId = de.getKey();
1085 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1086
1087 synchronized (lDevice) {
1088 // latestTimestamp across provider
1089 // Note: can be null initially
1090 Timestamp localLatest = offline.get(deviceId);
1091
1092 // handle device Ads
1093 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1094 final ProviderId provId = prov.getKey();
1095 final DeviceDescriptions lDeviceDescs = prov.getValue();
1096
1097 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1098
1099
1100 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1101 Timestamp advDevTimestamp = devAds.get(devFragId);
1102
1103 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
1104 // remote does not have it or outdated, suggest
1105 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1106 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1107 // local is outdated, request
1108 reqDevices.add(devFragId);
1109 }
1110
1111 // handle port Ads
1112 for (Entry<PortNumber, Timestamped<PortDescription>>
1113 pe : lDeviceDescs.getPortDescs().entrySet()) {
1114
1115 final PortNumber num = pe.getKey();
1116 final Timestamped<PortDescription> lPort = pe.getValue();
1117
1118 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1119
1120 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -07001121 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001122 // remote does not have it or outdated, suggest
1123 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1124 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1125 // local is outdated, request
1126 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1127 reqPorts.add(portFragId);
1128 }
1129
1130 // remove port Ad already processed
1131 portAds.remove(portFragId);
1132 } // end local port loop
1133
1134 // remove device Ad already processed
1135 devAds.remove(devFragId);
1136
1137 // find latest and update
1138 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1139 if (localLatest == null ||
1140 providerLatest.compareTo(localLatest) > 0) {
1141 localLatest = providerLatest;
1142 }
1143 } // end local provider loop
1144
1145 // checking if remote timestamp is more recent.
1146 Timestamp rOffline = offlineAds.get(deviceId);
1147 if (rOffline != null &&
1148 rOffline.compareTo(localLatest) > 0) {
1149 // remote offline timestamp suggests that the
1150 // device is off-line
1151 markOfflineInternal(deviceId, rOffline);
1152 }
1153
1154 Timestamp lOffline = offline.get(deviceId);
1155 if (lOffline != null && rOffline == null) {
1156 // locally offline, but remote is online, suggest offline
1157 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1158 }
1159
1160 // remove device offline Ad already processed
1161 offlineAds.remove(deviceId);
1162 } // end local device loop
1163 } // device lock
1164
1165 // If there is any Ads left, request them
1166 log.trace("Ads left {}, {}", devAds, portAds);
1167 reqDevices.addAll(devAds.keySet());
1168 reqPorts.addAll(portAds.keySet());
1169
1170 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1171 log.trace("Nothing to request to remote peer {}", sender);
1172 return;
1173 }
1174
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001175 log.debug("Need to sync {} {}", reqDevices, reqPorts);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001176
1177 // 2-way Anti-Entropy for now
1178 try {
1179 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1180 } catch (IOException e) {
1181 log.error("Failed to send response advertisement to " + sender, e);
1182 }
1183
1184// Sketch of 3-way Anti-Entropy
1185// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1186// ClusterMessage message = new ClusterMessage(
1187// clusterService.getLocalNode().id(),
1188// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1189// SERIALIZER.encode(request));
1190//
1191// try {
1192// clusterCommunicator.unicast(message, advertisement.sender());
1193// } catch (IOException e) {
1194// log.error("Failed to send advertisement reply to "
1195// + advertisement.sender(), e);
1196// }
Madan Jampani47c93732014-10-06 20:46:08 -07001197 }
1198
Madan Jampani255a58b2014-10-09 12:08:20 -07001199 private void notifyDelegateIfNotNull(DeviceEvent event) {
1200 if (event != null) {
1201 notifyDelegate(event);
1202 }
1203 }
1204
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001205 private final class SendAdvertisementTask implements Runnable {
1206
1207 @Override
1208 public void run() {
1209 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001210 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001211 return;
1212 }
1213
1214 try {
1215 final NodeId self = clusterService.getLocalNode().id();
1216 Set<ControllerNode> nodes = clusterService.getNodes();
1217
1218 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1219 .transform(toNodeId())
1220 .toList();
1221
1222 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001223 log.trace("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001224 return;
1225 }
1226
1227 NodeId peer;
1228 do {
1229 int idx = RandomUtils.nextInt(0, nodeIds.size());
1230 peer = nodeIds.get(idx);
1231 } while (peer.equals(self));
1232
1233 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1234
1235 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001236 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001237 return;
1238 }
1239
1240 try {
1241 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1242 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001243 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001244 return;
1245 }
1246 } catch (Exception e) {
1247 // catch all Exception to avoid Scheduled task being suppressed.
1248 log.error("Exception thrown while sending advertisement", e);
1249 }
1250 }
1251 }
1252
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001253 private final class InternalDeviceEventListener
1254 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001255 @Override
1256 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001257
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001258 log.debug("Received device update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001259 InternalDeviceEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001260
Madan Jampani47c93732014-10-06 20:46:08 -07001261 ProviderId providerId = event.providerId();
1262 DeviceId deviceId = event.deviceId();
1263 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001264
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001265 executor.submit(new Runnable() {
1266
1267 @Override
1268 public void run() {
1269 try {
1270 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1271 } catch (Exception e) {
1272 log.warn("Exception thrown handling device update", e);
1273 }
1274 }
1275 });
Madan Jampani47c93732014-10-06 20:46:08 -07001276 }
1277 }
1278
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001279 private final class InternalDeviceOfflineEventListener
1280 implements ClusterMessageHandler {
Madan Jampani25322532014-10-08 11:20:38 -07001281 @Override
1282 public void handle(ClusterMessage message) {
1283
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001284 log.debug("Received device offline event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001285 InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001286
1287 DeviceId deviceId = event.deviceId();
1288 Timestamp timestamp = event.timestamp();
1289
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001290 executor.submit(new Runnable() {
1291
1292 @Override
1293 public void run() {
1294 try {
1295 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1296 } catch (Exception e) {
1297 log.warn("Exception thrown handling device offline", e);
1298 }
1299 }
1300 });
Madan Jampani25322532014-10-08 11:20:38 -07001301 }
1302 }
1303
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001304 private final class InternalRemoveRequestListener
1305 implements ClusterMessageHandler {
1306 @Override
1307 public void handle(ClusterMessage message) {
1308 log.debug("Received device remove request from peer: {}", message.sender());
1309 DeviceId did = SERIALIZER.decode(message.payload());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001310
1311 executor.submit(new Runnable() {
1312
1313 @Override
1314 public void run() {
1315 try {
1316 removeDevice(did);
1317 } catch (Exception e) {
1318 log.warn("Exception thrown handling device remove", e);
1319 }
1320 }
1321 });
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001322 }
1323 }
1324
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001325 private final class InternalDeviceRemovedEventListener
1326 implements ClusterMessageHandler {
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001327 @Override
1328 public void handle(ClusterMessage message) {
1329
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001330 log.debug("Received device removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001331 InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001332
1333 DeviceId deviceId = event.deviceId();
1334 Timestamp timestamp = event.timestamp();
1335
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001336 executor.submit(new Runnable() {
1337
1338 @Override
1339 public void run() {
1340 try {
1341 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1342 } catch (Exception e) {
1343 log.warn("Exception thrown handling device removed", e);
1344 }
1345 }
1346 });
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001347 }
1348 }
1349
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001350 private final class InternalPortEventListener
1351 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001352 @Override
1353 public void handle(ClusterMessage message) {
1354
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001355 log.debug("Received port update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001356 InternalPortEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001357
1358 ProviderId providerId = event.providerId();
1359 DeviceId deviceId = event.deviceId();
1360 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1361
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001362 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001363 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001364 // Note: dropped information will be recovered by anti-entropy
1365 return;
1366 }
1367
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001368 executor.submit(new Runnable() {
1369
1370 @Override
1371 public void run() {
1372 try {
1373 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1374 } catch (Exception e) {
1375 log.warn("Exception thrown handling port update", e);
1376 }
1377 }
1378 });
Madan Jampani47c93732014-10-06 20:46:08 -07001379 }
1380 }
1381
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001382 private final class InternalPortStatusEventListener
1383 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001384 @Override
1385 public void handle(ClusterMessage message) {
1386
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001387 log.debug("Received port status update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001388 InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001389
1390 ProviderId providerId = event.providerId();
1391 DeviceId deviceId = event.deviceId();
1392 Timestamped<PortDescription> portDescription = event.portDescription();
1393
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001394 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001395 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001396 // Note: dropped information will be recovered by anti-entropy
1397 return;
1398 }
1399
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001400 executor.submit(new Runnable() {
1401
1402 @Override
1403 public void run() {
1404 try {
1405 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1406 } catch (Exception e) {
1407 log.warn("Exception thrown handling port update", e);
1408 }
1409 }
1410 });
Madan Jampani47c93732014-10-06 20:46:08 -07001411 }
1412 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001413
1414 private final class InternalDeviceAdvertisementListener
1415 implements ClusterMessageHandler {
1416
1417 @Override
1418 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001419 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001420 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Yuta HIGUCHI06586272014-11-25 14:27:03 -08001421 backgroundExecutor.submit(new Runnable() {
1422
1423 @Override
1424 public void run() {
1425 try {
1426 handleAdvertisement(advertisement);
1427 } catch (Exception e) {
1428 log.warn("Exception thrown handling Device advertisements.", e);
1429 }
1430 }
1431 });
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001432 }
1433 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001434}