blob: 273e3cc7205277e6023e106bc2fad5ee1b1e2c2f [file] [log] [blame]
alshabib3d643ec2014-10-22 18:33:00 -07001package org.onlab.onos.store.statistic.impl;
2
3import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*;
4import static org.slf4j.LoggerFactory.getLogger;
5
alshabibf6c2ede2014-10-22 23:31:50 -07006import com.google.common.collect.Sets;
alshabib3d643ec2014-10-22 18:33:00 -07007import org.apache.felix.scr.annotations.Activate;
8import org.apache.felix.scr.annotations.Component;
9import org.apache.felix.scr.annotations.Deactivate;
10import org.apache.felix.scr.annotations.Reference;
11import org.apache.felix.scr.annotations.ReferenceCardinality;
12import org.apache.felix.scr.annotations.Service;
13import org.onlab.onos.cluster.ClusterService;
14import org.onlab.onos.net.ConnectPoint;
15import org.onlab.onos.net.PortNumber;
16import org.onlab.onos.net.flow.FlowEntry;
17import org.onlab.onos.net.flow.FlowRule;
18import org.onlab.onos.net.flow.instructions.Instruction;
19import org.onlab.onos.net.flow.instructions.Instructions;
20import org.onlab.onos.net.statistic.StatisticStore;
21import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
22import org.onlab.onos.store.cluster.messaging.ClusterMessage;
23import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
24import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
25import org.onlab.onos.store.flow.ReplicaInfo;
26import org.onlab.onos.store.flow.ReplicaInfoService;
27import org.onlab.onos.store.serializers.KryoNamespaces;
28import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070029import org.onlab.util.KryoNamespace;
alshabib3d643ec2014-10-22 18:33:00 -070030import org.slf4j.Logger;
31
alshabib3d643ec2014-10-22 18:33:00 -070032import java.io.IOException;
33import java.util.HashSet;
34import java.util.Map;
35import java.util.Set;
36import java.util.concurrent.ConcurrentHashMap;
37import java.util.concurrent.TimeUnit;
38import java.util.concurrent.TimeoutException;
39import java.util.concurrent.atomic.AtomicInteger;
40
41
42/**
43 * Maintains statistics using RPC calls to collect stats from remote instances
44 * on demand.
45 */
46@Component(immediate = true)
47@Service
48public class DistributedStatisticStore implements StatisticStore {
49
50 private final Logger log = getLogger(getClass());
51
52 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 private ReplicaInfoService replicaInfoManager;
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
56 private ClusterCommunicationService clusterCommunicator;
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 private ClusterService clusterService;
60
61 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
62 new ConcurrentHashMap<>();
63
64 private Map<ConnectPoint, Set<FlowEntry>> previous =
65 new ConcurrentHashMap<>();
66
67 private Map<ConnectPoint, Set<FlowEntry>> current =
68 new ConcurrentHashMap<>();
69
70 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
71 @Override
72 protected void setupKryoPool() {
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070073 serializerPool = KryoNamespace.newBuilder()
74 .register(KryoNamespaces.API)
75 // register this store specific classes here
alshabib3d643ec2014-10-22 18:33:00 -070076 .build()
77 .populate(1);
78 }
79 };;
80
81 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
82
83 @Activate
84 public void activate() {
85 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
86
87 @Override
88 public void handle(ClusterMessage message) {
89 ConnectPoint cp = SERIALIZER.decode(message.payload());
90 try {
91 message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
92 } catch (IOException e) {
93 log.error("Failed to respond back", e);
94 }
95 }
96 });
97
98 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
99
100 @Override
101 public void handle(ClusterMessage message) {
102 ConnectPoint cp = SERIALIZER.decode(message.payload());
103 try {
104 message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
105 } catch (IOException e) {
106 log.error("Failed to respond back", e);
107 }
108 }
109 });
110 log.info("Started");
111 }
112
113 @Deactivate
114 public void deactivate() {
115 log.info("Stopped");
116 }
117
118 @Override
119 public void prepareForStatistics(FlowRule rule) {
120 ConnectPoint cp = buildConnectPoint(rule);
121 if (cp == null) {
122 return;
123 }
124 InternalStatisticRepresentation rep;
125 synchronized (representations) {
126 rep = getOrCreateRepresentation(cp);
127 }
128 rep.prepare();
129 }
130
131 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700132 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700133 ConnectPoint cp = buildConnectPoint(rule);
134 if (cp == null) {
135 return;
136 }
137 InternalStatisticRepresentation rep = representations.get(cp);
138 if (rep != null) {
139 rep.remove(rule);
140 }
alshabibf6c2ede2014-10-22 23:31:50 -0700141 Set<FlowEntry> values = current.get(cp);
142 if (values != null) {
143 values.remove(rule);
144 }
145 values = previous.get(cp);
146 if (values != null) {
147 values.remove(rule);
148 }
149
alshabib3d643ec2014-10-22 18:33:00 -0700150 }
151
152 @Override
153 public void addOrUpdateStatistic(FlowEntry rule) {
154 ConnectPoint cp = buildConnectPoint(rule);
155 if (cp == null) {
156 return;
157 }
158 InternalStatisticRepresentation rep = representations.get(cp);
159 if (rep != null && rep.submit(rule)) {
160 updatePublishedStats(cp, rep.get());
161 }
162 }
163
164 private synchronized void updatePublishedStats(ConnectPoint cp,
165 Set<FlowEntry> flowEntries) {
166 Set<FlowEntry> curr = current.get(cp);
167 if (curr == null) {
168 curr = new HashSet<>();
169 }
170 previous.put(cp, curr);
171 current.put(cp, flowEntries);
172
173 }
174
175 @Override
176 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
177 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
178 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
179 return getCurrentStatisticInternal(connectPoint);
180 } else {
181 ClusterMessage message = new ClusterMessage(
182 clusterService.getLocalNode().id(),
183 GET_CURRENT,
184 SERIALIZER.encode(connectPoint));
185
186 try {
187 ClusterMessageResponse response =
188 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
189 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
190 TimeUnit.MILLISECONDS));
191 } catch (IOException | TimeoutException e) {
alshabibf6c2ede2014-10-22 23:31:50 -0700192 // FIXME: throw a StatsStoreException
alshabib3d643ec2014-10-22 18:33:00 -0700193 throw new RuntimeException(e);
194 }
195 }
196
197 }
198
199 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
200 return current.get(connectPoint);
201 }
202
203 @Override
204 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
205 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
206 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
207 return getPreviousStatisticInternal(connectPoint);
208 } else {
209 ClusterMessage message = new ClusterMessage(
210 clusterService.getLocalNode().id(),
alshabibf6c2ede2014-10-22 23:31:50 -0700211 GET_PREVIOUS,
alshabib3d643ec2014-10-22 18:33:00 -0700212 SERIALIZER.encode(connectPoint));
213
214 try {
215 ClusterMessageResponse response =
216 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
217 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
218 TimeUnit.MILLISECONDS));
219 } catch (IOException | TimeoutException e) {
alshabibf6c2ede2014-10-22 23:31:50 -0700220 // FIXME: throw a StatsStoreException
alshabib3d643ec2014-10-22 18:33:00 -0700221 throw new RuntimeException(e);
222 }
223 }
224
225 }
226
227 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
228 return previous.get(connectPoint);
229 }
230
231 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
232
233 if (representations.containsKey(cp)) {
234 return representations.get(cp);
235 } else {
236 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
237 representations.put(cp, rep);
238 return rep;
239 }
240
241 }
242
243 private ConnectPoint buildConnectPoint(FlowRule rule) {
244 PortNumber port = getOutput(rule);
245 if (port == null) {
246 log.warn("Rule {} has no output.", rule);
247 return null;
248 }
249 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
250 return cp;
251 }
252
253 private PortNumber getOutput(FlowRule rule) {
254 for (Instruction i : rule.treatment().instructions()) {
255 if (i.type() == Instruction.Type.OUTPUT) {
256 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
257 return out.port();
258 }
259 if (i.type() == Instruction.Type.DROP) {
260 return PortNumber.P0;
261 }
262 }
263 return null;
264 }
265
266 private class InternalStatisticRepresentation {
267
268 private final AtomicInteger counter = new AtomicInteger(0);
269 private final Set<FlowEntry> rules = new HashSet<>();
270
271 public void prepare() {
272 counter.incrementAndGet();
273 }
274
275 public synchronized void remove(FlowRule rule) {
276 rules.remove(rule);
277 counter.decrementAndGet();
278 }
279
280 public synchronized boolean submit(FlowEntry rule) {
281 if (rules.contains(rule)) {
282 rules.remove(rule);
283 }
284 rules.add(rule);
285 if (counter.get() == 0) {
286 return true;
287 } else {
288 return counter.decrementAndGet() == 0;
289 }
290 }
291
292 public synchronized Set<FlowEntry> get() {
293 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700294 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700295 }
296
297
298 }
299
300}