blob: aec9f9d0b27de489a82e95be6cad2b6d3ffa9f09 [file] [log] [blame]
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07001package net.onrc.onos.datastore.hazelcast;
2
3import java.io.FileNotFoundException;
4import java.util.Collection;
5import java.util.List;
6
7import net.onrc.onos.datastore.IKVClient;
8import net.onrc.onos.datastore.IKVTable;
9import net.onrc.onos.datastore.IKVTable.IKVEntry;
10import net.onrc.onos.datastore.IKVTableID;
11import net.onrc.onos.datastore.IMultiEntryOperation;
12import net.onrc.onos.datastore.IMultiEntryOperation.OPERATION;
13import net.onrc.onos.datastore.IMultiEntryOperation.STATUS;
14import net.onrc.onos.datastore.ObjectDoesntExistException;
15import net.onrc.onos.datastore.ObjectExistsException;
16import net.onrc.onos.datastore.WrongVersionException;
17import net.onrc.onos.datastore.hazelcast.HZTable.VersionedValue;
18import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
19
20import org.slf4j.Logger;
21import org.slf4j.LoggerFactory;
22
23import com.hazelcast.client.HazelcastClient;
24import com.hazelcast.client.config.ClientConfig;
25import com.hazelcast.config.Config;
26import com.hazelcast.config.FileSystemXmlConfig;
27import com.hazelcast.config.MapConfig;
28import com.hazelcast.config.SerializationConfig;
29import com.hazelcast.core.Hazelcast;
30import com.hazelcast.core.HazelcastInstance;
31import com.hazelcast.core.IMap;
32
33public class HZClient implements IKVClient {
34 private static final Logger log = LoggerFactory.getLogger(HZClient.class);
35
36 static final long VERSION_NONEXISTENT = 0L;
37
38 private static final String MAP_PREFIX = "datastore://";
39
40 // make this path configurable
41 private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
42 private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.datastore.hazelcast.clientMode", "true"));
43
44 // Note: xml configuration will overwrite this value if present
45 private static int backupCount = Integer.valueOf(System.getProperty("net.onrc.onos.datastore.hazelcast.backupCount", "3"));
46
47 private final HazelcastInstance hazelcastInstance;
48
49 private static final HZClient THE_INSTANCE = new HZClient();
50
51 public static HZClient getClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070052 return THE_INSTANCE;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070053 }
54
55 private HZClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070056 hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070057 }
58
59 private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070060 Config baseHzConfig = null;
61 try {
62 baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
63 } catch (FileNotFoundException e) {
64 log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
65 throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName , e);
66 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070067
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070068 // use xml config if present, if not use System.property
69 MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
70 if (mapConfig != null) {
71 backupCount = mapConfig.getBackupCount();
72 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070073
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070074 HazelcastInstance instance = null;
75 if (useClientMode) {
76 log.info("Configuring Hazelcast datastore as Client mode");
77 ClientConfig clientConfig = new ClientConfig();
78 final int port = baseHzConfig.getNetworkConfig().getPort();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070079
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070080 String server = System.getProperty("net.onrc.onos.datastore.hazelcast.client.server", "localhost");
81 clientConfig.addAddress(server + ":" + port);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070082
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070083 // copy group config from base Hazelcast configuration
84 clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
85 clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070086
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070087 // TODO We probably need to figure out what else need to be
88 // derived from baseConfig
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070089
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070090 registerSerializer(clientConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070091
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070092 log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070093
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070094 try {
95 instance = HazelcastClient.newHazelcastClient(clientConfig);
96 if (!instance.getCluster().getMembers().isEmpty()) {
97 log.debug("Members in cluster: " + instance.getCluster().getMembers());
98 return instance;
99 }
100 log.info("Failed to find cluster member, falling back to Instance mode");
101 } catch (IllegalStateException e) {
102 log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
103 }
104 useClientMode = false;
105 instance = null;
106 }
107 log.info("Configuring Hazelcast datastore as Instance mode");
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700108
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700109 // To run 2 Hazelcast instance in 1 JVM,
110 // we probably need to something like below
111 //int port = hazelcastConfig.getNetworkConfig().getPort();
112 //hazelcastConfig.getNetworkConfig().setPort(port+1);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700113
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700114 registerSerializer(baseHzConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700115
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700116 return Hazelcast.newHazelcastInstance(baseHzConfig);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700117 }
118
119 /**
120 * Register serializer for VersionedValue class used to imitate value version.
121 * @param config
122 */
123 private static void registerSerializer(final SerializationConfig config) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700124 config.addDataSerializableFactoryClass(
125 VersionedValueSerializableFactory.FACTORY_ID,
126 VersionedValueSerializableFactory.class);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700127 }
128
129 @Override
130 public IKVTable getTable(final String tableName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700131 IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700132
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700133 if (!useClientMode) {
134 // config only available in Instance Mode
135 // Client Mode must rely on hazelcast.xml to be properly configured.
136 MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
137 // config for this map to be strong consistent
138 if (config.isReadBackupData()) {
139 config.setReadBackupData(false);
140 }
141 if (config.isNearCacheEnabled()) {
142 config.getNearCacheConfig().setMaxSize(0);
143 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700144
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700145 if (config.getBackupCount() != backupCount) {
146 config.setAsyncBackupCount(0);
147 config.setBackupCount(backupCount);
148 }
149 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700150
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700151 return new HZTable(tableName, map);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700152 }
153
154 @Override
155 public void dropTable(final IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700156 ((HZTable) table).getBackendMap().clear();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700157 }
158
159 @Override
160 public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
161 throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700162 IKVTable table = (IKVTable) tableId;
163 return table.create(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700164 }
165
166 @Override
167 public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700168 IKVTable table = (IKVTable) tableId;
169 return table.forceCreate(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700170 }
171
172 @Override
173 public IKVEntry read(final IKVTableID tableId, final byte[] key)
174 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700175 IKVTable table = (IKVTable) tableId;
176 return table.read(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700177 }
178
179 @Override
180 public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
181 final long version) throws ObjectDoesntExistException,
182 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700183 IKVTable table = (IKVTable) tableId;
184 return table.update(key, value, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700185 }
186
187 @Override
188 public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
189 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700190 IKVTable table = (IKVTable) tableId;
191 return table.update(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700192 }
193
194 @Override
195 public long delete(final IKVTableID tableId, final byte[] key, final long version)
196 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700197 IKVTable table = (IKVTable) tableId;
198 return table.delete(key, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700199 }
200
201 @Override
202 public long forceDelete(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700203 IKVTable table = (IKVTable) tableId;
204 return table.forceDelete(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700205 }
206
207 @Override
208 public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700209 IKVTable table = (IKVTable) tableId;
210 return table.getAllEntries();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700211 }
212
213 @Override
214 public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
215 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700216 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700217 }
218
219 @Override
220 public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
221 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700222 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700223 }
224
225 @Override
226 public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700227 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700228 }
229
230 @Override
231 public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
232 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700233 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700234 }
235
236 @Override
237 public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
238 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700239 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700240 }
241
242 @Override
243 public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700244 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700245 }
246
247 @Override
248 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700249 boolean failExists = false;
250 for (IMultiEntryOperation op : ops) {
251 HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
252 switch (mop.getOperation()) {
253 case DELETE:
254 try {
255 final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
256 mop.setVersion(version);
257 mop.setStatus(STATUS.SUCCESS);
258 } catch (ObjectDoesntExistException | WrongVersionException e) {
259 log.error(mop + " failed.", e);
260 mop.setStatus(STATUS.FAILED);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700261 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700262 }
263 break;
264 case FORCE_DELETE:
265 final long version = forceDelete(mop.getTableId(), mop.getKey());
266 mop.setVersion(version);
267 mop.setStatus(STATUS.SUCCESS);
268 break;
269 default:
270 throw new UnsupportedOperationException(mop.toString());
271 }
272 }
273 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700274 }
275
276 @Override
277 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700278 // there may be room to batch to improve performance
279 boolean failExists = false;
280 for (IMultiEntryOperation op : ops) {
281 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
282 switch (mop.getOperation()) {
283 case CREATE:
284 try {
285 long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
286 mop.setVersion(version);
287 mop.setStatus(STATUS.SUCCESS);
288 } catch (ObjectExistsException e) {
289 log.error(mop + " failed.", e);
290 mop.setStatus(STATUS.FAILED);
291 failExists = true;
292 }
293 break;
294 case FORCE_CREATE:
295 {
296 final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
297 mop.setVersion(version);
298 mop.setStatus(STATUS.SUCCESS);
299 break;
300 }
301 case UPDATE:
302 try {
303 long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
304 mop.setVersion(version);
305 mop.setStatus(STATUS.SUCCESS);
306 } catch (ObjectDoesntExistException | WrongVersionException e) {
307 log.error(mop + " failed.", e);
308 mop.setStatus(STATUS.FAILED);
309 failExists = true;
310 }
311 break;
312 default:
313 throw new UnsupportedOperationException(mop.toString());
314 }
315 }
316 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700317 }
318
319 @Override
320 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700321 boolean failExists = false;
322 for (IMultiEntryOperation op : ops) {
323 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
324 HZTable table = (HZTable) op.getTableId();
325 ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
326 }
327 for (IMultiEntryOperation op : ops) {
328 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
329 if (mop.hasSucceeded()) {
330 // status update is already done, nothing to do.
331 } else {
332 failExists = true;
333 }
334 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700335
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700336 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700337 }
338
339 @Override
340 public long VERSION_NONEXISTENT() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700341 return VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700342 }
343
344
345}