blob: 531974946648d21eca167b032a6af68d3c523ef9 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
3import com.google.common.collect.FluentIterable;
4import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07005import com.google.common.collect.Maps;
6import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07007
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07008import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07009import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070015import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070016import org.onlab.onos.cluster.ControllerNode;
17import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070018import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070019import org.onlab.onos.net.DefaultAnnotations;
20import org.onlab.onos.net.DefaultDevice;
21import org.onlab.onos.net.DefaultPort;
22import org.onlab.onos.net.Device;
23import org.onlab.onos.net.Device.Type;
24import org.onlab.onos.net.DeviceId;
25import org.onlab.onos.net.Port;
26import org.onlab.onos.net.PortNumber;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070027import org.onlab.onos.net.device.DeviceDescription;
28import org.onlab.onos.net.device.DeviceEvent;
29import org.onlab.onos.net.device.DeviceStore;
30import org.onlab.onos.net.device.DeviceStoreDelegate;
31import org.onlab.onos.net.device.PortDescription;
32import org.onlab.onos.net.provider.ProviderId;
33import org.onlab.onos.store.AbstractStore;
Yuta HIGUCHId40483d2014-10-09 15:20:30 -070034import org.onlab.onos.store.ClockService;
35import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070036import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
37import org.onlab.onos.store.cluster.messaging.ClusterMessage;
38import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070039import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIfa891c92014-10-09 15:21:40 -070040import org.onlab.onos.store.common.impl.Timestamped;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070041import org.onlab.onos.store.device.impl.peermsg.DeviceAntiEntropyAdvertisement;
42import org.onlab.onos.store.device.impl.peermsg.DeviceFragmentId;
43import org.onlab.onos.store.device.impl.peermsg.PortFragmentId;
Madan Jampani53e44e62014-10-07 12:39:51 -070044import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070045import org.onlab.onos.store.serializers.DistributedStoreSerializers;
Madan Jampani53e44e62014-10-07 12:39:51 -070046import org.onlab.util.KryoPool;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070047import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070048import org.slf4j.Logger;
49
Madan Jampani47c93732014-10-06 20:46:08 -070050import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070051import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070052import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070053import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070054import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070055import java.util.HashSet;
56import java.util.Iterator;
57import java.util.List;
58import java.util.Map;
59import java.util.Map.Entry;
60import java.util.Objects;
61import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070062import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070063import java.util.concurrent.ScheduledExecutorService;
64import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070065
66import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import static com.google.common.base.Predicates.notNull;
68import static org.onlab.onos.net.device.DeviceEvent.Type.*;
69import static org.slf4j.LoggerFactory.getLogger;
70import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
71import static org.onlab.onos.net.DefaultAnnotations.merge;
72import static com.google.common.base.Verify.verify;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070073import static org.onlab.util.Tools.namedThreads;
74import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
75import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
76import static org.onlab.onos.store.common.impl.ControllerNodeToNodeId.toNodeId;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070077
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070078// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070079/**
80 * Manages inventory of infrastructure devices using gossip protocol to distribute
81 * information.
82 */
83@Component(immediate = true)
84@Service
85public class GossipDeviceStore
86 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
87 implements DeviceStore {
88
89 private final Logger log = getLogger(getClass());
90
91 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
92
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070093 // TODO: Check if inner Map can be replaced with plain Map.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070094 // innerMap is used to lock a Device, thus instance should never be replaced.
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070095
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070096 // collection of Description given from various providers
97 private final ConcurrentMap<DeviceId,
98 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070099 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700100
101 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700102 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
103 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
104
105 // to be updated under Device lock
106 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
107 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700108
109 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700110 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHId40483d2014-10-09 15:20:30 -0700113 protected ClockService clockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700114
Madan Jampani47c93732014-10-06 20:46:08 -0700115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected ClusterCommunicationService clusterCommunicator;
117
Madan Jampani53e44e62014-10-07 12:39:51 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected ClusterService clusterService;
120
121 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700122 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700123 protected void setupKryoPool() {
124 serializerPool = KryoPool.newBuilder()
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700125 .register(DistributedStoreSerializers.COMMON)
126
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700127 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700128 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700129 .register(InternalDeviceRemovedEvent.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700130 .register(InternalPortEvent.class, new InternalPortEventSerializer())
131 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700132 .register(DeviceAntiEntropyAdvertisement.class)
133 .register(DeviceFragmentId.class)
134 .register(PortFragmentId.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700135 .build()
136 .populate(1);
137 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700138 };
139
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700140 private ScheduledExecutorService executor;
141
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700142 @Activate
143 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700144 clusterCommunicator.addSubscriber(
145 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
146 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700147 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
148 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700149 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
150 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700151 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
152 clusterCommunicator.addSubscriber(
153 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700154 clusterCommunicator.addSubscriber(
155 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
156
157 executor =
158 newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
159
160 // TODO: Make these configurable
161 long initialDelaySec = 5;
162 long periodSec = 5;
163 // start anti-entropy thread
164 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
165 initialDelaySec, periodSec, TimeUnit.SECONDS);
166
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700167 log.info("Started");
168 }
169
170 @Deactivate
171 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700172
173 executor.shutdownNow();
174 try {
175 boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
176 if (timedout) {
177 log.error("Timeout during executor shutdown");
178 }
179 } catch (InterruptedException e) {
180 log.error("Error during executor shutdown", e);
181 }
182
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700183 deviceDescs.clear();
184 devices.clear();
185 devicePorts.clear();
186 availableDevices.clear();
187 log.info("Stopped");
188 }
189
190 @Override
191 public int getDeviceCount() {
192 return devices.size();
193 }
194
195 @Override
196 public Iterable<Device> getDevices() {
197 return Collections.unmodifiableCollection(devices.values());
198 }
199
200 @Override
201 public Device getDevice(DeviceId deviceId) {
202 return devices.get(deviceId);
203 }
204
205 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700206 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
207 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700208 DeviceDescription deviceDescription) {
209 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
210 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
211 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
212 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700213 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
214 providerId, deviceId);
215 try {
216 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
217 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700218 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700219 + providerId + " and deviceId: " + deviceId, e);
220 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700221 }
222 return event;
223 }
224
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700225 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
226 DeviceId deviceId,
227 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700228
229 // Collection of DeviceDescriptions for a Device
230 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700231 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700232
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700233 synchronized (providerDescs) {
234 // locking per device
235
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700236 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
237 log.debug("Ignoring outdated event: {}", deltaDesc);
238 return null;
239 }
240
241 DeviceDescriptions descs
242 = createIfAbsentUnchecked(providerDescs, providerId,
243 new InitDeviceDescs(deltaDesc));
244
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700245 final Device oldDevice = devices.get(deviceId);
246 final Device newDevice;
247
248 if (deltaDesc == descs.getDeviceDesc() ||
249 deltaDesc.isNewer(descs.getDeviceDesc())) {
250 // on new device or valid update
251 descs.putDeviceDesc(deltaDesc);
252 newDevice = composeDevice(deviceId, providerDescs);
253 } else {
254 // outdated event, ignored.
255 return null;
256 }
257 if (oldDevice == null) {
258 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700259 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700260 } else {
261 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700262 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700263 }
264 }
265 }
266
267 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700268 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700269 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700270 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700271
272 // update composed device cache
273 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
274 verify(oldDevice == null,
275 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
276 providerId, oldDevice, newDevice);
277
278 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700279 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700280 }
281
282 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
283 }
284
285 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700286 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700287 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700288 Device oldDevice,
289 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700290
291 // We allow only certain attributes to trigger update
292 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
293 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700294 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700295
296 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
297 if (!replaced) {
298 verify(replaced,
299 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
300 providerId, oldDevice, devices.get(newDevice.id())
301 , newDevice);
302 }
303 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700304 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700305 }
306 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
307 }
308
309 // Otherwise merely attempt to change availability if primary provider
310 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700311 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700312 return !added ? null :
313 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
314 }
315 return null;
316 }
317
318 @Override
319 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700320 Timestamp timestamp = clockService.getTimestamp(deviceId);
Madan Jampani25322532014-10-08 11:20:38 -0700321 DeviceEvent event = markOfflineInternal(deviceId, timestamp);
322 if (event != null) {
323 log.info("Notifying peers of a device offline topology event for deviceId: {}",
324 deviceId);
325 try {
326 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
327 } catch (IOException e) {
328 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
329 deviceId);
330 }
331 }
332 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700333 }
334
335 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
336
337 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700338 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700339
340 // locking device
341 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700342
343 // accept off-line if given timestamp is newer than
344 // the latest Timestamp from Primary provider
345 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
346 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
347 if (timestamp.compareTo(lastTimestamp) <= 0) {
348 // outdated event ignore
349 return null;
350 }
351
352 offline.put(deviceId, timestamp);
353
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700354 Device device = devices.get(deviceId);
355 if (device == null) {
356 return null;
357 }
358 boolean removed = availableDevices.remove(deviceId);
359 if (removed) {
360 // TODO: broadcast ... DOWN only?
361 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700362 }
363 return null;
364 }
365 }
366
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700367 /**
368 * Marks the device as available if the given timestamp is not outdated,
369 * compared to the time the device has been marked offline.
370 *
371 * @param deviceId identifier of the device
372 * @param timestamp of the event triggering this change.
373 * @return true if availability change request was accepted and changed the state
374 */
375 // Guarded by deviceDescs value (=Device lock)
376 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
377 // accept on-line if given timestamp is newer than
378 // the latest offline request Timestamp
379 Timestamp offlineTimestamp = offline.get(deviceId);
380 if (offlineTimestamp == null ||
381 offlineTimestamp.compareTo(timestamp) < 0) {
382
383 offline.remove(deviceId);
384 return availableDevices.add(deviceId);
385 }
386 return false;
387 }
388
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700389 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700390 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
391 DeviceId deviceId,
392 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700393 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
394
Madan Jampani47c93732014-10-06 20:46:08 -0700395 Timestamped<List<PortDescription>> timestampedPortDescriptions =
396 new Timestamped<>(portDescriptions, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700397
Madan Jampani47c93732014-10-06 20:46:08 -0700398 List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700399 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700400 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
401 providerId, deviceId);
402 try {
403 notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
404 } catch (IOException e) {
405 log.error("Failed to notify peers of a port update topology event or providerId: "
406 + providerId + " and deviceId: " + deviceId, e);
407 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700408 }
409 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700410 }
411
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700412 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
413 DeviceId deviceId,
414 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700415
416 Device device = devices.get(deviceId);
417 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
418
419 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
420 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
421
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700422 List<DeviceEvent> events = new ArrayList<>();
423 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700424
425 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
426 log.debug("Ignoring outdated events: {}", portDescriptions);
427 return null;
428 }
429
430 DeviceDescriptions descs = descsMap.get(providerId);
431 // every provider must provide DeviceDescription.
432 checkArgument(descs != null,
433 "Device description for Device ID %s from Provider %s was not found",
434 deviceId, providerId);
435
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700436 Map<PortNumber, Port> ports = getPortMap(deviceId);
437
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700438 final Timestamp newTimestamp = portDescriptions.timestamp();
439
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700440 // Add new ports
441 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700442 for (PortDescription portDescription : portDescriptions.value()) {
443 final PortNumber number = portDescription.portNumber();
444 processed.add(number);
445
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700446 final Port oldPort = ports.get(number);
447 final Port newPort;
448
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700449
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700450 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
451 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700452 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700453 // on new port or valid update
454 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700455 descs.putPortDesc(new Timestamped<>(portDescription,
456 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700457 newPort = composePort(device, number, descsMap);
458 } else {
459 // outdated event, ignored.
460 continue;
461 }
462
463 events.add(oldPort == null ?
464 createPort(device, newPort, ports) :
465 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700466 }
467
468 events.addAll(pruneOldPorts(device, ports, processed));
469 }
470 return FluentIterable.from(events).filter(notNull()).toList();
471 }
472
473 // Creates a new port based on the port description adds it to the map and
474 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700475 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700476 private DeviceEvent createPort(Device device, Port newPort,
477 Map<PortNumber, Port> ports) {
478 ports.put(newPort.number(), newPort);
479 return new DeviceEvent(PORT_ADDED, device, newPort);
480 }
481
482 // Checks if the specified port requires update and if so, it replaces the
483 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700484 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700485 private DeviceEvent updatePort(Device device, Port oldPort,
486 Port newPort,
487 Map<PortNumber, Port> ports) {
488 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700489 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700490
491 ports.put(oldPort.number(), newPort);
492 return new DeviceEvent(PORT_UPDATED, device, newPort);
493 }
494 return null;
495 }
496
497 // Prunes the specified list of ports based on which ports are in the
498 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700499 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700500 private List<DeviceEvent> pruneOldPorts(Device device,
501 Map<PortNumber, Port> ports,
502 Set<PortNumber> processed) {
503 List<DeviceEvent> events = new ArrayList<>();
504 Iterator<PortNumber> iterator = ports.keySet().iterator();
505 while (iterator.hasNext()) {
506 PortNumber portNumber = iterator.next();
507 if (!processed.contains(portNumber)) {
508 events.add(new DeviceEvent(PORT_REMOVED, device,
509 ports.get(portNumber)));
510 iterator.remove();
511 }
512 }
513 return events;
514 }
515
516 // Gets the map of ports for the specified device; if one does not already
517 // exist, it creates and registers a new one.
518 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
519 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700520 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
521 }
522
523 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
524 DeviceId deviceId) {
525 return createIfAbsentUnchecked(deviceDescs, deviceId,
526 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700527 }
528
529 @Override
530 public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
531 PortDescription portDescription) {
532 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
533 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
534 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
535 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700536 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
537 providerId, deviceId);
538 try {
539 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
540 } catch (IOException e) {
541 log.error("Failed to notify peers of a port status update topology event or providerId: "
542 + providerId + " and deviceId: " + deviceId, e);
543 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700544 }
545 return event;
546 }
547
548 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
549 Timestamped<PortDescription> deltaDesc) {
550
551 Device device = devices.get(deviceId);
552 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
553
554 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
555 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
556
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700557 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700558
559 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
560 log.debug("Ignoring outdated event: {}", deltaDesc);
561 return null;
562 }
563
564 DeviceDescriptions descs = descsMap.get(providerId);
565 // assuming all providers must to give DeviceDescription
566 checkArgument(descs != null,
567 "Device description for Device ID %s from Provider %s was not found",
568 deviceId, providerId);
569
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700570 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
571 final PortNumber number = deltaDesc.value().portNumber();
572 final Port oldPort = ports.get(number);
573 final Port newPort;
574
575 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
576 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700577 deltaDesc.isNewer(existingPortDesc)) {
578 // on new port or valid update
579 // update description
580 descs.putPortDesc(deltaDesc);
581 newPort = composePort(device, number, descsMap);
582 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700583 // same or outdated event, ignored.
584 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700585 return null;
586 }
587
588 if (oldPort == null) {
589 return createPort(device, newPort, ports);
590 } else {
591 return updatePort(device, oldPort, newPort, ports);
592 }
593 }
594 }
595
596 @Override
597 public List<Port> getPorts(DeviceId deviceId) {
598 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
599 if (ports == null) {
600 return Collections.emptyList();
601 }
602 return ImmutableList.copyOf(ports.values());
603 }
604
605 @Override
606 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
607 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
608 return ports == null ? null : ports.get(portNumber);
609 }
610
611 @Override
612 public boolean isAvailable(DeviceId deviceId) {
613 return availableDevices.contains(deviceId);
614 }
615
616 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700617 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
618 Timestamp timestamp = clockService.getTimestamp(deviceId);
619 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700620 if (event != null) {
621 log.info("Notifying peers of a device removed topology event for deviceId: {}",
622 deviceId);
623 try {
624 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
625 } catch (IOException e) {
626 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
627 deviceId);
628 }
629 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700630 return event;
631 }
632
633 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
634 Timestamp timestamp) {
635
636 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700637 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700638 // accept removal request if given timestamp is newer than
639 // the latest Timestamp from Primary provider
640 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
641 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
642 if (timestamp.compareTo(lastTimestamp) <= 0) {
643 // outdated event ignore
644 return null;
645 }
646 removalRequest.put(deviceId, timestamp);
647
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700648 Device device = devices.remove(deviceId);
649 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700650 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
651 if (ports != null) {
652 ports.clear();
653 }
654 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700655 descs.clear();
656 return device == null ? null :
657 new DeviceEvent(DEVICE_REMOVED, device, null);
658 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700659 }
660
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700661 /**
662 * Checks if given timestamp is superseded by removal request
663 * with more recent timestamp.
664 *
665 * @param deviceId identifier of a device
666 * @param timestampToCheck timestamp of an event to check
667 * @return true if device is already removed
668 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700669 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
670 Timestamp removalTimestamp = removalRequest.get(deviceId);
671 if (removalTimestamp != null &&
672 removalTimestamp.compareTo(timestampToCheck) >= 0) {
673 // removalRequest is more recent
674 return true;
675 }
676 return false;
677 }
678
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700679 /**
680 * Returns a Device, merging description given from multiple Providers.
681 *
682 * @param deviceId device identifier
683 * @param providerDescs Collection of Descriptions from multiple providers
684 * @return Device instance
685 */
686 private Device composeDevice(DeviceId deviceId,
687 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
688
689 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
690
691 ProviderId primary = pickPrimaryPID(providerDescs);
692
693 DeviceDescriptions desc = providerDescs.get(primary);
694
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700695 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700696 Type type = base.type();
697 String manufacturer = base.manufacturer();
698 String hwVersion = base.hwVersion();
699 String swVersion = base.swVersion();
700 String serialNumber = base.serialNumber();
701 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
702 annotations = merge(annotations, base.annotations());
703
704 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
705 if (e.getKey().equals(primary)) {
706 continue;
707 }
708 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700709 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700710 // Currently assuming there will never be a key conflict between
711 // providers
712
713 // annotation merging. not so efficient, should revisit later
714 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
715 }
716
717 return new DefaultDevice(primary, deviceId , type, manufacturer,
718 hwVersion, swVersion, serialNumber, annotations);
719 }
720
721 /**
722 * Returns a Port, merging description given from multiple Providers.
723 *
724 * @param device device the port is on
725 * @param number port number
726 * @param providerDescs Collection of Descriptions from multiple providers
727 * @return Port instance
728 */
729 private Port composePort(Device device, PortNumber number,
730 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
731
732 ProviderId primary = pickPrimaryPID(providerDescs);
733 DeviceDescriptions primDescs = providerDescs.get(primary);
734 // if no primary, assume not enabled
735 // TODO: revisit this default port enabled/disabled behavior
736 boolean isEnabled = false;
737 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
738
739 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
740 if (portDesc != null) {
741 isEnabled = portDesc.value().isEnabled();
742 annotations = merge(annotations, portDesc.value().annotations());
743 }
744
745 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
746 if (e.getKey().equals(primary)) {
747 continue;
748 }
749 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700750 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700751 // Currently assuming there will never be a key conflict between
752 // providers
753
754 // annotation merging. not so efficient, should revisit later
755 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
756 if (otherPortDesc != null) {
757 annotations = merge(annotations, otherPortDesc.value().annotations());
758 }
759 }
760
761 return new DefaultPort(device, number, isEnabled, annotations);
762 }
763
764 /**
765 * @return primary ProviderID, or randomly chosen one if none exists
766 */
767 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700768 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700769 ProviderId fallBackPrimary = null;
770 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
771 if (!e.getKey().isAncillary()) {
772 return e.getKey();
773 } else if (fallBackPrimary == null) {
774 // pick randomly as a fallback in case there is no primary
775 fallBackPrimary = e.getKey();
776 }
777 }
778 return fallBackPrimary;
779 }
780
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700781 private DeviceDescriptions getPrimaryDescriptions(
782 Map<ProviderId, DeviceDescriptions> providerDescs) {
783 ProviderId pid = pickPrimaryPID(providerDescs);
784 return providerDescs.get(pid);
785 }
786
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700787 // TODO: should we be throwing exception?
788 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
789 ClusterMessage message = new ClusterMessage(
790 clusterService.getLocalNode().id(),
791 subject,
792 SERIALIZER.encode(event));
793 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700794 }
795
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700796 // TODO: should we be throwing exception?
797 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
798 ClusterMessage message = new ClusterMessage(
799 clusterService.getLocalNode().id(),
800 subject,
801 SERIALIZER.encode(event));
802 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700803 }
Madan Jampani47c93732014-10-06 20:46:08 -0700804
805 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700806 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700807 }
808
Madan Jampani25322532014-10-08 11:20:38 -0700809 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700810 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700811 }
812
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700813 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700814 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700815 }
816
Madan Jampani47c93732014-10-06 20:46:08 -0700817 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700818 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700819 }
820
821 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700822 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
823 }
824
825 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
826 try {
827 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
828 } catch (IOException e) {
829 log.error("Failed to send" + event + " to " + recipient, e);
830 }
831 }
832
833 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
834 try {
835 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
836 } catch (IOException e) {
837 log.error("Failed to send" + event + " to " + recipient, e);
838 }
839 }
840
841 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
842 try {
843 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
844 } catch (IOException e) {
845 log.error("Failed to send" + event + " to " + recipient, e);
846 }
847 }
848
849 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
850 try {
851 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
852 } catch (IOException e) {
853 log.error("Failed to send" + event + " to " + recipient, e);
854 }
855 }
856
857 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
858 try {
859 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
860 } catch (IOException e) {
861 log.error("Failed to send" + event + " to " + recipient, e);
862 }
863 }
864
865 private DeviceAntiEntropyAdvertisement createAdvertisement() {
866 final NodeId self = clusterService.getLocalNode().id();
867
868 Map<DeviceFragmentId, Timestamp> devices = new HashMap<>(deviceDescs.size());
869 final int portsPerDevice = 8; // random guess to minimize reallocation
870 Map<PortFragmentId, Timestamp> ports = new HashMap<>(devices.size() * portsPerDevice);
871 Map<DeviceId, Timestamp> offline = new HashMap<>(devices.size());
872
873 for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>>
874 provs : deviceDescs.entrySet()) {
875
876 final DeviceId deviceId = provs.getKey();
877 final ConcurrentMap<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
878 synchronized (devDescs) {
879
880 offline.put(deviceId, this.offline.get(deviceId));
881
882 for (Entry<ProviderId, DeviceDescriptions>
883 prov : devDescs.entrySet()) {
884
885 final ProviderId provId = prov.getKey();
886 final DeviceDescriptions descs = prov.getValue();
887
888 devices.put(new DeviceFragmentId(deviceId, provId),
889 descs.getDeviceDesc().timestamp());
890
891 for (Entry<PortNumber, Timestamped<PortDescription>>
892 portDesc : descs.getPortDescs().entrySet()) {
893
894 final PortNumber number = portDesc.getKey();
895 ports.put(new PortFragmentId(deviceId, provId, number),
896 portDesc.getValue().timestamp());
897 }
898 }
899 }
900 }
901
902 return new DeviceAntiEntropyAdvertisement(self, devices, ports, offline);
903 }
904
905 /**
906 * Responds to anti-entropy advertisement message.
907 * <P>
908 * Notify sender about out-dated information using regular replication message.
909 * Send back advertisement to sender if not in sync.
910 *
911 * @param advertisement to respond to
912 */
913 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
914
915 final NodeId sender = advertisement.sender();
916
917 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
918 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
919 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
920
921 // Fragments to request
922 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
923 Collection<PortFragmentId> reqPorts = new ArrayList<>();
924
925 for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
926 final DeviceId deviceId = de.getKey();
927 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
928
929 synchronized (lDevice) {
930 // latestTimestamp across provider
931 // Note: can be null initially
932 Timestamp localLatest = offline.get(deviceId);
933
934 // handle device Ads
935 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
936 final ProviderId provId = prov.getKey();
937 final DeviceDescriptions lDeviceDescs = prov.getValue();
938
939 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
940
941
942 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
943 Timestamp advDevTimestamp = devAds.get(devFragId);
944
945 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
946 // remote does not have it or outdated, suggest
947 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
948 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
949 // local is outdated, request
950 reqDevices.add(devFragId);
951 }
952
953 // handle port Ads
954 for (Entry<PortNumber, Timestamped<PortDescription>>
955 pe : lDeviceDescs.getPortDescs().entrySet()) {
956
957 final PortNumber num = pe.getKey();
958 final Timestamped<PortDescription> lPort = pe.getValue();
959
960 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
961
962 Timestamp advPortTimestamp = portAds.get(portFragId);
963 if ( advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
964 // remote does not have it or outdated, suggest
965 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
966 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
967 // local is outdated, request
968 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
969 reqPorts.add(portFragId);
970 }
971
972 // remove port Ad already processed
973 portAds.remove(portFragId);
974 } // end local port loop
975
976 // remove device Ad already processed
977 devAds.remove(devFragId);
978
979 // find latest and update
980 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
981 if (localLatest == null ||
982 providerLatest.compareTo(localLatest) > 0) {
983 localLatest = providerLatest;
984 }
985 } // end local provider loop
986
987 // checking if remote timestamp is more recent.
988 Timestamp rOffline = offlineAds.get(deviceId);
989 if (rOffline != null &&
990 rOffline.compareTo(localLatest) > 0) {
991 // remote offline timestamp suggests that the
992 // device is off-line
993 markOfflineInternal(deviceId, rOffline);
994 }
995
996 Timestamp lOffline = offline.get(deviceId);
997 if (lOffline != null && rOffline == null) {
998 // locally offline, but remote is online, suggest offline
999 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1000 }
1001
1002 // remove device offline Ad already processed
1003 offlineAds.remove(deviceId);
1004 } // end local device loop
1005 } // device lock
1006
1007 // If there is any Ads left, request them
1008 log.trace("Ads left {}, {}", devAds, portAds);
1009 reqDevices.addAll(devAds.keySet());
1010 reqPorts.addAll(portAds.keySet());
1011
1012 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1013 log.trace("Nothing to request to remote peer {}", sender);
1014 return;
1015 }
1016
1017 log.info("Need to sync {} {}", reqDevices, reqPorts);
1018
1019 // 2-way Anti-Entropy for now
1020 try {
1021 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1022 } catch (IOException e) {
1023 log.error("Failed to send response advertisement to " + sender, e);
1024 }
1025
1026// Sketch of 3-way Anti-Entropy
1027// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1028// ClusterMessage message = new ClusterMessage(
1029// clusterService.getLocalNode().id(),
1030// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1031// SERIALIZER.encode(request));
1032//
1033// try {
1034// clusterCommunicator.unicast(message, advertisement.sender());
1035// } catch (IOException e) {
1036// log.error("Failed to send advertisement reply to "
1037// + advertisement.sender(), e);
1038// }
Madan Jampani47c93732014-10-06 20:46:08 -07001039 }
1040
Madan Jampani255a58b2014-10-09 12:08:20 -07001041 private void notifyDelegateIfNotNull(DeviceEvent event) {
1042 if (event != null) {
1043 notifyDelegate(event);
1044 }
1045 }
1046
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001047 private final class SendAdvertisementTask implements Runnable {
1048
1049 @Override
1050 public void run() {
1051 if (Thread.currentThread().isInterrupted()) {
1052 log.info("Interrupted, quitting");
1053 return;
1054 }
1055
1056 try {
1057 final NodeId self = clusterService.getLocalNode().id();
1058 Set<ControllerNode> nodes = clusterService.getNodes();
1059
1060 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1061 .transform(toNodeId())
1062 .toList();
1063
1064 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
1065 log.info("No other peers in the cluster.");
1066 return;
1067 }
1068
1069 NodeId peer;
1070 do {
1071 int idx = RandomUtils.nextInt(0, nodeIds.size());
1072 peer = nodeIds.get(idx);
1073 } while (peer.equals(self));
1074
1075 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1076
1077 if (Thread.currentThread().isInterrupted()) {
1078 log.info("Interrupted, quitting");
1079 return;
1080 }
1081
1082 try {
1083 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1084 } catch (IOException e) {
1085 log.error("Failed to send anti-entropy advertisement", e);
1086 return;
1087 }
1088 } catch (Exception e) {
1089 // catch all Exception to avoid Scheduled task being suppressed.
1090 log.error("Exception thrown while sending advertisement", e);
1091 }
1092 }
1093 }
1094
Madan Jampani47c93732014-10-06 20:46:08 -07001095 private class InternalDeviceEventListener implements ClusterMessageHandler {
1096 @Override
1097 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001098
Madan Jampani47c93732014-10-06 20:46:08 -07001099 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001100 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001101
Madan Jampani47c93732014-10-06 20:46:08 -07001102 ProviderId providerId = event.providerId();
1103 DeviceId deviceId = event.deviceId();
1104 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001105
Madan Jampani255a58b2014-10-09 12:08:20 -07001106 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001107 }
1108 }
1109
Madan Jampani25322532014-10-08 11:20:38 -07001110 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
1111 @Override
1112 public void handle(ClusterMessage message) {
1113
1114 log.info("Received device offline event from peer: {}", message.sender());
1115 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
1116
1117 DeviceId deviceId = event.deviceId();
1118 Timestamp timestamp = event.timestamp();
1119
Madan Jampani255a58b2014-10-09 12:08:20 -07001120 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -07001121 }
1122 }
1123
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001124 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
1125 @Override
1126 public void handle(ClusterMessage message) {
1127
1128 log.info("Received device removed event from peer: {}", message.sender());
1129 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
1130
1131 DeviceId deviceId = event.deviceId();
1132 Timestamp timestamp = event.timestamp();
1133
Madan Jampani255a58b2014-10-09 12:08:20 -07001134 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001135 }
1136 }
1137
Madan Jampani47c93732014-10-06 20:46:08 -07001138 private class InternalPortEventListener implements ClusterMessageHandler {
1139 @Override
1140 public void handle(ClusterMessage message) {
1141
1142 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001143 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001144
1145 ProviderId providerId = event.providerId();
1146 DeviceId deviceId = event.deviceId();
1147 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1148
Madan Jampani255a58b2014-10-09 12:08:20 -07001149 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -07001150 }
1151 }
1152
1153 private class InternalPortStatusEventListener implements ClusterMessageHandler {
1154 @Override
1155 public void handle(ClusterMessage message) {
1156
1157 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001158 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001159 log.info("{}", event);
Madan Jampani47c93732014-10-06 20:46:08 -07001160
1161 ProviderId providerId = event.providerId();
1162 DeviceId deviceId = event.deviceId();
1163 Timestamped<PortDescription> portDescription = event.portDescription();
1164
Madan Jampani255a58b2014-10-09 12:08:20 -07001165 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001166 }
1167 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001168
1169 private final class InternalDeviceAdvertisementListener
1170 implements ClusterMessageHandler {
1171
1172 @Override
1173 public void handle(ClusterMessage message) {
1174 log.info("Received Device advertisement from peer: {}", message.sender());
1175 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1176 handleAdvertisement(advertisement);
1177 }
1178 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001179}