blob: b623b5dd39016ee668f5a903ce9bfde68f3e151f [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070016package org.onlab.onos.store.device.impl;
17
Yuta HIGUCHI47c40882014-10-10 18:44:37 -070018import com.google.common.base.Function;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070019import com.google.common.collect.FluentIterable;
20import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070021import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -070023
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070024import org.apache.commons.lang3.RandomUtils;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070031import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070032import org.onlab.onos.cluster.ControllerNode;
33import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -070034import org.onlab.onos.mastership.MastershipService;
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -080035import org.onlab.onos.mastership.MastershipTerm;
36import org.onlab.onos.mastership.MastershipTermService;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070037import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070038import org.onlab.onos.net.DefaultAnnotations;
39import org.onlab.onos.net.DefaultDevice;
40import org.onlab.onos.net.DefaultPort;
41import org.onlab.onos.net.Device;
42import org.onlab.onos.net.Device.Type;
43import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -080044import org.onlab.onos.net.MastershipRole;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070045import org.onlab.onos.net.Port;
46import org.onlab.onos.net.PortNumber;
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -070047import org.onlab.onos.net.device.DeviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070048import org.onlab.onos.net.device.DeviceDescription;
49import org.onlab.onos.net.device.DeviceEvent;
50import org.onlab.onos.net.device.DeviceStore;
51import org.onlab.onos.net.device.DeviceStoreDelegate;
52import org.onlab.onos.net.device.PortDescription;
53import org.onlab.onos.net.provider.ProviderId;
54import org.onlab.onos.store.AbstractStore;
Yuta HIGUCHId40483d2014-10-09 15:20:30 -070055import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070056import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
57import org.onlab.onos.store.cluster.messaging.ClusterMessage;
58import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070059import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIeecee552014-10-16 14:09:01 -070060import org.onlab.onos.store.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070061import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI60a190b2014-11-07 16:24:47 -080062import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
alshabib7911a052014-10-16 17:49:37 -070063import org.onlab.packet.ChassisId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070064import org.onlab.util.KryoNamespace;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070065import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070066import org.slf4j.Logger;
67
Madan Jampani47c93732014-10-06 20:46:08 -070068import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070069import java.util.ArrayList;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070070import java.util.Collection;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070071import java.util.Collections;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070072import java.util.HashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070073import java.util.HashSet;
74import java.util.Iterator;
75import java.util.List;
76import java.util.Map;
77import java.util.Map.Entry;
78import java.util.Objects;
79import java.util.Set;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070080import java.util.concurrent.ConcurrentMap;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070081import java.util.concurrent.ScheduledExecutorService;
82import java.util.concurrent.TimeUnit;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070083
84import static com.google.common.base.Preconditions.checkArgument;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070085import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHIdc7374c2014-10-10 11:11:09 -070086import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070087import static org.onlab.onos.net.device.DeviceEvent.Type.*;
88import static org.slf4j.LoggerFactory.getLogger;
89import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
90import static org.onlab.onos.net.DefaultAnnotations.merge;
91import static com.google.common.base.Verify.verify;
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -070092import static org.onlab.util.Tools.namedThreads;
93import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
94import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -080095import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070096
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070097// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070098/**
99 * Manages inventory of infrastructure devices using gossip protocol to distribute
100 * information.
101 */
102@Component(immediate = true)
103@Service
104public class GossipDeviceStore
105 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
106 implements DeviceStore {
107
108 private final Logger log = getLogger(getClass());
109
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700110 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700111
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700112 // innerMap is used to lock a Device, thus instance should never be replaced.
113 // collection of Description given from various providers
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700114 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700115 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700116
117 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700118 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
119 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
120
121 // to be updated under Device lock
122 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
123 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700124
125 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700126 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700129 protected DeviceClockService deviceClockService;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700130
Madan Jampani47c93732014-10-06 20:46:08 -0700131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected ClusterCommunicationService clusterCommunicator;
133
Madan Jampani53e44e62014-10-07 12:39:51 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected ClusterService clusterService;
136
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected MastershipService mastershipService;
139
Yuta HIGUCHIbcac4992014-11-22 19:27:57 -0800140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected MastershipTermService termService;
142
143
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700144 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700145 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700146 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700147 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800148 .register(DistributedStoreSerializers.STORE_COMMON)
149 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
150 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
151 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700152 .register(InternalDeviceRemovedEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800153 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
154 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700155 .register(DeviceAntiEntropyAdvertisement.class)
156 .register(DeviceFragmentId.class)
157 .register(PortFragmentId.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800158 .build();
Madan Jampani53e44e62014-10-07 12:39:51 -0700159 }
Madan Jampani53e44e62014-10-07 12:39:51 -0700160 };
161
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700162 private ScheduledExecutorService executor;
163
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700164 @Activate
165 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700166 clusterCommunicator.addSubscriber(
167 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
168 clusterCommunicator.addSubscriber(
Madan Jampani25322532014-10-08 11:20:38 -0700169 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800170 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, new InternalRemoveRequestListener());
Madan Jampani25322532014-10-08 11:20:38 -0700171 clusterCommunicator.addSubscriber(
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700172 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
173 clusterCommunicator.addSubscriber(
Madan Jampani2206e012014-10-06 21:04:20 -0700174 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
175 clusterCommunicator.addSubscriber(
176 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700177 clusterCommunicator.addSubscriber(
178 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
179
180 executor =
181 newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
182
183 // TODO: Make these configurable
184 long initialDelaySec = 5;
185 long periodSec = 5;
186 // start anti-entropy thread
187 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
188 initialDelaySec, periodSec, TimeUnit.SECONDS);
189
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700190 log.info("Started");
191 }
192
193 @Deactivate
194 public void deactivate() {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700195
196 executor.shutdownNow();
197 try {
198 boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
199 if (timedout) {
200 log.error("Timeout during executor shutdown");
201 }
202 } catch (InterruptedException e) {
203 log.error("Error during executor shutdown", e);
204 }
205
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700206 deviceDescs.clear();
207 devices.clear();
208 devicePorts.clear();
209 availableDevices.clear();
210 log.info("Stopped");
211 }
212
213 @Override
214 public int getDeviceCount() {
215 return devices.size();
216 }
217
218 @Override
219 public Iterable<Device> getDevices() {
220 return Collections.unmodifiableCollection(devices.values());
221 }
222
223 @Override
224 public Device getDevice(DeviceId deviceId) {
225 return devices.get(deviceId);
226 }
227
228 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700229 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
230 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700231 DeviceDescription deviceDescription) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700232 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700233 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700234 final DeviceEvent event;
235 final Timestamped<DeviceDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800236 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
237 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700238 event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800239 mergedDesc = device.get(providerId).getDeviceDesc();
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700240 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700241 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700242 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
243 providerId, deviceId);
244 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700245 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700246 } catch (IOException e) {
Madan Jampani25322532014-10-08 11:20:38 -0700247 log.error("Failed to notify peers of a device update topology event for providerId: "
Madan Jampani47c93732014-10-06 20:46:08 -0700248 + providerId + " and deviceId: " + deviceId, e);
249 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700250 }
251 return event;
252 }
253
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700254 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
255 DeviceId deviceId,
256 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700257
258 // Collection of DeviceDescriptions for a Device
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800259 Map<ProviderId, DeviceDescriptions> device
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700260 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700261
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800262 synchronized (device) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700263 // locking per device
264
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700265 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
266 log.debug("Ignoring outdated event: {}", deltaDesc);
267 return null;
268 }
269
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800270 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700271
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700272 final Device oldDevice = devices.get(deviceId);
273 final Device newDevice;
274
275 if (deltaDesc == descs.getDeviceDesc() ||
276 deltaDesc.isNewer(descs.getDeviceDesc())) {
277 // on new device or valid update
278 descs.putDeviceDesc(deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800279 newDevice = composeDevice(deviceId, device);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700280 } else {
281 // outdated event, ignored.
282 return null;
283 }
284 if (oldDevice == null) {
285 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700286 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700287 } else {
288 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700289 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700290 }
291 }
292 }
293
294 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700295 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700296 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700297 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700298
299 // update composed device cache
300 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
301 verify(oldDevice == null,
302 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
303 providerId, oldDevice, newDevice);
304
305 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700306 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700307 }
308
309 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
310 }
311
312 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700313 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700314 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700315 Device oldDevice,
316 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700317 // We allow only certain attributes to trigger update
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700318 boolean propertiesChanged =
319 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
320 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion());
321 boolean annotationsChanged =
322 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700323
Thomas Vachuska56dbeb12014-10-22 16:40:44 -0700324 // Primary providers can respond to all changes, but ancillary ones
325 // should respond only to annotation changes.
326 if ((providerId.isAncillary() && annotationsChanged) ||
327 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700328 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
329 if (!replaced) {
330 verify(replaced,
331 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
332 providerId, oldDevice, devices.get(newDevice.id())
333 , newDevice);
334 }
335 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700336 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700337 }
338 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
339 }
340
341 // Otherwise merely attempt to change availability if primary provider
342 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700343 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700344 return !added ? null :
345 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
346 }
347 return null;
348 }
349
350 @Override
351 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700352 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700353 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700354 if (event != null) {
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700355 log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
356 deviceId, timestamp);
Madan Jampani25322532014-10-08 11:20:38 -0700357 try {
358 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
359 } catch (IOException e) {
360 log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
361 deviceId);
362 }
363 }
364 return event;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700365 }
366
367 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
368
369 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700370 = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700371
372 // locking device
373 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700374
375 // accept off-line if given timestamp is newer than
376 // the latest Timestamp from Primary provider
377 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
378 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
379 if (timestamp.compareTo(lastTimestamp) <= 0) {
380 // outdated event ignore
381 return null;
382 }
383
384 offline.put(deviceId, timestamp);
385
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700386 Device device = devices.get(deviceId);
387 if (device == null) {
388 return null;
389 }
390 boolean removed = availableDevices.remove(deviceId);
391 if (removed) {
392 // TODO: broadcast ... DOWN only?
393 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700394 }
395 return null;
396 }
397 }
398
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700399 /**
400 * Marks the device as available if the given timestamp is not outdated,
401 * compared to the time the device has been marked offline.
402 *
403 * @param deviceId identifier of the device
404 * @param timestamp of the event triggering this change.
405 * @return true if availability change request was accepted and changed the state
406 */
407 // Guarded by deviceDescs value (=Device lock)
408 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
409 // accept on-line if given timestamp is newer than
410 // the latest offline request Timestamp
411 Timestamp offlineTimestamp = offline.get(deviceId);
412 if (offlineTimestamp == null ||
413 offlineTimestamp.compareTo(timestamp) < 0) {
414
415 offline.remove(deviceId);
416 return availableDevices.add(deviceId);
417 }
418 return false;
419 }
420
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700421 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700422 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
423 DeviceId deviceId,
424 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700425
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700426 final Timestamp newTimestamp;
427 try {
428 newTimestamp = deviceClockService.getTimestamp(deviceId);
429 } catch (IllegalStateException e) {
430 log.info("Timestamp was not available for device {}", deviceId);
431 log.debug(" discarding {}", portDescriptions);
432 // Failed to generate timestamp.
433
434 // Possible situation:
435 // Device connected and became master for short period of time,
436 // but lost mastership before this instance had the chance to
437 // retrieve term information.
438
439 // Information dropped here is expected to be recoverable by
440 // device probing after mastership change
441
442 return Collections.emptyList();
443 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800444 log.debug("timestamp for {} {}", deviceId, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700445
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700446 final Timestamped<List<PortDescription>> timestampedInput
447 = new Timestamped<>(portDescriptions, newTimestamp);
448 final List<DeviceEvent> events;
449 final Timestamped<List<PortDescription>> merged;
450
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800451 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
452 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700453 events = updatePortsInternal(providerId, deviceId, timestampedInput);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800454 final DeviceDescriptions descs = device.get(providerId);
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700455 List<PortDescription> mergedList =
456 FluentIterable.from(portDescriptions)
457 .transform(new Function<PortDescription, PortDescription>() {
458 @Override
459 public PortDescription apply(PortDescription input) {
460 // lookup merged port description
461 return descs.getPortDesc(input.portNumber()).value();
462 }
463 }).toList();
464 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
465 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700466 if (!events.isEmpty()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800467 log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
Madan Jampani47c93732014-10-06 20:46:08 -0700468 providerId, deviceId);
469 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700470 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
Madan Jampani47c93732014-10-06 20:46:08 -0700471 } catch (IOException e) {
472 log.error("Failed to notify peers of a port update topology event or providerId: "
473 + providerId + " and deviceId: " + deviceId, e);
474 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700475 }
476 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700477 }
478
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700479 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
480 DeviceId deviceId,
481 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700482
483 Device device = devices.get(deviceId);
484 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
485
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700486 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700487 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
488
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700489 List<DeviceEvent> events = new ArrayList<>();
490 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700491
492 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
493 log.debug("Ignoring outdated events: {}", portDescriptions);
494 return null;
495 }
496
497 DeviceDescriptions descs = descsMap.get(providerId);
498 // every provider must provide DeviceDescription.
499 checkArgument(descs != null,
500 "Device description for Device ID %s from Provider %s was not found",
501 deviceId, providerId);
502
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700503 Map<PortNumber, Port> ports = getPortMap(deviceId);
504
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700505 final Timestamp newTimestamp = portDescriptions.timestamp();
506
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700507 // Add new ports
508 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700509 for (PortDescription portDescription : portDescriptions.value()) {
510 final PortNumber number = portDescription.portNumber();
511 processed.add(number);
512
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700513 final Port oldPort = ports.get(number);
514 final Port newPort;
515
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700516
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700517 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
518 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700519 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700520 // on new port or valid update
521 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700522 descs.putPortDesc(new Timestamped<>(portDescription,
523 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700524 newPort = composePort(device, number, descsMap);
525 } else {
526 // outdated event, ignored.
527 continue;
528 }
529
530 events.add(oldPort == null ?
531 createPort(device, newPort, ports) :
532 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700533 }
534
535 events.addAll(pruneOldPorts(device, ports, processed));
536 }
537 return FluentIterable.from(events).filter(notNull()).toList();
538 }
539
540 // Creates a new port based on the port description adds it to the map and
541 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700542 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700543 private DeviceEvent createPort(Device device, Port newPort,
544 Map<PortNumber, Port> ports) {
545 ports.put(newPort.number(), newPort);
546 return new DeviceEvent(PORT_ADDED, device, newPort);
547 }
548
549 // Checks if the specified port requires update and if so, it replaces the
550 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700551 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700552 private DeviceEvent updatePort(Device device, Port oldPort,
553 Port newPort,
554 Map<PortNumber, Port> ports) {
555 if (oldPort.isEnabled() != newPort.isEnabled() ||
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700556 oldPort.type() != newPort.type() ||
557 oldPort.portSpeed() != newPort.portSpeed() ||
558 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700559 ports.put(oldPort.number(), newPort);
560 return new DeviceEvent(PORT_UPDATED, device, newPort);
561 }
562 return null;
563 }
564
565 // Prunes the specified list of ports based on which ports are in the
566 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700567 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700568 private List<DeviceEvent> pruneOldPorts(Device device,
569 Map<PortNumber, Port> ports,
570 Set<PortNumber> processed) {
571 List<DeviceEvent> events = new ArrayList<>();
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700572 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700573 while (iterator.hasNext()) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700574 Entry<PortNumber, Port> e = iterator.next();
575 PortNumber portNumber = e.getKey();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700576 if (!processed.contains(portNumber)) {
Yuta HIGUCHI02649072014-10-15 23:28:20 -0700577 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700578 iterator.remove();
579 }
580 }
581 return events;
582 }
583
584 // Gets the map of ports for the specified device; if one does not already
585 // exist, it creates and registers a new one.
586 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
587 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700588 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
589 }
590
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700591 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700592 DeviceId deviceId) {
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700593 Map<ProviderId, DeviceDescriptions> r;
594 r = deviceDescs.get(deviceId);
595 if (r == null) {
596 r = new HashMap<ProviderId, DeviceDescriptions>();
597 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
598 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
599 if (concurrentlyAdded != null) {
600 r = concurrentlyAdded;
601 }
602 }
603 return r;
604 }
605
606 // Guarded by deviceDescs value (=Device lock)
607 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
608 Map<ProviderId, DeviceDescriptions> device,
609 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
610
611 synchronized (device) {
612 DeviceDescriptions r = device.get(providerId);
613 if (r == null) {
614 r = new DeviceDescriptions(deltaDesc);
615 device.put(providerId, r);
616 }
617 return r;
618 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700619 }
620
621 @Override
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700622 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
623 DeviceId deviceId,
624 PortDescription portDescription) {
625
Yuta HIGUCHIeb24e9d2014-10-26 19:34:20 -0700626 final Timestamp newTimestamp;
627 try {
628 newTimestamp = deviceClockService.getTimestamp(deviceId);
629 } catch (IllegalStateException e) {
630 log.info("Timestamp was not available for device {}", deviceId);
631 log.debug(" discarding {}", portDescription);
632 // Failed to generate timestamp. Ignoring.
633 // See updatePorts comment
634 return null;
635 }
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700636 final Timestamped<PortDescription> deltaDesc
637 = new Timestamped<>(portDescription, newTimestamp);
638 final DeviceEvent event;
639 final Timestamped<PortDescription> mergedDesc;
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800640 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
641 synchronized (device) {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700642 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
Yuta HIGUCHI04c6b3f2014-11-07 14:47:01 -0800643 mergedDesc = device.get(providerId)
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700644 .getPortDesc(portDescription.portNumber());
645 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700646 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700647 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
648 providerId, deviceId);
649 try {
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700650 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
Madan Jampani47c93732014-10-06 20:46:08 -0700651 } catch (IOException e) {
652 log.error("Failed to notify peers of a port status update topology event or providerId: "
653 + providerId + " and deviceId: " + deviceId, e);
654 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700655 }
656 return event;
657 }
658
659 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
660 Timestamped<PortDescription> deltaDesc) {
661
662 Device device = devices.get(deviceId);
663 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
664
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700665 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700666 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
667
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700668 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700669
670 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
671 log.debug("Ignoring outdated event: {}", deltaDesc);
672 return null;
673 }
674
675 DeviceDescriptions descs = descsMap.get(providerId);
676 // assuming all providers must to give DeviceDescription
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700677 verify(descs != null,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700678 "Device description for Device ID %s from Provider %s was not found",
679 deviceId, providerId);
680
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700681 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
682 final PortNumber number = deltaDesc.value().portNumber();
683 final Port oldPort = ports.get(number);
684 final Port newPort;
685
686 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
687 if (existingPortDesc == null ||
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700688 deltaDesc.isNewer(existingPortDesc)) {
689 // on new port or valid update
690 // update description
691 descs.putPortDesc(deltaDesc);
692 newPort = composePort(device, number, descsMap);
693 } else {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700694 // same or outdated event, ignored.
695 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700696 return null;
697 }
698
699 if (oldPort == null) {
700 return createPort(device, newPort, ports);
701 } else {
702 return updatePort(device, oldPort, newPort, ports);
703 }
704 }
705 }
706
707 @Override
708 public List<Port> getPorts(DeviceId deviceId) {
709 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
710 if (ports == null) {
711 return Collections.emptyList();
712 }
713 return ImmutableList.copyOf(ports.values());
714 }
715
716 @Override
717 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
718 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
719 return ports == null ? null : ports.get(portNumber);
720 }
721
722 @Override
723 public boolean isAvailable(DeviceId deviceId) {
724 return availableDevices.contains(deviceId);
725 }
726
727 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700728 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800729 final NodeId myId = clusterService.getLocalNode().id();
730 NodeId master = mastershipService.getMasterFor(deviceId);
731
732 // if there exist a master, forward
733 // if there is no master, try to become one and process
734
735 boolean relinquishAtEnd = false;
736 if (master == null) {
737 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
738 if (myRole != MastershipRole.NONE) {
739 relinquishAtEnd = true;
740 }
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800741 log.debug("Temporarily requesting role for {} to remove", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800742 mastershipService.requestRoleFor(deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800743 MastershipTerm term = termService.getMastershipTerm(deviceId);
744 if (myId.equals(term.master())) {
745 master = myId;
746 }
Yuta HIGUCHIe8252bb2014-10-22 09:41:01 -0700747 }
748
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800749 if (!myId.equals(master)) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800750 log.debug("{} has control of {}, forwarding remove request",
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800751 master, deviceId);
752
753 ClusterMessage message = new ClusterMessage(
754 myId,
755 DEVICE_REMOVE_REQ,
756 SERIALIZER.encode(deviceId));
757
758 try {
759 clusterCommunicator.unicast(message, master);
760 } catch (IOException e) {
761 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
762 }
763
764 // event will be triggered after master processes it.
765 return null;
766 }
767
768 // I have control..
769
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700770 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700771 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700772 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800773 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700774 deviceId);
775 try {
776 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
777 } catch (IOException e) {
778 log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
779 deviceId);
780 }
781 }
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800782 if (relinquishAtEnd) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800783 log.debug("Relinquishing temporary role acquired for {}", deviceId);
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -0800784 mastershipService.relinquishMastership(deviceId);
785 }
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700786 return event;
787 }
788
789 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
790 Timestamp timestamp) {
791
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700792 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700793 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700794 // accept removal request if given timestamp is newer than
795 // the latest Timestamp from Primary provider
796 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
797 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
798 if (timestamp.compareTo(lastTimestamp) <= 0) {
799 // outdated event ignore
800 return null;
801 }
802 removalRequest.put(deviceId, timestamp);
803
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700804 Device device = devices.remove(deviceId);
805 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700806 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
807 if (ports != null) {
808 ports.clear();
809 }
810 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700811 descs.clear();
812 return device == null ? null :
813 new DeviceEvent(DEVICE_REMOVED, device, null);
814 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700815 }
816
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700817 /**
818 * Checks if given timestamp is superseded by removal request
819 * with more recent timestamp.
820 *
821 * @param deviceId identifier of a device
822 * @param timestampToCheck timestamp of an event to check
823 * @return true if device is already removed
824 */
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700825 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
826 Timestamp removalTimestamp = removalRequest.get(deviceId);
827 if (removalTimestamp != null &&
828 removalTimestamp.compareTo(timestampToCheck) >= 0) {
829 // removalRequest is more recent
830 return true;
831 }
832 return false;
833 }
834
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700835 /**
836 * Returns a Device, merging description given from multiple Providers.
837 *
838 * @param deviceId device identifier
839 * @param providerDescs Collection of Descriptions from multiple providers
840 * @return Device instance
841 */
842 private Device composeDevice(DeviceId deviceId,
Yuta HIGUCHI47c40882014-10-10 18:44:37 -0700843 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700844
Thomas Vachuska444eda62014-10-28 13:09:42 -0700845 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700846
847 ProviderId primary = pickPrimaryPID(providerDescs);
848
849 DeviceDescriptions desc = providerDescs.get(primary);
850
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700851 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700852 Type type = base.type();
853 String manufacturer = base.manufacturer();
854 String hwVersion = base.hwVersion();
855 String swVersion = base.swVersion();
856 String serialNumber = base.serialNumber();
alshabib7911a052014-10-16 17:49:37 -0700857 ChassisId chassisId = base.chassisId();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700858 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
859 annotations = merge(annotations, base.annotations());
860
861 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
862 if (e.getKey().equals(primary)) {
863 continue;
864 }
865 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700866 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700867 // Currently assuming there will never be a key conflict between
868 // providers
869
870 // annotation merging. not so efficient, should revisit later
871 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
872 }
873
874 return new DefaultDevice(primary, deviceId , type, manufacturer,
alshabib7911a052014-10-16 17:49:37 -0700875 hwVersion, swVersion, serialNumber,
876 chassisId, annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700877 }
878
879 /**
880 * Returns a Port, merging description given from multiple Providers.
881 *
882 * @param device device the port is on
883 * @param number port number
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700884 * @param descsMap Collection of Descriptions from multiple providers
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700885 * @return Port instance
886 */
887 private Port composePort(Device device, PortNumber number,
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700888 Map<ProviderId, DeviceDescriptions> descsMap) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700889
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700890 ProviderId primary = pickPrimaryPID(descsMap);
891 DeviceDescriptions primDescs = descsMap.get(primary);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700892 // if no primary, assume not enabled
893 // TODO: revisit this default port enabled/disabled behavior
894 boolean isEnabled = false;
895 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
896
897 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
898 if (portDesc != null) {
899 isEnabled = portDesc.value().isEnabled();
900 annotations = merge(annotations, portDesc.value().annotations());
901 }
902
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -0700903 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700904 if (e.getKey().equals(primary)) {
905 continue;
906 }
907 // TODO: should keep track of Description timestamp
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700908 // and only merge conflicting keys when timestamp is newer.
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700909 // Currently assuming there will never be a key conflict between
910 // providers
911
912 // annotation merging. not so efficient, should revisit later
913 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
914 if (otherPortDesc != null) {
915 annotations = merge(annotations, otherPortDesc.value().annotations());
916 }
917 }
918
Thomas Vachuskad16ce182014-10-29 17:25:29 -0700919 return portDesc == null ?
920 new DefaultPort(device, number, false, annotations) :
921 new DefaultPort(device, number, isEnabled, portDesc.value().type(),
922 portDesc.value().portSpeed(), annotations);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700923 }
924
925 /**
926 * @return primary ProviderID, or randomly chosen one if none exists
927 */
928 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700929 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700930 ProviderId fallBackPrimary = null;
931 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
932 if (!e.getKey().isAncillary()) {
933 return e.getKey();
934 } else if (fallBackPrimary == null) {
935 // pick randomly as a fallback in case there is no primary
936 fallBackPrimary = e.getKey();
937 }
938 }
939 return fallBackPrimary;
940 }
941
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700942 private DeviceDescriptions getPrimaryDescriptions(
943 Map<ProviderId, DeviceDescriptions> providerDescs) {
944 ProviderId pid = pickPrimaryPID(providerDescs);
945 return providerDescs.get(pid);
946 }
947
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700948 // TODO: should we be throwing exception?
949 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
950 ClusterMessage message = new ClusterMessage(
951 clusterService.getLocalNode().id(),
952 subject,
953 SERIALIZER.encode(event));
954 clusterCommunicator.unicast(message, recipient);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700955 }
956
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700957 // TODO: should we be throwing exception?
958 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
959 ClusterMessage message = new ClusterMessage(
960 clusterService.getLocalNode().id(),
961 subject,
962 SERIALIZER.encode(event));
963 clusterCommunicator.broadcast(message);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700964 }
Madan Jampani47c93732014-10-06 20:46:08 -0700965
966 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700967 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700968 }
969
Madan Jampani25322532014-10-08 11:20:38 -0700970 private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700971 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
Madan Jampani25322532014-10-08 11:20:38 -0700972 }
973
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700974 private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700975 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
Madan Jampani3fc72ed2014-10-08 12:50:27 -0700976 }
977
Madan Jampani47c93732014-10-06 20:46:08 -0700978 private void notifyPeers(InternalPortEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700979 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700980 }
981
982 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -0700983 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
984 }
985
986 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
987 try {
988 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
989 } catch (IOException e) {
990 log.error("Failed to send" + event + " to " + recipient, e);
991 }
992 }
993
994 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
995 try {
996 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
997 } catch (IOException e) {
998 log.error("Failed to send" + event + " to " + recipient, e);
999 }
1000 }
1001
1002 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1003 try {
1004 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1005 } catch (IOException e) {
1006 log.error("Failed to send" + event + " to " + recipient, e);
1007 }
1008 }
1009
1010 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1011 try {
1012 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1013 } catch (IOException e) {
1014 log.error("Failed to send" + event + " to " + recipient, e);
1015 }
1016 }
1017
1018 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1019 try {
1020 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1021 } catch (IOException e) {
1022 log.error("Failed to send" + event + " to " + recipient, e);
1023 }
1024 }
1025
1026 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1027 final NodeId self = clusterService.getLocalNode().id();
1028
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001029 final int numDevices = deviceDescs.size();
1030 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1031 final int portsPerDevice = 8; // random factor to minimize reallocation
1032 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1033 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001034
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001035 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>>
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001036 provs : deviceDescs.entrySet()) {
1037
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001038 // for each Device...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001039 final DeviceId deviceId = provs.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001040 final Map<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001041 synchronized (devDescs) {
1042
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001043 // send device offline timestamp
1044 Timestamp lOffline = this.offline.get(deviceId);
1045 if (lOffline != null) {
1046 adOffline.put(deviceId, lOffline);
1047 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001048
1049 for (Entry<ProviderId, DeviceDescriptions>
1050 prov : devDescs.entrySet()) {
1051
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001052 // for each Provider Descriptions...
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001053 final ProviderId provId = prov.getKey();
1054 final DeviceDescriptions descs = prov.getValue();
1055
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001056 adDevices.put(new DeviceFragmentId(deviceId, provId),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001057 descs.getDeviceDesc().timestamp());
1058
1059 for (Entry<PortNumber, Timestamped<PortDescription>>
1060 portDesc : descs.getPortDescs().entrySet()) {
1061
1062 final PortNumber number = portDesc.getKey();
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001063 adPorts.put(new PortFragmentId(deviceId, provId, number),
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001064 portDesc.getValue().timestamp());
1065 }
1066 }
1067 }
1068 }
1069
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001070 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001071 }
1072
1073 /**
1074 * Responds to anti-entropy advertisement message.
1075 * <P>
1076 * Notify sender about out-dated information using regular replication message.
1077 * Send back advertisement to sender if not in sync.
1078 *
1079 * @param advertisement to respond to
1080 */
1081 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1082
1083 final NodeId sender = advertisement.sender();
1084
1085 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1086 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1087 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1088
1089 // Fragments to request
1090 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1091 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1092
Yuta HIGUCHIbf71dff2014-10-14 16:02:33 -07001093 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001094 final DeviceId deviceId = de.getKey();
1095 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1096
1097 synchronized (lDevice) {
1098 // latestTimestamp across provider
1099 // Note: can be null initially
1100 Timestamp localLatest = offline.get(deviceId);
1101
1102 // handle device Ads
1103 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1104 final ProviderId provId = prov.getKey();
1105 final DeviceDescriptions lDeviceDescs = prov.getValue();
1106
1107 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1108
1109
1110 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1111 Timestamp advDevTimestamp = devAds.get(devFragId);
1112
1113 if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
1114 // remote does not have it or outdated, suggest
1115 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1116 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1117 // local is outdated, request
1118 reqDevices.add(devFragId);
1119 }
1120
1121 // handle port Ads
1122 for (Entry<PortNumber, Timestamped<PortDescription>>
1123 pe : lDeviceDescs.getPortDescs().entrySet()) {
1124
1125 final PortNumber num = pe.getKey();
1126 final Timestamped<PortDescription> lPort = pe.getValue();
1127
1128 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1129
1130 Timestamp advPortTimestamp = portAds.get(portFragId);
Yuta HIGUCHIec76bfe2014-10-09 20:17:07 -07001131 if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001132 // remote does not have it or outdated, suggest
1133 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1134 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1135 // local is outdated, request
1136 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1137 reqPorts.add(portFragId);
1138 }
1139
1140 // remove port Ad already processed
1141 portAds.remove(portFragId);
1142 } // end local port loop
1143
1144 // remove device Ad already processed
1145 devAds.remove(devFragId);
1146
1147 // find latest and update
1148 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1149 if (localLatest == null ||
1150 providerLatest.compareTo(localLatest) > 0) {
1151 localLatest = providerLatest;
1152 }
1153 } // end local provider loop
1154
1155 // checking if remote timestamp is more recent.
1156 Timestamp rOffline = offlineAds.get(deviceId);
1157 if (rOffline != null &&
1158 rOffline.compareTo(localLatest) > 0) {
1159 // remote offline timestamp suggests that the
1160 // device is off-line
1161 markOfflineInternal(deviceId, rOffline);
1162 }
1163
1164 Timestamp lOffline = offline.get(deviceId);
1165 if (lOffline != null && rOffline == null) {
1166 // locally offline, but remote is online, suggest offline
1167 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1168 }
1169
1170 // remove device offline Ad already processed
1171 offlineAds.remove(deviceId);
1172 } // end local device loop
1173 } // device lock
1174
1175 // If there is any Ads left, request them
1176 log.trace("Ads left {}, {}", devAds, portAds);
1177 reqDevices.addAll(devAds.keySet());
1178 reqPorts.addAll(portAds.keySet());
1179
1180 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1181 log.trace("Nothing to request to remote peer {}", sender);
1182 return;
1183 }
1184
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001185 log.debug("Need to sync {} {}", reqDevices, reqPorts);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001186
1187 // 2-way Anti-Entropy for now
1188 try {
1189 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1190 } catch (IOException e) {
1191 log.error("Failed to send response advertisement to " + sender, e);
1192 }
1193
1194// Sketch of 3-way Anti-Entropy
1195// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1196// ClusterMessage message = new ClusterMessage(
1197// clusterService.getLocalNode().id(),
1198// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1199// SERIALIZER.encode(request));
1200//
1201// try {
1202// clusterCommunicator.unicast(message, advertisement.sender());
1203// } catch (IOException e) {
1204// log.error("Failed to send advertisement reply to "
1205// + advertisement.sender(), e);
1206// }
Madan Jampani47c93732014-10-06 20:46:08 -07001207 }
1208
Madan Jampani255a58b2014-10-09 12:08:20 -07001209 private void notifyDelegateIfNotNull(DeviceEvent event) {
1210 if (event != null) {
1211 notifyDelegate(event);
1212 }
1213 }
1214
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001215 private final class SendAdvertisementTask implements Runnable {
1216
1217 @Override
1218 public void run() {
1219 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001220 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001221 return;
1222 }
1223
1224 try {
1225 final NodeId self = clusterService.getLocalNode().id();
1226 Set<ControllerNode> nodes = clusterService.getNodes();
1227
1228 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1229 .transform(toNodeId())
1230 .toList();
1231
1232 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001233 log.trace("No other peers in the cluster.");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001234 return;
1235 }
1236
1237 NodeId peer;
1238 do {
1239 int idx = RandomUtils.nextInt(0, nodeIds.size());
1240 peer = nodeIds.get(idx);
1241 } while (peer.equals(self));
1242
1243 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1244
1245 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001246 log.debug("Interrupted, quitting");
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001247 return;
1248 }
1249
1250 try {
1251 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1252 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -07001253 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001254 return;
1255 }
1256 } catch (Exception e) {
1257 // catch all Exception to avoid Scheduled task being suppressed.
1258 log.error("Exception thrown while sending advertisement", e);
1259 }
1260 }
1261 }
1262
Madan Jampani47c93732014-10-06 20:46:08 -07001263 private class InternalDeviceEventListener implements ClusterMessageHandler {
1264 @Override
1265 public void handle(ClusterMessage message) {
Madan Jampani25322532014-10-08 11:20:38 -07001266
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001267 log.debug("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001268 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani25322532014-10-08 11:20:38 -07001269
Madan Jampani47c93732014-10-06 20:46:08 -07001270 ProviderId providerId = event.providerId();
1271 DeviceId deviceId = event.deviceId();
1272 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
Madan Jampani25322532014-10-08 11:20:38 -07001273
Madan Jampani255a58b2014-10-09 12:08:20 -07001274 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001275 }
1276 }
1277
Madan Jampani25322532014-10-08 11:20:38 -07001278 private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
1279 @Override
1280 public void handle(ClusterMessage message) {
1281
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001282 log.debug("Received device offline event from peer: {}", message.sender());
Madan Jampani25322532014-10-08 11:20:38 -07001283 InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
1284
1285 DeviceId deviceId = event.deviceId();
1286 Timestamp timestamp = event.timestamp();
1287
Madan Jampani255a58b2014-10-09 12:08:20 -07001288 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
Madan Jampani25322532014-10-08 11:20:38 -07001289 }
1290 }
1291
Yuta HIGUCHI53afd5b2014-11-03 18:03:08 -08001292 private final class InternalRemoveRequestListener
1293 implements ClusterMessageHandler {
1294 @Override
1295 public void handle(ClusterMessage message) {
1296 log.debug("Received device remove request from peer: {}", message.sender());
1297 DeviceId did = SERIALIZER.decode(message.payload());
1298 removeDevice(did);
1299 }
1300 }
1301
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001302 private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
1303 @Override
1304 public void handle(ClusterMessage message) {
1305
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001306 log.debug("Received device removed event from peer: {}", message.sender());
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001307 InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
1308
1309 DeviceId deviceId = event.deviceId();
1310 Timestamp timestamp = event.timestamp();
1311
Madan Jampani255a58b2014-10-09 12:08:20 -07001312 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
Madan Jampani3fc72ed2014-10-08 12:50:27 -07001313 }
1314 }
1315
Madan Jampani47c93732014-10-06 20:46:08 -07001316 private class InternalPortEventListener implements ClusterMessageHandler {
1317 @Override
1318 public void handle(ClusterMessage message) {
1319
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001320 log.debug("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001321 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001322
1323 ProviderId providerId = event.providerId();
1324 DeviceId deviceId = event.deviceId();
1325 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1326
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001327 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001328 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001329 // Note: dropped information will be recovered by anti-entropy
1330 return;
1331 }
1332
Madan Jampani255a58b2014-10-09 12:08:20 -07001333 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
Madan Jampani47c93732014-10-06 20:46:08 -07001334 }
1335 }
1336
1337 private class InternalPortStatusEventListener implements ClusterMessageHandler {
1338 @Override
1339 public void handle(ClusterMessage message) {
1340
Yuta HIGUCHIcc8e96e2014-10-31 17:48:09 -07001341 log.debug("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -07001342 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -07001343
1344 ProviderId providerId = event.providerId();
1345 DeviceId deviceId = event.deviceId();
1346 Timestamped<PortDescription> portDescription = event.portDescription();
1347
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001348 if (getDevice(deviceId) == null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -08001349 log.debug("{} not found on this node yet, ignoring.", deviceId);
Yuta HIGUCHI20c0e972014-10-31 15:22:33 -07001350 // Note: dropped information will be recovered by anti-entropy
1351 return;
1352 }
1353
Madan Jampani255a58b2014-10-09 12:08:20 -07001354 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
Madan Jampani47c93732014-10-06 20:46:08 -07001355 }
1356 }
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001357
1358 private final class InternalDeviceAdvertisementListener
1359 implements ClusterMessageHandler {
1360
1361 @Override
1362 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -08001363 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI9ee60f62014-10-09 10:00:01 -07001364 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1365 handleAdvertisement(advertisement);
1366 }
1367 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001368}