Added the notion of a general Store abstraction and wired it up in ClusterStore.
diff --git a/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index 77a28f5..004f807 100644
--- a/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -12,7 +12,9 @@
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterStore;
+import org.onlab.onos.cluster.ClusterStoreDelegate;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
@@ -26,6 +28,8 @@
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.cache.CacheBuilder.newBuilder;
+import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
+import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
import static org.onlab.onos.cluster.ControllerNode.State;
/**
@@ -33,14 +37,15 @@
*/
@Component(immediate = true)
@Service
-public class DistributedClusterStore extends AbstractDistributedStore
+public class DistributedClusterStore
+ extends AbstractDistributedStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private IMap<byte[], byte[]> rawNodes;
private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
private String listenerId;
- private final MembershipListener listener = new InnerMembershipListener();
+ private final MembershipListener listener = new InternalMembershipListener();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Activate
@@ -106,11 +111,12 @@
}
// Adds a new node based on the specified member
- private synchronized void addMember(Member member) {
+ private synchronized ControllerNode addMember(Member member) {
DefaultControllerNode node = node(member);
rawNodes.put(serialize(node.id()), serialize(node));
nodes.put(node.id(), Optional.of(node));
states.put(node.id(), State.ACTIVE);
+ return node;
}
// Creates a controller node descriptor from the Hazelcast member.
@@ -125,18 +131,20 @@
}
// Interceptor for membership events.
- private class InnerMembershipListener implements MembershipListener {
+ private class InternalMembershipListener implements MembershipListener {
@Override
public void memberAdded(MembershipEvent membershipEvent) {
log.info("Member {} added", membershipEvent.getMember());
- addMember(membershipEvent.getMember());
+ ControllerNode node = addMember(membershipEvent.getMember());
+ notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
}
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
log.info("Member {} removed", membershipEvent.getMember());
- states.put(new NodeId(memberAddress(membershipEvent.getMember()).toString()),
- State.INACTIVE);
+ NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
+ states.put(nodeId, State.INACTIVE);
+ notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
}
@Override