blob: 03ae01bf2b8e70faf4249e729b37a8603f8e7130 [file] [log] [blame]
alshabib3d643ec2014-10-22 18:33:00 -07001package org.onlab.onos.store.statistic.impl;
2
3import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*;
4import static org.slf4j.LoggerFactory.getLogger;
5
6import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -07007
alshabib3d643ec2014-10-22 18:33:00 -07008import org.apache.felix.scr.annotations.Activate;
9import org.apache.felix.scr.annotations.Component;
10import org.apache.felix.scr.annotations.Deactivate;
11import org.apache.felix.scr.annotations.Reference;
12import org.apache.felix.scr.annotations.ReferenceCardinality;
13import org.apache.felix.scr.annotations.Service;
14import org.onlab.onos.cluster.ClusterService;
15import org.onlab.onos.net.ConnectPoint;
16import org.onlab.onos.net.PortNumber;
17import org.onlab.onos.net.flow.FlowEntry;
18import org.onlab.onos.net.flow.FlowRule;
19import org.onlab.onos.net.flow.instructions.Instruction;
20import org.onlab.onos.net.flow.instructions.Instructions;
21import org.onlab.onos.net.statistic.StatisticStore;
22import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
23import org.onlab.onos.store.cluster.messaging.ClusterMessage;
24import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
25import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
26import org.onlab.onos.store.flow.ReplicaInfo;
27import org.onlab.onos.store.flow.ReplicaInfoService;
28import org.onlab.onos.store.serializers.KryoNamespaces;
29import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070030import org.onlab.util.KryoNamespace;
alshabib3d643ec2014-10-22 18:33:00 -070031import org.slf4j.Logger;
32
alshabib3d643ec2014-10-22 18:33:00 -070033import java.io.IOException;
34import java.util.HashSet;
35import java.util.Map;
36import java.util.Set;
37import java.util.concurrent.ConcurrentHashMap;
38import java.util.concurrent.TimeUnit;
39import java.util.concurrent.TimeoutException;
40import java.util.concurrent.atomic.AtomicInteger;
41
42
43/**
44 * Maintains statistics using RPC calls to collect stats from remote instances
45 * on demand.
46 */
47@Component(immediate = true)
48@Service
49public class DistributedStatisticStore implements StatisticStore {
50
51 private final Logger log = getLogger(getClass());
52
53 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
54 private ReplicaInfoService replicaInfoManager;
55
56 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
57 private ClusterCommunicationService clusterCommunicator;
58
59 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
60 private ClusterService clusterService;
61
62 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
63 new ConcurrentHashMap<>();
64
65 private Map<ConnectPoint, Set<FlowEntry>> previous =
66 new ConcurrentHashMap<>();
67
68 private Map<ConnectPoint, Set<FlowEntry>> current =
69 new ConcurrentHashMap<>();
70
71 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
72 @Override
73 protected void setupKryoPool() {
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070074 serializerPool = KryoNamespace.newBuilder()
75 .register(KryoNamespaces.API)
76 // register this store specific classes here
alshabib3d643ec2014-10-22 18:33:00 -070077 .build()
78 .populate(1);
79 }
80 };;
81
82 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
83
84 @Activate
85 public void activate() {
86 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
87
88 @Override
89 public void handle(ClusterMessage message) {
90 ConnectPoint cp = SERIALIZER.decode(message.payload());
91 try {
92 message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
93 } catch (IOException e) {
94 log.error("Failed to respond back", e);
95 }
96 }
97 });
98
99 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
100
101 @Override
102 public void handle(ClusterMessage message) {
103 ConnectPoint cp = SERIALIZER.decode(message.payload());
104 try {
105 message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
106 } catch (IOException e) {
107 log.error("Failed to respond back", e);
108 }
109 }
110 });
111 log.info("Started");
112 }
113
114 @Deactivate
115 public void deactivate() {
116 log.info("Stopped");
117 }
118
119 @Override
120 public void prepareForStatistics(FlowRule rule) {
121 ConnectPoint cp = buildConnectPoint(rule);
122 if (cp == null) {
123 return;
124 }
125 InternalStatisticRepresentation rep;
126 synchronized (representations) {
127 rep = getOrCreateRepresentation(cp);
128 }
129 rep.prepare();
130 }
131
132 @Override
133 public void removeFromStatistics(FlowRule rule) {
134 ConnectPoint cp = buildConnectPoint(rule);
135 if (cp == null) {
136 return;
137 }
138 InternalStatisticRepresentation rep = representations.get(cp);
139 if (rep != null) {
140 rep.remove(rule);
141 }
142 }
143
144 @Override
145 public void addOrUpdateStatistic(FlowEntry rule) {
146 ConnectPoint cp = buildConnectPoint(rule);
147 if (cp == null) {
148 return;
149 }
150 InternalStatisticRepresentation rep = representations.get(cp);
151 if (rep != null && rep.submit(rule)) {
152 updatePublishedStats(cp, rep.get());
153 }
154 }
155
156 private synchronized void updatePublishedStats(ConnectPoint cp,
157 Set<FlowEntry> flowEntries) {
158 Set<FlowEntry> curr = current.get(cp);
159 if (curr == null) {
160 curr = new HashSet<>();
161 }
162 previous.put(cp, curr);
163 current.put(cp, flowEntries);
164
165 }
166
167 @Override
168 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
169 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
170 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
171 return getCurrentStatisticInternal(connectPoint);
172 } else {
173 ClusterMessage message = new ClusterMessage(
174 clusterService.getLocalNode().id(),
175 GET_CURRENT,
176 SERIALIZER.encode(connectPoint));
177
178 try {
179 ClusterMessageResponse response =
180 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
181 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
182 TimeUnit.MILLISECONDS));
183 } catch (IOException | TimeoutException e) {
184 // FIXME: throw a FlowStoreException
185 throw new RuntimeException(e);
186 }
187 }
188
189 }
190
191 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
192 return current.get(connectPoint);
193 }
194
195 @Override
196 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
197 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
198 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
199 return getPreviousStatisticInternal(connectPoint);
200 } else {
201 ClusterMessage message = new ClusterMessage(
202 clusterService.getLocalNode().id(),
203 GET_CURRENT,
204 SERIALIZER.encode(connectPoint));
205
206 try {
207 ClusterMessageResponse response =
208 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
209 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
210 TimeUnit.MILLISECONDS));
211 } catch (IOException | TimeoutException e) {
212 // FIXME: throw a FlowStoreException
213 throw new RuntimeException(e);
214 }
215 }
216
217 }
218
219 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
220 return previous.get(connectPoint);
221 }
222
223 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
224
225 if (representations.containsKey(cp)) {
226 return representations.get(cp);
227 } else {
228 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
229 representations.put(cp, rep);
230 return rep;
231 }
232
233 }
234
235 private ConnectPoint buildConnectPoint(FlowRule rule) {
236 PortNumber port = getOutput(rule);
237 if (port == null) {
238 log.warn("Rule {} has no output.", rule);
239 return null;
240 }
241 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
242 return cp;
243 }
244
245 private PortNumber getOutput(FlowRule rule) {
246 for (Instruction i : rule.treatment().instructions()) {
247 if (i.type() == Instruction.Type.OUTPUT) {
248 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
249 return out.port();
250 }
251 if (i.type() == Instruction.Type.DROP) {
252 return PortNumber.P0;
253 }
254 }
255 return null;
256 }
257
258 private class InternalStatisticRepresentation {
259
260 private final AtomicInteger counter = new AtomicInteger(0);
261 private final Set<FlowEntry> rules = new HashSet<>();
262
263 public void prepare() {
264 counter.incrementAndGet();
265 }
266
267 public synchronized void remove(FlowRule rule) {
268 rules.remove(rule);
269 counter.decrementAndGet();
270 }
271
272 public synchronized boolean submit(FlowEntry rule) {
273 if (rules.contains(rule)) {
274 rules.remove(rule);
275 }
276 rules.add(rule);
277 if (counter.get() == 0) {
278 return true;
279 } else {
280 return counter.decrementAndGet() == 0;
281 }
282 }
283
284 public synchronized Set<FlowEntry> get() {
285 counter.set(rules.size());
286 return ImmutableSet.copyOf(rules);
287 }
288
289
290 }
291
292}