blob: 8e59ef684bb7bd3d23543df30da7401bfd6e419c [file] [log] [blame]
Madan Jampanic27b6b22016-02-05 11:36:31 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.statistic.impl;
18
19import com.google.common.base.Objects;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090023import org.apache.felix.scr.annotations.Modified;
24import org.apache.felix.scr.annotations.Property;
Madan Jampanic27b6b22016-02-05 11:36:31 -080025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onlab.util.KryoNamespace;
29import org.onlab.util.Tools;
30import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.FlowEntry;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.instructions.Instruction;
39import org.onosproject.net.flow.instructions.Instructions;
40import org.onosproject.net.statistic.FlowStatisticStore;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.serializers.KryoSerializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090044import org.osgi.service.component.ComponentContext;
Madan Jampanic27b6b22016-02-05 11:36:31 -080045import org.slf4j.Logger;
46
47import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090048import java.util.Dictionary;
Madan Jampanic27b6b22016-02-05 11:36:31 -080049import java.util.HashSet;
50import java.util.Map;
51import java.util.Optional;
sangyun-hanad84e0c2016-02-19 18:30:03 +090052import java.util.Properties;
Madan Jampanic27b6b22016-02-05 11:36:31 -080053import java.util.Set;
54import java.util.concurrent.ConcurrentHashMap;
55import java.util.concurrent.ExecutorService;
56import java.util.concurrent.Executors;
57import java.util.concurrent.TimeUnit;
58
sangyun-hanad84e0c2016-02-19 18:30:03 +090059import static com.google.common.base.Preconditions.checkArgument;
60import static com.google.common.base.Strings.isNullOrEmpty;
61import static org.onlab.util.Tools.get;
Madan Jampanic27b6b22016-02-05 11:36:31 -080062import static org.onlab.util.Tools.groupedThreads;
63import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
64import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
65import static org.slf4j.LoggerFactory.getLogger;
66
67/**
68 * Maintains flow statistics using RPC calls to collect stats from remote instances
69 * on demand.
70 */
71@Component(immediate = true)
72@Service
73public class DistributedFlowStatisticStore implements FlowStatisticStore {
74 private final Logger log = getLogger(getClass());
75
sangyun-hanad84e0c2016-02-19 18:30:03 +090076 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampanic27b6b22016-02-05 11:36:31 -080077
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected MastershipService mastershipService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected ClusterCommunicationService clusterCommunicator;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ClusterService clusterService;
86
87 private Map<ConnectPoint, Set<FlowEntry>> previous =
88 new ConcurrentHashMap<>();
89
90 private Map<ConnectPoint, Set<FlowEntry>> current =
91 new ConcurrentHashMap<>();
92
93 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
94 @Override
95 protected void setupKryoPool() {
96 serializerPool = KryoNamespace.newBuilder()
97 .register(KryoNamespaces.API)
98 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
99 // register this store specific classes here
100 .build();
101 }
102 };
103
104 private NodeId local;
105 private ExecutorService messageHandlingExecutor;
106
sangyun-hanad84e0c2016-02-19 18:30:03 +0900107 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
108 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
109 label = "Size of thread pool to assign message handler")
110 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
111
112
Madan Jampanic27b6b22016-02-05 11:36:31 -0800113 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
114
115 @Activate
116 public void activate() {
117 local = clusterService.getLocalNode().id();
118
119 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900120 messageHandlerThreadPoolSize,
Madan Jampanic27b6b22016-02-05 11:36:31 -0800121 groupedThreads("onos/store/statistic", "message-handlers"));
122
123 clusterCommunicator.addSubscriber(
124 GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
125 messageHandlingExecutor);
126
127 clusterCommunicator.addSubscriber(
128 GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
129 messageHandlingExecutor);
130
131 log.info("Started");
132 }
133
134 @Deactivate
135 public void deactivate() {
136 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
137 clusterCommunicator.removeSubscriber(GET_CURRENT);
138 messageHandlingExecutor.shutdown();
139 log.info("Stopped");
140 }
141
sangyun-hanad84e0c2016-02-19 18:30:03 +0900142 @Modified
143 public void modified(ComponentContext context) {
144 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
145
146 int newMessageHandlerThreadPoolSize;
147
148 try {
149 String s = get(properties, "messageHandlerThreadPoolSize");
150
151 newMessageHandlerThreadPoolSize =
152 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
153
154 } catch (NumberFormatException e) {
155 log.warn(e.getMessage());
156 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
157 }
158
159 // Any change in the following parameters implies thread pool restart
160 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
161 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
162 restartMessageHandlerThreadPool();
163 }
164
165 log.info(FORMAT, messageHandlerThreadPoolSize);
166 }
167
Madan Jampanic27b6b22016-02-05 11:36:31 -0800168 @Override
169 public synchronized void removeFlowStatistic(FlowRule rule) {
170 ConnectPoint cp = buildConnectPoint(rule);
171 if (cp == null) {
172 return;
173 }
174
175 // remove this rule if present from current map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900176 current.computeIfPresent(cp, (c, e) -> {
177 e.remove(rule);
178 return e;
179 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800180
181 // remove this on if present from previous map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900182 previous.computeIfPresent(cp, (c, e) -> {
183 e.remove(rule);
184 return e;
185 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800186 }
187
188 @Override
189 public synchronized void addFlowStatistic(FlowEntry rule) {
190 ConnectPoint cp = buildConnectPoint(rule);
191 if (cp == null) {
192 return;
193 }
194
195 // create one if absent and add this rule
196 current.putIfAbsent(cp, new HashSet<>());
197 current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
198
199 // remove previous one if present
200 previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
201 }
202
203 public synchronized void updateFlowStatistic(FlowEntry rule) {
204 ConnectPoint cp = buildConnectPoint(rule);
205 if (cp == null) {
206 return;
207 }
208
209 Set<FlowEntry> curr = current.get(cp);
210 if (curr == null) {
211 addFlowStatistic(rule);
212 } else {
213 Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
214 findAny();
215 if (f.isPresent() && rule.bytes() < f.get().bytes()) {
216 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
217 " Invalid Flow Update! Will be removed!!" +
218 " curr flowId=" + Long.toHexString(rule.id().value()) +
219 ", prev flowId=" + Long.toHexString(f.get().id().value()) +
220 ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
221 ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
222 ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
223 // something is wrong! invalid flow entry, so delete it
224 removeFlowStatistic(rule);
225 return;
226 }
227 Set<FlowEntry> prev = previous.get(cp);
228 if (prev == null) {
229 prev = new HashSet<>();
230 previous.put(cp, prev);
231 }
232
233 // previous one is exist
234 if (f.isPresent()) {
235 // remove old one and add new one
236 prev.remove(rule);
237 if (!prev.add(f.get())) {
238 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
239 " flowId={}, add failed into previous.",
240 Long.toHexString(rule.id().value()));
241 }
242 }
243
244 // remove old one and add new one
245 curr.remove(rule);
246 if (!curr.add(rule)) {
247 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
248 " flowId={}, add failed into current.",
249 Long.toHexString(rule.id().value()));
250 }
251 }
252 }
253
254 @Override
255 public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
256 final DeviceId deviceId = connectPoint.deviceId();
257
258 NodeId master = mastershipService.getMasterFor(deviceId);
259 if (master == null) {
260 log.warn("No master for {}", deviceId);
261 return Collections.emptySet();
262 }
263
264 if (Objects.equal(local, master)) {
265 return getCurrentStatisticInternal(connectPoint);
266 } else {
267 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
268 connectPoint,
269 GET_CURRENT,
270 SERIALIZER::encode,
271 SERIALIZER::decode,
272 master),
273 STATISTIC_STORE_TIMEOUT_MILLIS,
274 TimeUnit.MILLISECONDS,
275 Collections.emptySet());
276 }
277 }
278
279 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
280 return current.get(connectPoint);
281 }
282
283 @Override
284 public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
285 final DeviceId deviceId = connectPoint.deviceId();
286
287 NodeId master = mastershipService.getMasterFor(deviceId);
288 if (master == null) {
289 log.warn("No master for {}", deviceId);
290 return Collections.emptySet();
291 }
292
293 if (Objects.equal(local, master)) {
294 return getPreviousStatisticInternal(connectPoint);
295 } else {
296 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
297 connectPoint,
298 GET_PREVIOUS,
299 SERIALIZER::encode,
300 SERIALIZER::decode,
301 master),
302 STATISTIC_STORE_TIMEOUT_MILLIS,
303 TimeUnit.MILLISECONDS,
304 Collections.emptySet());
305 }
306 }
307
308 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
309 return previous.get(connectPoint);
310 }
311
312 private ConnectPoint buildConnectPoint(FlowRule rule) {
313 PortNumber port = getOutput(rule);
314
315 if (port == null) {
316 return null;
317 }
318 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
319 return cp;
320 }
321
322 private PortNumber getOutput(FlowRule rule) {
323 for (Instruction i : rule.treatment().allInstructions()) {
324 if (i.type() == Instruction.Type.OUTPUT) {
325 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
326 return out.port();
327 }
328 if (i.type() == Instruction.Type.DROP) {
329 return PortNumber.P0;
330 }
331 }
332 return null;
333 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900334
335 /**
336 * Sets thread pool size of message handler.
337 *
338 * @param poolSize
339 */
340 private void setMessageHandlerThreadPoolSize(int poolSize) {
341 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
342 messageHandlerThreadPoolSize = poolSize;
343 }
344
345 /**
346 * Restarts thread pool of message handler.
347 */
348 private void restartMessageHandlerThreadPool() {
349 ExecutorService prevExecutor = messageHandlingExecutor;
350 messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
351 prevExecutor.shutdown();
352 }
353
354 /**
355 * Gets current thread pool size of message handler.
356 *
357 * @return messageHandlerThreadPoolSize
358 */
359 private int getMessageHandlerThreadPoolSize() {
360 return messageHandlerThreadPoolSize;
361 }
ssyoon90a98825a2015-08-26 00:48:15 +0900362}