blob: 35bdad14342c7806c91f23936ee533890fc185f1 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.statistic.impl;
alshabib3d643ec2014-10-22 18:33:00 -070017
alshabibf6c2ede2014-10-22 23:31:50 -070018import com.google.common.collect.Sets;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070019
alshabib3d643ec2014-10-22 18:33:00 -070020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
alshabib346b5b32015-03-06 00:42:16 -080026import org.onlab.util.KryoNamespace;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070027import org.onlab.util.Tools;
Brian O'Connorabafb502014-12-02 22:26:20 -080028import org.onosproject.cluster.ClusterService;
Madan Jampanic156dd02015-08-12 15:57:46 -070029import org.onosproject.cluster.NodeId;
30import org.onosproject.mastership.MastershipService;
Brian O'Connorabafb502014-12-02 22:26:20 -080031import org.onosproject.net.ConnectPoint;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.PortNumber;
34import org.onosproject.net.flow.FlowEntry;
35import org.onosproject.net.flow.FlowRule;
36import org.onosproject.net.flow.instructions.Instruction;
37import org.onosproject.net.flow.instructions.Instructions;
38import org.onosproject.net.statistic.StatisticStore;
39import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.store.serializers.KryoNamespaces;
41import org.onosproject.store.serializers.KryoSerializer;
alshabib3d643ec2014-10-22 18:33:00 -070042import org.slf4j.Logger;
43
alshabib9c57bdd2014-11-28 19:14:06 -050044import java.util.Collections;
alshabib3d643ec2014-10-22 18:33:00 -070045import java.util.HashSet;
46import java.util.Map;
47import java.util.Set;
48import java.util.concurrent.ConcurrentHashMap;
Madan Jampani2af244a2015-02-22 13:12:01 -080049import java.util.concurrent.ExecutorService;
50import java.util.concurrent.Executors;
alshabib3d643ec2014-10-22 18:33:00 -070051import java.util.concurrent.TimeUnit;
alshabib3d643ec2014-10-22 18:33:00 -070052import java.util.concurrent.atomic.AtomicInteger;
53
Madan Jampani2af244a2015-02-22 13:12:01 -080054import static org.onlab.util.Tools.groupedThreads;
Brian O'Connorabafb502014-12-02 22:26:20 -080055import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
56import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
Thomas Vachuska82041f52014-11-30 22:14:02 -080057import static org.slf4j.LoggerFactory.getLogger;
58
alshabib3d643ec2014-10-22 18:33:00 -070059
60/**
61 * Maintains statistics using RPC calls to collect stats from remote instances
62 * on demand.
63 */
64@Component(immediate = true)
65@Service
66public class DistributedStatisticStore implements StatisticStore {
67
68 private final Logger log = getLogger(getClass());
69
Madan Jampani2af244a2015-02-22 13:12:01 -080070 // TODO: Make configurable.
71 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
72
alshabib3d643ec2014-10-22 18:33:00 -070073 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanic156dd02015-08-12 15:57:46 -070074 protected MastershipService mastershipService;
alshabib3d643ec2014-10-22 18:33:00 -070075
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080077 protected ClusterCommunicationService clusterCommunicator;
alshabib3d643ec2014-10-22 18:33:00 -070078
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080080 protected ClusterService clusterService;
alshabib3d643ec2014-10-22 18:33:00 -070081
82 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
83 new ConcurrentHashMap<>();
84
85 private Map<ConnectPoint, Set<FlowEntry>> previous =
86 new ConcurrentHashMap<>();
87
88 private Map<ConnectPoint, Set<FlowEntry>> current =
89 new ConcurrentHashMap<>();
90
91 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
92 @Override
93 protected void setupKryoPool() {
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070094 serializerPool = KryoNamespace.newBuilder()
95 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080096 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070097 // register this store specific classes here
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080098 .build();
alshabib3d643ec2014-10-22 18:33:00 -070099 }
Sho SHIMIZU9f614a42015-09-11 15:32:46 -0700100 };
alshabib3d643ec2014-10-22 18:33:00 -0700101
Madan Jampani2af244a2015-02-22 13:12:01 -0800102 private ExecutorService messageHandlingExecutor;
103
alshabib3d643ec2014-10-22 18:33:00 -0700104 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
105
106 @Activate
107 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800108
109 messageHandlingExecutor = Executors.newFixedThreadPool(
110 MESSAGE_HANDLER_THREAD_POOL_SIZE,
111 groupedThreads("onos/store/statistic", "message-handlers"));
112
Madan Jampani1151b552015-08-12 16:11:27 -0700113 clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
114 SERIALIZER::decode,
115 this::getCurrentStatisticInternal,
116 SERIALIZER::encode,
117 messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700118
Madan Jampani1151b552015-08-12 16:11:27 -0700119 clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_PREVIOUS,
120 SERIALIZER::decode,
121 this::getPreviousStatisticInternal,
122 SERIALIZER::encode,
123 messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700124
alshabib3d643ec2014-10-22 18:33:00 -0700125 log.info("Started");
126 }
127
128 @Deactivate
129 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800130 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
131 clusterCommunicator.removeSubscriber(GET_CURRENT);
132 messageHandlingExecutor.shutdown();
alshabib3d643ec2014-10-22 18:33:00 -0700133 log.info("Stopped");
134 }
135
136 @Override
137 public void prepareForStatistics(FlowRule rule) {
138 ConnectPoint cp = buildConnectPoint(rule);
139 if (cp == null) {
140 return;
141 }
142 InternalStatisticRepresentation rep;
143 synchronized (representations) {
144 rep = getOrCreateRepresentation(cp);
145 }
146 rep.prepare();
147 }
148
149 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700150 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700151 ConnectPoint cp = buildConnectPoint(rule);
152 if (cp == null) {
153 return;
154 }
155 InternalStatisticRepresentation rep = representations.get(cp);
alshabib9c57bdd2014-11-28 19:14:06 -0500156 if (rep != null && rep.remove(rule)) {
157 updatePublishedStats(cp, Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700158 }
alshabibf6c2ede2014-10-22 23:31:50 -0700159 Set<FlowEntry> values = current.get(cp);
160 if (values != null) {
161 values.remove(rule);
162 }
163 values = previous.get(cp);
164 if (values != null) {
165 values.remove(rule);
166 }
167
alshabib3d643ec2014-10-22 18:33:00 -0700168 }
169
170 @Override
171 public void addOrUpdateStatistic(FlowEntry rule) {
172 ConnectPoint cp = buildConnectPoint(rule);
173 if (cp == null) {
174 return;
175 }
176 InternalStatisticRepresentation rep = representations.get(cp);
177 if (rep != null && rep.submit(rule)) {
178 updatePublishedStats(cp, rep.get());
179 }
180 }
181
182 private synchronized void updatePublishedStats(ConnectPoint cp,
183 Set<FlowEntry> flowEntries) {
184 Set<FlowEntry> curr = current.get(cp);
185 if (curr == null) {
186 curr = new HashSet<>();
187 }
188 previous.put(cp, curr);
189 current.put(cp, flowEntries);
190
191 }
192
193 @Override
194 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700195 final DeviceId deviceId = connectPoint.deviceId();
Madan Jampanic156dd02015-08-12 15:57:46 -0700196 NodeId master = mastershipService.getMasterFor(deviceId);
197 if (master == null) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700198 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800199 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700200 }
Madan Jampanic156dd02015-08-12 15:57:46 -0700201 if (master.equals(clusterService.getLocalNode().id())) {
alshabib3d643ec2014-10-22 18:33:00 -0700202 return getCurrentStatisticInternal(connectPoint);
203 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700204 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
205 connectPoint,
206 GET_CURRENT,
207 SERIALIZER::encode,
208 SERIALIZER::decode,
Madan Jampanic156dd02015-08-12 15:57:46 -0700209 master),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700210 STATISTIC_STORE_TIMEOUT_MILLIS,
211 TimeUnit.MILLISECONDS,
212 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700213 }
214
215 }
216
217 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
218 return current.get(connectPoint);
219 }
220
221 @Override
222 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700223 final DeviceId deviceId = connectPoint.deviceId();
Madan Jampanic156dd02015-08-12 15:57:46 -0700224 NodeId master = mastershipService.getMasterFor(deviceId);
225 if (master == null) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700226 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800227 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700228 }
Madan Jampanic156dd02015-08-12 15:57:46 -0700229 if (master.equals(clusterService.getLocalNode().id())) {
alshabib3d643ec2014-10-22 18:33:00 -0700230 return getPreviousStatisticInternal(connectPoint);
231 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700232 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
233 connectPoint,
234 GET_PREVIOUS,
235 SERIALIZER::encode,
236 SERIALIZER::decode,
Madan Jampanic156dd02015-08-12 15:57:46 -0700237 master),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700238 STATISTIC_STORE_TIMEOUT_MILLIS,
239 TimeUnit.MILLISECONDS,
240 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700241 }
alshabib3d643ec2014-10-22 18:33:00 -0700242 }
243
244 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
245 return previous.get(connectPoint);
246 }
247
248 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
249
250 if (representations.containsKey(cp)) {
251 return representations.get(cp);
252 } else {
253 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
254 representations.put(cp, rep);
255 return rep;
256 }
257
258 }
259
260 private ConnectPoint buildConnectPoint(FlowRule rule) {
261 PortNumber port = getOutput(rule);
Jonathan Hart7baba072015-02-23 14:27:59 -0800262
alshabib3d643ec2014-10-22 18:33:00 -0700263 if (port == null) {
alshabib3d643ec2014-10-22 18:33:00 -0700264 return null;
265 }
266 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
267 return cp;
268 }
269
270 private PortNumber getOutput(FlowRule rule) {
Jonathan Hart8ef6d3b2015-03-08 21:21:27 -0700271 for (Instruction i : rule.treatment().allInstructions()) {
alshabib3d643ec2014-10-22 18:33:00 -0700272 if (i.type() == Instruction.Type.OUTPUT) {
273 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
274 return out.port();
275 }
276 if (i.type() == Instruction.Type.DROP) {
277 return PortNumber.P0;
278 }
279 }
280 return null;
281 }
282
283 private class InternalStatisticRepresentation {
284
285 private final AtomicInteger counter = new AtomicInteger(0);
286 private final Set<FlowEntry> rules = new HashSet<>();
287
288 public void prepare() {
289 counter.incrementAndGet();
290 }
291
alshabib9c57bdd2014-11-28 19:14:06 -0500292 public synchronized boolean remove(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700293 rules.remove(rule);
alshabib9c57bdd2014-11-28 19:14:06 -0500294 return counter.decrementAndGet() == 0;
alshabib3d643ec2014-10-22 18:33:00 -0700295 }
296
297 public synchronized boolean submit(FlowEntry rule) {
298 if (rules.contains(rule)) {
299 rules.remove(rule);
300 }
301 rules.add(rule);
302 if (counter.get() == 0) {
303 return true;
304 } else {
305 return counter.decrementAndGet() == 0;
306 }
307 }
308
309 public synchronized Set<FlowEntry> get() {
310 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700311 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700312 }
313
314
315 }
316
317}