/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.net.dpi.impl;
18
19import com.fasterxml.jackson.databind.ObjectMapper;
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +090020import org.onosproject.core.ApplicationId;
21import org.onosproject.core.CoreService;
22import org.onosproject.incubator.net.dpi.DpiStatInfo;
23import org.onosproject.incubator.net.dpi.DpiStatistics;
24import org.onosproject.incubator.net.dpi.DpiStatisticsManagerService;
25import org.onosproject.incubator.net.dpi.FlowStatInfo;
26import org.onosproject.incubator.net.dpi.ProtocolStatInfo;
27import org.onosproject.incubator.net.dpi.TrafficStatInfo;
28import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070029import org.osgi.service.component.annotations.Activate;
30import org.osgi.service.component.annotations.Component;
31import org.osgi.service.component.annotations.Deactivate;
32import org.osgi.service.component.annotations.Reference;
33import org.osgi.service.component.annotations.ReferenceCardinality;
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +090034import org.slf4j.Logger;
35
36import java.io.BufferedReader;
37import java.io.IOException;
38import java.io.InputStreamReader;
39import java.io.PrintWriter;
40import java.net.ServerSocket;
41import java.net.Socket;
42import java.text.ParseException;
43import java.text.SimpleDateFormat;
44import java.util.ArrayList;
45import java.util.Collections;
46import java.util.Comparator;
47import java.util.Date;
48import java.util.List;
49import java.util.Locale;
50import java.util.SortedMap;
51import java.util.TimeZone;
52import java.util.TreeMap;
53import java.util.concurrent.ExecutorService;
54import java.util.concurrent.Executors;
55
56import static java.lang.Thread.sleep;
57import static org.onlab.util.Tools.groupedThreads;
58import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * DPI Statistics Manager.
62 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070063@Component(immediate = true, service = DpiStatisticsManagerService.class)
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +090064public class DpiStatisticsManager implements DpiStatisticsManagerService {
65
Ray Milkey06297ed2018-01-22 17:13:41 -080066 private ServerSocket serverSocket;
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +090067 private static int port = 11990; // socket server listening port
68
69 private final Logger log = getLogger(getClass());
70
Ray Milkeyd84f89b2018-08-17 14:54:17 -070071 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +090072 protected CoreService coreService;
73
74 private ApplicationId appId;
75
76 private final ExecutorService dpiListenerThread =
77 Executors.newSingleThreadExecutor(groupedThreads("onos/apps/dpi", "dpi-listener"));
78
79 DpiStatisticsListener dpiStatisticsListener = null;
80
81 // 31*2(month)*24(hour)*3600(second)/5(second)
82 private static final int MAX_DPI_STATISTICS_ENTRY = 1071360;
83
84 private SortedMap<String, DpiStatistics> dpiStatisticsMap =
85 new TreeMap<>(new MapComparator());
86
87 private long convertTimeToLong(String timeString) {
88 long timeLong = 0;
89
90 try {
91 // Time format: yyyy-MM-dd HH:mm:ss, Time Zone: GMT
92 SimpleDateFormat df = new SimpleDateFormat(DATE_FMT, Locale.KOREA);
93 df.setTimeZone(TimeZone.getTimeZone(TIME_ZONE));
94
95 timeLong = df.parse(timeString).getTime();
96 } catch (ParseException e) {
97 log.error("Time parse error! Exception={}", e.toString());
98 }
99
100 return timeLong;
101 }
102
103 private static final String DATE_FMT = "yyyy-MM-dd HH:mm:ss";
104 private static final String TIME_ZONE = "GMT";
105
106 public static final int MAX_DPI_STATISTICS_REQUEST = 100;
107 public static final int MAX_DPI_STATISTICS_TOPN = 100;
108
109 @Activate
110 public void activate(ComponentContext context) {
111 appId = coreService.registerApplication("org.onosproject.dpi");
112
113 dpiStatisticsListener = new DpiStatisticsListener();
114 dpiListenerThread.execute(dpiStatisticsListener);
115
116 log.info("Started", appId.id());
117 }
118
119 @Deactivate
120 public void deactivate() {
121 log.info("Deactivated...");
Kavitha Alagesan8c9e4102017-02-28 17:11:45 +0530122 dpiListenerThread.shutdownNow();
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +0900123 log.info("Stopped");
124 }
125
126 @Override
127 public DpiStatistics getDpiStatisticsLatest() {
128 if (dpiStatisticsMap.size() > 0) {
129 return dpiStatisticsMap.get(dpiStatisticsMap.firstKey());
130 } else {
131 return null;
132 }
133 }
134
135 @Override
136 public DpiStatistics getDpiStatisticsLatest(int topnProtocols, int topnFlows) {
137 DpiStatistics ds, topnDs;
138
139 ds = getDpiStatisticsLatest();
140 topnDs = processTopn(ds, topnProtocols, topnFlows);
141
142 return topnDs;
143 }
144
145 @Override
146 public List<DpiStatistics> getDpiStatistics(int lastN) {
147 List<DpiStatistics> dsList = new ArrayList<>();
148 DpiStatistics ds;
149
150 if (lastN > MAX_DPI_STATISTICS_REQUEST) {
151 lastN = MAX_DPI_STATISTICS_REQUEST;
152 }
153
154 SortedMap tempMap = new TreeMap(new MapComparator());
155 tempMap.putAll(dpiStatisticsMap);
156
157 for (int i = 0; i < lastN && i < tempMap.size(); i++) {
158 ds = (DpiStatistics) tempMap.get(tempMap.firstKey());
159 dsList.add(i, new DpiStatistics(ds.receivedTime(), ds.dpiStatInfo()));
160
161 tempMap.remove(tempMap.firstKey());
162 }
163
164 return dsList;
165 }
166
167 @Override
168 public List<DpiStatistics> getDpiStatistics(int lastN, int topnProtocols, int topnFlows) {
169 List<DpiStatistics> dsList;
170 List<DpiStatistics> topnDsList = new ArrayList<>();
171 DpiStatistics ds, topnDs;
172
173 dsList = getDpiStatistics(lastN);
174 for (int i = 0; i < dsList.size(); i++) {
175 ds = dsList.get(i);
176 topnDs = processTopn(ds, topnProtocols, topnFlows);
177 topnDsList.add(i, topnDs);
178 }
179
180 return topnDsList;
181 }
182
183 @Override
184 public DpiStatistics getDpiStatistics(String receivedTime) {
185 DpiStatistics ds;
186
187 if (receivedTime == null) {
188 return null;
189 }
190
191 if (!dpiStatisticsMap.containsKey(receivedTime)) {
192 return null;
193 }
194
195 ds = dpiStatisticsMap.get(receivedTime);
196
197 return ds;
198 }
199
200 @Override
201 public DpiStatistics getDpiStatistics(String receivedTime, int topnProtocols, int topnFlows) {
202 DpiStatistics ds, topnDs;
203
204 ds = getDpiStatistics(receivedTime);
205
206 topnDs = processTopn(ds, topnProtocols, topnFlows);
207
208 return topnDs;
209 }
210
211 @Override
212 public DpiStatistics addDpiStatistics(DpiStatistics ds) {
213 if (ds == null) {
214 return ds;
215 }
216
217 // check the time. The firstKey is lastTime because of descending sorted order
218 if (dpiStatisticsMap.size() > 0) {
219 String lastTime = dpiStatisticsMap.get(dpiStatisticsMap.firstKey()).receivedTime();
220 String inputTime = ds.receivedTime();
221
222 long lastTimeLong = convertTimeToLong(lastTime);
223 long inputTimeLong = convertTimeToLong(inputTime);
224
225 if (lastTimeLong >= inputTimeLong) {
226 return null;
227 }
228 }
229
230 if (dpiStatisticsMap.size() >= MAX_DPI_STATISTICS_ENTRY) {
231 // remove the last (oldest) entry
232 dpiStatisticsMap.remove(dpiStatisticsMap.lastKey());
233 }
234
235 if (dpiStatisticsMap.containsKey(ds.receivedTime())) {
236 log.warn("addDpiStatistics(), {} dpiStatistics is already existing!",
237 ds.receivedTime());
238 return null;
239 }
240
241 dpiStatisticsMap.put(ds.receivedTime(), ds);
242 log.debug("addDpiStatistics: dpiResultJson data[time={}] is added " +
243 "into DpiStatisticsMap size={}.",
244 ds.receivedTime(), dpiStatisticsMap.size());
245
246 return ds;
247 }
248
249 private class MapComparator implements Comparator<String> {
250 @Override
251 public int compare(String rt1, String rt2) {
252 long rt1Long = convertTimeToLong(rt1);
253 long rt2Long = convertTimeToLong(rt2);
254
255 // Descending order
256 if (rt1Long > rt2Long) {
257 return -1;
258 } else if (rt1Long < rt2Long) {
259 return 1;
260 } else {
261 return 0;
262 }
263 }
264 }
265
266 private class ProtocolComparator implements Comparator<ProtocolStatInfo> {
267 @Override
268 public int compare(ProtocolStatInfo p1, ProtocolStatInfo p2) {
269 //Descending order
270 if (p1.bytes() > p2.bytes()) {
271 return -1;
272 } else if (p1.bytes() < p2.bytes()) {
273 return 1;
274 } else {
275 return 0;
276 }
277 }
278 }
279
280 private class FlowComparator implements Comparator<FlowStatInfo> {
281 @Override
282 public int compare(FlowStatInfo f1, FlowStatInfo f2) {
283 // Descending order
284 if (f1.bytes() > f2.bytes()) {
285 return -1;
286 } else if (f1.bytes() < f2.bytes()) {
287 return 1;
288 } else {
289 return 0;
290 }
291 }
292 }
293 private DpiStatistics processTopn(DpiStatistics ds, int topnProtocols, int topnFlows) {
294 if (ds == null) {
295 return null;
296 }
297
298 if (topnProtocols <= 0) {
299 // displays all entries
300 topnProtocols = 0;
301 } else if (topnProtocols > MAX_DPI_STATISTICS_TOPN) {
302 topnProtocols = MAX_DPI_STATISTICS_TOPN;
303 }
304
305 if (topnFlows <= 0) {
306 // displays all entries
307 topnFlows = 0;
308 } else if (topnFlows > MAX_DPI_STATISTICS_TOPN) {
309 topnFlows = MAX_DPI_STATISTICS_TOPN;
310 }
311
312 if (topnProtocols == 0 && topnFlows == 0) {
313 return ds;
314 }
315
316 TrafficStatInfo tsi = ds.dpiStatInfo().trafficStatistics();
317 List<ProtocolStatInfo> psiList;
318 List<FlowStatInfo> kfList;
319 List<FlowStatInfo> ufList;
320
321 List<ProtocolStatInfo> pList = ds.dpiStatInfo().detectedProtos();
322 Collections.sort(pList, new ProtocolComparator());
323 if (topnProtocols > 0 && topnProtocols < pList.size()) {
324 psiList = pList.subList(0, topnProtocols);
325 } else {
326 psiList = pList;
327 }
328
329
330 List<FlowStatInfo> fList = ds.dpiStatInfo().knownFlows();
331 Collections.sort(fList, new FlowComparator());
332 if (topnFlows > 0 && topnFlows < fList.size()) {
333 kfList = fList.subList(0, topnFlows);
334 } else {
335 kfList = fList;
336 }
337
338 fList = ds.dpiStatInfo().unknownFlows();
339 Collections.sort(fList, new FlowComparator());
340 if (topnFlows > 0 && topnFlows < fList.size()) {
341 ufList = fList.subList(0, topnFlows);
342 } else {
343 ufList = fList;
344 }
345
346 DpiStatInfo dsi = new DpiStatInfo();
347 dsi.setTrafficStatistics(tsi);
348 dsi.setDetectedProtos(psiList);
349 dsi.setKnownFlows(kfList);
350 dsi.setUnknownFlows(ufList);
351
352 DpiStatistics retDs = new DpiStatistics(ds.receivedTime(), dsi);
353 return retDs;
354 }
355
356 /**
357 * Receiving DPI Statistics result thread.
358 */
359 private class DpiStatisticsListener implements Runnable {
360 Socket clientSocket = null;
361 BufferedReader in = null;
362 PrintWriter out = null;
363
364 String resultJsonString = null;
365
366 static final int MAX_SLEEP_COUNT = 10;
367 int sleepCount = 0;
368
369 @Override
370 public void run() {
371 log.info("DpiStatisticsListener: Receiving thread started...");
372 receiveDpiResult();
373 }
374
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +0900375 private void receiveDpiResult() {
376 try {
377 serverSocket = new ServerSocket(port);
378 } catch (Exception e) {
379 log.error("DpiStatisticsListener: ServerSocket listening error from port={} in localhost, exception={}",
380 port, e.toString());
381 return;
382 }
383
384 try {
Kavitha Alagesan8c9e4102017-02-28 17:11:45 +0530385 while (!Thread.currentThread().isInterrupted()) {
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +0900386 if (clientSocket == null) {
387 log.info("DpiStatisticsListener: Waiting for accepting from dpi client...");
388 clientSocket = serverSocket.accept();
389 log.info("DpiStatisticsListener: Accepted from dpi client={}",
390 clientSocket.getRemoteSocketAddress().toString());
391
392 in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
393 out = new PrintWriter(clientSocket.getOutputStream(), true); // For disconnecting check!
394
395 resultJsonString = null;
396 }
397
398 sleepCount = 0;
399 while (!in.ready()) {
400 sleep(1000); // sleep one second.
401 if (out.checkError() || ++sleepCount >= MAX_SLEEP_COUNT) {
402 log.debug("DpiStatisticsListener: server and socket connect is lost...");
403 in.close();
404 in = null;
405 out.close();
406 out = null;
407 clientSocket.close();
408 clientSocket = null;
409
410 break;
411 }
412 }
413
414 if (in != null) {
415 resultJsonString = in.readLine();
416
417 // process the result
418 log.trace("DpiStatisticsListener: resultJsonString={}", resultJsonString);
419 processResultJson(resultJsonString);
420 }
421 }
422 } catch (Exception e) {
423 log.error("DpiStatisticsListener: Exception = {}", e.toString());
424 return;
Kavitha Alagesan8c9e4102017-02-28 17:11:45 +0530425 } finally {
426 try {
427 if (serverSocket != null) {
428 if (clientSocket != null) {
429 if (in != null) {
430 in.close();
431 }
432 if (out != null) {
433 out.close();
434 }
435 clientSocket.close();
436 //log.debug("DpiResultListener: stop(): Socket close() is done...");
437 }
438 serverSocket.close();
439 //log.debug("DpiResultListener: stop(): Server close() is done...");
440 }
441 } catch (Exception e) {
442 log.error("DpiStatisticsListener: stop(): Server Socket closing error, exception={}",
443 e.toString());
444 }
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +0900445 }
446 }
447
448 private void processResultJson(String resultJsonString) {
449 Date tr = new Date(System.currentTimeMillis());
450 SimpleDateFormat df = new SimpleDateFormat(DATE_FMT, Locale.KOREA);
451 df.setTimeZone(TimeZone.getTimeZone(TIME_ZONE));
452
453 String curReceivedTime = new String(df.format(tr));
454 String curResultJson = new String(resultJsonString);
455
456 DpiStatInfo dpiStatInfo;
457 ObjectMapper mapper = new ObjectMapper();
458 try {
459 dpiStatInfo = mapper.readValue(curResultJson, DpiStatInfo.class);
460 } catch (IOException e) {
461 log.error("DpiStatisticsListener: ObjectMapper Exception = {}", e.toString());
462 return;
463 }
464
465 DpiStatistics dpiStatistics = new DpiStatistics(curReceivedTime, dpiStatInfo);
466
467 addDpiStatistics(dpiStatistics);
468 }
469 }
470}