blob: b2034dc10a713677b7c8c042e1f24b38c595bdb6 [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080016package org.onosproject.store.meter.impl;
alshabib7bb05012015-08-05 10:15:09 -070017
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
pierventre44220052020-09-22 12:51:06 +020019import com.google.common.collect.ImmutableSet;
Pier Luigif094c612017-10-14 12:15:02 +020020import com.google.common.collect.Iterables;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070021import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070022import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020023import org.apache.commons.lang.math.RandomUtils;
Charles Chan593acf92017-11-22 13:55:41 -080024import org.onlab.util.KryoNamespace;
Daniele Moro43ac2892021-07-15 17:02:59 +020025import org.onosproject.core.ApplicationId;
Jordi Ortizaa8de492016-12-01 00:21:36 +010026import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020027import org.onosproject.net.behaviour.MeterQuery;
28import org.onosproject.net.driver.DriverHandler;
29import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070030import org.onosproject.net.meter.Band;
31import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070032import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010033import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070034import org.onosproject.net.meter.Meter;
Wailok Shumf013a782021-07-26 16:51:01 +080035import org.onosproject.net.meter.MeterCellId;
alshabib10c810b2015-08-18 16:59:04 -070036import org.onosproject.net.meter.MeterEvent;
37import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010038import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030039import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010040import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010041import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070042import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070043import org.onosproject.net.meter.MeterOperation;
Wailok Shumf013a782021-07-26 16:51:01 +080044import org.onosproject.net.meter.MeterScope;
alshabib10c810b2015-08-18 16:59:04 -070045import org.onosproject.net.meter.MeterState;
46import org.onosproject.net.meter.MeterStore;
47import org.onosproject.net.meter.MeterStoreDelegate;
48import org.onosproject.net.meter.MeterStoreResult;
Wailok Shumf013a782021-07-26 16:51:01 +080049import org.onosproject.net.meter.MeterTableKey;
50import org.onosproject.net.pi.model.PiMeterId;
51import org.onosproject.net.pi.runtime.PiMeterCellId;
alshabib7bb05012015-08-05 10:15:09 -070052import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020053import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070054import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020055import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070056import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020057import org.onosproject.store.service.DistributedPrimitive;
58import org.onosproject.store.service.DistributedSet;
Wailok Shumf013a782021-07-26 16:51:01 +080059import org.onosproject.store.service.EventuallyConsistentMap;
60import org.onosproject.store.service.EventuallyConsistentMapEvent;
61import org.onosproject.store.service.EventuallyConsistentMapListener;
alshabibeadfc8e2015-08-18 15:40:46 -070062import org.onosproject.store.service.MapEvent;
63import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070064import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070065import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070066import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070067import org.onosproject.store.service.Versioned;
Wailok Shumf013a782021-07-26 16:51:01 +080068import org.onosproject.store.service.WallClockTimestamp;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070069import org.osgi.service.component.annotations.Activate;
70import org.osgi.service.component.annotations.Component;
71import org.osgi.service.component.annotations.Deactivate;
72import org.osgi.service.component.annotations.Reference;
73import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070074import org.slf4j.Logger;
75
76import java.util.Collection;
Wailok Shumf013a782021-07-26 16:51:01 +080077import java.util.concurrent.ConcurrentMap;
78import java.util.concurrent.ConcurrentHashMap;
Gamze Abakaf57ef602019-03-11 06:52:48 +000079import java.util.List;
alshabibeadfc8e2015-08-18 15:40:46 -070080import java.util.Map;
Gamze Abakaf57ef602019-03-11 06:52:48 +000081import java.util.Objects;
Pier Luigif094c612017-10-14 12:15:02 +020082import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070083import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020084import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070085
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080086import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010087import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
Wailok Shumf013a782021-07-26 16:51:01 +080088import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
89import static org.onosproject.net.meter.MeterCellId.MeterCellType.PIPELINE_INDEPENDENT;
Wailok Shum6a249352021-07-29 00:02:56 +080090import static org.onosproject.net.meter.MeterStoreResult.Type.FAIL;
alshabib7bb05012015-08-05 10:15:09 -070091import static org.slf4j.LoggerFactory.getLogger;
92
93/**
94 * A distributed meter store implementation. Meters are stored consistently
95 * across the cluster.
96 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070097@Component(immediate = true, service = MeterStore.class)
alshabib7bb05012015-08-05 10:15:09 -070098public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
99 implements MeterStore {
100
101 private Logger log = getLogger(getClass());
102
pierventre1b8afbc2020-07-13 14:07:05 +0200103 // Meters map related objects
alshabib7bb05012015-08-05 10:15:09 -0700104 private static final String METERSTORE = "onos-meter-store";
pierventre1b8afbc2020-07-13 14:07:05 +0200105 private ConsistentMap<MeterKey, MeterData> meters;
Wailok Shumf013a782021-07-26 16:51:01 +0800106 private MapEventListener<MeterKey, MeterData> metersMapListener = new InternalMetersMapEventListener();
pierventre44220052020-09-22 12:51:06 +0200107 private Map<MeterKey, MeterData> metersMap;
alshabib7bb05012015-08-05 10:15:09 -0700108
pierventre1b8afbc2020-07-13 14:07:05 +0200109 // Meters features related objects
110 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Wailok Shumf013a782021-07-26 16:51:01 +0800111 private EventuallyConsistentMap<MeterTableKey, MeterFeatures> metersFeatures;
112 private EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> featuresMapListener =
113 new InternalFeaturesMapEventListener();
pierventre1b8afbc2020-07-13 14:07:05 +0200114
115 // Meters id related objects
116 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
117 // Available meter identifiers
Wailok Shumf013a782021-07-26 16:51:01 +0800118 private ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
pierventre1b8afbc2020-07-13 14:07:05 +0200119 // Atomic counter map for generation of new identifiers;
120 private static final String METERIDSTORE = "onos-meters-id-store";
Wailok Shumf013a782021-07-26 16:51:01 +0800121 private AtomicCounterMap<MeterTableKey> meterIdGenerators;
pierventre1b8afbc2020-07-13 14:07:05 +0200122
123 // Serializer related objects
Charles Chan593acf92017-11-22 13:55:41 -0800124 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
125 .register(KryoNamespaces.API)
126 .register(MeterKey.class)
127 .register(MeterData.class)
128 .register(DefaultMeter.class)
129 .register(DefaultBand.class)
130 .register(Band.Type.class)
131 .register(MeterState.class)
debmaiti1bea2892019-06-04 12:36:38 +0530132 .register(Meter.Unit.class)
133 .register(MeterFailReason.class);
Charles Chan593acf92017-11-22 13:55:41 -0800134 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
135
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700137 private StorageService storageService;
138
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Pier Luigif094c612017-10-14 12:15:02 +0200140 protected DriverService driverService;
141
pierventre1b8afbc2020-07-13 14:07:05 +0200142 // Local cache to handle async ops through futures.
alshabib70aaa1b2015-09-25 14:30:59 -0700143 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700144 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700145
Pier Luigif094c612017-10-14 12:15:02 +0200146 /**
147 * Defines possible selection strategies to reuse meter ids.
148 */
149 enum ReuseStrategy {
150 /**
151 * Select randomly an available id.
152 */
153 RANDOM,
154 /**
155 * Select the first one.
156 */
157 FIRST_FIT
158 }
Pier Luigif094c612017-10-14 12:15:02 +0200159 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100160
alshabib7bb05012015-08-05 10:15:09 -0700161 @Activate
162 public void activate() {
pierventre1b8afbc2020-07-13 14:07:05 +0200163 // Init meters map and setup the map listener
alshabib70aaa1b2015-09-25 14:30:59 -0700164 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700165 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800166 .withSerializer(serializer).build();
Wailok Shumf013a782021-07-26 16:51:01 +0800167 meters.addListener(metersMapListener);
pierventre44220052020-09-22 12:51:06 +0200168 metersMap = meters.asJavaMap();
Wailok Shumf013a782021-07-26 16:51:01 +0800169 // Init meter features map
170 metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
171 .withName(METERFEATURESSTORE)
172 .withTimestampProvider((key, features) -> new WallClockTimestamp())
173 .withSerializer(KryoNamespace.newBuilder()
174 .register(KryoNamespaces.API)
175 .register(MeterTableKey.class)
176 .register(MeterFeatures.class)
177 .register(DefaultMeterFeatures.class)
178 .register(DefaultBand.class)
179 .register(Band.Type.class)
180 .register(Meter.Unit.class)
181 .register(MeterFailReason.class)
182 .register(MeterFeaturesFlag.class)).build();
183 metersFeatures.addListener(featuresMapListener);
184 // Init the map of the available ids set
185 // Set will be created when a new Meter Features is pushed to the store
186 availableMeterIds = new ConcurrentHashMap<>();
Pier Luigif094c612017-10-14 12:15:02 +0200187 // Init atomic map counters
Wailok Shumf013a782021-07-26 16:51:01 +0800188 meterIdGenerators = storageService.<MeterTableKey>atomicCounterMapBuilder()
Pier Luigif094c612017-10-14 12:15:02 +0200189 .withName(METERIDSTORE)
Wailok Shumf013a782021-07-26 16:51:01 +0800190 .withSerializer(Serializer.using(KryoNamespaces.API,
191 MeterTableKey.class,
192 MeterScope.class)).build();
alshabib7bb05012015-08-05 10:15:09 -0700193 log.info("Started");
194 }
195
196 @Deactivate
197 public void deactivate() {
Wailok Shumf013a782021-07-26 16:51:01 +0800198 meters.removeListener(metersMapListener);
199 metersFeatures.removeListener(featuresMapListener);
200 meters.destroy();
201 metersFeatures.destroy();
202 availableMeterIds.forEach((key, set) -> {
203 set.destroy();
204 });
alshabib7bb05012015-08-05 10:15:09 -0700205 log.info("Stopped");
206 }
207
alshabib7bb05012015-08-05 10:15:09 -0700208 @Override
Wailok Shum6a249352021-07-29 00:02:56 +0800209 public CompletableFuture<MeterStoreResult> addOrUpdateMeter(Meter meter) {
210 // Init steps
211 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
212 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
213 MeterData data = new MeterData(meter, null);
214 // Store the future related to the operation
215 futures.put(key, future);
216 // Check if the meter exists
217 try {
218 meters.compute(key, (k, v) -> data);
219 } catch (StorageException e) {
220 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
221 e.getMessage(), e);
222 futures.remove(key);
223 future.completeExceptionally(e);
224 }
225 return future;
226 }
227
228 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700229 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200230 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700231 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
Wailok Shum6a249352021-07-29 00:02:56 +0800232 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
Pier Luigif094c612017-10-14 12:15:02 +0200233 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700234 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200235 // Store the meter data
pierventre1b8afbc2020-07-13 14:07:05 +0200236 MeterData data = new MeterData(meter, null);
alshabibeadfc8e2015-08-18 15:40:46 -0700237 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700238 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700239 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200240 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
241 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900242 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700243 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700244 }
Pier Luigif094c612017-10-14 12:15:02 +0200245 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700246 return future;
alshabib7bb05012015-08-05 10:15:09 -0700247 }
248
249 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700250 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200251 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700252 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
Wailok Shum6a249352021-07-29 00:02:56 +0800253 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
Pier Luigif094c612017-10-14 12:15:02 +0200254 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700255 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200256 // Create the meter data
pierventre1b8afbc2020-07-13 14:07:05 +0200257 MeterData data = new MeterData(meter, null);
Pier Luigif094c612017-10-14 12:15:02 +0200258 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700259 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700260 try {
Pier Luigif094c612017-10-14 12:15:02 +0200261 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700262 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200263 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700264 future.complete(MeterStoreResult.success());
265 }
alshabibeadfc8e2015-08-18 15:40:46 -0700266 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200267 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
268 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900269 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700270 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700271 }
Pier Luigif094c612017-10-14 12:15:02 +0200272 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700273 return future;
alshabib7bb05012015-08-05 10:15:09 -0700274 }
275
276 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100277 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
Wailok Shumf013a782021-07-26 16:51:01 +0800278 // Store meter features, this is done once for each features of every device
Jordi Ortizaa8de492016-12-01 00:21:36 +0100279 MeterStoreResult result = MeterStoreResult.success();
Wailok Shumf013a782021-07-26 16:51:01 +0800280 MeterTableKey key = MeterTableKey.key(meterfeatures.deviceId(), meterfeatures.scope());
Jordi Ortizaa8de492016-12-01 00:21:36 +0100281 try {
Wailok Shumf013a782021-07-26 16:51:01 +0800282 metersFeatures.put(key, meterfeatures);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100283 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200284 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
285 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100286 result = MeterStoreResult.fail(TIMEOUT);
287 }
288 return result;
289 }
290
291 @Override
Wailok Shum6a249352021-07-29 00:02:56 +0800292 public MeterStoreResult storeMeterFeatures(Collection<MeterFeatures> meterfeatures) {
293 // These store operations is treated as one single operation
294 // If one of them is failed, Fail is returned
295 // But the failed operation will not block the rest.
296 MeterStoreResult result = MeterStoreResult.success();
297 for (MeterFeatures mf : meterfeatures) {
298 if (storeMeterFeatures(mf).type() == FAIL) {
299 result = MeterStoreResult.fail(TIMEOUT);
300 }
301 }
302 return result;
303 }
304
305 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100306 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
307 MeterStoreResult result = MeterStoreResult.success();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100308 try {
Wailok Shumf013a782021-07-26 16:51:01 +0800309 Set<MeterTableKey> keys = metersFeatures.keySet().stream()
310 .filter(key -> key.deviceId().equals(deviceId))
311 .collect(Collectors.toUnmodifiableSet());
312 keys.forEach(k -> {
313 metersFeatures.remove(k);
314 });
Jordi Ortizaa8de492016-12-01 00:21:36 +0100315 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200316 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
Wailok Shumf013a782021-07-26 16:51:01 +0800317 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100318 result = MeterStoreResult.fail(TIMEOUT);
319 }
Wailok Shumf013a782021-07-26 16:51:01 +0800320
Jordi Ortizaa8de492016-12-01 00:21:36 +0100321 return result;
322 }
323
324 @Override
Wailok Shum6a249352021-07-29 00:02:56 +0800325 public MeterStoreResult deleteMeterFeatures(Collection<MeterFeatures> meterfeatures) {
326 // These store operations is treated as one single operation
327 // If one of them is failed, Fail is returned
328 // But the failed operation will not block the rest.
329 MeterStoreResult result = MeterStoreResult.success();
330 for (MeterFeatures mf : meterfeatures) {
331 try {
332 MeterTableKey key = MeterTableKey.key(mf.deviceId(), mf.scope());
333 metersFeatures.remove(key);
334 } catch (StorageException e) {
335 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
336 e.getMessage(), e);
337 result = MeterStoreResult.fail(TIMEOUT);
338 }
339 }
340
341 return result;
342 }
343
344 @Override
pierventre1b8afbc2020-07-13 14:07:05 +0200345 // TODO Should we remove it ? We are not using it
alshabibeadfc8e2015-08-18 15:40:46 -0700346 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
347 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
Wailok Shum6a249352021-07-29 00:02:56 +0800348 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
alshabib70aaa1b2015-09-25 14:30:59 -0700349 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700350
pierventre1b8afbc2020-07-13 14:07:05 +0200351 MeterData data = new MeterData(meter, null);
alshabibeadfc8e2015-08-18 15:40:46 -0700352 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700353 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700354 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
355 }
alshabibeadfc8e2015-08-18 15:40:46 -0700356 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200357 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
358 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900359 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700360 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700361 }
alshabibeadfc8e2015-08-18 15:40:46 -0700362 return future;
alshabib7bb05012015-08-05 10:15:09 -0700363 }
364
365 @Override
pierventre44220052020-09-22 12:51:06 +0200366 public Meter updateMeterState(Meter meter) {
pierventre1b8afbc2020-07-13 14:07:05 +0200367 // Update meter if present (stats workflow)
Wailok Shum6a249352021-07-29 00:02:56 +0800368 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
pierventre44220052020-09-22 12:51:06 +0200369 Versioned<MeterData> value = meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700370 DefaultMeter m = (DefaultMeter) v.meter();
pier59721bf2020-01-08 08:57:46 +0100371 MeterState meterState = m.state();
372 if (meterState == MeterState.PENDING_ADD) {
373 m.setState(meter.state());
374 }
alshabib7bb05012015-08-05 10:15:09 -0700375 m.setProcessedPackets(meter.packetsSeen());
376 m.setProcessedBytes(meter.bytesSeen());
377 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700378 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700379 m.setReferenceCount(meter.referenceCount());
pierventre1b8afbc2020-07-13 14:07:05 +0200380 return new MeterData(m, null);
alshabib7bb05012015-08-05 10:15:09 -0700381 });
pierventre44220052020-09-22 12:51:06 +0200382 return value != null ? value.value().meter() : null;
alshabib7bb05012015-08-05 10:15:09 -0700383 }
384
385 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700386 public Meter getMeter(MeterKey key) {
387 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700388 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700389 }
390
391 @Override
392 public Collection<Meter> getAllMeters() {
pierventre44220052020-09-22 12:51:06 +0200393 return Collections2.transform(ImmutableSet.copyOf(metersMap.values()),
alshabibeadfc8e2015-08-18 15:40:46 -0700394 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700395 }
396
397 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200398 public Collection<Meter> getAllMeters(DeviceId deviceId) {
399 return Collections2.transform(
pierventre44220052020-09-22 12:51:06 +0200400 Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
Jordi Ortiz9287b632017-06-22 11:01:37 +0200401 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
402 MeterData::meter);
403 }
404
405 @Override
alshabib7bb05012015-08-05 10:15:09 -0700406 public void failedMeter(MeterOperation op, MeterFailReason reason) {
pierventre1b8afbc2020-07-13 14:07:05 +0200407 // Meter ops failed (got notification from the sb)
alshabib70aaa1b2015-09-25 14:30:59 -0700408 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
pierventre1b8afbc2020-07-13 14:07:05 +0200409 meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
alshabib7bb05012015-08-05 10:15:09 -0700410 }
411
alshabib5eb79392015-08-19 18:09:55 -0700412 @Override
413 public void deleteMeterNow(Meter m) {
Wailok Shumf013a782021-07-26 16:51:01 +0800414 // This method is renamed in onos-2.5
415 purgeMeter(m);
416 }
417
418 @Override
419 public void purgeMeter(Meter m) {
pierventre1b8afbc2020-07-13 14:07:05 +0200420 // Once we receive the ack from the sb
421 // create the key and remove definitely the meter
alshabib70aaa1b2015-09-25 14:30:59 -0700422 MeterKey key = MeterKey.key(m.deviceId(), m.id());
pierventre1b8afbc2020-07-13 14:07:05 +0200423 try {
424 if (Versioned.valueOrNull(meters.remove(key)) != null) {
425 // Free the id
Wailok Shumf013a782021-07-26 16:51:01 +0800426 MeterScope scope;
427 if (m.meterCellId().type() == PIPELINE_INDEPENDENT) {
428 PiMeterCellId piMeterCellId = (PiMeterCellId) m.meterCellId();
429 scope = MeterScope.of(piMeterCellId.meterId().id());
430 } else {
431 scope = MeterScope.globalScope();
432 }
433 MeterTableKey meterTableKey = MeterTableKey.key(m.deviceId(), scope);
434 freeMeterId(meterTableKey, m.meterCellId());
pierventre1b8afbc2020-07-13 14:07:05 +0200435 }
436 } catch (StorageException e) {
437 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
438 e.getMessage(), e);
pier59721bf2020-01-08 08:57:46 +0100439 }
alshabib5eb79392015-08-19 18:09:55 -0700440 }
441
Jordi Ortizaa8de492016-12-01 00:21:36 +0100442 @Override
Gamze Abakaf57ef602019-03-11 06:52:48 +0000443 public void purgeMeter(DeviceId deviceId) {
Gamze Abakaf57ef602019-03-11 06:52:48 +0000444 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
445 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
446 .map(Map.Entry::getValue)
447 .collect(Collectors.toList());
Gamze Abakaf57ef602019-03-11 06:52:48 +0000448 metersPendingRemove.forEach(versionedMeterKey
Wailok Shumf013a782021-07-26 16:51:01 +0800449 -> purgeMeter(versionedMeterKey.value().meter()));
Gamze Abakaf57ef602019-03-11 06:52:48 +0000450 }
451
452 @Override
Daniele Moro43ac2892021-07-15 17:02:59 +0200453 public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
454 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
455 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId) &&
456 e.getValue().value().meter().appId().equals(appId))
457 .map(Map.Entry::getValue)
458 .collect(Collectors.toList());
459 metersPendingRemove.forEach(versionedMeterKey -> deleteMeterNow(versionedMeterKey.value().meter()));
460 }
461
462 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100463 public long getMaxMeters(MeterFeaturesKey key) {
pierventre1b8afbc2020-07-13 14:07:05 +0200464 // Leverage the meter features to know the max id
Wailok Shumf013a782021-07-26 16:51:01 +0800465 // Create a Meter Table key with FeaturesKey's device and global scope
466 MeterTableKey meterTableKey = MeterTableKey.key(key.deviceId(), MeterScope.globalScope());
467 return getMaxMeters(meterTableKey);
468 }
469
470 private long getMaxMeters(MeterTableKey key) {
471 // Leverage the meter features to know the max id
472 MeterFeatures features = metersFeatures.get(key);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100473 return features == null ? 0L : features.maxMeter();
474 }
475
Wailok Shumf013a782021-07-26 16:51:01 +0800476 private long getStartIndex(MeterTableKey key) {
477 // Leverage the meter features to know the start id
478 // Since we are using index now
479 // if there is no features related to the key
480 // -1 is returned
481 MeterFeatures features = metersFeatures.get(key);
482 return features == null ? -1L : features.startIndex();
483 }
484
485 private long getEndIndex(MeterTableKey key) {
486 // Leverage the meter features to know the max id
487 // Since we are using index now
488 // if there is no features related to the key
489 // -1 is returned
490 MeterFeatures features = metersFeatures.get(key);
491 return features == null ? -1L : features.endIndex();
492 }
493
Pier Luigif094c612017-10-14 12:15:02 +0200494 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
495 private long queryMaxMeters(DeviceId device) {
496 // Get driver handler for this device
497 DriverHandler handler = driverService.createHandler(device);
498 // If creation failed or the device does not have this behavior
499 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
500 // We cannot know max meter
501 return 0L;
502 }
503 // Get the behavior
504 MeterQuery query = handler.behaviour(MeterQuery.class);
Wailok Shumf013a782021-07-26 16:51:01 +0800505 // Insert a new available key set to the map
506 String setName = AVAILABLEMETERIDSTORE + "-" + device + "global";
507 MeterTableKey meterTableKey = MeterTableKey.key(device, MeterScope.globalScope());
508 insertAvailableKeySet(meterTableKey, setName);
Pier Luigif094c612017-10-14 12:15:02 +0200509 // Return as max meter the result of the query
510 return query.getMaxMeters();
511 }
512
Wailok Shumf013a782021-07-26 16:51:01 +0800513 private boolean updateMeterIdAvailability(MeterTableKey meterTableKey, MeterCellId id,
Pier Luigif094c612017-10-14 12:15:02 +0200514 boolean available) {
Wailok Shumf013a782021-07-26 16:51:01 +0800515 // Retrieve the set first
516 DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
517 if (keySet == null) {
518 // A reusable set should be inserted when a features is pushed
519 log.warn("Reusable Key set for device: {} scope: {} not found",
520 meterTableKey.deviceId(), meterTableKey.scope());
521 return false;
522 }
Pier Luigif094c612017-10-14 12:15:02 +0200523 // According to available, make available or unavailable a meter key
Wailok Shumf013a782021-07-26 16:51:01 +0800524 DeviceId deviceId = meterTableKey.deviceId();
525 return available ? keySet.add(MeterKey.key(deviceId, id)) :
526 keySet.remove(MeterKey.key(deviceId, id));
Pier Luigif094c612017-10-14 12:15:02 +0200527 }
528
Wailok Shumf013a782021-07-26 16:51:01 +0800529 private MeterCellId getNextAvailableId(Set<MeterCellId> availableIds) {
Pier Luigif094c612017-10-14 12:15:02 +0200530 // If there are no available ids
531 if (availableIds.isEmpty()) {
532 // Just end the cycle
533 return null;
534 }
535 // If it is the first fit
536 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
537 return availableIds.iterator().next();
538 }
539 // If it is random, get the size
540 int size = availableIds.size();
541 // Return a random element
542 return Iterables.get(availableIds, RandomUtils.nextInt(size));
543 }
544
545 // Implements reuse strategy
Wailok Shumf013a782021-07-26 16:51:01 +0800546 private MeterCellId firstReusableMeterId(MeterTableKey meterTableKey) {
547 // Create a Table key and use it to retrieve the reusable meterCellId set
548 DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
549 if (keySet == null) {
550 // A reusable set should be inserted when a features is pushed
551 log.warn("Reusable Key set for device: {} scope: {} not found",
552 meterTableKey.deviceId(), meterTableKey.scope());
553 return null;
554 }
Pier Luigif094c612017-10-14 12:15:02 +0200555 // Filter key related to device id, and reduce to meter ids
Wailok Shumf013a782021-07-26 16:51:01 +0800556 Set<MeterCellId> localAvailableMeterIds = keySet.stream()
557 .filter(meterKey ->
558 meterKey.deviceId().equals(meterTableKey.deviceId()))
Pier Luigif094c612017-10-14 12:15:02 +0200559 .map(MeterKey::meterId)
560 .collect(Collectors.toSet());
561 // Get next available id
Wailok Shumf013a782021-07-26 16:51:01 +0800562 MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
Pier Luigif094c612017-10-14 12:15:02 +0200563 // Iterate until there are items
564 while (meterId != null) {
565 // If we are able to reserve the id
Wailok Shumf013a782021-07-26 16:51:01 +0800566 if (updateMeterIdAvailability(meterTableKey, meterId, false)) {
Pier Luigif094c612017-10-14 12:15:02 +0200567 // Just end
568 return meterId;
569 }
570 // Update the set
571 localAvailableMeterIds.remove(meterId);
572 // Try another time
573 meterId = getNextAvailableId(localAvailableMeterIds);
574 }
575 // No reusable ids
576 return null;
577 }
578
579 @Override
580 public MeterId allocateMeterId(DeviceId deviceId) {
Wailok Shumf013a782021-07-26 16:51:01 +0800581 // We use global scope for MeterId
582 return (MeterId) allocateMeterId(deviceId, MeterScope.globalScope());
583 }
584
585 @Override
586 public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
587 MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
588 MeterCellId meterCellId;
Pier Luigif094c612017-10-14 12:15:02 +0200589 long id;
Wailok Shumf013a782021-07-26 16:51:01 +0800590 // First, search for reusable key
591 meterCellId = firstReusableMeterId(meterTableKey);
592 if (meterCellId != null) {
593 // A reusable key is found
594 return meterCellId;
Pier Luigif094c612017-10-14 12:15:02 +0200595 }
Wailok Shumf013a782021-07-26 16:51:01 +0800596 // If there was no reusable meter id we have to generate a new value
597 // using start and end index as lower and upper bound respectively.
598 long startIndex = getStartIndex(meterTableKey);
599 long endIndex = getEndIndex(meterTableKey);
Pier Luigif094c612017-10-14 12:15:02 +0200600 // If the device does not give us MeterFeatures
Wailok Shumf013a782021-07-26 16:51:01 +0800601 if (startIndex == -1L || endIndex == -1L) {
Pier Luigif094c612017-10-14 12:15:02 +0200602 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
Wailok Shumf013a782021-07-26 16:51:01 +0800603 // Only meaningful to OpenFLow
604 long maxMeters = queryMaxMeters(deviceId);
605 // If we don't know the max, cannot proceed
606 if (maxMeters == 0L) {
607 return null;
608 } else {
609 // OpenFlow meter index starts from 1, ends with max-1
610 startIndex = 1L;
611 endIndex = maxMeters - 1;
612 }
Pier Luigif094c612017-10-14 12:15:02 +0200613 }
614 // Get a new value
Wailok Shumf013a782021-07-26 16:51:01 +0800615 // If the value is smaller than the start index, get another one
616 do {
617 id = meterIdGenerators.incrementAndGet(meterTableKey);
618 } while (id < startIndex);
619 // Check with the end index, and if the value is bigger, cannot proceed
620 if (id > endIndex) {
Pier Luigif094c612017-10-14 12:15:02 +0200621 return null;
622 }
623 // Done, return the value
Wailok Shumf013a782021-07-26 16:51:01 +0800624 // If we are using global scope, return a MeterId
625 // Else, return a PiMeterId
626 if (meterScope.isGlobal()) {
627 return MeterId.meterId(id);
628 } else {
629 return PiMeterCellId.ofIndirect(PiMeterId.of(meterScope.id()), id);
630 }
631
Pier Luigif094c612017-10-14 12:15:02 +0200632 }
633
634 @Override
635 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Wailok Shumf013a782021-07-26 16:51:01 +0800636 MeterTableKey meterTableKey = MeterTableKey.key(deviceId, MeterScope.globalScope());
637 freeMeterId(meterTableKey, meterId);
638 }
639
640 private void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
641 long index;
642 if (meterCellId.type() == PIPELINE_INDEPENDENT) {
643 PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;
644 index = piMeterCellId.index();
645 } else if (meterCellId.type() == INDEX) {
646 MeterId meterId = (MeterId) meterCellId;
647 index = meterId.id();
648 } else {
649 return;
650 }
Pier Luigibdcd9672017-10-13 13:54:48 +0200651 // Avoid to free meter not allocated
Wailok Shumf013a782021-07-26 16:51:01 +0800652 if (meterIdGenerators.get(meterTableKey) < index) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200653 return;
654 }
Pier Luigif094c612017-10-14 12:15:02 +0200655 // Update the availability
Wailok Shumf013a782021-07-26 16:51:01 +0800656 updateMeterIdAvailability(meterTableKey, meterCellId, true);
Pier Luigif094c612017-10-14 12:15:02 +0200657 }
658
pierventre1b8afbc2020-07-13 14:07:05 +0200659 // Enabling the events distribution across the cluster
Wailok Shumf013a782021-07-26 16:51:01 +0800660 private class InternalMetersMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700661 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700662 public void event(MapEvent<MeterKey, MeterData> event) {
663 MeterKey key = event.key();
Ray Milkeyd0f017f2018-09-21 12:52:34 -0700664 Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
665 MeterData data = value.value();
pierventre1b8afbc2020-07-13 14:07:05 +0200666 MeterData oldData = Versioned.valueOrNull(event.oldValue());
alshabibeadfc8e2015-08-18 15:40:46 -0700667 switch (event.type()) {
668 case INSERT:
669 case UPDATE:
670 switch (data.meter().state()) {
671 case PENDING_ADD:
672 case PENDING_REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200673 // Two cases. If there is a reason, the meter operation failed.
674 // Otherwise, we are ready to install/remove through the delegate.
675 if (data.reason().isEmpty()) {
676 notifyDelegate(new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
677 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, data.meter()));
678 } else {
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100679 futures.computeIfPresent(key, (k, v) -> {
pierventre1b8afbc2020-07-13 14:07:05 +0200680 v.complete(MeterStoreResult.fail(data.reason().get()));
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100681 return null;
682 });
alshabibe1248b62015-08-20 17:21:55 -0700683 }
684 break;
pierventre1b8afbc2020-07-13 14:07:05 +0200685 case ADDED:
686 // Transition from pending to installed
687 if (data.meter().state() == MeterState.ADDED &&
688 (oldData != null && oldData.meter().state() == MeterState.PENDING_ADD)) {
689 futures.computeIfPresent(key, (k, v) -> {
690 v.complete(MeterStoreResult.success());
691 return null;
692 });
693 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
694 // Update stats case
695 } else if (data.meter().referenceCount() == 0) {
696 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO,
697 data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700698 }
699 break;
700 default:
701 log.warn("Unknown meter state type {}", data.meter().state());
702 }
703 break;
704 case REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200705 // Meter removal case
706 futures.computeIfPresent(key, (k, v) -> {
707 v.complete(MeterStoreResult.success());
708 return null;
709 });
710 // Finally notify the delegate
711 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700712 break;
713 default:
714 log.warn("Unknown Map event type {}", event.type());
715 }
alshabibeadfc8e2015-08-18 15:40:46 -0700716 }
717 }
718
Wailok Shumf013a782021-07-26 16:51:01 +0800719 private class InternalFeaturesMapEventListener implements
720 EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> {
721 @Override
722 public void event(EventuallyConsistentMapEvent<MeterTableKey, MeterFeatures> event) {
723 MeterTableKey meterTableKey = event.key();
724 MeterFeatures meterFeatures = event.value();
725 switch (event.type()) {
726 case PUT:
727 // Put a new available meter id set to the map
728 String setName = AVAILABLEMETERIDSTORE + "-" +
729 meterFeatures.deviceId() + meterFeatures.scope().id();
730 insertAvailableKeySet(meterTableKey, setName);
731 break;
732 case REMOVE:
733 // Remove the set
734 DistributedSet<MeterKey> set = availableMeterIds.remove(meterTableKey);
735 if (set != null) {
736 set.destroy();
737 }
738 break;
739 default:
740 break;
741 }
742 }
743 }
744
745 private void insertAvailableKeySet(MeterTableKey meterTableKey, String setName) {
746 DistributedSet<MeterKey> availableMeterIdSet =
747 new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
748 .withName(setName)
749 .withSerializer(Serializer.using(KryoNamespaces.API,
750 MeterKey.class)).build(),
751 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
752 availableMeterIds.put(meterTableKey, availableMeterIdSet);
753 }
alshabib7bb05012015-08-05 10:15:09 -0700754}