[ONOS-7551] Support primitive revisions for upgrades
Change-Id: Ib56e10f06ab9abedd176cdd84add6cbf4e3d4c50
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
index aaa9bdf..bbdf9a3 100644
--- a/core/store/primitives/pom.xml
+++ b/core/store/primitives/pom.xml
@@ -69,7 +69,7 @@
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix</artifactId>
- <version>2.0.18</version>
+ <version>2.0.20</version>
</dependency>
<dependency>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 0e53f77..b6f27f0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -19,7 +19,6 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import com.google.common.base.Charsets;
@@ -44,10 +43,20 @@
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
+import org.onosproject.store.service.AtomicCounterMapOptions;
+import org.onosproject.store.service.AtomicCounterOptions;
+import org.onosproject.store.service.AtomicIdGeneratorOptions;
+import org.onosproject.store.service.AtomicValueOptions;
+import org.onosproject.store.service.ConsistentMapOptions;
+import org.onosproject.store.service.ConsistentMultimapOptions;
+import org.onosproject.store.service.ConsistentTreeMapOptions;
+import org.onosproject.store.service.DistributedLockOptions;
+import org.onosproject.store.service.DistributedSetOptions;
import org.onosproject.store.service.DocumentPath;
-import org.onosproject.store.service.Ordering;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.DocumentTreeOptions;
+import org.onosproject.store.service.LeaderElectorOptions;
import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.WorkQueueOptions;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -72,13 +81,11 @@
}
@Override
- public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
- checkNotNull(name);
- checkNotNull(serializer);
+ public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(ConsistentMapOptions options) {
Map<PartitionId, AsyncConsistentMap<byte[], byte[]>> maps =
Maps.transformValues(members,
partition -> DistributedPrimitives.newTranscodingMap(
- partition.<String, byte[]>newAsyncConsistentMap(name, null),
+ partition.<String, byte[]>newAsyncConsistentMap(options.name(), null),
HexString::toHexString,
HexString::fromHexString,
Function.identity(),
@@ -87,78 +94,76 @@
int bucket = Math.abs(Hashing.murmur3_32().hashBytes(key).asInt()) % buckets;
return sortedMemberPartitionIds.get(Hashing.consistentHash(bucket, sortedMemberPartitionIds.size()));
};
- AsyncConsistentMap<byte[], byte[]> partitionedMap = new PartitionedAsyncConsistentMap<>(name, maps, hasher);
+ AsyncConsistentMap<byte[], byte[]> partitionedMap =
+ new PartitionedAsyncConsistentMap<>(options.name(), maps, hasher);
return DistributedPrimitives.newTranscodingMap(partitionedMap,
- key -> serializer.encode(key),
- bytes -> serializer.decode(bytes),
- value -> value == null ? null : serializer.encode(value),
- bytes -> serializer.decode(bytes));
+ key -> options.serializer().encode(key),
+ bytes -> options.serializer().decode(bytes),
+ value -> value == null ? null : options.serializer().encode(value),
+ bytes -> options.serializer().decode(bytes));
}
@Override
- public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
- return getCreator(name).newAsyncConsistentTreeMap(name, serializer);
+ public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(ConsistentTreeMapOptions options) {
+ return getCreator(options.name()).newAsyncConsistentTreeMap(options); // keeps revision settings
}
@Override
- public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(String name, Serializer serializer) {
- return getCreator(name).newAsyncConsistentSetMultimap(name, serializer);
+ public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(ConsistentMultimapOptions options) {
+ return getCreator(options.name()).newAsyncConsistentSetMultimap(options);
}
@Override
- public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
- return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
+ public <E> AsyncDistributedSet<E> newAsyncDistributedSet(DistributedSetOptions options) {
+ return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(options.name(), options.serializer()));
}
@Override
- public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
- return getCreator(name).newAsyncAtomicCounterMap(name, serializer);
+ public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(AtomicCounterMapOptions options) {
+ return getCreator(options.name()).newAsyncAtomicCounterMap(options);
}
@Override
- public AsyncAtomicCounter newAsyncCounter(String name) {
- return getCreator(name).newAsyncCounter(name);
+ public AsyncAtomicCounter newAsyncCounter(AtomicCounterOptions options) {
+ return getCreator(options.name()).newAsyncCounter(options);
}
@Override
- public AsyncAtomicIdGenerator newAsyncIdGenerator(String name) {
- return getCreator(name).newAsyncIdGenerator(name);
+ public AsyncAtomicIdGenerator newAsyncIdGenerator(AtomicIdGeneratorOptions options) {
+ return getCreator(options.name()).newAsyncIdGenerator(options);
}
@Override
- public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
- return getCreator(name).newAsyncAtomicValue(name, serializer);
+ public <V> AsyncAtomicValue<V> newAsyncAtomicValue(AtomicValueOptions options) {
+ return getCreator(options.name()).newAsyncAtomicValue(options);
}
@Override
- public AsyncDistributedLock newAsyncDistributedLock(String name) {
- return getCreator(name).newAsyncDistributedLock(name);
+ public AsyncDistributedLock newAsyncDistributedLock(DistributedLockOptions options) {
+ return getCreator(options.name()).newAsyncDistributedLock(options);
}
@Override
- public AsyncLeaderElector newAsyncLeaderElector(String name, long leaderTimeout, TimeUnit timeUnit) {
- checkNotNull(name);
+ public AsyncLeaderElector newAsyncLeaderElector(LeaderElectorOptions options) {
Map<PartitionId, AsyncLeaderElector> leaderElectors =
Maps.transformValues(members,
- partition -> partition.newAsyncLeaderElector(name, leaderTimeout, timeUnit));
+ partition -> partition.newAsyncLeaderElector(options));
Hasher<String> hasher = topic -> {
int hashCode = Hashing.sha256().hashString(topic, Charsets.UTF_8).asInt();
return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
};
- return new PartitionedAsyncLeaderElector(name, leaderElectors, hasher);
+ return new PartitionedAsyncLeaderElector(options.name(), leaderElectors, hasher);
}
@Override
- public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
- return getCreator(name).newWorkQueue(name, serializer);
+ public <E> WorkQueue<E> newWorkQueue(WorkQueueOptions options) {
+ return getCreator(options.name()).newWorkQueue(options);
}
@Override
- public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer, Ordering ordering) {
- checkNotNull(name);
- checkNotNull(serializer);
+ public <V> AsyncDocumentTree<V> newAsyncDocumentTree(DocumentTreeOptions options) {
Map<PartitionId, AsyncDocumentTree<V>> trees =
- Maps.transformValues(members, part -> part.<V>newAsyncDocumentTree(name, serializer, ordering));
+ Maps.transformValues(members, part -> part.<V>newAsyncDocumentTree(options));
Hasher<DocumentPath> hasher = key -> {
int bucket = (key == null) ? 0 :
Math.abs(Hashing.murmur3_32()
@@ -166,7 +171,7 @@
.asInt()) % buckets;
return sortedMemberPartitionIds.get(Hashing.consistentHash(bucket, sortedMemberPartitionIds.size()));
};
- return new PartitionedAsyncDocumentTree<>(name, trees, hasher);
+ return new PartitionedAsyncDocumentTree<>(options.name(), trees, hasher);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
index 6639f72..4eaac99 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
@@ -64,6 +64,7 @@
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.service.PropagationStrategy;
import io.atomix.protocols.raft.session.RaftSessionMetadata;
import io.atomix.protocols.raft.session.SessionId;
import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
@@ -149,6 +150,7 @@
.register(RaftMember.Type.class)
.register(Instant.class)
.register(Configuration.class)
+ .register(PropagationStrategy.class)
.build("RaftProtocol");
/**
@@ -176,6 +178,7 @@
.register(RaftMember.Type.class)
.register(Instant.class)
.register(Configuration.class)
+ .register(PropagationStrategy.class)
.build("RaftStorage");
private StorageNamespaces() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 2318d08..0438fe5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -18,7 +18,6 @@
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.base.Suppliers;
@@ -27,6 +26,7 @@
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.proxy.CommunicationStrategy;
+import io.atomix.protocols.raft.service.PropagationStrategy;
import io.atomix.protocols.raft.session.RaftSessionMetadata;
import org.onlab.util.HexString;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -52,11 +52,22 @@
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
+import org.onosproject.store.service.AtomicCounterMapOptions;
+import org.onosproject.store.service.AtomicCounterOptions;
+import org.onosproject.store.service.AtomicIdGeneratorOptions;
+import org.onosproject.store.service.AtomicValueOptions;
+import org.onosproject.store.service.ConsistentMapOptions;
+import org.onosproject.store.service.ConsistentMultimapOptions;
+import org.onosproject.store.service.ConsistentTreeMapOptions;
+import org.onosproject.store.service.DistributedLockOptions;
import org.onosproject.store.service.DistributedPrimitive;
-import org.onosproject.store.service.Ordering;
+import org.onosproject.store.service.DistributedSetOptions;
+import org.onosproject.store.service.DocumentTreeOptions;
+import org.onosproject.store.service.LeaderElectorOptions;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.WorkQueueOptions;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
@@ -137,193 +148,212 @@
@Override
@SuppressWarnings("unchecked")
- public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+ public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(ConsistentMapOptions options) {
AtomixConsistentMap rawMap =
new AtomixConsistentMap(client.newProxyBuilder()
- .withName(name)
+ .withName(options.name())
.withServiceType(DistributedPrimitive.Type.CONSISTENT_MAP.name())
.withReadConsistency(ReadConsistency.SEQUENTIAL)
.withCommunicationStrategy(CommunicationStrategy.ANY)
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
+ .withRevision(options.revision())
+ .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
.build()
.open()
.join());
- if (serializer != null) {
+ if (options.serializer() != null) {
return DistributedPrimitives.newTranscodingMap(rawMap,
- key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(HexString.fromHexString(string)),
- value -> value == null ? null : serializer.encode(value),
- bytes -> serializer.decode(bytes));
+ key -> HexString.toHexString(options.serializer().encode(key)),
+ string -> options.serializer().decode(HexString.fromHexString(string)),
+ value -> value == null ? null : options.serializer().encode(value),
+ bytes -> options.serializer().decode(bytes));
}
return (AsyncConsistentMap<K, V>) rawMap;
}
@Override
@SuppressWarnings("unchecked")
- public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
+ public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(ConsistentTreeMapOptions options) {
AtomixConsistentTreeMap rawMap =
new AtomixConsistentTreeMap(client.newProxyBuilder()
- .withName(name)
+ .withName(options.name())
.withServiceType(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name())
.withReadConsistency(ReadConsistency.SEQUENTIAL)
.withCommunicationStrategy(CommunicationStrategy.ANY)
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
+ .withRevision(options.revision())
+ .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
.build()
.open()
.join());
- if (serializer != null) {
+ if (options.serializer() != null) {
return DistributedPrimitives.newTranscodingTreeMap(
rawMap,
- value -> value == null ? null : serializer.encode(value),
- bytes -> serializer.decode(bytes));
+ value -> value == null ? null : options.serializer().encode(value),
+ bytes -> options.serializer().decode(bytes));
}
return (AsyncConsistentTreeMap<V>) rawMap;
}
@Override
@SuppressWarnings("unchecked")
- public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(String name, Serializer serializer) {
+ public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(ConsistentMultimapOptions options) {
AtomixConsistentSetMultimap rawMap =
new AtomixConsistentSetMultimap(client.newProxyBuilder()
- .withName(name)
+ .withName(options.name())
.withServiceType(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name())
.withReadConsistency(ReadConsistency.SEQUENTIAL)
.withCommunicationStrategy(CommunicationStrategy.ANY)
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
+ .withRevision(options.revision())
+ .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
.build()
.open()
.join());
- if (serializer != null) {
+ if (options.serializer() != null) {
return DistributedPrimitives.newTranscodingMultimap(
rawMap,
- key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(HexString.fromHexString(string)),
- value -> serializer.encode(value),
- bytes -> serializer.decode(bytes));
+ key -> HexString.toHexString(options.serializer().encode(key)),
+ string -> options.serializer().decode(HexString.fromHexString(string)),
+ value -> options.serializer().encode(value),
+ bytes -> options.serializer().decode(bytes));
}
return (AsyncConsistentMultimap<K, V>) rawMap;
}
@Override
- public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
- return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
+ public <E> AsyncDistributedSet<E> newAsyncDistributedSet(DistributedSetOptions options) {
+ return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(options.name(), options.serializer()));
}
@Override
@SuppressWarnings("unchecked")
- public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
+ public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(AtomicCounterMapOptions options) {
AtomixAtomicCounterMap rawMap = new AtomixAtomicCounterMap(client.newProxyBuilder()
- .withName(name)
+ .withName(options.name())
.withServiceType(DistributedPrimitive.Type.COUNTER_MAP.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
+ .withRevision(options.revision())
+ .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
.build()
.open()
.join());
- if (serializer != null) {
+ if (options.serializer() != null) {
return DistributedPrimitives.newTranscodingAtomicCounterMap(
rawMap,
- key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(HexString.fromHexString(string)));
+ key -> HexString.toHexString(options.serializer().encode(key)),
+ string -> options.serializer().decode(HexString.fromHexString(string)));
}
return (AsyncAtomicCounterMap<K>) rawMap;
}
@Override
- public AsyncAtomicCounter newAsyncCounter(String name) {
+ public AsyncAtomicCounter newAsyncCounter(AtomicCounterOptions options) {
return new AtomixCounter(client.newProxyBuilder()
- .withName(name)
+ .withName(options.name())
.withServiceType(DistributedPrimitive.Type.COUNTER.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
+ .withRevision(options.revision())
+ .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
.build()
.open()
.join());
}
@Override
- public AsyncAtomicIdGenerator newAsyncIdGenerator(String name) {
- return new AtomixIdGenerator(newAsyncCounter(name));
+ public AsyncAtomicIdGenerator newAsyncIdGenerator(AtomicIdGeneratorOptions options) {
+ return new AtomixIdGenerator(newAsyncCounter(options.name())); // NOTE(review): revision not forwarded?
}
@Override
- public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
- return new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
+ public <V> AsyncAtomicValue<V> newAsyncAtomicValue(AtomicValueOptions options) {
+ return new DefaultAsyncAtomicValue<>(options.name(), options.serializer(), onosAtomicValuesMap.get());
}
@Override
- public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+ public <E> WorkQueue<E> newWorkQueue(WorkQueueOptions options) {
AtomixWorkQueue atomixWorkQueue = new AtomixWorkQueue(client.newProxyBuilder()
- .withName(name)
+ .withName(options.name())
.withServiceType(DistributedPrimitive.Type.WORK_QUEUE.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
+ .withRevision(options.revision())
+ .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
.build()
.open()
.join());
- return new DefaultDistributedWorkQueue<>(atomixWorkQueue, serializer);
+ return new DefaultDistributedWorkQueue<>(atomixWorkQueue, options.serializer());
}
@Override
- public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer, Ordering ordering) {
+ public <V> AsyncDocumentTree<V> newAsyncDocumentTree(DocumentTreeOptions options) {
+ String serviceType = String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), options.ordering());
AtomixDocumentTree atomixDocumentTree = new AtomixDocumentTree(client.newProxyBuilder()
- .withName(name)
- .withServiceType(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), ordering))
+ .withName(options.name())
+ .withServiceType(serviceType)
.withReadConsistency(ReadConsistency.SEQUENTIAL)
.withCommunicationStrategy(CommunicationStrategy.ANY)
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
+ .withRevision(options.revision())
+ .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
.build()
.open()
.join());
- return new DefaultDistributedDocumentTree<>(name, atomixDocumentTree, serializer);
+ return new DefaultDistributedDocumentTree<>(options.name(), atomixDocumentTree, options.serializer());
}
@Override
- public AsyncDistributedLock newAsyncDistributedLock(String name) {
+ public AsyncDistributedLock newAsyncDistributedLock(DistributedLockOptions options) {
return new AtomixDistributedLock(client.newProxyBuilder()
- .withName(name)
+ .withName(options.name())
.withServiceType(DistributedPrimitive.Type.LOCK.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MIN_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
+ .withRevision(options.revision())
+ .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
.build()
.open()
.join());
}
@Override
- public AsyncLeaderElector newAsyncLeaderElector(String name, long leaderTimeout, TimeUnit timeUnit) {
+ public AsyncLeaderElector newAsyncLeaderElector(LeaderElectorOptions options) {
return new AtomixLeaderElector(client.newProxyBuilder()
- .withName(name)
+ .withName(options.name())
.withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
- .withMinTimeout(Duration.ofMillis(timeUnit.toMillis(leaderTimeout)))
+ .withMinTimeout(Duration.ofMillis(options.electionTimeoutMillis()))
.withMaxTimeout(MIN_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
+ .withRevision(options.revision())
+ .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
.build()
.open()
.join());
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockServiceTest.java
index fb92eba..cf3e051 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDistributedLockServiceTest.java
@@ -65,6 +65,7 @@
expect(context.serviceId()).andReturn(ServiceId.from(1)).anyTimes();
expect(context.currentIndex()).andReturn(index.get()).anyTimes();
expect(context.currentOperation()).andReturn(OperationType.COMMAND).anyTimes();
+ expect(context.locked()).andReturn(false).anyTimes();
RaftContext server = mock(RaftContext.class);
expect(server.getProtocol()).andReturn(mock(RaftServerProtocol.class)).anyTimes();