Add persistence of intents to RAMCloud

Change-Id: I9a8b5886cf57da0621dc92e341d46c53d252528a
diff --git a/src/main/java/net/onrc/onos/datagrid/web/DatagridWebRoutable.java b/src/main/java/net/onrc/onos/datagrid/web/DatagridWebRoutable.java
index 90c7a18..99f79fc 100755
--- a/src/main/java/net/onrc/onos/datagrid/web/DatagridWebRoutable.java
+++ b/src/main/java/net/onrc/onos/datagrid/web/DatagridWebRoutable.java
@@ -17,7 +17,7 @@
     public Restlet getRestlet(Context context) {
         Router router = new Router(context);
         router.attach("/get/map/{map-name}/json", GetMapResource.class);
-        router.attach("/add/intent/json", IntentResource.class);
+        router.attach("/add/intents/json", IntentResource.class);
         router.attach("/get/intents/json", IntentResource.class);
         return router;
     }
diff --git a/src/main/java/net/onrc/onos/datagrid/web/IntentResource.java b/src/main/java/net/onrc/onos/datagrid/web/IntentResource.java
index ead5a1f..daf786f 100755
--- a/src/main/java/net/onrc/onos/datagrid/web/IntentResource.java
+++ b/src/main/java/net/onrc/onos/datagrid/web/IntentResource.java
@@ -5,17 +5,16 @@
 package net.onrc.onos.datagrid.web;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
 import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.intent.ConstrainedShortestPathIntent;
 import net.onrc.onos.intent.ShortestPathIntent;
 import net.onrc.onos.intent.IntentOperation;
 import net.onrc.onos.intent.IntentMap;
-//import net.onrc.onos.intent.Intent.IntentState;
-import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphService;
-import net.onrc.onos.ofcontroller.networkgraph.NetworkGraph;
-import net.onrc.onos.registry.controller.IControllerRegistryService;
-import net.onrc.onos.registry.controller.IdBlock;
+import net.onrc.onos.intent.Intent;
+import net.onrc.onos.intent.runtime.IPathCalcRuntimeService;
+import net.onrc.onos.intent.IntentOperationList;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -23,10 +22,7 @@
 import org.restlet.resource.ServerResource;
 import org.codehaus.jackson.map.ObjectMapper;
 import net.floodlightcontroller.util.MACAddress;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Output;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
 import org.codehaus.jackson.node.ArrayNode;
 import org.codehaus.jackson.node.ObjectNode;
@@ -40,9 +36,6 @@
  */
 public class IntentResource extends ServerResource {
     private final static Logger log = LoggerFactory.getLogger(IntentResource.class);
-    private final String sep = ":";
-    private IdBlock idBlock = null;
-    private long nextIdBlock = 0;
 
     private class IntentStatus {
         String intentId;
@@ -80,6 +73,13 @@
 	    log.debug("FlowIntentResource ONOS Datagrid Service not found");
 	    return "";
 	}
+        IPathCalcRuntimeService pathRuntime = (IPathCalcRuntimeService)getContext()
+                .getAttributes().get(IPathCalcRuntimeService.class.getCanonicalName());
+        if (pathRuntime == null) {
+            log.debug("Failed to get path calc runtime");
+            System.out.println("Failed to get path calc runtime");
+            return "";
+        }
         String reply = "";
 	ObjectMapper mapper = new ObjectMapper();
 	JsonNode jNode = null;
@@ -94,20 +94,35 @@
 	}
 
 	if (jNode != null) {
-	    Kryo kryo = new Kryo();
-	    reply = parseJsonNode(kryo, jNode.getElements(), datagridService);
+	    reply = parseJsonNode(jNode.getElements(), pathRuntime);
 	}
         return reply;
     }
 
     @Get("json")
