split onos-core-hz into
onos-core-hz-common
onos-core-hz-cluster
onos-core-hz-net
Change-Id: Ie0ceb0de8e9e8af119433fef6f802444591eb4a4
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
new file mode 100644
index 0000000..57d2358
--- /dev/null
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -0,0 +1,159 @@
+package org.onlab.onos.store.cluster.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterStore;
+import org.onlab.onos.cluster.ClusterStoreDelegate;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
+import org.onlab.onos.store.common.AbstractHazelcastStore;
+import org.onlab.onos.store.common.OptionalCacheLoader;
+import org.onlab.packet.IpPrefix;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.google.common.cache.CacheBuilder.newBuilder;
+import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
+import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
+import static org.onlab.onos.cluster.ControllerNode.State;
+
+/**
+ * Distributed implementation of the cluster nodes store.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedClusterStore
+ extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
+ implements ClusterStore {
+
+ private IMap<byte[], byte[]> rawNodes;
+ private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
+
+ private String listenerId;
+ private final MembershipListener listener = new InternalMembershipListener();
+ private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+
+    @Override
+    @Activate
+    public void activate() {
+        super.activate();
+
+        rawNodes = theInstance.getMap("nodes");
+        OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
+                = new OptionalCacheLoader<>(storeService, rawNodes);
+        nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
+        rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true);
+        // Subscribe only after rawNodes/nodes exist (addMember() needs both) and before the initial load.
+        listenerId = theInstance.getCluster().addMembershipListener(listener);
+        loadClusterNodes();
+        log.info("Started");
+    }
+
+    // Seeds the store with every member the Hazelcast cluster currently reports.
+    private void loadClusterNodes() {
+        for (Member current : theInstance.getCluster().getMembers()) {
+            addMember(current);
+        }
+    }
+
+ @Deactivate
+ public void deactivate() {
+ theInstance.getCluster().removeMembershipListener(listenerId); // undoes the registration made in activate()
+ log.info("Stopped");
+ }
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return node(theInstance.getCluster().getLocalMember()); // descriptor is rebuilt from the local member, not read from the store
+ }
+
+    @Override
+    public Set<ControllerNode> getNodes() {
+        ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
+        for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
+            builder.addAll(optional.asSet()); // asSet() is empty when absent, so no get() failure
+        }
+        return builder.build();
+    }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ return nodes.getUnchecked(nodeId).orNull(); // null when the cache yields no node for this id
+ }
+
+    @Override
+    public State getState(NodeId nodeId) {
+        final State known = states.get(nodeId);
+        return known != null ? known : State.INACTIVE; // ids never recorded count as inactive
+    }
+
+    @Override
+    public synchronized void removeNode(NodeId nodeId) {
+        // Runs under the same monitor as addMember(), so the raw map and the
+        // cache are always changed as a pair.
+        rawNodes.remove(serialize(nodeId));
+        nodes.invalidate(nodeId);
+    }
+
+ // Adds a new node based on the specified member
+ private synchronized ControllerNode addMember(Member member) {
+ DefaultControllerNode node = node(member);
+ rawNodes.put(serialize(node.id()), serialize(node));
+ nodes.put(node.id(), Optional.of(node));
+ states.put(node.id(), State.ACTIVE);
+ return node;
+ }
+
+ // Creates a controller node descriptor from the Hazelcast member.
+ private DefaultControllerNode node(Member member) {
+ IpPrefix ip = memberAddress(member);
+ return new DefaultControllerNode(new NodeId(ip.toString()), ip);
+ }
+
+    // Wraps the raw IP bytes of the member's socket address in an IpPrefix.
+    private IpPrefix memberAddress(Member member) {
+        return IpPrefix.valueOf(member.getSocketAddress().getAddress().getAddress());
+    }
+
+    // Interceptor for membership events: updates the store from Hazelcast
+    // membership callbacks and forwards each change to the delegate.
+    private class InternalMembershipListener implements MembershipListener {
+        @Override
+        public void memberAdded(MembershipEvent membershipEvent) {
+            log.info("Member {} added", membershipEvent.getMember());
+            ControllerNode node = addMember(membershipEvent.getMember());
+            notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
+        }
+
+        @Override
+        public void memberRemoved(MembershipEvent membershipEvent) {
+            log.info("Member {} removed", membershipEvent.getMember());
+            // Describe the node from the member itself: getNode() may return null.
+            ControllerNode removed = node(membershipEvent.getMember());
+            states.put(removed.id(), State.INACTIVE);
+            notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, removed));
+        }
+
+        @Override
+        public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
+            log.info("Member {} attribute {} changed to {}", memberAttributeEvent.getMember(),
+                     memberAttributeEvent.getKey(), memberAttributeEvent.getValue());
+        }
+    }
+}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
new file mode 100644
index 0000000..a2f2dd9
--- /dev/null
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -0,0 +1,147 @@
+package org.onlab.onos.store.cluster.impl;
+
+import static com.google.common.cache.CacheBuilder.newBuilder;
+import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.MastershipEvent;
+import org.onlab.onos.cluster.MastershipStore;
+import org.onlab.onos.cluster.MastershipStoreDelegate;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
+import org.onlab.onos.store.common.AbstractHazelcastStore;
+import org.onlab.onos.store.common.OptionalCacheLoader;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.IMap;
+
+/**
+ * Distributed implementation of the device mastership store.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMastershipStore
+extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
+implements MastershipStore {
+
+ private IMap<byte[], byte[]> rawMasters;
+ private LoadingCache<DeviceId, Optional<NodeId>> masters;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+    @Override
+    @Activate
+    public void activate() {
+        super.activate();
+
+        // Back the device-to-master cache with the Hazelcast map named "masters".
+        rawMasters = theInstance.getMap("masters");
+        OptionalCacheLoader<DeviceId, NodeId> masterLoader
+                = new OptionalCacheLoader<>(storeService, rawMasters);
+        masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(masterLoader));
+        rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
+
+        loadMasters();
+        log.info("Started");
+    }
+
+    private void loadMasters() {
+        for (byte[] rawKey : rawMasters.keySet()) { // warm the cache from keys already in the map
+            DeviceId deviceId = deserialize(rawKey);
+            masters.refresh(deviceId);
+        }
+    }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped"); // NOTE(review): only logs; the entry listener added in activate() is not removed here
+ }
+
+    @Override
+    public synchronized MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+        // Re-asserting the current master changes nothing and yields no event.
+        NodeId existing = getMaster(deviceId);
+        if (Objects.equals(existing, nodeId)) {
+            return null;
+        }
+
+        // FIXME: for now implementing semantics of setMaster
+        // Write to the Hazelcast map first, then mirror the value into the cache.
+        rawMasters.put(serialize(deviceId), serialize(nodeId));
+        masters.put(deviceId, Optional.of(nodeId));
+        return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+    }
+
+ @Override
+ public NodeId getMaster(DeviceId deviceId) {
+ return masters.getUnchecked(deviceId).orNull(); // null when the cache yields no master for the device
+ }
+
+    @Override
+    public Set<DeviceId> getDevices(NodeId nodeId) {
+        ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
+        for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
+            if (nodeId.equals(entry.getValue().orNull())) { // orNull(): an absent master is simply no match
+                builder.add(entry.getKey());
+            }
+        }
+        return builder.build();
+    }
+
+ @Override
+ public MastershipRole requestRole(DeviceId deviceId) {
+ // FIXME: for now we are 'selecting' as master whoever asks
+ setMaster(clusterService.getLocalNode().id(), deviceId); // the returned event, if any, is discarded
+ return MastershipRole.MASTER; // always MASTER, see the FIXME above
+ }
+
+    @Override
+    public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+        final NodeId currentMaster = masters.getUnchecked(deviceId).orNull();
+        return nodeId.equals(currentMaster) ? MastershipRole.MASTER : MastershipRole.STANDBY; // non-masters are STANDBY
+    }
+
+ @Override
+ public MastershipTerm getTermFor(DeviceId deviceId) {
+ // TODO: mastership terms are not implemented yet; this always returns null
+ return null;
+ }
+
+    private class RemoteMasterShipEventHandler extends RemoteEventHandler<DeviceId, NodeId> {
+        public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
+            super(cache);
+        }
+        // Additions, removals and updates are all reported as MASTER_CHANGED.
+        @Override
+        protected void onAdd(DeviceId device, NodeId node) {
+            notifyDelegate(new MastershipEvent(MASTER_CHANGED, device, node));
+        }
+
+        @Override
+        protected void onRemove(DeviceId device, NodeId node) {
+            notifyDelegate(new MastershipEvent(MASTER_CHANGED, device, node));
+        }
+
+        @Override
+        protected void onUpdate(DeviceId device, NodeId previous, NodeId current) {
+            notifyDelegate(new MastershipEvent(MASTER_CHANGED, device, current));
+        }
+    }
+
+}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
new file mode 100644
index 0000000..9f36b88
--- /dev/null
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Implementation of distributed cluster node and device mastership stores using Hazelcast.
+ */
+package org.onlab.onos.store.cluster.impl;