blob: 602762b4a4bd8009fdc745aee53a9948dc2201c9 [file] [log] [blame]
Nick Karanatsios8abe7172014-02-19 20:31:48 -08001/*
2 * To change this template, choose Tools | Templates
3 * and open the template in the editor.
4 */
5package net.onrc.onos.intent.persist;
6
7import com.esotericsoftware.kryo.Kryo;
Nick Karanatsios8abe7172014-02-19 20:31:48 -08008import com.esotericsoftware.kryo.io.Output;
9import edu.stanford.ramcloud.JRamCloud;
10import java.io.ByteArrayOutputStream;
Nick Karanatsiosf9336002014-02-24 11:26:58 -080011import java.io.IOException;
Nick Karanatsiosa8800572014-02-25 01:12:06 -080012import java.nio.ByteBuffer;
13import java.util.Arrays;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080014import java.util.concurrent.atomic.AtomicLong;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080015import net.onrc.onos.datagrid.web.IntentResource;
16import net.onrc.onos.datastore.RCTable;
17import net.onrc.onos.intent.IntentOperationList;
18import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphService;
19import net.onrc.onos.ofcontroller.networkgraph.NetworkGraph;
Nick Karanatsiosd1dfb922014-02-26 11:48:49 -080020import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080021import net.onrc.onos.registry.controller.IControllerRegistryService;
22import net.onrc.onos.registry.controller.IdBlock;
Nick Karanatsiosed645df2014-02-20 23:22:29 -080023import org.slf4j.Logger;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080024import org.slf4j.LoggerFactory;
25
26/**
27 *
28 * @author nickkaranatsios
29 */
30public class PersistIntent {
Nick Karanatsiosed645df2014-02-20 23:22:29 -080031 private final static Logger log = LoggerFactory.getLogger(IntentResource.class);
Nick Karanatsiosf9336002014-02-24 11:26:58 -080032 private long range = 10000L;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080033 private final IControllerRegistryService controllerRegistry;
34 NetworkGraph graph = null;
Nick Karanatsiosa8800572014-02-25 01:12:06 -080035 private final static String intentJournal = "G:IntentJournal";
36 private final static int valueStoreLimit = 1024 * 1024;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080037 private RCTable table;
Nick Karanatsiosd1dfb922014-02-26 11:48:49 -080038 private Kryo kryo;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080039 private ByteArrayOutputStream stream;
40 private Output output = null;
41 private AtomicLong nextId = null;
42 private long rangeEnd;
43 private IdBlock idBlock = null;
44
45
46 public PersistIntent(final IControllerRegistryService controllerRegistry, INetworkGraphService ng) {
47 this.controllerRegistry = controllerRegistry;
48 this.graph = ng.getNetworkGraph();
49 table = RCTable.getTable(intentJournal);
Nick Karanatsiosed645df2014-02-20 23:22:29 -080050 stream = new ByteArrayOutputStream(1024);
Nick Karanatsios8abe7172014-02-19 20:31:48 -080051 output = new Output(stream);
Nick Karanatsiosd1dfb922014-02-26 11:48:49 -080052 kryo = (new KryoFactory()).newKryo();
Nick Karanatsios8abe7172014-02-19 20:31:48 -080053 }
54
55 public long getKey() {
56 long key;
57 if (idBlock == null) {
58 key = getNextBlock();
59 } else {
60 key = nextId.incrementAndGet();
61 if (key >= rangeEnd) {
62 key = getNextBlock();
63 }
64 }
65 return key;
66 }
67
68 private long getNextBlock() {
Nick Karanatsiosa8800572014-02-25 01:12:06 -080069 // XXX This method is not thread safe, may lose allocated IdBlock
Nick Karanatsios8abe7172014-02-19 20:31:48 -080070 idBlock = controllerRegistry.allocateUniqueIdBlock(range);
71 nextId = new AtomicLong(idBlock.getStart());
72 rangeEnd = idBlock.getEnd();
73 return nextId.get();
74 }
75
76 public boolean persistIfLeader(long key, IntentOperationList operations) {
77 boolean leader = true;
78 boolean ret = false;
79 // TODO call controllerRegistry.isClusterLeader()
80 if (leader) {
81 try {
Nick Karanatsiosa8800572014-02-25 01:12:06 -080082 // reserve key 10 entries for multi-write if size over 1MB
83 key *= 10;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080084 kryo.writeObject(output, operations);
85 output.close();
Nick Karanatsiosa8800572014-02-25 01:12:06 -080086 ByteBuffer keyBytes = ByteBuffer.allocate(8).putLong(key);
Nick Karanatsios8abe7172014-02-19 20:31:48 -080087 byte[] buffer = stream.toByteArray();
Nick Karanatsiosa8800572014-02-25 01:12:06 -080088 int total = buffer.length;
89 if ((total >= valueStoreLimit )) {
90 int writeCount = total / valueStoreLimit;
91 int remainder = total % valueStoreLimit;
92 int upperIndex = 0;
93 for (int i = 0; i < writeCount; i++, key++) {
94 keyBytes.clear();
95 keyBytes.putLong(key);
96 keyBytes.flip();
97 upperIndex = (i * valueStoreLimit + valueStoreLimit) - 1;
98 log.debug("writing using indexes {}:{}", (i*valueStoreLimit) ,upperIndex);
99 table.create(keyBytes.array(), Arrays.copyOfRange(buffer, i * valueStoreLimit, upperIndex));
100 }
101 if (remainder > 0) {
102 keyBytes.clear();
103 keyBytes.putLong(key);
104 keyBytes.flip();
105 log.debug("writing using indexes {}:{}" ,upperIndex ,total);
106 table.create(keyBytes.array(), Arrays.copyOfRange(buffer, upperIndex + 1, total - 1));
107 }
108 } else {
109 keyBytes.flip();
110 table.create(keyBytes.array(), buffer);
111 }
Nick Karanatsiosf9336002014-02-24 11:26:58 -0800112 log.debug("key is {} value length is {}", key, buffer.length);
113 stream.reset();
114 stream.close();
Nick Karanatsiosa1bad352014-02-22 14:16:34 -0800115 log.debug("persist operations to ramcloud size of operations: {}", operations.size());
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800116 ret = true;
117 } catch (JRamCloud.ObjectExistsException ex) {
118 log.warn("Failed to store intent journal with key " + key);
Nick Karanatsiosf9336002014-02-24 11:26:58 -0800119 } catch (IOException ex) {
120 log.error("Failed to close the stream");
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800121 }
122 }
123 return ret;
124 }
125}