blob: 9c125287e8637e5c222d7bc6e9ef836c950e09d7 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.statistic.impl;
alshabib3d643ec2014-10-22 18:33:00 -070017
alshabibf6c2ede2014-10-22 23:31:50 -070018import com.google.common.collect.Sets;
Madan Jampani2af244a2015-02-22 13:12:01 -080019
alshabib3d643ec2014-10-22 18:33:00 -070020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080026import org.onosproject.cluster.ClusterService;
27import org.onosproject.net.ConnectPoint;
28import org.onosproject.net.DeviceId;
29import org.onosproject.net.PortNumber;
30import org.onosproject.net.flow.FlowEntry;
31import org.onosproject.net.flow.FlowRule;
32import org.onosproject.net.flow.instructions.Instruction;
33import org.onosproject.net.flow.instructions.Instructions;
34import org.onosproject.net.statistic.StatisticStore;
35import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
36import org.onosproject.store.cluster.messaging.ClusterMessage;
37import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
38import org.onosproject.store.flow.ReplicaInfo;
39import org.onosproject.store.flow.ReplicaInfoService;
40import org.onosproject.store.serializers.KryoNamespaces;
41import org.onosproject.store.serializers.KryoSerializer;
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070042import org.onlab.util.KryoNamespace;
alshabib3d643ec2014-10-22 18:33:00 -070043import org.slf4j.Logger;
44
alshabib3d643ec2014-10-22 18:33:00 -070045import java.io.IOException;
alshabib9c57bdd2014-11-28 19:14:06 -050046import java.util.Collections;
alshabib3d643ec2014-10-22 18:33:00 -070047import java.util.HashSet;
48import java.util.Map;
49import java.util.Set;
50import java.util.concurrent.ConcurrentHashMap;
Madan Jampani24f9efb2014-10-24 18:56:23 -070051import java.util.concurrent.ExecutionException;
Madan Jampani2af244a2015-02-22 13:12:01 -080052import java.util.concurrent.ExecutorService;
53import java.util.concurrent.Executors;
Madan Jampani24f9efb2014-10-24 18:56:23 -070054import java.util.concurrent.Future;
alshabib3d643ec2014-10-22 18:33:00 -070055import java.util.concurrent.TimeUnit;
56import java.util.concurrent.TimeoutException;
57import java.util.concurrent.atomic.AtomicInteger;
58
Madan Jampani2af244a2015-02-22 13:12:01 -080059import static org.onlab.util.Tools.groupedThreads;
Brian O'Connorabafb502014-12-02 22:26:20 -080060import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
61import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
Thomas Vachuska82041f52014-11-30 22:14:02 -080062import static org.slf4j.LoggerFactory.getLogger;
63
alshabib3d643ec2014-10-22 18:33:00 -070064
65/**
66 * Maintains statistics using RPC calls to collect stats from remote instances
67 * on demand.
68 */
69@Component(immediate = true)
70@Service
71public class DistributedStatisticStore implements StatisticStore {
72
73 private final Logger log = getLogger(getClass());
74
Madan Jampani2af244a2015-02-22 13:12:01 -080075 // TODO: Make configurable.
76 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
77
alshabib3d643ec2014-10-22 18:33:00 -070078 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080079 protected ReplicaInfoService replicaInfoManager;
alshabib3d643ec2014-10-22 18:33:00 -070080
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080082 protected ClusterCommunicationService clusterCommunicator;
alshabib3d643ec2014-10-22 18:33:00 -070083
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080085 protected ClusterService clusterService;
alshabib3d643ec2014-10-22 18:33:00 -070086
87 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
88 new ConcurrentHashMap<>();
89
90 private Map<ConnectPoint, Set<FlowEntry>> previous =
91 new ConcurrentHashMap<>();
92
93 private Map<ConnectPoint, Set<FlowEntry>> current =
94 new ConcurrentHashMap<>();
95
96 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
97 @Override
98 protected void setupKryoPool() {
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070099 serializerPool = KryoNamespace.newBuilder()
100 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800101 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -0700102 // register this store specific classes here
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800103 .build();
alshabib3d643ec2014-10-22 18:33:00 -0700104 }
105 };;
106
Madan Jampani2af244a2015-02-22 13:12:01 -0800107 private ExecutorService messageHandlingExecutor;
108
alshabib3d643ec2014-10-22 18:33:00 -0700109 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
110
111 @Activate
112 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800113
114 messageHandlingExecutor = Executors.newFixedThreadPool(
115 MESSAGE_HANDLER_THREAD_POOL_SIZE,
116 groupedThreads("onos/store/statistic", "message-handlers"));
117
alshabib3d643ec2014-10-22 18:33:00 -0700118 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
119
120 @Override
121 public void handle(ClusterMessage message) {
122 ConnectPoint cp = SERIALIZER.decode(message.payload());
123 try {
124 message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
125 } catch (IOException e) {
126 log.error("Failed to respond back", e);
127 }
128 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800129 }, messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700130
131 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
132
133 @Override
134 public void handle(ClusterMessage message) {
135 ConnectPoint cp = SERIALIZER.decode(message.payload());
136 try {
137 message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
138 } catch (IOException e) {
139 log.error("Failed to respond back", e);
140 }
141 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800142 }, messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700143 log.info("Started");
144 }
145
146 @Deactivate
147 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800148 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
149 clusterCommunicator.removeSubscriber(GET_CURRENT);
150 messageHandlingExecutor.shutdown();
alshabib3d643ec2014-10-22 18:33:00 -0700151 log.info("Stopped");
152 }
153
154 @Override
155 public void prepareForStatistics(FlowRule rule) {
156 ConnectPoint cp = buildConnectPoint(rule);
157 if (cp == null) {
158 return;
159 }
160 InternalStatisticRepresentation rep;
161 synchronized (representations) {
162 rep = getOrCreateRepresentation(cp);
163 }
164 rep.prepare();
165 }
166
167 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700168 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700169 ConnectPoint cp = buildConnectPoint(rule);
170 if (cp == null) {
171 return;
172 }
173 InternalStatisticRepresentation rep = representations.get(cp);
alshabib9c57bdd2014-11-28 19:14:06 -0500174 if (rep != null && rep.remove(rule)) {
175 updatePublishedStats(cp, Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700176 }
alshabibf6c2ede2014-10-22 23:31:50 -0700177 Set<FlowEntry> values = current.get(cp);
178 if (values != null) {
179 values.remove(rule);
180 }
181 values = previous.get(cp);
182 if (values != null) {
183 values.remove(rule);
184 }
185
alshabib3d643ec2014-10-22 18:33:00 -0700186 }
187
188 @Override
189 public void addOrUpdateStatistic(FlowEntry rule) {
190 ConnectPoint cp = buildConnectPoint(rule);
191 if (cp == null) {
192 return;
193 }
194 InternalStatisticRepresentation rep = representations.get(cp);
195 if (rep != null && rep.submit(rule)) {
196 updatePublishedStats(cp, rep.get());
197 }
198 }
199
200 private synchronized void updatePublishedStats(ConnectPoint cp,
201 Set<FlowEntry> flowEntries) {
202 Set<FlowEntry> curr = current.get(cp);
203 if (curr == null) {
204 curr = new HashSet<>();
205 }
206 previous.put(cp, curr);
207 current.put(cp, flowEntries);
208
209 }
210
211 @Override
212 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700213 final DeviceId deviceId = connectPoint.deviceId();
214 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
215 if (!replicaInfo.master().isPresent()) {
216 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800217 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700218 }
alshabib3d643ec2014-10-22 18:33:00 -0700219 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
220 return getCurrentStatisticInternal(connectPoint);
221 } else {
222 ClusterMessage message = new ClusterMessage(
223 clusterService.getLocalNode().id(),
224 GET_CURRENT,
225 SERIALIZER.encode(connectPoint));
226
227 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700228 Future<byte[]> response =
alshabib3d643ec2014-10-22 18:33:00 -0700229 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
230 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
231 TimeUnit.MILLISECONDS));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700232 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Thomas Vachuska82041f52014-11-30 22:14:02 -0800233 log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
234 return Collections.emptySet();
alshabib3d643ec2014-10-22 18:33:00 -0700235 }
236 }
237
238 }
239
240 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
241 return current.get(connectPoint);
242 }
243
244 @Override
245 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700246 final DeviceId deviceId = connectPoint.deviceId();
247 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
248 if (!replicaInfo.master().isPresent()) {
249 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800250 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700251 }
alshabib3d643ec2014-10-22 18:33:00 -0700252 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
253 return getPreviousStatisticInternal(connectPoint);
254 } else {
255 ClusterMessage message = new ClusterMessage(
256 clusterService.getLocalNode().id(),
alshabibf6c2ede2014-10-22 23:31:50 -0700257 GET_PREVIOUS,
alshabib3d643ec2014-10-22 18:33:00 -0700258 SERIALIZER.encode(connectPoint));
259
260 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700261 Future<byte[]> response =
alshabib3d643ec2014-10-22 18:33:00 -0700262 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
263 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
264 TimeUnit.MILLISECONDS));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700265 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Thomas Vachuska82041f52014-11-30 22:14:02 -0800266 log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
267 return Collections.emptySet();
alshabib3d643ec2014-10-22 18:33:00 -0700268 }
269 }
270
271 }
272
273 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
274 return previous.get(connectPoint);
275 }
276
277 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
278
279 if (representations.containsKey(cp)) {
280 return representations.get(cp);
281 } else {
282 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
283 representations.put(cp, rep);
284 return rep;
285 }
286
287 }
288
289 private ConnectPoint buildConnectPoint(FlowRule rule) {
290 PortNumber port = getOutput(rule);
291 if (port == null) {
Brian O'Connorfaaedf42014-11-17 14:48:48 -0800292 log.debug("Rule {} has no output.", rule);
alshabib3d643ec2014-10-22 18:33:00 -0700293 return null;
294 }
295 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
296 return cp;
297 }
298
299 private PortNumber getOutput(FlowRule rule) {
300 for (Instruction i : rule.treatment().instructions()) {
301 if (i.type() == Instruction.Type.OUTPUT) {
302 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
303 return out.port();
304 }
305 if (i.type() == Instruction.Type.DROP) {
306 return PortNumber.P0;
307 }
308 }
309 return null;
310 }
311
312 private class InternalStatisticRepresentation {
313
314 private final AtomicInteger counter = new AtomicInteger(0);
315 private final Set<FlowEntry> rules = new HashSet<>();
316
317 public void prepare() {
318 counter.incrementAndGet();
319 }
320
alshabib9c57bdd2014-11-28 19:14:06 -0500321 public synchronized boolean remove(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700322 rules.remove(rule);
alshabib9c57bdd2014-11-28 19:14:06 -0500323 return counter.decrementAndGet() == 0;
alshabib3d643ec2014-10-22 18:33:00 -0700324 }
325
326 public synchronized boolean submit(FlowEntry rule) {
327 if (rules.contains(rule)) {
328 rules.remove(rule);
329 }
330 rules.add(rule);
331 if (counter.get() == 0) {
332 return true;
333 } else {
334 return counter.decrementAndGet() == 0;
335 }
336 }
337
338 public synchronized Set<FlowEntry> get() {
339 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700340 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700341 }
342
343
344 }
345
346}