blob: cbe10be4f88d899dcffd733a44ca95b371af6659 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.FileNotFoundException;
4import java.util.Collection;
5import java.util.List;
6
Jonathan Hart6df90172014-04-03 10:13:11 -07007import net.onrc.onos.core.datastore.IKVClient;
8import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -07009import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070010import net.onrc.onos.core.datastore.IKVTableID;
11import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070012import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
13import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070014import net.onrc.onos.core.datastore.ObjectDoesntExistException;
15import net.onrc.onos.core.datastore.ObjectExistsException;
16import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070017import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
18import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070019
20import org.slf4j.Logger;
21import org.slf4j.LoggerFactory;
22
23import com.hazelcast.client.HazelcastClient;
24import com.hazelcast.client.config.ClientConfig;
25import com.hazelcast.config.Config;
26import com.hazelcast.config.FileSystemXmlConfig;
27import com.hazelcast.config.MapConfig;
28import com.hazelcast.config.SerializationConfig;
29import com.hazelcast.core.Hazelcast;
30import com.hazelcast.core.HazelcastInstance;
31import com.hazelcast.core.IMap;
32
33public class HZClient implements IKVClient {
34 private static final Logger log = LoggerFactory.getLogger(HZClient.class);
35
36 static final long VERSION_NONEXISTENT = 0L;
37
38 private static final String MAP_PREFIX = "datastore://";
39
40 // make this path configurable
Jonathan Hart6df90172014-04-03 10:13:11 -070041 private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
42 private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.core.datastore.hazelcast.clientMode", "true"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070043
44 // Note: xml configuration will overwrite this value if present
Jonathan Hart6df90172014-04-03 10:13:11 -070045 private static int backupCount = Integer.valueOf(System.getProperty("net.onrc.onos.core.datastore.hazelcast.backupCount", "3"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070046
47 private final HazelcastInstance hazelcastInstance;
48
49 private static final HZClient THE_INSTANCE = new HZClient();
50
51 public static HZClient getClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070052 return THE_INSTANCE;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070053 }
54
55 private HZClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070056 hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070057 }
58
59 private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070060 Config baseHzConfig = null;
61 try {
62 baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
63 } catch (FileNotFoundException e) {
64 log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
Ray Milkey269ffb92014-04-03 14:43:30 -070065 throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName, e);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070066 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070067
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070068 // use xml config if present, if not use System.property
69 MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
70 if (mapConfig != null) {
71 backupCount = mapConfig.getBackupCount();
72 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070073
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070074 HazelcastInstance instance = null;
75 if (useClientMode) {
76 log.info("Configuring Hazelcast datastore as Client mode");
77 ClientConfig clientConfig = new ClientConfig();
78 final int port = baseHzConfig.getNetworkConfig().getPort();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070079
Jonathan Hart6df90172014-04-03 10:13:11 -070080 String server = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.server", "localhost");
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070081 clientConfig.addAddress(server + ":" + port);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070082
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070083 // copy group config from base Hazelcast configuration
84 clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
85 clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070086
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070087 // TODO We probably need to figure out what else need to be
88 // derived from baseConfig
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070089
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070090 registerSerializer(clientConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070091
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070092 log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070093
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070094 try {
95 instance = HazelcastClient.newHazelcastClient(clientConfig);
96 if (!instance.getCluster().getMembers().isEmpty()) {
97 log.debug("Members in cluster: " + instance.getCluster().getMembers());
98 return instance;
99 }
100 log.info("Failed to find cluster member, falling back to Instance mode");
101 } catch (IllegalStateException e) {
102 log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
103 }
104 useClientMode = false;
105 instance = null;
106 }
107 log.info("Configuring Hazelcast datastore as Instance mode");
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700108
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700109 // To run 2 Hazelcast instance in 1 JVM,
110 // we probably need to something like below
111 //int port = hazelcastConfig.getNetworkConfig().getPort();
112 //hazelcastConfig.getNetworkConfig().setPort(port+1);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700113
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700114 registerSerializer(baseHzConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700115
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700116 return Hazelcast.newHazelcastInstance(baseHzConfig);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700117 }
118
119 /**
120 * Register serializer for VersionedValue class used to imitate value version.
Ray Milkey269ffb92014-04-03 14:43:30 -0700121 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700122 * @param config
123 */
124 private static void registerSerializer(final SerializationConfig config) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700125 config.addDataSerializableFactoryClass(
126 VersionedValueSerializableFactory.FACTORY_ID,
127 VersionedValueSerializableFactory.class);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700128 }
129
130 @Override
131 public IKVTable getTable(final String tableName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700132 IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700133
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700134 if (!useClientMode) {
135 // config only available in Instance Mode
136 // Client Mode must rely on hazelcast.xml to be properly configured.
137 MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
138 // config for this map to be strong consistent
139 if (config.isReadBackupData()) {
140 config.setReadBackupData(false);
141 }
142 if (config.isNearCacheEnabled()) {
143 config.getNearCacheConfig().setMaxSize(0);
144 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700145
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700146 if (config.getBackupCount() != backupCount) {
147 config.setAsyncBackupCount(0);
148 config.setBackupCount(backupCount);
149 }
150 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700151
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700152 return new HZTable(tableName, map);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700153 }
154
155 @Override
156 public void dropTable(final IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700157 ((HZTable) table).getBackendMap().clear();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700158 }
159
160 @Override
161 public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
162 throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700163 IKVTable table = (IKVTable) tableId;
164 return table.create(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700165 }
166
167 @Override
168 public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700169 IKVTable table = (IKVTable) tableId;
170 return table.forceCreate(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700171 }
172
173 @Override
174 public IKVEntry read(final IKVTableID tableId, final byte[] key)
175 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700176 IKVTable table = (IKVTable) tableId;
177 return table.read(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700178 }
179
180 @Override
181 public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
Ray Milkey269ffb92014-04-03 14:43:30 -0700182 final long version) throws ObjectDoesntExistException,
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700183 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700184 IKVTable table = (IKVTable) tableId;
185 return table.update(key, value, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700186 }
187
188 @Override
189 public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
190 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700191 IKVTable table = (IKVTable) tableId;
192 return table.update(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700193 }
194
195 @Override
196 public long delete(final IKVTableID tableId, final byte[] key, final long version)
197 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700198 IKVTable table = (IKVTable) tableId;
199 return table.delete(key, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700200 }
201
202 @Override
203 public long forceDelete(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700204 IKVTable table = (IKVTable) tableId;
205 return table.forceDelete(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700206 }
207
208 @Override
209 public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700210 IKVTable table = (IKVTable) tableId;
211 return table.getAllEntries();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700212 }
213
214 @Override
215 public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700216 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700217 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700218 }
219
220 @Override
221 public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700222 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700223 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700224 }
225
226 @Override
227 public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700228 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700229 }
230
231 @Override
232 public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700233 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700234 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700235 }
236
237 @Override
238 public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700239 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700240 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700241 }
242
243 @Override
244 public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700245 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700246 }
247
248 @Override
249 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700250 boolean failExists = false;
251 for (IMultiEntryOperation op : ops) {
252 HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
253 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700254 case DELETE:
255 try {
256 final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
257 mop.setVersion(version);
258 mop.setStatus(STATUS.SUCCESS);
259 } catch (ObjectDoesntExistException | WrongVersionException e) {
260 log.error(mop + " failed.", e);
261 mop.setStatus(STATUS.FAILED);
262 failExists = true;
263 }
264 break;
265 case FORCE_DELETE:
266 final long version = forceDelete(mop.getTableId(), mop.getKey());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700267 mop.setVersion(version);
268 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700269 break;
270 default:
271 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700272 }
273 }
274 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700275 }
276
277 @Override
278 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700279 // there may be room to batch to improve performance
280 boolean failExists = false;
281 for (IMultiEntryOperation op : ops) {
282 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
283 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700284 case CREATE:
285 try {
286 long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
287 mop.setVersion(version);
288 mop.setStatus(STATUS.SUCCESS);
289 } catch (ObjectExistsException e) {
290 log.error(mop + " failed.", e);
291 mop.setStatus(STATUS.FAILED);
292 failExists = true;
293 }
294 break;
295 case FORCE_CREATE: {
296 final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700297 mop.setVersion(version);
298 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700299 break;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700300 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700301 case UPDATE:
302 try {
303 long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
304 mop.setVersion(version);
305 mop.setStatus(STATUS.SUCCESS);
306 } catch (ObjectDoesntExistException | WrongVersionException e) {
307 log.error(mop + " failed.", e);
308 mop.setStatus(STATUS.FAILED);
309 failExists = true;
310 }
311 break;
312 default:
313 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700314 }
315 }
316 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700317 }
318
319 @Override
320 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700321 boolean failExists = false;
322 for (IMultiEntryOperation op : ops) {
323 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
324 HZTable table = (HZTable) op.getTableId();
325 ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
326 }
327 for (IMultiEntryOperation op : ops) {
328 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
329 if (mop.hasSucceeded()) {
330 // status update is already done, nothing to do.
331 } else {
332 failExists = true;
333 }
334 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700335
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700336 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700337 }
338
339 @Override
340 public long VERSION_NONEXISTENT() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700341 return VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700342 }
343
344
345}