Misc bug fixes in preparation for enabling StorageManager
Change-Id: I953414891c901e5d1f92844ca8c4eaa8c042dd53
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index fafb2d0..984a610 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -24,12 +24,14 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+
+
import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
import com.google.common.base.MoreObjects;
@@ -41,6 +43,7 @@
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
+import io.atomix.catalyst.transport.TransportException;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;
@@ -66,10 +69,6 @@
private final String inboundMessageSubject;
private final ThreadContext context;
private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
- private final AtomicInteger messagesSent = new AtomicInteger(0);
- private final AtomicInteger sendFailures = new AtomicInteger(0);
- private final AtomicInteger messagesReceived = new AtomicInteger(0);
- private final AtomicInteger receiveFailures = new AtomicInteger(0);
CopycatTransportConnection(long connectionId,
CopycatTransport.Mode mode,
@@ -120,12 +119,14 @@
baos.toByteArray(),
context.executor())
.whenComplete((r, e) -> {
- if (e == null) {
- messagesSent.incrementAndGet();
- } else {
- sendFailures.incrementAndGet();
+ Throwable wrappedError = e;
+ if (e != null) {
+ Throwable rootCause = Throwables.getRootCause(e);
+ if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
+ wrappedError = new TransportException(e);
+ }
}
- handleResponse(r, e, result, context);
+ handleResponse(r, wrappedError, result, context);
});
} catch (SerializationException | IOException e) {
result.completeExceptionally(e);
@@ -172,11 +173,6 @@
"No handler registered for " + request.getClass()));
}
return handler.handle(request).handle((result, error) -> {
- if (error == null) {
- messagesReceived.incrementAndGet();
- } else {
- receiveFailures.incrementAndGet();
- }
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
baos.write(error != null ? FAILURE : SUCCESS);
context.serializer().writeObject(error != null ? error : result, baos);
@@ -220,7 +216,6 @@
if (!(other instanceof CopycatTransportConnection)) {
return false;
}
-
return connectionId == ((CopycatTransportConnection) other).connectionId;
}
@@ -228,10 +223,6 @@
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", connectionId)
- .add("sent", messagesSent.get())
- .add("received", messagesReceived.get())
- .add("sendFailures", sendFailures.get())
- .add("receiveFailures", receiveFailures.get())
.toString();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index b40e39d..286b2fb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -18,7 +18,6 @@
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.resource.ResourceType;
-import io.atomix.variables.DistributedLong;
import java.io.File;
import java.util.Collection;
@@ -57,7 +56,6 @@
private StoragePartitionClient client;
public static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
- new ResourceType(DistributedLong.class),
new ResourceType(AtomixLeaderElector.class),
new ResourceType(AtomixConsistentMap.class));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index dbc3157..4f912da 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -52,7 +52,7 @@
@Override
public ConsistencyLevel consistency() {
- return ConsistencyLevel.SEQUENTIAL;
+ return ConsistencyLevel.LINEARIZABLE;
}
@Override
@@ -78,7 +78,7 @@
@Override
public ConsistencyLevel consistency() {
- return ConsistencyLevel.SEQUENTIAL;
+ return ConsistencyLevel.BOUNDED_LINEARIZABLE;
}
@Override