/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onlab.onos.store.statistic.impl;

import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.flow.instructions.Instructions;
import org.onlab.onos.net.statistic.StatisticStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
import static org.slf4j.LoggerFactory.getLogger;

61/**
62 * Maintains statistics using RPC calls to collect stats from remote instances
63 * on demand.
64 */
65@Component(immediate = true)
66@Service
67public class DistributedStatisticStore implements StatisticStore {
68
69 private final Logger log = getLogger(getClass());
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080072 protected ReplicaInfoService replicaInfoManager;
alshabib3d643ec2014-10-22 18:33:00 -070073
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080075 protected ClusterCommunicationService clusterCommunicator;
alshabib3d643ec2014-10-22 18:33:00 -070076
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080078 protected ClusterService clusterService;
alshabib3d643ec2014-10-22 18:33:00 -070079
80 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
81 new ConcurrentHashMap<>();
82
83 private Map<ConnectPoint, Set<FlowEntry>> previous =
84 new ConcurrentHashMap<>();
85
86 private Map<ConnectPoint, Set<FlowEntry>> current =
87 new ConcurrentHashMap<>();
88
89 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
90 @Override
91 protected void setupKryoPool() {
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070092 serializerPool = KryoNamespace.newBuilder()
93 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080094 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070095 // register this store specific classes here
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080096 .build();
alshabib3d643ec2014-10-22 18:33:00 -070097 }
98 };;
99
100 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
101
102 @Activate
103 public void activate() {
104 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
105
106 @Override
107 public void handle(ClusterMessage message) {
108 ConnectPoint cp = SERIALIZER.decode(message.payload());
109 try {
110 message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
111 } catch (IOException e) {
112 log.error("Failed to respond back", e);
113 }
114 }
115 });
116
117 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
118
119 @Override
120 public void handle(ClusterMessage message) {
121 ConnectPoint cp = SERIALIZER.decode(message.payload());
122 try {
123 message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
124 } catch (IOException e) {
125 log.error("Failed to respond back", e);
126 }
127 }
128 });
129 log.info("Started");
130 }
131
132 @Deactivate
133 public void deactivate() {
134 log.info("Stopped");
135 }
136
137 @Override
138 public void prepareForStatistics(FlowRule rule) {
139 ConnectPoint cp = buildConnectPoint(rule);
140 if (cp == null) {
141 return;
142 }
143 InternalStatisticRepresentation rep;
144 synchronized (representations) {
145 rep = getOrCreateRepresentation(cp);
146 }
147 rep.prepare();
148 }
149
150 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700151 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700152 ConnectPoint cp = buildConnectPoint(rule);
153 if (cp == null) {
154 return;
155 }
156 InternalStatisticRepresentation rep = representations.get(cp);
alshabib9c57bdd2014-11-28 19:14:06 -0500157 if (rep != null && rep.remove(rule)) {
158 updatePublishedStats(cp, Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700159 }
alshabibf6c2ede2014-10-22 23:31:50 -0700160 Set<FlowEntry> values = current.get(cp);
161 if (values != null) {
162 values.remove(rule);
163 }
164 values = previous.get(cp);
165 if (values != null) {
166 values.remove(rule);
167 }
168
alshabib3d643ec2014-10-22 18:33:00 -0700169 }
170
171 @Override
172 public void addOrUpdateStatistic(FlowEntry rule) {
173 ConnectPoint cp = buildConnectPoint(rule);
174 if (cp == null) {
175 return;
176 }
177 InternalStatisticRepresentation rep = representations.get(cp);
178 if (rep != null && rep.submit(rule)) {
179 updatePublishedStats(cp, rep.get());
180 }
181 }
182
183 private synchronized void updatePublishedStats(ConnectPoint cp,
184 Set<FlowEntry> flowEntries) {
185 Set<FlowEntry> curr = current.get(cp);
186 if (curr == null) {
187 curr = new HashSet<>();
188 }
189 previous.put(cp, curr);
190 current.put(cp, flowEntries);
191
192 }
193
194 @Override
195 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700196 final DeviceId deviceId = connectPoint.deviceId();
197 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
198 if (!replicaInfo.master().isPresent()) {
199 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800200 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700201 }
alshabib3d643ec2014-10-22 18:33:00 -0700202 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
203 return getCurrentStatisticInternal(connectPoint);
204 } else {
205 ClusterMessage message = new ClusterMessage(
206 clusterService.getLocalNode().id(),
207 GET_CURRENT,
208 SERIALIZER.encode(connectPoint));
209
210 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700211 Future<byte[]> response =
alshabib3d643ec2014-10-22 18:33:00 -0700212 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
213 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
214 TimeUnit.MILLISECONDS));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700215 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Thomas Vachuska82041f52014-11-30 22:14:02 -0800216 log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
217 return Collections.emptySet();
alshabib3d643ec2014-10-22 18:33:00 -0700218 }
219 }
220
221 }
222
223 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
224 return current.get(connectPoint);
225 }
226
227 @Override
228 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700229 final DeviceId deviceId = connectPoint.deviceId();
230 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
231 if (!replicaInfo.master().isPresent()) {
232 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800233 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700234 }
alshabib3d643ec2014-10-22 18:33:00 -0700235 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
236 return getPreviousStatisticInternal(connectPoint);
237 } else {
238 ClusterMessage message = new ClusterMessage(
239 clusterService.getLocalNode().id(),
alshabibf6c2ede2014-10-22 23:31:50 -0700240 GET_PREVIOUS,
alshabib3d643ec2014-10-22 18:33:00 -0700241 SERIALIZER.encode(connectPoint));
242
243 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700244 Future<byte[]> response =
alshabib3d643ec2014-10-22 18:33:00 -0700245 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
246 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
247 TimeUnit.MILLISECONDS));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700248 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Thomas Vachuska82041f52014-11-30 22:14:02 -0800249 log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
250 return Collections.emptySet();
alshabib3d643ec2014-10-22 18:33:00 -0700251 }
252 }
253
254 }
255
256 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
257 return previous.get(connectPoint);
258 }
259
260 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
261
262 if (representations.containsKey(cp)) {
263 return representations.get(cp);
264 } else {
265 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
266 representations.put(cp, rep);
267 return rep;
268 }
269
270 }
271
272 private ConnectPoint buildConnectPoint(FlowRule rule) {
273 PortNumber port = getOutput(rule);
274 if (port == null) {
Brian O'Connorfaaedf42014-11-17 14:48:48 -0800275 log.debug("Rule {} has no output.", rule);
alshabib3d643ec2014-10-22 18:33:00 -0700276 return null;
277 }
278 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
279 return cp;
280 }
281
282 private PortNumber getOutput(FlowRule rule) {
283 for (Instruction i : rule.treatment().instructions()) {
284 if (i.type() == Instruction.Type.OUTPUT) {
285 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
286 return out.port();
287 }
288 if (i.type() == Instruction.Type.DROP) {
289 return PortNumber.P0;
290 }
291 }
292 return null;
293 }
294
295 private class InternalStatisticRepresentation {
296
297 private final AtomicInteger counter = new AtomicInteger(0);
298 private final Set<FlowEntry> rules = new HashSet<>();
299
300 public void prepare() {
301 counter.incrementAndGet();
302 }
303
alshabib9c57bdd2014-11-28 19:14:06 -0500304 public synchronized boolean remove(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700305 rules.remove(rule);
alshabib9c57bdd2014-11-28 19:14:06 -0500306 return counter.decrementAndGet() == 0;
alshabib3d643ec2014-10-22 18:33:00 -0700307 }
308
309 public synchronized boolean submit(FlowEntry rule) {
310 if (rules.contains(rule)) {
311 rules.remove(rule);
312 }
313 rules.add(rule);
314 if (counter.get() == 0) {
315 return true;
316 } else {
317 return counter.decrementAndGet() == 0;
318 }
319 }
320
321 public synchronized Set<FlowEntry> get() {
322 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700323 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700324 }
325
326
327 }
328
329}