blob: 5b9252d9404c307bd556d18b4a7c2fd0993d5447 [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080016package org.onosproject.store.meter.impl;
alshabib7bb05012015-08-05 10:15:09 -070017
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
pierventre44220052020-09-22 12:51:06 +020019import com.google.common.collect.ImmutableSet;
Pier Luigif094c612017-10-14 12:15:02 +020020import com.google.common.collect.Iterables;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070021import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070022import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020023import org.apache.commons.lang.math.RandomUtils;
Charles Chan593acf92017-11-22 13:55:41 -080024import org.onlab.util.KryoNamespace;
Daniele Moro43ac2892021-07-15 17:02:59 +020025import org.onosproject.core.ApplicationId;
Jordi Ortizaa8de492016-12-01 00:21:36 +010026import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020027import org.onosproject.net.behaviour.MeterQuery;
28import org.onosproject.net.driver.DriverHandler;
29import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070030import org.onosproject.net.meter.Band;
31import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070032import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010033import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070034import org.onosproject.net.meter.Meter;
Wailok Shumf013a782021-07-26 16:51:01 +080035import org.onosproject.net.meter.MeterCellId;
alshabib10c810b2015-08-18 16:59:04 -070036import org.onosproject.net.meter.MeterEvent;
37import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010038import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030039import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010040import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010041import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070042import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070043import org.onosproject.net.meter.MeterOperation;
Wailok Shumf013a782021-07-26 16:51:01 +080044import org.onosproject.net.meter.MeterScope;
alshabib10c810b2015-08-18 16:59:04 -070045import org.onosproject.net.meter.MeterState;
46import org.onosproject.net.meter.MeterStore;
47import org.onosproject.net.meter.MeterStoreDelegate;
48import org.onosproject.net.meter.MeterStoreResult;
Wailok Shumf013a782021-07-26 16:51:01 +080049import org.onosproject.net.meter.MeterTableKey;
50import org.onosproject.net.pi.model.PiMeterId;
51import org.onosproject.net.pi.runtime.PiMeterCellId;
alshabib7bb05012015-08-05 10:15:09 -070052import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020053import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070054import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020055import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070056import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020057import org.onosproject.store.service.DistributedPrimitive;
58import org.onosproject.store.service.DistributedSet;
Wailok Shumf013a782021-07-26 16:51:01 +080059import org.onosproject.store.service.EventuallyConsistentMap;
60import org.onosproject.store.service.EventuallyConsistentMapEvent;
61import org.onosproject.store.service.EventuallyConsistentMapListener;
alshabibeadfc8e2015-08-18 15:40:46 -070062import org.onosproject.store.service.MapEvent;
63import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070064import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070065import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070066import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070067import org.onosproject.store.service.Versioned;
Wailok Shumf013a782021-07-26 16:51:01 +080068import org.onosproject.store.service.WallClockTimestamp;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070069import org.osgi.service.component.annotations.Activate;
70import org.osgi.service.component.annotations.Component;
71import org.osgi.service.component.annotations.Deactivate;
72import org.osgi.service.component.annotations.Reference;
73import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070074import org.slf4j.Logger;
75
76import java.util.Collection;
Wailok Shumf013a782021-07-26 16:51:01 +080077import java.util.concurrent.ConcurrentMap;
78import java.util.concurrent.ConcurrentHashMap;
Gamze Abakaf57ef602019-03-11 06:52:48 +000079import java.util.List;
alshabibeadfc8e2015-08-18 15:40:46 -070080import java.util.Map;
Gamze Abakaf57ef602019-03-11 06:52:48 +000081import java.util.Objects;
Pier Luigif094c612017-10-14 12:15:02 +020082import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070083import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020084import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070085
pierventre3b39bd82021-08-18 09:40:14 +020086import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080087import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010088import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
Wailok Shumf013a782021-07-26 16:51:01 +080089import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
90import static org.onosproject.net.meter.MeterCellId.MeterCellType.PIPELINE_INDEPENDENT;
Wailok Shum6a249352021-07-29 00:02:56 +080091import static org.onosproject.net.meter.MeterStoreResult.Type.FAIL;
alshabib7bb05012015-08-05 10:15:09 -070092import static org.slf4j.LoggerFactory.getLogger;
93
94/**
95 * A distributed meter store implementation. Meters are stored consistently
96 * across the cluster.
97 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070098@Component(immediate = true, service = MeterStore.class)
alshabib7bb05012015-08-05 10:15:09 -070099public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
100 implements MeterStore {
101
102 private Logger log = getLogger(getClass());
103
pierventre1b8afbc2020-07-13 14:07:05 +0200104 // Meters map related objects
alshabib7bb05012015-08-05 10:15:09 -0700105 private static final String METERSTORE = "onos-meter-store";
pierventre1b8afbc2020-07-13 14:07:05 +0200106 private ConsistentMap<MeterKey, MeterData> meters;
Wailok Shumf013a782021-07-26 16:51:01 +0800107 private MapEventListener<MeterKey, MeterData> metersMapListener = new InternalMetersMapEventListener();
pierventre44220052020-09-22 12:51:06 +0200108 private Map<MeterKey, MeterData> metersMap;
alshabib7bb05012015-08-05 10:15:09 -0700109
pierventre1b8afbc2020-07-13 14:07:05 +0200110 // Meters features related objects
111 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Wailok Shumf013a782021-07-26 16:51:01 +0800112 private EventuallyConsistentMap<MeterTableKey, MeterFeatures> metersFeatures;
113 private EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> featuresMapListener =
114 new InternalFeaturesMapEventListener();
pierventre1b8afbc2020-07-13 14:07:05 +0200115
116 // Meters id related objects
117 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
118 // Available meter identifiers
pierventre3b39bd82021-08-18 09:40:14 +0200119 protected ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
pierventre1b8afbc2020-07-13 14:07:05 +0200120 // Atomic counter map for generation of new identifiers;
121 private static final String METERIDSTORE = "onos-meters-id-store";
Wailok Shumf013a782021-07-26 16:51:01 +0800122 private AtomicCounterMap<MeterTableKey> meterIdGenerators;
pierventre1b8afbc2020-07-13 14:07:05 +0200123
124 // Serializer related objects
Charles Chan593acf92017-11-22 13:55:41 -0800125 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
126 .register(KryoNamespaces.API)
127 .register(MeterKey.class)
128 .register(MeterData.class)
129 .register(DefaultMeter.class)
130 .register(DefaultBand.class)
131 .register(Band.Type.class)
132 .register(MeterState.class)
debmaiti1bea2892019-06-04 12:36:38 +0530133 .register(Meter.Unit.class)
pierventre3b39bd82021-08-18 09:40:14 +0200134 .register(MeterFailReason.class)
135 .register(MeterTableKey.class)
136 .register(MeterFeatures.class)
137 .register(DefaultMeterFeatures.class)
138 .register(MeterFeaturesFlag.class)
139 .register(MeterScope.class);
Charles Chan593acf92017-11-22 13:55:41 -0800140 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
141
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700143 private StorageService storageService;
144
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700145 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Pier Luigif094c612017-10-14 12:15:02 +0200146 protected DriverService driverService;
147
pierventre1b8afbc2020-07-13 14:07:05 +0200148 // Local cache to handle async ops through futures.
alshabib70aaa1b2015-09-25 14:30:59 -0700149 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700150 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700151
pierventre3b39bd82021-08-18 09:40:14 +0200152 // Control the user defined index mode for the store.
153 protected boolean userDefinedIndexMode = false;
154
Pier Luigif094c612017-10-14 12:15:02 +0200155 /**
156 * Defines possible selection strategies to reuse meter ids.
157 */
158 enum ReuseStrategy {
159 /**
160 * Select randomly an available id.
161 */
162 RANDOM,
163 /**
164 * Select the first one.
165 */
166 FIRST_FIT
167 }
Pier Luigif094c612017-10-14 12:15:02 +0200168 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100169
alshabib7bb05012015-08-05 10:15:09 -0700170 @Activate
171 public void activate() {
pierventre1b8afbc2020-07-13 14:07:05 +0200172 // Init meters map and setup the map listener
alshabib70aaa1b2015-09-25 14:30:59 -0700173 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700174 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800175 .withSerializer(serializer).build();
Wailok Shumf013a782021-07-26 16:51:01 +0800176 meters.addListener(metersMapListener);
pierventre44220052020-09-22 12:51:06 +0200177 metersMap = meters.asJavaMap();
Wailok Shumf013a782021-07-26 16:51:01 +0800178 // Init meter features map
179 metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
180 .withName(METERFEATURESSTORE)
181 .withTimestampProvider((key, features) -> new WallClockTimestamp())
pierventre3b39bd82021-08-18 09:40:14 +0200182 .withSerializer(APP_KRYO_BUILDER).build();
Wailok Shumf013a782021-07-26 16:51:01 +0800183 metersFeatures.addListener(featuresMapListener);
184 // Init the map of the available ids set
185 // Set will be created when a new Meter Features is pushed to the store
186 availableMeterIds = new ConcurrentHashMap<>();
Pier Luigif094c612017-10-14 12:15:02 +0200187 // Init atomic map counters
Wailok Shumf013a782021-07-26 16:51:01 +0800188 meterIdGenerators = storageService.<MeterTableKey>atomicCounterMapBuilder()
Pier Luigif094c612017-10-14 12:15:02 +0200189 .withName(METERIDSTORE)
Wailok Shumf013a782021-07-26 16:51:01 +0800190 .withSerializer(Serializer.using(KryoNamespaces.API,
191 MeterTableKey.class,
192 MeterScope.class)).build();
pierventre3b39bd82021-08-18 09:40:14 +0200193
alshabib7bb05012015-08-05 10:15:09 -0700194 log.info("Started");
195 }
196
197 @Deactivate
198 public void deactivate() {
Wailok Shumf013a782021-07-26 16:51:01 +0800199 meters.removeListener(metersMapListener);
200 metersFeatures.removeListener(featuresMapListener);
201 meters.destroy();
202 metersFeatures.destroy();
pierventre3b39bd82021-08-18 09:40:14 +0200203 availableMeterIds.forEach((key, set) -> set.destroy());
204
alshabib7bb05012015-08-05 10:15:09 -0700205 log.info("Stopped");
206 }
207
alshabib7bb05012015-08-05 10:15:09 -0700208 @Override
Wailok Shum6a249352021-07-29 00:02:56 +0800209 public CompletableFuture<MeterStoreResult> addOrUpdateMeter(Meter meter) {
pierventre3b39bd82021-08-18 09:40:14 +0200210 // Verify integrity of the index
211 checkArgument(validIndex(meter), "Meter index is not valid");
Wailok Shum6a249352021-07-29 00:02:56 +0800212 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
213 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
214 MeterData data = new MeterData(meter, null);
215 // Store the future related to the operation
216 futures.put(key, future);
217 // Check if the meter exists
218 try {
219 meters.compute(key, (k, v) -> data);
220 } catch (StorageException e) {
221 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
222 e.getMessage(), e);
223 futures.remove(key);
224 future.completeExceptionally(e);
225 }
226 return future;
227 }
228
229 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700230 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
pierventre3b39bd82021-08-18 09:40:14 +0200231 return addOrUpdateMeter(meter);
alshabib7bb05012015-08-05 10:15:09 -0700232 }
233
234 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700235 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200236 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700237 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
Wailok Shum6a249352021-07-29 00:02:56 +0800238 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
Pier Luigif094c612017-10-14 12:15:02 +0200239 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700240 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200241 // Create the meter data
pierventre1b8afbc2020-07-13 14:07:05 +0200242 MeterData data = new MeterData(meter, null);
Pier Luigif094c612017-10-14 12:15:02 +0200243 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700244 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700245 try {
Pier Luigif094c612017-10-14 12:15:02 +0200246 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700247 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200248 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700249 future.complete(MeterStoreResult.success());
250 }
alshabibeadfc8e2015-08-18 15:40:46 -0700251 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200252 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
253 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900254 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700255 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700256 }
Pier Luigif094c612017-10-14 12:15:02 +0200257 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700258 return future;
alshabib7bb05012015-08-05 10:15:09 -0700259 }
260
261 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100262 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
Wailok Shumf013a782021-07-26 16:51:01 +0800263 // Store meter features, this is done once for each features of every device
Jordi Ortizaa8de492016-12-01 00:21:36 +0100264 MeterStoreResult result = MeterStoreResult.success();
Wailok Shumf013a782021-07-26 16:51:01 +0800265 MeterTableKey key = MeterTableKey.key(meterfeatures.deviceId(), meterfeatures.scope());
Jordi Ortizaa8de492016-12-01 00:21:36 +0100266 try {
Wailok Shumf013a782021-07-26 16:51:01 +0800267 metersFeatures.put(key, meterfeatures);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100268 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200269 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
270 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100271 result = MeterStoreResult.fail(TIMEOUT);
272 }
273 return result;
274 }
275
276 @Override
Wailok Shum6a249352021-07-29 00:02:56 +0800277 public MeterStoreResult storeMeterFeatures(Collection<MeterFeatures> meterfeatures) {
278 // These store operations is treated as one single operation
279 // If one of them is failed, Fail is returned
280 // But the failed operation will not block the rest.
281 MeterStoreResult result = MeterStoreResult.success();
282 for (MeterFeatures mf : meterfeatures) {
283 if (storeMeterFeatures(mf).type() == FAIL) {
284 result = MeterStoreResult.fail(TIMEOUT);
285 }
286 }
287 return result;
288 }
289
290 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100291 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
292 MeterStoreResult result = MeterStoreResult.success();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100293 try {
Wailok Shumf013a782021-07-26 16:51:01 +0800294 Set<MeterTableKey> keys = metersFeatures.keySet().stream()
295 .filter(key -> key.deviceId().equals(deviceId))
296 .collect(Collectors.toUnmodifiableSet());
pierventre3b39bd82021-08-18 09:40:14 +0200297 keys.forEach(k -> metersFeatures.remove(k));
Jordi Ortizaa8de492016-12-01 00:21:36 +0100298 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200299 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
Wailok Shumf013a782021-07-26 16:51:01 +0800300 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100301 result = MeterStoreResult.fail(TIMEOUT);
302 }
Wailok Shumf013a782021-07-26 16:51:01 +0800303
Jordi Ortizaa8de492016-12-01 00:21:36 +0100304 return result;
305 }
306
307 @Override
Wailok Shum6a249352021-07-29 00:02:56 +0800308 public MeterStoreResult deleteMeterFeatures(Collection<MeterFeatures> meterfeatures) {
309 // These store operations is treated as one single operation
310 // If one of them is failed, Fail is returned
311 // But the failed operation will not block the rest.
312 MeterStoreResult result = MeterStoreResult.success();
313 for (MeterFeatures mf : meterfeatures) {
314 try {
315 MeterTableKey key = MeterTableKey.key(mf.deviceId(), mf.scope());
316 metersFeatures.remove(key);
317 } catch (StorageException e) {
318 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
319 e.getMessage(), e);
320 result = MeterStoreResult.fail(TIMEOUT);
321 }
322 }
323
324 return result;
325 }
326
327 @Override
pierventre44220052020-09-22 12:51:06 +0200328 public Meter updateMeterState(Meter meter) {
pierventre1b8afbc2020-07-13 14:07:05 +0200329 // Update meter if present (stats workflow)
Wailok Shum6a249352021-07-29 00:02:56 +0800330 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
pierventre44220052020-09-22 12:51:06 +0200331 Versioned<MeterData> value = meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700332 DefaultMeter m = (DefaultMeter) v.meter();
pier59721bf2020-01-08 08:57:46 +0100333 MeterState meterState = m.state();
334 if (meterState == MeterState.PENDING_ADD) {
335 m.setState(meter.state());
336 }
alshabib7bb05012015-08-05 10:15:09 -0700337 m.setProcessedPackets(meter.packetsSeen());
338 m.setProcessedBytes(meter.bytesSeen());
339 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700340 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700341 m.setReferenceCount(meter.referenceCount());
pierventre1b8afbc2020-07-13 14:07:05 +0200342 return new MeterData(m, null);
alshabib7bb05012015-08-05 10:15:09 -0700343 });
pierventre44220052020-09-22 12:51:06 +0200344 return value != null ? value.value().meter() : null;
alshabib7bb05012015-08-05 10:15:09 -0700345 }
346
347 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700348 public Meter getMeter(MeterKey key) {
349 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700350 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700351 }
352
353 @Override
354 public Collection<Meter> getAllMeters() {
pierventre44220052020-09-22 12:51:06 +0200355 return Collections2.transform(ImmutableSet.copyOf(metersMap.values()),
alshabibeadfc8e2015-08-18 15:40:46 -0700356 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700357 }
358
359 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200360 public Collection<Meter> getAllMeters(DeviceId deviceId) {
361 return Collections2.transform(
pierventre44220052020-09-22 12:51:06 +0200362 Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
Jordi Ortiz9287b632017-06-22 11:01:37 +0200363 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
364 MeterData::meter);
365 }
366
367 @Override
pierventrec0914ec2021-08-27 15:25:02 +0200368 public Collection<Meter> getAllMeters(DeviceId deviceId, MeterScope scope) {
369 if (scope.equals(MeterScope.globalScope())) {
370 return Collections2.transform(
371 Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
372 (MeterData m) -> m.meter().meterCellId().type() == INDEX),
373 MeterData::meter);
374 }
375 return Collections2.transform(
376 Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
377 (MeterData m) -> m.meter().meterCellId().type() == PIPELINE_INDEPENDENT &&
378 ((PiMeterCellId) m.meter().meterCellId()).meterId().id().equals(scope.id())),
379 MeterData::meter);
380 }
381
382 @Override
alshabib7bb05012015-08-05 10:15:09 -0700383 public void failedMeter(MeterOperation op, MeterFailReason reason) {
pierventre1b8afbc2020-07-13 14:07:05 +0200384 // Meter ops failed (got notification from the sb)
pierventre3b39bd82021-08-18 09:40:14 +0200385 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().meterCellId());
pierventre1b8afbc2020-07-13 14:07:05 +0200386 meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
alshabib7bb05012015-08-05 10:15:09 -0700387 }
388
alshabib5eb79392015-08-19 18:09:55 -0700389 @Override
390 public void deleteMeterNow(Meter m) {
Wailok Shumf013a782021-07-26 16:51:01 +0800391 // This method is renamed in onos-2.5
392 purgeMeter(m);
393 }
394
395 @Override
396 public void purgeMeter(Meter m) {
pierventre1b8afbc2020-07-13 14:07:05 +0200397 // Once we receive the ack from the sb
398 // create the key and remove definitely the meter
pierventre3b39bd82021-08-18 09:40:14 +0200399 MeterKey key = MeterKey.key(m.deviceId(), m.meterCellId());
pierventre1b8afbc2020-07-13 14:07:05 +0200400 try {
401 if (Versioned.valueOrNull(meters.remove(key)) != null) {
402 // Free the id
Wailok Shumf013a782021-07-26 16:51:01 +0800403 MeterScope scope;
404 if (m.meterCellId().type() == PIPELINE_INDEPENDENT) {
405 PiMeterCellId piMeterCellId = (PiMeterCellId) m.meterCellId();
406 scope = MeterScope.of(piMeterCellId.meterId().id());
407 } else {
408 scope = MeterScope.globalScope();
409 }
410 MeterTableKey meterTableKey = MeterTableKey.key(m.deviceId(), scope);
411 freeMeterId(meterTableKey, m.meterCellId());
pierventre1b8afbc2020-07-13 14:07:05 +0200412 }
413 } catch (StorageException e) {
414 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
415 e.getMessage(), e);
pier59721bf2020-01-08 08:57:46 +0100416 }
alshabib5eb79392015-08-19 18:09:55 -0700417 }
418
Jordi Ortizaa8de492016-12-01 00:21:36 +0100419 @Override
Gamze Abakaf57ef602019-03-11 06:52:48 +0000420 public void purgeMeter(DeviceId deviceId) {
pierventre3b39bd82021-08-18 09:40:14 +0200421 // This method is renamed in onos-2.5
422 purgeMeters(deviceId);
423 }
424
425 @Override
426 public void purgeMeters(DeviceId deviceId) {
Gamze Abakaf57ef602019-03-11 06:52:48 +0000427 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
428 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
429 .map(Map.Entry::getValue)
430 .collect(Collectors.toList());
Gamze Abakaf57ef602019-03-11 06:52:48 +0000431 metersPendingRemove.forEach(versionedMeterKey
Wailok Shumf013a782021-07-26 16:51:01 +0800432 -> purgeMeter(versionedMeterKey.value().meter()));
Gamze Abakaf57ef602019-03-11 06:52:48 +0000433 }
434
435 @Override
Daniele Moro43ac2892021-07-15 17:02:59 +0200436 public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
437 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
438 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId) &&
439 e.getValue().value().meter().appId().equals(appId))
440 .map(Map.Entry::getValue)
441 .collect(Collectors.toList());
pierventre3b39bd82021-08-18 09:40:14 +0200442 metersPendingRemove.forEach(versionedMeterKey
443 -> purgeMeter(versionedMeterKey.value().meter()));
444 }
445
446 @Override
447 public boolean userDefinedIndexMode(boolean enable) {
448 if (meters.isEmpty() && meterIdGenerators.isEmpty()) {
449 userDefinedIndexMode = enable;
450 } else {
451 log.warn("Unable to {} user defined index mode as store did" +
452 "already some allocations", enable ? "activate" : "deactivate");
453 }
454 return userDefinedIndexMode;
Daniele Moro43ac2892021-07-15 17:02:59 +0200455 }
456
457 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100458 public long getMaxMeters(MeterFeaturesKey key) {
pierventre1b8afbc2020-07-13 14:07:05 +0200459 // Leverage the meter features to know the max id
Wailok Shumf013a782021-07-26 16:51:01 +0800460 // Create a Meter Table key with FeaturesKey's device and global scope
461 MeterTableKey meterTableKey = MeterTableKey.key(key.deviceId(), MeterScope.globalScope());
462 return getMaxMeters(meterTableKey);
463 }
464
465 private long getMaxMeters(MeterTableKey key) {
466 // Leverage the meter features to know the max id
467 MeterFeatures features = metersFeatures.get(key);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100468 return features == null ? 0L : features.maxMeter();
469 }
470
pierventre3b39bd82021-08-18 09:40:14 +0200471 private boolean validIndex(Meter meter) {
472 long index;
473 MeterTableKey key;
474
475 if (meter.meterCellId().type() == PIPELINE_INDEPENDENT) {
476 PiMeterCellId piMeterCellId = (PiMeterCellId) meter.meterCellId();
477 index = piMeterCellId.index();
478 key = MeterTableKey.key(meter.deviceId(), MeterScope.of(piMeterCellId.meterId().id()));
479 } else if (meter.meterCellId().type() == INDEX) {
480 MeterId meterId = (MeterId) meter.meterCellId();
481 index = meterId.id();
482 key = MeterTableKey.key(meter.deviceId(), MeterScope.globalScope());
483 } else {
484 return false;
485 }
486
487 MeterFeatures features = metersFeatures.get(key);
488 long startIndex = features == null ? -1L : features.startIndex();
489 long endIndex = features == null ? -1L : features.endIndex();
490 return index >= startIndex && index <= endIndex;
491 }
492
Wailok Shumf013a782021-07-26 16:51:01 +0800493 private long getStartIndex(MeterTableKey key) {
494 // Leverage the meter features to know the start id
495 // Since we are using index now
496 // if there is no features related to the key
497 // -1 is returned
498 MeterFeatures features = metersFeatures.get(key);
499 return features == null ? -1L : features.startIndex();
500 }
501
502 private long getEndIndex(MeterTableKey key) {
503 // Leverage the meter features to know the max id
504 // Since we are using index now
505 // if there is no features related to the key
506 // -1 is returned
507 MeterFeatures features = metersFeatures.get(key);
508 return features == null ? -1L : features.endIndex();
509 }
510
Pier Luigif094c612017-10-14 12:15:02 +0200511 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
512 private long queryMaxMeters(DeviceId device) {
513 // Get driver handler for this device
514 DriverHandler handler = driverService.createHandler(device);
515 // If creation failed or the device does not have this behavior
516 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
517 // We cannot know max meter
518 return 0L;
519 }
520 // Get the behavior
521 MeterQuery query = handler.behaviour(MeterQuery.class);
Wailok Shumf013a782021-07-26 16:51:01 +0800522 // Insert a new available key set to the map
523 String setName = AVAILABLEMETERIDSTORE + "-" + device + "global";
524 MeterTableKey meterTableKey = MeterTableKey.key(device, MeterScope.globalScope());
525 insertAvailableKeySet(meterTableKey, setName);
Pier Luigif094c612017-10-14 12:15:02 +0200526 // Return as max meter the result of the query
527 return query.getMaxMeters();
528 }
529
Wailok Shumf013a782021-07-26 16:51:01 +0800530 private boolean updateMeterIdAvailability(MeterTableKey meterTableKey, MeterCellId id,
Pier Luigif094c612017-10-14 12:15:02 +0200531 boolean available) {
Wailok Shumf013a782021-07-26 16:51:01 +0800532 // Retrieve the set first
533 DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
534 if (keySet == null) {
535 // A reusable set should be inserted when a features is pushed
536 log.warn("Reusable Key set for device: {} scope: {} not found",
537 meterTableKey.deviceId(), meterTableKey.scope());
538 return false;
539 }
Pier Luigif094c612017-10-14 12:15:02 +0200540 // According to available, make available or unavailable a meter key
Wailok Shumf013a782021-07-26 16:51:01 +0800541 DeviceId deviceId = meterTableKey.deviceId();
542 return available ? keySet.add(MeterKey.key(deviceId, id)) :
543 keySet.remove(MeterKey.key(deviceId, id));
Pier Luigif094c612017-10-14 12:15:02 +0200544 }
545
Wailok Shumf013a782021-07-26 16:51:01 +0800546 private MeterCellId getNextAvailableId(Set<MeterCellId> availableIds) {
Pier Luigif094c612017-10-14 12:15:02 +0200547 // If there are no available ids
548 if (availableIds.isEmpty()) {
549 // Just end the cycle
550 return null;
551 }
552 // If it is the first fit
553 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
554 return availableIds.iterator().next();
555 }
556 // If it is random, get the size
557 int size = availableIds.size();
558 // Return a random element
559 return Iterables.get(availableIds, RandomUtils.nextInt(size));
560 }
561
562 // Implements reuse strategy
Wailok Shumf013a782021-07-26 16:51:01 +0800563 private MeterCellId firstReusableMeterId(MeterTableKey meterTableKey) {
564 // Create a Table key and use it to retrieve the reusable meterCellId set
565 DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
566 if (keySet == null) {
567 // A reusable set should be inserted when a features is pushed
568 log.warn("Reusable Key set for device: {} scope: {} not found",
569 meterTableKey.deviceId(), meterTableKey.scope());
570 return null;
571 }
Pier Luigif094c612017-10-14 12:15:02 +0200572 // Filter key related to device id, and reduce to meter ids
Wailok Shumf013a782021-07-26 16:51:01 +0800573 Set<MeterCellId> localAvailableMeterIds = keySet.stream()
574 .filter(meterKey ->
575 meterKey.deviceId().equals(meterTableKey.deviceId()))
pierventre3b39bd82021-08-18 09:40:14 +0200576 .map(MeterKey::meterCellId)
Pier Luigif094c612017-10-14 12:15:02 +0200577 .collect(Collectors.toSet());
578 // Get next available id
Wailok Shumf013a782021-07-26 16:51:01 +0800579 MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
Pier Luigif094c612017-10-14 12:15:02 +0200580 // Iterate until there are items
581 while (meterId != null) {
582 // If we are able to reserve the id
Wailok Shumf013a782021-07-26 16:51:01 +0800583 if (updateMeterIdAvailability(meterTableKey, meterId, false)) {
Pier Luigif094c612017-10-14 12:15:02 +0200584 // Just end
585 return meterId;
586 }
587 // Update the set
588 localAvailableMeterIds.remove(meterId);
589 // Try another time
590 meterId = getNextAvailableId(localAvailableMeterIds);
591 }
592 // No reusable ids
593 return null;
594 }
595
596 @Override
597 public MeterId allocateMeterId(DeviceId deviceId) {
Wailok Shumf013a782021-07-26 16:51:01 +0800598 // We use global scope for MeterId
599 return (MeterId) allocateMeterId(deviceId, MeterScope.globalScope());
600 }
601
602 @Override
603 public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
pierventre3b39bd82021-08-18 09:40:14 +0200604 if (userDefinedIndexMode) {
605 log.warn("Unable to allocate meter id when user defined index mode is enabled");
606 return null;
607 }
Wailok Shumf013a782021-07-26 16:51:01 +0800608 MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
609 MeterCellId meterCellId;
Pier Luigif094c612017-10-14 12:15:02 +0200610 long id;
Wailok Shumf013a782021-07-26 16:51:01 +0800611 // First, search for reusable key
612 meterCellId = firstReusableMeterId(meterTableKey);
613 if (meterCellId != null) {
614 // A reusable key is found
615 return meterCellId;
Pier Luigif094c612017-10-14 12:15:02 +0200616 }
Wailok Shumf013a782021-07-26 16:51:01 +0800617 // If there was no reusable meter id we have to generate a new value
618 // using start and end index as lower and upper bound respectively.
619 long startIndex = getStartIndex(meterTableKey);
620 long endIndex = getEndIndex(meterTableKey);
Pier Luigif094c612017-10-14 12:15:02 +0200621 // If the device does not give us MeterFeatures
Wailok Shumf013a782021-07-26 16:51:01 +0800622 if (startIndex == -1L || endIndex == -1L) {
Pier Luigif094c612017-10-14 12:15:02 +0200623 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
Wailok Shumf013a782021-07-26 16:51:01 +0800624 // Only meaningful to OpenFLow
625 long maxMeters = queryMaxMeters(deviceId);
626 // If we don't know the max, cannot proceed
627 if (maxMeters == 0L) {
628 return null;
629 } else {
Wailok Shum90b988a2021-09-13 17:23:20 +0800630 // OpenFlow meter index starts from 1, ends with max
Wailok Shumf013a782021-07-26 16:51:01 +0800631 startIndex = 1L;
Wailok Shum90b988a2021-09-13 17:23:20 +0800632 endIndex = maxMeters;
Wailok Shumf013a782021-07-26 16:51:01 +0800633 }
Pier Luigif094c612017-10-14 12:15:02 +0200634 }
635 // Get a new value
Wailok Shumf013a782021-07-26 16:51:01 +0800636 // If the value is smaller than the start index, get another one
637 do {
Wailok Shum79919522021-08-22 19:35:34 +0800638 id = meterIdGenerators.getAndIncrement(meterTableKey);
Wailok Shumf013a782021-07-26 16:51:01 +0800639 } while (id < startIndex);
640 // Check with the end index, and if the value is bigger, cannot proceed
641 if (id > endIndex) {
Pier Luigif094c612017-10-14 12:15:02 +0200642 return null;
643 }
644 // Done, return the value
Wailok Shumf013a782021-07-26 16:51:01 +0800645 // If we are using global scope, return a MeterId
646 // Else, return a PiMeterId
647 if (meterScope.isGlobal()) {
648 return MeterId.meterId(id);
649 } else {
650 return PiMeterCellId.ofIndirect(PiMeterId.of(meterScope.id()), id);
651 }
652
Pier Luigif094c612017-10-14 12:15:02 +0200653 }
654
655 @Override
656 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Wailok Shumf013a782021-07-26 16:51:01 +0800657 MeterTableKey meterTableKey = MeterTableKey.key(deviceId, MeterScope.globalScope());
658 freeMeterId(meterTableKey, meterId);
659 }
660
661 private void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
pierventre3b39bd82021-08-18 09:40:14 +0200662 if (userDefinedIndexMode) {
663 log.warn("Unable to free meter id when user defined index mode is enabled");
664 return;
665 }
Wailok Shumf013a782021-07-26 16:51:01 +0800666 long index;
667 if (meterCellId.type() == PIPELINE_INDEPENDENT) {
668 PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;
669 index = piMeterCellId.index();
670 } else if (meterCellId.type() == INDEX) {
671 MeterId meterId = (MeterId) meterCellId;
672 index = meterId.id();
673 } else {
674 return;
675 }
Pier Luigibdcd9672017-10-13 13:54:48 +0200676 // Avoid to free meter not allocated
Wailok Shum79919522021-08-22 19:35:34 +0800677 if (meterIdGenerators.get(meterTableKey) <= index) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200678 return;
679 }
Pier Luigif094c612017-10-14 12:15:02 +0200680 // Update the availability
Wailok Shumf013a782021-07-26 16:51:01 +0800681 updateMeterIdAvailability(meterTableKey, meterCellId, true);
Pier Luigif094c612017-10-14 12:15:02 +0200682 }
683
pierventre1b8afbc2020-07-13 14:07:05 +0200684 // Enabling the events distribution across the cluster
Wailok Shumf013a782021-07-26 16:51:01 +0800685 private class InternalMetersMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700686 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700687 public void event(MapEvent<MeterKey, MeterData> event) {
688 MeterKey key = event.key();
Ray Milkeyd0f017f2018-09-21 12:52:34 -0700689 Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
690 MeterData data = value.value();
pierventre1b8afbc2020-07-13 14:07:05 +0200691 MeterData oldData = Versioned.valueOrNull(event.oldValue());
alshabibeadfc8e2015-08-18 15:40:46 -0700692 switch (event.type()) {
693 case INSERT:
694 case UPDATE:
695 switch (data.meter().state()) {
696 case PENDING_ADD:
697 case PENDING_REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200698 // Two cases. If there is a reason, the meter operation failed.
699 // Otherwise, we are ready to install/remove through the delegate.
700 if (data.reason().isEmpty()) {
701 notifyDelegate(new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
702 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, data.meter()));
703 } else {
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100704 futures.computeIfPresent(key, (k, v) -> {
pierventre1b8afbc2020-07-13 14:07:05 +0200705 v.complete(MeterStoreResult.fail(data.reason().get()));
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100706 return null;
707 });
alshabibe1248b62015-08-20 17:21:55 -0700708 }
709 break;
pierventre1b8afbc2020-07-13 14:07:05 +0200710 case ADDED:
711 // Transition from pending to installed
712 if (data.meter().state() == MeterState.ADDED &&
713 (oldData != null && oldData.meter().state() == MeterState.PENDING_ADD)) {
714 futures.computeIfPresent(key, (k, v) -> {
715 v.complete(MeterStoreResult.success());
716 return null;
717 });
718 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
pierventrec0914ec2021-08-27 15:25:02 +0200719 // Update stats case - we report reference count zero only for INDEX based meters
720 } else if (data.meter().referenceCount() == 0 &&
721 data.meter().meterCellId().type() == INDEX) {
pierventre1b8afbc2020-07-13 14:07:05 +0200722 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO,
723 data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700724 }
725 break;
726 default:
727 log.warn("Unknown meter state type {}", data.meter().state());
728 }
729 break;
730 case REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200731 // Meter removal case
732 futures.computeIfPresent(key, (k, v) -> {
733 v.complete(MeterStoreResult.success());
734 return null;
735 });
736 // Finally notify the delegate
737 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700738 break;
739 default:
740 log.warn("Unknown Map event type {}", event.type());
741 }
alshabibeadfc8e2015-08-18 15:40:46 -0700742 }
743 }
744
Wailok Shumf013a782021-07-26 16:51:01 +0800745 private class InternalFeaturesMapEventListener implements
746 EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> {
747 @Override
748 public void event(EventuallyConsistentMapEvent<MeterTableKey, MeterFeatures> event) {
749 MeterTableKey meterTableKey = event.key();
750 MeterFeatures meterFeatures = event.value();
751 switch (event.type()) {
752 case PUT:
753 // Put a new available meter id set to the map
754 String setName = AVAILABLEMETERIDSTORE + "-" +
755 meterFeatures.deviceId() + meterFeatures.scope().id();
756 insertAvailableKeySet(meterTableKey, setName);
757 break;
758 case REMOVE:
759 // Remove the set
760 DistributedSet<MeterKey> set = availableMeterIds.remove(meterTableKey);
761 if (set != null) {
762 set.destroy();
763 }
764 break;
765 default:
766 break;
767 }
768 }
769 }
770
771 private void insertAvailableKeySet(MeterTableKey meterTableKey, String setName) {
772 DistributedSet<MeterKey> availableMeterIdSet =
773 new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
774 .withName(setName)
775 .withSerializer(Serializer.using(KryoNamespaces.API,
776 MeterKey.class)).build(),
777 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
778 availableMeterIds.put(meterTableKey, availableMeterIdSet);
779 }
alshabib7bb05012015-08-05 10:15:09 -0700780}