Added a function to the registry to allocate a block of numbers unique within the whole cluster using Curator's DistributedAtomicLong
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index 76b7ebd..82259a9 100644
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -24,6 +24,8 @@
import com.netflix.curator.RetryPolicy;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.recipes.atomic.AtomicValue;
+import com.netflix.curator.framework.recipes.atomic.DistributedAtomicLong;
import com.netflix.curator.framework.recipes.cache.ChildData;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
@@ -33,6 +35,7 @@
import com.netflix.curator.framework.recipes.leader.LeaderLatchEvent;
import com.netflix.curator.framework.recipes.leader.LeaderLatchListener;
import com.netflix.curator.retry.ExponentialBackoffRetry;
+import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.x.discovery.ServiceCache;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -67,6 +70,10 @@
protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
protected Map<String, PathChildrenCache> switchPathCaches;
+ private final String ID_COUNTER_PATH = "/flowidcounter";
+ private final Long ID_BLOCK_SIZE = 0x100000000L;
+ protected DistributedAtomicLong distributedIdCounter;
+
//Zookeeper performance-related configuration
protected static final int sessionTimeout = 5000;
protected static final int connectionTimeout = 7000;
@@ -372,6 +379,21 @@
return data;
}
+ public IdBlock allocateUniqueIdBlock(){
+ try {
+ AtomicValue<Long> result = null;
+ do {
+ result = distributedIdCounter.add(ID_BLOCK_SIZE);
+ } while (result == null || !result.succeeded());
+
+ return new IdBlock(result.preValue(), result.postValue() - 1, ID_BLOCK_SIZE);
+ } catch (Exception e) {
+ log.error("Error allocating ID block");
+ }
+
+ return null;
+ }
+
/*
* IFloodlightModule
*/
@@ -427,6 +449,10 @@
client.start();
client = client.usingNamespace(namespace);
+ distributedIdCounter = new DistributedAtomicLong(
+ client,
+ ID_COUNTER_PATH,
+ new RetryOneTime(100));
switchCache = new PathChildrenCache(client, switchLatchesPath, true);
switchCache.getListenable().addListener(switchPathCacheListener);