blob: 25276d5604dc293bc19716cf31693b1a1059ba62 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.incubator.store.meter.impl;
17
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
19import com.google.common.collect.Maps;
alshabib7bb05012015-08-05 10:15:09 -070020import org.apache.felix.scr.annotations.Activate;
alshabib58fe6dc2015-08-19 17:16:13 -070021import org.apache.felix.scr.annotations.Component;
alshabib7bb05012015-08-05 10:15:09 -070022import org.apache.felix.scr.annotations.Deactivate;
alshabib7bb05012015-08-05 10:15:09 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib58fe6dc2015-08-19 17:16:13 -070025import org.apache.felix.scr.annotations.Service;
alshabib7bb05012015-08-05 10:15:09 -070026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070028import org.onosproject.mastership.MastershipService;
Jordi Ortizaa8de492016-12-01 00:21:36 +010029import org.onosproject.net.DeviceId;
alshabib58fe6dc2015-08-19 17:16:13 -070030import org.onosproject.net.meter.Band;
31import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070032import org.onosproject.net.meter.DefaultMeter;
Jordi Ortiz6c847762017-01-30 17:13:05 +010033import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib10c810b2015-08-18 16:59:04 -070034import org.onosproject.net.meter.Meter;
35import org.onosproject.net.meter.MeterEvent;
36import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010037import org.onosproject.net.meter.MeterFeatures;
38import org.onosproject.net.meter.MeterFeaturesKey;
Jordi Ortiz6c847762017-01-30 17:13:05 +010039import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070040import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070041import org.onosproject.net.meter.MeterOperation;
42import org.onosproject.net.meter.MeterState;
43import org.onosproject.net.meter.MeterStore;
44import org.onosproject.net.meter.MeterStoreDelegate;
45import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070046import org.onosproject.store.AbstractStore;
alshabibeadfc8e2015-08-18 15:40:46 -070047import org.onosproject.store.serializers.KryoNamespaces;
alshabib7bb05012015-08-05 10:15:09 -070048import org.onosproject.store.service.ConsistentMap;
alshabibeadfc8e2015-08-18 15:40:46 -070049import org.onosproject.store.service.MapEvent;
50import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070051import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070052import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070053import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070054import org.onosproject.store.service.Versioned;
alshabib7bb05012015-08-05 10:15:09 -070055import org.slf4j.Logger;
56
alshabibeadfc8e2015-08-18 15:40:46 -070057import java.util.Arrays;
Jordi Ortiz6c847762017-01-30 17:13:05 +010058import java.util.BitSet;
alshabib7bb05012015-08-05 10:15:09 -070059import java.util.Collection;
alshabibeadfc8e2015-08-18 15:40:46 -070060import java.util.Map;
61import java.util.concurrent.CompletableFuture;
alshabib7bb05012015-08-05 10:15:09 -070062
Jordi Ortizaa8de492016-12-01 00:21:36 +010063import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070064import static org.slf4j.LoggerFactory.getLogger;
65
66/**
67 * A distributed meter store implementation. Meters are stored consistently
68 * across the cluster.
69 */
alshabib58fe6dc2015-08-19 17:16:13 -070070@Component(immediate = true)
71@Service
alshabib7bb05012015-08-05 10:15:09 -070072public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
73 implements MeterStore {
74
75 private Logger log = getLogger(getClass());
76
77 private static final String METERSTORE = "onos-meter-store";
Jordi Ortizaa8de492016-12-01 00:21:36 +010078 private static final String METERFEATURESSTORE = "onos-meter-features-store";
Jordi Ortiz6c847762017-01-30 17:13:05 +010079 private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
alshabib7bb05012015-08-05 10:15:09 -070080
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 private StorageService storageService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib7bb05012015-08-05 10:15:09 -070085 private MastershipService mastershipService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 private ClusterService clusterService;
89
alshabib70aaa1b2015-09-25 14:30:59 -070090 private ConsistentMap<MeterKey, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -070091 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -070092
Jordi Ortizaa8de492016-12-01 00:21:36 +010093 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
94
HIGUCHI Yuta0574a552015-09-29 14:38:25 -070095 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabibeadfc8e2015-08-18 15:40:46 -070096
alshabib70aaa1b2015-09-25 14:30:59 -070097 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -070098 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -070099
Jordi Ortiz6c847762017-01-30 17:13:05 +0100100 private ConsistentMap<DeviceId, BitSet> availableMeterIds;
101
alshabib7bb05012015-08-05 10:15:09 -0700102 @Activate
103 public void activate() {
alshabib7bb05012015-08-05 10:15:09 -0700104 local = clusterService.getLocalNode().id();
105
alshabib70aaa1b2015-09-25 14:30:59 -0700106 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700107 .withName(METERSTORE)
alshabibeadfc8e2015-08-18 15:40:46 -0700108 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
alshabib70aaa1b2015-09-25 14:30:59 -0700109 MeterKey.class,
alshabib58fe6dc2015-08-19 17:16:13 -0700110 MeterData.class,
111 DefaultMeter.class,
112 DefaultBand.class,
113 Band.Type.class,
114 MeterState.class,
115 Meter.Unit.class,
HIGUCHI Yuta03666a32016-05-18 11:49:09 -0700116 MeterFailReason.class)).build();
alshabib7bb05012015-08-05 10:15:09 -0700117
alshabibeadfc8e2015-08-18 15:40:46 -0700118 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700119
Jordi Ortizaa8de492016-12-01 00:21:36 +0100120 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
121 .withName(METERFEATURESSTORE)
122 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
123 MeterFeaturesKey.class,
124 MeterFeatures.class,
125 DefaultMeterFeatures.class,
126 Band.Type.class,
127 Meter.Unit.class,
128 MeterFailReason.class)).build();
129
Jordi Ortiz6c847762017-01-30 17:13:05 +0100130 availableMeterIds = storageService.<DeviceId, BitSet>consistentMapBuilder()
131 .withName(AVAILABLEMETERIDSTORE)
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700132 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
Jordi Ortiz6c847762017-01-30 17:13:05 +0100133
alshabib7bb05012015-08-05 10:15:09 -0700134 log.info("Started");
135 }
136
137 @Deactivate
138 public void deactivate() {
alshabibeadfc8e2015-08-18 15:40:46 -0700139 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700140 log.info("Stopped");
141 }
142
Jordi Ortiz6c847762017-01-30 17:13:05 +0100143 private void updateMeterIdAvailability(DeviceId deviceId, MeterId id,
144 boolean available) {
145 availableMeterIds.compute(deviceId, (k, v) -> {
146 v = v == null ? new BitSet() : v;
147 v.set(id.id().intValue(), available);
148 return v;
149 });
150 }
151
Yuta HIGUCHI872c9822017-05-25 09:35:14 -0700152 @Override
Jordi Ortiz6c847762017-01-30 17:13:05 +0100153 public MeterId firstReusableMeterId(DeviceId deviceId) {
154 Versioned<BitSet> bitSetVersioned = availableMeterIds.get(deviceId);
155 if (bitSetVersioned == null) {
156 return null;
157 }
158 BitSet value = bitSetVersioned.value();
159 int nextSetBit = value.nextSetBit(1);
160 if (nextSetBit < 0) {
161 return null;
162 }
163 return MeterId.meterId(nextSetBit);
164 }
alshabib7bb05012015-08-05 10:15:09 -0700165
166 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700167 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
168 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700169 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
Jordi Ortiz6c847762017-01-30 17:13:05 +0100170 updateMeterIdAvailability(meter.deviceId(), meter.id(), false);
alshabib70aaa1b2015-09-25 14:30:59 -0700171 futures.put(key, future);
alshabibeadfc8e2015-08-18 15:40:46 -0700172 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700173
alshabibeadfc8e2015-08-18 15:40:46 -0700174 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700175 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700176 } catch (StorageException e) {
177 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700178 }
179
alshabibeadfc8e2015-08-18 15:40:46 -0700180 return future;
181
alshabib7bb05012015-08-05 10:15:09 -0700182 }
183
184 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700185 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
186 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700187 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
188 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700189
alshabibeadfc8e2015-08-18 15:40:46 -0700190 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700191
192 // update the state of the meter. It will be pruned by observing
193 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700194 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700195 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700196 future.complete(MeterStoreResult.success());
197 }
Jordi Ortiz6c847762017-01-30 17:13:05 +0100198 updateMeterIdAvailability(meter.deviceId(), meter.id(), true);
alshabibeadfc8e2015-08-18 15:40:46 -0700199 } catch (StorageException e) {
200 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700201 }
202
alshabibeadfc8e2015-08-18 15:40:46 -0700203
204 return future;
alshabib7bb05012015-08-05 10:15:09 -0700205 }
206
207 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100208 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
209 MeterStoreResult result = MeterStoreResult.success();
210 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
211 try {
212 meterFeatures.putIfAbsent(key, meterfeatures);
213 } catch (StorageException e) {
214 result = MeterStoreResult.fail(TIMEOUT);
215 }
216 return result;
217 }
218
219 @Override
220 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
221 MeterStoreResult result = MeterStoreResult.success();
222 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
223 try {
224 meterFeatures.remove(key);
225 } catch (StorageException e) {
226 result = MeterStoreResult.fail(TIMEOUT);
227 }
228 return result;
229 }
230
231 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700232 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
233 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700234 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
235 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700236
alshabibeadfc8e2015-08-18 15:40:46 -0700237 MeterData data = new MeterData(meter, null, local);
238 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700239 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700240 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
241 }
alshabibeadfc8e2015-08-18 15:40:46 -0700242 } catch (StorageException e) {
243 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700244 }
alshabibeadfc8e2015-08-18 15:40:46 -0700245 return future;
alshabib7bb05012015-08-05 10:15:09 -0700246 }
247
248 @Override
249 public void updateMeterState(Meter meter) {
alshabib70aaa1b2015-09-25 14:30:59 -0700250 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
251 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700252 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700253 m.setState(meter.state());
254 m.setProcessedPackets(meter.packetsSeen());
255 m.setProcessedBytes(meter.bytesSeen());
256 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700257 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700258 m.setReferenceCount(meter.referenceCount());
alshabibeadfc8e2015-08-18 15:40:46 -0700259 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700260 });
261 }
262
263 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700264 public Meter getMeter(MeterKey key) {
265 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700266 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700267 }
268
269 @Override
270 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700271 return Collections2.transform(meters.asJavaMap().values(),
272 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700273 }
274
275 @Override
276 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabib70aaa1b2015-09-25 14:30:59 -0700277 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
278 meters.computeIfPresent(key, (k, v) ->
alshabibeadfc8e2015-08-18 15:40:46 -0700279 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700280 }
281
alshabib5eb79392015-08-19 18:09:55 -0700282 @Override
283 public void deleteMeterNow(Meter m) {
alshabib70aaa1b2015-09-25 14:30:59 -0700284 MeterKey key = MeterKey.key(m.deviceId(), m.id());
285 futures.remove(key);
286 meters.remove(key);
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100287 notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
alshabib5eb79392015-08-19 18:09:55 -0700288 }
289
Jordi Ortizaa8de492016-12-01 00:21:36 +0100290 @Override
291 public long getMaxMeters(MeterFeaturesKey key) {
292 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
293 return features == null ? 0L : features.maxMeter();
294 }
295
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700296 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700297 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700298 public void event(MapEvent<MeterKey, MeterData> event) {
299 MeterKey key = event.key();
alshabibeadfc8e2015-08-18 15:40:46 -0700300 MeterData data = event.value().value();
301 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
302 switch (event.type()) {
303 case INSERT:
304 case UPDATE:
305 switch (data.meter().state()) {
306 case PENDING_ADD:
307 case PENDING_REMOVE:
308 if (!data.reason().isPresent() && local.equals(master)) {
309 notifyDelegate(
310 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
311 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
312 data.meter()));
313 } else if (data.reason().isPresent() && local.equals(data.origin())) {
314 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
315 //TODO: No future -> no friend
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700316 futures.get(key).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700317 }
318 break;
319 case ADDED:
Jordi Ortizdf28ecd2017-03-25 19:22:36 +0100320 if (local.equals(data.origin()) &&
321 (data.meter().state() == MeterState.PENDING_ADD
322 || data.meter().state() == MeterState.ADDED)) {
323 futures.computeIfPresent(key, (k, v) -> {
324 notifyDelegate(
325 new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
326 return null;
327 });
alshabibe1248b62015-08-20 17:21:55 -0700328 }
329 break;
alshabibeadfc8e2015-08-18 15:40:46 -0700330 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700331 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700332 futures.remove(key).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700333 }
334 break;
335 default:
336 log.warn("Unknown meter state type {}", data.meter().state());
337 }
338 break;
339 case REMOVE:
340 //Only happens at origin so we do not need to care.
341 break;
342 default:
343 log.warn("Unknown Map event type {}", event.type());
344 }
345
346 }
347 }
348
349
alshabib7bb05012015-08-05 10:15:09 -0700350}