blob: 74ab40b1123c7c7a77cc4c63f5822061e8a4fcb0 [file] [log] [blame]
Madan Jampanic27b6b22016-02-05 11:36:31 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Madan Jampanic27b6b22016-02-05 11:36:31 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.statistic.impl;
18
19import com.google.common.base.Objects;
Madan Jampani78be2492016-06-03 23:27:07 -070020
Madan Jampanic27b6b22016-02-05 11:36:31 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090024import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
Madan Jampanic27b6b22016-02-05 11:36:31 -080026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Madan Jampanic27b6b22016-02-05 11:36:31 -080029import org.onlab.util.Tools;
30import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.FlowEntry;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.instructions.Instruction;
39import org.onosproject.net.flow.instructions.Instructions;
40import org.onosproject.net.statistic.FlowStatisticStore;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani78be2492016-06-03 23:27:07 -070042import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic27b6b22016-02-05 11:36:31 -080043import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070044import org.onosproject.store.serializers.StoreSerializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090045import org.osgi.service.component.ComponentContext;
Madan Jampanic27b6b22016-02-05 11:36:31 -080046import org.slf4j.Logger;
47
48import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090049import java.util.Dictionary;
Madan Jampanic27b6b22016-02-05 11:36:31 -080050import java.util.HashSet;
51import java.util.Map;
52import java.util.Optional;
sangyun-hanad84e0c2016-02-19 18:30:03 +090053import java.util.Properties;
Madan Jampanic27b6b22016-02-05 11:36:31 -080054import java.util.Set;
55import java.util.concurrent.ConcurrentHashMap;
56import java.util.concurrent.ExecutorService;
57import java.util.concurrent.Executors;
58import java.util.concurrent.TimeUnit;
59
sangyun-hanad84e0c2016-02-19 18:30:03 +090060import static com.google.common.base.Preconditions.checkArgument;
61import static com.google.common.base.Strings.isNullOrEmpty;
62import static org.onlab.util.Tools.get;
Madan Jampanic27b6b22016-02-05 11:36:31 -080063import static org.onlab.util.Tools.groupedThreads;
Madan Jampanic27b6b22016-02-05 11:36:31 -080064import static org.slf4j.LoggerFactory.getLogger;
65
66/**
67 * Maintains flow statistics using RPC calls to collect stats from remote instances
68 * on demand.
69 */
70@Component(immediate = true)
71@Service
72public class DistributedFlowStatisticStore implements FlowStatisticStore {
73 private final Logger log = getLogger(getClass());
74
sangyun-hanad84e0c2016-02-19 18:30:03 +090075 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampanic27b6b22016-02-05 11:36:31 -080076
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected MastershipService mastershipService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected ClusterCommunicationService clusterCommunicator;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected ClusterService clusterService;
85
86 private Map<ConnectPoint, Set<FlowEntry>> previous =
87 new ConcurrentHashMap<>();
88
89 private Map<ConnectPoint, Set<FlowEntry>> current =
90 new ConcurrentHashMap<>();
91
Madan Jampani78be2492016-06-03 23:27:07 -070092 public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
93 public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
94
HIGUCHI Yutae7290652016-05-18 11:29:01 -070095 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
Madan Jampanic27b6b22016-02-05 11:36:31 -080096
97 private NodeId local;
98 private ExecutorService messageHandlingExecutor;
99
sangyun-hanad84e0c2016-02-19 18:30:03 +0900100 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
101 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
102 label = "Size of thread pool to assign message handler")
103 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
104
105
Madan Jampanic27b6b22016-02-05 11:36:31 -0800106 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
107
108 @Activate
109 public void activate() {
110 local = clusterService.getLocalNode().id();
111
112 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900113 messageHandlerThreadPoolSize,
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800114 groupedThreads("onos/store/statistic", "message-handlers", log));
Madan Jampanic27b6b22016-02-05 11:36:31 -0800115
116 clusterCommunicator.addSubscriber(
117 GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
118 messageHandlingExecutor);
119
120 clusterCommunicator.addSubscriber(
121 GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
122 messageHandlingExecutor);
123
124 log.info("Started");
125 }
126
127 @Deactivate
128 public void deactivate() {
129 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
130 clusterCommunicator.removeSubscriber(GET_CURRENT);
131 messageHandlingExecutor.shutdown();
132 log.info("Stopped");
133 }
134
sangyun-hanad84e0c2016-02-19 18:30:03 +0900135 @Modified
136 public void modified(ComponentContext context) {
137 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
138
139 int newMessageHandlerThreadPoolSize;
140
141 try {
142 String s = get(properties, "messageHandlerThreadPoolSize");
143
144 newMessageHandlerThreadPoolSize =
145 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
146
147 } catch (NumberFormatException e) {
148 log.warn(e.getMessage());
149 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
150 }
151
152 // Any change in the following parameters implies thread pool restart
153 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
154 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
155 restartMessageHandlerThreadPool();
156 }
157
158 log.info(FORMAT, messageHandlerThreadPoolSize);
159 }
160
Madan Jampanic27b6b22016-02-05 11:36:31 -0800161 @Override
162 public synchronized void removeFlowStatistic(FlowRule rule) {
163 ConnectPoint cp = buildConnectPoint(rule);
164 if (cp == null) {
165 return;
166 }
167
168 // remove this rule if present from current map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900169 current.computeIfPresent(cp, (c, e) -> {
170 e.remove(rule);
171 return e;
172 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800173
174 // remove this on if present from previous map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900175 previous.computeIfPresent(cp, (c, e) -> {
176 e.remove(rule);
177 return e;
178 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800179 }
180
181 @Override
182 public synchronized void addFlowStatistic(FlowEntry rule) {
183 ConnectPoint cp = buildConnectPoint(rule);
184 if (cp == null) {
185 return;
186 }
187
188 // create one if absent and add this rule
189 current.putIfAbsent(cp, new HashSet<>());
190 current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
191
192 // remove previous one if present
193 previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
194 }
195
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800196 @Override
Madan Jampanic27b6b22016-02-05 11:36:31 -0800197 public synchronized void updateFlowStatistic(FlowEntry rule) {
198 ConnectPoint cp = buildConnectPoint(rule);
199 if (cp == null) {
200 return;
201 }
202
203 Set<FlowEntry> curr = current.get(cp);
204 if (curr == null) {
205 addFlowStatistic(rule);
206 } else {
207 Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
208 findAny();
209 if (f.isPresent() && rule.bytes() < f.get().bytes()) {
210 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
211 " Invalid Flow Update! Will be removed!!" +
212 " curr flowId=" + Long.toHexString(rule.id().value()) +
213 ", prev flowId=" + Long.toHexString(f.get().id().value()) +
214 ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
215 ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
216 ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
217 // something is wrong! invalid flow entry, so delete it
218 removeFlowStatistic(rule);
219 return;
220 }
221 Set<FlowEntry> prev = previous.get(cp);
222 if (prev == null) {
223 prev = new HashSet<>();
224 previous.put(cp, prev);
225 }
226
227 // previous one is exist
228 if (f.isPresent()) {
229 // remove old one and add new one
230 prev.remove(rule);
231 if (!prev.add(f.get())) {
232 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
233 " flowId={}, add failed into previous.",
234 Long.toHexString(rule.id().value()));
235 }
236 }
237
238 // remove old one and add new one
239 curr.remove(rule);
240 if (!curr.add(rule)) {
241 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
242 " flowId={}, add failed into current.",
243 Long.toHexString(rule.id().value()));
244 }
245 }
246 }
247
248 @Override
249 public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
250 final DeviceId deviceId = connectPoint.deviceId();
251
252 NodeId master = mastershipService.getMasterFor(deviceId);
253 if (master == null) {
254 log.warn("No master for {}", deviceId);
255 return Collections.emptySet();
256 }
257
258 if (Objects.equal(local, master)) {
259 return getCurrentStatisticInternal(connectPoint);
260 } else {
261 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
262 connectPoint,
263 GET_CURRENT,
264 SERIALIZER::encode,
265 SERIALIZER::decode,
266 master),
267 STATISTIC_STORE_TIMEOUT_MILLIS,
268 TimeUnit.MILLISECONDS,
269 Collections.emptySet());
270 }
271 }
272
273 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
274 return current.get(connectPoint);
275 }
276
277 @Override
278 public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
279 final DeviceId deviceId = connectPoint.deviceId();
280
281 NodeId master = mastershipService.getMasterFor(deviceId);
282 if (master == null) {
283 log.warn("No master for {}", deviceId);
284 return Collections.emptySet();
285 }
286
287 if (Objects.equal(local, master)) {
288 return getPreviousStatisticInternal(connectPoint);
289 } else {
290 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
291 connectPoint,
292 GET_PREVIOUS,
293 SERIALIZER::encode,
294 SERIALIZER::decode,
295 master),
296 STATISTIC_STORE_TIMEOUT_MILLIS,
297 TimeUnit.MILLISECONDS,
298 Collections.emptySet());
299 }
300 }
301
302 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
303 return previous.get(connectPoint);
304 }
305
306 private ConnectPoint buildConnectPoint(FlowRule rule) {
307 PortNumber port = getOutput(rule);
308
309 if (port == null) {
310 return null;
311 }
312 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
313 return cp;
314 }
315
316 private PortNumber getOutput(FlowRule rule) {
317 for (Instruction i : rule.treatment().allInstructions()) {
318 if (i.type() == Instruction.Type.OUTPUT) {
319 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
320 return out.port();
321 }
Madan Jampanic27b6b22016-02-05 11:36:31 -0800322 }
323 return null;
324 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900325
326 /**
327 * Sets thread pool size of message handler.
328 *
329 * @param poolSize
330 */
331 private void setMessageHandlerThreadPoolSize(int poolSize) {
332 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
333 messageHandlerThreadPoolSize = poolSize;
334 }
335
336 /**
337 * Restarts thread pool of message handler.
338 */
339 private void restartMessageHandlerThreadPool() {
340 ExecutorService prevExecutor = messageHandlingExecutor;
341 messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
342 prevExecutor.shutdown();
343 }
344
345 /**
346 * Gets current thread pool size of message handler.
347 *
348 * @return messageHandlerThreadPoolSize
349 */
350 private int getMessageHandlerThreadPoolSize() {
351 return messageHandlerThreadPoolSize;
352 }
Sho SHIMIZU57f2efd2016-02-24 12:20:05 -0800353}