blob: 6fbc034d2c85bf3b22fcb54ee7e90f6a3213cc99 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
Yuta HIGUCHI47c40882014-10-10 18:44:37 -07003import com.google.common.base.Function;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07004import com.google.common.collect.FluentIterable;
5import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07008
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07009import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070017import org.onlab.onos.cluster.ControllerNode;
18import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -070019import org.onlab.onos.mastership.MastershipService;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070020import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070021import org.onlab.onos.net.DefaultAnnotations;
22import org.onlab.onos.net.DefaultDevice;
23import org.onlab.onos.net.DefaultPort;
24import org.onlab.onos.net.Device;
25import org.onlab.onos.net.Device.Type;
26import org.onlab.onos.net.DeviceId;
27import org.onlab.onos.net.Port;
28import org.onlab.onos.net.PortNumber;
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -070029import org.onlab.onos.net.device.DeviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070030import org.onlab.onos.net.device.DeviceDescription;
31import org.onlab.onos.net.device.DeviceEvent;
32import org.onlab.onos.net.device.DeviceStore;
33import org.onlab.onos.net.device.DeviceStoreDelegate;
34import org.onlab.onos.net.device.PortDescription;
35import org.onlab.onos.net.provider.ProviderId;
36import org.onlab.onos.store.AbstractStore;
Yuta HIGUCHId40483d2014-10-09 15:20:30 -070037import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070038import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070041import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIeecee552014-10-16 14:09:01 -070042import org.onlab.onos.store.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070043import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070044import org.onlab.onos.store.serializers.DistributedStoreSerializers;
alshabib7911a052014-10-16 17:49:37 -070045import org.onlab.packet.ChassisId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070046import org.onlab.util.KryoNamespace;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070047import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070048import org.slf4j.Logger;
49
Madan Jampani47c93732014-10-06 20:46:08 -070050import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070051import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070052import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070053import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070054import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070055import java.util.HashSet;
56import java.util.Iterator;
57import java.util.List;
58import java.util.Map;
59import java.util.Map.Entry;
60import java.util.Objects;
61import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070062import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070063import java.util.concurrent.ScheduledExecutorService;
64import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070065
66import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHIdc7374c2014-10-10 11:11:09 -070068import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070069import static org.onlab.onos.net.device.DeviceEvent.Type.*;
70import static org.slf4j.LoggerFactory.getLogger;
71import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
72import static org.onlab.onos.net.DefaultAnnotations.merge;
73import static com.google.common.base.Verify.verify;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070074import static org.onlab.util.Tools.namedThreads;
75import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
76import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070077
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070078// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070079/**
80 * Manages inventory of infrastructure devices using gossip protocol to distribute
81 * information.
82 */
83@Component(immediate = true)
84@Service
85public class GossipDeviceStore
86 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
87 implements DeviceStore {
88
89 private final Logger log = getLogger(getClass());
90
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070091 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070092
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070093 // innerMap is used to lock a Device, thus instance should never be replaced.
94 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070095 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070096 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070097
98 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070099 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
100 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
101
102 // to be updated under Device lock
103 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
104 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700105
106 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700107 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700110 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700111
Madan Jampani47c93732014-10-06 20:46:08 -0700112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterCommunicationService clusterCommunicator;
114
Madan Jampani53e44e62014-10-07 12:39:51 -0700115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected ClusterService clusterService;
117
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected MastershipService mastershipService;
120
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700121 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700122 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700123 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700124 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700125 .register(DistributedStoreSerializers.COMMON)
126
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700127 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700128 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700129 .register(InternalDeviceRemovedEvent.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700130 .register(InternalPortEvent.class, new InternalPortEventSerializer())
131 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700132 .register(DeviceAntiEntropyAdvertisement.class)
133 .register(DeviceFragmentId.class)
134 .register(PortFragmentId.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700135 .build()
136 .populate(1);
137 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700138 };
139
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700140 private ScheduledExecutorService executor;
141
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700142 @Activate
143 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700144 clusterCommunicator.addSubscriber(
145 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
146 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700147 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
148 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700149 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
150 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700151 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
152 clusterCommunicator.addSubscriber(
153 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700154 clusterCommunicator.addSubscriber(
155 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
156
157 executor =
158 newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
159
160 // TODO: Make these configurable
161 long initialDelaySec = 5;
162 long periodSec = 5;
163 // start anti-entropy thread
164 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
165 initialDelaySec, periodSec, TimeUnit.SECONDS);
166
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700167 log.info("Started");
168 }
169
170 @Deactivate
171 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700172
173 executor.shutdownNow();
174 try {
175 boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
176 if (timedout) {
177 log.error("Timeout during executor shutdown");
178 }
179 } catch (InterruptedException e) {
180 log.error("Error during executor shutdown", e);
181 }
182
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700183 deviceDescs.clear();
184 devices.clear();
185 devicePorts.clear();
186 availableDevices.clear();
187 log.info("Stopped");
188 }
189
190 @Override
191 public int getDeviceCount() {
192 return devices.size();
193 }
194
195 @Override
196 public Iterable<Device> getDevices() {
197 return Collections.unmodifiableCollection(devices.values());
198 }
199
200 @Override
201 public Device getDevice(DeviceId deviceId) {
202 return devices.get(deviceId);
203 }
204
205 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700206 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
207 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700208 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700209 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700210 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700211 final DeviceEvent event;
212 final Timestamped<DeviceDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700213 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700214 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700215 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId).getDeviceDesc();
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700216 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700217 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700218 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
219 providerId, deviceId);
220 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700221 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700222 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700223 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700224 + providerId + " and deviceId: " + deviceId, e);
225 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700226 }
227 return event;
228 }
229
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700230 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
231 DeviceId deviceId,
232 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700233
234 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700235 Map<ProviderId, DeviceDescriptions> providerDescs
236 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700237
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700238 synchronized (providerDescs) {
239 // locking per device
240
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700241 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
242 log.debug("Ignoring outdated event: {}", deltaDesc);
243 return null;
244 }
245
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700246 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(providerDescs, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700247
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700248 final Device oldDevice = devices.get(deviceId);
249 final Device newDevice;
250
251 if (deltaDesc == descs.getDeviceDesc() ||
252 deltaDesc.isNewer(descs.getDeviceDesc())) {
253 // on new device or valid update
254 descs.putDeviceDesc(deltaDesc);
255 newDevice = composeDevice(deviceId, providerDescs);
256 } else {
257 // outdated event, ignored.
258 return null;
259 }
260 if (oldDevice == null) {
261 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700262 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700263 } else {
264 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700265 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700266 }
267 }
268 }
269
270 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700271 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700272 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700273 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700274
275 // update composed device cache
276 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
277 verify(oldDevice == null,
278 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
279 providerId, oldDevice, newDevice);
280
281 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700282 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700283 }
284
285 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
286 }
287
288 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700289 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700290 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700291 Device oldDevice,
292 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700293 // We allow only certain attributes to trigger update
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700294 boolean propertiesChanged =
295 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
296 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion());
297 boolean annotationsChanged =
298 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700299
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700300 // Primary providers can respond to all changes, but ancillary ones
301 // should respond only to annotation changes.
302 if ((providerId.isAncillary() && annotationsChanged) ||
303 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700304 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
305 if (!replaced) {
306 verify(replaced,
307 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
308 providerId, oldDevice, devices.get(newDevice.id())
309 , newDevice);
310 }
311 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700312 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700313 }
314 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
315 }
316
317 // Otherwise merely attempt to change availability if primary provider
318 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700319 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700320 return !added ? null :
321 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
322 }
323 return null;
324 }
325
326 @Override
327 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700328 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700329 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700330 if (event != null) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700331 log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
332 deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700333 try {
334 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
335 } catch (IOException e) {
336 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
337 deviceId);
338 }
339 }
340 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700341 }
342
343 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
344
345 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700346 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700347
348 // locking device
349 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700350
351 // accept off-line if given timestamp is newer than
352 // the latest Timestamp from Primary provider
353 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
354 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
355 if (timestamp.compareTo(lastTimestamp) <= 0) {
356 // outdated event ignore
357 return null;
358 }
359
360 offline.put(deviceId, timestamp);
361
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700362 Device device = devices.get(deviceId);
363 if (device == null) {
364 return null;
365 }
366 boolean removed = availableDevices.remove(deviceId);
367 if (removed) {
368 // TODO: broadcast ... DOWN only?
369 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700370 }
371 return null;
372 }
373 }
374
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700375 /**
376 * Marks the device as available if the given timestamp is not outdated,
377 * compared to the time the device has been marked offline.
378 *
379 * @param deviceId identifier of the device
380 * @param timestamp of the event triggering this change.
381 * @return true if availability change request was accepted and changed the state
382 */
383 // Guarded by deviceDescs value (=Device lock)
384 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
385 // accept on-line if given timestamp is newer than
386 // the latest offline request Timestamp
387 Timestamp offlineTimestamp = offline.get(deviceId);
388 if (offlineTimestamp == null ||
389 offlineTimestamp.compareTo(timestamp) < 0) {
390
391 offline.remove(deviceId);
392 return availableDevices.add(deviceId);
393 }
394 return false;
395 }
396
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700397 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700398 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
399 DeviceId deviceId,
400 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700401
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700402 final Timestamp newTimestamp;
403 try {
404 newTimestamp = deviceClockService.getTimestamp(deviceId);
405 } catch (IllegalStateException e) {
406 log.info("Timestamp was not available for device {}", deviceId);
407 log.debug(" discarding {}", portDescriptions);
408 // Failed to generate timestamp.
409
410 // Possible situation:
411 // Device connected and became master for short period of time,
412 // but lost mastership before this instance had the chance to
413 // retrieve term information.
414
415 // Information dropped here is expected to be recoverable by
416 // device probing after mastership change
417
418 return Collections.emptyList();
419 }
Ayaka Koshibeb5c63a02014-10-18 18:42:27 -0700420 log.info("timestamp for {} {}", deviceId, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700421
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700422 final Timestamped<List<PortDescription>> timestampedInput
423 = new Timestamped<>(portDescriptions, newTimestamp);
424 final List<DeviceEvent> events;
425 final Timestamped<List<PortDescription>> merged;
426
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700427 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700428 events = updatePortsInternal(providerId, deviceId, timestampedInput);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700429 final DeviceDescriptions descs = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700430 List<PortDescription> mergedList =
431 FluentIterable.from(portDescriptions)
432 .transform(new Function<PortDescription, PortDescription>() {
433 @Override
434 public PortDescription apply(PortDescription input) {
435 // lookup merged port description
436 return descs.getPortDesc(input.portNumber()).value();
437 }
438 }).toList();
439 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
440 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700441 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700442 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
443 providerId, deviceId);
444 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700445 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700446 } catch (IOException e) {
447 log.error("Failed to notify peers of a port update topology event or providerId: "
448 + providerId + " and deviceId: " + deviceId, e);
449 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700450 }
451 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700452 }
453
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700454 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
455 DeviceId deviceId,
456 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700457
458 Device device = devices.get(deviceId);
459 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
460
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700461 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700462 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
463
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700464 List<DeviceEvent> events = new ArrayList<>();
465 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700466
467 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
468 log.debug("Ignoring outdated events: {}", portDescriptions);
469 return null;
470 }
471
472 DeviceDescriptions descs = descsMap.get(providerId);
473 // every provider must provide DeviceDescription.
474 checkArgument(descs != null,
475 "Device description for Device ID %s from Provider %s was not found",
476 deviceId, providerId);
477
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700478 Map<PortNumber, Port> ports = getPortMap(deviceId);
479
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700480 final Timestamp newTimestamp = portDescriptions.timestamp();
481
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700482 // Add new ports
483 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700484 for (PortDescription portDescription : portDescriptions.value()) {
485 final PortNumber number = portDescription.portNumber();
486 processed.add(number);
487
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700488 final Port oldPort = ports.get(number);
489 final Port newPort;
490
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700491
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700492 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
493 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700494 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700495 // on new port or valid update
496 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700497 descs.putPortDesc(new Timestamped<>(portDescription,
498 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700499 newPort = composePort(device, number, descsMap);
500 } else {
501 // outdated event, ignored.
502 continue;
503 }
504
505 events.add(oldPort == null ?
506 createPort(device, newPort, ports) :
507 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700508 }
509
510 events.addAll(pruneOldPorts(device, ports, processed));
511 }
512 return FluentIterable.from(events).filter(notNull()).toList();
513 }
514
515 // Creates a new port based on the port description adds it to the map and
516 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700517 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700518 private DeviceEvent createPort(Device device, Port newPort,
519 Map<PortNumber, Port> ports) {
520 ports.put(newPort.number(), newPort);
521 return new DeviceEvent(PORT_ADDED, device, newPort);
522 }
523
524 // Checks if the specified port requires update and if so, it replaces the
525 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700526 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700527 private DeviceEvent updatePort(Device device, Port oldPort,
528 Port newPort,
529 Map<PortNumber, Port> ports) {
530 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700531 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700532
533 ports.put(oldPort.number(), newPort);
534 return new DeviceEvent(PORT_UPDATED, device, newPort);
535 }
536 return null;
537 }
538
539 // Prunes the specified list of ports based on which ports are in the
540 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700541 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700542 private List<DeviceEvent> pruneOldPorts(Device device,
543 Map<PortNumber, Port> ports,
544 Set<PortNumber> processed) {
545 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700546 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700547 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700548 Entry<PortNumber, Port> e = iterator.next();
549 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700550 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700551 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700552 iterator.remove();
553 }
554 }
555 return events;
556 }
557
558 // Gets the map of ports for the specified device; if one does not already
559 // exist, it creates and registers a new one.
560 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
561 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700562 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
563 }
564
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700565 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700566 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700567 Map<ProviderId, DeviceDescriptions> r;
568 r = deviceDescs.get(deviceId);
569 if (r == null) {
570 r = new HashMap<ProviderId, DeviceDescriptions>();
571 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
572 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
573 if (concurrentlyAdded != null) {
574 r = concurrentlyAdded;
575 }
576 }
577 return r;
578 }
579
580 // Guarded by deviceDescs value (=Device lock)
581 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
582 Map<ProviderId, DeviceDescriptions> device,
583 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
584
585 synchronized (device) {
586 DeviceDescriptions r = device.get(providerId);
587 if (r == null) {
588 r = new DeviceDescriptions(deltaDesc);
589 device.put(providerId, r);
590 }
591 return r;
592 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700593 }
594
595 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700596 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
597 DeviceId deviceId,
598 PortDescription portDescription) {
599
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700600 final Timestamp newTimestamp;
601 try {
602 newTimestamp = deviceClockService.getTimestamp(deviceId);
603 } catch (IllegalStateException e) {
604 log.info("Timestamp was not available for device {}", deviceId);
605 log.debug(" discarding {}", portDescription);
606 // Failed to generate timestamp. Ignoring.
607 // See updatePorts comment
608 return null;
609 }
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700610 final Timestamped<PortDescription> deltaDesc
611 = new Timestamped<>(portDescription, newTimestamp);
612 final DeviceEvent event;
613 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700614 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700615 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700616 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700617 .getPortDesc(portDescription.portNumber());
618 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700619 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700620 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
621 providerId, deviceId);
622 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700623 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700624 } catch (IOException e) {
625 log.error("Failed to notify peers of a port status update topology event or providerId: "
626 + providerId + " and deviceId: " + deviceId, e);
627 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700628 }
629 return event;
630 }
631
632 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
633 Timestamped<PortDescription> deltaDesc) {
634
635 Device device = devices.get(deviceId);
636 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
637
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700638 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700639 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
640
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700641 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700642
643 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
644 log.debug("Ignoring outdated event: {}", deltaDesc);
645 return null;
646 }
647
648 DeviceDescriptions descs = descsMap.get(providerId);
649 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700650 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700651 "Device description for Device ID %s from Provider %s was not found",
652 deviceId, providerId);
653
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700654 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
655 final PortNumber number = deltaDesc.value().portNumber();
656 final Port oldPort = ports.get(number);
657 final Port newPort;
658
659 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
660 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700661 deltaDesc.isNewer(existingPortDesc)) {
662 // on new port or valid update
663 // update description
664 descs.putPortDesc(deltaDesc);
665 newPort = composePort(device, number, descsMap);
666 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700667 // same or outdated event, ignored.
668 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700669 return null;
670 }
671
672 if (oldPort == null) {
673 return createPort(device, newPort, ports);
674 } else {
675 return updatePort(device, oldPort, newPort, ports);
676 }
677 }
678 }
679
680 @Override
681 public List<Port> getPorts(DeviceId deviceId) {
682 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
683 if (ports == null) {
684 return Collections.emptyList();
685 }
686 return ImmutableList.copyOf(ports.values());
687 }
688
689 @Override
690 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
691 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
692 return ports == null ? null : ports.get(portNumber);
693 }
694
695 @Override
696 public boolean isAvailable(DeviceId deviceId) {
697 return availableDevices.contains(deviceId);
698 }
699
700 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700701 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700702 final NodeId master = mastershipService.getMasterFor(deviceId);
703 if (!clusterService.getLocalNode().id().equals(master)) {
704 log.info("remove Device {} requested on non master node", deviceId);
705 // FIXME silently ignoring. Should be forwarding or broadcasting to
706 // master.
707 return null;
708 }
709
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700710 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700711 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700712 if (event != null) {
713 log.info("Notifying peers of a device removed topology event for deviceId: {}",
714 deviceId);
715 try {
716 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
717 } catch (IOException e) {
718 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
719 deviceId);
720 }
721 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700722 return event;
723 }
724
725 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
726 Timestamp timestamp) {
727
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700728 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700729 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700730 // accept removal request if given timestamp is newer than
731 // the latest Timestamp from Primary provider
732 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
733 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
734 if (timestamp.compareTo(lastTimestamp) <= 0) {
735 // outdated event ignore
736 return null;
737 }
738 removalRequest.put(deviceId, timestamp);
739
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700740 Device device = devices.remove(deviceId);
741 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700742 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
743 if (ports != null) {
744 ports.clear();
745 }
746 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700747 descs.clear();
748 return device == null ? null :
749 new DeviceEvent(DEVICE_REMOVED, device, null);
750 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700751 }
752
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700753 /**
754 * Checks if given timestamp is superseded by removal request
755 * with more recent timestamp.
756 *
757 * @param deviceId identifier of a device
758 * @param timestampToCheck timestamp of an event to check
759 * @return true if device is already removed
760 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700761 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
762 Timestamp removalTimestamp = removalRequest.get(deviceId);
763 if (removalTimestamp != null &&
764 removalTimestamp.compareTo(timestampToCheck) >= 0) {
765 // removalRequest is more recent
766 return true;
767 }
768 return false;
769 }
770
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700771 /**
772 * Returns a Device, merging description given from multiple Providers.
773 *
774 * @param deviceId device identifier
775 * @param providerDescs Collection of Descriptions from multiple providers
776 * @return Device instance
777 */
778 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700779 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700780
781 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
782
783 ProviderId primary = pickPrimaryPID(providerDescs);
784
785 DeviceDescriptions desc = providerDescs.get(primary);
786
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700787 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700788 Type type = base.type();
789 String manufacturer = base.manufacturer();
790 String hwVersion = base.hwVersion();
791 String swVersion = base.swVersion();
792 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700793 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700794 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
795 annotations = merge(annotations, base.annotations());
796
797 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
798 if (e.getKey().equals(primary)) {
799 continue;
800 }
801 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700802 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700803 // Currently assuming there will never be a key conflict between
804 // providers
805
806 // annotation merging. not so efficient, should revisit later
807 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
808 }
809
810 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700811 hwVersion, swVersion, serialNumber,
812 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700813 }
814
815 /**
816 * Returns a Port, merging description given from multiple Providers.
817 *
818 * @param device device the port is on
819 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700820 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700821 * @return Port instance
822 */
823 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700824 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700825
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700826 ProviderId primary = pickPrimaryPID(descsMap);
827 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700828 // if no primary, assume not enabled
829 // TODO: revisit this default port enabled/disabled behavior
830 boolean isEnabled = false;
831 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
832
833 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
834 if (portDesc != null) {
835 isEnabled = portDesc.value().isEnabled();
836 annotations = merge(annotations, portDesc.value().annotations());
837 }
838
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700839 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700840 if (e.getKey().equals(primary)) {
841 continue;
842 }
843 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700844 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700845 // Currently assuming there will never be a key conflict between
846 // providers
847
848 // annotation merging. not so efficient, should revisit later
849 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
850 if (otherPortDesc != null) {
851 annotations = merge(annotations, otherPortDesc.value().annotations());
852 }
853 }
854
855 return new DefaultPort(device, number, isEnabled, annotations);
856 }
857
858 /**
859 * @return primary ProviderID, or randomly chosen one if none exists
860 */
861 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700862 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700863 ProviderId fallBackPrimary = null;
864 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
865 if (!e.getKey().isAncillary()) {
866 return e.getKey();
867 } else if (fallBackPrimary == null) {
868 // pick randomly as a fallback in case there is no primary
869 fallBackPrimary = e.getKey();
870 }
871 }
872 return fallBackPrimary;
873 }
874
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700875 private DeviceDescriptions getPrimaryDescriptions(
876 Map<ProviderId, DeviceDescriptions> providerDescs) {
877 ProviderId pid = pickPrimaryPID(providerDescs);
878 return providerDescs.get(pid);
879 }
880
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700881 // TODO: should we be throwing exception?
882 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
883 ClusterMessage message = new ClusterMessage(
884 clusterService.getLocalNode().id(),
885 subject,
886 SERIALIZER.encode(event));
887 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700888 }
889
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700890 // TODO: should we be throwing exception?
891 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
892 ClusterMessage message = new ClusterMessage(
893 clusterService.getLocalNode().id(),
894 subject,
895 SERIALIZER.encode(event));
896 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700897 }
Madan Jampani47c93732014-10-06 20:46:08 -0700898
899 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700900 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700901 }
902
Madan Jampani25322532014-10-08 11:20:38 -0700903 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700904 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700905 }
906
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700907 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700908 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700909 }
910
Madan Jampani47c93732014-10-06 20:46:08 -0700911 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700912 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700913 }
914
915 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700916 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
917 }
918
919 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
920 try {
921 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
922 } catch (IOException e) {
923 log.error("Failed to send" + event + " to " + recipient, e);
924 }
925 }
926
927 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
928 try {
929 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
930 } catch (IOException e) {
931 log.error("Failed to send" + event + " to " + recipient, e);
932 }
933 }
934
935 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
936 try {
937 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
938 } catch (IOException e) {
939 log.error("Failed to send" + event + " to " + recipient, e);
940 }
941 }
942
943 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
944 try {
945 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
946 } catch (IOException e) {
947 log.error("Failed to send" + event + " to " + recipient, e);
948 }
949 }
950
951 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
952 try {
953 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
954 } catch (IOException e) {
955 log.error("Failed to send" + event + " to " + recipient, e);
956 }
957 }
958
959 private DeviceAntiEntropyAdvertisement createAdvertisement() {
960 final NodeId self = clusterService.getLocalNode().id();
961
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700962 final int numDevices = deviceDescs.size();
963 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
964 final int portsPerDevice = 8; // random factor to minimize reallocation
965 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
966 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700967
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700968 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700969 provs : deviceDescs.entrySet()) {
970
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700971 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700972 final DeviceId deviceId = provs.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700973 final Map<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700974 synchronized (devDescs) {
975
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700976 // send device offline timestamp
977 Timestamp lOffline = this.offline.get(deviceId);
978 if (lOffline != null) {
979 adOffline.put(deviceId, lOffline);
980 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700981
982 for (Entry<ProviderId, DeviceDescriptions>
983 prov : devDescs.entrySet()) {
984
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700985 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700986 final ProviderId provId = prov.getKey();
987 final DeviceDescriptions descs = prov.getValue();
988
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700989 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700990 descs.getDeviceDesc().timestamp());
991
992 for (Entry<PortNumber, Timestamped<PortDescription>>
993 portDesc : descs.getPortDescs().entrySet()) {
994
995 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700996 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700997 portDesc.getValue().timestamp());
998 }
999 }
1000 }
1001 }
1002
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001003 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001004 }
1005
1006 /**
1007 * Responds to anti-entropy advertisement message.
1008 * <P>
1009 * Notify sender about out-dated information using regular replication message.
1010 * Send back advertisement to sender if not in sync.
1011 *
1012 * @param advertisement to respond to
1013 */
1014 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1015
1016 final NodeId sender = advertisement.sender();
1017
1018 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1019 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1020 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1021
1022 // Fragments to request
1023 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1024 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1025
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001026 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001027 final DeviceId deviceId = de.getKey();
1028 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1029
1030 synchronized (lDevice) {
1031 // latestTimestamp across provider
1032 // Note: can be null initially
1033 Timestamp localLatest = offline.get(deviceId);
1034
1035 // handle device Ads
1036 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1037 final ProviderId provId = prov.getKey();
1038 final DeviceDescriptions lDeviceDescs = prov.getValue();
1039
1040 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1041
1042
1043 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1044 Timestamp advDevTimestamp = devAds.get(devFragId);
1045
1046 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
1047 // remote does not have it or outdated, suggest
1048 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1049 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1050 // local is outdated, request
1051 reqDevices.add(devFragId);
1052 }
1053
1054 // handle port Ads
1055 for (Entry<PortNumber, Timestamped<PortDescription>>
1056 pe : lDeviceDescs.getPortDescs().entrySet()) {
1057
1058 final PortNumber num = pe.getKey();
1059 final Timestamped<PortDescription> lPort = pe.getValue();
1060
1061 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1062
1063 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -07001064 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001065 // remote does not have it or outdated, suggest
1066 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1067 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1068 // local is outdated, request
1069 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1070 reqPorts.add(portFragId);
1071 }
1072
1073 // remove port Ad already processed
1074 portAds.remove(portFragId);
1075 } // end local port loop
1076
1077 // remove device Ad already processed
1078 devAds.remove(devFragId);
1079
1080 // find latest and update
1081 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1082 if (localLatest == null ||
1083 providerLatest.compareTo(localLatest) > 0) {
1084 localLatest = providerLatest;
1085 }
1086 } // end local provider loop
1087
1088 // checking if remote timestamp is more recent.
1089 Timestamp rOffline = offlineAds.get(deviceId);
1090 if (rOffline != null &&
1091 rOffline.compareTo(localLatest) > 0) {
1092 // remote offline timestamp suggests that the
1093 // device is off-line
1094 markOfflineInternal(deviceId, rOffline);
1095 }
1096
1097 Timestamp lOffline = offline.get(deviceId);
1098 if (lOffline != null && rOffline == null) {
1099 // locally offline, but remote is online, suggest offline
1100 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1101 }
1102
1103 // remove device offline Ad already processed
1104 offlineAds.remove(deviceId);
1105 } // end local device loop
1106 } // device lock
1107
1108 // If there is any Ads left, request them
1109 log.trace("Ads left {}, {}", devAds, portAds);
1110 reqDevices.addAll(devAds.keySet());
1111 reqPorts.addAll(portAds.keySet());
1112
1113 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1114 log.trace("Nothing to request to remote peer {}", sender);
1115 return;
1116 }
1117
1118 log.info("Need to sync {} {}", reqDevices, reqPorts);
1119
1120 // 2-way Anti-Entropy for now
1121 try {
1122 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1123 } catch (IOException e) {
1124 log.error("Failed to send response advertisement to " + sender, e);
1125 }
1126
1127// Sketch of 3-way Anti-Entropy
1128// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1129// ClusterMessage message = new ClusterMessage(
1130// clusterService.getLocalNode().id(),
1131// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1132// SERIALIZER.encode(request));
1133//
1134// try {
1135// clusterCommunicator.unicast(message, advertisement.sender());
1136// } catch (IOException e) {
1137// log.error("Failed to send advertisement reply to "
1138// + advertisement.sender(), e);
1139// }
Madan Jampani47c93732014-10-06 20:46:08 -07001140 }
1141
Madan Jampani255a58b2014-10-09 12:08:20 -07001142 private void notifyDelegateIfNotNull(DeviceEvent event) {
1143 if (event != null) {
1144 notifyDelegate(event);
1145 }
1146 }
1147
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001148 private final class SendAdvertisementTask implements Runnable {
1149
1150 @Override
1151 public void run() {
1152 if (Thread.currentThread().isInterrupted()) {
1153 log.info("Interrupted, quitting");
1154 return;
1155 }
1156
1157 try {
1158 final NodeId self = clusterService.getLocalNode().id();
1159 Set<ControllerNode> nodes = clusterService.getNodes();
1160
1161 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1162 .transform(toNodeId())
1163 .toList();
1164
1165 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHI37083082014-10-13 10:38:38 -07001166 log.debug("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001167 return;
1168 }
1169
1170 NodeId peer;
1171 do {
1172 int idx = RandomUtils.nextInt(0, nodeIds.size());
1173 peer = nodeIds.get(idx);
1174 } while (peer.equals(self));
1175
1176 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1177
1178 if (Thread.currentThread().isInterrupted()) {
1179 log.info("Interrupted, quitting");
1180 return;
1181 }
1182
1183 try {
1184 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1185 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001186 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001187 return;
1188 }
1189 } catch (Exception e) {
1190 // catch all Exception to avoid Scheduled task being suppressed.
1191 log.error("Exception thrown while sending advertisement", e);
1192 }
1193 }
1194 }
1195
Madan Jampani47c93732014-10-06 20:46:08 -07001196 private class InternalDeviceEventListener implements ClusterMessageHandler {
1197 @Override
1198 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001199
Madan Jampani47c93732014-10-06 20:46:08 -07001200 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001201 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001202
Madan Jampani47c93732014-10-06 20:46:08 -07001203 ProviderId providerId = event.providerId();
1204 DeviceId deviceId = event.deviceId();
1205 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001206
Madan Jampani255a58b2014-10-09 12:08:20 -07001207 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001208 }
1209 }
1210
Madan Jampani25322532014-10-08 11:20:38 -07001211 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
1212 @Override
1213 public void handle(ClusterMessage message) {
1214
1215 log.info("Received device offline event from peer: {}", message.sender());
1216 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
1217
1218 DeviceId deviceId = event.deviceId();
1219 Timestamp timestamp = event.timestamp();
1220
Madan Jampani255a58b2014-10-09 12:08:20 -07001221 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -07001222 }
1223 }
1224
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001225 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
1226 @Override
1227 public void handle(ClusterMessage message) {
1228
1229 log.info("Received device removed event from peer: {}", message.sender());
1230 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
1231
1232 DeviceId deviceId = event.deviceId();
1233 Timestamp timestamp = event.timestamp();
1234
Madan Jampani255a58b2014-10-09 12:08:20 -07001235 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001236 }
1237 }
1238
Madan Jampani47c93732014-10-06 20:46:08 -07001239 private class InternalPortEventListener implements ClusterMessageHandler {
1240 @Override
1241 public void handle(ClusterMessage message) {
1242
1243 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001244 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001245
1246 ProviderId providerId = event.providerId();
1247 DeviceId deviceId = event.deviceId();
1248 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1249
Madan Jampani255a58b2014-10-09 12:08:20 -07001250 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -07001251 }
1252 }
1253
1254 private class InternalPortStatusEventListener implements ClusterMessageHandler {
1255 @Override
1256 public void handle(ClusterMessage message) {
1257
1258 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001259 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001260 log.info("{}", event);
Madan Jampani47c93732014-10-06 20:46:08 -07001261
1262 ProviderId providerId = event.providerId();
1263 DeviceId deviceId = event.deviceId();
1264 Timestamped<PortDescription> portDescription = event.portDescription();
1265
Madan Jampani255a58b2014-10-09 12:08:20 -07001266 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001267 }
1268 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001269
1270 private final class InternalDeviceAdvertisementListener
1271 implements ClusterMessageHandler {
1272
1273 @Override
1274 public void handle(ClusterMessage message) {
Yuta HIGUCHI9a0a1d12014-10-13 22:38:02 -07001275 log.debug("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001276 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1277 handleAdvertisement(advertisement);
1278 }
1279 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001280}