blob: f5f1d3e6882ccf8200435862efcccccc91290c02 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
Yuta HIGUCHI47c40882014-10-10 18:44:37 -07003import com.google.common.base.Function;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07004import com.google.common.collect.FluentIterable;
5import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07008
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07009import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070017import org.onlab.onos.cluster.ControllerNode;
18import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070019import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070020import org.onlab.onos.net.DefaultAnnotations;
21import org.onlab.onos.net.DefaultDevice;
22import org.onlab.onos.net.DefaultPort;
23import org.onlab.onos.net.Device;
24import org.onlab.onos.net.Device.Type;
25import org.onlab.onos.net.DeviceId;
26import org.onlab.onos.net.Port;
27import org.onlab.onos.net.PortNumber;
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -070028import org.onlab.onos.net.device.DeviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070029import org.onlab.onos.net.device.DeviceDescription;
30import org.onlab.onos.net.device.DeviceEvent;
31import org.onlab.onos.net.device.DeviceStore;
32import org.onlab.onos.net.device.DeviceStoreDelegate;
33import org.onlab.onos.net.device.PortDescription;
34import org.onlab.onos.net.provider.ProviderId;
35import org.onlab.onos.store.AbstractStore;
Yuta HIGUCHId40483d2014-10-09 15:20:30 -070036import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070037import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070040import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIfa891c92014-10-09 15:21:40 -070041import org.onlab.onos.store.common.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070042import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070043import org.onlab.onos.store.serializers.DistributedStoreSerializers;
Madan Jampani53e44e62014-10-07 12:39:51 -070044import org.onlab.util.KryoPool;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070045import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070046import org.slf4j.Logger;
47
Madan Jampani47c93732014-10-06 20:46:08 -070048import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070049import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070050import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070051import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070052import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070053import java.util.HashSet;
54import java.util.Iterator;
55import java.util.List;
56import java.util.Map;
57import java.util.Map.Entry;
58import java.util.Objects;
59import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070060import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070061import java.util.concurrent.ScheduledExecutorService;
62import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070063
64import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070065import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHIdc7374c2014-10-10 11:11:09 -070066import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import static org.onlab.onos.net.device.DeviceEvent.Type.*;
68import static org.slf4j.LoggerFactory.getLogger;
69import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
70import static org.onlab.onos.net.DefaultAnnotations.merge;
71import static com.google.common.base.Verify.verify;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070072import static org.onlab.util.Tools.namedThreads;
73import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
74import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070075
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070076// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070077/**
78 * Manages inventory of infrastructure devices using gossip protocol to distribute
79 * information.
80 */
81@Component(immediate = true)
82@Service
83public class GossipDeviceStore
84 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
85 implements DeviceStore {
86
87 private final Logger log = getLogger(getClass());
88
89 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
90
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070091 // TODO: Check if inner Map can be replaced with plain Map.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070092 // innerMap is used to lock a Device, thus instance should never be replaced.
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070093
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070094 // collection of Description given from various providers
95 private final ConcurrentMap<DeviceId,
96 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070097 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070098
99 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700100 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
101 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
102
103 // to be updated under Device lock
104 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
105 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700106
107 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700108 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700111 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700112
Madan Jampani47c93732014-10-06 20:46:08 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterCommunicationService clusterCommunicator;
115
Madan Jampani53e44e62014-10-07 12:39:51 -0700116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected ClusterService clusterService;
118
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700119 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700120 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700121 protected void setupKryoPool() {
122 serializerPool = KryoPool.newBuilder()
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700123 .register(DistributedStoreSerializers.COMMON)
124
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700125 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
Madan Jampani25322532014-10-08 11:20:38 -0700126 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700127 .register(InternalDeviceRemovedEvent.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700128 .register(InternalPortEvent.class, new InternalPortEventSerializer())
129 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700130 .register(DeviceAntiEntropyAdvertisement.class)
131 .register(DeviceFragmentId.class)
132 .register(PortFragmentId.class)
Madan Jampani53e44e62014-10-07 12:39:51 -0700133 .build()
134 .populate(1);
135 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700136 };
137
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700138 private ScheduledExecutorService executor;
139
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700140 @Activate
141 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700142 clusterCommunicator.addSubscriber(
143 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
144 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700145 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
146 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700147 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
148 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700149 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
150 clusterCommunicator.addSubscriber(
151 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700152 clusterCommunicator.addSubscriber(
153 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
154
155 executor =
156 newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
157
158 // TODO: Make these configurable
159 long initialDelaySec = 5;
160 long periodSec = 5;
161 // start anti-entropy thread
162 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
163 initialDelaySec, periodSec, TimeUnit.SECONDS);
164
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700165 log.info("Started");
166 }
167
168 @Deactivate
169 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700170
171 executor.shutdownNow();
172 try {
173 boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
174 if (timedout) {
175 log.error("Timeout during executor shutdown");
176 }
177 } catch (InterruptedException e) {
178 log.error("Error during executor shutdown", e);
179 }
180
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700181 deviceDescs.clear();
182 devices.clear();
183 devicePorts.clear();
184 availableDevices.clear();
185 log.info("Stopped");
186 }
187
188 @Override
189 public int getDeviceCount() {
190 return devices.size();
191 }
192
193 @Override
194 public Iterable<Device> getDevices() {
195 return Collections.unmodifiableCollection(devices.values());
196 }
197
198 @Override
199 public Device getDevice(DeviceId deviceId) {
200 return devices.get(deviceId);
201 }
202
203 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700204 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
205 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700206 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700207 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700208 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700209 final DeviceEvent event;
210 final Timestamped<DeviceDescription> mergedDesc;
211 synchronized (getDeviceDescriptions(deviceId)) {
212 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
213 mergedDesc = getDeviceDescriptions(deviceId).get(providerId).getDeviceDesc();
214 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700215 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700216 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
217 providerId, deviceId);
218 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700219 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700220 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700221 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700222 + providerId + " and deviceId: " + deviceId, e);
223 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700224 }
225 return event;
226 }
227
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700228 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
229 DeviceId deviceId,
230 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700231
232 // Collection of DeviceDescriptions for a Device
233 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700234 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700235
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700236 synchronized (providerDescs) {
237 // locking per device
238
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700239 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
240 log.debug("Ignoring outdated event: {}", deltaDesc);
241 return null;
242 }
243
244 DeviceDescriptions descs
245 = createIfAbsentUnchecked(providerDescs, providerId,
246 new InitDeviceDescs(deltaDesc));
247
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700248 final Device oldDevice = devices.get(deviceId);
249 final Device newDevice;
250
251 if (deltaDesc == descs.getDeviceDesc() ||
252 deltaDesc.isNewer(descs.getDeviceDesc())) {
253 // on new device or valid update
254 descs.putDeviceDesc(deltaDesc);
255 newDevice = composeDevice(deviceId, providerDescs);
256 } else {
257 // outdated event, ignored.
258 return null;
259 }
260 if (oldDevice == null) {
261 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700262 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700263 } else {
264 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700265 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700266 }
267 }
268 }
269
270 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700271 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700272 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700273 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700274
275 // update composed device cache
276 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
277 verify(oldDevice == null,
278 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
279 providerId, oldDevice, newDevice);
280
281 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700282 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700283 }
284
285 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
286 }
287
288 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700289 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700290 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700291 Device oldDevice,
292 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700293
294 // We allow only certain attributes to trigger update
295 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
296 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700297 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700298
299 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
300 if (!replaced) {
301 verify(replaced,
302 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
303 providerId, oldDevice, devices.get(newDevice.id())
304 , newDevice);
305 }
306 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700307 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700308 }
309 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
310 }
311
312 // Otherwise merely attempt to change availability if primary provider
313 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700314 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700315 return !added ? null :
316 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
317 }
318 return null;
319 }
320
321 @Override
322 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700323 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700324 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700325 if (event != null) {
326 log.info("Notifying peers of a device offline topology event for deviceId: {}",
327 deviceId);
328 try {
329 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
330 } catch (IOException e) {
331 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
332 deviceId);
333 }
334 }
335 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700336 }
337
338 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
339
340 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700341 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700342
343 // locking device
344 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700345
346 // accept off-line if given timestamp is newer than
347 // the latest Timestamp from Primary provider
348 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
349 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
350 if (timestamp.compareTo(lastTimestamp) <= 0) {
351 // outdated event ignore
352 return null;
353 }
354
355 offline.put(deviceId, timestamp);
356
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700357 Device device = devices.get(deviceId);
358 if (device == null) {
359 return null;
360 }
361 boolean removed = availableDevices.remove(deviceId);
362 if (removed) {
363 // TODO: broadcast ... DOWN only?
364 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700365 }
366 return null;
367 }
368 }
369
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700370 /**
371 * Marks the device as available if the given timestamp is not outdated,
372 * compared to the time the device has been marked offline.
373 *
374 * @param deviceId identifier of the device
375 * @param timestamp of the event triggering this change.
376 * @return true if availability change request was accepted and changed the state
377 */
378 // Guarded by deviceDescs value (=Device lock)
379 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
380 // accept on-line if given timestamp is newer than
381 // the latest offline request Timestamp
382 Timestamp offlineTimestamp = offline.get(deviceId);
383 if (offlineTimestamp == null ||
384 offlineTimestamp.compareTo(timestamp) < 0) {
385
386 offline.remove(deviceId);
387 return availableDevices.add(deviceId);
388 }
389 return false;
390 }
391
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700392 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700393 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
394 DeviceId deviceId,
395 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700396
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700397 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700398
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700399 final Timestamped<List<PortDescription>> timestampedInput
400 = new Timestamped<>(portDescriptions, newTimestamp);
401 final List<DeviceEvent> events;
402 final Timestamped<List<PortDescription>> merged;
403
404 synchronized (getDeviceDescriptions(deviceId)) {
405 events = updatePortsInternal(providerId, deviceId, timestampedInput);
406 final DeviceDescriptions descs = getDeviceDescriptions(deviceId).get(providerId);
407 List<PortDescription> mergedList =
408 FluentIterable.from(portDescriptions)
409 .transform(new Function<PortDescription, PortDescription>() {
410 @Override
411 public PortDescription apply(PortDescription input) {
412 // lookup merged port description
413 return descs.getPortDesc(input.portNumber()).value();
414 }
415 }).toList();
416 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
417 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700418 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700419 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
420 providerId, deviceId);
421 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700422 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700423 } catch (IOException e) {
424 log.error("Failed to notify peers of a port update topology event or providerId: "
425 + providerId + " and deviceId: " + deviceId, e);
426 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700427 }
428 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700429 }
430
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700431 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
432 DeviceId deviceId,
433 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700434
435 Device device = devices.get(deviceId);
436 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
437
438 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
439 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
440
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700441 List<DeviceEvent> events = new ArrayList<>();
442 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700443
444 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
445 log.debug("Ignoring outdated events: {}", portDescriptions);
446 return null;
447 }
448
449 DeviceDescriptions descs = descsMap.get(providerId);
450 // every provider must provide DeviceDescription.
451 checkArgument(descs != null,
452 "Device description for Device ID %s from Provider %s was not found",
453 deviceId, providerId);
454
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700455 Map<PortNumber, Port> ports = getPortMap(deviceId);
456
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700457 final Timestamp newTimestamp = portDescriptions.timestamp();
458
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700459 // Add new ports
460 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700461 for (PortDescription portDescription : portDescriptions.value()) {
462 final PortNumber number = portDescription.portNumber();
463 processed.add(number);
464
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700465 final Port oldPort = ports.get(number);
466 final Port newPort;
467
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700468
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700469 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
470 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700471 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700472 // on new port or valid update
473 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700474 descs.putPortDesc(new Timestamped<>(portDescription,
475 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700476 newPort = composePort(device, number, descsMap);
477 } else {
478 // outdated event, ignored.
479 continue;
480 }
481
482 events.add(oldPort == null ?
483 createPort(device, newPort, ports) :
484 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700485 }
486
487 events.addAll(pruneOldPorts(device, ports, processed));
488 }
489 return FluentIterable.from(events).filter(notNull()).toList();
490 }
491
492 // Creates a new port based on the port description adds it to the map and
493 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700494 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700495 private DeviceEvent createPort(Device device, Port newPort,
496 Map<PortNumber, Port> ports) {
497 ports.put(newPort.number(), newPort);
498 return new DeviceEvent(PORT_ADDED, device, newPort);
499 }
500
501 // Checks if the specified port requires update and if so, it replaces the
502 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700503 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700504 private DeviceEvent updatePort(Device device, Port oldPort,
505 Port newPort,
506 Map<PortNumber, Port> ports) {
507 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700508 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700509
510 ports.put(oldPort.number(), newPort);
511 return new DeviceEvent(PORT_UPDATED, device, newPort);
512 }
513 return null;
514 }
515
516 // Prunes the specified list of ports based on which ports are in the
517 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700518 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700519 private List<DeviceEvent> pruneOldPorts(Device device,
520 Map<PortNumber, Port> ports,
521 Set<PortNumber> processed) {
522 List<DeviceEvent> events = new ArrayList<>();
523 Iterator<PortNumber> iterator = ports.keySet().iterator();
524 while (iterator.hasNext()) {
525 PortNumber portNumber = iterator.next();
526 if (!processed.contains(portNumber)) {
527 events.add(new DeviceEvent(PORT_REMOVED, device,
528 ports.get(portNumber)));
529 iterator.remove();
530 }
531 }
532 return events;
533 }
534
535 // Gets the map of ports for the specified device; if one does not already
536 // exist, it creates and registers a new one.
537 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
538 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700539 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
540 }
541
542 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
543 DeviceId deviceId) {
544 return createIfAbsentUnchecked(deviceDescs, deviceId,
545 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700546 }
547
548 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700549 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
550 DeviceId deviceId,
551 PortDescription portDescription) {
552
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700553 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700554 final Timestamped<PortDescription> deltaDesc
555 = new Timestamped<>(portDescription, newTimestamp);
556 final DeviceEvent event;
557 final Timestamped<PortDescription> mergedDesc;
558 synchronized (getDeviceDescriptions(deviceId)) {
559 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
560 mergedDesc = getDeviceDescriptions(deviceId).get(providerId)
561 .getPortDesc(portDescription.portNumber());
562 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700563 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700564 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
565 providerId, deviceId);
566 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700567 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700568 } catch (IOException e) {
569 log.error("Failed to notify peers of a port status update topology event or providerId: "
570 + providerId + " and deviceId: " + deviceId, e);
571 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700572 }
573 return event;
574 }
575
576 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
577 Timestamped<PortDescription> deltaDesc) {
578
579 Device device = devices.get(deviceId);
580 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
581
582 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
583 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
584
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700585 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700586
587 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
588 log.debug("Ignoring outdated event: {}", deltaDesc);
589 return null;
590 }
591
592 DeviceDescriptions descs = descsMap.get(providerId);
593 // assuming all providers must to give DeviceDescription
594 checkArgument(descs != null,
595 "Device description for Device ID %s from Provider %s was not found",
596 deviceId, providerId);
597
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700598 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
599 final PortNumber number = deltaDesc.value().portNumber();
600 final Port oldPort = ports.get(number);
601 final Port newPort;
602
603 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
604 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700605 deltaDesc.isNewer(existingPortDesc)) {
606 // on new port or valid update
607 // update description
608 descs.putPortDesc(deltaDesc);
609 newPort = composePort(device, number, descsMap);
610 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700611 // same or outdated event, ignored.
612 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700613 return null;
614 }
615
616 if (oldPort == null) {
617 return createPort(device, newPort, ports);
618 } else {
619 return updatePort(device, oldPort, newPort, ports);
620 }
621 }
622 }
623
624 @Override
625 public List<Port> getPorts(DeviceId deviceId) {
626 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
627 if (ports == null) {
628 return Collections.emptyList();
629 }
630 return ImmutableList.copyOf(ports.values());
631 }
632
633 @Override
634 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
635 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
636 return ports == null ? null : ports.get(portNumber);
637 }
638
639 @Override
640 public boolean isAvailable(DeviceId deviceId) {
641 return availableDevices.contains(deviceId);
642 }
643
644 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700645 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700646 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700647 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700648 if (event != null) {
649 log.info("Notifying peers of a device removed topology event for deviceId: {}",
650 deviceId);
651 try {
652 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
653 } catch (IOException e) {
654 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
655 deviceId);
656 }
657 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700658 return event;
659 }
660
661 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
662 Timestamp timestamp) {
663
664 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700665 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700666 // accept removal request if given timestamp is newer than
667 // the latest Timestamp from Primary provider
668 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
669 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
670 if (timestamp.compareTo(lastTimestamp) <= 0) {
671 // outdated event ignore
672 return null;
673 }
674 removalRequest.put(deviceId, timestamp);
675
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700676 Device device = devices.remove(deviceId);
677 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700678 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
679 if (ports != null) {
680 ports.clear();
681 }
682 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700683 descs.clear();
684 return device == null ? null :
685 new DeviceEvent(DEVICE_REMOVED, device, null);
686 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700687 }
688
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700689 /**
690 * Checks if given timestamp is superseded by removal request
691 * with more recent timestamp.
692 *
693 * @param deviceId identifier of a device
694 * @param timestampToCheck timestamp of an event to check
695 * @return true if device is already removed
696 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700697 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
698 Timestamp removalTimestamp = removalRequest.get(deviceId);
699 if (removalTimestamp != null &&
700 removalTimestamp.compareTo(timestampToCheck) >= 0) {
701 // removalRequest is more recent
702 return true;
703 }
704 return false;
705 }
706
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700707 /**
708 * Returns a Device, merging description given from multiple Providers.
709 *
710 * @param deviceId device identifier
711 * @param providerDescs Collection of Descriptions from multiple providers
712 * @return Device instance
713 */
714 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700715 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700716
717 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
718
719 ProviderId primary = pickPrimaryPID(providerDescs);
720
721 DeviceDescriptions desc = providerDescs.get(primary);
722
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700723 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700724 Type type = base.type();
725 String manufacturer = base.manufacturer();
726 String hwVersion = base.hwVersion();
727 String swVersion = base.swVersion();
728 String serialNumber = base.serialNumber();
729 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
730 annotations = merge(annotations, base.annotations());
731
732 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
733 if (e.getKey().equals(primary)) {
734 continue;
735 }
736 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700737 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700738 // Currently assuming there will never be a key conflict between
739 // providers
740
741 // annotation merging. not so efficient, should revisit later
742 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
743 }
744
745 return new DefaultDevice(primary, deviceId , type, manufacturer,
746 hwVersion, swVersion, serialNumber, annotations);
747 }
748
749 /**
750 * Returns a Port, merging description given from multiple Providers.
751 *
752 * @param device device the port is on
753 * @param number port number
754 * @param providerDescs Collection of Descriptions from multiple providers
755 * @return Port instance
756 */
757 private Port composePort(Device device, PortNumber number,
758 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
759
760 ProviderId primary = pickPrimaryPID(providerDescs);
761 DeviceDescriptions primDescs = providerDescs.get(primary);
762 // if no primary, assume not enabled
763 // TODO: revisit this default port enabled/disabled behavior
764 boolean isEnabled = false;
765 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
766
767 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
768 if (portDesc != null) {
769 isEnabled = portDesc.value().isEnabled();
770 annotations = merge(annotations, portDesc.value().annotations());
771 }
772
773 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
774 if (e.getKey().equals(primary)) {
775 continue;
776 }
777 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700778 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700779 // Currently assuming there will never be a key conflict between
780 // providers
781
782 // annotation merging. not so efficient, should revisit later
783 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
784 if (otherPortDesc != null) {
785 annotations = merge(annotations, otherPortDesc.value().annotations());
786 }
787 }
788
789 return new DefaultPort(device, number, isEnabled, annotations);
790 }
791
792 /**
793 * @return primary ProviderID, or randomly chosen one if none exists
794 */
795 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700796 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700797 ProviderId fallBackPrimary = null;
798 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
799 if (!e.getKey().isAncillary()) {
800 return e.getKey();
801 } else if (fallBackPrimary == null) {
802 // pick randomly as a fallback in case there is no primary
803 fallBackPrimary = e.getKey();
804 }
805 }
806 return fallBackPrimary;
807 }
808
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700809 private DeviceDescriptions getPrimaryDescriptions(
810 Map<ProviderId, DeviceDescriptions> providerDescs) {
811 ProviderId pid = pickPrimaryPID(providerDescs);
812 return providerDescs.get(pid);
813 }
814
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700815 // TODO: should we be throwing exception?
816 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
817 ClusterMessage message = new ClusterMessage(
818 clusterService.getLocalNode().id(),
819 subject,
820 SERIALIZER.encode(event));
821 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700822 }
823
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700824 // TODO: should we be throwing exception?
825 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
826 ClusterMessage message = new ClusterMessage(
827 clusterService.getLocalNode().id(),
828 subject,
829 SERIALIZER.encode(event));
830 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700831 }
Madan Jampani47c93732014-10-06 20:46:08 -0700832
833 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700834 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700835 }
836
Madan Jampani25322532014-10-08 11:20:38 -0700837 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700838 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700839 }
840
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700841 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700842 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700843 }
844
Madan Jampani47c93732014-10-06 20:46:08 -0700845 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700846 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700847 }
848
849 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700850 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
851 }
852
853 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
854 try {
855 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
856 } catch (IOException e) {
857 log.error("Failed to send" + event + " to " + recipient, e);
858 }
859 }
860
861 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
862 try {
863 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
864 } catch (IOException e) {
865 log.error("Failed to send" + event + " to " + recipient, e);
866 }
867 }
868
869 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
870 try {
871 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
872 } catch (IOException e) {
873 log.error("Failed to send" + event + " to " + recipient, e);
874 }
875 }
876
877 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
878 try {
879 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
880 } catch (IOException e) {
881 log.error("Failed to send" + event + " to " + recipient, e);
882 }
883 }
884
885 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
886 try {
887 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
888 } catch (IOException e) {
889 log.error("Failed to send" + event + " to " + recipient, e);
890 }
891 }
892
893 private DeviceAntiEntropyAdvertisement createAdvertisement() {
894 final NodeId self = clusterService.getLocalNode().id();
895
896 Map<DeviceFragmentId, Timestamp> devices = new HashMap<>(deviceDescs.size());
897 final int portsPerDevice = 8; // random guess to minimize reallocation
898 Map<PortFragmentId, Timestamp> ports = new HashMap<>(devices.size() * portsPerDevice);
899 Map<DeviceId, Timestamp> offline = new HashMap<>(devices.size());
900
901 for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>>
902 provs : deviceDescs.entrySet()) {
903
904 final DeviceId deviceId = provs.getKey();
905 final ConcurrentMap<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
906 synchronized (devDescs) {
907
908 offline.put(deviceId, this.offline.get(deviceId));
909
910 for (Entry<ProviderId, DeviceDescriptions>
911 prov : devDescs.entrySet()) {
912
913 final ProviderId provId = prov.getKey();
914 final DeviceDescriptions descs = prov.getValue();
915
916 devices.put(new DeviceFragmentId(deviceId, provId),
917 descs.getDeviceDesc().timestamp());
918
919 for (Entry<PortNumber, Timestamped<PortDescription>>
920 portDesc : descs.getPortDescs().entrySet()) {
921
922 final PortNumber number = portDesc.getKey();
923 ports.put(new PortFragmentId(deviceId, provId, number),
924 portDesc.getValue().timestamp());
925 }
926 }
927 }
928 }
929
930 return new DeviceAntiEntropyAdvertisement(self, devices, ports, offline);
931 }
932
933 /**
934 * Responds to anti-entropy advertisement message.
935 * <P>
936 * Notify sender about out-dated information using regular replication message.
937 * Send back advertisement to sender if not in sync.
938 *
939 * @param advertisement to respond to
940 */
941 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
942
943 final NodeId sender = advertisement.sender();
944
945 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
946 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
947 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
948
949 // Fragments to request
950 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
951 Collection<PortFragmentId> reqPorts = new ArrayList<>();
952
953 for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
954 final DeviceId deviceId = de.getKey();
955 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
956
957 synchronized (lDevice) {
958 // latestTimestamp across provider
959 // Note: can be null initially
960 Timestamp localLatest = offline.get(deviceId);
961
962 // handle device Ads
963 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
964 final ProviderId provId = prov.getKey();
965 final DeviceDescriptions lDeviceDescs = prov.getValue();
966
967 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
968
969
970 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
971 Timestamp advDevTimestamp = devAds.get(devFragId);
972
973 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
974 // remote does not have it or outdated, suggest
975 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
976 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
977 // local is outdated, request
978 reqDevices.add(devFragId);
979 }
980
981 // handle port Ads
982 for (Entry<PortNumber, Timestamped<PortDescription>>
983 pe : lDeviceDescs.getPortDescs().entrySet()) {
984
985 final PortNumber num = pe.getKey();
986 final Timestamped<PortDescription> lPort = pe.getValue();
987
988 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
989
990 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -0700991 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700992 // remote does not have it or outdated, suggest
993 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
994 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
995 // local is outdated, request
996 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
997 reqPorts.add(portFragId);
998 }
999
1000 // remove port Ad already processed
1001 portAds.remove(portFragId);
1002 } // end local port loop
1003
1004 // remove device Ad already processed
1005 devAds.remove(devFragId);
1006
1007 // find latest and update
1008 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1009 if (localLatest == null ||
1010 providerLatest.compareTo(localLatest) > 0) {
1011 localLatest = providerLatest;
1012 }
1013 } // end local provider loop
1014
1015 // checking if remote timestamp is more recent.
1016 Timestamp rOffline = offlineAds.get(deviceId);
1017 if (rOffline != null &&
1018 rOffline.compareTo(localLatest) > 0) {
1019 // remote offline timestamp suggests that the
1020 // device is off-line
1021 markOfflineInternal(deviceId, rOffline);
1022 }
1023
1024 Timestamp lOffline = offline.get(deviceId);
1025 if (lOffline != null && rOffline == null) {
1026 // locally offline, but remote is online, suggest offline
1027 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1028 }
1029
1030 // remove device offline Ad already processed
1031 offlineAds.remove(deviceId);
1032 } // end local device loop
1033 } // device lock
1034
1035 // If there is any Ads left, request them
1036 log.trace("Ads left {}, {}", devAds, portAds);
1037 reqDevices.addAll(devAds.keySet());
1038 reqPorts.addAll(portAds.keySet());
1039
1040 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1041 log.trace("Nothing to request to remote peer {}", sender);
1042 return;
1043 }
1044
1045 log.info("Need to sync {} {}", reqDevices, reqPorts);
1046
1047 // 2-way Anti-Entropy for now
1048 try {
1049 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1050 } catch (IOException e) {
1051 log.error("Failed to send response advertisement to " + sender, e);
1052 }
1053
1054// Sketch of 3-way Anti-Entropy
1055// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1056// ClusterMessage message = new ClusterMessage(
1057// clusterService.getLocalNode().id(),
1058// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1059// SERIALIZER.encode(request));
1060//
1061// try {
1062// clusterCommunicator.unicast(message, advertisement.sender());
1063// } catch (IOException e) {
1064// log.error("Failed to send advertisement reply to "
1065// + advertisement.sender(), e);
1066// }
Madan Jampani47c93732014-10-06 20:46:08 -07001067 }
1068
Madan Jampani255a58b2014-10-09 12:08:20 -07001069 private void notifyDelegateIfNotNull(DeviceEvent event) {
1070 if (event != null) {
1071 notifyDelegate(event);
1072 }
1073 }
1074
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001075 private final class SendAdvertisementTask implements Runnable {
1076
1077 @Override
1078 public void run() {
1079 if (Thread.currentThread().isInterrupted()) {
1080 log.info("Interrupted, quitting");
1081 return;
1082 }
1083
1084 try {
1085 final NodeId self = clusterService.getLocalNode().id();
1086 Set<ControllerNode> nodes = clusterService.getNodes();
1087
1088 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1089 .transform(toNodeId())
1090 .toList();
1091
1092 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHI37083082014-10-13 10:38:38 -07001093 log.debug("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001094 return;
1095 }
1096
1097 NodeId peer;
1098 do {
1099 int idx = RandomUtils.nextInt(0, nodeIds.size());
1100 peer = nodeIds.get(idx);
1101 } while (peer.equals(self));
1102
1103 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1104
1105 if (Thread.currentThread().isInterrupted()) {
1106 log.info("Interrupted, quitting");
1107 return;
1108 }
1109
1110 try {
1111 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1112 } catch (IOException e) {
1113 log.error("Failed to send anti-entropy advertisement", e);
1114 return;
1115 }
1116 } catch (Exception e) {
1117 // catch all Exception to avoid Scheduled task being suppressed.
1118 log.error("Exception thrown while sending advertisement", e);
1119 }
1120 }
1121 }
1122
Madan Jampani47c93732014-10-06 20:46:08 -07001123 private class InternalDeviceEventListener implements ClusterMessageHandler {
1124 @Override
1125 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001126
Madan Jampani47c93732014-10-06 20:46:08 -07001127 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001128 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001129
Madan Jampani47c93732014-10-06 20:46:08 -07001130 ProviderId providerId = event.providerId();
1131 DeviceId deviceId = event.deviceId();
1132 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001133
Madan Jampani255a58b2014-10-09 12:08:20 -07001134 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001135 }
1136 }
1137
Madan Jampani25322532014-10-08 11:20:38 -07001138 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
1139 @Override
1140 public void handle(ClusterMessage message) {
1141
1142 log.info("Received device offline event from peer: {}", message.sender());
1143 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
1144
1145 DeviceId deviceId = event.deviceId();
1146 Timestamp timestamp = event.timestamp();
1147
Madan Jampani255a58b2014-10-09 12:08:20 -07001148 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -07001149 }
1150 }
1151
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001152 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
1153 @Override
1154 public void handle(ClusterMessage message) {
1155
1156 log.info("Received device removed event from peer: {}", message.sender());
1157 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
1158
1159 DeviceId deviceId = event.deviceId();
1160 Timestamp timestamp = event.timestamp();
1161
Madan Jampani255a58b2014-10-09 12:08:20 -07001162 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001163 }
1164 }
1165
Madan Jampani47c93732014-10-06 20:46:08 -07001166 private class InternalPortEventListener implements ClusterMessageHandler {
1167 @Override
1168 public void handle(ClusterMessage message) {
1169
1170 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001171 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001172
1173 ProviderId providerId = event.providerId();
1174 DeviceId deviceId = event.deviceId();
1175 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1176
Madan Jampani255a58b2014-10-09 12:08:20 -07001177 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -07001178 }
1179 }
1180
1181 private class InternalPortStatusEventListener implements ClusterMessageHandler {
1182 @Override
1183 public void handle(ClusterMessage message) {
1184
1185 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001186 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001187 log.info("{}", event);
Madan Jampani47c93732014-10-06 20:46:08 -07001188
1189 ProviderId providerId = event.providerId();
1190 DeviceId deviceId = event.deviceId();
1191 Timestamped<PortDescription> portDescription = event.portDescription();
1192
Madan Jampani255a58b2014-10-09 12:08:20 -07001193 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001194 }
1195 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001196
1197 private final class InternalDeviceAdvertisementListener
1198 implements ClusterMessageHandler {
1199
1200 @Override
1201 public void handle(ClusterMessage message) {
1202 log.info("Received Device advertisement from peer: {}", message.sender());
1203 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1204 handleAdvertisement(advertisement);
1205 }
1206 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001207}