/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080016package org.onosproject.store.meter.impl;
alshabib7bb05012015-08-05 10:15:09 -070017
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
Pier Luigif094c612017-10-14 12:15:02 +020019import com.google.common.collect.Iterables;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070020import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070021import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020022import org.apache.commons.lang.math.RandomUtils;
Charles Chan593acf92017-11-22 13:55:41 -080023import org.onlab.util.KryoNamespace;
alshabib7bb05012015-08-05 10:15:09 -070024import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070026import org.onosproject.mastership.MastershipService;
Jordi Ortizaa8de492016-12-01 00:21:36 +010027import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020028import org.onosproject.net.behaviour.MeterQuery;
29import org.onosproject.net.driver.DriverHandler;
30import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070031import org.onosproject.net.meter.Band;
32import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070033import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010034import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070035import org.onosproject.net.meter.Meter;
36import org.onosproject.net.meter.MeterEvent;
37import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010038import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030039import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010040import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010041import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070042import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070043import org.onosproject.net.meter.MeterOperation;
44import org.onosproject.net.meter.MeterState;
45import org.onosproject.net.meter.MeterStore;
46import org.onosproject.net.meter.MeterStoreDelegate;
47import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070048import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020049import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070050import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020051import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070052import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020053import org.onosproject.store.service.DistributedPrimitive;
54import org.onosproject.store.service.DistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070055import org.onosproject.store.service.MapEvent;
56import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070057import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070058import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070059import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070060import org.onosproject.store.service.Versioned;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070061import org.osgi.service.component.annotations.Activate;
62import org.osgi.service.component.annotations.Component;
63import org.osgi.service.component.annotations.Deactivate;
64import org.osgi.service.component.annotations.Reference;
65import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070066import org.slf4j.Logger;
67
68import java.util.Collection;
alshabibeadfc8e2015-08-18 15:40:46 -070069import java.util.Map;
Pier Luigif094c612017-10-14 12:15:02 +020070import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070071import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020072import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070073
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080074import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010075import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070076import static org.slf4j.LoggerFactory.getLogger;
77
78/**
79 * A distributed meter store implementation. Meters are stored consistently
80 * across the cluster.
81 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070082@Component(immediate = true, service = MeterStore.class)
alshabib7bb05012015-08-05 10:15:09 -070083public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
84 implements MeterStore {
85
86 private Logger log = getLogger(getClass());
87
88 private static final String METERSTORE = "onos-meter-store";
Jordi Ortizaa8de492016-12-01 00:21:36 +010089 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Jordi Ortiz6c847762017-01-30 17:13:05 +010090 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
Pier Luigif094c612017-10-14 12:15:02 +020091 private static final String METERIDSTORE = "onos-meters-id-store";
alshabib7bb05012015-08-05 10:15:09 -070092
Charles Chan593acf92017-11-22 13:55:41 -080093 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
94 .register(KryoNamespaces.API)
95 .register(MeterKey.class)
96 .register(MeterData.class)
97 .register(DefaultMeter.class)
98 .register(DefaultBand.class)
99 .register(Band.Type.class)
100 .register(MeterState.class)
debmaiti1bea2892019-06-04 12:36:38 +0530101 .register(Meter.Unit.class)
102 .register(MeterFailReason.class);
Charles Chan593acf92017-11-22 13:55:41 -0800103
104 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
105
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700107 private StorageService storageService;
108
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700110 private MastershipService mastershipService;
111
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700113 private ClusterService clusterService;
114
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700115 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Pier Luigif094c612017-10-14 12:15:02 +0200116 protected DriverService driverService;
117
alshabib70aaa1b2015-09-25 14:30:59 -0700118 private ConsistentMap<MeterKey, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -0700119 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -0700120
Jordi Ortizaa8de492016-12-01 00:21:36 +0100121 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
122
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700123 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabibeadfc8e2015-08-18 15:40:46 -0700124
alshabib70aaa1b2015-09-25 14:30:59 -0700125 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700126 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700127
Pier Luigif094c612017-10-14 12:15:02 +0200128 // Available meter identifiers
129 private DistributedSet<MeterKey> availableMeterIds;
130
131 // Atomic counter map for generation of new identifiers;
132 private AtomicCounterMap<DeviceId> meterIdGenerators;
133
134 /**
135 * Defines possible selection strategies to reuse meter ids.
136 */
137 enum ReuseStrategy {
138 /**
139 * Select randomly an available id.
140 */
141 RANDOM,
142 /**
143 * Select the first one.
144 */
145 FIRST_FIT
146 }
147
148 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100149
alshabib7bb05012015-08-05 10:15:09 -0700150 @Activate
151 public void activate() {
alshabib7bb05012015-08-05 10:15:09 -0700152 local = clusterService.getLocalNode().id();
153
alshabib70aaa1b2015-09-25 14:30:59 -0700154 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700155 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800156 .withSerializer(serializer).build();
alshabib7bb05012015-08-05 10:15:09 -0700157
alshabibeadfc8e2015-08-18 15:40:46 -0700158 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700159
Jordi Ortizaa8de492016-12-01 00:21:36 +0100160 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
161 .withName(METERFEATURESSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200162 .withSerializer(Serializer.using(KryoNamespaces.API,
163 MeterFeaturesKey.class,
164 MeterFeatures.class,
165 DefaultMeterFeatures.class,
166 Band.Type.class,
167 Meter.Unit.class,
cansu.toprak409289d2017-10-27 10:04:05 +0300168 MeterFailReason.class,
169 MeterFeaturesFlag.class)).build();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100170
Pier Luigif094c612017-10-14 12:15:02 +0200171 // Init the set of the available ids
172 availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
Jordi Ortiz6c847762017-01-30 17:13:05 +0100173 .withName(AVAILABLEMETERIDSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200174 .withSerializer(Serializer.using(KryoNamespaces.API,
175 MeterKey.class)).build(),
176 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
177
178 // Init atomic map counters
179 meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
180 .withName(METERIDSTORE)
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700181 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
Jordi Ortiz6c847762017-01-30 17:13:05 +0100182
alshabib7bb05012015-08-05 10:15:09 -0700183 log.info("Started");
184 }
185
186 @Deactivate
187 public void deactivate() {
alshabibeadfc8e2015-08-18 15:40:46 -0700188 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700189 log.info("Stopped");
190 }
191
alshabib7bb05012015-08-05 10:15:09 -0700192 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700193 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200194 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700195 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700196 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200197 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700198 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200199 // Store the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700200 MeterData data = new MeterData(meter, null, local);
alshabibeadfc8e2015-08-18 15:40:46 -0700201 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700202 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700203 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900204 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700205 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700206 }
Pier Luigif094c612017-10-14 12:15:02 +0200207 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700208 return future;
alshabib7bb05012015-08-05 10:15:09 -0700209 }
210
211 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700212 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200213 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700214 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700215 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200216 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700217 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200218 // Create the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700219 MeterData data = new MeterData(meter, null, local);
Pier Luigif094c612017-10-14 12:15:02 +0200220 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700221 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700222 try {
Pier Luigif094c612017-10-14 12:15:02 +0200223 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700224 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200225 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700226 future.complete(MeterStoreResult.success());
227 }
alshabibeadfc8e2015-08-18 15:40:46 -0700228 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900229 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700230 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700231 }
Pier Luigif094c612017-10-14 12:15:02 +0200232 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700233 return future;
alshabib7bb05012015-08-05 10:15:09 -0700234 }
235
236 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100237 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
238 MeterStoreResult result = MeterStoreResult.success();
239 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
240 try {
241 meterFeatures.putIfAbsent(key, meterfeatures);
242 } catch (StorageException e) {
243 result = MeterStoreResult.fail(TIMEOUT);
244 }
245 return result;
246 }
247
248 @Override
249 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
250 MeterStoreResult result = MeterStoreResult.success();
251 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
252 try {
253 meterFeatures.remove(key);
254 } catch (StorageException e) {
255 result = MeterStoreResult.fail(TIMEOUT);
256 }
257 return result;
258 }
259
260 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700261 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
262 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700263 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
264 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700265
alshabibeadfc8e2015-08-18 15:40:46 -0700266 MeterData data = new MeterData(meter, null, local);
267 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700268 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700269 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
270 }
alshabibeadfc8e2015-08-18 15:40:46 -0700271 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900272 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700273 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700274 }
alshabibeadfc8e2015-08-18 15:40:46 -0700275 return future;
alshabib7bb05012015-08-05 10:15:09 -0700276 }
277
278 @Override
279 public void updateMeterState(Meter meter) {
alshabib70aaa1b2015-09-25 14:30:59 -0700280 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
281 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700282 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700283 m.setState(meter.state());
284 m.setProcessedPackets(meter.packetsSeen());
285 m.setProcessedBytes(meter.bytesSeen());
286 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700287 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700288 m.setReferenceCount(meter.referenceCount());
Gamze Abakadadae722018-09-12 10:55:35 +0000289 if (meter.referenceCount() == 0) {
290 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO, m));
291 }
alshabibeadfc8e2015-08-18 15:40:46 -0700292 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700293 });
294 }
295
296 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700297 public Meter getMeter(MeterKey key) {
298 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700299 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700300 }
301
302 @Override
303 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700304 return Collections2.transform(meters.asJavaMap().values(),
305 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700306 }
307
308 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200309 public Collection<Meter> getAllMeters(DeviceId deviceId) {
310 return Collections2.transform(
311 Collections2.filter(meters.asJavaMap().values(),
312 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
313 MeterData::meter);
314 }
315
316 @Override
alshabib7bb05012015-08-05 10:15:09 -0700317 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabib70aaa1b2015-09-25 14:30:59 -0700318 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
319 meters.computeIfPresent(key, (k, v) ->
alshabibeadfc8e2015-08-18 15:40:46 -0700320 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700321 }
322
alshabib5eb79392015-08-19 18:09:55 -0700323 @Override
324 public void deleteMeterNow(Meter m) {
Pier Luigif094c612017-10-14 12:15:02 +0200325 // Create the key
alshabib70aaa1b2015-09-25 14:30:59 -0700326 MeterKey key = MeterKey.key(m.deviceId(), m.id());
Pier Luigif094c612017-10-14 12:15:02 +0200327 // Remove the future
alshabib70aaa1b2015-09-25 14:30:59 -0700328 futures.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200329 // Remove the meter
alshabib70aaa1b2015-09-25 14:30:59 -0700330 meters.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200331 // Free the id
332 freeMeterId(m.deviceId(), m.id());
333 // Finally notify the delegate
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100334 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
alshabib5eb79392015-08-19 18:09:55 -0700335 }
336
Jordi Ortizaa8de492016-12-01 00:21:36 +0100337 @Override
338 public long getMaxMeters(MeterFeaturesKey key) {
339 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
340 return features == null ? 0L : features.maxMeter();
341 }
342
Pier Luigif094c612017-10-14 12:15:02 +0200343 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
344 private long queryMaxMeters(DeviceId device) {
345 // Get driver handler for this device
346 DriverHandler handler = driverService.createHandler(device);
347 // If creation failed or the device does not have this behavior
348 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
349 // We cannot know max meter
350 return 0L;
351 }
352 // Get the behavior
353 MeterQuery query = handler.behaviour(MeterQuery.class);
354 // Return as max meter the result of the query
355 return query.getMaxMeters();
356 }
357
358 private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
359 boolean available) {
360 // According to available, make available or unavailable a meter key
361 return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
362 availableMeterIds.remove(MeterKey.key(deviceId, id));
363 }
364
365 private MeterId getNextAvailableId(Set<MeterId> availableIds) {
366 // If there are no available ids
367 if (availableIds.isEmpty()) {
368 // Just end the cycle
369 return null;
370 }
371 // If it is the first fit
372 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
373 return availableIds.iterator().next();
374 }
375 // If it is random, get the size
376 int size = availableIds.size();
377 // Return a random element
378 return Iterables.get(availableIds, RandomUtils.nextInt(size));
379 }
380
381 // Implements reuse strategy
382 private MeterId firstReusableMeterId(DeviceId deviceId) {
383 // Filter key related to device id, and reduce to meter ids
384 Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
385 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
386 .map(MeterKey::meterId)
387 .collect(Collectors.toSet());
388 // Get next available id
389 MeterId meterId = getNextAvailableId(localAvailableMeterIds);
390 // Iterate until there are items
391 while (meterId != null) {
392 // If we are able to reserve the id
393 if (updateMeterIdAvailability(deviceId, meterId, false)) {
394 // Just end
395 return meterId;
396 }
397 // Update the set
398 localAvailableMeterIds.remove(meterId);
399 // Try another time
400 meterId = getNextAvailableId(localAvailableMeterIds);
401 }
402 // No reusable ids
403 return null;
404 }
405
406 @Override
407 public MeterId allocateMeterId(DeviceId deviceId) {
408 // Init steps
409 MeterId meterId;
410 long id;
411 // Try to reuse meter id
412 meterId = firstReusableMeterId(deviceId);
413 // We found a reusable id, return
414 if (meterId != null) {
415 return meterId;
416 }
417 // If there was no reusable MeterId we have to generate a new value
418 // using maxMeters as upper limit.
419 long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
420 // If the device does not give us MeterFeatures
421 if (maxMeters == 0L) {
422 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
423 maxMeters = queryMaxMeters(deviceId);
424 }
425 // If we don't know the max, cannot proceed
426 if (maxMeters == 0L) {
427 return null;
428 }
429 // Get a new value
430 id = meterIdGenerators.incrementAndGet(deviceId);
431 // Check with the max, and if the value is bigger, cannot proceed
432 if (id >= maxMeters) {
433 return null;
434 }
435 // Done, return the value
436 return MeterId.meterId(id);
437 }
438
439 @Override
440 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200441 // Avoid to free meter not allocated
442 if (meterIdGenerators.get(deviceId) < meterId.id()) {
443 return;
444 }
Pier Luigif094c612017-10-14 12:15:02 +0200445 // Update the availability
446 updateMeterIdAvailability(deviceId, meterId, true);
447 }
448
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700449 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700450 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700451 public void event(MapEvent<MeterKey, MeterData> event) {
452 MeterKey key = event.key();
Ray Milkeyd0f017f2018-09-21 12:52:34 -0700453 Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
454 MeterData data = value.value();
alshabibeadfc8e2015-08-18 15:40:46 -0700455 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
456 switch (event.type()) {
457 case INSERT:
458 case UPDATE:
459 switch (data.meter().state()) {
460 case PENDING_ADD:
461 case PENDING_REMOVE:
462 if (!data.reason().isPresent() && local.equals(master)) {
463 notifyDelegate(
464 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
465 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
466 data.meter()));
467 } else if (data.reason().isPresent() && local.equals(data.origin())) {
468 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
469 //TODO: No future -> no friend
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700470 futures.get(key).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700471 }
472 break;
473 case ADDED:
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100474 if (local.equals(data.origin()) &&
475 (data.meter().state() == MeterState.PENDING_ADD
476 || data.meter().state() == MeterState.ADDED)) {
477 futures.computeIfPresent(key, (k, v) -> {
Gamze Abaka7e65ac72019-03-05 15:40:57 +0000478 v.complete(MeterStoreResult.success());
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100479 notifyDelegate(
480 new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
481 return null;
482 });
alshabibe1248b62015-08-20 17:21:55 -0700483 }
484 break;
alshabibeadfc8e2015-08-18 15:40:46 -0700485 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700486 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700487 futures.remove(key).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700488 }
489 break;
490 default:
491 log.warn("Unknown meter state type {}", data.meter().state());
492 }
493 break;
494 case REMOVE:
495 //Only happens at origin so we do not need to care.
496 break;
497 default:
498 log.warn("Unknown Map event type {}", event.type());
499 }
500
501 }
502 }
503
504
alshabib7bb05012015-08-05 10:15:09 -0700505}