blob: 3322ce30277f4c83eb6293828e8c82462b15ade5 [file] [log] [blame]
Jonathan Hartaa380972014-04-03 10:24:46 -07001package net.onrc.onos.core.intent.runtime;
Nick Karanatsios8abe7172014-02-19 20:31:48 -08002
Nick Karanatsios8abe7172014-02-19 20:31:48 -08003import java.io.ByteArrayOutputStream;
Nick Karanatsiosf9336002014-02-24 11:26:58 -08004import java.io.IOException;
Nick Karanatsiosa8800572014-02-25 01:12:06 -08005import java.nio.ByteBuffer;
6import java.util.Arrays;
Nick Karanatsios8abe7172014-02-19 20:31:48 -08007import java.util.concurrent.atomic.AtomicLong;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -07008
Jonathan Hart6df90172014-04-03 10:13:11 -07009import net.onrc.onos.core.datastore.DataStoreClient;
10import net.onrc.onos.core.datastore.IKVTable;
11import net.onrc.onos.core.datastore.ObjectExistsException;
Jonathan Hartaa380972014-04-03 10:24:46 -070012import net.onrc.onos.core.intent.IntentOperationList;
Jonathan Hartdeda0ba2014-04-03 11:14:12 -070013import net.onrc.onos.core.registry.IControllerRegistryService;
Sho SHIMIZUfc932d52014-08-15 11:22:37 -070014import net.onrc.onos.core.util.IdBlock;
Jonathan Hart23701d12014-04-03 10:45:48 -070015import net.onrc.onos.core.util.serializers.KryoFactory;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070016
Nick Karanatsiosed645df2014-02-20 23:22:29 -080017import org.slf4j.Logger;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080018import org.slf4j.LoggerFactory;
19
Jonathan Harta99ec672014-04-03 11:30:34 -070020import com.esotericsoftware.kryo.Kryo;
21import com.esotericsoftware.kryo.io.Output;
22
Nick Karanatsios8abe7172014-02-19 20:31:48 -080023/**
Toshio Koidefdb75932014-06-16 17:59:24 -070024 * The module used by PathCalcRuntimeModule class.
25 * <p>
26 * It persists intent operations into persistent storage.
Nick Karanatsios8abe7172014-02-19 20:31:48 -080027 */
28public class PersistIntent {
Pavlin Radoslavov0c14b8a2014-05-06 16:23:48 -070029 private static final Logger log = LoggerFactory.getLogger(PersistIntent.class);
Nick Karanatsiosf9336002014-02-24 11:26:58 -080030 private long range = 10000L;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080031 private final IControllerRegistryService controllerRegistry;
Ray Milkeyec838942014-04-09 11:28:43 -070032 private static final String INTENT_JOURNAL = "G:IntentJournal";
33 private static final int VALUE_STORE_LIMIT = 1024 * 1024;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070034 private IKVTable table;
Nick Karanatsiosd1dfb922014-02-26 11:48:49 -080035 private Kryo kryo;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080036 private ByteArrayOutputStream stream;
37 private Output output = null;
38 private AtomicLong nextId = null;
39 private long rangeEnd;
40 private IdBlock idBlock = null;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070041
Toshio Koidefdb75932014-06-16 17:59:24 -070042 /**
43 * Constructor.
44 *
45 * @param controllerRegistry the Registry Service to use.
46 */
Pavlin Radoslavov0294e052014-04-10 13:36:45 -070047 public PersistIntent(final IControllerRegistryService controllerRegistry) {
Nick Karanatsios8abe7172014-02-19 20:31:48 -080048 this.controllerRegistry = controllerRegistry;
Ray Milkey5c9f2db2014-04-09 10:31:21 -070049 table = DataStoreClient.getClient().getTable(INTENT_JOURNAL);
Nick Karanatsiosed645df2014-02-20 23:22:29 -080050 stream = new ByteArrayOutputStream(1024);
Nick Karanatsios8abe7172014-02-19 20:31:48 -080051 output = new Output(stream);
Yuta HIGUCHIe57e10e2014-08-20 14:25:30 -070052 kryo = KryoFactory.newKryoObject();
Nick Karanatsios8abe7172014-02-19 20:31:48 -080053 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070054
Toshio Koidefdb75932014-06-16 17:59:24 -070055 private long getNextBlock() {
56 // XXX This method is not thread safe, may lose allocated IdBlock
57 idBlock = controllerRegistry.allocateUniqueIdBlock(range);
58 nextId = new AtomicLong(idBlock.getStart());
59 rangeEnd = idBlock.getEnd();
60 return nextId.get();
61 }
62
63 /**
64 * Provides the unique key for persisting.
65 * <p>
66 * This key is necessary for persistIfLeader() method.
67 *
68 * @return a key for persisting.
69 */
Nick Karanatsios8abe7172014-02-19 20:31:48 -080070 public long getKey() {
71 long key;
72 if (idBlock == null) {
73 key = getNextBlock();
74 } else {
75 key = nextId.incrementAndGet();
76 if (key >= rangeEnd) {
77 key = getNextBlock();
78 }
79 }
80 return key;
81 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070082
Toshio Koidefdb75932014-06-16 17:59:24 -070083 /**
84 * Persist intent operations into persistent storage only if this instance was a leader.
85 *
86 * @param key a unique key
87 * @param operations intent operations
88 * @return true if succeeded, otherwise false.
89 */
Nick Karanatsios8abe7172014-02-19 20:31:48 -080090 public boolean persistIfLeader(long key, IntentOperationList operations) {
91 boolean leader = true;
92 boolean ret = false;
Ray Milkey4373cbe2014-08-12 09:58:58 -070093 long keyValue = key;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080094 // TODO call controllerRegistry.isClusterLeader()
95 if (leader) {
96 try {
Nick Karanatsiosa8800572014-02-25 01:12:06 -080097 // reserve key 10 entries for multi-write if size over 1MB
Ray Milkey4373cbe2014-08-12 09:58:58 -070098 keyValue *= 10;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080099 kryo.writeObject(output, operations);
100 output.close();
Ray Milkey4373cbe2014-08-12 09:58:58 -0700101 ByteBuffer keyBytes = ByteBuffer.allocate(8).putLong(keyValue);
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800102 byte[] buffer = stream.toByteArray();
Nick Karanatsiosa8800572014-02-25 01:12:06 -0800103 int total = buffer.length;
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700104 if ((total >= VALUE_STORE_LIMIT)) {
105 int writeCount = total / VALUE_STORE_LIMIT;
106 int remainder = total % VALUE_STORE_LIMIT;
Nick Karanatsiosa8800572014-02-25 01:12:06 -0800107 int upperIndex = 0;
Ray Milkey4373cbe2014-08-12 09:58:58 -0700108 for (int i = 0; i < writeCount; i++, keyValue++) {
Nick Karanatsiosa8800572014-02-25 01:12:06 -0800109 keyBytes.clear();
Ray Milkey4373cbe2014-08-12 09:58:58 -0700110 keyBytes.putLong(keyValue);
Nick Karanatsiosa8800572014-02-25 01:12:06 -0800111 keyBytes.flip();
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700112 upperIndex = (i * VALUE_STORE_LIMIT + VALUE_STORE_LIMIT) - 1;
113 log.debug("writing using indexes {}:{}", (i * VALUE_STORE_LIMIT), upperIndex);
114 table.create(keyBytes.array(), Arrays.copyOfRange(buffer, i * VALUE_STORE_LIMIT, upperIndex));
Nick Karanatsiosa8800572014-02-25 01:12:06 -0800115 }
116 if (remainder > 0) {
117 keyBytes.clear();
Ray Milkey4373cbe2014-08-12 09:58:58 -0700118 keyBytes.putLong(keyValue);
Nick Karanatsiosa8800572014-02-25 01:12:06 -0800119 keyBytes.flip();
Ray Milkey269ffb92014-04-03 14:43:30 -0700120 log.debug("writing using indexes {}:{}", upperIndex, total);
Nick Karanatsiosa8800572014-02-25 01:12:06 -0800121 table.create(keyBytes.array(), Arrays.copyOfRange(buffer, upperIndex + 1, total - 1));
122 }
123 } else {
124 keyBytes.flip();
125 table.create(keyBytes.array(), buffer);
126 }
Ray Milkey4373cbe2014-08-12 09:58:58 -0700127 log.debug("key is {} value length is {}", keyValue, buffer.length);
Nick Karanatsiosf9336002014-02-24 11:26:58 -0800128 stream.reset();
129 stream.close();
Nick Karanatsiosa1bad352014-02-22 14:16:34 -0800130 log.debug("persist operations to ramcloud size of operations: {}", operations.size());
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800131 ret = true;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700132 } catch (ObjectExistsException ex) {
Ray Milkey4373cbe2014-08-12 09:58:58 -0700133 log.warn("Failed to store intent journal with key " + keyValue);
Nick Karanatsiosf9336002014-02-24 11:26:58 -0800134 } catch (IOException ex) {
135 log.error("Failed to close the stream");
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800136 }
137 }
138 return ret;
139 }
140}