blob: cc14eba70b92eeda594c10aee3e384ebc1c52808 [file] [log] [blame]
Madan Jampanic27b6b22016-02-05 11:36:31 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.statistic.impl;
18
19import com.google.common.base.Objects;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.util.KryoNamespace;
27import org.onlab.util.Tools;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.mastership.MastershipService;
31import org.onosproject.net.ConnectPoint;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.PortNumber;
34import org.onosproject.net.flow.FlowEntry;
35import org.onosproject.net.flow.FlowRule;
36import org.onosproject.net.flow.instructions.Instruction;
37import org.onosproject.net.flow.instructions.Instructions;
38import org.onosproject.net.statistic.FlowStatisticStore;
39import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
40import org.onosproject.store.serializers.KryoNamespaces;
41import org.onosproject.store.serializers.KryoSerializer;
42import org.slf4j.Logger;
43
44import java.util.Collections;
45import java.util.HashSet;
46import java.util.Map;
47import java.util.Optional;
48import java.util.Set;
49import java.util.concurrent.ConcurrentHashMap;
50import java.util.concurrent.ExecutorService;
51import java.util.concurrent.Executors;
52import java.util.concurrent.TimeUnit;
53
54import static org.onlab.util.Tools.groupedThreads;
55import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
56import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
57import static org.slf4j.LoggerFactory.getLogger;
58
59/**
60 * Maintains flow statistics using RPC calls to collect stats from remote instances
61 * on demand.
62 */
63@Component(immediate = true)
64@Service
65public class DistributedFlowStatisticStore implements FlowStatisticStore {
66 private final Logger log = getLogger(getClass());
67
68 // TODO: Make configurable.
69 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected MastershipService mastershipService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterCommunicationService clusterCommunicator;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterService clusterService;
79
80 private Map<ConnectPoint, Set<FlowEntry>> previous =
81 new ConcurrentHashMap<>();
82
83 private Map<ConnectPoint, Set<FlowEntry>> current =
84 new ConcurrentHashMap<>();
85
86 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
87 @Override
88 protected void setupKryoPool() {
89 serializerPool = KryoNamespace.newBuilder()
90 .register(KryoNamespaces.API)
91 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
92 // register this store specific classes here
93 .build();
94 }
95 };
96
97 private NodeId local;
98 private ExecutorService messageHandlingExecutor;
99
100 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
101
102 @Activate
103 public void activate() {
104 local = clusterService.getLocalNode().id();
105
106 messageHandlingExecutor = Executors.newFixedThreadPool(
107 MESSAGE_HANDLER_THREAD_POOL_SIZE,
108 groupedThreads("onos/store/statistic", "message-handlers"));
109
110 clusterCommunicator.addSubscriber(
111 GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
112 messageHandlingExecutor);
113
114 clusterCommunicator.addSubscriber(
115 GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
116 messageHandlingExecutor);
117
118 log.info("Started");
119 }
120
121 @Deactivate
122 public void deactivate() {
123 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
124 clusterCommunicator.removeSubscriber(GET_CURRENT);
125 messageHandlingExecutor.shutdown();
126 log.info("Stopped");
127 }
128
129 @Override
130 public synchronized void removeFlowStatistic(FlowRule rule) {
131 ConnectPoint cp = buildConnectPoint(rule);
132 if (cp == null) {
133 return;
134 }
135
136 // remove this rule if present from current map
137 current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
138
139 // remove this on if present from previous map
140 previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
141 }
142
143 @Override
144 public synchronized void addFlowStatistic(FlowEntry rule) {
145 ConnectPoint cp = buildConnectPoint(rule);
146 if (cp == null) {
147 return;
148 }
149
150 // create one if absent and add this rule
151 current.putIfAbsent(cp, new HashSet<>());
152 current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
153
154 // remove previous one if present
155 previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
156 }
157
158 public synchronized void updateFlowStatistic(FlowEntry rule) {
159 ConnectPoint cp = buildConnectPoint(rule);
160 if (cp == null) {
161 return;
162 }
163
164 Set<FlowEntry> curr = current.get(cp);
165 if (curr == null) {
166 addFlowStatistic(rule);
167 } else {
168 Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
169 findAny();
170 if (f.isPresent() && rule.bytes() < f.get().bytes()) {
171 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
172 " Invalid Flow Update! Will be removed!!" +
173 " curr flowId=" + Long.toHexString(rule.id().value()) +
174 ", prev flowId=" + Long.toHexString(f.get().id().value()) +
175 ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
176 ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
177 ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
178 // something is wrong! invalid flow entry, so delete it
179 removeFlowStatistic(rule);
180 return;
181 }
182 Set<FlowEntry> prev = previous.get(cp);
183 if (prev == null) {
184 prev = new HashSet<>();
185 previous.put(cp, prev);
186 }
187
188 // previous one is exist
189 if (f.isPresent()) {
190 // remove old one and add new one
191 prev.remove(rule);
192 if (!prev.add(f.get())) {
193 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
194 " flowId={}, add failed into previous.",
195 Long.toHexString(rule.id().value()));
196 }
197 }
198
199 // remove old one and add new one
200 curr.remove(rule);
201 if (!curr.add(rule)) {
202 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
203 " flowId={}, add failed into current.",
204 Long.toHexString(rule.id().value()));
205 }
206 }
207 }
208
209 @Override
210 public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
211 final DeviceId deviceId = connectPoint.deviceId();
212
213 NodeId master = mastershipService.getMasterFor(deviceId);
214 if (master == null) {
215 log.warn("No master for {}", deviceId);
216 return Collections.emptySet();
217 }
218
219 if (Objects.equal(local, master)) {
220 return getCurrentStatisticInternal(connectPoint);
221 } else {
222 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
223 connectPoint,
224 GET_CURRENT,
225 SERIALIZER::encode,
226 SERIALIZER::decode,
227 master),
228 STATISTIC_STORE_TIMEOUT_MILLIS,
229 TimeUnit.MILLISECONDS,
230 Collections.emptySet());
231 }
232 }
233
234 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
235 return current.get(connectPoint);
236 }
237
238 @Override
239 public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
240 final DeviceId deviceId = connectPoint.deviceId();
241
242 NodeId master = mastershipService.getMasterFor(deviceId);
243 if (master == null) {
244 log.warn("No master for {}", deviceId);
245 return Collections.emptySet();
246 }
247
248 if (Objects.equal(local, master)) {
249 return getPreviousStatisticInternal(connectPoint);
250 } else {
251 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
252 connectPoint,
253 GET_PREVIOUS,
254 SERIALIZER::encode,
255 SERIALIZER::decode,
256 master),
257 STATISTIC_STORE_TIMEOUT_MILLIS,
258 TimeUnit.MILLISECONDS,
259 Collections.emptySet());
260 }
261 }
262
263 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
264 return previous.get(connectPoint);
265 }
266
267 private ConnectPoint buildConnectPoint(FlowRule rule) {
268 PortNumber port = getOutput(rule);
269
270 if (port == null) {
271 return null;
272 }
273 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
274 return cp;
275 }
276
277 private PortNumber getOutput(FlowRule rule) {
278 for (Instruction i : rule.treatment().allInstructions()) {
279 if (i.type() == Instruction.Type.OUTPUT) {
280 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
281 return out.port();
282 }
283 if (i.type() == Instruction.Type.DROP) {
284 return PortNumber.P0;
285 }
286 }
287 return null;
288 }
ssyoon90a98825a2015-08-26 00:48:15 +0900289}