blob: 5710aced74c8e8fce79c6530555b4f962c4e66f8 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.net.intent.impl;
tom95329eb2014-10-06 08:40:06 -070017
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080018import com.google.common.collect.HashMultimap;
19import com.google.common.collect.Lists;
Thomas Vachuskac46af202015-06-03 16:43:27 -070020import com.google.common.collect.Maps;
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080021import com.google.common.collect.SetMultimap;
tom95329eb2014-10-06 08:40:06 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
Brian O'Connor69d6ac72015-05-29 16:24:06 -070027import org.apache.felix.scr.annotations.ReferencePolicy;
tom95329eb2014-10-06 08:40:06 -070028import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080029import org.onosproject.core.ApplicationId;
30import org.onosproject.event.Event;
Brian O'Connorb5dcc512015-03-24 17:28:00 -070031import org.onosproject.net.DeviceId;
32import org.onosproject.net.ElementId;
33import org.onosproject.net.HostId;
Brian O'Connorabafb502014-12-02 22:26:20 -080034import org.onosproject.net.Link;
35import org.onosproject.net.LinkKey;
36import org.onosproject.net.NetworkResource;
Brian O'Connorb5dcc512015-03-24 17:28:00 -070037import org.onosproject.net.device.DeviceEvent;
38import org.onosproject.net.device.DeviceListener;
39import org.onosproject.net.device.DeviceService;
40import org.onosproject.net.host.HostEvent;
41import org.onosproject.net.host.HostListener;
42import org.onosproject.net.host.HostService;
Brian O'Connor69d6ac72015-05-29 16:24:06 -070043import org.onosproject.net.intent.Intent;
Thomas Vachuskac46af202015-06-03 16:43:27 -070044import org.onosproject.net.intent.IntentData;
Brian O'Connorabafb502014-12-02 22:26:20 -080045import org.onosproject.net.intent.IntentService;
Ray Milkeyf9af43c2015-02-09 16:45:48 -080046import org.onosproject.net.intent.Key;
Brian O'Connor69d6ac72015-05-29 16:24:06 -070047import org.onosproject.net.intent.PartitionEvent;
48import org.onosproject.net.intent.PartitionEventListener;
49import org.onosproject.net.intent.PartitionService;
Brian O'Connorabafb502014-12-02 22:26:20 -080050import org.onosproject.net.link.LinkEvent;
Brian O'Connor6de2e202015-05-21 14:30:41 -070051import org.onosproject.net.resource.link.LinkResourceEvent;
52import org.onosproject.net.resource.link.LinkResourceListener;
53import org.onosproject.net.resource.link.LinkResourceService;
Brian O'Connorabafb502014-12-02 22:26:20 -080054import org.onosproject.net.topology.TopologyEvent;
55import org.onosproject.net.topology.TopologyListener;
56import org.onosproject.net.topology.TopologyService;
tom95329eb2014-10-06 08:40:06 -070057import org.slf4j.Logger;
58
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080059import java.util.Collection;
Brian O'Connorb5dcc512015-03-24 17:28:00 -070060import java.util.Collections;
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080061import java.util.HashSet;
Thomas Vachuskac46af202015-06-03 16:43:27 -070062import java.util.List;
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080063import java.util.Set;
Thomas Vachuskac46af202015-06-03 16:43:27 -070064import java.util.concurrent.ConcurrentMap;
Brian O'Connor86f6f7f2014-12-01 17:02:45 -080065import java.util.concurrent.ExecutorService;
Brian O'Connor69d6ac72015-05-29 16:24:06 -070066import java.util.concurrent.Executors;
67import java.util.concurrent.ScheduledExecutorService;
68import java.util.concurrent.TimeUnit;
69import java.util.concurrent.atomic.AtomicBoolean;
tom95329eb2014-10-06 08:40:06 -070070
71import static com.google.common.base.Preconditions.checkArgument;
72import static com.google.common.base.Preconditions.checkNotNull;
73import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
74import static java.util.concurrent.Executors.newSingleThreadExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080075import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuskac46af202015-06-03 16:43:27 -070076import static org.onlab.util.Tools.isNullOrEmpty;
Brian O'Connorabafb502014-12-02 22:26:20 -080077import static org.onosproject.net.LinkKey.linkKey;
Thomas Vachuskac46af202015-06-03 16:43:27 -070078import static org.onosproject.net.intent.IntentState.INSTALLED;
Brian O'Connor690fd1c2015-06-04 19:50:33 -070079import static org.onosproject.net.intent.IntentState.INSTALLING;
Brian O'Connorabafb502014-12-02 22:26:20 -080080import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
81import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
tom95329eb2014-10-06 08:40:06 -070082import static org.slf4j.LoggerFactory.getLogger;
83
84/**
85 * Entity responsible for tracking installed flows and for monitoring topology
86 * events to determine what flows are affected by topology changes.
87 */
Ray Milkeye97ede92014-11-20 10:43:12 -080088@Component(immediate = true)
tom95329eb2014-10-06 08:40:06 -070089@Service
tom85258ee2014-10-07 00:10:02 -070090public class ObjectiveTracker implements ObjectiveTrackerService {
tom95329eb2014-10-06 08:40:06 -070091
92 private final Logger log = getLogger(getClass());
93
Thomas Vachuskac46af202015-06-03 16:43:27 -070094 private final ConcurrentMap<Key, Intent> intents = Maps.newConcurrentMap();
95
Ray Milkeyf9af43c2015-02-09 16:45:48 -080096 private final SetMultimap<LinkKey, Key> intentsByLink =
Brian O'Connor64a0369d2015-02-20 22:02:59 -080097 //TODO this could be slow as a point of synchronization
Ray Milkeyf9af43c2015-02-09 16:45:48 -080098 synchronizedSetMultimap(HashMultimap.<LinkKey, Key>create());
tom95329eb2014-10-06 08:40:06 -070099
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700100 private final SetMultimap<ElementId, Key> intentsByDevice =
101 synchronizedSetMultimap(HashMultimap.<ElementId, Key>create());
102
tom95329eb2014-10-06 08:40:06 -0700103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected TopologyService topologyService;
105
Ray Milkeye97ede92014-11-20 10:43:12 -0800106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected LinkResourceService resourceManager;
108
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected DeviceService deviceService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected HostService hostService;
114
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700115 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
116 policy = ReferencePolicy.DYNAMIC)
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800117 protected IntentService intentService;
118
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected PartitionService partitionService;
121
tom95329eb2014-10-06 08:40:06 -0700122 private ExecutorService executorService =
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700123 newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker"));
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700124 private ScheduledExecutorService executor = Executors
125 .newScheduledThreadPool(1);
tom95329eb2014-10-06 08:40:06 -0700126
127 private TopologyListener listener = new InternalTopologyListener();
Ray Milkeye97ede92014-11-20 10:43:12 -0800128 private LinkResourceListener linkResourceListener =
129 new InternalLinkResourceListener();
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700130 private DeviceListener deviceListener = new InternalDeviceListener();
131 private HostListener hostListener = new InternalHostListener();
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700132 private PartitionEventListener partitionListener = new InternalPartitionListener();
tom95329eb2014-10-06 08:40:06 -0700133 private TopologyChangeDelegate delegate;
134
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700135 protected final AtomicBoolean updateScheduled = new AtomicBoolean(false);
136
tom95329eb2014-10-06 08:40:06 -0700137 @Activate
138 public void activate() {
139 topologyService.addListener(listener);
Ray Milkeye97ede92014-11-20 10:43:12 -0800140 resourceManager.addListener(linkResourceListener);
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700141 deviceService.addListener(deviceListener);
142 hostService.addListener(hostListener);
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700143 partitionService.addListener(partitionListener);
tom95329eb2014-10-06 08:40:06 -0700144 log.info("Started");
145 }
146
147 @Deactivate
148 public void deactivate() {
149 topologyService.removeListener(listener);
Ray Milkeye97ede92014-11-20 10:43:12 -0800150 resourceManager.removeListener(linkResourceListener);
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700151 deviceService.removeListener(deviceListener);
152 hostService.removeListener(hostListener);
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700153 partitionService.removeListener(partitionListener);
tom95329eb2014-10-06 08:40:06 -0700154 log.info("Stopped");
155 }
156
Brian O'Connor5d55ed42014-12-01 18:27:47 -0800157 protected void bindIntentService(IntentService service) {
158 if (intentService == null) {
159 intentService = service;
160 }
161 }
162
163 protected void unbindIntentService(IntentService service) {
164 if (intentService == service) {
165 intentService = null;
166 }
167 }
168
tom95329eb2014-10-06 08:40:06 -0700169 @Override
170 public void setDelegate(TopologyChangeDelegate delegate) {
171 checkNotNull(delegate, "Delegate cannot be null");
172 checkArgument(this.delegate == null || this.delegate == delegate,
173 "Another delegate already set");
174 this.delegate = delegate;
175 }
176
177 @Override
178 public void unsetDelegate(TopologyChangeDelegate delegate) {
179 checkArgument(this.delegate == delegate, "Not the current delegate");
180 this.delegate = null;
181 }
182
183 @Override
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800184 public void addTrackedResources(Key intentKey,
Thomas Vachuskab97cf282014-10-20 23:31:12 -0700185 Collection<NetworkResource> resources) {
186 for (NetworkResource resource : resources) {
187 if (resource instanceof Link) {
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800188 intentsByLink.put(linkKey((Link) resource), intentKey);
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700189 } else if (resource instanceof ElementId) {
190 intentsByDevice.put((ElementId) resource, intentKey);
Thomas Vachuskab97cf282014-10-20 23:31:12 -0700191 }
tom95329eb2014-10-06 08:40:06 -0700192 }
193 }
194
195 @Override
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800196 public void removeTrackedResources(Key intentKey,
Thomas Vachuskab97cf282014-10-20 23:31:12 -0700197 Collection<NetworkResource> resources) {
198 for (NetworkResource resource : resources) {
199 if (resource instanceof Link) {
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800200 intentsByLink.remove(linkKey((Link) resource), intentKey);
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700201 } else if (resource instanceof ElementId) {
Sho SHIMIZUd82a4e62015-09-09 14:53:46 -0700202 intentsByDevice.remove(resource, intentKey);
Thomas Vachuskab97cf282014-10-20 23:31:12 -0700203 }
tom95329eb2014-10-06 08:40:06 -0700204 }
205 }
206
Thomas Vachuskac46af202015-06-03 16:43:27 -0700207 @Override
208 public void trackIntent(IntentData intentData) {
209
210 //NOTE: This will be called for intents that are being added to the store
211 // locally (i.e. every intent update)
212
213 Key key = intentData.key();
214 Intent intent = intentData.intent();
215 boolean isLocal = intentService.isLocal(key);
Brian O'Connor690fd1c2015-06-04 19:50:33 -0700216 boolean isInstalled = intentData.state() == INSTALLING ||
217 intentData.state() == INSTALLED;
Thomas Vachuskac46af202015-06-03 16:43:27 -0700218 List<Intent> installables = intentData.installables();
219
220 if (log.isTraceEnabled()) {
221 log.trace("intent {}, old: {}, new: {}, installableCount: {}, resourceCount: {}",
222 key,
223 intentsByDevice.values().contains(key),
Brian O'Connor690fd1c2015-06-04 19:50:33 -0700224 isLocal && isInstalled,
Thomas Vachuskac46af202015-06-03 16:43:27 -0700225 installables.size(),
226 intent.resources().size() +
227 installables.stream()
228 .mapToLong(i -> i.resources().size()).sum());
229 }
230
231 if (isNullOrEmpty(installables) && intentData.state() == INSTALLED) {
232 log.warn("Intent {} is INSTALLED with no installables", key);
233 }
234
Brian O'Connor690fd1c2015-06-04 19:50:33 -0700235 // FIXME Intents will be added 3 times (once directly using addTracked,
236 // then when installing and when installed)
237 if (isLocal && isInstalled) {
Thomas Vachuskac46af202015-06-03 16:43:27 -0700238 addTrackedResources(key, intent.resources());
239 for (Intent installable : installables) {
240 addTrackedResources(key, installable.resources());
241 }
242 // FIXME check all resources against current topo service(s); recompile if necessary
243 } else {
244 removeTrackedResources(key, intent.resources());
245 for (Intent installable : installables) {
246 removeTrackedResources(key, installable.resources());
247 }
248 }
249 }
250
tom95329eb2014-10-06 08:40:06 -0700251 // Internal re-actor to topology change events.
252 private class InternalTopologyListener implements TopologyListener {
253 @Override
254 public void event(TopologyEvent event) {
255 executorService.execute(new TopologyChangeHandler(event));
256 }
257 }
258
259 // Re-dispatcher of topology change events.
260 private class TopologyChangeHandler implements Runnable {
261
262 private final TopologyEvent event;
263
264 TopologyChangeHandler(TopologyEvent event) {
265 this.event = event;
266 }
267
268 @Override
269 public void run() {
Thomas Vachuska7b652ad2014-10-30 14:10:51 -0700270 // If there is no delegate, why bother? Just bail.
271 if (delegate == null) {
272 return;
273 }
274
Ray Milkey7c44c052014-12-05 10:34:54 -0800275 if (event.reasons() == null || event.reasons().isEmpty()) {
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700276 delegate.triggerCompile(Collections.emptySet(), true);
tom85258ee2014-10-07 00:10:02 -0700277
tom95329eb2014-10-06 08:40:06 -0700278 } else {
Jonathan Hart96c5a4a2015-07-31 14:23:33 -0700279 Set<Key> intentsToRecompile = new HashSet<>();
280 boolean dontRecompileAllFailedIntents = true;
tom85258ee2014-10-07 00:10:02 -0700281
282 // Scan through the list of reasons and keep accruing all
283 // intents that need to be recompiled.
tom95329eb2014-10-06 08:40:06 -0700284 for (Event reason : event.reasons()) {
285 if (reason instanceof LinkEvent) {
286 LinkEvent linkEvent = (LinkEvent) reason;
Jonathan Hart96c5a4a2015-07-31 14:23:33 -0700287 final LinkKey linkKey = linkKey(linkEvent.subject());
288 synchronized (intentsByLink) {
289 Set<Key> intentKeys = intentsByLink.get(linkKey);
290 log.debug("recompile triggered by LinkEvent {} ({}) for {}",
291 linkKey, linkEvent.type(), intentKeys);
292 intentsToRecompile.addAll(intentKeys);
tom85258ee2014-10-07 00:10:02 -0700293 }
Jonathan Hart96c5a4a2015-07-31 14:23:33 -0700294 dontRecompileAllFailedIntents = dontRecompileAllFailedIntents &&
Praseed Balakrishnan00dd1f92014-11-19 17:12:36 -0800295 (linkEvent.type() == LINK_REMOVED ||
296 (linkEvent.type() == LINK_UPDATED &&
297 linkEvent.subject().isDurable()));
tom95329eb2014-10-06 08:40:06 -0700298 }
299 }
Jonathan Hart96c5a4a2015-07-31 14:23:33 -0700300 delegate.triggerCompile(intentsToRecompile, !dontRecompileAllFailedIntents);
tom95329eb2014-10-06 08:40:06 -0700301 }
302 }
303 }
304
Ray Milkeye97ede92014-11-20 10:43:12 -0800305 /**
306 * Internal re-actor to resource available events.
307 */
308 private class InternalLinkResourceListener implements LinkResourceListener {
309 @Override
310 public void event(LinkResourceEvent event) {
311 executorService.execute(new ResourceAvailableHandler(event));
312 }
313 }
314
315 /*
316 * Re-dispatcher of resource available events.
317 */
318 private class ResourceAvailableHandler implements Runnable {
319
320 private final LinkResourceEvent event;
321
322 ResourceAvailableHandler(LinkResourceEvent event) {
323 this.event = event;
324 }
325
326 @Override
327 public void run() {
328 // If there is no delegate, why bother? Just bail.
329 if (delegate == null) {
330 return;
331 }
332
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700333 delegate.triggerCompile(Collections.emptySet(), true);
Ray Milkeye97ede92014-11-20 10:43:12 -0800334 }
335 }
336
Brian O'Connor72a034c2014-11-26 18:24:23 -0800337 //TODO consider adding flow rule event tracking
Ray Milkeye97ede92014-11-20 10:43:12 -0800338
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800339 private void updateTrackedResources(ApplicationId appId, boolean track) {
Brian O'Connor5d55ed42014-12-01 18:27:47 -0800340 if (intentService == null) {
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700341 log.warn("Intent service is not bound yet");
Brian O'Connor5d55ed42014-12-01 18:27:47 -0800342 return;
343 }
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800344 intentService.getIntents().forEach(intent -> {
345 if (intent.appId().equals(appId)) {
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800346 Key key = intent.key();
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800347 Collection<NetworkResource> resources = Lists.newArrayList();
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800348 intentService.getInstallableIntents(key).stream()
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800349 .map(installable -> installable.resources())
350 .forEach(resources::addAll);
351 if (track) {
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800352 addTrackedResources(key, resources);
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800353 } else {
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800354 removeTrackedResources(key, resources);
Brian O'Connor86f6f7f2014-12-01 17:02:45 -0800355 }
356 }
357 });
358 }
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700359
360 /*
361 * Re-dispatcher of device and host events.
362 */
363 private class DeviceAvailabilityHandler implements Runnable {
364
365 private final ElementId id;
366 private final boolean available;
367
368 DeviceAvailabilityHandler(ElementId id, boolean available) {
369 this.id = checkNotNull(id);
370 this.available = available;
371 }
372
373 @Override
374 public void run() {
375 // If there is no delegate, why bother? Just bail.
376 if (delegate == null) {
377 return;
378 }
379
380 // TODO should we recompile on available==true?
381 delegate.triggerCompile(intentsByDevice.get(id), available);
382 }
383 }
384
385
386 private class InternalDeviceListener implements DeviceListener {
387 @Override
388 public void event(DeviceEvent event) {
389 DeviceEvent.Type type = event.type();
Jonathan Hart94470fe2015-07-31 11:41:10 -0700390 switch (type) {
391 case DEVICE_ADDED:
392 case DEVICE_AVAILABILITY_CHANGED:
393 case DEVICE_REMOVED:
394 case DEVICE_SUSPENDED:
395 case DEVICE_UPDATED:
396 DeviceId id = event.subject().id();
397 // TODO we need to check whether AVAILABILITY_CHANGED means up or down
398 boolean available = (type == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
399 type == DeviceEvent.Type.DEVICE_ADDED ||
400 type == DeviceEvent.Type.DEVICE_UPDATED);
401 executorService.execute(new DeviceAvailabilityHandler(id, available));
402 break;
403 case PORT_ADDED:
404 case PORT_REMOVED:
405 case PORT_UPDATED:
406 case PORT_STATS_UPDATED:
407 default:
408 // Don't handle port events for now
409 break;
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700410 }
Brian O'Connorb5dcc512015-03-24 17:28:00 -0700411 }
412 }
413
414 private class InternalHostListener implements HostListener {
415 @Override
416 public void event(HostEvent event) {
417 HostId id = event.subject().id();
418 HostEvent.Type type = event.type();
419 boolean available = (type == HostEvent.Type.HOST_ADDED);
420 executorService.execute(new DeviceAvailabilityHandler(id, available));
421 }
422 }
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700423
424 protected void doIntentUpdate() {
425 updateScheduled.set(false);
426 if (intentService == null) {
427 log.warn("Intent service is not bound yet");
428 return;
429 }
430 try {
431 //FIXME very inefficient
Thomas Vachuskac46af202015-06-03 16:43:27 -0700432 for (IntentData intentData : intentService.getIntentData()) {
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700433 try {
Thomas Vachuskac46af202015-06-03 16:43:27 -0700434 trackIntent(intentData);
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700435 } catch (NullPointerException npe) {
Thomas Vachuskac46af202015-06-03 16:43:27 -0700436 log.warn("intent error {}", intentData.key(), npe);
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700437 }
438 }
439 } catch (Exception e) {
440 log.warn("Exception caught during update task", e);
441 }
442 }
443
444 private void scheduleIntentUpdate(int afterDelaySec) {
445 if (updateScheduled.compareAndSet(false, true)) {
446 executor.schedule(this::doIntentUpdate, afterDelaySec, TimeUnit.SECONDS);
447 }
448 }
449
450 private final class InternalPartitionListener implements PartitionEventListener {
451 @Override
452 public void event(PartitionEvent event) {
Brian O'Connor3057f212015-05-29 18:22:18 -0700453 log.debug("got message {}", event.subject());
Brian O'Connor69d6ac72015-05-29 16:24:06 -0700454 scheduleIntentUpdate(1);
455 }
456 }
tom95329eb2014-10-06 08:40:06 -0700457}