blob: 29036635c1883e84fd66f7dc0612abdd9469bee6 [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.store.meter.impl;
17
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
Pier Luigif094c612017-10-14 12:15:02 +020019import com.google.common.collect.Iterables;
alshabibeadfc8e2015-08-18 15:40:46 -070020import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020021import org.apache.commons.lang.math.RandomUtils;
alshabib7bb05012015-08-05 10:15:09 -070022import org.apache.felix.scr.annotations.Activate;
alshabib58fe6dc2015-08-19 17:16:13 -070023import org.apache.felix.scr.annotations.Component;
alshabib7bb05012015-08-05 10:15:09 -070024import org.apache.felix.scr.annotations.Deactivate;
alshabib7bb05012015-08-05 10:15:09 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib58fe6dc2015-08-19 17:16:13 -070027import org.apache.felix.scr.annotations.Service;
alshabib7bb05012015-08-05 10:15:09 -070028import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070030import org.onosproject.mastership.MastershipService;
Jordi Ortizaa8de492016-12-01 00:21:36 +010031import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020032import org.onosproject.net.behaviour.MeterQuery;
33import org.onosproject.net.driver.DriverHandler;
34import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070035import org.onosproject.net.meter.Band;
36import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070037import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010038import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070039import org.onosproject.net.meter.Meter;
40import org.onosproject.net.meter.MeterEvent;
41import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010042import org.onosproject.net.meter.MeterFeatures;
43import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010044import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070045import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070046import org.onosproject.net.meter.MeterOperation;
47import org.onosproject.net.meter.MeterState;
48import org.onosproject.net.meter.MeterStore;
49import org.onosproject.net.meter.MeterStoreDelegate;
50import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070051import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020052import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070053import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020054import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070055import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020056import org.onosproject.store.service.DistributedPrimitive;
57import org.onosproject.store.service.DistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070058import org.onosproject.store.service.MapEvent;
59import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070060import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070061import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070062import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070063import org.onosproject.store.service.Versioned;
alshabib7bb05012015-08-05 10:15:09 -070064import org.slf4j.Logger;
65
66import java.util.Collection;
alshabibeadfc8e2015-08-18 15:40:46 -070067import java.util.Map;
Pier Luigif094c612017-10-14 12:15:02 +020068import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070069import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020070import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070071
Pier Luigif094c612017-10-14 12:15:02 +020072import static org.onosproject.incubator.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010073import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070074import static org.slf4j.LoggerFactory.getLogger;
75
76/**
77 * A distributed meter store implementation. Meters are stored consistently
78 * across the cluster.
79 */
alshabib58fe6dc2015-08-19 17:16:13 -070080@Component(immediate = true)
81@Service
alshabib7bb05012015-08-05 10:15:09 -070082public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
83 implements MeterStore {
84
85 private Logger log = getLogger(getClass());
86
87 private static final String METERSTORE = "onos-meter-store";
Jordi Ortizaa8de492016-12-01 00:21:36 +010088 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Jordi Ortiz6c847762017-01-30 17:13:05 +010089 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
Pier Luigif094c612017-10-14 12:15:02 +020090 private static final String METERIDSTORE = "onos-meters-id-store";
alshabib7bb05012015-08-05 10:15:09 -070091
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 private StorageService storageService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib7bb05012015-08-05 10:15:09 -070096 private MastershipService mastershipService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 private ClusterService clusterService;
100
Pier Luigif094c612017-10-14 12:15:02 +0200101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected DriverService driverService;
103
alshabib70aaa1b2015-09-25 14:30:59 -0700104 private ConsistentMap<MeterKey, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -0700105 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -0700106
Jordi Ortizaa8de492016-12-01 00:21:36 +0100107 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
108
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700109 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabibeadfc8e2015-08-18 15:40:46 -0700110
alshabib70aaa1b2015-09-25 14:30:59 -0700111 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700112 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700113
Pier Luigif094c612017-10-14 12:15:02 +0200114 // Available meter identifiers
115 private DistributedSet<MeterKey> availableMeterIds;
116
117 // Atomic counter map for generation of new identifiers;
118 private AtomicCounterMap<DeviceId> meterIdGenerators;
119
120 /**
121 * Defines possible selection strategies to reuse meter ids.
122 */
123 enum ReuseStrategy {
124 /**
125 * Select randomly an available id.
126 */
127 RANDOM,
128 /**
129 * Select the first one.
130 */
131 FIRST_FIT
132 }
133
134 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100135
alshabib7bb05012015-08-05 10:15:09 -0700136 @Activate
137 public void activate() {
alshabib7bb05012015-08-05 10:15:09 -0700138 local = clusterService.getLocalNode().id();
139
alshabib70aaa1b2015-09-25 14:30:59 -0700140 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700141 .withName(METERSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200142 .withSerializer(Serializer.using(KryoNamespaces.API,
alshabib70aaa1b2015-09-25 14:30:59 -0700143 MeterKey.class,
alshabib58fe6dc2015-08-19 17:16:13 -0700144 MeterData.class,
145 DefaultMeter.class,
146 DefaultBand.class,
147 Band.Type.class,
148 MeterState.class,
149 Meter.Unit.class,
HIGUCHI Yuta03666a32016-05-18 11:49:09 -0700150 MeterFailReason.class)).build();
alshabib7bb05012015-08-05 10:15:09 -0700151
alshabibeadfc8e2015-08-18 15:40:46 -0700152 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700153
Jordi Ortizaa8de492016-12-01 00:21:36 +0100154 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
155 .withName(METERFEATURESSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200156 .withSerializer(Serializer.using(KryoNamespaces.API,
157 MeterFeaturesKey.class,
158 MeterFeatures.class,
159 DefaultMeterFeatures.class,
160 Band.Type.class,
161 Meter.Unit.class,
162 MeterFailReason.class)).build();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100163
Pier Luigif094c612017-10-14 12:15:02 +0200164 // Init the set of the available ids
165 availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
Jordi Ortiz6c847762017-01-30 17:13:05 +0100166 .withName(AVAILABLEMETERIDSTORE)
Pier Luigif094c612017-10-14 12:15:02 +0200167 .withSerializer(Serializer.using(KryoNamespaces.API,
168 MeterKey.class)).build(),
169 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
170
171 // Init atomic map counters
172 meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
173 .withName(METERIDSTORE)
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700174 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
Jordi Ortiz6c847762017-01-30 17:13:05 +0100175
alshabib7bb05012015-08-05 10:15:09 -0700176 log.info("Started");
177 }
178
179 @Deactivate
180 public void deactivate() {
alshabibeadfc8e2015-08-18 15:40:46 -0700181 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700182 log.info("Stopped");
183 }
184
alshabib7bb05012015-08-05 10:15:09 -0700185 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700186 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200187 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700188 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700189 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200190 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700191 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200192 // Store the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700193 MeterData data = new MeterData(meter, null, local);
alshabibeadfc8e2015-08-18 15:40:46 -0700194 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700195 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700196 } catch (StorageException e) {
197 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700198 }
Pier Luigif094c612017-10-14 12:15:02 +0200199 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700200 return future;
alshabib7bb05012015-08-05 10:15:09 -0700201 }
202
203 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700204 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200205 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700206 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700207 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200208 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700209 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200210 // Create the meter data
alshabibeadfc8e2015-08-18 15:40:46 -0700211 MeterData data = new MeterData(meter, null, local);
Pier Luigif094c612017-10-14 12:15:02 +0200212 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700213 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700214 try {
Pier Luigif094c612017-10-14 12:15:02 +0200215 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700216 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200217 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700218 future.complete(MeterStoreResult.success());
219 }
alshabibeadfc8e2015-08-18 15:40:46 -0700220 } catch (StorageException e) {
221 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700222 }
Pier Luigif094c612017-10-14 12:15:02 +0200223 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700224 return future;
alshabib7bb05012015-08-05 10:15:09 -0700225 }
226
227 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100228 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
229 MeterStoreResult result = MeterStoreResult.success();
230 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
231 try {
232 meterFeatures.putIfAbsent(key, meterfeatures);
233 } catch (StorageException e) {
234 result = MeterStoreResult.fail(TIMEOUT);
235 }
236 return result;
237 }
238
239 @Override
240 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
241 MeterStoreResult result = MeterStoreResult.success();
242 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
243 try {
244 meterFeatures.remove(key);
245 } catch (StorageException e) {
246 result = MeterStoreResult.fail(TIMEOUT);
247 }
248 return result;
249 }
250
251 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700252 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
253 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700254 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
255 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700256
alshabibeadfc8e2015-08-18 15:40:46 -0700257 MeterData data = new MeterData(meter, null, local);
258 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700259 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700260 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
261 }
alshabibeadfc8e2015-08-18 15:40:46 -0700262 } catch (StorageException e) {
263 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700264 }
alshabibeadfc8e2015-08-18 15:40:46 -0700265 return future;
alshabib7bb05012015-08-05 10:15:09 -0700266 }
267
268 @Override
269 public void updateMeterState(Meter meter) {
alshabib70aaa1b2015-09-25 14:30:59 -0700270 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
271 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700272 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700273 m.setState(meter.state());
274 m.setProcessedPackets(meter.packetsSeen());
275 m.setProcessedBytes(meter.bytesSeen());
276 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700277 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700278 m.setReferenceCount(meter.referenceCount());
alshabibeadfc8e2015-08-18 15:40:46 -0700279 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700280 });
281 }
282
283 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700284 public Meter getMeter(MeterKey key) {
285 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700286 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700287 }
288
289 @Override
290 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700291 return Collections2.transform(meters.asJavaMap().values(),
292 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700293 }
294
295 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200296 public Collection<Meter> getAllMeters(DeviceId deviceId) {
297 return Collections2.transform(
298 Collections2.filter(meters.asJavaMap().values(),
299 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
300 MeterData::meter);
301 }
302
303 @Override
alshabib7bb05012015-08-05 10:15:09 -0700304 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabib70aaa1b2015-09-25 14:30:59 -0700305 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
306 meters.computeIfPresent(key, (k, v) ->
alshabibeadfc8e2015-08-18 15:40:46 -0700307 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700308 }
309
alshabib5eb79392015-08-19 18:09:55 -0700310 @Override
311 public void deleteMeterNow(Meter m) {
Pier Luigif094c612017-10-14 12:15:02 +0200312 // Create the key
alshabib70aaa1b2015-09-25 14:30:59 -0700313 MeterKey key = MeterKey.key(m.deviceId(), m.id());
Pier Luigif094c612017-10-14 12:15:02 +0200314 // Remove the future
alshabib70aaa1b2015-09-25 14:30:59 -0700315 futures.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200316 // Remove the meter
alshabib70aaa1b2015-09-25 14:30:59 -0700317 meters.remove(key);
Pier Luigif094c612017-10-14 12:15:02 +0200318 // Free the id
319 freeMeterId(m.deviceId(), m.id());
320 // Finally notify the delegate
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100321 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
alshabib5eb79392015-08-19 18:09:55 -0700322 }
323
Jordi Ortizaa8de492016-12-01 00:21:36 +0100324 @Override
325 public long getMaxMeters(MeterFeaturesKey key) {
326 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
327 return features == null ? 0L : features.maxMeter();
328 }
329
Pier Luigif094c612017-10-14 12:15:02 +0200330 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
331 private long queryMaxMeters(DeviceId device) {
332 // Get driver handler for this device
333 DriverHandler handler = driverService.createHandler(device);
334 // If creation failed or the device does not have this behavior
335 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
336 // We cannot know max meter
337 return 0L;
338 }
339 // Get the behavior
340 MeterQuery query = handler.behaviour(MeterQuery.class);
341 // Return as max meter the result of the query
342 return query.getMaxMeters();
343 }
344
345 private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
346 boolean available) {
347 // According to available, make available or unavailable a meter key
348 return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
349 availableMeterIds.remove(MeterKey.key(deviceId, id));
350 }
351
352 private MeterId getNextAvailableId(Set<MeterId> availableIds) {
353 // If there are no available ids
354 if (availableIds.isEmpty()) {
355 // Just end the cycle
356 return null;
357 }
358 // If it is the first fit
359 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
360 return availableIds.iterator().next();
361 }
362 // If it is random, get the size
363 int size = availableIds.size();
364 // Return a random element
365 return Iterables.get(availableIds, RandomUtils.nextInt(size));
366 }
367
368 // Implements reuse strategy
369 private MeterId firstReusableMeterId(DeviceId deviceId) {
370 // Filter key related to device id, and reduce to meter ids
371 Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
372 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
373 .map(MeterKey::meterId)
374 .collect(Collectors.toSet());
375 // Get next available id
376 MeterId meterId = getNextAvailableId(localAvailableMeterIds);
377 // Iterate until there are items
378 while (meterId != null) {
379 // If we are able to reserve the id
380 if (updateMeterIdAvailability(deviceId, meterId, false)) {
381 // Just end
382 return meterId;
383 }
384 // Update the set
385 localAvailableMeterIds.remove(meterId);
386 // Try another time
387 meterId = getNextAvailableId(localAvailableMeterIds);
388 }
389 // No reusable ids
390 return null;
391 }
392
393 @Override
394 public MeterId allocateMeterId(DeviceId deviceId) {
395 // Init steps
396 MeterId meterId;
397 long id;
398 // Try to reuse meter id
399 meterId = firstReusableMeterId(deviceId);
400 // We found a reusable id, return
401 if (meterId != null) {
402 return meterId;
403 }
404 // If there was no reusable MeterId we have to generate a new value
405 // using maxMeters as upper limit.
406 long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
407 // If the device does not give us MeterFeatures
408 if (maxMeters == 0L) {
409 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
410 maxMeters = queryMaxMeters(deviceId);
411 }
412 // If we don't know the max, cannot proceed
413 if (maxMeters == 0L) {
414 return null;
415 }
416 // Get a new value
417 id = meterIdGenerators.incrementAndGet(deviceId);
418 // Check with the max, and if the value is bigger, cannot proceed
419 if (id >= maxMeters) {
420 return null;
421 }
422 // Done, return the value
423 return MeterId.meterId(id);
424 }
425
426 @Override
427 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
428 // Update the availability
429 updateMeterIdAvailability(deviceId, meterId, true);
430 }
431
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700432 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700433 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700434 public void event(MapEvent<MeterKey, MeterData> event) {
435 MeterKey key = event.key();
alshabibeadfc8e2015-08-18 15:40:46 -0700436 MeterData data = event.value().value();
437 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
438 switch (event.type()) {
439 case INSERT:
440 case UPDATE:
441 switch (data.meter().state()) {
442 case PENDING_ADD:
443 case PENDING_REMOVE:
444 if (!data.reason().isPresent() && local.equals(master)) {
445 notifyDelegate(
446 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
447 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
448 data.meter()));
449 } else if (data.reason().isPresent() && local.equals(data.origin())) {
450 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
451 //TODO: No future -> no friend
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700452 futures.get(key).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700453 }
454 break;
455 case ADDED:
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100456 if (local.equals(data.origin()) &&
457 (data.meter().state() == MeterState.PENDING_ADD
458 || data.meter().state() == MeterState.ADDED)) {
459 futures.computeIfPresent(key, (k, v) -> {
460 notifyDelegate(
461 new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
462 return null;
463 });
alshabibe1248b62015-08-20 17:21:55 -0700464 }
465 break;
alshabibeadfc8e2015-08-18 15:40:46 -0700466 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700467 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700468 futures.remove(key).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700469 }
470 break;
471 default:
472 log.warn("Unknown meter state type {}", data.meter().state());
473 }
474 break;
475 case REMOVE:
476 //Only happens at origin so we do not need to care.
477 break;
478 default:
479 log.warn("Unknown Map event type {}", event.type());
480 }
481
482 }
483 }
484
485
alshabib7bb05012015-08-05 10:15:09 -0700486}