blob: cfbcec8a979e5e115a4c80b002eca497cc85aff5 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.statistic.impl;
alshabib3d643ec2014-10-22 18:33:00 -070017
alshabibf6c2ede2014-10-22 23:31:50 -070018import com.google.common.collect.Sets;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070019
alshabib3d643ec2014-10-22 18:33:00 -070020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
alshabib346b5b32015-03-06 00:42:16 -080026import org.onlab.util.KryoNamespace;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070027import org.onlab.util.Tools;
Brian O'Connorabafb502014-12-02 22:26:20 -080028import org.onosproject.cluster.ClusterService;
29import org.onosproject.net.ConnectPoint;
30import org.onosproject.net.DeviceId;
31import org.onosproject.net.PortNumber;
32import org.onosproject.net.flow.FlowEntry;
33import org.onosproject.net.flow.FlowRule;
34import org.onosproject.net.flow.instructions.Instruction;
35import org.onosproject.net.flow.instructions.Instructions;
36import org.onosproject.net.statistic.StatisticStore;
37import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38import org.onosproject.store.cluster.messaging.ClusterMessage;
39import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
40import org.onosproject.store.flow.ReplicaInfo;
41import org.onosproject.store.flow.ReplicaInfoService;
42import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.serializers.KryoSerializer;
alshabib3d643ec2014-10-22 18:33:00 -070044import org.slf4j.Logger;
45
alshabib9c57bdd2014-11-28 19:14:06 -050046import java.util.Collections;
alshabib3d643ec2014-10-22 18:33:00 -070047import java.util.HashSet;
48import java.util.Map;
49import java.util.Set;
50import java.util.concurrent.ConcurrentHashMap;
Madan Jampani2af244a2015-02-22 13:12:01 -080051import java.util.concurrent.ExecutorService;
52import java.util.concurrent.Executors;
alshabib3d643ec2014-10-22 18:33:00 -070053import java.util.concurrent.TimeUnit;
alshabib3d643ec2014-10-22 18:33:00 -070054import java.util.concurrent.atomic.AtomicInteger;
55
Madan Jampani2af244a2015-02-22 13:12:01 -080056import static org.onlab.util.Tools.groupedThreads;
Brian O'Connorabafb502014-12-02 22:26:20 -080057import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
58import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
Thomas Vachuska82041f52014-11-30 22:14:02 -080059import static org.slf4j.LoggerFactory.getLogger;
60
alshabib3d643ec2014-10-22 18:33:00 -070061
62/**
63 * Maintains statistics using RPC calls to collect stats from remote instances
64 * on demand.
65 */
66@Component(immediate = true)
67@Service
68public class DistributedStatisticStore implements StatisticStore {
69
70 private final Logger log = getLogger(getClass());
71
Madan Jampani2af244a2015-02-22 13:12:01 -080072 // TODO: Make configurable.
73 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
74
alshabib3d643ec2014-10-22 18:33:00 -070075 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080076 protected ReplicaInfoService replicaInfoManager;
alshabib3d643ec2014-10-22 18:33:00 -070077
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080079 protected ClusterCommunicationService clusterCommunicator;
alshabib3d643ec2014-10-22 18:33:00 -070080
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080082 protected ClusterService clusterService;
alshabib3d643ec2014-10-22 18:33:00 -070083
84 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
85 new ConcurrentHashMap<>();
86
87 private Map<ConnectPoint, Set<FlowEntry>> previous =
88 new ConcurrentHashMap<>();
89
90 private Map<ConnectPoint, Set<FlowEntry>> current =
91 new ConcurrentHashMap<>();
92
93 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
94 @Override
95 protected void setupKryoPool() {
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070096 serializerPool = KryoNamespace.newBuilder()
97 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080098 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Yuta HIGUCHI4cf23ce2014-10-22 20:37:13 -070099 // register this store specific classes here
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800100 .build();
alshabib3d643ec2014-10-22 18:33:00 -0700101 }
102 };;
103
Madan Jampani2af244a2015-02-22 13:12:01 -0800104 private ExecutorService messageHandlingExecutor;
105
alshabib3d643ec2014-10-22 18:33:00 -0700106 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
107
108 @Activate
109 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800110
111 messageHandlingExecutor = Executors.newFixedThreadPool(
112 MESSAGE_HANDLER_THREAD_POOL_SIZE,
113 groupedThreads("onos/store/statistic", "message-handlers"));
114
alshabib3d643ec2014-10-22 18:33:00 -0700115 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
116
117 @Override
118 public void handle(ClusterMessage message) {
119 ConnectPoint cp = SERIALIZER.decode(message.payload());
Madan Jampanic26eede2015-04-16 11:42:16 -0700120 message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
alshabib3d643ec2014-10-22 18:33:00 -0700121 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800122 }, messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700123
124 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
125
126 @Override
127 public void handle(ClusterMessage message) {
128 ConnectPoint cp = SERIALIZER.decode(message.payload());
Madan Jampanic26eede2015-04-16 11:42:16 -0700129 message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
alshabib3d643ec2014-10-22 18:33:00 -0700130 }
Madan Jampani2af244a2015-02-22 13:12:01 -0800131 }, messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700132 log.info("Started");
133 }
134
135 @Deactivate
136 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800137 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
138 clusterCommunicator.removeSubscriber(GET_CURRENT);
139 messageHandlingExecutor.shutdown();
alshabib3d643ec2014-10-22 18:33:00 -0700140 log.info("Stopped");
141 }
142
143 @Override
144 public void prepareForStatistics(FlowRule rule) {
145 ConnectPoint cp = buildConnectPoint(rule);
146 if (cp == null) {
147 return;
148 }
149 InternalStatisticRepresentation rep;
150 synchronized (representations) {
151 rep = getOrCreateRepresentation(cp);
152 }
153 rep.prepare();
154 }
155
156 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700157 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700158 ConnectPoint cp = buildConnectPoint(rule);
159 if (cp == null) {
160 return;
161 }
162 InternalStatisticRepresentation rep = representations.get(cp);
alshabib9c57bdd2014-11-28 19:14:06 -0500163 if (rep != null && rep.remove(rule)) {
164 updatePublishedStats(cp, Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700165 }
alshabibf6c2ede2014-10-22 23:31:50 -0700166 Set<FlowEntry> values = current.get(cp);
167 if (values != null) {
168 values.remove(rule);
169 }
170 values = previous.get(cp);
171 if (values != null) {
172 values.remove(rule);
173 }
174
alshabib3d643ec2014-10-22 18:33:00 -0700175 }
176
177 @Override
178 public void addOrUpdateStatistic(FlowEntry rule) {
179 ConnectPoint cp = buildConnectPoint(rule);
180 if (cp == null) {
181 return;
182 }
183 InternalStatisticRepresentation rep = representations.get(cp);
184 if (rep != null && rep.submit(rule)) {
185 updatePublishedStats(cp, rep.get());
186 }
187 }
188
189 private synchronized void updatePublishedStats(ConnectPoint cp,
190 Set<FlowEntry> flowEntries) {
191 Set<FlowEntry> curr = current.get(cp);
192 if (curr == null) {
193 curr = new HashSet<>();
194 }
195 previous.put(cp, curr);
196 current.put(cp, flowEntries);
197
198 }
199
200 @Override
201 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700202 final DeviceId deviceId = connectPoint.deviceId();
203 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
204 if (!replicaInfo.master().isPresent()) {
205 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800206 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700207 }
alshabib3d643ec2014-10-22 18:33:00 -0700208 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
209 return getCurrentStatisticInternal(connectPoint);
210 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700211 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
212 connectPoint,
213 GET_CURRENT,
214 SERIALIZER::encode,
215 SERIALIZER::decode,
216 replicaInfo.master().get()),
217 STATISTIC_STORE_TIMEOUT_MILLIS,
218 TimeUnit.MILLISECONDS,
219 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700220 }
221
222 }
223
224 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
225 return current.get(connectPoint);
226 }
227
228 @Override
229 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700230 final DeviceId deviceId = connectPoint.deviceId();
231 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
232 if (!replicaInfo.master().isPresent()) {
233 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800234 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700235 }
alshabib3d643ec2014-10-22 18:33:00 -0700236 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
237 return getPreviousStatisticInternal(connectPoint);
238 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700239 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
240 connectPoint,
241 GET_PREVIOUS,
242 SERIALIZER::encode,
243 SERIALIZER::decode,
244 replicaInfo.master().get()),
245 STATISTIC_STORE_TIMEOUT_MILLIS,
246 TimeUnit.MILLISECONDS,
247 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700248 }
alshabib3d643ec2014-10-22 18:33:00 -0700249 }
250
251 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
252 return previous.get(connectPoint);
253 }
254
255 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
256
257 if (representations.containsKey(cp)) {
258 return representations.get(cp);
259 } else {
260 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
261 representations.put(cp, rep);
262 return rep;
263 }
264
265 }
266
267 private ConnectPoint buildConnectPoint(FlowRule rule) {
268 PortNumber port = getOutput(rule);
Jonathan Hart7baba072015-02-23 14:27:59 -0800269
alshabib3d643ec2014-10-22 18:33:00 -0700270 if (port == null) {
alshabib3d643ec2014-10-22 18:33:00 -0700271 return null;
272 }
273 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
274 return cp;
275 }
276
277 private PortNumber getOutput(FlowRule rule) {
Jonathan Hart8ef6d3b2015-03-08 21:21:27 -0700278 for (Instruction i : rule.treatment().allInstructions()) {
alshabib3d643ec2014-10-22 18:33:00 -0700279 if (i.type() == Instruction.Type.OUTPUT) {
280 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
281 return out.port();
282 }
283 if (i.type() == Instruction.Type.DROP) {
284 return PortNumber.P0;
285 }
286 }
287 return null;
288 }
289
290 private class InternalStatisticRepresentation {
291
292 private final AtomicInteger counter = new AtomicInteger(0);
293 private final Set<FlowEntry> rules = new HashSet<>();
294
295 public void prepare() {
296 counter.incrementAndGet();
297 }
298
alshabib9c57bdd2014-11-28 19:14:06 -0500299 public synchronized boolean remove(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700300 rules.remove(rule);
alshabib9c57bdd2014-11-28 19:14:06 -0500301 return counter.decrementAndGet() == 0;
alshabib3d643ec2014-10-22 18:33:00 -0700302 }
303
304 public synchronized boolean submit(FlowEntry rule) {
305 if (rules.contains(rule)) {
306 rules.remove(rule);
307 }
308 rules.add(rule);
309 if (counter.get() == 0) {
310 return true;
311 } else {
312 return counter.decrementAndGet() == 0;
313 }
314 }
315
316 public synchronized Set<FlowEntry> get() {
317 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700318 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700319 }
320
321
322 }
323
324}