blob: 29505b784ac6b5276fd26435cfa940995b736697 [file] [log] [blame]
Thomas Vachuska329af532015-03-10 02:08:33 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.ui.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.node.ArrayNode;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import com.google.common.collect.ImmutableSet;
22import org.onlab.osgi.ServiceDirectory;
23import org.onlab.util.AbstractAccumulator;
24import org.onlab.util.Accumulator;
25import org.onosproject.cluster.ClusterEvent;
26import org.onosproject.cluster.ClusterEventListener;
27import org.onosproject.cluster.ControllerNode;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.event.Event;
31import org.onosproject.mastership.MastershipAdminService;
32import org.onosproject.mastership.MastershipEvent;
33import org.onosproject.mastership.MastershipListener;
34import org.onosproject.net.ConnectPoint;
35import org.onosproject.net.Device;
36import org.onosproject.net.Host;
37import org.onosproject.net.HostId;
38import org.onosproject.net.HostLocation;
39import org.onosproject.net.Link;
40import org.onosproject.net.device.DeviceEvent;
41import org.onosproject.net.device.DeviceListener;
42import org.onosproject.net.flow.DefaultTrafficSelector;
43import org.onosproject.net.flow.DefaultTrafficTreatment;
44import org.onosproject.net.flow.FlowRuleEvent;
45import org.onosproject.net.flow.FlowRuleListener;
46import org.onosproject.net.flow.TrafficSelector;
47import org.onosproject.net.flow.TrafficTreatment;
48import org.onosproject.net.host.HostEvent;
49import org.onosproject.net.host.HostListener;
50import org.onosproject.net.intent.HostToHostIntent;
51import org.onosproject.net.intent.Intent;
52import org.onosproject.net.intent.IntentEvent;
53import org.onosproject.net.intent.IntentListener;
54import org.onosproject.net.intent.MultiPointToSinglePointIntent;
55import org.onosproject.net.link.LinkEvent;
56import org.onosproject.net.link.LinkListener;
57import org.onosproject.ui.UiConnection;
58
59import java.util.ArrayList;
60import java.util.Collections;
61import java.util.Comparator;
62import java.util.HashSet;
63import java.util.List;
64import java.util.Set;
65import java.util.Timer;
66import java.util.TimerTask;
67
68import static com.google.common.base.Strings.isNullOrEmpty;
69import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ADDED;
70import static org.onosproject.net.DeviceId.deviceId;
71import static org.onosproject.net.HostId.hostId;
72import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
73import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
74import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
75import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
76
77/**
78 * Web socket capable of interacting with the GUI topology view.
79 */
80public class TopologyViewMessageHandler extends TopologyViewMessageHandlerBase {
81
82 private static final String APP_ID = "org.onosproject.gui";
83
84 private static final long TRAFFIC_FREQUENCY = 5000;
85 private static final long SUMMARY_FREQUENCY = 30000;
86
87 private static final Comparator<? super ControllerNode> NODE_COMPARATOR =
88 new Comparator<ControllerNode>() {
89 @Override
90 public int compare(ControllerNode o1, ControllerNode o2) {
91 return o1.id().toString().compareTo(o2.id().toString());
92 }
93 };
94
95
96 private final Timer timer = new Timer("topology-view");
97
98 private static final int MAX_EVENTS = 1000;
99 private static final int MAX_BATCH_MS = 5000;
100 private static final int MAX_IDLE_MS = 1000;
101
102 private ApplicationId appId;
103
104 private final ClusterEventListener clusterListener = new InternalClusterListener();
105 private final MastershipListener mastershipListener = new InternalMastershipListener();
106 private final DeviceListener deviceListener = new InternalDeviceListener();
107 private final LinkListener linkListener = new InternalLinkListener();
108 private final HostListener hostListener = new InternalHostListener();
109 private final IntentListener intentListener = new InternalIntentListener();
110 private final FlowRuleListener flowListener = new InternalFlowListener();
111
112 private final Accumulator<Event> eventAccummulator = new InternalEventAccummulator();
113
114 private TimerTask trafficTask;
115 private ObjectNode trafficEvent;
116
117 private TimerTask summaryTask;
118 private ObjectNode summaryEvent;
119
120 private boolean listenersRemoved = false;
121
122 private TopologyViewIntentFilter intentFilter;
123
124 // Current selection context
125 private Set<Host> selectedHosts;
126 private Set<Device> selectedDevices;
127 private List<Intent> selectedIntents;
128 private int currentIntentIndex = -1;
129
130 /**
131 * Creates a new web-socket for serving data to GUI topology view.
132 */
133 public TopologyViewMessageHandler() {
134 super(ImmutableSet.of("topoStart", "topoStop",
135 "requestDetails",
136 "updateMeta",
137 "addHostIntent",
138 "addMultiSourceIntent",
139 "requestRelatedIntents",
140 "requestNextRelatedIntent",
141 "requestPrevRelatedIntent",
142 "requestSelectedIntentTraffic",
143 "requestAllTraffic",
144 "requestDeviceLinkFlows",
145 "cancelTraffic",
146 "requestSummary",
147 "cancelSummary",
148 "equalizeMasters"
149 ));
150 }
151
152 @Override
153 public void init(UiConnection connection, ServiceDirectory directory) {
154 super.init(connection, directory);
155 intentFilter = new TopologyViewIntentFilter(intentService, deviceService,
156 hostService, linkService);
157 appId = directory.get(CoreService.class).registerApplication(APP_ID);
158 }
159
160 @Override
161 public void destroy() {
162 cancelAllRequests();
163 super.destroy();
164 }
165
166 // Processes the specified event.
167 @Override
168 public void process(ObjectNode event) {
169 String type = string(event, "event", "unknown");
170 if (type.equals("requestDetails")) {
171 requestDetails(event);
172 } else if (type.equals("updateMeta")) {
173 updateMetaUi(event);
174
175 } else if (type.equals("addHostIntent")) {
176 createHostIntent(event);
177 } else if (type.equals("addMultiSourceIntent")) {
178 createMultiSourceIntent(event);
179
180 } else if (type.equals("requestRelatedIntents")) {
181 stopTrafficMonitoring();
182 requestRelatedIntents(event);
183
184 } else if (type.equals("requestNextRelatedIntent")) {
185 stopTrafficMonitoring();
186 requestAnotherRelatedIntent(event, +1);
187 } else if (type.equals("requestPrevRelatedIntent")) {
188 stopTrafficMonitoring();
189 requestAnotherRelatedIntent(event, -1);
190 } else if (type.equals("requestSelectedIntentTraffic")) {
191 requestSelectedIntentTraffic(event);
192 startTrafficMonitoring(event);
193
194 } else if (type.equals("requestAllTraffic")) {
195 requestAllTraffic(event);
196 startTrafficMonitoring(event);
197
198 } else if (type.equals("requestDeviceLinkFlows")) {
199 requestDeviceLinkFlows(event);
200 startTrafficMonitoring(event);
201
202 } else if (type.equals("cancelTraffic")) {
203 cancelTraffic(event);
204
205 } else if (type.equals("requestSummary")) {
206 requestSummary(event);
207 startSummaryMonitoring(event);
208 } else if (type.equals("cancelSummary")) {
209 stopSummaryMonitoring();
210
211 } else if (type.equals("equalizeMasters")) {
212 equalizeMasters(event);
213
214 } else if (type.equals("topoStart")) {
215 sendAllInitialData();
216 } else if (type.equals("topoStop")) {
217 cancelAllRequests();
218 }
219 }
220
221 // Sends the specified data to the client.
222 protected synchronized void sendMessage(ObjectNode data) {
223 UiConnection connection = connection();
224 if (connection != null) {
225 connection.sendMessage(data);
226 }
227 }
228
229 private void sendAllInitialData() {
230 addListeners();
231 sendAllInstances(null);
232 sendAllDevices();
233 sendAllLinks();
234 sendAllHosts();
235
236 }
237
238 private void cancelAllRequests() {
239 stopSummaryMonitoring();
240 stopTrafficMonitoring();
241 removeListeners();
242 }
243
244 // Sends all controller nodes to the client as node-added messages.
245 private void sendAllInstances(String messageType) {
246 List<ControllerNode> nodes = new ArrayList<>(clusterService.getNodes());
247 Collections.sort(nodes, NODE_COMPARATOR);
248 for (ControllerNode node : nodes) {
249 sendMessage(instanceMessage(new ClusterEvent(INSTANCE_ADDED, node),
250 messageType));
251 }
252 }
253
254 // Sends all devices to the client as device-added messages.
255 private void sendAllDevices() {
256 // Send optical first, others later for layered rendering
257 for (Device device : deviceService.getDevices()) {
258 if (device.type() == Device.Type.ROADM) {
259 sendMessage(deviceMessage(new DeviceEvent(DEVICE_ADDED, device)));
260 }
261 }
262 for (Device device : deviceService.getDevices()) {
263 if (device.type() != Device.Type.ROADM) {
264 sendMessage(deviceMessage(new DeviceEvent(DEVICE_ADDED, device)));
265 }
266 }
267 }
268
269 // Sends all links to the client as link-added messages.
270 private void sendAllLinks() {
271 // Send optical first, others later for layered rendering
272 for (Link link : linkService.getLinks()) {
273 if (link.type() == Link.Type.OPTICAL) {
274 sendMessage(linkMessage(new LinkEvent(LINK_ADDED, link)));
275 }
276 }
277 for (Link link : linkService.getLinks()) {
278 if (link.type() != Link.Type.OPTICAL) {
279 sendMessage(linkMessage(new LinkEvent(LINK_ADDED, link)));
280 }
281 }
282 }
283
284 // Sends all hosts to the client as host-added messages.
285 private void sendAllHosts() {
286 for (Host host : hostService.getHosts()) {
287 sendMessage(hostMessage(new HostEvent(HOST_ADDED, host)));
288 }
289 }
290
291 // Sends back device or host details.
292 private void requestDetails(ObjectNode event) {
293 ObjectNode payload = payload(event);
294 String type = string(payload, "class", "unknown");
295 long sid = number(event, "sid");
296
297 if (type.equals("device")) {
298 sendMessage(deviceDetails(deviceId(string(payload, "id")), sid));
299 } else if (type.equals("host")) {
300 sendMessage(hostDetails(hostId(string(payload, "id")), sid));
301 }
302 }
303
304
305 // Creates host-to-host intent.
306 private void createHostIntent(ObjectNode event) {
307 ObjectNode payload = payload(event);
308 long id = number(event, "sid");
309 // TODO: add protection against device ids and non-existent hosts.
310 HostId one = hostId(string(payload, "one"));
311 HostId two = hostId(string(payload, "two"));
312
313 HostToHostIntent intent =
314 new HostToHostIntent(appId, one, two,
315 DefaultTrafficSelector.builder().build(),
316 DefaultTrafficTreatment.builder().build());
317
318 intentService.submit(intent);
319 startMonitoringIntent(event, intent);
320 }
321
322 // Creates multi-source-to-single-dest intent.
323 private void createMultiSourceIntent(ObjectNode event) {
324 ObjectNode payload = payload(event);
325 long id = number(event, "sid");
326 // TODO: add protection against device ids and non-existent hosts.
327 Set<HostId> src = getHostIds((ArrayNode) payload.path("src"));
328 HostId dst = hostId(string(payload, "dst"));
329 Host dstHost = hostService.getHost(dst);
330
331 Set<ConnectPoint> ingressPoints = getHostLocations(src);
332
333 // FIXME: clearly, this is not enough
334 TrafficSelector selector = DefaultTrafficSelector.builder()
335 .matchEthDst(dstHost.mac()).build();
336 TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
337
338 MultiPointToSinglePointIntent intent =
339 new MultiPointToSinglePointIntent(appId, selector, treatment,
340 ingressPoints, dstHost.location());
341
342 intentService.submit(intent);
343 startMonitoringIntent(event, intent);
344 }
345
346
347 private synchronized void startMonitoringIntent(ObjectNode event, Intent intent) {
348 selectedHosts = new HashSet<>();
349 selectedDevices = new HashSet<>();
350 selectedIntents = new ArrayList<>();
351 selectedIntents.add(intent);
352 currentIntentIndex = -1;
353 requestAnotherRelatedIntent(event, +1);
354 requestSelectedIntentTraffic(event);
355 }
356
357
358 private Set<ConnectPoint> getHostLocations(Set<HostId> hostIds) {
359 Set<ConnectPoint> points = new HashSet<>();
360 for (HostId hostId : hostIds) {
361 points.add(getHostLocation(hostId));
362 }
363 return points;
364 }
365
366 private HostLocation getHostLocation(HostId hostId) {
367 return hostService.getHost(hostId).location();
368 }
369
370 // Produces a list of host ids from the specified JSON array.
371 private Set<HostId> getHostIds(ArrayNode ids) {
372 Set<HostId> hostIds = new HashSet<>();
373 for (JsonNode id : ids) {
374 hostIds.add(hostId(id.asText()));
375 }
376 return hostIds;
377 }
378
379
380 private synchronized long startTrafficMonitoring(ObjectNode event) {
381 stopTrafficMonitoring();
382 trafficEvent = event;
383 trafficTask = new TrafficMonitor();
384 timer.schedule(trafficTask, TRAFFIC_FREQUENCY, TRAFFIC_FREQUENCY);
385 return number(event, "sid");
386 }
387
388 private synchronized void stopTrafficMonitoring() {
389 if (trafficTask != null) {
390 trafficTask.cancel();
391 trafficTask = null;
392 trafficEvent = null;
393 }
394 }
395
396 // Subscribes for host traffic messages.
397 private synchronized void requestAllTraffic(ObjectNode event) {
398 long sid = startTrafficMonitoring(event);
399 sendMessage(trafficSummaryMessage(sid));
400 }
401
402 private void requestDeviceLinkFlows(ObjectNode event) {
403 ObjectNode payload = payload(event);
404 long sid = startTrafficMonitoring(event);
405
406 // Get the set of selected hosts and their intents.
407 ArrayNode ids = (ArrayNode) payload.path("ids");
408 Set<Host> hosts = new HashSet<>();
409 Set<Device> devices = getDevices(ids);
410
411 // If there is a hover node, include it in the hosts and find intents.
412 String hover = string(payload, "hover");
413 if (!isNullOrEmpty(hover)) {
414 addHover(hosts, devices, hover);
415 }
416 sendMessage(flowSummaryMessage(sid, devices));
417 }
418
419
420 // Requests related intents message.
421 private synchronized void requestRelatedIntents(ObjectNode event) {
422 ObjectNode payload = payload(event);
423 if (!payload.has("ids")) {
424 return;
425 }
426
427 long sid = number(event, "sid");
428
429 // Cancel any other traffic monitoring mode.
430 stopTrafficMonitoring();
431
432 // Get the set of selected hosts and their intents.
433 ArrayNode ids = (ArrayNode) payload.path("ids");
434 selectedHosts = getHosts(ids);
435 selectedDevices = getDevices(ids);
436 selectedIntents = intentFilter.findPathIntents(selectedHosts, selectedDevices,
437 intentService.getIntents());
438 currentIntentIndex = -1;
439
440 if (haveSelectedIntents()) {
441 // Send a message to highlight all links of all monitored intents.
442 sendMessage(trafficMessage(sid, new TrafficClass("primary", selectedIntents)));
443 }
444
445 // FIXME: Re-introduce one the client click vs hover gesture stuff is sorted out.
446// String hover = string(payload, "hover");
447// if (!isNullOrEmpty(hover)) {
448// // If there is a hover node, include it in the selection and find intents.
449// processHoverExtendedSelection(sid, hover);
450// }
451 }
452
453 private boolean haveSelectedIntents() {
454 return selectedIntents != null && !selectedIntents.isEmpty();
455 }
456
457 // Processes the selection extended with hovered item to segregate items
458 // into primary (those including the hover) vs secondary highlights.
459 private void processHoverExtendedSelection(long sid, String hover) {
460 Set<Host> hoverSelHosts = new HashSet<>(selectedHosts);
461 Set<Device> hoverSelDevices = new HashSet<>(selectedDevices);
462 addHover(hoverSelHosts, hoverSelDevices, hover);
463
464 List<Intent> primary = selectedIntents == null ? new ArrayList<>() :
465 intentFilter.findPathIntents(hoverSelHosts, hoverSelDevices,
466 selectedIntents);
467 Set<Intent> secondary = new HashSet<>(selectedIntents);
468 secondary.removeAll(primary);
469
470 // Send a message to highlight all links of all monitored intents.
471 sendMessage(trafficMessage(sid, new TrafficClass("primary", primary),
472 new TrafficClass("secondary", secondary)));
473 }
474
475 // Requests next or previous related intent.
476 private void requestAnotherRelatedIntent(ObjectNode event, int offset) {
477 if (haveSelectedIntents()) {
478 currentIntentIndex = currentIntentIndex + offset;
479 if (currentIntentIndex < 0) {
480 currentIntentIndex = selectedIntents.size() - 1;
481 } else if (currentIntentIndex >= selectedIntents.size()) {
482 currentIntentIndex = 0;
483 }
484 sendSelectedIntent(event);
485 }
486 }
487
488 // Sends traffic information on the related intents with the currently
489 // selected intent highlighted.
490 private void sendSelectedIntent(ObjectNode event) {
491 Intent selectedIntent = selectedIntents.get(currentIntentIndex);
492 log.info("Requested next intent {}", selectedIntent.id());
493
494 Set<Intent> primary = new HashSet<>();
495 primary.add(selectedIntent);
496
497 Set<Intent> secondary = new HashSet<>(selectedIntents);
498 secondary.remove(selectedIntent);
499
500 // Send a message to highlight all links of the selected intent.
501 sendMessage(trafficMessage(number(event, "sid"),
502 new TrafficClass("primary", primary),
503 new TrafficClass("secondary", secondary)));
504 }
505
506 // Requests monitoring of traffic for the selected intent.
507 private void requestSelectedIntentTraffic(ObjectNode event) {
508 if (haveSelectedIntents()) {
509 if (currentIntentIndex < 0) {
510 currentIntentIndex = 0;
511 }
512 Intent selectedIntent = selectedIntents.get(currentIntentIndex);
513 log.info("Requested traffic for selected {}", selectedIntent.id());
514
515 Set<Intent> primary = new HashSet<>();
516 primary.add(selectedIntent);
517
518 // Send a message to highlight all links of the selected intent.
519 sendMessage(trafficMessage(number(event, "sid"),
520 new TrafficClass("primary", primary, true)));
521 }
522 }
523
524 // Cancels sending traffic messages.
525 private void cancelTraffic(ObjectNode event) {
526 selectedIntents = null;
527 sendMessage(trafficMessage(number(event, "sid")));
528 stopTrafficMonitoring();
529 }
530
531
532 private synchronized long startSummaryMonitoring(ObjectNode event) {
533 stopSummaryMonitoring();
534 summaryEvent = event;
535 summaryTask = new SummaryMonitor();
536 timer.schedule(summaryTask, SUMMARY_FREQUENCY, SUMMARY_FREQUENCY);
537 return number(event, "sid");
538 }
539
540 private synchronized void stopSummaryMonitoring() {
541 if (summaryEvent != null) {
542 summaryTask.cancel();
543 summaryTask = null;
544 summaryEvent = null;
545 }
546 }
547
548 // Subscribes for summary messages.
549 private synchronized void requestSummary(ObjectNode event) {
550 sendMessage(summmaryMessage(number(event, "sid")));
551 }
552
553
554 // Forces mastership role rebalancing.
555 private void equalizeMasters(ObjectNode event) {
556 directory.get(MastershipAdminService.class).balanceRoles();
557 }
558
559
560 // Adds all internal listeners.
561 private void addListeners() {
562 clusterService.addListener(clusterListener);
563 mastershipService.addListener(mastershipListener);
564 deviceService.addListener(deviceListener);
565 linkService.addListener(linkListener);
566 hostService.addListener(hostListener);
567 intentService.addListener(intentListener);
568 flowService.addListener(flowListener);
569 }
570
571 // Removes all internal listeners.
572 private synchronized void removeListeners() {
573 if (!listenersRemoved) {
574 listenersRemoved = true;
575 clusterService.removeListener(clusterListener);
576 mastershipService.removeListener(mastershipListener);
577 deviceService.removeListener(deviceListener);
578 linkService.removeListener(linkListener);
579 hostService.removeListener(hostListener);
580 intentService.removeListener(intentListener);
581 flowService.removeListener(flowListener);
582 }
583 }
584
585 // Cluster event listener.
586 private class InternalClusterListener implements ClusterEventListener {
587 @Override
588 public void event(ClusterEvent event) {
589 sendMessage(instanceMessage(event, null));
590 }
591 }
592
593 // Mastership change listener
594 private class InternalMastershipListener implements MastershipListener {
595 @Override
596 public void event(MastershipEvent event) {
597 sendAllInstances("updateInstance");
598 Device device = deviceService.getDevice(event.subject());
599 sendMessage(deviceMessage(new DeviceEvent(DEVICE_UPDATED, device)));
600 }
601 }
602
603 // Device event listener.
604 private class InternalDeviceListener implements DeviceListener {
605 @Override
606 public void event(DeviceEvent event) {
607 sendMessage(deviceMessage(event));
608 eventAccummulator.add(event);
609 }
610 }
611
612 // Link event listener.
613 private class InternalLinkListener implements LinkListener {
614 @Override
615 public void event(LinkEvent event) {
616 sendMessage(linkMessage(event));
617 eventAccummulator.add(event);
618 }
619 }
620
621 // Host event listener.
622 private class InternalHostListener implements HostListener {
623 @Override
624 public void event(HostEvent event) {
625 sendMessage(hostMessage(event));
626 eventAccummulator.add(event);
627 }
628 }
629
630 // Intent event listener.
631 private class InternalIntentListener implements IntentListener {
632 @Override
633 public void event(IntentEvent event) {
634 if (trafficEvent != null) {
635 requestSelectedIntentTraffic(trafficEvent);
636 }
637 eventAccummulator.add(event);
638 }
639 }
640
641 // Intent event listener.
642 private class InternalFlowListener implements FlowRuleListener {
643 @Override
644 public void event(FlowRuleEvent event) {
645 eventAccummulator.add(event);
646 }
647 }
648
649 // Periodic update of the traffic information
650 private class TrafficMonitor extends TimerTask {
651 @Override
652 public void run() {
653 try {
654 if (trafficEvent != null) {
655 String type = string(trafficEvent, "event", "unknown");
656 if (type.equals("requestAllTraffic")) {
657 requestAllTraffic(trafficEvent);
658 } else if (type.equals("requestDeviceLinkFlows")) {
659 requestDeviceLinkFlows(trafficEvent);
660 } else if (type.equals("requestSelectedIntentTraffic")) {
661 requestSelectedIntentTraffic(trafficEvent);
662 }
663 }
664 } catch (Exception e) {
665 log.warn("Unable to handle traffic request due to {}", e.getMessage());
666 log.warn("Boom!", e);
667 }
668 }
669 }
670
671 // Periodic update of the summary information
672 private class SummaryMonitor extends TimerTask {
673 @Override
674 public void run() {
675 try {
676 if (summaryEvent != null) {
677 requestSummary(summaryEvent);
678 }
679 } catch (Exception e) {
680 log.warn("Unable to handle summary request due to {}", e.getMessage());
681 log.warn("Boom!", e);
682 }
683 }
684 }
685
686 // Accumulates events to drive methodic update of the summary pane.
687 private class InternalEventAccummulator extends AbstractAccumulator<Event> {
688 protected InternalEventAccummulator() {
689 super(new Timer("topo-summary"), MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
690 }
691
692 @Override
693 public void processItems(List<Event> items) {
694 try {
695 if (summaryEvent != null) {
696 sendMessage(summmaryMessage(0));
697 }
698 } catch (Exception e) {
699 log.warn("Unable to handle summary request due to {}", e.getMessage());
700 log.debug("Boom!", e);
701 }
702 }
703 }
704}
705