blob: b654652e9e841ca9beeca58253e39e341456cea9 [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.store.meter.impl;
17
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
Pier Luigif094c612017-10-14 12:15:02 +020019import com.google.common.collect.Iterables;
alshabibeadfc8e2015-08-18 15:40:46 -070020import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020021import org.apache.commons.lang.math.RandomUtils;
alshabib7bb05012015-08-05 10:15:09 -070022import org.apache.felix.scr.annotations.Activate;
alshabib58fe6dc2015-08-19 17:16:13 -070023import org.apache.felix.scr.annotations.Component;
alshabib7bb05012015-08-05 10:15:09 -070024import org.apache.felix.scr.annotations.Deactivate;
alshabib7bb05012015-08-05 10:15:09 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib58fe6dc2015-08-19 17:16:13 -070027import org.apache.felix.scr.annotations.Service;
alshabib7bb05012015-08-05 10:15:09 -070028import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070030import org.onosproject.mastership.MastershipService;
Jordi Ortizaa8de492016-12-01 00:21:36 +010031import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020032import org.onosproject.net.behaviour.MeterQuery;
33import org.onosproject.net.driver.DriverHandler;
34import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070035import org.onosproject.net.meter.Band;
36import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070037import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010038import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070039import org.onosproject.net.meter.Meter;
40import org.onosproject.net.meter.MeterEvent;
41import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010042import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030043import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010044import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010045import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070046import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070047import org.onosproject.net.meter.MeterOperation;
48import org.onosproject.net.meter.MeterState;
49import org.onosproject.net.meter.MeterStore;
50import org.onosproject.net.meter.MeterStoreDelegate;
51import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070052import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020053import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070054import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020055import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070056import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020057import org.onosproject.store.service.DistributedPrimitive;
58import org.onosproject.store.service.DistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070059import org.onosproject.store.service.MapEvent;
60import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070061import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070062import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070063import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070064import org.onosproject.store.service.Versioned;
alshabib7bb05012015-08-05 10:15:09 -070065import org.slf4j.Logger;
66
67import java.util.Collection;
alshabibeadfc8e2015-08-18 15:40:46 -070068import java.util.Map;
Pier Luigif094c612017-10-14 12:15:02 +020069import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070070import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020071import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070072
Pier Luigif094c612017-10-14 12:15:02 +020073import static org.onosproject.incubator.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010074import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070075import static org.slf4j.LoggerFactory.getLogger;
76
77/**
78 * A distributed meter store implementation. Meters are stored consistently
79 * across the cluster.
80 */
alshabib58fe6dc2015-08-19 17:16:13 -070081@Component(immediate = true)
82@Service
alshabib7bb05012015-08-05 10:15:09 -070083public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
84 implements MeterStore {
85
86 private Logger log = getLogger(getClass());
87
88 private static final String METERSTORE = "onos-meter-store";
Jordi Ortizaa8de492016-12-01 00:21:36 +010089 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Jordi Ortiz6c847762017-01-30 17:13:05 +010090 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
Pier Luigif094c612017-10-14 12:15:02 +020091 private static final String METERIDSTORE = "onos-meters-id-store";
alshabib7bb05012015-08-05 10:15:09 -070092
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 private StorageService storageService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib7bb05012015-08-05 10:15:09 -070097 private MastershipService mastershipService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 private ClusterService clusterService;
101
Pier Luigif094c612017-10-14 12:15:02 +0200102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected DriverService driverService;
104
alshabib70aaa1b2015-09-25 14:30:59 -0700105 private ConsistentMap<MeterKey, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -0700106 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -0700107
Jordi Ortizaa8de492016-12-01 00:21:36 +0100108 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
109
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700110 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabibeadfc8e2015-08-18 15:40:46 -0700111
alshabib70aaa1b2015-09-25 14:30:59 -0700112 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700113 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700114
Pier Luigif094c612017-10-14 12:15:02 +0200115 // Available meter identifiers
116 private DistributedSet<MeterKey> availableMeterIds;
117
118 // Atomic counter map for generation of new identifiers;
119 private AtomicCounterMap<DeviceId> meterIdGenerators;
120
121 /**
122 * Defines possible selection strategies to reuse meter ids.
123 */
124 enum ReuseStrategy {
125 /**
126 * Select randomly an available id.
127 */
128 RANDOM,
129 /**
130 * Select the first one.
131 */
132 FIRST_FIT
133 }
134
135 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100136
alshabib7bb05012015-08-05 10:15:09 -0700137 @Activate
138 public void activate() {
alshabib7bb05012015-08-05 10:15:09 -0700139 local = clusterService.getLocalNode().id();
140
alshabib70aaa1b2015-09-25 14:30:59 -0700141 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700142 .withName(METERSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200143 .withSerializer(Serializer.using(KryoNamespaces.API,
alshabib70aaa1b2015-09-25 14:30:59 -0700144 MeterKey.class,
alshabib58fe6dc2015-08-19 17:16:13 -0700145 MeterData.class,
146 DefaultMeter.class,
147 DefaultBand.class,
148 Band.Type.class,
149 MeterState.class,
150 Meter.Unit.class,
HIGUCHI Yuta03666a32016-05-18 11:49:09 -0700151 MeterFailReason.class)).build();
alshabib7bb05012015-08-05 10:15:09 -0700152
alshabibeadfc8e2015-08-18 15:40:46 -0700153 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700154
Jordi Ortizaa8de492016-12-01 00:21:36 +0100155 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
156 .withName(METERFEATURESSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200157 .withSerializer(Serializer.using(KryoNamespaces.API,
158 MeterFeaturesKey.class,
159 MeterFeatures.class,
160 DefaultMeterFeatures.class,
161 Band.Type.class,
162 Meter.Unit.class,
cansu.toprak409289d2017-10-27 10:04:05 +0300163 MeterFailReason.class,
164 MeterFeaturesFlag.class)).build();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100165
Pier Luigif094c612017-10-14 12:15:02 +0200166 // Init the set of the available ids
167 availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
Jordi Ortiz6c847762017-01-30 17:13:05 +0100168 .withName(AVAILABLEMETERIDSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200169 .withSerializer(Serializer.using(KryoNamespaces.API,
170 MeterKey.class)).build(),
171 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
172
173 // Init atomic map counters
174 meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
175 .withName(METERIDSTORE)
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700176 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
Jordi Ortiz6c847762017-01-30 17:13:05 +0100177
alshabib7bb05012015-08-05 10:15:09 -0700178 log.info("Started");
179 }
180
181 @Deactivate
182 public void deactivate() {
alshabibeadfc8e2015-08-18 15:40:46 -0700183 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700184 log.info("Stopped");
185 }
186
alshabib7bb05012015-08-05 10:15:09 -0700187 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700188 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200189 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700190 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700191 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200192 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700193 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200194 // Store the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700195 MeterData data = new MeterData(meter, null, local);
alshabibeadfc8e2015-08-18 15:40:46 -0700196 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700197 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700198 } catch (StorageException e) {
199 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700200 }
Pier Luigif094c612017-10-14 12:15:02 +0200201 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700202 return future;
alshabib7bb05012015-08-05 10:15:09 -0700203 }
204
205 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700206 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200207 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700208 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700209 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200210 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700211 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200212 // Create the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700213 MeterData data = new MeterData(meter, null, local);
Pier Luigif094c612017-10-14 12:15:02 +0200214 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700215 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700216 try {
Pier Luigif094c612017-10-14 12:15:02 +0200217 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700218 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200219 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700220 future.complete(MeterStoreResult.success());
221 }
alshabibeadfc8e2015-08-18 15:40:46 -0700222 } catch (StorageException e) {
223 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700224 }
Pier Luigif094c612017-10-14 12:15:02 +0200225 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700226 return future;
alshabib7bb05012015-08-05 10:15:09 -0700227 }
228
229 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100230 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
231 MeterStoreResult result = MeterStoreResult.success();
232 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
233 try {
234 meterFeatures.putIfAbsent(key, meterfeatures);
235 } catch (StorageException e) {
236 result = MeterStoreResult.fail(TIMEOUT);
237 }
238 return result;
239 }
240
241 @Override
242 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
243 MeterStoreResult result = MeterStoreResult.success();
244 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
245 try {
246 meterFeatures.remove(key);
247 } catch (StorageException e) {
248 result = MeterStoreResult.fail(TIMEOUT);
249 }
250 return result;
251 }
252
253 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700254 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
255 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700256 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
257 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700258
alshabibeadfc8e2015-08-18 15:40:46 -0700259 MeterData data = new MeterData(meter, null, local);
260 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700261 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700262 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
263 }
alshabibeadfc8e2015-08-18 15:40:46 -0700264 } catch (StorageException e) {
265 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700266 }
alshabibeadfc8e2015-08-18 15:40:46 -0700267 return future;
alshabib7bb05012015-08-05 10:15:09 -0700268 }
269
270 @Override
271 public void updateMeterState(Meter meter) {
alshabib70aaa1b2015-09-25 14:30:59 -0700272 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
273 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700274 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700275 m.setState(meter.state());
276 m.setProcessedPackets(meter.packetsSeen());
277 m.setProcessedBytes(meter.bytesSeen());
278 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700279 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700280 m.setReferenceCount(meter.referenceCount());
alshabibeadfc8e2015-08-18 15:40:46 -0700281 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700282 });
283 }
284
285 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700286 public Meter getMeter(MeterKey key) {
287 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700288 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700289 }
290
291 @Override
292 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700293 return Collections2.transform(meters.asJavaMap().values(),
294 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700295 }
296
297 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200298 public Collection<Meter> getAllMeters(DeviceId deviceId) {
299 return Collections2.transform(
300 Collections2.filter(meters.asJavaMap().values(),
301 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
302 MeterData::meter);
303 }
304
305 @Override
alshabib7bb05012015-08-05 10:15:09 -0700306 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabib70aaa1b2015-09-25 14:30:59 -0700307 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
308 meters.computeIfPresent(key, (k, v) ->
alshabibeadfc8e2015-08-18 15:40:46 -0700309 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700310 }
311
alshabib5eb79392015-08-19 18:09:55 -0700312 @Override
313 public void deleteMeterNow(Meter m) {
Pier Luigif094c612017-10-14 12:15:02 +0200314 // Create the key
alshabib70aaa1b2015-09-25 14:30:59 -0700315 MeterKey key = MeterKey.key(m.deviceId(), m.id());
Pier Luigif094c612017-10-14 12:15:02 +0200316 // Remove the future
alshabib70aaa1b2015-09-25 14:30:59 -0700317 futures.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200318 // Remove the meter
alshabib70aaa1b2015-09-25 14:30:59 -0700319 meters.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200320 // Free the id
321 freeMeterId(m.deviceId(), m.id());
322 // Finally notify the delegate
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100323 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
alshabib5eb79392015-08-19 18:09:55 -0700324 }
325
Jordi Ortizaa8de492016-12-01 00:21:36 +0100326 @Override
327 public long getMaxMeters(MeterFeaturesKey key) {
328 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
329 return features == null ? 0L : features.maxMeter();
330 }
331
Pier Luigif094c612017-10-14 12:15:02 +0200332 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
333 private long queryMaxMeters(DeviceId device) {
334 // Get driver handler for this device
335 DriverHandler handler = driverService.createHandler(device);
336 // If creation failed or the device does not have this behavior
337 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
338 // We cannot know max meter
339 return 0L;
340 }
341 // Get the behavior
342 MeterQuery query = handler.behaviour(MeterQuery.class);
343 // Return as max meter the result of the query
344 return query.getMaxMeters();
345 }
346
347 private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
348 boolean available) {
349 // According to available, make available or unavailable a meter key
350 return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
351 availableMeterIds.remove(MeterKey.key(deviceId, id));
352 }
353
354 private MeterId getNextAvailableId(Set<MeterId> availableIds) {
355 // If there are no available ids
356 if (availableIds.isEmpty()) {
357 // Just end the cycle
358 return null;
359 }
360 // If it is the first fit
361 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
362 return availableIds.iterator().next();
363 }
364 // If it is random, get the size
365 int size = availableIds.size();
366 // Return a random element
367 return Iterables.get(availableIds, RandomUtils.nextInt(size));
368 }
369
370 // Implements reuse strategy
371 private MeterId firstReusableMeterId(DeviceId deviceId) {
372 // Filter key related to device id, and reduce to meter ids
373 Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
374 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
375 .map(MeterKey::meterId)
376 .collect(Collectors.toSet());
377 // Get next available id
378 MeterId meterId = getNextAvailableId(localAvailableMeterIds);
379 // Iterate until there are items
380 while (meterId != null) {
381 // If we are able to reserve the id
382 if (updateMeterIdAvailability(deviceId, meterId, false)) {
383 // Just end
384 return meterId;
385 }
386 // Update the set
387 localAvailableMeterIds.remove(meterId);
388 // Try another time
389 meterId = getNextAvailableId(localAvailableMeterIds);
390 }
391 // No reusable ids
392 return null;
393 }
394
395 @Override
396 public MeterId allocateMeterId(DeviceId deviceId) {
397 // Init steps
398 MeterId meterId;
399 long id;
400 // Try to reuse meter id
401 meterId = firstReusableMeterId(deviceId);
402 // We found a reusable id, return
403 if (meterId != null) {
404 return meterId;
405 }
406 // If there was no reusable MeterId we have to generate a new value
407 // using maxMeters as upper limit.
408 long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
409 // If the device does not give us MeterFeatures
410 if (maxMeters == 0L) {
411 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
412 maxMeters = queryMaxMeters(deviceId);
413 }
414 // If we don't know the max, cannot proceed
415 if (maxMeters == 0L) {
416 return null;
417 }
418 // Get a new value
419 id = meterIdGenerators.incrementAndGet(deviceId);
420 // Check with the max, and if the value is bigger, cannot proceed
421 if (id >= maxMeters) {
422 return null;
423 }
424 // Done, return the value
425 return MeterId.meterId(id);
426 }
427
428 @Override
429 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
430 // Update the availability
431 updateMeterIdAvailability(deviceId, meterId, true);
432 }
433
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700434 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700435 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700436 public void event(MapEvent<MeterKey, MeterData> event) {
437 MeterKey key = event.key();
alshabibeadfc8e2015-08-18 15:40:46 -0700438 MeterData data = event.value().value();
439 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
440 switch (event.type()) {
441 case INSERT:
442 case UPDATE:
443 switch (data.meter().state()) {
444 case PENDING_ADD:
445 case PENDING_REMOVE:
446 if (!data.reason().isPresent() && local.equals(master)) {
447 notifyDelegate(
448 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
449 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
450 data.meter()));
451 } else if (data.reason().isPresent() && local.equals(data.origin())) {
452 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
453 //TODO: No future -> no friend
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700454 futures.get(key).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700455 }
456 break;
457 case ADDED:
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100458 if (local.equals(data.origin()) &&
459 (data.meter().state() == MeterState.PENDING_ADD
460 || data.meter().state() == MeterState.ADDED)) {
461 futures.computeIfPresent(key, (k, v) -> {
462 notifyDelegate(
463 new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
464 return null;
465 });
alshabibe1248b62015-08-20 17:21:55 -0700466 }
467 break;
alshabibeadfc8e2015-08-18 15:40:46 -0700468 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700469 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700470 futures.remove(key).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700471 }
472 break;
473 default:
474 log.warn("Unknown meter state type {}", data.meter().state());
475 }
476 break;
477 case REMOVE:
478 //Only happens at origin so we do not need to care.
479 break;
480 default:
481 log.warn("Unknown Map event type {}", event.type());
482 }
483
484 }
485 }
486
487
alshabib7bb05012015-08-05 10:15:09 -0700488}