blob: 12ecf74d75140a94eb65cb26f37c573602fb515a [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
Yuta HIGUCHI47c40882014-10-10 18:44:37 -07003import com.google.common.base.Function;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07004import com.google.common.collect.FluentIterable;
5import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07008
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07009import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070017import org.onlab.onos.cluster.ControllerNode;
18import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070019import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070020import org.onlab.onos.net.DefaultAnnotations;
21import org.onlab.onos.net.DefaultDevice;
22import org.onlab.onos.net.DefaultPort;
23import org.onlab.onos.net.Device;
24import org.onlab.onos.net.Device.Type;
25import org.onlab.onos.net.DeviceId;
26import org.onlab.onos.net.Port;
27import org.onlab.onos.net.PortNumber;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070028import org.onlab.onos.net.device.DeviceDescription;
29import org.onlab.onos.net.device.DeviceEvent;
30import org.onlab.onos.net.device.DeviceStore;
31import org.onlab.onos.net.device.DeviceStoreDelegate;
32import org.onlab.onos.net.device.PortDescription;
33import org.onlab.onos.net.provider.ProviderId;
34import org.onlab.onos.store.AbstractStore;
Yuta HIGUCHId40483d2014-10-09 15:20:30 -070035import org.onlab.onos.store.ClockService;
36import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070037import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070040import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIfa891c92014-10-09 15:21:40 -070041import org.onlab.onos.store.common.impl.Timestamped;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070042import org.onlab.onos.store.device.impl.peermsg.DeviceAntiEntropyAdvertisement;
43import org.onlab.onos.store.device.impl.peermsg.DeviceFragmentId;
44import org.onlab.onos.store.device.impl.peermsg.PortFragmentId;
Madan Jampani53e44e62014-10-07 12:39:51 -070045import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070046import org.onlab.onos.store.serializers.DistributedStoreSerializers;
Madan Jampani53e44e62014-10-07 12:39:51 -070047import org.onlab.util.KryoPool;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070048import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070049import org.slf4j.Logger;
50
Madan Jampani47c93732014-10-06 20:46:08 -070051import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070052import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070053import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070054import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070055import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070056import java.util.HashSet;
57import java.util.Iterator;
58import java.util.List;
59import java.util.Map;
60import java.util.Map.Entry;
61import java.util.Objects;
62import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070063import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070064import java.util.concurrent.ScheduledExecutorService;
65import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070066
67import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070068import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHIdc7374c2014-10-10 11:11:09 -070069import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070070import static org.onlab.onos.net.device.DeviceEvent.Type.*;
71import static org.slf4j.LoggerFactory.getLogger;
72import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
73import static org.onlab.onos.net.DefaultAnnotations.merge;
74import static com.google.common.base.Verify.verify;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070075import static org.onlab.util.Tools.namedThreads;
76import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
77import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070078
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070079// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070080/**
81 * Manages inventory of infrastructure devices using gossip protocol to distribute
82 * information.
83 */
84@Component(immediate = true)
85@Service
86public class GossipDeviceStore
87 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
88 implements DeviceStore {
89
90 private final Logger log = getLogger(getClass());
91
92 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
93
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070094 // TODO: Check if inner Map can be replaced with plain Map.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070095 // innerMap is used to lock a Device, thus instance should never be replaced.
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070096
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070097 // collection of Description given from various providers
98 private final ConcurrentMap<DeviceId,
99 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700100 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700101
102 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700103 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
104 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
105
106 // to be updated under Device lock
107 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
108 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700109
110 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700111 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHId40483d2014-10-09 15:20:30 -0700114 protected ClockService clockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700115
Madan Jampani47c93732014-10-06 20:46:08 -0700116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected ClusterCommunicationService clusterCommunicator;
118
Madan Jampani53e44e62014-10-07 12:39:51 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected ClusterService clusterService;
121
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700122 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700123 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700124 protected void setupKryoPool() {
125 serializerPool = KryoPool.newBuilder()
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700126 .register(DistributedStoreSerializers.COMMON)
127
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700128 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700129 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700130 .register(InternalDeviceRemovedEvent.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700131 .register(InternalPortEvent.class, new InternalPortEventSerializer())
132 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700133 .register(DeviceAntiEntropyAdvertisement.class)
134 .register(DeviceFragmentId.class)
135 .register(PortFragmentId.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700136 .build()
137 .populate(1);
138 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700139 };
140
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700141 private ScheduledExecutorService executor;
142
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700143 @Activate
144 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700145 clusterCommunicator.addSubscriber(
146 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
147 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700148 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
149 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700150 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
151 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700152 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
153 clusterCommunicator.addSubscriber(
154 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700155 clusterCommunicator.addSubscriber(
156 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
157
158 executor =
159 newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
160
161 // TODO: Make these configurable
162 long initialDelaySec = 5;
163 long periodSec = 5;
164 // start anti-entropy thread
165 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
166 initialDelaySec, periodSec, TimeUnit.SECONDS);
167
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700168 log.info("Started");
169 }
170
171 @Deactivate
172 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700173
174 executor.shutdownNow();
175 try {
176 boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
177 if (timedout) {
178 log.error("Timeout during executor shutdown");
179 }
180 } catch (InterruptedException e) {
181 log.error("Error during executor shutdown", e);
182 }
183
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700184 deviceDescs.clear();
185 devices.clear();
186 devicePorts.clear();
187 availableDevices.clear();
188 log.info("Stopped");
189 }
190
191 @Override
192 public int getDeviceCount() {
193 return devices.size();
194 }
195
196 @Override
197 public Iterable<Device> getDevices() {
198 return Collections.unmodifiableCollection(devices.values());
199 }
200
201 @Override
202 public Device getDevice(DeviceId deviceId) {
203 return devices.get(deviceId);
204 }
205
206 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700207 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
208 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700209 DeviceDescription deviceDescription) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700210 final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700211 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700212 final DeviceEvent event;
213 final Timestamped<DeviceDescription> mergedDesc;
214 synchronized (getDeviceDescriptions(deviceId)) {
215 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
216 mergedDesc = getDeviceDescriptions(deviceId).get(providerId).getDeviceDesc();
217 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700218 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700219 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
220 providerId, deviceId);
221 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700222 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700223 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700224 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700225 + providerId + " and deviceId: " + deviceId, e);
226 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700227 }
228 return event;
229 }
230
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700231 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
232 DeviceId deviceId,
233 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700234
235 // Collection of DeviceDescriptions for a Device
236 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700237 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700238
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700239 synchronized (providerDescs) {
240 // locking per device
241
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700242 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
243 log.debug("Ignoring outdated event: {}", deltaDesc);
244 return null;
245 }
246
247 DeviceDescriptions descs
248 = createIfAbsentUnchecked(providerDescs, providerId,
249 new InitDeviceDescs(deltaDesc));
250
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700251 final Device oldDevice = devices.get(deviceId);
252 final Device newDevice;
253
254 if (deltaDesc == descs.getDeviceDesc() ||
255 deltaDesc.isNewer(descs.getDeviceDesc())) {
256 // on new device or valid update
257 descs.putDeviceDesc(deltaDesc);
258 newDevice = composeDevice(deviceId, providerDescs);
259 } else {
260 // outdated event, ignored.
261 return null;
262 }
263 if (oldDevice == null) {
264 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700265 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700266 } else {
267 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700268 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700269 }
270 }
271 }
272
273 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700274 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700275 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700276 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700277
278 // update composed device cache
279 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
280 verify(oldDevice == null,
281 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
282 providerId, oldDevice, newDevice);
283
284 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700285 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700286 }
287
288 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
289 }
290
291 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700292 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700293 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700294 Device oldDevice,
295 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700296
297 // We allow only certain attributes to trigger update
298 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
299 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700300 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700301
302 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
303 if (!replaced) {
304 verify(replaced,
305 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
306 providerId, oldDevice, devices.get(newDevice.id())
307 , newDevice);
308 }
309 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700310 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700311 }
312 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
313 }
314
315 // Otherwise merely attempt to change availability if primary provider
316 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700317 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700318 return !added ? null :
319 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
320 }
321 return null;
322 }
323
324 @Override
325 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700326 final Timestamp timestamp = clockService.getTimestamp(deviceId);
327 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700328 if (event != null) {
329 log.info("Notifying peers of a device offline topology event for deviceId: {}",
330 deviceId);
331 try {
332 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
333 } catch (IOException e) {
334 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
335 deviceId);
336 }
337 }
338 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700339 }
340
341 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
342
343 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700344 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700345
346 // locking device
347 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700348
349 // accept off-line if given timestamp is newer than
350 // the latest Timestamp from Primary provider
351 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
352 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
353 if (timestamp.compareTo(lastTimestamp) <= 0) {
354 // outdated event ignore
355 return null;
356 }
357
358 offline.put(deviceId, timestamp);
359
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700360 Device device = devices.get(deviceId);
361 if (device == null) {
362 return null;
363 }
364 boolean removed = availableDevices.remove(deviceId);
365 if (removed) {
366 // TODO: broadcast ... DOWN only?
367 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700368 }
369 return null;
370 }
371 }
372
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700373 /**
374 * Marks the device as available if the given timestamp is not outdated,
375 * compared to the time the device has been marked offline.
376 *
377 * @param deviceId identifier of the device
378 * @param timestamp of the event triggering this change.
379 * @return true if availability change request was accepted and changed the state
380 */
381 // Guarded by deviceDescs value (=Device lock)
382 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
383 // accept on-line if given timestamp is newer than
384 // the latest offline request Timestamp
385 Timestamp offlineTimestamp = offline.get(deviceId);
386 if (offlineTimestamp == null ||
387 offlineTimestamp.compareTo(timestamp) < 0) {
388
389 offline.remove(deviceId);
390 return availableDevices.add(deviceId);
391 }
392 return false;
393 }
394
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700395 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700396 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
397 DeviceId deviceId,
398 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700399
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700400 final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700401
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700402 final Timestamped<List<PortDescription>> timestampedInput
403 = new Timestamped<>(portDescriptions, newTimestamp);
404 final List<DeviceEvent> events;
405 final Timestamped<List<PortDescription>> merged;
406
407 synchronized (getDeviceDescriptions(deviceId)) {
408 events = updatePortsInternal(providerId, deviceId, timestampedInput);
409 final DeviceDescriptions descs = getDeviceDescriptions(deviceId).get(providerId);
410 List<PortDescription> mergedList =
411 FluentIterable.from(portDescriptions)
412 .transform(new Function<PortDescription, PortDescription>() {
413 @Override
414 public PortDescription apply(PortDescription input) {
415 // lookup merged port description
416 return descs.getPortDesc(input.portNumber()).value();
417 }
418 }).toList();
419 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
420 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700421 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700422 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
423 providerId, deviceId);
424 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700425 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700426 } catch (IOException e) {
427 log.error("Failed to notify peers of a port update topology event or providerId: "
428 + providerId + " and deviceId: " + deviceId, e);
429 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700430 }
431 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700432 }
433
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700434 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
435 DeviceId deviceId,
436 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700437
438 Device device = devices.get(deviceId);
439 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
440
441 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
442 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
443
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700444 List<DeviceEvent> events = new ArrayList<>();
445 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700446
447 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
448 log.debug("Ignoring outdated events: {}", portDescriptions);
449 return null;
450 }
451
452 DeviceDescriptions descs = descsMap.get(providerId);
453 // every provider must provide DeviceDescription.
454 checkArgument(descs != null,
455 "Device description for Device ID %s from Provider %s was not found",
456 deviceId, providerId);
457
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700458 Map<PortNumber, Port> ports = getPortMap(deviceId);
459
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700460 final Timestamp newTimestamp = portDescriptions.timestamp();
461
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700462 // Add new ports
463 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700464 for (PortDescription portDescription : portDescriptions.value()) {
465 final PortNumber number = portDescription.portNumber();
466 processed.add(number);
467
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700468 final Port oldPort = ports.get(number);
469 final Port newPort;
470
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700471
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700472 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
473 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700474 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700475 // on new port or valid update
476 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700477 descs.putPortDesc(new Timestamped<>(portDescription,
478 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700479 newPort = composePort(device, number, descsMap);
480 } else {
481 // outdated event, ignored.
482 continue;
483 }
484
485 events.add(oldPort == null ?
486 createPort(device, newPort, ports) :
487 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700488 }
489
490 events.addAll(pruneOldPorts(device, ports, processed));
491 }
492 return FluentIterable.from(events).filter(notNull()).toList();
493 }
494
495 // Creates a new port based on the port description adds it to the map and
496 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700497 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700498 private DeviceEvent createPort(Device device, Port newPort,
499 Map<PortNumber, Port> ports) {
500 ports.put(newPort.number(), newPort);
501 return new DeviceEvent(PORT_ADDED, device, newPort);
502 }
503
504 // Checks if the specified port requires update and if so, it replaces the
505 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700506 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700507 private DeviceEvent updatePort(Device device, Port oldPort,
508 Port newPort,
509 Map<PortNumber, Port> ports) {
510 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700511 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700512
513 ports.put(oldPort.number(), newPort);
514 return new DeviceEvent(PORT_UPDATED, device, newPort);
515 }
516 return null;
517 }
518
519 // Prunes the specified list of ports based on which ports are in the
520 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700521 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700522 private List<DeviceEvent> pruneOldPorts(Device device,
523 Map<PortNumber, Port> ports,
524 Set<PortNumber> processed) {
525 List<DeviceEvent> events = new ArrayList<>();
526 Iterator<PortNumber> iterator = ports.keySet().iterator();
527 while (iterator.hasNext()) {
528 PortNumber portNumber = iterator.next();
529 if (!processed.contains(portNumber)) {
530 events.add(new DeviceEvent(PORT_REMOVED, device,
531 ports.get(portNumber)));
532 iterator.remove();
533 }
534 }
535 return events;
536 }
537
538 // Gets the map of ports for the specified device; if one does not already
539 // exist, it creates and registers a new one.
540 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
541 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700542 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
543 }
544
545 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
546 DeviceId deviceId) {
547 return createIfAbsentUnchecked(deviceDescs, deviceId,
548 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700549 }
550
551 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700552 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
553 DeviceId deviceId,
554 PortDescription portDescription) {
555
556 final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
557 final Timestamped<PortDescription> deltaDesc
558 = new Timestamped<>(portDescription, newTimestamp);
559 final DeviceEvent event;
560 final Timestamped<PortDescription> mergedDesc;
561 synchronized (getDeviceDescriptions(deviceId)) {
562 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
563 mergedDesc = getDeviceDescriptions(deviceId).get(providerId)
564 .getPortDesc(portDescription.portNumber());
565 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700566 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700567 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
568 providerId, deviceId);
569 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700570 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700571 } catch (IOException e) {
572 log.error("Failed to notify peers of a port status update topology event or providerId: "
573 + providerId + " and deviceId: " + deviceId, e);
574 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700575 }
576 return event;
577 }
578
579 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
580 Timestamped<PortDescription> deltaDesc) {
581
582 Device device = devices.get(deviceId);
583 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
584
585 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
586 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
587
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700588 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700589
590 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
591 log.debug("Ignoring outdated event: {}", deltaDesc);
592 return null;
593 }
594
595 DeviceDescriptions descs = descsMap.get(providerId);
596 // assuming all providers must to give DeviceDescription
597 checkArgument(descs != null,
598 "Device description for Device ID %s from Provider %s was not found",
599 deviceId, providerId);
600
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700601 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
602 final PortNumber number = deltaDesc.value().portNumber();
603 final Port oldPort = ports.get(number);
604 final Port newPort;
605
606 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
607 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700608 deltaDesc.isNewer(existingPortDesc)) {
609 // on new port or valid update
610 // update description
611 descs.putPortDesc(deltaDesc);
612 newPort = composePort(device, number, descsMap);
613 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700614 // same or outdated event, ignored.
615 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700616 return null;
617 }
618
619 if (oldPort == null) {
620 return createPort(device, newPort, ports);
621 } else {
622 return updatePort(device, oldPort, newPort, ports);
623 }
624 }
625 }
626
627 @Override
628 public List<Port> getPorts(DeviceId deviceId) {
629 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
630 if (ports == null) {
631 return Collections.emptyList();
632 }
633 return ImmutableList.copyOf(ports.values());
634 }
635
636 @Override
637 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
638 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
639 return ports == null ? null : ports.get(portNumber);
640 }
641
642 @Override
643 public boolean isAvailable(DeviceId deviceId) {
644 return availableDevices.contains(deviceId);
645 }
646
647 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700648 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
649 Timestamp timestamp = clockService.getTimestamp(deviceId);
650 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700651 if (event != null) {
652 log.info("Notifying peers of a device removed topology event for deviceId: {}",
653 deviceId);
654 try {
655 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
656 } catch (IOException e) {
657 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
658 deviceId);
659 }
660 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700661 return event;
662 }
663
664 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
665 Timestamp timestamp) {
666
667 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700668 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700669 // accept removal request if given timestamp is newer than
670 // the latest Timestamp from Primary provider
671 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
672 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
673 if (timestamp.compareTo(lastTimestamp) <= 0) {
674 // outdated event ignore
675 return null;
676 }
677 removalRequest.put(deviceId, timestamp);
678
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700679 Device device = devices.remove(deviceId);
680 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700681 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
682 if (ports != null) {
683 ports.clear();
684 }
685 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700686 descs.clear();
687 return device == null ? null :
688 new DeviceEvent(DEVICE_REMOVED, device, null);
689 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700690 }
691
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700692 /**
693 * Checks if given timestamp is superseded by removal request
694 * with more recent timestamp.
695 *
696 * @param deviceId identifier of a device
697 * @param timestampToCheck timestamp of an event to check
698 * @return true if device is already removed
699 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700700 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
701 Timestamp removalTimestamp = removalRequest.get(deviceId);
702 if (removalTimestamp != null &&
703 removalTimestamp.compareTo(timestampToCheck) >= 0) {
704 // removalRequest is more recent
705 return true;
706 }
707 return false;
708 }
709
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700710 /**
711 * Returns a Device, merging description given from multiple Providers.
712 *
713 * @param deviceId device identifier
714 * @param providerDescs Collection of Descriptions from multiple providers
715 * @return Device instance
716 */
717 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700718 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700719
720 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
721
722 ProviderId primary = pickPrimaryPID(providerDescs);
723
724 DeviceDescriptions desc = providerDescs.get(primary);
725
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700726 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700727 Type type = base.type();
728 String manufacturer = base.manufacturer();
729 String hwVersion = base.hwVersion();
730 String swVersion = base.swVersion();
731 String serialNumber = base.serialNumber();
732 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
733 annotations = merge(annotations, base.annotations());
734
735 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
736 if (e.getKey().equals(primary)) {
737 continue;
738 }
739 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700740 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700741 // Currently assuming there will never be a key conflict between
742 // providers
743
744 // annotation merging. not so efficient, should revisit later
745 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
746 }
747
748 return new DefaultDevice(primary, deviceId , type, manufacturer,
749 hwVersion, swVersion, serialNumber, annotations);
750 }
751
752 /**
753 * Returns a Port, merging description given from multiple Providers.
754 *
755 * @param device device the port is on
756 * @param number port number
757 * @param providerDescs Collection of Descriptions from multiple providers
758 * @return Port instance
759 */
760 private Port composePort(Device device, PortNumber number,
761 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
762
763 ProviderId primary = pickPrimaryPID(providerDescs);
764 DeviceDescriptions primDescs = providerDescs.get(primary);
765 // if no primary, assume not enabled
766 // TODO: revisit this default port enabled/disabled behavior
767 boolean isEnabled = false;
768 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
769
770 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
771 if (portDesc != null) {
772 isEnabled = portDesc.value().isEnabled();
773 annotations = merge(annotations, portDesc.value().annotations());
774 }
775
776 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
777 if (e.getKey().equals(primary)) {
778 continue;
779 }
780 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700781 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700782 // Currently assuming there will never be a key conflict between
783 // providers
784
785 // annotation merging. not so efficient, should revisit later
786 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
787 if (otherPortDesc != null) {
788 annotations = merge(annotations, otherPortDesc.value().annotations());
789 }
790 }
791
792 return new DefaultPort(device, number, isEnabled, annotations);
793 }
794
795 /**
796 * @return primary ProviderID, or randomly chosen one if none exists
797 */
798 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700799 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700800 ProviderId fallBackPrimary = null;
801 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
802 if (!e.getKey().isAncillary()) {
803 return e.getKey();
804 } else if (fallBackPrimary == null) {
805 // pick randomly as a fallback in case there is no primary
806 fallBackPrimary = e.getKey();
807 }
808 }
809 return fallBackPrimary;
810 }
811
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700812 private DeviceDescriptions getPrimaryDescriptions(
813 Map<ProviderId, DeviceDescriptions> providerDescs) {
814 ProviderId pid = pickPrimaryPID(providerDescs);
815 return providerDescs.get(pid);
816 }
817
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700818 // TODO: should we be throwing exception?
819 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
820 ClusterMessage message = new ClusterMessage(
821 clusterService.getLocalNode().id(),
822 subject,
823 SERIALIZER.encode(event));
824 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700825 }
826
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700827 // TODO: should we be throwing exception?
828 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
829 ClusterMessage message = new ClusterMessage(
830 clusterService.getLocalNode().id(),
831 subject,
832 SERIALIZER.encode(event));
833 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700834 }
Madan Jampani47c93732014-10-06 20:46:08 -0700835
836 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700837 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700838 }
839
Madan Jampani25322532014-10-08 11:20:38 -0700840 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700841 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700842 }
843
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700844 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700845 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700846 }
847
Madan Jampani47c93732014-10-06 20:46:08 -0700848 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700849 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700850 }
851
852 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700853 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
854 }
855
856 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
857 try {
858 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
859 } catch (IOException e) {
860 log.error("Failed to send" + event + " to " + recipient, e);
861 }
862 }
863
864 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
865 try {
866 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
867 } catch (IOException e) {
868 log.error("Failed to send" + event + " to " + recipient, e);
869 }
870 }
871
872 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
873 try {
874 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
875 } catch (IOException e) {
876 log.error("Failed to send" + event + " to " + recipient, e);
877 }
878 }
879
880 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
881 try {
882 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
883 } catch (IOException e) {
884 log.error("Failed to send" + event + " to " + recipient, e);
885 }
886 }
887
888 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
889 try {
890 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
891 } catch (IOException e) {
892 log.error("Failed to send" + event + " to " + recipient, e);
893 }
894 }
895
896 private DeviceAntiEntropyAdvertisement createAdvertisement() {
897 final NodeId self = clusterService.getLocalNode().id();
898
899 Map<DeviceFragmentId, Timestamp> devices = new HashMap<>(deviceDescs.size());
900 final int portsPerDevice = 8; // random guess to minimize reallocation
901 Map<PortFragmentId, Timestamp> ports = new HashMap<>(devices.size() * portsPerDevice);
902 Map<DeviceId, Timestamp> offline = new HashMap<>(devices.size());
903
904 for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>>
905 provs : deviceDescs.entrySet()) {
906
907 final DeviceId deviceId = provs.getKey();
908 final ConcurrentMap<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
909 synchronized (devDescs) {
910
911 offline.put(deviceId, this.offline.get(deviceId));
912
913 for (Entry<ProviderId, DeviceDescriptions>
914 prov : devDescs.entrySet()) {
915
916 final ProviderId provId = prov.getKey();
917 final DeviceDescriptions descs = prov.getValue();
918
919 devices.put(new DeviceFragmentId(deviceId, provId),
920 descs.getDeviceDesc().timestamp());
921
922 for (Entry<PortNumber, Timestamped<PortDescription>>
923 portDesc : descs.getPortDescs().entrySet()) {
924
925 final PortNumber number = portDesc.getKey();
926 ports.put(new PortFragmentId(deviceId, provId, number),
927 portDesc.getValue().timestamp());
928 }
929 }
930 }
931 }
932
933 return new DeviceAntiEntropyAdvertisement(self, devices, ports, offline);
934 }
935
936 /**
937 * Responds to anti-entropy advertisement message.
938 * <P>
939 * Notify sender about out-dated information using regular replication message.
940 * Send back advertisement to sender if not in sync.
941 *
942 * @param advertisement to respond to
943 */
944 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
945
946 final NodeId sender = advertisement.sender();
947
948 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
949 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
950 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
951
952 // Fragments to request
953 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
954 Collection<PortFragmentId> reqPorts = new ArrayList<>();
955
956 for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
957 final DeviceId deviceId = de.getKey();
958 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
959
960 synchronized (lDevice) {
961 // latestTimestamp across provider
962 // Note: can be null initially
963 Timestamp localLatest = offline.get(deviceId);
964
965 // handle device Ads
966 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
967 final ProviderId provId = prov.getKey();
968 final DeviceDescriptions lDeviceDescs = prov.getValue();
969
970 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
971
972
973 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
974 Timestamp advDevTimestamp = devAds.get(devFragId);
975
976 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
977 // remote does not have it or outdated, suggest
978 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
979 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
980 // local is outdated, request
981 reqDevices.add(devFragId);
982 }
983
984 // handle port Ads
985 for (Entry<PortNumber, Timestamped<PortDescription>>
986 pe : lDeviceDescs.getPortDescs().entrySet()) {
987
988 final PortNumber num = pe.getKey();
989 final Timestamped<PortDescription> lPort = pe.getValue();
990
991 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
992
993 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -0700994 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700995 // remote does not have it or outdated, suggest
996 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
997 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
998 // local is outdated, request
999 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1000 reqPorts.add(portFragId);
1001 }
1002
1003 // remove port Ad already processed
1004 portAds.remove(portFragId);
1005 } // end local port loop
1006
1007 // remove device Ad already processed
1008 devAds.remove(devFragId);
1009
1010 // find latest and update
1011 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1012 if (localLatest == null ||
1013 providerLatest.compareTo(localLatest) > 0) {
1014 localLatest = providerLatest;
1015 }
1016 } // end local provider loop
1017
1018 // checking if remote timestamp is more recent.
1019 Timestamp rOffline = offlineAds.get(deviceId);
1020 if (rOffline != null &&
1021 rOffline.compareTo(localLatest) > 0) {
1022 // remote offline timestamp suggests that the
1023 // device is off-line
1024 markOfflineInternal(deviceId, rOffline);
1025 }
1026
1027 Timestamp lOffline = offline.get(deviceId);
1028 if (lOffline != null && rOffline == null) {
1029 // locally offline, but remote is online, suggest offline
1030 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1031 }
1032
1033 // remove device offline Ad already processed
1034 offlineAds.remove(deviceId);
1035 } // end local device loop
1036 } // device lock
1037
1038 // If there is any Ads left, request them
1039 log.trace("Ads left {}, {}", devAds, portAds);
1040 reqDevices.addAll(devAds.keySet());
1041 reqPorts.addAll(portAds.keySet());
1042
1043 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1044 log.trace("Nothing to request to remote peer {}", sender);
1045 return;
1046 }
1047
1048 log.info("Need to sync {} {}", reqDevices, reqPorts);
1049
1050 // 2-way Anti-Entropy for now
1051 try {
1052 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1053 } catch (IOException e) {
1054 log.error("Failed to send response advertisement to " + sender, e);
1055 }
1056
1057// Sketch of 3-way Anti-Entropy
1058// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1059// ClusterMessage message = new ClusterMessage(
1060// clusterService.getLocalNode().id(),
1061// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1062// SERIALIZER.encode(request));
1063//
1064// try {
1065// clusterCommunicator.unicast(message, advertisement.sender());
1066// } catch (IOException e) {
1067// log.error("Failed to send advertisement reply to "
1068// + advertisement.sender(), e);
1069// }
Madan Jampani47c93732014-10-06 20:46:08 -07001070 }
1071
Madan Jampani255a58b2014-10-09 12:08:20 -07001072 private void notifyDelegateIfNotNull(DeviceEvent event) {
1073 if (event != null) {
1074 notifyDelegate(event);
1075 }
1076 }
1077
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001078 private final class SendAdvertisementTask implements Runnable {
1079
1080 @Override
1081 public void run() {
1082 if (Thread.currentThread().isInterrupted()) {
1083 log.info("Interrupted, quitting");
1084 return;
1085 }
1086
1087 try {
1088 final NodeId self = clusterService.getLocalNode().id();
1089 Set<ControllerNode> nodes = clusterService.getNodes();
1090
1091 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1092 .transform(toNodeId())
1093 .toList();
1094
1095 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
1096 log.info("No other peers in the cluster.");
1097 return;
1098 }
1099
1100 NodeId peer;
1101 do {
1102 int idx = RandomUtils.nextInt(0, nodeIds.size());
1103 peer = nodeIds.get(idx);
1104 } while (peer.equals(self));
1105
1106 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1107
1108 if (Thread.currentThread().isInterrupted()) {
1109 log.info("Interrupted, quitting");
1110 return;
1111 }
1112
1113 try {
1114 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1115 } catch (IOException e) {
1116 log.error("Failed to send anti-entropy advertisement", e);
1117 return;
1118 }
1119 } catch (Exception e) {
1120 // catch all Exception to avoid Scheduled task being suppressed.
1121 log.error("Exception thrown while sending advertisement", e);
1122 }
1123 }
1124 }
1125
Madan Jampani47c93732014-10-06 20:46:08 -07001126 private class InternalDeviceEventListener implements ClusterMessageHandler {
1127 @Override
1128 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001129
Madan Jampani47c93732014-10-06 20:46:08 -07001130 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001131 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001132
Madan Jampani47c93732014-10-06 20:46:08 -07001133 ProviderId providerId = event.providerId();
1134 DeviceId deviceId = event.deviceId();
1135 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001136
Madan Jampani255a58b2014-10-09 12:08:20 -07001137 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001138 }
1139 }
1140
Madan Jampani25322532014-10-08 11:20:38 -07001141 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
1142 @Override
1143 public void handle(ClusterMessage message) {
1144
1145 log.info("Received device offline event from peer: {}", message.sender());
1146 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
1147
1148 DeviceId deviceId = event.deviceId();
1149 Timestamp timestamp = event.timestamp();
1150
Madan Jampani255a58b2014-10-09 12:08:20 -07001151 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -07001152 }
1153 }
1154
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001155 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
1156 @Override
1157 public void handle(ClusterMessage message) {
1158
1159 log.info("Received device removed event from peer: {}", message.sender());
1160 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
1161
1162 DeviceId deviceId = event.deviceId();
1163 Timestamp timestamp = event.timestamp();
1164
Madan Jampani255a58b2014-10-09 12:08:20 -07001165 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001166 }
1167 }
1168
Madan Jampani47c93732014-10-06 20:46:08 -07001169 private class InternalPortEventListener implements ClusterMessageHandler {
1170 @Override
1171 public void handle(ClusterMessage message) {
1172
1173 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001174 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001175
1176 ProviderId providerId = event.providerId();
1177 DeviceId deviceId = event.deviceId();
1178 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1179
Madan Jampani255a58b2014-10-09 12:08:20 -07001180 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -07001181 }
1182 }
1183
1184 private class InternalPortStatusEventListener implements ClusterMessageHandler {
1185 @Override
1186 public void handle(ClusterMessage message) {
1187
1188 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001189 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001190 log.info("{}", event);
Madan Jampani47c93732014-10-06 20:46:08 -07001191
1192 ProviderId providerId = event.providerId();
1193 DeviceId deviceId = event.deviceId();
1194 Timestamped<PortDescription> portDescription = event.portDescription();
1195
Madan Jampani255a58b2014-10-09 12:08:20 -07001196 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001197 }
1198 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001199
1200 private final class InternalDeviceAdvertisementListener
1201 implements ClusterMessageHandler {
1202
1203 @Override
1204 public void handle(ClusterMessage message) {
1205 log.info("Received Device advertisement from peer: {}", message.sender());
1206 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1207 handleAdvertisement(advertisement);
1208 }
1209 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001210}