blob: fdc08272166e7c54c4ee0978da714449aa296d30 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
Yuta HIGUCHI47c40882014-10-10 18:44:37 -07003import com.google.common.base.Function;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07004import com.google.common.collect.FluentIterable;
5import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07008
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07009import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070017import org.onlab.onos.cluster.ControllerNode;
18import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070019import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070020import org.onlab.onos.net.DefaultAnnotations;
21import org.onlab.onos.net.DefaultDevice;
22import org.onlab.onos.net.DefaultPort;
23import org.onlab.onos.net.Device;
24import org.onlab.onos.net.Device.Type;
25import org.onlab.onos.net.DeviceId;
26import org.onlab.onos.net.Port;
27import org.onlab.onos.net.PortNumber;
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -070028import org.onlab.onos.net.device.DeviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070029import org.onlab.onos.net.device.DeviceDescription;
30import org.onlab.onos.net.device.DeviceEvent;
31import org.onlab.onos.net.device.DeviceStore;
32import org.onlab.onos.net.device.DeviceStoreDelegate;
33import org.onlab.onos.net.device.PortDescription;
34import org.onlab.onos.net.provider.ProviderId;
35import org.onlab.onos.store.AbstractStore;
Yuta HIGUCHId40483d2014-10-09 15:20:30 -070036import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070037import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070040import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIeecee552014-10-16 14:09:01 -070041import org.onlab.onos.store.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070042import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070043import org.onlab.onos.store.serializers.DistributedStoreSerializers;
alshabib7911a052014-10-16 17:49:37 -070044import org.onlab.packet.ChassisId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070045import org.onlab.util.KryoNamespace;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070046import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070047import org.slf4j.Logger;
48
Madan Jampani47c93732014-10-06 20:46:08 -070049import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070050import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070051import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070052import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070053import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070054import java.util.HashSet;
55import java.util.Iterator;
56import java.util.List;
57import java.util.Map;
58import java.util.Map.Entry;
59import java.util.Objects;
60import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070061import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070062import java.util.concurrent.ScheduledExecutorService;
63import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070064
65import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070066import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHIdc7374c2014-10-10 11:11:09 -070067import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070068import static org.onlab.onos.net.device.DeviceEvent.Type.*;
69import static org.slf4j.LoggerFactory.getLogger;
70import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
71import static org.onlab.onos.net.DefaultAnnotations.merge;
72import static com.google.common.base.Verify.verify;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070073import static org.onlab.util.Tools.namedThreads;
74import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
75import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070076
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070077// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070078/**
79 * Manages inventory of infrastructure devices using gossip protocol to distribute
80 * information.
81 */
82@Component(immediate = true)
83@Service
84public class GossipDeviceStore
85 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
86 implements DeviceStore {
87
88 private final Logger log = getLogger(getClass());
89
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070090 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070091
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070092 // innerMap is used to lock a Device, thus instance should never be replaced.
93 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070094 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070095 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070096
97 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070098 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
99 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
100
101 // to be updated under Device lock
102 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
103 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700104
105 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700106 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700109 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700110
Madan Jampani47c93732014-10-06 20:46:08 -0700111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected ClusterCommunicationService clusterCommunicator;
113
Madan Jampani53e44e62014-10-07 12:39:51 -0700114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected ClusterService clusterService;
116
// Serializer instance exposed by this store. Its Kryo pool registers the common
// distributed-store types first, followed by this store's internal device/port
// event types, the anti-entropy advertisement type and the fragment id types.
// NOTE(review): registration order presumably determines the Kryo type ids and
// would then have to match on every cluster node - confirm before reordering.
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700117 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700118 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700119 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700120 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700121 .register(DistributedStoreSerializers.COMMON)
122
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700123 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700124 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700125 .register(InternalDeviceRemovedEvent.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700126 .register(InternalPortEvent.class, new InternalPortEventSerializer())
127 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700128 .register(DeviceAntiEntropyAdvertisement.class)
129 .register(DeviceFragmentId.class)
130 .register(PortFragmentId.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700131 .build()
// NOTE(review): populate(1) presumably pre-creates one Kryo instance in the pool - confirm against KryoNamespace.
132 .populate(1);
133 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700134 };
135
// Scheduled executor created in activate() to run SendAdvertisementTask periodically
// and shut down in deactivate().
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700136 private ScheduledExecutorService executor;
137
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700138 @Activate
139 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700140 clusterCommunicator.addSubscriber(
141 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
142 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700143 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
144 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700145 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
146 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700147 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
148 clusterCommunicator.addSubscriber(
149 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700150 clusterCommunicator.addSubscriber(
151 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
152
153 executor =
154 newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
155
156 // TODO: Make these configurable
157 long initialDelaySec = 5;
158 long periodSec = 5;
159 // start anti-entropy thread
160 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
161 initialDelaySec, periodSec, TimeUnit.SECONDS);
162
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700163 log.info("Started");
164 }
165
166 @Deactivate
167 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700168
169 executor.shutdownNow();
170 try {
171 boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
172 if (timedout) {
173 log.error("Timeout during executor shutdown");
174 }
175 } catch (InterruptedException e) {
176 log.error("Error during executor shutdown", e);
177 }
178
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700179 deviceDescs.clear();
180 devices.clear();
181 devicePorts.clear();
182 availableDevices.clear();
183 log.info("Stopped");
184 }
185
186 @Override
187 public int getDeviceCount() {
188 return devices.size();
189 }
190
191 @Override
192 public Iterable<Device> getDevices() {
193 return Collections.unmodifiableCollection(devices.values());
194 }
195
196 @Override
197 public Device getDevice(DeviceId deviceId) {
198 return devices.get(deviceId);
199 }
200
201 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700202 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
203 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700204 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700205 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700206 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700207 final DeviceEvent event;
208 final Timestamped<DeviceDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700209 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700210 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700211 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId).getDeviceDesc();
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700212 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700213 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700214 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
215 providerId, deviceId);
216 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700217 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700218 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700219 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700220 + providerId + " and deviceId: " + deviceId, e);
221 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700222 }
223 return event;
224 }
225
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700226 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
227 DeviceId deviceId,
228 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700229
230 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700231 Map<ProviderId, DeviceDescriptions> providerDescs
232 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700233
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700234 synchronized (providerDescs) {
235 // locking per device
236
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700237 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
238 log.debug("Ignoring outdated event: {}", deltaDesc);
239 return null;
240 }
241
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700242 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(providerDescs, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700243
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700244 final Device oldDevice = devices.get(deviceId);
245 final Device newDevice;
246
247 if (deltaDesc == descs.getDeviceDesc() ||
248 deltaDesc.isNewer(descs.getDeviceDesc())) {
249 // on new device or valid update
250 descs.putDeviceDesc(deltaDesc);
251 newDevice = composeDevice(deviceId, providerDescs);
252 } else {
253 // outdated event, ignored.
254 return null;
255 }
256 if (oldDevice == null) {
257 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700258 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700259 } else {
260 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700261 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700262 }
263 }
264 }
265
266 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700267 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700268 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700269 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700270
271 // update composed device cache
272 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
273 verify(oldDevice == null,
274 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
275 providerId, oldDevice, newDevice);
276
277 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700278 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700279 }
280
281 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
282 }
283
284 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700285 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700286 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700287 Device oldDevice,
288 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700289
290 // We allow only certain attributes to trigger update
291 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
292 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700293 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700294
295 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
296 if (!replaced) {
297 verify(replaced,
298 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
299 providerId, oldDevice, devices.get(newDevice.id())
300 , newDevice);
301 }
302 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700303 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700304 }
305 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
306 }
307
308 // Otherwise merely attempt to change availability if primary provider
309 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700310 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700311 return !added ? null :
312 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
313 }
314 return null;
315 }
316
317 @Override
318 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700319 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700320 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700321 if (event != null) {
322 log.info("Notifying peers of a device offline topology event for deviceId: {}",
323 deviceId);
324 try {
325 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
326 } catch (IOException e) {
327 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
328 deviceId);
329 }
330 }
331 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700332 }
333
334 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
335
336 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700337 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700338
339 // locking device
340 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700341
342 // accept off-line if given timestamp is newer than
343 // the latest Timestamp from Primary provider
344 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
345 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
346 if (timestamp.compareTo(lastTimestamp) <= 0) {
347 // outdated event ignore
348 return null;
349 }
350
351 offline.put(deviceId, timestamp);
352
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700353 Device device = devices.get(deviceId);
354 if (device == null) {
355 return null;
356 }
357 boolean removed = availableDevices.remove(deviceId);
358 if (removed) {
359 // TODO: broadcast ... DOWN only?
360 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700361 }
362 return null;
363 }
364 }
365
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700366 /**
367 * Marks the device as available if the given timestamp is not outdated,
368 * compared to the time the device has been marked offline.
369 *
370 * @param deviceId identifier of the device
371 * @param timestamp of the event triggering this change.
372 * @return true if availability change request was accepted and changed the state
373 */
374 // Guarded by deviceDescs value (=Device lock)
375 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
376 // accept on-line if given timestamp is newer than
377 // the latest offline request Timestamp
378 Timestamp offlineTimestamp = offline.get(deviceId);
379 if (offlineTimestamp == null ||
380 offlineTimestamp.compareTo(timestamp) < 0) {
381
382 offline.remove(deviceId);
383 return availableDevices.add(deviceId);
384 }
385 return false;
386 }
387
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700388 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700389 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
390 DeviceId deviceId,
391 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700392
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700393 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Ayaka Koshibeb5c63a02014-10-18 18:42:27 -0700394 log.info("timestamp for {} {}", deviceId, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700395
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700396 final Timestamped<List<PortDescription>> timestampedInput
397 = new Timestamped<>(portDescriptions, newTimestamp);
398 final List<DeviceEvent> events;
399 final Timestamped<List<PortDescription>> merged;
400
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700401 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700402 events = updatePortsInternal(providerId, deviceId, timestampedInput);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700403 final DeviceDescriptions descs = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700404 List<PortDescription> mergedList =
405 FluentIterable.from(portDescriptions)
406 .transform(new Function<PortDescription, PortDescription>() {
407 @Override
408 public PortDescription apply(PortDescription input) {
409 // lookup merged port description
410 return descs.getPortDesc(input.portNumber()).value();
411 }
412 }).toList();
413 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
414 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700415 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700416 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
417 providerId, deviceId);
418 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700419 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700420 } catch (IOException e) {
421 log.error("Failed to notify peers of a port update topology event or providerId: "
422 + providerId + " and deviceId: " + deviceId, e);
423 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700424 }
425 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700426 }
427
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700428 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
429 DeviceId deviceId,
430 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700431
432 Device device = devices.get(deviceId);
433 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
434
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700435 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700436 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
437
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700438 List<DeviceEvent> events = new ArrayList<>();
439 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700440
441 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
442 log.debug("Ignoring outdated events: {}", portDescriptions);
443 return null;
444 }
445
446 DeviceDescriptions descs = descsMap.get(providerId);
447 // every provider must provide DeviceDescription.
448 checkArgument(descs != null,
449 "Device description for Device ID %s from Provider %s was not found",
450 deviceId, providerId);
451
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700452 Map<PortNumber, Port> ports = getPortMap(deviceId);
453
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700454 final Timestamp newTimestamp = portDescriptions.timestamp();
455
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700456 // Add new ports
457 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700458 for (PortDescription portDescription : portDescriptions.value()) {
459 final PortNumber number = portDescription.portNumber();
460 processed.add(number);
461
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700462 final Port oldPort = ports.get(number);
463 final Port newPort;
464
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700465
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700466 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
467 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700468 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700469 // on new port or valid update
470 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700471 descs.putPortDesc(new Timestamped<>(portDescription,
472 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700473 newPort = composePort(device, number, descsMap);
474 } else {
475 // outdated event, ignored.
476 continue;
477 }
478
479 events.add(oldPort == null ?
480 createPort(device, newPort, ports) :
481 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700482 }
483
484 events.addAll(pruneOldPorts(device, ports, processed));
485 }
486 return FluentIterable.from(events).filter(notNull()).toList();
487 }
488
489 // Creates a new port based on the port description adds it to the map and
490 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700491 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700492 private DeviceEvent createPort(Device device, Port newPort,
493 Map<PortNumber, Port> ports) {
494 ports.put(newPort.number(), newPort);
495 return new DeviceEvent(PORT_ADDED, device, newPort);
496 }
497
498 // Checks if the specified port requires update and if so, it replaces the
499 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700500 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700501 private DeviceEvent updatePort(Device device, Port oldPort,
502 Port newPort,
503 Map<PortNumber, Port> ports) {
504 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700505 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700506
507 ports.put(oldPort.number(), newPort);
508 return new DeviceEvent(PORT_UPDATED, device, newPort);
509 }
510 return null;
511 }
512
513 // Prunes the specified list of ports based on which ports are in the
514 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700515 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700516 private List<DeviceEvent> pruneOldPorts(Device device,
517 Map<PortNumber, Port> ports,
518 Set<PortNumber> processed) {
519 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700520 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700521 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700522 Entry<PortNumber, Port> e = iterator.next();
523 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700524 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700525 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700526 iterator.remove();
527 }
528 }
529 return events;
530 }
531
532 // Gets the map of ports for the specified device; if one does not already
533 // exist, it creates and registers a new one.
534 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
535 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700536 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
537 }
538
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700539 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700540 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700541 Map<ProviderId, DeviceDescriptions> r;
542 r = deviceDescs.get(deviceId);
543 if (r == null) {
544 r = new HashMap<ProviderId, DeviceDescriptions>();
545 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
546 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
547 if (concurrentlyAdded != null) {
548 r = concurrentlyAdded;
549 }
550 }
551 return r;
552 }
553
554 // Guarded by deviceDescs value (=Device lock)
555 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
556 Map<ProviderId, DeviceDescriptions> device,
557 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
558
559 synchronized (device) {
560 DeviceDescriptions r = device.get(providerId);
561 if (r == null) {
562 r = new DeviceDescriptions(deltaDesc);
563 device.put(providerId, r);
564 }
565 return r;
566 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700567 }
568
569 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700570 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
571 DeviceId deviceId,
572 PortDescription portDescription) {
573
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700574 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700575 final Timestamped<PortDescription> deltaDesc
576 = new Timestamped<>(portDescription, newTimestamp);
577 final DeviceEvent event;
578 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700579 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700580 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700581 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700582 .getPortDesc(portDescription.portNumber());
583 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700584 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700585 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
586 providerId, deviceId);
587 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700588 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700589 } catch (IOException e) {
590 log.error("Failed to notify peers of a port status update topology event or providerId: "
591 + providerId + " and deviceId: " + deviceId, e);
592 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700593 }
594 return event;
595 }
596
597 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
598 Timestamped<PortDescription> deltaDesc) {
599
600 Device device = devices.get(deviceId);
601 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
602
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700603 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700604 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
605
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700606 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700607
608 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
609 log.debug("Ignoring outdated event: {}", deltaDesc);
610 return null;
611 }
612
613 DeviceDescriptions descs = descsMap.get(providerId);
614 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700615 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700616 "Device description for Device ID %s from Provider %s was not found",
617 deviceId, providerId);
618
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700619 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
620 final PortNumber number = deltaDesc.value().portNumber();
621 final Port oldPort = ports.get(number);
622 final Port newPort;
623
624 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
625 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700626 deltaDesc.isNewer(existingPortDesc)) {
627 // on new port or valid update
628 // update description
629 descs.putPortDesc(deltaDesc);
630 newPort = composePort(device, number, descsMap);
631 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700632 // same or outdated event, ignored.
633 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700634 return null;
635 }
636
637 if (oldPort == null) {
638 return createPort(device, newPort, ports);
639 } else {
640 return updatePort(device, oldPort, newPort, ports);
641 }
642 }
643 }
644
645 @Override
646 public List<Port> getPorts(DeviceId deviceId) {
647 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
648 if (ports == null) {
649 return Collections.emptyList();
650 }
651 return ImmutableList.copyOf(ports.values());
652 }
653
654 @Override
655 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
656 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
657 return ports == null ? null : ports.get(portNumber);
658 }
659
660 @Override
661 public boolean isAvailable(DeviceId deviceId) {
662 return availableDevices.contains(deviceId);
663 }
664
665 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700666 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700667 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700668 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700669 if (event != null) {
670 log.info("Notifying peers of a device removed topology event for deviceId: {}",
671 deviceId);
672 try {
673 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
674 } catch (IOException e) {
675 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
676 deviceId);
677 }
678 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700679 return event;
680 }
681
682 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
683 Timestamp timestamp) {
684
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700685 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700686 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700687 // accept removal request if given timestamp is newer than
688 // the latest Timestamp from Primary provider
689 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
690 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
691 if (timestamp.compareTo(lastTimestamp) <= 0) {
692 // outdated event ignore
693 return null;
694 }
695 removalRequest.put(deviceId, timestamp);
696
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700697 Device device = devices.remove(deviceId);
698 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700699 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
700 if (ports != null) {
701 ports.clear();
702 }
703 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700704 descs.clear();
705 return device == null ? null :
706 new DeviceEvent(DEVICE_REMOVED, device, null);
707 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700708 }
709
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700710 /**
711 * Checks if given timestamp is superseded by removal request
712 * with more recent timestamp.
713 *
714 * @param deviceId identifier of a device
715 * @param timestampToCheck timestamp of an event to check
716 * @return true if device is already removed
717 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700718 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
719 Timestamp removalTimestamp = removalRequest.get(deviceId);
720 if (removalTimestamp != null &&
721 removalTimestamp.compareTo(timestampToCheck) >= 0) {
722 // removalRequest is more recent
723 return true;
724 }
725 return false;
726 }
727
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700728 /**
729 * Returns a Device, merging description given from multiple Providers.
730 *
731 * @param deviceId device identifier
732 * @param providerDescs Collection of Descriptions from multiple providers
733 * @return Device instance
734 */
735 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700736 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700737
738 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
739
740 ProviderId primary = pickPrimaryPID(providerDescs);
741
742 DeviceDescriptions desc = providerDescs.get(primary);
743
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700744 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700745 Type type = base.type();
746 String manufacturer = base.manufacturer();
747 String hwVersion = base.hwVersion();
748 String swVersion = base.swVersion();
749 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700750 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700751 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
752 annotations = merge(annotations, base.annotations());
753
754 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
755 if (e.getKey().equals(primary)) {
756 continue;
757 }
758 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700759 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700760 // Currently assuming there will never be a key conflict between
761 // providers
762
763 // annotation merging. not so efficient, should revisit later
764 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
765 }
766
767 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700768 hwVersion, swVersion, serialNumber,
769 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700770 }
771
772 /**
773 * Returns a Port, merging description given from multiple Providers.
774 *
775 * @param device device the port is on
776 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700777 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700778 * @return Port instance
779 */
780 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700781 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700782
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700783 ProviderId primary = pickPrimaryPID(descsMap);
784 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700785 // if no primary, assume not enabled
786 // TODO: revisit this default port enabled/disabled behavior
787 boolean isEnabled = false;
788 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
789
790 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
791 if (portDesc != null) {
792 isEnabled = portDesc.value().isEnabled();
793 annotations = merge(annotations, portDesc.value().annotations());
794 }
795
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700796 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700797 if (e.getKey().equals(primary)) {
798 continue;
799 }
800 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700801 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700802 // Currently assuming there will never be a key conflict between
803 // providers
804
805 // annotation merging. not so efficient, should revisit later
806 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
807 if (otherPortDesc != null) {
808 annotations = merge(annotations, otherPortDesc.value().annotations());
809 }
810 }
811
812 return new DefaultPort(device, number, isEnabled, annotations);
813 }
814
815 /**
816 * @return primary ProviderID, or randomly chosen one if none exists
817 */
818 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700819 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700820 ProviderId fallBackPrimary = null;
821 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
822 if (!e.getKey().isAncillary()) {
823 return e.getKey();
824 } else if (fallBackPrimary == null) {
825 // pick randomly as a fallback in case there is no primary
826 fallBackPrimary = e.getKey();
827 }
828 }
829 return fallBackPrimary;
830 }
831
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700832 private DeviceDescriptions getPrimaryDescriptions(
833 Map<ProviderId, DeviceDescriptions> providerDescs) {
834 ProviderId pid = pickPrimaryPID(providerDescs);
835 return providerDescs.get(pid);
836 }
837
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700838 // TODO: should we be throwing exception?
839 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
840 ClusterMessage message = new ClusterMessage(
841 clusterService.getLocalNode().id(),
842 subject,
843 SERIALIZER.encode(event));
844 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700845 }
846
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700847 // TODO: should we be throwing exception?
848 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
849 ClusterMessage message = new ClusterMessage(
850 clusterService.getLocalNode().id(),
851 subject,
852 SERIALIZER.encode(event));
853 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700854 }
Madan Jampani47c93732014-10-06 20:46:08 -0700855
856 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700857 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700858 }
859
Madan Jampani25322532014-10-08 11:20:38 -0700860 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700861 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700862 }
863
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700864 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700865 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700866 }
867
Madan Jampani47c93732014-10-06 20:46:08 -0700868 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700869 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700870 }
871
872 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700873 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
874 }
875
876 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
877 try {
878 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
879 } catch (IOException e) {
880 log.error("Failed to send" + event + " to " + recipient, e);
881 }
882 }
883
884 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
885 try {
886 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
887 } catch (IOException e) {
888 log.error("Failed to send" + event + " to " + recipient, e);
889 }
890 }
891
892 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
893 try {
894 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
895 } catch (IOException e) {
896 log.error("Failed to send" + event + " to " + recipient, e);
897 }
898 }
899
900 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
901 try {
902 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
903 } catch (IOException e) {
904 log.error("Failed to send" + event + " to " + recipient, e);
905 }
906 }
907
908 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
909 try {
910 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
911 } catch (IOException e) {
912 log.error("Failed to send" + event + " to " + recipient, e);
913 }
914 }
915
916 private DeviceAntiEntropyAdvertisement createAdvertisement() {
917 final NodeId self = clusterService.getLocalNode().id();
918
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700919 final int numDevices = deviceDescs.size();
920 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
921 final int portsPerDevice = 8; // random factor to minimize reallocation
922 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
923 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700924
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700925 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700926 provs : deviceDescs.entrySet()) {
927
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700928 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700929 final DeviceId deviceId = provs.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700930 final Map<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700931 synchronized (devDescs) {
932
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700933 // send device offline timestamp
934 Timestamp lOffline = this.offline.get(deviceId);
935 if (lOffline != null) {
936 adOffline.put(deviceId, lOffline);
937 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700938
939 for (Entry<ProviderId, DeviceDescriptions>
940 prov : devDescs.entrySet()) {
941
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700942 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700943 final ProviderId provId = prov.getKey();
944 final DeviceDescriptions descs = prov.getValue();
945
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700946 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700947 descs.getDeviceDesc().timestamp());
948
949 for (Entry<PortNumber, Timestamped<PortDescription>>
950 portDesc : descs.getPortDescs().entrySet()) {
951
952 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700953 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700954 portDesc.getValue().timestamp());
955 }
956 }
957 }
958 }
959
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700960 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700961 }
962
963 /**
964 * Responds to anti-entropy advertisement message.
965 * <P>
966 * Notify sender about out-dated information using regular replication message.
967 * Send back advertisement to sender if not in sync.
968 *
969 * @param advertisement to respond to
970 */
971 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
972
973 final NodeId sender = advertisement.sender();
974
975 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
976 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
977 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
978
979 // Fragments to request
980 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
981 Collection<PortFragmentId> reqPorts = new ArrayList<>();
982
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700983 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700984 final DeviceId deviceId = de.getKey();
985 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
986
987 synchronized (lDevice) {
988 // latestTimestamp across provider
989 // Note: can be null initially
990 Timestamp localLatest = offline.get(deviceId);
991
992 // handle device Ads
993 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
994 final ProviderId provId = prov.getKey();
995 final DeviceDescriptions lDeviceDescs = prov.getValue();
996
997 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
998
999
1000 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1001 Timestamp advDevTimestamp = devAds.get(devFragId);
1002
1003 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
1004 // remote does not have it or outdated, suggest
1005 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1006 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1007 // local is outdated, request
1008 reqDevices.add(devFragId);
1009 }
1010
1011 // handle port Ads
1012 for (Entry<PortNumber, Timestamped<PortDescription>>
1013 pe : lDeviceDescs.getPortDescs().entrySet()) {
1014
1015 final PortNumber num = pe.getKey();
1016 final Timestamped<PortDescription> lPort = pe.getValue();
1017
1018 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1019
1020 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -07001021 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001022 // remote does not have it or outdated, suggest
1023 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1024 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1025 // local is outdated, request
1026 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1027 reqPorts.add(portFragId);
1028 }
1029
1030 // remove port Ad already processed
1031 portAds.remove(portFragId);
1032 } // end local port loop
1033
1034 // remove device Ad already processed
1035 devAds.remove(devFragId);
1036
1037 // find latest and update
1038 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1039 if (localLatest == null ||
1040 providerLatest.compareTo(localLatest) > 0) {
1041 localLatest = providerLatest;
1042 }
1043 } // end local provider loop
1044
1045 // checking if remote timestamp is more recent.
1046 Timestamp rOffline = offlineAds.get(deviceId);
1047 if (rOffline != null &&
1048 rOffline.compareTo(localLatest) > 0) {
1049 // remote offline timestamp suggests that the
1050 // device is off-line
1051 markOfflineInternal(deviceId, rOffline);
1052 }
1053
1054 Timestamp lOffline = offline.get(deviceId);
1055 if (lOffline != null && rOffline == null) {
1056 // locally offline, but remote is online, suggest offline
1057 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1058 }
1059
1060 // remove device offline Ad already processed
1061 offlineAds.remove(deviceId);
1062 } // end local device loop
1063 } // device lock
1064
1065 // If there is any Ads left, request them
1066 log.trace("Ads left {}, {}", devAds, portAds);
1067 reqDevices.addAll(devAds.keySet());
1068 reqPorts.addAll(portAds.keySet());
1069
1070 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1071 log.trace("Nothing to request to remote peer {}", sender);
1072 return;
1073 }
1074
1075 log.info("Need to sync {} {}", reqDevices, reqPorts);
1076
1077 // 2-way Anti-Entropy for now
1078 try {
1079 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1080 } catch (IOException e) {
1081 log.error("Failed to send response advertisement to " + sender, e);
1082 }
1083
1084// Sketch of 3-way Anti-Entropy
1085// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1086// ClusterMessage message = new ClusterMessage(
1087// clusterService.getLocalNode().id(),
1088// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1089// SERIALIZER.encode(request));
1090//
1091// try {
1092// clusterCommunicator.unicast(message, advertisement.sender());
1093// } catch (IOException e) {
1094// log.error("Failed to send advertisement reply to "
1095// + advertisement.sender(), e);
1096// }
Madan Jampani47c93732014-10-06 20:46:08 -07001097 }
1098
Madan Jampani255a58b2014-10-09 12:08:20 -07001099 private void notifyDelegateIfNotNull(DeviceEvent event) {
1100 if (event != null) {
1101 notifyDelegate(event);
1102 }
1103 }
1104
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001105 private final class SendAdvertisementTask implements Runnable {
1106
1107 @Override
1108 public void run() {
1109 if (Thread.currentThread().isInterrupted()) {
1110 log.info("Interrupted, quitting");
1111 return;
1112 }
1113
1114 try {
1115 final NodeId self = clusterService.getLocalNode().id();
1116 Set<ControllerNode> nodes = clusterService.getNodes();
1117
1118 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1119 .transform(toNodeId())
1120 .toList();
1121
1122 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHI37083082014-10-13 10:38:38 -07001123 log.debug("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001124 return;
1125 }
1126
1127 NodeId peer;
1128 do {
1129 int idx = RandomUtils.nextInt(0, nodeIds.size());
1130 peer = nodeIds.get(idx);
1131 } while (peer.equals(self));
1132
1133 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1134
1135 if (Thread.currentThread().isInterrupted()) {
1136 log.info("Interrupted, quitting");
1137 return;
1138 }
1139
1140 try {
1141 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1142 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001143 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001144 return;
1145 }
1146 } catch (Exception e) {
1147 // catch all Exception to avoid Scheduled task being suppressed.
1148 log.error("Exception thrown while sending advertisement", e);
1149 }
1150 }
1151 }
1152
Madan Jampani47c93732014-10-06 20:46:08 -07001153 private class InternalDeviceEventListener implements ClusterMessageHandler {
1154 @Override
1155 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001156
Madan Jampani47c93732014-10-06 20:46:08 -07001157 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001158 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001159
Madan Jampani47c93732014-10-06 20:46:08 -07001160 ProviderId providerId = event.providerId();
1161 DeviceId deviceId = event.deviceId();
1162 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001163
Madan Jampani255a58b2014-10-09 12:08:20 -07001164 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001165 }
1166 }
1167
Madan Jampani25322532014-10-08 11:20:38 -07001168 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
1169 @Override
1170 public void handle(ClusterMessage message) {
1171
1172 log.info("Received device offline event from peer: {}", message.sender());
1173 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
1174
1175 DeviceId deviceId = event.deviceId();
1176 Timestamp timestamp = event.timestamp();
1177
Madan Jampani255a58b2014-10-09 12:08:20 -07001178 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -07001179 }
1180 }
1181
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001182 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
1183 @Override
1184 public void handle(ClusterMessage message) {
1185
1186 log.info("Received device removed event from peer: {}", message.sender());
1187 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
1188
1189 DeviceId deviceId = event.deviceId();
1190 Timestamp timestamp = event.timestamp();
1191
Madan Jampani255a58b2014-10-09 12:08:20 -07001192 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001193 }
1194 }
1195
Madan Jampani47c93732014-10-06 20:46:08 -07001196 private class InternalPortEventListener implements ClusterMessageHandler {
1197 @Override
1198 public void handle(ClusterMessage message) {
1199
1200 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001201 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001202
1203 ProviderId providerId = event.providerId();
1204 DeviceId deviceId = event.deviceId();
1205 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1206
Madan Jampani255a58b2014-10-09 12:08:20 -07001207 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -07001208 }
1209 }
1210
1211 private class InternalPortStatusEventListener implements ClusterMessageHandler {
1212 @Override
1213 public void handle(ClusterMessage message) {
1214
1215 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001216 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001217 log.info("{}", event);
Madan Jampani47c93732014-10-06 20:46:08 -07001218
1219 ProviderId providerId = event.providerId();
1220 DeviceId deviceId = event.deviceId();
1221 Timestamped<PortDescription> portDescription = event.portDescription();
1222
Madan Jampani255a58b2014-10-09 12:08:20 -07001223 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001224 }
1225 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001226
1227 private final class InternalDeviceAdvertisementListener
1228 implements ClusterMessageHandler {
1229
1230 @Override
1231 public void handle(ClusterMessage message) {
Yuta HIGUCHI9a0a1d12014-10-13 22:38:02 -07001232 log.debug("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001233 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1234 handleAdvertisement(advertisement);
1235 }
1236 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001237}