blob: 2c2b3a4b79ca566330b0693e637e232dbc41acdd [file] [log] [blame]
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07001package net.onrc.onos.datastore.hazelcast;
2
3import java.io.FileNotFoundException;
4import java.util.Collection;
5import java.util.List;
6
7import net.onrc.onos.datastore.IKVClient;
8import net.onrc.onos.datastore.IKVTable;
9import net.onrc.onos.datastore.IKVTable.IKVEntry;
10import net.onrc.onos.datastore.IKVTableID;
11import net.onrc.onos.datastore.IMultiEntryOperation;
12import net.onrc.onos.datastore.IMultiEntryOperation.OPERATION;
13import net.onrc.onos.datastore.IMultiEntryOperation.STATUS;
14import net.onrc.onos.datastore.ObjectDoesntExistException;
15import net.onrc.onos.datastore.ObjectExistsException;
16import net.onrc.onos.datastore.WrongVersionException;
17import net.onrc.onos.datastore.hazelcast.HZTable.VersionedValue;
18import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
19
20import org.slf4j.Logger;
21import org.slf4j.LoggerFactory;
22
23import com.hazelcast.client.HazelcastClient;
24import com.hazelcast.client.config.ClientConfig;
25import com.hazelcast.config.Config;
26import com.hazelcast.config.FileSystemXmlConfig;
27import com.hazelcast.config.MapConfig;
28import com.hazelcast.config.SerializationConfig;
29import com.hazelcast.core.Hazelcast;
30import com.hazelcast.core.HazelcastInstance;
31import com.hazelcast.core.IMap;
32
33public class HZClient implements IKVClient {
34 private static final Logger log = LoggerFactory.getLogger(HZClient.class);
35
36 static final long VERSION_NONEXISTENT = 0L;
37
38 private static final String MAP_PREFIX = "datastore://";
39
40 // make this path configurable
41 private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
42 private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.datastore.hazelcast.clientMode", "true"));
43
44 // Note: xml configuration will overwrite this value if present
45 private static int backupCount = Integer.valueOf(System.getProperty("net.onrc.onos.datastore.hazelcast.backupCount", "3"));
46
47 private final HazelcastInstance hazelcastInstance;
48
49 private static final HZClient THE_INSTANCE = new HZClient();
50
51 public static HZClient getClient() {
52 return THE_INSTANCE;
53 }
54
55 private HZClient() {
56 hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME);
57 }
58
59 private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) {
60 Config baseHzConfig = null;
61 try {
62 baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
63 } catch (FileNotFoundException e) {
64 log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
65 throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName , e);
66 }
67
68 // use xml config if present, if not use System.property
69 MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
70 if (mapConfig != null) {
71 backupCount = mapConfig.getBackupCount();
72 }
73
74 HazelcastInstance instance = null;
75 if (useClientMode) {
76 log.info("Configuring Hazelcast datastore as Client mode");
77 ClientConfig clientConfig = new ClientConfig();
78 final int port = baseHzConfig.getNetworkConfig().getPort();
79
80 String server = System.getProperty("net.onrc.onos.datastore.hazelcast.client.server", "localhost");
81 clientConfig.addAddress(server + ":" + port);
82
83 // copy group config from base Hazelcast configuration
84 clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
85 clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
86
87 // TODO We probably need to figure out what else need to be
88 // derived from baseConfig
89
90 registerSerializer(clientConfig.getSerializationConfig());
91
92 log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
93
94 try {
95 instance = HazelcastClient.newHazelcastClient(clientConfig);
96 if (!instance.getCluster().getMembers().isEmpty()) {
97 log.debug("Members in cluster: " + instance.getCluster().getMembers());
98 return instance;
99 }
100 log.info("Failed to find cluster member, falling back to Instance mode");
101 } catch (IllegalStateException e) {
102 log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
103 }
104 useClientMode = false;
105 instance = null;
106 }
107 log.info("Configuring Hazelcast datastore as Instance mode");
108
109 // To run 2 Hazelcast instance in 1 JVM,
110 // we probably need to something like below
111 //int port = hazelcastConfig.getNetworkConfig().getPort();
112 //hazelcastConfig.getNetworkConfig().setPort(port+1);
113
114 registerSerializer(baseHzConfig.getSerializationConfig());
115
116 return Hazelcast.newHazelcastInstance(baseHzConfig);
117 }
118
119 /**
120 * Register serializer for VersionedValue class used to imitate value version.
121 * @param config
122 */
123 private static void registerSerializer(final SerializationConfig config) {
124 config.addDataSerializableFactoryClass(
125 VersionedValueSerializableFactory.FACTORY_ID,
126 VersionedValueSerializableFactory.class);
127 }
128
129 @Override
130 public IKVTable getTable(final String tableName) {
131 IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
132
133 if (!useClientMode) {
134 // config only available in Instance Mode
135 // Client Mode must rely on hazelcast.xml to be properly configured.
136 MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
137 // config for this map to be strong consistent
138 if (config.isReadBackupData()) {
139 config.setReadBackupData(false);
140 }
141 if (config.isNearCacheEnabled()) {
142 config.getNearCacheConfig().setMaxSize(0);
143 }
144
145 if (config.getBackupCount() != backupCount) {
146 config.setAsyncBackupCount(0);
147 config.setBackupCount(backupCount);
148 }
149 }
150
151 return new HZTable(tableName, map);
152 }
153
154 @Override
155 public void dropTable(final IKVTable table) {
156 ((HZTable) table).getBackendMap().clear();
157 }
158
159 @Override
160 public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
161 throws ObjectExistsException {
162 IKVTable table = (IKVTable) tableId;
163 return table.create(key, value);
164 }
165
166 @Override
167 public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
168 IKVTable table = (IKVTable) tableId;
169 return table.forceCreate(key, value);
170 }
171
172 @Override
173 public IKVEntry read(final IKVTableID tableId, final byte[] key)
174 throws ObjectDoesntExistException {
175 IKVTable table = (IKVTable) tableId;
176 return table.read(key);
177 }
178
179 @Override
180 public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
181 final long version) throws ObjectDoesntExistException,
182 WrongVersionException {
183 IKVTable table = (IKVTable) tableId;
184 return table.update(key, value, version);
185 }
186
187 @Override
188 public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
189 throws ObjectDoesntExistException {
190 IKVTable table = (IKVTable) tableId;
191 return table.update(key, value);
192 }
193
194 @Override
195 public long delete(final IKVTableID tableId, final byte[] key, final long version)
196 throws ObjectDoesntExistException, WrongVersionException {
197 IKVTable table = (IKVTable) tableId;
198 return table.delete(key, version);
199 }
200
201 @Override
202 public long forceDelete(final IKVTableID tableId, final byte[] key) {
203 IKVTable table = (IKVTable) tableId;
204 return table.forceDelete(key);
205 }
206
207 @Override
208 public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
209 IKVTable table = (IKVTable) tableId;
210 return table.getAllEntries();
211 }
212
213 @Override
214 public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
215 final byte[] value) {
216 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
217 }
218
219 @Override
220 public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
221 final byte[] value) {
222 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
223 }
224
225 @Override
226 public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
227 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
228 }
229
230 @Override
231 public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
232 final byte[] value, final long version) {
233 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
234 }
235
236 @Override
237 public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
238 final byte[] value, final long version) {
239 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
240 }
241
242 @Override
243 public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
244 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
245 }
246
247 @Override
248 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
249 boolean failExists = false;
250 for (IMultiEntryOperation op : ops) {
251 HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
252 switch (mop.getOperation()) {
253 case DELETE:
254 try {
255 final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
256 mop.setVersion(version);
257 mop.setStatus(STATUS.SUCCESS);
258 } catch (ObjectDoesntExistException | WrongVersionException e) {
259 log.error(mop + " failed.", e);
260 mop.setStatus(STATUS.FAILED);
261 failExists = true;
262 }
263 break;
264 case FORCE_DELETE:
265 final long version = forceDelete(mop.getTableId(), mop.getKey());
266 mop.setVersion(version);
267 mop.setStatus(STATUS.SUCCESS);
268 break;
269 default:
270 throw new UnsupportedOperationException(mop.toString());
271 }
272 }
273 return failExists;
274 }
275
276 @Override
277 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
278 // there may be room to batch to improve performance
279 boolean failExists = false;
280 for (IMultiEntryOperation op : ops) {
281 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
282 switch (mop.getOperation()) {
283 case CREATE:
284 try {
285 long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
286 mop.setVersion(version);
287 mop.setStatus(STATUS.SUCCESS);
288 } catch (ObjectExistsException e) {
289 log.error(mop + " failed.", e);
290 mop.setStatus(STATUS.FAILED);
291 failExists = true;
292 }
293 break;
294 case FORCE_CREATE:
295 {
296 final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
297 mop.setVersion(version);
298 mop.setStatus(STATUS.SUCCESS);
299 break;
300 }
301 case UPDATE:
302 try {
303 long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
304 mop.setVersion(version);
305 mop.setStatus(STATUS.SUCCESS);
306 } catch (ObjectDoesntExistException | WrongVersionException e) {
307 log.error(mop + " failed.", e);
308 mop.setStatus(STATUS.FAILED);
309 failExists = true;
310 }
311 break;
312 default:
313 throw new UnsupportedOperationException(mop.toString());
314 }
315 }
316 return failExists;
317 }
318
319 @Override
320 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
321 boolean failExists = false;
322 for (IMultiEntryOperation op : ops) {
323 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
324 HZTable table = (HZTable) op.getTableId();
325 ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
326 }
327 for (IMultiEntryOperation op : ops) {
328 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
329 if (mop.hasSucceeded()) {
330 // status update is already done, nothing to do.
331 } else {
332 failExists = true;
333 }
334 }
335
336 return failExists;
337 }
338
339 @Override
340 public long VERSION_NONEXISTENT() {
341 return VERSION_NONEXISTENT;
342 }
343
344
345}