// blob: 7106aefe84b4e61a9cdf80f9ce29e48cdc96fc53
package org.onlab.onos.store.statistic.impl;

import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*;
import static org.slf4j.LoggerFactory.getLogger;

import com.google.common.collect.Sets;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.flow.instructions.Instructions;
import org.onlab.onos.net.statistic.StatisticStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;


44/**
45 * Maintains statistics using RPC calls to collect stats from remote instances
46 * on demand.
47 */
48@Component(immediate = true)
49@Service
50public class DistributedStatisticStore implements StatisticStore {
51
52 private final Logger log = getLogger(getClass());
53
54 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
55 private ReplicaInfoService replicaInfoManager;
56
57 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
58 private ClusterCommunicationService clusterCommunicator;
59
60 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 private ClusterService clusterService;
62
63 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
64 new ConcurrentHashMap<>();
65
66 private Map<ConnectPoint, Set<FlowEntry>> previous =
67 new ConcurrentHashMap<>();
68
69 private Map<ConnectPoint, Set<FlowEntry>> current =
70 new ConcurrentHashMap<>();
71
72 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
73 @Override
74 protected void setupKryoPool() {
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070075 serializerPool = KryoNamespace.newBuilder()
76 .register(KryoNamespaces.API)
77 // register this store specific classes here
alshabib3d643ec2014-10-22 18:33:00 -070078 .build()
79 .populate(1);
80 }
81 };;
82
83 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
84
85 @Activate
86 public void activate() {
87 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
88
89 @Override
90 public void handle(ClusterMessage message) {
91 ConnectPoint cp = SERIALIZER.decode(message.payload());
92 try {
93 message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
94 } catch (IOException e) {
95 log.error("Failed to respond back", e);
96 }
97 }
98 });
99
100 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
101
102 @Override
103 public void handle(ClusterMessage message) {
104 ConnectPoint cp = SERIALIZER.decode(message.payload());
105 try {
106 message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
107 } catch (IOException e) {
108 log.error("Failed to respond back", e);
109 }
110 }
111 });
112 log.info("Started");
113 }
114
115 @Deactivate
116 public void deactivate() {
117 log.info("Stopped");
118 }
119
120 @Override
121 public void prepareForStatistics(FlowRule rule) {
122 ConnectPoint cp = buildConnectPoint(rule);
123 if (cp == null) {
124 return;
125 }
126 InternalStatisticRepresentation rep;
127 synchronized (representations) {
128 rep = getOrCreateRepresentation(cp);
129 }
130 rep.prepare();
131 }
132
133 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700134 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700135 ConnectPoint cp = buildConnectPoint(rule);
136 if (cp == null) {
137 return;
138 }
139 InternalStatisticRepresentation rep = representations.get(cp);
140 if (rep != null) {
141 rep.remove(rule);
142 }
alshabibf6c2ede2014-10-22 23:31:50 -0700143 Set<FlowEntry> values = current.get(cp);
144 if (values != null) {
145 values.remove(rule);
146 }
147 values = previous.get(cp);
148 if (values != null) {
149 values.remove(rule);
150 }
151
alshabib3d643ec2014-10-22 18:33:00 -0700152 }
153
154 @Override
155 public void addOrUpdateStatistic(FlowEntry rule) {
156 ConnectPoint cp = buildConnectPoint(rule);
157 if (cp == null) {
158 return;
159 }
160 InternalStatisticRepresentation rep = representations.get(cp);
161 if (rep != null && rep.submit(rule)) {
162 updatePublishedStats(cp, rep.get());
163 }
164 }
165
166 private synchronized void updatePublishedStats(ConnectPoint cp,
167 Set<FlowEntry> flowEntries) {
168 Set<FlowEntry> curr = current.get(cp);
169 if (curr == null) {
170 curr = new HashSet<>();
171 }
172 previous.put(cp, curr);
173 current.put(cp, flowEntries);
174
175 }
176
177 @Override
178 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
179 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
180 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
181 return getCurrentStatisticInternal(connectPoint);
182 } else {
183 ClusterMessage message = new ClusterMessage(
184 clusterService.getLocalNode().id(),
185 GET_CURRENT,
186 SERIALIZER.encode(connectPoint));
187
188 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700189 Future<byte[]> response =
alshabib3d643ec2014-10-22 18:33:00 -0700190 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
191 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
192 TimeUnit.MILLISECONDS));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700193 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
alshabibf6c2ede2014-10-22 23:31:50 -0700194 // FIXME: throw a StatsStoreException
alshabib3d643ec2014-10-22 18:33:00 -0700195 throw new RuntimeException(e);
196 }
197 }
198
199 }
200
201 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
202 return current.get(connectPoint);
203 }
204
205 @Override
206 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
207 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
208 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
209 return getPreviousStatisticInternal(connectPoint);
210 } else {
211 ClusterMessage message = new ClusterMessage(
212 clusterService.getLocalNode().id(),
alshabibf6c2ede2014-10-22 23:31:50 -0700213 GET_PREVIOUS,
alshabib3d643ec2014-10-22 18:33:00 -0700214 SERIALIZER.encode(connectPoint));
215
216 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700217 Future<byte[]> response =
alshabib3d643ec2014-10-22 18:33:00 -0700218 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
219 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
220 TimeUnit.MILLISECONDS));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700221 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
alshabibf6c2ede2014-10-22 23:31:50 -0700222 // FIXME: throw a StatsStoreException
alshabib3d643ec2014-10-22 18:33:00 -0700223 throw new RuntimeException(e);
224 }
225 }
226
227 }
228
229 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
230 return previous.get(connectPoint);
231 }
232
233 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
234
235 if (representations.containsKey(cp)) {
236 return representations.get(cp);
237 } else {
238 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
239 representations.put(cp, rep);
240 return rep;
241 }
242
243 }
244
245 private ConnectPoint buildConnectPoint(FlowRule rule) {
246 PortNumber port = getOutput(rule);
247 if (port == null) {
248 log.warn("Rule {} has no output.", rule);
249 return null;
250 }
251 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
252 return cp;
253 }
254
255 private PortNumber getOutput(FlowRule rule) {
256 for (Instruction i : rule.treatment().instructions()) {
257 if (i.type() == Instruction.Type.OUTPUT) {
258 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
259 return out.port();
260 }
261 if (i.type() == Instruction.Type.DROP) {
262 return PortNumber.P0;
263 }
264 }
265 return null;
266 }
267
268 private class InternalStatisticRepresentation {
269
270 private final AtomicInteger counter = new AtomicInteger(0);
271 private final Set<FlowEntry> rules = new HashSet<>();
272
273 public void prepare() {
274 counter.incrementAndGet();
275 }
276
277 public synchronized void remove(FlowRule rule) {
278 rules.remove(rule);
279 counter.decrementAndGet();
280 }
281
282 public synchronized boolean submit(FlowEntry rule) {
283 if (rules.contains(rule)) {
284 rules.remove(rule);
285 }
286 rules.add(rule);
287 if (counter.get() == 0) {
288 return true;
289 } else {
290 return counter.decrementAndGet() == 0;
291 }
292 }
293
294 public synchronized Set<FlowEntry> get() {
295 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700296 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700297 }
298
299
300 }
301
302}