blob: 3c92800a5609e9ccc8d174a7e8cb7bb50642d633 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import java.util.Arrays;
4import java.util.List;
5import java.util.UUID;
6import java.util.concurrent.CompletableFuture;
7import java.util.concurrent.ExecutionException;
8
9import net.kuujo.copycat.protocol.Response.Status;
10import net.kuujo.copycat.protocol.SubmitRequest;
11import net.kuujo.copycat.protocol.SubmitResponse;
12import net.kuujo.copycat.spi.protocol.ProtocolClient;
13
14import org.apache.commons.lang3.RandomUtils;
15import org.onlab.netty.Endpoint;
16import org.onlab.netty.NettyMessagingService;
17import org.onlab.onos.store.service.DatabaseException;
18import org.onlab.onos.store.service.ReadRequest;
19import org.onlab.onos.store.service.WriteRequest;
20
21public class DatabaseClient {
22
23 private final Endpoint copycatEp;
24 ProtocolClient client;
25 NettyMessagingService messagingService;
26
27 public DatabaseClient(Endpoint copycatEp) {
28 this.copycatEp = copycatEp;
29 }
30
31 private static String nextId() {
32 return UUID.randomUUID().toString();
33 }
34
35 public void activate() throws Exception {
36 messagingService = new NettyMessagingService(RandomUtils.nextInt(10000, 40000));
37 messagingService.activate();
38 client = new NettyProtocolClient(copycatEp, messagingService);
39 }
40
41 public void deactivate() throws Exception {
42 messagingService.deactivate();
43 }
44
45 public boolean createTable(String tableName) {
46
47 SubmitRequest request =
48 new SubmitRequest(
49 nextId(),
50 "createTable",
51 Arrays.asList(tableName));
52 CompletableFuture<SubmitResponse> future = client.submit(request);
53 try {
54 return (boolean) future.get().result();
55 } catch (InterruptedException | ExecutionException e) {
56 throw new DatabaseException(e);
57 }
58 }
59
60 public void dropTable(String tableName) {
61
62 SubmitRequest request =
63 new SubmitRequest(
64 nextId(),
65 "dropTable",
66 Arrays.asList(tableName));
67 CompletableFuture<SubmitResponse> future = client.submit(request);
68 try {
69 if (future.get().status() == Status.OK) {
70 throw new DatabaseException(future.get().toString());
71 }
72
73 } catch (InterruptedException | ExecutionException e) {
74 throw new DatabaseException(e);
75 }
76 }
77
78 public void dropAllTables() {
79
80 SubmitRequest request =
81 new SubmitRequest(
82 nextId(),
83 "dropAllTables",
84 Arrays.asList());
85 CompletableFuture<SubmitResponse> future = client.submit(request);
86 try {
87 if (future.get().status() != Status.OK) {
88 throw new DatabaseException(future.get().toString());
89 }
90 } catch (InterruptedException | ExecutionException e) {
91 throw new DatabaseException(e);
92 }
93 }
94
95 @SuppressWarnings("unchecked")
96 public List<String> listTables() {
97
98 SubmitRequest request =
99 new SubmitRequest(
100 nextId(),
101 "listTables",
102 Arrays.asList());
103 CompletableFuture<SubmitResponse> future = client.submit(request);
104 try {
105 return (List<String>) future.get().result();
106 } catch (InterruptedException | ExecutionException e) {
107 throw new DatabaseException(e);
108 }
109 }
110
111 @SuppressWarnings("unchecked")
112 public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
113
114 SubmitRequest request = new SubmitRequest(
115 nextId(),
116 "read",
117 Arrays.asList(requests));
118
119 CompletableFuture<SubmitResponse> future = client.submit(request);
120 try {
121 List<InternalReadResult> internalReadResults = (List<InternalReadResult>) future.get().result();
122 return internalReadResults;
123 } catch (InterruptedException | ExecutionException e) {
124 throw new DatabaseException(e);
125 }
126 }
127
128 @SuppressWarnings("unchecked")
129 public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
130
131 SubmitRequest request = new SubmitRequest(
132 nextId(),
133 "write",
134 Arrays.asList(requests));
135
136 CompletableFuture<SubmitResponse> future = client.submit(request);
137 try {
138 List<InternalWriteResult> internalWriteResults = (List<InternalWriteResult>) future.get().result();
139 return internalWriteResults;
140 } catch (InterruptedException | ExecutionException e) {
141 throw new DatabaseException(e);
142 }
143 }
144}