blob: 907631d2dd9135201544561ce59a356f9dac3175 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.statistic.impl;
alshabib3d643ec2014-10-22 18:33:00 -070017
alshabibf6c2ede2014-10-22 23:31:50 -070018import com.google.common.collect.Sets;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070019
alshabib3d643ec2014-10-22 18:33:00 -070020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
alshabib346b5b32015-03-06 00:42:16 -080026import org.onlab.util.KryoNamespace;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070027import org.onlab.util.Tools;
Brian O'Connorabafb502014-12-02 22:26:20 -080028import org.onosproject.cluster.ClusterService;
29import org.onosproject.net.ConnectPoint;
30import org.onosproject.net.DeviceId;
31import org.onosproject.net.PortNumber;
32import org.onosproject.net.flow.FlowEntry;
33import org.onosproject.net.flow.FlowRule;
34import org.onosproject.net.flow.instructions.Instruction;
35import org.onosproject.net.flow.instructions.Instructions;
36import org.onosproject.net.statistic.StatisticStore;
37import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38import org.onosproject.store.cluster.messaging.ClusterMessage;
39import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
40import org.onosproject.store.flow.ReplicaInfo;
41import org.onosproject.store.flow.ReplicaInfoService;
42import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.serializers.KryoSerializer;
alshabib3d643ec2014-10-22 18:33:00 -070044import org.slf4j.Logger;
45
alshabib3d643ec2014-10-22 18:33:00 -070046import java.io.IOException;
alshabib9c57bdd2014-11-28 19:14:06 -050047import java.util.Collections;
alshabib3d643ec2014-10-22 18:33:00 -070048import java.util.HashSet;
49import java.util.Map;
50import java.util.Set;
51import java.util.concurrent.ConcurrentHashMap;
Madan Jampani2af244a2015-02-22 13:12:01 -080052import java.util.concurrent.ExecutorService;
53import java.util.concurrent.Executors;
alshabib3d643ec2014-10-22 18:33:00 -070054import java.util.concurrent.TimeUnit;
alshabib3d643ec2014-10-22 18:33:00 -070055import java.util.concurrent.atomic.AtomicInteger;
56
Madan Jampani2af244a2015-02-22 13:12:01 -080057import static org.onlab.util.Tools.groupedThreads;
Brian O'Connorabafb502014-12-02 22:26:20 -080058import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
59import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
Thomas Vachuska82041f52014-11-30 22:14:02 -080060import static org.slf4j.LoggerFactory.getLogger;
61
alshabib3d643ec2014-10-22 18:33:00 -070062
63/**
64 * Maintains statistics using RPC calls to collect stats from remote instances
65 * on demand.
66 */
67@Component(immediate = true)
68@Service
69public class DistributedStatisticStore implements StatisticStore {
70
71 private final Logger log = getLogger(getClass());
72
Madan Jampani2af244a2015-02-22 13:12:01 -080073 // TODO: Make configurable.
74 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
75
alshabib3d643ec2014-10-22 18:33:00 -070076 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080077 protected ReplicaInfoService replicaInfoManager;
alshabib3d643ec2014-10-22 18:33:00 -070078
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080080 protected ClusterCommunicationService clusterCommunicator;
alshabib3d643ec2014-10-22 18:33:00 -070081
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080083 protected ClusterService clusterService;
alshabib3d643ec2014-10-22 18:33:00 -070084
85 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
86 new ConcurrentHashMap<>();
87
88 private Map<ConnectPoint, Set<FlowEntry>> previous =
89 new ConcurrentHashMap<>();
90
91 private Map<ConnectPoint, Set<FlowEntry>> current =
92 new ConcurrentHashMap<>();
93
94 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
95 @Override
96 protected void setupKryoPool() {
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070097 serializerPool = KryoNamespace.newBuilder()
98 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080099 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -0700100 // register this store specific classes here
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800101 .build();
alshabib3d643ec2014-10-22 18:33:00 -0700102 }
103 };;
104
Madan Jampani2af244a2015-02-22 13:12:01 -0800105 private ExecutorService messageHandlingExecutor;
106
alshabib3d643ec2014-10-22 18:33:00 -0700107 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
108
109 @Activate
110 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800111
112 messageHandlingExecutor = Executors.newFixedThreadPool(
113 MESSAGE_HANDLER_THREAD_POOL_SIZE,
114 groupedThreads("onos/store/statistic", "message-handlers"));
115
alshabib3d643ec2014-10-22 18:33:00 -0700116 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
117
118 @Override
119 public void handle(ClusterMessage message) {
120 ConnectPoint cp = SERIALIZER.decode(message.payload());
121 try {
122 message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
123 } catch (IOException e) {
124 log.error("Failed to respond back", e);
125 }
126 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800127 }, messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700128
129 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
130
131 @Override
132 public void handle(ClusterMessage message) {
133 ConnectPoint cp = SERIALIZER.decode(message.payload());
134 try {
135 message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
136 } catch (IOException e) {
137 log.error("Failed to respond back", e);
138 }
139 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800140 }, messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700141 log.info("Started");
142 }
143
144 @Deactivate
145 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800146 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
147 clusterCommunicator.removeSubscriber(GET_CURRENT);
148 messageHandlingExecutor.shutdown();
alshabib3d643ec2014-10-22 18:33:00 -0700149 log.info("Stopped");
150 }
151
152 @Override
153 public void prepareForStatistics(FlowRule rule) {
154 ConnectPoint cp = buildConnectPoint(rule);
155 if (cp == null) {
156 return;
157 }
158 InternalStatisticRepresentation rep;
159 synchronized (representations) {
160 rep = getOrCreateRepresentation(cp);
161 }
162 rep.prepare();
163 }
164
165 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700166 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700167 ConnectPoint cp = buildConnectPoint(rule);
168 if (cp == null) {
169 return;
170 }
171 InternalStatisticRepresentation rep = representations.get(cp);
alshabib9c57bdd2014-11-28 19:14:06 -0500172 if (rep != null && rep.remove(rule)) {
173 updatePublishedStats(cp, Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700174 }
alshabibf6c2ede2014-10-22 23:31:50 -0700175 Set<FlowEntry> values = current.get(cp);
176 if (values != null) {
177 values.remove(rule);
178 }
179 values = previous.get(cp);
180 if (values != null) {
181 values.remove(rule);
182 }
183
alshabib3d643ec2014-10-22 18:33:00 -0700184 }
185
186 @Override
187 public void addOrUpdateStatistic(FlowEntry rule) {
188 ConnectPoint cp = buildConnectPoint(rule);
189 if (cp == null) {
190 return;
191 }
192 InternalStatisticRepresentation rep = representations.get(cp);
193 if (rep != null && rep.submit(rule)) {
194 updatePublishedStats(cp, rep.get());
195 }
196 }
197
198 private synchronized void updatePublishedStats(ConnectPoint cp,
199 Set<FlowEntry> flowEntries) {
200 Set<FlowEntry> curr = current.get(cp);
201 if (curr == null) {
202 curr = new HashSet<>();
203 }
204 previous.put(cp, curr);
205 current.put(cp, flowEntries);
206
207 }
208
209 @Override
210 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700211 final DeviceId deviceId = connectPoint.deviceId();
212 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
213 if (!replicaInfo.master().isPresent()) {
214 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800215 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700216 }
alshabib3d643ec2014-10-22 18:33:00 -0700217 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
218 return getCurrentStatisticInternal(connectPoint);
219 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700220 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
221 connectPoint,
222 GET_CURRENT,
223 SERIALIZER::encode,
224 SERIALIZER::decode,
225 replicaInfo.master().get()),
226 STATISTIC_STORE_TIMEOUT_MILLIS,
227 TimeUnit.MILLISECONDS,
228 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700229 }
230
231 }
232
233 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
234 return current.get(connectPoint);
235 }
236
237 @Override
238 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700239 final DeviceId deviceId = connectPoint.deviceId();
240 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
241 if (!replicaInfo.master().isPresent()) {
242 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800243 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700244 }
alshabib3d643ec2014-10-22 18:33:00 -0700245 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
246 return getPreviousStatisticInternal(connectPoint);
247 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700248 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
249 connectPoint,
250 GET_PREVIOUS,
251 SERIALIZER::encode,
252 SERIALIZER::decode,
253 replicaInfo.master().get()),
254 STATISTIC_STORE_TIMEOUT_MILLIS,
255 TimeUnit.MILLISECONDS,
256 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700257 }
alshabib3d643ec2014-10-22 18:33:00 -0700258 }
259
260 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
261 return previous.get(connectPoint);
262 }
263
264 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
265
266 if (representations.containsKey(cp)) {
267 return representations.get(cp);
268 } else {
269 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
270 representations.put(cp, rep);
271 return rep;
272 }
273
274 }
275
276 private ConnectPoint buildConnectPoint(FlowRule rule) {
277 PortNumber port = getOutput(rule);
Jonathan Hart7baba072015-02-23 14:27:59 -0800278
alshabib3d643ec2014-10-22 18:33:00 -0700279 if (port == null) {
alshabib3d643ec2014-10-22 18:33:00 -0700280 return null;
281 }
282 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
283 return cp;
284 }
285
286 private PortNumber getOutput(FlowRule rule) {
Jonathan Hart8ef6d3b2015-03-08 21:21:27 -0700287 for (Instruction i : rule.treatment().allInstructions()) {
alshabib3d643ec2014-10-22 18:33:00 -0700288 if (i.type() == Instruction.Type.OUTPUT) {
289 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
290 return out.port();
291 }
292 if (i.type() == Instruction.Type.DROP) {
293 return PortNumber.P0;
294 }
295 }
296 return null;
297 }
298
299 private class InternalStatisticRepresentation {
300
301 private final AtomicInteger counter = new AtomicInteger(0);
302 private final Set<FlowEntry> rules = new HashSet<>();
303
304 public void prepare() {
305 counter.incrementAndGet();
306 }
307
alshabib9c57bdd2014-11-28 19:14:06 -0500308 public synchronized boolean remove(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700309 rules.remove(rule);
alshabib9c57bdd2014-11-28 19:14:06 -0500310 return counter.decrementAndGet() == 0;
alshabib3d643ec2014-10-22 18:33:00 -0700311 }
312
313 public synchronized boolean submit(FlowEntry rule) {
314 if (rules.contains(rule)) {
315 rules.remove(rule);
316 }
317 rules.add(rule);
318 if (counter.get() == 0) {
319 return true;
320 } else {
321 return counter.decrementAndGet() == 0;
322 }
323 }
324
325 public synchronized Set<FlowEntry> get() {
326 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700327 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700328 }
329
330
331 }
332
333}