blob: 17f85ff99920358031ee15e6f94c67995544a2ca [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080016package org.onosproject.store.meter.impl;
alshabib7bb05012015-08-05 10:15:09 -070017
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
pierventre44220052020-09-22 12:51:06 +020019import com.google.common.collect.ImmutableSet;
Pier Luigif094c612017-10-14 12:15:02 +020020import com.google.common.collect.Iterables;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070021import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070022import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020023import org.apache.commons.lang.math.RandomUtils;
Charles Chan593acf92017-11-22 13:55:41 -080024import org.onlab.util.KryoNamespace;
Daniele Morocfd77402021-07-15 17:02:59 +020025import org.onosproject.core.ApplicationId;
Jordi Ortizaa8de492016-12-01 00:21:36 +010026import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020027import org.onosproject.net.behaviour.MeterQuery;
28import org.onosproject.net.driver.DriverHandler;
29import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070030import org.onosproject.net.meter.Band;
31import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070032import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010033import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070034import org.onosproject.net.meter.Meter;
Wailok Shumbe900bd2021-08-05 00:55:47 +080035import org.onosproject.net.meter.MeterCellId;
alshabib10c810b2015-08-18 16:59:04 -070036import org.onosproject.net.meter.MeterEvent;
37import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010038import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030039import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortiz6c847762017-01-30 17:13:05 +010040import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070041import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070042import org.onosproject.net.meter.MeterOperation;
Wailok Shumbe900bd2021-08-05 00:55:47 +080043import org.onosproject.net.meter.MeterScope;
alshabib10c810b2015-08-18 16:59:04 -070044import org.onosproject.net.meter.MeterState;
45import org.onosproject.net.meter.MeterStore;
46import org.onosproject.net.meter.MeterStoreDelegate;
47import org.onosproject.net.meter.MeterStoreResult;
Wailok Shumbe900bd2021-08-05 00:55:47 +080048import org.onosproject.net.meter.MeterTableKey;
49import org.onosproject.net.pi.model.PiMeterId;
50import org.onosproject.net.pi.runtime.PiMeterCellId;
alshabib7bb05012015-08-05 10:15:09 -070051import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020052import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070053import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020054import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070055import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020056import org.onosproject.store.service.DistributedPrimitive;
57import org.onosproject.store.service.DistributedSet;
Wailok Shumbe900bd2021-08-05 00:55:47 +080058import org.onosproject.store.service.EventuallyConsistentMap;
59import org.onosproject.store.service.EventuallyConsistentMapEvent;
60import org.onosproject.store.service.EventuallyConsistentMapListener;
alshabibeadfc8e2015-08-18 15:40:46 -070061import org.onosproject.store.service.MapEvent;
62import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070063import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070064import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070065import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070066import org.onosproject.store.service.Versioned;
Wailok Shumbe900bd2021-08-05 00:55:47 +080067import org.onosproject.store.service.WallClockTimestamp;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070068import org.osgi.service.component.annotations.Activate;
69import org.osgi.service.component.annotations.Component;
70import org.osgi.service.component.annotations.Deactivate;
71import org.osgi.service.component.annotations.Reference;
72import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070073import org.slf4j.Logger;
74
75import java.util.Collection;
Wailok Shumbe900bd2021-08-05 00:55:47 +080076import java.util.concurrent.ConcurrentMap;
77import java.util.concurrent.ConcurrentHashMap;
Gamze Abakaf57ef602019-03-11 06:52:48 +000078import java.util.List;
alshabibeadfc8e2015-08-18 15:40:46 -070079import java.util.Map;
Gamze Abakaf57ef602019-03-11 06:52:48 +000080import java.util.Objects;
Pier Luigif094c612017-10-14 12:15:02 +020081import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070082import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020083import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070084
pierventreb8ca2ad2021-08-18 09:40:14 +020085import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080086import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010087import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
Wailok Shumbe900bd2021-08-05 00:55:47 +080088import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
89import static org.onosproject.net.meter.MeterCellId.MeterCellType.PIPELINE_INDEPENDENT;
Wailok Shum664e2492021-07-29 00:02:56 +080090import static org.onosproject.net.meter.MeterStoreResult.Type.FAIL;
alshabib7bb05012015-08-05 10:15:09 -070091import static org.slf4j.LoggerFactory.getLogger;
92
93/**
94 * A distributed meter store implementation. Meters are stored consistently
95 * across the cluster.
96 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070097@Component(immediate = true, service = MeterStore.class)
alshabib7bb05012015-08-05 10:15:09 -070098public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
99 implements MeterStore {
100
101 private Logger log = getLogger(getClass());
102
pierventre1b8afbc2020-07-13 14:07:05 +0200103 // Meters map related objects
alshabib7bb05012015-08-05 10:15:09 -0700104 private static final String METERSTORE = "onos-meter-store";
pierventre1b8afbc2020-07-13 14:07:05 +0200105 private ConsistentMap<MeterKey, MeterData> meters;
Wailok Shumbe900bd2021-08-05 00:55:47 +0800106 private MapEventListener<MeterKey, MeterData> metersMapListener = new InternalMetersMapEventListener();
pierventre44220052020-09-22 12:51:06 +0200107 private Map<MeterKey, MeterData> metersMap;
alshabib7bb05012015-08-05 10:15:09 -0700108
pierventre1b8afbc2020-07-13 14:07:05 +0200109 // Meters features related objects
110 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Wailok Shumbe900bd2021-08-05 00:55:47 +0800111 private EventuallyConsistentMap<MeterTableKey, MeterFeatures> metersFeatures;
112 private EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> featuresMapListener =
113 new InternalFeaturesMapEventListener();
pierventre1b8afbc2020-07-13 14:07:05 +0200114
115 // Meters id related objects
116 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
pierventreec0e9422021-09-22 11:24:38 +0200117 protected final ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds =
118 new ConcurrentHashMap<>();
pierventre1b8afbc2020-07-13 14:07:05 +0200119 private static final String METERIDSTORE = "onos-meters-id-store";
Wailok Shumbe900bd2021-08-05 00:55:47 +0800120 private AtomicCounterMap<MeterTableKey> meterIdGenerators;
pierventre1b8afbc2020-07-13 14:07:05 +0200121
Charles Chan593acf92017-11-22 13:55:41 -0800122 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
123 .register(KryoNamespaces.API)
124 .register(MeterKey.class)
125 .register(MeterData.class)
126 .register(DefaultMeter.class)
127 .register(DefaultBand.class)
128 .register(Band.Type.class)
129 .register(MeterState.class)
debmaiti1bea2892019-06-04 12:36:38 +0530130 .register(Meter.Unit.class)
pierventreb8ca2ad2021-08-18 09:40:14 +0200131 .register(MeterFailReason.class)
132 .register(MeterTableKey.class)
133 .register(MeterFeatures.class)
134 .register(DefaultMeterFeatures.class)
135 .register(MeterFeaturesFlag.class)
136 .register(MeterScope.class);
Charles Chan593acf92017-11-22 13:55:41 -0800137 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
138
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700140 private StorageService storageService;
141
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Pier Luigif094c612017-10-14 12:15:02 +0200143 protected DriverService driverService;
144
pierventre1b8afbc2020-07-13 14:07:05 +0200145 // Local cache to handle async ops through futures.
alshabib70aaa1b2015-09-25 14:30:59 -0700146 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700147 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700148
pierventreb8ca2ad2021-08-18 09:40:14 +0200149 // Control the user defined index mode for the store.
150 protected boolean userDefinedIndexMode = false;
151
Pier Luigif094c612017-10-14 12:15:02 +0200152 /**
153 * Defines possible selection strategies to reuse meter ids.
154 */
155 enum ReuseStrategy {
156 /**
157 * Select randomly an available id.
158 */
159 RANDOM,
160 /**
161 * Select the first one.
162 */
163 FIRST_FIT
164 }
Pier Luigif094c612017-10-14 12:15:02 +0200165 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100166
alshabib7bb05012015-08-05 10:15:09 -0700167 @Activate
168 public void activate() {
alshabib70aaa1b2015-09-25 14:30:59 -0700169 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700170 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800171 .withSerializer(serializer).build();
Wailok Shumbe900bd2021-08-05 00:55:47 +0800172 meters.addListener(metersMapListener);
pierventre44220052020-09-22 12:51:06 +0200173 metersMap = meters.asJavaMap();
pierventrea7708792021-09-10 09:37:29 +0200174
Wailok Shumbe900bd2021-08-05 00:55:47 +0800175 metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
176 .withName(METERFEATURESSTORE)
177 .withTimestampProvider((key, features) -> new WallClockTimestamp())
pierventreb8ca2ad2021-08-18 09:40:14 +0200178 .withSerializer(APP_KRYO_BUILDER).build();
Wailok Shumbe900bd2021-08-05 00:55:47 +0800179 metersFeatures.addListener(featuresMapListener);
pierventrea7708792021-09-10 09:37:29 +0200180
Wailok Shumbe900bd2021-08-05 00:55:47 +0800181 meterIdGenerators = storageService.<MeterTableKey>atomicCounterMapBuilder()
Pier Luigif094c612017-10-14 12:15:02 +0200182 .withName(METERIDSTORE)
Wailok Shumbe900bd2021-08-05 00:55:47 +0800183 .withSerializer(Serializer.using(KryoNamespaces.API,
184 MeterTableKey.class,
185 MeterScope.class)).build();
pierventreb8ca2ad2021-08-18 09:40:14 +0200186
alshabib7bb05012015-08-05 10:15:09 -0700187 log.info("Started");
188 }
189
190 @Deactivate
191 public void deactivate() {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800192 meters.removeListener(metersMapListener);
193 metersFeatures.removeListener(featuresMapListener);
Andrea Campanella185cf872021-09-24 11:26:24 +0200194
195 // EC map does clean only the local state
Wailok Shumbe900bd2021-08-05 00:55:47 +0800196 metersFeatures.destroy();
Andrea Campanella185cf872021-09-24 11:26:24 +0200197 // Distributed set does not override the default behavior
pierventreb8ca2ad2021-08-18 09:40:14 +0200198 availableMeterIds.forEach((key, set) -> set.destroy());
199
alshabib7bb05012015-08-05 10:15:09 -0700200 log.info("Stopped");
201 }
202
alshabib7bb05012015-08-05 10:15:09 -0700203 @Override
Wailok Shum664e2492021-07-29 00:02:56 +0800204 public CompletableFuture<MeterStoreResult> addOrUpdateMeter(Meter meter) {
pierventreb8ca2ad2021-08-18 09:40:14 +0200205 checkArgument(validIndex(meter), "Meter index is not valid");
Wailok Shum664e2492021-07-29 00:02:56 +0800206 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
207 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
208 MeterData data = new MeterData(meter, null);
Wailok Shum664e2492021-07-29 00:02:56 +0800209 futures.put(key, future);
Wailok Shum664e2492021-07-29 00:02:56 +0800210 try {
211 meters.compute(key, (k, v) -> data);
212 } catch (StorageException e) {
213 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
214 e.getMessage(), e);
215 futures.remove(key);
216 future.completeExceptionally(e);
217 }
218 return future;
219 }
220
221 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700222 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
223 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
Wailok Shum664e2492021-07-29 00:02:56 +0800224 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
alshabib70aaa1b2015-09-25 14:30:59 -0700225 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200226 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700227 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700228 try {
pierventrea7708792021-09-10 09:37:29 +0200229 Versioned<MeterData> versionedData = meters.computeIfPresent(key, (k, v) -> {
230 DefaultMeter m = (DefaultMeter) v.meter();
231 MeterState meterState = m.state();
232 if (meterState == MeterState.PENDING_REMOVE) {
233 return v;
234 }
235 m.setState(meter.state());
236 return new MeterData(m, v.reason().isPresent() ? v.reason().get() : null);
237 });
238 // If it does not exist in the system, completes immediately
239 if (versionedData == null) {
240 futures.remove(key);
alshabibe1248b62015-08-20 17:21:55 -0700241 future.complete(MeterStoreResult.success());
242 }
alshabibeadfc8e2015-08-18 15:40:46 -0700243 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200244 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
245 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900246 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700247 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700248 }
alshabibeadfc8e2015-08-18 15:40:46 -0700249 return future;
alshabib7bb05012015-08-05 10:15:09 -0700250 }
251
252 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100253 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800254 // Store meter features, this is done once for each features of every device
Jordi Ortizaa8de492016-12-01 00:21:36 +0100255 MeterStoreResult result = MeterStoreResult.success();
Wailok Shumbe900bd2021-08-05 00:55:47 +0800256 MeterTableKey key = MeterTableKey.key(meterfeatures.deviceId(), meterfeatures.scope());
Jordi Ortizaa8de492016-12-01 00:21:36 +0100257 try {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800258 metersFeatures.put(key, meterfeatures);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100259 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200260 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
261 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100262 result = MeterStoreResult.fail(TIMEOUT);
263 }
264 return result;
265 }
266
267 @Override
Wailok Shum664e2492021-07-29 00:02:56 +0800268 public MeterStoreResult storeMeterFeatures(Collection<MeterFeatures> meterfeatures) {
269 // These store operations is treated as one single operation
270 // If one of them is failed, Fail is returned
271 // But the failed operation will not block the rest.
272 MeterStoreResult result = MeterStoreResult.success();
273 for (MeterFeatures mf : meterfeatures) {
274 if (storeMeterFeatures(mf).type() == FAIL) {
275 result = MeterStoreResult.fail(TIMEOUT);
276 }
277 }
278 return result;
279 }
280
281 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100282 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
283 MeterStoreResult result = MeterStoreResult.success();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100284 try {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800285 Set<MeterTableKey> keys = metersFeatures.keySet().stream()
286 .filter(key -> key.deviceId().equals(deviceId))
287 .collect(Collectors.toUnmodifiableSet());
pierventreb8ca2ad2021-08-18 09:40:14 +0200288 keys.forEach(k -> metersFeatures.remove(k));
Jordi Ortizaa8de492016-12-01 00:21:36 +0100289 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200290 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
Wailok Shumbe900bd2021-08-05 00:55:47 +0800291 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100292 result = MeterStoreResult.fail(TIMEOUT);
293 }
294 return result;
295 }
296
297 @Override
Wailok Shum664e2492021-07-29 00:02:56 +0800298 public MeterStoreResult deleteMeterFeatures(Collection<MeterFeatures> meterfeatures) {
pierventrea7708792021-09-10 09:37:29 +0200299 // Same logic of storeMeterFeatures
Wailok Shum664e2492021-07-29 00:02:56 +0800300 MeterStoreResult result = MeterStoreResult.success();
301 for (MeterFeatures mf : meterfeatures) {
302 try {
303 MeterTableKey key = MeterTableKey.key(mf.deviceId(), mf.scope());
304 metersFeatures.remove(key);
305 } catch (StorageException e) {
306 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
307 e.getMessage(), e);
308 result = MeterStoreResult.fail(TIMEOUT);
309 }
310 }
311
312 return result;
313 }
314
315 @Override
pierventre44220052020-09-22 12:51:06 +0200316 public Meter updateMeterState(Meter meter) {
pierventre1b8afbc2020-07-13 14:07:05 +0200317 // Update meter if present (stats workflow)
Wailok Shum664e2492021-07-29 00:02:56 +0800318 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
pierventre44220052020-09-22 12:51:06 +0200319 Versioned<MeterData> value = meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700320 DefaultMeter m = (DefaultMeter) v.meter();
pier59721bf2020-01-08 08:57:46 +0100321 MeterState meterState = m.state();
322 if (meterState == MeterState.PENDING_ADD) {
323 m.setState(meter.state());
324 }
alshabib7bb05012015-08-05 10:15:09 -0700325 m.setProcessedPackets(meter.packetsSeen());
326 m.setProcessedBytes(meter.bytesSeen());
327 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700328 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700329 m.setReferenceCount(meter.referenceCount());
pierventre1b8afbc2020-07-13 14:07:05 +0200330 return new MeterData(m, null);
alshabib7bb05012015-08-05 10:15:09 -0700331 });
pierventre44220052020-09-22 12:51:06 +0200332 return value != null ? value.value().meter() : null;
alshabib7bb05012015-08-05 10:15:09 -0700333 }
334
335 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700336 public Meter getMeter(MeterKey key) {
337 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700338 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700339 }
340
341 @Override
342 public Collection<Meter> getAllMeters() {
pierventre44220052020-09-22 12:51:06 +0200343 return Collections2.transform(ImmutableSet.copyOf(metersMap.values()),
alshabibeadfc8e2015-08-18 15:40:46 -0700344 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700345 }
346
347 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200348 public Collection<Meter> getAllMeters(DeviceId deviceId) {
349 return Collections2.transform(
pierventre44220052020-09-22 12:51:06 +0200350 Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
Jordi Ortiz9287b632017-06-22 11:01:37 +0200351 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
352 MeterData::meter);
353 }
354
355 @Override
pierventre36b9a892021-08-27 15:25:02 +0200356 public Collection<Meter> getAllMeters(DeviceId deviceId, MeterScope scope) {
357 if (scope.equals(MeterScope.globalScope())) {
358 return Collections2.transform(
359 Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
Daniele Moro4a1b4fb2022-02-08 11:08:39 +0100360 (MeterData m) -> m.meter().meterCellId().type() == INDEX &&
361 m.meter().deviceId().equals(deviceId)),
pierventre36b9a892021-08-27 15:25:02 +0200362 MeterData::meter);
363 }
364 return Collections2.transform(
365 Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
366 (MeterData m) -> m.meter().meterCellId().type() == PIPELINE_INDEPENDENT &&
Daniele Moro4a1b4fb2022-02-08 11:08:39 +0100367 m.meter().deviceId().equals(deviceId) &&
pierventre36b9a892021-08-27 15:25:02 +0200368 ((PiMeterCellId) m.meter().meterCellId()).meterId().id().equals(scope.id())),
369 MeterData::meter);
370 }
371
372 @Override
alshabib7bb05012015-08-05 10:15:09 -0700373 public void failedMeter(MeterOperation op, MeterFailReason reason) {
pierventre1b8afbc2020-07-13 14:07:05 +0200374 // Meter ops failed (got notification from the sb)
pierventreb8ca2ad2021-08-18 09:40:14 +0200375 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().meterCellId());
pierventre1b8afbc2020-07-13 14:07:05 +0200376 meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
alshabib7bb05012015-08-05 10:15:09 -0700377 }
378
alshabib5eb79392015-08-19 18:09:55 -0700379 @Override
Wailok Shumbe900bd2021-08-05 00:55:47 +0800380 public void purgeMeter(Meter m) {
pierventrea7708792021-09-10 09:37:29 +0200381 // Once we receive the ack from the sb, create the key
382 // remove definitely the meter and free the id
pierventreb8ca2ad2021-08-18 09:40:14 +0200383 MeterKey key = MeterKey.key(m.deviceId(), m.meterCellId());
pierventre1b8afbc2020-07-13 14:07:05 +0200384 try {
385 if (Versioned.valueOrNull(meters.remove(key)) != null) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800386 MeterScope scope;
387 if (m.meterCellId().type() == PIPELINE_INDEPENDENT) {
388 PiMeterCellId piMeterCellId = (PiMeterCellId) m.meterCellId();
389 scope = MeterScope.of(piMeterCellId.meterId().id());
390 } else {
391 scope = MeterScope.globalScope();
392 }
393 MeterTableKey meterTableKey = MeterTableKey.key(m.deviceId(), scope);
394 freeMeterId(meterTableKey, m.meterCellId());
pierventre1b8afbc2020-07-13 14:07:05 +0200395 }
396 } catch (StorageException e) {
397 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
398 e.getMessage(), e);
pier59721bf2020-01-08 08:57:46 +0100399 }
alshabib5eb79392015-08-19 18:09:55 -0700400 }
401
Jordi Ortizaa8de492016-12-01 00:21:36 +0100402 @Override
pierventreb8ca2ad2021-08-18 09:40:14 +0200403 public void purgeMeters(DeviceId deviceId) {
Gamze Abakaf57ef602019-03-11 06:52:48 +0000404 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
405 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
406 .map(Map.Entry::getValue)
407 .collect(Collectors.toList());
Gamze Abakaf57ef602019-03-11 06:52:48 +0000408 metersPendingRemove.forEach(versionedMeterKey
Wailok Shumbe900bd2021-08-05 00:55:47 +0800409 -> purgeMeter(versionedMeterKey.value().meter()));
Gamze Abakaf57ef602019-03-11 06:52:48 +0000410 }
411
412 @Override
Daniele Morocfd77402021-07-15 17:02:59 +0200413 public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
414 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
415 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId) &&
416 e.getValue().value().meter().appId().equals(appId))
417 .map(Map.Entry::getValue)
418 .collect(Collectors.toList());
pierventreb8ca2ad2021-08-18 09:40:14 +0200419 metersPendingRemove.forEach(versionedMeterKey
420 -> purgeMeter(versionedMeterKey.value().meter()));
421 }
422
423 @Override
424 public boolean userDefinedIndexMode(boolean enable) {
425 if (meters.isEmpty() && meterIdGenerators.isEmpty()) {
426 userDefinedIndexMode = enable;
427 } else {
428 log.warn("Unable to {} user defined index mode as store did" +
429 "already some allocations", enable ? "activate" : "deactivate");
430 }
431 return userDefinedIndexMode;
Daniele Morocfd77402021-07-15 17:02:59 +0200432 }
433
pierventrea7708792021-09-10 09:37:29 +0200434 protected long getMaxMeters(MeterTableKey key) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800435 MeterFeatures features = metersFeatures.get(key);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100436 return features == null ? 0L : features.maxMeter();
437 }
438
pierventrea7708792021-09-10 09:37:29 +0200439 // Validate index using the meter features, useful mainly
440 // when user defined index mode is enabled
pierventreb8ca2ad2021-08-18 09:40:14 +0200441 private boolean validIndex(Meter meter) {
442 long index;
443 MeterTableKey key;
444
445 if (meter.meterCellId().type() == PIPELINE_INDEPENDENT) {
446 PiMeterCellId piMeterCellId = (PiMeterCellId) meter.meterCellId();
447 index = piMeterCellId.index();
448 key = MeterTableKey.key(meter.deviceId(), MeterScope.of(piMeterCellId.meterId().id()));
449 } else if (meter.meterCellId().type() == INDEX) {
450 MeterId meterId = (MeterId) meter.meterCellId();
451 index = meterId.id();
452 key = MeterTableKey.key(meter.deviceId(), MeterScope.globalScope());
453 } else {
pierventrea7708792021-09-10 09:37:29 +0200454 log.warn("Unable to validate index unsupported cell type {}", meter.meterCellId().type());
pierventreb8ca2ad2021-08-18 09:40:14 +0200455 return false;
456 }
457
458 MeterFeatures features = metersFeatures.get(key);
459 long startIndex = features == null ? -1L : features.startIndex();
460 long endIndex = features == null ? -1L : features.endIndex();
461 return index >= startIndex && index <= endIndex;
462 }
463
Wailok Shumbe900bd2021-08-05 00:55:47 +0800464 private long getStartIndex(MeterTableKey key) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800465 MeterFeatures features = metersFeatures.get(key);
466 return features == null ? -1L : features.startIndex();
467 }
468
469 private long getEndIndex(MeterTableKey key) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800470 MeterFeatures features = metersFeatures.get(key);
471 return features == null ? -1L : features.endIndex();
472 }
473
pierventrea7708792021-09-10 09:37:29 +0200474 // queryMaxMeters is implemented in MeterQuery behaviour implementations.
Pier Luigif094c612017-10-14 12:15:02 +0200475 private long queryMaxMeters(DeviceId device) {
Pier Luigif094c612017-10-14 12:15:02 +0200476 DriverHandler handler = driverService.createHandler(device);
Pier Luigif094c612017-10-14 12:15:02 +0200477 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
Pier Luigif094c612017-10-14 12:15:02 +0200478 return 0L;
479 }
pierventrea7708792021-09-10 09:37:29 +0200480
481 // FIXME architecturally this is not right, we should fallback to this
482 // behavior in the providers. Once we do that we can remove this code.
Pier Luigif094c612017-10-14 12:15:02 +0200483 MeterQuery query = handler.behaviour(MeterQuery.class);
pierventrea7708792021-09-10 09:37:29 +0200484 // This results to be necessary because the available ids sets are created
485 // in the meter features map listener if the device does not provide the meter
486 // feature this is the only chance to create this set.
Wailok Shumbe900bd2021-08-05 00:55:47 +0800487 String setName = AVAILABLEMETERIDSTORE + "-" + device + "global";
488 MeterTableKey meterTableKey = MeterTableKey.key(device, MeterScope.globalScope());
489 insertAvailableKeySet(meterTableKey, setName);
pierventrea7708792021-09-10 09:37:29 +0200490
Pier Luigif094c612017-10-14 12:15:02 +0200491 return query.getMaxMeters();
492 }
493
Wailok Shumbe900bd2021-08-05 00:55:47 +0800494 private boolean updateMeterIdAvailability(MeterTableKey meterTableKey, MeterCellId id,
Pier Luigif094c612017-10-14 12:15:02 +0200495 boolean available) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800496 DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
497 if (keySet == null) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800498 log.warn("Reusable Key set for device: {} scope: {} not found",
499 meterTableKey.deviceId(), meterTableKey.scope());
500 return false;
501 }
pierventrea7708792021-09-10 09:37:29 +0200502
Pier Luigif094c612017-10-14 12:15:02 +0200503 // According to available, make available or unavailable a meter key
Wailok Shumbe900bd2021-08-05 00:55:47 +0800504 DeviceId deviceId = meterTableKey.deviceId();
505 return available ? keySet.add(MeterKey.key(deviceId, id)) :
506 keySet.remove(MeterKey.key(deviceId, id));
Pier Luigif094c612017-10-14 12:15:02 +0200507 }
508
Wailok Shumbe900bd2021-08-05 00:55:47 +0800509 private MeterCellId getNextAvailableId(Set<MeterCellId> availableIds) {
Pier Luigif094c612017-10-14 12:15:02 +0200510 if (availableIds.isEmpty()) {
Pier Luigif094c612017-10-14 12:15:02 +0200511 return null;
512 }
pierventrea7708792021-09-10 09:37:29 +0200513
Pier Luigif094c612017-10-14 12:15:02 +0200514 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
515 return availableIds.iterator().next();
516 }
pierventrea7708792021-09-10 09:37:29 +0200517
518 // If it is random, get the size and return a random element
Pier Luigif094c612017-10-14 12:15:02 +0200519 int size = availableIds.size();
Pier Luigif094c612017-10-14 12:15:02 +0200520 return Iterables.get(availableIds, RandomUtils.nextInt(size));
521 }
522
pierventrea7708792021-09-10 09:37:29 +0200523 // Implements reuse strategy of the meter cell ids
Wailok Shumbe900bd2021-08-05 00:55:47 +0800524 private MeterCellId firstReusableMeterId(MeterTableKey meterTableKey) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800525 DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
526 if (keySet == null) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800527 log.warn("Reusable Key set for device: {} scope: {} not found",
528 meterTableKey.deviceId(), meterTableKey.scope());
529 return null;
530 }
pierventrea7708792021-09-10 09:37:29 +0200531
Wailok Shumbe900bd2021-08-05 00:55:47 +0800532 Set<MeterCellId> localAvailableMeterIds = keySet.stream()
533 .filter(meterKey ->
534 meterKey.deviceId().equals(meterTableKey.deviceId()))
pierventreb8ca2ad2021-08-18 09:40:14 +0200535 .map(MeterKey::meterCellId)
Pier Luigif094c612017-10-14 12:15:02 +0200536 .collect(Collectors.toSet());
Wailok Shumbe900bd2021-08-05 00:55:47 +0800537 MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
Pier Luigif094c612017-10-14 12:15:02 +0200538 while (meterId != null) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800539 if (updateMeterIdAvailability(meterTableKey, meterId, false)) {
Pier Luigif094c612017-10-14 12:15:02 +0200540 return meterId;
541 }
Pier Luigif094c612017-10-14 12:15:02 +0200542 localAvailableMeterIds.remove(meterId);
Pier Luigif094c612017-10-14 12:15:02 +0200543 meterId = getNextAvailableId(localAvailableMeterIds);
544 }
pierventrea7708792021-09-10 09:37:29 +0200545 // there are no available ids that can be reused
Pier Luigif094c612017-10-14 12:15:02 +0200546 return null;
547 }
548
549 @Override
Wailok Shumbe900bd2021-08-05 00:55:47 +0800550 public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
pierventreb8ca2ad2021-08-18 09:40:14 +0200551 if (userDefinedIndexMode) {
552 log.warn("Unable to allocate meter id when user defined index mode is enabled");
553 return null;
554 }
Wailok Shumbe900bd2021-08-05 00:55:47 +0800555 MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
556 MeterCellId meterCellId;
Pier Luigif094c612017-10-14 12:15:02 +0200557 long id;
Wailok Shumbe900bd2021-08-05 00:55:47 +0800558 // First, search for reusable key
559 meterCellId = firstReusableMeterId(meterTableKey);
560 if (meterCellId != null) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800561 return meterCellId;
Pier Luigif094c612017-10-14 12:15:02 +0200562 }
Wailok Shumbe900bd2021-08-05 00:55:47 +0800563 // If there was no reusable meter id we have to generate a new value
564 // using start and end index as lower and upper bound respectively.
565 long startIndex = getStartIndex(meterTableKey);
566 long endIndex = getEndIndex(meterTableKey);
pierventrea7708792021-09-10 09:37:29 +0200567 // If the device does not give us MeterFeatures fallback to queryMeters
Wailok Shumbe900bd2021-08-05 00:55:47 +0800568 if (startIndex == -1L || endIndex == -1L) {
pierventrea7708792021-09-10 09:37:29 +0200569 // Only meaningful for OpenFlow today
Wailok Shumbe900bd2021-08-05 00:55:47 +0800570 long maxMeters = queryMaxMeters(deviceId);
Wailok Shumbe900bd2021-08-05 00:55:47 +0800571 if (maxMeters == 0L) {
572 return null;
573 } else {
Wailok Shum1d8c5e82021-09-13 17:23:20 +0800574 // OpenFlow meter index starts from 1, ends with max
Wailok Shumbe900bd2021-08-05 00:55:47 +0800575 startIndex = 1L;
Wailok Shum1d8c5e82021-09-13 17:23:20 +0800576 endIndex = maxMeters;
Wailok Shumbe900bd2021-08-05 00:55:47 +0800577 }
Pier Luigif094c612017-10-14 12:15:02 +0200578 }
pierventrea7708792021-09-10 09:37:29 +0200579
Wailok Shumbe900bd2021-08-05 00:55:47 +0800580 do {
Wailok Shumfa3b3282021-08-22 19:35:34 +0800581 id = meterIdGenerators.getAndIncrement(meterTableKey);
Wailok Shumbe900bd2021-08-05 00:55:47 +0800582 } while (id < startIndex);
Wailok Shumbe900bd2021-08-05 00:55:47 +0800583 if (id > endIndex) {
Pier Luigif094c612017-10-14 12:15:02 +0200584 return null;
585 }
pierventrea7708792021-09-10 09:37:29 +0200586
587 // For backward compatibility if we are using global scope,
588 // return a MeterId, otherwise we create a PiMeterCellId
Wailok Shumbe900bd2021-08-05 00:55:47 +0800589 if (meterScope.isGlobal()) {
590 return MeterId.meterId(id);
591 } else {
592 return PiMeterCellId.ofIndirect(PiMeterId.of(meterScope.id()), id);
593 }
594
Pier Luigif094c612017-10-14 12:15:02 +0200595 }
596
597 @Override
598 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Wailok Shumbe900bd2021-08-05 00:55:47 +0800599 MeterTableKey meterTableKey = MeterTableKey.key(deviceId, MeterScope.globalScope());
600 freeMeterId(meterTableKey, meterId);
601 }
602
pierventrea7708792021-09-10 09:37:29 +0200603 protected void freeMeterId(DeviceId deviceId, MeterCellId meterCellId) {
604 MeterTableKey meterTableKey;
605 if (meterCellId.type() == PIPELINE_INDEPENDENT) {
606 meterTableKey = MeterTableKey.key(deviceId,
607 MeterScope.of(((PiMeterCellId) meterCellId).meterId().id()));
608 } else if (meterCellId.type() == INDEX) {
609 meterTableKey = MeterTableKey.key(deviceId, MeterScope.globalScope());
610 } else {
611 log.warn("Unable to free meter id unsupported cell type {}", meterCellId.type());
612 return;
613 }
614 freeMeterId(meterTableKey, meterCellId);
615 }
616
617 protected void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
pierventreb8ca2ad2021-08-18 09:40:14 +0200618 if (userDefinedIndexMode) {
pierventrea7708792021-09-10 09:37:29 +0200619 log.debug("Unable to free meter id when user defined index mode is enabled");
pierventreb8ca2ad2021-08-18 09:40:14 +0200620 return;
621 }
Wailok Shumbe900bd2021-08-05 00:55:47 +0800622 long index;
623 if (meterCellId.type() == PIPELINE_INDEPENDENT) {
624 PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;
625 index = piMeterCellId.index();
626 } else if (meterCellId.type() == INDEX) {
627 MeterId meterId = (MeterId) meterCellId;
628 index = meterId.id();
629 } else {
pierventrea7708792021-09-10 09:37:29 +0200630 log.warn("Unable to free meter id unsupported cell type {}", meterCellId.type());
Wailok Shumbe900bd2021-08-05 00:55:47 +0800631 return;
632 }
Pier Luigibdcd9672017-10-13 13:54:48 +0200633 // Avoid to free meter not allocated
Wailok Shumfa3b3282021-08-22 19:35:34 +0800634 if (meterIdGenerators.get(meterTableKey) <= index) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200635 return;
636 }
Wailok Shumbe900bd2021-08-05 00:55:47 +0800637 updateMeterIdAvailability(meterTableKey, meterCellId, true);
Pier Luigif094c612017-10-14 12:15:02 +0200638 }
639
pierventre1b8afbc2020-07-13 14:07:05 +0200640 // Enabling the events distribution across the cluster
Wailok Shumbe900bd2021-08-05 00:55:47 +0800641 private class InternalMetersMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700642 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700643 public void event(MapEvent<MeterKey, MeterData> event) {
644 MeterKey key = event.key();
Ray Milkeyd0f017f2018-09-21 12:52:34 -0700645 Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
646 MeterData data = value.value();
pierventre1b8afbc2020-07-13 14:07:05 +0200647 MeterData oldData = Versioned.valueOrNull(event.oldValue());
alshabibeadfc8e2015-08-18 15:40:46 -0700648 switch (event.type()) {
649 case INSERT:
650 case UPDATE:
651 switch (data.meter().state()) {
652 case PENDING_ADD:
653 case PENDING_REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200654 // Two cases. If there is a reason, the meter operation failed.
655 // Otherwise, we are ready to install/remove through the delegate.
656 if (data.reason().isEmpty()) {
657 notifyDelegate(new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
658 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, data.meter()));
659 } else {
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100660 futures.computeIfPresent(key, (k, v) -> {
pierventre1b8afbc2020-07-13 14:07:05 +0200661 v.complete(MeterStoreResult.fail(data.reason().get()));
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100662 return null;
663 });
alshabibe1248b62015-08-20 17:21:55 -0700664 }
665 break;
pierventre1b8afbc2020-07-13 14:07:05 +0200666 case ADDED:
667 // Transition from pending to installed
668 if (data.meter().state() == MeterState.ADDED &&
669 (oldData != null && oldData.meter().state() == MeterState.PENDING_ADD)) {
670 futures.computeIfPresent(key, (k, v) -> {
671 v.complete(MeterStoreResult.success());
672 return null;
673 });
674 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
pierventre36b9a892021-08-27 15:25:02 +0200675 // Update stats case - we report reference count zero only for INDEX based meters
676 } else if (data.meter().referenceCount() == 0 &&
677 data.meter().meterCellId().type() == INDEX) {
pierventre1b8afbc2020-07-13 14:07:05 +0200678 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO,
679 data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700680 }
681 break;
682 default:
683 log.warn("Unknown meter state type {}", data.meter().state());
684 }
685 break;
686 case REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200687 futures.computeIfPresent(key, (k, v) -> {
688 v.complete(MeterStoreResult.success());
689 return null;
690 });
pierventre1b8afbc2020-07-13 14:07:05 +0200691 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700692 break;
693 default:
694 log.warn("Unknown Map event type {}", event.type());
695 }
alshabibeadfc8e2015-08-18 15:40:46 -0700696 }
697 }
698
Wailok Shumbe900bd2021-08-05 00:55:47 +0800699 private class InternalFeaturesMapEventListener implements
700 EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> {
701 @Override
702 public void event(EventuallyConsistentMapEvent<MeterTableKey, MeterFeatures> event) {
703 MeterTableKey meterTableKey = event.key();
704 MeterFeatures meterFeatures = event.value();
705 switch (event.type()) {
706 case PUT:
Wailok Shumbe900bd2021-08-05 00:55:47 +0800707 String setName = AVAILABLEMETERIDSTORE + "-" +
708 meterFeatures.deviceId() + meterFeatures.scope().id();
709 insertAvailableKeySet(meterTableKey, setName);
710 break;
711 case REMOVE:
Wailok Shumbe900bd2021-08-05 00:55:47 +0800712 DistributedSet<MeterKey> set = availableMeterIds.remove(meterTableKey);
713 if (set != null) {
714 set.destroy();
715 }
716 break;
717 default:
718 break;
719 }
720 }
721 }
722
723 private void insertAvailableKeySet(MeterTableKey meterTableKey, String setName) {
724 DistributedSet<MeterKey> availableMeterIdSet =
725 new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
726 .withName(setName)
727 .withSerializer(Serializer.using(KryoNamespaces.API,
728 MeterKey.class)).build(),
729 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
730 availableMeterIds.put(meterTableKey, availableMeterIdSet);
731 }
alshabib7bb05012015-08-05 10:15:09 -0700732}