blob: 83664f2cc90dcc6d6c6faa2bcbe4948f783f72d4 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.device.impl;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070017
Yuta HIGUCHI47c40882014-10-10 18:44:37 -070018import com.google.common.base.Function;
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -080019import com.google.common.base.Predicate;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070020import com.google.common.collect.FluentIterable;
21import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070022import com.google.common.collect.Maps;
23import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -070024
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070025import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.ControllerNode;
34import org.onosproject.cluster.NodeId;
35import org.onosproject.mastership.MastershipService;
36import org.onosproject.mastership.MastershipTerm;
37import org.onosproject.mastership.MastershipTermService;
38import org.onosproject.net.AnnotationsUtil;
39import org.onosproject.net.DefaultAnnotations;
40import org.onosproject.net.DefaultDevice;
41import org.onosproject.net.DefaultPort;
42import org.onosproject.net.Device;
43import org.onosproject.net.Device.Type;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.MastershipRole;
46import org.onosproject.net.Port;
47import org.onosproject.net.PortNumber;
48import org.onosproject.net.device.DeviceClockService;
49import org.onosproject.net.device.DeviceDescription;
50import org.onosproject.net.device.DeviceEvent;
51import org.onosproject.net.device.DeviceStore;
52import org.onosproject.net.device.DeviceStoreDelegate;
53import org.onosproject.net.device.PortDescription;
54import org.onosproject.net.provider.ProviderId;
55import org.onosproject.store.AbstractStore;
56import org.onosproject.store.Timestamp;
57import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
58import org.onosproject.store.cluster.messaging.ClusterMessage;
59import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
60import org.onosproject.store.cluster.messaging.MessageSubject;
61import org.onosproject.store.impl.Timestamped;
62import org.onosproject.store.serializers.KryoSerializer;
63import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
alshabib7911a052014-10-16 17:49:37 -070064import org.onlab.packet.ChassisId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070065import org.onlab.util.KryoNamespace;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070066import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import org.slf4j.Logger;
68
Madan Jampani47c93732014-10-06 20:46:08 -070069import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070070import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070071import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070072import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070073import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070074import java.util.HashSet;
75import java.util.Iterator;
76import java.util.List;
77import java.util.Map;
78import java.util.Map.Entry;
79import java.util.Objects;
80import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070081import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI80d56592014-11-25 15:11:13 -080082import java.util.concurrent.ExecutorService;
83import java.util.concurrent.Executors;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070084import java.util.concurrent.ScheduledExecutorService;
85import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070086
87import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070088import static com.google.common.base.Predicates.notNull;
Brian O'Connorabafb502014-12-02 22:26:20 -080089import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
90import static org.onosproject.net.device.DeviceEvent.Type.*;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070091import static org.slf4j.LoggerFactory.getLogger;
92import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Brian O'Connorabafb502014-12-02 22:26:20 -080093import static org.onosproject.net.DefaultAnnotations.merge;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070094import static com.google.common.base.Verify.verify;
Yuta HIGUCHI06586272014-11-25 14:27:03 -080095import static org.onlab.util.Tools.minPriority;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070096import static org.onlab.util.Tools.namedThreads;
97import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorabafb502014-12-02 22:26:20 -080098import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
99import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700100
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700101/**
102 * Manages inventory of infrastructure devices using gossip protocol to distribute
103 * information.
104 */
105@Component(immediate = true)
106@Service
107public class GossipDeviceStore
108 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
109 implements DeviceStore {
110
111 private final Logger log = getLogger(getClass());
112
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700113 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700114
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700115 // innerMap is used to lock a Device, thus instance should never be replaced.
116 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700117 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700118 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700119
120 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700121 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
122 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
123
124 // to be updated under Device lock
125 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
126 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700127
128 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700129 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700132 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700133
Madan Jampani47c93732014-10-06 20:46:08 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected ClusterCommunicationService clusterCommunicator;
136
Madan Jampani53e44e62014-10-07 12:39:51 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected ClusterService clusterService;
139
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected MastershipService mastershipService;
142
Yuta HIGUCHIbcac4992014-11-22 19:27:57 -0800143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
144 protected MastershipTermService termService;
145
146
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700147 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700148 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700149 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700150 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800151 .register(DistributedStoreSerializers.STORE_COMMON)
152 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
153 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
154 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700155 .register(InternalDeviceRemovedEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800156 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
157 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700158 .register(DeviceAntiEntropyAdvertisement.class)
159 .register(DeviceFragmentId.class)
160 .register(PortFragmentId.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800161 .build();
Madan Jampani53e44e62014-10-07 12:39:51 -0700162 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700163 };
164
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800165 private ExecutorService executor;
166
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800167 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700168
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800169 // TODO make these anti-entropy parameters configurable
170 private long initialDelaySec = 5;
171 private long periodSec = 5;
172
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800173
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700174 @Activate
175 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700176 clusterCommunicator.addSubscriber(
177 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
178 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700179 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800180 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, new InternalRemoveRequestListener());
Madan Jampani25322532014-10-08 11:20:38 -0700181 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700182 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
183 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700184 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
185 clusterCommunicator.addSubscriber(
186 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700187 clusterCommunicator.addSubscriber(
188 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
189
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800190 executor = Executors.newCachedThreadPool(namedThreads("device-fg-%d"));
191
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800192 backgroundExecutor =
193 newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700194
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700195 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800196 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700197 initialDelaySec, periodSec, TimeUnit.SECONDS);
198
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700199 log.info("Started");
200 }
201
202 @Deactivate
203 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700204
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800205 executor.shutdownNow();
206
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800207 backgroundExecutor.shutdownNow();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700208 try {
Yuta HIGUCHIc5783592014-12-05 11:13:29 -0800209 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700210 log.error("Timeout during executor shutdown");
211 }
212 } catch (InterruptedException e) {
213 log.error("Error during executor shutdown", e);
214 }
215
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700216 deviceDescs.clear();
217 devices.clear();
218 devicePorts.clear();
219 availableDevices.clear();
220 log.info("Stopped");
221 }
222
223 @Override
224 public int getDeviceCount() {
225 return devices.size();
226 }
227
228 @Override
229 public Iterable<Device> getDevices() {
230 return Collections.unmodifiableCollection(devices.values());
231 }
232
233 @Override
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -0800234 public Iterable<Device> getAvailableDevices() {
235 return FluentIterable.from(getDevices())
236 .filter(new Predicate<Device>() {
237
238 @Override
239 public boolean apply(Device input) {
240 return isAvailable(input.id());
241 }
242 });
243 }
244
245 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700246 public Device getDevice(DeviceId deviceId) {
247 return devices.get(deviceId);
248 }
249
250 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700251 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
252 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700253 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700254 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700255 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700256 final DeviceEvent event;
257 final Timestamped<DeviceDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800258 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
259 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700260 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800261 mergedDesc = device.get(providerId).getDeviceDesc();
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700262 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700263 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700264 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
265 providerId, deviceId);
266 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700267 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700268 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700269 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700270 + providerId + " and deviceId: " + deviceId, e);
271 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700272 }
273 return event;
274 }
275
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700276 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
277 DeviceId deviceId,
278 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700279
280 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800281 Map<ProviderId, DeviceDescriptions> device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700282 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700283
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800284 synchronized (device) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700285 // locking per device
286
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700287 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
288 log.debug("Ignoring outdated event: {}", deltaDesc);
289 return null;
290 }
291
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800292 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700293
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700294 final Device oldDevice = devices.get(deviceId);
295 final Device newDevice;
296
297 if (deltaDesc == descs.getDeviceDesc() ||
298 deltaDesc.isNewer(descs.getDeviceDesc())) {
299 // on new device or valid update
300 descs.putDeviceDesc(deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800301 newDevice = composeDevice(deviceId, device);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700302 } else {
303 // outdated event, ignored.
304 return null;
305 }
306 if (oldDevice == null) {
307 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700308 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700309 } else {
310 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700311 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700312 }
313 }
314 }
315
316 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700317 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700318 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700319 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700320
321 // update composed device cache
322 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
323 verify(oldDevice == null,
324 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
325 providerId, oldDevice, newDevice);
326
327 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700328 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700329 }
330
331 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
332 }
333
334 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700335 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700336 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700337 Device oldDevice,
338 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700339 // We allow only certain attributes to trigger update
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700340 boolean propertiesChanged =
341 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
342 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion());
343 boolean annotationsChanged =
344 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700345
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700346 // Primary providers can respond to all changes, but ancillary ones
347 // should respond only to annotation changes.
348 if ((providerId.isAncillary() && annotationsChanged) ||
349 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700350 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
351 if (!replaced) {
352 verify(replaced,
353 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
354 providerId, oldDevice, devices.get(newDevice.id())
355 , newDevice);
356 }
357 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700358 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700359 }
360 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
361 }
362
363 // Otherwise merely attempt to change availability if primary provider
364 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700365 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700366 return !added ? null :
367 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
368 }
369 return null;
370 }
371
372 @Override
373 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700374 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700375 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700376 if (event != null) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700377 log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
378 deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700379 try {
380 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
381 } catch (IOException e) {
382 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
383 deviceId);
384 }
385 }
386 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700387 }
388
389 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
390
391 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700392 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700393
394 // locking device
395 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700396
397 // accept off-line if given timestamp is newer than
398 // the latest Timestamp from Primary provider
399 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
400 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
401 if (timestamp.compareTo(lastTimestamp) <= 0) {
402 // outdated event ignore
403 return null;
404 }
405
406 offline.put(deviceId, timestamp);
407
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700408 Device device = devices.get(deviceId);
409 if (device == null) {
410 return null;
411 }
412 boolean removed = availableDevices.remove(deviceId);
413 if (removed) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700414 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700415 }
416 return null;
417 }
418 }
419
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700420 /**
421 * Marks the device as available if the given timestamp is not outdated,
422 * compared to the time the device has been marked offline.
423 *
424 * @param deviceId identifier of the device
425 * @param timestamp of the event triggering this change.
426 * @return true if availability change request was accepted and changed the state
427 */
428 // Guarded by deviceDescs value (=Device lock)
429 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
430 // accept on-line if given timestamp is newer than
431 // the latest offline request Timestamp
432 Timestamp offlineTimestamp = offline.get(deviceId);
433 if (offlineTimestamp == null ||
434 offlineTimestamp.compareTo(timestamp) < 0) {
435
436 offline.remove(deviceId);
437 return availableDevices.add(deviceId);
438 }
439 return false;
440 }
441
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700442 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700443 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
444 DeviceId deviceId,
445 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700446
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700447 final Timestamp newTimestamp;
448 try {
449 newTimestamp = deviceClockService.getTimestamp(deviceId);
450 } catch (IllegalStateException e) {
451 log.info("Timestamp was not available for device {}", deviceId);
452 log.debug(" discarding {}", portDescriptions);
453 // Failed to generate timestamp.
454
455 // Possible situation:
456 // Device connected and became master for short period of time,
457 // but lost mastership before this instance had the chance to
458 // retrieve term information.
459
460 // Information dropped here is expected to be recoverable by
461 // device probing after mastership change
462
463 return Collections.emptyList();
464 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800465 log.debug("timestamp for {} {}", deviceId, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700466
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700467 final Timestamped<List<PortDescription>> timestampedInput
468 = new Timestamped<>(portDescriptions, newTimestamp);
469 final List<DeviceEvent> events;
470 final Timestamped<List<PortDescription>> merged;
471
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800472 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
473 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700474 events = updatePortsInternal(providerId, deviceId, timestampedInput);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800475 final DeviceDescriptions descs = device.get(providerId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700476 List<PortDescription> mergedList =
477 FluentIterable.from(portDescriptions)
478 .transform(new Function<PortDescription, PortDescription>() {
479 @Override
480 public PortDescription apply(PortDescription input) {
481 // lookup merged port description
482 return descs.getPortDesc(input.portNumber()).value();
483 }
484 }).toList();
485 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
486 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700487 if (!events.isEmpty()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800488 log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
Madan Jampani47c93732014-10-06 20:46:08 -0700489 providerId, deviceId);
490 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700491 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700492 } catch (IOException e) {
493 log.error("Failed to notify peers of a port update topology event or providerId: "
494 + providerId + " and deviceId: " + deviceId, e);
495 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700496 }
497 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700498 }
499
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700500 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
501 DeviceId deviceId,
502 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700503
504 Device device = devices.get(deviceId);
505 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
506
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700507 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700508 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
509
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700510 List<DeviceEvent> events = new ArrayList<>();
511 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700512
513 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
514 log.debug("Ignoring outdated events: {}", portDescriptions);
515 return null;
516 }
517
518 DeviceDescriptions descs = descsMap.get(providerId);
519 // every provider must provide DeviceDescription.
520 checkArgument(descs != null,
521 "Device description for Device ID %s from Provider %s was not found",
522 deviceId, providerId);
523
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700524 Map<PortNumber, Port> ports = getPortMap(deviceId);
525
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700526 final Timestamp newTimestamp = portDescriptions.timestamp();
527
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700528 // Add new ports
529 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700530 for (PortDescription portDescription : portDescriptions.value()) {
531 final PortNumber number = portDescription.portNumber();
532 processed.add(number);
533
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700534 final Port oldPort = ports.get(number);
535 final Port newPort;
536
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700537
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700538 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
539 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700540 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700541 // on new port or valid update
542 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700543 descs.putPortDesc(new Timestamped<>(portDescription,
544 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700545 newPort = composePort(device, number, descsMap);
546 } else {
547 // outdated event, ignored.
548 continue;
549 }
550
551 events.add(oldPort == null ?
552 createPort(device, newPort, ports) :
553 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700554 }
555
556 events.addAll(pruneOldPorts(device, ports, processed));
557 }
558 return FluentIterable.from(events).filter(notNull()).toList();
559 }
560
561 // Creates a new port based on the port description adds it to the map and
562 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700563 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700564 private DeviceEvent createPort(Device device, Port newPort,
565 Map<PortNumber, Port> ports) {
566 ports.put(newPort.number(), newPort);
567 return new DeviceEvent(PORT_ADDED, device, newPort);
568 }
569
570 // Checks if the specified port requires update and if so, it replaces the
571 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700572 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700573 private DeviceEvent updatePort(Device device, Port oldPort,
574 Port newPort,
575 Map<PortNumber, Port> ports) {
576 if (oldPort.isEnabled() != newPort.isEnabled() ||
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700577 oldPort.type() != newPort.type() ||
578 oldPort.portSpeed() != newPort.portSpeed() ||
579 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700580 ports.put(oldPort.number(), newPort);
581 return new DeviceEvent(PORT_UPDATED, device, newPort);
582 }
583 return null;
584 }
585
586 // Prunes the specified list of ports based on which ports are in the
587 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700588 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700589 private List<DeviceEvent> pruneOldPorts(Device device,
590 Map<PortNumber, Port> ports,
591 Set<PortNumber> processed) {
592 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700593 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700594 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700595 Entry<PortNumber, Port> e = iterator.next();
596 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700597 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700598 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700599 iterator.remove();
600 }
601 }
602 return events;
603 }
604
605 // Gets the map of ports for the specified device; if one does not already
606 // exist, it creates and registers a new one.
607 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
608 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700609 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
610 }
611
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700612 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700613 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700614 Map<ProviderId, DeviceDescriptions> r;
615 r = deviceDescs.get(deviceId);
616 if (r == null) {
617 r = new HashMap<ProviderId, DeviceDescriptions>();
618 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
619 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
620 if (concurrentlyAdded != null) {
621 r = concurrentlyAdded;
622 }
623 }
624 return r;
625 }
626
627 // Guarded by deviceDescs value (=Device lock)
628 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
629 Map<ProviderId, DeviceDescriptions> device,
630 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
631
632 synchronized (device) {
633 DeviceDescriptions r = device.get(providerId);
634 if (r == null) {
635 r = new DeviceDescriptions(deltaDesc);
636 device.put(providerId, r);
637 }
638 return r;
639 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700640 }
641
642 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700643 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
644 DeviceId deviceId,
645 PortDescription portDescription) {
646
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700647 final Timestamp newTimestamp;
648 try {
649 newTimestamp = deviceClockService.getTimestamp(deviceId);
650 } catch (IllegalStateException e) {
651 log.info("Timestamp was not available for device {}", deviceId);
652 log.debug(" discarding {}", portDescription);
653 // Failed to generate timestamp. Ignoring.
654 // See updatePorts comment
655 return null;
656 }
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700657 final Timestamped<PortDescription> deltaDesc
658 = new Timestamped<>(portDescription, newTimestamp);
659 final DeviceEvent event;
660 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800661 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
662 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700663 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800664 mergedDesc = device.get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700665 .getPortDesc(portDescription.portNumber());
666 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700667 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700668 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
669 providerId, deviceId);
670 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700671 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700672 } catch (IOException e) {
673 log.error("Failed to notify peers of a port status update topology event or providerId: "
674 + providerId + " and deviceId: " + deviceId, e);
675 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700676 }
677 return event;
678 }
679
680 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
681 Timestamped<PortDescription> deltaDesc) {
682
683 Device device = devices.get(deviceId);
684 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
685
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700686 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700687 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
688
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700689 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700690
691 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
692 log.debug("Ignoring outdated event: {}", deltaDesc);
693 return null;
694 }
695
696 DeviceDescriptions descs = descsMap.get(providerId);
697 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700698 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700699 "Device description for Device ID %s from Provider %s was not found",
700 deviceId, providerId);
701
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700702 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
703 final PortNumber number = deltaDesc.value().portNumber();
704 final Port oldPort = ports.get(number);
705 final Port newPort;
706
707 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
708 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700709 deltaDesc.isNewer(existingPortDesc)) {
710 // on new port or valid update
711 // update description
712 descs.putPortDesc(deltaDesc);
713 newPort = composePort(device, number, descsMap);
714 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700715 // same or outdated event, ignored.
716 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700717 return null;
718 }
719
720 if (oldPort == null) {
721 return createPort(device, newPort, ports);
722 } else {
723 return updatePort(device, oldPort, newPort, ports);
724 }
725 }
726 }
727
728 @Override
729 public List<Port> getPorts(DeviceId deviceId) {
730 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
731 if (ports == null) {
732 return Collections.emptyList();
733 }
734 return ImmutableList.copyOf(ports.values());
735 }
736
737 @Override
738 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
739 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
740 return ports == null ? null : ports.get(portNumber);
741 }
742
743 @Override
744 public boolean isAvailable(DeviceId deviceId) {
745 return availableDevices.contains(deviceId);
746 }
747
748 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700749 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800750 final NodeId myId = clusterService.getLocalNode().id();
751 NodeId master = mastershipService.getMasterFor(deviceId);
752
753 // if there exist a master, forward
754 // if there is no master, try to become one and process
755
756 boolean relinquishAtEnd = false;
757 if (master == null) {
758 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
759 if (myRole != MastershipRole.NONE) {
760 relinquishAtEnd = true;
761 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800762 log.debug("Temporarily requesting role for {} to remove", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800763 mastershipService.requestRoleFor(deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800764 MastershipTerm term = termService.getMastershipTerm(deviceId);
765 if (myId.equals(term.master())) {
766 master = myId;
767 }
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700768 }
769
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800770 if (!myId.equals(master)) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800771 log.debug("{} has control of {}, forwarding remove request",
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800772 master, deviceId);
773
774 ClusterMessage message = new ClusterMessage(
775 myId,
776 DEVICE_REMOVE_REQ,
777 SERIALIZER.encode(deviceId));
778
779 try {
780 clusterCommunicator.unicast(message, master);
781 } catch (IOException e) {
782 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
783 }
784
785 // event will be triggered after master processes it.
786 return null;
787 }
788
789 // I have control..
790
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700791 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700792 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700793 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800794 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700795 deviceId);
796 try {
797 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
798 } catch (IOException e) {
799 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
800 deviceId);
801 }
802 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800803 if (relinquishAtEnd) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800804 log.debug("Relinquishing temporary role acquired for {}", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800805 mastershipService.relinquishMastership(deviceId);
806 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700807 return event;
808 }
809
810 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
811 Timestamp timestamp) {
812
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700813 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700814 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700815 // accept removal request if given timestamp is newer than
816 // the latest Timestamp from Primary provider
817 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
818 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
819 if (timestamp.compareTo(lastTimestamp) <= 0) {
820 // outdated event ignore
821 return null;
822 }
823 removalRequest.put(deviceId, timestamp);
824
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700825 Device device = devices.remove(deviceId);
826 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700827 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
828 if (ports != null) {
829 ports.clear();
830 }
831 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700832 descs.clear();
833 return device == null ? null :
834 new DeviceEvent(DEVICE_REMOVED, device, null);
835 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700836 }
837
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700838 /**
839 * Checks if given timestamp is superseded by removal request
840 * with more recent timestamp.
841 *
842 * @param deviceId identifier of a device
843 * @param timestampToCheck timestamp of an event to check
844 * @return true if device is already removed
845 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700846 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
847 Timestamp removalTimestamp = removalRequest.get(deviceId);
848 if (removalTimestamp != null &&
849 removalTimestamp.compareTo(timestampToCheck) >= 0) {
850 // removalRequest is more recent
851 return true;
852 }
853 return false;
854 }
855
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700856 /**
857 * Returns a Device, merging description given from multiple Providers.
858 *
859 * @param deviceId device identifier
860 * @param providerDescs Collection of Descriptions from multiple providers
861 * @return Device instance
862 */
863 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700864 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700865
Thomas Vachuska444eda62014-10-28 13:09:42 -0700866 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700867
868 ProviderId primary = pickPrimaryPID(providerDescs);
869
870 DeviceDescriptions desc = providerDescs.get(primary);
871
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700872 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700873 Type type = base.type();
874 String manufacturer = base.manufacturer();
875 String hwVersion = base.hwVersion();
876 String swVersion = base.swVersion();
877 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700878 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700879 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
880 annotations = merge(annotations, base.annotations());
881
882 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
883 if (e.getKey().equals(primary)) {
884 continue;
885 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800886 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700887 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700888 // Currently assuming there will never be a key conflict between
889 // providers
890
891 // annotation merging. not so efficient, should revisit later
892 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
893 }
894
895 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700896 hwVersion, swVersion, serialNumber,
897 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700898 }
899
900 /**
901 * Returns a Port, merging description given from multiple Providers.
902 *
903 * @param device device the port is on
904 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700905 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700906 * @return Port instance
907 */
908 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700909 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700910
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700911 ProviderId primary = pickPrimaryPID(descsMap);
912 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700913 // if no primary, assume not enabled
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700914 boolean isEnabled = false;
915 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
916
917 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
918 if (portDesc != null) {
919 isEnabled = portDesc.value().isEnabled();
920 annotations = merge(annotations, portDesc.value().annotations());
921 }
922
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700923 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700924 if (e.getKey().equals(primary)) {
925 continue;
926 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800927 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700928 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700929 // Currently assuming there will never be a key conflict between
930 // providers
931
932 // annotation merging. not so efficient, should revisit later
933 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
934 if (otherPortDesc != null) {
935 annotations = merge(annotations, otherPortDesc.value().annotations());
936 }
937 }
938
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700939 return portDesc == null ?
940 new DefaultPort(device, number, false, annotations) :
941 new DefaultPort(device, number, isEnabled, portDesc.value().type(),
942 portDesc.value().portSpeed(), annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700943 }
944
945 /**
946 * @return primary ProviderID, or randomly chosen one if none exists
947 */
948 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700949 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700950 ProviderId fallBackPrimary = null;
951 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
952 if (!e.getKey().isAncillary()) {
953 return e.getKey();
954 } else if (fallBackPrimary == null) {
955 // pick randomly as a fallback in case there is no primary
956 fallBackPrimary = e.getKey();
957 }
958 }
959 return fallBackPrimary;
960 }
961
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700962 private DeviceDescriptions getPrimaryDescriptions(
963 Map<ProviderId, DeviceDescriptions> providerDescs) {
964 ProviderId pid = pickPrimaryPID(providerDescs);
965 return providerDescs.get(pid);
966 }
967
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700968 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
969 ClusterMessage message = new ClusterMessage(
970 clusterService.getLocalNode().id(),
971 subject,
972 SERIALIZER.encode(event));
973 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700974 }
975
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700976 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
977 ClusterMessage message = new ClusterMessage(
978 clusterService.getLocalNode().id(),
979 subject,
980 SERIALIZER.encode(event));
981 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700982 }
Madan Jampani47c93732014-10-06 20:46:08 -0700983
984 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700985 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700986 }
987
Madan Jampani25322532014-10-08 11:20:38 -0700988 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700989 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700990 }
991
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700992 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700993 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700994 }
995
Madan Jampani47c93732014-10-06 20:46:08 -0700996 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700997 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700998 }
999
1000 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001001 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1002 }
1003
1004 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1005 try {
1006 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1007 } catch (IOException e) {
1008 log.error("Failed to send" + event + " to " + recipient, e);
1009 }
1010 }
1011
1012 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1013 try {
1014 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1015 } catch (IOException e) {
1016 log.error("Failed to send" + event + " to " + recipient, e);
1017 }
1018 }
1019
1020 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1021 try {
1022 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1023 } catch (IOException e) {
1024 log.error("Failed to send" + event + " to " + recipient, e);
1025 }
1026 }
1027
1028 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1029 try {
1030 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1031 } catch (IOException e) {
1032 log.error("Failed to send" + event + " to " + recipient, e);
1033 }
1034 }
1035
1036 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1037 try {
1038 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1039 } catch (IOException e) {
1040 log.error("Failed to send" + event + " to " + recipient, e);
1041 }
1042 }
1043
1044 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1045 final NodeId self = clusterService.getLocalNode().id();
1046
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001047 final int numDevices = deviceDescs.size();
1048 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1049 final int portsPerDevice = 8; // random factor to minimize reallocation
1050 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1051 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001052
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001053 deviceDescs.forEach((deviceId, devDescs) -> {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001054
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001055 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001056 synchronized (devDescs) {
1057
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001058 // send device offline timestamp
1059 Timestamp lOffline = this.offline.get(deviceId);
1060 if (lOffline != null) {
1061 adOffline.put(deviceId, lOffline);
1062 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001063
1064 for (Entry<ProviderId, DeviceDescriptions>
1065 prov : devDescs.entrySet()) {
1066
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001067 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001068 final ProviderId provId = prov.getKey();
1069 final DeviceDescriptions descs = prov.getValue();
1070
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001071 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001072 descs.getDeviceDesc().timestamp());
1073
1074 for (Entry<PortNumber, Timestamped<PortDescription>>
1075 portDesc : descs.getPortDescs().entrySet()) {
1076
1077 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001078 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001079 portDesc.getValue().timestamp());
1080 }
1081 }
1082 }
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001083 });
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001084
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001085 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001086 }
1087
1088 /**
1089 * Responds to anti-entropy advertisement message.
1090 * <P>
1091 * Notify sender about out-dated information using regular replication message.
1092 * Send back advertisement to sender if not in sync.
1093 *
1094 * @param advertisement to respond to
1095 */
1096 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1097
1098 final NodeId sender = advertisement.sender();
1099
1100 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1101 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1102 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1103
1104 // Fragments to request
1105 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1106 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1107
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001108 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001109 final DeviceId deviceId = de.getKey();
1110 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1111
1112 synchronized (lDevice) {
1113 // latestTimestamp across provider
1114 // Note: can be null initially
1115 Timestamp localLatest = offline.get(deviceId);
1116
1117 // handle device Ads
1118 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1119 final ProviderId provId = prov.getKey();
1120 final DeviceDescriptions lDeviceDescs = prov.getValue();
1121
1122 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1123
1124
1125 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1126 Timestamp advDevTimestamp = devAds.get(devFragId);
1127
1128 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
1129 // remote does not have it or outdated, suggest
1130 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1131 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1132 // local is outdated, request
1133 reqDevices.add(devFragId);
1134 }
1135
1136 // handle port Ads
1137 for (Entry<PortNumber, Timestamped<PortDescription>>
1138 pe : lDeviceDescs.getPortDescs().entrySet()) {
1139
1140 final PortNumber num = pe.getKey();
1141 final Timestamped<PortDescription> lPort = pe.getValue();
1142
1143 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1144
1145 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -07001146 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001147 // remote does not have it or outdated, suggest
1148 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1149 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1150 // local is outdated, request
1151 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1152 reqPorts.add(portFragId);
1153 }
1154
1155 // remove port Ad already processed
1156 portAds.remove(portFragId);
1157 } // end local port loop
1158
1159 // remove device Ad already processed
1160 devAds.remove(devFragId);
1161
1162 // find latest and update
1163 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1164 if (localLatest == null ||
1165 providerLatest.compareTo(localLatest) > 0) {
1166 localLatest = providerLatest;
1167 }
1168 } // end local provider loop
1169
1170 // checking if remote timestamp is more recent.
1171 Timestamp rOffline = offlineAds.get(deviceId);
1172 if (rOffline != null &&
1173 rOffline.compareTo(localLatest) > 0) {
1174 // remote offline timestamp suggests that the
1175 // device is off-line
1176 markOfflineInternal(deviceId, rOffline);
1177 }
1178
1179 Timestamp lOffline = offline.get(deviceId);
1180 if (lOffline != null && rOffline == null) {
1181 // locally offline, but remote is online, suggest offline
1182 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1183 }
1184
1185 // remove device offline Ad already processed
1186 offlineAds.remove(deviceId);
1187 } // end local device loop
1188 } // device lock
1189
1190 // If there is any Ads left, request them
1191 log.trace("Ads left {}, {}", devAds, portAds);
1192 reqDevices.addAll(devAds.keySet());
1193 reqPorts.addAll(portAds.keySet());
1194
1195 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1196 log.trace("Nothing to request to remote peer {}", sender);
1197 return;
1198 }
1199
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001200 log.debug("Need to sync {} {}", reqDevices, reqPorts);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001201
1202 // 2-way Anti-Entropy for now
1203 try {
1204 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1205 } catch (IOException e) {
1206 log.error("Failed to send response advertisement to " + sender, e);
1207 }
1208
1209// Sketch of 3-way Anti-Entropy
1210// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1211// ClusterMessage message = new ClusterMessage(
1212// clusterService.getLocalNode().id(),
1213// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1214// SERIALIZER.encode(request));
1215//
1216// try {
1217// clusterCommunicator.unicast(message, advertisement.sender());
1218// } catch (IOException e) {
1219// log.error("Failed to send advertisement reply to "
1220// + advertisement.sender(), e);
1221// }
Madan Jampani47c93732014-10-06 20:46:08 -07001222 }
1223
Madan Jampani255a58b2014-10-09 12:08:20 -07001224 private void notifyDelegateIfNotNull(DeviceEvent event) {
1225 if (event != null) {
1226 notifyDelegate(event);
1227 }
1228 }
1229
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001230 private final class SendAdvertisementTask implements Runnable {
1231
1232 @Override
1233 public void run() {
1234 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001235 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001236 return;
1237 }
1238
1239 try {
1240 final NodeId self = clusterService.getLocalNode().id();
1241 Set<ControllerNode> nodes = clusterService.getNodes();
1242
1243 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1244 .transform(toNodeId())
1245 .toList();
1246
1247 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001248 log.trace("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001249 return;
1250 }
1251
1252 NodeId peer;
1253 do {
1254 int idx = RandomUtils.nextInt(0, nodeIds.size());
1255 peer = nodeIds.get(idx);
1256 } while (peer.equals(self));
1257
1258 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1259
1260 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001261 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001262 return;
1263 }
1264
1265 try {
1266 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1267 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001268 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001269 return;
1270 }
1271 } catch (Exception e) {
1272 // catch all Exception to avoid Scheduled task being suppressed.
1273 log.error("Exception thrown while sending advertisement", e);
1274 }
1275 }
1276 }
1277
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001278 private final class InternalDeviceEventListener
1279 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001280 @Override
1281 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001282
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001283 log.debug("Received device update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001284 InternalDeviceEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001285
Madan Jampani47c93732014-10-06 20:46:08 -07001286 ProviderId providerId = event.providerId();
1287 DeviceId deviceId = event.deviceId();
1288 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001289
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001290 executor.submit(new Runnable() {
1291
1292 @Override
1293 public void run() {
1294 try {
1295 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1296 } catch (Exception e) {
1297 log.warn("Exception thrown handling device update", e);
1298 }
1299 }
1300 });
Madan Jampani47c93732014-10-06 20:46:08 -07001301 }
1302 }
1303
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001304 private final class InternalDeviceOfflineEventListener
1305 implements ClusterMessageHandler {
Madan Jampani25322532014-10-08 11:20:38 -07001306 @Override
1307 public void handle(ClusterMessage message) {
1308
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001309 log.debug("Received device offline event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001310 InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001311
1312 DeviceId deviceId = event.deviceId();
1313 Timestamp timestamp = event.timestamp();
1314
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001315 executor.submit(new Runnable() {
1316
1317 @Override
1318 public void run() {
1319 try {
1320 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1321 } catch (Exception e) {
1322 log.warn("Exception thrown handling device offline", e);
1323 }
1324 }
1325 });
Madan Jampani25322532014-10-08 11:20:38 -07001326 }
1327 }
1328
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001329 private final class InternalRemoveRequestListener
1330 implements ClusterMessageHandler {
1331 @Override
1332 public void handle(ClusterMessage message) {
1333 log.debug("Received device remove request from peer: {}", message.sender());
1334 DeviceId did = SERIALIZER.decode(message.payload());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001335
1336 executor.submit(new Runnable() {
1337
1338 @Override
1339 public void run() {
1340 try {
1341 removeDevice(did);
1342 } catch (Exception e) {
1343 log.warn("Exception thrown handling device remove", e);
1344 }
1345 }
1346 });
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001347 }
1348 }
1349
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001350 private final class InternalDeviceRemovedEventListener
1351 implements ClusterMessageHandler {
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001352 @Override
1353 public void handle(ClusterMessage message) {
1354
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001355 log.debug("Received device removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001356 InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001357
1358 DeviceId deviceId = event.deviceId();
1359 Timestamp timestamp = event.timestamp();
1360
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001361 executor.submit(new Runnable() {
1362
1363 @Override
1364 public void run() {
1365 try {
1366 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1367 } catch (Exception e) {
1368 log.warn("Exception thrown handling device removed", e);
1369 }
1370 }
1371 });
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001372 }
1373 }
1374
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001375 private final class InternalPortEventListener
1376 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001377 @Override
1378 public void handle(ClusterMessage message) {
1379
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001380 log.debug("Received port update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001381 InternalPortEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001382
1383 ProviderId providerId = event.providerId();
1384 DeviceId deviceId = event.deviceId();
1385 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1386
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001387 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001388 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001389 // Note: dropped information will be recovered by anti-entropy
1390 return;
1391 }
1392
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001393 executor.submit(new Runnable() {
1394
1395 @Override
1396 public void run() {
1397 try {
1398 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1399 } catch (Exception e) {
1400 log.warn("Exception thrown handling port update", e);
1401 }
1402 }
1403 });
Madan Jampani47c93732014-10-06 20:46:08 -07001404 }
1405 }
1406
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001407 private final class InternalPortStatusEventListener
1408 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001409 @Override
1410 public void handle(ClusterMessage message) {
1411
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001412 log.debug("Received port status update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001413 InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001414
1415 ProviderId providerId = event.providerId();
1416 DeviceId deviceId = event.deviceId();
1417 Timestamped<PortDescription> portDescription = event.portDescription();
1418
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001419 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001420 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001421 // Note: dropped information will be recovered by anti-entropy
1422 return;
1423 }
1424
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001425 executor.submit(new Runnable() {
1426
1427 @Override
1428 public void run() {
1429 try {
1430 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1431 } catch (Exception e) {
1432 log.warn("Exception thrown handling port update", e);
1433 }
1434 }
1435 });
Madan Jampani47c93732014-10-06 20:46:08 -07001436 }
1437 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001438
1439 private final class InternalDeviceAdvertisementListener
1440 implements ClusterMessageHandler {
1441
1442 @Override
1443 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001444 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001445 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Yuta HIGUCHI06586272014-11-25 14:27:03 -08001446 backgroundExecutor.submit(new Runnable() {
1447
1448 @Override
1449 public void run() {
1450 try {
1451 handleAdvertisement(advertisement);
1452 } catch (Exception e) {
1453 log.warn("Exception thrown handling Device advertisements.", e);
1454 }
1455 }
1456 });
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001457 }
1458 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001459}