blob: ac9fc3ebfea81a6eb04229422a6ad821ce1d3358 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
Yuta HIGUCHI47c40882014-10-10 18:44:37 -07003import com.google.common.base.Function;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07004import com.google.common.collect.FluentIterable;
5import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07008
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07009import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070017import org.onlab.onos.cluster.ControllerNode;
18import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -070019import org.onlab.onos.mastership.MastershipService;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070020import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070021import org.onlab.onos.net.DefaultAnnotations;
22import org.onlab.onos.net.DefaultDevice;
23import org.onlab.onos.net.DefaultPort;
24import org.onlab.onos.net.Device;
25import org.onlab.onos.net.Device.Type;
26import org.onlab.onos.net.DeviceId;
27import org.onlab.onos.net.Port;
28import org.onlab.onos.net.PortNumber;
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -070029import org.onlab.onos.net.device.DeviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070030import org.onlab.onos.net.device.DeviceDescription;
31import org.onlab.onos.net.device.DeviceEvent;
32import org.onlab.onos.net.device.DeviceStore;
33import org.onlab.onos.net.device.DeviceStoreDelegate;
34import org.onlab.onos.net.device.PortDescription;
35import org.onlab.onos.net.provider.ProviderId;
36import org.onlab.onos.store.AbstractStore;
Yuta HIGUCHId40483d2014-10-09 15:20:30 -070037import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070038import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070041import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIeecee552014-10-16 14:09:01 -070042import org.onlab.onos.store.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070043import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070044import org.onlab.onos.store.serializers.DistributedStoreSerializers;
alshabib7911a052014-10-16 17:49:37 -070045import org.onlab.packet.ChassisId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070046import org.onlab.util.KryoNamespace;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070047import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070048import org.slf4j.Logger;
49
Madan Jampani47c93732014-10-06 20:46:08 -070050import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070051import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070052import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070053import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070054import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070055import java.util.HashSet;
56import java.util.Iterator;
57import java.util.List;
58import java.util.Map;
59import java.util.Map.Entry;
60import java.util.Objects;
61import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070062import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070063import java.util.concurrent.ScheduledExecutorService;
64import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070065
66import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHIdc7374c2014-10-10 11:11:09 -070068import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070069import static org.onlab.onos.net.device.DeviceEvent.Type.*;
70import static org.slf4j.LoggerFactory.getLogger;
71import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
72import static org.onlab.onos.net.DefaultAnnotations.merge;
73import static com.google.common.base.Verify.verify;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070074import static org.onlab.util.Tools.namedThreads;
75import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
76import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070077
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070078// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070079/**
80 * Manages inventory of infrastructure devices using gossip protocol to distribute
81 * information.
82 */
83@Component(immediate = true)
84@Service
85public class GossipDeviceStore
86 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
87 implements DeviceStore {
88
89 private final Logger log = getLogger(getClass());
90
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070091 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070092
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070093 // innerMap is used to lock a Device, thus instance should never be replaced.
94 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070095 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070096 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070097
98 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070099 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
100 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
101
102 // to be updated under Device lock
103 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
104 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700105
106 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700107 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700110 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700111
Madan Jampani47c93732014-10-06 20:46:08 -0700112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterCommunicationService clusterCommunicator;
114
Madan Jampani53e44e62014-10-07 12:39:51 -0700115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected ClusterService clusterService;
117
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected MastershipService mastershipService;
120
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700121 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700122 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700123 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700124 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700125 .register(DistributedStoreSerializers.COMMON)
126
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700127 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700128 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700129 .register(InternalDeviceRemovedEvent.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700130 .register(InternalPortEvent.class, new InternalPortEventSerializer())
131 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700132 .register(DeviceAntiEntropyAdvertisement.class)
133 .register(DeviceFragmentId.class)
134 .register(PortFragmentId.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700135 .build()
136 .populate(1);
137 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700138 };
139
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700140 private ScheduledExecutorService executor;
141
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700142 @Activate
143 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700144 clusterCommunicator.addSubscriber(
145 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
146 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700147 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
148 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700149 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
150 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700151 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
152 clusterCommunicator.addSubscriber(
153 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700154 clusterCommunicator.addSubscriber(
155 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
156
157 executor =
158 newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
159
160 // TODO: Make these configurable
161 long initialDelaySec = 5;
162 long periodSec = 5;
163 // start anti-entropy thread
164 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
165 initialDelaySec, periodSec, TimeUnit.SECONDS);
166
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700167 log.info("Started");
168 }
169
170 @Deactivate
171 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700172
173 executor.shutdownNow();
174 try {
175 boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
176 if (timedout) {
177 log.error("Timeout during executor shutdown");
178 }
179 } catch (InterruptedException e) {
180 log.error("Error during executor shutdown", e);
181 }
182
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700183 deviceDescs.clear();
184 devices.clear();
185 devicePorts.clear();
186 availableDevices.clear();
187 log.info("Stopped");
188 }
189
190 @Override
191 public int getDeviceCount() {
192 return devices.size();
193 }
194
195 @Override
196 public Iterable<Device> getDevices() {
197 return Collections.unmodifiableCollection(devices.values());
198 }
199
200 @Override
201 public Device getDevice(DeviceId deviceId) {
202 return devices.get(deviceId);
203 }
204
205 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700206 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
207 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700208 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700209 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700210 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700211 final DeviceEvent event;
212 final Timestamped<DeviceDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700213 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700214 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700215 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId).getDeviceDesc();
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700216 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700217 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700218 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
219 providerId, deviceId);
220 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700221 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700222 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700223 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700224 + providerId + " and deviceId: " + deviceId, e);
225 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700226 }
227 return event;
228 }
229
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700230 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
231 DeviceId deviceId,
232 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700233
234 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700235 Map<ProviderId, DeviceDescriptions> providerDescs
236 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700237
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700238 synchronized (providerDescs) {
239 // locking per device
240
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700241 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
242 log.debug("Ignoring outdated event: {}", deltaDesc);
243 return null;
244 }
245
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700246 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(providerDescs, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700247
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700248 final Device oldDevice = devices.get(deviceId);
249 final Device newDevice;
250
251 if (deltaDesc == descs.getDeviceDesc() ||
252 deltaDesc.isNewer(descs.getDeviceDesc())) {
253 // on new device or valid update
254 descs.putDeviceDesc(deltaDesc);
255 newDevice = composeDevice(deviceId, providerDescs);
256 } else {
257 // outdated event, ignored.
258 return null;
259 }
260 if (oldDevice == null) {
261 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700262 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700263 } else {
264 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700265 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700266 }
267 }
268 }
269
270 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700271 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700272 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700273 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700274
275 // update composed device cache
276 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
277 verify(oldDevice == null,
278 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
279 providerId, oldDevice, newDevice);
280
281 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700282 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700283 }
284
285 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
286 }
287
288 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700289 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700290 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700291 Device oldDevice,
292 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700293
294 // We allow only certain attributes to trigger update
295 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
296 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700297 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700298
299 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
300 if (!replaced) {
301 verify(replaced,
302 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
303 providerId, oldDevice, devices.get(newDevice.id())
304 , newDevice);
305 }
306 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700307 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700308 }
309 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
310 }
311
312 // Otherwise merely attempt to change availability if primary provider
313 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700314 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700315 return !added ? null :
316 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
317 }
318 return null;
319 }
320
321 @Override
322 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700323 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700324 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700325 if (event != null) {
326 log.info("Notifying peers of a device offline topology event for deviceId: {}",
327 deviceId);
328 try {
329 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
330 } catch (IOException e) {
331 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
332 deviceId);
333 }
334 }
335 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700336 }
337
338 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
339
340 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700341 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700342
343 // locking device
344 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700345
346 // accept off-line if given timestamp is newer than
347 // the latest Timestamp from Primary provider
348 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
349 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
350 if (timestamp.compareTo(lastTimestamp) <= 0) {
351 // outdated event ignore
352 return null;
353 }
354
355 offline.put(deviceId, timestamp);
356
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700357 Device device = devices.get(deviceId);
358 if (device == null) {
359 return null;
360 }
361 boolean removed = availableDevices.remove(deviceId);
362 if (removed) {
363 // TODO: broadcast ... DOWN only?
364 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700365 }
366 return null;
367 }
368 }
369
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700370 /**
371 * Marks the device as available if the given timestamp is not outdated,
372 * compared to the time the device has been marked offline.
373 *
374 * @param deviceId identifier of the device
375 * @param timestamp of the event triggering this change.
376 * @return true if availability change request was accepted and changed the state
377 */
378 // Guarded by deviceDescs value (=Device lock)
379 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
380 // accept on-line if given timestamp is newer than
381 // the latest offline request Timestamp
382 Timestamp offlineTimestamp = offline.get(deviceId);
383 if (offlineTimestamp == null ||
384 offlineTimestamp.compareTo(timestamp) < 0) {
385
386 offline.remove(deviceId);
387 return availableDevices.add(deviceId);
388 }
389 return false;
390 }
391
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700392 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700393 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
394 DeviceId deviceId,
395 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700396
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700397 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Ayaka Koshibeb5c63a02014-10-18 18:42:27 -0700398 log.info("timestamp for {} {}", deviceId, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700399
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700400 final Timestamped<List<PortDescription>> timestampedInput
401 = new Timestamped<>(portDescriptions, newTimestamp);
402 final List<DeviceEvent> events;
403 final Timestamped<List<PortDescription>> merged;
404
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700405 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700406 events = updatePortsInternal(providerId, deviceId, timestampedInput);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700407 final DeviceDescriptions descs = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700408 List<PortDescription> mergedList =
409 FluentIterable.from(portDescriptions)
410 .transform(new Function<PortDescription, PortDescription>() {
411 @Override
412 public PortDescription apply(PortDescription input) {
413 // lookup merged port description
414 return descs.getPortDesc(input.portNumber()).value();
415 }
416 }).toList();
417 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
418 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700419 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700420 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
421 providerId, deviceId);
422 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700423 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700424 } catch (IOException e) {
425 log.error("Failed to notify peers of a port update topology event or providerId: "
426 + providerId + " and deviceId: " + deviceId, e);
427 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700428 }
429 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700430 }
431
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700432 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
433 DeviceId deviceId,
434 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700435
436 Device device = devices.get(deviceId);
437 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
438
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700439 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700440 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
441
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700442 List<DeviceEvent> events = new ArrayList<>();
443 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700444
445 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
446 log.debug("Ignoring outdated events: {}", portDescriptions);
447 return null;
448 }
449
450 DeviceDescriptions descs = descsMap.get(providerId);
451 // every provider must provide DeviceDescription.
452 checkArgument(descs != null,
453 "Device description for Device ID %s from Provider %s was not found",
454 deviceId, providerId);
455
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700456 Map<PortNumber, Port> ports = getPortMap(deviceId);
457
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700458 final Timestamp newTimestamp = portDescriptions.timestamp();
459
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700460 // Add new ports
461 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700462 for (PortDescription portDescription : portDescriptions.value()) {
463 final PortNumber number = portDescription.portNumber();
464 processed.add(number);
465
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700466 final Port oldPort = ports.get(number);
467 final Port newPort;
468
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700469
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700470 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
471 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700472 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700473 // on new port or valid update
474 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700475 descs.putPortDesc(new Timestamped<>(portDescription,
476 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700477 newPort = composePort(device, number, descsMap);
478 } else {
479 // outdated event, ignored.
480 continue;
481 }
482
483 events.add(oldPort == null ?
484 createPort(device, newPort, ports) :
485 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700486 }
487
488 events.addAll(pruneOldPorts(device, ports, processed));
489 }
490 return FluentIterable.from(events).filter(notNull()).toList();
491 }
492
493 // Creates a new port based on the port description adds it to the map and
494 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700495 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700496 private DeviceEvent createPort(Device device, Port newPort,
497 Map<PortNumber, Port> ports) {
498 ports.put(newPort.number(), newPort);
499 return new DeviceEvent(PORT_ADDED, device, newPort);
500 }
501
502 // Checks if the specified port requires update and if so, it replaces the
503 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700504 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700505 private DeviceEvent updatePort(Device device, Port oldPort,
506 Port newPort,
507 Map<PortNumber, Port> ports) {
508 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700509 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700510
511 ports.put(oldPort.number(), newPort);
512 return new DeviceEvent(PORT_UPDATED, device, newPort);
513 }
514 return null;
515 }
516
517 // Prunes the specified list of ports based on which ports are in the
518 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700519 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700520 private List<DeviceEvent> pruneOldPorts(Device device,
521 Map<PortNumber, Port> ports,
522 Set<PortNumber> processed) {
523 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700524 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700525 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700526 Entry<PortNumber, Port> e = iterator.next();
527 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700528 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700529 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700530 iterator.remove();
531 }
532 }
533 return events;
534 }
535
536 // Gets the map of ports for the specified device; if one does not already
537 // exist, it creates and registers a new one.
538 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
539 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700540 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
541 }
542
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700543 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700544 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700545 Map<ProviderId, DeviceDescriptions> r;
546 r = deviceDescs.get(deviceId);
547 if (r == null) {
548 r = new HashMap<ProviderId, DeviceDescriptions>();
549 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
550 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
551 if (concurrentlyAdded != null) {
552 r = concurrentlyAdded;
553 }
554 }
555 return r;
556 }
557
558 // Guarded by deviceDescs value (=Device lock)
559 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
560 Map<ProviderId, DeviceDescriptions> device,
561 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
562
563 synchronized (device) {
564 DeviceDescriptions r = device.get(providerId);
565 if (r == null) {
566 r = new DeviceDescriptions(deltaDesc);
567 device.put(providerId, r);
568 }
569 return r;
570 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700571 }
572
573 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700574 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
575 DeviceId deviceId,
576 PortDescription portDescription) {
577
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700578 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700579 final Timestamped<PortDescription> deltaDesc
580 = new Timestamped<>(portDescription, newTimestamp);
581 final DeviceEvent event;
582 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700583 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700584 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700585 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700586 .getPortDesc(portDescription.portNumber());
587 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700588 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700589 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
590 providerId, deviceId);
591 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700592 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700593 } catch (IOException e) {
594 log.error("Failed to notify peers of a port status update topology event or providerId: "
595 + providerId + " and deviceId: " + deviceId, e);
596 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700597 }
598 return event;
599 }
600
601 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
602 Timestamped<PortDescription> deltaDesc) {
603
604 Device device = devices.get(deviceId);
605 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
606
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700607 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700608 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
609
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700610 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700611
612 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
613 log.debug("Ignoring outdated event: {}", deltaDesc);
614 return null;
615 }
616
617 DeviceDescriptions descs = descsMap.get(providerId);
618 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700619 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700620 "Device description for Device ID %s from Provider %s was not found",
621 deviceId, providerId);
622
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700623 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
624 final PortNumber number = deltaDesc.value().portNumber();
625 final Port oldPort = ports.get(number);
626 final Port newPort;
627
628 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
629 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700630 deltaDesc.isNewer(existingPortDesc)) {
631 // on new port or valid update
632 // update description
633 descs.putPortDesc(deltaDesc);
634 newPort = composePort(device, number, descsMap);
635 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700636 // same or outdated event, ignored.
637 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700638 return null;
639 }
640
641 if (oldPort == null) {
642 return createPort(device, newPort, ports);
643 } else {
644 return updatePort(device, oldPort, newPort, ports);
645 }
646 }
647 }
648
649 @Override
650 public List<Port> getPorts(DeviceId deviceId) {
651 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
652 if (ports == null) {
653 return Collections.emptyList();
654 }
655 return ImmutableList.copyOf(ports.values());
656 }
657
658 @Override
659 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
660 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
661 return ports == null ? null : ports.get(portNumber);
662 }
663
664 @Override
665 public boolean isAvailable(DeviceId deviceId) {
666 return availableDevices.contains(deviceId);
667 }
668
669 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700670 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700671 final NodeId master = mastershipService.getMasterFor(deviceId);
672 if (!clusterService.getLocalNode().id().equals(master)) {
673 log.info("remove Device {} requested on non master node", deviceId);
674 // FIXME silently ignoring. Should be forwarding or broadcasting to
675 // master.
676 return null;
677 }
678
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700679 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700680 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700681 if (event != null) {
682 log.info("Notifying peers of a device removed topology event for deviceId: {}",
683 deviceId);
684 try {
685 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
686 } catch (IOException e) {
687 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
688 deviceId);
689 }
690 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700691 return event;
692 }
693
694 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
695 Timestamp timestamp) {
696
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700697 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700698 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700699 // accept removal request if given timestamp is newer than
700 // the latest Timestamp from Primary provider
701 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
702 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
703 if (timestamp.compareTo(lastTimestamp) <= 0) {
704 // outdated event ignore
705 return null;
706 }
707 removalRequest.put(deviceId, timestamp);
708
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700709 Device device = devices.remove(deviceId);
710 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700711 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
712 if (ports != null) {
713 ports.clear();
714 }
715 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700716 descs.clear();
717 return device == null ? null :
718 new DeviceEvent(DEVICE_REMOVED, device, null);
719 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700720 }
721
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700722 /**
723 * Checks if given timestamp is superseded by removal request
724 * with more recent timestamp.
725 *
726 * @param deviceId identifier of a device
727 * @param timestampToCheck timestamp of an event to check
728 * @return true if device is already removed
729 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700730 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
731 Timestamp removalTimestamp = removalRequest.get(deviceId);
732 if (removalTimestamp != null &&
733 removalTimestamp.compareTo(timestampToCheck) >= 0) {
734 // removalRequest is more recent
735 return true;
736 }
737 return false;
738 }
739
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700740 /**
741 * Returns a Device, merging description given from multiple Providers.
742 *
743 * @param deviceId device identifier
744 * @param providerDescs Collection of Descriptions from multiple providers
745 * @return Device instance
746 */
747 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700748 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700749
750 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
751
752 ProviderId primary = pickPrimaryPID(providerDescs);
753
754 DeviceDescriptions desc = providerDescs.get(primary);
755
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700756 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700757 Type type = base.type();
758 String manufacturer = base.manufacturer();
759 String hwVersion = base.hwVersion();
760 String swVersion = base.swVersion();
761 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700762 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700763 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
764 annotations = merge(annotations, base.annotations());
765
766 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
767 if (e.getKey().equals(primary)) {
768 continue;
769 }
770 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700771 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700772 // Currently assuming there will never be a key conflict between
773 // providers
774
775 // annotation merging. not so efficient, should revisit later
776 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
777 }
778
779 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700780 hwVersion, swVersion, serialNumber,
781 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700782 }
783
784 /**
785 * Returns a Port, merging description given from multiple Providers.
786 *
787 * @param device device the port is on
788 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700789 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700790 * @return Port instance
791 */
792 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700793 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700794
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700795 ProviderId primary = pickPrimaryPID(descsMap);
796 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700797 // if no primary, assume not enabled
798 // TODO: revisit this default port enabled/disabled behavior
799 boolean isEnabled = false;
800 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
801
802 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
803 if (portDesc != null) {
804 isEnabled = portDesc.value().isEnabled();
805 annotations = merge(annotations, portDesc.value().annotations());
806 }
807
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700808 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700809 if (e.getKey().equals(primary)) {
810 continue;
811 }
812 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700813 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700814 // Currently assuming there will never be a key conflict between
815 // providers
816
817 // annotation merging. not so efficient, should revisit later
818 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
819 if (otherPortDesc != null) {
820 annotations = merge(annotations, otherPortDesc.value().annotations());
821 }
822 }
823
824 return new DefaultPort(device, number, isEnabled, annotations);
825 }
826
827 /**
828 * @return primary ProviderID, or randomly chosen one if none exists
829 */
830 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700831 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700832 ProviderId fallBackPrimary = null;
833 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
834 if (!e.getKey().isAncillary()) {
835 return e.getKey();
836 } else if (fallBackPrimary == null) {
837 // pick randomly as a fallback in case there is no primary
838 fallBackPrimary = e.getKey();
839 }
840 }
841 return fallBackPrimary;
842 }
843
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700844 private DeviceDescriptions getPrimaryDescriptions(
845 Map<ProviderId, DeviceDescriptions> providerDescs) {
846 ProviderId pid = pickPrimaryPID(providerDescs);
847 return providerDescs.get(pid);
848 }
849
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700850 // TODO: should we be throwing exception?
851 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
852 ClusterMessage message = new ClusterMessage(
853 clusterService.getLocalNode().id(),
854 subject,
855 SERIALIZER.encode(event));
856 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700857 }
858
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700859 // TODO: should we be throwing exception?
860 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
861 ClusterMessage message = new ClusterMessage(
862 clusterService.getLocalNode().id(),
863 subject,
864 SERIALIZER.encode(event));
865 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700866 }
Madan Jampani47c93732014-10-06 20:46:08 -0700867
868 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700869 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700870 }
871
Madan Jampani25322532014-10-08 11:20:38 -0700872 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700873 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700874 }
875
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700876 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700877 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700878 }
879
Madan Jampani47c93732014-10-06 20:46:08 -0700880 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700881 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700882 }
883
884 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700885 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
886 }
887
888 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
889 try {
890 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
891 } catch (IOException e) {
892 log.error("Failed to send" + event + " to " + recipient, e);
893 }
894 }
895
896 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
897 try {
898 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
899 } catch (IOException e) {
900 log.error("Failed to send" + event + " to " + recipient, e);
901 }
902 }
903
904 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
905 try {
906 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
907 } catch (IOException e) {
908 log.error("Failed to send" + event + " to " + recipient, e);
909 }
910 }
911
912 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
913 try {
914 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
915 } catch (IOException e) {
916 log.error("Failed to send" + event + " to " + recipient, e);
917 }
918 }
919
920 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
921 try {
922 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
923 } catch (IOException e) {
924 log.error("Failed to send" + event + " to " + recipient, e);
925 }
926 }
927
928 private DeviceAntiEntropyAdvertisement createAdvertisement() {
929 final NodeId self = clusterService.getLocalNode().id();
930
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700931 final int numDevices = deviceDescs.size();
932 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
933 final int portsPerDevice = 8; // random factor to minimize reallocation
934 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
935 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700936
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700937 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700938 provs : deviceDescs.entrySet()) {
939
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700940 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700941 final DeviceId deviceId = provs.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700942 final Map<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700943 synchronized (devDescs) {
944
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700945 // send device offline timestamp
946 Timestamp lOffline = this.offline.get(deviceId);
947 if (lOffline != null) {
948 adOffline.put(deviceId, lOffline);
949 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700950
951 for (Entry<ProviderId, DeviceDescriptions>
952 prov : devDescs.entrySet()) {
953
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700954 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700955 final ProviderId provId = prov.getKey();
956 final DeviceDescriptions descs = prov.getValue();
957
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700958 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700959 descs.getDeviceDesc().timestamp());
960
961 for (Entry<PortNumber, Timestamped<PortDescription>>
962 portDesc : descs.getPortDescs().entrySet()) {
963
964 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700965 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700966 portDesc.getValue().timestamp());
967 }
968 }
969 }
970 }
971
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700972 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700973 }
974
975 /**
976 * Responds to anti-entropy advertisement message.
977 * <P>
978 * Notify sender about out-dated information using regular replication message.
979 * Send back advertisement to sender if not in sync.
980 *
981 * @param advertisement to respond to
982 */
983 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
984
985 final NodeId sender = advertisement.sender();
986
987 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
988 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
989 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
990
991 // Fragments to request
992 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
993 Collection<PortFragmentId> reqPorts = new ArrayList<>();
994
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700995 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700996 final DeviceId deviceId = de.getKey();
997 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
998
999 synchronized (lDevice) {
1000 // latestTimestamp across provider
1001 // Note: can be null initially
1002 Timestamp localLatest = offline.get(deviceId);
1003
1004 // handle device Ads
1005 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1006 final ProviderId provId = prov.getKey();
1007 final DeviceDescriptions lDeviceDescs = prov.getValue();
1008
1009 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1010
1011
1012 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1013 Timestamp advDevTimestamp = devAds.get(devFragId);
1014
1015 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
1016 // remote does not have it or outdated, suggest
1017 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1018 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1019 // local is outdated, request
1020 reqDevices.add(devFragId);
1021 }
1022
1023 // handle port Ads
1024 for (Entry<PortNumber, Timestamped<PortDescription>>
1025 pe : lDeviceDescs.getPortDescs().entrySet()) {
1026
1027 final PortNumber num = pe.getKey();
1028 final Timestamped<PortDescription> lPort = pe.getValue();
1029
1030 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1031
1032 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -07001033 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001034 // remote does not have it or outdated, suggest
1035 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1036 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1037 // local is outdated, request
1038 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1039 reqPorts.add(portFragId);
1040 }
1041
1042 // remove port Ad already processed
1043 portAds.remove(portFragId);
1044 } // end local port loop
1045
1046 // remove device Ad already processed
1047 devAds.remove(devFragId);
1048
1049 // find latest and update
1050 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1051 if (localLatest == null ||
1052 providerLatest.compareTo(localLatest) > 0) {
1053 localLatest = providerLatest;
1054 }
1055 } // end local provider loop
1056
1057 // checking if remote timestamp is more recent.
1058 Timestamp rOffline = offlineAds.get(deviceId);
1059 if (rOffline != null &&
1060 rOffline.compareTo(localLatest) > 0) {
1061 // remote offline timestamp suggests that the
1062 // device is off-line
1063 markOfflineInternal(deviceId, rOffline);
1064 }
1065
1066 Timestamp lOffline = offline.get(deviceId);
1067 if (lOffline != null && rOffline == null) {
1068 // locally offline, but remote is online, suggest offline
1069 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1070 }
1071
1072 // remove device offline Ad already processed
1073 offlineAds.remove(deviceId);
1074 } // end local device loop
1075 } // device lock
1076
1077 // If there is any Ads left, request them
1078 log.trace("Ads left {}, {}", devAds, portAds);
1079 reqDevices.addAll(devAds.keySet());
1080 reqPorts.addAll(portAds.keySet());
1081
1082 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1083 log.trace("Nothing to request to remote peer {}", sender);
1084 return;
1085 }
1086
1087 log.info("Need to sync {} {}", reqDevices, reqPorts);
1088
1089 // 2-way Anti-Entropy for now
1090 try {
1091 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1092 } catch (IOException e) {
1093 log.error("Failed to send response advertisement to " + sender, e);
1094 }
1095
1096// Sketch of 3-way Anti-Entropy
1097// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1098// ClusterMessage message = new ClusterMessage(
1099// clusterService.getLocalNode().id(),
1100// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1101// SERIALIZER.encode(request));
1102//
1103// try {
1104// clusterCommunicator.unicast(message, advertisement.sender());
1105// } catch (IOException e) {
1106// log.error("Failed to send advertisement reply to "
1107// + advertisement.sender(), e);
1108// }
Madan Jampani47c93732014-10-06 20:46:08 -07001109 }
1110
Madan Jampani255a58b2014-10-09 12:08:20 -07001111 private void notifyDelegateIfNotNull(DeviceEvent event) {
1112 if (event != null) {
1113 notifyDelegate(event);
1114 }
1115 }
1116
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001117 private final class SendAdvertisementTask implements Runnable {
1118
1119 @Override
1120 public void run() {
1121 if (Thread.currentThread().isInterrupted()) {
1122 log.info("Interrupted, quitting");
1123 return;
1124 }
1125
1126 try {
1127 final NodeId self = clusterService.getLocalNode().id();
1128 Set<ControllerNode> nodes = clusterService.getNodes();
1129
1130 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1131 .transform(toNodeId())
1132 .toList();
1133
1134 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHI37083082014-10-13 10:38:38 -07001135 log.debug("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001136 return;
1137 }
1138
1139 NodeId peer;
1140 do {
1141 int idx = RandomUtils.nextInt(0, nodeIds.size());
1142 peer = nodeIds.get(idx);
1143 } while (peer.equals(self));
1144
1145 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1146
1147 if (Thread.currentThread().isInterrupted()) {
1148 log.info("Interrupted, quitting");
1149 return;
1150 }
1151
1152 try {
1153 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1154 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001155 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001156 return;
1157 }
1158 } catch (Exception e) {
1159 // catch all Exception to avoid Scheduled task being suppressed.
1160 log.error("Exception thrown while sending advertisement", e);
1161 }
1162 }
1163 }
1164
Madan Jampani47c93732014-10-06 20:46:08 -07001165 private class InternalDeviceEventListener implements ClusterMessageHandler {
1166 @Override
1167 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001168
Madan Jampani47c93732014-10-06 20:46:08 -07001169 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001170 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001171
Madan Jampani47c93732014-10-06 20:46:08 -07001172 ProviderId providerId = event.providerId();
1173 DeviceId deviceId = event.deviceId();
1174 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001175
Madan Jampani255a58b2014-10-09 12:08:20 -07001176 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001177 }
1178 }
1179
Madan Jampani25322532014-10-08 11:20:38 -07001180 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
1181 @Override
1182 public void handle(ClusterMessage message) {
1183
1184 log.info("Received device offline event from peer: {}", message.sender());
1185 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
1186
1187 DeviceId deviceId = event.deviceId();
1188 Timestamp timestamp = event.timestamp();
1189
Madan Jampani255a58b2014-10-09 12:08:20 -07001190 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -07001191 }
1192 }
1193
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001194 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
1195 @Override
1196 public void handle(ClusterMessage message) {
1197
1198 log.info("Received device removed event from peer: {}", message.sender());
1199 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
1200
1201 DeviceId deviceId = event.deviceId();
1202 Timestamp timestamp = event.timestamp();
1203
Madan Jampani255a58b2014-10-09 12:08:20 -07001204 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001205 }
1206 }
1207
Madan Jampani47c93732014-10-06 20:46:08 -07001208 private class InternalPortEventListener implements ClusterMessageHandler {
1209 @Override
1210 public void handle(ClusterMessage message) {
1211
1212 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001213 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001214
1215 ProviderId providerId = event.providerId();
1216 DeviceId deviceId = event.deviceId();
1217 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1218
Madan Jampani255a58b2014-10-09 12:08:20 -07001219 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -07001220 }
1221 }
1222
1223 private class InternalPortStatusEventListener implements ClusterMessageHandler {
1224 @Override
1225 public void handle(ClusterMessage message) {
1226
1227 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001228 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001229 log.info("{}", event);
Madan Jampani47c93732014-10-06 20:46:08 -07001230
1231 ProviderId providerId = event.providerId();
1232 DeviceId deviceId = event.deviceId();
1233 Timestamped<PortDescription> portDescription = event.portDescription();
1234
Madan Jampani255a58b2014-10-09 12:08:20 -07001235 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001236 }
1237 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001238
1239 private final class InternalDeviceAdvertisementListener
1240 implements ClusterMessageHandler {
1241
1242 @Override
1243 public void handle(ClusterMessage message) {
Yuta HIGUCHI9a0a1d12014-10-13 22:38:02 -07001244 log.debug("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001245 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1246 handleAdvertisement(advertisement);
1247 }
1248 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001249}