blob: 0646e666b4c3d9f05bba848bda74a7911c4ee48d [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080016package org.onosproject.store.meter.impl;
alshabib7bb05012015-08-05 10:15:09 -070017
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
Pier Luigif094c612017-10-14 12:15:02 +020019import com.google.common.collect.Iterables;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070020import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070021import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020022import org.apache.commons.lang.math.RandomUtils;
Charles Chan593acf92017-11-22 13:55:41 -080023import org.onlab.util.KryoNamespace;
alshabib7bb05012015-08-05 10:15:09 -070024import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070026import org.onosproject.mastership.MastershipService;
Jordi Ortizaa8de492016-12-01 00:21:36 +010027import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020028import org.onosproject.net.behaviour.MeterQuery;
29import org.onosproject.net.driver.DriverHandler;
30import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070031import org.onosproject.net.meter.Band;
32import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070033import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010034import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070035import org.onosproject.net.meter.Meter;
36import org.onosproject.net.meter.MeterEvent;
37import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010038import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030039import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010040import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010041import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070042import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070043import org.onosproject.net.meter.MeterOperation;
44import org.onosproject.net.meter.MeterState;
45import org.onosproject.net.meter.MeterStore;
46import org.onosproject.net.meter.MeterStoreDelegate;
47import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070048import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020049import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070050import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020051import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070052import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020053import org.onosproject.store.service.DistributedPrimitive;
54import org.onosproject.store.service.DistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070055import org.onosproject.store.service.MapEvent;
56import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070057import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070058import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070059import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070060import org.onosproject.store.service.Versioned;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070061import org.osgi.service.component.annotations.Activate;
62import org.osgi.service.component.annotations.Component;
63import org.osgi.service.component.annotations.Deactivate;
64import org.osgi.service.component.annotations.Reference;
65import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070066import org.slf4j.Logger;
67
68import java.util.Collection;
Gamze Abakaf57ef602019-03-11 06:52:48 +000069import java.util.List;
alshabibeadfc8e2015-08-18 15:40:46 -070070import java.util.Map;
Gamze Abakaf57ef602019-03-11 06:52:48 +000071import java.util.Objects;
Pier Luigif094c612017-10-14 12:15:02 +020072import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070073import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020074import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070075
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080076import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010077import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070078import static org.slf4j.LoggerFactory.getLogger;
79
80/**
81 * A distributed meter store implementation. Meters are stored consistently
82 * across the cluster.
83 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070084@Component(immediate = true, service = MeterStore.class)
alshabib7bb05012015-08-05 10:15:09 -070085public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
86 implements MeterStore {
87
88 private Logger log = getLogger(getClass());
89
90 private static final String METERSTORE = "onos-meter-store";
Jordi Ortizaa8de492016-12-01 00:21:36 +010091 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Jordi Ortiz6c847762017-01-30 17:13:05 +010092 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
Pier Luigif094c612017-10-14 12:15:02 +020093 private static final String METERIDSTORE = "onos-meters-id-store";
alshabib7bb05012015-08-05 10:15:09 -070094
Charles Chan593acf92017-11-22 13:55:41 -080095 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
96 .register(KryoNamespaces.API)
97 .register(MeterKey.class)
98 .register(MeterData.class)
99 .register(DefaultMeter.class)
100 .register(DefaultBand.class)
101 .register(Band.Type.class)
102 .register(MeterState.class)
debmaiti1bea2892019-06-04 12:36:38 +0530103 .register(Meter.Unit.class)
104 .register(MeterFailReason.class);
Charles Chan593acf92017-11-22 13:55:41 -0800105
106 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
107
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700109 private StorageService storageService;
110
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700112 private MastershipService mastershipService;
113
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700115 private ClusterService clusterService;
116
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Pier Luigif094c612017-10-14 12:15:02 +0200118 protected DriverService driverService;
119
alshabib70aaa1b2015-09-25 14:30:59 -0700120 private ConsistentMap<MeterKey, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -0700121 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -0700122
Jordi Ortizaa8de492016-12-01 00:21:36 +0100123 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
124
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700125 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabibeadfc8e2015-08-18 15:40:46 -0700126
alshabib70aaa1b2015-09-25 14:30:59 -0700127 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700128 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700129
Pier Luigif094c612017-10-14 12:15:02 +0200130 // Available meter identifiers
131 private DistributedSet<MeterKey> availableMeterIds;
132
133 // Atomic counter map for generation of new identifiers;
134 private AtomicCounterMap<DeviceId> meterIdGenerators;
135
136 /**
137 * Defines possible selection strategies to reuse meter ids.
138 */
139 enum ReuseStrategy {
140 /**
141 * Select randomly an available id.
142 */
143 RANDOM,
144 /**
145 * Select the first one.
146 */
147 FIRST_FIT
148 }
149
150 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100151
alshabib7bb05012015-08-05 10:15:09 -0700152 @Activate
153 public void activate() {
alshabib7bb05012015-08-05 10:15:09 -0700154 local = clusterService.getLocalNode().id();
155
alshabib70aaa1b2015-09-25 14:30:59 -0700156 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700157 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800158 .withSerializer(serializer).build();
alshabib7bb05012015-08-05 10:15:09 -0700159
alshabibeadfc8e2015-08-18 15:40:46 -0700160 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700161
Jordi Ortizaa8de492016-12-01 00:21:36 +0100162 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
163 .withName(METERFEATURESSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200164 .withSerializer(Serializer.using(KryoNamespaces.API,
165 MeterFeaturesKey.class,
166 MeterFeatures.class,
167 DefaultMeterFeatures.class,
168 Band.Type.class,
169 Meter.Unit.class,
cansu.toprak409289d2017-10-27 10:04:05 +0300170 MeterFailReason.class,
171 MeterFeaturesFlag.class)).build();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100172
Pier Luigif094c612017-10-14 12:15:02 +0200173 // Init the set of the available ids
174 availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
Jordi Ortiz6c847762017-01-30 17:13:05 +0100175 .withName(AVAILABLEMETERIDSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200176 .withSerializer(Serializer.using(KryoNamespaces.API,
177 MeterKey.class)).build(),
178 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
179
180 // Init atomic map counters
181 meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
182 .withName(METERIDSTORE)
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700183 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
Jordi Ortiz6c847762017-01-30 17:13:05 +0100184
alshabib7bb05012015-08-05 10:15:09 -0700185 log.info("Started");
186 }
187
188 @Deactivate
189 public void deactivate() {
alshabibeadfc8e2015-08-18 15:40:46 -0700190 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700191 log.info("Stopped");
192 }
193
alshabib7bb05012015-08-05 10:15:09 -0700194 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700195 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200196 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700197 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700198 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200199 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700200 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200201 // Store the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700202 MeterData data = new MeterData(meter, null, local);
alshabibeadfc8e2015-08-18 15:40:46 -0700203 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700204 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700205 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900206 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700207 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700208 }
Pier Luigif094c612017-10-14 12:15:02 +0200209 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700210 return future;
alshabib7bb05012015-08-05 10:15:09 -0700211 }
212
213 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700214 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200215 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700216 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700217 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200218 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700219 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200220 // Create the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700221 MeterData data = new MeterData(meter, null, local);
Pier Luigif094c612017-10-14 12:15:02 +0200222 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700223 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700224 try {
Pier Luigif094c612017-10-14 12:15:02 +0200225 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700226 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200227 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700228 future.complete(MeterStoreResult.success());
229 }
alshabibeadfc8e2015-08-18 15:40:46 -0700230 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900231 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700232 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700233 }
Pier Luigif094c612017-10-14 12:15:02 +0200234 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700235 return future;
alshabib7bb05012015-08-05 10:15:09 -0700236 }
237
238 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100239 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
240 MeterStoreResult result = MeterStoreResult.success();
241 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
242 try {
243 meterFeatures.putIfAbsent(key, meterfeatures);
244 } catch (StorageException e) {
245 result = MeterStoreResult.fail(TIMEOUT);
246 }
247 return result;
248 }
249
250 @Override
251 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
252 MeterStoreResult result = MeterStoreResult.success();
253 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
254 try {
255 meterFeatures.remove(key);
256 } catch (StorageException e) {
257 result = MeterStoreResult.fail(TIMEOUT);
258 }
259 return result;
260 }
261
262 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700263 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
264 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700265 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
266 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700267
alshabibeadfc8e2015-08-18 15:40:46 -0700268 MeterData data = new MeterData(meter, null, local);
269 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700270 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700271 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
272 }
alshabibeadfc8e2015-08-18 15:40:46 -0700273 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900274 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700275 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700276 }
alshabibeadfc8e2015-08-18 15:40:46 -0700277 return future;
alshabib7bb05012015-08-05 10:15:09 -0700278 }
279
280 @Override
281 public void updateMeterState(Meter meter) {
alshabib70aaa1b2015-09-25 14:30:59 -0700282 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
283 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700284 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700285 m.setState(meter.state());
286 m.setProcessedPackets(meter.packetsSeen());
287 m.setProcessedBytes(meter.bytesSeen());
288 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700289 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700290 m.setReferenceCount(meter.referenceCount());
Gamze Abakadadae722018-09-12 10:55:35 +0000291 if (meter.referenceCount() == 0) {
292 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO, m));
293 }
alshabibeadfc8e2015-08-18 15:40:46 -0700294 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700295 });
296 }
297
298 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700299 public Meter getMeter(MeterKey key) {
300 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700301 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700302 }
303
304 @Override
305 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700306 return Collections2.transform(meters.asJavaMap().values(),
307 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700308 }
309
310 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200311 public Collection<Meter> getAllMeters(DeviceId deviceId) {
312 return Collections2.transform(
313 Collections2.filter(meters.asJavaMap().values(),
314 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
315 MeterData::meter);
316 }
317
318 @Override
alshabib7bb05012015-08-05 10:15:09 -0700319 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabib70aaa1b2015-09-25 14:30:59 -0700320 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
321 meters.computeIfPresent(key, (k, v) ->
alshabibeadfc8e2015-08-18 15:40:46 -0700322 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700323 }
324
alshabib5eb79392015-08-19 18:09:55 -0700325 @Override
326 public void deleteMeterNow(Meter m) {
Pier Luigif094c612017-10-14 12:15:02 +0200327 // Create the key
alshabib70aaa1b2015-09-25 14:30:59 -0700328 MeterKey key = MeterKey.key(m.deviceId(), m.id());
Pier Luigif094c612017-10-14 12:15:02 +0200329 // Remove the future
alshabib70aaa1b2015-09-25 14:30:59 -0700330 futures.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200331 // Remove the meter
alshabib70aaa1b2015-09-25 14:30:59 -0700332 meters.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200333 // Free the id
334 freeMeterId(m.deviceId(), m.id());
335 // Finally notify the delegate
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100336 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
alshabib5eb79392015-08-19 18:09:55 -0700337 }
338
Jordi Ortizaa8de492016-12-01 00:21:36 +0100339 @Override
Gamze Abakaf57ef602019-03-11 06:52:48 +0000340 public void purgeMeter(DeviceId deviceId) {
341
342 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
343 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
344 .map(Map.Entry::getValue)
345 .collect(Collectors.toList());
346
347 metersPendingRemove.forEach(versionedMeterKey
348 -> deleteMeterNow(versionedMeterKey.value().meter()));
349
350 }
351
352 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100353 public long getMaxMeters(MeterFeaturesKey key) {
354 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
355 return features == null ? 0L : features.maxMeter();
356 }
357
Pier Luigif094c612017-10-14 12:15:02 +0200358 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
359 private long queryMaxMeters(DeviceId device) {
360 // Get driver handler for this device
361 DriverHandler handler = driverService.createHandler(device);
362 // If creation failed or the device does not have this behavior
363 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
364 // We cannot know max meter
365 return 0L;
366 }
367 // Get the behavior
368 MeterQuery query = handler.behaviour(MeterQuery.class);
369 // Return as max meter the result of the query
370 return query.getMaxMeters();
371 }
372
373 private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
374 boolean available) {
375 // According to available, make available or unavailable a meter key
376 return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
377 availableMeterIds.remove(MeterKey.key(deviceId, id));
378 }
379
380 private MeterId getNextAvailableId(Set<MeterId> availableIds) {
381 // If there are no available ids
382 if (availableIds.isEmpty()) {
383 // Just end the cycle
384 return null;
385 }
386 // If it is the first fit
387 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
388 return availableIds.iterator().next();
389 }
390 // If it is random, get the size
391 int size = availableIds.size();
392 // Return a random element
393 return Iterables.get(availableIds, RandomUtils.nextInt(size));
394 }
395
396 // Implements reuse strategy
397 private MeterId firstReusableMeterId(DeviceId deviceId) {
398 // Filter key related to device id, and reduce to meter ids
399 Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
400 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
401 .map(MeterKey::meterId)
402 .collect(Collectors.toSet());
403 // Get next available id
404 MeterId meterId = getNextAvailableId(localAvailableMeterIds);
405 // Iterate until there are items
406 while (meterId != null) {
407 // If we are able to reserve the id
408 if (updateMeterIdAvailability(deviceId, meterId, false)) {
409 // Just end
410 return meterId;
411 }
412 // Update the set
413 localAvailableMeterIds.remove(meterId);
414 // Try another time
415 meterId = getNextAvailableId(localAvailableMeterIds);
416 }
417 // No reusable ids
418 return null;
419 }
420
421 @Override
422 public MeterId allocateMeterId(DeviceId deviceId) {
423 // Init steps
424 MeterId meterId;
425 long id;
426 // Try to reuse meter id
427 meterId = firstReusableMeterId(deviceId);
428 // We found a reusable id, return
429 if (meterId != null) {
430 return meterId;
431 }
432 // If there was no reusable MeterId we have to generate a new value
433 // using maxMeters as upper limit.
434 long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
435 // If the device does not give us MeterFeatures
436 if (maxMeters == 0L) {
437 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
438 maxMeters = queryMaxMeters(deviceId);
439 }
440 // If we don't know the max, cannot proceed
441 if (maxMeters == 0L) {
442 return null;
443 }
444 // Get a new value
445 id = meterIdGenerators.incrementAndGet(deviceId);
446 // Check with the max, and if the value is bigger, cannot proceed
447 if (id >= maxMeters) {
448 return null;
449 }
450 // Done, return the value
451 return MeterId.meterId(id);
452 }
453
454 @Override
455 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200456 // Avoid to free meter not allocated
457 if (meterIdGenerators.get(deviceId) < meterId.id()) {
458 return;
459 }
Pier Luigif094c612017-10-14 12:15:02 +0200460 // Update the availability
461 updateMeterIdAvailability(deviceId, meterId, true);
462 }
463
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700464 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700465 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700466 public void event(MapEvent<MeterKey, MeterData> event) {
467 MeterKey key = event.key();
Ray Milkeyd0f017f2018-09-21 12:52:34 -0700468 Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
469 MeterData data = value.value();
alshabibeadfc8e2015-08-18 15:40:46 -0700470 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
471 switch (event.type()) {
472 case INSERT:
473 case UPDATE:
474 switch (data.meter().state()) {
475 case PENDING_ADD:
476 case PENDING_REMOVE:
477 if (!data.reason().isPresent() && local.equals(master)) {
478 notifyDelegate(
479 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
480 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
481 data.meter()));
482 } else if (data.reason().isPresent() && local.equals(data.origin())) {
483 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
484 //TODO: No future -> no friend
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700485 futures.get(key).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700486 }
487 break;
488 case ADDED:
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100489 if (local.equals(data.origin()) &&
490 (data.meter().state() == MeterState.PENDING_ADD
491 || data.meter().state() == MeterState.ADDED)) {
492 futures.computeIfPresent(key, (k, v) -> {
Gamze Abaka7e65ac72019-03-05 15:40:57 +0000493 v.complete(MeterStoreResult.success());
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100494 notifyDelegate(
495 new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
496 return null;
497 });
alshabibe1248b62015-08-20 17:21:55 -0700498 }
499 break;
alshabibeadfc8e2015-08-18 15:40:46 -0700500 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700501 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700502 futures.remove(key).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700503 }
504 break;
505 default:
506 log.warn("Unknown meter state type {}", data.meter().state());
507 }
508 break;
509 case REMOVE:
510 //Only happens at origin so we do not need to care.
511 break;
512 default:
513 log.warn("Unknown Map event type {}", event.type());
514 }
515
516 }
517 }
518
519
alshabib7bb05012015-08-05 10:15:09 -0700520}