Method name refactor in DistributedPrimitive + Builder for AsyncLeaderElector
Change-Id: I59be6e66665c0b12d02106bd5c722e9fa38dd7a1
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index 9a1959a..6acb7db 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,6 +33,7 @@
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
+import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import io.atomix.catalyst.transport.Address;
@@ -71,18 +73,23 @@
messagingService.registerHandler(messageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
long connectionId = input.readLong();
- InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
- int senderPort = sender.port();
- Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
AtomicBoolean newConnection = new AtomicBoolean(false);
CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
newConnection.set(true);
- return new CopycatTransportConnection(connectionId,
- CopycatTransport.Mode.SERVER,
- partitionId,
- senderAddress,
- messagingService,
- getOrCreateContext(context));
+ try {
+ InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
+ int senderPort = sender.port();
+ Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
+ return new CopycatTransportConnection(connectionId,
+ CopycatTransport.Mode.SERVER,
+ partitionId,
+ senderAddress,
+ messagingService,
+ getOrCreateContext(context));
+ } catch (UnknownHostException e) {
+ Throwables.propagate(e);
+ return null;
+ }
});
byte[] request = IOUtils.toByteArray(input);
return CompletableFuture.supplyAsync(
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
index b6d5cd2..4065eab 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
@@ -76,6 +76,7 @@
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.MapInfo;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
@@ -360,6 +361,11 @@
}
@Override
+ public LeaderElectorBuilder leaderElectorBuilder() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public List<MapInfo> getMapInfo() {
List<MapInfo> maps = Lists.newArrayList();
maps.addAll(getMapInfo(inMemoryDatabase));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
index 6c709ce..78fcedd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
@@ -54,29 +54,29 @@
}
@Override
- public T read(Class<T> clazz, BufferInput<?> input,
- io.atomix.catalyst.serializer.Serializer serializer) {
- int size = input.readInt();
- byte[] payload = new byte[size];
- input.read(payload);
+ public void write(T object, BufferOutput<?> buffer,
+ io.atomix.catalyst.serializer.Serializer serializer) {
try {
- return this.serializer.decode(payload);
+ byte[] payload = this.serializer.encode(object);
+ buffer.writeInt(payload.length);
+ buffer.write(payload);
} catch (Exception e) {
- log.warn("Failed to deserialize as type {}", clazz, e);
- Throwables.propagate(e);
- return null;
+ log.warn("Failed to serialize {}", object, e);
}
}
@Override
- public void write(T object, BufferOutput<?> output,
- io.atomix.catalyst.serializer.Serializer serializer) {
+ public T read(Class<T> type, BufferInput<?> buffer,
+ io.atomix.catalyst.serializer.Serializer serializer) {
+ int size = buffer.readInt();
try {
- byte[] payload = this.serializer.encode(object);
- output.writeInt(payload.length);
- output.write(payload);
+ byte[] payload = new byte[size];
+ buffer.read(payload);
+ return this.serializer.decode(payload);
} catch (Exception e) {
- log.warn("Failed to serialize {}", object, e);
+ log.warn("Failed to deserialize as type {}. Payload size: {}", type, size, e);
+ Throwables.propagate(e);
+ return null;
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java
index e4cf65f..3480117 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java
@@ -117,7 +117,7 @@
}
@Override
- public DistributedPrimitive.Type type() {
+ public DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.QUEUE;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
new file mode 100644
index 0000000..556ad17
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.service.AsyncLeaderElector;
+import org.onosproject.store.service.LeaderElectorBuilder;
+
+/**
+ * Default implementation of {@code LeaderElectorBuilder}.
+ */
+public class DefaultLeaderElectorBuilder extends LeaderElectorBuilder {
+
+ private final DistributedPrimitiveCreator base;
+ private final DistributedPrimitiveCreator federated;
+
+ public DefaultLeaderElectorBuilder(DistributedPrimitiveCreator base, DistributedPrimitiveCreator federated) {
+ this.base = base;
+ this.federated = federated;
+ }
+
+ @Override
+ public AsyncLeaderElector build() {
+ DistributedPrimitiveCreator creator = partitionsDisabled() ? base : federated;
+ return creator.newAsyncLeaderElector(name());
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index e7312a9..acf88fa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -33,6 +33,7 @@
import org.onosproject.cluster.Partition;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.service.PartitionInfo;
import com.google.common.collect.Collections2;
@@ -51,6 +52,7 @@
private final File logFolder;
private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
new ResourceType(DistributedLong.class),
+ new ResourceType(AtomixLeaderElector.class),
new ResourceType(AtomixConsistentMap.class));
private NodeId localNodeId;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 2e40664..293deef 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -29,6 +29,7 @@
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixCounter;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicValue;
@@ -133,7 +134,7 @@
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
- throw new UnsupportedOperationException();
+ return client.get(name, AtomixLeaderElector.class).join();
}
@Override