blob: 23cb8b838eb7c48f610d1eaa5cca44671b1fa062 [file] [log] [blame]
// Sangsik Yoon f0b3ad8 2016-08-19 18:47:59 +0900
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.net.dpi.impl;
18
19import com.fasterxml.jackson.databind.ObjectMapper;
20
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.incubator.net.dpi.DpiStatInfo;
31import org.onosproject.incubator.net.dpi.DpiStatistics;
32import org.onosproject.incubator.net.dpi.DpiStatisticsManagerService;
33import org.onosproject.incubator.net.dpi.FlowStatInfo;
34import org.onosproject.incubator.net.dpi.ProtocolStatInfo;
35import org.onosproject.incubator.net.dpi.TrafficStatInfo;
36import org.osgi.service.component.ComponentContext;
37import org.slf4j.Logger;
38
39import java.io.BufferedReader;
40import java.io.IOException;
41import java.io.InputStreamReader;
42import java.io.PrintWriter;
43import java.net.ServerSocket;
44import java.net.Socket;
45import java.text.ParseException;
46import java.text.SimpleDateFormat;
47import java.util.ArrayList;
48import java.util.Collections;
49import java.util.Comparator;
50import java.util.Date;
51import java.util.List;
52import java.util.Locale;
53import java.util.SortedMap;
54import java.util.TimeZone;
55import java.util.TreeMap;
56import java.util.concurrent.ExecutorService;
57import java.util.concurrent.Executors;
58
59import static java.lang.Thread.sleep;
60import static org.onlab.util.Tools.groupedThreads;
61import static org.slf4j.LoggerFactory.getLogger;
62
63/**
64 * DPI Statistics Manager.
65 */
66@Component(immediate = true)
67@Service
68public class DpiStatisticsManager implements DpiStatisticsManagerService {
69
70 private static ServerSocket serverSocket;
71 private static int port = 11990; // socket server listening port
72
73 private final Logger log = getLogger(getClass());
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected CoreService coreService;
77
78 private ApplicationId appId;
79
80 private final ExecutorService dpiListenerThread =
81 Executors.newSingleThreadExecutor(groupedThreads("onos/apps/dpi", "dpi-listener"));
82
83 DpiStatisticsListener dpiStatisticsListener = null;
84
85 // 31*2(month)*24(hour)*3600(second)/5(second)
86 private static final int MAX_DPI_STATISTICS_ENTRY = 1071360;
87
88 private SortedMap<String, DpiStatistics> dpiStatisticsMap =
89 new TreeMap<>(new MapComparator());
90
91 private long convertTimeToLong(String timeString) {
92 long timeLong = 0;
93
94 try {
95 // Time format: yyyy-MM-dd HH:mm:ss, Time Zone: GMT
96 SimpleDateFormat df = new SimpleDateFormat(DATE_FMT, Locale.KOREA);
97 df.setTimeZone(TimeZone.getTimeZone(TIME_ZONE));
98
99 timeLong = df.parse(timeString).getTime();
100 } catch (ParseException e) {
101 log.error("Time parse error! Exception={}", e.toString());
102 }
103
104 return timeLong;
105 }
106
107 private static final String DATE_FMT = "yyyy-MM-dd HH:mm:ss";
108 private static final String TIME_ZONE = "GMT";
109
110 public static final int MAX_DPI_STATISTICS_REQUEST = 100;
111 public static final int MAX_DPI_STATISTICS_TOPN = 100;
112
113 @Activate
114 public void activate(ComponentContext context) {
115 appId = coreService.registerApplication("org.onosproject.dpi");
116
117 dpiStatisticsListener = new DpiStatisticsListener();
118 dpiListenerThread.execute(dpiStatisticsListener);
119
120 log.info("Started", appId.id());
121 }
122
123 @Deactivate
124 public void deactivate() {
125 log.info("Deactivated...");
126 dpiStatisticsListener.stop();
127 dpiListenerThread.shutdown();
128 log.info("Stopped");
129 }
130
131 @Override
132 public DpiStatistics getDpiStatisticsLatest() {
133 if (dpiStatisticsMap.size() > 0) {
134 return dpiStatisticsMap.get(dpiStatisticsMap.firstKey());
135 } else {
136 return null;
137 }
138 }
139
140 @Override
141 public DpiStatistics getDpiStatisticsLatest(int topnProtocols, int topnFlows) {
142 DpiStatistics ds, topnDs;
143
144 ds = getDpiStatisticsLatest();
145 topnDs = processTopn(ds, topnProtocols, topnFlows);
146
147 return topnDs;
148 }
149
150 @Override
151 public List<DpiStatistics> getDpiStatistics(int lastN) {
152 List<DpiStatistics> dsList = new ArrayList<>();
153 DpiStatistics ds;
154
155 if (lastN > MAX_DPI_STATISTICS_REQUEST) {
156 lastN = MAX_DPI_STATISTICS_REQUEST;
157 }
158
159 SortedMap tempMap = new TreeMap(new MapComparator());
160 tempMap.putAll(dpiStatisticsMap);
161
162 for (int i = 0; i < lastN && i < tempMap.size(); i++) {
163 ds = (DpiStatistics) tempMap.get(tempMap.firstKey());
164 dsList.add(i, new DpiStatistics(ds.receivedTime(), ds.dpiStatInfo()));
165
166 tempMap.remove(tempMap.firstKey());
167 }
168
169 return dsList;
170 }
171
172 @Override
173 public List<DpiStatistics> getDpiStatistics(int lastN, int topnProtocols, int topnFlows) {
174 List<DpiStatistics> dsList;
175 List<DpiStatistics> topnDsList = new ArrayList<>();
176 DpiStatistics ds, topnDs;
177
178 dsList = getDpiStatistics(lastN);
179 for (int i = 0; i < dsList.size(); i++) {
180 ds = dsList.get(i);
181 topnDs = processTopn(ds, topnProtocols, topnFlows);
182 topnDsList.add(i, topnDs);
183 }
184
185 return topnDsList;
186 }
187
188 @Override
189 public DpiStatistics getDpiStatistics(String receivedTime) {
190 DpiStatistics ds;
191
192 if (receivedTime == null) {
193 return null;
194 }
195
196 if (!dpiStatisticsMap.containsKey(receivedTime)) {
197 return null;
198 }
199
200 ds = dpiStatisticsMap.get(receivedTime);
201
202 return ds;
203 }
204
205 @Override
206 public DpiStatistics getDpiStatistics(String receivedTime, int topnProtocols, int topnFlows) {
207 DpiStatistics ds, topnDs;
208
209 ds = getDpiStatistics(receivedTime);
210
211 topnDs = processTopn(ds, topnProtocols, topnFlows);
212
213 return topnDs;
214 }
215
216 @Override
217 public DpiStatistics addDpiStatistics(DpiStatistics ds) {
218 if (ds == null) {
219 return ds;
220 }
221
222 // check the time. The firstKey is lastTime because of descending sorted order
223 if (dpiStatisticsMap.size() > 0) {
224 String lastTime = dpiStatisticsMap.get(dpiStatisticsMap.firstKey()).receivedTime();
225 String inputTime = ds.receivedTime();
226
227 long lastTimeLong = convertTimeToLong(lastTime);
228 long inputTimeLong = convertTimeToLong(inputTime);
229
230 if (lastTimeLong >= inputTimeLong) {
231 return null;
232 }
233 }
234
235 if (dpiStatisticsMap.size() >= MAX_DPI_STATISTICS_ENTRY) {
236 // remove the last (oldest) entry
237 dpiStatisticsMap.remove(dpiStatisticsMap.lastKey());
238 }
239
240 if (dpiStatisticsMap.containsKey(ds.receivedTime())) {
241 log.warn("addDpiStatistics(), {} dpiStatistics is already existing!",
242 ds.receivedTime());
243 return null;
244 }
245
246 dpiStatisticsMap.put(ds.receivedTime(), ds);
247 log.debug("addDpiStatistics: dpiResultJson data[time={}] is added " +
248 "into DpiStatisticsMap size={}.",
249 ds.receivedTime(), dpiStatisticsMap.size());
250
251 return ds;
252 }
253
254 private class MapComparator implements Comparator<String> {
255 @Override
256 public int compare(String rt1, String rt2) {
257 long rt1Long = convertTimeToLong(rt1);
258 long rt2Long = convertTimeToLong(rt2);
259
260 // Descending order
261 if (rt1Long > rt2Long) {
262 return -1;
263 } else if (rt1Long < rt2Long) {
264 return 1;
265 } else {
266 return 0;
267 }
268 }
269 }
270
271 private class ProtocolComparator implements Comparator<ProtocolStatInfo> {
272 @Override
273 public int compare(ProtocolStatInfo p1, ProtocolStatInfo p2) {
274 //Descending order
275 if (p1.bytes() > p2.bytes()) {
276 return -1;
277 } else if (p1.bytes() < p2.bytes()) {
278 return 1;
279 } else {
280 return 0;
281 }
282 }
283 }
284
285 private class FlowComparator implements Comparator<FlowStatInfo> {
286 @Override
287 public int compare(FlowStatInfo f1, FlowStatInfo f2) {
288 // Descending order
289 if (f1.bytes() > f2.bytes()) {
290 return -1;
291 } else if (f1.bytes() < f2.bytes()) {
292 return 1;
293 } else {
294 return 0;
295 }
296 }
297 }
298 private DpiStatistics processTopn(DpiStatistics ds, int topnProtocols, int topnFlows) {
299 if (ds == null) {
300 return null;
301 }
302
303 if (topnProtocols <= 0) {
304 // displays all entries
305 topnProtocols = 0;
306 } else if (topnProtocols > MAX_DPI_STATISTICS_TOPN) {
307 topnProtocols = MAX_DPI_STATISTICS_TOPN;
308 }
309
310 if (topnFlows <= 0) {
311 // displays all entries
312 topnFlows = 0;
313 } else if (topnFlows > MAX_DPI_STATISTICS_TOPN) {
314 topnFlows = MAX_DPI_STATISTICS_TOPN;
315 }
316
317 if (topnProtocols == 0 && topnFlows == 0) {
318 return ds;
319 }
320
321 TrafficStatInfo tsi = ds.dpiStatInfo().trafficStatistics();
322 List<ProtocolStatInfo> psiList;
323 List<FlowStatInfo> kfList;
324 List<FlowStatInfo> ufList;
325
326 List<ProtocolStatInfo> pList = ds.dpiStatInfo().detectedProtos();
327 Collections.sort(pList, new ProtocolComparator());
328 if (topnProtocols > 0 && topnProtocols < pList.size()) {
329 psiList = pList.subList(0, topnProtocols);
330 } else {
331 psiList = pList;
332 }
333
334
335 List<FlowStatInfo> fList = ds.dpiStatInfo().knownFlows();
336 Collections.sort(fList, new FlowComparator());
337 if (topnFlows > 0 && topnFlows < fList.size()) {
338 kfList = fList.subList(0, topnFlows);
339 } else {
340 kfList = fList;
341 }
342
343 fList = ds.dpiStatInfo().unknownFlows();
344 Collections.sort(fList, new FlowComparator());
345 if (topnFlows > 0 && topnFlows < fList.size()) {
346 ufList = fList.subList(0, topnFlows);
347 } else {
348 ufList = fList;
349 }
350
351 DpiStatInfo dsi = new DpiStatInfo();
352 dsi.setTrafficStatistics(tsi);
353 dsi.setDetectedProtos(psiList);
354 dsi.setKnownFlows(kfList);
355 dsi.setUnknownFlows(ufList);
356
357 DpiStatistics retDs = new DpiStatistics(ds.receivedTime(), dsi);
358 return retDs;
359 }
360
361 /**
362 * Receiving DPI Statistics result thread.
363 */
364 private class DpiStatisticsListener implements Runnable {
365 Socket clientSocket = null;
366 BufferedReader in = null;
367 PrintWriter out = null;
368
369 String resultJsonString = null;
370
371 static final int MAX_SLEEP_COUNT = 10;
372 int sleepCount = 0;
373
374 @Override
375 public void run() {
376 log.info("DpiStatisticsListener: Receiving thread started...");
377 receiveDpiResult();
378 }
379
380 public void stop() {
381 try {
382 if (serverSocket != null) {
383 if (clientSocket != null) {
384 if (in != null) {
385 in.close();
386 }
387 if (out != null) {
388 out.close();
389 }
390 clientSocket.close();
391 //log.debug("DpiResultListener: stop(): Socket close() is done...");
392 }
393 serverSocket.close();
394 //log.debug("DpiResultListener: stop(): Server close() is done...");
395 }
396 } catch (Exception e) {
397 log.error("DpiStatisticsListener: stop(): Server Socket closing error, exception={}",
398 e.toString());
399 }
400 log.debug("DpiStatisticsListener: stop(): stopped...");
401 }
402
403 private void receiveDpiResult() {
404 try {
405 serverSocket = new ServerSocket(port);
406 } catch (Exception e) {
407 log.error("DpiStatisticsListener: ServerSocket listening error from port={} in localhost, exception={}",
408 port, e.toString());
409 return;
410 }
411
412 try {
413 while (true) {
414 if (clientSocket == null) {
415 log.info("DpiStatisticsListener: Waiting for accepting from dpi client...");
416 clientSocket = serverSocket.accept();
417 log.info("DpiStatisticsListener: Accepted from dpi client={}",
418 clientSocket.getRemoteSocketAddress().toString());
419
420 in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
421 out = new PrintWriter(clientSocket.getOutputStream(), true); // For disconnecting check!
422
423 resultJsonString = null;
424 }
425
426 sleepCount = 0;
427 while (!in.ready()) {
428 sleep(1000); // sleep one second.
429 if (out.checkError() || ++sleepCount >= MAX_SLEEP_COUNT) {
430 log.debug("DpiStatisticsListener: server and socket connect is lost...");
431 in.close();
432 in = null;
433 out.close();
434 out = null;
435 clientSocket.close();
436 clientSocket = null;
437
438 break;
439 }
440 }
441
442 if (in != null) {
443 resultJsonString = in.readLine();
444
445 // process the result
446 log.trace("DpiStatisticsListener: resultJsonString={}", resultJsonString);
447 processResultJson(resultJsonString);
448 }
449 }
450 } catch (Exception e) {
451 log.error("DpiStatisticsListener: Exception = {}", e.toString());
452 return;
453 }
454 }
455
456 private void processResultJson(String resultJsonString) {
457 Date tr = new Date(System.currentTimeMillis());
458 SimpleDateFormat df = new SimpleDateFormat(DATE_FMT, Locale.KOREA);
459 df.setTimeZone(TimeZone.getTimeZone(TIME_ZONE));
460
461 String curReceivedTime = new String(df.format(tr));
462 String curResultJson = new String(resultJsonString);
463
464 DpiStatInfo dpiStatInfo;
465 ObjectMapper mapper = new ObjectMapper();
466 try {
467 dpiStatInfo = mapper.readValue(curResultJson, DpiStatInfo.class);
468 } catch (IOException e) {
469 log.error("DpiStatisticsListener: ObjectMapper Exception = {}", e.toString());
470 return;
471 }
472
473 DpiStatistics dpiStatistics = new DpiStatistics(curReceivedTime, dpiStatInfo);
474
475 addDpiStatistics(dpiStatistics);
476 }
477 }
478}