blob: 6477e68fa897a5865ca172be9b941565b9f2c952 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.incubator.store.meter.impl;
17
alshabibeadfc8e2015-08-18 15:40:46 -070018import com.google.common.collect.Collections2;
19import com.google.common.collect.Maps;
alshabib7bb05012015-08-05 10:15:09 -070020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Deactivate;
alshabib7bb05012015-08-05 10:15:09 -070022import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib7bb05012015-08-05 10:15:09 -070024import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.NodeId;
alshabib7bb05012015-08-05 10:15:09 -070026import org.onosproject.incubator.net.meter.DefaultMeter;
27import org.onosproject.incubator.net.meter.Meter;
28import org.onosproject.incubator.net.meter.MeterEvent;
29import org.onosproject.incubator.net.meter.MeterFailReason;
30import org.onosproject.incubator.net.meter.MeterId;
31import org.onosproject.incubator.net.meter.MeterOperation;
alshabibeadfc8e2015-08-18 15:40:46 -070032import org.onosproject.incubator.net.meter.MeterState;
alshabib7bb05012015-08-05 10:15:09 -070033import org.onosproject.incubator.net.meter.MeterStore;
34import org.onosproject.incubator.net.meter.MeterStoreDelegate;
alshabibeadfc8e2015-08-18 15:40:46 -070035import org.onosproject.incubator.net.meter.MeterStoreResult;
alshabib7bb05012015-08-05 10:15:09 -070036import org.onosproject.mastership.MastershipService;
37import org.onosproject.store.AbstractStore;
alshabibeadfc8e2015-08-18 15:40:46 -070038import org.onosproject.store.serializers.KryoNamespaces;
alshabib7bb05012015-08-05 10:15:09 -070039import org.onosproject.store.service.ConsistentMap;
alshabibeadfc8e2015-08-18 15:40:46 -070040import org.onosproject.store.service.MapEvent;
41import org.onosproject.store.service.MapEventListener;
alshabib7bb05012015-08-05 10:15:09 -070042import org.onosproject.store.service.Serializer;
alshabibeadfc8e2015-08-18 15:40:46 -070043import org.onosproject.store.service.StorageException;
alshabib7bb05012015-08-05 10:15:09 -070044import org.onosproject.store.service.StorageService;
alshabibeadfc8e2015-08-18 15:40:46 -070045import org.onosproject.store.service.Versioned;
alshabib7bb05012015-08-05 10:15:09 -070046import org.slf4j.Logger;
47
alshabibeadfc8e2015-08-18 15:40:46 -070048import java.util.Arrays;
alshabib7bb05012015-08-05 10:15:09 -070049import java.util.Collection;
alshabibeadfc8e2015-08-18 15:40:46 -070050import java.util.Map;
51import java.util.concurrent.CompletableFuture;
alshabib7bb05012015-08-05 10:15:09 -070052
53import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * A distributed meter store implementation. Meters are stored consistently
57 * across the cluster.
58 */
59public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
60 implements MeterStore {
61
62 private Logger log = getLogger(getClass());
63
64 private static final String METERSTORE = "onos-meter-store";
alshabib7bb05012015-08-05 10:15:09 -070065
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 private StorageService storageService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib7bb05012015-08-05 10:15:09 -070070 private MastershipService mastershipService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 private ClusterService clusterService;
74
alshabibeadfc8e2015-08-18 15:40:46 -070075 private ConsistentMap<MeterId, MeterData> meters;
alshabib7bb05012015-08-05 10:15:09 -070076 private NodeId local;
alshabib7bb05012015-08-05 10:15:09 -070077
alshabibeadfc8e2015-08-18 15:40:46 -070078 private MapEventListener mapListener = new InternalMapEventListener();
79
80 private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
81 Maps.newConcurrentMap();
alshabib7bb05012015-08-05 10:15:09 -070082
83 @Activate
84 public void activate() {
85
86 local = clusterService.getLocalNode().id();
87
alshabib7bb05012015-08-05 10:15:09 -070088
alshabibeadfc8e2015-08-18 15:40:46 -070089 meters = storageService.<MeterId, MeterData>consistentMapBuilder()
alshabib7bb05012015-08-05 10:15:09 -070090 .withName(METERSTORE)
alshabibeadfc8e2015-08-18 15:40:46 -070091 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
92 MeterData.class))
alshabib7bb05012015-08-05 10:15:09 -070093 .build();
94
alshabibeadfc8e2015-08-18 15:40:46 -070095 meters.addListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -070096
97 log.info("Started");
98 }
99
100 @Deactivate
101 public void deactivate() {
102
alshabibeadfc8e2015-08-18 15:40:46 -0700103 meters.removeListener(mapListener);
alshabib7bb05012015-08-05 10:15:09 -0700104 log.info("Stopped");
105 }
106
alshabib7bb05012015-08-05 10:15:09 -0700107
108 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700109 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
110 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
111 futures.put(meter.id(), future);
112 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700113
alshabibeadfc8e2015-08-18 15:40:46 -0700114 try {
115 meters.put(meter.id(), data);
116 } catch (StorageException e) {
117 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700118 }
119
alshabibeadfc8e2015-08-18 15:40:46 -0700120 return future;
121
alshabib7bb05012015-08-05 10:15:09 -0700122 }
123
124 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700125 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
126 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
127 futures.put(meter.id(), future);
alshabib7bb05012015-08-05 10:15:09 -0700128
alshabibeadfc8e2015-08-18 15:40:46 -0700129 MeterData data = new MeterData(meter, null, local);
alshabib7bb05012015-08-05 10:15:09 -0700130
131 // update the state of the meter. It will be pruned by observing
132 // that it has been removed from the dataplane.
alshabibeadfc8e2015-08-18 15:40:46 -0700133 try {
134 meters.put(meter.id(), data);
135 } catch (StorageException e) {
136 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700137 }
138
alshabibeadfc8e2015-08-18 15:40:46 -0700139
140 return future;
alshabib7bb05012015-08-05 10:15:09 -0700141 }
142
143 @Override
alshabibeadfc8e2015-08-18 15:40:46 -0700144 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
145 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
146 futures.put(meter.id(), future);
alshabib7bb05012015-08-05 10:15:09 -0700147
alshabibeadfc8e2015-08-18 15:40:46 -0700148 MeterData data = new MeterData(meter, null, local);
149 try {
150 meters.put(meter.id(), data);
151 } catch (StorageException e) {
152 future.completeExceptionally(e);
alshabib7bb05012015-08-05 10:15:09 -0700153 }
alshabibeadfc8e2015-08-18 15:40:46 -0700154 return future;
alshabib7bb05012015-08-05 10:15:09 -0700155 }
156
157 @Override
158 public void updateMeterState(Meter meter) {
alshabibeadfc8e2015-08-18 15:40:46 -0700159 meters.computeIfPresent(meter.id(), (id, v) -> {
160 DefaultMeter m = (DefaultMeter) v.meter();
alshabib7bb05012015-08-05 10:15:09 -0700161 m.setState(meter.state());
162 m.setProcessedPackets(meter.packetsSeen());
163 m.setProcessedBytes(meter.bytesSeen());
164 m.setLife(meter.life());
alshabibeadfc8e2015-08-18 15:40:46 -0700165 // TODO: Prune if drops to zero.
alshabib7bb05012015-08-05 10:15:09 -0700166 m.setReferenceCount(meter.referenceCount());
alshabibeadfc8e2015-08-18 15:40:46 -0700167 return new MeterData(m, null, v.origin());
alshabib7bb05012015-08-05 10:15:09 -0700168 });
169 }
170
171 @Override
172 public Meter getMeter(MeterId meterId) {
alshabibeadfc8e2015-08-18 15:40:46 -0700173 MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
174 return data == null ? null : data.meter();
alshabib7bb05012015-08-05 10:15:09 -0700175 }
176
177 @Override
178 public Collection<Meter> getAllMeters() {
alshabibeadfc8e2015-08-18 15:40:46 -0700179 return Collections2.transform(meters.asJavaMap().values(),
180 MeterData::meter);
alshabib7bb05012015-08-05 10:15:09 -0700181 }
182
183 @Override
184 public void failedMeter(MeterOperation op, MeterFailReason reason) {
alshabibeadfc8e2015-08-18 15:40:46 -0700185 meters.computeIfPresent(op.meter().id(), (k, v) ->
186 new MeterData(v.meter(), reason, v.origin()));
alshabib7bb05012015-08-05 10:15:09 -0700187 }
188
alshabibeadfc8e2015-08-18 15:40:46 -0700189 private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
190 @Override
191 public void event(MapEvent<MeterId, MeterData> event) {
192 MeterData data = event.value().value();
193 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
194 switch (event.type()) {
195 case INSERT:
196 case UPDATE:
197 switch (data.meter().state()) {
198 case PENDING_ADD:
199 case PENDING_REMOVE:
200 if (!data.reason().isPresent() && local.equals(master)) {
201 notifyDelegate(
202 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
203 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
204 data.meter()));
205 } else if (data.reason().isPresent() && local.equals(data.origin())) {
206 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
207 //TODO: No future -> no friend
208 futures.get(data.meter().id()).complete(msr);
209 }
210 break;
211 case ADDED:
212 case REMOVED:
213 if (local.equals(data.origin())) {
214 futures.get(data.meter().id()).complete(MeterStoreResult.success());
215 }
216 break;
217 default:
218 log.warn("Unknown meter state type {}", data.meter().state());
219 }
220 break;
221 case REMOVE:
222 //Only happens at origin so we do not need to care.
223 break;
224 default:
225 log.warn("Unknown Map event type {}", event.type());
226 }
227
228 }
229 }
230
231
alshabib7bb05012015-08-05 10:15:09 -0700232}