blob: 12040ddfd8ea99018ba82f4a6845f6ba833f05c6 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
Yuta HIGUCHI47c40882014-10-10 18:44:37 -07003import com.google.common.base.Function;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07004import com.google.common.collect.FluentIterable;
5import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07008
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07009import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070017import org.onlab.onos.cluster.ControllerNode;
18import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070019import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070020import org.onlab.onos.net.DefaultAnnotations;
21import org.onlab.onos.net.DefaultDevice;
22import org.onlab.onos.net.DefaultPort;
23import org.onlab.onos.net.Device;
24import org.onlab.onos.net.Device.Type;
25import org.onlab.onos.net.DeviceId;
26import org.onlab.onos.net.Port;
27import org.onlab.onos.net.PortNumber;
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -070028import org.onlab.onos.net.device.DeviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070029import org.onlab.onos.net.device.DeviceDescription;
30import org.onlab.onos.net.device.DeviceEvent;
31import org.onlab.onos.net.device.DeviceStore;
32import org.onlab.onos.net.device.DeviceStoreDelegate;
33import org.onlab.onos.net.device.PortDescription;
34import org.onlab.onos.net.provider.ProviderId;
35import org.onlab.onos.store.AbstractStore;
Yuta HIGUCHId40483d2014-10-09 15:20:30 -070036import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070037import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070040import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIfa891c92014-10-09 15:21:40 -070041import org.onlab.onos.store.common.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070042import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070043import org.onlab.onos.store.serializers.DistributedStoreSerializers;
alshabib7911a052014-10-16 17:49:37 -070044import org.onlab.packet.ChassisId;
Madan Jampani53e44e62014-10-07 12:39:51 -070045import org.onlab.util.KryoPool;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070046import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070047import org.slf4j.Logger;
48
Madan Jampani47c93732014-10-06 20:46:08 -070049import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070050import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070051import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070052import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070053import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070054import java.util.HashSet;
55import java.util.Iterator;
56import java.util.List;
57import java.util.Map;
58import java.util.Map.Entry;
59import java.util.Objects;
60import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070061import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070062import java.util.concurrent.ScheduledExecutorService;
63import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070064
65import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070066import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHIdc7374c2014-10-10 11:11:09 -070067import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070068import static org.onlab.onos.net.device.DeviceEvent.Type.*;
69import static org.slf4j.LoggerFactory.getLogger;
70import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
71import static org.onlab.onos.net.DefaultAnnotations.merge;
72import static com.google.common.base.Verify.verify;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070073import static org.onlab.util.Tools.namedThreads;
74import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
75import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070076
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070077// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070078/**
79 * Manages inventory of infrastructure devices using gossip protocol to distribute
80 * information.
81 */
82@Component(immediate = true)
83@Service
84public class GossipDeviceStore
85 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
86 implements DeviceStore {
87
88 private final Logger log = getLogger(getClass());
89
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070090 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070091
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070092 // innerMap is used to lock a Device, thus instance should never be replaced.
93 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070094 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070095 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070096
97 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070098 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
99 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
100
101 // to be updated under Device lock
102 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
103 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700104
105 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700106 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700109 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700110
Madan Jampani47c93732014-10-06 20:46:08 -0700111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected ClusterCommunicationService clusterCommunicator;
113
Madan Jampani53e44e62014-10-07 12:39:51 -0700114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected ClusterService clusterService;
116
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700117 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700118 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700119 protected void setupKryoPool() {
120 serializerPool = KryoPool.newBuilder()
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700121 .register(DistributedStoreSerializers.COMMON)
122
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700123 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700124 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700125 .register(InternalDeviceRemovedEvent.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700126 .register(InternalPortEvent.class, new InternalPortEventSerializer())
127 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700128 .register(DeviceAntiEntropyAdvertisement.class)
129 .register(DeviceFragmentId.class)
130 .register(PortFragmentId.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700131 .build()
132 .populate(1);
133 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700134 };
135
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700136 private ScheduledExecutorService executor;
137
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700138 @Activate
139 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700140 clusterCommunicator.addSubscriber(
141 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
142 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700143 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
144 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700145 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
146 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700147 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
148 clusterCommunicator.addSubscriber(
149 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700150 clusterCommunicator.addSubscriber(
151 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
152
153 executor =
154 newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
155
156 // TODO: Make these configurable
157 long initialDelaySec = 5;
158 long periodSec = 5;
159 // start anti-entropy thread
160 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
161 initialDelaySec, periodSec, TimeUnit.SECONDS);
162
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700163 log.info("Started");
164 }
165
166 @Deactivate
167 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700168
169 executor.shutdownNow();
170 try {
171 boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
172 if (timedout) {
173 log.error("Timeout during executor shutdown");
174 }
175 } catch (InterruptedException e) {
176 log.error("Error during executor shutdown", e);
177 }
178
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700179 deviceDescs.clear();
180 devices.clear();
181 devicePorts.clear();
182 availableDevices.clear();
183 log.info("Stopped");
184 }
185
186 @Override
187 public int getDeviceCount() {
188 return devices.size();
189 }
190
191 @Override
192 public Iterable<Device> getDevices() {
193 return Collections.unmodifiableCollection(devices.values());
194 }
195
196 @Override
197 public Device getDevice(DeviceId deviceId) {
198 return devices.get(deviceId);
199 }
200
201 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700202 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
203 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700204 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700205 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700206 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700207 final DeviceEvent event;
208 final Timestamped<DeviceDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700209 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700210 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700211 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId).getDeviceDesc();
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700212 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700213 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700214 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
215 providerId, deviceId);
216 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700217 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700218 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700219 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700220 + providerId + " and deviceId: " + deviceId, e);
221 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700222 }
223 return event;
224 }
225
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700226 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
227 DeviceId deviceId,
228 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700229
230 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700231 Map<ProviderId, DeviceDescriptions> providerDescs
232 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700233
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700234 synchronized (providerDescs) {
235 // locking per device
236
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700237 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
238 log.debug("Ignoring outdated event: {}", deltaDesc);
239 return null;
240 }
241
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700242 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(providerDescs, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700243
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700244 final Device oldDevice = devices.get(deviceId);
245 final Device newDevice;
246
247 if (deltaDesc == descs.getDeviceDesc() ||
248 deltaDesc.isNewer(descs.getDeviceDesc())) {
249 // on new device or valid update
250 descs.putDeviceDesc(deltaDesc);
251 newDevice = composeDevice(deviceId, providerDescs);
252 } else {
253 // outdated event, ignored.
254 return null;
255 }
256 if (oldDevice == null) {
257 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700258 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700259 } else {
260 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700261 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700262 }
263 }
264 }
265
266 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700267 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700268 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700269 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700270
271 // update composed device cache
272 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
273 verify(oldDevice == null,
274 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
275 providerId, oldDevice, newDevice);
276
277 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700278 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700279 }
280
281 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
282 }
283
284 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700285 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700286 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700287 Device oldDevice,
288 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700289
290 // We allow only certain attributes to trigger update
291 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
292 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700293 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700294
295 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
296 if (!replaced) {
297 verify(replaced,
298 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
299 providerId, oldDevice, devices.get(newDevice.id())
300 , newDevice);
301 }
302 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700303 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700304 }
305 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
306 }
307
308 // Otherwise merely attempt to change availability if primary provider
309 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700310 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700311 return !added ? null :
312 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
313 }
314 return null;
315 }
316
317 @Override
318 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700319 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700320 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700321 if (event != null) {
322 log.info("Notifying peers of a device offline topology event for deviceId: {}",
323 deviceId);
324 try {
325 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
326 } catch (IOException e) {
327 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
328 deviceId);
329 }
330 }
331 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700332 }
333
334 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
335
336 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700337 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700338
339 // locking device
340 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700341
342 // accept off-line if given timestamp is newer than
343 // the latest Timestamp from Primary provider
344 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
345 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
346 if (timestamp.compareTo(lastTimestamp) <= 0) {
347 // outdated event ignore
348 return null;
349 }
350
351 offline.put(deviceId, timestamp);
352
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700353 Device device = devices.get(deviceId);
354 if (device == null) {
355 return null;
356 }
357 boolean removed = availableDevices.remove(deviceId);
358 if (removed) {
359 // TODO: broadcast ... DOWN only?
360 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700361 }
362 return null;
363 }
364 }
365
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700366 /**
367 * Marks the device as available if the given timestamp is not outdated,
368 * compared to the time the device has been marked offline.
369 *
370 * @param deviceId identifier of the device
371 * @param timestamp of the event triggering this change.
372 * @return true if availability change request was accepted and changed the state
373 */
374 // Guarded by deviceDescs value (=Device lock)
375 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
376 // accept on-line if given timestamp is newer than
377 // the latest offline request Timestamp
378 Timestamp offlineTimestamp = offline.get(deviceId);
379 if (offlineTimestamp == null ||
380 offlineTimestamp.compareTo(timestamp) < 0) {
381
382 offline.remove(deviceId);
383 return availableDevices.add(deviceId);
384 }
385 return false;
386 }
387
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700388 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700389 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
390 DeviceId deviceId,
391 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700392
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700393 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700394
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700395 final Timestamped<List<PortDescription>> timestampedInput
396 = new Timestamped<>(portDescriptions, newTimestamp);
397 final List<DeviceEvent> events;
398 final Timestamped<List<PortDescription>> merged;
399
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700400 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700401 events = updatePortsInternal(providerId, deviceId, timestampedInput);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700402 final DeviceDescriptions descs = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700403 List<PortDescription> mergedList =
404 FluentIterable.from(portDescriptions)
405 .transform(new Function<PortDescription, PortDescription>() {
406 @Override
407 public PortDescription apply(PortDescription input) {
408 // lookup merged port description
409 return descs.getPortDesc(input.portNumber()).value();
410 }
411 }).toList();
412 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
413 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700414 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700415 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
416 providerId, deviceId);
417 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700418 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700419 } catch (IOException e) {
420 log.error("Failed to notify peers of a port update topology event or providerId: "
421 + providerId + " and deviceId: " + deviceId, e);
422 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700423 }
424 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700425 }
426
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700427 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
428 DeviceId deviceId,
429 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700430
431 Device device = devices.get(deviceId);
432 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
433
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700434 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700435 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
436
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700437 List<DeviceEvent> events = new ArrayList<>();
438 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700439
440 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
441 log.debug("Ignoring outdated events: {}", portDescriptions);
442 return null;
443 }
444
445 DeviceDescriptions descs = descsMap.get(providerId);
446 // every provider must provide DeviceDescription.
447 checkArgument(descs != null,
448 "Device description for Device ID %s from Provider %s was not found",
449 deviceId, providerId);
450
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700451 Map<PortNumber, Port> ports = getPortMap(deviceId);
452
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700453 final Timestamp newTimestamp = portDescriptions.timestamp();
454
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700455 // Add new ports
456 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700457 for (PortDescription portDescription : portDescriptions.value()) {
458 final PortNumber number = portDescription.portNumber();
459 processed.add(number);
460
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700461 final Port oldPort = ports.get(number);
462 final Port newPort;
463
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700464
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700465 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
466 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700467 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700468 // on new port or valid update
469 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700470 descs.putPortDesc(new Timestamped<>(portDescription,
471 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700472 newPort = composePort(device, number, descsMap);
473 } else {
474 // outdated event, ignored.
475 continue;
476 }
477
478 events.add(oldPort == null ?
479 createPort(device, newPort, ports) :
480 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700481 }
482
483 events.addAll(pruneOldPorts(device, ports, processed));
484 }
485 return FluentIterable.from(events).filter(notNull()).toList();
486 }
487
488 // Creates a new port based on the port description adds it to the map and
489 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700490 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700491 private DeviceEvent createPort(Device device, Port newPort,
492 Map<PortNumber, Port> ports) {
493 ports.put(newPort.number(), newPort);
494 return new DeviceEvent(PORT_ADDED, device, newPort);
495 }
496
497 // Checks if the specified port requires update and if so, it replaces the
498 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700499 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700500 private DeviceEvent updatePort(Device device, Port oldPort,
501 Port newPort,
502 Map<PortNumber, Port> ports) {
503 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700504 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700505
506 ports.put(oldPort.number(), newPort);
507 return new DeviceEvent(PORT_UPDATED, device, newPort);
508 }
509 return null;
510 }
511
512 // Prunes the specified list of ports based on which ports are in the
513 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700514 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700515 private List<DeviceEvent> pruneOldPorts(Device device,
516 Map<PortNumber, Port> ports,
517 Set<PortNumber> processed) {
518 List<DeviceEvent> events = new ArrayList<>();
519 Iterator<PortNumber> iterator = ports.keySet().iterator();
520 while (iterator.hasNext()) {
521 PortNumber portNumber = iterator.next();
522 if (!processed.contains(portNumber)) {
523 events.add(new DeviceEvent(PORT_REMOVED, device,
524 ports.get(portNumber)));
525 iterator.remove();
526 }
527 }
528 return events;
529 }
530
531 // Gets the map of ports for the specified device; if one does not already
532 // exist, it creates and registers a new one.
533 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
534 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700535 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
536 }
537
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700538 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700539 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700540 Map<ProviderId, DeviceDescriptions> r;
541 r = deviceDescs.get(deviceId);
542 if (r == null) {
543 r = new HashMap<ProviderId, DeviceDescriptions>();
544 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
545 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
546 if (concurrentlyAdded != null) {
547 r = concurrentlyAdded;
548 }
549 }
550 return r;
551 }
552
553 // Guarded by deviceDescs value (=Device lock)
554 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
555 Map<ProviderId, DeviceDescriptions> device,
556 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
557
558 synchronized (device) {
559 DeviceDescriptions r = device.get(providerId);
560 if (r == null) {
561 r = new DeviceDescriptions(deltaDesc);
562 device.put(providerId, r);
563 }
564 return r;
565 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700566 }
567
568 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700569 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
570 DeviceId deviceId,
571 PortDescription portDescription) {
572
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700573 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700574 final Timestamped<PortDescription> deltaDesc
575 = new Timestamped<>(portDescription, newTimestamp);
576 final DeviceEvent event;
577 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700578 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700579 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700580 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700581 .getPortDesc(portDescription.portNumber());
582 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700583 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700584 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
585 providerId, deviceId);
586 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700587 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700588 } catch (IOException e) {
589 log.error("Failed to notify peers of a port status update topology event or providerId: "
590 + providerId + " and deviceId: " + deviceId, e);
591 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700592 }
593 return event;
594 }
595
596 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
597 Timestamped<PortDescription> deltaDesc) {
598
599 Device device = devices.get(deviceId);
600 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
601
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700602 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700603 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
604
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700605 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700606
607 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
608 log.debug("Ignoring outdated event: {}", deltaDesc);
609 return null;
610 }
611
612 DeviceDescriptions descs = descsMap.get(providerId);
613 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700614 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700615 "Device description for Device ID %s from Provider %s was not found",
616 deviceId, providerId);
617
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700618 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
619 final PortNumber number = deltaDesc.value().portNumber();
620 final Port oldPort = ports.get(number);
621 final Port newPort;
622
623 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
624 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700625 deltaDesc.isNewer(existingPortDesc)) {
626 // on new port or valid update
627 // update description
628 descs.putPortDesc(deltaDesc);
629 newPort = composePort(device, number, descsMap);
630 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700631 // same or outdated event, ignored.
632 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700633 return null;
634 }
635
636 if (oldPort == null) {
637 return createPort(device, newPort, ports);
638 } else {
639 return updatePort(device, oldPort, newPort, ports);
640 }
641 }
642 }
643
644 @Override
645 public List<Port> getPorts(DeviceId deviceId) {
646 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
647 if (ports == null) {
648 return Collections.emptyList();
649 }
650 return ImmutableList.copyOf(ports.values());
651 }
652
653 @Override
654 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
655 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
656 return ports == null ? null : ports.get(portNumber);
657 }
658
659 @Override
660 public boolean isAvailable(DeviceId deviceId) {
661 return availableDevices.contains(deviceId);
662 }
663
664 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700665 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700666 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700667 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700668 if (event != null) {
669 log.info("Notifying peers of a device removed topology event for deviceId: {}",
670 deviceId);
671 try {
672 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
673 } catch (IOException e) {
674 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
675 deviceId);
676 }
677 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700678 return event;
679 }
680
681 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
682 Timestamp timestamp) {
683
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700684 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700685 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700686 // accept removal request if given timestamp is newer than
687 // the latest Timestamp from Primary provider
688 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
689 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
690 if (timestamp.compareTo(lastTimestamp) <= 0) {
691 // outdated event ignore
692 return null;
693 }
694 removalRequest.put(deviceId, timestamp);
695
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700696 Device device = devices.remove(deviceId);
697 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700698 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
699 if (ports != null) {
700 ports.clear();
701 }
702 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700703 descs.clear();
704 return device == null ? null :
705 new DeviceEvent(DEVICE_REMOVED, device, null);
706 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700707 }
708
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700709 /**
710 * Checks if given timestamp is superseded by removal request
711 * with more recent timestamp.
712 *
713 * @param deviceId identifier of a device
714 * @param timestampToCheck timestamp of an event to check
715 * @return true if device is already removed
716 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700717 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
718 Timestamp removalTimestamp = removalRequest.get(deviceId);
719 if (removalTimestamp != null &&
720 removalTimestamp.compareTo(timestampToCheck) >= 0) {
721 // removalRequest is more recent
722 return true;
723 }
724 return false;
725 }
726
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700727 /**
728 * Returns a Device, merging description given from multiple Providers.
729 *
730 * @param deviceId device identifier
731 * @param providerDescs Collection of Descriptions from multiple providers
732 * @return Device instance
733 */
734 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700735 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700736
737 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
738
739 ProviderId primary = pickPrimaryPID(providerDescs);
740
741 DeviceDescriptions desc = providerDescs.get(primary);
742
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700743 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700744 Type type = base.type();
745 String manufacturer = base.manufacturer();
746 String hwVersion = base.hwVersion();
747 String swVersion = base.swVersion();
748 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700749 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700750 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
751 annotations = merge(annotations, base.annotations());
752
753 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
754 if (e.getKey().equals(primary)) {
755 continue;
756 }
757 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700758 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700759 // Currently assuming there will never be a key conflict between
760 // providers
761
762 // annotation merging. not so efficient, should revisit later
763 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
764 }
765
766 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700767 hwVersion, swVersion, serialNumber,
768 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700769 }
770
771 /**
772 * Returns a Port, merging description given from multiple Providers.
773 *
774 * @param device device the port is on
775 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700776 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700777 * @return Port instance
778 */
779 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700780 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700781
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700782 ProviderId primary = pickPrimaryPID(descsMap);
783 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700784 // if no primary, assume not enabled
785 // TODO: revisit this default port enabled/disabled behavior
786 boolean isEnabled = false;
787 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
788
789 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
790 if (portDesc != null) {
791 isEnabled = portDesc.value().isEnabled();
792 annotations = merge(annotations, portDesc.value().annotations());
793 }
794
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700795 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700796 if (e.getKey().equals(primary)) {
797 continue;
798 }
799 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700800 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700801 // Currently assuming there will never be a key conflict between
802 // providers
803
804 // annotation merging. not so efficient, should revisit later
805 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
806 if (otherPortDesc != null) {
807 annotations = merge(annotations, otherPortDesc.value().annotations());
808 }
809 }
810
811 return new DefaultPort(device, number, isEnabled, annotations);
812 }
813
814 /**
815 * @return primary ProviderID, or randomly chosen one if none exists
816 */
817 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700818 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700819 ProviderId fallBackPrimary = null;
820 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
821 if (!e.getKey().isAncillary()) {
822 return e.getKey();
823 } else if (fallBackPrimary == null) {
824 // pick randomly as a fallback in case there is no primary
825 fallBackPrimary = e.getKey();
826 }
827 }
828 return fallBackPrimary;
829 }
830
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700831 private DeviceDescriptions getPrimaryDescriptions(
832 Map<ProviderId, DeviceDescriptions> providerDescs) {
833 ProviderId pid = pickPrimaryPID(providerDescs);
834 return providerDescs.get(pid);
835 }
836
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700837 // TODO: should we be throwing exception?
838 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
839 ClusterMessage message = new ClusterMessage(
840 clusterService.getLocalNode().id(),
841 subject,
842 SERIALIZER.encode(event));
843 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700844 }
845
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700846 // TODO: should we be throwing exception?
847 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
848 ClusterMessage message = new ClusterMessage(
849 clusterService.getLocalNode().id(),
850 subject,
851 SERIALIZER.encode(event));
852 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700853 }
Madan Jampani47c93732014-10-06 20:46:08 -0700854
855 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700856 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700857 }
858
Madan Jampani25322532014-10-08 11:20:38 -0700859 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700860 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700861 }
862
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700863 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700864 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700865 }
866
Madan Jampani47c93732014-10-06 20:46:08 -0700867 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700868 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700869 }
870
871 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700872 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
873 }
874
875 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
876 try {
877 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
878 } catch (IOException e) {
879 log.error("Failed to send" + event + " to " + recipient, e);
880 }
881 }
882
883 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
884 try {
885 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
886 } catch (IOException e) {
887 log.error("Failed to send" + event + " to " + recipient, e);
888 }
889 }
890
891 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
892 try {
893 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
894 } catch (IOException e) {
895 log.error("Failed to send" + event + " to " + recipient, e);
896 }
897 }
898
899 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
900 try {
901 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
902 } catch (IOException e) {
903 log.error("Failed to send" + event + " to " + recipient, e);
904 }
905 }
906
907 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
908 try {
909 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
910 } catch (IOException e) {
911 log.error("Failed to send" + event + " to " + recipient, e);
912 }
913 }
914
915 private DeviceAntiEntropyAdvertisement createAdvertisement() {
916 final NodeId self = clusterService.getLocalNode().id();
917
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700918 final int numDevices = deviceDescs.size();
919 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
920 final int portsPerDevice = 8; // random factor to minimize reallocation
921 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
922 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700923
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700924 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700925 provs : deviceDescs.entrySet()) {
926
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700927 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700928 final DeviceId deviceId = provs.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700929 final Map<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700930 synchronized (devDescs) {
931
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700932 // send device offline timestamp
933 Timestamp lOffline = this.offline.get(deviceId);
934 if (lOffline != null) {
935 adOffline.put(deviceId, lOffline);
936 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700937
938 for (Entry<ProviderId, DeviceDescriptions>
939 prov : devDescs.entrySet()) {
940
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700941 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700942 final ProviderId provId = prov.getKey();
943 final DeviceDescriptions descs = prov.getValue();
944
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700945 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700946 descs.getDeviceDesc().timestamp());
947
948 for (Entry<PortNumber, Timestamped<PortDescription>>
949 portDesc : descs.getPortDescs().entrySet()) {
950
951 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700952 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700953 portDesc.getValue().timestamp());
954 }
955 }
956 }
957 }
958
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700959 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700960 }
961
962 /**
963 * Responds to anti-entropy advertisement message.
964 * <P>
965 * Notify sender about out-dated information using regular replication message.
966 * Send back advertisement to sender if not in sync.
967 *
968 * @param advertisement to respond to
969 */
970 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
971
972 final NodeId sender = advertisement.sender();
973
974 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
975 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
976 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
977
978 // Fragments to request
979 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
980 Collection<PortFragmentId> reqPorts = new ArrayList<>();
981
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700982 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700983 final DeviceId deviceId = de.getKey();
984 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
985
986 synchronized (lDevice) {
987 // latestTimestamp across provider
988 // Note: can be null initially
989 Timestamp localLatest = offline.get(deviceId);
990
991 // handle device Ads
992 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
993 final ProviderId provId = prov.getKey();
994 final DeviceDescriptions lDeviceDescs = prov.getValue();
995
996 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
997
998
999 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1000 Timestamp advDevTimestamp = devAds.get(devFragId);
1001
1002 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
1003 // remote does not have it or outdated, suggest
1004 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1005 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1006 // local is outdated, request
1007 reqDevices.add(devFragId);
1008 }
1009
1010 // handle port Ads
1011 for (Entry<PortNumber, Timestamped<PortDescription>>
1012 pe : lDeviceDescs.getPortDescs().entrySet()) {
1013
1014 final PortNumber num = pe.getKey();
1015 final Timestamped<PortDescription> lPort = pe.getValue();
1016
1017 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1018
1019 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -07001020 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001021 // remote does not have it or outdated, suggest
1022 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1023 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1024 // local is outdated, request
1025 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1026 reqPorts.add(portFragId);
1027 }
1028
1029 // remove port Ad already processed
1030 portAds.remove(portFragId);
1031 } // end local port loop
1032
1033 // remove device Ad already processed
1034 devAds.remove(devFragId);
1035
1036 // find latest and update
1037 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1038 if (localLatest == null ||
1039 providerLatest.compareTo(localLatest) > 0) {
1040 localLatest = providerLatest;
1041 }
1042 } // end local provider loop
1043
1044 // checking if remote timestamp is more recent.
1045 Timestamp rOffline = offlineAds.get(deviceId);
1046 if (rOffline != null &&
1047 rOffline.compareTo(localLatest) > 0) {
1048 // remote offline timestamp suggests that the
1049 // device is off-line
1050 markOfflineInternal(deviceId, rOffline);
1051 }
1052
1053 Timestamp lOffline = offline.get(deviceId);
1054 if (lOffline != null && rOffline == null) {
1055 // locally offline, but remote is online, suggest offline
1056 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1057 }
1058
1059 // remove device offline Ad already processed
1060 offlineAds.remove(deviceId);
1061 } // end local device loop
1062 } // device lock
1063
1064 // If there is any Ads left, request them
1065 log.trace("Ads left {}, {}", devAds, portAds);
1066 reqDevices.addAll(devAds.keySet());
1067 reqPorts.addAll(portAds.keySet());
1068
1069 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1070 log.trace("Nothing to request to remote peer {}", sender);
1071 return;
1072 }
1073
1074 log.info("Need to sync {} {}", reqDevices, reqPorts);
1075
1076 // 2-way Anti-Entropy for now
1077 try {
1078 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1079 } catch (IOException e) {
1080 log.error("Failed to send response advertisement to " + sender, e);
1081 }
1082
1083// Sketch of 3-way Anti-Entropy
1084// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1085// ClusterMessage message = new ClusterMessage(
1086// clusterService.getLocalNode().id(),
1087// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1088// SERIALIZER.encode(request));
1089//
1090// try {
1091// clusterCommunicator.unicast(message, advertisement.sender());
1092// } catch (IOException e) {
1093// log.error("Failed to send advertisement reply to "
1094// + advertisement.sender(), e);
1095// }
Madan Jampani47c93732014-10-06 20:46:08 -07001096 }
1097
Madan Jampani255a58b2014-10-09 12:08:20 -07001098 private void notifyDelegateIfNotNull(DeviceEvent event) {
1099 if (event != null) {
1100 notifyDelegate(event);
1101 }
1102 }
1103
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001104 private final class SendAdvertisementTask implements Runnable {
1105
1106 @Override
1107 public void run() {
1108 if (Thread.currentThread().isInterrupted()) {
1109 log.info("Interrupted, quitting");
1110 return;
1111 }
1112
1113 try {
1114 final NodeId self = clusterService.getLocalNode().id();
1115 Set<ControllerNode> nodes = clusterService.getNodes();
1116
1117 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1118 .transform(toNodeId())
1119 .toList();
1120
1121 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHI37083082014-10-13 10:38:38 -07001122 log.debug("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001123 return;
1124 }
1125
1126 NodeId peer;
1127 do {
1128 int idx = RandomUtils.nextInt(0, nodeIds.size());
1129 peer = nodeIds.get(idx);
1130 } while (peer.equals(self));
1131
1132 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1133
1134 if (Thread.currentThread().isInterrupted()) {
1135 log.info("Interrupted, quitting");
1136 return;
1137 }
1138
1139 try {
1140 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1141 } catch (IOException e) {
1142 log.error("Failed to send anti-entropy advertisement", e);
1143 return;
1144 }
1145 } catch (Exception e) {
1146 // catch all Exception to avoid Scheduled task being suppressed.
1147 log.error("Exception thrown while sending advertisement", e);
1148 }
1149 }
1150 }
1151
Madan Jampani47c93732014-10-06 20:46:08 -07001152 private class InternalDeviceEventListener implements ClusterMessageHandler {
1153 @Override
1154 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001155
Madan Jampani47c93732014-10-06 20:46:08 -07001156 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001157 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001158
Madan Jampani47c93732014-10-06 20:46:08 -07001159 ProviderId providerId = event.providerId();
1160 DeviceId deviceId = event.deviceId();
1161 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001162
Madan Jampani255a58b2014-10-09 12:08:20 -07001163 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001164 }
1165 }
1166
Madan Jampani25322532014-10-08 11:20:38 -07001167 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
1168 @Override
1169 public void handle(ClusterMessage message) {
1170
1171 log.info("Received device offline event from peer: {}", message.sender());
1172 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
1173
1174 DeviceId deviceId = event.deviceId();
1175 Timestamp timestamp = event.timestamp();
1176
Madan Jampani255a58b2014-10-09 12:08:20 -07001177 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -07001178 }
1179 }
1180
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001181 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
1182 @Override
1183 public void handle(ClusterMessage message) {
1184
1185 log.info("Received device removed event from peer: {}", message.sender());
1186 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
1187
1188 DeviceId deviceId = event.deviceId();
1189 Timestamp timestamp = event.timestamp();
1190
Madan Jampani255a58b2014-10-09 12:08:20 -07001191 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001192 }
1193 }
1194
Madan Jampani47c93732014-10-06 20:46:08 -07001195 private class InternalPortEventListener implements ClusterMessageHandler {
1196 @Override
1197 public void handle(ClusterMessage message) {
1198
1199 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001200 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001201
1202 ProviderId providerId = event.providerId();
1203 DeviceId deviceId = event.deviceId();
1204 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1205
Madan Jampani255a58b2014-10-09 12:08:20 -07001206 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -07001207 }
1208 }
1209
1210 private class InternalPortStatusEventListener implements ClusterMessageHandler {
1211 @Override
1212 public void handle(ClusterMessage message) {
1213
1214 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001215 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001216 log.info("{}", event);
Madan Jampani47c93732014-10-06 20:46:08 -07001217
1218 ProviderId providerId = event.providerId();
1219 DeviceId deviceId = event.deviceId();
1220 Timestamped<PortDescription> portDescription = event.portDescription();
1221
Madan Jampani255a58b2014-10-09 12:08:20 -07001222 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001223 }
1224 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001225
1226 private final class InternalDeviceAdvertisementListener
1227 implements ClusterMessageHandler {
1228
1229 @Override
1230 public void handle(ClusterMessage message) {
Yuta HIGUCHI9a0a1d12014-10-13 22:38:02 -07001231 log.debug("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001232 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1233 handleAdvertisement(advertisement);
1234 }
1235 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001236}