blob: 21941b50023c0a18da2c4efcc7ce03138ca01d09 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
Yuta HIGUCHI47c40882014-10-10 18:44:37 -07003import com.google.common.base.Function;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07004import com.google.common.collect.FluentIterable;
5import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07008
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07009import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070017import org.onlab.onos.cluster.ControllerNode;
18import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -070019import org.onlab.onos.mastership.MastershipService;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070020import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070021import org.onlab.onos.net.DefaultAnnotations;
22import org.onlab.onos.net.DefaultDevice;
23import org.onlab.onos.net.DefaultPort;
24import org.onlab.onos.net.Device;
25import org.onlab.onos.net.Device.Type;
26import org.onlab.onos.net.DeviceId;
27import org.onlab.onos.net.Port;
28import org.onlab.onos.net.PortNumber;
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -070029import org.onlab.onos.net.device.DeviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070030import org.onlab.onos.net.device.DeviceDescription;
31import org.onlab.onos.net.device.DeviceEvent;
32import org.onlab.onos.net.device.DeviceStore;
33import org.onlab.onos.net.device.DeviceStoreDelegate;
34import org.onlab.onos.net.device.PortDescription;
35import org.onlab.onos.net.provider.ProviderId;
36import org.onlab.onos.store.AbstractStore;
Yuta HIGUCHId40483d2014-10-09 15:20:30 -070037import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070038import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070041import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIeecee552014-10-16 14:09:01 -070042import org.onlab.onos.store.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070043import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070044import org.onlab.onos.store.serializers.DistributedStoreSerializers;
alshabib7911a052014-10-16 17:49:37 -070045import org.onlab.packet.ChassisId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070046import org.onlab.util.KryoNamespace;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070047import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070048import org.slf4j.Logger;
49
Madan Jampani47c93732014-10-06 20:46:08 -070050import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070051import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070052import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070053import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070054import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070055import java.util.HashSet;
56import java.util.Iterator;
57import java.util.List;
58import java.util.Map;
59import java.util.Map.Entry;
60import java.util.Objects;
61import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070062import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070063import java.util.concurrent.ScheduledExecutorService;
64import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070065
66import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHIdc7374c2014-10-10 11:11:09 -070068import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070069import static org.onlab.onos.net.device.DeviceEvent.Type.*;
70import static org.slf4j.LoggerFactory.getLogger;
71import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
72import static org.onlab.onos.net.DefaultAnnotations.merge;
73import static com.google.common.base.Verify.verify;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070074import static org.onlab.util.Tools.namedThreads;
75import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
76import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070077
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070078// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070079/**
80 * Manages inventory of infrastructure devices using gossip protocol to distribute
81 * information.
82 */
83@Component(immediate = true)
84@Service
85public class GossipDeviceStore
86 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
87 implements DeviceStore {
88
89 private final Logger log = getLogger(getClass());
90
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070091 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070092
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070093 // innerMap is used to lock a Device, thus instance should never be replaced.
94 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -070095 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070096 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070097
98 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070099 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
100 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
101
102 // to be updated under Device lock
103 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
104 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700105
106 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700107 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700110 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700111
Madan Jampani47c93732014-10-06 20:46:08 -0700112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterCommunicationService clusterCommunicator;
114
Madan Jampani53e44e62014-10-07 12:39:51 -0700115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected ClusterService clusterService;
117
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected MastershipService mastershipService;
120
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700121 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700122 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700123 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700124 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700125 .register(DistributedStoreSerializers.COMMON)
126
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700127 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700128 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700129 .register(InternalDeviceRemovedEvent.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700130 .register(InternalPortEvent.class, new InternalPortEventSerializer())
131 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700132 .register(DeviceAntiEntropyAdvertisement.class)
133 .register(DeviceFragmentId.class)
134 .register(PortFragmentId.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700135 .build()
136 .populate(1);
137 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700138 };
139
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700140 private ScheduledExecutorService executor;
141
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700142 @Activate
143 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700144 clusterCommunicator.addSubscriber(
145 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
146 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700147 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
148 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700149 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
150 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700151 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
152 clusterCommunicator.addSubscriber(
153 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700154 clusterCommunicator.addSubscriber(
155 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
156
157 executor =
158 newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
159
160 // TODO: Make these configurable
161 long initialDelaySec = 5;
162 long periodSec = 5;
163 // start anti-entropy thread
164 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
165 initialDelaySec, periodSec, TimeUnit.SECONDS);
166
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700167 log.info("Started");
168 }
169
170 @Deactivate
171 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700172
173 executor.shutdownNow();
174 try {
175 boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
176 if (timedout) {
177 log.error("Timeout during executor shutdown");
178 }
179 } catch (InterruptedException e) {
180 log.error("Error during executor shutdown", e);
181 }
182
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700183 deviceDescs.clear();
184 devices.clear();
185 devicePorts.clear();
186 availableDevices.clear();
187 log.info("Stopped");
188 }
189
190 @Override
191 public int getDeviceCount() {
192 return devices.size();
193 }
194
195 @Override
196 public Iterable<Device> getDevices() {
197 return Collections.unmodifiableCollection(devices.values());
198 }
199
200 @Override
201 public Device getDevice(DeviceId deviceId) {
202 return devices.get(deviceId);
203 }
204
205 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700206 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
207 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700208 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700209 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700210 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700211 final DeviceEvent event;
212 final Timestamped<DeviceDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700213 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700214 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700215 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId).getDeviceDesc();
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700216 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700217 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700218 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
219 providerId, deviceId);
220 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700221 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700222 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700223 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700224 + providerId + " and deviceId: " + deviceId, e);
225 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700226 }
227 return event;
228 }
229
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700230 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
231 DeviceId deviceId,
232 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700233
234 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700235 Map<ProviderId, DeviceDescriptions> providerDescs
236 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700237
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700238 synchronized (providerDescs) {
239 // locking per device
240
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700241 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
242 log.debug("Ignoring outdated event: {}", deltaDesc);
243 return null;
244 }
245
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700246 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(providerDescs, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700247
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700248 final Device oldDevice = devices.get(deviceId);
249 final Device newDevice;
250
251 if (deltaDesc == descs.getDeviceDesc() ||
252 deltaDesc.isNewer(descs.getDeviceDesc())) {
253 // on new device or valid update
254 descs.putDeviceDesc(deltaDesc);
255 newDevice = composeDevice(deviceId, providerDescs);
256 } else {
257 // outdated event, ignored.
258 return null;
259 }
260 if (oldDevice == null) {
261 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700262 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700263 } else {
264 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700265 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700266 }
267 }
268 }
269
270 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700271 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700272 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700273 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700274
275 // update composed device cache
276 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
277 verify(oldDevice == null,
278 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
279 providerId, oldDevice, newDevice);
280
281 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700282 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700283 }
284
285 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
286 }
287
288 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700289 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700290 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700291 Device oldDevice,
292 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700293 // We allow only certain attributes to trigger update
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700294 boolean propertiesChanged =
295 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
296 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion());
297 boolean annotationsChanged =
298 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700299
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700300 // Primary providers can respond to all changes, but ancillary ones
301 // should respond only to annotation changes.
302 if ((providerId.isAncillary() && annotationsChanged) ||
303 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700304 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
305 if (!replaced) {
306 verify(replaced,
307 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
308 providerId, oldDevice, devices.get(newDevice.id())
309 , newDevice);
310 }
311 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700312 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700313 }
314 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
315 }
316
317 // Otherwise merely attempt to change availability if primary provider
318 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700319 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700320 return !added ? null :
321 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
322 }
323 return null;
324 }
325
326 @Override
327 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700328 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700329 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700330 if (event != null) {
331 log.info("Notifying peers of a device offline topology event for deviceId: {}",
332 deviceId);
333 try {
334 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
335 } catch (IOException e) {
336 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
337 deviceId);
338 }
339 }
340 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700341 }
342
343 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
344
345 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700346 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700347
348 // locking device
349 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700350
351 // accept off-line if given timestamp is newer than
352 // the latest Timestamp from Primary provider
353 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
354 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
355 if (timestamp.compareTo(lastTimestamp) <= 0) {
356 // outdated event ignore
357 return null;
358 }
359
360 offline.put(deviceId, timestamp);
361
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700362 Device device = devices.get(deviceId);
363 if (device == null) {
364 return null;
365 }
366 boolean removed = availableDevices.remove(deviceId);
367 if (removed) {
368 // TODO: broadcast ... DOWN only?
369 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700370 }
371 return null;
372 }
373 }
374
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700375 /**
376 * Marks the device as available if the given timestamp is not outdated,
377 * compared to the time the device has been marked offline.
378 *
379 * @param deviceId identifier of the device
380 * @param timestamp of the event triggering this change.
381 * @return true if availability change request was accepted and changed the state
382 */
383 // Guarded by deviceDescs value (=Device lock)
384 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
385 // accept on-line if given timestamp is newer than
386 // the latest offline request Timestamp
387 Timestamp offlineTimestamp = offline.get(deviceId);
388 if (offlineTimestamp == null ||
389 offlineTimestamp.compareTo(timestamp) < 0) {
390
391 offline.remove(deviceId);
392 return availableDevices.add(deviceId);
393 }
394 return false;
395 }
396
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700397 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700398 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
399 DeviceId deviceId,
400 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700401
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700402 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Ayaka Koshibeb5c63a02014-10-18 18:42:27 -0700403 log.info("timestamp for {} {}", deviceId, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700404
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700405 final Timestamped<List<PortDescription>> timestampedInput
406 = new Timestamped<>(portDescriptions, newTimestamp);
407 final List<DeviceEvent> events;
408 final Timestamped<List<PortDescription>> merged;
409
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700410 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700411 events = updatePortsInternal(providerId, deviceId, timestampedInput);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700412 final DeviceDescriptions descs = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700413 List<PortDescription> mergedList =
414 FluentIterable.from(portDescriptions)
415 .transform(new Function<PortDescription, PortDescription>() {
416 @Override
417 public PortDescription apply(PortDescription input) {
418 // lookup merged port description
419 return descs.getPortDesc(input.portNumber()).value();
420 }
421 }).toList();
422 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
423 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700424 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700425 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
426 providerId, deviceId);
427 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700428 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700429 } catch (IOException e) {
430 log.error("Failed to notify peers of a port update topology event or providerId: "
431 + providerId + " and deviceId: " + deviceId, e);
432 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700433 }
434 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700435 }
436
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700437 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
438 DeviceId deviceId,
439 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700440
441 Device device = devices.get(deviceId);
442 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
443
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700444 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700445 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
446
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700447 List<DeviceEvent> events = new ArrayList<>();
448 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700449
450 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
451 log.debug("Ignoring outdated events: {}", portDescriptions);
452 return null;
453 }
454
455 DeviceDescriptions descs = descsMap.get(providerId);
456 // every provider must provide DeviceDescription.
457 checkArgument(descs != null,
458 "Device description for Device ID %s from Provider %s was not found",
459 deviceId, providerId);
460
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700461 Map<PortNumber, Port> ports = getPortMap(deviceId);
462
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700463 final Timestamp newTimestamp = portDescriptions.timestamp();
464
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700465 // Add new ports
466 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700467 for (PortDescription portDescription : portDescriptions.value()) {
468 final PortNumber number = portDescription.portNumber();
469 processed.add(number);
470
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700471 final Port oldPort = ports.get(number);
472 final Port newPort;
473
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700474
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700475 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
476 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700477 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700478 // on new port or valid update
479 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700480 descs.putPortDesc(new Timestamped<>(portDescription,
481 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700482 newPort = composePort(device, number, descsMap);
483 } else {
484 // outdated event, ignored.
485 continue;
486 }
487
488 events.add(oldPort == null ?
489 createPort(device, newPort, ports) :
490 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700491 }
492
493 events.addAll(pruneOldPorts(device, ports, processed));
494 }
495 return FluentIterable.from(events).filter(notNull()).toList();
496 }
497
498 // Creates a new port based on the port description adds it to the map and
499 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700500 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700501 private DeviceEvent createPort(Device device, Port newPort,
502 Map<PortNumber, Port> ports) {
503 ports.put(newPort.number(), newPort);
504 return new DeviceEvent(PORT_ADDED, device, newPort);
505 }
506
507 // Checks if the specified port requires update and if so, it replaces the
508 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700509 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700510 private DeviceEvent updatePort(Device device, Port oldPort,
511 Port newPort,
512 Map<PortNumber, Port> ports) {
513 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700514 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700515
516 ports.put(oldPort.number(), newPort);
517 return new DeviceEvent(PORT_UPDATED, device, newPort);
518 }
519 return null;
520 }
521
522 // Prunes the specified list of ports based on which ports are in the
523 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700524 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700525 private List<DeviceEvent> pruneOldPorts(Device device,
526 Map<PortNumber, Port> ports,
527 Set<PortNumber> processed) {
528 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700529 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700530 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700531 Entry<PortNumber, Port> e = iterator.next();
532 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700533 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700534 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700535 iterator.remove();
536 }
537 }
538 return events;
539 }
540
541 // Gets the map of ports for the specified device; if one does not already
542 // exist, it creates and registers a new one.
543 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
544 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700545 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
546 }
547
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700548 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700549 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700550 Map<ProviderId, DeviceDescriptions> r;
551 r = deviceDescs.get(deviceId);
552 if (r == null) {
553 r = new HashMap<ProviderId, DeviceDescriptions>();
554 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
555 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
556 if (concurrentlyAdded != null) {
557 r = concurrentlyAdded;
558 }
559 }
560 return r;
561 }
562
563 // Guarded by deviceDescs value (=Device lock)
564 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
565 Map<ProviderId, DeviceDescriptions> device,
566 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
567
568 synchronized (device) {
569 DeviceDescriptions r = device.get(providerId);
570 if (r == null) {
571 r = new DeviceDescriptions(deltaDesc);
572 device.put(providerId, r);
573 }
574 return r;
575 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700576 }
577
578 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700579 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
580 DeviceId deviceId,
581 PortDescription portDescription) {
582
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700583 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700584 final Timestamped<PortDescription> deltaDesc
585 = new Timestamped<>(portDescription, newTimestamp);
586 final DeviceEvent event;
587 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700588 synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700589 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700590 mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700591 .getPortDesc(portDescription.portNumber());
592 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700593 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700594 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
595 providerId, deviceId);
596 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700597 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700598 } catch (IOException e) {
599 log.error("Failed to notify peers of a port status update topology event or providerId: "
600 + providerId + " and deviceId: " + deviceId, e);
601 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700602 }
603 return event;
604 }
605
606 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
607 Timestamped<PortDescription> deltaDesc) {
608
609 Device device = devices.get(deviceId);
610 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
611
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700612 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700613 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
614
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700615 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700616
617 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
618 log.debug("Ignoring outdated event: {}", deltaDesc);
619 return null;
620 }
621
622 DeviceDescriptions descs = descsMap.get(providerId);
623 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700624 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700625 "Device description for Device ID %s from Provider %s was not found",
626 deviceId, providerId);
627
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700628 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
629 final PortNumber number = deltaDesc.value().portNumber();
630 final Port oldPort = ports.get(number);
631 final Port newPort;
632
633 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
634 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700635 deltaDesc.isNewer(existingPortDesc)) {
636 // on new port or valid update
637 // update description
638 descs.putPortDesc(deltaDesc);
639 newPort = composePort(device, number, descsMap);
640 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700641 // same or outdated event, ignored.
642 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700643 return null;
644 }
645
646 if (oldPort == null) {
647 return createPort(device, newPort, ports);
648 } else {
649 return updatePort(device, oldPort, newPort, ports);
650 }
651 }
652 }
653
654 @Override
655 public List<Port> getPorts(DeviceId deviceId) {
656 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
657 if (ports == null) {
658 return Collections.emptyList();
659 }
660 return ImmutableList.copyOf(ports.values());
661 }
662
663 @Override
664 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
665 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
666 return ports == null ? null : ports.get(portNumber);
667 }
668
669 @Override
670 public boolean isAvailable(DeviceId deviceId) {
671 return availableDevices.contains(deviceId);
672 }
673
674 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700675 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700676 final NodeId master = mastershipService.getMasterFor(deviceId);
677 if (!clusterService.getLocalNode().id().equals(master)) {
678 log.info("remove Device {} requested on non master node", deviceId);
679 // FIXME silently ignoring. Should be forwarding or broadcasting to
680 // master.
681 return null;
682 }
683
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700684 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700685 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700686 if (event != null) {
687 log.info("Notifying peers of a device removed topology event for deviceId: {}",
688 deviceId);
689 try {
690 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
691 } catch (IOException e) {
692 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
693 deviceId);
694 }
695 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700696 return event;
697 }
698
699 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
700 Timestamp timestamp) {
701
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700702 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700703 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700704 // accept removal request if given timestamp is newer than
705 // the latest Timestamp from Primary provider
706 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
707 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
708 if (timestamp.compareTo(lastTimestamp) <= 0) {
709 // outdated event ignore
710 return null;
711 }
712 removalRequest.put(deviceId, timestamp);
713
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700714 Device device = devices.remove(deviceId);
715 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700716 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
717 if (ports != null) {
718 ports.clear();
719 }
720 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700721 descs.clear();
722 return device == null ? null :
723 new DeviceEvent(DEVICE_REMOVED, device, null);
724 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700725 }
726
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700727 /**
728 * Checks if given timestamp is superseded by removal request
729 * with more recent timestamp.
730 *
731 * @param deviceId identifier of a device
732 * @param timestampToCheck timestamp of an event to check
733 * @return true if device is already removed
734 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700735 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
736 Timestamp removalTimestamp = removalRequest.get(deviceId);
737 if (removalTimestamp != null &&
738 removalTimestamp.compareTo(timestampToCheck) >= 0) {
739 // removalRequest is more recent
740 return true;
741 }
742 return false;
743 }
744
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700745 /**
746 * Returns a Device, merging description given from multiple Providers.
747 *
748 * @param deviceId device identifier
749 * @param providerDescs Collection of Descriptions from multiple providers
750 * @return Device instance
751 */
752 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700753 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700754
755 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
756
757 ProviderId primary = pickPrimaryPID(providerDescs);
758
759 DeviceDescriptions desc = providerDescs.get(primary);
760
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700761 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700762 Type type = base.type();
763 String manufacturer = base.manufacturer();
764 String hwVersion = base.hwVersion();
765 String swVersion = base.swVersion();
766 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700767 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700768 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
769 annotations = merge(annotations, base.annotations());
770
771 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
772 if (e.getKey().equals(primary)) {
773 continue;
774 }
775 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700776 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700777 // Currently assuming there will never be a key conflict between
778 // providers
779
780 // annotation merging. not so efficient, should revisit later
781 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
782 }
783
784 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700785 hwVersion, swVersion, serialNumber,
786 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700787 }
788
789 /**
790 * Returns a Port, merging description given from multiple Providers.
791 *
792 * @param device device the port is on
793 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700794 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700795 * @return Port instance
796 */
797 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700798 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700799
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700800 ProviderId primary = pickPrimaryPID(descsMap);
801 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700802 // if no primary, assume not enabled
803 // TODO: revisit this default port enabled/disabled behavior
804 boolean isEnabled = false;
805 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
806
807 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
808 if (portDesc != null) {
809 isEnabled = portDesc.value().isEnabled();
810 annotations = merge(annotations, portDesc.value().annotations());
811 }
812
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700813 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700814 if (e.getKey().equals(primary)) {
815 continue;
816 }
817 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700818 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700819 // Currently assuming there will never be a key conflict between
820 // providers
821
822 // annotation merging. not so efficient, should revisit later
823 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
824 if (otherPortDesc != null) {
825 annotations = merge(annotations, otherPortDesc.value().annotations());
826 }
827 }
828
829 return new DefaultPort(device, number, isEnabled, annotations);
830 }
831
832 /**
833 * @return primary ProviderID, or randomly chosen one if none exists
834 */
835 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700836 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700837 ProviderId fallBackPrimary = null;
838 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
839 if (!e.getKey().isAncillary()) {
840 return e.getKey();
841 } else if (fallBackPrimary == null) {
842 // pick randomly as a fallback in case there is no primary
843 fallBackPrimary = e.getKey();
844 }
845 }
846 return fallBackPrimary;
847 }
848
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700849 private DeviceDescriptions getPrimaryDescriptions(
850 Map<ProviderId, DeviceDescriptions> providerDescs) {
851 ProviderId pid = pickPrimaryPID(providerDescs);
852 return providerDescs.get(pid);
853 }
854
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700855 // TODO: should we be throwing exception?
856 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
857 ClusterMessage message = new ClusterMessage(
858 clusterService.getLocalNode().id(),
859 subject,
860 SERIALIZER.encode(event));
861 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700862 }
863
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700864 // TODO: should we be throwing exception?
865 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
866 ClusterMessage message = new ClusterMessage(
867 clusterService.getLocalNode().id(),
868 subject,
869 SERIALIZER.encode(event));
870 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700871 }
Madan Jampani47c93732014-10-06 20:46:08 -0700872
873 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700874 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700875 }
876
Madan Jampani25322532014-10-08 11:20:38 -0700877 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700878 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700879 }
880
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700881 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700882 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700883 }
884
Madan Jampani47c93732014-10-06 20:46:08 -0700885 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700886 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700887 }
888
889 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700890 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
891 }
892
893 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
894 try {
895 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
896 } catch (IOException e) {
897 log.error("Failed to send" + event + " to " + recipient, e);
898 }
899 }
900
901 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
902 try {
903 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
904 } catch (IOException e) {
905 log.error("Failed to send" + event + " to " + recipient, e);
906 }
907 }
908
909 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
910 try {
911 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
912 } catch (IOException e) {
913 log.error("Failed to send" + event + " to " + recipient, e);
914 }
915 }
916
917 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
918 try {
919 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
920 } catch (IOException e) {
921 log.error("Failed to send" + event + " to " + recipient, e);
922 }
923 }
924
925 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
926 try {
927 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
928 } catch (IOException e) {
929 log.error("Failed to send" + event + " to " + recipient, e);
930 }
931 }
932
933 private DeviceAntiEntropyAdvertisement createAdvertisement() {
934 final NodeId self = clusterService.getLocalNode().id();
935
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700936 final int numDevices = deviceDescs.size();
937 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
938 final int portsPerDevice = 8; // random factor to minimize reallocation
939 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
940 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700941
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700942 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700943 provs : deviceDescs.entrySet()) {
944
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700945 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700946 final DeviceId deviceId = provs.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700947 final Map<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700948 synchronized (devDescs) {
949
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700950 // send device offline timestamp
951 Timestamp lOffline = this.offline.get(deviceId);
952 if (lOffline != null) {
953 adOffline.put(deviceId, lOffline);
954 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700955
956 for (Entry<ProviderId, DeviceDescriptions>
957 prov : devDescs.entrySet()) {
958
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700959 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700960 final ProviderId provId = prov.getKey();
961 final DeviceDescriptions descs = prov.getValue();
962
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700963 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700964 descs.getDeviceDesc().timestamp());
965
966 for (Entry<PortNumber, Timestamped<PortDescription>>
967 portDesc : descs.getPortDescs().entrySet()) {
968
969 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700970 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700971 portDesc.getValue().timestamp());
972 }
973 }
974 }
975 }
976
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700977 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700978 }
979
980 /**
981 * Responds to anti-entropy advertisement message.
982 * <P>
983 * Notify sender about out-dated information using regular replication message.
984 * Send back advertisement to sender if not in sync.
985 *
986 * @param advertisement to respond to
987 */
988 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
989
990 final NodeId sender = advertisement.sender();
991
992 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
993 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
994 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
995
996 // Fragments to request
997 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
998 Collection<PortFragmentId> reqPorts = new ArrayList<>();
999
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001000 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001001 final DeviceId deviceId = de.getKey();
1002 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1003
1004 synchronized (lDevice) {
1005 // latestTimestamp across provider
1006 // Note: can be null initially
1007 Timestamp localLatest = offline.get(deviceId);
1008
1009 // handle device Ads
1010 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1011 final ProviderId provId = prov.getKey();
1012 final DeviceDescriptions lDeviceDescs = prov.getValue();
1013
1014 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1015
1016
1017 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1018 Timestamp advDevTimestamp = devAds.get(devFragId);
1019
1020 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
1021 // remote does not have it or outdated, suggest
1022 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1023 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1024 // local is outdated, request
1025 reqDevices.add(devFragId);
1026 }
1027
1028 // handle port Ads
1029 for (Entry<PortNumber, Timestamped<PortDescription>>
1030 pe : lDeviceDescs.getPortDescs().entrySet()) {
1031
1032 final PortNumber num = pe.getKey();
1033 final Timestamped<PortDescription> lPort = pe.getValue();
1034
1035 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1036
1037 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -07001038 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001039 // remote does not have it or outdated, suggest
1040 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1041 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1042 // local is outdated, request
1043 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1044 reqPorts.add(portFragId);
1045 }
1046
1047 // remove port Ad already processed
1048 portAds.remove(portFragId);
1049 } // end local port loop
1050
1051 // remove device Ad already processed
1052 devAds.remove(devFragId);
1053
1054 // find latest and update
1055 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1056 if (localLatest == null ||
1057 providerLatest.compareTo(localLatest) > 0) {
1058 localLatest = providerLatest;
1059 }
1060 } // end local provider loop
1061
1062 // checking if remote timestamp is more recent.
1063 Timestamp rOffline = offlineAds.get(deviceId);
1064 if (rOffline != null &&
1065 rOffline.compareTo(localLatest) > 0) {
1066 // remote offline timestamp suggests that the
1067 // device is off-line
1068 markOfflineInternal(deviceId, rOffline);
1069 }
1070
1071 Timestamp lOffline = offline.get(deviceId);
1072 if (lOffline != null && rOffline == null) {
1073 // locally offline, but remote is online, suggest offline
1074 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1075 }
1076
1077 // remove device offline Ad already processed
1078 offlineAds.remove(deviceId);
1079 } // end local device loop
1080 } // device lock
1081
1082 // If there is any Ads left, request them
1083 log.trace("Ads left {}, {}", devAds, portAds);
1084 reqDevices.addAll(devAds.keySet());
1085 reqPorts.addAll(portAds.keySet());
1086
1087 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1088 log.trace("Nothing to request to remote peer {}", sender);
1089 return;
1090 }
1091
1092 log.info("Need to sync {} {}", reqDevices, reqPorts);
1093
1094 // 2-way Anti-Entropy for now
1095 try {
1096 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1097 } catch (IOException e) {
1098 log.error("Failed to send response advertisement to " + sender, e);
1099 }
1100
1101// Sketch of 3-way Anti-Entropy
1102// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1103// ClusterMessage message = new ClusterMessage(
1104// clusterService.getLocalNode().id(),
1105// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1106// SERIALIZER.encode(request));
1107//
1108// try {
1109// clusterCommunicator.unicast(message, advertisement.sender());
1110// } catch (IOException e) {
1111// log.error("Failed to send advertisement reply to "
1112// + advertisement.sender(), e);
1113// }
Madan Jampani47c93732014-10-06 20:46:08 -07001114 }
1115
Madan Jampani255a58b2014-10-09 12:08:20 -07001116 private void notifyDelegateIfNotNull(DeviceEvent event) {
1117 if (event != null) {
1118 notifyDelegate(event);
1119 }
1120 }
1121
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001122 private final class SendAdvertisementTask implements Runnable {
1123
1124 @Override
1125 public void run() {
1126 if (Thread.currentThread().isInterrupted()) {
1127 log.info("Interrupted, quitting");
1128 return;
1129 }
1130
1131 try {
1132 final NodeId self = clusterService.getLocalNode().id();
1133 Set<ControllerNode> nodes = clusterService.getNodes();
1134
1135 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1136 .transform(toNodeId())
1137 .toList();
1138
1139 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHI37083082014-10-13 10:38:38 -07001140 log.debug("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001141 return;
1142 }
1143
1144 NodeId peer;
1145 do {
1146 int idx = RandomUtils.nextInt(0, nodeIds.size());
1147 peer = nodeIds.get(idx);
1148 } while (peer.equals(self));
1149
1150 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1151
1152 if (Thread.currentThread().isInterrupted()) {
1153 log.info("Interrupted, quitting");
1154 return;
1155 }
1156
1157 try {
1158 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1159 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001160 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001161 return;
1162 }
1163 } catch (Exception e) {
1164 // catch all Exception to avoid Scheduled task being suppressed.
1165 log.error("Exception thrown while sending advertisement", e);
1166 }
1167 }
1168 }
1169
Madan Jampani47c93732014-10-06 20:46:08 -07001170 private class InternalDeviceEventListener implements ClusterMessageHandler {
1171 @Override
1172 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001173
Madan Jampani47c93732014-10-06 20:46:08 -07001174 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001175 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001176
Madan Jampani47c93732014-10-06 20:46:08 -07001177 ProviderId providerId = event.providerId();
1178 DeviceId deviceId = event.deviceId();
1179 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001180
Madan Jampani255a58b2014-10-09 12:08:20 -07001181 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001182 }
1183 }
1184
Madan Jampani25322532014-10-08 11:20:38 -07001185 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
1186 @Override
1187 public void handle(ClusterMessage message) {
1188
1189 log.info("Received device offline event from peer: {}", message.sender());
1190 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
1191
1192 DeviceId deviceId = event.deviceId();
1193 Timestamp timestamp = event.timestamp();
1194
Madan Jampani255a58b2014-10-09 12:08:20 -07001195 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -07001196 }
1197 }
1198
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001199 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
1200 @Override
1201 public void handle(ClusterMessage message) {
1202
1203 log.info("Received device removed event from peer: {}", message.sender());
1204 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
1205
1206 DeviceId deviceId = event.deviceId();
1207 Timestamp timestamp = event.timestamp();
1208
Madan Jampani255a58b2014-10-09 12:08:20 -07001209 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001210 }
1211 }
1212
Madan Jampani47c93732014-10-06 20:46:08 -07001213 private class InternalPortEventListener implements ClusterMessageHandler {
1214 @Override
1215 public void handle(ClusterMessage message) {
1216
1217 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001218 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001219
1220 ProviderId providerId = event.providerId();
1221 DeviceId deviceId = event.deviceId();
1222 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1223
Madan Jampani255a58b2014-10-09 12:08:20 -07001224 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -07001225 }
1226 }
1227
1228 private class InternalPortStatusEventListener implements ClusterMessageHandler {
1229 @Override
1230 public void handle(ClusterMessage message) {
1231
1232 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001233 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001234 log.info("{}", event);
Madan Jampani47c93732014-10-06 20:46:08 -07001235
1236 ProviderId providerId = event.providerId();
1237 DeviceId deviceId = event.deviceId();
1238 Timestamped<PortDescription> portDescription = event.portDescription();
1239
Madan Jampani255a58b2014-10-09 12:08:20 -07001240 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001241 }
1242 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001243
1244 private final class InternalDeviceAdvertisementListener
1245 implements ClusterMessageHandler {
1246
1247 @Override
1248 public void handle(ClusterMessage message) {
Yuta HIGUCHI9a0a1d12014-10-13 22:38:02 -07001249 log.debug("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001250 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1251 handleAdvertisement(advertisement);
1252 }
1253 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001254}