blob: 737aeaec4a15c2ddd3de04db3025698a35d2bead [file] [log] [blame]
Nick Karanatsios8abe7172014-02-19 20:31:48 -08001/*
2 * To change this template, choose Tools | Templates
3 * and open the template in the editor.
4 */
Jonathan Hartaa380972014-04-03 10:24:46 -07005package net.onrc.onos.core.intent.runtime;
Nick Karanatsios8abe7172014-02-19 20:31:48 -08006
Nick Karanatsios8abe7172014-02-19 20:31:48 -08007import java.io.ByteArrayOutputStream;
Nick Karanatsiosf9336002014-02-24 11:26:58 -08008import java.io.IOException;
Nick Karanatsiosa8800572014-02-25 01:12:06 -08009import java.nio.ByteBuffer;
10import java.util.Arrays;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080011import java.util.concurrent.atomic.AtomicLong;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070012
Jonathan Hart6df90172014-04-03 10:13:11 -070013import net.onrc.onos.core.datagrid.web.IntentResource;
14import net.onrc.onos.core.datastore.DataStoreClient;
15import net.onrc.onos.core.datastore.IKVTable;
16import net.onrc.onos.core.datastore.ObjectExistsException;
Jonathan Hartaa380972014-04-03 10:24:46 -070017import net.onrc.onos.core.intent.IntentOperationList;
Jonathan Hartdeda0ba2014-04-03 11:14:12 -070018import net.onrc.onos.core.registry.IControllerRegistryService;
19import net.onrc.onos.core.registry.IdBlock;
Jonathan Hart472062d2014-04-03 10:56:48 -070020import net.onrc.onos.core.topology.INetworkGraphService;
21import net.onrc.onos.core.topology.NetworkGraph;
Jonathan Hart23701d12014-04-03 10:45:48 -070022import net.onrc.onos.core.util.serializers.KryoFactory;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070023
Nick Karanatsiosed645df2014-02-20 23:22:29 -080024import org.slf4j.Logger;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080025import org.slf4j.LoggerFactory;
26
Jonathan Harta99ec672014-04-03 11:30:34 -070027import com.esotericsoftware.kryo.Kryo;
28import com.esotericsoftware.kryo.io.Output;
29
Nick Karanatsios8abe7172014-02-19 20:31:48 -080030/**
Nick Karanatsios8abe7172014-02-19 20:31:48 -080031 * @author nickkaranatsios
32 */
33public class PersistIntent {
Ray Milkeyec838942014-04-09 11:28:43 -070034 private static final Logger log = LoggerFactory.getLogger(IntentResource.class);
Nick Karanatsiosf9336002014-02-24 11:26:58 -080035 private long range = 10000L;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080036 private final IControllerRegistryService controllerRegistry;
37 NetworkGraph graph = null;
Ray Milkeyec838942014-04-09 11:28:43 -070038 private static final String INTENT_JOURNAL = "G:IntentJournal";
39 private static final int VALUE_STORE_LIMIT = 1024 * 1024;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070040 private IKVTable table;
Nick Karanatsiosd1dfb922014-02-26 11:48:49 -080041 private Kryo kryo;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080042 private ByteArrayOutputStream stream;
43 private Output output = null;
44 private AtomicLong nextId = null;
45 private long rangeEnd;
46 private IdBlock idBlock = null;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070047
48
Nick Karanatsios8abe7172014-02-19 20:31:48 -080049 public PersistIntent(final IControllerRegistryService controllerRegistry, INetworkGraphService ng) {
50 this.controllerRegistry = controllerRegistry;
51 this.graph = ng.getNetworkGraph();
Ray Milkey5c9f2db2014-04-09 10:31:21 -070052 table = DataStoreClient.getClient().getTable(INTENT_JOURNAL);
Nick Karanatsiosed645df2014-02-20 23:22:29 -080053 stream = new ByteArrayOutputStream(1024);
Nick Karanatsios8abe7172014-02-19 20:31:48 -080054 output = new Output(stream);
Nick Karanatsiosd1dfb922014-02-26 11:48:49 -080055 kryo = (new KryoFactory()).newKryo();
Nick Karanatsios8abe7172014-02-19 20:31:48 -080056 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070057
Nick Karanatsios8abe7172014-02-19 20:31:48 -080058 public long getKey() {
59 long key;
60 if (idBlock == null) {
61 key = getNextBlock();
62 } else {
63 key = nextId.incrementAndGet();
64 if (key >= rangeEnd) {
65 key = getNextBlock();
66 }
67 }
68 return key;
69 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070070
Nick Karanatsios8abe7172014-02-19 20:31:48 -080071 private long getNextBlock() {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070072 // XXX This method is not thread safe, may lose allocated IdBlock
Nick Karanatsios8abe7172014-02-19 20:31:48 -080073 idBlock = controllerRegistry.allocateUniqueIdBlock(range);
74 nextId = new AtomicLong(idBlock.getStart());
75 rangeEnd = idBlock.getEnd();
76 return nextId.get();
77 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070078
Nick Karanatsios8abe7172014-02-19 20:31:48 -080079 public boolean persistIfLeader(long key, IntentOperationList operations) {
80 boolean leader = true;
81 boolean ret = false;
82 // TODO call controllerRegistry.isClusterLeader()
83 if (leader) {
84 try {
Nick Karanatsiosa8800572014-02-25 01:12:06 -080085 // reserve key 10 entries for multi-write if size over 1MB
86 key *= 10;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080087 kryo.writeObject(output, operations);
88 output.close();
Nick Karanatsiosa8800572014-02-25 01:12:06 -080089 ByteBuffer keyBytes = ByteBuffer.allocate(8).putLong(key);
Nick Karanatsios8abe7172014-02-19 20:31:48 -080090 byte[] buffer = stream.toByteArray();
Nick Karanatsiosa8800572014-02-25 01:12:06 -080091 int total = buffer.length;
Ray Milkey5c9f2db2014-04-09 10:31:21 -070092 if ((total >= VALUE_STORE_LIMIT)) {
93 int writeCount = total / VALUE_STORE_LIMIT;
94 int remainder = total % VALUE_STORE_LIMIT;
Nick Karanatsiosa8800572014-02-25 01:12:06 -080095 int upperIndex = 0;
96 for (int i = 0; i < writeCount; i++, key++) {
97 keyBytes.clear();
98 keyBytes.putLong(key);
99 keyBytes.flip();
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700100 upperIndex = (i * VALUE_STORE_LIMIT + VALUE_STORE_LIMIT) - 1;
101 log.debug("writing using indexes {}:{}", (i * VALUE_STORE_LIMIT), upperIndex);
102 table.create(keyBytes.array(), Arrays.copyOfRange(buffer, i * VALUE_STORE_LIMIT, upperIndex));
Nick Karanatsiosa8800572014-02-25 01:12:06 -0800103 }
104 if (remainder > 0) {
105 keyBytes.clear();
106 keyBytes.putLong(key);
107 keyBytes.flip();
Ray Milkey269ffb92014-04-03 14:43:30 -0700108 log.debug("writing using indexes {}:{}", upperIndex, total);
Nick Karanatsiosa8800572014-02-25 01:12:06 -0800109 table.create(keyBytes.array(), Arrays.copyOfRange(buffer, upperIndex + 1, total - 1));
110 }
111 } else {
112 keyBytes.flip();
113 table.create(keyBytes.array(), buffer);
114 }
Nick Karanatsiosf9336002014-02-24 11:26:58 -0800115 log.debug("key is {} value length is {}", key, buffer.length);
116 stream.reset();
117 stream.close();
Nick Karanatsiosa1bad352014-02-22 14:16:34 -0800118 log.debug("persist operations to ramcloud size of operations: {}", operations.size());
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800119 ret = true;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700120 } catch (ObjectExistsException ex) {
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800121 log.warn("Failed to store intent journal with key " + key);
Nick Karanatsiosf9336002014-02-24 11:26:58 -0800122 } catch (IOException ex) {
123 log.error("Failed to close the stream");
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800124 }
125 }
126 return ret;
127 }
128}