Enabled leadership service implementation based on consistent map + added precondition checks to partitioned database.
Change-Id: Ia76f8479d9113e7ad67e583e4ca157e62a1cabc7
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index f75cd48..b48ce21 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -73,7 +73,7 @@
* the current leader (e.g., for informational purpose).
* </p>
*/
-@Component(immediate = true)
+@Component(immediate = true, enabled = false)
@Service
public class HazelcastLeadershipService implements LeadershipService {
private static final Logger log =
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 7ccdd80..9eed61a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -145,7 +145,7 @@
});
try {
if (!latch.await(DATABASE_STARTUP_TIMEOUT_SEC, TimeUnit.SECONDS)) {
- log.warn("Timeed out watiing for database to initialize.");
+ log.warn("Timed out waiting for database to initialize.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 3b65ec0..0d38dd3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -52,7 +52,7 @@
* detection capabilities to detect and purge stale locks.
* TODO: Ensure lock safety and liveness.
*/
-@Component(immediate = true, enabled = false)
+@Component(immediate = true, enabled = true)
@Service
public class DistributedLeadershipManager implements LeadershipService {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 04e5e5c..24ec92b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -36,6 +36,8 @@
import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import static com.google.common.base.Preconditions.checkState;
+
/**
* A database that partitions the keys across one or more database partitions.
*/
@@ -44,11 +46,21 @@
private Partitioner<String> partitioner;
private final ClusterCoordinator coordinator;
private final Map<String, Database> partitions = Maps.newConcurrentMap();
+ private final AtomicBoolean isOpen = new AtomicBoolean(false);
+ private static final String DB_NOT_OPEN = "Database is not open";
protected PartitionedDatabase(ClusterCoordinator coordinator) {
this.coordinator = coordinator;
}
+ /**
+ * Returns true if the database is open.
+ * @return true if open, false otherwise
+ */
+ public boolean isOpen() {
+ return isOpen.get();
+ }
+
@Override
public void registerPartition(String name, Database partition) {
partitions.put(name, partition);
@@ -61,6 +73,7 @@
@Override
public CompletableFuture<Integer> size(String tableName) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
AtomicInteger totalSize = new AtomicInteger(0);
return CompletableFuture.allOf(partitions
.values()
@@ -72,16 +85,19 @@
@Override
public CompletableFuture<Boolean> isEmpty(String tableName) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return size(tableName).thenApply(size -> size == 0);
}
@Override
public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).containsKey(tableName, key);
}
@Override
public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
AtomicBoolean containsValue = new AtomicBoolean(false);
return CompletableFuture.allOf(partitions
.values()
@@ -93,21 +109,25 @@
@Override
public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).get(tableName, key);
}
@Override
public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).put(tableName, key, value);
}
@Override
public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key);
}
@Override
public CompletableFuture<Void> clear(String tableName) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return CompletableFuture.allOf(partitions
.values()
.stream()
@@ -117,6 +137,7 @@
@Override
public CompletableFuture<Set<String>> keySet(String tableName) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
Set<String> keySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
.values()
@@ -128,6 +149,7 @@
@Override
public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
return CompletableFuture.allOf(partitions
.values()
@@ -139,6 +161,7 @@
@Override
public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
.values()
@@ -150,31 +173,37 @@
@Override
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
}
@Override
public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key, value);
}
@Override
public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key, version);
}
@Override
public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
}
@Override
public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
}
@Override
public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
for (UpdateOperation<String, byte[]> update : updates) {
Database partition = partitioner.getPartition(update.tableName(), update.key());
@@ -207,12 +236,15 @@
.stream()
.map(Database::open)
.toArray(CompletableFuture[]::new))
- .thenApply(v -> this));
+ .thenApply(v -> {
+ isOpen.set(true);
+ return this; }));
}
@Override
public CompletableFuture<Void> close() {
+ checkState(isOpen.get(), DB_NOT_OPEN);
CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
.values()
.stream()
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index d913a38..e2e6322 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -33,6 +33,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
+
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -169,7 +171,7 @@
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
- if (leadership.leader().equals(clusterService.getLocalNode().id()) &&
+ if (Objects.equal(leadership.leader(), clusterService.getLocalNode().id()) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
// See if we need to let some partitions go