Further simplified the store & connection manager relationship.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
deleted file mode 100644
index bafb2c3..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
-import org.onlab.onos.store.cluster.impl.MessageSender;
-import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
-
-import java.util.Set;
-
-/**
- * Implements the cluster communication services to use by other stores.
- */
-@Component(immediate = true)
-@Service
-public class ClusterCommunicationManager
- implements ClusterCommunicationService, CommunicationsDelegate {
-
- // TODO: use something different that won't require synchronization
- private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
- private MessageSender messageSender;
-
- @Override
- public boolean send(ClusterMessage message, NodeId toNodeId) {
- return messageSender.send(toNodeId, message);
- }
-
- @Override
- public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
- subscribers.put(subject, subscriber);
- }
-
- @Override
- public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
- subscribers.remove(subject, subscriber);
- }
-
- @Override
- public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
- return ImmutableSet.copyOf(subscribers.get(subject));
- }
-
- @Override
- public void dispatch(ClusterMessage message) {
- Set<MessageSubscriber> set = getSubscribers(message.subject());
- if (set != null) {
- for (MessageSubscriber subscriber : set) {
- subscriber.receive(message);
- }
- }
- }
-
- @Override
- public void setSender(MessageSender messageSender) {
- this.messageSender = messageSender;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
index 93c8310..7064801 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
@@ -1,8 +1,15 @@
package org.onlab.onos.store.cluster.messaging.impl;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.SerializationService;
+import org.onlab.packet.IpPrefix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
@@ -11,8 +18,12 @@
/**
* Factory for parsing messages sent between cluster members.
*/
+@Component(immediate = true)
+@Service
public class MessageSerializer implements SerializationService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
@@ -46,11 +57,12 @@
buffer.get(data);
// TODO: add deserialization hook here; for now this hack
- return null; // actually deserialize
+ String[] fields = new String(data).split(":");
+ return new HelloMessage(new NodeId(fields[0]), IpPrefix.valueOf(fields[1]), Integer.parseInt(fields[2]));
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
- e.printStackTrace();
+ log.warn("Unable to decode message due to: " + e);
}
return null;
}
@@ -58,11 +70,18 @@
@Override
public void encode(ClusterMessage message, ByteBuffer buffer) {
try {
- int i = 0;
- // Type based lookup for proper encoder
+ HelloMessage helloMessage = (HelloMessage) message; // any other message type fails this cast and is logged by the catch below
+ buffer.putLong(MARKER);
+ buffer.putInt(message.subject().ordinal());
+ // Payload is the text "nodeId:ipAddress:tcpPort"; the length field written ahead of it is payload size plus METADATA_LENGTH.
+ String str = helloMessage.nodeId() + ":" + helloMessage.ipAddress() + ":" + helloMessage.tcpPort();
+ byte[] data = str.getBytes();
+ buffer.putInt(data.length + METADATA_LENGTH);
+ buffer.put(data);
+
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
- e.printStackTrace();
+ log.warn("Unable to encode message due to: " + e, e); // pass e as the throwable so the stack trace is still logged
}
}