tom | 1d416c5 | 2014-09-29 20:55:24 -0700 | [diff] [blame] | 1 | package org.onlab.onos.store.cluster.messaging.impl; |
| 2 | |
| 3 | import com.google.common.collect.HashMultimap; |
| 4 | import com.google.common.collect.ImmutableSet; |
| 5 | import com.google.common.collect.Multimap; |
| 6 | import org.apache.felix.scr.annotations.Component; |
| 7 | import org.apache.felix.scr.annotations.Service; |
| 8 | import org.onlab.onos.cluster.NodeId; |
| 9 | import org.onlab.onos.store.cluster.impl.CommunicationsDelegate; |
| 10 | import org.onlab.onos.store.cluster.impl.MessageSender; |
| 11 | import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; |
| 12 | import org.onlab.onos.store.cluster.messaging.ClusterMessage; |
| 13 | import org.onlab.onos.store.cluster.messaging.MessageSubject; |
| 14 | import org.onlab.onos.store.cluster.messaging.MessageSubscriber; |
| 15 | |
| 16 | import java.util.Set; |
| 17 | |
| 18 | /** |
| 19 | * Implements the cluster communication services to use by other stores. |
| 20 | */ |
| 21 | @Component(immediate = true) |
| 22 | @Service |
| 23 | public class ClusterCommunicationManager |
| 24 | implements ClusterCommunicationService, CommunicationsDelegate { |
| 25 | |
| 26 | // TODO: use something different that won't require synchronization |
| 27 | private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create(); |
| 28 | private MessageSender messageSender; |
| 29 | |
| 30 | @Override |
| 31 | public boolean send(ClusterMessage message, NodeId toNodeId) { |
| 32 | return messageSender.send(toNodeId, message); |
| 33 | } |
| 34 | |
| 35 | @Override |
| 36 | public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) { |
| 37 | subscribers.put(subject, subscriber); |
| 38 | } |
| 39 | |
| 40 | @Override |
| 41 | public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) { |
| 42 | subscribers.remove(subject, subscriber); |
| 43 | } |
| 44 | |
| 45 | @Override |
| 46 | public Set<MessageSubscriber> getSubscribers(MessageSubject subject) { |
| 47 | return ImmutableSet.copyOf(subscribers.get(subject)); |
| 48 | } |
| 49 | |
| 50 | @Override |
| 51 | public void dispatch(ClusterMessage message) { |
| 52 | Set<MessageSubscriber> set = getSubscribers(message.subject()); |
| 53 | if (set != null) { |
| 54 | for (MessageSubscriber subscriber : set) { |
| 55 | subscriber.receive(message); |
| 56 | } |
| 57 | } |
| 58 | } |
| 59 | |
| 60 | @Override |
| 61 | public void setSender(MessageSender messageSender) { |
| 62 | this.messageSender = messageSender; |
| 63 | } |
| 64 | } |