blob: 2221ea06081dbf2c5933a542f3a2e8ef7996f36b [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
3import com.google.common.collect.FluentIterable;
4import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07005import com.google.common.collect.Maps;
6import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07007
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07008import org.apache.commons.lang3.concurrent.ConcurrentException;
9import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
10import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070017import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070018import org.onlab.onos.net.DefaultAnnotations;
19import org.onlab.onos.net.DefaultDevice;
20import org.onlab.onos.net.DefaultPort;
21import org.onlab.onos.net.Device;
22import org.onlab.onos.net.Device.Type;
23import org.onlab.onos.net.DeviceId;
24import org.onlab.onos.net.Port;
25import org.onlab.onos.net.PortNumber;
26import org.onlab.onos.net.SparseAnnotations;
27import org.onlab.onos.net.device.DefaultDeviceDescription;
28import org.onlab.onos.net.device.DefaultPortDescription;
29import org.onlab.onos.net.device.DeviceDescription;
30import org.onlab.onos.net.device.DeviceEvent;
31import org.onlab.onos.net.device.DeviceStore;
32import org.onlab.onos.net.device.DeviceStoreDelegate;
33import org.onlab.onos.net.device.PortDescription;
34import org.onlab.onos.net.provider.ProviderId;
35import org.onlab.onos.store.AbstractStore;
36import org.onlab.onos.store.ClockService;
37import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070038import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani53e44e62014-10-07 12:39:51 -070041import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070042import org.onlab.onos.store.common.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070043import org.onlab.onos.store.serializers.KryoPoolUtil;
44import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampanifef9b3a2014-10-07 18:38:17 -070045import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070046import org.onlab.util.KryoPool;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070047import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070048import org.slf4j.Logger;
49
Madan Jampani47c93732014-10-06 20:46:08 -070050import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070051import java.util.ArrayList;
52import java.util.Collections;
53import java.util.HashSet;
54import java.util.Iterator;
55import java.util.List;
56import java.util.Map;
57import java.util.Map.Entry;
58import java.util.Objects;
59import java.util.Set;
60import java.util.concurrent.ConcurrentHashMap;
61import java.util.concurrent.ConcurrentMap;
62import java.util.concurrent.atomic.AtomicReference;
63
64import static com.google.common.base.Preconditions.checkArgument;
65import static com.google.common.base.Preconditions.checkNotNull;
66import static com.google.common.base.Predicates.notNull;
67import static org.onlab.onos.net.device.DeviceEvent.Type.*;
68import static org.slf4j.LoggerFactory.getLogger;
69import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
70import static org.onlab.onos.net.DefaultAnnotations.merge;
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -070071import static org.onlab.onos.net.DefaultAnnotations.union;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070072import static com.google.common.base.Verify.verify;
73
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070074// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070075/**
76 * Manages inventory of infrastructure devices using gossip protocol to distribute
77 * information.
78 */
79@Component(immediate = true)
80@Service
81public class GossipDeviceStore
82 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
83 implements DeviceStore {
84
85 private final Logger log = getLogger(getClass());
86
87 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
88
89 // TODO: Check if inner Map can be replaced with plain Map
90 // innerMap is used to lock a Device, thus instance should never be replaced.
91 // collection of Description given from various providers
92 private final ConcurrentMap<DeviceId,
93 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070094 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070095
96 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070097 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
98 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
99
100 // to be updated under Device lock
101 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
102 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700103
104 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700105 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClockService clockService;
109
Madan Jampani47c93732014-10-06 20:46:08 -0700110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected ClusterCommunicationService clusterCommunicator;
112
Madan Jampani53e44e62014-10-07 12:39:51 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterService clusterService;
115
116 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700117 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700118 protected void setupKryoPool() {
119 serializerPool = KryoPool.newBuilder()
120 .register(KryoPoolUtil.API)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700121 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700122 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700123 .register(InternalDeviceRemovedEvent.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700124 .register(InternalPortEvent.class, new InternalPortEventSerializer())
125 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700126 .register(Timestamp.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700127 .register(Timestamped.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700128 .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -0700129 .build()
130 .populate(1);
131 }
132
133 };
134
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700135 @Activate
136 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700137 clusterCommunicator.addSubscriber(
138 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
139 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700140 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
141 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700142 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
143 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700144 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
145 clusterCommunicator.addSubscriber(
146 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700147 log.info("Started");
148 }
149
150 @Deactivate
151 public void deactivate() {
152 deviceDescs.clear();
153 devices.clear();
154 devicePorts.clear();
155 availableDevices.clear();
156 log.info("Stopped");
157 }
158
159 @Override
160 public int getDeviceCount() {
161 return devices.size();
162 }
163
164 @Override
165 public Iterable<Device> getDevices() {
166 return Collections.unmodifiableCollection(devices.values());
167 }
168
169 @Override
170 public Device getDevice(DeviceId deviceId) {
171 return devices.get(deviceId);
172 }
173
174 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700175 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
176 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700177 DeviceDescription deviceDescription) {
178 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
179 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
180 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
181 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700182 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
183 providerId, deviceId);
184 try {
185 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
186 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700187 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700188 + providerId + " and deviceId: " + deviceId, e);
189 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700190 }
191 return event;
192 }
193
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700194 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
195 DeviceId deviceId,
196 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700197
198 // Collection of DeviceDescriptions for a Device
199 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700200 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700201
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700202 synchronized (providerDescs) {
203 // locking per device
204
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700205 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
206 log.debug("Ignoring outdated event: {}", deltaDesc);
207 return null;
208 }
209
210 DeviceDescriptions descs
211 = createIfAbsentUnchecked(providerDescs, providerId,
212 new InitDeviceDescs(deltaDesc));
213
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700214 final Device oldDevice = devices.get(deviceId);
215 final Device newDevice;
216
217 if (deltaDesc == descs.getDeviceDesc() ||
218 deltaDesc.isNewer(descs.getDeviceDesc())) {
219 // on new device or valid update
220 descs.putDeviceDesc(deltaDesc);
221 newDevice = composeDevice(deviceId, providerDescs);
222 } else {
223 // outdated event, ignored.
224 return null;
225 }
226 if (oldDevice == null) {
227 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700228 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700229 } else {
230 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700231 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700232 }
233 }
234 }
235
236 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700237 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700238 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700239 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700240
241 // update composed device cache
242 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
243 verify(oldDevice == null,
244 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
245 providerId, oldDevice, newDevice);
246
247 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700248 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700249 }
250
251 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
252 }
253
254 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700255 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700256 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700257 Device oldDevice,
258 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700259
260 // We allow only certain attributes to trigger update
261 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
262 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700263 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700264
265 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
266 if (!replaced) {
267 verify(replaced,
268 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
269 providerId, oldDevice, devices.get(newDevice.id())
270 , newDevice);
271 }
272 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700273 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700274 }
275 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
276 }
277
278 // Otherwise merely attempt to change availability if primary provider
279 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700280 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700281 return !added ? null :
282 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
283 }
284 return null;
285 }
286
287 @Override
288 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700289 Timestamp timestamp = clockService.getTimestamp(deviceId);
Madan Jampani25322532014-10-08 11:20:38 -0700290 DeviceEvent event = markOfflineInternal(deviceId, timestamp);
291 if (event != null) {
292 log.info("Notifying peers of a device offline topology event for deviceId: {}",
293 deviceId);
294 try {
295 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
296 } catch (IOException e) {
297 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
298 deviceId);
299 }
300 }
301 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700302 }
303
304 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
305
306 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700307 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700308
309 // locking device
310 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700311
312 // accept off-line if given timestamp is newer than
313 // the latest Timestamp from Primary provider
314 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
315 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
316 if (timestamp.compareTo(lastTimestamp) <= 0) {
317 // outdated event ignore
318 return null;
319 }
320
321 offline.put(deviceId, timestamp);
322
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700323 Device device = devices.get(deviceId);
324 if (device == null) {
325 return null;
326 }
327 boolean removed = availableDevices.remove(deviceId);
328 if (removed) {
329 // TODO: broadcast ... DOWN only?
330 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700331 }
332 return null;
333 }
334 }
335
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700336 /**
337 * Marks the device as available if the given timestamp is not outdated,
338 * compared to the time the device has been marked offline.
339 *
340 * @param deviceId identifier of the device
341 * @param timestamp of the event triggering this change.
342 * @return true if availability change request was accepted and changed the state
343 */
344 // Guarded by deviceDescs value (=Device lock)
345 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
346 // accept on-line if given timestamp is newer than
347 // the latest offline request Timestamp
348 Timestamp offlineTimestamp = offline.get(deviceId);
349 if (offlineTimestamp == null ||
350 offlineTimestamp.compareTo(timestamp) < 0) {
351
352 offline.remove(deviceId);
353 return availableDevices.add(deviceId);
354 }
355 return false;
356 }
357
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700358 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700359 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
360 DeviceId deviceId,
361 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700362 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
363
Madan Jampani47c93732014-10-06 20:46:08 -0700364 Timestamped<List<PortDescription>> timestampedPortDescriptions =
365 new Timestamped<>(portDescriptions, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700366
Madan Jampani47c93732014-10-06 20:46:08 -0700367 List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700368 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700369 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
370 providerId, deviceId);
371 try {
372 notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
373 } catch (IOException e) {
374 log.error("Failed to notify peers of a port update topology event or providerId: "
375 + providerId + " and deviceId: " + deviceId, e);
376 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700377 }
378 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700379 }
380
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700381 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
382 DeviceId deviceId,
383 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700384
385 Device device = devices.get(deviceId);
386 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
387
388 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
389 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
390
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700391 List<DeviceEvent> events = new ArrayList<>();
392 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700393
394 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
395 log.debug("Ignoring outdated events: {}", portDescriptions);
396 return null;
397 }
398
399 DeviceDescriptions descs = descsMap.get(providerId);
400 // every provider must provide DeviceDescription.
401 checkArgument(descs != null,
402 "Device description for Device ID %s from Provider %s was not found",
403 deviceId, providerId);
404
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700405 Map<PortNumber, Port> ports = getPortMap(deviceId);
406
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700407 final Timestamp newTimestamp = portDescriptions.timestamp();
408
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700409 // Add new ports
410 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700411 for (PortDescription portDescription : portDescriptions.value()) {
412 final PortNumber number = portDescription.portNumber();
413 processed.add(number);
414
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700415 final Port oldPort = ports.get(number);
416 final Port newPort;
417
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700418
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700419 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
420 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700421 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700422 // on new port or valid update
423 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700424 descs.putPortDesc(new Timestamped<>(portDescription,
425 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700426 newPort = composePort(device, number, descsMap);
427 } else {
428 // outdated event, ignored.
429 continue;
430 }
431
432 events.add(oldPort == null ?
433 createPort(device, newPort, ports) :
434 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700435 }
436
437 events.addAll(pruneOldPorts(device, ports, processed));
438 }
439 return FluentIterable.from(events).filter(notNull()).toList();
440 }
441
442 // Creates a new port based on the port description adds it to the map and
443 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700444 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700445 private DeviceEvent createPort(Device device, Port newPort,
446 Map<PortNumber, Port> ports) {
447 ports.put(newPort.number(), newPort);
448 return new DeviceEvent(PORT_ADDED, device, newPort);
449 }
450
451 // Checks if the specified port requires update and if so, it replaces the
452 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700453 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700454 private DeviceEvent updatePort(Device device, Port oldPort,
455 Port newPort,
456 Map<PortNumber, Port> ports) {
457 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700458 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700459
460 ports.put(oldPort.number(), newPort);
461 return new DeviceEvent(PORT_UPDATED, device, newPort);
462 }
463 return null;
464 }
465
466 // Prunes the specified list of ports based on which ports are in the
467 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700468 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700469 private List<DeviceEvent> pruneOldPorts(Device device,
470 Map<PortNumber, Port> ports,
471 Set<PortNumber> processed) {
472 List<DeviceEvent> events = new ArrayList<>();
473 Iterator<PortNumber> iterator = ports.keySet().iterator();
474 while (iterator.hasNext()) {
475 PortNumber portNumber = iterator.next();
476 if (!processed.contains(portNumber)) {
477 events.add(new DeviceEvent(PORT_REMOVED, device,
478 ports.get(portNumber)));
479 iterator.remove();
480 }
481 }
482 return events;
483 }
484
485 // Gets the map of ports for the specified device; if one does not already
486 // exist, it creates and registers a new one.
487 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
488 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700489 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
490 }
491
492 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
493 DeviceId deviceId) {
494 return createIfAbsentUnchecked(deviceDescs, deviceId,
495 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700496 }
497
498 @Override
499 public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
500 PortDescription portDescription) {
501 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
502 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
503 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
504 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700505 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
506 providerId, deviceId);
507 try {
508 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
509 } catch (IOException e) {
510 log.error("Failed to notify peers of a port status update topology event or providerId: "
511 + providerId + " and deviceId: " + deviceId, e);
512 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700513 }
514 return event;
515 }
516
517 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
518 Timestamped<PortDescription> deltaDesc) {
519
520 Device device = devices.get(deviceId);
521 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
522
523 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
524 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
525
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700526 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700527
528 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
529 log.debug("Ignoring outdated event: {}", deltaDesc);
530 return null;
531 }
532
533 DeviceDescriptions descs = descsMap.get(providerId);
534 // assuming all providers must to give DeviceDescription
535 checkArgument(descs != null,
536 "Device description for Device ID %s from Provider %s was not found",
537 deviceId, providerId);
538
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700539 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
540 final PortNumber number = deltaDesc.value().portNumber();
541 final Port oldPort = ports.get(number);
542 final Port newPort;
543
544 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
545 if (existingPortDesc == null ||
546 deltaDesc == existingPortDesc ||
547 deltaDesc.isNewer(existingPortDesc)) {
548 // on new port or valid update
549 // update description
550 descs.putPortDesc(deltaDesc);
551 newPort = composePort(device, number, descsMap);
552 } else {
553 // outdated event, ignored.
554 return null;
555 }
556
557 if (oldPort == null) {
558 return createPort(device, newPort, ports);
559 } else {
560 return updatePort(device, oldPort, newPort, ports);
561 }
562 }
563 }
564
565 @Override
566 public List<Port> getPorts(DeviceId deviceId) {
567 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
568 if (ports == null) {
569 return Collections.emptyList();
570 }
571 return ImmutableList.copyOf(ports.values());
572 }
573
574 @Override
575 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
576 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
577 return ports == null ? null : ports.get(portNumber);
578 }
579
580 @Override
581 public boolean isAvailable(DeviceId deviceId) {
582 return availableDevices.contains(deviceId);
583 }
584
585 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700586 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
587 Timestamp timestamp = clockService.getTimestamp(deviceId);
588 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700589 if (event != null) {
590 log.info("Notifying peers of a device removed topology event for deviceId: {}",
591 deviceId);
592 try {
593 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
594 } catch (IOException e) {
595 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
596 deviceId);
597 }
598 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700599 return event;
600 }
601
602 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
603 Timestamp timestamp) {
604
605 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700606 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700607 // accept removal request if given timestamp is newer than
608 // the latest Timestamp from Primary provider
609 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
610 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
611 if (timestamp.compareTo(lastTimestamp) <= 0) {
612 // outdated event ignore
613 return null;
614 }
615 removalRequest.put(deviceId, timestamp);
616
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700617 Device device = devices.remove(deviceId);
618 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700619 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
620 if (ports != null) {
621 ports.clear();
622 }
623 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700624 descs.clear();
625 return device == null ? null :
626 new DeviceEvent(DEVICE_REMOVED, device, null);
627 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700628 }
629
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700630 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
631 Timestamp removalTimestamp = removalRequest.get(deviceId);
632 if (removalTimestamp != null &&
633 removalTimestamp.compareTo(timestampToCheck) >= 0) {
634 // removalRequest is more recent
635 return true;
636 }
637 return false;
638 }
639
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700640 /**
641 * Returns a Device, merging description given from multiple Providers.
642 *
643 * @param deviceId device identifier
644 * @param providerDescs Collection of Descriptions from multiple providers
645 * @return Device instance
646 */
647 private Device composeDevice(DeviceId deviceId,
648 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
649
650 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
651
652 ProviderId primary = pickPrimaryPID(providerDescs);
653
654 DeviceDescriptions desc = providerDescs.get(primary);
655
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700656 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700657 Type type = base.type();
658 String manufacturer = base.manufacturer();
659 String hwVersion = base.hwVersion();
660 String swVersion = base.swVersion();
661 String serialNumber = base.serialNumber();
662 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
663 annotations = merge(annotations, base.annotations());
664
665 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
666 if (e.getKey().equals(primary)) {
667 continue;
668 }
669 // TODO: should keep track of Description timestamp
670 // and only merge conflicting keys when timestamp is newer
671 // Currently assuming there will never be a key conflict between
672 // providers
673
674 // annotation merging. not so efficient, should revisit later
675 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
676 }
677
678 return new DefaultDevice(primary, deviceId , type, manufacturer,
679 hwVersion, swVersion, serialNumber, annotations);
680 }
681
682 /**
683 * Returns a Port, merging description given from multiple Providers.
684 *
685 * @param device device the port is on
686 * @param number port number
687 * @param providerDescs Collection of Descriptions from multiple providers
688 * @return Port instance
689 */
690 private Port composePort(Device device, PortNumber number,
691 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
692
693 ProviderId primary = pickPrimaryPID(providerDescs);
694 DeviceDescriptions primDescs = providerDescs.get(primary);
695 // if no primary, assume not enabled
696 // TODO: revisit this default port enabled/disabled behavior
697 boolean isEnabled = false;
698 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
699
700 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
701 if (portDesc != null) {
702 isEnabled = portDesc.value().isEnabled();
703 annotations = merge(annotations, portDesc.value().annotations());
704 }
705
706 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
707 if (e.getKey().equals(primary)) {
708 continue;
709 }
710 // TODO: should keep track of Description timestamp
711 // and only merge conflicting keys when timestamp is newer
712 // Currently assuming there will never be a key conflict between
713 // providers
714
715 // annotation merging. not so efficient, should revisit later
716 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
717 if (otherPortDesc != null) {
718 annotations = merge(annotations, otherPortDesc.value().annotations());
719 }
720 }
721
722 return new DefaultPort(device, number, isEnabled, annotations);
723 }
724
725 /**
726 * @return primary ProviderID, or randomly chosen one if none exists
727 */
728 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700729 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700730 ProviderId fallBackPrimary = null;
731 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
732 if (!e.getKey().isAncillary()) {
733 return e.getKey();
734 } else if (fallBackPrimary == null) {
735 // pick randomly as a fallback in case there is no primary
736 fallBackPrimary = e.getKey();
737 }
738 }
739 return fallBackPrimary;
740 }
741
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700742 private DeviceDescriptions getPrimaryDescriptions(
743 Map<ProviderId, DeviceDescriptions> providerDescs) {
744 ProviderId pid = pickPrimaryPID(providerDescs);
745 return providerDescs.get(pid);
746 }
747
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700748 public static final class InitDeviceDescs
749 implements ConcurrentInitializer<DeviceDescriptions> {
750
751 private final Timestamped<DeviceDescription> deviceDesc;
752
753 public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
754 this.deviceDesc = checkNotNull(deviceDesc);
755 }
756 @Override
757 public DeviceDescriptions get() throws ConcurrentException {
758 return new DeviceDescriptions(deviceDesc);
759 }
760 }
761
762
763 /**
764 * Collection of Description of a Device and it's Ports given from a Provider.
765 */
766 public static class DeviceDescriptions {
767
768 private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
769 private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
770
771 public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
772 this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
773 this.portDescs = new ConcurrentHashMap<>();
774 }
775
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700776 Timestamp getLatestTimestamp() {
777 Timestamp latest = deviceDesc.get().timestamp();
778 for (Timestamped<PortDescription> desc : portDescs.values()) {
779 if (desc.timestamp().compareTo(latest) > 0) {
780 latest = desc.timestamp();
781 }
782 }
783 return latest;
784 }
785
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700786 public Timestamped<DeviceDescription> getDeviceDesc() {
787 return deviceDesc.get();
788 }
789
790 public Timestamped<PortDescription> getPortDesc(PortNumber number) {
791 return portDescs.get(number);
792 }
793
794 /**
795 * Puts DeviceDescription, merging annotations as necessary.
796 *
797 * @param newDesc new DeviceDescription
798 * @return previous DeviceDescription
799 */
800 public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
801 Timestamped<DeviceDescription> oldOne = deviceDesc.get();
802 Timestamped<DeviceDescription> newOne = newDesc;
803 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700804 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700805 newDesc.value().annotations());
806 newOne = new Timestamped<DeviceDescription>(
807 new DefaultDeviceDescription(newDesc.value(), merged),
808 newDesc.timestamp());
809 }
810 return deviceDesc.getAndSet(newOne);
811 }
812
813 /**
814 * Puts PortDescription, merging annotations as necessary.
815 *
816 * @param newDesc new PortDescription
817 * @return previous PortDescription
818 */
819 public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
820 Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
821 Timestamped<PortDescription> newOne = newDesc;
822 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700823 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700824 newDesc.value().annotations());
825 newOne = new Timestamped<PortDescription>(
826 new DefaultPortDescription(newDesc.value(), merged),
827 newDesc.timestamp());
828 }
829 return portDescs.put(newOne.value().portNumber(), newOne);
830 }
831 }
Madan Jampani47c93732014-10-06 20:46:08 -0700832
833 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700834 ClusterMessage message = new ClusterMessage(
835 clusterService.getLocalNode().id(),
836 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
837 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700838 clusterCommunicator.broadcast(message);
839 }
840
Madan Jampani25322532014-10-08 11:20:38 -0700841 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
842 ClusterMessage message = new ClusterMessage(
843 clusterService.getLocalNode().id(),
844 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
845 SERIALIZER.encode(event));
846 clusterCommunicator.broadcast(message);
847 }
848
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700849 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
850 ClusterMessage message = new ClusterMessage(
851 clusterService.getLocalNode().id(),
852 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
853 SERIALIZER.encode(event));
854 clusterCommunicator.broadcast(message);
855 }
856
Madan Jampani47c93732014-10-06 20:46:08 -0700857 private void notifyPeers(InternalPortEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700858 ClusterMessage message = new ClusterMessage(
859 clusterService.getLocalNode().id(),
860 GossipDeviceStoreMessageSubjects.PORT_UPDATE,
861 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700862 clusterCommunicator.broadcast(message);
863 }
864
865 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700866 ClusterMessage message = new ClusterMessage(
867 clusterService.getLocalNode().id(),
868 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
869 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700870 clusterCommunicator.broadcast(message);
871 }
872
Madan Jampani255a58b2014-10-09 12:08:20 -0700873 private void notifyDelegateIfNotNull(DeviceEvent event) {
874 if (event != null) {
875 notifyDelegate(event);
876 }
877 }
878
Madan Jampani47c93732014-10-06 20:46:08 -0700879 private class InternalDeviceEventListener implements ClusterMessageHandler {
880 @Override
881 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -0700882
Madan Jampani47c93732014-10-06 20:46:08 -0700883 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700884 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -0700885
Madan Jampani47c93732014-10-06 20:46:08 -0700886 ProviderId providerId = event.providerId();
887 DeviceId deviceId = event.deviceId();
888 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -0700889
Madan Jampani255a58b2014-10-09 12:08:20 -0700890 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -0700891 }
892 }
893
Madan Jampani25322532014-10-08 11:20:38 -0700894 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
895 @Override
896 public void handle(ClusterMessage message) {
897
898 log.info("Received device offline event from peer: {}", message.sender());
899 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
900
901 DeviceId deviceId = event.deviceId();
902 Timestamp timestamp = event.timestamp();
903
Madan Jampani255a58b2014-10-09 12:08:20 -0700904 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -0700905 }
906 }
907
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700908 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
909 @Override
910 public void handle(ClusterMessage message) {
911
912 log.info("Received device removed event from peer: {}", message.sender());
913 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
914
915 DeviceId deviceId = event.deviceId();
916 Timestamp timestamp = event.timestamp();
917
Madan Jampani255a58b2014-10-09 12:08:20 -0700918 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700919 }
920 }
921
Madan Jampani47c93732014-10-06 20:46:08 -0700922 private class InternalPortEventListener implements ClusterMessageHandler {
923 @Override
924 public void handle(ClusterMessage message) {
925
926 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700927 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700928
929 ProviderId providerId = event.providerId();
930 DeviceId deviceId = event.deviceId();
931 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
932
Madan Jampani255a58b2014-10-09 12:08:20 -0700933 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -0700934 }
935 }
936
937 private class InternalPortStatusEventListener implements ClusterMessageHandler {
938 @Override
939 public void handle(ClusterMessage message) {
940
941 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700942 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700943
944 ProviderId providerId = event.providerId();
945 DeviceId deviceId = event.deviceId();
946 Timestamped<PortDescription> portDescription = event.portDescription();
947
Madan Jampani255a58b2014-10-09 12:08:20 -0700948 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -0700949 }
950 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700951}