blob: f8e7c71e9cd5d9c205ae6d0b7244d41b657d370b [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.statistic.impl;
18
19import com.google.common.base.Objects;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090023import org.apache.felix.scr.annotations.Modified;
24import org.apache.felix.scr.annotations.Property;
Madan Jampanic27b6b22016-02-05 11:36:31 -080025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
Madan Jampanic27b6b22016-02-05 11:36:31 -080028import org.onlab.util.Tools;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.ConnectPoint;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.PortNumber;
35import org.onosproject.net.flow.FlowEntry;
36import org.onosproject.net.flow.FlowRule;
37import org.onosproject.net.flow.instructions.Instruction;
38import org.onosproject.net.flow.instructions.Instructions;
39import org.onosproject.net.statistic.FlowStatisticStore;
40import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
41import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070042import org.onosproject.store.serializers.StoreSerializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090043import org.osgi.service.component.ComponentContext;
Madan Jampanic27b6b22016-02-05 11:36:31 -080044import org.slf4j.Logger;
45
46import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090047import java.util.Dictionary;
Madan Jampanic27b6b22016-02-05 11:36:31 -080048import java.util.HashSet;
49import java.util.Map;
50import java.util.Optional;
sangyun-hanad84e0c2016-02-19 18:30:03 +090051import java.util.Properties;
Madan Jampanic27b6b22016-02-05 11:36:31 -080052import java.util.Set;
53import java.util.concurrent.ConcurrentHashMap;
54import java.util.concurrent.ExecutorService;
55import java.util.concurrent.Executors;
56import java.util.concurrent.TimeUnit;
57
sangyun-hanad84e0c2016-02-19 18:30:03 +090058import static com.google.common.base.Preconditions.checkArgument;
59import static com.google.common.base.Strings.isNullOrEmpty;
60import static org.onlab.util.Tools.get;
Madan Jampanic27b6b22016-02-05 11:36:31 -080061import static org.onlab.util.Tools.groupedThreads;
62import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
63import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
64import static org.slf4j.LoggerFactory.getLogger;
65
66/**
67 * Maintains flow statistics using RPC calls to collect stats from remote instances
68 * on demand.
69 */
70@Component(immediate = true)
71@Service
72public class DistributedFlowStatisticStore implements FlowStatisticStore {
73 private final Logger log = getLogger(getClass());
74
sangyun-hanad84e0c2016-02-19 18:30:03 +090075 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampanic27b6b22016-02-05 11:36:31 -080076
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected MastershipService mastershipService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected ClusterCommunicationService clusterCommunicator;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected ClusterService clusterService;
85
86 private Map<ConnectPoint, Set<FlowEntry>> previous =
87 new ConcurrentHashMap<>();
88
89 private Map<ConnectPoint, Set<FlowEntry>> current =
90 new ConcurrentHashMap<>();
91
HIGUCHI Yutae7290652016-05-18 11:29:01 -070092 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
Madan Jampanic27b6b22016-02-05 11:36:31 -080093
94 private NodeId local;
95 private ExecutorService messageHandlingExecutor;
96
sangyun-hanad84e0c2016-02-19 18:30:03 +090097 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
98 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
99 label = "Size of thread pool to assign message handler")
100 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
101
102
Madan Jampanic27b6b22016-02-05 11:36:31 -0800103 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
104
105 @Activate
106 public void activate() {
107 local = clusterService.getLocalNode().id();
108
109 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900110 messageHandlerThreadPoolSize,
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800111 groupedThreads("onos/store/statistic", "message-handlers", log));
Madan Jampanic27b6b22016-02-05 11:36:31 -0800112
113 clusterCommunicator.addSubscriber(
114 GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
115 messageHandlingExecutor);
116
117 clusterCommunicator.addSubscriber(
118 GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
119 messageHandlingExecutor);
120
121 log.info("Started");
122 }
123
124 @Deactivate
125 public void deactivate() {
126 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
127 clusterCommunicator.removeSubscriber(GET_CURRENT);
128 messageHandlingExecutor.shutdown();
129 log.info("Stopped");
130 }
131
sangyun-hanad84e0c2016-02-19 18:30:03 +0900132 @Modified
133 public void modified(ComponentContext context) {
134 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
135
136 int newMessageHandlerThreadPoolSize;
137
138 try {
139 String s = get(properties, "messageHandlerThreadPoolSize");
140
141 newMessageHandlerThreadPoolSize =
142 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
143
144 } catch (NumberFormatException e) {
145 log.warn(e.getMessage());
146 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
147 }
148
149 // Any change in the following parameters implies thread pool restart
150 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
151 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
152 restartMessageHandlerThreadPool();
153 }
154
155 log.info(FORMAT, messageHandlerThreadPoolSize);
156 }
157
Madan Jampanic27b6b22016-02-05 11:36:31 -0800158 @Override
159 public synchronized void removeFlowStatistic(FlowRule rule) {
160 ConnectPoint cp = buildConnectPoint(rule);
161 if (cp == null) {
162 return;
163 }
164
165 // remove this rule if present from current map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900166 current.computeIfPresent(cp, (c, e) -> {
167 e.remove(rule);
168 return e;
169 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800170
171 // remove this on if present from previous map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900172 previous.computeIfPresent(cp, (c, e) -> {
173 e.remove(rule);
174 return e;
175 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800176 }
177
178 @Override
179 public synchronized void addFlowStatistic(FlowEntry rule) {
180 ConnectPoint cp = buildConnectPoint(rule);
181 if (cp == null) {
182 return;
183 }
184
185 // create one if absent and add this rule
186 current.putIfAbsent(cp, new HashSet<>());
187 current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
188
189 // remove previous one if present
190 previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
191 }
192
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800193 @Override
Madan Jampanic27b6b22016-02-05 11:36:31 -0800194 public synchronized void updateFlowStatistic(FlowEntry rule) {
195 ConnectPoint cp = buildConnectPoint(rule);
196 if (cp == null) {
197 return;
198 }
199
200 Set<FlowEntry> curr = current.get(cp);
201 if (curr == null) {
202 addFlowStatistic(rule);
203 } else {
204 Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
205 findAny();
206 if (f.isPresent() && rule.bytes() < f.get().bytes()) {
207 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
208 " Invalid Flow Update! Will be removed!!" +
209 " curr flowId=" + Long.toHexString(rule.id().value()) +
210 ", prev flowId=" + Long.toHexString(f.get().id().value()) +
211 ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
212 ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
213 ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
214 // something is wrong! invalid flow entry, so delete it
215 removeFlowStatistic(rule);
216 return;
217 }
218 Set<FlowEntry> prev = previous.get(cp);
219 if (prev == null) {
220 prev = new HashSet<>();
221 previous.put(cp, prev);
222 }
223
224 // previous one is exist
225 if (f.isPresent()) {
226 // remove old one and add new one
227 prev.remove(rule);
228 if (!prev.add(f.get())) {
229 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
230 " flowId={}, add failed into previous.",
231 Long.toHexString(rule.id().value()));
232 }
233 }
234
235 // remove old one and add new one
236 curr.remove(rule);
237 if (!curr.add(rule)) {
238 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
239 " flowId={}, add failed into current.",
240 Long.toHexString(rule.id().value()));
241 }
242 }
243 }
244
245 @Override
246 public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
247 final DeviceId deviceId = connectPoint.deviceId();
248
249 NodeId master = mastershipService.getMasterFor(deviceId);
250 if (master == null) {
251 log.warn("No master for {}", deviceId);
252 return Collections.emptySet();
253 }
254
255 if (Objects.equal(local, master)) {
256 return getCurrentStatisticInternal(connectPoint);
257 } else {
258 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
259 connectPoint,
260 GET_CURRENT,
261 SERIALIZER::encode,
262 SERIALIZER::decode,
263 master),
264 STATISTIC_STORE_TIMEOUT_MILLIS,
265 TimeUnit.MILLISECONDS,
266 Collections.emptySet());
267 }
268 }
269
270 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
271 return current.get(connectPoint);
272 }
273
274 @Override
275 public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
276 final DeviceId deviceId = connectPoint.deviceId();
277
278 NodeId master = mastershipService.getMasterFor(deviceId);
279 if (master == null) {
280 log.warn("No master for {}", deviceId);
281 return Collections.emptySet();
282 }
283
284 if (Objects.equal(local, master)) {
285 return getPreviousStatisticInternal(connectPoint);
286 } else {
287 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
288 connectPoint,
289 GET_PREVIOUS,
290 SERIALIZER::encode,
291 SERIALIZER::decode,
292 master),
293 STATISTIC_STORE_TIMEOUT_MILLIS,
294 TimeUnit.MILLISECONDS,
295 Collections.emptySet());
296 }
297 }
298
299 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
300 return previous.get(connectPoint);
301 }
302
303 private ConnectPoint buildConnectPoint(FlowRule rule) {
304 PortNumber port = getOutput(rule);
305
306 if (port == null) {
307 return null;
308 }
309 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
310 return cp;
311 }
312
313 private PortNumber getOutput(FlowRule rule) {
314 for (Instruction i : rule.treatment().allInstructions()) {
315 if (i.type() == Instruction.Type.OUTPUT) {
316 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
317 return out.port();
318 }
Madan Jampanic27b6b22016-02-05 11:36:31 -0800319 }
320 return null;
321 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900322
323 /**
324 * Sets thread pool size of message handler.
325 *
326 * @param poolSize
327 */
328 private void setMessageHandlerThreadPoolSize(int poolSize) {
329 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
330 messageHandlerThreadPoolSize = poolSize;
331 }
332
333 /**
334 * Restarts thread pool of message handler.
335 */
336 private void restartMessageHandlerThreadPool() {
337 ExecutorService prevExecutor = messageHandlingExecutor;
338 messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
339 prevExecutor.shutdown();
340 }
341
342 /**
343 * Gets current thread pool size of message handler.
344 *
345 * @return messageHandlerThreadPoolSize
346 */
347 private int getMessageHandlerThreadPoolSize() {
348 return messageHandlerThreadPoolSize;
349 }
Sho SHIMIZU57f2efd2016-02-24 12:20:05 -0800350}