blob: 637444c5eb58531dc7134b562d89e425321239cf [file] [log] [blame]
Madan Jampani70583972015-06-30 11:25:05 -07001package org.onosproject.store.host.impl;
2
3import static org.onosproject.net.DefaultAnnotations.merge;
4import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
5import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
6import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
7import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
8import static org.slf4j.LoggerFactory.getLogger;
9
10import java.util.Collection;
11import java.util.Collections;
12import java.util.Objects;
13import java.util.Set;
14import java.util.concurrent.ConcurrentHashMap;
15import java.util.function.Predicate;
16import java.util.stream.Collectors;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
23import org.apache.felix.scr.annotations.Service;
24import org.onlab.packet.IpAddress;
25import org.onlab.packet.MacAddress;
26import org.onlab.packet.VlanId;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.net.Annotations;
29import org.onosproject.net.ConnectPoint;
30import org.onosproject.net.DefaultAnnotations;
31import org.onosproject.net.DefaultHost;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.Host;
34import org.onosproject.net.HostId;
35import org.onosproject.net.host.HostDescription;
36import org.onosproject.net.host.HostEvent;
37import org.onosproject.net.host.HostStore;
38import org.onosproject.net.host.HostStoreDelegate;
39import org.onosproject.net.host.PortAddresses;
40import org.onosproject.net.host.HostEvent.Type;
41import org.onosproject.net.provider.ProviderId;
42import org.onosproject.store.AbstractStore;
43import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.service.EventuallyConsistentMap;
45import org.onosproject.store.service.EventuallyConsistentMapEvent;
46import org.onosproject.store.service.EventuallyConsistentMapListener;
47import org.onosproject.store.service.LogicalClockService;
48import org.onosproject.store.service.StorageService;
49import org.slf4j.Logger;
50
51import com.google.common.collect.HashMultimap;
52import com.google.common.collect.ImmutableSet;
53import com.google.common.collect.Multimap;
54import com.google.common.collect.Multimaps;
55import com.google.common.collect.SetMultimap;
56import com.google.common.collect.Sets;
57import static com.google.common.collect.Multimaps.newSetMultimap;
58import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
59import static com.google.common.collect.Sets.newConcurrentHashSet;
60
61/**
62 * Manages the inventory of hosts using a {@code EventuallyConsistentMap}.
63 */
64@Component(immediate = true)
65@Service
66public class ECHostStore
67 extends AbstractStore<HostEvent, HostStoreDelegate>
68 implements HostStore {
69
70 private final Logger log = getLogger(getClass());
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected StorageService storageService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected LogicalClockService clockService;
77
78 // Hosts tracked by their location
79 private final Multimap<ConnectPoint, Host> locations
80 = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
81 () -> newConcurrentHashSet()));
82 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
83 Multimaps.synchronizedSetMultimap(
84 HashMultimap.<ConnectPoint, PortAddresses>create());
85
86 private EventuallyConsistentMap<HostId, DefaultHost> hosts;
87
88 private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker =
89 new HostLocationTracker();
90
91 @Activate
92 public void activate() {
93 KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
94 .register(KryoNamespaces.API);
95
96 hosts = storageService.<HostId, DefaultHost>eventuallyConsistentMapBuilder()
97 .withName("onos-hosts")
98 .withSerializer(hostSerializer)
99 .withTimestampProvider((k, v) -> clockService.getTimestamp())
100 .build();
101
102 hosts.addListener(hostLocationTracker);
103
104 log.info("Started");
105 }
106
107 @Deactivate
108 public void deactivate() {
109 hosts.removeListener(hostLocationTracker);
110 hosts.destroy();
111 locations.clear();
112 portAddresses.clear();
113
114 log.info("Stopped");
115 }
116
117 @Override
118 public HostEvent createOrUpdateHost(ProviderId providerId,
119 HostId hostId,
120 HostDescription hostDescription) {
121 DefaultHost currentHost = hosts.get(hostId);
122 if (currentHost == null) {
123 DefaultHost newhost = new DefaultHost(
124 providerId,
125 hostId,
126 hostDescription.hwAddress(),
127 hostDescription.vlan(),
128 hostDescription.location(),
129 ImmutableSet.copyOf(hostDescription.ipAddress()));
130 hosts.put(hostId, newhost);
131 return new HostEvent(HOST_ADDED, newhost);
132 }
133 return updateHost(providerId, hostId, hostDescription, currentHost);
134 }
135
136 @Override
137 public HostEvent removeHost(HostId hostId) {
138 Host host = hosts.remove(hostId);
139 return host != null ? new HostEvent(HOST_REMOVED, host) : null;
140 }
141
142 @Override
143 public int getHostCount() {
144 return hosts.size();
145 }
146
147 @Override
148 public Iterable<Host> getHosts() {
149 return ImmutableSet.copyOf(hosts.values());
150 }
151
152 @Override
153 public Host getHost(HostId hostId) {
154 return hosts.get(hostId);
155 }
156
157 @Override
158 public Set<Host> getHosts(VlanId vlanId) {
159 return filter(hosts.values(), host -> Objects.equals(host.vlan(), vlanId));
160 }
161
162 @Override
163 public Set<Host> getHosts(MacAddress mac) {
164 return filter(hosts.values(), host -> Objects.equals(host.mac(), mac));
165 }
166
167 @Override
168 public Set<Host> getHosts(IpAddress ip) {
169 return filter(hosts.values(), host -> host.ipAddresses().contains(ip));
170 }
171
172 @Override
173 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
174 return ImmutableSet.copyOf(locations.get(connectPoint));
175 }
176
177 @Override
178 public Set<Host> getConnectedHosts(DeviceId deviceId) {
179 return locations.entries()
180 .stream()
181 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
182 .map(entry -> entry.getValue())
183 .collect(Collectors.toSet());
184 }
185
186 @Override
187 public void updateAddressBindings(PortAddresses addresses) {
188 portAddresses.put(addresses.connectPoint(), addresses);
189 }
190
191 @Override
192 public void removeAddressBindings(PortAddresses addresses) {
193 portAddresses.remove(addresses.connectPoint(), addresses);
194 }
195
196 @Override
197 public void clearAddressBindings(ConnectPoint connectPoint) {
198 portAddresses.removeAll(connectPoint);
199 }
200
201 @Override
202 public Set<PortAddresses> getAddressBindings() {
203 return ImmutableSet.copyOf(portAddresses.values());
204 }
205
206 @Override
207 public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
208 synchronized (portAddresses) {
209 Set<PortAddresses> addresses = portAddresses.get(connectPoint);
210 return addresses == null ? Collections.emptySet() : ImmutableSet.copyOf(addresses);
211 }
212 }
213
214 private Set<Host> filter(Collection<DefaultHost> collection, Predicate<DefaultHost> predicate) {
215 return collection.stream().filter(predicate).collect(Collectors.toSet());
216 }
217
218 // checks for type of update to host, sends appropriate event
219 private HostEvent updateHost(ProviderId providerId,
220 HostId hostId,
221 HostDescription descr,
222 DefaultHost currentHost) {
223
224 final boolean hostMoved = !currentHost.location().equals(descr.location());
225 if (hostMoved ||
226 !currentHost.ipAddresses().containsAll(descr.ipAddress()) ||
227 !descr.annotations().keys().isEmpty()) {
228
229 Set<IpAddress> addresses = Sets.newHashSet(currentHost.ipAddresses());
230 addresses.addAll(descr.ipAddress());
231 Annotations annotations = merge((DefaultAnnotations) currentHost.annotations(),
232 descr.annotations());
233
234 DefaultHost updatedHost = new DefaultHost(providerId, currentHost.id(),
235 currentHost.mac(), currentHost.vlan(),
236 descr.location(),
237 addresses,
238 annotations);
239
240 // TODO: We need a way to detect conflicting changes and abort update.
241 hosts.put(hostId, updatedHost);
242 locations.remove(currentHost.location(), currentHost);
243 locations.put(updatedHost.location(), updatedHost);
244
245 HostEvent.Type eventType = hostMoved ? Type.HOST_MOVED : Type.HOST_UPDATED;
246 return new HostEvent(eventType, updatedHost);
247 }
248 return null;
249 }
250
251 private class HostLocationTracker implements EventuallyConsistentMapListener<HostId, DefaultHost> {
252
253 @Override
254 public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
255 DefaultHost host = event.value();
256 if (event.type() == PUT) {
257 locations.put(host.location(), host);
258 } else if (event.type() == REMOVE) {
259 locations.remove(host.location(), host);
260 }
261 }
262 }
263}