ONOS-2003 Fixing intent reroute after cluster change
Objective trackers update when partitions are shuffled to
track "local" intents.
Change-Id: I7cd9e4a935ddbc94813d5067d4febc084a89f508
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEventListener.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEventListener.java
index 20f2c24..53d84b1 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEventListener.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEventListener.java
@@ -20,5 +20,5 @@
/**
* Entity capable of receiving device leadership-related events.
*/
-public interface LeadershipEventListener extends EventListener<LeadershipEvent> {
+public interface LeadershipEventListener extends EventListener<LeadershipEvent> {
}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/PartitionEvent.java b/core/api/src/main/java/org/onosproject/net/intent/PartitionEvent.java
new file mode 100644
index 0000000..48623dd
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/intent/PartitionEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.intent;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Partition event.
+ */
+//TODO change String into a proper object type
+public class PartitionEvent extends AbstractEvent<PartitionEvent.Type, String> {
+
+ public enum Type {
+ LEADER_CHANGED
+ }
+
+ public PartitionEvent(Type type, String partition) {
+ super(type, partition);
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/PartitionEventListener.java b/core/api/src/main/java/org/onosproject/net/intent/PartitionEventListener.java
new file mode 100644
index 0000000..73b9de9
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/intent/PartitionEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.intent;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Entity capable of receiving device partition-related events.
+ */
+public interface PartitionEventListener extends EventListener<PartitionEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java b/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
index e963abc..c636cd2 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
@@ -40,4 +40,18 @@
NodeId getLeader(Key intentKey);
// TODO add API for rebalancing partitions
+
+ /**
+ * Registers a event listener to be notified of partition events.
+ *
+ * @param listener listener that will asynchronously notified of partition events.
+ */
+ void addListener(PartitionEventListener listener);
+
+ /**
+ * Unregisters a event listener for partition events.
+ *
+ * @param listener listener to be removed.
+ */
+ void removeListener(PartitionEventListener listener);
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
index 939b9a1..33f5a6e 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
@@ -23,6 +23,7 @@
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.event.Event;
@@ -38,8 +39,12 @@
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
+import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
+import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.resource.link.LinkResourceEvent;
import org.onosproject.net.resource.link.LinkResourceListener;
@@ -54,6 +59,10 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -94,25 +103,35 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ policy = ReferencePolicy.DYNAMIC)
protected IntentService intentService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PartitionService partitionService;
+
private ExecutorService executorService =
newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker"));
+ private ScheduledExecutorService executor = Executors
+ .newScheduledThreadPool(1);
private TopologyListener listener = new InternalTopologyListener();
private LinkResourceListener linkResourceListener =
new InternalLinkResourceListener();
private DeviceListener deviceListener = new InternalDeviceListener();
private HostListener hostListener = new InternalHostListener();
+ private PartitionEventListener partitionListener = new InternalPartitionListener();
private TopologyChangeDelegate delegate;
+ protected final AtomicBoolean updateScheduled = new AtomicBoolean(false);
+
@Activate
public void activate() {
topologyService.addListener(listener);
resourceManager.addListener(linkResourceListener);
deviceService.addListener(deviceListener);
hostService.addListener(hostListener);
+ partitionService.addListener(partitionListener);
log.info("Started");
}
@@ -122,6 +141,7 @@
resourceManager.removeListener(linkResourceListener);
deviceService.removeListener(deviceListener);
hostService.removeListener(hostListener);
+ partitionService.removeListener(partitionListener);
log.info("Stopped");
}
@@ -268,7 +288,7 @@
private void updateTrackedResources(ApplicationId appId, boolean track) {
if (intentService == null) {
- log.debug("Intent service is not bound yet");
+ log.warn("Intent service is not bound yet");
return;
}
intentService.getIntents().forEach(intent -> {
@@ -342,4 +362,52 @@
executorService.execute(new DeviceAvailabilityHandler(id, available));
}
}
+
+ protected void doIntentUpdate() {
+ updateScheduled.set(false);
+ if (intentService == null) {
+ log.warn("Intent service is not bound yet");
+ return;
+ }
+ try {
+ //FIXME very inefficient
+ for (Intent intent : intentService.getIntents()) {
+ try {
+ if (intentService.isLocal(intent.key())) {
+ log.warn("intent {}, old: {}, new: {}",
+ intent.key(), intentsByDevice.values().contains(intent.key()), true);
+ addTrackedResources(intent.key(), intent.resources());
+ intentService.getInstallableIntents(intent.key()).stream()
+ .forEach(installable ->
+ addTrackedResources(intent.key(), installable.resources()));
+ } else {
+ log.warn("intent {}, old: {}, new: {}",
+ intent.key(), intentsByDevice.values().contains(intent.key()), false);
+ removeTrackedResources(intent.key(), intent.resources());
+ intentService.getInstallableIntents(intent.key()).stream()
+ .forEach(installable ->
+ removeTrackedResources(intent.key(), installable.resources()));
+ }
+ } catch (NullPointerException npe) {
+ log.warn("intent error {}", intent.key(), npe);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Exception caught during update task", e);
+ }
+ }
+
+ private void scheduleIntentUpdate(int afterDelaySec) {
+ if (updateScheduled.compareAndSet(false, true)) {
+ executor.schedule(this::doIntentUpdate, afterDelaySec, TimeUnit.SECONDS);
+ }
+ }
+
+ private final class InternalPartitionListener implements PartitionEventListener {
+ @Override
+ public void event(PartitionEvent event) {
+ log.warn("got message {}", event.subject());
+ scheduleIntentUpdate(1);
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index b341165..09108d2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -30,7 +30,11 @@
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
import org.onosproject.net.intent.PartitionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +62,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
static final int NUM_PARTITIONS = 14;
@@ -67,6 +74,7 @@
private static final String ELECTION_PREFIX = "intent-partition-";
+ private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
private ClusterEventListener clusterListener = new InternalClusterEventListener();
@@ -78,6 +86,9 @@
leadershipService.addListener(leaderListener);
clusterService.addListener(clusterListener);
+ listenerRegistry = new ListenerRegistry<>();
+ eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
+
for (int i = 0; i < NUM_PARTITIONS; i++) {
leadershipService.runForLeadership(getPartitionPath(i));
}
@@ -90,6 +101,7 @@
public void deactivate() {
executor.shutdownNow();
+ eventDispatcher.removeSink(PartitionEvent.class);
leadershipService.removeListener(leaderListener);
clusterService.removeListener(clusterListener);
}
@@ -133,6 +145,16 @@
return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
}
+ @Override
+ public void addListener(PartitionEventListener listener) {
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(PartitionEventListener listener) {
+ listenerRegistry.removeListener(listener);
+ }
+
protected void doRebalance() {
rebalanceScheduled.set(false);
try {
@@ -203,6 +225,9 @@
// See if we need to let some partitions go
scheduleRebalance(0);
+
+ eventDispatcher.post(new PartitionEvent(PartitionEvent.Type.LEADER_CHANGED,
+ leadership.topic()));
}
}
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
index 3b091c0..25e23d3 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
@@ -28,6 +28,7 @@
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.LeadershipServiceAdapter;
import org.onosproject.cluster.NodeId;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.intent.Key;
import java.util.HashMap;
@@ -86,6 +87,7 @@
partitionManager.clusterService = new TestClusterService();
partitionManager.leadershipService = leadershipService;
+ partitionManager.eventDispatcher = new TestEventDispatcher();
}
/**
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
index 8fde858..0ef44d8 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
@@ -19,6 +19,8 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
@@ -28,7 +30,11 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.store.AbstractStore;
import org.slf4j.Logger;
@@ -55,14 +61,24 @@
private final DateTime creationTime = DateTime.now();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
+ private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
+
@Activate
public void activate() {
instance = new DefaultControllerNode(new NodeId("local"), LOCALHOST);
+
+ listenerRegistry = new ListenerRegistry<>();
+ eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
+
log.info("Started");
}
@Deactivate
public void deactivate() {
+ eventDispatcher.removeSink(PartitionEvent.class);
log.info("Stopped");
}
@@ -110,4 +126,14 @@
public NodeId getLeader(Key intentKey) {
return instance.id();
}
+
+ @Override
+ public void addListener(PartitionEventListener listener) {
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(PartitionEventListener listener) {
+ listenerRegistry.removeListener(listener);
+ }
}