blob: 40c3e2dab2c4331113c6aff669b1eda10c9b8d81 [file] [log] [blame]
ssyoon90a98825a2015-08-26 00:48:15 +09001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.statistic.impl;
18
19import com.google.common.base.Objects;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.util.KryoNamespace;
27import org.onlab.util.Tools;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.mastership.MastershipService;
31import org.onosproject.net.ConnectPoint;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.PortNumber;
34import org.onosproject.net.flow.FlowEntry;
35import org.onosproject.net.flow.FlowRule;
36import org.onosproject.net.flow.instructions.Instruction;
37import org.onosproject.net.flow.instructions.Instructions;
38import org.onosproject.net.statistic.FlowStatisticStore;
39import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
40import org.onosproject.store.serializers.KryoNamespaces;
41import org.onosproject.store.serializers.KryoSerializer;
42import org.slf4j.Logger;
43
44import java.util.Collections;
45import java.util.HashSet;
46import java.util.Map;
47import java.util.Set;
48import java.util.concurrent.ConcurrentHashMap;
49import java.util.concurrent.ExecutorService;
50import java.util.concurrent.Executors;
51import java.util.concurrent.TimeUnit;
52
53import static org.onlab.util.Tools.groupedThreads;
54import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
55import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
56import static org.slf4j.LoggerFactory.getLogger;
57
58/**
59 * Maintains flow statistics using RPC calls to collect stats from remote instances
60 * on demand.
61 */
62@Component(immediate = true)
63@Service
64public class DistributedFlowStatisticStore implements FlowStatisticStore {
65 private final Logger log = getLogger(getClass());
66
67 // TODO: Make configurable.
68 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected MastershipService mastershipService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected ClusterCommunicationService clusterCommunicator;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected ClusterService clusterService;
78
79 private Map<ConnectPoint, Set<FlowEntry>> previous =
80 new ConcurrentHashMap<>();
81
82 private Map<ConnectPoint, Set<FlowEntry>> current =
83 new ConcurrentHashMap<>();
84
85 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
86 @Override
87 protected void setupKryoPool() {
88 serializerPool = KryoNamespace.newBuilder()
89 .register(KryoNamespaces.API)
90 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
91 // register this store specific classes here
92 .build();
93 }
94 };
95
96 private NodeId local;
97 private ExecutorService messageHandlingExecutor;
98
99 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
100
101 @Activate
102 public void activate() {
103 local = clusterService.getLocalNode().id();
104
105 messageHandlingExecutor = Executors.newFixedThreadPool(
106 MESSAGE_HANDLER_THREAD_POOL_SIZE,
107 groupedThreads("onos/store/statistic", "message-handlers"));
108
109 clusterCommunicator.addSubscriber(
110 GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
111 messageHandlingExecutor);
112
113 clusterCommunicator.addSubscriber(
114 GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
115 messageHandlingExecutor);
116
117 log.info("Started");
118 }
119
120 @Deactivate
121 public void deactivate() {
122 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
123 clusterCommunicator.removeSubscriber(GET_CURRENT);
124 messageHandlingExecutor.shutdown();
125 log.info("Stopped");
126 }
127
128 @Override
129 public synchronized void removeFlowStatistic(FlowRule rule) {
130 ConnectPoint cp = buildConnectPoint(rule);
131 if (cp == null) {
132 return;
133 }
134
135 // remove this rule if present from current map
136 current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
137
138 // remove this on if present from previous map
139 previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
140 }
141
142 @Override
143 public synchronized void addFlowStatistic(FlowEntry rule) {
144 ConnectPoint cp = buildConnectPoint(rule);
145 if (cp == null) {
146 return;
147 }
148
149 // create one if absent and add this rule
150 current.putIfAbsent(cp, new HashSet<>());
151 current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
152
153 // remove previous one if present
154 previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
155 }
156
157 public synchronized void updateFlowStatistic(FlowEntry rule) {
158 ConnectPoint cp = buildConnectPoint(rule);
159 if (cp == null) {
160 return;
161 }
162
163 Set<FlowEntry> curr = current.get(cp);
164 if (curr == null) {
165 addFlowStatistic(rule);
166 } else {
167 FlowEntry f = curr.stream().filter(c -> rule.equals(c)).
168 findAny().orElse(null);
169 if (rule.bytes() < f.bytes()) {
170 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
171 " Invalid Flow Update! Will be removed!!" +
172 " curr flowId=" + Long.toHexString(rule.id().value()) +
173 ", prev flowId=" + Long.toHexString(f.id().value()) +
174 ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.bytes() +
175 ", curr life=" + rule.life() + ", prev life=" + f.life() +
176 ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.lastSeen());
177 // something is wrong! invalid flow entry, so delete it
178 removeFlowStatistic(rule);
179 return;
180 }
181 Set<FlowEntry> prev = previous.get(cp);
182 if (prev == null) {
183 prev = new HashSet<>();
184 previous.put(cp, prev);
185 }
186
187 // previous one is exist
188 if (f != null) {
189 // remove old one and add new one
190 prev.remove(rule);
191 if (!prev.add(f)) {
192 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
193 " flowId={}, add failed into previous.",
194 Long.toHexString(rule.id().value()));
195 }
196 }
197
198 // remove old one and add new one
199 curr.remove(rule);
200 if (!curr.add(rule)) {
201 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
202 " flowId={}, add failed into current.",
203 Long.toHexString(rule.id().value()));
204 }
205 }
206 }
207
208 @Override
209 public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
210 final DeviceId deviceId = connectPoint.deviceId();
211
212 NodeId master = mastershipService.getMasterFor(deviceId);
213 if (master == null) {
214 log.warn("No master for {}", deviceId);
215 return Collections.emptySet();
216 }
217
218 if (Objects.equal(local, master)) {
219 return getCurrentStatisticInternal(connectPoint);
220 } else {
221 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
222 connectPoint,
223 GET_CURRENT,
224 SERIALIZER::encode,
225 SERIALIZER::decode,
226 master),
227 STATISTIC_STORE_TIMEOUT_MILLIS,
228 TimeUnit.MILLISECONDS,
229 Collections.emptySet());
230 }
231 }
232
233 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
234 return current.get(connectPoint);
235 }
236
237 @Override
238 public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
239 final DeviceId deviceId = connectPoint.deviceId();
240
241 NodeId master = mastershipService.getMasterFor(deviceId);
242 if (master == null) {
243 log.warn("No master for {}", deviceId);
244 return Collections.emptySet();
245 }
246
247 if (Objects.equal(local, master)) {
248 return getPreviousStatisticInternal(connectPoint);
249 } else {
250 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
251 connectPoint,
252 GET_PREVIOUS,
253 SERIALIZER::encode,
254 SERIALIZER::decode,
255 master),
256 STATISTIC_STORE_TIMEOUT_MILLIS,
257 TimeUnit.MILLISECONDS,
258 Collections.emptySet());
259 }
260 }
261
262 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
263 return previous.get(connectPoint);
264 }
265
266 private ConnectPoint buildConnectPoint(FlowRule rule) {
267 PortNumber port = getOutput(rule);
268
269 if (port == null) {
270 return null;
271 }
272 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
273 return cp;
274 }
275
276 private PortNumber getOutput(FlowRule rule) {
277 for (Instruction i : rule.treatment().allInstructions()) {
278 if (i.type() == Instruction.Type.OUTPUT) {
279 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
280 return out.port();
281 }
282 if (i.type() == Instruction.Type.DROP) {
283 return PortNumber.P0;
284 }
285 }
286 return null;
287 }
288}