blob: 11570f7127f103c9d95d2b2a1703db8a31f04ed4 [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080016package org.onosproject.store.meter.impl;
alshabib7bb05012015-08-05 10:15:09 -070017
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
pierventre44220052020-09-22 12:51:06 +020019import com.google.common.collect.ImmutableSet;
Pier Luigif094c612017-10-14 12:15:02 +020020import com.google.common.collect.Iterables;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070021import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070022import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020023import org.apache.commons.lang.math.RandomUtils;
Charles Chan593acf92017-11-22 13:55:41 -080024import org.onlab.util.KryoNamespace;
Jordi Ortizaa8de492016-12-01 00:21:36 +010025import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020026import org.onosproject.net.behaviour.MeterQuery;
27import org.onosproject.net.driver.DriverHandler;
28import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070029import org.onosproject.net.meter.Band;
30import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070031import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010032import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070033import org.onosproject.net.meter.Meter;
34import org.onosproject.net.meter.MeterEvent;
35import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010036import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030037import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010038import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010039import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070040import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070041import org.onosproject.net.meter.MeterOperation;
42import org.onosproject.net.meter.MeterState;
43import org.onosproject.net.meter.MeterStore;
44import org.onosproject.net.meter.MeterStoreDelegate;
45import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070046import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020047import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070048import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020049import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070050import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020051import org.onosproject.store.service.DistributedPrimitive;
52import org.onosproject.store.service.DistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070053import org.onosproject.store.service.MapEvent;
54import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070055import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070056import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070057import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070058import org.onosproject.store.service.Versioned;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070059import org.osgi.service.component.annotations.Activate;
60import org.osgi.service.component.annotations.Component;
61import org.osgi.service.component.annotations.Deactivate;
62import org.osgi.service.component.annotations.Reference;
63import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070064import org.slf4j.Logger;
65
66import java.util.Collection;
Gamze Abakaf57ef602019-03-11 06:52:48 +000067import java.util.List;
alshabibeadfc8e2015-08-18 15:40:46 -070068import java.util.Map;
Gamze Abakaf57ef602019-03-11 06:52:48 +000069import java.util.Objects;
Pier Luigif094c612017-10-14 12:15:02 +020070import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070071import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020072import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070073
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080074import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010075import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070076import static org.slf4j.LoggerFactory.getLogger;
77
78/**
79 * A distributed meter store implementation. Meters are stored consistently
80 * across the cluster.
81 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070082@Component(immediate = true, service = MeterStore.class)
alshabib7bb05012015-08-05 10:15:09 -070083public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
84 implements MeterStore {
85
86 private Logger log = getLogger(getClass());
87
pierventre1b8afbc2020-07-13 14:07:05 +020088 // Meters map related objects
alshabib7bb05012015-08-05 10:15:09 -070089 private static final String METERSTORE = "onos-meter-store";
pierventre1b8afbc2020-07-13 14:07:05 +020090 private ConsistentMap<MeterKey, MeterData> meters;
91 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
pierventre44220052020-09-22 12:51:06 +020092 private Map<MeterKey, MeterData> metersMap;
alshabib7bb05012015-08-05 10:15:09 -070093
pierventre1b8afbc2020-07-13 14:07:05 +020094 // Meters features related objects
95 private static final String METERFEATURESSTORE = "onos-meter-features-store";
96 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
97
98 // Meters id related objects
99 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
100 // Available meter identifiers
101 private DistributedSet<MeterKey> availableMeterIds;
102 // Atomic counter map for generation of new identifiers;
103 private static final String METERIDSTORE = "onos-meters-id-store";
104 private AtomicCounterMap<DeviceId> meterIdGenerators;
105
106 // Serializer related objects
Charles Chan593acf92017-11-22 13:55:41 -0800107 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
108 .register(KryoNamespaces.API)
109 .register(MeterKey.class)
110 .register(MeterData.class)
111 .register(DefaultMeter.class)
112 .register(DefaultBand.class)
113 .register(Band.Type.class)
114 .register(MeterState.class)
debmaiti1bea2892019-06-04 12:36:38 +0530115 .register(Meter.Unit.class)
116 .register(MeterFailReason.class);
Charles Chan593acf92017-11-22 13:55:41 -0800117 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
118
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700120 private StorageService storageService;
121
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Pier Luigif094c612017-10-14 12:15:02 +0200123 protected DriverService driverService;
124
pierventre1b8afbc2020-07-13 14:07:05 +0200125 // Local cache to handle async ops through futures.
alshabib70aaa1b2015-09-25 14:30:59 -0700126 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700127 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700128
Pier Luigif094c612017-10-14 12:15:02 +0200129 /**
130 * Defines possible selection strategies to reuse meter ids.
131 */
132 enum ReuseStrategy {
133 /**
134 * Select randomly an available id.
135 */
136 RANDOM,
137 /**
138 * Select the first one.
139 */
140 FIRST_FIT
141 }
Pier Luigif094c612017-10-14 12:15:02 +0200142 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100143
alshabib7bb05012015-08-05 10:15:09 -0700144 @Activate
145 public void activate() {
pierventre1b8afbc2020-07-13 14:07:05 +0200146 // Init meters map and setup the map listener
alshabib70aaa1b2015-09-25 14:30:59 -0700147 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700148 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800149 .withSerializer(serializer).build();
alshabibeadfc8e2015-08-18 15:40:46 -0700150 meters.addListener(mapListener);
pierventre1b8afbc2020-07-13 14:07:05 +0200151 // Init meter features map (meaningful only for OpenFlow protocol)
Jordi Ortizaa8de492016-12-01 00:21:36 +0100152 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
153 .withName(METERFEATURESSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200154 .withSerializer(Serializer.using(KryoNamespaces.API,
155 MeterFeaturesKey.class,
156 MeterFeatures.class,
157 DefaultMeterFeatures.class,
158 Band.Type.class,
159 Meter.Unit.class,
cansu.toprak409289d2017-10-27 10:04:05 +0300160 MeterFailReason.class,
161 MeterFeaturesFlag.class)).build();
pierventre44220052020-09-22 12:51:06 +0200162 metersMap = meters.asJavaMap();
Pier Luigif094c612017-10-14 12:15:02 +0200163 // Init the set of the available ids
164 availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
Jordi Ortiz6c847762017-01-30 17:13:05 +0100165 .withName(AVAILABLEMETERIDSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200166 .withSerializer(Serializer.using(KryoNamespaces.API,
167 MeterKey.class)).build(),
168 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
Pier Luigif094c612017-10-14 12:15:02 +0200169 // Init atomic map counters
170 meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
171 .withName(METERIDSTORE)
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700172 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
alshabib7bb05012015-08-05 10:15:09 -0700173 log.info("Started");
174 }
175
176 @Deactivate
177 public void deactivate() {
alshabibeadfc8e2015-08-18 15:40:46 -0700178 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700179 log.info("Stopped");
180 }
181
alshabib7bb05012015-08-05 10:15:09 -0700182 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700183 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200184 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700185 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700186 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200187 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700188 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200189 // Store the meter data
pierventre1b8afbc2020-07-13 14:07:05 +0200190 MeterData data = new MeterData(meter, null);
alshabibeadfc8e2015-08-18 15:40:46 -0700191 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700192 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700193 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200194 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
195 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900196 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700197 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700198 }
Pier Luigif094c612017-10-14 12:15:02 +0200199 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700200 return future;
alshabib7bb05012015-08-05 10:15:09 -0700201 }
202
203 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700204 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200205 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700206 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700207 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200208 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700209 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200210 // Create the meter data
pierventre1b8afbc2020-07-13 14:07:05 +0200211 MeterData data = new MeterData(meter, null);
Pier Luigif094c612017-10-14 12:15:02 +0200212 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700213 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700214 try {
Pier Luigif094c612017-10-14 12:15:02 +0200215 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700216 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200217 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700218 future.complete(MeterStoreResult.success());
219 }
alshabibeadfc8e2015-08-18 15:40:46 -0700220 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200221 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
222 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900223 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700224 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700225 }
Pier Luigif094c612017-10-14 12:15:02 +0200226 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700227 return future;
alshabib7bb05012015-08-05 10:15:09 -0700228 }
229
230 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100231 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
pierventre1b8afbc2020-07-13 14:07:05 +0200232 // Store meter features, this is done once for each device
Jordi Ortizaa8de492016-12-01 00:21:36 +0100233 MeterStoreResult result = MeterStoreResult.success();
234 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
235 try {
236 meterFeatures.putIfAbsent(key, meterfeatures);
237 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200238 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
239 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100240 result = MeterStoreResult.fail(TIMEOUT);
241 }
242 return result;
243 }
244
245 @Override
246 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
pierventre1b8afbc2020-07-13 14:07:05 +0200247 // Remove meter features - these ops are meaningful only for OpenFlow
Jordi Ortizaa8de492016-12-01 00:21:36 +0100248 MeterStoreResult result = MeterStoreResult.success();
249 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
250 try {
251 meterFeatures.remove(key);
252 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200253 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
254 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100255 result = MeterStoreResult.fail(TIMEOUT);
256 }
257 return result;
258 }
259
260 @Override
pierventre1b8afbc2020-07-13 14:07:05 +0200261 // TODO Should we remove it ? We are not using it
alshabibeadfc8e2015-08-18 15:40:46 -0700262 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
263 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700264 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
265 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700266
pierventre1b8afbc2020-07-13 14:07:05 +0200267 MeterData data = new MeterData(meter, null);
alshabibeadfc8e2015-08-18 15:40:46 -0700268 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700269 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700270 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
271 }
alshabibeadfc8e2015-08-18 15:40:46 -0700272 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200273 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
274 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900275 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700276 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700277 }
alshabibeadfc8e2015-08-18 15:40:46 -0700278 return future;
alshabib7bb05012015-08-05 10:15:09 -0700279 }
280
281 @Override
pierventre44220052020-09-22 12:51:06 +0200282 public Meter updateMeterState(Meter meter) {
pierventre1b8afbc2020-07-13 14:07:05 +0200283 // Update meter if present (stats workflow)
alshabib70aaa1b2015-09-25 14:30:59 -0700284 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
pierventre44220052020-09-22 12:51:06 +0200285 Versioned<MeterData> value = meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700286 DefaultMeter m = (DefaultMeter) v.meter();
pier59721bf2020-01-08 08:57:46 +0100287 MeterState meterState = m.state();
288 if (meterState == MeterState.PENDING_ADD) {
289 m.setState(meter.state());
290 }
alshabib7bb05012015-08-05 10:15:09 -0700291 m.setProcessedPackets(meter.packetsSeen());
292 m.setProcessedBytes(meter.bytesSeen());
293 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700294 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700295 m.setReferenceCount(meter.referenceCount());
pierventre1b8afbc2020-07-13 14:07:05 +0200296 return new MeterData(m, null);
alshabib7bb05012015-08-05 10:15:09 -0700297 });
pierventre44220052020-09-22 12:51:06 +0200298 return value != null ? value.value().meter() : null;
alshabib7bb05012015-08-05 10:15:09 -0700299 }
300
301 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700302 public Meter getMeter(MeterKey key) {
303 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700304 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700305 }
306
307 @Override
308 public Collection<Meter> getAllMeters() {
pierventre44220052020-09-22 12:51:06 +0200309 return Collections2.transform(ImmutableSet.copyOf(metersMap.values()),
alshabibeadfc8e2015-08-18 15:40:46 -0700310 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700311 }
312
313 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200314 public Collection<Meter> getAllMeters(DeviceId deviceId) {
315 return Collections2.transform(
pierventre44220052020-09-22 12:51:06 +0200316 Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
Jordi Ortiz9287b632017-06-22 11:01:37 +0200317 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
318 MeterData::meter);
319 }
320
321 @Override
alshabib7bb05012015-08-05 10:15:09 -0700322 public void failedMeter(MeterOperation op, MeterFailReason reason) {
pierventre1b8afbc2020-07-13 14:07:05 +0200323 // Meter ops failed (got notification from the sb)
alshabib70aaa1b2015-09-25 14:30:59 -0700324 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
pierventre1b8afbc2020-07-13 14:07:05 +0200325 meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
alshabib7bb05012015-08-05 10:15:09 -0700326 }
327
alshabib5eb79392015-08-19 18:09:55 -0700328 @Override
329 public void deleteMeterNow(Meter m) {
pierventre1b8afbc2020-07-13 14:07:05 +0200330 // Once we receive the ack from the sb
331 // create the key and remove definitely the meter
alshabib70aaa1b2015-09-25 14:30:59 -0700332 MeterKey key = MeterKey.key(m.deviceId(), m.id());
pierventre1b8afbc2020-07-13 14:07:05 +0200333 try {
334 if (Versioned.valueOrNull(meters.remove(key)) != null) {
335 // Free the id
336 freeMeterId(m.deviceId(), m.id());
337 }
338 } catch (StorageException e) {
339 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
340 e.getMessage(), e);
pier59721bf2020-01-08 08:57:46 +0100341 }
alshabib5eb79392015-08-19 18:09:55 -0700342 }
343
Jordi Ortizaa8de492016-12-01 00:21:36 +0100344 @Override
Gamze Abakaf57ef602019-03-11 06:52:48 +0000345 public void purgeMeter(DeviceId deviceId) {
pierventre1b8afbc2020-07-13 14:07:05 +0200346 // Purge api (typically used when the device is offline)
Gamze Abakaf57ef602019-03-11 06:52:48 +0000347 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
348 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
349 .map(Map.Entry::getValue)
350 .collect(Collectors.toList());
pierventre1b8afbc2020-07-13 14:07:05 +0200351 // Remove definitely the meter
Gamze Abakaf57ef602019-03-11 06:52:48 +0000352 metersPendingRemove.forEach(versionedMeterKey
353 -> deleteMeterNow(versionedMeterKey.value().meter()));
Gamze Abakaf57ef602019-03-11 06:52:48 +0000354 }
355
356 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100357 public long getMaxMeters(MeterFeaturesKey key) {
pierventre1b8afbc2020-07-13 14:07:05 +0200358 // Leverage the meter features to know the max id
Jordi Ortizaa8de492016-12-01 00:21:36 +0100359 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
360 return features == null ? 0L : features.maxMeter();
361 }
362
Pier Luigif094c612017-10-14 12:15:02 +0200363 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
364 private long queryMaxMeters(DeviceId device) {
365 // Get driver handler for this device
366 DriverHandler handler = driverService.createHandler(device);
367 // If creation failed or the device does not have this behavior
368 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
369 // We cannot know max meter
370 return 0L;
371 }
372 // Get the behavior
373 MeterQuery query = handler.behaviour(MeterQuery.class);
374 // Return as max meter the result of the query
375 return query.getMaxMeters();
376 }
377
378 private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
379 boolean available) {
380 // According to available, make available or unavailable a meter key
381 return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
382 availableMeterIds.remove(MeterKey.key(deviceId, id));
383 }
384
385 private MeterId getNextAvailableId(Set<MeterId> availableIds) {
386 // If there are no available ids
387 if (availableIds.isEmpty()) {
388 // Just end the cycle
389 return null;
390 }
391 // If it is the first fit
392 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
393 return availableIds.iterator().next();
394 }
395 // If it is random, get the size
396 int size = availableIds.size();
397 // Return a random element
398 return Iterables.get(availableIds, RandomUtils.nextInt(size));
399 }
400
401 // Implements reuse strategy
402 private MeterId firstReusableMeterId(DeviceId deviceId) {
403 // Filter key related to device id, and reduce to meter ids
404 Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
405 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
406 .map(MeterKey::meterId)
407 .collect(Collectors.toSet());
408 // Get next available id
409 MeterId meterId = getNextAvailableId(localAvailableMeterIds);
410 // Iterate until there are items
411 while (meterId != null) {
412 // If we are able to reserve the id
413 if (updateMeterIdAvailability(deviceId, meterId, false)) {
414 // Just end
415 return meterId;
416 }
417 // Update the set
418 localAvailableMeterIds.remove(meterId);
419 // Try another time
420 meterId = getNextAvailableId(localAvailableMeterIds);
421 }
422 // No reusable ids
423 return null;
424 }
425
426 @Override
427 public MeterId allocateMeterId(DeviceId deviceId) {
428 // Init steps
429 MeterId meterId;
430 long id;
431 // Try to reuse meter id
432 meterId = firstReusableMeterId(deviceId);
433 // We found a reusable id, return
434 if (meterId != null) {
435 return meterId;
436 }
437 // If there was no reusable MeterId we have to generate a new value
438 // using maxMeters as upper limit.
439 long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
440 // If the device does not give us MeterFeatures
441 if (maxMeters == 0L) {
442 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
443 maxMeters = queryMaxMeters(deviceId);
444 }
445 // If we don't know the max, cannot proceed
446 if (maxMeters == 0L) {
447 return null;
448 }
449 // Get a new value
450 id = meterIdGenerators.incrementAndGet(deviceId);
451 // Check with the max, and if the value is bigger, cannot proceed
452 if (id >= maxMeters) {
453 return null;
454 }
455 // Done, return the value
456 return MeterId.meterId(id);
457 }
458
459 @Override
460 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200461 // Avoid to free meter not allocated
462 if (meterIdGenerators.get(deviceId) < meterId.id()) {
463 return;
464 }
Pier Luigif094c612017-10-14 12:15:02 +0200465 // Update the availability
466 updateMeterIdAvailability(deviceId, meterId, true);
467 }
468
pierventre1b8afbc2020-07-13 14:07:05 +0200469 // Enabling the events distribution across the cluster
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700470 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700471 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700472 public void event(MapEvent<MeterKey, MeterData> event) {
473 MeterKey key = event.key();
Ray Milkeyd0f017f2018-09-21 12:52:34 -0700474 Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
475 MeterData data = value.value();
pierventre1b8afbc2020-07-13 14:07:05 +0200476 MeterData oldData = Versioned.valueOrNull(event.oldValue());
alshabibeadfc8e2015-08-18 15:40:46 -0700477 switch (event.type()) {
478 case INSERT:
479 case UPDATE:
480 switch (data.meter().state()) {
481 case PENDING_ADD:
482 case PENDING_REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200483 // Two cases. If there is a reason, the meter operation failed.
484 // Otherwise, we are ready to install/remove through the delegate.
485 if (data.reason().isEmpty()) {
486 notifyDelegate(new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
487 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, data.meter()));
488 } else {
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100489 futures.computeIfPresent(key, (k, v) -> {
pierventre1b8afbc2020-07-13 14:07:05 +0200490 v.complete(MeterStoreResult.fail(data.reason().get()));
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100491 return null;
492 });
alshabibe1248b62015-08-20 17:21:55 -0700493 }
494 break;
pierventre1b8afbc2020-07-13 14:07:05 +0200495 case ADDED:
496 // Transition from pending to installed
497 if (data.meter().state() == MeterState.ADDED &&
498 (oldData != null && oldData.meter().state() == MeterState.PENDING_ADD)) {
499 futures.computeIfPresent(key, (k, v) -> {
500 v.complete(MeterStoreResult.success());
501 return null;
502 });
503 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
504 // Update stats case
505 } else if (data.meter().referenceCount() == 0) {
506 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO,
507 data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700508 }
509 break;
510 default:
511 log.warn("Unknown meter state type {}", data.meter().state());
512 }
513 break;
514 case REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200515 // Meter removal case
516 futures.computeIfPresent(key, (k, v) -> {
517 v.complete(MeterStoreResult.success());
518 return null;
519 });
520 // Finally notify the delegate
521 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700522 break;
523 default:
524 log.warn("Unknown Map event type {}", event.type());
525 }
526
527 }
528 }
529
alshabib7bb05012015-08-05 10:15:09 -0700530}