blob: d89ce252d6d4e02ae2db48c12aa2855e013c8b4f [file] [log] [blame]
alshabib3d643ec2014-10-22 18:33:00 -07001package org.onlab.onos.store.statistic.impl;
2
3import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*;
4import static org.slf4j.LoggerFactory.getLogger;
5
6import com.google.common.collect.ImmutableSet;
7import org.apache.felix.scr.annotations.Activate;
8import org.apache.felix.scr.annotations.Component;
9import org.apache.felix.scr.annotations.Deactivate;
10import org.apache.felix.scr.annotations.Reference;
11import org.apache.felix.scr.annotations.ReferenceCardinality;
12import org.apache.felix.scr.annotations.Service;
13import org.onlab.onos.cluster.ClusterService;
14import org.onlab.onos.net.ConnectPoint;
15import org.onlab.onos.net.PortNumber;
16import org.onlab.onos.net.flow.FlowEntry;
17import org.onlab.onos.net.flow.FlowRule;
18import org.onlab.onos.net.flow.instructions.Instruction;
19import org.onlab.onos.net.flow.instructions.Instructions;
20import org.onlab.onos.net.statistic.StatisticStore;
21import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
22import org.onlab.onos.store.cluster.messaging.ClusterMessage;
23import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
24import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
25import org.onlab.onos.store.flow.ReplicaInfo;
26import org.onlab.onos.store.flow.ReplicaInfoService;
27import org.onlab.onos.store.serializers.KryoNamespaces;
28import org.onlab.onos.store.serializers.KryoSerializer;
29import org.slf4j.Logger;
30
31
32import java.io.IOException;
33import java.util.HashSet;
34import java.util.Map;
35import java.util.Set;
36import java.util.concurrent.ConcurrentHashMap;
37import java.util.concurrent.TimeUnit;
38import java.util.concurrent.TimeoutException;
39import java.util.concurrent.atomic.AtomicInteger;
40
41
42/**
43 * Maintains statistics using RPC calls to collect stats from remote instances
44 * on demand.
45 */
46@Component(immediate = true)
47@Service
48public class DistributedStatisticStore implements StatisticStore {
49
50 private final Logger log = getLogger(getClass());
51
52 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 private ReplicaInfoService replicaInfoManager;
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
56 private ClusterCommunicationService clusterCommunicator;
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 private ClusterService clusterService;
60
61 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
62 new ConcurrentHashMap<>();
63
64 private Map<ConnectPoint, Set<FlowEntry>> previous =
65 new ConcurrentHashMap<>();
66
67 private Map<ConnectPoint, Set<FlowEntry>> current =
68 new ConcurrentHashMap<>();
69
70 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
71 @Override
72 protected void setupKryoPool() {
73 serializerPool = KryoNamespaces.API.newBuilder()
74 .build()
75 .populate(1);
76 }
77 };;
78
79 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
80
81 @Activate
82 public void activate() {
83 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
84
85 @Override
86 public void handle(ClusterMessage message) {
87 ConnectPoint cp = SERIALIZER.decode(message.payload());
88 try {
89 message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
90 } catch (IOException e) {
91 log.error("Failed to respond back", e);
92 }
93 }
94 });
95
96 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
97
98 @Override
99 public void handle(ClusterMessage message) {
100 ConnectPoint cp = SERIALIZER.decode(message.payload());
101 try {
102 message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
103 } catch (IOException e) {
104 log.error("Failed to respond back", e);
105 }
106 }
107 });
108 log.info("Started");
109 }
110
111 @Deactivate
112 public void deactivate() {
113 log.info("Stopped");
114 }
115
116 @Override
117 public void prepareForStatistics(FlowRule rule) {
118 ConnectPoint cp = buildConnectPoint(rule);
119 if (cp == null) {
120 return;
121 }
122 InternalStatisticRepresentation rep;
123 synchronized (representations) {
124 rep = getOrCreateRepresentation(cp);
125 }
126 rep.prepare();
127 }
128
129 @Override
130 public void removeFromStatistics(FlowRule rule) {
131 ConnectPoint cp = buildConnectPoint(rule);
132 if (cp == null) {
133 return;
134 }
135 InternalStatisticRepresentation rep = representations.get(cp);
136 if (rep != null) {
137 rep.remove(rule);
138 }
139 }
140
141 @Override
142 public void addOrUpdateStatistic(FlowEntry rule) {
143 ConnectPoint cp = buildConnectPoint(rule);
144 if (cp == null) {
145 return;
146 }
147 InternalStatisticRepresentation rep = representations.get(cp);
148 if (rep != null && rep.submit(rule)) {
149 updatePublishedStats(cp, rep.get());
150 }
151 }
152
153 private synchronized void updatePublishedStats(ConnectPoint cp,
154 Set<FlowEntry> flowEntries) {
155 Set<FlowEntry> curr = current.get(cp);
156 if (curr == null) {
157 curr = new HashSet<>();
158 }
159 previous.put(cp, curr);
160 current.put(cp, flowEntries);
161
162 }
163
164 @Override
165 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
166 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
167 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
168 return getCurrentStatisticInternal(connectPoint);
169 } else {
170 ClusterMessage message = new ClusterMessage(
171 clusterService.getLocalNode().id(),
172 GET_CURRENT,
173 SERIALIZER.encode(connectPoint));
174
175 try {
176 ClusterMessageResponse response =
177 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
178 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
179 TimeUnit.MILLISECONDS));
180 } catch (IOException | TimeoutException e) {
181 // FIXME: throw a FlowStoreException
182 throw new RuntimeException(e);
183 }
184 }
185
186 }
187
188 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
189 return current.get(connectPoint);
190 }
191
192 @Override
193 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
194 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
195 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
196 return getPreviousStatisticInternal(connectPoint);
197 } else {
198 ClusterMessage message = new ClusterMessage(
199 clusterService.getLocalNode().id(),
200 GET_CURRENT,
201 SERIALIZER.encode(connectPoint));
202
203 try {
204 ClusterMessageResponse response =
205 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
206 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
207 TimeUnit.MILLISECONDS));
208 } catch (IOException | TimeoutException e) {
209 // FIXME: throw a FlowStoreException
210 throw new RuntimeException(e);
211 }
212 }
213
214 }
215
216 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
217 return previous.get(connectPoint);
218 }
219
220 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
221
222 if (representations.containsKey(cp)) {
223 return representations.get(cp);
224 } else {
225 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
226 representations.put(cp, rep);
227 return rep;
228 }
229
230 }
231
232 private ConnectPoint buildConnectPoint(FlowRule rule) {
233 PortNumber port = getOutput(rule);
234 if (port == null) {
235 log.warn("Rule {} has no output.", rule);
236 return null;
237 }
238 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
239 return cp;
240 }
241
242 private PortNumber getOutput(FlowRule rule) {
243 for (Instruction i : rule.treatment().instructions()) {
244 if (i.type() == Instruction.Type.OUTPUT) {
245 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
246 return out.port();
247 }
248 if (i.type() == Instruction.Type.DROP) {
249 return PortNumber.P0;
250 }
251 }
252 return null;
253 }
254
255 private class InternalStatisticRepresentation {
256
257 private final AtomicInteger counter = new AtomicInteger(0);
258 private final Set<FlowEntry> rules = new HashSet<>();
259
260 public void prepare() {
261 counter.incrementAndGet();
262 }
263
264 public synchronized void remove(FlowRule rule) {
265 rules.remove(rule);
266 counter.decrementAndGet();
267 }
268
269 public synchronized boolean submit(FlowEntry rule) {
270 if (rules.contains(rule)) {
271 rules.remove(rule);
272 }
273 rules.add(rule);
274 if (counter.get() == 0) {
275 return true;
276 } else {
277 return counter.decrementAndGet() == 0;
278 }
279 }
280
281 public synchronized Set<FlowEntry> get() {
282 counter.set(rules.size());
283 return ImmutableSet.copyOf(rules);
284 }
285
286
287 }
288
289}