Fixed objective tracker not to do extra work so eagerly.
Added ability to see availability status as part of device events (for availability events)
Change-Id: I4a3476e203459ed72deee45f0a24e4b4373bd819
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceEvent.java b/core/api/src/main/java/org/onosproject/net/device/DeviceEvent.java
index ade1cbc..baf3704 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceEvent.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceEvent.java
@@ -20,7 +20,10 @@
import org.onosproject.net.Device;
import org.onosproject.net.Port;
+import java.util.Optional;
+
import static com.google.common.base.MoreObjects.toStringHelper;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
/**
* Describes infrastructure device event.
@@ -28,6 +31,7 @@
public class DeviceEvent extends AbstractEvent<DeviceEvent.Type, Device> {
private final Port port;
+ private final boolean isAvailable;
/**
* Type of device events.
@@ -102,6 +106,20 @@
public DeviceEvent(Type type, Device device, Port port) {
super(type, device);
this.port = port;
+ this.isAvailable = false;
+ }
+
+ /**
+ * Creates an event for change of device availability for the given device
+ * and the current time.
+ *
+ * @param device event device subject
+ * @param isAvailable true if device became available; false otherwise
+ */
+ public DeviceEvent(Device device, boolean isAvailable) {
+ super(DEVICE_AVAILABILITY_CHANGED, device);
+ this.port = null;
+ this.isAvailable = isAvailable;
}
/**
@@ -115,6 +133,7 @@
public DeviceEvent(Type type, Device device, Port port, long time) {
super(type, device, time);
this.port = port;
+ this.isAvailable = false;
}
/**
@@ -126,6 +145,15 @@
return port;
}
+ /**
+ * Indicates whether device became available or unavailable.
+ *
+ * @return if present, true indicates device came online; false if device went offline
+ */
+ public Optional<Boolean> isAvailable() {
+ return type() == DEVICE_AVAILABILITY_CHANGED ? Optional.of(isAvailable) : Optional.empty();
+ }
+
@Override
public String toString() {
if (port == null) {
@@ -137,5 +165,5 @@
.add("subject", subject())
.add("port", port)
.toString();
- }
+ }
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
index 45a2af4..a63cef5 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
@@ -16,8 +16,8 @@
package org.onosproject.net.intent.impl;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -25,8 +25,9 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
import org.onosproject.event.Event;
-import org.onosproject.net.DeviceId;
import org.onosproject.net.ElementId;
import org.onosproject.net.HostId;
import org.onosproject.net.Link;
@@ -34,18 +35,17 @@
import org.onosproject.net.NetworkResource;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
-import org.onosproject.net.intent.IntentService;
-import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.IntentPartitionEvent;
import org.onosproject.net.intent.IntentPartitionEventListener;
import org.onosproject.net.intent.IntentPartitionService;
+import org.onosproject.net.intent.IntentService;
+import org.onosproject.net.intent.Key;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.resource.ResourceEvent;
import org.onosproject.net.resource.ResourceListener;
@@ -57,26 +57,22 @@
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.isNullOrEmpty;
import static org.onosproject.net.LinkKey.linkKey;
import static org.onosproject.net.intent.IntentState.INSTALLED;
import static org.onosproject.net.intent.IntentState.INSTALLING;
import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
+import static org.onosproject.net.resource.ResourceEvent.Type.RESOURCE_ADDED;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -86,14 +82,16 @@
@Component(immediate = true)
@Service
public class ObjectiveTracker implements ObjectiveTrackerService {
-
private final Logger log = getLogger(getClass());
+ //TODO make this configurable via component config
+ private static final long RECOMPILE_ALL_DELAY = 25; //ms
+
private final SetMultimap<LinkKey, Key> intentsByLink =
//TODO this could be slow as a point of synchronization
synchronizedSetMultimap(HashMultimap.<LinkKey, Key>create());
- private final SetMultimap<ElementId, Key> intentsByDevice =
+ private final SetMultimap<ElementId, Key> intentsByElement =
synchronizedSetMultimap(HashMultimap.<ElementId, Key>create());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -109,31 +107,28 @@
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
- policy = ReferencePolicy.DYNAMIC)
+ policy = ReferencePolicy.DYNAMIC)
protected IntentService intentService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentPartitionService partitionService;
- private ExecutorService executorService =
- newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker", log));
- private ScheduledExecutorService executor = Executors
- .newScheduledThreadPool(1);
+ private final Timer timer = new Timer("onos/intent-objective-tracker", true);
+ private final Accumulator<WorkMessage> workMessageAccumulator = new InternalWorkAccumulator();
private TopologyListener listener = new InternalTopologyListener();
private ResourceListener resourceListener = new InternalResourceListener();
- private DeviceListener deviceListener = new InternalDeviceListener();
private HostListener hostListener = new InternalHostListener();
private IntentPartitionEventListener partitionListener = new InternalPartitionListener();
private TopologyChangeDelegate delegate;
protected final AtomicBoolean updateScheduled = new AtomicBoolean(false);
+ protected final AtomicBoolean recompileAllScheduled = new AtomicBoolean(false);
@Activate
public void activate() {
topologyService.addListener(listener);
resourceService.addListener(resourceListener);
- deviceService.addListener(deviceListener);
hostService.addListener(hostListener);
partitionService.addListener(partitionListener);
log.info("Started");
@@ -143,7 +138,6 @@
public void deactivate() {
topologyService.removeListener(listener);
resourceService.removeListener(resourceListener);
- deviceService.removeListener(deviceListener);
hostService.removeListener(hostListener);
partitionService.removeListener(partitionListener);
log.info("Stopped");
@@ -153,7 +147,7 @@
if (intentService == null) {
intentService = service;
}
- }
+ }
protected void unbindIntentService(IntentService service) {
if (intentService == service) {
@@ -182,7 +176,7 @@
if (resource instanceof Link) {
intentsByLink.put(linkKey((Link) resource), intentKey);
} else if (resource instanceof ElementId) {
- intentsByDevice.put((ElementId) resource, intentKey);
+ intentsByElement.put((ElementId) resource, intentKey);
}
}
}
@@ -194,7 +188,7 @@
if (resource instanceof Link) {
intentsByLink.remove(linkKey((Link) resource), intentKey);
} else if (resource instanceof ElementId) {
- intentsByDevice.remove(resource, intentKey);
+ intentsByElement.remove(resource, intentKey);
}
}
}
@@ -209,18 +203,18 @@
Intent intent = intentData.intent();
boolean isLocal = intentService.isLocal(key);
boolean isInstalled = intentData.state() == INSTALLING ||
- intentData.state() == INSTALLED;
+ intentData.state() == INSTALLED;
List<Intent> installables = intentData.installables();
if (log.isTraceEnabled()) {
log.trace("intent {}, old: {}, new: {}, installableCount: {}, resourceCount: {}",
key,
- intentsByDevice.values().contains(key),
+ intentsByElement.values().contains(key),
isLocal && isInstalled,
installables.size(),
intent.resources().size() +
- installables.stream()
- .mapToLong(i -> i.resources().size()).sum());
+ installables.stream()
+ .mapToLong(i -> i.resources().size()).sum());
}
if (isNullOrEmpty(installables) && intentData.state() == INSTALLED) {
@@ -247,52 +241,39 @@
private class InternalTopologyListener implements TopologyListener {
@Override
public void event(TopologyEvent event) {
- executorService.execute(new TopologyChangeHandler(event));
- }
- }
-
- // Re-dispatcher of topology change events.
- private class TopologyChangeHandler implements Runnable {
-
- private final TopologyEvent event;
-
- TopologyChangeHandler(TopologyEvent event) {
- this.event = event;
- }
-
- @Override
- public void run() {
- // If there is no delegate, why bother? Just bail.
- if (delegate == null) {
- return;
- }
+ boolean recompileAllFailedIntents = false;
if (event.reasons() == null || event.reasons().isEmpty()) {
- delegate.triggerCompile(Collections.emptySet(), true);
+ recompileAllFailedIntents = true;
} else {
- Set<Key> intentsToRecompile = new HashSet<>();
- boolean dontRecompileAllFailedIntents = true;
-
// Scan through the list of reasons and keep accruing all
// intents that need to be recompiled.
for (Event reason : event.reasons()) {
+ WorkMessage wi = new WorkMessage();
if (reason instanceof LinkEvent) {
LinkEvent linkEvent = (LinkEvent) reason;
- final LinkKey linkKey = linkKey(linkEvent.subject());
- synchronized (intentsByLink) {
- Set<Key> intentKeys = intentsByLink.get(linkKey);
- log.debug("recompile triggered by LinkEvent {} ({}) for {}",
- linkKey, linkEvent.type(), intentKeys);
- intentsToRecompile.addAll(intentKeys);
- }
- dontRecompileAllFailedIntents = dontRecompileAllFailedIntents &&
- (linkEvent.type() == LINK_REMOVED ||
- (linkEvent.type() == LINK_UPDATED &&
- linkEvent.subject().isDurable()));
+ wi.linkKey = linkKey(linkEvent.subject());
+ recompileAllFailedIntents |=
+ !(linkEvent.type() == LINK_REMOVED ||
+ (linkEvent.type() == LINK_UPDATED &&
+ linkEvent.subject().isDurable()));
+ } else if (reason instanceof DeviceEvent) {
+ DeviceEvent deviceEvent = (DeviceEvent) reason;
+ wi.elementId = deviceEvent.subject().id();
+ recompileAllFailedIntents |=
+ (deviceEvent.type() == DeviceEvent.Type.DEVICE_ADDED ||
+ deviceEvent.type() == DeviceEvent.Type.DEVICE_UPDATED ||
+ deviceEvent.isAvailable().orElse(false));
+ } else {
+ continue;
}
+ workMessageAccumulator.add(wi);
}
- delegate.triggerCompile(intentsToRecompile, !dontRecompileAllFailedIntents);
+ }
+
+ if (recompileAllFailedIntents) {
+ scheduleRecompileAll();
}
}
}
@@ -300,79 +281,16 @@
private class InternalResourceListener implements ResourceListener {
@Override
public void event(ResourceEvent event) {
- if (event.subject().isSubTypeOf(PortNumber.class)) {
- executorService.execute(() -> {
- if (delegate == null) {
- return;
- }
-
- delegate.triggerCompile(Collections.emptySet(), true);
- });
+ if (event.type() == RESOURCE_ADDED &&
+ event.subject().isSubTypeOf(PortNumber.class)) {
+ scheduleRecompileAll();
}
+ //TODO we should probably track resources and trigger removal
}
}
//TODO consider adding flow rule event tracking
- /*
- * Re-dispatcher of device and host events.
- */
- private class DeviceAvailabilityHandler implements Runnable {
-
- private final ElementId id;
- private final boolean available;
-
- DeviceAvailabilityHandler(ElementId id, boolean available) {
- this.id = checkNotNull(id);
- this.available = available;
- }
-
- @Override
- public void run() {
- // If there is no delegate, why bother? Just bail.
- if (delegate == null) {
- return;
- }
-
- // TODO should we recompile on available==true?
-
- final ImmutableSet<Key> snapshot;
- synchronized (intentsByDevice) {
- snapshot = ImmutableSet.copyOf(intentsByDevice.get(id));
- }
- delegate.triggerCompile(snapshot, available);
- }
- }
-
-
- private class InternalDeviceListener implements DeviceListener {
- @Override
- public void event(DeviceEvent event) {
- DeviceEvent.Type type = event.type();
- switch (type) {
- case DEVICE_ADDED:
- case DEVICE_AVAILABILITY_CHANGED:
- case DEVICE_REMOVED:
- case DEVICE_SUSPENDED:
- case DEVICE_UPDATED:
- DeviceId id = event.subject().id();
- // TODO we need to check whether AVAILABILITY_CHANGED means up or down
- boolean available = (type == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
- type == DeviceEvent.Type.DEVICE_ADDED ||
- type == DeviceEvent.Type.DEVICE_UPDATED);
- executorService.execute(new DeviceAvailabilityHandler(id, available));
- break;
- case PORT_ADDED:
- case PORT_REMOVED:
- case PORT_UPDATED:
- case PORT_STATS_UPDATED:
- default:
- // Don't handle port events for now
- break;
- }
- }
- }
-
private class InternalHostListener implements HostListener {
@Override
public void event(HostEvent event) {
@@ -381,7 +299,10 @@
case HOST_ADDED:
case HOST_MOVED:
case HOST_REMOVED:
- executorService.execute(new DeviceAvailabilityHandler(id, false));
+ WorkMessage wi = new WorkMessage();
+ wi.elementId = id;
+ workMessageAccumulator.add(wi);
+ scheduleRecompileAll();
break;
case HOST_UPDATED:
default:
@@ -413,7 +334,31 @@
private void scheduleIntentUpdate(int afterDelaySec) {
if (updateScheduled.compareAndSet(false, true)) {
- executor.schedule(this::doIntentUpdate, afterDelaySec, TimeUnit.SECONDS);
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ ObjectiveTracker.this.doIntentUpdate();
+ }
+ }, afterDelaySec * 1_000);
+ }
+ }
+
+ private void doRecompileAll() {
+ recompileAllScheduled.set(false);
+ if (delegate != null) {
+ delegate.triggerCompile(Collections.emptySet(), true);
+ }
+ }
+
+ private void scheduleRecompileAll() {
+ if (recompileAllScheduled.compareAndSet(false, true)) {
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ ObjectiveTracker.this.doRecompileAll();
+ }
+ }, RECOMPILE_ALL_DELAY);
+
}
}
@@ -424,4 +369,47 @@
scheduleIntentUpdate(1);
}
}
+
+ private static class WorkMessage {
+ ElementId elementId;
+ LinkKey linkKey;
+ }
+
+ private class InternalWorkAccumulator extends AbstractAccumulator<WorkMessage> {
+ private static final int MAX_WORK = 100_000;
+ private static final int MAX_WORK_TIME = 500;
+ private static final int MAX_WORK_IDLE_TIME = 20;
+
+ InternalWorkAccumulator() {
+ super(timer, MAX_WORK, MAX_WORK_TIME, MAX_WORK_IDLE_TIME);
+ }
+
+ @Override
+ public void processItems(List<WorkMessage> items) {
+ // If there is no delegate, why bother? Just bail.
+ if (delegate == null) {
+ return;
+ }
+
+ Set<Key> keys = Sets.newHashSet();
+
+ items.forEach(wi -> {
+ if (wi.elementId != null) {
+ synchronized (intentsByElement) {
+ keys.addAll(intentsByElement.get(wi.elementId));
+ }
+ }
+
+ if (wi.linkKey != null) {
+ synchronized (intentsByLink) {
+ keys.addAll(intentsByLink.get(wi.linkKey));
+ }
+ }
+ });
+
+ if (!keys.isEmpty()) {
+ delegate.triggerCompile(keys, false);
+ }
+ }
+ }
}
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/ObjectiveTrackerTest.java b/core/net/src/test/java/org/onosproject/net/intent/impl/ObjectiveTrackerTest.java
index 5ebef81..c186869 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/ObjectiveTrackerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/ObjectiveTrackerTest.java
@@ -15,12 +15,8 @@
*/
package org.onosproject.net.intent.impl;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -30,11 +26,13 @@
import org.onosproject.event.Event;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
import org.onosproject.net.Link;
import org.onosproject.net.NetworkResource;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.host.HostEvent;
+import org.onosproject.net.host.HostListener;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.MockIdGenerator;
@@ -46,18 +44,17 @@
import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.net.topology.TopologyListener;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import static org.easymock.EasyMock.createMock;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.onosproject.net.resource.ResourceEvent.Type.*;
-import static org.onosproject.net.NetTestTools.APP_ID;
-import static org.onosproject.net.NetTestTools.device;
-import static org.onosproject.net.NetTestTools.link;
+import static org.hamcrest.Matchers.*;
+import static org.onosproject.net.NetTestTools.*;
+import static org.onosproject.net.resource.ResourceEvent.Type.RESOURCE_ADDED;
/**
* Tests for the objective tracker.
@@ -69,7 +66,7 @@
private TestTopologyChangeDelegate delegate;
private List<Event> reasons;
private TopologyListener listener;
- private DeviceListener deviceListener;
+ private HostListener hostListener;
private ResourceListener resourceListener;
private IdGenerator mockGenerator;
@@ -86,7 +83,7 @@
tracker.setDelegate(delegate);
reasons = new LinkedList<>();
listener = TestUtils.getField(tracker, "listener");
- deviceListener = TestUtils.getField(tracker, "deviceListener");
+ hostListener = TestUtils.getField(tracker, "hostListener");
resourceListener = TestUtils.getField(tracker, "resourceListener");
mockGenerator = new MockIdGenerator();
Intent.bindIdGenerator(mockGenerator);
@@ -107,16 +104,14 @@
* to be generated.
*/
static class TestTopologyChangeDelegate implements TopologyChangeDelegate {
-
CountDownLatch latch = new CountDownLatch(1);
- List<Key> intentIdsFromEvent;
- boolean compileAllFailedFromEvent;
+ List<Key> intentIdsFromEvent = Lists.newArrayList();
+ boolean compileAllFailedFromEvent = false;
@Override
- public void triggerCompile(Iterable<Key> intentKeys,
- boolean compileAllFailed) {
- intentIdsFromEvent = Lists.newArrayList(intentKeys);
- compileAllFailedFromEvent = compileAllFailed;
+ public void triggerCompile(Iterable<Key> intentKeys, boolean compileAllFailed) {
+ intentKeys.forEach(intentIdsFromEvent::add);
+ compileAllFailedFromEvent |= compileAllFailed;
latch.countDown();
}
}
@@ -128,16 +123,10 @@
*/
@Test
public void testEventNoReasons() throws InterruptedException {
- final TopologyEvent event = new TopologyEvent(
- TopologyEvent.Type.TOPOLOGY_CHANGED,
- topology,
- null);
-
+ TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, null);
listener.event(event);
- assertThat(
- delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
- is(true));
+ assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
@@ -150,20 +139,15 @@
*/
@Test
public void testEventLinkDownNoMatches() throws InterruptedException {
- final Link link = link("src", 1, "dst", 2);
- final LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_REMOVED, link);
+ Link link = link("src", 1, "dst", 2);
+ LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_REMOVED, link);
reasons.add(linkEvent);
- final TopologyEvent event = new TopologyEvent(
- TopologyEvent.Type.TOPOLOGY_CHANGED,
- topology,
- reasons);
-
+ TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, reasons);
listener.event(event);
- assertThat(
- delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
- is(true));
+ // we expect no message, latch should never fire
+ assertThat(delegate.latch.await(25, TimeUnit.MILLISECONDS), is(false));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(false));
}
@@ -175,20 +159,14 @@
*/
@Test
public void testEventLinkAdded() throws InterruptedException {
- final Link link = link("src", 1, "dst", 2);
- final LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_ADDED, link);
+ Link link = link("src", 1, "dst", 2);
+ LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_ADDED, link);
reasons.add(linkEvent);
- final TopologyEvent event = new TopologyEvent(
- TopologyEvent.Type.TOPOLOGY_CHANGED,
- topology,
- reasons);
+ TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, reasons);
listener.event(event);
- assertThat(
- delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
- is(true));
-
+ assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
@@ -200,24 +178,18 @@
*/
@Test
public void testEventLinkDownMatch() throws Exception {
- final Link link = link("src", 1, "dst", 2);
- final LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_REMOVED, link);
+ Link link = link("src", 1, "dst", 2);
+ LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_REMOVED, link);
reasons.add(linkEvent);
- final TopologyEvent event = new TopologyEvent(
- TopologyEvent.Type.TOPOLOGY_CHANGED,
- topology,
- reasons);
+ TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, reasons);
- final Key key = Key.of(0x333L, APP_ID);
+ Key key = Key.of(0x333L, APP_ID);
Collection<NetworkResource> resources = ImmutableSet.of(link);
tracker.addTrackedResources(key, resources);
listener.event(event);
- assertThat(
- delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
- is(true));
-
+ assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(1));
assertThat(delegate.compileAllFailedFromEvent, is(false));
assertThat(delegate.intentIdsFromEvent.get(0).toString(),
@@ -232,13 +204,11 @@
@Test
public void testResourceEvent() throws Exception {
ResourceEvent event = new ResourceEvent(RESOURCE_ADDED,
- Resources.discrete(DeviceId.deviceId("a"), PortNumber.portNumber(1)).resource());
+ Resources.discrete(DeviceId.deviceId("a"),
+ PortNumber.portNumber(1)).resource());
resourceListener.event(event);
- assertThat(
- delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
- is(true));
-
+ assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
@@ -251,25 +221,27 @@
@Test
public void testEventHostAvailableMatch() throws Exception {
- final Device host = device("host1");
+ // we will expect 2 delegate calls
+ delegate.latch = new CountDownLatch(2);
- final DeviceEvent deviceEvent =
- new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, host);
+ Device host = device("host1");
+ DeviceEvent deviceEvent = new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, host);
reasons.add(deviceEvent);
- final Key key = Key.of(0x333L, APP_ID);
+ Key key = Key.of(0x333L, APP_ID);
Collection<NetworkResource> resources = ImmutableSet.of(host.id());
tracker.addTrackedResources(key, resources);
- deviceListener.event(deviceEvent);
- assertThat(
- delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
- is(true));
+ reasons.add(deviceEvent);
+ TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, reasons);
+
+ listener.event(event);
+ assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(1));
assertThat(delegate.compileAllFailedFromEvent, is(true));
assertThat(delegate.intentIdsFromEvent.get(0).toString(),
- equalTo("0x333"));
+ equalTo("0x333"));
}
/**
@@ -280,25 +252,21 @@
@Test
public void testEventHostUnavailableMatch() throws Exception {
- final Device host = device("host1");
-
- final DeviceEvent deviceEvent =
- new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, host);
+ Device host = device("host1");
+ DeviceEvent deviceEvent = new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, host);
reasons.add(deviceEvent);
- final Key key = Key.of(0x333L, APP_ID);
+ Key key = Key.of(0x333L, APP_ID);
Collection<NetworkResource> resources = ImmutableSet.of(host.id());
tracker.addTrackedResources(key, resources);
- deviceListener.event(deviceEvent);
- assertThat(
- delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
- is(true));
+ TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, reasons);
+ listener.event(event);
+ assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(1));
assertThat(delegate.compileAllFailedFromEvent, is(false));
- assertThat(delegate.intentIdsFromEvent.get(0).toString(),
- equalTo("0x333"));
+ assertThat(delegate.intentIdsFromEvent.get(0).toString(), equalTo("0x333"));
}
/**
@@ -309,20 +277,12 @@
@Test
public void testEventHostAvailableNoMatch() throws Exception {
- final Device host = device("host1");
-
- final DeviceEvent deviceEvent =
- new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, host);
- reasons.add(deviceEvent);
-
- deviceListener.event(deviceEvent);
- assertThat(
- delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
- is(true));
-
+ Host host = host("00:11:22:33:44:55/6", "device1");
+ HostEvent hostEvent = new HostEvent(HostEvent.Type.HOST_ADDED, host);
+ hostListener.event(hostEvent);
+ assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
-
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index d2d450e..7afdf62 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -444,7 +444,7 @@
boolean wasOnline = availableDevices.contains(newDevice.id());
markOnline(newDevice.id(), newTimestamp);
if (!wasOnline) {
- notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
+ notifyDelegateIfNotNull(new DeviceEvent(newDevice, true));
}
}
return event;
@@ -487,7 +487,7 @@
}
boolean removed = availableDevices.remove(deviceId);
if (removed) {
- return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+ return new DeviceEvent(device, false);
}
return null;
}