blob: 30374c3036704157759c898bfd9ba503191dfe16 [file] [log] [blame]
Madan Jampani61056bc2014-09-27 09:07:26 -07001package org.onlab.onos.store.device.impl;
2
3import static com.google.common.base.Predicates.notNull;
Yuta HIGUCHI2e963892014-09-27 13:00:39 -07004import static com.google.common.base.Preconditions.checkState;
Madan Jampani61056bc2014-09-27 09:07:26 -07005
Madan Jampani61056bc2014-09-27 09:07:26 -07006import com.google.common.collect.FluentIterable;
7import com.google.common.collect.ImmutableSet;
8import com.google.common.collect.ImmutableSet.Builder;
9
10import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
16import org.onlab.onos.net.DefaultDevice;
17import org.onlab.onos.net.DefaultPort;
18import org.onlab.onos.net.Device;
19import org.onlab.onos.net.DeviceId;
20import org.onlab.onos.net.Port;
21import org.onlab.onos.net.PortNumber;
22import org.onlab.onos.net.device.DeviceDescription;
23import org.onlab.onos.net.device.DeviceEvent;
24import org.onlab.onos.net.device.DeviceStore;
25import org.onlab.onos.net.device.DeviceStoreDelegate;
26import org.onlab.onos.net.device.PortDescription;
27import org.onlab.onos.net.provider.ProviderId;
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070028import org.onlab.onos.store.AbstractStore;
29import org.onlab.onos.store.ClockService;
Madan Jampani61056bc2014-09-27 09:07:26 -070030import org.onlab.onos.store.Timestamp;
Madan Jampani61056bc2014-09-27 09:07:26 -070031import org.slf4j.Logger;
32
33import java.util.ArrayList;
34import java.util.Collections;
35import java.util.HashMap;
36import java.util.HashSet;
37import java.util.Iterator;
38import java.util.List;
39import java.util.Map;
40import java.util.Objects;
41import java.util.Set;
42import java.util.concurrent.ConcurrentHashMap;
43
44import static com.google.common.base.Preconditions.checkArgument;
45import static org.onlab.onos.net.device.DeviceEvent.Type.*;
46import static org.slf4j.LoggerFactory.getLogger;
47
48/**
49 * Manages inventory of infrastructure devices using a protocol that takes into consideration
50 * the order in which device events occur.
51 */
52@Component(immediate = true)
53@Service
54public class OnosDistributedDeviceStore
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070055 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
Madan Jampani61056bc2014-09-27 09:07:26 -070056 implements DeviceStore {
57
58 private final Logger log = getLogger(getClass());
59
60 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
61
62 private ConcurrentHashMap<DeviceId, VersionedValue<Device>> devices;
63 private ConcurrentHashMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070064
Madan Jampani61056bc2014-09-27 09:07:26 -070065 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected ClockService clockService;
67
Madan Jampani61056bc2014-09-27 09:07:26 -070068 @Activate
69 public void activate() {
Madan Jampani61056bc2014-09-27 09:07:26 -070070
71 devices = new ConcurrentHashMap<>();
72 devicePorts = new ConcurrentHashMap<>();
73
74 log.info("Started");
75 }
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070076
Madan Jampani61056bc2014-09-27 09:07:26 -070077 @Deactivate
78 public void deactivate() {
79 log.info("Stopped");
80 }
81
82 @Override
83 public int getDeviceCount() {
84 return devices.size();
85 }
86
87 @Override
88 public Iterable<Device> getDevices() {
89 // TODO builder v.s. copyOf. Guava semms to be using copyOf?
90 // FIXME: synchronize.
91 Builder<Device> builder = ImmutableSet.builder();
92 for (VersionedValue<? extends Device> device : devices.values()) {
93 builder.add(device.entity());
94 }
95 return builder.build();
96 }
97
98 @Override
99 public Device getDevice(DeviceId deviceId) {
100 return devices.get(deviceId).entity();
101 }
102
103 @Override
104 public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
105 DeviceDescription deviceDescription) {
106 Timestamp now = clockService.getTimestamp(deviceId);
107 VersionedValue<Device> device = devices.get(deviceId);
Yuta HIGUCHI2e963892014-09-27 13:00:39 -0700108
Madan Jampani61056bc2014-09-27 09:07:26 -0700109 if (device == null) {
110 return createDevice(providerId, deviceId, deviceDescription, now);
111 }
Yuta HIGUCHI2e963892014-09-27 13:00:39 -0700112
113 checkState(now.compareTo(device.timestamp()) > 0,
114 "Existing device has a timestamp in the future!");
Madan Jampani61056bc2014-09-27 09:07:26 -0700115
116 return updateDevice(providerId, device.entity(), deviceDescription, now);
117 }
118
119 // Creates the device and returns the appropriate event if necessary.
120 private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
121 DeviceDescription desc, Timestamp timestamp) {
122 DefaultDevice device = new DefaultDevice(providerId, deviceId, desc.type(),
123 desc.manufacturer(),
124 desc.hwVersion(), desc.swVersion(),
125 desc.serialNumber());
126
127 devices.put(deviceId, new VersionedValue<Device>(device, true, timestamp));
128 // FIXME: broadcast a message telling peers of a device event.
129 return new DeviceEvent(DEVICE_ADDED, device, null);
130 }
131
132 // Updates the device and returns the appropriate event if necessary.
133 private DeviceEvent updateDevice(ProviderId providerId, Device device,
134 DeviceDescription desc, Timestamp timestamp) {
135 // We allow only certain attributes to trigger update
136 if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
137 !Objects.equals(device.swVersion(), desc.swVersion())) {
138
139 Device updated = new DefaultDevice(providerId, device.id(),
140 desc.type(),
141 desc.manufacturer(),
142 desc.hwVersion(),
143 desc.swVersion(),
144 desc.serialNumber());
145 devices.put(device.id(), new VersionedValue<Device>(updated, true, timestamp));
146 // FIXME: broadcast a message telling peers of a device event.
147 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
148 }
149
150 // Otherwise merely attempt to change availability
151 DefaultDevice updated = new DefaultDevice(providerId, device.id(),
152 desc.type(),
153 desc.manufacturer(),
154 desc.hwVersion(),
155 desc.swVersion(),
156 desc.serialNumber());
157
Yuta HIGUCHI2e963892014-09-27 13:00:39 -0700158 VersionedValue<Device> oldDevice = devices.put(device.id(),
159 new VersionedValue<Device>(updated, true, timestamp));
Madan Jampani61056bc2014-09-27 09:07:26 -0700160 if (!oldDevice.isUp()) {
161 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
162 } else {
163 return null;
164 }
165 }
166
167 @Override
168 public DeviceEvent markOffline(DeviceId deviceId) {
169 VersionedValue<Device> device = devices.get(deviceId);
170 boolean willRemove = device != null && device.isUp();
Yuta HIGUCHI2e963892014-09-27 13:00:39 -0700171 if (!willRemove) {
172 return null;
173 }
Madan Jampani61056bc2014-09-27 09:07:26 -0700174 Timestamp timestamp = clockService.getTimestamp(deviceId);
Yuta HIGUCHI2e963892014-09-27 13:00:39 -0700175 if (replaceIfLatest(device.entity(), false, timestamp)) {
Madan Jampani61056bc2014-09-27 09:07:26 -0700176 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device.entity(), null);
177 }
178 return null;
179 }
Yuta HIGUCHI2e963892014-09-27 13:00:39 -0700180
Madan Jampani61056bc2014-09-27 09:07:26 -0700181 // Replace existing value if its timestamp is older.
Yuta HIGUCHI2e963892014-09-27 13:00:39 -0700182 private synchronized boolean replaceIfLatest(Device device, boolean isUp, Timestamp timestamp) {
Madan Jampani61056bc2014-09-27 09:07:26 -0700183 VersionedValue<Device> existingValue = devices.get(device.id());
Yuta HIGUCHI2e963892014-09-27 13:00:39 -0700184 if (timestamp.compareTo(existingValue.timestamp()) > 0) {
Madan Jampani61056bc2014-09-27 09:07:26 -0700185 devices.put(device.id(), new VersionedValue<Device>(device, isUp, timestamp));
186 return true;
187 }
188 return false;
189 }
190
191 @Override
192 public List<DeviceEvent> updatePorts(DeviceId deviceId,
193 List<PortDescription> portDescriptions) {
194 List<DeviceEvent> events = new ArrayList<>();
195 synchronized (this) {
196 VersionedValue<Device> device = devices.get(deviceId);
197 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
198 Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
199 Timestamp timestamp = clockService.getTimestamp(deviceId);
200
201 // Add new ports
202 Set<PortNumber> processed = new HashSet<>();
203 for (PortDescription portDescription : portDescriptions) {
204 VersionedValue<Port> port = ports.get(portDescription.portNumber());
Yuta HIGUCHI2e963892014-09-27 13:00:39 -0700205 if (port == null) {
206 events.add(createPort(device, portDescription, ports, timestamp));
207 }
208 checkState(timestamp.compareTo(port.timestamp()) > 0,
209 "Existing port state has a timestamp in the future!");
Madan Jampani61056bc2014-09-27 09:07:26 -0700210 events.add(updatePort(device, port, portDescription, ports, timestamp));
211 processed.add(portDescription.portNumber());
212 }
213
214 updatePortMap(deviceId, ports);
215
216 events.addAll(pruneOldPorts(device.entity(), ports, processed));
217 }
218 return FluentIterable.from(events).filter(notNull()).toList();
219 }
220
221 // Creates a new port based on the port description adds it to the map and
222 // Returns corresponding event.
223 //@GuardedBy("this")
224 private DeviceEvent createPort(VersionedValue<Device> device, PortDescription portDescription,
225 Map<PortNumber, VersionedValue<Port>> ports, Timestamp timestamp) {
226 Port port = new DefaultPort(device.entity(), portDescription.portNumber(),
227 portDescription.isEnabled());
228 ports.put(port.number(), new VersionedValue<Port>(port, true, timestamp));
229 updatePortMap(device.entity().id(), ports);
230 return new DeviceEvent(PORT_ADDED, device.entity(), port);
231 }
232
233 // Checks if the specified port requires update and if so, it replaces the
234 // existing entry in the map and returns corresponding event.
235 //@GuardedBy("this")
236 private DeviceEvent updatePort(VersionedValue<Device> device, VersionedValue<Port> port,
237 PortDescription portDescription,
238 Map<PortNumber, VersionedValue<Port>> ports,
239 Timestamp timestamp) {
240 if (port.entity().isEnabled() != portDescription.isEnabled()) {
241 VersionedValue<Port> updatedPort = new VersionedValue<Port>(
242 new DefaultPort(device.entity(), portDescription.portNumber(),
243 portDescription.isEnabled()),
244 portDescription.isEnabled(),
245 timestamp);
246 ports.put(port.entity().number(), updatedPort);
247 updatePortMap(device.entity().id(), ports);
248 return new DeviceEvent(PORT_UPDATED, device.entity(), updatedPort.entity());
249 }
250 return null;
251 }
252
253 // Prunes the specified list of ports based on which ports are in the
254 // processed list and returns list of corresponding events.
255 //@GuardedBy("this")
256 private List<DeviceEvent> pruneOldPorts(Device device,
257 Map<PortNumber, VersionedValue<Port>> ports,
258 Set<PortNumber> processed) {
259 List<DeviceEvent> events = new ArrayList<>();
260 Iterator<PortNumber> iterator = ports.keySet().iterator();
261 while (iterator.hasNext()) {
262 PortNumber portNumber = iterator.next();
263 if (!processed.contains(portNumber)) {
264 events.add(new DeviceEvent(PORT_REMOVED, device,
265 ports.get(portNumber).entity()));
266 iterator.remove();
267 }
268 }
269 if (!events.isEmpty()) {
270 updatePortMap(device.id(), ports);
271 }
272 return events;
273 }
274
275 // Gets the map of ports for the specified device; if one does not already
276 // exist, it creates and registers a new one.
277 // WARN: returned value is a copy, changes made to the Map
278 // needs to be written back using updatePortMap
279 //@GuardedBy("this")
280 private Map<PortNumber, VersionedValue<Port>> getPortMap(DeviceId deviceId) {
281 Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
282 if (ports == null) {
283 ports = new HashMap<>();
284 // this probably is waste of time in most cases.
285 updatePortMap(deviceId, ports);
286 }
287 return ports;
288 }
289
290 //@GuardedBy("this")
291 private void updatePortMap(DeviceId deviceId, Map<PortNumber, VersionedValue<Port>> ports) {
292 devicePorts.put(deviceId, ports);
293 }
294
295 @Override
296 public DeviceEvent updatePortStatus(DeviceId deviceId,
297 PortDescription portDescription) {
298 VersionedValue<Device> device = devices.get(deviceId);
299 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
300 Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
301 VersionedValue<Port> port = ports.get(portDescription.portNumber());
302 Timestamp timestamp = clockService.getTimestamp(deviceId);
303 return updatePort(device, port, portDescription, ports, timestamp);
304 }
305
306 @Override
307 public List<Port> getPorts(DeviceId deviceId) {
308 Map<PortNumber, VersionedValue<Port>> versionedPorts = devicePorts.get(deviceId);
Yuta HIGUCHI2e963892014-09-27 13:00:39 -0700309 if (versionedPorts == null) {
310 return Collections.emptyList();
311 }
312 List<Port> ports = new ArrayList<>();
Madan Jampani61056bc2014-09-27 09:07:26 -0700313 for (VersionedValue<Port> port : versionedPorts.values()) {
314 ports.add(port.entity());
315 }
316 return ports;
317 }
318
319 @Override
320 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
321 Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
322 return ports == null ? null : ports.get(portNumber).entity();
323 }
324
325 @Override
326 public boolean isAvailable(DeviceId deviceId) {
327 return devices.get(deviceId).isUp();
328 }
329
330 @Override
331 public DeviceEvent removeDevice(DeviceId deviceId) {
332 VersionedValue<Device> previousDevice = devices.remove(deviceId);
333 return previousDevice == null ? null :
334 new DeviceEvent(DEVICE_REMOVED, previousDevice.entity(), null);
335 }
336}