blob: 39bc7709429aa2b8f11dad6ebc911fb0edef47da [file] [log] [blame]
Madan Jampaniab994042014-10-13 15:34:04 -07001package org.onlab.onos.store.host.impl;
2
3import com.google.common.collect.HashMultimap;
4import com.google.common.collect.ImmutableSet;
5import com.google.common.collect.Multimap;
6import com.google.common.collect.Sets;
7
8import org.apache.felix.scr.annotations.Activate;
9import org.apache.felix.scr.annotations.Component;
10import org.apache.felix.scr.annotations.Deactivate;
11import org.apache.felix.scr.annotations.Reference;
12import org.apache.felix.scr.annotations.ReferenceCardinality;
13import org.apache.felix.scr.annotations.Service;
14import org.onlab.onos.cluster.ClusterService;
15import org.onlab.onos.net.Annotations;
16import org.onlab.onos.net.ConnectPoint;
17import org.onlab.onos.net.DefaultHost;
18import org.onlab.onos.net.DeviceId;
19import org.onlab.onos.net.Host;
20import org.onlab.onos.net.HostId;
21import org.onlab.onos.net.HostLocation;
22import org.onlab.onos.net.host.HostClockService;
23import org.onlab.onos.net.host.HostDescription;
24import org.onlab.onos.net.host.HostEvent;
25import org.onlab.onos.net.host.HostStore;
26import org.onlab.onos.net.host.HostStoreDelegate;
27import org.onlab.onos.net.host.PortAddresses;
28import org.onlab.onos.net.provider.ProviderId;
29import org.onlab.onos.store.AbstractStore;
30import org.onlab.onos.store.Timestamp;
31import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani312a2982014-10-14 21:07:16 -070032import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Madan Jampani255618f2014-10-14 23:27:57 -070033import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani312a2982014-10-14 21:07:16 -070034import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampaniab994042014-10-13 15:34:04 -070035import org.onlab.onos.store.common.impl.Timestamped;
Madan Jampani312a2982014-10-14 21:07:16 -070036import org.onlab.onos.store.serializers.DistributedStoreSerializers;
37import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampaniab994042014-10-13 15:34:04 -070038import org.onlab.packet.IpPrefix;
39import org.onlab.packet.MacAddress;
40import org.onlab.packet.VlanId;
Madan Jampani312a2982014-10-14 21:07:16 -070041import org.onlab.util.KryoPool;
Madan Jampaniab994042014-10-13 15:34:04 -070042import org.slf4j.Logger;
43
Madan Jampani312a2982014-10-14 21:07:16 -070044import java.io.IOException;
Madan Jampaniab994042014-10-13 15:34:04 -070045import java.util.HashSet;
46import java.util.Map;
47import java.util.Set;
48import java.util.concurrent.ConcurrentHashMap;
49
50import static org.onlab.onos.net.host.HostEvent.Type.*;
51import static org.slf4j.LoggerFactory.getLogger;
52
Yuta HIGUCHIa2639152014-10-14 15:08:10 -070053//TODO: multi-provider, annotation not supported.
Madan Jampaniab994042014-10-13 15:34:04 -070054/**
55 * Manages inventory of end-station hosts in distributed data store
56 * that uses optimistic replication and gossip based techniques.
57 */
58@Component(immediate = true)
59@Service
60public class GossipHostStore
61 extends AbstractStore<HostEvent, HostStoreDelegate>
62 implements HostStore {
63
64 private final Logger log = getLogger(getClass());
65
66 // Host inventory
67 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
68
69 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
70
71 // Hosts tracked by their location
72 private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
73
74 private final Map<ConnectPoint, PortAddresses> portAddresses =
75 new ConcurrentHashMap<>();
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected HostClockService hostClockService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected ClusterCommunicationService clusterCommunicator;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected ClusterService clusterService;
85
Madan Jampani312a2982014-10-14 21:07:16 -070086 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
87 @Override
88 protected void setupKryoPool() {
89 serializerPool = KryoPool.newBuilder()
90 .register(DistributedStoreSerializers.COMMON)
91 .register(InternalHostRemovedEvent.class)
92 .build()
93 .populate(1);
94 }
95 };
96
Madan Jampaniab994042014-10-13 15:34:04 -070097 @Activate
98 public void activate() {
Madan Jampani255618f2014-10-14 23:27:57 -070099 clusterCommunicator.addSubscriber(
100 GossipHostStoreMessageSubjects.HOST_UPDATED, new InternalHostEventListener());
101 clusterCommunicator.addSubscriber(
102 GossipHostStoreMessageSubjects.HOST_REMOVED, new InternalHostRemovedEventListener());
103
Madan Jampaniab994042014-10-13 15:34:04 -0700104 log.info("Started");
105 }
106
107 @Deactivate
108 public void deactivate() {
109 log.info("Stopped");
110 }
111
112 @Override
113 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
114 HostDescription hostDescription) {
115 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700116 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
117 if (event != null) {
118 log.info("Notifying peers of a host topology event for providerId: "
119 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
120 try {
121 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
122 } catch (IOException e) {
123 log.error("Failed to notify peers of a host topology event for providerId: "
124 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
125 }
126 }
127 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700128 }
129
130 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
131 HostDescription hostDescription, Timestamp timestamp) {
132 StoredHost host = hosts.get(hostId);
133 if (host == null) {
134 return createHost(providerId, hostId, hostDescription, timestamp);
135 }
136 return updateHost(providerId, host, hostDescription, timestamp);
137 }
138
139 // creates a new host and sends HOST_ADDED
140 private HostEvent createHost(ProviderId providerId, HostId hostId,
141 HostDescription descr, Timestamp timestamp) {
142 synchronized (this) {
143 // If this host was previously removed, first ensure
144 // this new request is "newer"
145 if (removedHosts.containsKey(hostId)) {
146 if (removedHosts.get(hostId).isNewer(timestamp)) {
147 return null;
148 } else {
149 removedHosts.remove(hostId);
150 }
151 }
152 StoredHost newhost = new StoredHost(providerId, hostId,
153 descr.hwAddress(),
154 descr.vlan(),
155 new Timestamped<>(descr.location(), timestamp),
156 ImmutableSet.of(descr.ipAddress()));
157 hosts.put(hostId, newhost);
158 locations.put(descr.location(), newhost);
159 return new HostEvent(HOST_ADDED, newhost);
160 }
161 }
162
163 // checks for type of update to host, sends appropriate event
164 private HostEvent updateHost(ProviderId providerId, StoredHost host,
165 HostDescription descr, Timestamp timestamp) {
166 HostEvent event;
167 if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
168 host.setLocation(new Timestamped<>(descr.location(), timestamp));
169 return new HostEvent(HOST_MOVED, host);
170 }
171
172 if (host.ipAddresses().contains(descr.ipAddress())) {
173 return null;
174 }
175
176 Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
177 addresses.add(descr.ipAddress());
178 StoredHost updated = new StoredHost(providerId, host.id(),
179 host.mac(), host.vlan(),
180 host.location, addresses);
181 event = new HostEvent(HOST_UPDATED, updated);
182 synchronized (this) {
183 hosts.put(host.id(), updated);
184 locations.remove(host.location(), host);
185 locations.put(updated.location(), updated);
186 }
187 return event;
188 }
189
190 @Override
191 public HostEvent removeHost(HostId hostId) {
192 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700193 HostEvent event = removeHostInternal(hostId, timestamp);
194 if (event != null) {
195 log.info("Notifying peers of a host removed topology event for hostId: {}", hostId);
196 try {
197 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
198 } catch (IOException e) {
199 log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
200 }
201 }
202 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700203 }
204
205 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
206 synchronized (this) {
207 Host host = hosts.remove(hostId);
208 if (host != null) {
209 locations.remove((host.location()), host);
210 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
211 return new HostEvent(HOST_REMOVED, host);
212 }
213 return null;
214 }
215 }
216
217 @Override
218 public int getHostCount() {
219 return hosts.size();
220 }
221
222 @Override
223 public Iterable<Host> getHosts() {
224 return ImmutableSet.<Host>copyOf(hosts.values());
225 }
226
227 @Override
228 public Host getHost(HostId hostId) {
229 return hosts.get(hostId);
230 }
231
232 @Override
233 public Set<Host> getHosts(VlanId vlanId) {
234 Set<Host> vlanset = new HashSet<>();
235 for (Host h : hosts.values()) {
236 if (h.vlan().equals(vlanId)) {
237 vlanset.add(h);
238 }
239 }
240 return vlanset;
241 }
242
243 @Override
244 public Set<Host> getHosts(MacAddress mac) {
245 Set<Host> macset = new HashSet<>();
246 for (Host h : hosts.values()) {
247 if (h.mac().equals(mac)) {
248 macset.add(h);
249 }
250 }
251 return macset;
252 }
253
254 @Override
255 public Set<Host> getHosts(IpPrefix ip) {
256 Set<Host> ipset = new HashSet<>();
257 for (Host h : hosts.values()) {
258 if (h.ipAddresses().contains(ip)) {
259 ipset.add(h);
260 }
261 }
262 return ipset;
263 }
264
265 @Override
266 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
267 return ImmutableSet.copyOf(locations.get(connectPoint));
268 }
269
270 @Override
271 public Set<Host> getConnectedHosts(DeviceId deviceId) {
272 Set<Host> hostset = new HashSet<>();
273 for (ConnectPoint p : locations.keySet()) {
274 if (p.deviceId().equals(deviceId)) {
275 hostset.addAll(locations.get(p));
276 }
277 }
278 return hostset;
279 }
280
281 @Override
282 public void updateAddressBindings(PortAddresses addresses) {
283 synchronized (portAddresses) {
284 PortAddresses existing = portAddresses.get(addresses.connectPoint());
285 if (existing == null) {
286 portAddresses.put(addresses.connectPoint(), addresses);
287 } else {
288 Set<IpPrefix> union = Sets.union(existing.ips(), addresses.ips())
289 .immutableCopy();
290
291 MacAddress newMac = (addresses.mac() == null) ? existing.mac()
292 : addresses.mac();
293
294 PortAddresses newAddresses =
295 new PortAddresses(addresses.connectPoint(), union, newMac);
296
297 portAddresses.put(newAddresses.connectPoint(), newAddresses);
298 }
299 }
300 }
301
302 @Override
303 public void removeAddressBindings(PortAddresses addresses) {
304 synchronized (portAddresses) {
305 PortAddresses existing = portAddresses.get(addresses.connectPoint());
306 if (existing != null) {
307 Set<IpPrefix> difference =
308 Sets.difference(existing.ips(), addresses.ips()).immutableCopy();
309
310 // If they removed the existing mac, set the new mac to null.
311 // Otherwise, keep the existing mac.
312 MacAddress newMac = existing.mac();
313 if (addresses.mac() != null && addresses.mac().equals(existing.mac())) {
314 newMac = null;
315 }
316
317 PortAddresses newAddresses =
318 new PortAddresses(addresses.connectPoint(), difference, newMac);
319
320 portAddresses.put(newAddresses.connectPoint(), newAddresses);
321 }
322 }
323 }
324
325 @Override
326 public void clearAddressBindings(ConnectPoint connectPoint) {
327 synchronized (portAddresses) {
328 portAddresses.remove(connectPoint);
329 }
330 }
331
332 @Override
333 public Set<PortAddresses> getAddressBindings() {
334 synchronized (portAddresses) {
335 return new HashSet<>(portAddresses.values());
336 }
337 }
338
339 @Override
340 public PortAddresses getAddressBindingsForPort(ConnectPoint connectPoint) {
341 PortAddresses addresses;
342
343 synchronized (portAddresses) {
344 addresses = portAddresses.get(connectPoint);
345 }
346
347 if (addresses == null) {
348 addresses = new PortAddresses(connectPoint, null, null);
349 }
350
351 return addresses;
352 }
353
354 // Auxiliary extension to allow location to mutate.
355 private class StoredHost extends DefaultHost {
356 private Timestamped<HostLocation> location;
357
358 /**
359 * Creates an end-station host using the supplied information.
360 *
361 * @param providerId provider identity
362 * @param id host identifier
363 * @param mac host MAC address
364 * @param vlan host VLAN identifier
365 * @param location host location
366 * @param ips host IP addresses
367 * @param annotations optional key/value annotations
368 */
369 public StoredHost(ProviderId providerId, HostId id,
370 MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
371 Set<IpPrefix> ips, Annotations... annotations) {
372 super(providerId, id, mac, vlan, location.value(), ips, annotations);
373 this.location = location;
374 }
375
376 void setLocation(Timestamped<HostLocation> location) {
377 this.location = location;
378 }
379
380 @Override
381 public HostLocation location() {
382 return location.value();
383 }
384 }
Madan Jampani312a2982014-10-14 21:07:16 -0700385
386 private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
387 broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
388 }
389
390 private void notifyPeers(InternalHostEvent event) throws IOException {
391 broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
392 }
393
394 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
395 ClusterMessage message = new ClusterMessage(
396 clusterService.getLocalNode().id(),
397 subject,
398 SERIALIZER.encode(event));
399 clusterCommunicator.broadcast(message);
400 }
Madan Jampani255618f2014-10-14 23:27:57 -0700401
402 private void notifyDelegateIfNotNull(HostEvent event) {
403 if (event != null) {
404 notifyDelegate(event);
405 }
406 }
407
408 private class InternalHostEventListener implements ClusterMessageHandler {
409 @Override
410 public void handle(ClusterMessage message) {
411
412 log.info("Received host update event from peer: {}", message.sender());
413 InternalHostEvent event = (InternalHostEvent) SERIALIZER.decode(message.payload());
414
415 ProviderId providerId = event.providerId();
416 HostId hostId = event.hostId();
417 HostDescription hostDescription = event.hostDescription();
418 Timestamp timestamp = event.timestamp();
419
420 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp));
421 }
422 }
423
424 private class InternalHostRemovedEventListener implements ClusterMessageHandler {
425 @Override
426 public void handle(ClusterMessage message) {
427
428 log.info("Received host removed event from peer: {}", message.sender());
429 InternalHostRemovedEvent event = (InternalHostRemovedEvent) SERIALIZER.decode(message.payload());
430
431 HostId hostId = event.hostId();
432 Timestamp timestamp = event.timestamp();
433
434 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
435 }
436 }
Madan Jampaniab994042014-10-13 15:34:04 -0700437}