-    public String retrieve() {
-        return "123";
+    public String retrieve() throws IOException {
+        IPathCalcRuntimeService pathRuntime = (IPathCalcRuntimeService)getContext().
+                getAttributes().get(IPathCalcRuntimeService.class.getCanonicalName());
+        ObjectMapper mapper = new ObjectMapper();
+        String restStr = "";
+        ArrayNode arrayNode = mapper.createArrayNode();
+        IntentMap intentMap = pathRuntime.getHighLevelIntents();
+        Collection<Intent> intents = intentMap.getAllIntents();
+        if (!intents.isEmpty()) {
+            for (Intent intent : intents) {
+                ObjectNode node = mapper.createObjectNode();
+                node.put("intent_id", intent.getId());
+                node.put("status", intent.getState().toString());
+                arrayNode.add(node);
+                restStr = mapper.writeValueAsString(arrayNode);
+            }
+        }
+        return restStr;
     }
     
-    private String parseJsonNode(Kryo kryo, Iterator<JsonNode> nodes,
-	    IDatagridService datagridService) throws IOException {
-        LinkedList<IntentOperation> operations = new LinkedList<>();
+    private String parseJsonNode(Iterator<JsonNode> nodes,
+	    IPathCalcRuntimeService pathRuntime) throws IOException {
+        IntentOperationList operations = new IntentOperationList();
         ObjectMapper mapper = new ObjectMapper();
         ArrayNode arrayNode = mapper.createArrayNode();
 	while (nodes.hasNext()) {
@@ -123,18 +138,14 @@
 		}
                 String status = processIntent(fields, operations);
                 appendIntentStatus(status, (String)fields.get("intent_id"), mapper, arrayNode);
-		// datagridService.registerIntent(Long.toString(uuid),
-		// sb.toString().getBytes());
 	    }
 	}
-        IntentMap intents = new IntentMap();
-//        intents.executeOperations(operations); // TODO use PathCalcRuntimeModule
+        pathRuntime.executeIntentOperations(operations);
         return mapper.writeValueAsString(arrayNode);
     }
 
     private void appendIntentStatus(String status, final String applnIntentId, 
             ObjectMapper mapper, ArrayNode arrayNode) throws IOException {
-        System.out.println("status " + status);
         String intentId = applnIntentId.split(":")[1];
         ObjectNode node = mapper.createObjectNode();
         node.put("intent_id", intentId);
@@ -142,7 +153,7 @@
         arrayNode.add(node);
     }
     
-    private String processIntent(Map<String, Object> fields, LinkedList<IntentOperation> operations) {
+    private String processIntent(Map<String, Object> fields, IntentOperationList operations) {
         String intentType = (String)fields.get("intent_type");
         String intentOp = (String)fields.get("intent_op");
         String status = null;
@@ -160,7 +171,6 @@
                     (long) fields.get("dstPort"),
                     MACAddress.valueOf((String) fields.get("dstMac")).toLong());
             operations.add(new IntentOperation(operation, spi));
-            System.out.println("intent operation " + operation.toString());
             status = (spi.getState()).toString();
         } else {
             ConstrainedShortestPathIntent cspi = new ConstrainedShortestPathIntent((String) fields.get("intent_id"),
@@ -188,49 +198,4 @@
             fields.put(fieldName, node.getLongValue());
         }
     }
-    
-    @Deprecated
-    private long setPathIntentId() {
-	long uuid = 0;
-	if (idBlock == null || nextIdBlock + 1 == idBlock.getSize()) {
-	    IControllerRegistryService controllerRegistry = getControllerRegistry();
-	    if (controllerRegistry != null) {
-		idBlock = controllerRegistry.allocateUniqueIdBlock();
-		nextIdBlock = idBlock.getStart();
-		System.out.println("start block " + nextIdBlock + " end block "
-			+ idBlock.getEnd() + " size " + idBlock.getSize());
-	    }
-	}
-	if (idBlock != null) {
-	    uuid = nextIdBlock;
-	    nextIdBlock++;
-	}
-	return uuid;
-    }
-
-    @Deprecated
-    private void setIntentType(final String intentType, StringBuilder sb) {
-	String canonicalName = null;
-	if (intentType.equals("shortest-path")) {
-	    canonicalName = ShortestPathIntent.class.getCanonicalName();
-	    sb.append(ShortestPathIntent.class.getCanonicalName());
-	} else if (intentType.equals("constrained-shortest-path")) {
-	    canonicalName = ShortestPathIntent.class.getCanonicalName();
-	}
-	sb.append(canonicalName);
-	sb.append(sep);
-    }
-
-    private IControllerRegistryService getControllerRegistry() {
-	return (IControllerRegistryService) getContext().getAttributes().get(
-		IControllerRegistryService.class.getCanonicalName());
-    }
-
-    @Deprecated
-    private byte[] toBytes(Kryo kryo, Object value) {
-	Output output = new Output(1024);
-        kryo.writeObject(output, value);
-        output.close();
-        return output.toBytes();
-    }
 }
