blob: 2779b352c4628561a177e195a9d2de4aaf119a67 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -08008import java.util.concurrent.CountDownLatch;
9import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080010
11import net.kuujo.copycat.Copycat;
12import net.kuujo.copycat.StateMachine;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080013import net.kuujo.copycat.cluster.ClusterConfig;
Madan Jampani08822c42014-11-04 17:17:46 -080014import net.kuujo.copycat.cluster.TcpCluster;
15import net.kuujo.copycat.cluster.TcpClusterConfig;
16import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080017import net.kuujo.copycat.log.InMemoryLog;
Madan Jampani08822c42014-11-04 17:17:46 -080018import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080019
Madan Jampani08822c42014-11-04 17:17:46 -080020import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080021import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080022import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080025import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080026import org.onlab.onos.cluster.ClusterEvent;
27import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080028import org.onlab.onos.cluster.ClusterService;
29import org.onlab.onos.cluster.ControllerNode;
30import org.onlab.onos.store.service.DatabaseAdminService;
31import org.onlab.onos.store.service.DatabaseException;
32import org.onlab.onos.store.service.DatabaseService;
33import org.onlab.onos.store.service.NoSuchTableException;
34import org.onlab.onos.store.service.OptimisticLockException;
35import org.onlab.onos.store.service.OptionalResult;
36import org.onlab.onos.store.service.PreconditionFailedException;
37import org.onlab.onos.store.service.ReadRequest;
38import org.onlab.onos.store.service.ReadResult;
39import org.onlab.onos.store.service.WriteAborted;
40import org.onlab.onos.store.service.WriteRequest;
41import org.onlab.onos.store.service.WriteResult;
42import org.slf4j.Logger;
43
Madan Jampani08822c42014-11-04 17:17:46 -080044/**
45 * Strongly consistent and durable state management service based on
46 * Copycat implementation of Raft consensus protocol.
47 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080048@Component(immediate = true)
49@Service
Madan Jampani08822c42014-11-04 17:17:46 -080050public class DatabaseManager implements DatabaseService, DatabaseAdminService {
51
52 private final Logger log = getLogger(getClass());
53
54 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080055 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080056
Madan Jampani9b19a822014-11-04 21:37:13 -080057 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080058 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080059
Madan Jampani08822c42014-11-04 17:17:46 -080060 public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
61
62 private Copycat copycat;
63 private DatabaseClient client;
64
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080065 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080066 private ClusterConfig<TcpMember> clusterConfig;
67
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080068 private CountDownLatch clusterEventLatch;
69
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080070 private ClusterEventListener clusterEventListener;
71
Madan Jampani08822c42014-11-04 17:17:46 -080072 @Activate
73 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -080074
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080075 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -080076
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080077 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani08822c42014-11-04 17:17:46 -080078 TcpMember localMember =
79 new TcpMember(
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080080 localNode.ip().toString(),
81 localNode.tcpPort());
82
83 clusterConfig = new TcpClusterConfig();
84 clusterConfig.setLocalMember(localMember);
85
86 List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
87
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080088 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080089 clusterEventListener = new InternalClusterEventListener();
90 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -080091
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080092 // note: from this point beyond, clusterConfig requires synchronization
93
Madan Jampani08822c42014-11-04 17:17:46 -080094 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani9b19a822014-11-04 21:37:13 -080095 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080096 if (!member.equals(localMember)) {
97 remoteMembers.add(member);
98 }
99 }
100
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800101 if (remoteMembers.isEmpty()) {
102 log.info("This node is the only node in the cluster. "
103 + "Waiting for others to show up.");
104 // FIXME: hack trying to relax cases forming multiple consensus rings.
105 // add seed node configuration to avoid this
Madan Jampani08822c42014-11-04 17:17:46 -0800106
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800107 // If the node is alone on it's own, wait some time
108 // hoping other will come up soon
109 try {
110 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
111 log.info("Starting as single node cluster");
112 }
113 } catch (InterruptedException e) {
114 log.info("Interrupted waiting for others", e);
115 }
116 }
Madan Jampani08822c42014-11-04 17:17:46 -0800117
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800118 final TcpCluster cluster;
119 synchronized (clusterConfig) {
120 clusterConfig.addRemoteMembers(remoteMembers);
121
122 // Create the cluster.
123 cluster = new TcpCluster(clusterConfig);
124 }
125 log.info("Starting cluster: {}", cluster);
126
Madan Jampani08822c42014-11-04 17:17:46 -0800127
128 StateMachine stateMachine = new DatabaseStateMachine();
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -0800129 // FIXME resolve Chronicle + OSGi issue
130 //Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800131 Log consensusLog = new KryoRegisteredInMemoryLog();
Madan Jampani08822c42014-11-04 17:17:46 -0800132
Madan Jampani9b19a822014-11-04 21:37:13 -0800133 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -0800134 copycat.start();
135
Madan Jampani9b19a822014-11-04 21:37:13 -0800136 client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
Madan Jampani08822c42014-11-04 17:17:46 -0800137
138 log.info("Started.");
139 }
140
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800141 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800142 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800143 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800144 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800145 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800146 }
147
148 @Override
149 public boolean createTable(String name) {
150 return client.createTable(name);
151 }
152
153 @Override
154 public void dropTable(String name) {
155 client.dropTable(name);
156 }
157
158 @Override
159 public void dropAllTables() {
160 client.dropAllTables();
161 }
162
163 @Override
164 public List<String> listTables() {
165 return client.listTables();
166 }
167
168 @Override
169 public ReadResult read(ReadRequest request) {
170 return batchRead(Arrays.asList(request)).get(0).get();
171 }
172
173 @Override
174 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
175 List<ReadRequest> batch) {
176 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
177 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
178 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
179 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
180 new NoSuchTableException()));
181 } else {
182 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
183 internalReadResult.result()));
184 }
185 }
186 return readResults;
187 }
188
189 @Override
190 public WriteResult write(WriteRequest request) {
191 return batchWrite(Arrays.asList(request)).get(0).get();
192 }
193
194 @Override
195 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
196 List<WriteRequest> batch) {
197 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
198 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
199 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
200 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
201 new NoSuchTableException()));
202 } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
203 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
204 new OptimisticLockException()));
205 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
206 // TODO: throw a different exception?
207 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
208 new PreconditionFailedException()));
209 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
210 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
211 new WriteAborted()));
212 } else {
213 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
214 internalWriteResult.result()));
215 }
216 }
217 return writeResults;
218
219 }
220
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800221 private final class InternalClusterEventListener
222 implements ClusterEventListener {
223
224 @Override
225 public void event(ClusterEvent event) {
226 // TODO: Not every node should be part of the consensus ring.
227
228 final ControllerNode node = event.subject();
229 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
230 node.tcpPort());
231
232 log.trace("{}", event);
233 switch (event.type()) {
234 case INSTANCE_ACTIVATED:
235 case INSTANCE_ADDED:
236 log.info("{} was added to the cluster", tcpMember);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800237 synchronized (clusterConfig) {
238 clusterConfig.addRemoteMember(tcpMember);
239 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800240 break;
241 case INSTANCE_DEACTIVATED:
242 case INSTANCE_REMOVED:
243 log.info("{} was removed from the cluster", tcpMember);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800244 synchronized (clusterConfig) {
245 clusterConfig.removeRemoteMember(tcpMember);
246 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800247 break;
248 default:
249 break;
250 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800251 if (copycat != null) {
252 log.debug("Current cluster: {}", copycat.cluster());
253 }
254 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800255 }
256
257 }
258
259 public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
260 public KryoRegisteredInMemoryLog() {
261 super();
262 // required to deserialize object across bundles
263 super.kryo.register(TcpMember.class, new TcpMemberSerializer());
264 super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
265 }
266 }
267
Madan Jampani08822c42014-11-04 17:17:46 -0800268 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
269
270 private final R result;
271 private final DatabaseException exception;
272
273 public DatabaseOperationResult(R result) {
274 this.result = result;
275 this.exception = null;
276 }
277
278 public DatabaseOperationResult(DatabaseException exception) {
279 this.result = null;
280 this.exception = exception;
281 }
282
283 @Override
284 public R get() {
285 if (result != null) {
286 return result;
287 }
288 throw exception;
289 }
290
291 @Override
292 public boolean hasValidResult() {
293 return result != null;
294 }
295
296 @Override
297 public String toString() {
298 if (result != null) {
299 return result.toString();
300 } else {
301 return exception.toString();
302 }
303 }
304 }
305}