Fix intent batch queue issues Yuta observed
Change-Id: I7dc4a19d49a1b3fc18ecce02a4018cbc9a3043fc
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
index b75e6a4..52d166c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
@@ -19,6 +19,9 @@
import com.google.common.collect.Sets;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
+import com.hazelcast.core.ItemEvent;
+import com.hazelcast.core.ItemListener;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -47,7 +50,6 @@
import java.util.Map;
import java.util.Set;
-
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
@@ -107,6 +109,9 @@
@Deactivate
public void deactivate() {
leadershipService.removeListener(leaderListener);
+ for (ApplicationId appId: batchQueues.keySet()) { // every application that has an entry in batchQueues
+ leadershipService.withdraw(getTopic(appId)); // withdraw from that application's leadership topic
+ }
log.info("Stopped");
}
@@ -125,12 +130,11 @@
SQueue<IntentOperations> queue = batchQueues.get(appId);
if (queue == null) {
synchronized (this) {
- // FIXME how will other instances find out about new queues
String topic = getTopic(appId);
IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
queue = new SQueue<>(rawQueue, serializer);
+ queue.addItemListener(new InternalItemListener(appId), false);
batchQueues.putIfAbsent(appId, queue);
- // TODO others should run for leadership when they hear about this topic
leadershipService.runForLeadership(topic);
}
}
@@ -209,6 +213,25 @@
}
}
+ private class InternalItemListener implements ItemListener<IntentOperations> {
+
+ private final ApplicationId appId; // application passed in when the listener is created for a queue
+
+ public InternalItemListener(ApplicationId appId) {
+ this.appId = appId;
+ }
+
+ @Override
+ public void itemAdded(ItemEvent<IntentOperations> item) {
+ dispatchNextOperation(appId); // the event payload is not read; only the application id is used
+ }
+
+ @Override
+ public void itemRemoved(ItemEvent<IntentOperations> item) {
+ // no-op: item removals are intentionally ignored by this listener
+ }
+ }
+
private class InternalLeaderListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
@@ -220,6 +243,8 @@
return; // Not our topic: ignore
}
if (!event.subject().leader().id().equals(localControllerNode.id())) {
+ // run for leadership
+ getQueue(getAppId(topic));
return; // The event is not about this instance: ignore
}