Adding Leadership Listener to IntentBatchService
ObjectiveTracker uses Leadership Listener to track intents that
it has become the leader of.
Change-Id: I039accb30d27ad718d79a9fec3f546dbdc78e62e
diff --git a/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java b/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
index 5f1ea29..fd70a0b 100644
--- a/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
+++ b/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
@@ -266,9 +266,8 @@
}
count++;
if (count > ITERATIONMAX) {
- log.warn("A batch is stuck processing. current : {}" +
- ", pending : {}",
- intentBatchService.getCurrentOperations(),
+ log.warn("A batch is stuck processing. " +
+ "pending : {}",
intentBatchService.getPendingOperations());
shutdownAndAwaitTermination(randomWorker);
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchLeaderEvent.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchLeaderEvent.java
new file mode 100644
index 0000000..cd5297f
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchLeaderEvent.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.net.intent;
+
+import org.onlab.onos.core.ApplicationId;
+import org.onlab.onos.event.AbstractEvent;
+
+/**
+ * A class to represent an intent related event.
+ */
+public class IntentBatchLeaderEvent extends AbstractEvent<IntentBatchLeaderEvent.Type, ApplicationId> {
+
+ public enum Type {
+ /**
+ * Signifies that this instance has become the leader for the given application id.
+ */
+ ELECTED,
+
+ /**
+ * Signifies that instance is no longer the leader for a given application id.
+ */
+ BOOTED
+ }
+
+ /**
+ * Creates an event of a given type and for the specified appId and the
+ * current time.
+ *
+ * @param type event type
+ * @param appId subject appId
+ * @param time time the event created in milliseconds since start of epoch
+ */
+ public IntentBatchLeaderEvent(Type type, ApplicationId appId, long time) {
+ super(type, appId, time);
+ }
+
+ /**
+ * Creates an event of a given type and for the specified appId and the
+ * current time.
+ *
+ * @param type event type
+ * @param appId subject appId
+ */
+ public IntentBatchLeaderEvent(Type type, ApplicationId appId) {
+ super(type, appId);
+ }
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchListener.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchListener.java
new file mode 100644
index 0000000..fb5c600
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.net.intent;
+
+import org.onlab.onos.event.EventListener;
+
+/**
+ * Listener for {@link org.onlab.onos.net.intent.IntentEvent intent events}.
+ */
+public interface IntentBatchListener extends EventListener<IntentBatchLeaderEvent> {
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
index 8a58aa2..246ece8 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
@@ -45,14 +45,6 @@
Set<IntentOperations> getPendingOperations();
/**
- * Returns the set of intent batches currently being processed.
- * @return set of batches
- */
- //TODO we may want to get rid of this method
- @Deprecated
- Set<IntentOperations> getCurrentOperations();
-
- /**
* Return true if this instance is the local leader for batch
* processing a given application id.
*
@@ -75,4 +67,17 @@
*/
void unsetDelegate(IntentBatchDelegate delegate);
+ /**
+ * Adds the specified listener for intent batch leadership events.
+ *
+ * @param listener listener to be added
+ */
+ void addListener(IntentBatchListener listener);
+
+ /**
+ * Removes the specified listener for intent batch leadership events.
+ *
+ * @param listener listener to be removed
+ */
+ void removeListener(IntentBatchListener listener);
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
index 22673b6..83a3ccb 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
@@ -15,22 +15,25 @@
*/
package org.onlab.onos.net.intent.impl;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.event.Event;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.NetworkResource;
+import org.onlab.onos.net.intent.IntentBatchLeaderEvent;
+import org.onlab.onos.net.intent.IntentBatchListener;
+import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.resource.LinkResourceEvent;
import org.onlab.onos.net.resource.LinkResourceListener;
@@ -40,8 +43,10 @@
import org.onlab.onos.net.topology.TopologyService;
import org.slf4j.Logger;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -72,18 +77,26 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkResourceService resourceManager;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected IntentService intentService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected IntentBatchService batchService;
+
private ExecutorService executorService =
newSingleThreadExecutor(namedThreads("onos-flowtracker"));
private TopologyListener listener = new InternalTopologyListener();
private LinkResourceListener linkResourceListener =
new InternalLinkResourceListener();
+ private final LeadershipListener leaderListener = new LeadershipListener();
private TopologyChangeDelegate delegate;
@Activate
public void activate() {
topologyService.addListener(listener);
resourceManager.addListener(linkResourceListener);
+ batchService.addListener(leaderListener);
log.info("Started");
}
@@ -91,6 +104,7 @@
public void deactivate() {
topologyService.removeListener(listener);
resourceManager.removeListener(linkResourceListener);
+ batchService.removeListener(leaderListener);
log.info("Stopped");
}
@@ -220,6 +234,38 @@
//TODO consider adding flow rule event tracking
- //FIXME the only intents that will be tracked are events that were
- //executed on this instance. Need to have some backup trackers...
+ private void updateTrackedResources(ApplicationId appId, boolean track) {
+ intentService.getIntents().forEach(intent -> {
+ if (intent.appId().equals(appId)) {
+ IntentId id = intent.id();
+ Collection<NetworkResource> resources = Lists.newArrayList();
+ intentService.getInstallableIntents(id).stream()
+ .map(installable -> installable.resources())
+ .forEach(resources::addAll);
+ if (track) {
+ addTrackedResources(id, resources);
+ } else {
+ removeTrackedResources(id, resources);
+ }
+ }
+ });
+ }
+
+ private class LeadershipListener implements IntentBatchListener {
+ @Override
+ public void event(IntentBatchLeaderEvent event) {
+ log.debug("leadership event: {}", event);
+ ApplicationId appId = event.subject();
+ switch (event.type()) {
+ case ELECTED:
+ updateTrackedResources(appId, true);
+ break;
+ case BOOTED:
+ updateTrackedResources(appId, false);
+ break;
+ default:
+ break;
+ }
+ }
+ }
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
index f27ddc1..ea7bc44 100644
--- a/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
@@ -111,16 +111,13 @@
//the batch has not yet been removed when we receive the last event
// FIXME: this doesn't guarantee to avoid the race
for (int tries = 0; tries < 10; tries++) {
- if (manager.batchService.getPendingOperations().isEmpty() &&
- manager.batchService.getCurrentOperations().isEmpty()) {
+ if (manager.batchService.getPendingOperations().isEmpty()) {
break;
}
delay(10);
}
assertTrue("There are still pending batch operations.",
manager.batchService.getPendingOperations().isEmpty());
- assertTrue("There are still outstanding batch operations.",
- manager.batchService.getCurrentOperations().isEmpty());
extensionService.unregisterCompiler(MockIntent.class);
extensionService.unregisterInstaller(MockInstallableIntent.class);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
index 52d166c..9d9b833 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
@@ -35,7 +35,11 @@
import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
+import org.onlab.onos.event.AbstractListenerRegistry;
+import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.intent.IntentBatchDelegate;
+import org.onlab.onos.net.intent.IntentBatchLeaderEvent;
+import org.onlab.onos.net.intent.IntentBatchListener;
import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
import org.onlab.onos.store.hz.SQueue;
@@ -46,7 +50,6 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
-import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -74,6 +77,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
+
private HazelcastInstance theInstance;
private ControllerNode localControllerNode;
protected StoreSerializer serializer;
@@ -85,6 +92,9 @@
private final Map<ApplicationId, IntentOperations> outstandingOps
= Maps.newHashMap();
+ private final AbstractListenerRegistry<IntentBatchLeaderEvent, IntentBatchListener>
+ listenerRegistry = new AbstractListenerRegistry<>();
+
@Activate
public void activate() {
theInstance = storeService.getHazelcastInstance();
@@ -103,11 +113,13 @@
}
};
+ eventDispatcher.addSink(IntentBatchLeaderEvent.class, listenerRegistry);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ eventDispatcher.removeSink(IntentBatchLeaderEvent.class);
leadershipService.removeListener(leaderListener);
for (ApplicationId appId: batchQueues.keySet()) {
leadershipService.withdraw(getTopic(appId));
@@ -277,12 +289,6 @@
}
@Override
- public Set<IntentOperations> getCurrentOperations() {
- //FIXME this is not really implemented
- return Collections.emptySet();
- }
-
- @Override
public boolean isLocalLeader(ApplicationId applicationId) {
return myTopics.contains(applicationId);
}
@@ -298,4 +304,14 @@
this.delegate = null;
}
}
+
+ @Override
+ public void addListener(IntentBatchListener listener) {
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(IntentBatchListener listener) {
+ listenerRegistry.removeListener(listener);
+ }
}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
index 57b854f..b6223c9 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
@@ -22,6 +22,7 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.net.intent.IntentBatchDelegate;
+import org.onlab.onos.net.intent.IntentBatchListener;
import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
@@ -83,14 +84,9 @@
@Override
public Set<IntentOperations> getPendingOperations() {
synchronized (this) {
- return Sets.newHashSet(pendingBatches);
- }
- }
-
- @Override
- public Set<IntentOperations> getCurrentOperations() {
- synchronized (this) {
- return Sets.newHashSet(currentBatches);
+ Set<IntentOperations> set = Sets.newHashSet(pendingBatches);
+ set.addAll(currentBatches); // TODO refactor this current vs. pending
+ return set;
}
}
@@ -110,4 +106,16 @@
this.delegate = null;
}
}
+
+ @Override
+ public void addListener(IntentBatchListener listener) {
+ // no-op
+ //TODO: we are always the master
+ }
+
+ @Override
+ public void removeListener(IntentBatchListener listener) {
+ // no-op
+ //TODO: we are always the master
+ }
}