blob: a7a3a02f86bc0df38c2b971b3c95745278a5e251 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
alshabib3d643ec2014-10-22 18:33:00 -070016package org.onlab.onos.store.statistic.impl;
17
18import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*;
19import static org.slf4j.LoggerFactory.getLogger;
20
alshabibf6c2ede2014-10-22 23:31:50 -070021import com.google.common.collect.Sets;
Madan Jampani24f9efb2014-10-24 18:56:23 -070022
alshabib3d643ec2014-10-22 18:33:00 -070023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.onos.cluster.ClusterService;
30import org.onlab.onos.net.ConnectPoint;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -070031import org.onlab.onos.net.DeviceId;
alshabib3d643ec2014-10-22 18:33:00 -070032import org.onlab.onos.net.PortNumber;
33import org.onlab.onos.net.flow.FlowEntry;
34import org.onlab.onos.net.flow.FlowRule;
35import org.onlab.onos.net.flow.instructions.Instruction;
36import org.onlab.onos.net.flow.instructions.Instructions;
37import org.onlab.onos.net.statistic.StatisticStore;
38import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
alshabib3d643ec2014-10-22 18:33:00 -070041import org.onlab.onos.store.flow.ReplicaInfo;
42import org.onlab.onos.store.flow.ReplicaInfoService;
43import org.onlab.onos.store.serializers.KryoNamespaces;
44import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070045import org.onlab.util.KryoNamespace;
alshabib3d643ec2014-10-22 18:33:00 -070046import org.slf4j.Logger;
47
alshabib3d643ec2014-10-22 18:33:00 -070048import java.io.IOException;
49import java.util.HashSet;
50import java.util.Map;
51import java.util.Set;
52import java.util.concurrent.ConcurrentHashMap;
Madan Jampani24f9efb2014-10-24 18:56:23 -070053import java.util.concurrent.ExecutionException;
54import java.util.concurrent.Future;
alshabib3d643ec2014-10-22 18:33:00 -070055import java.util.concurrent.TimeUnit;
56import java.util.concurrent.TimeoutException;
57import java.util.concurrent.atomic.AtomicInteger;
58
59
60/**
61 * Maintains statistics using RPC calls to collect stats from remote instances
62 * on demand.
63 */
64@Component(immediate = true)
65@Service
66public class DistributedStatisticStore implements StatisticStore {
67
68 private final Logger log = getLogger(getClass());
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 private ReplicaInfoService replicaInfoManager;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 private ClusterCommunicationService clusterCommunicator;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 private ClusterService clusterService;
78
79 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
80 new ConcurrentHashMap<>();
81
82 private Map<ConnectPoint, Set<FlowEntry>> previous =
83 new ConcurrentHashMap<>();
84
85 private Map<ConnectPoint, Set<FlowEntry>> current =
86 new ConcurrentHashMap<>();
87
88 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
89 @Override
90 protected void setupKryoPool() {
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070091 serializerPool = KryoNamespace.newBuilder()
92 .register(KryoNamespaces.API)
93 // register this store specific classes here
alshabib3d643ec2014-10-22 18:33:00 -070094 .build()
95 .populate(1);
96 }
97 };;
98
99 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
100
101 @Activate
102 public void activate() {
103 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
104
105 @Override
106 public void handle(ClusterMessage message) {
107 ConnectPoint cp = SERIALIZER.decode(message.payload());
108 try {
109 message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
110 } catch (IOException e) {
111 log.error("Failed to respond back", e);
112 }
113 }
114 });
115
116 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
117
118 @Override
119 public void handle(ClusterMessage message) {
120 ConnectPoint cp = SERIALIZER.decode(message.payload());
121 try {
122 message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
123 } catch (IOException e) {
124 log.error("Failed to respond back", e);
125 }
126 }
127 });
128 log.info("Started");
129 }
130
131 @Deactivate
132 public void deactivate() {
133 log.info("Stopped");
134 }
135
136 @Override
137 public void prepareForStatistics(FlowRule rule) {
138 ConnectPoint cp = buildConnectPoint(rule);
139 if (cp == null) {
140 return;
141 }
142 InternalStatisticRepresentation rep;
143 synchronized (representations) {
144 rep = getOrCreateRepresentation(cp);
145 }
146 rep.prepare();
147 }
148
149 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700150 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700151 ConnectPoint cp = buildConnectPoint(rule);
152 if (cp == null) {
153 return;
154 }
155 InternalStatisticRepresentation rep = representations.get(cp);
156 if (rep != null) {
157 rep.remove(rule);
158 }
alshabibf6c2ede2014-10-22 23:31:50 -0700159 Set<FlowEntry> values = current.get(cp);
160 if (values != null) {
161 values.remove(rule);
162 }
163 values = previous.get(cp);
164 if (values != null) {
165 values.remove(rule);
166 }
167
alshabib3d643ec2014-10-22 18:33:00 -0700168 }
169
170 @Override
171 public void addOrUpdateStatistic(FlowEntry rule) {
172 ConnectPoint cp = buildConnectPoint(rule);
173 if (cp == null) {
174 return;
175 }
176 InternalStatisticRepresentation rep = representations.get(cp);
177 if (rep != null && rep.submit(rule)) {
178 updatePublishedStats(cp, rep.get());
179 }
180 }
181
182 private synchronized void updatePublishedStats(ConnectPoint cp,
183 Set<FlowEntry> flowEntries) {
184 Set<FlowEntry> curr = current.get(cp);
185 if (curr == null) {
186 curr = new HashSet<>();
187 }
188 previous.put(cp, curr);
189 current.put(cp, flowEntries);
190
191 }
192
193 @Override
194 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700195 final DeviceId deviceId = connectPoint.deviceId();
196 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
197 if (!replicaInfo.master().isPresent()) {
198 log.warn("No master for {}", deviceId);
199 // TODO: revisit if this should be returning empty collection.
200 // FIXME: throw a StatsStoreException
201 throw new RuntimeException("No master for " + deviceId);
202 }
alshabib3d643ec2014-10-22 18:33:00 -0700203 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
204 return getCurrentStatisticInternal(connectPoint);
205 } else {
206 ClusterMessage message = new ClusterMessage(
207 clusterService.getLocalNode().id(),
208 GET_CURRENT,
209 SERIALIZER.encode(connectPoint));
210
211 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700212 Future<byte[]> response =
alshabib3d643ec2014-10-22 18:33:00 -0700213 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
214 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
215 TimeUnit.MILLISECONDS));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700216 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
alshabibf6c2ede2014-10-22 23:31:50 -0700217 // FIXME: throw a StatsStoreException
alshabib3d643ec2014-10-22 18:33:00 -0700218 throw new RuntimeException(e);
219 }
220 }
221
222 }
223
224 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
225 return current.get(connectPoint);
226 }
227
228 @Override
229 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700230 final DeviceId deviceId = connectPoint.deviceId();
231 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
232 if (!replicaInfo.master().isPresent()) {
233 log.warn("No master for {}", deviceId);
234 // TODO: revisit if this should be returning empty collection.
235 // FIXME: throw a StatsStoreException
236 throw new RuntimeException("No master for " + deviceId);
237 }
alshabib3d643ec2014-10-22 18:33:00 -0700238 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
239 return getPreviousStatisticInternal(connectPoint);
240 } else {
241 ClusterMessage message = new ClusterMessage(
242 clusterService.getLocalNode().id(),
alshabibf6c2ede2014-10-22 23:31:50 -0700243 GET_PREVIOUS,
alshabib3d643ec2014-10-22 18:33:00 -0700244 SERIALIZER.encode(connectPoint));
245
246 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700247 Future<byte[]> response =
alshabib3d643ec2014-10-22 18:33:00 -0700248 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
249 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
250 TimeUnit.MILLISECONDS));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700251 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
alshabibf6c2ede2014-10-22 23:31:50 -0700252 // FIXME: throw a StatsStoreException
alshabib3d643ec2014-10-22 18:33:00 -0700253 throw new RuntimeException(e);
254 }
255 }
256
257 }
258
259 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
260 return previous.get(connectPoint);
261 }
262
263 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
264
265 if (representations.containsKey(cp)) {
266 return representations.get(cp);
267 } else {
268 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
269 representations.put(cp, rep);
270 return rep;
271 }
272
273 }
274
275 private ConnectPoint buildConnectPoint(FlowRule rule) {
276 PortNumber port = getOutput(rule);
277 if (port == null) {
278 log.warn("Rule {} has no output.", rule);
279 return null;
280 }
281 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
282 return cp;
283 }
284
285 private PortNumber getOutput(FlowRule rule) {
286 for (Instruction i : rule.treatment().instructions()) {
287 if (i.type() == Instruction.Type.OUTPUT) {
288 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
289 return out.port();
290 }
291 if (i.type() == Instruction.Type.DROP) {
292 return PortNumber.P0;
293 }
294 }
295 return null;
296 }
297
298 private class InternalStatisticRepresentation {
299
300 private final AtomicInteger counter = new AtomicInteger(0);
301 private final Set<FlowEntry> rules = new HashSet<>();
302
303 public void prepare() {
304 counter.incrementAndGet();
305 }
306
307 public synchronized void remove(FlowRule rule) {
308 rules.remove(rule);
309 counter.decrementAndGet();
310 }
311
312 public synchronized boolean submit(FlowEntry rule) {
313 if (rules.contains(rule)) {
314 rules.remove(rule);
315 }
316 rules.add(rule);
317 if (counter.get() == 0) {
318 return true;
319 } else {
320 return counter.decrementAndGet() == 0;
321 }
322 }
323
324 public synchronized Set<FlowEntry> get() {
325 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700326 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700327 }
328
329
330 }
331
332}