blob: def56e8cec45d70159b78e3b7d294a1f674b73f0 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -08008import java.util.concurrent.CountDownLatch;
9import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080010
11import net.kuujo.copycat.Copycat;
12import net.kuujo.copycat.StateMachine;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080013import net.kuujo.copycat.cluster.ClusterConfig;
Madan Jampani08822c42014-11-04 17:17:46 -080014import net.kuujo.copycat.cluster.TcpCluster;
15import net.kuujo.copycat.cluster.TcpClusterConfig;
16import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080017import net.kuujo.copycat.log.InMemoryLog;
Madan Jampani08822c42014-11-04 17:17:46 -080018import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080019
Madan Jampani08822c42014-11-04 17:17:46 -080020import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080021import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080022import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080025import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080026import org.onlab.onos.cluster.ClusterEvent;
27import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080028import org.onlab.onos.cluster.ClusterService;
29import org.onlab.onos.cluster.ControllerNode;
30import org.onlab.onos.store.service.DatabaseAdminService;
31import org.onlab.onos.store.service.DatabaseException;
32import org.onlab.onos.store.service.DatabaseService;
33import org.onlab.onos.store.service.NoSuchTableException;
34import org.onlab.onos.store.service.OptimisticLockException;
35import org.onlab.onos.store.service.OptionalResult;
Madan Jampani08822c42014-11-04 17:17:46 -080036import org.onlab.onos.store.service.ReadRequest;
37import org.onlab.onos.store.service.ReadResult;
38import org.onlab.onos.store.service.WriteAborted;
39import org.onlab.onos.store.service.WriteRequest;
40import org.onlab.onos.store.service.WriteResult;
41import org.slf4j.Logger;
42
Madan Jampani08822c42014-11-04 17:17:46 -080043/**
44 * Strongly consistent and durable state management service based on
45 * Copycat implementation of Raft consensus protocol.
46 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080047@Component(immediate = true)
48@Service
Madan Jampani08822c42014-11-04 17:17:46 -080049public class DatabaseManager implements DatabaseService, DatabaseAdminService {
50
51 private final Logger log = getLogger(getClass());
52
53 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080054 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080055
Madan Jampani9b19a822014-11-04 21:37:13 -080056 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080057 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080058
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080059 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log";
Madan Jampani08822c42014-11-04 17:17:46 -080060
61 private Copycat copycat;
62 private DatabaseClient client;
63
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080064 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080065 private ClusterConfig<TcpMember> clusterConfig;
66
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080067 private CountDownLatch clusterEventLatch;
68
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080069 private ClusterEventListener clusterEventListener;
70
Madan Jampani08822c42014-11-04 17:17:46 -080071 @Activate
72 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -080073
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080074 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -080075
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080076 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani08822c42014-11-04 17:17:46 -080077 TcpMember localMember =
78 new TcpMember(
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080079 localNode.ip().toString(),
80 localNode.tcpPort());
81
82 clusterConfig = new TcpClusterConfig();
83 clusterConfig.setLocalMember(localMember);
84
85 List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
86
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080087 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080088 clusterEventListener = new InternalClusterEventListener();
89 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -080090
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080091 // note: from this point beyond, clusterConfig requires synchronization
92
Madan Jampani08822c42014-11-04 17:17:46 -080093 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani9b19a822014-11-04 21:37:13 -080094 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080095 if (!member.equals(localMember)) {
96 remoteMembers.add(member);
97 }
98 }
99
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800100 if (remoteMembers.isEmpty()) {
101 log.info("This node is the only node in the cluster. "
102 + "Waiting for others to show up.");
103 // FIXME: hack trying to relax cases forming multiple consensus rings.
104 // add seed node configuration to avoid this
Madan Jampani08822c42014-11-04 17:17:46 -0800105
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800106 // If the node is alone on it's own, wait some time
107 // hoping other will come up soon
108 try {
109 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
110 log.info("Starting as single node cluster");
111 }
112 } catch (InterruptedException e) {
113 log.info("Interrupted waiting for others", e);
114 }
115 }
Madan Jampani08822c42014-11-04 17:17:46 -0800116
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800117 final TcpCluster cluster;
118 synchronized (clusterConfig) {
119 clusterConfig.addRemoteMembers(remoteMembers);
120
121 // Create the cluster.
122 cluster = new TcpCluster(clusterConfig);
123 }
124 log.info("Starting cluster: {}", cluster);
125
Madan Jampani08822c42014-11-04 17:17:46 -0800126
127 StateMachine stateMachine = new DatabaseStateMachine();
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800128 // Chronicle + OSGi issue
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -0800129 //Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800130 //Log consensusLog = new KryoRegisteredInMemoryLog();
Madan Jampani2ee20002014-11-06 20:06:12 -0800131 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800132 ClusterMessagingProtocol.SERIALIZER);
Madan Jampani08822c42014-11-04 17:17:46 -0800133
Madan Jampani9b19a822014-11-04 21:37:13 -0800134 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -0800135 copycat.start();
136
Madan Jampani9b19a822014-11-04 21:37:13 -0800137 client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
Madan Jampani08822c42014-11-04 17:17:46 -0800138
139 log.info("Started.");
140 }
141
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800142 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800143 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800144 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800145 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800146 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800147 }
148
149 @Override
150 public boolean createTable(String name) {
151 return client.createTable(name);
152 }
153
154 @Override
155 public void dropTable(String name) {
156 client.dropTable(name);
157 }
158
159 @Override
160 public void dropAllTables() {
161 client.dropAllTables();
162 }
163
164 @Override
165 public List<String> listTables() {
166 return client.listTables();
167 }
168
169 @Override
170 public ReadResult read(ReadRequest request) {
171 return batchRead(Arrays.asList(request)).get(0).get();
172 }
173
174 @Override
175 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
176 List<ReadRequest> batch) {
177 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
178 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
179 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
180 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
181 new NoSuchTableException()));
182 } else {
183 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
184 internalReadResult.result()));
185 }
186 }
187 return readResults;
188 }
189
190 @Override
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800191 public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
192 return batchWrite(Arrays.asList(request)).get(0);
193 }
194
195 @Override
Yuta HIGUCHIc6b8f612014-11-06 19:04:13 -0800196 public WriteResult write(WriteRequest request) {
197// throws OptimisticLockException, PreconditionFailedException {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800198 return writeNothrow(request).get();
Madan Jampani08822c42014-11-04 17:17:46 -0800199 }
200
201 @Override
202 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
203 List<WriteRequest> batch) {
204 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
205 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
206 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
207 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
208 new NoSuchTableException()));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800209 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
Madan Jampani08822c42014-11-04 17:17:46 -0800210 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
211 new OptimisticLockException()));
212 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
213 // TODO: throw a different exception?
214 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800215 new OptimisticLockException()));
Madan Jampani08822c42014-11-04 17:17:46 -0800216 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
217 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
218 new WriteAborted()));
219 } else {
220 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
221 internalWriteResult.result()));
222 }
223 }
224 return writeResults;
225
226 }
227
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800228 private final class InternalClusterEventListener
229 implements ClusterEventListener {
230
231 @Override
232 public void event(ClusterEvent event) {
233 // TODO: Not every node should be part of the consensus ring.
234
235 final ControllerNode node = event.subject();
236 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
237 node.tcpPort());
238
239 log.trace("{}", event);
240 switch (event.type()) {
241 case INSTANCE_ACTIVATED:
242 case INSTANCE_ADDED:
243 log.info("{} was added to the cluster", tcpMember);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800244 synchronized (clusterConfig) {
245 clusterConfig.addRemoteMember(tcpMember);
246 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800247 break;
248 case INSTANCE_DEACTIVATED:
249 case INSTANCE_REMOVED:
250 log.info("{} was removed from the cluster", tcpMember);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800251 synchronized (clusterConfig) {
252 clusterConfig.removeRemoteMember(tcpMember);
253 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800254 break;
255 default:
256 break;
257 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800258 if (copycat != null) {
259 log.debug("Current cluster: {}", copycat.cluster());
260 }
261 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800262 }
263
264 }
265
266 public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
267 public KryoRegisteredInMemoryLog() {
268 super();
269 // required to deserialize object across bundles
270 super.kryo.register(TcpMember.class, new TcpMemberSerializer());
271 super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
272 }
273 }
274
Madan Jampani08822c42014-11-04 17:17:46 -0800275 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
276
277 private final R result;
278 private final DatabaseException exception;
279
280 public DatabaseOperationResult(R result) {
281 this.result = result;
282 this.exception = null;
283 }
284
285 public DatabaseOperationResult(DatabaseException exception) {
286 this.result = null;
287 this.exception = exception;
288 }
289
290 @Override
291 public R get() {
292 if (result != null) {
293 return result;
294 }
295 throw exception;
296 }
297
298 @Override
299 public boolean hasValidResult() {
300 return result != null;
301 }
302
303 @Override
304 public String toString() {
305 if (result != null) {
306 return result.toString();
307 } else {
308 return exception.toString();
309 }
310 }
311 }
312}