diff --git a/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java b/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java
new file mode 100755
index 0000000..860b6dd
--- /dev/null
+++ b/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java
@@ -0,0 +1,91 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package net.onrc.onos.intent.persist;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import edu.stanford.ramcloud.JRamCloud;
+import java.io.ByteArrayOutputStream;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.onrc.onos.datagrid.web.IntentResource;
+import net.onrc.onos.datastore.RCTable;
+import net.onrc.onos.intent.IntentOperationList;
+import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphService;
+import net.onrc.onos.ofcontroller.networkgraph.NetworkGraph;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
+import net.onrc.onos.registry.controller.IdBlock;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Journals intent operation lists to the RAMCloud table "G:IntentJournal".
+ * Journal keys are drawn from id blocks allocated by the controller registry.
+ * NOTE(review): neither the key allocator nor the shared Kryo instance is
+ * synchronized here; confirm callers invoke this class from a single thread.
+ * @author nickkaranatsios
+ */
+public class PersistIntent {
+    private final static org.slf4j.Logger log = LoggerFactory.getLogger(PersistIntent.class);
+    private final static long range = 10000;
+    private final IControllerRegistryService controllerRegistry;
+    NetworkGraph graph = null;
+    private final String intentJournal = "G:IntentJournal";
+    private RCTable table;
+    private Kryo kryo = new Kryo();
+    private AtomicLong nextId = null;
+    private long rangeEnd;
+    private IdBlock idBlock = null;
+
+    /** Stores the registry and network graph and looks up the journal table. */
+    public PersistIntent(final IControllerRegistryService controllerRegistry, INetworkGraphService ng) {
+        this.controllerRegistry = controllerRegistry;
+        this.graph = ng.getNetworkGraph();
+        table = RCTable.getTable(intentJournal);
+    }
+
+    /** Returns the next journal key, allocating a new id block when needed. */
+    public long getKey() {
+        if (idBlock == null) {
+            return getNextBlock();
+        }
+        long key = nextId.incrementAndGet();
+        return (key >= rangeEnd) ? getNextBlock() : key;
+    }
+
+    /** Allocates a fresh id block and returns its first id. */
+    private long getNextBlock() {
+        idBlock = controllerRegistry.allocateUniqueIdBlock(range);
+        if (idBlock == null) {
+            throw new IllegalStateException("Failed to allocate a unique id block");
+        }
+        nextId = new AtomicLong(idBlock.getStart());
+        rangeEnd = idBlock.getEnd();
+        return nextId.get();
+    }
+
+    /**
+     * Serializes {@code operations} with Kryo and creates a journal entry under {@code key}.
+     * @return true if the entry was created, false if the key already exists
+     */
+    public boolean persistIfLeader(long key, IntentOperationList operations) {
+        // TODO call controllerRegistry.isClusterLeader(); until then every instance persists
+        // A fresh buffer per call: a shared stream would prepend all earlier entries.
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        Output output = new Output(stream);
+        kryo.writeObject(output, operations);
+        output.close();
+        try {
+            table.create(String.valueOf(key).getBytes(java.nio.charset.StandardCharsets.UTF_8), stream.toByteArray());
+            log.debug("Persisted intent operations to ramcloud with key {}", key);
+            return true;
+        } catch (JRamCloud.ObjectExistsException ex) {
+            log.warn("Failed to store intent journal with key " + key);
+            return false;
+        }
+    }
+}
diff --git a/src/main/java/net/onrc/onos/intent/runtime/PathCalcRuntimeModule.java b/src/main/java/net/onrc/onos/intent/runtime/PathCalcRuntimeModule.java
old mode 100644
new mode 100755
index 4c90a23..3a9c607
--- a/src/main/java/net/onrc/onos/intent/runtime/PathCalcRuntimeModule.java
+++ b/src/main/java/net/onrc/onos/intent/runtime/PathCalcRuntimeModule.java
@@ -23,6 +23,8 @@
 import net.onrc.onos.ofcontroller.networkgraph.LinkEvent;
 import net.onrc.onos.ofcontroller.networkgraph.PortEvent;
 import net.onrc.onos.ofcontroller.networkgraph.SwitchEvent;
