blob: b681c27f4339cfef1e4d9e302076936cfaac011d [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
alshabib7bb05012015-08-05 10:15:09 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.store.meter.impl;
17
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
19import com.google.common.collect.Maps;
alshabib7bb05012015-08-05 10:15:09 -070020import org.apache.felix.scr.annotations.Activate;
alshabib58fe6dc2015-08-19 17:16:13 -070021import org.apache.felix.scr.annotations.Component;
alshabib7bb05012015-08-05 10:15:09 -070022import org.apache.felix.scr.annotations.Deactivate;
alshabib7bb05012015-08-05 10:15:09 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib58fe6dc2015-08-19 17:16:13 -070025import org.apache.felix.scr.annotations.Service;
alshabib7bb05012015-08-05 10:15:09 -070026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070028import org.onosproject.mastership.MastershipService;
Jordi Ortizaa8de492016-12-01 00:21:36 +010029import org.onosproject.net.DeviceId;
alshabib58fe6dc2015-08-19 17:16:13 -070030import org.onosproject.net.meter.Band;
31import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070032import org.onosproject.net.meter.DefaultMeter;
33import org.onosproject.net.meter.Meter;
34import org.onosproject.net.meter.MeterEvent;
35import org.onosproject.net.meter.MeterFailReason;
Jordi Ortizaa8de492016-12-01 00:21:36 +010036import org.onosproject.net.meter.MeterFeatures;
37import org.onosproject.net.meter.MeterFeaturesKey;
alshabib70aaa1b2015-09-25 14:30:59 -070038import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070039import org.onosproject.net.meter.MeterOperation;
40import org.onosproject.net.meter.MeterState;
41import org.onosproject.net.meter.MeterStore;
42import org.onosproject.net.meter.MeterStoreDelegate;
43import org.onosproject.net.meter.MeterStoreResult;
Jordi Ortizaa8de492016-12-01 00:21:36 +010044import org.onosproject.net.meter.DefaultMeterFeatures;
alshabib7bb05012015-08-05 10:15:09 -070045import org.onosproject.store.AbstractStore;
alshabibeadfc8e2015-08-18 15:40:46 -070046import org.onosproject.store.serializers.KryoNamespaces;
alshabib7bb05012015-08-05 10:15:09 -070047import org.onosproject.store.service.ConsistentMap;
alshabibeadfc8e2015-08-18 15:40:46 -070048import org.onosproject.store.service.MapEvent;
49import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070050import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070051import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070052import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070053import org.onosproject.store.service.Versioned;
alshabib7bb05012015-08-05 10:15:09 -070054import org.slf4j.Logger;
55
alshabibeadfc8e2015-08-18 15:40:46 -070056import java.util.Arrays;
alshabib7bb05012015-08-05 10:15:09 -070057import java.util.Collection;
alshabibeadfc8e2015-08-18 15:40:46 -070058import java.util.Map;
59import java.util.concurrent.CompletableFuture;
alshabib7bb05012015-08-05 10:15:09 -070060
Jordi Ortizaa8de492016-12-01 00:21:36 +010061import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
alshabib7bb05012015-08-05 10:15:09 -070062import static org.slf4j.LoggerFactory.getLogger;
63
64/**
65 * A distributed meter store implementation. Meters are stored consistently
66 * across the cluster.
67 */
alshabib58fe6dc2015-08-19 17:16:13 -070068@Component(immediate = true)
69@Service
alshabib7bb05012015-08-05 10:15:09 -070070public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
71 implements MeterStore {
72
73 private Logger log = getLogger(getClass());
74
75 private static final String METERSTORE = "onos-meter-store";
Jordi Ortizaa8de492016-12-01 00:21:36 +010076 private static final String METERFEATURESSTORE = "onos-meter-features-store";
alshabib7bb05012015-08-05 10:15:09 -070077
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 private StorageService storageService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib7bb05012015-08-05 10:15:09 -070082 private MastershipService mastershipService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 private ClusterService clusterService;
86
alshabib70aaa1b2015-09-25 14:30:59 -070087 private ConsistentMap<MeterKey, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -070088 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -070089
Jordi Ortizaa8de492016-12-01 00:21:36 +010090 private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
91
HIGUCHI Yuta0574a552015-09-29 14:38:25 -070092 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabibeadfc8e2015-08-18 15:40:46 -070093
alshabib70aaa1b2015-09-25 14:30:59 -070094 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -070095 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -070096
97 @Activate
98 public void activate() {
99
100 local = clusterService.getLocalNode().id();
101
alshabib7bb05012015-08-05 10:15:09 -0700102
alshabib70aaa1b2015-09-25 14:30:59 -0700103 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -0700104 .withName(METERSTORE)
alshabibeadfc8e2015-08-18 15:40:46 -0700105 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
alshabib70aaa1b2015-09-25 14:30:59 -0700106 MeterKey.class,
alshabib58fe6dc2015-08-19 17:16:13 -0700107 MeterData.class,
108 DefaultMeter.class,
109 DefaultBand.class,
110 Band.Type.class,
111 MeterState.class,
112 Meter.Unit.class,
HIGUCHI Yuta03666a32016-05-18 11:49:09 -0700113 MeterFailReason.class)).build();
alshabib7bb05012015-08-05 10:15:09 -0700114
alshabibeadfc8e2015-08-18 15:40:46 -0700115 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700116
Jordi Ortizaa8de492016-12-01 00:21:36 +0100117 meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
118 .withName(METERFEATURESSTORE)
119 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
120 MeterFeaturesKey.class,
121 MeterFeatures.class,
122 DefaultMeterFeatures.class,
123 Band.Type.class,
124 Meter.Unit.class,
125 MeterFailReason.class)).build();
126
alshabib7bb05012015-08-05 10:15:09 -0700127 log.info("Started");
128 }
129
130 @Deactivate
131 public void deactivate() {
132
alshabibeadfc8e2015-08-18 15:40:46 -0700133 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700134 log.info("Stopped");
135 }
136
alshabib7bb05012015-08-05 10:15:09 -0700137
138 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700139 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
140 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700141 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
142 futures.put(key, future);
alshabibeadfc8e2015-08-18 15:40:46 -0700143 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700144
alshabibeadfc8e2015-08-18 15:40:46 -0700145 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700146 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700147 } catch (StorageException e) {
148 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700149 }
150
alshabibeadfc8e2015-08-18 15:40:46 -0700151 return future;
152
alshabib7bb05012015-08-05 10:15:09 -0700153 }
154
155 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700156 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
157 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700158 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
159 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700160
alshabibeadfc8e2015-08-18 15:40:46 -0700161 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700162
163 // update the state of the meter. It will be pruned by observing
164 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700165 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700166 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700167 future.complete(MeterStoreResult.success());
168 }
alshabibeadfc8e2015-08-18 15:40:46 -0700169 } catch (StorageException e) {
170 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700171 }
172
alshabibeadfc8e2015-08-18 15:40:46 -0700173
174 return future;
alshabib7bb05012015-08-05 10:15:09 -0700175 }
176
177 @Override
Jordi Ortizaa8de492016-12-01 00:21:36 +0100178 public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
179 MeterStoreResult result = MeterStoreResult.success();
180 MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
181 try {
182 meterFeatures.putIfAbsent(key, meterfeatures);
183 } catch (StorageException e) {
184 result = MeterStoreResult.fail(TIMEOUT);
185 }
186 return result;
187 }
188
189 @Override
190 public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
191 MeterStoreResult result = MeterStoreResult.success();
192 MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
193 try {
194 meterFeatures.remove(key);
195 } catch (StorageException e) {
196 result = MeterStoreResult.fail(TIMEOUT);
197 }
198 return result;
199 }
200
201 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700202 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
203 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700204 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
205 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700206
alshabibeadfc8e2015-08-18 15:40:46 -0700207 MeterData data = new MeterData(meter, null, local);
208 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700209 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700210 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
211 }
alshabibeadfc8e2015-08-18 15:40:46 -0700212 } catch (StorageException e) {
213 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700214 }
alshabibeadfc8e2015-08-18 15:40:46 -0700215 return future;
alshabib7bb05012015-08-05 10:15:09 -0700216 }
217
218 @Override
219 public void updateMeterState(Meter meter) {
alshabib70aaa1b2015-09-25 14:30:59 -0700220 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
221 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700222 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700223 m.setState(meter.state());
224 m.setProcessedPackets(meter.packetsSeen());
225 m.setProcessedBytes(meter.bytesSeen());
226 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700227 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700228 m.setReferenceCount(meter.referenceCount());
alshabibeadfc8e2015-08-18 15:40:46 -0700229 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700230 });
231 }
232
233 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700234 public Meter getMeter(MeterKey key) {
235 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700236 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700237 }
238
239 @Override
240 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700241 return Collections2.transform(meters.asJavaMap().values(),
242 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700243 }
244
245 @Override
246 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabib70aaa1b2015-09-25 14:30:59 -0700247 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
248 meters.computeIfPresent(key, (k, v) ->
alshabibeadfc8e2015-08-18 15:40:46 -0700249 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700250 }
251
alshabib5eb79392015-08-19 18:09:55 -0700252 @Override
253 public void deleteMeterNow(Meter m) {
alshabib70aaa1b2015-09-25 14:30:59 -0700254 MeterKey key = MeterKey.key(m.deviceId(), m.id());
255 futures.remove(key);
256 meters.remove(key);
alshabib5eb79392015-08-19 18:09:55 -0700257 }
258
Jordi Ortizaa8de492016-12-01 00:21:36 +0100259 @Override
260 public long getMaxMeters(MeterFeaturesKey key) {
261 MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
262 return features == null ? 0L : features.maxMeter();
263 }
264
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700265 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700266 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700267 public void event(MapEvent<MeterKey, MeterData> event) {
268 MeterKey key = event.key();
alshabibeadfc8e2015-08-18 15:40:46 -0700269 MeterData data = event.value().value();
270 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
271 switch (event.type()) {
272 case INSERT:
273 case UPDATE:
274 switch (data.meter().state()) {
275 case PENDING_ADD:
276 case PENDING_REMOVE:
277 if (!data.reason().isPresent() && local.equals(master)) {
278 notifyDelegate(
279 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
280 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
281 data.meter()));
282 } else if (data.reason().isPresent() && local.equals(data.origin())) {
283 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
284 //TODO: No future -> no friend
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700285 futures.get(key).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700286 }
287 break;
288 case ADDED:
alshabibe1248b62015-08-20 17:21:55 -0700289 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_ADD) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700290 futures.remove(key).complete(MeterStoreResult.success());
alshabibe1248b62015-08-20 17:21:55 -0700291 }
292 break;
alshabibeadfc8e2015-08-18 15:40:46 -0700293 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700294 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700295 futures.remove(key).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700296 }
297 break;
298 default:
299 log.warn("Unknown meter state type {}", data.meter().state());
300 }
301 break;
302 case REMOVE:
303 //Only happens at origin so we do not need to care.
304 break;
305 default:
306 log.warn("Unknown Map event type {}", event.type());
307 }
308
309 }
310 }
311
312
alshabib7bb05012015-08-05 10:15:09 -0700313}