Adding device and host tracking for intents (ONOS-1356)
Also, this should fix ONOS-1184 (intents submitted before hosts detected).
Change-Id: I47a503c18dc728912132eb2e2fcc160d47e518eb
diff --git a/core/api/src/main/java/org/onosproject/net/ElementId.java b/core/api/src/main/java/org/onosproject/net/ElementId.java
index 49e110b..9e4b3cd 100644
--- a/core/api/src/main/java/org/onosproject/net/ElementId.java
+++ b/core/api/src/main/java/org/onosproject/net/ElementId.java
@@ -18,5 +18,5 @@
/**
* Immutable representation of a network element identity.
*/
-public abstract class ElementId {
+public abstract class ElementId implements NetworkResource {
}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/HostToHostIntent.java b/core/api/src/main/java/org/onosproject/net/intent/HostToHostIntent.java
index 157397a..01ef54d 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/HostToHostIntent.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/HostToHostIntent.java
@@ -15,15 +15,14 @@
*/
package org.onosproject.net.intent;
-import java.util.Collections;
-import java.util.List;
-
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.HostId;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
-import com.google.common.base.MoreObjects;
+import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -147,7 +146,7 @@
TrafficTreatment treatment,
List<Constraint> constraints,
int priority) {
- super(appId, key, Collections.emptyList(), selector, treatment,
+ super(appId, key, ImmutableSet.of(one, two), selector, treatment,
constraints, priority);
// TODO: consider whether the case one and two are same is allowed
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 351016d..ddccb53 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -337,8 +337,8 @@
private void submitUpdates(List<FinalIntentProcessPhase> updates) {
store.batchWrite(updates.stream()
- .map(FinalIntentProcessPhase::data)
- .collect(Collectors.toList()));
+ .map(FinalIntentProcessPhase::data)
+ .collect(Collectors.toList()));
}
}
@@ -377,7 +377,9 @@
throw new IllegalStateException("installable intents must be FlowRuleIntent");
}
- installables.forEach(x -> trackerService.addTrackedResources(data.key(), x.resources()));
+ trackerService.addTrackedResources(data.key(), data.intent().resources());
+ installables.forEach(installable ->
+ trackerService.addTrackedResources(data.key(), installable.resources()));
List<Collection<FlowRule>> stages = installables.stream()
.map(x -> (FlowRuleIntent) x)
@@ -415,7 +417,10 @@
throw new IllegalStateException("installable intents must be FlowRuleIntent");
}
- installables.forEach(x -> trackerService.removeTrackedResources(data.intent().key(), x.resources()));
+ trackerService.removeTrackedResources(data.key(), data.intent().resources());
+ installables.forEach(installable ->
+ trackerService.removeTrackedResources(data.intent().key(),
+ installable.resources()));
List<Collection<FlowRule>> stages = installables.stream()
.map(x -> (FlowRuleIntent) x)
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
index c32b29c..5c310b0 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
@@ -26,9 +26,18 @@
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.event.Event;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.ElementId;
+import org.onosproject.net.HostId;
import org.onosproject.net.Link;
import org.onosproject.net.LinkKey;
import org.onosproject.net.NetworkResource;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.host.HostEvent;
+import org.onosproject.net.host.HostListener;
+import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.link.LinkEvent;
@@ -41,6 +50,7 @@
import org.slf4j.Logger;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -69,27 +79,40 @@
//TODO this could be slow as a point of synchronization
synchronizedSetMultimap(HashMultimap.<LinkKey, Key>create());
+ private final SetMultimap<ElementId, Key> intentsByDevice =
+ synchronizedSetMultimap(HashMultimap.<ElementId, Key>create());
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkResourceService resourceManager;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected HostService hostService;
+
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
protected IntentService intentService;
private ExecutorService executorService =
- newSingleThreadExecutor(groupedThreads("onos/intent", "flowtracker"));
+ newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker"));
private TopologyListener listener = new InternalTopologyListener();
private LinkResourceListener linkResourceListener =
new InternalLinkResourceListener();
+ private DeviceListener deviceListener = new InternalDeviceListener();
+ private HostListener hostListener = new InternalHostListener();
private TopologyChangeDelegate delegate;
@Activate
public void activate() {
topologyService.addListener(listener);
resourceManager.addListener(linkResourceListener);
+ deviceService.addListener(deviceListener);
+ hostService.addListener(hostListener);
log.info("Started");
}
@@ -97,6 +120,8 @@
public void deactivate() {
topologyService.removeListener(listener);
resourceManager.removeListener(linkResourceListener);
+ deviceService.removeListener(deviceListener);
+ hostService.removeListener(hostListener);
log.info("Stopped");
}
@@ -132,6 +157,8 @@
for (NetworkResource resource : resources) {
if (resource instanceof Link) {
intentsByLink.put(linkKey((Link) resource), intentKey);
+ } else if (resource instanceof ElementId) {
+ intentsByDevice.put((ElementId) resource, intentKey);
}
}
}
@@ -142,6 +169,8 @@
for (NetworkResource resource : resources) {
if (resource instanceof Link) {
intentsByLink.remove(linkKey((Link) resource), intentKey);
+ } else if (resource instanceof ElementId) {
+ intentsByDevice.remove((ElementId) resource, intentKey);
}
}
}
@@ -171,7 +200,7 @@
}
if (event.reasons() == null || event.reasons().isEmpty()) {
- delegate.triggerCompile(new HashSet<Key>(), true);
+ delegate.triggerCompile(Collections.emptySet(), true);
} else {
Set<Key> toBeRecompiled = new HashSet<>();
@@ -231,7 +260,7 @@
return;
}
- delegate.triggerCompile(new HashSet<>(), true);
+ delegate.triggerCompile(Collections.emptySet(), true);
}
}
@@ -257,4 +286,60 @@
}
});
}
+
+ /*
+ * Re-dispatcher of device and host events.
+ */
+ private class DeviceAvailabilityHandler implements Runnable {
+
+ private final ElementId id;
+ private final boolean available;
+
+ DeviceAvailabilityHandler(ElementId id, boolean available) {
+ this.id = checkNotNull(id);
+ this.available = available;
+ }
+
+ @Override
+ public void run() {
+ // If there is no delegate, why bother? Just bail.
+ if (delegate == null) {
+ return;
+ }
+
+ // TODO should we recompile on available==true?
+ delegate.triggerCompile(intentsByDevice.get(id), available);
+ }
+ }
+
+
+ private class InternalDeviceListener implements DeviceListener {
+ @Override
+ public void event(DeviceEvent event) {
+ DeviceEvent.Type type = event.type();
+ if (type == DeviceEvent.Type.PORT_ADDED ||
+ type == DeviceEvent.Type.PORT_UPDATED ||
+ type == DeviceEvent.Type.PORT_REMOVED) {
+ // skip port events for now
+ return;
+ }
+ DeviceId id = event.subject().id();
+ // TODO we need to check whether AVAILABILITY_CHANGED means up or down
+ boolean available = (type == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
+ type == DeviceEvent.Type.DEVICE_ADDED ||
+ type == DeviceEvent.Type.DEVICE_UPDATED);
+ executorService.execute(new DeviceAvailabilityHandler(id, available));
+
+ }
+ }
+
+ private class InternalHostListener implements HostListener {
+ @Override
+ public void event(HostEvent event) {
+ HostId id = event.subject().id();
+ HostEvent.Type type = event.type();
+ boolean available = (type == HostEvent.Type.HOST_ADDED);
+ executorService.execute(new DeviceAvailabilityHandler(id, available));
+ }
+ }
}
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/ObjectiveTrackerTest.java b/core/net/src/test/java/org/onosproject/net/intent/impl/ObjectiveTrackerTest.java
index b324ae8..4299cb9 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/ObjectiveTrackerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/ObjectiveTrackerTest.java
@@ -29,8 +29,11 @@
import org.onlab.junit.TestUtils.TestUtilsException;
import org.onosproject.core.IdGenerator;
import org.onosproject.event.Event;
+import org.onosproject.net.Device;
import org.onosproject.net.Link;
import org.onosproject.net.NetworkResource;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.MockIdGenerator;
@@ -50,6 +53,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.device;
import static org.onosproject.net.NetTestTools.link;
/**
@@ -62,6 +66,7 @@
private TestTopologyChangeDelegate delegate;
private List<Event> reasons;
private TopologyListener listener;
+ private DeviceListener deviceListener;
private LinkResourceListener linkResourceListener;
private IdGenerator mockGenerator;
@@ -78,6 +83,7 @@
tracker.setDelegate(delegate);
reasons = new LinkedList<>();
listener = TestUtils.getField(tracker, "listener");
+ deviceListener = TestUtils.getField(tracker, "deviceListener");
linkResourceListener = TestUtils.getField(tracker, "linkResourceListener");
mockGenerator = new MockIdGenerator();
Intent.bindIdGenerator(mockGenerator);
@@ -235,4 +241,86 @@
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
+ /**
+ * Tests an event for a host becoming available that matches an intent.
+ *
+ * @throws InterruptedException if the latch wait fails.
+ */
+
+ @Test
+ public void testEventHostAvailableMatch() throws Exception {
+ final Device host = device("host1");
+
+ final DeviceEvent deviceEvent =
+ new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, host);
+ reasons.add(deviceEvent);
+
+ final Key key = Key.of(0x333L, APP_ID);
+ Collection<NetworkResource> resources = ImmutableSet.of(host.id());
+ tracker.addTrackedResources(key, resources);
+
+ deviceListener.event(deviceEvent);
+ assertThat(
+ delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
+ is(true));
+
+ assertThat(delegate.intentIdsFromEvent, hasSize(1));
+ assertThat(delegate.compileAllFailedFromEvent, is(true));
+ assertThat(delegate.intentIdsFromEvent.get(0).toString(),
+ equalTo("0x333"));
+ }
+
+ /**
+ * Tests an event for a host becoming unavailable that matches an intent.
+ *
+ * @throws InterruptedException if the latch wait fails.
+ */
+
+ @Test
+ public void testEventHostUnavailableMatch() throws Exception {
+ final Device host = device("host1");
+
+ final DeviceEvent deviceEvent =
+ new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, host);
+ reasons.add(deviceEvent);
+
+ final Key key = Key.of(0x333L, APP_ID);
+ Collection<NetworkResource> resources = ImmutableSet.of(host.id());
+ tracker.addTrackedResources(key, resources);
+
+ deviceListener.event(deviceEvent);
+ assertThat(
+ delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
+ is(true));
+
+ assertThat(delegate.intentIdsFromEvent, hasSize(1));
+ assertThat(delegate.compileAllFailedFromEvent, is(false));
+ assertThat(delegate.intentIdsFromEvent.get(0).toString(),
+ equalTo("0x333"));
+ }
+
+ /**
+ * Tests an event for a host becoming available that matches an intent.
+ *
+ * @throws InterruptedException if the latch wait fails.
+ */
+
+ @Test
+ public void testEventHostAvailableNoMatch() throws Exception {
+ final Device host = device("host1");
+
+ final DeviceEvent deviceEvent =
+ new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, host);
+ reasons.add(deviceEvent);
+
+ deviceListener.event(deviceEvent);
+ assertThat(
+ delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
+ is(true));
+
+ assertThat(delegate.intentIdsFromEvent, hasSize(0));
+ assertThat(delegate.compileAllFailedFromEvent, is(true));
+ }
+
+
}