blob: f1fb7217e4f4e627f7d0d73a9b1b8ff9be0a0b26 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.net.dpi.impl;
18
19import com.fasterxml.jackson.databind.ObjectMapper;
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +090020import org.onosproject.core.ApplicationId;
21import org.onosproject.core.CoreService;
22import org.onosproject.incubator.net.dpi.DpiStatInfo;
23import org.onosproject.incubator.net.dpi.DpiStatistics;
24import org.onosproject.incubator.net.dpi.DpiStatisticsManagerService;
25import org.onosproject.incubator.net.dpi.FlowStatInfo;
26import org.onosproject.incubator.net.dpi.ProtocolStatInfo;
27import org.onosproject.incubator.net.dpi.TrafficStatInfo;
28import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070029import org.osgi.service.component.annotations.Activate;
30import org.osgi.service.component.annotations.Component;
31import org.osgi.service.component.annotations.Deactivate;
32import org.osgi.service.component.annotations.Reference;
33import org.osgi.service.component.annotations.ReferenceCardinality;
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +090034import org.slf4j.Logger;
35
36import java.io.BufferedReader;
37import java.io.IOException;
38import java.io.InputStreamReader;
39import java.io.PrintWriter;
40import java.net.ServerSocket;
41import java.net.Socket;
42import java.text.ParseException;
43import java.text.SimpleDateFormat;
44import java.util.ArrayList;
45import java.util.Collections;
46import java.util.Comparator;
47import java.util.Date;
48import java.util.List;
49import java.util.Locale;
50import java.util.SortedMap;
51import java.util.TimeZone;
52import java.util.TreeMap;
53import java.util.concurrent.ExecutorService;
54import java.util.concurrent.Executors;
55
56import static java.lang.Thread.sleep;
57import static org.onlab.util.Tools.groupedThreads;
58import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * DPI Statistics Manager.
62 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070063@Component(immediate = true, service = DpiStatisticsManagerService.class)
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +090064public class DpiStatisticsManager implements DpiStatisticsManagerService {
65
Ray Milkey06297ed2018-01-22 17:13:41 -080066 private ServerSocket serverSocket;
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +090067 private static int port = 11990; // socket server listening port
68
69 private final Logger log = getLogger(getClass());
70
Ray Milkeyd84f89b2018-08-17 14:54:17 -070071 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +090072 protected CoreService coreService;
73
74 private ApplicationId appId;
75
76 private final ExecutorService dpiListenerThread =
77 Executors.newSingleThreadExecutor(groupedThreads("onos/apps/dpi", "dpi-listener"));
78
79 DpiStatisticsListener dpiStatisticsListener = null;
80
81 // 31*2(month)*24(hour)*3600(second)/5(second)
82 private static final int MAX_DPI_STATISTICS_ENTRY = 1071360;
83
84 private SortedMap<String, DpiStatistics> dpiStatisticsMap =
85 new TreeMap<>(new MapComparator());
86
87 private long convertTimeToLong(String timeString) {
88 long timeLong = 0;
89
90 try {
91 // Time format: yyyy-MM-dd HH:mm:ss, Time Zone: GMT
92 SimpleDateFormat df = new SimpleDateFormat(DATE_FMT, Locale.KOREA);
93 df.setTimeZone(TimeZone.getTimeZone(TIME_ZONE));
94
95 timeLong = df.parse(timeString).getTime();
96 } catch (ParseException e) {
97 log.error("Time parse error! Exception={}", e.toString());
98 }
99
100 return timeLong;
101 }
102
103 private static final String DATE_FMT = "yyyy-MM-dd HH:mm:ss";
104 private static final String TIME_ZONE = "GMT";
105
106 public static final int MAX_DPI_STATISTICS_REQUEST = 100;
107 public static final int MAX_DPI_STATISTICS_TOPN = 100;
108
109 @Activate
110 public void activate(ComponentContext context) {
111 appId = coreService.registerApplication("org.onosproject.dpi");
112
Thomas Vachuska52f2cd12018-11-08 21:20:04 -0800113// registerCodec(DpiStatistics.class, new DpiStatisticsCodec());
114// registerCodec(DpiStatInfo.class, new DpiStatInfoCodec());
115// registerCodec(TrafficStatInfo.class, new TrafficStatInfoCodec());
116// registerCodec(ProtocolStatInfo.class, new ProtocolStatInfoCodec());
117// registerCodec(FlowStatInfo.class, new FlowStatInfoCodec());
118
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +0900119 dpiStatisticsListener = new DpiStatisticsListener();
120 dpiListenerThread.execute(dpiStatisticsListener);
121
122 log.info("Started", appId.id());
123 }
124
125 @Deactivate
126 public void deactivate() {
127 log.info("Deactivated...");
Kavitha Alagesan8c9e4102017-02-28 17:11:45 +0530128 dpiListenerThread.shutdownNow();
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +0900129 log.info("Stopped");
130 }
131
132 @Override
133 public DpiStatistics getDpiStatisticsLatest() {
134 if (dpiStatisticsMap.size() > 0) {
135 return dpiStatisticsMap.get(dpiStatisticsMap.firstKey());
136 } else {
137 return null;
138 }
139 }
140
141 @Override
142 public DpiStatistics getDpiStatisticsLatest(int topnProtocols, int topnFlows) {
143 DpiStatistics ds, topnDs;
144
145 ds = getDpiStatisticsLatest();
146 topnDs = processTopn(ds, topnProtocols, topnFlows);
147
148 return topnDs;
149 }
150
151 @Override
152 public List<DpiStatistics> getDpiStatistics(int lastN) {
153 List<DpiStatistics> dsList = new ArrayList<>();
154 DpiStatistics ds;
155
156 if (lastN > MAX_DPI_STATISTICS_REQUEST) {
157 lastN = MAX_DPI_STATISTICS_REQUEST;
158 }
159
160 SortedMap tempMap = new TreeMap(new MapComparator());
161 tempMap.putAll(dpiStatisticsMap);
162
163 for (int i = 0; i < lastN && i < tempMap.size(); i++) {
164 ds = (DpiStatistics) tempMap.get(tempMap.firstKey());
165 dsList.add(i, new DpiStatistics(ds.receivedTime(), ds.dpiStatInfo()));
166
167 tempMap.remove(tempMap.firstKey());
168 }
169
170 return dsList;
171 }
172
173 @Override
174 public List<DpiStatistics> getDpiStatistics(int lastN, int topnProtocols, int topnFlows) {
175 List<DpiStatistics> dsList;
176 List<DpiStatistics> topnDsList = new ArrayList<>();
177 DpiStatistics ds, topnDs;
178
179 dsList = getDpiStatistics(lastN);
180 for (int i = 0; i < dsList.size(); i++) {
181 ds = dsList.get(i);
182 topnDs = processTopn(ds, topnProtocols, topnFlows);
183 topnDsList.add(i, topnDs);
184 }
185
186 return topnDsList;
187 }
188
189 @Override
190 public DpiStatistics getDpiStatistics(String receivedTime) {
191 DpiStatistics ds;
192
193 if (receivedTime == null) {
194 return null;
195 }
196
197 if (!dpiStatisticsMap.containsKey(receivedTime)) {
198 return null;
199 }
200
201 ds = dpiStatisticsMap.get(receivedTime);
202
203 return ds;
204 }
205
206 @Override
207 public DpiStatistics getDpiStatistics(String receivedTime, int topnProtocols, int topnFlows) {
208 DpiStatistics ds, topnDs;
209
210 ds = getDpiStatistics(receivedTime);
211
212 topnDs = processTopn(ds, topnProtocols, topnFlows);
213
214 return topnDs;
215 }
216
217 @Override
218 public DpiStatistics addDpiStatistics(DpiStatistics ds) {
219 if (ds == null) {
220 return ds;
221 }
222
223 // check the time. The firstKey is lastTime because of descending sorted order
224 if (dpiStatisticsMap.size() > 0) {
225 String lastTime = dpiStatisticsMap.get(dpiStatisticsMap.firstKey()).receivedTime();
226 String inputTime = ds.receivedTime();
227
228 long lastTimeLong = convertTimeToLong(lastTime);
229 long inputTimeLong = convertTimeToLong(inputTime);
230
231 if (lastTimeLong >= inputTimeLong) {
232 return null;
233 }
234 }
235
236 if (dpiStatisticsMap.size() >= MAX_DPI_STATISTICS_ENTRY) {
237 // remove the last (oldest) entry
238 dpiStatisticsMap.remove(dpiStatisticsMap.lastKey());
239 }
240
241 if (dpiStatisticsMap.containsKey(ds.receivedTime())) {
242 log.warn("addDpiStatistics(), {} dpiStatistics is already existing!",
243 ds.receivedTime());
244 return null;
245 }
246
247 dpiStatisticsMap.put(ds.receivedTime(), ds);
248 log.debug("addDpiStatistics: dpiResultJson data[time={}] is added " +
249 "into DpiStatisticsMap size={}.",
250 ds.receivedTime(), dpiStatisticsMap.size());
251
252 return ds;
253 }
254
255 private class MapComparator implements Comparator<String> {
256 @Override
257 public int compare(String rt1, String rt2) {
258 long rt1Long = convertTimeToLong(rt1);
259 long rt2Long = convertTimeToLong(rt2);
260
261 // Descending order
262 if (rt1Long > rt2Long) {
263 return -1;
264 } else if (rt1Long < rt2Long) {
265 return 1;
266 } else {
267 return 0;
268 }
269 }
270 }
271
272 private class ProtocolComparator implements Comparator<ProtocolStatInfo> {
273 @Override
274 public int compare(ProtocolStatInfo p1, ProtocolStatInfo p2) {
275 //Descending order
276 if (p1.bytes() > p2.bytes()) {
277 return -1;
278 } else if (p1.bytes() < p2.bytes()) {
279 return 1;
280 } else {
281 return 0;
282 }
283 }
284 }
285
286 private class FlowComparator implements Comparator<FlowStatInfo> {
287 @Override
288 public int compare(FlowStatInfo f1, FlowStatInfo f2) {
289 // Descending order
290 if (f1.bytes() > f2.bytes()) {
291 return -1;
292 } else if (f1.bytes() < f2.bytes()) {
293 return 1;
294 } else {
295 return 0;
296 }
297 }
298 }
299 private DpiStatistics processTopn(DpiStatistics ds, int topnProtocols, int topnFlows) {
300 if (ds == null) {
301 return null;
302 }
303
304 if (topnProtocols <= 0) {
305 // displays all entries
306 topnProtocols = 0;
307 } else if (topnProtocols > MAX_DPI_STATISTICS_TOPN) {
308 topnProtocols = MAX_DPI_STATISTICS_TOPN;
309 }
310
311 if (topnFlows <= 0) {
312 // displays all entries
313 topnFlows = 0;
314 } else if (topnFlows > MAX_DPI_STATISTICS_TOPN) {
315 topnFlows = MAX_DPI_STATISTICS_TOPN;
316 }
317
318 if (topnProtocols == 0 && topnFlows == 0) {
319 return ds;
320 }
321
322 TrafficStatInfo tsi = ds.dpiStatInfo().trafficStatistics();
323 List<ProtocolStatInfo> psiList;
324 List<FlowStatInfo> kfList;
325 List<FlowStatInfo> ufList;
326
327 List<ProtocolStatInfo> pList = ds.dpiStatInfo().detectedProtos();
328 Collections.sort(pList, new ProtocolComparator());
329 if (topnProtocols > 0 && topnProtocols < pList.size()) {
330 psiList = pList.subList(0, topnProtocols);
331 } else {
332 psiList = pList;
333 }
334
335
336 List<FlowStatInfo> fList = ds.dpiStatInfo().knownFlows();
337 Collections.sort(fList, new FlowComparator());
338 if (topnFlows > 0 && topnFlows < fList.size()) {
339 kfList = fList.subList(0, topnFlows);
340 } else {
341 kfList = fList;
342 }
343
344 fList = ds.dpiStatInfo().unknownFlows();
345 Collections.sort(fList, new FlowComparator());
346 if (topnFlows > 0 && topnFlows < fList.size()) {
347 ufList = fList.subList(0, topnFlows);
348 } else {
349 ufList = fList;
350 }
351
352 DpiStatInfo dsi = new DpiStatInfo();
353 dsi.setTrafficStatistics(tsi);
354 dsi.setDetectedProtos(psiList);
355 dsi.setKnownFlows(kfList);
356 dsi.setUnknownFlows(ufList);
357
358 DpiStatistics retDs = new DpiStatistics(ds.receivedTime(), dsi);
359 return retDs;
360 }
361
362 /**
363 * Receiving DPI Statistics result thread.
364 */
365 private class DpiStatisticsListener implements Runnable {
366 Socket clientSocket = null;
367 BufferedReader in = null;
368 PrintWriter out = null;
369
370 String resultJsonString = null;
371
372 static final int MAX_SLEEP_COUNT = 10;
373 int sleepCount = 0;
374
375 @Override
376 public void run() {
377 log.info("DpiStatisticsListener: Receiving thread started...");
378 receiveDpiResult();
379 }
380
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +0900381 private void receiveDpiResult() {
382 try {
383 serverSocket = new ServerSocket(port);
384 } catch (Exception e) {
385 log.error("DpiStatisticsListener: ServerSocket listening error from port={} in localhost, exception={}",
386 port, e.toString());
387 return;
388 }
389
390 try {
Kavitha Alagesan8c9e4102017-02-28 17:11:45 +0530391 while (!Thread.currentThread().isInterrupted()) {
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +0900392 if (clientSocket == null) {
393 log.info("DpiStatisticsListener: Waiting for accepting from dpi client...");
394 clientSocket = serverSocket.accept();
395 log.info("DpiStatisticsListener: Accepted from dpi client={}",
396 clientSocket.getRemoteSocketAddress().toString());
397
398 in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
399 out = new PrintWriter(clientSocket.getOutputStream(), true); // For disconnecting check!
400
401 resultJsonString = null;
402 }
403
404 sleepCount = 0;
405 while (!in.ready()) {
406 sleep(1000); // sleep one second.
407 if (out.checkError() || ++sleepCount >= MAX_SLEEP_COUNT) {
408 log.debug("DpiStatisticsListener: server and socket connect is lost...");
409 in.close();
410 in = null;
411 out.close();
412 out = null;
413 clientSocket.close();
414 clientSocket = null;
415
416 break;
417 }
418 }
419
420 if (in != null) {
421 resultJsonString = in.readLine();
422
423 // process the result
424 log.trace("DpiStatisticsListener: resultJsonString={}", resultJsonString);
425 processResultJson(resultJsonString);
426 }
427 }
428 } catch (Exception e) {
429 log.error("DpiStatisticsListener: Exception = {}", e.toString());
430 return;
Kavitha Alagesan8c9e4102017-02-28 17:11:45 +0530431 } finally {
432 try {
433 if (serverSocket != null) {
434 if (clientSocket != null) {
435 if (in != null) {
436 in.close();
437 }
438 if (out != null) {
439 out.close();
440 }
441 clientSocket.close();
442 //log.debug("DpiResultListener: stop(): Socket close() is done...");
443 }
444 serverSocket.close();
445 //log.debug("DpiResultListener: stop(): Server close() is done...");
446 }
447 } catch (Exception e) {
448 log.error("DpiStatisticsListener: stop(): Server Socket closing error, exception={}",
449 e.toString());
450 }
Sangsik Yoonf0b3ad82016-08-19 18:47:59 +0900451 }
452 }
453
454 private void processResultJson(String resultJsonString) {
455 Date tr = new Date(System.currentTimeMillis());
456 SimpleDateFormat df = new SimpleDateFormat(DATE_FMT, Locale.KOREA);
457 df.setTimeZone(TimeZone.getTimeZone(TIME_ZONE));
458
459 String curReceivedTime = new String(df.format(tr));
460 String curResultJson = new String(resultJsonString);
461
462 DpiStatInfo dpiStatInfo;
463 ObjectMapper mapper = new ObjectMapper();
464 try {
465 dpiStatInfo = mapper.readValue(curResultJson, DpiStatInfo.class);
466 } catch (IOException e) {
467 log.error("DpiStatisticsListener: ObjectMapper Exception = {}", e.toString());
468 return;
469 }
470
471 DpiStatistics dpiStatistics = new DpiStatistics(curReceivedTime, dpiStatInfo);
472
473 addDpiStatistics(dpiStatistics);
474 }
475 }
476}