blob: b18462a092d3504fc90dc420738c7741f82814a6 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.device.impl;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070017
Yuta HIGUCHI47c40882014-10-10 18:44:37 -070018import com.google.common.base.Function;
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -080019import com.google.common.base.Predicate;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070020import com.google.common.collect.FluentIterable;
21import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070022import com.google.common.collect.Maps;
23import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -070024
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070025import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.ControllerNode;
34import org.onosproject.cluster.NodeId;
35import org.onosproject.mastership.MastershipService;
36import org.onosproject.mastership.MastershipTerm;
37import org.onosproject.mastership.MastershipTermService;
38import org.onosproject.net.AnnotationsUtil;
39import org.onosproject.net.DefaultAnnotations;
40import org.onosproject.net.DefaultDevice;
41import org.onosproject.net.DefaultPort;
42import org.onosproject.net.Device;
43import org.onosproject.net.Device.Type;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.MastershipRole;
46import org.onosproject.net.Port;
47import org.onosproject.net.PortNumber;
48import org.onosproject.net.device.DeviceClockService;
49import org.onosproject.net.device.DeviceDescription;
50import org.onosproject.net.device.DeviceEvent;
51import org.onosproject.net.device.DeviceStore;
52import org.onosproject.net.device.DeviceStoreDelegate;
53import org.onosproject.net.device.PortDescription;
54import org.onosproject.net.provider.ProviderId;
55import org.onosproject.store.AbstractStore;
56import org.onosproject.store.Timestamp;
57import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
58import org.onosproject.store.cluster.messaging.ClusterMessage;
59import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
60import org.onosproject.store.cluster.messaging.MessageSubject;
61import org.onosproject.store.impl.Timestamped;
62import org.onosproject.store.serializers.KryoSerializer;
63import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
alshabib7911a052014-10-16 17:49:37 -070064import org.onlab.packet.ChassisId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070065import org.onlab.util.KryoNamespace;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070066import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import org.slf4j.Logger;
68
Madan Jampani47c93732014-10-06 20:46:08 -070069import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070070import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070071import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070072import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070073import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070074import java.util.HashSet;
75import java.util.Iterator;
76import java.util.List;
77import java.util.Map;
78import java.util.Map.Entry;
79import java.util.Objects;
80import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070081import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI80d56592014-11-25 15:11:13 -080082import java.util.concurrent.ExecutorService;
83import java.util.concurrent.Executors;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070084import java.util.concurrent.ScheduledExecutorService;
85import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070086
87import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070088import static com.google.common.base.Predicates.notNull;
Brian O'Connorabafb502014-12-02 22:26:20 -080089import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
90import static org.onosproject.net.device.DeviceEvent.Type.*;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070091import static org.slf4j.LoggerFactory.getLogger;
92import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
Brian O'Connorabafb502014-12-02 22:26:20 -080093import static org.onosproject.net.DefaultAnnotations.merge;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070094import static com.google.common.base.Verify.verify;
Yuta HIGUCHI06586272014-11-25 14:27:03 -080095import static org.onlab.util.Tools.minPriority;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070096import static org.onlab.util.Tools.namedThreads;
97import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorabafb502014-12-02 22:26:20 -080098import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
99import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700100
/**
 * Manages inventory of infrastructure devices using gossip protocol to distribute
 * information.
 */
