/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.host.impl;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalNotification;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.packet.MacAddress;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.net.ConnectPoint;
30import org.onosproject.net.Host;
31import org.onosproject.net.host.HostProbe;
32import org.onosproject.net.host.HostProbeStore;
33import org.onosproject.net.host.HostProbingEvent;
34import org.onosproject.net.host.ProbeMode;
35import org.onosproject.net.host.HostProbingStoreDelegate;
36import org.onosproject.store.AbstractStore;
37import org.onosproject.store.serializers.KryoNamespaces;
38import org.onosproject.store.service.AtomicCounter;
39import org.onosproject.store.service.ConsistentMap;
40import org.onosproject.store.service.MapEvent;
41import org.onosproject.store.service.MapEventListener;
42import org.onosproject.store.service.Serializer;
43import org.onosproject.store.service.StorageService;
44import org.onosproject.store.service.Versioned;
45import org.slf4j.Logger;
46
47import java.util.Map;
48import java.util.concurrent.ScheduledExecutorService;
49import java.util.concurrent.TimeUnit;
50
51import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
52import static org.onlab.util.Tools.groupedThreads;
53import static org.slf4j.LoggerFactory.getLogger;
54
55@Component(immediate = true)
56@Service
57public class DefaultHostProbeStore extends AbstractStore<HostProbingEvent, HostProbingStoreDelegate>
58 implements HostProbeStore {
59 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
60 protected StorageService storageService;
61
62 private final Logger log = getLogger(getClass());
63
64 // TODO make this configurable
65 private static final int PROBE_TIMEOUT_MS = 3000;
66
67 private AtomicCounter hostProbeIndex;
68 private Cache<MacAddress, HostProbe> probingHostsCache;
69 private ConsistentMap<MacAddress, HostProbe> probingHostsConsistentMap;
70 private Map<MacAddress, HostProbe> probingHosts;
71 private MapEventListener<MacAddress, HostProbe> probingHostListener = new ProbingHostListener();
72 private ScheduledExecutorService cacheCleaner;
73 private ScheduledExecutorService locationRemover;
74
75 @Activate
76 public void activate() {
77 KryoNamespace.Builder pendingHostSerializer = KryoNamespace.newBuilder()
78 .register(KryoNamespaces.API)
79 .register(DefaultHostProbe.class)
80 .register(ProbeMode.class);
81 probingHostsConsistentMap = storageService.<MacAddress, HostProbe>consistentMapBuilder()
82 .withName("onos-hosts-pending")
83 .withRelaxedReadConsistency()
84 .withSerializer(Serializer.using(pendingHostSerializer.build()))
85 .build();
86 probingHostsConsistentMap.addListener(probingHostListener);
87 probingHosts = probingHostsConsistentMap.asJavaMap();
88
89 hostProbeIndex = storageService.atomicCounterBuilder()
90 .withName("onos-hosts-probe-index")
91 .build()
92 .asAtomicCounter();
93
94 probingHostsCache = CacheBuilder.newBuilder()
95 .expireAfterWrite(PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
96 .removalListener((RemovalNotification<MacAddress, HostProbe> notification) -> {
97 MacAddress probeMac = notification.getKey();
98 switch (notification.getCause()) {
99 case EXPIRED:
100 case REPLACED:
101 probingHosts.computeIfPresent(probeMac, (k, v) -> {
102 v.decreaseRetry();
103 return v;
104 });
105 break;
106 case EXPLICIT:
107 break;
108 default:
109 log.warn("Remove {} from pendingHostLocations for unexpected reason {}",
110 notification.getKey(), notification.getCause());
111 }
112 }).build();
113
114 cacheCleaner = newSingleThreadScheduledExecutor(
115 groupedThreads("onos/host/hostprobestore", "cache-cleaner", log));
116 cacheCleaner.scheduleAtFixedRate(probingHostsCache::cleanUp, 0, PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
117 locationRemover = newSingleThreadScheduledExecutor(
118 groupedThreads("onos/host/hostprobestore", "loc-remover", log));
119
120 log.info("Started");
121 }
122
123 @Deactivate
124 public void deactivate() {
125 cacheCleaner.shutdown();
126 locationRemover.shutdown();
127 probingHostsCache.cleanUp();
128
129 log.info("Stopped");
130 }
131
132 @Override
133 public MacAddress addProbingHost(Host host, ConnectPoint connectPoint, ProbeMode probeMode,
134 MacAddress probeMac, int retry) {
135 if (probeMac == null) {
136 probeMac = generateProbeMac();
137 }
138 DefaultHostProbe probingHost = new DefaultHostProbe(host, connectPoint, probeMode, probeMac, retry);
139 probingHostsCache.put(probeMac, probingHost);
140 probingHosts.put(probeMac, probingHost);
141 return probeMac;
142 }
143
144 @Override
145 public void removeProbingHost(MacAddress probeMac) {
146 probingHostsCache.invalidate(probeMac);
147 probingHosts.remove(probeMac);
148 }
149
150 private MacAddress generateProbeMac() {
151 // Use ONLab OUI (3 bytes) + atomic counter (3 bytes) as the source MAC of the probe
152 long nextIndex = hostProbeIndex.incrementAndGet();
153 return MacAddress.valueOf(MacAddress.NONE.toLong() + nextIndex);
154 }
155
156 private class ProbingHostListener implements MapEventListener<MacAddress, HostProbe> {
157 @Override
158 public void event(MapEvent<MacAddress, HostProbe> event) {
159 HostProbe newValue = Versioned.valueOrNull(event.newValue());
160 HostProbe oldValue = Versioned.valueOrNull(event.oldValue());
161
162 HostProbingEvent hostProbingEvent;
163 switch (event.type()) {
164 case INSERT:
165 hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_REQUESTED, newValue);
166 notifyDelegate(hostProbingEvent);
167 break;
168 case UPDATE:
169 // Fail VERIFY probe immediately. Only allow DISCOVER probe to retry.
170 if (newValue.retry() > 0) {
171 if (newValue.mode() == ProbeMode.DISCOVER) {
172 hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_TIMEOUT,
173 newValue, oldValue);
174 notifyDelegate(hostProbingEvent);
175 } else {
176 hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_FAIL,
177 newValue, oldValue);
178 notifyDelegate(hostProbingEvent);
179 }
180 } else {
181 // Remove from pendingHost and let the remove listener generates the event
182 locationRemover.execute(() -> probingHosts.remove(event.key()));
183 }
184 break;
185 case REMOVE:
186 if (oldValue.retry() > 0) {
187 hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_COMPLETED, oldValue);
188 notifyDelegate(hostProbingEvent);
189 } else {
190 hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_FAIL, oldValue);
191 notifyDelegate(hostProbingEvent);
192 }
193 break;
194 default:
195 log.warn("Unknown map event type: {}", event.type());
196 }
197 }
198 }
199}