/* Copyright (c) 2013 Stanford University
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

package com.tinkerpop.blueprints.impls.ramcloud;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.jersey.core.util.Base64;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Features;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Index;
import com.tinkerpop.blueprints.IndexableGraph;
import com.tinkerpop.blueprints.KeyIndexableGraph;
import com.tinkerpop.blueprints.Parameter;
import com.tinkerpop.blueprints.TransactionalGraph;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.util.DefaultGraphQuery;
import com.tinkerpop.blueprints.util.ExceptionFactory;
import com.tinkerpop.blueprints.impls.ramcloud.PerfMon;

import edu.stanford.ramcloud.JRamCloud;
import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;

public class RamCloudGraph implements IndexableGraph, KeyIndexableGraph, TransactionalGraph, Serializable {
    private final static Logger log = LoggerFactory.getLogger(RamCloudGraph.class);

    private static final ThreadLocal<JRamCloud> RamCloudThreadLocal = new ThreadLocal<JRamCloud>();

    protected long vertTableId; //(vertex_id) --> ( (n,d,ll,l), (n,d,ll,l), ... )
    protected long vertPropTableId; //(vertex_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... )
    protected long edgePropTableId; //(edge_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... )
    protected long idxVertTableId;
    protected long idxEdgeTableId;
    protected long kidxVertTableId;
    protected long kidxEdgeTableId;
    protected long instanceTableId;
    private String VERT_TABLE_NAME = "verts";
    private String EDGE_PROP_TABLE_NAME = "edge_props";
    private String VERT_PROP_TABLE_NAME = "vert_props";
    private String IDX_VERT_TABLE_NAME = "idx_vert";
    private String IDX_EDGE_TABLE_NAME = "idx_edge";
    private String KIDX_VERT_TABLE_NAME = "kidx_vert";
    private String KIDX_EDGE_TABLE_NAME = "kidx_edge";
    private final String INSTANCE_TABLE_NAME = "instance";
    private long instanceId;
    private AtomicLong nextVertexId;
    private final long INSTANCE_ID_RANGE = 100000;
    private String coordinatorLocation;
    private static final Features FEATURES = new Features();
    // FIXME better loop variable name
    public final int CONDITIONALWRITE_RETRY_MAX = 100;
    public final long measureBPTimeProp = Long.valueOf(System.getProperty("benchmark.measureBP", "0"));
    public final long measureRcTimeProp = Long.valueOf(System.getProperty("benchmark.measureRc", "0"));
    public static final long measureSerializeTimeProp = Long.valueOf(System.getProperty("benchmark.measureSerializeTimeProp", "0"));


    public final Set<String> indexedKeys = new HashSet<String>();

    static {
	FEATURES.supportsSerializableObjectProperty = true;
	FEATURES.supportsBooleanProperty = true;
	FEATURES.supportsDoubleProperty = true;
	FEATURES.supportsFloatProperty = true;
	FEATURES.supportsIntegerProperty = true;
	FEATURES.supportsPrimitiveArrayProperty = true;
	FEATURES.supportsUniformListProperty = true;
	FEATURES.supportsMixedListProperty = true;
	FEATURES.supportsLongProperty = true;
	FEATURES.supportsMapProperty = true;
	FEATURES.supportsStringProperty = true;

	FEATURES.supportsDuplicateEdges = false;
	FEATURES.supportsSelfLoops = false;
	FEATURES.isPersistent = false;
	FEATURES.isWrapper = false;
	FEATURES.supportsVertexIteration = true;
	FEATURES.supportsEdgeIteration = true;
	FEATURES.supportsVertexIndex = true;
	FEATURES.supportsEdgeIndex = false;
	FEATURES.ignoresSuppliedIds = true;
	FEATURES.supportsTransactions = false;
	FEATURES.supportsIndices = false;
	FEATURES.supportsKeyIndices = true;
	FEATURES.supportsVertexKeyIndex = true;
	FEATURES.supportsEdgeKeyIndex = false;
	FEATURES.supportsEdgeRetrieval = true;
	FEATURES.supportsVertexProperties = true;
	FEATURES.supportsEdgeProperties = true;
	FEATURES.supportsThreadedTransactions = false;
    }

    static {
	System.loadLibrary("edu_stanford_ramcloud_JRamCloud");
    }

    private RamCloudGraph() {
    }


    public RamCloudGraph(String coordinatorLocation) {
	this.coordinatorLocation = coordinatorLocation;

	JRamCloud rcClient = getRcClient();

	vertTableId = rcClient.createTable(VERT_TABLE_NAME);
	vertPropTableId = rcClient.createTable(VERT_PROP_TABLE_NAME);
	edgePropTableId = rcClient.createTable(EDGE_PROP_TABLE_NAME);
	idxVertTableId = rcClient.createTable(IDX_VERT_TABLE_NAME);
	idxEdgeTableId = rcClient.createTable(IDX_EDGE_TABLE_NAME);
	kidxVertTableId = rcClient.createTable(KIDX_VERT_TABLE_NAME);
	kidxEdgeTableId = rcClient.createTable(KIDX_EDGE_TABLE_NAME);
	instanceTableId = rcClient.createTable(INSTANCE_TABLE_NAME);

	log.info( "Connected to coordinator at {}", coordinatorLocation);
	log.debug("VERT_TABLE:{}, VERT_PROP_TABLE:{}, EDGE_PROP_TABLE:{}, IDX_VERT_TABLE:{}, IDX_EDGE_TABLE:{}, KIDX_VERT_TABLE:{}, KIDX_EDGE_TABLE:{}", vertTableId, vertPropTableId, edgePropTableId, idxVertTableId, idxEdgeTableId, kidxVertTableId, kidxEdgeTableId);
	nextVertexId = new AtomicLong(-1);
        initInstance();
    }

    public JRamCloud getRcClient() {
	JRamCloud rcClient = RamCloudThreadLocal.get();
	if (rcClient == null) {
	    rcClient = new JRamCloud(coordinatorLocation);
	    RamCloudThreadLocal.set(rcClient);
	}
	return rcClient;
    }

    @Override
    public Features getFeatures() {
	return FEATURES;
    }

    private Long parseVertexId(Object id) {
	Long longId;
	if (id == null) {
	    longId = nextVertexId.incrementAndGet();
	} else if (id instanceof Integer) {
	    longId = ((Integer) id).longValue();
	} else if (id instanceof Long) {
	    longId = (Long) id;
	} else if (id instanceof String) {
	    try {
		longId = Long.parseLong((String) id, 10);
	    } catch (NumberFormatException e) {
		log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
		return null;
	    }
	} else if (id instanceof byte[]) {
	    try {
		longId = ByteBuffer.wrap((byte[]) id).getLong();
	    } catch (BufferUnderflowException e) {
		log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
		return null;
	    }
	} else {
	    log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
	    return null;
	}
	return longId;
    }

    @Override
    public Vertex addVertex(Object id) {
	long startTime = 0;
	long Tstamp1 = 0;
	long Tstamp2 = 0;

	if (measureBPTimeProp == 1) {
	    startTime = System.nanoTime();
	}
	Long longId = parseVertexId(id);
	if (longId == null)
	    return null;
	if (measureBPTimeProp == 1) {
	    Tstamp1 = System.nanoTime();
	}
	RamCloudVertex newVertex = new RamCloudVertex(longId, this);
	if (measureBPTimeProp == 1) {
	    Tstamp2 = System.nanoTime();
	    log.error("Performance addVertex [id={}] : Calling create at {}", longId, Tstamp2);
	}

	try {
	    newVertex.create();
	    if (measureBPTimeProp == 1) {
		long endTime = System.nanoTime();
		log.error("Performance addVertex [id={}] : genid {} newVerex {} create {} total time {}", longId, Tstamp1 - startTime, Tstamp2 - Tstamp1, endTime - Tstamp2, endTime - startTime);
	    }
	    log.info("Added vertex: [id={}]", longId);
	    return newVertex;
	} catch (IllegalArgumentException e) {
	    log.error("Tried to create vertex failed {" + newVertex + "}", e);
	    return null;
	}
    }

    public List<RamCloudVertex> addVertices(Iterable<Object> ids) {
	log.info("addVertices start");
	List<RamCloudVertex> vertices = new LinkedList<RamCloudVertex>();

	for (Object id: ids) {
	    Long longId = parseVertexId(id);
	    if (longId == null)
		return null;
	    RamCloudVertex v = new RamCloudVertex(longId, this);
	    if (v.exists()) {
		log.error("ramcloud vertex id: {} already exists", v.getId());
		throw ExceptionFactory.vertexWithIdAlreadyExists(v.getId());
	    }
	    vertices.add(v);
	}
	MultiWriteObject multiWriteObjects[] = new MultiWriteObject[vertices.size() * 2];
	for (int i=0; i < vertices.size(); i++) {
	    RamCloudVertex v = vertices.get(i);
	    multiWriteObjects[i*2] = new MultiWriteObject(vertTableId, v.rcKey, ByteBuffer.allocate(0).array(), null);
	    multiWriteObjects[i*2+1] = new MultiWriteObject(vertPropTableId, v.rcKey, ByteBuffer.allocate(0).array(), null);
	}
	try {
		PerfMon pm = PerfMon.getInstance();
		pm.multiwrite_start("RamCloudVertex create()");
	    getRcClient().multiWrite(multiWriteObjects);
		pm.multiwrite_end("RamCloudVertex create()");
	    log.info("ramcloud vertices are created");
	} catch (Exception e) {
	    log.error("Tried to create vertices failed {}", e);
	    return null;
	}
	log.info("addVertices end (success)");
	return vertices;
    }

    private final void initInstance() {
        //long incrementValue = 1;
        JRamCloud.Object instanceEntry = null;
        JRamCloud rcClient = getRcClient();
        try {
            instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes());
        } catch (Exception e) {
            if (e instanceof JRamCloud.ObjectDoesntExistException) {
                instanceId = 0;
                rcClient.write(instanceTableId, "nextInstanceId".getBytes(), ByteBuffer.allocate(0).array());
            }
        }
        if (instanceEntry != null) {
	    long curInstanceId = 1;
	    for (int i = 0 ; i < CONDITIONALWRITE_RETRY_MAX ; i++) {
		Map<String, Long> propMap = null;
		if (instanceEntry.value == null) {
		    log.warn("Got a null byteArray argument");
		    return;
		} else if (instanceEntry.value.length != 0) {
		    try {
			ByteArrayInputStream bais = new ByteArrayInputStream(instanceEntry.value);
			ObjectInputStream ois = new ObjectInputStream(bais);
			propMap = (Map<String, Long>) ois.readObject();
		    } catch (IOException e) {
			log.error("Got an exception while deserializing element's property map: ", e);
			return;
		    } catch (ClassNotFoundException e) {
			log.error("Got an exception while deserializing element's property map: ", e);
			return;
		    }
		} else {
		    propMap = new HashMap<String, Long>();
		}

		if (propMap.containsKey(INSTANCE_TABLE_NAME)) {
		    curInstanceId = propMap.get(INSTANCE_TABLE_NAME) + 1;
		}

		propMap.put(INSTANCE_TABLE_NAME, curInstanceId);

		byte[] rcValue = null;
		try {
		    ByteArrayOutputStream baos = new ByteArrayOutputStream();
		    ObjectOutputStream oot = new ObjectOutputStream(baos);
		    oot.writeObject(propMap);
		    rcValue = baos.toByteArray();
		} catch (IOException e) {
		    log.error("Got an exception while serializing element's property map", e);
		    return;
		}
		JRamCloud.RejectRules rules = rcClient.new RejectRules();
		rules.setNeVersion(instanceEntry.version);
		try {
		    rcClient.writeRule(instanceTableId, "nextInstanceId".getBytes(), rcValue, rules);
		    instanceId = curInstanceId;
		    break;
		} catch (Exception ex) {
		    log.debug("Cond. Write increment Vertex property: ", ex);
		    instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes());
		    continue;
		}
	    }
	}

	nextVertexId.compareAndSet(-1, instanceId * INSTANCE_ID_RANGE);
    }

    @Override
    public Vertex getVertex(Object id) throws IllegalArgumentException {
	Long longId;

	if (id == null) {
	    throw ExceptionFactory.vertexIdCanNotBeNull();
	} else if (id instanceof Integer) {
	    longId = ((Integer) id).longValue();
	} else if (id instanceof Long) {
	    longId = (Long) id;
	} else if (id instanceof String) {
	    try {
		longId = Long.parseLong((String) id, 10);
	    } catch (NumberFormatException e) {
		log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
		return null;
	    }
	} else if (id instanceof byte[]) {
	    try {
		longId = ByteBuffer.wrap((byte[]) id).getLong();
	    } catch (BufferUnderflowException e) {
		log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
		return null;
	    }
	} else {
	    log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
	    return null;
	}

	RamCloudVertex vertex = new RamCloudVertex(longId, this);

	if (vertex.exists()) {
	    return vertex;
	} else {
	    return null;
	}
    }

    @Override
    public void removeVertex(Vertex vertex) {
	((RamCloudVertex) vertex).remove();
    }

    @Override
    public Iterable<Vertex> getVertices() {
	JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId);
	List<Vertex> vertices = new LinkedList<Vertex>();

	while (tableEnum.hasNext()) {
		vertices.add(new RamCloudVertex(tableEnum.next().key, this));
	}

	return vertices;
    }

    @Override
    public Iterable<Vertex> getVertices(String key, Object value) {
	long startTime = 0;
	long Tstamp1 = 0;
	long Tstamp2 = 0;
	long Tstamp3 = 0;
	if (measureBPTimeProp == 1) {
	    startTime = System.nanoTime();
	    log.error("Performance getVertices(key {}) start at {}", key, startTime);
	}

	List<Vertex> vertices = new ArrayList<Vertex>();
	List<Object> vertexList = null;

	JRamCloud vertTable = getRcClient();
	if (measureBPTimeProp == 1) {
	    Tstamp1 = System.nanoTime();
	    log.error("Performance getVertices(key {}) Calling indexedKeys.contains(key) at {}", key, Tstamp1);
	}


	if (indexedKeys.contains(key)) {
	    PerfMon pm = PerfMon.getInstance();
	    if (measureBPTimeProp == 1) {
	      Tstamp2 = System.nanoTime();
	      log.error("Performance getVertices(key {}) Calling new RamCloudKeyIndex at {}", key, Tstamp2);
	    }
	    RamCloudKeyIndex KeyIndex = new RamCloudKeyIndex(kidxVertTableId, key, value, this, Vertex.class);
	    if (measureBPTimeProp == 1) {
	      Tstamp3 = System.nanoTime();
	      log.error("Performance getVertices(key {}) Calling KeyIndex.GetElmIdListForPropValue at {}", key, Tstamp3);
	    }
	    vertexList = KeyIndex.getElmIdListForPropValue(value.toString());
	    if (vertexList == null) {
		if (measureBPTimeProp == 1) {
		    long endTime = System.nanoTime();
		    log.error("Performance getVertices(key {}) does not exists : getRcClient {} indexedKeys.contains(key) {} new_RamCloudKeyIndex {} KeyIndex.get..Value {} total {} diff {}", key, Tstamp1-startTime, Tstamp2-Tstamp1,Tstamp3-Tstamp2, endTime-Tstamp3, endTime - startTime, (endTime-startTime)- (Tstamp1-startTime)- (Tstamp2-Tstamp1)- (Tstamp3-Tstamp2)-(endTime-Tstamp3));
		}
		return vertices;
	    }

	    final int mreadMax = 400;
	    final int size = Math.min(mreadMax, vertexList.size());
	    JRamCloud.multiReadObject vertPropTableMread[] = new JRamCloud.multiReadObject[size];

	    int vertexNum = 0;
	    for (Object vert : vertexList) {
		byte[] rckey =
			ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong((Long) vert).array();
		vertPropTableMread[vertexNum] = new JRamCloud.multiReadObject(vertPropTableId, rckey);
		if (vertexNum >= (mreadMax - 1)) {
		    pm.multiread_start("RamCloudGraph getVertices()");
		    JRamCloud.Object outvertPropTable[] =
			    vertTable.multiRead(vertPropTableMread);
		    pm.multiread_end("RamCloudGraph getVertices()");
		    for (int i = 0; i < outvertPropTable.length; i++) {
			if (outvertPropTable[i] != null) {
			    vertices.add(new RamCloudVertex(outvertPropTable[i].key, this));
			}
		    }
		    vertexNum = 0;
		    continue;
		}
		vertexNum++;
	    }

	    if (vertexNum != 0) {
		JRamCloud.multiReadObject mread_leftover[] = Arrays.copyOf(vertPropTableMread, vertexNum);

		long startTime2 = 0;
		if (measureRcTimeProp == 1) {
		    startTime2 = System.nanoTime();
		}
		pm.multiread_start("RamCloudGraph getVertices()");
		JRamCloud.Object outvertPropTable[] = vertTable.multiRead(mread_leftover);
		pm.multiread_end("RamCloudGraph getVertices()");
		if (measureRcTimeProp == 1) {
		    long endTime2 = System.nanoTime();
		    log.error("Performance index multiread(key {}, number {}) time {}", key, vertexNum, endTime2 - startTime2);
		}
		for (int i = 0; i < outvertPropTable.length; i++) {
		    if (outvertPropTable[i] != null) {
			vertices.add(new RamCloudVertex(outvertPropTable[i].key, this));
		    }
		}
	    }
	} else {

	    JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId);
	    JRamCloud.Object tableEntry;

	    while (tableEnum.hasNext()) {
		tableEntry = tableEnum.next();
		if (tableEntry != null) {
		    //XXX remove temp
		    // RamCloudVertex temp = new RamCloudVertex(tableEntry.key, this);
		    Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value);
		    if (propMap.containsKey(key) && propMap.get(key).equals(value)) {
			vertices.add(new RamCloudVertex(tableEntry.key, this));
		    }
		}
	    }
	}

	if (measureBPTimeProp == 1) {
		long endTime = System.nanoTime();
		log.error("Performance getVertices exists total time {}.", endTime - startTime);
	}

	return vertices;
    }

    @Override
    public Edge addEdge(Object id, Vertex outVertex, Vertex inVertex, String label) throws IllegalArgumentException {
	log.info("Adding edge: [id={}, outVertex={}, inVertex={}, label={}]", id, outVertex, inVertex, label);

	if (label == null) {
	    throw ExceptionFactory.edgeLabelCanNotBeNull();
	}

	RamCloudEdge newEdge = new RamCloudEdge((RamCloudVertex) outVertex, (RamCloudVertex) inVertex, label, this);

	for (int i = 0; i < 5 ;i++) {
	    try {
		newEdge.create();
		return newEdge;
	    } catch (Exception e) {
		log.warn("Tried to create edge failed: {" + newEdge + "}: ", e);

		if (e instanceof NoSuchElementException) {
		    log.error("addEdge RETRYING {}", i);
		    continue;
		}
	    }
	}
	return null;
    }

    public List<Edge> addEdges(Iterable<Edge> edgeEntities) throws IllegalArgumentException {
	//TODO WIP: need multi-write
	log.info("addEdges start");
	ArrayList<Edge> edges = new ArrayList<Edge>();
	for (Edge edge: edgeEntities) {
	    edges.add(addEdge(null, edge.getVertex(Direction.OUT), edge.getVertex(Direction.IN), edge.getLabel()));
	}
	log.info("addVertices end");
	return edges;
    }

    public void setProperties(Map<RamCloudVertex, Map<String, Object>> properties) {
	// TODO WIP: need multi-write
	log.info("setProperties start");
	for (Map.Entry<RamCloudVertex, Map<String, Object>> e: properties.entrySet()) {
	    e.getKey().setProperties(e.getValue());
	}
	log.info("setProperties end");
    }

    @Override
    public Edge getEdge(Object id) throws IllegalArgumentException {
	byte[] bytearrayId;

	if (id == null) {
	    throw ExceptionFactory.edgeIdCanNotBeNull();
	} else if (id instanceof byte[]) {
	    bytearrayId = (byte[]) id;
	} else if (id instanceof String) {
	    bytearrayId = Base64.decode(((String) id));
	} else {
	    log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
	    return null;
	}

	if (!RamCloudEdge.isValidEdgeId(bytearrayId)) {
	    log.warn("ID argument {} of type {} is malformed. Returning null.", id, id.getClass());
	    return null;
	}

	RamCloudEdge edge = new RamCloudEdge(bytearrayId, this);

	if (edge.exists()) {
	    return edge;
	} else {
	    return null;
	}
    }

    @Override
    public void removeEdge(Edge edge) {
	edge.remove();
    }

    @Override
    public Iterable<Edge> getEdges() {
	JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId);
	List<Edge> edges = new ArrayList<Edge>();

	while (tableEnum.hasNext()) {
	    edges.add(new RamCloudEdge(tableEnum.next().key, this));
	}

	return edges;
    }

    @Override
    public Iterable<Edge> getEdges(String key, Object value) {
	JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId);
	List<Edge> edges = new ArrayList<Edge>();
	JRamCloud.Object tableEntry;

	while (tableEnum.hasNext()) {
	    tableEntry = tableEnum.next();
		// FIXME temp
		//RamCloudEdge temp = new RamCloudEdge(tableEntry.key, this);
	    Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value);
	    if (propMap.containsKey(key) && propMap.get(key).equals(value)) {
		edges.add(new RamCloudEdge(tableEntry.key, this));
	    }
	}

	return edges;
    }

    @Override
    public GraphQuery query() {
	return new DefaultGraphQuery(this);
    }

    @Override
    public void shutdown() {
	JRamCloud rcClient = getRcClient();
	rcClient.dropTable(VERT_TABLE_NAME);
	rcClient.dropTable(VERT_PROP_TABLE_NAME);
	rcClient.dropTable(EDGE_PROP_TABLE_NAME);
	rcClient.dropTable(IDX_VERT_TABLE_NAME);
	rcClient.dropTable(IDX_EDGE_TABLE_NAME);
	rcClient.dropTable(KIDX_VERT_TABLE_NAME);
	rcClient.dropTable(KIDX_EDGE_TABLE_NAME);
	rcClient.disconnect();
    }

    @Override
    public void stopTransaction(Conclusion conclusion) {
	// TODO Auto-generated method stub
    }

    @Override
    public void commit() {
	// TODO Auto-generated method stub
    }

    @Override
    public void rollback() {
	// TODO Auto-generated method stub
    }

    @Override
    public <T extends Element> void dropKeyIndex(String key, Class<T> elementClass) {
	throw new UnsupportedOperationException("Not supported yet.");
	//FIXME how to dropKeyIndex
	//new RamCloudKeyIndex(kidxVertTableId, key, this, elementClass);
	//getIndexedKeys(key, elementClass).removeIndex();
    }

    @Override
    public <T extends Element> void createKeyIndex(String key,
	    Class<T> elementClass, Parameter... indexParameters) {
	if (key == null) {
	    return;
	}
	if (this.indexedKeys.contains(key)) {
	    return;
	}
	this.indexedKeys.add(key);
    }

    @Override
    public <T extends Element> Set< String> getIndexedKeys(Class< T> elementClass) {
	if (null != this.indexedKeys) {
	    return new HashSet<String>(this.indexedKeys);
	} else {
	    return Collections.emptySet();
	}
    }

    @Override
    public <T extends Element> Index<T> createIndex(String indexName,
	    Class<T> indexClass, Parameter... indexParameters) {
	throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public <T extends Element> Index<T> getIndex(String indexName, Class<T> indexClass) {
	throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Iterable<Index<? extends Element>> getIndices() {
	throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public void dropIndex(String indexName) {
	throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public String toString() {
	return getClass().getSimpleName().toLowerCase() + "[vertices:" + ((List<Vertex>)getVertices()).size() + " edges:" + ((List<Edge>)getEdges()).size() + "]";
    }

    public static void main(String[] args) {
	RamCloudGraph graph = new RamCloudGraph();

	Vertex a = graph.addVertex(null);
	Vertex b = graph.addVertex(null);
	Vertex c = graph.addVertex(null);
	Vertex d = graph.addVertex(null);
	Vertex e = graph.addVertex(null);
	Vertex f = graph.addVertex(null);
	Vertex g = graph.addVertex(null);

	graph.addEdge(null, a, a, "friend");
	graph.addEdge(null, a, b, "friend1");
	graph.addEdge(null, a, b, "friend2");
	graph.addEdge(null, a, b, "friend3");
	graph.addEdge(null, a, c, "friend");
	graph.addEdge(null, a, d, "friend");
	graph.addEdge(null, a, e, "friend");
	graph.addEdge(null, a, f, "friend");
	graph.addEdge(null, a, g, "friend");

	graph.shutdown();
    }
}
