Kryo related fixes
- KryoNamespace to allow control over registration id
Change-Id: Idc2a0e27a09916657c725ee97e4366109144cc66
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index c820704..3aa30dc 100644
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -18,6 +18,9 @@
import java.io.IOException;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.util.ByteArraySizeHashPrinter;
+
+import com.google.common.base.MoreObjects;
// TODO: Should payload type be ByteBuffer?
/**
@@ -78,4 +81,13 @@
public void respond(byte[] data) throws IOException {
throw new IllegalStateException("One can only repond to message recived from others.");
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("sender", sender)
+ .add("subject", subject)
+ .add("payload", ByteArraySizeHashPrinter.of(payload))
+ .toString();
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 70aeeed..8ff3392 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -34,7 +34,6 @@
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
@@ -67,12 +66,10 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .register(ClusterMessage.class, new ClusterMessageSerializer())
- .register(ClusterMembershipEvent.class)
- .register(byte[].class)
- .register(MessageSubject.class, new MessageSubjectSerializer())
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .register(new ClusterMessageSerializer(), ClusterMessage.class)
+ .register(new MessageSubjectSerializer(), MessageSubject.class)
+ .build();
}
};
@@ -194,11 +191,17 @@
@Override
public void handle(Message message) {
+ final ClusterMessage clusterMessage;
try {
- ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+ clusterMessage = SERIALIZER.decode(message.payload());
+ } catch (Exception e) {
+ log.error("Failed decoding ClusterMessage", e);
+ throw e;
+ }
+ try {
handler.handle(new InternalClusterMessage(clusterMessage, message));
} catch (Exception e) {
- log.error("Exception caught during ClusterMessageHandler", e);
+ log.error("Exception caught handling {}", clusterMessage, e);
throw e;
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/core/impl/DistributedApplicationIdStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/core/impl/DistributedApplicationIdStore.java
index 24f4f74..884bc12 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/core/impl/DistributedApplicationIdStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/core/impl/DistributedApplicationIdStore.java
@@ -65,8 +65,8 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 8d32585..0de64ca 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -141,18 +141,17 @@
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.COMMON)
-
- .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
- .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+ .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
+ .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
.register(InternalDeviceRemovedEvent.class)
- .register(InternalPortEvent.class, new InternalPortEventSerializer())
- .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
+ .register(new InternalPortEventSerializer(), InternalPortEvent.class)
+ .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
.register(DeviceAntiEntropyAdvertisement.class)
.register(DeviceFragmentId.class)
.register(PortFragmentId.class)
- .build()
- .populate(1);
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index f7718c1..d6d06d0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -156,9 +156,9 @@
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.COMMON)
- .build()
- .populate(1);
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index e1be932..6620477 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -123,13 +123,13 @@
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.COMMON)
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(InternalHostEvent.class)
.register(InternalHostRemovedEvent.class)
.register(HostFragmentId.class)
.register(HostAntiEntropyAdvertisement.class)
- .build()
- .populate(1);
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
index 27c1284..3ad4036 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
@@ -93,8 +93,8 @@
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
index fecaf6a..80c27d1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
@@ -86,8 +86,8 @@
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index 97d1ce7..3ec7524 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -129,13 +129,13 @@
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.COMMON)
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(InternalLinkEvent.class)
.register(InternalLinkRemovedEvent.class)
.register(LinkAntiEntropyAdvertisement.class)
.register(LinkFragmentId.class)
- .build()
- .populate(1);
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
index dd5041a..78a92a8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -84,10 +84,9 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
-
- .register(RoleValue.class, new RoleValueSerializer())
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .register(new RoleValueSerializer(), RoleValue.class)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/packet/impl/DistributedPacketStore.java
index be38785..793e759 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/packet/impl/DistributedPacketStore.java
@@ -72,8 +72,8 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java
index 2de47bd..5ef8aaf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java
@@ -23,13 +23,17 @@
public final class DistributedStoreSerializers {
+
+ public static final int STORE_CUSTOM_BEGIN = KryoNamespaces.BEGIN_USER_CUSTOM_ID + 10;
+
/**
* KryoNamespace which can serialize ON.lab misc classes.
*/
- public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
+ public static final KryoNamespace STORE_COMMON = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(Timestamped.class)
- .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+ .register(new MastershipBasedTimestampSerializer(), MastershipBasedTimestamp.class)
.register(WallClockTimestamp.class)
.build();
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/CMap.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/CMap.java
index b771b80..2c390c5 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/CMap.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/CMap.java
@@ -55,8 +55,8 @@
* Creates a CMap instance.
* It will create the table if necessary.
*
- * @param dbAdminService
- * @param dbService
+ * @param dbAdminService DatabaseAdminService to use for this instance
+ * @param dbService DatabaseService to use for this instance
* @param tableName table which this Map corresponds to
* @param serializer Value serializer
*/
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
index b2f5c2b..5a6a82a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -2,13 +2,6 @@
import static org.slf4j.LoggerFactory.getLogger;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
import java.util.Vector;
import net.kuujo.copycat.cluster.TcpClusterConfig;
@@ -40,29 +33,14 @@
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.serializers.ImmutableListSerializer;
-import org.onlab.onos.store.serializers.ImmutableMapSerializer;
-import org.onlab.onos.store.serializers.ImmutableSetSerializer;
+import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.service.BatchReadRequest;
-import org.onlab.onos.store.service.BatchWriteRequest;
-import org.onlab.onos.store.service.ReadRequest;
-import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.ReadStatus;
-import org.onlab.onos.store.service.VersionedValue;
-import org.onlab.onos.store.service.WriteRequest;
-import org.onlab.onos.store.service.WriteResult;
-import org.onlab.onos.store.service.WriteStatus;
+import org.onlab.onos.store.serializers.StoreSerializer;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
/**
* ONOS Cluster messaging based Copycat protocol.
*/
@@ -88,7 +66,11 @@
public static final MessageSubject COPYCAT_SUBMIT =
new MessageSubject("copycat-raft-consensus-submit");
- private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+ static final int AFTER_COPYCAT = KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50;
+
+ static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(PingRequest.class)
.register(PingResponse.class)
.register(PollRequest.class)
@@ -105,53 +87,23 @@
.register(TcpClusterConfig.class)
.register(TcpMember.class)
.register(LeaderElectEvent.class)
- .build();
-
- private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
- .register(ReadRequest.class)
- .register(WriteRequest.class)
- .register(WriteRequest.Type.class)
- .register(WriteResult.class)
- .register(ReadResult.class)
- .register(BatchReadRequest.class)
- .register(BatchWriteRequest.class)
- .register(ReadStatus.class)
- .register(WriteStatus.class)
- .register(VersionedValue.class)
- .build();
-
- public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
- .register(Arrays.asList().getClass(), new CollectionSerializer() {
- @Override
- @SuppressWarnings("rawtypes")
- protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
- return new ArrayList();
- }
- })
- .register(ImmutableMap.class, new ImmutableMapSerializer())
- .register(ImmutableList.class, new ImmutableListSerializer())
- .register(ImmutableSet.class, new ImmutableSetSerializer())
- .register(
- Vector.class,
- ArrayList.class,
- Arrays.asList().getClass(),
- HashMap.class,
- HashSet.class,
- LinkedList.class,
- Collections.singletonList("").getClass(),
- byte[].class)
+ .register(Vector.class)
.build();
// serializer used for CopyCat Protocol
- public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ public static final StoreSerializer DB_SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(COPYCAT)
- .register(COMMON)
- .register(DATABASE)
- .build()
- .populate(1);
+ .nextId(AFTER_COPYCAT)
+ // for snapshot
+ .register(State.class)
+ .register(TableMetadata.class)
+ // TODO: Move this out ?
+ .register(TableModificationEvent.class)
+ .register(TableModificationEvent.Type.class)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 9bba4b5..475db82 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -1,7 +1,7 @@
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Verify.verifyNotNull;
-import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER;
+import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -139,7 +139,7 @@
new ClusterMessage(
localNode.id(),
messageType(request),
- verifyNotNull(SERIALIZER.encode(request)));
+ verifyNotNull(DB_SERIALIZER.encode(request)));
this.future = future;
}
@@ -158,7 +158,8 @@
if (!connectionOK.getAndSet(true)) {
log.info("Connectivity to {} restored", remoteNode);
}
- future.complete(verifyNotNull(SERIALIZER.decode(response)));
+ future.complete(verifyNotNull(DB_SERIALIZER.decode(response)));
+
} catch (IOException | TimeoutException e) {
if (connectionOK.getAndSet(false)) {
log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index 0ecc0a5..373bc97 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -66,7 +66,7 @@
@Override
public void handle(ClusterMessage message) {
- T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+ T request = ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
if (handler == null) {
// there is a slight window of time during state transition,
// where handler becomes null
@@ -117,7 +117,7 @@
} else {
try {
log.trace("responding to {}", message.subject());
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
+ message.respond(ClusterMessagingProtocol.DB_SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to " + response.getClass().getName(), e);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index c3f9118..17f356b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -54,7 +54,7 @@
@Override
public void handle(ClusterMessage message) {
LeaderElectEvent event =
- ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+ ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
TcpMember newLeader = event.leader();
long newLeaderTerm = event.term();
if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
index 2fba52e..df9c73f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
@@ -101,7 +101,7 @@
log.debug("Broadcasting {} to the entire cluster", event);
clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
- DatabaseStateMachine.SERIALIZER.encode(event)));
+ ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
} catch (IOException e) {
log.error("Failed to broadcast a database row deleted event.", e);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 2c0ff14..c3494d9 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -173,7 +173,7 @@
DatabaseStateMachine stateMachine = new DatabaseStateMachine();
stateMachine.addEventListener(expirationTracker);
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
- ClusterMessagingProtocol.SERIALIZER);
+ ClusterMessagingProtocol.DB_SERIALIZER);
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
@@ -432,7 +432,7 @@
new ClusterMessage(
clusterService.getLocalNode().id(),
RAFT_LEADER_ELECTION_EVENT,
- ClusterMessagingProtocol.SERIALIZER.encode(event)));
+ ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
}
} catch (Exception e) {
log.debug("LeaderAdvertiser failed with exception", e);
@@ -454,7 +454,7 @@
new ClusterMessage(
clusterService.getLocalNode().id(),
RAFT_LEADER_ELECTION_EVENT,
- ClusterMessagingProtocol.SERIALIZER.encode(event)));
+ ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
} else {
if (myLeaderEvent != null) {
log.debug("This node is no longer the Leader");
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 2fbef0b..cfb8512b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -2,6 +2,7 @@
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -20,7 +21,6 @@
import net.kuujo.copycat.StateMachine;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.ReadRequest;
@@ -30,7 +30,6 @@
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.onos.store.service.WriteStatus;
-import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
@@ -39,7 +38,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
/**
* StateMachine whose transitions are coordinated/replicated
@@ -59,33 +57,13 @@
public static final MessageSubject DATABASE_UPDATE_EVENTS =
new MessageSubject("database-update-events");
- // serializer used for snapshot
- public static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(VersionedValue.class)
- .register(State.class)
- .register(TableMetadata.class)
- .register(BatchReadRequest.class)
- .register(BatchWriteRequest.class)
- .register(ReadStatus.class)
- .register(WriteStatus.class)
- // TODO: Move this out ?
- .register(TableModificationEvent.class)
- .register(TableModificationEvent.Type.class)
- .register(ClusterMessagingProtocol.COMMON)
- .build()
- .populate(1);
- }
- };
-
private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
// durable internal state of the database.
private State state = new State();
- private boolean compressSnapshot = false;
+ // TODO make this configurable
+ private boolean compressSnapshot = true;
@Command
public boolean createTable(String tableName) {
@@ -402,14 +380,14 @@
public byte[] takeSnapshot() {
try {
if (compressSnapshot) {
- byte[] input = SERIALIZER.encode(state);
+ byte[] input = DB_SERIALIZER.encode(state);
ByteArrayOutputStream comp = new ByteArrayOutputStream(input.length);
DeflaterOutputStream compressor = new DeflaterOutputStream(comp);
compressor.write(input, 0, input.length);
compressor.close();
return comp.toByteArray();
} else {
- return SERIALIZER.encode(state);
+ return DB_SERIALIZER.encode(state);
}
} catch (Exception e) {
log.error("Failed to take snapshot", e);
@@ -423,10 +401,9 @@
if (compressSnapshot) {
ByteArrayInputStream in = new ByteArrayInputStream(data);
InflaterInputStream decompressor = new InflaterInputStream(in);
- ByteStreams.toByteArray(decompressor);
- this.state = SERIALIZER.decode(ByteStreams.toByteArray(decompressor));
+ this.state = DB_SERIALIZER.decode(decompressor);
} else {
- this.state = SERIALIZER.decode(data);
+ this.state = DB_SERIALIZER.decode(data);
}
updatesExecutor.submit(new Runnable() {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index 74f8373..45cc7ed 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -149,7 +149,7 @@
private class LockEventMessageListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
- TableModificationEvent event = DatabaseStateMachine.SERIALIZER
+ TableModificationEvent event = ClusterMessagingProtocol.DB_SERIALIZER
.decode(message.payload());
if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
index cc432a0..42fcf98 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
@@ -90,9 +90,9 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
// register this store specific classes here
- .build()
- .populate(1);
+ .build();
}
};;
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java
index e1da794..9832483 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java
@@ -95,7 +95,7 @@
public final void testKryoSerializableWithHandcraftedSerializer() {
final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
final KryoNamespace kryos = KryoNamespace.newBuilder()
- .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+ .register(new MastershipBasedTimestampSerializer(), MastershipBasedTimestamp.class)
.build();
kryos.serialize(TS_1_2, buffer);
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
index f14ef92..5aae159 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
@@ -22,7 +22,7 @@
*/
public class MapDBLogTest {
- private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.SERIALIZER;
+ private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.DB_SERIALIZER;
private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1");
private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12");
private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123");
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ArraysAsListSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ArraysAsListSerializer.java
new file mode 100644
index 0000000..fe54804
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ArraysAsListSerializer.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onlab.onos.store.serializers;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link java.util.Arrays#asList(Object...)}.
+ */
+public final class ArraysAsListSerializer extends Serializer<List<?>> {
+
+ @Override
+ public void write(Kryo kryo, Output output, List<?> object) {
+ output.writeInt(object.size(), true);
+ for (Object elm : object) {
+ kryo.writeClassAndObject(output, elm);
+ }
+ }
+
+ @Override
+ public List<?> read(Kryo kryo, Input input, Class<List<?>> type) {
+ final int size = input.readInt(true);
+ List<Object> list = new ArrayList<>(size);
+ for (int i = 0; i < size; ++i) {
+ list.add(kryo.readClassAndObject(input));
+ }
+ return list;
+ }
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java
index a787012..dd02834 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java
@@ -45,7 +45,7 @@
kryo.writeClassAndObject(output, object.dst());
kryo.writeClassAndObject(output, object.type());
kryo.writeClassAndObject(output, object.state());
- kryo.writeClassAndObject(output, object.isDurable());
+ output.writeBoolean(object.isDurable());
}
@Override
@@ -55,7 +55,7 @@
ConnectPoint dst = (ConnectPoint) kryo.readClassAndObject(input);
Type linkType = (Type) kryo.readClassAndObject(input);
State state = (State) kryo.readClassAndObject(input);
- boolean isDurable = (boolean) kryo.readClassAndObject(input);
+ boolean isDurable = input.readBoolean();
return new DefaultLink(providerId, src, dst, linkType, state, isDurable);
}
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableListSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableListSerializer.java
index d51f37b..8c6ee38 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableListSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableListSerializer.java
@@ -15,9 +15,8 @@
*/
package org.onlab.onos.store.serializers;
-import org.onlab.util.KryoNamespace.FamilySerializer;
-
import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
@@ -26,7 +25,7 @@
/**
* Creates {@link ImmutableList} serializer instance.
*/
-public class ImmutableListSerializer extends FamilySerializer<ImmutableList<?>> {
+public class ImmutableListSerializer extends Serializer<ImmutableList<?>> {
/**
* Creates {@link ImmutableList} serializer instance.
@@ -53,12 +52,4 @@
}
return builder.build();
}
-
- @Override
- public void registerFamilies(Kryo kryo) {
- kryo.register(ImmutableList.of(1).getClass(), this);
- kryo.register(ImmutableList.of(1, 2).getClass(), this);
- // TODO register required ImmutableList variants
- }
-
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
index 1a26ea4..85b98a9 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
@@ -19,9 +19,8 @@
import java.util.HashMap;
import java.util.Map;
-import org.onlab.util.KryoNamespace.FamilySerializer;
-
import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
@@ -30,7 +29,7 @@
/**
* Kryo Serializer for {@link ImmutableMap}.
*/
-public class ImmutableMapSerializer extends FamilySerializer<ImmutableMap<?, ?>> {
+public class ImmutableMapSerializer extends Serializer<ImmutableMap<?, ?>> {
private final MapSerializer mapSerializer = new MapSerializer();
@@ -56,12 +55,4 @@
Map<?, ?> map = kryo.readObject(input, HashMap.class, mapSerializer);
return ImmutableMap.copyOf(map);
}
-
- @Override
- public void registerFamilies(Kryo kryo) {
- kryo.register(ImmutableMap.of().getClass(), this);
- kryo.register(ImmutableMap.of(1, 2).getClass(), this);
- kryo.register(ImmutableMap.of(1, 2, 3, 4).getClass(), this);
- // TODO register required ImmutableMap variants
- }
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
index f63d17d..bcba7df 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
@@ -18,9 +18,8 @@
import java.util.ArrayList;
import java.util.List;
-import org.onlab.util.KryoNamespace.FamilySerializer;
-
import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
@@ -29,7 +28,7 @@
/**
* Kryo Serializer for {@link ImmutableSet}.
*/
-public class ImmutableSetSerializer extends FamilySerializer<ImmutableSet<?>> {
+public class ImmutableSetSerializer extends Serializer<ImmutableSet<?>> {
private final CollectionSerializer serializer = new CollectionSerializer();
@@ -39,6 +38,7 @@
public ImmutableSetSerializer() {
// non-null, immutable
super(false, true);
+ serializer.setElementsCanBeNull(false);
}
@Override
@@ -52,12 +52,4 @@
List<?> elms = kryo.readObject(input, ArrayList.class, serializer);
return ImmutableSet.copyOf(elms);
}
-
- @Override
- public void registerFamilies(Kryo kryo) {
- kryo.register(ImmutableSet.of().getClass(), this);
- kryo.register(ImmutableSet.of(1).getClass(), this);
- kryo.register(ImmutableSet.of(1, 2).getClass(), this);
- // TODO register required ImmutableSet variants
- }
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index fea74bf..264dd4a 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -99,6 +100,15 @@
import org.onlab.onos.net.resource.LambdaResourceRequest;
import org.onlab.onos.net.resource.LinkResourceRequest;
import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchWriteRequest;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.ReadStatus;
+import org.onlab.onos.store.service.VersionedValue;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.WriteStatus;
import org.onlab.packet.ChassisId;
import org.onlab.packet.IpAddress;
import org.onlab.packet.Ip4Address;
@@ -117,41 +127,62 @@
public final class KryoNamespaces {
public static final KryoNamespace BASIC = KryoNamespace.newBuilder()
- .register(ImmutableMap.class, new ImmutableMapSerializer())
- .register(ImmutableList.class, new ImmutableListSerializer())
- .register(ImmutableSet.class, new ImmutableSetSerializer())
- .register(
- ArrayList.class,
- Arrays.asList().getClass(),
- HashMap.class,
- HashSet.class,
- LinkedList.class,
- byte[].class,
- Duration.class
- )
+ .nextId(KryoNamespace.FLOATING_ID)
+ .register(byte[].class)
+ .register(new ImmutableListSerializer(),
+ ImmutableList.class,
+ ImmutableList.of(1).getClass(),
+ ImmutableList.of(1, 2).getClass())
+ .register(new ImmutableSetSerializer(),
+ ImmutableSet.class,
+ ImmutableSet.of().getClass(),
+ ImmutableSet.of(1).getClass(),
+ ImmutableSet.of(1, 2).getClass())
+ .register(new ImmutableMapSerializer(),
+ ImmutableMap.class,
+ ImmutableMap.of().getClass(),
+ ImmutableMap.of("a", 1).getClass(),
+ ImmutableMap.of("R", 2, "D", 2).getClass())
+ .register(HashMap.class)
+ .register(ArrayList.class,
+ LinkedList.class,
+ HashSet.class
+ )
+ .register(new ArraysAsListSerializer(), Arrays.asList().getClass())
+ .register(Collections.singletonList(1).getClass())
+ .register(Duration.class)
.build();
/**
* KryoNamespace which can serialize ON.lab misc classes.
*/
public static final KryoNamespace MISC = KryoNamespace.newBuilder()
- .register(IpPrefix.class, new IpPrefixSerializer())
- .register(Ip4Prefix.class, new Ip4PrefixSerializer())
- .register(Ip6Prefix.class, new Ip6PrefixSerializer())
- .register(IpAddress.class, new IpAddressSerializer())
- .register(Ip4Address.class, new Ip4AddressSerializer())
- .register(Ip6Address.class, new Ip6AddressSerializer())
- .register(MacAddress.class, new MacAddressSerializer())
+ .nextId(KryoNamespace.FLOATING_ID)
+ .register(new IpPrefixSerializer(), IpPrefix.class)
+ .register(new Ip4PrefixSerializer(), Ip4Prefix.class)
+ .register(new Ip6PrefixSerializer(), Ip6Prefix.class)
+ .register(new IpAddressSerializer(), IpAddress.class)
+ .register(new Ip4AddressSerializer(), Ip4Address.class)
+ .register(new Ip6AddressSerializer(), Ip6Address.class)
+ .register(new MacAddressSerializer(), MacAddress.class)
.register(VlanId.class)
.build();
+ /**
+ * Kryo registration Id for user custom registration.
+ */
+ public static final int BEGIN_USER_CUSTOM_ID = 300;
+
// TODO: Populate other classes
/**
* KryoNamespace which can serialize API bundle classes.
*/
public static final KryoNamespace API = KryoNamespace.newBuilder()
- .register(MISC)
+ .nextId(KryoNamespace.INITIAL_ID)
.register(BASIC)
+ .nextId(KryoNamespace.INITIAL_ID + 30)
+ .register(MISC)
+ .nextId(KryoNamespace.INITIAL_ID + 30 + 10)
.register(
ControllerNode.State.class,
Device.Type.class,
@@ -242,19 +273,29 @@
AnnotationConstraint.class,
BooleanConstraint.class
)
- .register(DefaultApplicationId.class, new DefaultApplicationIdSerializer())
- .register(URI.class, new URISerializer())
- .register(NodeId.class, new NodeIdSerializer())
- .register(ProviderId.class, new ProviderIdSerializer())
- .register(DeviceId.class, new DeviceIdSerializer())
- .register(PortNumber.class, new PortNumberSerializer())
- .register(DefaultPort.class, new DefaultPortSerializer())
- .register(LinkKey.class, new LinkKeySerializer())
- .register(ConnectPoint.class, new ConnectPointSerializer())
- .register(DefaultLink.class, new DefaultLinkSerializer())
- .register(MastershipTerm.class, new MastershipTermSerializer())
- .register(HostLocation.class, new HostLocationSerializer())
- .register(DefaultOutboundPacket.class, new DefaultOutboundPacketSerializer())
+ .register(new DefaultApplicationIdSerializer(), DefaultApplicationId.class)
+ .register(new URISerializer(), URI.class)
+ .register(new NodeIdSerializer(), NodeId.class)
+ .register(new ProviderIdSerializer(), ProviderId.class)
+ .register(new DeviceIdSerializer(), DeviceId.class)
+ .register(new PortNumberSerializer(), PortNumber.class)
+ .register(new DefaultPortSerializer(), DefaultPort.class)
+ .register(new LinkKeySerializer(), LinkKey.class)
+ .register(new ConnectPointSerializer(), ConnectPoint.class)
+ .register(new DefaultLinkSerializer(), DefaultLink.class)
+ .register(new MastershipTermSerializer(), MastershipTerm.class)
+ .register(new HostLocationSerializer(), HostLocation.class)
+ .register(new DefaultOutboundPacketSerializer(), DefaultOutboundPacket.class)
+ .register(ReadRequest.class)
+ .register(WriteRequest.class)
+ .register(WriteRequest.Type.class)
+ .register(WriteResult.class)
+ .register(ReadResult.class)
+ .register(BatchReadRequest.class)
+ .register(BatchWriteRequest.class)
+ .register(ReadStatus.class)
+ .register(WriteStatus.class)
+ .register(VersionedValue.class)
.build();
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
index da37848..4554f8c 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
@@ -15,10 +15,14 @@
*/
package org.onlab.onos.store.serializers;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.onlab.util.KryoNamespace;
+import com.google.common.base.MoreObjects;
+
/**
* StoreSerializer implementation using Kryo.
*/
@@ -36,8 +40,8 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build();
}
@Override
@@ -63,4 +67,20 @@
return serializerPool.deserialize(buffer);
}
+ @Override
+ public void encode(Object obj, OutputStream stream) {
+ serializerPool.serialize(obj, stream);
+ }
+
+ @Override
+ public <T> T decode(InputStream stream) {
+ return serializerPool.deserialize(stream);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("serializerPool", serializerPool)
+ .toString();
+ }
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java
index 8ebb0e6..a5a95ac 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java
@@ -15,6 +15,8 @@
*/
package org.onlab.onos.store.serializers;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
// TODO: To be replaced with SerializationService from IOLoop activity
@@ -40,6 +42,14 @@
public void encode(final Object obj, ByteBuffer buffer);
/**
+ * Serializes the specified object into bytes.
+ *
+ * @param obj object to be serialized
+ * @param stream to write serialized bytes
+ */
+ public void encode(final Object obj, final OutputStream stream);
+
+ /**
* Deserializes the specified bytes into an object.
*
* @param bytes bytes to be deserialized
@@ -56,4 +66,13 @@
* @param <T> decoded type
*/
public <T> T decode(final ByteBuffer buffer);
+
+ /**
+ * Deserializes the specified bytes into an object.
+ *
+ * @param stream stream containing the bytes to be deserialized
+ * @return deserialized object
+ * @param <T> decoded type
+ */
+ public <T> T decode(final InputStream stream);
}
diff --git a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
index ff5b250..535b996 100644
--- a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
+++ b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
@@ -70,6 +70,7 @@
import org.onlab.util.KryoNamespace;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -121,8 +122,8 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build();
}
};
}
@@ -326,6 +327,11 @@
}
@Test
+ public void testArraysAsList() {
+ testSerializedEquals(Arrays.asList(1, 2, 3));
+ }
+
+ @Test
public void testAnnotationConstraint() {
testSerializable(new AnnotationConstraint("distance", 100.0));
}
diff --git a/providers/host/bin/pom.xml b/providers/host/bin/pom.xml
deleted file mode 100644
index e3ff4d0..0000000
--- a/providers/host/bin/pom.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright 2014 Open Networking Laboratory
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.onlab.onos</groupId>
- <artifactId>onos-of-providers</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>onos-of-provider-host</artifactId>
- <packaging>bundle</packaging>
-
- <description>ONOS OpenFlow protocol host provider</description>
-
-</project>
diff --git a/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java b/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java
index b89544e..63d5363 100644
--- a/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java
+++ b/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java
@@ -15,10 +15,11 @@
*/
package org.onlab.util;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.tuple.Pair;
@@ -27,15 +28,17 @@
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.pool.KryoCallback;
import com.esotericsoftware.kryo.pool.KryoFactory;
+import com.esotericsoftware.kryo.pool.KryoPool;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
-// TODO Add tests for this class.
/**
* Pool of Kryo instances, with classes pre-registered.
*/
//@ThreadSafe
-public final class KryoNamespace implements KryoFactory {
+public final class KryoNamespace implements KryoFactory, KryoPool {
/**
* Default buffer size used for serialization.
@@ -45,17 +48,35 @@
public static final int DEFAULT_BUFFER_SIZE = 4096;
public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
- private final ConcurrentLinkedQueue<Kryo> pool = new ConcurrentLinkedQueue<>();
- private final ImmutableList<Pair<Class<?>, Serializer<?>>> registeredTypes;
+ /**
+ * ID to use if this KryoNamespace does not define registration id.
+ */
+ public static final int FLOATING_ID = -1;
+
+ /**
+ * Smallest ID free to use for user defined registrations.
+ */
+ public static final int INITIAL_ID = 11;
+
+
+ private final KryoPool pool = new KryoPool.Builder(this)
+ .softReferences()
+ .build();
+
+ private final ImmutableList<RegistrationBlock> registeredBlocks;
+
private final boolean registrationRequired;
+
/**
* KryoNamespace builder.
*/
//@NotThreadSafe
public static final class Builder {
- private final List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
+ private int blockHeadId = INITIAL_ID;
+ private List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
+ private List<RegistrationBlock> blocks = new ArrayList<>();
private boolean registrationRequired = true;
/**
@@ -64,7 +85,27 @@
* @return KryoNamespace
*/
public KryoNamespace build() {
- return new KryoNamespace(types, registrationRequired);
+ if (!types.isEmpty()) {
+ blocks.add(new RegistrationBlock(this.blockHeadId, types));
+ }
+ return new KryoNamespace(blocks, registrationRequired).populate(1);
+ }
+
+ /**
+ * Sets the next Kryo registration Id for following register entries.
+ *
+ * @param id Kryo registration Id
+ * @return this
+ *
+ * @see Kryo#register(Class, Serializer, int)
+ */
+ public Builder nextId(final int id) {
+ if (!types.isEmpty()) {
+ blocks.add(new RegistrationBlock(this.blockHeadId, types));
+ types = new ArrayList<>();
+ }
+ this.blockHeadId = id;
+ return this;
}
/**
@@ -75,7 +116,7 @@
*/
public Builder register(final Class<?>... expectedTypes) {
for (Class<?> clazz : expectedTypes) {
- types.add(Pair.<Class<?>, Serializer<?>>of(clazz, null));
+ types.add(Pair.of(clazz, null));
}
return this;
}
@@ -83,26 +124,54 @@
/**
* Registers a class and it's serializer.
*
- * @param clazz the class to register
+ * @param classes list of classes to register
* @param serializer serializer to use for the class
* @return this
*/
- public Builder register(final Class<?> clazz, Serializer<?> serializer) {
- types.add(Pair.<Class<?>, Serializer<?>>of(clazz, serializer));
+ public Builder register(Serializer<?> serializer, final Class<?>... classes) {
+ for (Class<?> clazz : classes) {
+ types.add(Pair.of(clazz, serializer));
+ }
+ return this;
+ }
+
+ private Builder register(RegistrationBlock block) {
+ if (block.begin() != FLOATING_ID) {
+ // flush pending types
+ nextId(block.begin());
+ blocks.add(block);
+ nextId(block.begin() + block.types().size());
+ } else {
+ // flush pending types
+ final int addedBlockBegin = blockHeadId + types.size();
+ nextId(addedBlockBegin);
+ blocks.add(new RegistrationBlock(addedBlockBegin, block.types()));
+ nextId(addedBlockBegin + block.types().size());
+ }
return this;
}
/**
* Registers all the class registered to given KryoNamespace.
*
- * @param pool KryoNamespace
+ * @param ns KryoNamespace
* @return this
*/
- public Builder register(final KryoNamespace pool) {
- types.addAll(pool.registeredTypes);
+ public Builder register(final KryoNamespace ns) {
+ for (RegistrationBlock block : ns.registeredBlocks) {
+ this.register(block);
+ }
return this;
}
+ /**
+ * Sets the registrationRequired flag.
+ *
+ * @param registrationRequired Kryo's registrationRequired flag
+ * @return this
+ *
+ * @see Kryo#setRegistrationRequired(boolean)
+ */
public Builder setRegistrationRequired(boolean registrationRequired) {
this.registrationRequired = registrationRequired;
return this;
@@ -124,8 +193,8 @@
* @param registeredTypes types to register
* @param registrationRequired
*/
- private KryoNamespace(final List<Pair<Class<?>, Serializer<?>>> registeredTypes, boolean registrationRequired) {
- this.registeredTypes = ImmutableList.copyOf(registeredTypes);
+ private KryoNamespace(final List<RegistrationBlock> registeredTypes, boolean registrationRequired) {
+ this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
this.registrationRequired = registrationRequired;
}
@@ -136,39 +205,14 @@
* @return this
*/
public KryoNamespace populate(int instances) {
- List<Kryo> kryos = new ArrayList<>(instances);
+
for (int i = 0; i < instances; ++i) {
- kryos.add(create());
+ release(create());
}
- pool.addAll(kryos);
return this;
}
/**
- * Gets a Kryo instance from the pool.
- *
- * @return Kryo instance
- */
- public Kryo getKryo() {
- Kryo kryo = pool.poll();
- if (kryo == null) {
- return create();
- }
- return kryo;
- }
-
- /**
- * Returns a Kryo instance to the pool.
- *
- * @param kryo instance obtained from this pool.
- */
- public void putKryo(Kryo kryo) {
- if (kryo != null) {
- pool.add(kryo);
- }
- }
-
- /**
* Serializes given object to byte array using Kryo instance in pool.
* <p>
* Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
@@ -189,13 +233,13 @@
*/
public byte[] serialize(final Object obj, final int bufferSize) {
ByteBufferOutput out = new ByteBufferOutput(bufferSize, MAX_BUFFER_SIZE);
- Kryo kryo = getKryo();
+ Kryo kryo = borrow();
try {
kryo.writeClassAndObject(out, obj);
out.flush();
return out.toBytes();
} finally {
- putKryo(kryo);
+ release(kryo);
}
}
@@ -207,12 +251,40 @@
*/
public void serialize(final Object obj, final ByteBuffer buffer) {
ByteBufferOutput out = new ByteBufferOutput(buffer);
- Kryo kryo = getKryo();
+ Kryo kryo = borrow();
try {
kryo.writeClassAndObject(out, obj);
out.flush();
} finally {
- putKryo(kryo);
+ release(kryo);
+ }
+ }
+
+ /**
+ * Serializes given object to OutputStream using Kryo instance in pool.
+ *
+ * @param obj Object to serialize
+ * @param stream to write to
+ */
+ public void serialize(final Object obj, final OutputStream stream) {
+ serialize(obj, stream, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Serializes given object to OutputStream using Kryo instance in pool.
+ *
+ * @param obj Object to serialize
+ * @param stream to write to
+ * @param bufferSize size of the buffer in front of the stream
+ */
+ public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
+ ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
+ Kryo kryo = borrow();
+ try {
+ kryo.writeClassAndObject(out, obj);
+ out.flush();
+ } finally {
+ release(kryo);
}
}
@@ -225,13 +297,13 @@
*/
public <T> T deserialize(final byte[] bytes) {
Input in = new Input(bytes);
- Kryo kryo = getKryo();
+ Kryo kryo = borrow();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
- putKryo(kryo);
+ release(kryo);
}
}
@@ -244,18 +316,49 @@
*/
public <T> T deserialize(final ByteBuffer buffer) {
ByteBufferInput in = new ByteBufferInput(buffer);
- Kryo kryo = getKryo();
+ Kryo kryo = borrow();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
- putKryo(kryo);
+ release(kryo);
}
}
/**
- * Creates a Kryo instance with {@link #registeredTypes} pre-registered.
+ * Deserializes given InputStream to an Object using Kryo instance in pool.
+ *
+ * @param stream input stream
+ * @param <T> deserialized Object type
+ * @return deserialized Object
+ */
+ public <T> T deserialize(final InputStream stream) {
+ return deserialize(stream, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Deserializes given InputStream to an Object using Kryo instance in pool.
+ *
+ * @param stream input stream
+ * @param <T> deserialized Object type
+ * @return deserialized Object
+ * @param bufferSize size of the buffer in front of the stream
+ */
+ public <T> T deserialize(final InputStream stream, final int bufferSize) {
+ ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
+ Kryo kryo = borrow();
+ try {
+ @SuppressWarnings("unchecked")
+ T obj = (T) kryo.readClassAndObject(in);
+ return obj;
+ } finally {
+ release(kryo);
+ }
+ }
+
+ /**
+ * Creates a Kryo instance.
*
* @return Kryo instance
*/
@@ -263,42 +366,68 @@
public Kryo create() {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(registrationRequired);
- for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
- final Serializer<?> serializer = registry.getRight();
- if (serializer == null) {
- kryo.register(registry.getLeft());
- } else {
- kryo.register(registry.getLeft(), serializer);
- if (serializer instanceof FamilySerializer) {
- FamilySerializer<?> fser = (FamilySerializer<?>) serializer;
- fser.registerFamilies(kryo);
+ for (RegistrationBlock block : registeredBlocks) {
+ int id = block.begin();
+ if (id == FLOATING_ID) {
+ id = kryo.getNextRegistrationId();
+ }
+ for (Pair<Class<?>, Serializer<?>> entry : block.types()) {
+ final Serializer<?> serializer = entry.getRight();
+ if (serializer == null) {
+ kryo.register(entry.getLeft(), id++);
+ } else {
+ kryo.register(entry.getLeft(), serializer, id++);
}
}
}
return kryo;
}
- /**
- * Serializer implementation, which required registration of family of Classes.
- * @param <T> base type of this serializer.
- */
- public abstract static class FamilySerializer<T> extends Serializer<T> {
+ @Override
+ public Kryo borrow() {
+ return pool.borrow();
+ }
+ @Override
+ public void release(Kryo kryo) {
+ pool.release(kryo);
+ }
- public FamilySerializer(boolean acceptsNull) {
- super(acceptsNull);
+ @Override
+ public <T> T run(KryoCallback<T> callback) {
+ return pool.run(callback);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("registeredBlocks", registeredBlocks)
+ .toString();
+ }
+
+ static final class RegistrationBlock {
+ private final int begin;
+ private final ImmutableList<Pair<Class<?>, Serializer<?>>> types;
+
+ public RegistrationBlock(int begin, List<Pair<Class<?>, Serializer<?>>> types) {
+ this.begin = begin;
+ this.types = ImmutableList.copyOf(types);
}
- public FamilySerializer(boolean acceptsNull, boolean immutable) {
- super(acceptsNull, immutable);
+ public int begin() {
+ return begin;
}
- /**
- * Registers other classes this Serializer supports.
- *
- * @param kryo instance to register classes to
- */
- public void registerFamilies(Kryo kryo) {
+ public ImmutableList<Pair<Class<?>, Serializer<?>>> types() {
+ return types;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("begin", begin)
+ .add("types", types)
+ .toString();
}
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
index ca24e4f..50bc58d 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -22,20 +22,11 @@
/**
* Representation of a TCP/UDP communication end point.
*/
-public class Endpoint {
+public final class Endpoint {
private final int port;
private final String host;
- /**
- * Used for serialization.
- */
- @SuppressWarnings("unused")
- private Endpoint() {
- port = 0;
- host = null;
- }
-
public Endpoint(String host, int port) {
this.host = host;
this.port = port;
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index a981c34..40b529e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -34,6 +34,13 @@
// Must be created using the Builder.
private InternalMessage() {}
+ InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
+ this.id = id;
+ this.sender = sender;
+ this.type = type;
+ this.payload = payload;
+ }
+
public long id() {
return id;
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index 435225c..6118c49 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -17,11 +17,13 @@
import org.onlab.util.KryoNamespace;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
-//FIXME: Should be move out to test or app
+import java.nio.ByteBuffer;
+
/**
* Kryo Serializer.
*/
@@ -37,17 +39,11 @@
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
- // FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoNamespace.newBuilder()
- .register(ArrayList.class,
- HashMap.class,
- ArrayList.class,
- InternalMessage.class,
- Endpoint.class,
- byte[].class
- )
- .build()
- .populate(1);
+ .register(byte[].class)
+ .register(new InternalMessageSerializer(), InternalMessage.class)
+ .register(new EndPointSerializer(), Endpoint.class)
+ .build();
}
@@ -66,4 +62,45 @@
public void encode(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
+
+ public static final class InternalMessageSerializer
+ extends Serializer<InternalMessage> {
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalMessage object) {
+ output.writeLong(object.id());
+ kryo.writeClassAndObject(output, object.sender());
+ output.writeString(object.type());
+ output.writeInt(object.payload().length, true);
+ output.writeBytes(object.payload());
+ }
+
+ @Override
+ public InternalMessage read(Kryo kryo, Input input,
+ Class<InternalMessage> type) {
+ long id = input.readLong();
+ Endpoint sender = (Endpoint) kryo.readClassAndObject(input);
+ String msgtype = input.readString();
+ int length = input.readInt(true);
+ byte[] payload = input.readBytes(length);
+ return new InternalMessage(id, sender, msgtype, payload);
+ }
+
+ }
+
+ public static final class EndPointSerializer extends Serializer<Endpoint> {
+
+ @Override
+ public void write(Kryo kryo, Output output, Endpoint object) {
+ output.writeString(object.host());
+ output.writeInt(object.port());
+ }
+
+ @Override
+ public Endpoint read(Kryo kryo, Input input, Class<Endpoint> type) {
+ String host = input.readString();
+ int port = input.readInt();
+ return new Endpoint(host, port);
+ }
+ }
}