blob: 380a0f23da87ddee03e164ea800e6c4a18f08a8b [file] [log] [blame]
Nick Karanatsios8abe7172014-02-19 20:31:48 -08001/*
2 * To change this template, choose Tools | Templates
3 * and open the template in the editor.
4 */
5package net.onrc.onos.intent.persist;
6
7import com.esotericsoftware.kryo.Kryo;
Nick Karanatsios8abe7172014-02-19 20:31:48 -08008import com.esotericsoftware.kryo.io.Output;
9import edu.stanford.ramcloud.JRamCloud;
10import java.io.ByteArrayOutputStream;
Nick Karanatsiosf9336002014-02-24 11:26:58 -080011import java.io.IOException;
Nick Karanatsiosa8800572014-02-25 01:12:06 -080012import java.nio.ByteBuffer;
13import java.util.Arrays;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080014import java.util.concurrent.atomic.AtomicLong;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080015import net.onrc.onos.datagrid.web.IntentResource;
16import net.onrc.onos.datastore.RCTable;
17import net.onrc.onos.intent.IntentOperationList;
18import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphService;
19import net.onrc.onos.ofcontroller.networkgraph.NetworkGraph;
20import net.onrc.onos.registry.controller.IControllerRegistryService;
21import net.onrc.onos.registry.controller.IdBlock;
Nick Karanatsiosed645df2014-02-20 23:22:29 -080022import org.slf4j.Logger;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080023import org.slf4j.LoggerFactory;
24
25/**
26 *
27 * @author nickkaranatsios
28 */
29public class PersistIntent {
Nick Karanatsiosed645df2014-02-20 23:22:29 -080030 private final static Logger log = LoggerFactory.getLogger(IntentResource.class);
Nick Karanatsiosf9336002014-02-24 11:26:58 -080031 private long range = 10000L;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080032 private final IControllerRegistryService controllerRegistry;
33 NetworkGraph graph = null;
Nick Karanatsiosa8800572014-02-25 01:12:06 -080034 private final static String intentJournal = "G:IntentJournal";
35 private final static int valueStoreLimit = 1024 * 1024;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080036 private RCTable table;
37 private Kryo kryo = new Kryo();
38 private ByteArrayOutputStream stream;
39 private Output output = null;
40 private AtomicLong nextId = null;
41 private long rangeEnd;
42 private IdBlock idBlock = null;
43
44
45 public PersistIntent(final IControllerRegistryService controllerRegistry, INetworkGraphService ng) {
46 this.controllerRegistry = controllerRegistry;
47 this.graph = ng.getNetworkGraph();
48 table = RCTable.getTable(intentJournal);
Nick Karanatsiosed645df2014-02-20 23:22:29 -080049 stream = new ByteArrayOutputStream(1024);
Nick Karanatsios8abe7172014-02-19 20:31:48 -080050 output = new Output(stream);
51 }
52
53 public long getKey() {
54 long key;
55 if (idBlock == null) {
56 key = getNextBlock();
57 } else {
58 key = nextId.incrementAndGet();
59 if (key >= rangeEnd) {
60 key = getNextBlock();
61 }
62 }
63 return key;
64 }
65
66 private long getNextBlock() {
Nick Karanatsiosa8800572014-02-25 01:12:06 -080067 // XXX This method is not thread safe, may lose allocated IdBlock
Nick Karanatsios8abe7172014-02-19 20:31:48 -080068 idBlock = controllerRegistry.allocateUniqueIdBlock(range);
69 nextId = new AtomicLong(idBlock.getStart());
70 rangeEnd = idBlock.getEnd();
71 return nextId.get();
72 }
73
74 public boolean persistIfLeader(long key, IntentOperationList operations) {
75 boolean leader = true;
76 boolean ret = false;
77 // TODO call controllerRegistry.isClusterLeader()
78 if (leader) {
79 try {
Nick Karanatsiosa8800572014-02-25 01:12:06 -080080 // reserve key 10 entries for multi-write if size over 1MB
81 key *= 10;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080082 kryo.writeObject(output, operations);
83 output.close();
Nick Karanatsiosa8800572014-02-25 01:12:06 -080084 ByteBuffer keyBytes = ByteBuffer.allocate(8).putLong(key);
Nick Karanatsios8abe7172014-02-19 20:31:48 -080085 byte[] buffer = stream.toByteArray();
Nick Karanatsiosa8800572014-02-25 01:12:06 -080086 int total = buffer.length;
87 if ((total >= valueStoreLimit )) {
88 int writeCount = total / valueStoreLimit;
89 int remainder = total % valueStoreLimit;
90 int upperIndex = 0;
91 for (int i = 0; i < writeCount; i++, key++) {
92 keyBytes.clear();
93 keyBytes.putLong(key);
94 keyBytes.flip();
95 upperIndex = (i * valueStoreLimit + valueStoreLimit) - 1;
96 log.debug("writing using indexes {}:{}", (i*valueStoreLimit) ,upperIndex);
97 table.create(keyBytes.array(), Arrays.copyOfRange(buffer, i * valueStoreLimit, upperIndex));
98 }
99 if (remainder > 0) {
100 keyBytes.clear();
101 keyBytes.putLong(key);
102 keyBytes.flip();
103 log.debug("writing using indexes {}:{}" ,upperIndex ,total);
104 table.create(keyBytes.array(), Arrays.copyOfRange(buffer, upperIndex + 1, total - 1));
105 }
106 } else {
107 keyBytes.flip();
108 table.create(keyBytes.array(), buffer);
109 }
Nick Karanatsiosf9336002014-02-24 11:26:58 -0800110 log.debug("key is {} value length is {}", key, buffer.length);
111 stream.reset();
112 stream.close();
Nick Karanatsiosa1bad352014-02-22 14:16:34 -0800113 log.debug("persist operations to ramcloud size of operations: {}", operations.size());
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800114 ret = true;
115 } catch (JRamCloud.ObjectExistsException ex) {
116 log.warn("Failed to store intent journal with key " + key);
Nick Karanatsiosf9336002014-02-24 11:26:58 -0800117 } catch (IOException ex) {
118 log.error("Failed to close the stream");
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800119 }
120 }
121 return ret;
122 }
123}