+import net.onrc.onos.intent.persist.PersistIntent;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
 
 /**
  * @author Toshio Koide (t-koide@onlab.us)
@@ -33,8 +35,10 @@
 	private INetworkGraphService networkGraphService;
 	private IntentMap highLevelIntents;
 	private PathIntentMap pathIntents;
+        private IControllerRegistryService controllerRegistry;
+        private PersistIntent persistIntent;
 
-	private IEventChannel<String, IntentOperationList> eventChannel;
+	private IEventChannel<Long, IntentOperationList> eventChannel;
 	private static final String EVENT_CHANNEL_NAME = "onos.pathintent";
 
 	private void reroutePaths(LinkEvent linkEvent) {
@@ -75,6 +79,7 @@
 	public void init(FloodlightModuleContext context) throws FloodlightModuleException {
 		datagridService = context.getServiceImpl(IDatagridService.class);
 		networkGraphService = context.getServiceImpl(INetworkGraphService.class);
+                controllerRegistry = context.getServiceImpl(IControllerRegistryService.class);
 	}
 
 	@Override
@@ -84,19 +89,22 @@
 		pathIntents = new PathIntentMap();
 		eventChannel = datagridService.createChannel(
 				EVENT_CHANNEL_NAME,
-				String.class,
+				Long.class,
 				IntentOperationList.class);
 		networkGraphService.registerNetworkGraphListener(this);
+                persistIntent = new PersistIntent(controllerRegistry, networkGraphService);
+                
 	}
 
 	@Override
 	public IntentOperationList executeIntentOperations(IntentOperationList list) {
 		highLevelIntents.executeOperations(list);
 		IntentOperationList pathIntentOperations = runtime.calcPathIntents(list, pathIntents);
-		String key = "..."; // TODO generate key
+		long key = persistIntent.getKey();
 		System.out.println(pathIntentOperations);
 		pathIntents.executeOperations(pathIntentOperations);
 		eventChannel.addEntry(key, pathIntentOperations);
+                persistIntent.persistIfLeader(key, pathIntentOperations);
 		return pathIntentOperations;
 	}
 
diff --git a/src/main/java/net/onrc/onos/registry/controller/IControllerRegistryService.java b/src/main/java/net/onrc/onos/registry/controller/IControllerRegistryService.java
old mode 100644
new mode 100755
index 33ba272..576eed4
--- a/src/main/java/net/onrc/onos/registry/controller/IControllerRegistryService.java
+++ b/src/main/java/net/onrc/onos/registry/controller/IControllerRegistryService.java
@@ -116,6 +116,15 @@
 	 */
 	public Collection<Long> getSwitchesControlledByController(String controllerId);
 	
+        /**
+         * Allocates a block of unique ids using the implementation's default block size.
+         * @return the allocated id block
+         */
 	public IdBlock allocateUniqueIdBlock();
+        
+        /**
+         * Allocates a block of {@code range} unique ids; implementations may return null or throw on failure.
+         */
+        public IdBlock allocateUniqueIdBlock(long range);
 	
 }
diff --git a/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java b/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
old mode 100644
new mode 100755
index 319ea48..69e7b3e
--- a/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
@@ -172,4 +172,9 @@
 		restApi.addRestletRoutable(new RegistryWebRoutable());
 	}
 
+    /** Not implemented for the standalone registry: always throws UnsupportedOperationException. */
+    @Override
+    public IdBlock allocateUniqueIdBlock(long range) {
+        throw new UnsupportedOperationException("Not supported yet."); // TODO implement; callers such as PersistIntent.getKey() fail here
+    }
 }
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
old mode 100644
new mode 100755
index 3e4d5bf..8706e85
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -41,6 +41,8 @@
 import com.netflix.curator.x.discovery.ServiceDiscovery;
 import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
 import com.netflix.curator.x.discovery.ServiceInstance;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 
 /**
  * A registry service that uses Zookeeper. All data is stored in Zookeeper,
@@ -78,6 +80,7 @@
 	//Zookeeper performance-related configuration
 	protected static final int sessionTimeout = 5000;
 	protected static final int connectionTimeout = 7000;
+        private volatile IdBlock idBlock = null;
 	
 
 	protected class SwitchLeaderListener implements LeaderLatchListener{
@@ -385,27 +388,31 @@
 		return data;
 	}
 	
+        /** Reserves {@code range} consecutive ids from the distributed counter; returns null if allocation fails. */
+        @Override
+        public IdBlock allocateUniqueIdBlock(long range) {
+            try {
+                AtomicValue<Long> result = null;
+                do {
+                    result = distributedIdCounter.add(range);
+                } while (result == null || !result.succeeded());
+                return new IdBlock(result.preValue(), result.postValue() - 1, range);
+            } catch (Exception e) {
+                log.error("Error allocating ID block", e);
+                return null;
+            }
+        }
 	/**
 	 * Returns a block of IDs which are unique and unused.
 	 * Range of IDs is fixed size and is assigned incrementally as this method called.
 	 * Since the range of IDs is managed by Zookeeper in distributed way, this method may block when
 	 * requests come up simultaneously.
 	 */
