blob: 6f57f19bcbbe5d978ff2dcac0311a81e7b50762d [file] [log] [blame]
Madan Jampanic27b6b22016-02-05 11:36:31 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Madan Jampanic27b6b22016-02-05 11:36:31 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.statistic.impl;
18
19import com.google.common.base.Objects;
Madan Jampanic27b6b22016-02-05 11:36:31 -080020import org.onlab.util.Tools;
sangyun-han9f0af2d2016-08-04 13:04:59 +090021import org.onosproject.cfg.ComponentConfigService;
Madan Jampanic27b6b22016-02-05 11:36:31 -080022import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.mastership.MastershipService;
25import org.onosproject.net.ConnectPoint;
26import org.onosproject.net.DeviceId;
27import org.onosproject.net.PortNumber;
28import org.onosproject.net.flow.FlowEntry;
29import org.onosproject.net.flow.FlowRule;
30import org.onosproject.net.flow.instructions.Instruction;
31import org.onosproject.net.flow.instructions.Instructions;
32import org.onosproject.net.statistic.FlowStatisticStore;
33import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani78be2492016-06-03 23:27:07 -070034import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic27b6b22016-02-05 11:36:31 -080035import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman2c83a102017-08-20 17:11:41 -070036import org.onosproject.store.service.Serializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090037import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070038import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Modified;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
Madan Jampanic27b6b22016-02-05 11:36:31 -080044import org.slf4j.Logger;
45
46import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090047import java.util.Dictionary;
Madan Jampanic27b6b22016-02-05 11:36:31 -080048import java.util.HashSet;
49import java.util.Map;
50import java.util.Optional;
sangyun-hanad84e0c2016-02-19 18:30:03 +090051import java.util.Properties;
Madan Jampanic27b6b22016-02-05 11:36:31 -080052import java.util.Set;
53import java.util.concurrent.ConcurrentHashMap;
54import java.util.concurrent.ExecutorService;
55import java.util.concurrent.Executors;
56import java.util.concurrent.TimeUnit;
57
sangyun-hanad84e0c2016-02-19 18:30:03 +090058import static com.google.common.base.Preconditions.checkArgument;
59import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070060import static java.util.concurrent.Executors.newFixedThreadPool;
sangyun-hanad84e0c2016-02-19 18:30:03 +090061import static org.onlab.util.Tools.get;
Madan Jampanic27b6b22016-02-05 11:36:31 -080062import static org.onlab.util.Tools.groupedThreads;
Madan Jampanic27b6b22016-02-05 11:36:31 -080063import static org.slf4j.LoggerFactory.getLogger;
64
65/**
66 * Maintains flow statistics using RPC calls to collect stats from remote instances
67 * on demand.
68 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070069@Component(immediate = true, service = FlowStatisticStore.class)
Madan Jampanic27b6b22016-02-05 11:36:31 -080070public class DistributedFlowStatisticStore implements FlowStatisticStore {
71 private final Logger log = getLogger(getClass());
72
sangyun-hanad84e0c2016-02-19 18:30:03 +090073 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampanic27b6b22016-02-05 11:36:31 -080074
Ray Milkeyd84f89b2018-08-17 14:54:17 -070075 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampanic27b6b22016-02-05 11:36:31 -080076 protected MastershipService mastershipService;
77
Ray Milkeyd84f89b2018-08-17 14:54:17 -070078 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampanic27b6b22016-02-05 11:36:31 -080079 protected ClusterCommunicationService clusterCommunicator;
80
Ray Milkeyd84f89b2018-08-17 14:54:17 -070081 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampanic27b6b22016-02-05 11:36:31 -080082 protected ClusterService clusterService;
83
Ray Milkeyd84f89b2018-08-17 14:54:17 -070084 @Reference(cardinality = ReferenceCardinality.MANDATORY)
sangyun-han9f0af2d2016-08-04 13:04:59 +090085 protected ComponentConfigService cfgService;
86
Madan Jampanic27b6b22016-02-05 11:36:31 -080087 private Map<ConnectPoint, Set<FlowEntry>> previous =
88 new ConcurrentHashMap<>();
89
90 private Map<ConnectPoint, Set<FlowEntry>> current =
91 new ConcurrentHashMap<>();
92
Madan Jampani78be2492016-06-03 23:27:07 -070093 public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
94 public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
95
Jordan Halterman2c83a102017-08-20 17:11:41 -070096 protected static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Madan Jampanic27b6b22016-02-05 11:36:31 -080097
98 private NodeId local;
99 private ExecutorService messageHandlingExecutor;
100
sangyun-hanad84e0c2016-02-19 18:30:03 +0900101 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700102 //@Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
103 // label = "Size of thread pool to assign message handler")
sangyun-hanad84e0c2016-02-19 18:30:03 +0900104 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
105
106
Madan Jampanic27b6b22016-02-05 11:36:31 -0800107 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
108
109 @Activate
sangyun-han9f0af2d2016-08-04 13:04:59 +0900110 public void activate(ComponentContext context) {
111 cfgService.registerProperties(getClass());
112
113 modified(context);
114
Madan Jampanic27b6b22016-02-05 11:36:31 -0800115 local = clusterService.getLocalNode().id();
116
117 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900118 messageHandlerThreadPoolSize,
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800119 groupedThreads("onos/store/statistic", "message-handlers", log));
Madan Jampanic27b6b22016-02-05 11:36:31 -0800120
121 clusterCommunicator.addSubscriber(
122 GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
123 messageHandlingExecutor);
124
125 clusterCommunicator.addSubscriber(
126 GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
127 messageHandlingExecutor);
128
129 log.info("Started");
130 }
131
132 @Deactivate
133 public void deactivate() {
sangyun-han9f0af2d2016-08-04 13:04:59 +0900134 cfgService.unregisterProperties(getClass(), false);
Madan Jampanic27b6b22016-02-05 11:36:31 -0800135 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
136 clusterCommunicator.removeSubscriber(GET_CURRENT);
137 messageHandlingExecutor.shutdown();
138 log.info("Stopped");
139 }
140
sangyun-hanad84e0c2016-02-19 18:30:03 +0900141 @Modified
142 public void modified(ComponentContext context) {
143 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
144
145 int newMessageHandlerThreadPoolSize;
146
147 try {
148 String s = get(properties, "messageHandlerThreadPoolSize");
149
150 newMessageHandlerThreadPoolSize =
151 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
152
153 } catch (NumberFormatException e) {
154 log.warn(e.getMessage());
155 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
156 }
157
158 // Any change in the following parameters implies thread pool restart
159 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
160 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
161 restartMessageHandlerThreadPool();
162 }
163
164 log.info(FORMAT, messageHandlerThreadPoolSize);
165 }
166
Madan Jampanic27b6b22016-02-05 11:36:31 -0800167 @Override
168 public synchronized void removeFlowStatistic(FlowRule rule) {
169 ConnectPoint cp = buildConnectPoint(rule);
170 if (cp == null) {
171 return;
172 }
173
174 // remove this rule if present from current map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900175 current.computeIfPresent(cp, (c, e) -> {
176 e.remove(rule);
177 return e;
178 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800179
180 // remove this on if present from previous map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900181 previous.computeIfPresent(cp, (c, e) -> {
182 e.remove(rule);
183 return e;
184 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800185 }
186
187 @Override
188 public synchronized void addFlowStatistic(FlowEntry rule) {
189 ConnectPoint cp = buildConnectPoint(rule);
190 if (cp == null) {
191 return;
192 }
193
194 // create one if absent and add this rule
195 current.putIfAbsent(cp, new HashSet<>());
Ray Milkey88cc3432017-03-30 17:19:08 -0700196 current.computeIfPresent(cp, (c, e) -> {
197 e.add(rule); return e;
198 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800199
200 // remove previous one if present
Ray Milkey88cc3432017-03-30 17:19:08 -0700201 previous.computeIfPresent(cp, (c, e) -> {
202 e.remove(rule); return e;
203 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800204 }
205
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800206 @Override
Madan Jampanic27b6b22016-02-05 11:36:31 -0800207 public synchronized void updateFlowStatistic(FlowEntry rule) {
208 ConnectPoint cp = buildConnectPoint(rule);
209 if (cp == null) {
210 return;
211 }
212
213 Set<FlowEntry> curr = current.get(cp);
214 if (curr == null) {
215 addFlowStatistic(rule);
216 } else {
217 Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
218 findAny();
219 if (f.isPresent() && rule.bytes() < f.get().bytes()) {
220 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
221 " Invalid Flow Update! Will be removed!!" +
222 " curr flowId=" + Long.toHexString(rule.id().value()) +
223 ", prev flowId=" + Long.toHexString(f.get().id().value()) +
224 ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
225 ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
226 ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
227 // something is wrong! invalid flow entry, so delete it
228 removeFlowStatistic(rule);
229 return;
230 }
231 Set<FlowEntry> prev = previous.get(cp);
232 if (prev == null) {
233 prev = new HashSet<>();
234 previous.put(cp, prev);
235 }
236
237 // previous one is exist
238 if (f.isPresent()) {
239 // remove old one and add new one
240 prev.remove(rule);
241 if (!prev.add(f.get())) {
242 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
243 " flowId={}, add failed into previous.",
244 Long.toHexString(rule.id().value()));
245 }
246 }
247
248 // remove old one and add new one
249 curr.remove(rule);
250 if (!curr.add(rule)) {
251 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
252 " flowId={}, add failed into current.",
253 Long.toHexString(rule.id().value()));
254 }
255 }
256 }
257
258 @Override
259 public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
260 final DeviceId deviceId = connectPoint.deviceId();
261
262 NodeId master = mastershipService.getMasterFor(deviceId);
263 if (master == null) {
264 log.warn("No master for {}", deviceId);
265 return Collections.emptySet();
266 }
267
268 if (Objects.equal(local, master)) {
269 return getCurrentStatisticInternal(connectPoint);
270 } else {
271 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
272 connectPoint,
273 GET_CURRENT,
274 SERIALIZER::encode,
275 SERIALIZER::decode,
276 master),
277 STATISTIC_STORE_TIMEOUT_MILLIS,
278 TimeUnit.MILLISECONDS,
279 Collections.emptySet());
280 }
281 }
282
283 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
284 return current.get(connectPoint);
285 }
286
287 @Override
288 public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
289 final DeviceId deviceId = connectPoint.deviceId();
290
291 NodeId master = mastershipService.getMasterFor(deviceId);
292 if (master == null) {
293 log.warn("No master for {}", deviceId);
294 return Collections.emptySet();
295 }
296
297 if (Objects.equal(local, master)) {
298 return getPreviousStatisticInternal(connectPoint);
299 } else {
300 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
301 connectPoint,
302 GET_PREVIOUS,
303 SERIALIZER::encode,
304 SERIALIZER::decode,
305 master),
306 STATISTIC_STORE_TIMEOUT_MILLIS,
307 TimeUnit.MILLISECONDS,
308 Collections.emptySet());
309 }
310 }
311
312 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
313 return previous.get(connectPoint);
314 }
315
316 private ConnectPoint buildConnectPoint(FlowRule rule) {
317 PortNumber port = getOutput(rule);
318
319 if (port == null) {
320 return null;
321 }
322 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
323 return cp;
324 }
325
326 private PortNumber getOutput(FlowRule rule) {
327 for (Instruction i : rule.treatment().allInstructions()) {
328 if (i.type() == Instruction.Type.OUTPUT) {
329 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
330 return out.port();
331 }
Madan Jampanic27b6b22016-02-05 11:36:31 -0800332 }
333 return null;
334 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900335
336 /**
337 * Sets thread pool size of message handler.
338 *
339 * @param poolSize
340 */
341 private void setMessageHandlerThreadPoolSize(int poolSize) {
342 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
343 messageHandlerThreadPoolSize = poolSize;
344 }
345
346 /**
347 * Restarts thread pool of message handler.
348 */
349 private void restartMessageHandlerThreadPool() {
350 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700351 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
352 groupedThreads("DistFlowStats", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900353 prevExecutor.shutdown();
354 }
355
356 /**
357 * Gets current thread pool size of message handler.
358 *
359 * @return messageHandlerThreadPoolSize
360 */
361 private int getMessageHandlerThreadPoolSize() {
362 return messageHandlerThreadPoolSize;
363 }
Sho SHIMIZU57f2efd2016-02-24 12:20:05 -0800364}