blob: 3f927fd564eb70b7df18eabf9251ce7a43eaa98d [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
3import com.google.common.collect.FluentIterable;
4import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07005import com.google.common.collect.Maps;
6import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07007
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07008import org.apache.commons.lang3.concurrent.ConcurrentException;
9import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
10import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070017import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070018import org.onlab.onos.net.DefaultAnnotations;
19import org.onlab.onos.net.DefaultDevice;
20import org.onlab.onos.net.DefaultPort;
21import org.onlab.onos.net.Device;
22import org.onlab.onos.net.Device.Type;
23import org.onlab.onos.net.DeviceId;
24import org.onlab.onos.net.Port;
25import org.onlab.onos.net.PortNumber;
26import org.onlab.onos.net.SparseAnnotations;
27import org.onlab.onos.net.device.DefaultDeviceDescription;
28import org.onlab.onos.net.device.DefaultPortDescription;
29import org.onlab.onos.net.device.DeviceDescription;
30import org.onlab.onos.net.device.DeviceEvent;
31import org.onlab.onos.net.device.DeviceStore;
32import org.onlab.onos.net.device.DeviceStoreDelegate;
33import org.onlab.onos.net.device.PortDescription;
34import org.onlab.onos.net.provider.ProviderId;
35import org.onlab.onos.store.AbstractStore;
36import org.onlab.onos.store.ClockService;
37import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070038import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani53e44e62014-10-07 12:39:51 -070041import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070042import org.onlab.onos.store.common.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070043import org.onlab.onos.store.serializers.KryoPoolUtil;
44import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampanifef9b3a2014-10-07 18:38:17 -070045import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070046import org.onlab.util.KryoPool;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070047import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070048import org.slf4j.Logger;
49
Madan Jampani47c93732014-10-06 20:46:08 -070050import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070051import java.util.ArrayList;
52import java.util.Collections;
53import java.util.HashSet;
54import java.util.Iterator;
55import java.util.List;
56import java.util.Map;
57import java.util.Map.Entry;
58import java.util.Objects;
59import java.util.Set;
60import java.util.concurrent.ConcurrentHashMap;
61import java.util.concurrent.ConcurrentMap;
62import java.util.concurrent.atomic.AtomicReference;
63
64import static com.google.common.base.Preconditions.checkArgument;
65import static com.google.common.base.Preconditions.checkNotNull;
66import static com.google.common.base.Predicates.notNull;
67import static org.onlab.onos.net.device.DeviceEvent.Type.*;
68import static org.slf4j.LoggerFactory.getLogger;
69import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
70import static org.onlab.onos.net.DefaultAnnotations.merge;
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -070071import static org.onlab.onos.net.DefaultAnnotations.union;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070072import static com.google.common.base.Verify.verify;
73
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070074// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070075/**
76 * Manages inventory of infrastructure devices using gossip protocol to distribute
77 * information.
78 */
79@Component(immediate = true)
80@Service
81public class GossipDeviceStore
82 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
83 implements DeviceStore {
84
85 private final Logger log = getLogger(getClass());
86
87 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
88
89 // TODO: Check if inner Map can be replaced with plain Map
90 // innerMap is used to lock a Device, thus instance should never be replaced.
91 // collection of Description given from various providers
92 private final ConcurrentMap<DeviceId,
93 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070094 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070095
96 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070097 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
98 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
99
100 // to be updated under Device lock
101 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
102 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700103
104 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700105 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClockService clockService;
109
Madan Jampani47c93732014-10-06 20:46:08 -0700110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected ClusterCommunicationService clusterCommunicator;
112
Madan Jampani53e44e62014-10-07 12:39:51 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterService clusterService;
115
116 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700117 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700118 protected void setupKryoPool() {
119 serializerPool = KryoPool.newBuilder()
120 .register(KryoPoolUtil.API)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700121 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700122 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700123 .register(InternalPortEvent.class, new InternalPortEventSerializer())
124 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700125 .register(Timestamp.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700126 .register(Timestamped.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700127 .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -0700128 .build()
129 .populate(1);
130 }
131
132 };
133
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700134 @Activate
135 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700136 clusterCommunicator.addSubscriber(
137 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
138 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700139 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
140 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700141 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
142 clusterCommunicator.addSubscriber(
143 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700144 log.info("Started");
145 }
146
147 @Deactivate
148 public void deactivate() {
149 deviceDescs.clear();
150 devices.clear();
151 devicePorts.clear();
152 availableDevices.clear();
153 log.info("Stopped");
154 }
155
156 @Override
157 public int getDeviceCount() {
158 return devices.size();
159 }
160
161 @Override
162 public Iterable<Device> getDevices() {
163 return Collections.unmodifiableCollection(devices.values());
164 }
165
166 @Override
167 public Device getDevice(DeviceId deviceId) {
168 return devices.get(deviceId);
169 }
170
171 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700172 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
173 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700174 DeviceDescription deviceDescription) {
175 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
176 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
177 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
178 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700179 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
180 providerId, deviceId);
181 try {
182 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
183 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700184 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700185 + providerId + " and deviceId: " + deviceId, e);
186 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700187 }
188 return event;
189 }
190
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700191 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
192 DeviceId deviceId,
193 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700194
195 // Collection of DeviceDescriptions for a Device
196 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700197 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700198
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700199 synchronized (providerDescs) {
200 // locking per device
201
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700202 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
203 log.debug("Ignoring outdated event: {}", deltaDesc);
204 return null;
205 }
206
207 DeviceDescriptions descs
208 = createIfAbsentUnchecked(providerDescs, providerId,
209 new InitDeviceDescs(deltaDesc));
210
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700211 final Device oldDevice = devices.get(deviceId);
212 final Device newDevice;
213
214 if (deltaDesc == descs.getDeviceDesc() ||
215 deltaDesc.isNewer(descs.getDeviceDesc())) {
216 // on new device or valid update
217 descs.putDeviceDesc(deltaDesc);
218 newDevice = composeDevice(deviceId, providerDescs);
219 } else {
220 // outdated event, ignored.
221 return null;
222 }
223 if (oldDevice == null) {
224 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700225 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700226 } else {
227 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700228 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700229 }
230 }
231 }
232
233 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700234 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700235 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700236 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700237
238 // update composed device cache
239 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
240 verify(oldDevice == null,
241 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
242 providerId, oldDevice, newDevice);
243
244 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700245 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700246 }
247
248 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
249 }
250
251 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700252 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700253 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700254 Device oldDevice,
255 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700256
257 // We allow only certain attributes to trigger update
258 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
259 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700260 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700261
262 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
263 if (!replaced) {
264 verify(replaced,
265 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
266 providerId, oldDevice, devices.get(newDevice.id())
267 , newDevice);
268 }
269 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700270 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700271 }
272 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
273 }
274
275 // Otherwise merely attempt to change availability if primary provider
276 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700277 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700278 return !added ? null :
279 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
280 }
281 return null;
282 }
283
284 @Override
285 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700286 Timestamp timestamp = clockService.getTimestamp(deviceId);
Madan Jampani25322532014-10-08 11:20:38 -0700287 DeviceEvent event = markOfflineInternal(deviceId, timestamp);
288 if (event != null) {
289 log.info("Notifying peers of a device offline topology event for deviceId: {}",
290 deviceId);
291 try {
292 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
293 } catch (IOException e) {
294 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
295 deviceId);
296 }
297 }
298 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700299 }
300
301 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
302
303 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700304 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700305
306 // locking device
307 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700308
309 // accept off-line if given timestamp is newer than
310 // the latest Timestamp from Primary provider
311 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
312 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
313 if (timestamp.compareTo(lastTimestamp) <= 0) {
314 // outdated event ignore
315 return null;
316 }
317
318 offline.put(deviceId, timestamp);
319
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700320 Device device = devices.get(deviceId);
321 if (device == null) {
322 return null;
323 }
324 boolean removed = availableDevices.remove(deviceId);
325 if (removed) {
326 // TODO: broadcast ... DOWN only?
327 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700328 }
329 return null;
330 }
331 }
332
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700333 /**
334 * Marks the device as available if the given timestamp is not outdated,
335 * compared to the time the device has been marked offline.
336 *
337 * @param deviceId identifier of the device
338 * @param timestamp of the event triggering this change.
339 * @return true if availability change request was accepted and changed the state
340 */
341 // Guarded by deviceDescs value (=Device lock)
342 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
343 // accept on-line if given timestamp is newer than
344 // the latest offline request Timestamp
345 Timestamp offlineTimestamp = offline.get(deviceId);
346 if (offlineTimestamp == null ||
347 offlineTimestamp.compareTo(timestamp) < 0) {
348
349 offline.remove(deviceId);
350 return availableDevices.add(deviceId);
351 }
352 return false;
353 }
354
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700355 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700356 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
357 DeviceId deviceId,
358 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700359 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
360
Madan Jampani47c93732014-10-06 20:46:08 -0700361 Timestamped<List<PortDescription>> timestampedPortDescriptions =
362 new Timestamped<>(portDescriptions, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700363
Madan Jampani47c93732014-10-06 20:46:08 -0700364 List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700365 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700366 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
367 providerId, deviceId);
368 try {
369 notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
370 } catch (IOException e) {
371 log.error("Failed to notify peers of a port update topology event or providerId: "
372 + providerId + " and deviceId: " + deviceId, e);
373 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700374 }
375 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700376 }
377
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700378 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
379 DeviceId deviceId,
380 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700381
382 Device device = devices.get(deviceId);
383 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
384
385 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
386 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
387
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700388 List<DeviceEvent> events = new ArrayList<>();
389 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700390
391 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
392 log.debug("Ignoring outdated events: {}", portDescriptions);
393 return null;
394 }
395
396 DeviceDescriptions descs = descsMap.get(providerId);
397 // every provider must provide DeviceDescription.
398 checkArgument(descs != null,
399 "Device description for Device ID %s from Provider %s was not found",
400 deviceId, providerId);
401
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700402 Map<PortNumber, Port> ports = getPortMap(deviceId);
403
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700404 final Timestamp newTimestamp = portDescriptions.timestamp();
405
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700406 // Add new ports
407 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700408 for (PortDescription portDescription : portDescriptions.value()) {
409 final PortNumber number = portDescription.portNumber();
410 processed.add(number);
411
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700412 final Port oldPort = ports.get(number);
413 final Port newPort;
414
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700415
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700416 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
417 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700418 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700419 // on new port or valid update
420 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700421 descs.putPortDesc(new Timestamped<>(portDescription,
422 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700423 newPort = composePort(device, number, descsMap);
424 } else {
425 // outdated event, ignored.
426 continue;
427 }
428
429 events.add(oldPort == null ?
430 createPort(device, newPort, ports) :
431 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700432 }
433
434 events.addAll(pruneOldPorts(device, ports, processed));
435 }
436 return FluentIterable.from(events).filter(notNull()).toList();
437 }
438
439 // Creates a new port based on the port description adds it to the map and
440 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700441 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700442 private DeviceEvent createPort(Device device, Port newPort,
443 Map<PortNumber, Port> ports) {
444 ports.put(newPort.number(), newPort);
445 return new DeviceEvent(PORT_ADDED, device, newPort);
446 }
447
448 // Checks if the specified port requires update and if so, it replaces the
449 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700450 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700451 private DeviceEvent updatePort(Device device, Port oldPort,
452 Port newPort,
453 Map<PortNumber, Port> ports) {
454 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700455 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700456
457 ports.put(oldPort.number(), newPort);
458 return new DeviceEvent(PORT_UPDATED, device, newPort);
459 }
460 return null;
461 }
462
463 // Prunes the specified list of ports based on which ports are in the
464 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700465 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700466 private List<DeviceEvent> pruneOldPorts(Device device,
467 Map<PortNumber, Port> ports,
468 Set<PortNumber> processed) {
469 List<DeviceEvent> events = new ArrayList<>();
470 Iterator<PortNumber> iterator = ports.keySet().iterator();
471 while (iterator.hasNext()) {
472 PortNumber portNumber = iterator.next();
473 if (!processed.contains(portNumber)) {
474 events.add(new DeviceEvent(PORT_REMOVED, device,
475 ports.get(portNumber)));
476 iterator.remove();
477 }
478 }
479 return events;
480 }
481
482 // Gets the map of ports for the specified device; if one does not already
483 // exist, it creates and registers a new one.
484 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
485 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700486 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
487 }
488
489 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
490 DeviceId deviceId) {
491 return createIfAbsentUnchecked(deviceDescs, deviceId,
492 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700493 }
494
495 @Override
496 public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
497 PortDescription portDescription) {
498 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
499 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
500 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
501 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700502 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
503 providerId, deviceId);
504 try {
505 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
506 } catch (IOException e) {
507 log.error("Failed to notify peers of a port status update topology event or providerId: "
508 + providerId + " and deviceId: " + deviceId, e);
509 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700510 }
511 return event;
512 }
513
514 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
515 Timestamped<PortDescription> deltaDesc) {
516
517 Device device = devices.get(deviceId);
518 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
519
520 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
521 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
522
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700523 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700524
525 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
526 log.debug("Ignoring outdated event: {}", deltaDesc);
527 return null;
528 }
529
530 DeviceDescriptions descs = descsMap.get(providerId);
531 // assuming all providers must to give DeviceDescription
532 checkArgument(descs != null,
533 "Device description for Device ID %s from Provider %s was not found",
534 deviceId, providerId);
535
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700536 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
537 final PortNumber number = deltaDesc.value().portNumber();
538 final Port oldPort = ports.get(number);
539 final Port newPort;
540
541 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
542 if (existingPortDesc == null ||
543 deltaDesc == existingPortDesc ||
544 deltaDesc.isNewer(existingPortDesc)) {
545 // on new port or valid update
546 // update description
547 descs.putPortDesc(deltaDesc);
548 newPort = composePort(device, number, descsMap);
549 } else {
550 // outdated event, ignored.
551 return null;
552 }
553
554 if (oldPort == null) {
555 return createPort(device, newPort, ports);
556 } else {
557 return updatePort(device, oldPort, newPort, ports);
558 }
559 }
560 }
561
562 @Override
563 public List<Port> getPorts(DeviceId deviceId) {
564 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
565 if (ports == null) {
566 return Collections.emptyList();
567 }
568 return ImmutableList.copyOf(ports.values());
569 }
570
571 @Override
572 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
573 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
574 return ports == null ? null : ports.get(portNumber);
575 }
576
577 @Override
578 public boolean isAvailable(DeviceId deviceId) {
579 return availableDevices.contains(deviceId);
580 }
581
582 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700583 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
584 Timestamp timestamp = clockService.getTimestamp(deviceId);
585 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
586 // TODO: broadcast removal event
587 return event;
588 }
589
590 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
591 Timestamp timestamp) {
592
593 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700594 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700595 // accept removal request if given timestamp is newer than
596 // the latest Timestamp from Primary provider
597 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
598 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
599 if (timestamp.compareTo(lastTimestamp) <= 0) {
600 // outdated event ignore
601 return null;
602 }
603 removalRequest.put(deviceId, timestamp);
604
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700605 Device device = devices.remove(deviceId);
606 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700607 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
608 if (ports != null) {
609 ports.clear();
610 }
611 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700612 descs.clear();
613 return device == null ? null :
614 new DeviceEvent(DEVICE_REMOVED, device, null);
615 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700616 }
617
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700618 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
619 Timestamp removalTimestamp = removalRequest.get(deviceId);
620 if (removalTimestamp != null &&
621 removalTimestamp.compareTo(timestampToCheck) >= 0) {
622 // removalRequest is more recent
623 return true;
624 }
625 return false;
626 }
627
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700628 /**
629 * Returns a Device, merging description given from multiple Providers.
630 *
631 * @param deviceId device identifier
632 * @param providerDescs Collection of Descriptions from multiple providers
633 * @return Device instance
634 */
635 private Device composeDevice(DeviceId deviceId,
636 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
637
638 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
639
640 ProviderId primary = pickPrimaryPID(providerDescs);
641
642 DeviceDescriptions desc = providerDescs.get(primary);
643
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700644 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700645 Type type = base.type();
646 String manufacturer = base.manufacturer();
647 String hwVersion = base.hwVersion();
648 String swVersion = base.swVersion();
649 String serialNumber = base.serialNumber();
650 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
651 annotations = merge(annotations, base.annotations());
652
653 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
654 if (e.getKey().equals(primary)) {
655 continue;
656 }
657 // TODO: should keep track of Description timestamp
658 // and only merge conflicting keys when timestamp is newer
659 // Currently assuming there will never be a key conflict between
660 // providers
661
662 // annotation merging. not so efficient, should revisit later
663 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
664 }
665
666 return new DefaultDevice(primary, deviceId , type, manufacturer,
667 hwVersion, swVersion, serialNumber, annotations);
668 }
669
670 /**
671 * Returns a Port, merging description given from multiple Providers.
672 *
673 * @param device device the port is on
674 * @param number port number
675 * @param providerDescs Collection of Descriptions from multiple providers
676 * @return Port instance
677 */
678 private Port composePort(Device device, PortNumber number,
679 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
680
681 ProviderId primary = pickPrimaryPID(providerDescs);
682 DeviceDescriptions primDescs = providerDescs.get(primary);
683 // if no primary, assume not enabled
684 // TODO: revisit this default port enabled/disabled behavior
685 boolean isEnabled = false;
686 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
687
688 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
689 if (portDesc != null) {
690 isEnabled = portDesc.value().isEnabled();
691 annotations = merge(annotations, portDesc.value().annotations());
692 }
693
694 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
695 if (e.getKey().equals(primary)) {
696 continue;
697 }
698 // TODO: should keep track of Description timestamp
699 // and only merge conflicting keys when timestamp is newer
700 // Currently assuming there will never be a key conflict between
701 // providers
702
703 // annotation merging. not so efficient, should revisit later
704 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
705 if (otherPortDesc != null) {
706 annotations = merge(annotations, otherPortDesc.value().annotations());
707 }
708 }
709
710 return new DefaultPort(device, number, isEnabled, annotations);
711 }
712
713 /**
714 * @return primary ProviderID, or randomly chosen one if none exists
715 */
716 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700717 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700718 ProviderId fallBackPrimary = null;
719 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
720 if (!e.getKey().isAncillary()) {
721 return e.getKey();
722 } else if (fallBackPrimary == null) {
723 // pick randomly as a fallback in case there is no primary
724 fallBackPrimary = e.getKey();
725 }
726 }
727 return fallBackPrimary;
728 }
729
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700730 private DeviceDescriptions getPrimaryDescriptions(
731 Map<ProviderId, DeviceDescriptions> providerDescs) {
732 ProviderId pid = pickPrimaryPID(providerDescs);
733 return providerDescs.get(pid);
734 }
735
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700736 public static final class InitDeviceDescs
737 implements ConcurrentInitializer<DeviceDescriptions> {
738
739 private final Timestamped<DeviceDescription> deviceDesc;
740
741 public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
742 this.deviceDesc = checkNotNull(deviceDesc);
743 }
744 @Override
745 public DeviceDescriptions get() throws ConcurrentException {
746 return new DeviceDescriptions(deviceDesc);
747 }
748 }
749
750
751 /**
752 * Collection of Description of a Device and it's Ports given from a Provider.
753 */
754 public static class DeviceDescriptions {
755
756 private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
757 private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
758
759 public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
760 this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
761 this.portDescs = new ConcurrentHashMap<>();
762 }
763
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700764 Timestamp getLatestTimestamp() {
765 Timestamp latest = deviceDesc.get().timestamp();
766 for (Timestamped<PortDescription> desc : portDescs.values()) {
767 if (desc.timestamp().compareTo(latest) > 0) {
768 latest = desc.timestamp();
769 }
770 }
771 return latest;
772 }
773
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700774 public Timestamped<DeviceDescription> getDeviceDesc() {
775 return deviceDesc.get();
776 }
777
778 public Timestamped<PortDescription> getPortDesc(PortNumber number) {
779 return portDescs.get(number);
780 }
781
782 /**
783 * Puts DeviceDescription, merging annotations as necessary.
784 *
785 * @param newDesc new DeviceDescription
786 * @return previous DeviceDescription
787 */
788 public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
789 Timestamped<DeviceDescription> oldOne = deviceDesc.get();
790 Timestamped<DeviceDescription> newOne = newDesc;
791 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700792 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700793 newDesc.value().annotations());
794 newOne = new Timestamped<DeviceDescription>(
795 new DefaultDeviceDescription(newDesc.value(), merged),
796 newDesc.timestamp());
797 }
798 return deviceDesc.getAndSet(newOne);
799 }
800
801 /**
802 * Puts PortDescription, merging annotations as necessary.
803 *
804 * @param newDesc new PortDescription
805 * @return previous PortDescription
806 */
807 public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
808 Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
809 Timestamped<PortDescription> newOne = newDesc;
810 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700811 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700812 newDesc.value().annotations());
813 newOne = new Timestamped<PortDescription>(
814 new DefaultPortDescription(newDesc.value(), merged),
815 newDesc.timestamp());
816 }
817 return portDescs.put(newOne.value().portNumber(), newOne);
818 }
819 }
Madan Jampani47c93732014-10-06 20:46:08 -0700820
821 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700822 ClusterMessage message = new ClusterMessage(
823 clusterService.getLocalNode().id(),
824 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
825 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700826 clusterCommunicator.broadcast(message);
827 }
828
Madan Jampani25322532014-10-08 11:20:38 -0700829 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
830 ClusterMessage message = new ClusterMessage(
831 clusterService.getLocalNode().id(),
832 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
833 SERIALIZER.encode(event));
834 clusterCommunicator.broadcast(message);
835 }
836
Madan Jampani47c93732014-10-06 20:46:08 -0700837 private void notifyPeers(InternalPortEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700838 ClusterMessage message = new ClusterMessage(
839 clusterService.getLocalNode().id(),
840 GossipDeviceStoreMessageSubjects.PORT_UPDATE,
841 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700842 clusterCommunicator.broadcast(message);
843 }
844
845 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700846 ClusterMessage message = new ClusterMessage(
847 clusterService.getLocalNode().id(),
848 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
849 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700850 clusterCommunicator.broadcast(message);
851 }
852
853 private class InternalDeviceEventListener implements ClusterMessageHandler {
854 @Override
855 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -0700856
Madan Jampani47c93732014-10-06 20:46:08 -0700857 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700858 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -0700859
Madan Jampani47c93732014-10-06 20:46:08 -0700860 ProviderId providerId = event.providerId();
861 DeviceId deviceId = event.deviceId();
862 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -0700863
Madan Jampani47c93732014-10-06 20:46:08 -0700864 createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
865 }
866 }
867
Madan Jampani25322532014-10-08 11:20:38 -0700868 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
869 @Override
870 public void handle(ClusterMessage message) {
871
872 log.info("Received device offline event from peer: {}", message.sender());
873 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
874
875 DeviceId deviceId = event.deviceId();
876 Timestamp timestamp = event.timestamp();
877
878 markOfflineInternal(deviceId, timestamp);
879 }
880 }
881
Madan Jampani47c93732014-10-06 20:46:08 -0700882 private class InternalPortEventListener implements ClusterMessageHandler {
883 @Override
884 public void handle(ClusterMessage message) {
885
886 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700887 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700888
889 ProviderId providerId = event.providerId();
890 DeviceId deviceId = event.deviceId();
891 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
892
893 updatePortsInternal(providerId, deviceId, portDescriptions);
894 }
895 }
896
897 private class InternalPortStatusEventListener implements ClusterMessageHandler {
898 @Override
899 public void handle(ClusterMessage message) {
900
901 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700902 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700903
904 ProviderId providerId = event.providerId();
905 DeviceId deviceId = event.deviceId();
906 Timestamped<PortDescription> portDescription = event.portDescription();
907
908 updatePortStatusInternal(providerId, deviceId, portDescription);
909 }
910 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700911}