+        @Override
 	public IdBlock allocateUniqueIdBlock(){
-		try {
-			AtomicValue<Long> result = null;
-			do {
-				result = distributedIdCounter.add(ID_BLOCK_SIZE);
-			} while (result == null || !result.succeeded());
-			
-			return new IdBlock(result.preValue(), result.postValue() - 1, ID_BLOCK_SIZE);
-		} catch (Exception e) {
-			log.error("Error allocating ID block");
-		} 
-		
-		return null;
+            return allocateUniqueIdBlock(ID_BLOCK_SIZE);
 	}
-	
+            
 	/*
 	 * IFloodlightModule
 	 */
diff --git a/src/test/java/net/onrc/onos/intent/runtime/UseCaseTest.java b/src/test/java/net/onrc/onos/intent/runtime/UseCaseTest.java
old mode 100644
new mode 100755
index a1f3e80..36162da
--- a/src/test/java/net/onrc/onos/intent/runtime/UseCaseTest.java
+++ b/src/test/java/net/onrc/onos/intent/runtime/UseCaseTest.java
@@ -16,54 +16,77 @@
 import net.onrc.onos.intent.PathIntent;
 import net.onrc.onos.intent.PathIntentMap;
 import net.onrc.onos.intent.ShortestPathIntent;
+import net.onrc.onos.intent.persist.PersistIntent;
 import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphListener;
 import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphService;
 import net.onrc.onos.ofcontroller.networkgraph.LinkEvent;
 import net.onrc.onos.ofcontroller.networkgraph.NetworkGraph;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
 
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
  * @author Toshio Koide (t-koide@onlab.us)
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PathCalcRuntimeModule.class)
 public class UseCaseTest {
 	private NetworkGraph g;
 	private FloodlightModuleContext modContext;
 	private IDatagridService datagridService;
 	private INetworkGraphService networkGraphService;
+        private IControllerRegistryService controllerRegistryService;
+        private PersistIntent persistIntent;
 	@SuppressWarnings("rawtypes")
 	private IEventChannel eventChannel;
 
 	@SuppressWarnings("unchecked")
 	@Before
-	public void setUp() {
+	public void setUp() throws Exception {
 		MockNetworkGraph graph = new MockNetworkGraph();
 		graph.createSampleTopology();
 		g = graph;
 
 		datagridService = EasyMock.createMock(IDatagridService.class);
 		networkGraphService = EasyMock.createMock(INetworkGraphService.class);
+                controllerRegistryService = EasyMock.createMock(IControllerRegistryService.class);
 		modContext = EasyMock.createMock(FloodlightModuleContext.class);
 		eventChannel = EasyMock.createMock(IEventChannel.class);
+                persistIntent = PowerMock.createMock(PersistIntent.class);
+                
+                PowerMock.expectNew(PersistIntent.class,
+                        EasyMock.anyObject(IControllerRegistryService.class),
+                        EasyMock.anyObject(INetworkGraphService.class)).andReturn(persistIntent);
 
 		EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IDatagridService.class)))
 		.andReturn(datagridService).once();
 		EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(INetworkGraphService.class)))
 		.andReturn(networkGraphService).once();
+                EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IControllerRegistryService.class)))
+                        .andReturn(controllerRegistryService).once();
+                EasyMock.expect(persistIntent.getKey()).andReturn(1L).anyTimes();
+                EasyMock.expect(persistIntent.persistIfLeader(EasyMock.eq(1L), 
+                        EasyMock.anyObject(IntentOperationList.class))).andReturn(true).anyTimes();
 
 		EasyMock.expect(networkGraphService.getNetworkGraph()).andReturn(g).anyTimes();
 		networkGraphService.registerNetworkGraphListener(EasyMock.anyObject(INetworkGraphListener.class));
 		EasyMock.expectLastCall();
 
-		EasyMock.expect(datagridService.createChannel("onos.pathintent", String.class, IntentOperationList.class))
+		EasyMock.expect(datagridService.createChannel("onos.pathintent", Long.class, IntentOperationList.class))
 		.andReturn(eventChannel).once();
 
 		EasyMock.replay(datagridService);
 		EasyMock.replay(networkGraphService);
 		EasyMock.replay(modContext);
+                EasyMock.replay(controllerRegistryService);
+                PowerMock.replay(persistIntent, PersistIntent.class);
 	}
 
 	@After
@@ -71,6 +94,8 @@
 		EasyMock.verify(datagridService);
 		EasyMock.verify(networkGraphService);
 		EasyMock.verify(modContext);
+                EasyMock.verify(controllerRegistryService);
+                PowerMock.verify(persistIntent, PersistIntent.class);
 	}
 
 	private void showResult(PathIntentMap intents) {