blob: 29506b4a9be161bf26e78bbbc0022f68c00f4792 [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.store.meter.impl;
17
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
Charles Chan593acf92017-11-22 13:55:41 -080019import com.google.common.collect.Lists;
Pier Luigif094c612017-10-14 12:15:02 +020020import com.google.common.collect.Iterables;
alshabibeadfc8e2015-08-18 15:40:46 -070021import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020022import org.apache.commons.lang.math.RandomUtils;
alshabib7bb05012015-08-05 10:15:09 -070023import org.apache.felix.scr.annotations.Activate;
alshabib58fe6dc2015-08-19 17:16:13 -070024import org.apache.felix.scr.annotations.Component;
alshabib7bb05012015-08-05 10:15:09 -070025import org.apache.felix.scr.annotations.Deactivate;
alshabib7bb05012015-08-05 10:15:09 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib58fe6dc2015-08-19 17:16:13 -070028import org.apache.felix.scr.annotations.Service;
Charles Chan593acf92017-11-22 13:55:41 -080029import org.onlab.util.KryoNamespace;
alshabib7bb05012015-08-05 10:15:09 -070030import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070032import org.onosproject.mastership.MastershipService;
Jordi Ortizaa8de492016-12-01 00:21:36 +010033import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020034import org.onosproject.net.behaviour.MeterQuery;
35import org.onosproject.net.driver.DriverHandler;
36import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070037import org.onosproject.net.meter.Band;
38import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070039import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010040import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070041import org.onosproject.net.meter.Meter;
42import org.onosproject.net.meter.MeterEvent;
43import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010044import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030045import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010046import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010047import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070048import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070049import org.onosproject.net.meter.MeterOperation;
50import org.onosproject.net.meter.MeterState;
51import org.onosproject.net.meter.MeterStore;
52import org.onosproject.net.meter.MeterStoreDelegate;
53import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070054import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020055import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070056import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020057import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070058import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020059import org.onosproject.store.service.DistributedPrimitive;
60import org.onosproject.store.service.DistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070061import org.onosproject.store.service.MapEvent;
62import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070063import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070064import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070065import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070066import org.onosproject.store.service.Versioned;
alshabib7bb05012015-08-05 10:15:09 -070067import org.slf4j.Logger;
68
69import java.util.Collection;
Gamze Abaka91b38542019-03-11 06:52:48 +000070import java.util.List;
alshabibeadfc8e2015-08-18 15:40:46 -070071import java.util.Map;
Gamze Abaka91b38542019-03-11 06:52:48 +000072import java.util.Objects;
Pier Luigif094c612017-10-14 12:15:02 +020073import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070074import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020075import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070076
Pier Luigif094c612017-10-14 12:15:02 +020077import static org.onosproject.incubator.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010078import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070079import static org.slf4j.LoggerFactory.getLogger;
80
81/**
82 * A distributed meter store implementation. Meters are stored consistently
83 * across the cluster.
84 */
alshabib58fe6dc2015-08-19 17:16:13 -070085@Component(immediate = true)
86@Service
alshabib7bb05012015-08-05 10:15:09 -070087public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
88 implements MeterStore {
89
90 private Logger log = getLogger(getClass());
91
92 private static final String METERSTORE = "onos-meter-store";
Jordi Ortizaa8de492016-12-01 00:21:36 +010093 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Jordi Ortiz6c847762017-01-30 17:13:05 +010094 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
Pier Luigif094c612017-10-14 12:15:02 +020095 private static final String METERIDSTORE = "onos-meters-id-store";
alshabib7bb05012015-08-05 10:15:09 -070096
Charles Chan593acf92017-11-22 13:55:41 -080097 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
98 .register(KryoNamespaces.API)
99 .register(MeterKey.class)
100 .register(MeterData.class)
101 .register(DefaultMeter.class)
102 .register(DefaultBand.class)
103 .register(Band.Type.class)
104 .register(MeterState.class)
105 .register(Meter.Unit.class);
106
107 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
108
alshabib7bb05012015-08-05 10:15:09 -0700109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 private StorageService storageService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib7bb05012015-08-05 10:15:09 -0700113 private MastershipService mastershipService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 private ClusterService clusterService;
117
Pier Luigif094c612017-10-14 12:15:02 +0200118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected DriverService driverService;
120
alshabib70aaa1b2015-09-25 14:30:59 -0700121 private ConsistentMap<MeterKey, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -0700122 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -0700123
Jordi Ortizaa8de492016-12-01 00:21:36 +0100124 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
125
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700126 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabibeadfc8e2015-08-18 15:40:46 -0700127
alshabib70aaa1b2015-09-25 14:30:59 -0700128 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700129 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700130
Pier Luigif094c612017-10-14 12:15:02 +0200131 // Available meter identifiers
132 private DistributedSet<MeterKey> availableMeterIds;
133
134 // Atomic counter map for generation of new identifiers;
135 private AtomicCounterMap<DeviceId> meterIdGenerators;
136
137 /**
138 * Defines possible selection strategies to reuse meter ids.
139 */
140 enum ReuseStrategy {
141 /**
142 * Select randomly an available id.
143 */
144 RANDOM,
145 /**
146 * Select the first one.
147 */
148 FIRST_FIT
149 }
150
151 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100152
alshabib7bb05012015-08-05 10:15:09 -0700153 @Activate
154 public void activate() {
alshabib7bb05012015-08-05 10:15:09 -0700155 local = clusterService.getLocalNode().id();
156
alshabib70aaa1b2015-09-25 14:30:59 -0700157 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700158 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800159 .withSerializer(serializer).build();
alshabib7bb05012015-08-05 10:15:09 -0700160
alshabibeadfc8e2015-08-18 15:40:46 -0700161 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700162
Jordi Ortizaa8de492016-12-01 00:21:36 +0100163 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
164 .withName(METERFEATURESSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200165 .withSerializer(Serializer.using(KryoNamespaces.API,
166 MeterFeaturesKey.class,
167 MeterFeatures.class,
168 DefaultMeterFeatures.class,
169 Band.Type.class,
170 Meter.Unit.class,
cansu.toprak409289d2017-10-27 10:04:05 +0300171 MeterFailReason.class,
172 MeterFeaturesFlag.class)).build();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100173
Pier Luigif094c612017-10-14 12:15:02 +0200174 // Init the set of the available ids
175 availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
Jordi Ortiz6c847762017-01-30 17:13:05 +0100176 .withName(AVAILABLEMETERIDSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200177 .withSerializer(Serializer.using(KryoNamespaces.API,
178 MeterKey.class)).build(),
179 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
180
181 // Init atomic map counters
182 meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
183 .withName(METERIDSTORE)
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700184 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
Jordi Ortiz6c847762017-01-30 17:13:05 +0100185
alshabib7bb05012015-08-05 10:15:09 -0700186 log.info("Started");
187 }
188
189 @Deactivate
190 public void deactivate() {
alshabibeadfc8e2015-08-18 15:40:46 -0700191 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700192 log.info("Stopped");
193 }
194
alshabib7bb05012015-08-05 10:15:09 -0700195 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700196 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200197 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700198 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700199 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200200 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700201 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200202 // Store the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700203 MeterData data = new MeterData(meter, null, local);
alshabibeadfc8e2015-08-18 15:40:46 -0700204 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700205 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700206 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900207 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700208 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700209 }
Pier Luigif094c612017-10-14 12:15:02 +0200210 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700211 return future;
alshabib7bb05012015-08-05 10:15:09 -0700212 }
213
214 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700215 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200216 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700217 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700218 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200219 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700220 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200221 // Create the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700222 MeterData data = new MeterData(meter, null, local);
Pier Luigif094c612017-10-14 12:15:02 +0200223 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700224 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700225 try {
Pier Luigif094c612017-10-14 12:15:02 +0200226 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700227 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200228 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700229 future.complete(MeterStoreResult.success());
230 }
alshabibeadfc8e2015-08-18 15:40:46 -0700231 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900232 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700233 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700234 }
Pier Luigif094c612017-10-14 12:15:02 +0200235 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700236 return future;
alshabib7bb05012015-08-05 10:15:09 -0700237 }
238
239 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100240 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
241 MeterStoreResult result = MeterStoreResult.success();
242 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
243 try {
244 meterFeatures.putIfAbsent(key, meterfeatures);
245 } catch (StorageException e) {
246 result = MeterStoreResult.fail(TIMEOUT);
247 }
248 return result;
249 }
250
251 @Override
252 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
253 MeterStoreResult result = MeterStoreResult.success();
254 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
255 try {
256 meterFeatures.remove(key);
257 } catch (StorageException e) {
258 result = MeterStoreResult.fail(TIMEOUT);
259 }
260 return result;
261 }
262
263 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700264 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
265 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700266 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
267 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700268
alshabibeadfc8e2015-08-18 15:40:46 -0700269 MeterData data = new MeterData(meter, null, local);
270 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700271 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700272 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
273 }
alshabibeadfc8e2015-08-18 15:40:46 -0700274 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900275 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700276 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700277 }
alshabibeadfc8e2015-08-18 15:40:46 -0700278 return future;
alshabib7bb05012015-08-05 10:15:09 -0700279 }
280
281 @Override
282 public void updateMeterState(Meter meter) {
alshabib70aaa1b2015-09-25 14:30:59 -0700283 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
284 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700285 DefaultMeter m = (DefaultMeter) v.meter();
Gamze Abaka2d4113b2019-11-11 10:00:27 +0000286 MeterState meterState = m.state();
287 if (meterState == MeterState.PENDING_ADD) {
288 m.setState(meter.state());
289 }
alshabib7bb05012015-08-05 10:15:09 -0700290 m.setProcessedPackets(meter.packetsSeen());
291 m.setProcessedBytes(meter.bytesSeen());
292 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700293 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700294 m.setReferenceCount(meter.referenceCount());
Gamze Abakab5c60cb2018-09-12 10:55:35 +0000295 if (meter.referenceCount() == 0) {
296 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO, m));
297 }
alshabibeadfc8e2015-08-18 15:40:46 -0700298 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700299 });
300 }
301
302 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700303 public Meter getMeter(MeterKey key) {
304 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700305 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700306 }
307
308 @Override
309 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700310 return Collections2.transform(meters.asJavaMap().values(),
311 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700312 }
313
314 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200315 public Collection<Meter> getAllMeters(DeviceId deviceId) {
316 return Collections2.transform(
317 Collections2.filter(meters.asJavaMap().values(),
318 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
319 MeterData::meter);
320 }
321
322 @Override
alshabib7bb05012015-08-05 10:15:09 -0700323 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabib70aaa1b2015-09-25 14:30:59 -0700324 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
325 meters.computeIfPresent(key, (k, v) ->
alshabibeadfc8e2015-08-18 15:40:46 -0700326 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700327 }
328
alshabib5eb79392015-08-19 18:09:55 -0700329 @Override
330 public void deleteMeterNow(Meter m) {
Pier Luigif094c612017-10-14 12:15:02 +0200331 // Create the key
alshabib70aaa1b2015-09-25 14:30:59 -0700332 MeterKey key = MeterKey.key(m.deviceId(), m.id());
Pier Luigif094c612017-10-14 12:15:02 +0200333 // Remove the future
alshabib70aaa1b2015-09-25 14:30:59 -0700334 futures.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200335 // Remove the meter
Gamze Abaka2d4113b2019-11-11 10:00:27 +0000336 if (Versioned.valueOrNull(meters.remove(key)) != null) {
337 // Free the id
338 freeMeterId(m.deviceId(), m.id());
339 // Finally notify the delegate
340 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
341 }
alshabib5eb79392015-08-19 18:09:55 -0700342 }
343
Jordi Ortizaa8de492016-12-01 00:21:36 +0100344 @Override
Gamze Abaka91b38542019-03-11 06:52:48 +0000345 public void purgeMeter(DeviceId deviceId) {
346
347 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
348 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
349 .map(Map.Entry::getValue)
350 .collect(Collectors.toList());
351
352 metersPendingRemove.forEach(versionedMeterKey
353 -> deleteMeterNow(versionedMeterKey.value().meter()));
354
355 }
356
357 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100358 public long getMaxMeters(MeterFeaturesKey key) {
359 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
360 return features == null ? 0L : features.maxMeter();
361 }
362
Pier Luigif094c612017-10-14 12:15:02 +0200363 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
364 private long queryMaxMeters(DeviceId device) {
365 // Get driver handler for this device
366 DriverHandler handler = driverService.createHandler(device);
367 // If creation failed or the device does not have this behavior
368 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
369 // We cannot know max meter
370 return 0L;
371 }
372 // Get the behavior
373 MeterQuery query = handler.behaviour(MeterQuery.class);
374 // Return as max meter the result of the query
375 return query.getMaxMeters();
376 }
377
378 private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
379 boolean available) {
380 // According to available, make available or unavailable a meter key
381 return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
382 availableMeterIds.remove(MeterKey.key(deviceId, id));
383 }
384
385 private MeterId getNextAvailableId(Set<MeterId> availableIds) {
386 // If there are no available ids
387 if (availableIds.isEmpty()) {
388 // Just end the cycle
389 return null;
390 }
391 // If it is the first fit
392 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
393 return availableIds.iterator().next();
394 }
395 // If it is random, get the size
396 int size = availableIds.size();
397 // Return a random element
398 return Iterables.get(availableIds, RandomUtils.nextInt(size));
399 }
400
401 // Implements reuse strategy
402 private MeterId firstReusableMeterId(DeviceId deviceId) {
403 // Filter key related to device id, and reduce to meter ids
404 Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
405 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
406 .map(MeterKey::meterId)
407 .collect(Collectors.toSet());
408 // Get next available id
409 MeterId meterId = getNextAvailableId(localAvailableMeterIds);
410 // Iterate until there are items
411 while (meterId != null) {
412 // If we are able to reserve the id
413 if (updateMeterIdAvailability(deviceId, meterId, false)) {
414 // Just end
415 return meterId;
416 }
417 // Update the set
418 localAvailableMeterIds.remove(meterId);
419 // Try another time
420 meterId = getNextAvailableId(localAvailableMeterIds);
421 }
422 // No reusable ids
423 return null;
424 }
425
426 @Override
427 public MeterId allocateMeterId(DeviceId deviceId) {
428 // Init steps
429 MeterId meterId;
430 long id;
431 // Try to reuse meter id
432 meterId = firstReusableMeterId(deviceId);
433 // We found a reusable id, return
434 if (meterId != null) {
435 return meterId;
436 }
437 // If there was no reusable MeterId we have to generate a new value
438 // using maxMeters as upper limit.
439 long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
440 // If the device does not give us MeterFeatures
441 if (maxMeters == 0L) {
442 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
443 maxMeters = queryMaxMeters(deviceId);
444 }
445 // If we don't know the max, cannot proceed
446 if (maxMeters == 0L) {
447 return null;
448 }
449 // Get a new value
450 id = meterIdGenerators.incrementAndGet(deviceId);
451 // Check with the max, and if the value is bigger, cannot proceed
452 if (id >= maxMeters) {
453 return null;
454 }
455 // Done, return the value
456 return MeterId.meterId(id);
457 }
458
459 @Override
460 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200461 // Avoid to free meter not allocated
462 if (meterIdGenerators.get(deviceId) < meterId.id()) {
463 return;
464 }
Pier Luigif094c612017-10-14 12:15:02 +0200465 // Update the availability
466 updateMeterIdAvailability(deviceId, meterId, true);
467 }
468
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700469 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700470 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700471 public void event(MapEvent<MeterKey, MeterData> event) {
472 MeterKey key = event.key();
alshabibeadfc8e2015-08-18 15:40:46 -0700473 MeterData data = event.value().value();
474 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
475 switch (event.type()) {
476 case INSERT:
477 case UPDATE:
478 switch (data.meter().state()) {
479 case PENDING_ADD:
480 case PENDING_REMOVE:
481 if (!data.reason().isPresent() && local.equals(master)) {
482 notifyDelegate(
483 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
484 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
485 data.meter()));
486 } else if (data.reason().isPresent() && local.equals(data.origin())) {
487 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
488 //TODO: No future -> no friend
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700489 futures.get(key).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700490 }
491 break;
492 case ADDED:
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100493 if (local.equals(data.origin()) &&
494 (data.meter().state() == MeterState.PENDING_ADD
495 || data.meter().state() == MeterState.ADDED)) {
496 futures.computeIfPresent(key, (k, v) -> {
Gamze Abakaa29fab92019-03-05 15:40:57 +0000497 v.complete(MeterStoreResult.success());
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100498 notifyDelegate(
499 new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
500 return null;
501 });
alshabibe1248b62015-08-20 17:21:55 -0700502 }
503 break;
alshabibeadfc8e2015-08-18 15:40:46 -0700504 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700505 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700506 futures.remove(key).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700507 }
508 break;
509 default:
510 log.warn("Unknown meter state type {}", data.meter().state());
511 }
512 break;
513 case REMOVE:
514 //Only happens at origin so we do not need to care.
515 break;
516 default:
517 log.warn("Unknown Map event type {}", event.type());
518 }
519
520 }
521 }
522
523
alshabib7bb05012015-08-05 10:15:09 -0700524}