blob: b8f2080d5e8b3ea3d5cfa22a91f0eaa76094e679 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.incubator.store.meter.impl;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Deactivate;
20import org.apache.felix.scr.annotations.Property;
21import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
23import org.onlab.util.KryoNamespace;
24import org.onlab.util.Tools;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.incubator.net.meter.DefaultBand;
28import org.onosproject.incubator.net.meter.DefaultMeter;
29import org.onosproject.incubator.net.meter.Meter;
30import org.onosproject.incubator.net.meter.MeterEvent;
31import org.onosproject.incubator.net.meter.MeterFailReason;
32import org.onosproject.incubator.net.meter.MeterId;
33import org.onosproject.incubator.net.meter.MeterOperation;
34import org.onosproject.incubator.net.meter.MeterStore;
35import org.onosproject.incubator.net.meter.MeterStoreDelegate;
36import org.onosproject.mastership.MastershipService;
37import org.onosproject.store.AbstractStore;
38import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
39import org.onosproject.store.cluster.messaging.MessageSubject;
40import org.onosproject.store.service.ConsistentMap;
41import org.onosproject.store.service.Serializer;
42import org.onosproject.store.service.StorageService;
43import org.slf4j.Logger;
44
45import java.util.Collection;
46import java.util.Objects;
47import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49import java.util.stream.Collectors;
50
51import static org.slf4j.LoggerFactory.getLogger;
52
53/**
54 * A distributed meter store implementation. Meters are stored consistently
55 * across the cluster.
56 */
57public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
58 implements MeterStore {
59
60 private Logger log = getLogger(getClass());
61
62 private static final String METERSTORE = "onos-meter-store";
63 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
64
65 private static final MessageSubject UPDATE_METER = new MessageSubject("peer-mod-meter");
66
67
68 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
69 label = "Number of threads in the message handler pool")
70 private int msgPoolSize;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 private StorageService storageService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 private ClusterCommunicationService clusterCommunicationService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 private MastershipService mastershipService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 private ClusterService clusterService;
83
84 private ConsistentMap<MeterId, Meter> meters;
85 private NodeId local;
86 private KryoNamespace kryoNameSpace;
87
88 private Serializer serializer;
89
90 @Activate
91 public void activate() {
92
93 local = clusterService.getLocalNode().id();
94
95 kryoNameSpace =
96 KryoNamespace.newBuilder()
97 .register(DefaultMeter.class)
98 .register(DefaultBand.class)
99 .build();
100
101 serializer = Serializer.using(kryoNameSpace);
102
103 meters = storageService.<MeterId, Meter>consistentMapBuilder()
104 .withName(METERSTORE)
105 .withSerializer(serializer)
106 .build();
107
108 ExecutorService executors = Executors.newFixedThreadPool(
109 msgPoolSize, Tools.groupedThreads("onos/store/meter", "message-handlers"));
110 registerMessageHandlers(executors);
111
112 log.info("Started");
113 }
114
115 @Deactivate
116 public void deactivate() {
117
118
119 log.info("Stopped");
120 }
121
122 private void registerMessageHandlers(ExecutorService executor) {
123 clusterCommunicationService.<MeterEvent>addSubscriber(UPDATE_METER, kryoNameSpace::deserialize,
124 this::notifyDelegate, executor);
125
126 }
127
128
129 @Override
130 public void storeMeter(Meter meter) {
131 NodeId master = mastershipService.getMasterFor(meter.deviceId());
132
133 meters.put(meter.id(), meter);
134
135 MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
136 new MeterOperation(meter, MeterOperation.Type.ADD));
137 if (Objects.equals(local, master)) {
138 notifyDelegate(event);
139 } else {
140 clusterCommunicationService.unicast(
141 event,
142 UPDATE_METER,
143 serializer::encode,
144 master
145 ).whenComplete((result, error) -> {
146 if (error != null) {
147 log.warn("Failed to install meter {} because {} on {}",
148 meter, error, master);
149
150 // notify app of failure
151 meter.context().ifPresent(c -> c.onError(
152 event.subject(), MeterFailReason.UNKNOWN));
153 }
154 });
155 }
156
157 }
158
159 @Override
160 public void deleteMeter(Meter meter) {
161
162 NodeId master = mastershipService.getMasterFor(meter.deviceId());
163
164 // update the state of the meter. It will be pruned by observing
165 // that it has been removed from the dataplane.
166 meters.put(meter.id(), meter);
167
168 MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
169 new MeterOperation(meter, MeterOperation.Type.REMOVE));
170 if (Objects.equals(local, master)) {
171 notifyDelegate(event);
172 } else {
173 clusterCommunicationService.unicast(
174 event,
175 UPDATE_METER,
176 serializer::encode,
177 master
178 ).whenComplete((result, error) -> {
179 if (error != null) {
180 log.warn("Failed to delete meter {} because {} on {}",
181 meter, error, master);
182
183 // notify app of failure
184 meter.context().ifPresent(c -> c.onError(
185 event.subject(), MeterFailReason.UNKNOWN));
186 }
187 });
188 }
189
190 }
191
192 @Override
193 public void updateMeter(Meter meter) {
194
195 NodeId master = mastershipService.getMasterFor(meter.deviceId());
196
197 meters.put(meter.id(), meter);
198
199 MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
200 new MeterOperation(meter, MeterOperation.Type.MODIFY));
201 if (Objects.equals(local, master)) {
202 notifyDelegate(event);
203 } else {
204 clusterCommunicationService.unicast(
205 event,
206 UPDATE_METER,
207 serializer::encode,
208 master
209 ).whenComplete((result, error) -> {
210 if (error != null) {
211 log.warn("Failed to update meter {} because {} on {}",
212 meter, error, master);
213
214 // notify app of failure
215 meter.context().ifPresent(c -> c.onError(
216 event.subject(), MeterFailReason.UNKNOWN));
217 }
218 });
219 }
220
221 }
222
223 @Override
224 public void updateMeterState(Meter meter) {
225 meters.compute(meter.id(), (id, v) -> {
226 DefaultMeter m = (DefaultMeter) v;
227 m.setState(meter.state());
228 m.setProcessedPackets(meter.packetsSeen());
229 m.setProcessedBytes(meter.bytesSeen());
230 m.setLife(meter.life());
231 m.setReferenceCount(meter.referenceCount());
232 return m;
233 });
234 }
235
236 @Override
237 public Meter getMeter(MeterId meterId) {
238 return meters.get(meterId).value();
239 }
240
241 @Override
242 public Collection<Meter> getAllMeters() {
243 return meters.values().stream()
244 .map(v -> v.value()).collect(Collectors.toSet());
245 }
246
247 @Override
248 public void failedMeter(MeterOperation op, MeterFailReason reason) {
249 NodeId master = mastershipService.getMasterFor(op.meter().deviceId());
250 meters.remove(op.meter().id());
251
252 MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_FAILED, op, reason);
253 if (Objects.equals(local, master)) {
254 notifyDelegate(event);
255 } else {
256 clusterCommunicationService.unicast(
257 event,
258 UPDATE_METER,
259 serializer::encode,
260 master
261 ).whenComplete((result, error) -> {
262 if (error != null) {
263 log.warn("Failed to delete failed meter {} because {} on {}",
264 op.meter(), error, master);
265
266 // Can't do any more...
267 }
268 });
269 }
270
271 }
272
273}