blob: 5ec5d2e7c12dfd0ab2f08b4942a5438a4ac5a1fc [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.store.meter.impl;
17
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
Pier Luigif094c612017-10-14 12:15:02 +020019import com.google.common.collect.Iterables;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070020import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070021import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020022import org.apache.commons.lang.math.RandomUtils;
Charles Chan593acf92017-11-22 13:55:41 -080023import org.onlab.util.KryoNamespace;
alshabib7bb05012015-08-05 10:15:09 -070024import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070026import org.onosproject.mastership.MastershipService;
Jordi Ortizaa8de492016-12-01 00:21:36 +010027import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020028import org.onosproject.net.behaviour.MeterQuery;
29import org.onosproject.net.driver.DriverHandler;
30import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070031import org.onosproject.net.meter.Band;
32import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070033import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010034import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070035import org.onosproject.net.meter.Meter;
36import org.onosproject.net.meter.MeterEvent;
37import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010038import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030039import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010040import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010041import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070042import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070043import org.onosproject.net.meter.MeterOperation;
44import org.onosproject.net.meter.MeterState;
45import org.onosproject.net.meter.MeterStore;
46import org.onosproject.net.meter.MeterStoreDelegate;
47import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070048import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020049import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070050import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020051import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070052import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020053import org.onosproject.store.service.DistributedPrimitive;
54import org.onosproject.store.service.DistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070055import org.onosproject.store.service.MapEvent;
56import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070057import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070058import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070059import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070060import org.onosproject.store.service.Versioned;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070061import org.osgi.service.component.annotations.Activate;
62import org.osgi.service.component.annotations.Component;
63import org.osgi.service.component.annotations.Deactivate;
64import org.osgi.service.component.annotations.Reference;
65import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070066import org.slf4j.Logger;
67
68import java.util.Collection;
alshabibeadfc8e2015-08-18 15:40:46 -070069import java.util.Map;
Pier Luigif094c612017-10-14 12:15:02 +020070import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070071import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020072import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070073
Pier Luigif094c612017-10-14 12:15:02 +020074import static org.onosproject.incubator.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010075import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070076import static org.slf4j.LoggerFactory.getLogger;
77
78/**
79 * A distributed meter store implementation. Meters are stored consistently
80 * across the cluster.
81 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070082@Component(immediate = true, service = MeterStore.class)
alshabib7bb05012015-08-05 10:15:09 -070083public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
84 implements MeterStore {
85
86 private Logger log = getLogger(getClass());
87
88 private static final String METERSTORE = "onos-meter-store";
Jordi Ortizaa8de492016-12-01 00:21:36 +010089 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Jordi Ortiz6c847762017-01-30 17:13:05 +010090 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
Pier Luigif094c612017-10-14 12:15:02 +020091 private static final String METERIDSTORE = "onos-meters-id-store";
alshabib7bb05012015-08-05 10:15:09 -070092
Charles Chan593acf92017-11-22 13:55:41 -080093 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
94 .register(KryoNamespaces.API)
95 .register(MeterKey.class)
96 .register(MeterData.class)
97 .register(DefaultMeter.class)
98 .register(DefaultBand.class)
99 .register(Band.Type.class)
100 .register(MeterState.class)
101 .register(Meter.Unit.class);
102
103 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
104
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700106 private StorageService storageService;
107
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700109 private MastershipService mastershipService;
110
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700112 private ClusterService clusterService;
113
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Pier Luigif094c612017-10-14 12:15:02 +0200115 protected DriverService driverService;
116
alshabib70aaa1b2015-09-25 14:30:59 -0700117 private ConsistentMap<MeterKey, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -0700118 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -0700119
Jordi Ortizaa8de492016-12-01 00:21:36 +0100120 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
121
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700122 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabibeadfc8e2015-08-18 15:40:46 -0700123
alshabib70aaa1b2015-09-25 14:30:59 -0700124 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700125 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700126
Pier Luigif094c612017-10-14 12:15:02 +0200127 // Available meter identifiers
128 private DistributedSet<MeterKey> availableMeterIds;
129
130 // Atomic counter map for generation of new identifiers;
131 private AtomicCounterMap<DeviceId> meterIdGenerators;
132
133 /**
134 * Defines possible selection strategies to reuse meter ids.
135 */
136 enum ReuseStrategy {
137 /**
138 * Select randomly an available id.
139 */
140 RANDOM,
141 /**
142 * Select the first one.
143 */
144 FIRST_FIT
145 }
146
147 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100148
alshabib7bb05012015-08-05 10:15:09 -0700149 @Activate
150 public void activate() {
alshabib7bb05012015-08-05 10:15:09 -0700151 local = clusterService.getLocalNode().id();
152
alshabib70aaa1b2015-09-25 14:30:59 -0700153 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700154 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800155 .withSerializer(serializer).build();
alshabib7bb05012015-08-05 10:15:09 -0700156
alshabibeadfc8e2015-08-18 15:40:46 -0700157 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700158
Jordi Ortizaa8de492016-12-01 00:21:36 +0100159 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
160 .withName(METERFEATURESSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200161 .withSerializer(Serializer.using(KryoNamespaces.API,
162 MeterFeaturesKey.class,
163 MeterFeatures.class,
164 DefaultMeterFeatures.class,
165 Band.Type.class,
166 Meter.Unit.class,
cansu.toprak409289d2017-10-27 10:04:05 +0300167 MeterFailReason.class,
168 MeterFeaturesFlag.class)).build();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100169
Pier Luigif094c612017-10-14 12:15:02 +0200170 // Init the set of the available ids
171 availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
Jordi Ortiz6c847762017-01-30 17:13:05 +0100172 .withName(AVAILABLEMETERIDSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200173 .withSerializer(Serializer.using(KryoNamespaces.API,
174 MeterKey.class)).build(),
175 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
176
177 // Init atomic map counters
178 meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
179 .withName(METERIDSTORE)
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700180 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
Jordi Ortiz6c847762017-01-30 17:13:05 +0100181
alshabib7bb05012015-08-05 10:15:09 -0700182 log.info("Started");
183 }
184
185 @Deactivate
186 public void deactivate() {
alshabibeadfc8e2015-08-18 15:40:46 -0700187 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700188 log.info("Stopped");
189 }
190
alshabib7bb05012015-08-05 10:15:09 -0700191 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700192 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200193 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700194 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700195 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200196 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700197 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200198 // Store the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700199 MeterData data = new MeterData(meter, null, local);
alshabibeadfc8e2015-08-18 15:40:46 -0700200 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700201 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700202 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900203 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700204 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700205 }
Pier Luigif094c612017-10-14 12:15:02 +0200206 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700207 return future;
alshabib7bb05012015-08-05 10:15:09 -0700208 }
209
210 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700211 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200212 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700213 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700214 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200215 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700216 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200217 // Create the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700218 MeterData data = new MeterData(meter, null, local);
Pier Luigif094c612017-10-14 12:15:02 +0200219 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700220 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700221 try {
Pier Luigif094c612017-10-14 12:15:02 +0200222 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700223 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200224 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700225 future.complete(MeterStoreResult.success());
226 }
alshabibeadfc8e2015-08-18 15:40:46 -0700227 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900228 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700229 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700230 }
Pier Luigif094c612017-10-14 12:15:02 +0200231 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700232 return future;
alshabib7bb05012015-08-05 10:15:09 -0700233 }
234
235 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100236 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
237 MeterStoreResult result = MeterStoreResult.success();
238 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
239 try {
240 meterFeatures.putIfAbsent(key, meterfeatures);
241 } catch (StorageException e) {
242 result = MeterStoreResult.fail(TIMEOUT);
243 }
244 return result;
245 }
246
247 @Override
248 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
249 MeterStoreResult result = MeterStoreResult.success();
250 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
251 try {
252 meterFeatures.remove(key);
253 } catch (StorageException e) {
254 result = MeterStoreResult.fail(TIMEOUT);
255 }
256 return result;
257 }
258
259 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700260 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
261 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700262 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
263 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700264
alshabibeadfc8e2015-08-18 15:40:46 -0700265 MeterData data = new MeterData(meter, null, local);
266 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700267 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700268 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
269 }
alshabibeadfc8e2015-08-18 15:40:46 -0700270 } catch (StorageException e) {
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900271 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700272 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700273 }
alshabibeadfc8e2015-08-18 15:40:46 -0700274 return future;
alshabib7bb05012015-08-05 10:15:09 -0700275 }
276
277 @Override
278 public void updateMeterState(Meter meter) {
alshabib70aaa1b2015-09-25 14:30:59 -0700279 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
280 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700281 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700282 m.setState(meter.state());
283 m.setProcessedPackets(meter.packetsSeen());
284 m.setProcessedBytes(meter.bytesSeen());
285 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700286 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700287 m.setReferenceCount(meter.referenceCount());
alshabibeadfc8e2015-08-18 15:40:46 -0700288 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700289 });
290 }
291
292 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700293 public Meter getMeter(MeterKey key) {
294 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700295 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700296 }
297
298 @Override
299 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700300 return Collections2.transform(meters.asJavaMap().values(),
301 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700302 }
303
304 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200305 public Collection<Meter> getAllMeters(DeviceId deviceId) {
306 return Collections2.transform(
307 Collections2.filter(meters.asJavaMap().values(),
308 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
309 MeterData::meter);
310 }
311
312 @Override
alshabib7bb05012015-08-05 10:15:09 -0700313 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabib70aaa1b2015-09-25 14:30:59 -0700314 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
315 meters.computeIfPresent(key, (k, v) ->
alshabibeadfc8e2015-08-18 15:40:46 -0700316 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700317 }
318
alshabib5eb79392015-08-19 18:09:55 -0700319 @Override
320 public void deleteMeterNow(Meter m) {
Pier Luigif094c612017-10-14 12:15:02 +0200321 // Create the key
alshabib70aaa1b2015-09-25 14:30:59 -0700322 MeterKey key = MeterKey.key(m.deviceId(), m.id());
Pier Luigif094c612017-10-14 12:15:02 +0200323 // Remove the future
alshabib70aaa1b2015-09-25 14:30:59 -0700324 futures.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200325 // Remove the meter
alshabib70aaa1b2015-09-25 14:30:59 -0700326 meters.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200327 // Free the id
328 freeMeterId(m.deviceId(), m.id());
329 // Finally notify the delegate
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100330 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
alshabib5eb79392015-08-19 18:09:55 -0700331 }
332
Jordi Ortizaa8de492016-12-01 00:21:36 +0100333 @Override
334 public long getMaxMeters(MeterFeaturesKey key) {
335 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
336 return features == null ? 0L : features.maxMeter();
337 }
338
Pier Luigif094c612017-10-14 12:15:02 +0200339 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
340 private long queryMaxMeters(DeviceId device) {
341 // Get driver handler for this device
342 DriverHandler handler = driverService.createHandler(device);
343 // If creation failed or the device does not have this behavior
344 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
345 // We cannot know max meter
346 return 0L;
347 }
348 // Get the behavior
349 MeterQuery query = handler.behaviour(MeterQuery.class);
350 // Return as max meter the result of the query
351 return query.getMaxMeters();
352 }
353
354 private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
355 boolean available) {
356 // According to available, make available or unavailable a meter key
357 return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
358 availableMeterIds.remove(MeterKey.key(deviceId, id));
359 }
360
361 private MeterId getNextAvailableId(Set<MeterId> availableIds) {
362 // If there are no available ids
363 if (availableIds.isEmpty()) {
364 // Just end the cycle
365 return null;
366 }
367 // If it is the first fit
368 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
369 return availableIds.iterator().next();
370 }
371 // If it is random, get the size
372 int size = availableIds.size();
373 // Return a random element
374 return Iterables.get(availableIds, RandomUtils.nextInt(size));
375 }
376
377 // Implements reuse strategy
378 private MeterId firstReusableMeterId(DeviceId deviceId) {
379 // Filter key related to device id, and reduce to meter ids
380 Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
381 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
382 .map(MeterKey::meterId)
383 .collect(Collectors.toSet());
384 // Get next available id
385 MeterId meterId = getNextAvailableId(localAvailableMeterIds);
386 // Iterate until there are items
387 while (meterId != null) {
388 // If we are able to reserve the id
389 if (updateMeterIdAvailability(deviceId, meterId, false)) {
390 // Just end
391 return meterId;
392 }
393 // Update the set
394 localAvailableMeterIds.remove(meterId);
395 // Try another time
396 meterId = getNextAvailableId(localAvailableMeterIds);
397 }
398 // No reusable ids
399 return null;
400 }
401
402 @Override
403 public MeterId allocateMeterId(DeviceId deviceId) {
404 // Init steps
405 MeterId meterId;
406 long id;
407 // Try to reuse meter id
408 meterId = firstReusableMeterId(deviceId);
409 // We found a reusable id, return
410 if (meterId != null) {
411 return meterId;
412 }
413 // If there was no reusable MeterId we have to generate a new value
414 // using maxMeters as upper limit.
415 long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
416 // If the device does not give us MeterFeatures
417 if (maxMeters == 0L) {
418 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
419 maxMeters = queryMaxMeters(deviceId);
420 }
421 // If we don't know the max, cannot proceed
422 if (maxMeters == 0L) {
423 return null;
424 }
425 // Get a new value
426 id = meterIdGenerators.incrementAndGet(deviceId);
427 // Check with the max, and if the value is bigger, cannot proceed
428 if (id >= maxMeters) {
429 return null;
430 }
431 // Done, return the value
432 return MeterId.meterId(id);
433 }
434
435 @Override
436 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200437 // Avoid to free meter not allocated
438 if (meterIdGenerators.get(deviceId) < meterId.id()) {
439 return;
440 }
Pier Luigif094c612017-10-14 12:15:02 +0200441 // Update the availability
442 updateMeterIdAvailability(deviceId, meterId, true);
443 }
444
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700445 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700446 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700447 public void event(MapEvent<MeterKey, MeterData> event) {
448 MeterKey key = event.key();
Ray Milkeyd0f017f2018-09-21 12:52:34 -0700449 Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
450 MeterData data = value.value();
alshabibeadfc8e2015-08-18 15:40:46 -0700451 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
452 switch (event.type()) {
453 case INSERT:
454 case UPDATE:
455 switch (data.meter().state()) {
456 case PENDING_ADD:
457 case PENDING_REMOVE:
458 if (!data.reason().isPresent() && local.equals(master)) {
459 notifyDelegate(
460 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
461 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
462 data.meter()));
463 } else if (data.reason().isPresent() && local.equals(data.origin())) {
464 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
465 //TODO: No future -> no friend
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700466 futures.get(key).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700467 }
468 break;
469 case ADDED:
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100470 if (local.equals(data.origin()) &&
471 (data.meter().state() == MeterState.PENDING_ADD
472 || data.meter().state() == MeterState.ADDED)) {
473 futures.computeIfPresent(key, (k, v) -> {
474 notifyDelegate(
475 new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
476 return null;
477 });
alshabibe1248b62015-08-20 17:21:55 -0700478 }
479 break;
alshabibeadfc8e2015-08-18 15:40:46 -0700480 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700481 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700482 futures.remove(key).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700483 }
484 break;
485 default:
486 log.warn("Unknown meter state type {}", data.meter().state());
487 }
488 break;
489 case REMOVE:
490 //Only happens at origin so we do not need to care.
491 break;
492 default:
493 log.warn("Unknown Map event type {}", event.type());
494 }
495
496 }
497 }
498
499
alshabib7bb05012015-08-05 10:15:09 -0700500}