blob: 2603da1197874c238f673b4ef98ab5ae2f6f0cb1 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
Yuta HIGUCHI47c40882014-10-10 18:44:37 -07003import com.google.common.base.Function;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07004import com.google.common.collect.FluentIterable;
5import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07008
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07009import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070017import org.onlab.onos.cluster.ControllerNode;
18import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070019import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070020import org.onlab.onos.net.DefaultAnnotations;
21import org.onlab.onos.net.DefaultDevice;
22import org.onlab.onos.net.DefaultPort;
23import org.onlab.onos.net.Device;
24import org.onlab.onos.net.Device.Type;
25import org.onlab.onos.net.DeviceId;
26import org.onlab.onos.net.Port;
27import org.onlab.onos.net.PortNumber;
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -070028import org.onlab.onos.net.device.DeviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070029import org.onlab.onos.net.device.DeviceDescription;
30import org.onlab.onos.net.device.DeviceEvent;
31import org.onlab.onos.net.device.DeviceStore;
32import org.onlab.onos.net.device.DeviceStoreDelegate;
33import org.onlab.onos.net.device.PortDescription;
34import org.onlab.onos.net.provider.ProviderId;
35import org.onlab.onos.store.AbstractStore;
Yuta HIGUCHId40483d2014-10-09 15:20:30 -070036import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070037import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070040import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIfa891c92014-10-09 15:21:40 -070041import org.onlab.onos.store.common.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070042import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070043import org.onlab.onos.store.serializers.DistributedStoreSerializers;
Madan Jampani53e44e62014-10-07 12:39:51 -070044import org.onlab.util.KryoPool;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070045import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070046import org.slf4j.Logger;
47
Madan Jampani47c93732014-10-06 20:46:08 -070048import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070049import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070050import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070051import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070052import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070053import java.util.HashSet;
54import java.util.Iterator;
55import java.util.List;
56import java.util.Map;
57import java.util.Map.Entry;
58import java.util.Objects;
59import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070060import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070061import java.util.concurrent.ScheduledExecutorService;
62import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070063
64import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070065import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHIdc7374c2014-10-10 11:11:09 -070066import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import static org.onlab.onos.net.device.DeviceEvent.Type.*;
68import static org.slf4j.LoggerFactory.getLogger;
69import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
70import static org.onlab.onos.net.DefaultAnnotations.merge;
71import static com.google.common.base.Verify.verify;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070072import static org.onlab.util.Tools.namedThreads;
73import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
74import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070075
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070076// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070077/**
78 * Manages inventory of infrastructure devices using gossip protocol to distribute
79 * information.
80 */
81@Component(immediate = true)
82@Service
83public class GossipDeviceStore
84 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
85 implements DeviceStore {
86
87 private final Logger log = getLogger(getClass());
88
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070089 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070090
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070091 // innerMap is used to lock a Device, thus instance should never be replaced.
92 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070093 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070094 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070095
96 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070097 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
98 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
99
100 // to be updated under Device lock
101 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
102 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700103
104 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700105 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700108 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700109
Madan Jampani47c93732014-10-06 20:46:08 -0700110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected ClusterCommunicationService clusterCommunicator;
112
Madan Jampani53e44e62014-10-07 12:39:51 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterService clusterService;
115
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700116 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700117 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700118 protected void setupKryoPool() {
119 serializerPool = KryoPool.newBuilder()
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700120 .register(DistributedStoreSerializers.COMMON)
121
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700122 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700123 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700124 .register(InternalDeviceRemovedEvent.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700125 .register(InternalPortEvent.class, new InternalPortEventSerializer())
126 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700127 .register(DeviceAntiEntropyAdvertisement.class)
128 .register(DeviceFragmentId.class)
129 .register(PortFragmentId.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700130 .build()
131 .populate(1);
132 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700133 };
134
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700135 private ScheduledExecutorService executor;
136
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700137 @Activate
138 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700139 clusterCommunicator.addSubscriber(
140 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
141 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700142 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
143 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700144 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
145 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700146 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
147 clusterCommunicator.addSubscriber(
148 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700149 clusterCommunicator.addSubscriber(
150 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
151
152 executor =
153 newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
154
155 // TODO: Make these configurable
156 long initialDelaySec = 5;
157 long periodSec = 5;
158 // start anti-entropy thread
159 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
160 initialDelaySec, periodSec, TimeUnit.SECONDS);
161
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700162 log.info("Started");
163 }
164
165 @Deactivate
166 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700167
168 executor.shutdownNow();
169 try {
170 boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
171 if (timedout) {
172 log.error("Timeout during executor shutdown");
173 }
174 } catch (InterruptedException e) {
175 log.error("Error during executor shutdown", e);
176 }
177
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700178 deviceDescs.clear();
179 devices.clear();
180 devicePorts.clear();
181 availableDevices.clear();
182 log.info("Stopped");
183 }
184
185 @Override
186 public int getDeviceCount() {
187 return devices.size();
188 }
189
190 @Override
191 public Iterable<Device> getDevices() {
192 return Collections.unmodifiableCollection(devices.values());
193 }
194
195 @Override
196 public Device getDevice(DeviceId deviceId) {
197 return devices.get(deviceId);
198 }
199
200 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700201 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
202 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700203 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700204 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700205 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700206 final DeviceEvent event;
207 final Timestamped<DeviceDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700208 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700209 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700210 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId).getDeviceDesc();
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700211 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700212 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700213 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
214 providerId, deviceId);
215 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700216 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700217 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700218 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700219 + providerId + " and deviceId: " + deviceId, e);
220 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700221 }
222 return event;
223 }
224
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700225 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
226 DeviceId deviceId,
227 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700228
229 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700230 Map<ProviderId, DeviceDescriptions> providerDescs
231 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700232
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700233 synchronized (providerDescs) {
234 // locking per device
235
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700236 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
237 log.debug("Ignoring outdated event: {}", deltaDesc);
238 return null;
239 }
240
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700241 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(providerDescs, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700242
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700243 final Device oldDevice = devices.get(deviceId);
244 final Device newDevice;
245
246 if (deltaDesc == descs.getDeviceDesc() ||
247 deltaDesc.isNewer(descs.getDeviceDesc())) {
248 // on new device or valid update
249 descs.putDeviceDesc(deltaDesc);
250 newDevice = composeDevice(deviceId, providerDescs);
251 } else {
252 // outdated event, ignored.
253 return null;
254 }
255 if (oldDevice == null) {
256 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700257 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700258 } else {
259 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700260 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700261 }
262 }
263 }
264
265 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700266 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700267 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700268 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700269
270 // update composed device cache
271 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
272 verify(oldDevice == null,
273 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
274 providerId, oldDevice, newDevice);
275
276 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700277 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700278 }
279
280 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
281 }
282
283 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700284 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700285 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700286 Device oldDevice,
287 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700288
289 // We allow only certain attributes to trigger update
290 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
291 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700292 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700293
294 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
295 if (!replaced) {
296 verify(replaced,
297 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
298 providerId, oldDevice, devices.get(newDevice.id())
299 , newDevice);
300 }
301 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700302 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700303 }
304 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
305 }
306
307 // Otherwise merely attempt to change availability if primary provider
308 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700309 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700310 return !added ? null :
311 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
312 }
313 return null;
314 }
315
316 @Override
317 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700318 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700319 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700320 if (event != null) {
321 log.info("Notifying peers of a device offline topology event for deviceId: {}",
322 deviceId);
323 try {
324 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
325 } catch (IOException e) {
326 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
327 deviceId);
328 }
329 }
330 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700331 }
332
333 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
334
335 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700336 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700337
338 // locking device
339 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700340
341 // accept off-line if given timestamp is newer than
342 // the latest Timestamp from Primary provider
343 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
344 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
345 if (timestamp.compareTo(lastTimestamp) <= 0) {
346 // outdated event ignore
347 return null;
348 }
349
350 offline.put(deviceId, timestamp);
351
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700352 Device device = devices.get(deviceId);
353 if (device == null) {
354 return null;
355 }
356 boolean removed = availableDevices.remove(deviceId);
357 if (removed) {
358 // TODO: broadcast ... DOWN only?
359 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700360 }
361 return null;
362 }
363 }
364
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700365 /**
366 * Marks the device as available if the given timestamp is not outdated,
367 * compared to the time the device has been marked offline.
368 *
369 * @param deviceId identifier of the device
370 * @param timestamp of the event triggering this change.
371 * @return true if availability change request was accepted and changed the state
372 */
373 // Guarded by deviceDescs value (=Device lock)
374 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
375 // accept on-line if given timestamp is newer than
376 // the latest offline request Timestamp
377 Timestamp offlineTimestamp = offline.get(deviceId);
378 if (offlineTimestamp == null ||
379 offlineTimestamp.compareTo(timestamp) < 0) {
380
381 offline.remove(deviceId);
382 return availableDevices.add(deviceId);
383 }
384 return false;
385 }
386
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700387 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700388 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
389 DeviceId deviceId,
390 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700391
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700392 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700393
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700394 final Timestamped<List<PortDescription>> timestampedInput
395 = new Timestamped<>(portDescriptions, newTimestamp);
396 final List<DeviceEvent> events;
397 final Timestamped<List<PortDescription>> merged;
398
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700399 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700400 events = updatePortsInternal(providerId, deviceId, timestampedInput);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700401 final DeviceDescriptions descs = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700402 List<PortDescription> mergedList =
403 FluentIterable.from(portDescriptions)
404 .transform(new Function<PortDescription, PortDescription>() {
405 @Override
406 public PortDescription apply(PortDescription input) {
407 // lookup merged port description
408 return descs.getPortDesc(input.portNumber()).value();
409 }
410 }).toList();
411 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
412 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700413 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700414 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
415 providerId, deviceId);
416 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700417 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700418 } catch (IOException e) {
419 log.error("Failed to notify peers of a port update topology event or providerId: "
420 + providerId + " and deviceId: " + deviceId, e);
421 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700422 }
423 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700424 }
425
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700426 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
427 DeviceId deviceId,
428 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700429
430 Device device = devices.get(deviceId);
431 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
432
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700433 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700434 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
435
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700436 List<DeviceEvent> events = new ArrayList<>();
437 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700438
439 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
440 log.debug("Ignoring outdated events: {}", portDescriptions);
441 return null;
442 }
443
444 DeviceDescriptions descs = descsMap.get(providerId);
445 // every provider must provide DeviceDescription.
446 checkArgument(descs != null,
447 "Device description for Device ID %s from Provider %s was not found",
448 deviceId, providerId);
449
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700450 Map<PortNumber, Port> ports = getPortMap(deviceId);
451
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700452 final Timestamp newTimestamp = portDescriptions.timestamp();
453
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700454 // Add new ports
455 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700456 for (PortDescription portDescription : portDescriptions.value()) {
457 final PortNumber number = portDescription.portNumber();
458 processed.add(number);
459
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700460 final Port oldPort = ports.get(number);
461 final Port newPort;
462
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700463
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700464 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
465 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700466 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700467 // on new port or valid update
468 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700469 descs.putPortDesc(new Timestamped<>(portDescription,
470 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700471 newPort = composePort(device, number, descsMap);
472 } else {
473 // outdated event, ignored.
474 continue;
475 }
476
477 events.add(oldPort == null ?
478 createPort(device, newPort, ports) :
479 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700480 }
481
482 events.addAll(pruneOldPorts(device, ports, processed));
483 }
484 return FluentIterable.from(events).filter(notNull()).toList();
485 }
486
487 // Creates a new port based on the port description adds it to the map and
488 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700489 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700490 private DeviceEvent createPort(Device device, Port newPort,
491 Map<PortNumber, Port> ports) {
492 ports.put(newPort.number(), newPort);
493 return new DeviceEvent(PORT_ADDED, device, newPort);
494 }
495
496 // Checks if the specified port requires update and if so, it replaces the
497 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700498 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700499 private DeviceEvent updatePort(Device device, Port oldPort,
500 Port newPort,
501 Map<PortNumber, Port> ports) {
502 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700503 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700504
505 ports.put(oldPort.number(), newPort);
506 return new DeviceEvent(PORT_UPDATED, device, newPort);
507 }
508 return null;
509 }
510
511 // Prunes the specified list of ports based on which ports are in the
512 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700513 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700514 private List<DeviceEvent> pruneOldPorts(Device device,
515 Map<PortNumber, Port> ports,
516 Set<PortNumber> processed) {
517 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700518 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700519 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700520 Entry<PortNumber, Port> e = iterator.next();
521 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700522 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700523 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700524 iterator.remove();
525 }
526 }
527 return events;
528 }
529
530 // Gets the map of ports for the specified device; if one does not already
531 // exist, it creates and registers a new one.
532 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
533 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700534 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
535 }
536
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700537 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700538 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700539 Map<ProviderId, DeviceDescriptions> r;
540 r = deviceDescs.get(deviceId);
541 if (r == null) {
542 r = new HashMap<ProviderId, DeviceDescriptions>();
543 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
544 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
545 if (concurrentlyAdded != null) {
546 r = concurrentlyAdded;
547 }
548 }
549 return r;
550 }
551
552 // Guarded by deviceDescs value (=Device lock)
553 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
554 Map<ProviderId, DeviceDescriptions> device,
555 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
556
557 synchronized (device) {
558 DeviceDescriptions r = device.get(providerId);
559 if (r == null) {
560 r = new DeviceDescriptions(deltaDesc);
561 device.put(providerId, r);
562 }
563 return r;
564 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700565 }
566
567 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700568 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
569 DeviceId deviceId,
570 PortDescription portDescription) {
571
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700572 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700573 final Timestamped<PortDescription> deltaDesc
574 = new Timestamped<>(portDescription, newTimestamp);
575 final DeviceEvent event;
576 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700577 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700578 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700579 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700580 .getPortDesc(portDescription.portNumber());
581 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700582 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700583 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
584 providerId, deviceId);
585 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700586 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700587 } catch (IOException e) {
588 log.error("Failed to notify peers of a port status update topology event or providerId: "
589 + providerId + " and deviceId: " + deviceId, e);
590 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700591 }
592 return event;
593 }
594
595 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
596 Timestamped<PortDescription> deltaDesc) {
597
598 Device device = devices.get(deviceId);
599 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
600
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700601 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700602 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
603
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700604 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700605
606 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
607 log.debug("Ignoring outdated event: {}", deltaDesc);
608 return null;
609 }
610
611 DeviceDescriptions descs = descsMap.get(providerId);
612 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700613 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700614 "Device description for Device ID %s from Provider %s was not found",
615 deviceId, providerId);
616
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700617 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
618 final PortNumber number = deltaDesc.value().portNumber();
619 final Port oldPort = ports.get(number);
620 final Port newPort;
621
622 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
623 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700624 deltaDesc.isNewer(existingPortDesc)) {
625 // on new port or valid update
626 // update description
627 descs.putPortDesc(deltaDesc);
628 newPort = composePort(device, number, descsMap);
629 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700630 // same or outdated event, ignored.
631 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700632 return null;
633 }
634
635 if (oldPort == null) {
636 return createPort(device, newPort, ports);
637 } else {
638 return updatePort(device, oldPort, newPort, ports);
639 }
640 }
641 }
642
643 @Override
644 public List<Port> getPorts(DeviceId deviceId) {
645 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
646 if (ports == null) {
647 return Collections.emptyList();
648 }
649 return ImmutableList.copyOf(ports.values());
650 }
651
652 @Override
653 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
654 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
655 return ports == null ? null : ports.get(portNumber);
656 }
657
658 @Override
659 public boolean isAvailable(DeviceId deviceId) {
660 return availableDevices.contains(deviceId);
661 }
662
663 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700664 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700665 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700666 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700667 if (event != null) {
668 log.info("Notifying peers of a device removed topology event for deviceId: {}",
669 deviceId);
670 try {
671 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
672 } catch (IOException e) {
673 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
674 deviceId);
675 }
676 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700677 return event;
678 }
679
680 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
681 Timestamp timestamp) {
682
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700683 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700684 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700685 // accept removal request if given timestamp is newer than
686 // the latest Timestamp from Primary provider
687 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
688 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
689 if (timestamp.compareTo(lastTimestamp) <= 0) {
690 // outdated event ignore
691 return null;
692 }
693 removalRequest.put(deviceId, timestamp);
694
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700695 Device device = devices.remove(deviceId);
696 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700697 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
698 if (ports != null) {
699 ports.clear();
700 }
701 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700702 descs.clear();
703 return device == null ? null :
704 new DeviceEvent(DEVICE_REMOVED, device, null);
705 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700706 }
707
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700708 /**
709 * Checks if given timestamp is superseded by removal request
710 * with more recent timestamp.
711 *
712 * @param deviceId identifier of a device
713 * @param timestampToCheck timestamp of an event to check
714 * @return true if device is already removed
715 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700716 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
717 Timestamp removalTimestamp = removalRequest.get(deviceId);
718 if (removalTimestamp != null &&
719 removalTimestamp.compareTo(timestampToCheck) >= 0) {
720 // removalRequest is more recent
721 return true;
722 }
723 return false;
724 }
725
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700726 /**
727 * Returns a Device, merging description given from multiple Providers.
728 *
729 * @param deviceId device identifier
730 * @param providerDescs Collection of Descriptions from multiple providers
731 * @return Device instance
732 */
733 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700734 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700735
736 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
737
738 ProviderId primary = pickPrimaryPID(providerDescs);
739
740 DeviceDescriptions desc = providerDescs.get(primary);
741
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700742 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700743 Type type = base.type();
744 String manufacturer = base.manufacturer();
745 String hwVersion = base.hwVersion();
746 String swVersion = base.swVersion();
747 String serialNumber = base.serialNumber();
748 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
749 annotations = merge(annotations, base.annotations());
750
751 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
752 if (e.getKey().equals(primary)) {
753 continue;
754 }
755 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700756 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700757 // Currently assuming there will never be a key conflict between
758 // providers
759
760 // annotation merging. not so efficient, should revisit later
761 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
762 }
763
764 return new DefaultDevice(primary, deviceId , type, manufacturer,
765 hwVersion, swVersion, serialNumber, annotations);
766 }
767
768 /**
769 * Returns a Port, merging description given from multiple Providers.
770 *
771 * @param device device the port is on
772 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700773 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700774 * @return Port instance
775 */
776 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700777 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700778
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700779 ProviderId primary = pickPrimaryPID(descsMap);
780 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700781 // if no primary, assume not enabled
782 // TODO: revisit this default port enabled/disabled behavior
783 boolean isEnabled = false;
784 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
785
786 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
787 if (portDesc != null) {
788 isEnabled = portDesc.value().isEnabled();
789 annotations = merge(annotations, portDesc.value().annotations());
790 }
791
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700792 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700793 if (e.getKey().equals(primary)) {
794 continue;
795 }
796 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700797 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700798 // Currently assuming there will never be a key conflict between
799 // providers
800
801 // annotation merging. not so efficient, should revisit later
802 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
803 if (otherPortDesc != null) {
804 annotations = merge(annotations, otherPortDesc.value().annotations());
805 }
806 }
807
808 return new DefaultPort(device, number, isEnabled, annotations);
809 }
810
811 /**
812 * @return primary ProviderID, or randomly chosen one if none exists
813 */
814 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700815 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700816 ProviderId fallBackPrimary = null;
817 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
818 if (!e.getKey().isAncillary()) {
819 return e.getKey();
820 } else if (fallBackPrimary == null) {
821 // pick randomly as a fallback in case there is no primary
822 fallBackPrimary = e.getKey();
823 }
824 }
825 return fallBackPrimary;
826 }
827
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700828 private DeviceDescriptions getPrimaryDescriptions(
829 Map<ProviderId, DeviceDescriptions> providerDescs) {
830 ProviderId pid = pickPrimaryPID(providerDescs);
831 return providerDescs.get(pid);
832 }
833
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700834 // TODO: should we be throwing exception?
835 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
836 ClusterMessage message = new ClusterMessage(
837 clusterService.getLocalNode().id(),
838 subject,
839 SERIALIZER.encode(event));
840 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700841 }
842
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700843 // TODO: should we be throwing exception?
844 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
845 ClusterMessage message = new ClusterMessage(
846 clusterService.getLocalNode().id(),
847 subject,
848 SERIALIZER.encode(event));
849 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700850 }
Madan Jampani47c93732014-10-06 20:46:08 -0700851
852 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700853 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700854 }
855
Madan Jampani25322532014-10-08 11:20:38 -0700856 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700857 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700858 }
859
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700860 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700861 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700862 }
863
Madan Jampani47c93732014-10-06 20:46:08 -0700864 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700865 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700866 }
867
868 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700869 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
870 }
871
872 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
873 try {
874 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
875 } catch (IOException e) {
876 log.error("Failed to send" + event + " to " + recipient, e);
877 }
878 }
879
880 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
881 try {
882 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
883 } catch (IOException e) {
884 log.error("Failed to send" + event + " to " + recipient, e);
885 }
886 }
887
888 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
889 try {
890 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
891 } catch (IOException e) {
892 log.error("Failed to send" + event + " to " + recipient, e);
893 }
894 }
895
896 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
897 try {
898 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
899 } catch (IOException e) {
900 log.error("Failed to send" + event + " to " + recipient, e);
901 }
902 }
903
904 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
905 try {
906 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
907 } catch (IOException e) {
908 log.error("Failed to send" + event + " to " + recipient, e);
909 }
910 }
911
912 private DeviceAntiEntropyAdvertisement createAdvertisement() {
913 final NodeId self = clusterService.getLocalNode().id();
914
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700915 final int numDevices = deviceDescs.size();
916 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
917 final int portsPerDevice = 8; // random factor to minimize reallocation
918 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
919 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700920
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700921 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700922 provs : deviceDescs.entrySet()) {
923
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700924 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700925 final DeviceId deviceId = provs.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700926 final Map<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700927 synchronized (devDescs) {
928
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700929 // send device offline timestamp
930 Timestamp lOffline = this.offline.get(deviceId);
931 if (lOffline != null) {
932 adOffline.put(deviceId, lOffline);
933 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700934
935 for (Entry<ProviderId, DeviceDescriptions>
936 prov : devDescs.entrySet()) {
937
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700938 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700939 final ProviderId provId = prov.getKey();
940 final DeviceDescriptions descs = prov.getValue();
941
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700942 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700943 descs.getDeviceDesc().timestamp());
944
945 for (Entry<PortNumber, Timestamped<PortDescription>>
946 portDesc : descs.getPortDescs().entrySet()) {
947
948 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700949 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700950 portDesc.getValue().timestamp());
951 }
952 }
953 }
954 }
955
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700956 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700957 }
958
959 /**
960 * Responds to anti-entropy advertisement message.
961 * <P>
962 * Notify sender about out-dated information using regular replication message.
963 * Send back advertisement to sender if not in sync.
964 *
965 * @param advertisement to respond to
966 */
967 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
968
969 final NodeId sender = advertisement.sender();
970
971 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
972 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
973 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
974
975 // Fragments to request
976 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
977 Collection<PortFragmentId> reqPorts = new ArrayList<>();
978
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700979 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700980 final DeviceId deviceId = de.getKey();
981 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
982
983 synchronized (lDevice) {
984 // latestTimestamp across provider
985 // Note: can be null initially
986 Timestamp localLatest = offline.get(deviceId);
987
988 // handle device Ads
989 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
990 final ProviderId provId = prov.getKey();
991 final DeviceDescriptions lDeviceDescs = prov.getValue();
992
993 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
994
995
996 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
997 Timestamp advDevTimestamp = devAds.get(devFragId);
998
999 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
1000 // remote does not have it or outdated, suggest
1001 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1002 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1003 // local is outdated, request
1004 reqDevices.add(devFragId);
1005 }
1006
1007 // handle port Ads
1008 for (Entry<PortNumber, Timestamped<PortDescription>>
1009 pe : lDeviceDescs.getPortDescs().entrySet()) {
1010
1011 final PortNumber num = pe.getKey();
1012 final Timestamped<PortDescription> lPort = pe.getValue();
1013
1014 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1015
1016 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -07001017 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001018 // remote does not have it or outdated, suggest
1019 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1020 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1021 // local is outdated, request
1022 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1023 reqPorts.add(portFragId);
1024 }
1025
1026 // remove port Ad already processed
1027 portAds.remove(portFragId);
1028 } // end local port loop
1029
1030 // remove device Ad already processed
1031 devAds.remove(devFragId);
1032
1033 // find latest and update
1034 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1035 if (localLatest == null ||
1036 providerLatest.compareTo(localLatest) > 0) {
1037 localLatest = providerLatest;
1038 }
1039 } // end local provider loop
1040
1041 // checking if remote timestamp is more recent.
1042 Timestamp rOffline = offlineAds.get(deviceId);
1043 if (rOffline != null &&
1044 rOffline.compareTo(localLatest) > 0) {
1045 // remote offline timestamp suggests that the
1046 // device is off-line
1047 markOfflineInternal(deviceId, rOffline);
1048 }
1049
1050 Timestamp lOffline = offline.get(deviceId);
1051 if (lOffline != null && rOffline == null) {
1052 // locally offline, but remote is online, suggest offline
1053 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1054 }
1055
1056 // remove device offline Ad already processed
1057 offlineAds.remove(deviceId);
1058 } // end local device loop
1059 } // device lock
1060
1061 // If there is any Ads left, request them
1062 log.trace("Ads left {}, {}", devAds, portAds);
1063 reqDevices.addAll(devAds.keySet());
1064 reqPorts.addAll(portAds.keySet());
1065
1066 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1067 log.trace("Nothing to request to remote peer {}", sender);
1068 return;
1069 }
1070
1071 log.info("Need to sync {} {}", reqDevices, reqPorts);
1072
1073 // 2-way Anti-Entropy for now
1074 try {
1075 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1076 } catch (IOException e) {
1077 log.error("Failed to send response advertisement to " + sender, e);
1078 }
1079
1080// Sketch of 3-way Anti-Entropy
1081// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1082// ClusterMessage message = new ClusterMessage(
1083// clusterService.getLocalNode().id(),
1084// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1085// SERIALIZER.encode(request));
1086//
1087// try {
1088// clusterCommunicator.unicast(message, advertisement.sender());
1089// } catch (IOException e) {
1090// log.error("Failed to send advertisement reply to "
1091// + advertisement.sender(), e);
1092// }
Madan Jampani47c93732014-10-06 20:46:08 -07001093 }
1094
Madan Jampani255a58b2014-10-09 12:08:20 -07001095 private void notifyDelegateIfNotNull(DeviceEvent event) {
1096 if (event != null) {
1097 notifyDelegate(event);
1098 }
1099 }
1100
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001101 private final class SendAdvertisementTask implements Runnable {
1102
1103 @Override
1104 public void run() {
1105 if (Thread.currentThread().isInterrupted()) {
1106 log.info("Interrupted, quitting");
1107 return;
1108 }
1109
1110 try {
1111 final NodeId self = clusterService.getLocalNode().id();
1112 Set<ControllerNode> nodes = clusterService.getNodes();
1113
1114 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1115 .transform(toNodeId())
1116 .toList();
1117
1118 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHI37083082014-10-13 10:38:38 -07001119 log.debug("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001120 return;
1121 }
1122
1123 NodeId peer;
1124 do {
1125 int idx = RandomUtils.nextInt(0, nodeIds.size());
1126 peer = nodeIds.get(idx);
1127 } while (peer.equals(self));
1128
1129 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1130
1131 if (Thread.currentThread().isInterrupted()) {
1132 log.info("Interrupted, quitting");
1133 return;
1134 }
1135
1136 try {
1137 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1138 } catch (IOException e) {
1139 log.error("Failed to send anti-entropy advertisement", e);
1140 return;
1141 }
1142 } catch (Exception e) {
1143 // catch all Exception to avoid Scheduled task being suppressed.
1144 log.error("Exception thrown while sending advertisement", e);
1145 }
1146 }
1147 }
1148
Madan Jampani47c93732014-10-06 20:46:08 -07001149 private class InternalDeviceEventListener implements ClusterMessageHandler {
1150 @Override
1151 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001152
Madan Jampani47c93732014-10-06 20:46:08 -07001153 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001154 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001155
Madan Jampani47c93732014-10-06 20:46:08 -07001156 ProviderId providerId = event.providerId();
1157 DeviceId deviceId = event.deviceId();
1158 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001159
Madan Jampani255a58b2014-10-09 12:08:20 -07001160 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001161 }
1162 }
1163
Madan Jampani25322532014-10-08 11:20:38 -07001164 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
1165 @Override
1166 public void handle(ClusterMessage message) {
1167
1168 log.info("Received device offline event from peer: {}", message.sender());
1169 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
1170
1171 DeviceId deviceId = event.deviceId();
1172 Timestamp timestamp = event.timestamp();
1173
Madan Jampani255a58b2014-10-09 12:08:20 -07001174 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -07001175 }
1176 }
1177
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001178 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
1179 @Override
1180 public void handle(ClusterMessage message) {
1181
1182 log.info("Received device removed event from peer: {}", message.sender());
1183 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
1184
1185 DeviceId deviceId = event.deviceId();
1186 Timestamp timestamp = event.timestamp();
1187
Madan Jampani255a58b2014-10-09 12:08:20 -07001188 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001189 }
1190 }
1191
Madan Jampani47c93732014-10-06 20:46:08 -07001192 private class InternalPortEventListener implements ClusterMessageHandler {
1193 @Override
1194 public void handle(ClusterMessage message) {
1195
1196 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001197 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001198
1199 ProviderId providerId = event.providerId();
1200 DeviceId deviceId = event.deviceId();
1201 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1202
Madan Jampani255a58b2014-10-09 12:08:20 -07001203 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -07001204 }
1205 }
1206
1207 private class InternalPortStatusEventListener implements ClusterMessageHandler {
1208 @Override
1209 public void handle(ClusterMessage message) {
1210
1211 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001212 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001213 log.info("{}", event);
Madan Jampani47c93732014-10-06 20:46:08 -07001214
1215 ProviderId providerId = event.providerId();
1216 DeviceId deviceId = event.deviceId();
1217 Timestamped<PortDescription> portDescription = event.portDescription();
1218
Madan Jampani255a58b2014-10-09 12:08:20 -07001219 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001220 }
1221 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001222
1223 private final class InternalDeviceAdvertisementListener
1224 implements ClusterMessageHandler {
1225
1226 @Override
1227 public void handle(ClusterMessage message) {
Yuta HIGUCHI9a0a1d12014-10-13 22:38:02 -07001228 log.debug("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001229 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1230 handleAdvertisement(advertisement);
1231 }
1232 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001233}