blob: 21371c9c47ea990dab508db368797b94d7574bcf [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.statistic.impl;
18
19import com.google.common.base.Objects;
Madan Jampani78be2492016-06-03 23:27:07 -070020
Madan Jampanic27b6b22016-02-05 11:36:31 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090024import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
Madan Jampanic27b6b22016-02-05 11:36:31 -080026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Madan Jampanic27b6b22016-02-05 11:36:31 -080029import org.onlab.util.Tools;
30import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.FlowEntry;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.instructions.Instruction;
39import org.onosproject.net.flow.instructions.Instructions;
40import org.onosproject.net.statistic.FlowStatisticStore;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani78be2492016-06-03 23:27:07 -070042import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic27b6b22016-02-05 11:36:31 -080043import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070044import org.onosproject.store.serializers.StoreSerializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090045import org.osgi.service.component.ComponentContext;
Madan Jampanic27b6b22016-02-05 11:36:31 -080046import org.slf4j.Logger;
47
48import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090049import java.util.Dictionary;
Madan Jampanic27b6b22016-02-05 11:36:31 -080050import java.util.HashSet;
51import java.util.Map;
52import java.util.Optional;
sangyun-hanad84e0c2016-02-19 18:30:03 +090053import java.util.Properties;
Madan Jampanic27b6b22016-02-05 11:36:31 -080054import java.util.Set;
55import java.util.concurrent.ConcurrentHashMap;
56import java.util.concurrent.ExecutorService;
57import java.util.concurrent.Executors;
58import java.util.concurrent.TimeUnit;
59
sangyun-hanad84e0c2016-02-19 18:30:03 +090060import static com.google.common.base.Preconditions.checkArgument;
61import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070062import static java.util.concurrent.Executors.newFixedThreadPool;
sangyun-hanad84e0c2016-02-19 18:30:03 +090063import static org.onlab.util.Tools.get;
Madan Jampanic27b6b22016-02-05 11:36:31 -080064import static org.onlab.util.Tools.groupedThreads;
Madan Jampanic27b6b22016-02-05 11:36:31 -080065import static org.slf4j.LoggerFactory.getLogger;
66
67/**
68 * Maintains flow statistics using RPC calls to collect stats from remote instances
69 * on demand.
70 */
71@Component(immediate = true)
72@Service
73public class DistributedFlowStatisticStore implements FlowStatisticStore {
74 private final Logger log = getLogger(getClass());
75
sangyun-hanad84e0c2016-02-19 18:30:03 +090076 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampanic27b6b22016-02-05 11:36:31 -080077
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected MastershipService mastershipService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected ClusterCommunicationService clusterCommunicator;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ClusterService clusterService;
86
87 private Map<ConnectPoint, Set<FlowEntry>> previous =
88 new ConcurrentHashMap<>();
89
90 private Map<ConnectPoint, Set<FlowEntry>> current =
91 new ConcurrentHashMap<>();
92
Madan Jampani78be2492016-06-03 23:27:07 -070093 public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
94 public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
95
HIGUCHI Yutae7290652016-05-18 11:29:01 -070096 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
Madan Jampanic27b6b22016-02-05 11:36:31 -080097
98 private NodeId local;
99 private ExecutorService messageHandlingExecutor;
100
sangyun-hanad84e0c2016-02-19 18:30:03 +0900101 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
102 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
103 label = "Size of thread pool to assign message handler")
104 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
105
106
Madan Jampanic27b6b22016-02-05 11:36:31 -0800107 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
108
109 @Activate
110 public void activate() {
111 local = clusterService.getLocalNode().id();
112
113 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900114 messageHandlerThreadPoolSize,
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800115 groupedThreads("onos/store/statistic", "message-handlers", log));
Madan Jampanic27b6b22016-02-05 11:36:31 -0800116
117 clusterCommunicator.addSubscriber(
118 GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
119 messageHandlingExecutor);
120
121 clusterCommunicator.addSubscriber(
122 GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
123 messageHandlingExecutor);
124
125 log.info("Started");
126 }
127
128 @Deactivate
129 public void deactivate() {
130 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
131 clusterCommunicator.removeSubscriber(GET_CURRENT);
132 messageHandlingExecutor.shutdown();
133 log.info("Stopped");
134 }
135
sangyun-hanad84e0c2016-02-19 18:30:03 +0900136 @Modified
137 public void modified(ComponentContext context) {
138 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
139
140 int newMessageHandlerThreadPoolSize;
141
142 try {
143 String s = get(properties, "messageHandlerThreadPoolSize");
144
145 newMessageHandlerThreadPoolSize =
146 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
147
148 } catch (NumberFormatException e) {
149 log.warn(e.getMessage());
150 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
151 }
152
153 // Any change in the following parameters implies thread pool restart
154 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
155 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
156 restartMessageHandlerThreadPool();
157 }
158
159 log.info(FORMAT, messageHandlerThreadPoolSize);
160 }
161
Madan Jampanic27b6b22016-02-05 11:36:31 -0800162 @Override
163 public synchronized void removeFlowStatistic(FlowRule rule) {
164 ConnectPoint cp = buildConnectPoint(rule);
165 if (cp == null) {
166 return;
167 }
168
169 // remove this rule if present from current map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900170 current.computeIfPresent(cp, (c, e) -> {
171 e.remove(rule);
172 return e;
173 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800174
175 // remove this on if present from previous map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900176 previous.computeIfPresent(cp, (c, e) -> {
177 e.remove(rule);
178 return e;
179 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800180 }
181
182 @Override
183 public synchronized void addFlowStatistic(FlowEntry rule) {
184 ConnectPoint cp = buildConnectPoint(rule);
185 if (cp == null) {
186 return;
187 }
188
189 // create one if absent and add this rule
190 current.putIfAbsent(cp, new HashSet<>());
191 current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
192
193 // remove previous one if present
194 previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
195 }
196
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800197 @Override
Madan Jampanic27b6b22016-02-05 11:36:31 -0800198 public synchronized void updateFlowStatistic(FlowEntry rule) {
199 ConnectPoint cp = buildConnectPoint(rule);
200 if (cp == null) {
201 return;
202 }
203
204 Set<FlowEntry> curr = current.get(cp);
205 if (curr == null) {
206 addFlowStatistic(rule);
207 } else {
208 Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
209 findAny();
210 if (f.isPresent() && rule.bytes() < f.get().bytes()) {
211 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
212 " Invalid Flow Update! Will be removed!!" +
213 " curr flowId=" + Long.toHexString(rule.id().value()) +
214 ", prev flowId=" + Long.toHexString(f.get().id().value()) +
215 ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
216 ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
217 ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
218 // something is wrong! invalid flow entry, so delete it
219 removeFlowStatistic(rule);
220 return;
221 }
222 Set<FlowEntry> prev = previous.get(cp);
223 if (prev == null) {
224 prev = new HashSet<>();
225 previous.put(cp, prev);
226 }
227
228 // previous one is exist
229 if (f.isPresent()) {
230 // remove old one and add new one
231 prev.remove(rule);
232 if (!prev.add(f.get())) {
233 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
234 " flowId={}, add failed into previous.",
235 Long.toHexString(rule.id().value()));
236 }
237 }
238
239 // remove old one and add new one
240 curr.remove(rule);
241 if (!curr.add(rule)) {
242 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
243 " flowId={}, add failed into current.",
244 Long.toHexString(rule.id().value()));
245 }
246 }
247 }
248
249 @Override
250 public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
251 final DeviceId deviceId = connectPoint.deviceId();
252
253 NodeId master = mastershipService.getMasterFor(deviceId);
254 if (master == null) {
255 log.warn("No master for {}", deviceId);
256 return Collections.emptySet();
257 }
258
259 if (Objects.equal(local, master)) {
260 return getCurrentStatisticInternal(connectPoint);
261 } else {
262 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
263 connectPoint,
264 GET_CURRENT,
265 SERIALIZER::encode,
266 SERIALIZER::decode,
267 master),
268 STATISTIC_STORE_TIMEOUT_MILLIS,
269 TimeUnit.MILLISECONDS,
270 Collections.emptySet());
271 }
272 }
273
274 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
275 return current.get(connectPoint);
276 }
277
278 @Override
279 public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
280 final DeviceId deviceId = connectPoint.deviceId();
281
282 NodeId master = mastershipService.getMasterFor(deviceId);
283 if (master == null) {
284 log.warn("No master for {}", deviceId);
285 return Collections.emptySet();
286 }
287
288 if (Objects.equal(local, master)) {
289 return getPreviousStatisticInternal(connectPoint);
290 } else {
291 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
292 connectPoint,
293 GET_PREVIOUS,
294 SERIALIZER::encode,
295 SERIALIZER::decode,
296 master),
297 STATISTIC_STORE_TIMEOUT_MILLIS,
298 TimeUnit.MILLISECONDS,
299 Collections.emptySet());
300 }
301 }
302
303 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
304 return previous.get(connectPoint);
305 }
306
307 private ConnectPoint buildConnectPoint(FlowRule rule) {
308 PortNumber port = getOutput(rule);
309
310 if (port == null) {
311 return null;
312 }
313 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
314 return cp;
315 }
316
317 private PortNumber getOutput(FlowRule rule) {
318 for (Instruction i : rule.treatment().allInstructions()) {
319 if (i.type() == Instruction.Type.OUTPUT) {
320 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
321 return out.port();
322 }
Madan Jampanic27b6b22016-02-05 11:36:31 -0800323 }
324 return null;
325 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900326
327 /**
328 * Sets thread pool size of message handler.
329 *
330 * @param poolSize
331 */
332 private void setMessageHandlerThreadPoolSize(int poolSize) {
333 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
334 messageHandlerThreadPoolSize = poolSize;
335 }
336
337 /**
338 * Restarts thread pool of message handler.
339 */
340 private void restartMessageHandlerThreadPool() {
341 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700342 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
343 groupedThreads("DistFlowStats", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900344 prevExecutor.shutdown();
345 }
346
347 /**
348 * Gets current thread pool size of message handler.
349 *
350 * @return messageHandlerThreadPoolSize
351 */
352 private int getMessageHandlerThreadPoolSize() {
353 return messageHandlerThreadPoolSize;
354 }
Sho SHIMIZU57f2efd2016-02-24 12:20:05 -0800355}