blob: bafb2c37c95aa0253b655050566b071c2e9d7b50 [file] [log] [blame]
tom1d416c52014-09-29 20:55:24 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import com.google.common.collect.HashMultimap;
4import com.google.common.collect.ImmutableSet;
5import com.google.common.collect.Multimap;
6import org.apache.felix.scr.annotations.Component;
7import org.apache.felix.scr.annotations.Service;
8import org.onlab.onos.cluster.NodeId;
9import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
10import org.onlab.onos.store.cluster.impl.MessageSender;
11import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
12import org.onlab.onos.store.cluster.messaging.ClusterMessage;
13import org.onlab.onos.store.cluster.messaging.MessageSubject;
14import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
15
16import java.util.Set;
17
18/**
19 * Implements the cluster communication services to use by other stores.
20 */
21@Component(immediate = true)
22@Service
23public class ClusterCommunicationManager
24 implements ClusterCommunicationService, CommunicationsDelegate {
25
26 // TODO: use something different that won't require synchronization
27 private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
28 private MessageSender messageSender;
29
30 @Override
31 public boolean send(ClusterMessage message, NodeId toNodeId) {
32 return messageSender.send(toNodeId, message);
33 }
34
35 @Override
36 public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
37 subscribers.put(subject, subscriber);
38 }
39
40 @Override
41 public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
42 subscribers.remove(subject, subscriber);
43 }
44
45 @Override
46 public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
47 return ImmutableSet.copyOf(subscribers.get(subject));
48 }
49
50 @Override
51 public void dispatch(ClusterMessage message) {
52 Set<MessageSubscriber> set = getSubscribers(message.subject());
53 if (set != null) {
54 for (MessageSubscriber subscriber : set) {
55 subscriber.receive(message);
56 }
57 }
58 }
59
60 @Override
61 public void setSender(MessageSender messageSender) {
62 this.messageSender = messageSender;
63 }
64}