blob: bd28e475f82e226026109f7ea55e4cf8c2ddf19f [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
alshabib3d643ec2014-10-22 18:33:00 -070016package org.onlab.onos.store.statistic.impl;
17
18import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*;
19import static org.slf4j.LoggerFactory.getLogger;
20
alshabibf6c2ede2014-10-22 23:31:50 -070021import com.google.common.collect.Sets;
Madan Jampani24f9efb2014-10-24 18:56:23 -070022
alshabib3d643ec2014-10-22 18:33:00 -070023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.onos.cluster.ClusterService;
30import org.onlab.onos.net.ConnectPoint;
31import org.onlab.onos.net.PortNumber;
32import org.onlab.onos.net.flow.FlowEntry;
33import org.onlab.onos.net.flow.FlowRule;
34import org.onlab.onos.net.flow.instructions.Instruction;
35import org.onlab.onos.net.flow.instructions.Instructions;
36import org.onlab.onos.net.statistic.StatisticStore;
37import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
alshabib3d643ec2014-10-22 18:33:00 -070040import org.onlab.onos.store.flow.ReplicaInfo;
41import org.onlab.onos.store.flow.ReplicaInfoService;
42import org.onlab.onos.store.serializers.KryoNamespaces;
43import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070044import org.onlab.util.KryoNamespace;
alshabib3d643ec2014-10-22 18:33:00 -070045import org.slf4j.Logger;
46
alshabib3d643ec2014-10-22 18:33:00 -070047import java.io.IOException;
48import java.util.HashSet;
49import java.util.Map;
50import java.util.Set;
51import java.util.concurrent.ConcurrentHashMap;
Madan Jampani24f9efb2014-10-24 18:56:23 -070052import java.util.concurrent.ExecutionException;
53import java.util.concurrent.Future;
alshabib3d643ec2014-10-22 18:33:00 -070054import java.util.concurrent.TimeUnit;
55import java.util.concurrent.TimeoutException;
56import java.util.concurrent.atomic.AtomicInteger;
57
58
59/**
60 * Maintains statistics using RPC calls to collect stats from remote instances
61 * on demand.
62 */
63@Component(immediate = true)
64@Service
65public class DistributedStatisticStore implements StatisticStore {
66
67 private final Logger log = getLogger(getClass());
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 private ReplicaInfoService replicaInfoManager;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 private ClusterCommunicationService clusterCommunicator;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 private ClusterService clusterService;
77
78 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
79 new ConcurrentHashMap<>();
80
81 private Map<ConnectPoint, Set<FlowEntry>> previous =
82 new ConcurrentHashMap<>();
83
84 private Map<ConnectPoint, Set<FlowEntry>> current =
85 new ConcurrentHashMap<>();
86
87 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
88 @Override
89 protected void setupKryoPool() {
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070090 serializerPool = KryoNamespace.newBuilder()
91 .register(KryoNamespaces.API)
92 // register this store specific classes here
alshabib3d643ec2014-10-22 18:33:00 -070093 .build()
94 .populate(1);
95 }
96 };;
97
98 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
99
100 @Activate
101 public void activate() {
102 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
103
104 @Override
105 public void handle(ClusterMessage message) {
106 ConnectPoint cp = SERIALIZER.decode(message.payload());
107 try {
108 message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
109 } catch (IOException e) {
110 log.error("Failed to respond back", e);
111 }
112 }
113 });
114
115 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
116
117 @Override
118 public void handle(ClusterMessage message) {
119 ConnectPoint cp = SERIALIZER.decode(message.payload());
120 try {
121 message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
122 } catch (IOException e) {
123 log.error("Failed to respond back", e);
124 }
125 }
126 });
127 log.info("Started");
128 }
129
130 @Deactivate
131 public void deactivate() {
132 log.info("Stopped");
133 }
134
135 @Override
136 public void prepareForStatistics(FlowRule rule) {
137 ConnectPoint cp = buildConnectPoint(rule);
138 if (cp == null) {
139 return;
140 }
141 InternalStatisticRepresentation rep;
142 synchronized (representations) {
143 rep = getOrCreateRepresentation(cp);
144 }
145 rep.prepare();
146 }
147
148 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700149 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700150 ConnectPoint cp = buildConnectPoint(rule);
151 if (cp == null) {
152 return;
153 }
154 InternalStatisticRepresentation rep = representations.get(cp);
155 if (rep != null) {
156 rep.remove(rule);
157 }
alshabibf6c2ede2014-10-22 23:31:50 -0700158 Set<FlowEntry> values = current.get(cp);
159 if (values != null) {
160 values.remove(rule);
161 }
162 values = previous.get(cp);
163 if (values != null) {
164 values.remove(rule);
165 }
166
alshabib3d643ec2014-10-22 18:33:00 -0700167 }
168
169 @Override
170 public void addOrUpdateStatistic(FlowEntry rule) {
171 ConnectPoint cp = buildConnectPoint(rule);
172 if (cp == null) {
173 return;
174 }
175 InternalStatisticRepresentation rep = representations.get(cp);
176 if (rep != null && rep.submit(rule)) {
177 updatePublishedStats(cp, rep.get());
178 }
179 }
180
181 private synchronized void updatePublishedStats(ConnectPoint cp,
182 Set<FlowEntry> flowEntries) {
183 Set<FlowEntry> curr = current.get(cp);
184 if (curr == null) {
185 curr = new HashSet<>();
186 }
187 previous.put(cp, curr);
188 current.put(cp, flowEntries);
189
190 }
191
192 @Override
193 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
194 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
195 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
196 return getCurrentStatisticInternal(connectPoint);
197 } else {
198 ClusterMessage message = new ClusterMessage(
199 clusterService.getLocalNode().id(),
200 GET_CURRENT,
201 SERIALIZER.encode(connectPoint));
202
203 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700204 Future<byte[]> response =
alshabib3d643ec2014-10-22 18:33:00 -0700205 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
206 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
207 TimeUnit.MILLISECONDS));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700208 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
alshabibf6c2ede2014-10-22 23:31:50 -0700209 // FIXME: throw a StatsStoreException
alshabib3d643ec2014-10-22 18:33:00 -0700210 throw new RuntimeException(e);
211 }
212 }
213
214 }
215
216 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
217 return current.get(connectPoint);
218 }
219
220 @Override
221 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
222 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
223 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
224 return getPreviousStatisticInternal(connectPoint);
225 } else {
226 ClusterMessage message = new ClusterMessage(
227 clusterService.getLocalNode().id(),
alshabibf6c2ede2014-10-22 23:31:50 -0700228 GET_PREVIOUS,
alshabib3d643ec2014-10-22 18:33:00 -0700229 SERIALIZER.encode(connectPoint));
230
231 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700232 Future<byte[]> response =
alshabib3d643ec2014-10-22 18:33:00 -0700233 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
234 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
235 TimeUnit.MILLISECONDS));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700236 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
alshabibf6c2ede2014-10-22 23:31:50 -0700237 // FIXME: throw a StatsStoreException
alshabib3d643ec2014-10-22 18:33:00 -0700238 throw new RuntimeException(e);
239 }
240 }
241
242 }
243
244 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
245 return previous.get(connectPoint);
246 }
247
248 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
249
250 if (representations.containsKey(cp)) {
251 return representations.get(cp);
252 } else {
253 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
254 representations.put(cp, rep);
255 return rep;
256 }
257
258 }
259
260 private ConnectPoint buildConnectPoint(FlowRule rule) {
261 PortNumber port = getOutput(rule);
262 if (port == null) {
263 log.warn("Rule {} has no output.", rule);
264 return null;
265 }
266 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
267 return cp;
268 }
269
270 private PortNumber getOutput(FlowRule rule) {
271 for (Instruction i : rule.treatment().instructions()) {
272 if (i.type() == Instruction.Type.OUTPUT) {
273 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
274 return out.port();
275 }
276 if (i.type() == Instruction.Type.DROP) {
277 return PortNumber.P0;
278 }
279 }
280 return null;
281 }
282
283 private class InternalStatisticRepresentation {
284
285 private final AtomicInteger counter = new AtomicInteger(0);
286 private final Set<FlowEntry> rules = new HashSet<>();
287
288 public void prepare() {
289 counter.incrementAndGet();
290 }
291
292 public synchronized void remove(FlowRule rule) {
293 rules.remove(rule);
294 counter.decrementAndGet();
295 }
296
297 public synchronized boolean submit(FlowEntry rule) {
298 if (rules.contains(rule)) {
299 rules.remove(rule);
300 }
301 rules.add(rule);
302 if (counter.get() == 0) {
303 return true;
304 } else {
305 return counter.decrementAndGet() == 0;
306 }
307 }
308
309 public synchronized Set<FlowEntry> get() {
310 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700311 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700312 }
313
314
315 }
316
317}