Bugfixes for DistributedLockManager functionality
Added a method called broadcastIncludeSelf to ClusterCommunicationService.
Cosmetic improvements: added toString methods
Change-Id: I1d58720c29e6f8642f950670c3a6d95a7019a491
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
index b6f1eb9..1aa72fb 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
@@ -21,7 +21,9 @@
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+//import java.util.concurrent.TimeUnit;
+
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -43,6 +45,8 @@
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.LockService;
import org.onlab.onos.store.service.VersionedValue;
import org.slf4j.Logger;
@@ -72,6 +76,9 @@
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
protected DatabaseService dbService;
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
+ protected LockService lockService;
+
private final ClusterEventListener clusterListener = new InnerClusterListener();
private final DeviceListener deviceListener = new InnerDeviceListener();
private final IntentListener intentListener = new InnerIntentListener();
@@ -92,9 +99,10 @@
log.info("Couldn't find DB service");
} else {
log.info("Found DB service");
- longIncrementor();
- executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
- executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
+ //longIncrementor();
+ //lockUnlock();
+ //executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
+ //executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
}
log.info("Started");
}
@@ -152,6 +160,31 @@
}
}
+ private void lockUnlock() {
+ try {
+ final String locksTable = "onos-locks";
+
+ if (!dbAdminService.listTables().contains(locksTable)) {
+ dbAdminService.createTable(locksTable, 10000);
+ }
+ Lock lock = lockService.create("foo-bar");
+ log.info("Requesting lock");
+ lock.lock(10000);
+ //try {
+ //Thread.sleep(5000);
+ //} catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ //e.printStackTrace();
+ //}
+ log.info("Acquired Lock");
+ log.info("Do I have the lock: {} ", lock.isLocked());
+ //lock.unlock();
+ log.info("Do I have the lock: {} ", lock.isLocked());
+ } finally {
+ log.info("Done");
+ }
+ }
+
private void longIncrementor() {
try {
final String someTable = "admin";
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
index cf17fbe..2763eff 100644
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -38,6 +38,15 @@
boolean broadcast(ClusterMessage message) throws IOException;
/**
+ * Broadcast a message to all controller nodes including self.
+ *
+ * @param message message to send
+ * @return true if the message was sent successfully to all nodes; false otherwise.
+ * @throws IOException when I/O exception of some sort has occurred
+ */
+ boolean broadcastIncludeSelf(ClusterMessage message) throws IOException;
+
+ /**
* Sends a message to the specified controller node.
*
* @param message message to send
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchReadRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadRequest.java
index 86d19b2..7cab847 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/BatchReadRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadRequest.java
@@ -2,6 +2,7 @@
import java.util.List;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -41,6 +42,13 @@
return readRequests;
}
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("readRequests", readRequests)
+ .toString();
+ }
+
/**
* Builder for BatchReadRequest.
*/
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchReadResult.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadResult.java
index 683ca1c..b12aa16 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/BatchReadResult.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadResult.java
@@ -2,6 +2,7 @@
import java.util.List;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
/**
@@ -30,4 +31,11 @@
public int batchSize() {
return readResults.size();
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("readResults", readResults)
+ .toString();
+ }
}
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java
index 31deb4d..59f36ed 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java
@@ -2,6 +2,7 @@
import java.util.List;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -41,6 +42,13 @@
return writeRequests.size();
}
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("writeRequests", writeRequests)
+ .toString();
+ }
+
/**
* Builder for BatchWriteRequest.
*/
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteResult.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteResult.java
index b185072..fa2e7c4 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteResult.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteResult.java
@@ -2,6 +2,7 @@
import java.util.List;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
/**
@@ -43,4 +44,11 @@
public int batchSize() {
return writeResults.size();
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("writeResults", writeResults)
+ .toString();
+ }
}
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
index fa73546..0324dfb 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
@@ -1,7 +1,7 @@
package org.onlab.onos.store.service;
import java.util.Collection;
-import java.util.List;
+import java.util.Set;
import org.onlab.onos.cluster.ControllerNode;
@@ -31,9 +31,9 @@
/**
* Lists all the tables in the database.
- * @return list of table names.
+ * @return set of table names.
*/
- public List<String> listTables();
+ public Set<String> listTables();
/**
* Deletes a table from the database.
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java b/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
index 7aeddda..868dda5 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
@@ -64,6 +64,7 @@
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
+ .add("status", status)
.add("tableName", tableName)
.add("key", key)
.add("value", value)
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
index 6607cfe..5274c19 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
@@ -114,7 +114,7 @@
*/
public static WriteRequest removeIfVersionMatches(String tableName, String key,
long previousVersion) {
- return new WriteRequest(REMOVE_IF_VALUE, tableName, key,
+ return new WriteRequest(REMOVE_IF_VERSION, tableName, key,
null, previousVersion, null);
}
@@ -129,7 +129,7 @@
*/
public static WriteRequest removeIfValueMatches(String tableName, String key,
byte[] oldValue) {
- return new WriteRequest(Type.REMOVE_IF_VALUE, tableName, key,
+ return new WriteRequest(REMOVE_IF_VALUE, tableName, key,
null, ANY_VERSION, checkNotNull(oldValue));
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 6de94b4..0be310e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -116,6 +116,15 @@
}
@Override
+ public boolean broadcastIncludeSelf(ClusterMessage message) throws IOException {
+ boolean ok = true;
+ for (ControllerNode node : clusterService.getNodes()) {
+ ok = unicast(message, node.id()) && ok;
+ }
+ return ok;
+ }
+
+ @Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
@@ -209,4 +218,4 @@
rawMessage.respond(response);
}
}
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 0021eb7..a6320ed 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -3,6 +3,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -37,7 +38,7 @@
public boolean createTable(String tableName, int ttlMillis) {
- CompletableFuture<Boolean> future = copycat.submit("createTable", tableName, ttlMillis);
+ CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
@@ -65,9 +66,9 @@
}
}
- public List<String> listTables() {
+ public Set<String> listTables() {
- CompletableFuture<List<String>> future = copycat.submit("listTables");
+ CompletableFuture<Set<String>> future = copycat.submit("listTables");
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
index 157d3a6..62cf584 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
@@ -33,7 +33,6 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
@@ -51,9 +50,6 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- public static final MessageSubject DATABASE_UPDATES = new MessageSubject(
- "database-update-event");
-
private final DatabaseService databaseService;
private final ClusterCommunicationService clusterCommunicator;
@@ -61,9 +57,9 @@
private final ControllerNode localNode;
private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
- private final Map<String, Map<DatabaseRow, VersionedValue>> tableEntryExpirationMap = new HashMap<>();
+ private final Map<String, Map<DatabaseRow, Long>> tableEntryExpirationMap = new HashMap<>();
- private final ExpirationListener<DatabaseRow, VersionedValue> expirationObserver = new ExpirationObserver();
+ private final ExpirationListener<DatabaseRow, Long> expirationObserver = new ExpirationObserver();
DatabaseEntryExpirationTracker(
Member localMember,
@@ -78,31 +74,38 @@
@Override
public void tableModified(TableModificationEvent event) {
+ log.debug("Received a table modification event {}", event);
+
if (!tableEntryExpirationMap.containsKey(event.tableName())) {
return;
}
+ Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(event.tableName());
DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
- Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
- .get(event.tableName());
+ Long eventVersion = event.value().version();
switch (event.type()) {
case ROW_DELETED:
+ map.remove(row, eventVersion);
if (isLocalMemberLeader.get()) {
try {
+ // FIXME: The broadcast message should be sent to self.
clusterCommunicator.broadcast(new ClusterMessage(
- localNode.id(), DATABASE_UPDATES,
+ localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
DatabaseStateMachine.SERIALIZER.encode(event)));
} catch (IOException e) {
- log.error(
- "Failed to broadcast a database table modification event.",
- e);
+ log.error("Failed to broadcast a database row deleted event.", e);
}
}
break;
case ROW_ADDED:
case ROW_UPDATED:
- map.put(row, null);
+ // To account for potential reordering of notifications,
+ // check to make sure we are replacing an old version with a new version
+ Long currentVersion = map.get(row);
+ if (currentVersion == null || currentVersion < eventVersion) {
+ map.put(row, eventVersion);
+ }
break;
default:
break;
@@ -111,60 +114,56 @@
@Override
public void tableCreated(TableMetadata metadata) {
+ log.debug("Received a table created event {}", metadata);
if (metadata.expireOldEntries()) {
tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
- .expiration(metadata.ttlMillis(), TimeUnit.SECONDS)
+ .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
.expirationListener(expirationObserver)
- // FIXME: make the expiration policy configurable.
+ // TODO: make the expiration policy configurable.
+ // Do we need to support expiration based on last access time?
.expirationPolicy(ExpirationPolicy.CREATED).build());
}
}
@Override
public void tableDeleted(String tableName) {
+ log.debug("Received a table deleted event for table ({})", tableName);
tableEntryExpirationMap.remove(tableName);
}
private class ExpirationObserver implements
- ExpirationListener<DatabaseRow, VersionedValue> {
+ ExpirationListener<DatabaseRow, Long> {
@Override
- public void expired(DatabaseRow key, VersionedValue value) {
+ public void expired(DatabaseRow row, Long version) {
+ Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
try {
if (isLocalMemberLeader.get()) {
- if (!databaseService.removeIfVersionMatches(key.tableName,
- key.key, value.version())) {
- log.info("Entry in the database changed before right its TTL expiration.");
+ if (!databaseService.removeIfVersionMatches(row.tableName,
+ row.key, version)) {
+ log.info("Entry in database was updated right before its expiration.");
+ } else {
+ log.info("Successfully expired old entry with key ({}) from table ({})",
+ row.key, row.tableName);
}
} else {
- // If this node is not the current leader, we should never
- // let the expiring entries drop off
- // Under stable conditions (i.e no leadership switch) the
- // current leader will initiate
- // a database remove and this instance will get notified
- // of a tableModification event causing it to remove from
- // the map.
- Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
- .get(key.tableName);
+ // Only the current leader will expire keys from database.
+ // Everyone else function as standby just in case they need to take over
if (map != null) {
- map.put(key, value);
+ map.putIfAbsent(row, version);
}
}
} catch (Exception e) {
- log.warn(
- "Failed to delete entry from the database after ttl expiration. Will retry eviction",
- e);
- tableEntryExpirationMap.get(key.tableName).put(
- new DatabaseRow(key.tableName, key.key), value);
+ log.warn("Failed to delete entry from the database after ttl "
+ + "expiration. Operation will be retried.", e);
+ map.putIfAbsent(row, version);
}
}
}
@Override
public void handle(LeaderElectEvent event) {
- if (localMember.equals(event.leader())) {
- isLocalMemberLeader.set(true);
- }
+ isLocalMemberLeader.set(localMember.equals(event.leader()));
}
/**
@@ -212,12 +211,12 @@
continue;
}
- Map<DatabaseRow, VersionedValue> tableExpirationMap = ExpiringMap.builder()
+ Map<DatabaseRow, Long> tableExpirationMap = ExpiringMap.builder()
.expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
.expirationListener(expirationObserver)
.expirationPolicy(ExpirationPolicy.CREATED).build();
for (Map.Entry<String, VersionedValue> entry : state.getTable(tableName).entrySet()) {
- tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue());
+ tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue().version());
}
tableEntryExpirationMap.put(tableName, tableExpirationMap);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index e233e5a..ad1ef853 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -7,7 +7,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -19,6 +18,7 @@
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.log.Log;
import org.apache.felix.scr.annotations.Activate;
@@ -160,18 +160,22 @@
}
log.info("Starting cluster: {}", cluster);
+ DatabaseEntryExpirationTracker expirationTracker =
+ new DatabaseEntryExpirationTracker(
+ clusterConfig.getLocalMember(),
+ clusterService.getLocalNode(),
+ clusterCommunicator,
+ this);
DatabaseStateMachine stateMachine = new DatabaseStateMachine();
- stateMachine.addEventListener(
- new DatabaseEntryExpirationTracker(
- clusterConfig.getLocalMember(),
- clusterService.getLocalNode(),
- clusterCommunicator,
- this));
+ stateMachine.addEventListener(expirationTracker);
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
ClusterMessagingProtocol.SERIALIZER);
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
+
+ copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
+
copycat.start();
client = new DatabaseClient(copycat);
@@ -207,7 +211,7 @@
}
@Override
- public List<String> listTables() {
+ public Set<String> listTables() {
return client.listTables();
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 3b0d874..cdf66af 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -30,6 +30,7 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -65,6 +66,7 @@
.register(WriteStatus.class)
// TODO: Move this out ?
.register(TableModificationEvent.class)
+ .register(TableModificationEvent.Type.class)
.register(ClusterMessagingProtocol.COMMON)
.build()
.populate(1);
@@ -85,7 +87,8 @@
}
@Command
- public boolean createTable(String tableName, int ttlMillis) {
+ public boolean createTableWithExpiration(String tableName) {
+ int ttlMillis = 10000;
TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
return createTable(metadata);
}
@@ -266,6 +269,7 @@
// notify listeners of table mod events.
for (DatabaseUpdateEventListener listener : listeners) {
for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
+ log.info("Publishing table modification event: {}", tableModificationEvent);
listener.tableModified(tableModificationEvent);
}
}
@@ -345,6 +349,15 @@
public int ttlMillis() {
return ttlMillis;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("tableName", tableName)
+ .add("expireOldEntries", expireOldEntries)
+ .add("ttlMillis", ttlMillis)
+ .toString();
+ }
}
@Override
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index c466941..cef53a5 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -80,6 +80,7 @@
return false;
}
}
+ isLocked.set(true);
lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
return true;
}
@@ -95,9 +96,11 @@
if (DateTime.now().isAfter(lockExpirationTime)) {
isLocked.set(false);
return false;
+ } else {
+ return true;
}
}
- return true;
+ return false;
}
@Override
@@ -105,6 +108,7 @@
if (!isLocked()) {
return;
} else {
+ isLocked.set(false);
databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index 6d99ba7..c8b16e4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -94,6 +94,8 @@
return;
}
+ log.info("Received a lock available event for path: {}", event.key());
+
String path = event.key();
if (!locksToAcquire.containsKey(path)) {
return;
@@ -159,4 +161,4 @@
return future;
}
}
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
index b0962dc..31dcaab 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
@@ -2,6 +2,8 @@
import org.onlab.onos.store.service.VersionedValue;
+import com.google.common.base.MoreObjects;
+
/**
* A table modification event.
*/
@@ -9,7 +11,6 @@
/**
* Type of table modification event.
- *
*/
public enum Type {
ROW_ADDED,
@@ -94,4 +95,14 @@
public Type type() {
return type;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("type", type)
+ .add("tableName", tableName)
+ .add("key", key)
+ .add("version", value.version())
+ .toString();
+ }
}