blob: 1cc68a69e4c86aea21fc65f1f5df5e10985f3e82 [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.store.meter.impl;
17
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
Charles Chan5851f772017-11-21 16:18:21 -080019import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070020import com.google.common.collect.Maps;
alshabib7bb05012015-08-05 10:15:09 -070021import org.apache.felix.scr.annotations.Activate;
alshabib58fe6dc2015-08-19 17:16:13 -070022import org.apache.felix.scr.annotations.Component;
alshabib7bb05012015-08-05 10:15:09 -070023import org.apache.felix.scr.annotations.Deactivate;
alshabib7bb05012015-08-05 10:15:09 -070024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib58fe6dc2015-08-19 17:16:13 -070026import org.apache.felix.scr.annotations.Service;
Charles Chan5851f772017-11-21 16:18:21 -080027import org.onlab.util.KryoNamespace;
alshabib7bb05012015-08-05 10:15:09 -070028import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070030import org.onosproject.mastership.MastershipService;
Jordi Ortizaa8de492016-12-01 00:21:36 +010031import org.onosproject.net.DeviceId;
alshabib58fe6dc2015-08-19 17:16:13 -070032import org.onosproject.net.meter.Band;
33import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070034import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010035import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070036import org.onosproject.net.meter.Meter;
37import org.onosproject.net.meter.MeterEvent;
38import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010039import org.onosproject.net.meter.MeterFeatures;
40import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010041import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070042import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070043import org.onosproject.net.meter.MeterOperation;
44import org.onosproject.net.meter.MeterState;
45import org.onosproject.net.meter.MeterStore;
46import org.onosproject.net.meter.MeterStoreDelegate;
47import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070048import org.onosproject.store.AbstractStore;
alshabibeadfc8e2015-08-18 15:40:46 -070049import org.onosproject.store.serializers.KryoNamespaces;
alshabib7bb05012015-08-05 10:15:09 -070050import org.onosproject.store.service.ConsistentMap;
alshabibeadfc8e2015-08-18 15:40:46 -070051import org.onosproject.store.service.MapEvent;
52import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070053import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070054import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070055import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070056import org.onosproject.store.service.Versioned;
alshabib7bb05012015-08-05 10:15:09 -070057import org.slf4j.Logger;
58
alshabibeadfc8e2015-08-18 15:40:46 -070059import java.util.Arrays;
Jordi Ortiz6c847762017-01-30 17:13:05 +010060import java.util.BitSet;
alshabib7bb05012015-08-05 10:15:09 -070061import java.util.Collection;
alshabibeadfc8e2015-08-18 15:40:46 -070062import java.util.Map;
63import java.util.concurrent.CompletableFuture;
alshabib7bb05012015-08-05 10:15:09 -070064
Jordi Ortizaa8de492016-12-01 00:21:36 +010065import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070066import static org.slf4j.LoggerFactory.getLogger;
67
68/**
69 * A distributed meter store implementation. Meters are stored consistently
70 * across the cluster.
71 */
alshabib58fe6dc2015-08-19 17:16:13 -070072@Component(immediate = true)
73@Service
alshabib7bb05012015-08-05 10:15:09 -070074public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
75 implements MeterStore {
76
77 private Logger log = getLogger(getClass());
78
79 private static final String METERSTORE = "onos-meter-store";
Jordi Ortizaa8de492016-12-01 00:21:36 +010080 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Jordi Ortiz6c847762017-01-30 17:13:05 +010081 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
alshabib7bb05012015-08-05 10:15:09 -070082
Charles Chan5851f772017-11-21 16:18:21 -080083 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
84 .register(KryoNamespaces.API)
85 .register(MeterKey.class)
86 .register(MeterData.class)
87 .register(DefaultMeter.class)
88 .register(DefaultBand.class)
89 .register(Band.Type.class)
90 .register(MeterState.class)
91 .register(Meter.Unit.class);
92
93 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
94
alshabib7bb05012015-08-05 10:15:09 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 private StorageService storageService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib7bb05012015-08-05 10:15:09 -070099 private MastershipService mastershipService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 private ClusterService clusterService;
103
alshabib70aaa1b2015-09-25 14:30:59 -0700104 private ConsistentMap<MeterKey, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -0700105 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -0700106
Jordi Ortizaa8de492016-12-01 00:21:36 +0100107 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
108
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700109 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabibeadfc8e2015-08-18 15:40:46 -0700110
alshabib70aaa1b2015-09-25 14:30:59 -0700111 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700112 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700113
Jordi Ortiz6c847762017-01-30 17:13:05 +0100114 private ConsistentMap<DeviceId, BitSet> availableMeterIds;
115
alshabib7bb05012015-08-05 10:15:09 -0700116 @Activate
117 public void activate() {
alshabib7bb05012015-08-05 10:15:09 -0700118 local = clusterService.getLocalNode().id();
119
alshabib70aaa1b2015-09-25 14:30:59 -0700120 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700121 .withName(METERSTORE)
Charles Chan5851f772017-11-21 16:18:21 -0800122 .withSerializer(serializer).build();
alshabib7bb05012015-08-05 10:15:09 -0700123
alshabibeadfc8e2015-08-18 15:40:46 -0700124 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700125
Jordi Ortizaa8de492016-12-01 00:21:36 +0100126 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
127 .withName(METERFEATURESSTORE)
128 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
129 MeterFeaturesKey.class,
130 MeterFeatures.class,
131 DefaultMeterFeatures.class,
132 Band.Type.class,
133 Meter.Unit.class,
134 MeterFailReason.class)).build();
135
Jordi Ortiz6c847762017-01-30 17:13:05 +0100136 availableMeterIds = storageService.<DeviceId, BitSet>consistentMapBuilder()
137 .withName(AVAILABLEMETERIDSTORE)
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700138 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
Jordi Ortiz6c847762017-01-30 17:13:05 +0100139
alshabib7bb05012015-08-05 10:15:09 -0700140 log.info("Started");
141 }
142
143 @Deactivate
144 public void deactivate() {
alshabibeadfc8e2015-08-18 15:40:46 -0700145 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700146 log.info("Stopped");
147 }
148
Jordi Ortiz6c847762017-01-30 17:13:05 +0100149 private void updateMeterIdAvailability(DeviceId deviceId, MeterId id,
150 boolean available) {
151 availableMeterIds.compute(deviceId, (k, v) -> {
152 v = v == null ? new BitSet() : v;
153 v.set(id.id().intValue(), available);
154 return v;
155 });
156 }
157
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700158 @Override
Jordi Ortiz6c847762017-01-30 17:13:05 +0100159 public MeterId firstReusableMeterId(DeviceId deviceId) {
160 Versioned<BitSet> bitSetVersioned = availableMeterIds.get(deviceId);
161 if (bitSetVersioned == null) {
162 return null;
163 }
164 BitSet value = bitSetVersioned.value();
165 int nextSetBit = value.nextSetBit(1);
166 if (nextSetBit < 0) {
167 return null;
168 }
169 return MeterId.meterId(nextSetBit);
170 }
alshabib7bb05012015-08-05 10:15:09 -0700171
172 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700173 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
174 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700175 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Jordi Ortiz6c847762017-01-30 17:13:05 +0100176 updateMeterIdAvailability(meter.deviceId(), meter.id(), false);
alshabib70aaa1b2015-09-25 14:30:59 -0700177 futures.put(key, future);
alshabibeadfc8e2015-08-18 15:40:46 -0700178 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700179
alshabibeadfc8e2015-08-18 15:40:46 -0700180 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700181 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700182 } catch (StorageException e) {
Hwanwook Lee563de202018-01-02 18:03:50 +0900183 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700184 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700185 }
186
alshabibeadfc8e2015-08-18 15:40:46 -0700187 return future;
188
alshabib7bb05012015-08-05 10:15:09 -0700189 }
190
191 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700192 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
193 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700194 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
195 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700196
alshabibeadfc8e2015-08-18 15:40:46 -0700197 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700198
199 // update the state of the meter. It will be pruned by observing
200 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700201 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700202 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700203 future.complete(MeterStoreResult.success());
204 }
Jordi Ortiz6c847762017-01-30 17:13:05 +0100205 updateMeterIdAvailability(meter.deviceId(), meter.id(), true);
alshabibeadfc8e2015-08-18 15:40:46 -0700206 } catch (StorageException e) {
Hwanwook Lee563de202018-01-02 18:03:50 +0900207 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700208 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700209 }
210
alshabibeadfc8e2015-08-18 15:40:46 -0700211
212 return future;
alshabib7bb05012015-08-05 10:15:09 -0700213 }
214
215 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100216 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
217 MeterStoreResult result = MeterStoreResult.success();
218 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
219 try {
220 meterFeatures.putIfAbsent(key, meterfeatures);
221 } catch (StorageException e) {
222 result = MeterStoreResult.fail(TIMEOUT);
223 }
224 return result;
225 }
226
227 @Override
228 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
229 MeterStoreResult result = MeterStoreResult.success();
230 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
231 try {
232 meterFeatures.remove(key);
233 } catch (StorageException e) {
234 result = MeterStoreResult.fail(TIMEOUT);
235 }
236 return result;
237 }
238
239 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700240 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
241 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700242 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
243 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700244
alshabibeadfc8e2015-08-18 15:40:46 -0700245 MeterData data = new MeterData(meter, null, local);
246 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700247 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700248 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
249 }
alshabibeadfc8e2015-08-18 15:40:46 -0700250 } catch (StorageException e) {
Hwanwook Lee563de202018-01-02 18:03:50 +0900251 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700252 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700253 }
alshabibeadfc8e2015-08-18 15:40:46 -0700254 return future;
alshabib7bb05012015-08-05 10:15:09 -0700255 }
256
257 @Override
258 public void updateMeterState(Meter meter) {
alshabib70aaa1b2015-09-25 14:30:59 -0700259 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
260 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700261 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700262 m.setState(meter.state());
263 m.setProcessedPackets(meter.packetsSeen());
264 m.setProcessedBytes(meter.bytesSeen());
265 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700266 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700267 m.setReferenceCount(meter.referenceCount());
alshabibeadfc8e2015-08-18 15:40:46 -0700268 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700269 });
270 }
271
272 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700273 public Meter getMeter(MeterKey key) {
274 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700275 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700276 }
277
278 @Override
279 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700280 return Collections2.transform(meters.asJavaMap().values(),
281 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700282 }
283
284 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200285 public Collection<Meter> getAllMeters(DeviceId deviceId) {
286 return Collections2.transform(
287 Collections2.filter(meters.asJavaMap().values(),
288 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
289 MeterData::meter);
290 }
291
292 @Override
alshabib7bb05012015-08-05 10:15:09 -0700293 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabib70aaa1b2015-09-25 14:30:59 -0700294 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
295 meters.computeIfPresent(key, (k, v) ->
alshabibeadfc8e2015-08-18 15:40:46 -0700296 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700297 }
298
alshabib5eb79392015-08-19 18:09:55 -0700299 @Override
300 public void deleteMeterNow(Meter m) {
alshabib70aaa1b2015-09-25 14:30:59 -0700301 MeterKey key = MeterKey.key(m.deviceId(), m.id());
302 futures.remove(key);
303 meters.remove(key);
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100304 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
alshabib5eb79392015-08-19 18:09:55 -0700305 }
306
Jordi Ortizaa8de492016-12-01 00:21:36 +0100307 @Override
308 public long getMaxMeters(MeterFeaturesKey key) {
309 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
310 return features == null ? 0L : features.maxMeter();
311 }
312
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700313 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700314 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700315 public void event(MapEvent<MeterKey, MeterData> event) {
316 MeterKey key = event.key();
alshabibeadfc8e2015-08-18 15:40:46 -0700317 MeterData data = event.value().value();
318 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
319 switch (event.type()) {
320 case INSERT:
321 case UPDATE:
322 switch (data.meter().state()) {
323 case PENDING_ADD:
324 case PENDING_REMOVE:
325 if (!data.reason().isPresent() && local.equals(master)) {
326 notifyDelegate(
327 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
328 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
329 data.meter()));
330 } else if (data.reason().isPresent() && local.equals(data.origin())) {
331 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
332 //TODO: No future -> no friend
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700333 futures.get(key).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700334 }
335 break;
336 case ADDED:
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100337 if (local.equals(data.origin()) &&
338 (data.meter().state() == MeterState.PENDING_ADD
339 || data.meter().state() == MeterState.ADDED)) {
340 futures.computeIfPresent(key, (k, v) -> {
341 notifyDelegate(
342 new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
343 return null;
344 });
alshabibe1248b62015-08-20 17:21:55 -0700345 }
346 break;
alshabibeadfc8e2015-08-18 15:40:46 -0700347 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700348 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700349 futures.remove(key).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700350 }
351 break;
352 default:
353 log.warn("Unknown meter state type {}", data.meter().state());
354 }
355 break;
356 case REMOVE:
357 //Only happens at origin so we do not need to care.
358 break;
359 default:
360 log.warn("Unknown Map event type {}", event.type());
361 }
362
363 }
364 }
365
366
alshabib7bb05012015-08-05 10:15:09 -0700367}