blob: 9aec726cd4eb11731d076cd9c5eac9da922a0709 [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080016package org.onosproject.store.meter.impl;
alshabib7bb05012015-08-05 10:15:09 -070017
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
Pier Luigif094c612017-10-14 12:15:02 +020019import com.google.common.collect.Iterables;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070020import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070021import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020022import org.apache.commons.lang.math.RandomUtils;
Charles Chan593acf92017-11-22 13:55:41 -080023import org.onlab.util.KryoNamespace;
Jordi Ortizaa8de492016-12-01 00:21:36 +010024import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020025import org.onosproject.net.behaviour.MeterQuery;
26import org.onosproject.net.driver.DriverHandler;
27import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070028import org.onosproject.net.meter.Band;
29import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070030import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010031import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070032import org.onosproject.net.meter.Meter;
33import org.onosproject.net.meter.MeterEvent;
34import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010035import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030036import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010037import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010038import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070039import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070040import org.onosproject.net.meter.MeterOperation;
41import org.onosproject.net.meter.MeterState;
42import org.onosproject.net.meter.MeterStore;
43import org.onosproject.net.meter.MeterStoreDelegate;
44import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070045import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020046import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070047import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020048import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070049import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020050import org.onosproject.store.service.DistributedPrimitive;
51import org.onosproject.store.service.DistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070052import org.onosproject.store.service.MapEvent;
53import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070054import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070055import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070056import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070057import org.onosproject.store.service.Versioned;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070058import org.osgi.service.component.annotations.Activate;
59import org.osgi.service.component.annotations.Component;
60import org.osgi.service.component.annotations.Deactivate;
61import org.osgi.service.component.annotations.Reference;
62import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070063import org.slf4j.Logger;
64
65import java.util.Collection;
Gamze Abakaf57ef602019-03-11 06:52:48 +000066import java.util.List;
alshabibeadfc8e2015-08-18 15:40:46 -070067import java.util.Map;
Gamze Abakaf57ef602019-03-11 06:52:48 +000068import java.util.Objects;
Pier Luigif094c612017-10-14 12:15:02 +020069import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070070import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020071import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070072
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080073import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010074import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070075import static org.slf4j.LoggerFactory.getLogger;
76
77/**
78 * A distributed meter store implementation. Meters are stored consistently
79 * across the cluster.
80 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070081@Component(immediate = true, service = MeterStore.class)
alshabib7bb05012015-08-05 10:15:09 -070082public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
83 implements MeterStore {
84
85 private Logger log = getLogger(getClass());
86
pierventree97f4832020-07-13 14:07:05 +020087 // Meters map related objects
alshabib7bb05012015-08-05 10:15:09 -070088 private static final String METERSTORE = "onos-meter-store";
pierventree97f4832020-07-13 14:07:05 +020089 private ConsistentMap<MeterKey, MeterData> meters;
90 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabib7bb05012015-08-05 10:15:09 -070091
pierventree97f4832020-07-13 14:07:05 +020092 // Meters features related objects
93 private static final String METERFEATURESSTORE = "onos-meter-features-store";
94 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
95
96 // Meters id related objects
97 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
98 // Available meter identifiers
99 private DistributedSet<MeterKey> availableMeterIds;
100 // Atomic counter map for generation of new identifiers;
101 private static final String METERIDSTORE = "onos-meters-id-store";
102 private AtomicCounterMap<DeviceId> meterIdGenerators;
103
104 // Serializer related objects
Charles Chan593acf92017-11-22 13:55:41 -0800105 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
106 .register(KryoNamespaces.API)
107 .register(MeterKey.class)
108 .register(MeterData.class)
109 .register(DefaultMeter.class)
110 .register(DefaultBand.class)
111 .register(Band.Type.class)
112 .register(MeterState.class)
debmaiti1bea2892019-06-04 12:36:38 +0530113 .register(Meter.Unit.class)
114 .register(MeterFailReason.class);
Charles Chan593acf92017-11-22 13:55:41 -0800115 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
116
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700118 private StorageService storageService;
119
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Pier Luigif094c612017-10-14 12:15:02 +0200121 protected DriverService driverService;
122
pierventree97f4832020-07-13 14:07:05 +0200123 // Local cache to handle async ops through futures.
alshabib70aaa1b2015-09-25 14:30:59 -0700124 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700125 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700126
Pier Luigif094c612017-10-14 12:15:02 +0200127 /**
128 * Defines possible selection strategies to reuse meter ids.
129 */
130 enum ReuseStrategy {
131 /**
132 * Select randomly an available id.
133 */
134 RANDOM,
135 /**
136 * Select the first one.
137 */
138 FIRST_FIT
139 }
Pier Luigif094c612017-10-14 12:15:02 +0200140 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100141
alshabib7bb05012015-08-05 10:15:09 -0700142 @Activate
143 public void activate() {
pierventree97f4832020-07-13 14:07:05 +0200144 // Init meters map and setup the map listener
alshabib70aaa1b2015-09-25 14:30:59 -0700145 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700146 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800147 .withSerializer(serializer).build();
alshabibeadfc8e2015-08-18 15:40:46 -0700148 meters.addListener(mapListener);
pierventree97f4832020-07-13 14:07:05 +0200149 // Init meter features map (meaningful only for OpenFlow protocol)
Jordi Ortizaa8de492016-12-01 00:21:36 +0100150 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
151 .withName(METERFEATURESSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200152 .withSerializer(Serializer.using(KryoNamespaces.API,
153 MeterFeaturesKey.class,
154 MeterFeatures.class,
155 DefaultMeterFeatures.class,
156 Band.Type.class,
157 Meter.Unit.class,
cansu.toprak409289d2017-10-27 10:04:05 +0300158 MeterFailReason.class,
159 MeterFeaturesFlag.class)).build();
Pier Luigif094c612017-10-14 12:15:02 +0200160 // Init the set of the available ids
161 availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
Jordi Ortiz6c847762017-01-30 17:13:05 +0100162 .withName(AVAILABLEMETERIDSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200163 .withSerializer(Serializer.using(KryoNamespaces.API,
164 MeterKey.class)).build(),
165 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
Pier Luigif094c612017-10-14 12:15:02 +0200166 // Init atomic map counters
167 meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
168 .withName(METERIDSTORE)
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700169 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
alshabib7bb05012015-08-05 10:15:09 -0700170 log.info("Started");
171 }
172
173 @Deactivate
174 public void deactivate() {
alshabibeadfc8e2015-08-18 15:40:46 -0700175 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700176 log.info("Stopped");
177 }
178
alshabib7bb05012015-08-05 10:15:09 -0700179 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700180 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200181 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700182 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700183 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200184 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700185 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200186 // Store the meter data
pierventree97f4832020-07-13 14:07:05 +0200187 MeterData data = new MeterData(meter, null);
alshabibeadfc8e2015-08-18 15:40:46 -0700188 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700189 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700190 } catch (StorageException e) {
pierventree97f4832020-07-13 14:07:05 +0200191 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
192 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900193 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700194 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700195 }
Pier Luigif094c612017-10-14 12:15:02 +0200196 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700197 return future;
alshabib7bb05012015-08-05 10:15:09 -0700198 }
199
200 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700201 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200202 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700203 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700204 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200205 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700206 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200207 // Create the meter data
pierventree97f4832020-07-13 14:07:05 +0200208 MeterData data = new MeterData(meter, null);
Pier Luigif094c612017-10-14 12:15:02 +0200209 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700210 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700211 try {
Pier Luigif094c612017-10-14 12:15:02 +0200212 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700213 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200214 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700215 future.complete(MeterStoreResult.success());
216 }
alshabibeadfc8e2015-08-18 15:40:46 -0700217 } catch (StorageException e) {
pierventree97f4832020-07-13 14:07:05 +0200218 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
219 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900220 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700221 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700222 }
Pier Luigif094c612017-10-14 12:15:02 +0200223 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700224 return future;
alshabib7bb05012015-08-05 10:15:09 -0700225 }
226
227 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100228 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
pierventree97f4832020-07-13 14:07:05 +0200229 // Store meter features, this is done once for each device
Jordi Ortizaa8de492016-12-01 00:21:36 +0100230 MeterStoreResult result = MeterStoreResult.success();
231 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
232 try {
233 meterFeatures.putIfAbsent(key, meterfeatures);
234 } catch (StorageException e) {
pierventree97f4832020-07-13 14:07:05 +0200235 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
236 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100237 result = MeterStoreResult.fail(TIMEOUT);
238 }
239 return result;
240 }
241
242 @Override
243 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
pierventree97f4832020-07-13 14:07:05 +0200244 // Remove meter features - these ops are meaningful only for OpenFlow
Jordi Ortizaa8de492016-12-01 00:21:36 +0100245 MeterStoreResult result = MeterStoreResult.success();
246 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
247 try {
248 meterFeatures.remove(key);
249 } catch (StorageException e) {
pierventree97f4832020-07-13 14:07:05 +0200250 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
251 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100252 result = MeterStoreResult.fail(TIMEOUT);
253 }
254 return result;
255 }
256
257 @Override
pierventree97f4832020-07-13 14:07:05 +0200258 // TODO Should we remove it ? We are not using it
alshabibeadfc8e2015-08-18 15:40:46 -0700259 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
260 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700261 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
262 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700263
pierventree97f4832020-07-13 14:07:05 +0200264 MeterData data = new MeterData(meter, null);
alshabibeadfc8e2015-08-18 15:40:46 -0700265 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700266 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700267 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
268 }
alshabibeadfc8e2015-08-18 15:40:46 -0700269 } catch (StorageException e) {
pierventree97f4832020-07-13 14:07:05 +0200270 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
271 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900272 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700273 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700274 }
alshabibeadfc8e2015-08-18 15:40:46 -0700275 return future;
alshabib7bb05012015-08-05 10:15:09 -0700276 }
277
278 @Override
279 public void updateMeterState(Meter meter) {
pierventree97f4832020-07-13 14:07:05 +0200280 // Update meter if present (stats workflow)
alshabib70aaa1b2015-09-25 14:30:59 -0700281 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
282 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700283 DefaultMeter m = (DefaultMeter) v.meter();
piere6c47642020-01-08 08:57:46 +0100284 MeterState meterState = m.state();
285 if (meterState == MeterState.PENDING_ADD) {
286 m.setState(meter.state());
287 }
alshabib7bb05012015-08-05 10:15:09 -0700288 m.setProcessedPackets(meter.packetsSeen());
289 m.setProcessedBytes(meter.bytesSeen());
290 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700291 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700292 m.setReferenceCount(meter.referenceCount());
pierventree97f4832020-07-13 14:07:05 +0200293 return new MeterData(m, null);
alshabib7bb05012015-08-05 10:15:09 -0700294 });
295 }
296
297 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700298 public Meter getMeter(MeterKey key) {
299 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700300 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700301 }
302
303 @Override
304 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700305 return Collections2.transform(meters.asJavaMap().values(),
306 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700307 }
308
309 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200310 public Collection<Meter> getAllMeters(DeviceId deviceId) {
311 return Collections2.transform(
312 Collections2.filter(meters.asJavaMap().values(),
313 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
314 MeterData::meter);
315 }
316
317 @Override
alshabib7bb05012015-08-05 10:15:09 -0700318 public void failedMeter(MeterOperation op, MeterFailReason reason) {
pierventree97f4832020-07-13 14:07:05 +0200319 // Meter ops failed (got notification from the sb)
alshabib70aaa1b2015-09-25 14:30:59 -0700320 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
pierventree97f4832020-07-13 14:07:05 +0200321 meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
alshabib7bb05012015-08-05 10:15:09 -0700322 }
323
alshabib5eb79392015-08-19 18:09:55 -0700324 @Override
325 public void deleteMeterNow(Meter m) {
pierventree97f4832020-07-13 14:07:05 +0200326 // Once we receive the ack from the sb
327 // create the key and remove definitely the meter
alshabib70aaa1b2015-09-25 14:30:59 -0700328 MeterKey key = MeterKey.key(m.deviceId(), m.id());
pierventree97f4832020-07-13 14:07:05 +0200329 try {
330 if (Versioned.valueOrNull(meters.remove(key)) != null) {
331 // Free the id
332 freeMeterId(m.deviceId(), m.id());
333 }
334 } catch (StorageException e) {
335 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
336 e.getMessage(), e);
piere6c47642020-01-08 08:57:46 +0100337 }
alshabib5eb79392015-08-19 18:09:55 -0700338 }
339
Jordi Ortizaa8de492016-12-01 00:21:36 +0100340 @Override
Gamze Abakaf57ef602019-03-11 06:52:48 +0000341 public void purgeMeter(DeviceId deviceId) {
pierventree97f4832020-07-13 14:07:05 +0200342 // Purge api (typically used when the device is offline)
Gamze Abakaf57ef602019-03-11 06:52:48 +0000343 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
344 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
345 .map(Map.Entry::getValue)
346 .collect(Collectors.toList());
pierventree97f4832020-07-13 14:07:05 +0200347 // Remove definitely the meter
Gamze Abakaf57ef602019-03-11 06:52:48 +0000348 metersPendingRemove.forEach(versionedMeterKey
349 -> deleteMeterNow(versionedMeterKey.value().meter()));
Gamze Abakaf57ef602019-03-11 06:52:48 +0000350 }
351
352 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100353 public long getMaxMeters(MeterFeaturesKey key) {
pierventree97f4832020-07-13 14:07:05 +0200354 // Leverage the meter features to know the max id
Jordi Ortizaa8de492016-12-01 00:21:36 +0100355 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
356 return features == null ? 0L : features.maxMeter();
357 }
358
Pier Luigif094c612017-10-14 12:15:02 +0200359 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
360 private long queryMaxMeters(DeviceId device) {
361 // Get driver handler for this device
362 DriverHandler handler = driverService.createHandler(device);
363 // If creation failed or the device does not have this behavior
364 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
365 // We cannot know max meter
366 return 0L;
367 }
368 // Get the behavior
369 MeterQuery query = handler.behaviour(MeterQuery.class);
370 // Return as max meter the result of the query
371 return query.getMaxMeters();
372 }
373
374 private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
375 boolean available) {
376 // According to available, make available or unavailable a meter key
377 return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
378 availableMeterIds.remove(MeterKey.key(deviceId, id));
379 }
380
381 private MeterId getNextAvailableId(Set<MeterId> availableIds) {
382 // If there are no available ids
383 if (availableIds.isEmpty()) {
384 // Just end the cycle
385 return null;
386 }
387 // If it is the first fit
388 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
389 return availableIds.iterator().next();
390 }
391 // If it is random, get the size
392 int size = availableIds.size();
393 // Return a random element
394 return Iterables.get(availableIds, RandomUtils.nextInt(size));
395 }
396
397 // Implements reuse strategy
398 private MeterId firstReusableMeterId(DeviceId deviceId) {
399 // Filter key related to device id, and reduce to meter ids
400 Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
401 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
402 .map(MeterKey::meterId)
403 .collect(Collectors.toSet());
404 // Get next available id
405 MeterId meterId = getNextAvailableId(localAvailableMeterIds);
406 // Iterate until there are items
407 while (meterId != null) {
408 // If we are able to reserve the id
409 if (updateMeterIdAvailability(deviceId, meterId, false)) {
410 // Just end
411 return meterId;
412 }
413 // Update the set
414 localAvailableMeterIds.remove(meterId);
415 // Try another time
416 meterId = getNextAvailableId(localAvailableMeterIds);
417 }
418 // No reusable ids
419 return null;
420 }
421
422 @Override
423 public MeterId allocateMeterId(DeviceId deviceId) {
424 // Init steps
425 MeterId meterId;
426 long id;
427 // Try to reuse meter id
428 meterId = firstReusableMeterId(deviceId);
429 // We found a reusable id, return
430 if (meterId != null) {
431 return meterId;
432 }
433 // If there was no reusable MeterId we have to generate a new value
434 // using maxMeters as upper limit.
435 long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
436 // If the device does not give us MeterFeatures
437 if (maxMeters == 0L) {
438 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
439 maxMeters = queryMaxMeters(deviceId);
440 }
441 // If we don't know the max, cannot proceed
442 if (maxMeters == 0L) {
443 return null;
444 }
445 // Get a new value
446 id = meterIdGenerators.incrementAndGet(deviceId);
447 // Check with the max, and if the value is bigger, cannot proceed
448 if (id >= maxMeters) {
449 return null;
450 }
451 // Done, return the value
452 return MeterId.meterId(id);
453 }
454
455 @Override
456 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200457 // Avoid to free meter not allocated
458 if (meterIdGenerators.get(deviceId) < meterId.id()) {
459 return;
460 }
Pier Luigif094c612017-10-14 12:15:02 +0200461 // Update the availability
462 updateMeterIdAvailability(deviceId, meterId, true);
463 }
464
pierventree97f4832020-07-13 14:07:05 +0200465 // Enabling the events distribution across the cluster
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700466 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700467 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700468 public void event(MapEvent<MeterKey, MeterData> event) {
469 MeterKey key = event.key();
Ray Milkeyd0f017f2018-09-21 12:52:34 -0700470 Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
471 MeterData data = value.value();
pierventree97f4832020-07-13 14:07:05 +0200472 MeterData oldData = Versioned.valueOrNull(event.oldValue());
alshabibeadfc8e2015-08-18 15:40:46 -0700473 switch (event.type()) {
474 case INSERT:
475 case UPDATE:
476 switch (data.meter().state()) {
477 case PENDING_ADD:
478 case PENDING_REMOVE:
pierventree97f4832020-07-13 14:07:05 +0200479 // Two cases. If there is a reason, the meter operation failed.
480 // Otherwise, we are ready to install/remove through the delegate.
481 if (data.reason().isEmpty()) {
482 notifyDelegate(new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
483 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, data.meter()));
484 } else {
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100485 futures.computeIfPresent(key, (k, v) -> {
pierventree97f4832020-07-13 14:07:05 +0200486 v.complete(MeterStoreResult.fail(data.reason().get()));
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100487 return null;
488 });
alshabibe1248b62015-08-20 17:21:55 -0700489 }
490 break;
pierventree97f4832020-07-13 14:07:05 +0200491 case ADDED:
492 // Transition from pending to installed
493 if (data.meter().state() == MeterState.ADDED &&
494 (oldData != null && oldData.meter().state() == MeterState.PENDING_ADD)) {
495 futures.computeIfPresent(key, (k, v) -> {
496 v.complete(MeterStoreResult.success());
497 return null;
498 });
499 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
500 // Update stats case
501 } else if (data.meter().referenceCount() == 0) {
502 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO,
503 data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700504 }
505 break;
506 default:
507 log.warn("Unknown meter state type {}", data.meter().state());
508 }
509 break;
510 case REMOVE:
pierventree97f4832020-07-13 14:07:05 +0200511 // Meter removal case
512 futures.computeIfPresent(key, (k, v) -> {
513 v.complete(MeterStoreResult.success());
514 return null;
515 });
516 // Finally notify the delegate
517 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700518 break;
519 default:
520 log.warn("Unknown Map event type {}", event.type());
521 }
522
523 }
524 }
525
alshabib7bb05012015-08-05 10:15:09 -0700526}