Kryo-related fixes
- Update KryoNamespace to allow control over the registration ID
Change-Id: Idc2a0e27a09916657c725ee97e4366109144cc66
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 70aeeed..8ff3392 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -34,7 +34,6 @@
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
@@ -67,12 +66,10 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .register(ClusterMessage.class, new ClusterMessageSerializer())
- .register(ClusterMembershipEvent.class)
- .register(byte[].class)
- .register(MessageSubject.class, new MessageSubjectSerializer())
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .register(new ClusterMessageSerializer(), ClusterMessage.class)
+ .register(new MessageSubjectSerializer(), MessageSubject.class)
+ .build();
}
};
@@ -194,11 +191,17 @@
@Override
public void handle(Message message) {
+ final ClusterMessage clusterMessage;
try {
- ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+ clusterMessage = SERIALIZER.decode(message.payload());
+ } catch (Exception e) {
+ log.error("Failed decoding ClusterMessage", e);
+ throw e;
+ }
+ try {
handler.handle(new InternalClusterMessage(clusterMessage, message));
} catch (Exception e) {
- log.error("Exception caught during ClusterMessageHandler", e);
+ log.error("Exception caught handling {}", clusterMessage, e);
throw e;
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/core/impl/DistributedApplicationIdStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/core/impl/DistributedApplicationIdStore.java
index 24f4f74..884bc12 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/core/impl/DistributedApplicationIdStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/core/impl/DistributedApplicationIdStore.java
@@ -65,8 +65,8 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 8d32585..0de64ca 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -141,18 +141,17 @@
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.COMMON)
-
- .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
- .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+ .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
+ .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
.register(InternalDeviceRemovedEvent.class)
- .register(InternalPortEvent.class, new InternalPortEventSerializer())
- .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
+ .register(new InternalPortEventSerializer(), InternalPortEvent.class)
+ .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
.register(DeviceAntiEntropyAdvertisement.class)
.register(DeviceFragmentId.class)
.register(PortFragmentId.class)
- .build()
- .populate(1);
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index f7718c1..d6d06d0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -156,9 +156,9 @@
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.COMMON)
- .build()
- .populate(1);
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index e1be932..6620477 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -123,13 +123,13 @@
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.COMMON)
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(InternalHostEvent.class)
.register(InternalHostRemovedEvent.class)
.register(HostFragmentId.class)
.register(HostAntiEntropyAdvertisement.class)
- .build()
- .populate(1);
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
index 27c1284..3ad4036 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
@@ -93,8 +93,8 @@
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
index fecaf6a..80c27d1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
@@ -86,8 +86,8 @@
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index 97d1ce7..3ec7524 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -129,13 +129,13 @@
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.COMMON)
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(InternalLinkEvent.class)
.register(InternalLinkRemovedEvent.class)
.register(LinkAntiEntropyAdvertisement.class)
.register(LinkFragmentId.class)
- .build()
- .populate(1);
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
index dd5041a..78a92a8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -84,10 +84,9 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
-
- .register(RoleValue.class, new RoleValueSerializer())
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .register(new RoleValueSerializer(), RoleValue.class)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/packet/impl/DistributedPacketStore.java
index be38785..793e759 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/packet/impl/DistributedPacketStore.java
@@ -72,8 +72,8 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .build()
- .populate(1);
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java
index 2de47bd..5ef8aaf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java
@@ -23,13 +23,17 @@
public final class DistributedStoreSerializers {
+
+ public static final int STORE_CUSTOM_BEGIN = KryoNamespaces.BEGIN_USER_CUSTOM_ID + 10;
+
/**
* KryoNamespace which can serialize ON.lab misc classes.
*/
- public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
+ public static final KryoNamespace STORE_COMMON = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(Timestamped.class)
- .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+ .register(new MastershipBasedTimestampSerializer(), MastershipBasedTimestamp.class)
.register(WallClockTimestamp.class)
.build();
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/CMap.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/CMap.java
index b771b80..2c390c5 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/CMap.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/CMap.java
@@ -55,8 +55,8 @@
* Creates a CMap instance.
* It will create the table if necessary.
*
- * @param dbAdminService
- * @param dbService
+ * @param dbAdminService DatabaseAdminService to use for this instance
+ * @param dbService DatabaseService to use for this instance
* @param tableName table which this Map corresponds to
* @param serializer Value serializer
*/
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
index b2f5c2b..5a6a82a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -2,13 +2,6 @@
import static org.slf4j.LoggerFactory.getLogger;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
import java.util.Vector;
import net.kuujo.copycat.cluster.TcpClusterConfig;
@@ -40,29 +33,14 @@
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.serializers.ImmutableListSerializer;
-import org.onlab.onos.store.serializers.ImmutableMapSerializer;
-import org.onlab.onos.store.serializers.ImmutableSetSerializer;
+import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.service.BatchReadRequest;
-import org.onlab.onos.store.service.BatchWriteRequest;
-import org.onlab.onos.store.service.ReadRequest;
-import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.ReadStatus;
-import org.onlab.onos.store.service.VersionedValue;
-import org.onlab.onos.store.service.WriteRequest;
-import org.onlab.onos.store.service.WriteResult;
-import org.onlab.onos.store.service.WriteStatus;
+import org.onlab.onos.store.serializers.StoreSerializer;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
/**
* ONOS Cluster messaging based Copycat protocol.
*/
@@ -88,7 +66,11 @@
public static final MessageSubject COPYCAT_SUBMIT =
new MessageSubject("copycat-raft-consensus-submit");
- private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+ static final int AFTER_COPYCAT = KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50;
+
+ static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(PingRequest.class)
.register(PingResponse.class)
.register(PollRequest.class)
@@ -105,53 +87,23 @@
.register(TcpClusterConfig.class)
.register(TcpMember.class)
.register(LeaderElectEvent.class)
- .build();
-
- private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
- .register(ReadRequest.class)
- .register(WriteRequest.class)
- .register(WriteRequest.Type.class)
- .register(WriteResult.class)
- .register(ReadResult.class)
- .register(BatchReadRequest.class)
- .register(BatchWriteRequest.class)
- .register(ReadStatus.class)
- .register(WriteStatus.class)
- .register(VersionedValue.class)
- .build();
-
- public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
- .register(Arrays.asList().getClass(), new CollectionSerializer() {
- @Override
- @SuppressWarnings("rawtypes")
- protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
- return new ArrayList();
- }
- })
- .register(ImmutableMap.class, new ImmutableMapSerializer())
- .register(ImmutableList.class, new ImmutableListSerializer())
- .register(ImmutableSet.class, new ImmutableSetSerializer())
- .register(
- Vector.class,
- ArrayList.class,
- Arrays.asList().getClass(),
- HashMap.class,
- HashSet.class,
- LinkedList.class,
- Collections.singletonList("").getClass(),
- byte[].class)
+ .register(Vector.class)
.build();
// serializer used for CopyCat Protocol
- public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ public static final StoreSerializer DB_SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(COPYCAT)
- .register(COMMON)
- .register(DATABASE)
- .build()
- .populate(1);
+ .nextId(AFTER_COPYCAT)
+ // for snapshot
+ .register(State.class)
+ .register(TableMetadata.class)
+ // TODO: Move this out ?
+ .register(TableModificationEvent.class)
+ .register(TableModificationEvent.Type.class)
+ .build();
}
};
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 9bba4b5..475db82 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -1,7 +1,7 @@
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Verify.verifyNotNull;
-import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER;
+import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -139,7 +139,7 @@
new ClusterMessage(
localNode.id(),
messageType(request),
- verifyNotNull(SERIALIZER.encode(request)));
+ verifyNotNull(DB_SERIALIZER.encode(request)));
this.future = future;
}
@@ -158,7 +158,8 @@
if (!connectionOK.getAndSet(true)) {
log.info("Connectivity to {} restored", remoteNode);
}
- future.complete(verifyNotNull(SERIALIZER.decode(response)));
+ future.complete(verifyNotNull(DB_SERIALIZER.decode(response)));
+
} catch (IOException | TimeoutException e) {
if (connectionOK.getAndSet(false)) {
log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index 0ecc0a5..373bc97 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -66,7 +66,7 @@
@Override
public void handle(ClusterMessage message) {
- T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+ T request = ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
if (handler == null) {
// there is a slight window of time during state transition,
// where handler becomes null
@@ -117,7 +117,7 @@
} else {
try {
log.trace("responding to {}", message.subject());
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
+ message.respond(ClusterMessagingProtocol.DB_SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to " + response.getClass().getName(), e);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index c3f9118..17f356b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -54,7 +54,7 @@
@Override
public void handle(ClusterMessage message) {
LeaderElectEvent event =
- ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+ ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
TcpMember newLeader = event.leader();
long newLeaderTerm = event.term();
if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
index 2fba52e..df9c73f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
@@ -101,7 +101,7 @@
log.debug("Broadcasting {} to the entire cluster", event);
clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
- DatabaseStateMachine.SERIALIZER.encode(event)));
+ ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
} catch (IOException e) {
log.error("Failed to broadcast a database row deleted event.", e);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 2c0ff14..c3494d9 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -173,7 +173,7 @@
DatabaseStateMachine stateMachine = new DatabaseStateMachine();
stateMachine.addEventListener(expirationTracker);
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
- ClusterMessagingProtocol.SERIALIZER);
+ ClusterMessagingProtocol.DB_SERIALIZER);
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
@@ -432,7 +432,7 @@
new ClusterMessage(
clusterService.getLocalNode().id(),
RAFT_LEADER_ELECTION_EVENT,
- ClusterMessagingProtocol.SERIALIZER.encode(event)));
+ ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
}
} catch (Exception e) {
log.debug("LeaderAdvertiser failed with exception", e);
@@ -454,7 +454,7 @@
new ClusterMessage(
clusterService.getLocalNode().id(),
RAFT_LEADER_ELECTION_EVENT,
- ClusterMessagingProtocol.SERIALIZER.encode(event)));
+ ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
} else {
if (myLeaderEvent != null) {
log.debug("This node is no longer the Leader");
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 2fbef0b..cfb8512b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -2,6 +2,7 @@
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -20,7 +21,6 @@
import net.kuujo.copycat.StateMachine;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.ReadRequest;
@@ -30,7 +30,6 @@
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.onos.store.service.WriteStatus;
-import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
@@ -39,7 +38,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
/**
* StateMachine whose transitions are coordinated/replicated
@@ -59,33 +57,13 @@
public static final MessageSubject DATABASE_UPDATE_EVENTS =
new MessageSubject("database-update-events");
- // serializer used for snapshot
- public static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(VersionedValue.class)
- .register(State.class)
- .register(TableMetadata.class)
- .register(BatchReadRequest.class)
- .register(BatchWriteRequest.class)
- .register(ReadStatus.class)
- .register(WriteStatus.class)
- // TODO: Move this out ?
- .register(TableModificationEvent.class)
- .register(TableModificationEvent.Type.class)
- .register(ClusterMessagingProtocol.COMMON)
- .build()
- .populate(1);
- }
- };
-
private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
// durable internal state of the database.
private State state = new State();
- private boolean compressSnapshot = false;
+ // TODO make this configurable
+ private boolean compressSnapshot = true;
@Command
public boolean createTable(String tableName) {
@@ -402,14 +380,14 @@
public byte[] takeSnapshot() {
try {
if (compressSnapshot) {
- byte[] input = SERIALIZER.encode(state);
+ byte[] input = DB_SERIALIZER.encode(state);
ByteArrayOutputStream comp = new ByteArrayOutputStream(input.length);
DeflaterOutputStream compressor = new DeflaterOutputStream(comp);
compressor.write(input, 0, input.length);
compressor.close();
return comp.toByteArray();
} else {
- return SERIALIZER.encode(state);
+ return DB_SERIALIZER.encode(state);
}
} catch (Exception e) {
log.error("Failed to take snapshot", e);
@@ -423,10 +401,9 @@
if (compressSnapshot) {
ByteArrayInputStream in = new ByteArrayInputStream(data);
InflaterInputStream decompressor = new InflaterInputStream(in);
- ByteStreams.toByteArray(decompressor);
- this.state = SERIALIZER.decode(ByteStreams.toByteArray(decompressor));
+ this.state = DB_SERIALIZER.decode(decompressor);
} else {
- this.state = SERIALIZER.decode(data);
+ this.state = DB_SERIALIZER.decode(data);
}
updatesExecutor.submit(new Runnable() {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index 74f8373..45cc7ed 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -149,7 +149,7 @@
private class LockEventMessageListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
- TableModificationEvent event = DatabaseStateMachine.SERIALIZER
+ TableModificationEvent event = ClusterMessagingProtocol.DB_SERIALIZER
.decode(message.payload());
if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
index cc432a0..42fcf98 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
@@ -90,9 +90,9 @@
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
// register this store specific classes here
- .build()
- .populate(1);
+ .build();
}
};;
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java
index e1da794..9832483 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java
@@ -95,7 +95,7 @@
public final void testKryoSerializableWithHandcraftedSerializer() {
final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
final KryoNamespace kryos = KryoNamespace.newBuilder()
- .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+ .register(new MastershipBasedTimestampSerializer(), MastershipBasedTimestamp.class)
.build();
kryos.serialize(TS_1_2, buffer);
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
index f14ef92..5aae159 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
@@ -22,7 +22,7 @@
*/
public class MapDBLogTest {
- private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.SERIALIZER;
+ private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.DB_SERIALIZER;
private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1");
private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12");
private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123");