blob: b70407130985bcab19c2aad1fb37264b7a562875 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.cluster.impl;
17
Jon Hall7a8bfc62016-05-26 17:59:04 -070018import static org.onlab.util.Tools.groupedThreads;
Madan Jampani620f70d2016-01-30 22:22:47 -080019import static org.slf4j.LoggerFactory.getLogger;
20
Madan Jampani620f70d2016-01-30 22:22:47 -080021import java.util.Map;
Madan Jampani78be2492016-06-03 23:27:07 -070022import java.util.Objects;
Jon Hall7a8bfc62016-05-26 17:59:04 -070023import java.util.concurrent.ExecutorService;
24import java.util.concurrent.Executors;
Madan Jampani78be2492016-06-03 23:27:07 -070025import java.util.function.Consumer;
Madan Jampani620f70d2016-01-30 22:22:47 -080026
Jon Hall7a8bfc62016-05-26 17:59:04 -070027import com.google.common.collect.Maps;
Madan Jampani620f70d2016-01-30 22:22:47 -080028import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
34import org.onosproject.cluster.ClusterService;
Madan Jampani620f70d2016-01-30 22:22:47 -080035import org.onosproject.cluster.Leadership;
36import org.onosproject.cluster.LeadershipEvent;
37import org.onosproject.cluster.LeadershipStore;
38import org.onosproject.cluster.LeadershipStoreDelegate;
39import org.onosproject.cluster.NodeId;
Madan Jampani78be2492016-06-03 23:27:07 -070040import org.onosproject.event.Change;
Madan Jampani620f70d2016-01-30 22:22:47 -080041import org.onosproject.store.AbstractStore;
Jon Hall7a8bfc62016-05-26 17:59:04 -070042import org.onosproject.store.service.DistributedPrimitive.Status;
Madan Jampani78be2492016-06-03 23:27:07 -070043import org.onosproject.store.service.LeaderElector;
Madan Jampani620f70d2016-01-30 22:22:47 -080044import org.onosproject.store.service.StorageService;
Madan Jampani620f70d2016-01-30 22:22:47 -080045import org.slf4j.Logger;
46
Madan Jampani620f70d2016-01-30 22:22:47 -080047/**
Madan Jampani78be2492016-06-03 23:27:07 -070048 * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
49 * primitive.
Madan Jampani620f70d2016-01-30 22:22:47 -080050 */
51@Service
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070052@Component(immediate = true)
Madan Jampani620f70d2016-01-30 22:22:47 -080053public class DistributedLeadershipStore
54 extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
55 implements LeadershipStore {
56
Madan Jampani78be2492016-06-03 23:27:07 -070057 private final Logger log = getLogger(getClass());
Madan Jampani620f70d2016-01-30 22:22:47 -080058
59 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
60 protected ClusterService clusterService;
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected StorageService storageService;
64
Jon Hall7a8bfc62016-05-26 17:59:04 -070065 private ExecutorService statusChangeHandler;
Madan Jampani78be2492016-06-03 23:27:07 -070066 private NodeId localNodeId;
67 private LeaderElector leaderElector;
Jon Hall7a8bfc62016-05-26 17:59:04 -070068 private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
Madan Jampani72282af2016-02-23 14:23:52 -080069
Madan Jampani78be2492016-06-03 23:27:07 -070070 private final Consumer<Change<Leadership>> leadershipChangeListener =
71 change -> {
72 Leadership oldValue = change.oldValue();
73 Leadership newValue = change.newValue();
74 boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
75 boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
Madan Jampani620f70d2016-01-30 22:22:47 -080076 LeadershipEvent.Type eventType = null;
77 if (leaderChanged && candidatesChanged) {
78 eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
79 }
80 if (leaderChanged && !candidatesChanged) {
81 eventType = LeadershipEvent.Type.LEADER_CHANGED;
82 }
83 if (!leaderChanged && candidatesChanged) {
84 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
85 }
Madan Jampani78be2492016-06-03 23:27:07 -070086 notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
Jon Hall7a8bfc62016-05-26 17:59:04 -070087 // Update local cache of currently held leaderships
88 if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
89 localLeaderCache.put(newValue.topic(), newValue);
90 } else {
91 localLeaderCache.remove(newValue.topic());
92 }
Madan Jampani620f70d2016-01-30 22:22:47 -080093 };
94
Jon Hall7a8bfc62016-05-26 17:59:04 -070095 private final Consumer<Status> clientStatusListener = status ->
96 statusChangeHandler.execute(() -> handleStatusChange(status));
97
98 private void handleStatusChange(Status status) {
99 // Notify mastership Service of disconnect and reconnect
100 if (status == Status.ACTIVE) {
101 // Service Restored
102 localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
103 leaderElector.getLeaderships().forEach((topic, leadership) ->
104 notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_RESTORED, leadership)));
105 } else if (status == Status.SUSPENDED) {
106 // Service Suspended
107 localLeaderCache.forEach((topic, leadership) ->
108 notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_DISRUPTED, leadership)));
109 } else {
110 // Should be only inactive state
111 return;
112 }
113 }
114
115
Madan Jampani620f70d2016-01-30 22:22:47 -0800116 @Activate
117 public void activate() {
Jon Hall7a8bfc62016-05-26 17:59:04 -0700118 statusChangeHandler = Executors.newSingleThreadExecutor(
119 groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
Madan Jampani620f70d2016-01-30 22:22:47 -0800120 localNodeId = clusterService.getLocalNode().id();
Madan Jampani78be2492016-06-03 23:27:07 -0700121 leaderElector = storageService.leaderElectorBuilder()
122 .withName("onos-leadership-elections")
123 .build()
124 .asLeaderElector();
125 leaderElector.addChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700126 leaderElector.addStatusChangeListener(clientStatusListener);
Madan Jampani620f70d2016-01-30 22:22:47 -0800127 log.info("Started");
128 }
129
130 @Deactivate
131 public void deactivate() {
Madan Jampani78be2492016-06-03 23:27:07 -0700132 leaderElector.removeChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700133 leaderElector.removeStatusChangeListener(clientStatusListener);
134 statusChangeHandler.shutdown();
Madan Jampani620f70d2016-01-30 22:22:47 -0800135 log.info("Stopped");
136 }
137
138 @Override
139 public Leadership addRegistration(String topic) {
Madan Jampani78be2492016-06-03 23:27:07 -0700140 return leaderElector.run(topic, localNodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800141 }
142
143 @Override
144 public void removeRegistration(String topic) {
Madan Jampani78be2492016-06-03 23:27:07 -0700145 leaderElector.withdraw(topic);
Madan Jampani620f70d2016-01-30 22:22:47 -0800146 }
147
148 @Override
149 public void removeRegistration(NodeId nodeId) {
Madan Jampani78be2492016-06-03 23:27:07 -0700150 leaderElector.evict(nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800151 }
152
153 @Override
154 public boolean moveLeadership(String topic, NodeId toNodeId) {
Madan Jampani78be2492016-06-03 23:27:07 -0700155 return leaderElector.anoint(topic, toNodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800156 }
157
158 @Override
159 public boolean makeTopCandidate(String topic, NodeId nodeId) {
Madan Jampani78be2492016-06-03 23:27:07 -0700160 return leaderElector.promote(topic, nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800161 }
162
163 @Override
164 public Leadership getLeadership(String topic) {
Madan Jampani78be2492016-06-03 23:27:07 -0700165 return leaderElector.getLeadership(topic);
Madan Jampani620f70d2016-01-30 22:22:47 -0800166 }
167
168 @Override
169 public Map<String, Leadership> getLeaderships() {
Madan Jampani78be2492016-06-03 23:27:07 -0700170 return leaderElector.getLeaderships();
Madan Jampani620f70d2016-01-30 22:22:47 -0800171 }
172}