blob: f343d839ae24b080a9c89110ae6e9397fcc96637 [file] [log] [blame]
alshabib7bb05012015-08-05 10:15:09 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.store.meter.impl;
17
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
19import com.google.common.collect.Maps;
alshabib7bb05012015-08-05 10:15:09 -070020import org.apache.felix.scr.annotations.Activate;
alshabib58fe6dc2015-08-19 17:16:13 -070021import org.apache.felix.scr.annotations.Component;
alshabib7bb05012015-08-05 10:15:09 -070022import org.apache.felix.scr.annotations.Deactivate;
alshabib7bb05012015-08-05 10:15:09 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib58fe6dc2015-08-19 17:16:13 -070025import org.apache.felix.scr.annotations.Service;
alshabib7bb05012015-08-05 10:15:09 -070026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.NodeId;
alshabib58fe6dc2015-08-19 17:16:13 -070028import org.onosproject.mastership.MastershipService;
29import org.onosproject.net.meter.Band;
30import org.onosproject.net.meter.DefaultBand;
alshabib10c810b2015-08-18 16:59:04 -070031import org.onosproject.net.meter.DefaultMeter;
32import org.onosproject.net.meter.Meter;
33import org.onosproject.net.meter.MeterEvent;
34import org.onosproject.net.meter.MeterFailReason;
35import org.onosproject.net.meter.MeterId;
36import org.onosproject.net.meter.MeterOperation;
37import org.onosproject.net.meter.MeterState;
38import org.onosproject.net.meter.MeterStore;
39import org.onosproject.net.meter.MeterStoreDelegate;
40import org.onosproject.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070041import org.onosproject.store.AbstractStore;
alshabibeadfc8e2015-08-18 15:40:46 -070042import org.onosproject.store.serializers.KryoNamespaces;
alshabib7bb05012015-08-05 10:15:09 -070043import org.onosproject.store.service.ConsistentMap;
alshabibeadfc8e2015-08-18 15:40:46 -070044import org.onosproject.store.service.MapEvent;
45import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070046import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070047import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070048import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070049import org.onosproject.store.service.Versioned;
alshabib7bb05012015-08-05 10:15:09 -070050import org.slf4j.Logger;
51
alshabibeadfc8e2015-08-18 15:40:46 -070052import java.util.Arrays;
alshabib7bb05012015-08-05 10:15:09 -070053import java.util.Collection;
alshabibeadfc8e2015-08-18 15:40:46 -070054import java.util.Map;
55import java.util.concurrent.CompletableFuture;
alshabib7bb05012015-08-05 10:15:09 -070056
57import static org.slf4j.LoggerFactory.getLogger;
58
59/**
60 * A distributed meter store implementation. Meters are stored consistently
61 * across the cluster.
62 */
alshabib58fe6dc2015-08-19 17:16:13 -070063@Component(immediate = true)
64@Service
alshabib7bb05012015-08-05 10:15:09 -070065public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
66 implements MeterStore {
67
68 private Logger log = getLogger(getClass());
69
70 private static final String METERSTORE = "onos-meter-store";
alshabib7bb05012015-08-05 10:15:09 -070071
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 private StorageService storageService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib7bb05012015-08-05 10:15:09 -070076 private MastershipService mastershipService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 private ClusterService clusterService;
80
alshabibeadfc8e2015-08-18 15:40:46 -070081 private ConsistentMap<MeterId, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -070082 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -070083
alshabibeadfc8e2015-08-18 15:40:46 -070084 private MapEventListener mapListener = new InternalMapEventListener();
85
86 private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
87 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -070088
89 @Activate
90 public void activate() {
91
92 local = clusterService.getLocalNode().id();
93
alshabib7bb05012015-08-05 10:15:09 -070094
alshabibeadfc8e2015-08-18 15:40:46 -070095 meters = storageService.<MeterId, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -070096 .withName(METERSTORE)
alshabibeadfc8e2015-08-18 15:40:46 -070097 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
alshabib58fe6dc2015-08-19 17:16:13 -070098 MeterData.class,
99 DefaultMeter.class,
100 DefaultBand.class,
101 Band.Type.class,
102 MeterState.class,
103 Meter.Unit.class,
104 MeterFailReason.class,
105 MeterId.class)).build();
alshabib7bb05012015-08-05 10:15:09 -0700106
alshabibeadfc8e2015-08-18 15:40:46 -0700107 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700108
109 log.info("Started");
110 }
111
112 @Deactivate
113 public void deactivate() {
114
alshabibeadfc8e2015-08-18 15:40:46 -0700115 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700116 log.info("Stopped");
117 }
118
alshabib7bb05012015-08-05 10:15:09 -0700119
120 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700121 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
122 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
123 futures.put(meter.id(), future);
124 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700125
alshabibeadfc8e2015-08-18 15:40:46 -0700126 try {
127 meters.put(meter.id(), data);
128 } catch (StorageException e) {
129 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700130 }
131
alshabibeadfc8e2015-08-18 15:40:46 -0700132 return future;
133
alshabib7bb05012015-08-05 10:15:09 -0700134 }
135
136 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700137 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
138 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
139 futures.put(meter.id(), future);
alshabib7bb05012015-08-05 10:15:09 -0700140
alshabibeadfc8e2015-08-18 15:40:46 -0700141 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700142
143 // update the state of the meter. It will be pruned by observing
144 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700145 try {
146 meters.put(meter.id(), data);
147 } catch (StorageException e) {
148 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700149 }
150
alshabibeadfc8e2015-08-18 15:40:46 -0700151
152 return future;
alshabib7bb05012015-08-05 10:15:09 -0700153 }
154
155 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700156 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
157 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
158 futures.put(meter.id(), future);
alshabib7bb05012015-08-05 10:15:09 -0700159
alshabibeadfc8e2015-08-18 15:40:46 -0700160 MeterData data = new MeterData(meter, null, local);
161 try {
162 meters.put(meter.id(), data);
163 } catch (StorageException e) {
164 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700165 }
alshabibeadfc8e2015-08-18 15:40:46 -0700166 return future;
alshabib7bb05012015-08-05 10:15:09 -0700167 }
168
169 @Override
170 public void updateMeterState(Meter meter) {
alshabibeadfc8e2015-08-18 15:40:46 -0700171 meters.computeIfPresent(meter.id(), (id, v) -> {
172 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700173 m.setState(meter.state());
174 m.setProcessedPackets(meter.packetsSeen());
175 m.setProcessedBytes(meter.bytesSeen());
176 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700177 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700178 m.setReferenceCount(meter.referenceCount());
alshabibeadfc8e2015-08-18 15:40:46 -0700179 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700180 });
181 }
182
183 @Override
184 public Meter getMeter(MeterId meterId) {
alshabibeadfc8e2015-08-18 15:40:46 -0700185 MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
186 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700187 }
188
189 @Override
190 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700191 return Collections2.transform(meters.asJavaMap().values(),
192 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700193 }
194
195 @Override
196 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabibeadfc8e2015-08-18 15:40:46 -0700197 meters.computeIfPresent(op.meter().id(), (k, v) ->
198 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700199 }
200
alshabib5eb79392015-08-19 18:09:55 -0700201 @Override
202 public void deleteMeterNow(Meter m) {
203 futures.remove(m.id());
204 meters.remove(m.id());
205 }
206
alshabibeadfc8e2015-08-18 15:40:46 -0700207 private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
208 @Override
209 public void event(MapEvent<MeterId, MeterData> event) {
210 MeterData data = event.value().value();
211 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
212 switch (event.type()) {
213 case INSERT:
214 case UPDATE:
215 switch (data.meter().state()) {
216 case PENDING_ADD:
217 case PENDING_REMOVE:
218 if (!data.reason().isPresent() && local.equals(master)) {
219 notifyDelegate(
220 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
221 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
222 data.meter()));
223 } else if (data.reason().isPresent() && local.equals(data.origin())) {
224 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
225 //TODO: No future -> no friend
alshabib5eb79392015-08-19 18:09:55 -0700226 futures.get(data.meter().id()).complete(msr);
alshabibeadfc8e2015-08-18 15:40:46 -0700227 }
228 break;
229 case ADDED:
230 case REMOVED:
alshabib5eb79392015-08-19 18:09:55 -0700231 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
alshabib58fe6dc2015-08-19 17:16:13 -0700232 futures.remove(data.meter().id()).complete(MeterStoreResult.success());
alshabibeadfc8e2015-08-18 15:40:46 -0700233 }
234 break;
235 default:
236 log.warn("Unknown meter state type {}", data.meter().state());
237 }
238 break;
239 case REMOVE:
240 //Only happens at origin so we do not need to care.
241 break;
242 default:
243 log.warn("Unknown Map event type {}", event.type());
244 }
245
246 }
247 }
248
249
alshabib7bb05012015-08-05 10:15:09 -0700250}