105@Component(immediate = true)
106@Service
107public class GossipDeviceStore
108 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
109 implements DeviceStore {
110
111 private final Logger log = getLogger(getClass());
112
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700113 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700114
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700115 // innerMap is used to lock a Device, thus instance should never be replaced.
116 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700117 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700118 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700119
120 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700121 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
122 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
123
124 // to be updated under Device lock
125 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
126 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700127
128 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700129 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700132 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700133
Madan Jampani47c93732014-10-06 20:46:08 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected ClusterCommunicationService clusterCommunicator;
136
Madan Jampani53e44e62014-10-07 12:39:51 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected ClusterService clusterService;
139
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected MastershipService mastershipService;
142
Yuta HIGUCHIbcac4992014-11-22 19:27:57 -0800143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
144 protected MastershipTermService termService;
145
146
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700147 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700148 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700149 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700150 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800151 .register(DistributedStoreSerializers.STORE_COMMON)
152 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
153 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
154 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700155 .register(InternalDeviceRemovedEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800156 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
157 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700158 .register(DeviceAntiEntropyAdvertisement.class)
159 .register(DeviceFragmentId.class)
160 .register(PortFragmentId.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800161 .build();
Madan Jampani53e44e62014-10-07 12:39:51 -0700162 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700163 };
164
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800165 private ExecutorService executor;
166
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800167 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700168
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800169 // TODO make these anti-entropy parameters configurable
170 private long initialDelaySec = 5;
171 private long periodSec = 5;
172
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800173
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700174 @Activate
175 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700176 clusterCommunicator.addSubscriber(
177 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
178 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700179 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800180 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, new InternalRemoveRequestListener());
Madan Jampani25322532014-10-08 11:20:38 -0700181 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700182 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
183 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700184 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
185 clusterCommunicator.addSubscriber(
186 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700187 clusterCommunicator.addSubscriber(
188 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
189
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800190 executor = Executors.newCachedThreadPool(namedThreads("device-fg-%d"));
191
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800192 backgroundExecutor =
193 newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700194
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700195 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800196 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700197 initialDelaySec, periodSec, TimeUnit.SECONDS);
198
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700199 log.info("Started");
200 }
201
202 @Deactivate
203 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700204
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800205 executor.shutdownNow();
206
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800207 backgroundExecutor.shutdownNow();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700208 try {
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800209 boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700210 if (timedout) {
211 log.error("Timeout during executor shutdown");
212 }
213 } catch (InterruptedException e) {
214 log.error("Error during executor shutdown", e);
215 }
216
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700217 deviceDescs.clear();
218 devices.clear();
219 devicePorts.clear();
220 availableDevices.clear();
221 log.info("Stopped");
222 }
223
224 @Override
225 public int getDeviceCount() {
226 return devices.size();
227 }
228
229 @Override
230 public Iterable<Device> getDevices() {
231 return Collections.unmodifiableCollection(devices.values());
232 }
233
234 @Override
Yuta HIGUCHIf1f2ac02014-11-26 14:02:22 -0800235 public Iterable<Device> getAvailableDevices() {
236 return FluentIterable.from(getDevices())
237 .filter(new Predicate<Device>() {
238
239 @Override
240 public boolean apply(Device input) {
241 return isAvailable(input.id());
242 }
243 });
244 }
245
246 @Override
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700247 public Device getDevice(DeviceId deviceId) {
248 return devices.get(deviceId);
249 }
250
251 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700252 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
253 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700254 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700255 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700256 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700257 final DeviceEvent event;
258 final Timestamped<DeviceDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800259 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
260 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700261 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800262 mergedDesc = device.get(providerId).getDeviceDesc();
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700263 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700264 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700265 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
266 providerId, deviceId);
267 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700268 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700269 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700270 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700271 + providerId + " and deviceId: " + deviceId, e);
272 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700273 }
274 return event;
275 }
276
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700277 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
278 DeviceId deviceId,
279 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700280
281 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800282 Map<ProviderId, DeviceDescriptions> device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700283 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700284
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800285 synchronized (device) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700286 // locking per device
287
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700288 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
289 log.debug("Ignoring outdated event: {}", deltaDesc);
290 return null;
291 }
292
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800293 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700294
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700295 final Device oldDevice = devices.get(deviceId);
296 final Device newDevice;
297
298 if (deltaDesc == descs.getDeviceDesc() ||
299 deltaDesc.isNewer(descs.getDeviceDesc())) {
300 // on new device or valid update
301 descs.putDeviceDesc(deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800302 newDevice = composeDevice(deviceId, device);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700303 } else {
304 // outdated event, ignored.
305 return null;
306 }
307 if (oldDevice == null) {
308 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700309 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700310 } else {
311 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700312 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700313 }
314 }
315 }
316
317 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700318 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700319 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700320 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700321
322 // update composed device cache
323 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
324 verify(oldDevice == null,
325 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
326 providerId, oldDevice, newDevice);
327
328 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700329 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700330 }
331
332 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
333 }
334
335 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700336 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700337 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700338 Device oldDevice,
339 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700340 // We allow only certain attributes to trigger update
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700341 boolean propertiesChanged =
342 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
343 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion());
344 boolean annotationsChanged =
345 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700346
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700347 // Primary providers can respond to all changes, but ancillary ones
348 // should respond only to annotation changes.
349 if ((providerId.isAncillary() && annotationsChanged) ||
350 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700351 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
352 if (!replaced) {
353 verify(replaced,
354 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
355 providerId, oldDevice, devices.get(newDevice.id())
356 , newDevice);
357 }
358 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700359 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700360 }
361 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
362 }
363
364 // Otherwise merely attempt to change availability if primary provider
365 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700366 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700367 return !added ? null :
368 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
369 }
370 return null;
371 }
372
373 @Override
374 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700375 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700376 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700377 if (event != null) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700378 log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
379 deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700380 try {
381 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
382 } catch (IOException e) {
383 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
384 deviceId);
385 }
386 }
387 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700388 }
389
390 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
391
392 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700393 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700394
395 // locking device
396 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700397
398 // accept off-line if given timestamp is newer than
399 // the latest Timestamp from Primary provider
400 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
401 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
402 if (timestamp.compareTo(lastTimestamp) <= 0) {
403 // outdated event ignore
404 return null;
405 }
406
407 offline.put(deviceId, timestamp);
408
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700409 Device device = devices.get(deviceId);
410 if (device == null) {
411 return null;
412 }
413 boolean removed = availableDevices.remove(deviceId);
414 if (removed) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700415 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700416 }
417 return null;
418 }
419 }
420
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700421 /**
422 * Marks the device as available if the given timestamp is not outdated,
423 * compared to the time the device has been marked offline.
424 *
425 * @param deviceId identifier of the device
426 * @param timestamp of the event triggering this change.
427 * @return true if availability change request was accepted and changed the state
428 */
429 // Guarded by deviceDescs value (=Device lock)
430 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
431 // accept on-line if given timestamp is newer than
432 // the latest offline request Timestamp
433 Timestamp offlineTimestamp = offline.get(deviceId);
434 if (offlineTimestamp == null ||
435 offlineTimestamp.compareTo(timestamp) < 0) {
436
437 offline.remove(deviceId);
438 return availableDevices.add(deviceId);
439 }
440 return false;
441 }
442
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700443 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700444 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
445 DeviceId deviceId,
446 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700447
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700448 final Timestamp newTimestamp;
449 try {
450 newTimestamp = deviceClockService.getTimestamp(deviceId);
451 } catch (IllegalStateException e) {
452 log.info("Timestamp was not available for device {}", deviceId);
453 log.debug(" discarding {}", portDescriptions);
454 // Failed to generate timestamp.
455
456 // Possible situation:
457 // Device connected and became master for short period of time,
458 // but lost mastership before this instance had the chance to
459 // retrieve term information.
460
461 // Information dropped here is expected to be recoverable by
462 // device probing after mastership change
463
464 return Collections.emptyList();
465 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800466 log.debug("timestamp for {} {}", deviceId, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700467
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700468 final Timestamped<List<PortDescription>> timestampedInput
469 = new Timestamped<>(portDescriptions, newTimestamp);
470 final List<DeviceEvent> events;
471 final Timestamped<List<PortDescription>> merged;
472
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800473 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
474 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700475 events = updatePortsInternal(providerId, deviceId, timestampedInput);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800476 final DeviceDescriptions descs = device.get(providerId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700477 List<PortDescription> mergedList =
478 FluentIterable.from(portDescriptions)
479 .transform(new Function<PortDescription, PortDescription>() {
480 @Override
481 public PortDescription apply(PortDescription input) {
482 // lookup merged port description
483 return descs.getPortDesc(input.portNumber()).value();
484 }
485 }).toList();
486 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
487 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700488 if (!events.isEmpty()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800489 log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
Madan Jampani47c93732014-10-06 20:46:08 -0700490 providerId, deviceId);
491 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700492 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700493 } catch (IOException e) {
494 log.error("Failed to notify peers of a port update topology event or providerId: "
495 + providerId + " and deviceId: " + deviceId, e);
496 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700497 }
498 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700499 }
500
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700501 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
502 DeviceId deviceId,
503 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700504
505 Device device = devices.get(deviceId);
506 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
507
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700508 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700509 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
510
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700511 List<DeviceEvent> events = new ArrayList<>();
512 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700513
514 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
515 log.debug("Ignoring outdated events: {}", portDescriptions);
516 return null;
517 }
518
519 DeviceDescriptions descs = descsMap.get(providerId);
520 // every provider must provide DeviceDescription.
521 checkArgument(descs != null,
522 "Device description for Device ID %s from Provider %s was not found",
523 deviceId, providerId);
524
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700525 Map<PortNumber, Port> ports = getPortMap(deviceId);
526
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700527 final Timestamp newTimestamp = portDescriptions.timestamp();
528
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700529 // Add new ports
530 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700531 for (PortDescription portDescription : portDescriptions.value()) {
532 final PortNumber number = portDescription.portNumber();
533 processed.add(number);
534
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700535 final Port oldPort = ports.get(number);
536 final Port newPort;
537
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700538
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700539 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
540 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700541 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700542 // on new port or valid update
543 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700544 descs.putPortDesc(new Timestamped<>(portDescription,
545 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700546 newPort = composePort(device, number, descsMap);
547 } else {
548 // outdated event, ignored.
549 continue;
550 }
551
552 events.add(oldPort == null ?
553 createPort(device, newPort, ports) :
554 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700555 }
556
557 events.addAll(pruneOldPorts(device, ports, processed));
558 }
559 return FluentIterable.from(events).filter(notNull()).toList();
560 }
561
562 // Creates a new port based on the port description adds it to the map and
563 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700564 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700565 private DeviceEvent createPort(Device device, Port newPort,
566 Map<PortNumber, Port> ports) {
567 ports.put(newPort.number(), newPort);
568 return new DeviceEvent(PORT_ADDED, device, newPort);
569 }
570
571 // Checks if the specified port requires update and if so, it replaces the
572 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700573 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700574 private DeviceEvent updatePort(Device device, Port oldPort,
575 Port newPort,
576 Map<PortNumber, Port> ports) {
577 if (oldPort.isEnabled() != newPort.isEnabled() ||
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700578 oldPort.type() != newPort.type() ||
579 oldPort.portSpeed() != newPort.portSpeed() ||
580 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700581 ports.put(oldPort.number(), newPort);
582 return new DeviceEvent(PORT_UPDATED, device, newPort);
583 }
584 return null;
585 }
586
587 // Prunes the specified list of ports based on which ports are in the
588 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700589 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700590 private List<DeviceEvent> pruneOldPorts(Device device,
591 Map<PortNumber, Port> ports,
592 Set<PortNumber> processed) {
593 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700594 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700595 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700596 Entry<PortNumber, Port> e = iterator.next();
597 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700598 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700599 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700600 iterator.remove();
601 }
602 }
603 return events;
604 }
605
606 // Gets the map of ports for the specified device; if one does not already
607 // exist, it creates and registers a new one.
608 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
609 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700610 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
611 }
612
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700613 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700614 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700615 Map<ProviderId, DeviceDescriptions> r;
616 r = deviceDescs.get(deviceId);
617 if (r == null) {
618 r = new HashMap<ProviderId, DeviceDescriptions>();
619 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
620 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
621 if (concurrentlyAdded != null) {
622 r = concurrentlyAdded;
623 }
624 }
625 return r;
626 }
627
628 // Guarded by deviceDescs value (=Device lock)
629 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
630 Map<ProviderId, DeviceDescriptions> device,
631 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
632
633 synchronized (device) {
634 DeviceDescriptions r = device.get(providerId);
635 if (r == null) {
636 r = new DeviceDescriptions(deltaDesc);
637 device.put(providerId, r);
638 }
639 return r;
640 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700641 }
642
643 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700644 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
645 DeviceId deviceId,
646 PortDescription portDescription) {
647
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700648 final Timestamp newTimestamp;
649 try {
650 newTimestamp = deviceClockService.getTimestamp(deviceId);
651 } catch (IllegalStateException e) {
652 log.info("Timestamp was not available for device {}", deviceId);
653 log.debug(" discarding {}", portDescription);
654 // Failed to generate timestamp. Ignoring.
655 // See updatePorts comment
656 return null;
657 }
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700658 final Timestamped<PortDescription> deltaDesc
659 = new Timestamped<>(portDescription, newTimestamp);
660 final DeviceEvent event;
661 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800662 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
663 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700664 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800665 mergedDesc = device.get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700666 .getPortDesc(portDescription.portNumber());
667 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700668 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700669 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
670 providerId, deviceId);
671 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700672 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700673 } catch (IOException e) {
674 log.error("Failed to notify peers of a port status update topology event or providerId: "
675 + providerId + " and deviceId: " + deviceId, e);
676 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700677 }
678 return event;
679 }
680
681 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
682 Timestamped<PortDescription> deltaDesc) {
683
684 Device device = devices.get(deviceId);
685 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
686
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700687 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700688 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
689
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700690 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700691
692 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
693 log.debug("Ignoring outdated event: {}", deltaDesc);
694 return null;
695 }
696
697 DeviceDescriptions descs = descsMap.get(providerId);
698 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700699 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700700 "Device description for Device ID %s from Provider %s was not found",
701 deviceId, providerId);
702
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700703 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
704 final PortNumber number = deltaDesc.value().portNumber();
705 final Port oldPort = ports.get(number);
706 final Port newPort;
707
708 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
709 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700710 deltaDesc.isNewer(existingPortDesc)) {
711 // on new port or valid update
712 // update description
713 descs.putPortDesc(deltaDesc);
714 newPort = composePort(device, number, descsMap);
715 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700716 // same or outdated event, ignored.
717 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700718 return null;
719 }
720
721 if (oldPort == null) {
722 return createPort(device, newPort, ports);
723 } else {
724 return updatePort(device, oldPort, newPort, ports);
725 }
726 }
727 }
728
729 @Override
730 public List<Port> getPorts(DeviceId deviceId) {
731 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
732 if (ports == null) {
733 return Collections.emptyList();
734 }
735 return ImmutableList.copyOf(ports.values());
736 }
737
738 @Override
739 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
740 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
741 return ports == null ? null : ports.get(portNumber);
742 }
743
744 @Override
745 public boolean isAvailable(DeviceId deviceId) {
746 return availableDevices.contains(deviceId);
747 }
748
749 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700750 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800751 final NodeId myId = clusterService.getLocalNode().id();
752 NodeId master = mastershipService.getMasterFor(deviceId);
753
754 // if there exist a master, forward
755 // if there is no master, try to become one and process
756
757 boolean relinquishAtEnd = false;
758 if (master == null) {
759 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
760 if (myRole != MastershipRole.NONE) {
761 relinquishAtEnd = true;
762 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800763 log.debug("Temporarily requesting role for {} to remove", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800764 mastershipService.requestRoleFor(deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800765 MastershipTerm term = termService.getMastershipTerm(deviceId);
766 if (myId.equals(term.master())) {
767 master = myId;
768 }
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700769 }
770
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800771 if (!myId.equals(master)) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800772 log.debug("{} has control of {}, forwarding remove request",
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800773 master, deviceId);
774
775 ClusterMessage message = new ClusterMessage(
776 myId,
777 DEVICE_REMOVE_REQ,
778 SERIALIZER.encode(deviceId));
779
780 try {
781 clusterCommunicator.unicast(message, master);
782 } catch (IOException e) {
783 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
784 }
785
786 // event will be triggered after master processes it.
787 return null;
788 }
789
790 // I have control..
791
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700792 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700793 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700794 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800795 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700796 deviceId);
797 try {
798 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
799 } catch (IOException e) {
800 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
801 deviceId);
802 }
803 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800804 if (relinquishAtEnd) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800805 log.debug("Relinquishing temporary role acquired for {}", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800806 mastershipService.relinquishMastership(deviceId);
807 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700808 return event;
809 }
810
811 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
812 Timestamp timestamp) {
813
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700814 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700815 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700816 // accept removal request if given timestamp is newer than
817 // the latest Timestamp from Primary provider
818 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
819 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
820 if (timestamp.compareTo(lastTimestamp) <= 0) {
821 // outdated event ignore
822 return null;
823 }
824 removalRequest.put(deviceId, timestamp);
825
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700826 Device device = devices.remove(deviceId);
827 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700828 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
829 if (ports != null) {
830 ports.clear();
831 }
832 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700833 descs.clear();
834 return device == null ? null :
835 new DeviceEvent(DEVICE_REMOVED, device, null);
836 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700837 }
838
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700839 /**
840 * Checks if given timestamp is superseded by removal request
841 * with more recent timestamp.
842 *
843 * @param deviceId identifier of a device
844 * @param timestampToCheck timestamp of an event to check
845 * @return true if device is already removed
846 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700847 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
848 Timestamp removalTimestamp = removalRequest.get(deviceId);
849 if (removalTimestamp != null &&
850 removalTimestamp.compareTo(timestampToCheck) >= 0) {
851 // removalRequest is more recent
852 return true;
853 }
854 return false;
855 }
856
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700857 /**
858 * Returns a Device, merging description given from multiple Providers.
859 *
860 * @param deviceId device identifier
861 * @param providerDescs Collection of Descriptions from multiple providers
862 * @return Device instance
863 */
864 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700865 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700866
Thomas Vachuska444eda62014-10-28 13:09:42 -0700867 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700868
869 ProviderId primary = pickPrimaryPID(providerDescs);
870
871 DeviceDescriptions desc = providerDescs.get(primary);
872
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700873 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700874 Type type = base.type();
875 String manufacturer = base.manufacturer();
876 String hwVersion = base.hwVersion();
877 String swVersion = base.swVersion();
878 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700879 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700880 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
881 annotations = merge(annotations, base.annotations());
882
883 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
884 if (e.getKey().equals(primary)) {
885 continue;
886 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800887 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700888 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700889 // Currently assuming there will never be a key conflict between
890 // providers
891
892 // annotation merging. not so efficient, should revisit later
893 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
894 }
895
896 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700897 hwVersion, swVersion, serialNumber,
898 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700899 }
900
901 /**
902 * Returns a Port, merging description given from multiple Providers.
903 *
904 * @param device device the port is on
905 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700906 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700907 * @return Port instance
908 */
909 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700910 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700911
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700912 ProviderId primary = pickPrimaryPID(descsMap);
913 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700914 // if no primary, assume not enabled
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700915 boolean isEnabled = false;
916 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
917
918 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
919 if (portDesc != null) {
920 isEnabled = portDesc.value().isEnabled();
921 annotations = merge(annotations, portDesc.value().annotations());
922 }
923
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700924 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700925 if (e.getKey().equals(primary)) {
926 continue;
927 }
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800928 // Note: should keep track of Description timestamp in the future
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700929 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700930 // Currently assuming there will never be a key conflict between
931 // providers
932
933 // annotation merging. not so efficient, should revisit later
934 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
935 if (otherPortDesc != null) {
936 annotations = merge(annotations, otherPortDesc.value().annotations());
937 }
938 }
939
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700940 return portDesc == null ?
941 new DefaultPort(device, number, false, annotations) :
942 new DefaultPort(device, number, isEnabled, portDesc.value().type(),
943 portDesc.value().portSpeed(), annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700944 }
945
946 /**
947 * @return primary ProviderID, or randomly chosen one if none exists
948 */
949 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700950 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700951 ProviderId fallBackPrimary = null;
952 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
953 if (!e.getKey().isAncillary()) {
954 return e.getKey();
955 } else if (fallBackPrimary == null) {
956 // pick randomly as a fallback in case there is no primary
957 fallBackPrimary = e.getKey();
958 }
959 }
960 return fallBackPrimary;
961 }
962
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700963 private DeviceDescriptions getPrimaryDescriptions(
964 Map<ProviderId, DeviceDescriptions> providerDescs) {
965 ProviderId pid = pickPrimaryPID(providerDescs);
966 return providerDescs.get(pid);
967 }
968
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700969 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
970 ClusterMessage message = new ClusterMessage(
971 clusterService.getLocalNode().id(),
972 subject,
973 SERIALIZER.encode(event));
974 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700975 }
976
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700977 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
978 ClusterMessage message = new ClusterMessage(
979 clusterService.getLocalNode().id(),
980 subject,
981 SERIALIZER.encode(event));
982 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700983 }
Madan Jampani47c93732014-10-06 20:46:08 -0700984
985 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700986 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700987 }
988
Madan Jampani25322532014-10-08 11:20:38 -0700989 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700990 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700991 }
992
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700993 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700994 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700995 }
996
Madan Jampani47c93732014-10-06 20:46:08 -0700997 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700998 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700999 }
1000
1001 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001002 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1003 }
1004
1005 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1006 try {
1007 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1008 } catch (IOException e) {
1009 log.error("Failed to send" + event + " to " + recipient, e);
1010 }
1011 }
1012
1013 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1014 try {
1015 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1016 } catch (IOException e) {
1017 log.error("Failed to send" + event + " to " + recipient, e);
1018 }
1019 }
1020
1021 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1022 try {
1023 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1024 } catch (IOException e) {
1025 log.error("Failed to send" + event + " to " + recipient, e);
1026 }
1027 }
1028
1029 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1030 try {
1031 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1032 } catch (IOException e) {
1033 log.error("Failed to send" + event + " to " + recipient, e);
1034 }
1035 }
1036
1037 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1038 try {
1039 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1040 } catch (IOException e) {
1041 log.error("Failed to send" + event + " to " + recipient, e);
1042 }
1043 }
1044
1045 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1046 final NodeId self = clusterService.getLocalNode().id();
1047
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001048 final int numDevices = deviceDescs.size();
1049 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1050 final int portsPerDevice = 8; // random factor to minimize reallocation
1051 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1052 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001053
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001054 deviceDescs.forEach((deviceId, devDescs) -> {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001055
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001056 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001057 synchronized (devDescs) {
1058
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001059 // send device offline timestamp
1060 Timestamp lOffline = this.offline.get(deviceId);
1061 if (lOffline != null) {
1062 adOffline.put(deviceId, lOffline);
1063 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001064
1065 for (Entry<ProviderId, DeviceDescriptions>
1066 prov : devDescs.entrySet()) {
1067
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001068 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001069 final ProviderId provId = prov.getKey();
1070 final DeviceDescriptions descs = prov.getValue();
1071
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001072 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001073 descs.getDeviceDesc().timestamp());
1074
1075 for (Entry<PortNumber, Timestamped<PortDescription>>
1076 portDesc : descs.getPortDescs().entrySet()) {
1077
1078 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001079 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001080 portDesc.getValue().timestamp());
1081 }
1082 }
1083 }
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -08001084 });
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001085
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001086 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001087 }
1088
1089 /**
1090 * Responds to anti-entropy advertisement message.
1091 * <P>
1092 * Notify sender about out-dated information using regular replication message.
1093 * Send back advertisement to sender if not in sync.
1094 *
1095 * @param advertisement to respond to
1096 */
1097 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1098
1099 final NodeId sender = advertisement.sender();
1100
1101 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1102 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1103 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1104
1105 // Fragments to request
1106 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1107 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1108
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001109 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001110 final DeviceId deviceId = de.getKey();
1111 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1112
1113 synchronized (lDevice) {
1114 // latestTimestamp across provider
1115 // Note: can be null initially
1116 Timestamp localLatest = offline.get(deviceId);
1117
1118 // handle device Ads
1119 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1120 final ProviderId provId = prov.getKey();
1121 final DeviceDescriptions lDeviceDescs = prov.getValue();
1122
1123 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1124
1125
1126 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1127 Timestamp advDevTimestamp = devAds.get(devFragId);
1128
1129 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
1130 // remote does not have it or outdated, suggest
1131 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1132 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1133 // local is outdated, request
1134 reqDevices.add(devFragId);
1135 }
1136
1137 // handle port Ads
1138 for (Entry<PortNumber, Timestamped<PortDescription>>
1139 pe : lDeviceDescs.getPortDescs().entrySet()) {
1140
1141 final PortNumber num = pe.getKey();
1142 final Timestamped<PortDescription> lPort = pe.getValue();
1143
1144 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1145
1146 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -07001147 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001148 // remote does not have it or outdated, suggest
1149 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1150 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1151 // local is outdated, request
1152 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1153 reqPorts.add(portFragId);
1154 }
1155
1156 // remove port Ad already processed
1157 portAds.remove(portFragId);
1158 } // end local port loop
1159
1160 // remove device Ad already processed
1161 devAds.remove(devFragId);
1162
1163 // find latest and update
1164 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1165 if (localLatest == null ||
1166 providerLatest.compareTo(localLatest) > 0) {
1167 localLatest = providerLatest;
1168 }
1169 } // end local provider loop
1170
1171 // checking if remote timestamp is more recent.
1172 Timestamp rOffline = offlineAds.get(deviceId);
1173 if (rOffline != null &&
1174 rOffline.compareTo(localLatest) > 0) {
1175 // remote offline timestamp suggests that the
1176 // device is off-line
1177 markOfflineInternal(deviceId, rOffline);
1178 }
1179
1180 Timestamp lOffline = offline.get(deviceId);
1181 if (lOffline != null && rOffline == null) {
1182 // locally offline, but remote is online, suggest offline
1183 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1184 }
1185
1186 // remove device offline Ad already processed
1187 offlineAds.remove(deviceId);
1188 } // end local device loop
1189 } // device lock
1190
1191 // If there is any Ads left, request them
1192 log.trace("Ads left {}, {}", devAds, portAds);
1193 reqDevices.addAll(devAds.keySet());
1194 reqPorts.addAll(portAds.keySet());
1195
1196 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1197 log.trace("Nothing to request to remote peer {}", sender);
1198 return;
1199 }
1200
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001201 log.debug("Need to sync {} {}", reqDevices, reqPorts);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001202
1203 // 2-way Anti-Entropy for now
1204 try {
1205 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1206 } catch (IOException e) {
1207 log.error("Failed to send response advertisement to " + sender, e);
1208 }
1209
1210// Sketch of 3-way Anti-Entropy
1211// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1212// ClusterMessage message = new ClusterMessage(
1213// clusterService.getLocalNode().id(),
1214// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1215// SERIALIZER.encode(request));
1216//
1217// try {
1218// clusterCommunicator.unicast(message, advertisement.sender());
1219// } catch (IOException e) {
1220// log.error("Failed to send advertisement reply to "
1221// + advertisement.sender(), e);
1222// }
Madan Jampani47c93732014-10-06 20:46:08 -07001223 }
1224
Madan Jampani255a58b2014-10-09 12:08:20 -07001225 private void notifyDelegateIfNotNull(DeviceEvent event) {
1226 if (event != null) {
1227 notifyDelegate(event);
1228 }
1229 }
1230
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001231 private final class SendAdvertisementTask implements Runnable {
1232
1233 @Override
1234 public void run() {
1235 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001236 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001237 return;
1238 }
1239
1240 try {
1241 final NodeId self = clusterService.getLocalNode().id();
1242 Set<ControllerNode> nodes = clusterService.getNodes();
1243
1244 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1245 .transform(toNodeId())
1246 .toList();
1247
1248 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001249 log.trace("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001250 return;
1251 }
1252
1253 NodeId peer;
1254 do {
1255 int idx = RandomUtils.nextInt(0, nodeIds.size());
1256 peer = nodeIds.get(idx);
1257 } while (peer.equals(self));
1258
1259 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1260
1261 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001262 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001263 return;
1264 }
1265
1266 try {
1267 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1268 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001269 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001270 return;
1271 }
1272 } catch (Exception e) {
1273 // catch all Exception to avoid Scheduled task being suppressed.
1274 log.error("Exception thrown while sending advertisement", e);
1275 }
1276 }
1277 }
1278
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001279 private final class InternalDeviceEventListener
1280 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001281 @Override
1282 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001283
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001284 log.debug("Received device update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001285 InternalDeviceEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001286
Madan Jampani47c93732014-10-06 20:46:08 -07001287 ProviderId providerId = event.providerId();
1288 DeviceId deviceId = event.deviceId();
1289 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001290
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001291 executor.submit(new Runnable() {
1292
1293 @Override
1294 public void run() {
1295 try {
1296 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1297 } catch (Exception e) {
1298 log.warn("Exception thrown handling device update", e);
1299 }
1300 }
1301 });
Madan Jampani47c93732014-10-06 20:46:08 -07001302 }
1303 }
1304
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001305 private final class InternalDeviceOfflineEventListener
1306 implements ClusterMessageHandler {
Madan Jampani25322532014-10-08 11:20:38 -07001307 @Override
1308 public void handle(ClusterMessage message) {
1309
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001310 log.debug("Received device offline event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001311 InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001312
1313 DeviceId deviceId = event.deviceId();
1314 Timestamp timestamp = event.timestamp();
1315
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001316 executor.submit(new Runnable() {
1317
1318 @Override
1319 public void run() {
1320 try {
1321 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1322 } catch (Exception e) {
1323 log.warn("Exception thrown handling device offline", e);
1324 }
1325 }
1326 });
Madan Jampani25322532014-10-08 11:20:38 -07001327 }
1328 }
1329
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001330 private final class InternalRemoveRequestListener
1331 implements ClusterMessageHandler {
1332 @Override
1333 public void handle(ClusterMessage message) {
1334 log.debug("Received device remove request from peer: {}", message.sender());
1335 DeviceId did = SERIALIZER.decode(message.payload());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001336
1337 executor.submit(new Runnable() {
1338
1339 @Override
1340 public void run() {
1341 try {
1342 removeDevice(did);
1343 } catch (Exception e) {
1344 log.warn("Exception thrown handling device remove", e);
1345 }
1346 }
1347 });
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001348 }
1349 }
1350
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001351 private final class InternalDeviceRemovedEventListener
1352 implements ClusterMessageHandler {
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001353 @Override
1354 public void handle(ClusterMessage message) {
1355
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001356 log.debug("Received device removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001357 InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001358
1359 DeviceId deviceId = event.deviceId();
1360 Timestamp timestamp = event.timestamp();
1361
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001362 executor.submit(new Runnable() {
1363
1364 @Override
1365 public void run() {
1366 try {
1367 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1368 } catch (Exception e) {
1369 log.warn("Exception thrown handling device removed", e);
1370 }
1371 }
1372 });
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001373 }
1374 }
1375
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001376 private final class InternalPortEventListener
1377 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001378 @Override
1379 public void handle(ClusterMessage message) {
1380
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001381 log.debug("Received port update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001382 InternalPortEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001383
1384 ProviderId providerId = event.providerId();
1385 DeviceId deviceId = event.deviceId();
1386 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1387
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001388 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001389 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001390 // Note: dropped information will be recovered by anti-entropy
1391 return;
1392 }
1393
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001394 executor.submit(new Runnable() {
1395
1396 @Override
1397 public void run() {
1398 try {
1399 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1400 } catch (Exception e) {
1401 log.warn("Exception thrown handling port update", e);
1402 }
1403 }
1404 });
Madan Jampani47c93732014-10-06 20:46:08 -07001405 }
1406 }
1407
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001408 private final class InternalPortStatusEventListener
1409 implements ClusterMessageHandler {
Madan Jampani47c93732014-10-06 20:46:08 -07001410 @Override
1411 public void handle(ClusterMessage message) {
1412
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001413 log.debug("Received port status update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001414 InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001415
1416 ProviderId providerId = event.providerId();
1417 DeviceId deviceId = event.deviceId();
1418 Timestamped<PortDescription> portDescription = event.portDescription();
1419
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001420 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001421 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001422 // Note: dropped information will be recovered by anti-entropy
1423 return;
1424 }
1425
Yuta HIGUCHI80d56592014-11-25 15:11:13 -08001426 executor.submit(new Runnable() {
1427
1428 @Override
1429 public void run() {
1430 try {
1431 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1432 } catch (Exception e) {
1433 log.warn("Exception thrown handling port update", e);
1434 }
1435 }
1436 });
Madan Jampani47c93732014-10-06 20:46:08 -07001437 }
1438 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001439
1440 private final class InternalDeviceAdvertisementListener
1441 implements ClusterMessageHandler {
1442
1443 @Override
1444 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001445 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001446 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Yuta HIGUCHI06586272014-11-25 14:27:03 -08001447 backgroundExecutor.submit(new Runnable() {
1448
1449 @Override
1450 public void run() {
1451 try {
1452 handleAdvertisement(advertisement);
1453 } catch (Exception e) {
1454 log.warn("Exception thrown handling Device advertisements.", e);
1455 }
1456 }
1457 });
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001458 }
1459 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001460}