blob: 38f8748f1f1d0a098fc4115f0bd2d70a074fcb76 [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080016package org.onosproject.store.meter.impl;
alshabib7bb05012015-08-05 10:15:09 -070017
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
pierventre44220052020-09-22 12:51:06 +020019import com.google.common.collect.ImmutableSet;
Pier Luigif094c612017-10-14 12:15:02 +020020import com.google.common.collect.Iterables;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070021import com.google.common.collect.Lists;
alshabibeadfc8e2015-08-18 15:40:46 -070022import com.google.common.collect.Maps;
Pier Luigif094c612017-10-14 12:15:02 +020023import org.apache.commons.lang.math.RandomUtils;
Charles Chan593acf92017-11-22 13:55:41 -080024import org.onlab.util.KryoNamespace;
Jordi Ortizaa8de492016-12-01 00:21:36 +010025import org.onosproject.net.DeviceId;
Pier Luigif094c612017-10-14 12:15:02 +020026import org.onosproject.net.behaviour.MeterQuery;
27import org.onosproject.net.driver.DriverHandler;
28import org.onosproject.net.driver.DriverService;
alshabib58fe6dc2015-08-19 17:16:13 -070029import org.onosproject.net.meter.Band;
30import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070031import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010032import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070033import org.onosproject.net.meter.Meter;
Wailok Shumf013a782021-07-26 16:51:01 +080034import org.onosproject.net.meter.MeterCellId;
alshabib10c810b2015-08-18 16:59:04 -070035import org.onosproject.net.meter.MeterEvent;
36import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010037import org.onosproject.net.meter.MeterFeatures;
cansu.toprak409289d2017-10-27 10:04:05 +030038import org.onosproject.net.meter.MeterFeaturesFlag;
Jordi Ortizaa8de492016-12-01 00:21:36 +010039import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010040import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070041import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070042import org.onosproject.net.meter.MeterOperation;
Wailok Shumf013a782021-07-26 16:51:01 +080043import org.onosproject.net.meter.MeterScope;
alshabib10c810b2015-08-18 16:59:04 -070044import org.onosproject.net.meter.MeterState;
45import org.onosproject.net.meter.MeterStore;
46import org.onosproject.net.meter.MeterStoreDelegate;
47import org.onosproject.net.meter.MeterStoreResult;
Wailok Shumf013a782021-07-26 16:51:01 +080048import org.onosproject.net.meter.MeterTableKey;
49import org.onosproject.net.pi.model.PiMeterId;
50import org.onosproject.net.pi.runtime.PiMeterCellId;
alshabib7bb05012015-08-05 10:15:09 -070051import org.onosproject.store.AbstractStore;
Pier Luigif094c612017-10-14 12:15:02 +020052import org.onosproject.store.primitives.DefaultDistributedSet;
alshabibeadfc8e2015-08-18 15:40:46 -070053import org.onosproject.store.serializers.KryoNamespaces;
Pier Luigif094c612017-10-14 12:15:02 +020054import org.onosproject.store.service.AtomicCounterMap;
alshabib7bb05012015-08-05 10:15:09 -070055import org.onosproject.store.service.ConsistentMap;
Pier Luigif094c612017-10-14 12:15:02 +020056import org.onosproject.store.service.DistributedPrimitive;
57import org.onosproject.store.service.DistributedSet;
Wailok Shumf013a782021-07-26 16:51:01 +080058import org.onosproject.store.service.EventuallyConsistentMap;
59import org.onosproject.store.service.EventuallyConsistentMapEvent;
60import org.onosproject.store.service.EventuallyConsistentMapListener;
alshabibeadfc8e2015-08-18 15:40:46 -070061import org.onosproject.store.service.MapEvent;
62import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070063import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070064import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070065import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070066import org.onosproject.store.service.Versioned;
Wailok Shumf013a782021-07-26 16:51:01 +080067import org.onosproject.store.service.WallClockTimestamp;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070068import org.osgi.service.component.annotations.Activate;
69import org.osgi.service.component.annotations.Component;
70import org.osgi.service.component.annotations.Deactivate;
71import org.osgi.service.component.annotations.Reference;
72import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070073import org.slf4j.Logger;
74
75import java.util.Collection;
Wailok Shumf013a782021-07-26 16:51:01 +080076import java.util.concurrent.ConcurrentMap;
77import java.util.concurrent.ConcurrentHashMap;
Gamze Abakaf57ef602019-03-11 06:52:48 +000078import java.util.List;
alshabibeadfc8e2015-08-18 15:40:46 -070079import java.util.Map;
Gamze Abakaf57ef602019-03-11 06:52:48 +000080import java.util.Objects;
Pier Luigif094c612017-10-14 12:15:02 +020081import java.util.Set;
alshabibeadfc8e2015-08-18 15:40:46 -070082import java.util.concurrent.CompletableFuture;
Pier Luigif094c612017-10-14 12:15:02 +020083import java.util.stream.Collectors;
alshabib7bb05012015-08-05 10:15:09 -070084
Thomas Vachuska52f2cd12018-11-08 21:20:04 -080085import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
Jordi Ortizaa8de492016-12-01 00:21:36 +010086import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
Wailok Shumf013a782021-07-26 16:51:01 +080087import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
88import static org.onosproject.net.meter.MeterCellId.MeterCellType.PIPELINE_INDEPENDENT;
Wailok Shum6a249352021-07-29 00:02:56 +080089import static org.onosproject.net.meter.MeterStoreResult.Type.FAIL;
alshabib7bb05012015-08-05 10:15:09 -070090import static org.slf4j.LoggerFactory.getLogger;
91
92/**
93 * A distributed meter store implementation. Meters are stored consistently
94 * across the cluster.
95 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070096@Component(immediate = true, service = MeterStore.class)
alshabib7bb05012015-08-05 10:15:09 -070097public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
98 implements MeterStore {
99
100 private Logger log = getLogger(getClass());
101
pierventre1b8afbc2020-07-13 14:07:05 +0200102 // Meters map related objects
alshabib7bb05012015-08-05 10:15:09 -0700103 private static final String METERSTORE = "onos-meter-store";
pierventre1b8afbc2020-07-13 14:07:05 +0200104 private ConsistentMap<MeterKey, MeterData> meters;
Wailok Shumf013a782021-07-26 16:51:01 +0800105 private MapEventListener<MeterKey, MeterData> metersMapListener = new InternalMetersMapEventListener();
pierventre44220052020-09-22 12:51:06 +0200106 private Map<MeterKey, MeterData> metersMap;
alshabib7bb05012015-08-05 10:15:09 -0700107
pierventre1b8afbc2020-07-13 14:07:05 +0200108 // Meters features related objects
109 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Wailok Shumf013a782021-07-26 16:51:01 +0800110 private EventuallyConsistentMap<MeterTableKey, MeterFeatures> metersFeatures;
111 private EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> featuresMapListener =
112 new InternalFeaturesMapEventListener();
pierventre1b8afbc2020-07-13 14:07:05 +0200113
114 // Meters id related objects
115 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
116 // Available meter identifiers
Wailok Shumf013a782021-07-26 16:51:01 +0800117 private ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
pierventre1b8afbc2020-07-13 14:07:05 +0200118 // Atomic counter map for generation of new identifiers;
119 private static final String METERIDSTORE = "onos-meters-id-store";
Wailok Shumf013a782021-07-26 16:51:01 +0800120 private AtomicCounterMap<MeterTableKey> meterIdGenerators;
pierventre1b8afbc2020-07-13 14:07:05 +0200121
122 // Serializer related objects
Charles Chan593acf92017-11-22 13:55:41 -0800123 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
124 .register(KryoNamespaces.API)
125 .register(MeterKey.class)
126 .register(MeterData.class)
127 .register(DefaultMeter.class)
128 .register(DefaultBand.class)
129 .register(Band.Type.class)
130 .register(MeterState.class)
debmaiti1bea2892019-06-04 12:36:38 +0530131 .register(Meter.Unit.class)
132 .register(MeterFailReason.class);
Charles Chan593acf92017-11-22 13:55:41 -0800133 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
134
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib7bb05012015-08-05 10:15:09 -0700136 private StorageService storageService;
137
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Pier Luigif094c612017-10-14 12:15:02 +0200139 protected DriverService driverService;
140
pierventre1b8afbc2020-07-13 14:07:05 +0200141 // Local cache to handle async ops through futures.
alshabib70aaa1b2015-09-25 14:30:59 -0700142 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -0700143 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -0700144
Pier Luigif094c612017-10-14 12:15:02 +0200145 /**
146 * Defines possible selection strategies to reuse meter ids.
147 */
148 enum ReuseStrategy {
149 /**
150 * Select randomly an available id.
151 */
152 RANDOM,
153 /**
154 * Select the first one.
155 */
156 FIRST_FIT
157 }
Pier Luigif094c612017-10-14 12:15:02 +0200158 private ReuseStrategy reuseStrategy = FIRST_FIT;
Jordi Ortiz6c847762017-01-30 17:13:05 +0100159
alshabib7bb05012015-08-05 10:15:09 -0700160 @Activate
161 public void activate() {
pierventre1b8afbc2020-07-13 14:07:05 +0200162 // Init meters map and setup the map listener
alshabib70aaa1b2015-09-25 14:30:59 -0700163 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700164 .withName(METERSTORE)
Charles Chan593acf92017-11-22 13:55:41 -0800165 .withSerializer(serializer).build();
Wailok Shumf013a782021-07-26 16:51:01 +0800166 meters.addListener(metersMapListener);
pierventre44220052020-09-22 12:51:06 +0200167 metersMap = meters.asJavaMap();
Wailok Shumf013a782021-07-26 16:51:01 +0800168 // Init meter features map
169 metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
170 .withName(METERFEATURESSTORE)
171 .withTimestampProvider((key, features) -> new WallClockTimestamp())
172 .withSerializer(KryoNamespace.newBuilder()
173 .register(KryoNamespaces.API)
174 .register(MeterTableKey.class)
175 .register(MeterFeatures.class)
176 .register(DefaultMeterFeatures.class)
177 .register(DefaultBand.class)
178 .register(Band.Type.class)
179 .register(Meter.Unit.class)
180 .register(MeterFailReason.class)
181 .register(MeterFeaturesFlag.class)).build();
182 metersFeatures.addListener(featuresMapListener);
183 // Init the map of the available ids set
184 // Set will be created when a new Meter Features is pushed to the store
185 availableMeterIds = new ConcurrentHashMap<>();
Pier Luigif094c612017-10-14 12:15:02 +0200186 // Init atomic map counters
Wailok Shumf013a782021-07-26 16:51:01 +0800187 meterIdGenerators = storageService.<MeterTableKey>atomicCounterMapBuilder()
Pier Luigif094c612017-10-14 12:15:02 +0200188 .withName(METERIDSTORE)
Wailok Shumf013a782021-07-26 16:51:01 +0800189 .withSerializer(Serializer.using(KryoNamespaces.API,
190 MeterTableKey.class,
191 MeterScope.class)).build();
alshabib7bb05012015-08-05 10:15:09 -0700192 log.info("Started");
193 }
194
195 @Deactivate
196 public void deactivate() {
Wailok Shumf013a782021-07-26 16:51:01 +0800197 meters.removeListener(metersMapListener);
198 metersFeatures.removeListener(featuresMapListener);
199 meters.destroy();
200 metersFeatures.destroy();
201 availableMeterIds.forEach((key, set) -> {
202 set.destroy();
203 });
alshabib7bb05012015-08-05 10:15:09 -0700204 log.info("Stopped");
205 }
206
alshabib7bb05012015-08-05 10:15:09 -0700207 @Override
Wailok Shum6a249352021-07-29 00:02:56 +0800208 public CompletableFuture<MeterStoreResult> addOrUpdateMeter(Meter meter) {
209 // Init steps
210 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
211 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
212 MeterData data = new MeterData(meter, null);
213 // Store the future related to the operation
214 futures.put(key, future);
215 // Check if the meter exists
216 try {
217 meters.compute(key, (k, v) -> data);
218 } catch (StorageException e) {
219 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
220 e.getMessage(), e);
221 futures.remove(key);
222 future.completeExceptionally(e);
223 }
224 return future;
225 }
226
227 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700228 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200229 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700230 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
Wailok Shum6a249352021-07-29 00:02:56 +0800231 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
Pier Luigif094c612017-10-14 12:15:02 +0200232 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700233 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200234 // Store the meter data
pierventre1b8afbc2020-07-13 14:07:05 +0200235 MeterData data = new MeterData(meter, null);
alshabibeadfc8e2015-08-18 15:40:46 -0700236 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700237 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700238 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200239 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
240 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900241 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700242 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700243 }
Pier Luigif094c612017-10-14 12:15:02 +0200244 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700245 return future;
alshabib7bb05012015-08-05 10:15:09 -0700246 }
247
248 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700249 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
Pier Luigif094c612017-10-14 12:15:02 +0200250 // Init steps
alshabibeadfc8e2015-08-18 15:40:46 -0700251 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
Wailok Shum6a249352021-07-29 00:02:56 +0800252 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
Pier Luigif094c612017-10-14 12:15:02 +0200253 // Store the future related to the operation
alshabib70aaa1b2015-09-25 14:30:59 -0700254 futures.put(key, future);
Pier Luigif094c612017-10-14 12:15:02 +0200255 // Create the meter data
pierventre1b8afbc2020-07-13 14:07:05 +0200256 MeterData data = new MeterData(meter, null);
Pier Luigif094c612017-10-14 12:15:02 +0200257 // Update the state of the meter. It will be pruned by observing
alshabib7bb05012015-08-05 10:15:09 -0700258 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700259 try {
Pier Luigif094c612017-10-14 12:15:02 +0200260 // If it does not exist in the system
alshabib70aaa1b2015-09-25 14:30:59 -0700261 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
Pier Luigif094c612017-10-14 12:15:02 +0200262 // Complete immediately
alshabibe1248b62015-08-20 17:21:55 -0700263 future.complete(MeterStoreResult.success());
264 }
alshabibeadfc8e2015-08-18 15:40:46 -0700265 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200266 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
267 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900268 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700269 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700270 }
Pier Luigif094c612017-10-14 12:15:02 +0200271 // Done, return the future
alshabibeadfc8e2015-08-18 15:40:46 -0700272 return future;
alshabib7bb05012015-08-05 10:15:09 -0700273 }
274
275 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100276 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
Wailok Shumf013a782021-07-26 16:51:01 +0800277 // Store meter features, this is done once for each features of every device
Jordi Ortizaa8de492016-12-01 00:21:36 +0100278 MeterStoreResult result = MeterStoreResult.success();
Wailok Shumf013a782021-07-26 16:51:01 +0800279 MeterTableKey key = MeterTableKey.key(meterfeatures.deviceId(), meterfeatures.scope());
Jordi Ortizaa8de492016-12-01 00:21:36 +0100280 try {
Wailok Shumf013a782021-07-26 16:51:01 +0800281 metersFeatures.put(key, meterfeatures);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100282 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200283 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
284 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100285 result = MeterStoreResult.fail(TIMEOUT);
286 }
287 return result;
288 }
289
290 @Override
Wailok Shum6a249352021-07-29 00:02:56 +0800291 public MeterStoreResult storeMeterFeatures(Collection<MeterFeatures> meterfeatures) {
292 // These store operations is treated as one single operation
293 // If one of them is failed, Fail is returned
294 // But the failed operation will not block the rest.
295 MeterStoreResult result = MeterStoreResult.success();
296 for (MeterFeatures mf : meterfeatures) {
297 if (storeMeterFeatures(mf).type() == FAIL) {
298 result = MeterStoreResult.fail(TIMEOUT);
299 }
300 }
301 return result;
302 }
303
304 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100305 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
306 MeterStoreResult result = MeterStoreResult.success();
Jordi Ortizaa8de492016-12-01 00:21:36 +0100307 try {
Wailok Shumf013a782021-07-26 16:51:01 +0800308 Set<MeterTableKey> keys = metersFeatures.keySet().stream()
309 .filter(key -> key.deviceId().equals(deviceId))
310 .collect(Collectors.toUnmodifiableSet());
311 keys.forEach(k -> {
312 metersFeatures.remove(k);
313 });
Jordi Ortizaa8de492016-12-01 00:21:36 +0100314 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200315 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
Wailok Shumf013a782021-07-26 16:51:01 +0800316 e.getMessage(), e);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100317 result = MeterStoreResult.fail(TIMEOUT);
318 }
Wailok Shumf013a782021-07-26 16:51:01 +0800319
Jordi Ortizaa8de492016-12-01 00:21:36 +0100320 return result;
321 }
322
323 @Override
Wailok Shum6a249352021-07-29 00:02:56 +0800324 public MeterStoreResult deleteMeterFeatures(Collection<MeterFeatures> meterfeatures) {
325 // These store operations is treated as one single operation
326 // If one of them is failed, Fail is returned
327 // But the failed operation will not block the rest.
328 MeterStoreResult result = MeterStoreResult.success();
329 for (MeterFeatures mf : meterfeatures) {
330 try {
331 MeterTableKey key = MeterTableKey.key(mf.deviceId(), mf.scope());
332 metersFeatures.remove(key);
333 } catch (StorageException e) {
334 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
335 e.getMessage(), e);
336 result = MeterStoreResult.fail(TIMEOUT);
337 }
338 }
339
340 return result;
341 }
342
343 @Override
pierventre1b8afbc2020-07-13 14:07:05 +0200344 // TODO Should we remove it ? We are not using it
alshabibeadfc8e2015-08-18 15:40:46 -0700345 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
346 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
Wailok Shum6a249352021-07-29 00:02:56 +0800347 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
alshabib70aaa1b2015-09-25 14:30:59 -0700348 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700349
pierventre1b8afbc2020-07-13 14:07:05 +0200350 MeterData data = new MeterData(meter, null);
alshabibeadfc8e2015-08-18 15:40:46 -0700351 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700352 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700353 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
354 }
alshabibeadfc8e2015-08-18 15:40:46 -0700355 } catch (StorageException e) {
pierventre1b8afbc2020-07-13 14:07:05 +0200356 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
357 e.getMessage(), e);
Hwanwook Lee8206ad92018-01-02 18:03:50 +0900358 futures.remove(key);
alshabibeadfc8e2015-08-18 15:40:46 -0700359 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700360 }
alshabibeadfc8e2015-08-18 15:40:46 -0700361 return future;
alshabib7bb05012015-08-05 10:15:09 -0700362 }
363
364 @Override
pierventre44220052020-09-22 12:51:06 +0200365 public Meter updateMeterState(Meter meter) {
pierventre1b8afbc2020-07-13 14:07:05 +0200366 // Update meter if present (stats workflow)
Wailok Shum6a249352021-07-29 00:02:56 +0800367 MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
pierventre44220052020-09-22 12:51:06 +0200368 Versioned<MeterData> value = meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700369 DefaultMeter m = (DefaultMeter) v.meter();
pier59721bf2020-01-08 08:57:46 +0100370 MeterState meterState = m.state();
371 if (meterState == MeterState.PENDING_ADD) {
372 m.setState(meter.state());
373 }
alshabib7bb05012015-08-05 10:15:09 -0700374 m.setProcessedPackets(meter.packetsSeen());
375 m.setProcessedBytes(meter.bytesSeen());
376 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700377 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700378 m.setReferenceCount(meter.referenceCount());
pierventre1b8afbc2020-07-13 14:07:05 +0200379 return new MeterData(m, null);
alshabib7bb05012015-08-05 10:15:09 -0700380 });
pierventre44220052020-09-22 12:51:06 +0200381 return value != null ? value.value().meter() : null;
alshabib7bb05012015-08-05 10:15:09 -0700382 }
383
384 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700385 public Meter getMeter(MeterKey key) {
386 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700387 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700388 }
389
390 @Override
391 public Collection<Meter> getAllMeters() {
pierventre44220052020-09-22 12:51:06 +0200392 return Collections2.transform(ImmutableSet.copyOf(metersMap.values()),
alshabibeadfc8e2015-08-18 15:40:46 -0700393 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700394 }
395
396 @Override
Jordi Ortiz9287b632017-06-22 11:01:37 +0200397 public Collection<Meter> getAllMeters(DeviceId deviceId) {
398 return Collections2.transform(
pierventre44220052020-09-22 12:51:06 +0200399 Collections2.filter(ImmutableSet.copyOf(metersMap.values()),
Jordi Ortiz9287b632017-06-22 11:01:37 +0200400 (MeterData m) -> m.meter().deviceId().equals(deviceId)),
401 MeterData::meter);
402 }
403
404 @Override
alshabib7bb05012015-08-05 10:15:09 -0700405 public void failedMeter(MeterOperation op, MeterFailReason reason) {
pierventre1b8afbc2020-07-13 14:07:05 +0200406 // Meter ops failed (got notification from the sb)
alshabib70aaa1b2015-09-25 14:30:59 -0700407 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
pierventre1b8afbc2020-07-13 14:07:05 +0200408 meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
alshabib7bb05012015-08-05 10:15:09 -0700409 }
410
alshabib5eb79392015-08-19 18:09:55 -0700411 @Override
412 public void deleteMeterNow(Meter m) {
Wailok Shumf013a782021-07-26 16:51:01 +0800413 // This method is renamed in onos-2.5
414 purgeMeter(m);
415 }
416
417 @Override
418 public void purgeMeter(Meter m) {
pierventre1b8afbc2020-07-13 14:07:05 +0200419 // Once we receive the ack from the sb
420 // create the key and remove definitely the meter
alshabib70aaa1b2015-09-25 14:30:59 -0700421 MeterKey key = MeterKey.key(m.deviceId(), m.id());
pierventre1b8afbc2020-07-13 14:07:05 +0200422 try {
423 if (Versioned.valueOrNull(meters.remove(key)) != null) {
424 // Free the id
Wailok Shumf013a782021-07-26 16:51:01 +0800425 MeterScope scope;
426 if (m.meterCellId().type() == PIPELINE_INDEPENDENT) {
427 PiMeterCellId piMeterCellId = (PiMeterCellId) m.meterCellId();
428 scope = MeterScope.of(piMeterCellId.meterId().id());
429 } else {
430 scope = MeterScope.globalScope();
431 }
432 MeterTableKey meterTableKey = MeterTableKey.key(m.deviceId(), scope);
433 freeMeterId(meterTableKey, m.meterCellId());
pierventre1b8afbc2020-07-13 14:07:05 +0200434 }
435 } catch (StorageException e) {
436 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
437 e.getMessage(), e);
pier59721bf2020-01-08 08:57:46 +0100438 }
alshabib5eb79392015-08-19 18:09:55 -0700439 }
440
Jordi Ortizaa8de492016-12-01 00:21:36 +0100441 @Override
Gamze Abakaf57ef602019-03-11 06:52:48 +0000442 public void purgeMeter(DeviceId deviceId) {
pierventre1b8afbc2020-07-13 14:07:05 +0200443 // Purge api (typically used when the device is offline)
Gamze Abakaf57ef602019-03-11 06:52:48 +0000444 List<Versioned<MeterData>> metersPendingRemove = meters.stream()
445 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
446 .map(Map.Entry::getValue)
447 .collect(Collectors.toList());
pierventre1b8afbc2020-07-13 14:07:05 +0200448 // Remove definitely the meter
Gamze Abakaf57ef602019-03-11 06:52:48 +0000449 metersPendingRemove.forEach(versionedMeterKey
Wailok Shumf013a782021-07-26 16:51:01 +0800450 -> purgeMeter(versionedMeterKey.value().meter()));
Gamze Abakaf57ef602019-03-11 06:52:48 +0000451 }
452
453 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100454 public long getMaxMeters(MeterFeaturesKey key) {
pierventre1b8afbc2020-07-13 14:07:05 +0200455 // Leverage the meter features to know the max id
Wailok Shumf013a782021-07-26 16:51:01 +0800456 // Create a Meter Table key with FeaturesKey's device and global scope
457 MeterTableKey meterTableKey = MeterTableKey.key(key.deviceId(), MeterScope.globalScope());
458 return getMaxMeters(meterTableKey);
459 }
460
461 private long getMaxMeters(MeterTableKey key) {
462 // Leverage the meter features to know the max id
463 MeterFeatures features = metersFeatures.get(key);
Jordi Ortizaa8de492016-12-01 00:21:36 +0100464 return features == null ? 0L : features.maxMeter();
465 }
466
Wailok Shumf013a782021-07-26 16:51:01 +0800467 private long getStartIndex(MeterTableKey key) {
468 // Leverage the meter features to know the start id
469 // Since we are using index now
470 // if there is no features related to the key
471 // -1 is returned
472 MeterFeatures features = metersFeatures.get(key);
473 return features == null ? -1L : features.startIndex();
474 }
475
476 private long getEndIndex(MeterTableKey key) {
477 // Leverage the meter features to know the max id
478 // Since we are using index now
479 // if there is no features related to the key
480 // -1 is returned
481 MeterFeatures features = metersFeatures.get(key);
482 return features == null ? -1L : features.endIndex();
483 }
484
Pier Luigif094c612017-10-14 12:15:02 +0200485 // queryMaxMeters is implemented in FullMetersAvailable behaviour.
486 private long queryMaxMeters(DeviceId device) {
487 // Get driver handler for this device
488 DriverHandler handler = driverService.createHandler(device);
489 // If creation failed or the device does not have this behavior
490 if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
491 // We cannot know max meter
492 return 0L;
493 }
494 // Get the behavior
495 MeterQuery query = handler.behaviour(MeterQuery.class);
Wailok Shumf013a782021-07-26 16:51:01 +0800496 // Insert a new available key set to the map
497 String setName = AVAILABLEMETERIDSTORE + "-" + device + "global";
498 MeterTableKey meterTableKey = MeterTableKey.key(device, MeterScope.globalScope());
499 insertAvailableKeySet(meterTableKey, setName);
Pier Luigif094c612017-10-14 12:15:02 +0200500 // Return as max meter the result of the query
501 return query.getMaxMeters();
502 }
503
Wailok Shumf013a782021-07-26 16:51:01 +0800504 private boolean updateMeterIdAvailability(MeterTableKey meterTableKey, MeterCellId id,
Pier Luigif094c612017-10-14 12:15:02 +0200505 boolean available) {
Wailok Shumf013a782021-07-26 16:51:01 +0800506 // Retrieve the set first
507 DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
508 if (keySet == null) {
509 // A reusable set should be inserted when a features is pushed
510 log.warn("Reusable Key set for device: {} scope: {} not found",
511 meterTableKey.deviceId(), meterTableKey.scope());
512 return false;
513 }
Pier Luigif094c612017-10-14 12:15:02 +0200514 // According to available, make available or unavailable a meter key
Wailok Shumf013a782021-07-26 16:51:01 +0800515 DeviceId deviceId = meterTableKey.deviceId();
516 return available ? keySet.add(MeterKey.key(deviceId, id)) :
517 keySet.remove(MeterKey.key(deviceId, id));
Pier Luigif094c612017-10-14 12:15:02 +0200518 }
519
Wailok Shumf013a782021-07-26 16:51:01 +0800520 private MeterCellId getNextAvailableId(Set<MeterCellId> availableIds) {
Pier Luigif094c612017-10-14 12:15:02 +0200521 // If there are no available ids
522 if (availableIds.isEmpty()) {
523 // Just end the cycle
524 return null;
525 }
526 // If it is the first fit
527 if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
528 return availableIds.iterator().next();
529 }
530 // If it is random, get the size
531 int size = availableIds.size();
532 // Return a random element
533 return Iterables.get(availableIds, RandomUtils.nextInt(size));
534 }
535
536 // Implements reuse strategy
Wailok Shumf013a782021-07-26 16:51:01 +0800537 private MeterCellId firstReusableMeterId(MeterTableKey meterTableKey) {
538 // Create a Table key and use it to retrieve the reusable meterCellId set
539 DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
540 if (keySet == null) {
541 // A reusable set should be inserted when a features is pushed
542 log.warn("Reusable Key set for device: {} scope: {} not found",
543 meterTableKey.deviceId(), meterTableKey.scope());
544 return null;
545 }
Pier Luigif094c612017-10-14 12:15:02 +0200546 // Filter key related to device id, and reduce to meter ids
Wailok Shumf013a782021-07-26 16:51:01 +0800547 Set<MeterCellId> localAvailableMeterIds = keySet.stream()
548 .filter(meterKey ->
549 meterKey.deviceId().equals(meterTableKey.deviceId()))
Pier Luigif094c612017-10-14 12:15:02 +0200550 .map(MeterKey::meterId)
551 .collect(Collectors.toSet());
552 // Get next available id
Wailok Shumf013a782021-07-26 16:51:01 +0800553 MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
Pier Luigif094c612017-10-14 12:15:02 +0200554 // Iterate until there are items
555 while (meterId != null) {
556 // If we are able to reserve the id
Wailok Shumf013a782021-07-26 16:51:01 +0800557 if (updateMeterIdAvailability(meterTableKey, meterId, false)) {
Pier Luigif094c612017-10-14 12:15:02 +0200558 // Just end
559 return meterId;
560 }
561 // Update the set
562 localAvailableMeterIds.remove(meterId);
563 // Try another time
564 meterId = getNextAvailableId(localAvailableMeterIds);
565 }
566 // No reusable ids
567 return null;
568 }
569
570 @Override
571 public MeterId allocateMeterId(DeviceId deviceId) {
Wailok Shumf013a782021-07-26 16:51:01 +0800572 // We use global scope for MeterId
573 return (MeterId) allocateMeterId(deviceId, MeterScope.globalScope());
574 }
575
576 @Override
577 public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
578 MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
579 MeterCellId meterCellId;
Pier Luigif094c612017-10-14 12:15:02 +0200580 long id;
Wailok Shumf013a782021-07-26 16:51:01 +0800581 // First, search for reusable key
582 meterCellId = firstReusableMeterId(meterTableKey);
583 if (meterCellId != null) {
584 // A reusable key is found
585 return meterCellId;
Pier Luigif094c612017-10-14 12:15:02 +0200586 }
Wailok Shumf013a782021-07-26 16:51:01 +0800587 // If there was no reusable meter id we have to generate a new value
588 // using start and end index as lower and upper bound respectively.
589 long startIndex = getStartIndex(meterTableKey);
590 long endIndex = getEndIndex(meterTableKey);
Pier Luigif094c612017-10-14 12:15:02 +0200591 // If the device does not give us MeterFeatures
Wailok Shumf013a782021-07-26 16:51:01 +0800592 if (startIndex == -1L || endIndex == -1L) {
Pier Luigif094c612017-10-14 12:15:02 +0200593 // MeterFeatures couldn't be retrieved, fallback to queryMeters.
Wailok Shumf013a782021-07-26 16:51:01 +0800594 // Only meaningful to OpenFLow
595 long maxMeters = queryMaxMeters(deviceId);
596 // If we don't know the max, cannot proceed
597 if (maxMeters == 0L) {
598 return null;
599 } else {
600 // OpenFlow meter index starts from 1, ends with max-1
601 startIndex = 1L;
602 endIndex = maxMeters - 1;
603 }
Pier Luigif094c612017-10-14 12:15:02 +0200604 }
605 // Get a new value
Wailok Shumf013a782021-07-26 16:51:01 +0800606 // If the value is smaller than the start index, get another one
607 do {
608 id = meterIdGenerators.incrementAndGet(meterTableKey);
609 } while (id < startIndex);
610 // Check with the end index, and if the value is bigger, cannot proceed
611 if (id > endIndex) {
Pier Luigif094c612017-10-14 12:15:02 +0200612 return null;
613 }
614 // Done, return the value
Wailok Shumf013a782021-07-26 16:51:01 +0800615 // If we are using global scope, return a MeterId
616 // Else, return a PiMeterId
617 if (meterScope.isGlobal()) {
618 return MeterId.meterId(id);
619 } else {
620 return PiMeterCellId.ofIndirect(PiMeterId.of(meterScope.id()), id);
621 }
622
Pier Luigif094c612017-10-14 12:15:02 +0200623 }
624
625 @Override
626 public void freeMeterId(DeviceId deviceId, MeterId meterId) {
Wailok Shumf013a782021-07-26 16:51:01 +0800627 MeterTableKey meterTableKey = MeterTableKey.key(deviceId, MeterScope.globalScope());
628 freeMeterId(meterTableKey, meterId);
629 }
630
631 private void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
632 long index;
633 if (meterCellId.type() == PIPELINE_INDEPENDENT) {
634 PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;
635 index = piMeterCellId.index();
636 } else if (meterCellId.type() == INDEX) {
637 MeterId meterId = (MeterId) meterCellId;
638 index = meterId.id();
639 } else {
640 return;
641 }
Pier Luigibdcd9672017-10-13 13:54:48 +0200642 // Avoid to free meter not allocated
Wailok Shumf013a782021-07-26 16:51:01 +0800643 if (meterIdGenerators.get(meterTableKey) < index) {
Pier Luigibdcd9672017-10-13 13:54:48 +0200644 return;
645 }
Pier Luigif094c612017-10-14 12:15:02 +0200646 // Update the availability
Wailok Shumf013a782021-07-26 16:51:01 +0800647 updateMeterIdAvailability(meterTableKey, meterCellId, true);
Pier Luigif094c612017-10-14 12:15:02 +0200648 }
649
pierventre1b8afbc2020-07-13 14:07:05 +0200650 // Enabling the events distribution across the cluster
Wailok Shumf013a782021-07-26 16:51:01 +0800651 private class InternalMetersMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700652 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700653 public void event(MapEvent<MeterKey, MeterData> event) {
654 MeterKey key = event.key();
Ray Milkeyd0f017f2018-09-21 12:52:34 -0700655 Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
656 MeterData data = value.value();
pierventre1b8afbc2020-07-13 14:07:05 +0200657 MeterData oldData = Versioned.valueOrNull(event.oldValue());
alshabibeadfc8e2015-08-18 15:40:46 -0700658 switch (event.type()) {
659 case INSERT:
660 case UPDATE:
661 switch (data.meter().state()) {
662 case PENDING_ADD:
663 case PENDING_REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200664 // Two cases. If there is a reason, the meter operation failed.
665 // Otherwise, we are ready to install/remove through the delegate.
666 if (data.reason().isEmpty()) {
667 notifyDelegate(new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
668 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, data.meter()));
669 } else {
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100670 futures.computeIfPresent(key, (k, v) -> {
pierventre1b8afbc2020-07-13 14:07:05 +0200671 v.complete(MeterStoreResult.fail(data.reason().get()));
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100672 return null;
673 });
alshabibe1248b62015-08-20 17:21:55 -0700674 }
675 break;
pierventre1b8afbc2020-07-13 14:07:05 +0200676 case ADDED:
677 // Transition from pending to installed
678 if (data.meter().state() == MeterState.ADDED &&
679 (oldData != null && oldData.meter().state() == MeterState.PENDING_ADD)) {
680 futures.computeIfPresent(key, (k, v) -> {
681 v.complete(MeterStoreResult.success());
682 return null;
683 });
684 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
685 // Update stats case
686 } else if (data.meter().referenceCount() == 0) {
687 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO,
688 data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700689 }
690 break;
691 default:
692 log.warn("Unknown meter state type {}", data.meter().state());
693 }
694 break;
695 case REMOVE:
pierventre1b8afbc2020-07-13 14:07:05 +0200696 // Meter removal case
697 futures.computeIfPresent(key, (k, v) -> {
698 v.complete(MeterStoreResult.success());
699 return null;
700 });
701 // Finally notify the delegate
702 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, data.meter()));
alshabibeadfc8e2015-08-18 15:40:46 -0700703 break;
704 default:
705 log.warn("Unknown Map event type {}", event.type());
706 }
alshabibeadfc8e2015-08-18 15:40:46 -0700707 }
708 }
709
Wailok Shumf013a782021-07-26 16:51:01 +0800710 private class InternalFeaturesMapEventListener implements
711 EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> {
712 @Override
713 public void event(EventuallyConsistentMapEvent<MeterTableKey, MeterFeatures> event) {
714 MeterTableKey meterTableKey = event.key();
715 MeterFeatures meterFeatures = event.value();
716 switch (event.type()) {
717 case PUT:
718 // Put a new available meter id set to the map
719 String setName = AVAILABLEMETERIDSTORE + "-" +
720 meterFeatures.deviceId() + meterFeatures.scope().id();
721 insertAvailableKeySet(meterTableKey, setName);
722 break;
723 case REMOVE:
724 // Remove the set
725 DistributedSet<MeterKey> set = availableMeterIds.remove(meterTableKey);
726 if (set != null) {
727 set.destroy();
728 }
729 break;
730 default:
731 break;
732 }
733 }
734 }
735
736 private void insertAvailableKeySet(MeterTableKey meterTableKey, String setName) {
737 DistributedSet<MeterKey> availableMeterIdSet =
738 new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
739 .withName(setName)
740 .withSerializer(Serializer.using(KryoNamespaces.API,
741 MeterKey.class)).build(),
742 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
743 availableMeterIds.put(meterTableKey, availableMeterIdSet);
744 }
alshabib7bb05012015-08-05 10:15:09 -0700745}