Renamed the intent package

net.onrc.onos.intent.* => net.onrc.onos.core.intent.*

Change-Id: Id61f79ed52acf3b91af4ebad2515ac5b7d6dc5e1
diff --git a/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java b/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java
new file mode 100755
index 0000000..134f437
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java
@@ -0,0 +1,129 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package net.onrc.onos.core.intent.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+
+import net.onrc.onos.core.datagrid.web.IntentResource;
+import net.onrc.onos.core.datastore.DataStoreClient;
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.intent.IntentOperationList;
+import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphService;
+import net.onrc.onos.ofcontroller.networkgraph.NetworkGraph;
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
+import net.onrc.onos.registry.controller.IdBlock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * @author nickkaranatsios
+ */
+public class PersistIntent {
+    private final static Logger log = LoggerFactory.getLogger(IntentResource.class);
+    private long range = 10000L;
+    private final IControllerRegistryService controllerRegistry;
+    NetworkGraph graph = null;
+    private final static String intentJournal = "G:IntentJournal";
+    private final static int valueStoreLimit = 1024 * 1024;
+    private IKVTable table;
+    private Kryo kryo;
+    private ByteArrayOutputStream stream;
+    private Output output = null;
+    private AtomicLong nextId = null;
+    private long rangeEnd;
+    private IdBlock idBlock = null;
+
+
+    public PersistIntent(final IControllerRegistryService controllerRegistry, INetworkGraphService ng) {
+        this.controllerRegistry = controllerRegistry;
+        this.graph = ng.getNetworkGraph();
+        table = DataStoreClient.getClient().getTable(intentJournal);
+        stream = new ByteArrayOutputStream(1024);
+        output = new Output(stream);
+        kryo = (new KryoFactory()).newKryo();
+    }
+
+    public long getKey() {
+        long key;
+        if (idBlock == null) {
+            key = getNextBlock();
+        } else {
+            key = nextId.incrementAndGet();
+            if (key >= rangeEnd) {
+                key = getNextBlock();
+            }
+        }
+        return key;
+    }
+
+    private long getNextBlock() {
+        // XXX This method is not thread safe, may lose allocated IdBlock
+        idBlock = controllerRegistry.allocateUniqueIdBlock(range);
+        nextId = new AtomicLong(idBlock.getStart());
+        rangeEnd = idBlock.getEnd();
+        return nextId.get();
+    }
+
+    public boolean persistIfLeader(long key, IntentOperationList operations) {
+        boolean leader = true;
+        boolean ret = false;
+        // TODO call controllerRegistry.isClusterLeader()
+        if (leader) {
+            try {
+                // reserve key 10 entries for multi-write if size over 1MB
+                key *= 10;
+                kryo.writeObject(output, operations);
+                output.close();
+                ByteBuffer keyBytes = ByteBuffer.allocate(8).putLong(key);
+                byte[] buffer = stream.toByteArray();
+                int total = buffer.length;
+                if ((total >= valueStoreLimit )) {
+                    int writeCount = total / valueStoreLimit;
+                    int remainder = total % valueStoreLimit;
+                    int upperIndex = 0;
+                    for (int i = 0; i < writeCount; i++, key++) {
+                        keyBytes.clear();
+                        keyBytes.putLong(key);
+                        keyBytes.flip();
+                        upperIndex = (i * valueStoreLimit + valueStoreLimit) - 1;
+                        log.debug("writing using indexes {}:{}", (i*valueStoreLimit) ,upperIndex);
+                        table.create(keyBytes.array(), Arrays.copyOfRange(buffer, i * valueStoreLimit, upperIndex));
+                    }
+                    if (remainder > 0) {
+                        keyBytes.clear();
+                        keyBytes.putLong(key);
+                        keyBytes.flip();
+                        log.debug("writing using indexes {}:{}" ,upperIndex ,total);
+                        table.create(keyBytes.array(), Arrays.copyOfRange(buffer, upperIndex + 1, total - 1));
+                    }
+                } else {
+                    keyBytes.flip();
+                    table.create(keyBytes.array(), buffer);
+                }
+                log.debug("key is {} value length is {}", key, buffer.length);
+                stream.reset();
+                stream.close();
+                log.debug("persist operations to ramcloud size of operations: {}", operations.size());
+                ret = true;
+            } catch (ObjectExistsException ex) {
+                log.warn("Failed to store intent journal with key " + key);
+            } catch (IOException ex) {
+                log.error("Failed to close the stream");
+            }
+        }
+        return ret;
+    }
+}