blob: 4aac3a324f9da1ddb9d727ff8ec18f39d622ad20 [file] [log] [blame]
Charles Chanff79dd92018-06-01 16:33:48 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.host.impl;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalNotification;
Charles Chanff79dd92018-06-01 16:33:48 -070021import org.onlab.packet.MacAddress;
22import org.onlab.util.KryoNamespace;
23import org.onosproject.net.ConnectPoint;
24import org.onosproject.net.Host;
25import org.onosproject.net.host.HostProbe;
26import org.onosproject.net.host.HostProbeStore;
27import org.onosproject.net.host.HostProbingEvent;
Charles Chanff79dd92018-06-01 16:33:48 -070028import org.onosproject.net.host.HostProbingStoreDelegate;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070029import org.onosproject.net.host.ProbeMode;
Charles Chanff79dd92018-06-01 16:33:48 -070030import org.onosproject.store.AbstractStore;
31import org.onosproject.store.serializers.KryoNamespaces;
32import org.onosproject.store.service.AtomicCounter;
33import org.onosproject.store.service.ConsistentMap;
34import org.onosproject.store.service.MapEvent;
35import org.onosproject.store.service.MapEventListener;
36import org.onosproject.store.service.Serializer;
37import org.onosproject.store.service.StorageService;
38import org.onosproject.store.service.Versioned;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070039import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
Charles Chanff79dd92018-06-01 16:33:48 -070044import org.slf4j.Logger;
45
46import java.util.Map;
47import java.util.concurrent.ScheduledExecutorService;
48import java.util.concurrent.TimeUnit;
49
50import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.slf4j.LoggerFactory.getLogger;
53
Ray Milkeyd84f89b2018-08-17 14:54:17 -070054@Component(immediate = true, service = HostProbeStore.class)
Charles Chanff79dd92018-06-01 16:33:48 -070055public class DefaultHostProbeStore extends AbstractStore<HostProbingEvent, HostProbingStoreDelegate>
56 implements HostProbeStore {
Ray Milkeyd84f89b2018-08-17 14:54:17 -070057 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chanff79dd92018-06-01 16:33:48 -070058 protected StorageService storageService;
59
60 private final Logger log = getLogger(getClass());
61
62 // TODO make this configurable
63 private static final int PROBE_TIMEOUT_MS = 3000;
64
65 private AtomicCounter hostProbeIndex;
66 private Cache<MacAddress, HostProbe> probingHostsCache;
67 private ConsistentMap<MacAddress, HostProbe> probingHostsConsistentMap;
68 private Map<MacAddress, HostProbe> probingHosts;
69 private MapEventListener<MacAddress, HostProbe> probingHostListener = new ProbingHostListener();
70 private ScheduledExecutorService cacheCleaner;
71 private ScheduledExecutorService locationRemover;
72
73 @Activate
74 public void activate() {
75 KryoNamespace.Builder pendingHostSerializer = KryoNamespace.newBuilder()
76 .register(KryoNamespaces.API)
77 .register(DefaultHostProbe.class)
78 .register(ProbeMode.class);
79 probingHostsConsistentMap = storageService.<MacAddress, HostProbe>consistentMapBuilder()
80 .withName("onos-hosts-pending")
81 .withRelaxedReadConsistency()
82 .withSerializer(Serializer.using(pendingHostSerializer.build()))
83 .build();
84 probingHostsConsistentMap.addListener(probingHostListener);
85 probingHosts = probingHostsConsistentMap.asJavaMap();
86
87 hostProbeIndex = storageService.atomicCounterBuilder()
88 .withName("onos-hosts-probe-index")
89 .build()
90 .asAtomicCounter();
91
92 probingHostsCache = CacheBuilder.newBuilder()
93 .expireAfterWrite(PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
94 .removalListener((RemovalNotification<MacAddress, HostProbe> notification) -> {
95 MacAddress probeMac = notification.getKey();
96 switch (notification.getCause()) {
97 case EXPIRED:
98 case REPLACED:
99 probingHosts.computeIfPresent(probeMac, (k, v) -> {
100 v.decreaseRetry();
101 return v;
102 });
103 break;
104 case EXPLICIT:
105 break;
106 default:
107 log.warn("Remove {} from pendingHostLocations for unexpected reason {}",
108 notification.getKey(), notification.getCause());
109 }
110 }).build();
111
112 cacheCleaner = newSingleThreadScheduledExecutor(
113 groupedThreads("onos/host/hostprobestore", "cache-cleaner", log));
114 cacheCleaner.scheduleAtFixedRate(probingHostsCache::cleanUp, 0, PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
115 locationRemover = newSingleThreadScheduledExecutor(
116 groupedThreads("onos/host/hostprobestore", "loc-remover", log));
117
118 log.info("Started");
119 }
120
121 @Deactivate
122 public void deactivate() {
123 cacheCleaner.shutdown();
124 locationRemover.shutdown();
125 probingHostsCache.cleanUp();
126
127 log.info("Stopped");
128 }
129
130 @Override
131 public MacAddress addProbingHost(Host host, ConnectPoint connectPoint, ProbeMode probeMode,
132 MacAddress probeMac, int retry) {
133 if (probeMac == null) {
134 probeMac = generateProbeMac();
135 }
136 DefaultHostProbe probingHost = new DefaultHostProbe(host, connectPoint, probeMode, probeMac, retry);
137 probingHostsCache.put(probeMac, probingHost);
138 probingHosts.put(probeMac, probingHost);
139 return probeMac;
140 }
141
142 @Override
143 public void removeProbingHost(MacAddress probeMac) {
144 probingHostsCache.invalidate(probeMac);
145 probingHosts.remove(probeMac);
146 }
147
148 private MacAddress generateProbeMac() {
149 // Use ONLab OUI (3 bytes) + atomic counter (3 bytes) as the source MAC of the probe
150 long nextIndex = hostProbeIndex.incrementAndGet();
151 return MacAddress.valueOf(MacAddress.NONE.toLong() + nextIndex);
152 }
153
154 private class ProbingHostListener implements MapEventListener<MacAddress, HostProbe> {
155 @Override
156 public void event(MapEvent<MacAddress, HostProbe> event) {
157 HostProbe newValue = Versioned.valueOrNull(event.newValue());
158 HostProbe oldValue = Versioned.valueOrNull(event.oldValue());
159
160 HostProbingEvent hostProbingEvent;
161 switch (event.type()) {
162 case INSERT:
163 hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_REQUESTED, newValue);
164 notifyDelegate(hostProbingEvent);
165 break;
166 case UPDATE:
167 // Fail VERIFY probe immediately. Only allow DISCOVER probe to retry.
168 if (newValue.retry() > 0) {
169 if (newValue.mode() == ProbeMode.DISCOVER) {
170 hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_TIMEOUT,
171 newValue, oldValue);
172 notifyDelegate(hostProbingEvent);
173 } else {
174 hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_FAIL,
175 newValue, oldValue);
176 notifyDelegate(hostProbingEvent);
177 }
178 } else {
179 // Remove from pendingHost and let the remove listener generates the event
180 locationRemover.execute(() -> probingHosts.remove(event.key()));
181 }
182 break;
183 case REMOVE:
184 if (oldValue.retry() > 0) {
185 hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_COMPLETED, oldValue);
186 notifyDelegate(hostProbingEvent);
187 } else {
188 hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_FAIL, oldValue);
189 notifyDelegate(hostProbingEvent);
190 }
191 break;
192 default:
193 log.warn("Unknown map event type: {}", event.type());
194 }
195 }
196 }
197}