blob: 62a94675b1f087e110fe70ea0ae8800cc4a4e03b [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.store.meter.impl;
17
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
19import com.google.common.collect.Maps;
alshabib7bb05012015-08-05 10:15:09 -070020import org.apache.felix.scr.annotations.Activate;
alshabib58fe6dc2015-08-19 17:16:13 -070021import org.apache.felix.scr.annotations.Component;
alshabib7bb05012015-08-05 10:15:09 -070022import org.apache.felix.scr.annotations.Deactivate;
alshabib7bb05012015-08-05 10:15:09 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib58fe6dc2015-08-19 17:16:13 -070025import org.apache.felix.scr.annotations.Service;
alshabib7bb05012015-08-05 10:15:09 -070026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070028import org.onosproject.mastership.MastershipService;
29import org.onosproject.net.meter.Band;
30import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070031import org.onosproject.net.meter.DefaultMeter;
32import org.onosproject.net.meter.Meter;
33import org.onosproject.net.meter.MeterEvent;
34import org.onosproject.net.meter.MeterFailReason;
35import org.onosproject.net.meter.MeterId;
alshabib70aaa1b2015-09-25 14:30:59 -070036import org.onosproject.net.meter.MeterKey;
alshabib10c810b2015-08-18 16:59:04 -070037import org.onosproject.net.meter.MeterOperation;
38import org.onosproject.net.meter.MeterState;
39import org.onosproject.net.meter.MeterStore;
40import org.onosproject.net.meter.MeterStoreDelegate;
41import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070042import org.onosproject.store.AbstractStore;
alshabibeadfc8e2015-08-18 15:40:46 -070043import org.onosproject.store.serializers.KryoNamespaces;
alshabib7bb05012015-08-05 10:15:09 -070044import org.onosproject.store.service.ConsistentMap;
alshabibeadfc8e2015-08-18 15:40:46 -070045import org.onosproject.store.service.MapEvent;
46import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070047import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070048import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070049import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070050import org.onosproject.store.service.Versioned;
alshabib7bb05012015-08-05 10:15:09 -070051import org.slf4j.Logger;
52
alshabibeadfc8e2015-08-18 15:40:46 -070053import java.util.Arrays;
alshabib7bb05012015-08-05 10:15:09 -070054import java.util.Collection;
alshabibeadfc8e2015-08-18 15:40:46 -070055import java.util.Map;
56import java.util.concurrent.CompletableFuture;
alshabib7bb05012015-08-05 10:15:09 -070057
58import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * A distributed meter store implementation. Meters are stored consistently
62 * across the cluster.
63 */
alshabib58fe6dc2015-08-19 17:16:13 -070064@Component(immediate = true)
65@Service
alshabib7bb05012015-08-05 10:15:09 -070066public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
67 implements MeterStore {
68
69 private Logger log = getLogger(getClass());
70
71 private static final String METERSTORE = "onos-meter-store";
alshabib7bb05012015-08-05 10:15:09 -070072
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 private StorageService storageService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib7bb05012015-08-05 10:15:09 -070077 private MastershipService mastershipService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 private ClusterService clusterService;
81
alshabib70aaa1b2015-09-25 14:30:59 -070082 private ConsistentMap<MeterKey, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -070083 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -070084
HIGUCHI Yuta0574a552015-09-29 14:38:25 -070085 private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
alshabibeadfc8e2015-08-18 15:40:46 -070086
alshabib70aaa1b2015-09-25 14:30:59 -070087 private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
alshabibeadfc8e2015-08-18 15:40:46 -070088 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -070089
90 @Activate
91 public void activate() {
92
93 local = clusterService.getLocalNode().id();
94
alshabib7bb05012015-08-05 10:15:09 -070095
alshabib70aaa1b2015-09-25 14:30:59 -070096 meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -070097 .withName(METERSTORE)
alshabibeadfc8e2015-08-18 15:40:46 -070098 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
alshabib70aaa1b2015-09-25 14:30:59 -070099 MeterKey.class,
alshabib58fe6dc2015-08-19 17:16:13 -0700100 MeterData.class,
101 DefaultMeter.class,
102 DefaultBand.class,
103 Band.Type.class,
104 MeterState.class,
105 Meter.Unit.class,
106 MeterFailReason.class,
107 MeterId.class)).build();
alshabib7bb05012015-08-05 10:15:09 -0700108
alshabibeadfc8e2015-08-18 15:40:46 -0700109 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700110
111 log.info("Started");
112 }
113
114 @Deactivate
115 public void deactivate() {
116
alshabibeadfc8e2015-08-18 15:40:46 -0700117 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700118 log.info("Stopped");
119 }
120
alshabib7bb05012015-08-05 10:15:09 -0700121
122 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700123 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
124 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700125 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
126 futures.put(key, future);
alshabibeadfc8e2015-08-18 15:40:46 -0700127 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700128
alshabibeadfc8e2015-08-18 15:40:46 -0700129 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700130 meters.put(key, data);
alshabibeadfc8e2015-08-18 15:40:46 -0700131 } catch (StorageException e) {
132 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700133 }
134
alshabibeadfc8e2015-08-18 15:40:46 -0700135 return future;
136
alshabib7bb05012015-08-05 10:15:09 -0700137 }
138
139 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700140 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
141 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700142 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
143 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700144
alshabibeadfc8e2015-08-18 15:40:46 -0700145 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700146
147 // update the state of the meter. It will be pruned by observing
148 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700149 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700150 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700151 future.complete(MeterStoreResult.success());
152 }
alshabibeadfc8e2015-08-18 15:40:46 -0700153 } catch (StorageException e) {
154 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700155 }
156
alshabibeadfc8e2015-08-18 15:40:46 -0700157
158 return future;
alshabib7bb05012015-08-05 10:15:09 -0700159 }
160
161 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700162 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
163 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
alshabib70aaa1b2015-09-25 14:30:59 -0700164 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
165 futures.put(key, future);
alshabib7bb05012015-08-05 10:15:09 -0700166
alshabibeadfc8e2015-08-18 15:40:46 -0700167 MeterData data = new MeterData(meter, null, local);
168 try {
alshabib70aaa1b2015-09-25 14:30:59 -0700169 if (meters.computeIfPresent(key, (k, v) -> data) == null) {
alshabibe1248b62015-08-20 17:21:55 -0700170 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
171 }
alshabibeadfc8e2015-08-18 15:40:46 -0700172 } catch (StorageException e) {
173 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700174 }
alshabibeadfc8e2015-08-18 15:40:46 -0700175 return future;
alshabib7bb05012015-08-05 10:15:09 -0700176 }
177
178 @Override
179 public void updateMeterState(Meter meter) {
alshabib70aaa1b2015-09-25 14:30:59 -0700180 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
181 meters.computeIfPresent(key, (k, v) -> {
alshabibeadfc8e2015-08-18 15:40:46 -0700182 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700183 m.setState(meter.state());
184 m.setProcessedPackets(meter.packetsSeen());
185 m.setProcessedBytes(meter.bytesSeen());
186 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700187 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700188 m.setReferenceCount(meter.referenceCount());
alshabibeadfc8e2015-08-18 15:40:46 -0700189 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700190 });
191 }
192
193 @Override
alshabib70aaa1b2015-09-25 14:30:59 -0700194 public Meter getMeter(MeterKey key) {
195 MeterData data = Versioned.valueOrElse(meters.get(key), null);
alshabibeadfc8e2015-08-18 15:40:46 -0700196 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700197 }
198
199 @Override
200 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700201 return Collections2.transform(meters.asJavaMap().values(),
202 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700203 }
204
205 @Override
206 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabib70aaa1b2015-09-25 14:30:59 -0700207 MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
208 meters.computeIfPresent(key, (k, v) ->
alshabibeadfc8e2015-08-18 15:40:46 -0700209 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700210 }
211
alshabib5eb79392015-08-19 18:09:55 -0700212 @Override
213 public void deleteMeterNow(Meter m) {
alshabib70aaa1b2015-09-25 14:30:59 -0700214 MeterKey key = MeterKey.key(m.deviceId(), m.id());
215 futures.remove(key);
216 meters.remove(key);
alshabib5eb79392015-08-19 18:09:55 -0700217 }
218
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700219 private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
alshabibeadfc8e2015-08-18 15:40:46 -0700220 @Override
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700221 public void event(MapEvent<MeterKey, MeterData> event) {
222 MeterKey key = event.key();
alshabibeadfc8e2015-08-18 15:40:46 -0700223 MeterData data = event.value().value();
224 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
225 switch (event.type()) {
226 case INSERT:
227 case UPDATE:
228 switch (data.meter().state()) {
229 case PENDING_ADD:
230 case PENDING_REMOVE:
231 if (!data.reason().isPresent() && local.equals(master)) {
232 notifyDelegate(
233 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
234 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
235 data.meter()));
236 } else if (data.reason().isPresent() && local.equals(data.origin())) {
237 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
238 //TODO: No future -> no friend
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700239 futures.get(key).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700240 }
241 break;
242 case ADDED:
alshabibe1248b62015-08-20 17:21:55 -0700243 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_ADD) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700244 futures.remove(key).complete(MeterStoreResult.success());
alshabibe1248b62015-08-20 17:21:55 -0700245 }
246 break;
alshabibeadfc8e2015-08-18 15:40:46 -0700247 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700248 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
HIGUCHI Yuta0574a552015-09-29 14:38:25 -0700249 futures.remove(key).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700250 }
251 break;
252 default:
253 log.warn("Unknown meter state type {}", data.meter().state());
254 }
255 break;
256 case REMOVE:
257 //Only happens at origin so we do not need to care.
258 break;
259 default:
260 log.warn("Unknown Map event type {}", event.type());
261 }
262
263 }
264 }
265
266
alshabib7bb05012015-08-05 10:15:09 -0700267}