blob: 2fdba6ffad6ab4c0f872fff90903a4cc36d0c6bc [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080016package org.onosproject.store.meter.impl;
alshabib7bb05012015-08-05 10:15:09 -070017
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
pierventre44220052020-09-22 12:51:06 +020019import com.google.common.collect.ImmutableSet;
Pier Luigif094c612017-10-14 12:15:02 +020020import com.google.common.collect.Iterables;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070021import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070022import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020023import org.apache.commons.lang.math.RandomUtils;
Charles Chan593acf92017-11-22 13:55:41 -080024import org.onlab.util.KryoNamespace;
Jordi Ortizaa8de492016-12-01 00:21:36 +010025import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020026import org.onosproject.net.behaviour.MeterQuery;
27import org.onosproject.net.driver.DriverHandler;
28import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070029import org.onosproject.net.meter.Band;
30import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070031import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010032import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070033import org.onosproject.net.meter.Meter;
Wailok Shumf013a782021-07-26 16:51:01 +080034import org.onosproject.net.meter.MeterCellId;
alshabib10c810b2015-08-18 16:59:04 -070035import org.onosproject.net.meter.MeterEvent;
36import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010037import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030038import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010039import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010040import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070041import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070042import org.onosproject.net.meter.MeterOperation;
Wailok Shumf013a782021-07-26 16:51:01 +080043import org.onosproject.net.meter.MeterScope;
alshabib10c810b2015-08-18 16:59:04 -070044import org.onosproject.net.meter.MeterState;
45import org.onosproject.net.meter.MeterStore;
46import org.onosproject.net.meter.MeterStoreDelegate;
47import org.onosproject.net.meter.MeterStoreResult;
Wailok Shumf013a782021-07-26 16:51:01 +080048import org.onosproject.net.meter.MeterTableKey;
49import org.onosproject.net.pi.model.PiMeterId;
50import org.onosproject.net.pi.runtime.PiMeterCellId;
alshabib7bb05012015-08-05 10:15:09 -070051import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020052import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070053import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020054import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070055import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020056import org.onosproject.store.service.DistributedPrimitive;
57import org.onosproject.store.service.DistributedSet;
Wailok Shumf013a782021-07-26 16:51:01 +080058import org.onosproject.store.service.EventuallyConsistentMap;
59import org.onosproject.store.service.EventuallyConsistentMapEvent;
60import org.onosproject.store.service.EventuallyConsistentMapListener;
alshabibeadfc8e2015-08-18 15:40:46 -070061import org.onosproject.store.service.MapEvent;
62import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070063import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070064import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070065import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070066import org.onosproject.store.service.Versioned;
Wailok Shumf013a782021-07-26 16:51:01 +080067import org.onosproject.store.service.WallClockTimestamp;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070068import org.osgi.service.component.annotations.Activate;
69import org.osgi.service.component.annotations.Component;
70import org.osgi.service.component.annotations.Deactivate;
71import org.osgi.service.component.annotations.Reference;
72import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070073import org.slf4j.Logger;
74
75import java.util.Collection;
Wailok Shumf013a782021-07-26 16:51:01 +080076import java.util.concurrent.ConcurrentMap;
77import java.util.concurrent.ConcurrentHashMap;
Gamze Abakaf57ef602019-03-11 06:52:48 +000078import java.util.List;
alshabibeadfc8e2015-08-18 15:40:46 -070079import java.util.Map;
Gamze Abakaf57ef602019-03-11 06:52:48 +000080import java.util.Objects;
Pier Luigif094c612017-10-14 12:15:02 +020081import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070082import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020083import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070084
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080085import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010086import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
Wailok Shumf013a782021-07-26 16:51:01 +080087import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
88import static org.onosproject.net.meter.MeterCellId.MeterCellType.PIPELINE_INDEPENDENT;
alshabib7bb05012015-08-05 10:15:09 -070089import static org.slf4j.LoggerFactory.getLogger;
90
91/**
92 * A distributed meter store implementation. Meters are stored consistently
93 * across the cluster.
94 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070095@Component(immediate = true, service = MeterStore.class)
alshabib7bb05012015-08-05 10:15:09 -070096public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
97 implements MeterStore {
98
99 private Logger log = getLogger(getClass());
100
pierventre1b8afbc2020-07-13 14:07:05 +0200101 // Meters map related objects
alshabib7bb05012015-08-05 10:15:09 -0700102 private static final String METERSTORE = "onos-meter-store";
pierventre1b8afbc2020-07-13 14:07:05 +0200103 private ConsistentMap<MeterKey, MeterData> meters;
Wailok Shumf013a782021-07-26 16:51:01 +0800104 private MapEventListener<MeterKey, MeterData> metersMapListener = new InternalMetersMapEventListener();
pierventre44220052020-09-22 12:51:06 +0200105 private Map<MeterKey, MeterData> metersMap;
alshabib7bb05012015-08-05 10:15:09 -0700106
pierventre1b8afbc2020-07-13 14:07:05 +0200107 // Meters features related objects
108 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Wailok Shumf013a782021-07-26 16:51:01 +0800109 private EventuallyConsistentMap<MeterTableKey, MeterFeatures> metersFeatures;
110 private EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> featuresMapListener =
111 new InternalFeaturesMapEventListener();
pierventre1b8afbc2020-07-13 14:07:05 +0200112
113 // Meters id related objects
114 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
115 // Available meter identifiers
Wailok Shumf013a782021-07-26 16:51:01 +0800116 private ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
pierventre1b8afbc2020-07-13 14:07:05 +0200117 // Atomic counter map for generation of new identifiers;
118 private static final String METERIDSTORE = "onos-meters-id-store";
Wailok Shumf013a782021-07-26 16:51:01 +0800119 private AtomicCounterMap<MeterTableKey> meterIdGenerators;
pierventre1b8afbc2020-07-13 14:07:05 +0200120
121 // Serializer related objects
Charles Chan593acf92017-11-22 13:55:41 -0800122 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
123 .register(KryoNamespaces.API)
124 .register(MeterKey.class)
125 .register(MeterData.class)
126 .register(DefaultMeter.class)
127 .register(DefaultBand.class)
128 .register(Band.Type.class)
129 .register(MeterState.class)
debmaiti1bea2892019-06-04 12:36:38 +0530130 .register(Meter.Unit.class)
131 .register(MeterFailReason.class);
Charles Chan593acf92017-11-22 13:55:41 -0800132 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
133
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700135 private StorageService storageService;
136
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Pier Luigif094c612017-10-14 12:15:02 +0200138 protected DriverService driverService;
139
pierventre1b8afbc2020-07-13 14:07:05 +0200140 // Local cache to handle async ops through futures.
alshabib70aaa1b2015-09-25 14:30:59 -0700141 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700142 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700143
Pier Luigif094c612017-10-14 12:15:02 +0200144 /**
145 * Defines possible selection strategies to reuse meter ids.
146 */
147 enum ReuseStrategy {
148 /**
149 * Select randomly an available id.
150 */
151 RANDOM,
152 /**
153 * Select the first one.
154 */
155 FIRST_FIT
156 }
Pier Luigif094c612017-10-14 12:15:02 +0200157 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100158
alshabib7bb05012015-08-05 10:15:09 -0700159 @Activate
160 public void activate() {
pierventre1b8afbc2020-07-13 14:07:05 +0200161 // Init meters map and setup the map listener
alshabib70aaa1b2015-09-25 14:30:59 -0700162 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700163 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800164 .withSerializer(serializer).build();
Wailok Shumf013a782021-07-26 16:51:01 +0800165 meters.addListener(metersMapListener);
pierventre44220052020-09-22 12:51:06 +0200166 metersMap = meters.asJavaMap();
Wailok Shumf013a782021-07-26 16:51:01 +0800167 // Init meter features map
168 metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
169 .withName(METERFEATURESSTORE)
170 .withTimestampProvider((key, features) -> new WallClockTimestamp())
171 .withSerializer(KryoNamespace.newBuilder()
172 .register(KryoNamespaces.API)
173 .register(MeterTableKey.class)
174 .register(MeterFeatures.class)
175 .register(DefaultMeterFeatures.class)
176 .register(DefaultBand.class)
177 .register(Band.Type.class)
178 .register(Meter.Unit.class)
179 .register(MeterFailReason.class)
180 .register(MeterFeaturesFlag.class)).build();
181 metersFeatures.addListener(featuresMapListener);
182 // Init the map of the available ids set
183 // Set will be created when a new Meter Features is pushed to the store
184 availableMeterIds = new ConcurrentHashMap<>();
Pier Luigif094c612017-10-14 12:15:02 +0200185 // Init atomic map counters
Wailok Shumf013a782021-07-26 16:51:01 +0800186 meterIdGenerators = storageService.<MeterTableKey>atomicCounterMapBuilder()
Pier Luigif094c612017-10-14 12:15:02 +0200187 .withName(METERIDSTORE)
Wailok Shumf013a782021-07-26 16:51:01 +0800188 .withSerializer(Serializer.using(KryoNamespaces.API,
189 MeterTableKey.class,
190 MeterScope.class)).build();
alshabib7bb05012015-08-05 10:15:09 -0700191 log.info("Started");
192 }
193
194 @Deactivate
195 public void deactivate() {
Wailok Shumf013a782021-07-26 16:51:01 +0800196 meters.removeListener(metersMapListener);
197 metersFeatures.removeListener(featuresMapListener);
198 meters.destroy();
199 metersFeatures.destroy();
200 availableMeterIds.forEach((key, set) -> {
201 set.destroy();
202 });
alshabib7bb05012015-08-05 10:15:09 -0700203 log.info("Stopped");
204 }
205
alshabib7bb05012015-08-05 10:15:09 -0700206 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700207 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200208 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700209 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700210 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200211 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700212 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200213 // Store the meter data
pierventre1b8afbc2020-07-13 14:07:05 +0200214 MeterData data = new MeterData(meter, null);
alshabibeadfc8e2015-08-18 15:40:46 -0700215 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700216 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700217 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200218 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
219 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900220 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700221 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700222 }
Pier Luigif094c612017-10-14 12:15:02 +0200223 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700224 return future;
alshabib7bb05012015-08-05 10:15:09 -0700225 }
226
227 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700228 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200229 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700230 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700231 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Pier Luigif094c612017-10-14 12:15:02 +0200232 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700233 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200234 // Create the meter data
pierventre1b8afbc2020-07-13 14:07:05 +0200235 MeterData data = new MeterData(meter, null);
Pier Luigif094c612017-10-14 12:15:02 +0200236 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700237 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700238 try {
Pier Luigif094c612017-10-14 12:15:02 +0200239 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700240 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200241 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700242 future.complete(MeterStoreResult.success());
243 }
alshabibeadfc8e2015-08-18 15:40:46 -0700244 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200245 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
246 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900247 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700248 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700249 }
Pier Luigif094c612017-10-14 12:15:02 +0200250 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700251 return future;
alshabib7bb05012015-08-05 10:15:09 -0700252 }
253
254 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100255 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
Wailok Shumf013a782021-07-26 16:51:01 +0800256 // Store meter features, this is done once for each features of every device
Jordi Ortizaa8de492016-12-01 00:21:36 +0100257 MeterStoreResult result = MeterStoreResult.success();
Wailok Shumf013a782021-07-26 16:51:01 +0800258 MeterTableKey key = MeterTableKey.key(meterfeatures.deviceId(), meterfeatures.scope());
Jordi Ortizaa8de492016-12-01 00:21:36 +0100259 try {
Wailok Shumf013a782021-07-26 16:51:01 +0800260 metersFeatures.put(key, meterfeatures);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100261 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200262 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
263 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100264 result = MeterStoreResult.fail(TIMEOUT);
265 }
266 return result;
267 }
268
269 @Override
270 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
271 MeterStoreResult result = MeterStoreResult.success();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100272 try {
Wailok Shumf013a782021-07-26 16:51:01 +0800273 Set<MeterTableKey> keys = metersFeatures.keySet().stream()
274 .filter(key -> key.deviceId().equals(deviceId))
275 .collect(Collectors.toUnmodifiableSet());
276 keys.forEach(k -> {
277 metersFeatures.remove(k);
278 });
Jordi Ortizaa8de492016-12-01 00:21:36 +0100279 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200280 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
Wailok Shumf013a782021-07-26 16:51:01 +0800281 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100282 result = MeterStoreResult.fail(TIMEOUT);
283 }
Wailok Shumf013a782021-07-26 16:51:01 +0800284
Jordi Ortizaa8de492016-12-01 00:21:36 +0100285 return result;
286 }
287
288 @Override
pierventre1b8afbc2020-07-13 14:07:05 +0200289 // TODO Should we remove it ? We are not using it
alshabibeadfc8e2015-08-18 15:40:46 -0700290 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
291 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700292 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
293 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700294
pierventre1b8afbc2020-07-13 14:07:05 +0200295 MeterData data = new MeterData(meter, null);
alshabibeadfc8e2015-08-18 15:40:46 -0700296 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700297 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700298 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
299 }
alshabibeadfc8e2015-08-18 15:40:46 -0700300 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200301 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
302 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900303 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700304 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700305 }
alshabibeadfc8e2015-08-18 15:40:46 -0700306 return future;
alshabib7bb05012015-08-05 10:15:09 -0700307 }
308
309 @Override
pierventre44220052020-09-22 12:51:06 +0200310 public Meter updateMeterState(Meter meter) {
pierventre1b8afbc2020-07-13 14:07:05 +0200311 // Update meter if present (stats workflow)
alshabib70aaa1b2015-09-25 14:30:59 -0700312 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
pierventre44220052020-09-22 12:51:06 +0200313 Versioned<MeterData> value = meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700314 DefaultMeter m = (DefaultMeter) v.meter();
pier59721bf2020-01-08 08:57:46 +0100315 MeterState meterState = m.state();
316 if (meterState == MeterState.PENDING_ADD) {
317 m.setState(meter.state());
318 }
alshabib7bb05012015-08-05 10:15:09 -0700319 m.setProcessedPackets(meter.packetsSeen());
320 m.setProcessedBytes(meter.bytesSeen());
321 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700322 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700323 m.setReferenceCount(meter.referenceCount());
pierventre1b8afbc2020-07-13 14:07:05 +0200324 return new MeterData(m, null);
alshabib7bb05012015-08-05 10:15:09 -0700325 });
pierventre44220052020-09-22 12:51:06 +0200326 return value != null ? value.value().meter() : null;
alshabib7bb05012015-08-05 10:15:09 -0700327 }
328
329 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700330 public Meter getMeter(MeterKey key) {
331 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700332 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700333 }
334
335 @Override
336 public Collection<Meter> getAllMeters() {
pierventre44220052020-09-22 12:51:06 +0200337 return Collections2.transform(ImmutableSet.copyOf(metersMap.values()),
alshabibeadfc8e2015-08-18 15:40:46 -0700338 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700339 }
340
341 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200342 public Collection<Meter> getAllMeters(DeviceId deviceId) {
343 return Collections2.transform(
pierventre44220052020-09-22 12:51:06 +0200344 Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
Jordi Ortiz9287b632017-06-22 11:01:37 +0200345 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
346 MeterData::meter);
347 }
348
349 @Override
alshabib7bb05012015-08-05 10:15:09 -0700350 public void failedMeter(MeterOperation op, MeterFailReason reason) {
pierventre1b8afbc2020-07-13 14:07:05 +0200351 // Meter ops failed (got notification from the sb)
alshabib70aaa1b2015-09-25 14:30:59 -0700352 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
pierventre1b8afbc2020-07-13 14:07:05 +0200353 meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
alshabib7bb05012015-08-05 10:15:09 -0700354 }
355
alshabib5eb79392015-08-19 18:09:55 -0700356 @Override
357 public void deleteMeterNow(Meter m) {
Wailok Shumf013a782021-07-26 16:51:01 +0800358 // This method is renamed in onos-2.5
359 purgeMeter(m);
360 }
361
362 @Override
363 public void purgeMeter(Meter m) {
pierventre1b8afbc2020-07-13 14:07:05 +0200364 // Once we receive the ack from the sb
365 // create the key and remove definitely the meter
alshabib70aaa1b2015-09-25 14:30:59 -0700366 MeterKey key = MeterKey.key(m.deviceId(), m.id());
pierventre1b8afbc2020-07-13 14:07:05 +0200367 try {
368 if (Versioned.valueOrNull(meters.remove(key)) != null) {
369 // Free the id
Wailok Shumf013a782021-07-26 16:51:01 +0800370 MeterScope scope;
371 if (m.meterCellId().type() == PIPELINE_INDEPENDENT) {
372 PiMeterCellId piMeterCellId = (PiMeterCellId) m.meterCellId();
373 scope = MeterScope.of(piMeterCellId.meterId().id());
374 } else {
375 scope = MeterScope.globalScope();
376 }
377 MeterTableKey meterTableKey = MeterTableKey.key(m.deviceId(), scope);
378 freeMeterId(meterTableKey, m.meterCellId());
pierventre1b8afbc2020-07-13 14:07:05 +0200379 }
380 } catch (StorageException e) {
381 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
382 e.getMessage(), e);
pier59721bf2020-01-08 08:57:46 +0100383 }
alshabib5eb79392015-08-19 18:09:55 -0700384 }
385
Jordi Ortizaa8de492016-12-01 00:21:36 +0100386 @Override
Gamze Abakaf57ef602019-03-11 06:52:48 +0000387 public void purgeMeter(DeviceId deviceId) {
pierventre1b8afbc2020-07-13 14:07:05 +0200388 // Purge api (typically used when the device is offline)
Gamze Abakaf57ef602019-03-11 06:52:48 +0000389 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
390 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
391 .map(Map.Entry::getValue)
392 .collect(Collectors.toList());
pierventre1b8afbc2020-07-13 14:07:05 +0200393 // Remove definitely the meter
Gamze Abakaf57ef602019-03-11 06:52:48 +0000394 metersPendingRemove.forEach(versionedMeterKey
Wailok Shumf013a782021-07-26 16:51:01 +0800395 -> purgeMeter(versionedMeterKey.value().meter()));
Gamze Abakaf57ef602019-03-11 06:52:48 +0000396 }
397
398 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100399 public long getMaxMeters(MeterFeaturesKey key) {
pierventre1b8afbc2020-07-13 14:07:05 +0200400 // Leverage the meter features to know the max id
Wailok Shumf013a782021-07-26 16:51:01 +0800401 // Create a Meter Table key with FeaturesKey's device and global scope
402 MeterTableKey meterTableKey = MeterTableKey.key(key.deviceId(), MeterScope.globalScope());
403 return getMaxMeters(meterTableKey);
404 }
405
406 private long getMaxMeters(MeterTableKey key) {
407 // Leverage the meter features to know the max id
408 MeterFeatures features = metersFeatures.get(key);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100409 return features == null ? 0L : features.maxMeter();
410 }
411
Wailok Shumf013a782021-07-26 16:51:01 +0800412 private long getStartIndex(MeterTableKey key) {
413 // Leverage the meter features to know the start id
414 // Since we are using index now
415 // if there is no features related to the key
416 // -1 is returned
417 MeterFeatures features = metersFeatures.get(key);
418 return features == null ? -1L : features.startIndex();
419 }
420
421 private long getEndIndex(MeterTableKey key) {
422 // Leverage the meter features to know the max id
423 // Since we are using index now
424 // if there is no features related to the key
425 // -1 is returned
426 MeterFeatures features = metersFeatures.get(key);
427 return features == null ? -1L : features.endIndex();
428 }
429
Pier Luigif094c612017-10-14 12:15:02 +0200430 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
431 private long queryMaxMeters(DeviceId device) {
432 // Get driver handler for this device
433 DriverHandler handler = driverService.createHandler(device);
434 // If creation failed or the device does not have this behavior
435 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
436 // We cannot know max meter
437 return 0L;
438 }
439 // Get the behavior
440 MeterQuery query = handler.behaviour(MeterQuery.class);
Wailok Shumf013a782021-07-26 16:51:01 +0800441 // Insert a new available key set to the map
442 String setName = AVAILABLEMETERIDSTORE + "-" + device + "global";
443 MeterTableKey meterTableKey = MeterTableKey.key(device, MeterScope.globalScope());
444 insertAvailableKeySet(meterTableKey, setName);
Pier Luigif094c612017-10-14 12:15:02 +0200445 // Return as max meter the result of the query
446 return query.getMaxMeters();
447 }
448
Wailok Shumf013a782021-07-26 16:51:01 +0800449 private boolean updateMeterIdAvailability(MeterTableKey meterTableKey, MeterCellId id,
Pier Luigif094c612017-10-14 12:15:02 +0200450 boolean available) {
Wailok Shumf013a782021-07-26 16:51:01 +0800451 // Retrieve the set first
452 DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
453 if (keySet == null) {
454 // A reusable set should be inserted when a features is pushed
455 log.warn("Reusable Key set for device: {} scope: {} not found",
456 meterTableKey.deviceId(), meterTableKey.scope());
457 return false;
458 }
Pier Luigif094c612017-10-14 12:15:02 +0200459 // According to available, make available or unavailable a meter key
Wailok Shumf013a782021-07-26 16:51:01 +0800460 DeviceId deviceId = meterTableKey.deviceId();
461 return available ? keySet.add(MeterKey.key(deviceId, id)) :
462 keySet.remove(MeterKey.key(deviceId, id));
Pier Luigif094c612017-10-14 12:15:02 +0200463 }
464
Wailok Shumf013a782021-07-26 16:51:01 +0800465 private MeterCellId getNextAvailableId(Set<MeterCellId> availableIds) {
Pier Luigif094c612017-10-14 12:15:02 +0200466 // If there are no available ids
467 if (availableIds.isEmpty()) {
468 // Just end the cycle
469 return null;
470 }
471 // If it is the first fit
472 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
473 return availableIds.iterator().next();
474 }
475 // If it is random, get the size
476 int size = availableIds.size();
477 // Return a random element
478 return Iterables.get(availableIds, RandomUtils.nextInt(size));
479 }
480
481 // Implements reuse strategy
Wailok Shumf013a782021-07-26 16:51:01 +0800482 private MeterCellId firstReusableMeterId(MeterTableKey meterTableKey) {
483 // Create a Table key and use it to retrieve the reusable meterCellId set
484 DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
485 if (keySet == null) {
486 // A reusable set should be inserted when a features is pushed
487 log.warn("Reusable Key set for device: {} scope: {} not found",
488 meterTableKey.deviceId(), meterTableKey.scope());
489 return null;
490 }
Pier Luigif094c612017-10-14 12:15:02 +0200491 // Filter key related to device id, and reduce to meter ids
Wailok Shumf013a782021-07-26 16:51:01 +0800492 Set<MeterCellId> localAvailableMeterIds = keySet.stream()
493 .filter(meterKey ->
494 meterKey.deviceId().equals(meterTableKey.deviceId()))
Pier Luigif094c612017-10-14 12:15:02 +0200495 .map(MeterKey::meterId)
496 .collect(Collectors.toSet());
497 // Get next available id
Wailok Shumf013a782021-07-26 16:51:01 +0800498 MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
Pier Luigif094c612017-10-14 12:15:02 +0200499 // Iterate until there are items
500 while (meterId != null) {
501 // If we are able to reserve the id
Wailok Shumf013a782021-07-26 16:51:01 +0800502 if (updateMeterIdAvailability(meterTableKey, meterId, false)) {
Pier Luigif094c612017-10-14 12:15:02 +0200503 // Just end
504 return meterId;
505 }
506 // Update the set
507 localAvailableMeterIds.remove(meterId);
508 // Try another time
509 meterId = getNextAvailableId(localAvailableMeterIds);
510 }
511 // No reusable ids
512 return null;
513 }
514
515 @Override
516 public MeterId allocateMeterId(DeviceId deviceId) {
Wailok Shumf013a782021-07-26 16:51:01 +0800517 // We use global scope for MeterId
518 return (MeterId) allocateMeterId(deviceId, MeterScope.globalScope());
519 }
520
521 @Override
522 public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
523 MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
524 MeterCellId meterCellId;
Pier Luigif094c612017-10-14 12:15:02 +0200525 long id;
Wailok Shumf013a782021-07-26 16:51:01 +0800526 // First, search for reusable key
527 meterCellId = firstReusableMeterId(meterTableKey);
528 if (meterCellId != null) {
529 // A reusable key is found
530 return meterCellId;
Pier Luigif094c612017-10-14 12:15:02 +0200531 }
Wailok Shumf013a782021-07-26 16:51:01 +0800532 // If there was no reusable meter id we have to generate a new value
533 // using start and end index as lower and upper bound respectively.
534 long startIndex = getStartIndex(meterTableKey);
535 long endIndex = getEndIndex(meterTableKey);
Pier Luigif094c612017-10-14 12:15:02 +0200536 // If the device does not give us MeterFeatures
Wailok Shumf013a782021-07-26 16:51:01 +0800537 if (startIndex == -1L || endIndex == -1L) {
Pier Luigif094c612017-10-14 12:15:02 +0200538 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
Wailok Shumf013a782021-07-26 16:51:01 +0800539 // Only meaningful to OpenFLow
540 long maxMeters = queryMaxMeters(deviceId);
541 // If we don't know the max, cannot proceed
542 if (maxMeters == 0L) {
543 return null;
544 } else {
545 // OpenFlow meter index starts from 1, ends with max-1
546 startIndex = 1L;
547 endIndex = maxMeters - 1;
548 }
Pier Luigif094c612017-10-14 12:15:02 +0200549 }
550 // Get a new value
Wailok Shumf013a782021-07-26 16:51:01 +0800551 // If the value is smaller than the start index, get another one
552 do {
553 id = meterIdGenerators.incrementAndGet(meterTableKey);
554 } while (id < startIndex);
555 // Check with the end index, and if the value is bigger, cannot proceed
556 if (id > endIndex) {
Pier Luigif094c612017-10-14 12:15:02 +0200557 return null;
558 }
559 // Done, return the value
Wailok Shumf013a782021-07-26 16:51:01 +0800560 // If we are using global scope, return a MeterId
561 // Else, return a PiMeterId
562 if (meterScope.isGlobal()) {
563 return MeterId.meterId(id);
564 } else {
565 return PiMeterCellId.ofIndirect(PiMeterId.of(meterScope.id()), id);
566 }
567
Pier Luigif094c612017-10-14 12:15:02 +0200568 }
569
570 @Override
571 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Wailok Shumf013a782021-07-26 16:51:01 +0800572 MeterTableKey meterTableKey = MeterTableKey.key(deviceId, MeterScope.globalScope());
573 freeMeterId(meterTableKey, meterId);
574 }
575
576 private void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
577 long index;
578 if (meterCellId.type() == PIPELINE_INDEPENDENT) {
579 PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;
580 index = piMeterCellId.index();
581 } else if (meterCellId.type() == INDEX) {
582 MeterId meterId = (MeterId) meterCellId;
583 index = meterId.id();
584 } else {
585 return;
586 }
Pier Luigibdcd9672017-10-13 13:54:48 +0200587 // Avoid to free meter not allocated
Wailok Shumf013a782021-07-26 16:51:01 +0800588 if (meterIdGenerators.get(meterTableKey) < index) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200589 return;
590 }
Pier Luigif094c612017-10-14 12:15:02 +0200591 // Update the availability
Wailok Shumf013a782021-07-26 16:51:01 +0800592 updateMeterIdAvailability(meterTableKey, meterCellId, true);
Pier Luigif094c612017-10-14 12:15:02 +0200593 }
594
pierventre1b8afbc2020-07-13 14:07:05 +0200595 // Enabling the events distribution across the cluster
Wailok Shumf013a782021-07-26 16:51:01 +0800596 private class InternalMetersMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700597 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700598 public void event(MapEvent<MeterKey, MeterData> event) {
599 MeterKey key = event.key();
Ray Milkeyd0f017f2018-09-21 12:52:34 -0700600 Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
601 MeterData data = value.value();
pierventre1b8afbc2020-07-13 14:07:05 +0200602 MeterData oldData = Versioned.valueOrNull(event.oldValue());
alshabibeadfc8e2015-08-18 15:40:46 -0700603 switch (event.type()) {
604 case INSERT:
605 case UPDATE:
606 switch (data.meter().state()) {
607 case PENDING_ADD:
608 case PENDING_REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200609 // Two cases. If there is a reason, the meter operation failed.
610 // Otherwise, we are ready to install/remove through the delegate.
611 if (data.reason().isEmpty()) {
612 notifyDelegate(new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
613 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, data.meter()));
614 } else {
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100615 futures.computeIfPresent(key, (k, v) -> {
pierventre1b8afbc2020-07-13 14:07:05 +0200616 v.complete(MeterStoreResult.fail(data.reason().get()));
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100617 return null;
618 });
alshabibe1248b62015-08-20 17:21:55 -0700619 }
620 break;
pierventre1b8afbc2020-07-13 14:07:05 +0200621 case ADDED:
622 // Transition from pending to installed
623 if (data.meter().state() == MeterState.ADDED &&
624 (oldData != null && oldData.meter().state() == MeterState.PENDING_ADD)) {
625 futures.computeIfPresent(key, (k, v) -> {
626 v.complete(MeterStoreResult.success());
627 return null;
628 });
629 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
630 // Update stats case
631 } else if (data.meter().referenceCount() == 0) {
632 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO,
633 data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700634 }
635 break;
636 default:
637 log.warn("Unknown meter state type {}", data.meter().state());
638 }
639 break;
640 case REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200641 // Meter removal case
642 futures.computeIfPresent(key, (k, v) -> {
643 v.complete(MeterStoreResult.success());
644 return null;
645 });
646 // Finally notify the delegate
647 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700648 break;
649 default:
650 log.warn("Unknown Map event type {}", event.type());
651 }
alshabibeadfc8e2015-08-18 15:40:46 -0700652 }
653 }
654
Wailok Shumf013a782021-07-26 16:51:01 +0800655 private class InternalFeaturesMapEventListener implements
656 EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> {
657 @Override
658 public void event(EventuallyConsistentMapEvent<MeterTableKey, MeterFeatures> event) {
659 MeterTableKey meterTableKey = event.key();
660 MeterFeatures meterFeatures = event.value();
661 switch (event.type()) {
662 case PUT:
663 // Put a new available meter id set to the map
664 String setName = AVAILABLEMETERIDSTORE + "-" +
665 meterFeatures.deviceId() + meterFeatures.scope().id();
666 insertAvailableKeySet(meterTableKey, setName);
667 break;
668 case REMOVE:
669 // Remove the set
670 DistributedSet<MeterKey> set = availableMeterIds.remove(meterTableKey);
671 if (set != null) {
672 set.destroy();
673 }
674 break;
675 default:
676 break;
677 }
678 }
679 }
680
681 private void insertAvailableKeySet(MeterTableKey meterTableKey, String setName) {
682 DistributedSet<MeterKey> availableMeterIdSet =
683 new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
684 .withName(setName)
685 .withSerializer(Serializer.using(KryoNamespaces.API,
686 MeterKey.class)).build(),
687 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
688 availableMeterIds.put(meterTableKey, availableMeterIdSet);
689 }
alshabib7bb05012015-08-05 10:15:09 -0700690}