blob: dd28810063ff359b12c246821b9c6a0fc3bd1b79 [file] [log] [blame]
Madan Jampanic27b6b22016-02-05 11:36:31 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.statistic.impl;
18
19import com.google.common.base.Objects;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090023import org.apache.felix.scr.annotations.Modified;
24import org.apache.felix.scr.annotations.Property;
Madan Jampanic27b6b22016-02-05 11:36:31 -080025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onlab.util.KryoNamespace;
29import org.onlab.util.Tools;
30import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.FlowEntry;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.instructions.Instruction;
39import org.onosproject.net.flow.instructions.Instructions;
40import org.onosproject.net.statistic.FlowStatisticStore;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.serializers.KryoSerializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090044import org.osgi.service.component.ComponentContext;
Madan Jampanic27b6b22016-02-05 11:36:31 -080045import org.slf4j.Logger;
46
47import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090048import java.util.Dictionary;
Madan Jampanic27b6b22016-02-05 11:36:31 -080049import java.util.HashSet;
50import java.util.Map;
51import java.util.Optional;
sangyun-hanad84e0c2016-02-19 18:30:03 +090052import java.util.Properties;
Madan Jampanic27b6b22016-02-05 11:36:31 -080053import java.util.Set;
54import java.util.concurrent.ConcurrentHashMap;
55import java.util.concurrent.ExecutorService;
56import java.util.concurrent.Executors;
57import java.util.concurrent.TimeUnit;
58
sangyun-hanad84e0c2016-02-19 18:30:03 +090059import static com.google.common.base.Preconditions.checkArgument;
60import static com.google.common.base.Strings.isNullOrEmpty;
61import static org.onlab.util.Tools.get;
Madan Jampanic27b6b22016-02-05 11:36:31 -080062import static org.onlab.util.Tools.groupedThreads;
63import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
64import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
65import static org.slf4j.LoggerFactory.getLogger;
66
67/**
68 * Maintains flow statistics using RPC calls to collect stats from remote instances
69 * on demand.
70 */
71@Component(immediate = true)
72@Service
73public class DistributedFlowStatisticStore implements FlowStatisticStore {
74 private final Logger log = getLogger(getClass());
75
sangyun-hanad84e0c2016-02-19 18:30:03 +090076 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampanic27b6b22016-02-05 11:36:31 -080077
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected MastershipService mastershipService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected ClusterCommunicationService clusterCommunicator;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ClusterService clusterService;
86
87 private Map<ConnectPoint, Set<FlowEntry>> previous =
88 new ConcurrentHashMap<>();
89
90 private Map<ConnectPoint, Set<FlowEntry>> current =
91 new ConcurrentHashMap<>();
92
93 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
94 @Override
95 protected void setupKryoPool() {
96 serializerPool = KryoNamespace.newBuilder()
97 .register(KryoNamespaces.API)
98 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
99 // register this store specific classes here
100 .build();
101 }
102 };
103
104 private NodeId local;
105 private ExecutorService messageHandlingExecutor;
106
sangyun-hanad84e0c2016-02-19 18:30:03 +0900107 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
108 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
109 label = "Size of thread pool to assign message handler")
110 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
111
112
Madan Jampanic27b6b22016-02-05 11:36:31 -0800113 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
114
115 @Activate
116 public void activate() {
117 local = clusterService.getLocalNode().id();
118
119 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900120 messageHandlerThreadPoolSize,
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800121 groupedThreads("onos/store/statistic", "message-handlers", log));
Madan Jampanic27b6b22016-02-05 11:36:31 -0800122
123 clusterCommunicator.addSubscriber(
124 GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
125 messageHandlingExecutor);
126
127 clusterCommunicator.addSubscriber(
128 GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
129 messageHandlingExecutor);
130
131 log.info("Started");
132 }
133
134 @Deactivate
135 public void deactivate() {
136 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
137 clusterCommunicator.removeSubscriber(GET_CURRENT);
138 messageHandlingExecutor.shutdown();
139 log.info("Stopped");
140 }
141
sangyun-hanad84e0c2016-02-19 18:30:03 +0900142 @Modified
143 public void modified(ComponentContext context) {
144 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
145
146 int newMessageHandlerThreadPoolSize;
147
148 try {
149 String s = get(properties, "messageHandlerThreadPoolSize");
150
151 newMessageHandlerThreadPoolSize =
152 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
153
154 } catch (NumberFormatException e) {
155 log.warn(e.getMessage());
156 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
157 }
158
159 // Any change in the following parameters implies thread pool restart
160 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
161 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
162 restartMessageHandlerThreadPool();
163 }
164
165 log.info(FORMAT, messageHandlerThreadPoolSize);
166 }
167
Madan Jampanic27b6b22016-02-05 11:36:31 -0800168 @Override
169 public synchronized void removeFlowStatistic(FlowRule rule) {
170 ConnectPoint cp = buildConnectPoint(rule);
171 if (cp == null) {
172 return;
173 }
174
175 // remove this rule if present from current map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900176 current.computeIfPresent(cp, (c, e) -> {
177 e.remove(rule);
178 return e;
179 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800180
181 // remove this on if present from previous map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900182 previous.computeIfPresent(cp, (c, e) -> {
183 e.remove(rule);
184 return e;
185 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800186 }
187
188 @Override
189 public synchronized void addFlowStatistic(FlowEntry rule) {
190 ConnectPoint cp = buildConnectPoint(rule);
191 if (cp == null) {
192 return;
193 }
194
195 // create one if absent and add this rule
196 current.putIfAbsent(cp, new HashSet<>());
197 current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
198
199 // remove previous one if present
200 previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
201 }
202
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800203 @Override
Madan Jampanic27b6b22016-02-05 11:36:31 -0800204 public synchronized void updateFlowStatistic(FlowEntry rule) {
205 ConnectPoint cp = buildConnectPoint(rule);
206 if (cp == null) {
207 return;
208 }
209
210 Set<FlowEntry> curr = current.get(cp);
211 if (curr == null) {
212 addFlowStatistic(rule);
213 } else {
214 Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
215 findAny();
216 if (f.isPresent() && rule.bytes() < f.get().bytes()) {
217 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
218 " Invalid Flow Update! Will be removed!!" +
219 " curr flowId=" + Long.toHexString(rule.id().value()) +
220 ", prev flowId=" + Long.toHexString(f.get().id().value()) +
221 ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
222 ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
223 ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
224 // something is wrong! invalid flow entry, so delete it
225 removeFlowStatistic(rule);
226 return;
227 }
228 Set<FlowEntry> prev = previous.get(cp);
229 if (prev == null) {
230 prev = new HashSet<>();
231 previous.put(cp, prev);
232 }
233
234 // previous one is exist
235 if (f.isPresent()) {
236 // remove old one and add new one
237 prev.remove(rule);
238 if (!prev.add(f.get())) {
239 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
240 " flowId={}, add failed into previous.",
241 Long.toHexString(rule.id().value()));
242 }
243 }
244
245 // remove old one and add new one
246 curr.remove(rule);
247 if (!curr.add(rule)) {
248 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
249 " flowId={}, add failed into current.",
250 Long.toHexString(rule.id().value()));
251 }
252 }
253 }
254
255 @Override
256 public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
257 final DeviceId deviceId = connectPoint.deviceId();
258
259 NodeId master = mastershipService.getMasterFor(deviceId);
260 if (master == null) {
261 log.warn("No master for {}", deviceId);
262 return Collections.emptySet();
263 }
264
265 if (Objects.equal(local, master)) {
266 return getCurrentStatisticInternal(connectPoint);
267 } else {
268 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
269 connectPoint,
270 GET_CURRENT,
271 SERIALIZER::encode,
272 SERIALIZER::decode,
273 master),
274 STATISTIC_STORE_TIMEOUT_MILLIS,
275 TimeUnit.MILLISECONDS,
276 Collections.emptySet());
277 }
278 }
279
280 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
281 return current.get(connectPoint);
282 }
283
284 @Override
285 public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
286 final DeviceId deviceId = connectPoint.deviceId();
287
288 NodeId master = mastershipService.getMasterFor(deviceId);
289 if (master == null) {
290 log.warn("No master for {}", deviceId);
291 return Collections.emptySet();
292 }
293
294 if (Objects.equal(local, master)) {
295 return getPreviousStatisticInternal(connectPoint);
296 } else {
297 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
298 connectPoint,
299 GET_PREVIOUS,
300 SERIALIZER::encode,
301 SERIALIZER::decode,
302 master),
303 STATISTIC_STORE_TIMEOUT_MILLIS,
304 TimeUnit.MILLISECONDS,
305 Collections.emptySet());
306 }
307 }
308
309 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
310 return previous.get(connectPoint);
311 }
312
313 private ConnectPoint buildConnectPoint(FlowRule rule) {
314 PortNumber port = getOutput(rule);
315
316 if (port == null) {
317 return null;
318 }
319 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
320 return cp;
321 }
322
323 private PortNumber getOutput(FlowRule rule) {
324 for (Instruction i : rule.treatment().allInstructions()) {
325 if (i.type() == Instruction.Type.OUTPUT) {
326 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
327 return out.port();
328 }
Madan Jampanic27b6b22016-02-05 11:36:31 -0800329 }
330 return null;
331 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900332
333 /**
334 * Sets thread pool size of message handler.
335 *
336 * @param poolSize
337 */
338 private void setMessageHandlerThreadPoolSize(int poolSize) {
339 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
340 messageHandlerThreadPoolSize = poolSize;
341 }
342
343 /**
344 * Restarts thread pool of message handler.
345 */
346 private void restartMessageHandlerThreadPool() {
347 ExecutorService prevExecutor = messageHandlingExecutor;
348 messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
349 prevExecutor.shutdown();
350 }
351
352 /**
353 * Gets current thread pool size of message handler.
354 *
355 * @return messageHandlerThreadPoolSize
356 */
357 private int getMessageHandlerThreadPoolSize() {
358 return messageHandlerThreadPoolSize;
359 }
Sho SHIMIZU57f2efd2016-02-24 12:20:05 -0800360}