blob: 134f437ce9a880257dbab5e546f54462a38aca0e [file] [log] [blame]
Nick Karanatsios8abe7172014-02-19 20:31:48 -08001/*
2 * To change this template, choose Tools | Templates
3 * and open the template in the editor.
4 */
Jonathan Hartaa380972014-04-03 10:24:46 -07005package net.onrc.onos.core.intent.runtime;
Nick Karanatsios8abe7172014-02-19 20:31:48 -08006
7import com.esotericsoftware.kryo.Kryo;
Nick Karanatsios8abe7172014-02-19 20:31:48 -08008import com.esotericsoftware.kryo.io.Output;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -07009
Nick Karanatsios8abe7172014-02-19 20:31:48 -080010import java.io.ByteArrayOutputStream;
Nick Karanatsiosf9336002014-02-24 11:26:58 -080011import java.io.IOException;
Nick Karanatsiosa8800572014-02-25 01:12:06 -080012import java.nio.ByteBuffer;
13import java.util.Arrays;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080014import java.util.concurrent.atomic.AtomicLong;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070015
Jonathan Hart6df90172014-04-03 10:13:11 -070016import net.onrc.onos.core.datagrid.web.IntentResource;
17import net.onrc.onos.core.datastore.DataStoreClient;
18import net.onrc.onos.core.datastore.IKVTable;
19import net.onrc.onos.core.datastore.ObjectExistsException;
Jonathan Hartaa380972014-04-03 10:24:46 -070020import net.onrc.onos.core.intent.IntentOperationList;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080021import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphService;
22import net.onrc.onos.ofcontroller.networkgraph.NetworkGraph;
Nick Karanatsiosd1dfb922014-02-26 11:48:49 -080023import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080024import net.onrc.onos.registry.controller.IControllerRegistryService;
25import net.onrc.onos.registry.controller.IdBlock;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070026
Nick Karanatsiosed645df2014-02-20 23:22:29 -080027import org.slf4j.Logger;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080028import org.slf4j.LoggerFactory;
29
30/**
31 *
32 * @author nickkaranatsios
33 */
34public class PersistIntent {
Nick Karanatsiosed645df2014-02-20 23:22:29 -080035 private final static Logger log = LoggerFactory.getLogger(IntentResource.class);
Nick Karanatsiosf9336002014-02-24 11:26:58 -080036 private long range = 10000L;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080037 private final IControllerRegistryService controllerRegistry;
38 NetworkGraph graph = null;
Nick Karanatsiosa8800572014-02-25 01:12:06 -080039 private final static String intentJournal = "G:IntentJournal";
40 private final static int valueStoreLimit = 1024 * 1024;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070041 private IKVTable table;
Nick Karanatsiosd1dfb922014-02-26 11:48:49 -080042 private Kryo kryo;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080043 private ByteArrayOutputStream stream;
44 private Output output = null;
45 private AtomicLong nextId = null;
46 private long rangeEnd;
47 private IdBlock idBlock = null;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070048
49
Nick Karanatsios8abe7172014-02-19 20:31:48 -080050 public PersistIntent(final IControllerRegistryService controllerRegistry, INetworkGraphService ng) {
51 this.controllerRegistry = controllerRegistry;
52 this.graph = ng.getNetworkGraph();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070053 table = DataStoreClient.getClient().getTable(intentJournal);
Nick Karanatsiosed645df2014-02-20 23:22:29 -080054 stream = new ByteArrayOutputStream(1024);
Nick Karanatsios8abe7172014-02-19 20:31:48 -080055 output = new Output(stream);
Nick Karanatsiosd1dfb922014-02-26 11:48:49 -080056 kryo = (new KryoFactory()).newKryo();
Nick Karanatsios8abe7172014-02-19 20:31:48 -080057 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070058
Nick Karanatsios8abe7172014-02-19 20:31:48 -080059 public long getKey() {
60 long key;
61 if (idBlock == null) {
62 key = getNextBlock();
63 } else {
64 key = nextId.incrementAndGet();
65 if (key >= rangeEnd) {
66 key = getNextBlock();
67 }
68 }
69 return key;
70 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070071
Nick Karanatsios8abe7172014-02-19 20:31:48 -080072 private long getNextBlock() {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070073 // XXX This method is not thread safe, may lose allocated IdBlock
Nick Karanatsios8abe7172014-02-19 20:31:48 -080074 idBlock = controllerRegistry.allocateUniqueIdBlock(range);
75 nextId = new AtomicLong(idBlock.getStart());
76 rangeEnd = idBlock.getEnd();
77 return nextId.get();
78 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070079
Nick Karanatsios8abe7172014-02-19 20:31:48 -080080 public boolean persistIfLeader(long key, IntentOperationList operations) {
81 boolean leader = true;
82 boolean ret = false;
83 // TODO call controllerRegistry.isClusterLeader()
84 if (leader) {
85 try {
Nick Karanatsiosa8800572014-02-25 01:12:06 -080086 // reserve key 10 entries for multi-write if size over 1MB
87 key *= 10;
Nick Karanatsios8abe7172014-02-19 20:31:48 -080088 kryo.writeObject(output, operations);
89 output.close();
Nick Karanatsiosa8800572014-02-25 01:12:06 -080090 ByteBuffer keyBytes = ByteBuffer.allocate(8).putLong(key);
Nick Karanatsios8abe7172014-02-19 20:31:48 -080091 byte[] buffer = stream.toByteArray();
Nick Karanatsiosa8800572014-02-25 01:12:06 -080092 int total = buffer.length;
93 if ((total >= valueStoreLimit )) {
94 int writeCount = total / valueStoreLimit;
95 int remainder = total % valueStoreLimit;
96 int upperIndex = 0;
97 for (int i = 0; i < writeCount; i++, key++) {
98 keyBytes.clear();
99 keyBytes.putLong(key);
100 keyBytes.flip();
101 upperIndex = (i * valueStoreLimit + valueStoreLimit) - 1;
102 log.debug("writing using indexes {}:{}", (i*valueStoreLimit) ,upperIndex);
103 table.create(keyBytes.array(), Arrays.copyOfRange(buffer, i * valueStoreLimit, upperIndex));
104 }
105 if (remainder > 0) {
106 keyBytes.clear();
107 keyBytes.putLong(key);
108 keyBytes.flip();
109 log.debug("writing using indexes {}:{}" ,upperIndex ,total);
110 table.create(keyBytes.array(), Arrays.copyOfRange(buffer, upperIndex + 1, total - 1));
111 }
112 } else {
113 keyBytes.flip();
114 table.create(keyBytes.array(), buffer);
115 }
Nick Karanatsiosf9336002014-02-24 11:26:58 -0800116 log.debug("key is {} value length is {}", key, buffer.length);
117 stream.reset();
118 stream.close();
Nick Karanatsiosa1bad352014-02-22 14:16:34 -0800119 log.debug("persist operations to ramcloud size of operations: {}", operations.size());
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800120 ret = true;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700121 } catch (ObjectExistsException ex) {
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800122 log.warn("Failed to store intent journal with key " + key);
Nick Karanatsiosf9336002014-02-24 11:26:58 -0800123 } catch (IOException ex) {
124 log.error("Failed to close the stream");
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800125 }
126 }
127 return ret;
128 }
129}