blob: e3de134976da816d7fc1e9170fbb7b4e84a71769 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08005import java.io.File;
6import java.io.IOException;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08007import java.util.Collection;
8import java.util.Collections;
9import java.util.HashSet;
Madan Jampani08822c42014-11-04 17:17:46 -080010import java.util.List;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080011import java.util.Map;
12import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080013import java.util.concurrent.CountDownLatch;
14import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080015
16import net.kuujo.copycat.Copycat;
17import net.kuujo.copycat.StateMachine;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080018import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080019import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080020import net.kuujo.copycat.cluster.TcpCluster;
21import net.kuujo.copycat.cluster.TcpClusterConfig;
22import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani08822c42014-11-04 17:17:46 -080023import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080024
Madan Jampani08822c42014-11-04 17:17:46 -080025import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080026import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080027import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080028import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080030import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080031import org.onlab.onos.cluster.ClusterEvent;
32import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080033import org.onlab.onos.cluster.ClusterService;
34import org.onlab.onos.cluster.ControllerNode;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080035import org.onlab.onos.cluster.DefaultControllerNode;
36import org.onlab.onos.cluster.NodeId;
Madan Jampani12390c12014-11-12 00:35:56 -080037import org.onlab.onos.store.service.BatchReadRequest;
38import org.onlab.onos.store.service.BatchReadResult;
39import org.onlab.onos.store.service.BatchWriteRequest;
40import org.onlab.onos.store.service.BatchWriteResult;
Madan Jampani08822c42014-11-04 17:17:46 -080041import org.onlab.onos.store.service.DatabaseAdminService;
42import org.onlab.onos.store.service.DatabaseException;
43import org.onlab.onos.store.service.DatabaseService;
Madan Jampani08822c42014-11-04 17:17:46 -080044import org.onlab.onos.store.service.ReadResult;
Madan Jampani12390c12014-11-12 00:35:56 -080045import org.onlab.onos.store.service.ReadStatus;
46import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080047import org.onlab.onos.store.service.WriteResult;
Madan Jampani12390c12014-11-12 00:35:56 -080048import org.onlab.onos.store.service.WriteStatus;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080049import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080050import org.slf4j.Logger;
51
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080052import com.google.common.collect.ImmutableList;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080053
Madan Jampani08822c42014-11-04 17:17:46 -080054/**
55 * Strongly consistent and durable state management service based on
56 * Copycat implementation of Raft consensus protocol.
57 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080058@Component(immediate = true)
59@Service
Madan Jampani08822c42014-11-04 17:17:46 -080060public class DatabaseManager implements DatabaseService, DatabaseAdminService {
61
62 private final Logger log = getLogger(getClass());
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080065 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080066
Madan Jampani9b19a822014-11-04 21:37:13 -080067 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080068 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080069
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080070 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
71
72 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
Pavlin Radoslavov190f8f92014-11-11 15:56:14 -080073 // TODO: Set the path to /opt/onos/config
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080074 private static final String CONFIG_DIR = "../config";
75
76 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
77
78 private static final String DEFAULT_TABLET = "default";
79
80 // TODO: make this configurable
81 // initial member configuration file path
82 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -080083
84 private Copycat copycat;
85 private DatabaseClient client;
86
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080087 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080088 private ClusterConfig<TcpMember> clusterConfig;
89
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080090 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080091 private ClusterEventListener clusterEventListener;
92
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080093 private Map<String, Set<DefaultControllerNode>> tabletMembers;
94
95 private boolean autoAddMember = false;
96
Madan Jampani08822c42014-11-04 17:17:46 -080097 @Activate
98 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -080099
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800100 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -0800101
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800102 // load tablet configuration
103 File file = new File(CONFIG_DIR, initialMemberConfig);
104 log.info("Loading config: {}", file.getAbsolutePath());
105 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
106 try {
107 tabletMembers = tabletDef.read();
108 } catch (IOException e) {
109 log.error("Failed to load tablet config {}", file);
110 throw new IllegalStateException("Failed to load tablet config", e);
111 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800112
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800113 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800114 clusterConfig = new TcpClusterConfig();
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800115 Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
116 if (defaultMember == null || defaultMember.isEmpty()) {
117 log.error("No member found in [{}] tablet configuration.",
118 DEFAULT_TABLET);
119 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800120
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800121 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800122
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800123 final ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800124 for (ControllerNode member : defaultMember) {
125 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
126 member.tcpPort());
127 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800128 clusterConfig.setLocalMember(tcpMember);
129 } else {
130 clusterConfig.addRemoteMember(tcpMember);
131 }
132 }
133
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800134 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800135 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800136 clusterEventListener = new InternalClusterEventListener();
137 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800138
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800139 if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) {
140 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800141 try {
142 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800143 log.info("Starting with {}/{} nodes cluster",
144 clusterService.getNodes().size(),
145 clusterConfig.getMembers().size());
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800146 }
147 } catch (InterruptedException e) {
148 log.info("Interrupted waiting for others", e);
149 }
150 }
Madan Jampani08822c42014-11-04 17:17:46 -0800151
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800152 final TcpCluster cluster;
153 synchronized (clusterConfig) {
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800154 // Create the cluster.
155 cluster = new TcpCluster(clusterConfig);
156 }
157 log.info("Starting cluster: {}", cluster);
158
Madan Jampani08822c42014-11-04 17:17:46 -0800159
160 StateMachine stateMachine = new DatabaseStateMachine();
Madan Jampani2ee20002014-11-06 20:06:12 -0800161 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800162 ClusterMessagingProtocol.SERIALIZER);
Madan Jampani08822c42014-11-04 17:17:46 -0800163
Madan Jampani9b19a822014-11-04 21:37:13 -0800164 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -0800165 copycat.start();
166
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800167 client = new DatabaseClient(copycat);
Madan Jampani08822c42014-11-04 17:17:46 -0800168
169 log.info("Started.");
170 }
171
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800172 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800173 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800174 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800175 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800176 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800177 }
178
179 @Override
180 public boolean createTable(String name) {
181 return client.createTable(name);
182 }
183
184 @Override
185 public void dropTable(String name) {
186 client.dropTable(name);
187 }
188
189 @Override
190 public void dropAllTables() {
191 client.dropAllTables();
192 }
193
194 @Override
195 public List<String> listTables() {
196 return client.listTables();
197 }
198
199 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800200 public VersionedValue get(String tableName, String key) {
201 BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
202 ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
203 if (readResult.status().equals(ReadStatus.OK)) {
204 return readResult.value();
Madan Jampani08822c42014-11-04 17:17:46 -0800205 }
Madan Jampani12390c12014-11-12 00:35:56 -0800206 throw new DatabaseException("get failed due to status: " + readResult.status());
Madan Jampani08822c42014-11-04 17:17:46 -0800207 }
208
209 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800210 public BatchReadResult batchRead(BatchReadRequest batchRequest) {
211 return new BatchReadResult(client.batchRead(batchRequest));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800212 }
213
214 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800215 public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
216 return new BatchWriteResult(client.batchWrite(batchRequest));
Madan Jampani08822c42014-11-04 17:17:46 -0800217 }
218
219 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800220 public VersionedValue put(String tableName, String key, byte[] value) {
221 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
222 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
223 if (writeResult.status().equals(WriteStatus.OK)) {
224 return writeResult.previousValue();
Madan Jampani08822c42014-11-04 17:17:46 -0800225 }
Madan Jampani12390c12014-11-12 00:35:56 -0800226 throw new DatabaseException("put failed due to status: " + writeResult.status());
227 }
Madan Jampani08822c42014-11-04 17:17:46 -0800228
Madan Jampani12390c12014-11-12 00:35:56 -0800229 @Override
230 public boolean putIfAbsent(String tableName, String key, byte[] value) {
231 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfAbsent(tableName, key, value).build();
232 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
233 if (writeResult.status().equals(WriteStatus.OK)) {
234 return true;
235 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
236 return false;
237 }
238 throw new DatabaseException("putIfAbsent failed due to status: " + writeResult.status());
239 }
240
241 @Override
242 public boolean putIfVersionMatches(String tableName, String key,
243 byte[] value, long version) {
244 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfVersionMatches(tableName, key, value, version).build();
245 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
246 if (writeResult.status().equals(WriteStatus.OK)) {
247 return true;
248 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
249 return false;
250 }
251 throw new DatabaseException("putIfVersionMatches failed due to status: " + writeResult.status());
252 }
253
254 @Override
255 public boolean putIfValueMatches(String tableName, String key,
256 byte[] oldValue, byte[] newValue) {
257 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfValueMatches(tableName, key, oldValue, newValue).build();
258 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
259 if (writeResult.status().equals(WriteStatus.OK)) {
260 return true;
261 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
262 return false;
263 }
264 throw new DatabaseException("putIfValueMatches failed due to status: " + writeResult.status());
265 }
266
267 @Override
268 public VersionedValue remove(String tableName, String key) {
269 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().remove(tableName, key).build();
270 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
271 if (writeResult.status().equals(WriteStatus.OK)) {
272 return writeResult.previousValue();
273 }
274 throw new DatabaseException("remove failed due to status: " + writeResult.status());
275 }
276
277 @Override
278 public boolean removeIfVersionMatches(String tableName, String key,
279 long version) {
280 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().removeIfVersionMatches(tableName, key, version).build();
281 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
282 if (writeResult.status().equals(WriteStatus.OK)) {
283 return true;
284 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
285 return false;
286 }
287 throw new DatabaseException("removeIfVersionMatches failed due to status: " + writeResult.status());
288 }
289
290 @Override
291 public boolean removeIfValueMatches(String tableName, String key,
292 byte[] value) {
293 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().removeIfValueMatches(tableName, key, value).build();
294 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
295 if (writeResult.status().equals(WriteStatus.OK)) {
296 return true;
297 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
298 return false;
299 }
300 throw new DatabaseException("removeIfValueMatches failed due to status: " + writeResult.status());
301 }
302
303 @Override
304 public void addMember(final ControllerNode node) {
305 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
306 node.tcpPort());
307 log.info("{} was added to the cluster", tcpMember);
308 synchronized (clusterConfig) {
309 clusterConfig.addRemoteMember(tcpMember);
310 }
Madan Jampani08822c42014-11-04 17:17:46 -0800311 }
312
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800313 private final class InternalClusterEventListener
Madan Jampani12390c12014-11-12 00:35:56 -0800314 implements ClusterEventListener {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800315
316 @Override
317 public void event(ClusterEvent event) {
318 // TODO: Not every node should be part of the consensus ring.
319
320 final ControllerNode node = event.subject();
321 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
Madan Jampani12390c12014-11-12 00:35:56 -0800322 node.tcpPort());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800323
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800324 switch (event.type()) {
325 case INSTANCE_ACTIVATED:
326 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800327 if (autoAddMember) {
328 synchronized (clusterConfig) {
329 if (!clusterConfig.getMembers().contains(tcpMember)) {
330 log.info("{} was automatically added to the cluster", tcpMember);
331 clusterConfig.addRemoteMember(tcpMember);
332 }
333 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800334 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800335 break;
336 case INSTANCE_DEACTIVATED:
337 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800338 if (autoAddMember) {
339 Set<DefaultControllerNode> members
Madan Jampani12390c12014-11-12 00:35:56 -0800340 = tabletMembers.getOrDefault(DEFAULT_TABLET,
341 Collections.emptySet());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800342 // remove only if not the initial members
343 if (!members.contains(node)) {
344 synchronized (clusterConfig) {
345 if (clusterConfig.getMembers().contains(tcpMember)) {
346 log.info("{} was automatically removed from the cluster", tcpMember);
347 clusterConfig.removeRemoteMember(tcpMember);
348 }
349 }
350 }
351 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800352 break;
353 default:
354 break;
355 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800356 if (copycat != null) {
357 log.debug("Current cluster: {}", copycat.cluster());
358 }
359 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800360 }
361
362 }
363
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800364 @Override
365 public void removeMember(final ControllerNode node) {
366 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
367 node.tcpPort());
368 log.info("{} was removed from the cluster", tcpMember);
369 synchronized (clusterConfig) {
370 clusterConfig.removeRemoteMember(tcpMember);
371 }
372 }
373
374 @Override
375 public Collection<ControllerNode> listMembers() {
376 if (copycat == null) {
377 return ImmutableList.of();
378 }
379 Set<ControllerNode> members = new HashSet<>();
380 for (Member member : copycat.cluster().members()) {
381 if (member instanceof TcpMember) {
382 final TcpMember tcpMember = (TcpMember) member;
383 // TODO assuming tcpMember#host to be IP address,
384 // but if not lookup DNS, etc. first
385 IpAddress ip = IpAddress.valueOf(tcpMember.host());
386 int tcpPort = tcpMember.port();
387 NodeId id = getNodeIdFromIp(ip, tcpPort);
388 if (id == null) {
389 log.info("No NodeId found for {}:{}", ip, tcpPort);
390 continue;
391 }
392 members.add(new DefaultControllerNode(id, ip, tcpPort));
393 }
394 }
395 return members;
396 }
397
398 private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
399 for (ControllerNode node : clusterService.getNodes()) {
400 if (node.ip().equals(ip) &&
401 node.tcpPort() == tcpPort) {
402 return node.id();
403 }
404 }
405 return null;
406 }
Madan Jampani08822c42014-11-04 17:17:46 -0800407}