IntentStore: add batch write API

Change-Id: I9d397e9dc3dc6e9ccd21ac6ddacaece79214c470
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
index 3af59c1..8dc70d8 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
@@ -15,8 +15,17 @@
  */
 package org.onlab.onos.net.intent;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
 import org.onlab.onos.store.Store;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -110,4 +119,165 @@
      */
     void removeInstalledIntents(IntentId intentId);
 
+
+    /**
+     * Returns a new empty batch write operation buider.
+     *
+     * @return BatchWrite
+     */
+    default BatchWrite newBatchWrite() {
+        return new BatchWrite();
+    }
+
+    // default implementation simply executes them sequentially.
+    // Store implementation should override and implement actual batch write.
+    /**
+     * Execute writes in a batch.
+     *
+     * @param batch BatchWrite to execute
+     * @return failed operations
+     */
+    default List<Operation> batchWrite(BatchWrite batch) {
+        List<Operation> failed = new ArrayList<>();
+        for (Operation op : batch.operations) {
+            switch (op.type) {
+            case CREATE_INTENT:
+                checkArgument(op.args.size() == 1,
+                              "CREATE_INTENT takes 1 argument. %s", op);
+                Intent intent = (Intent) op.args.get(0);
+                if (createIntent(intent) == null) {
+                    failed.add(op);
+                }
+                break;
+
+            case REMOVE_INTENT:
+                checkArgument(op.args.size() == 1,
+                              "REMOVE_INTENT takes 1 argument. %s", op);
+                IntentId intentId = (IntentId) op.args.get(0);
+                removeIntent(intentId);
+                break;
+
+            case REMOVE_INSTALLED:
+                checkArgument(op.args.size() == 1,
+                              "REMOVE_INSTALLED takes 1 argument. %s", op);
+                intentId = (IntentId) op.args.get(0);
+                removeInstalledIntents(intentId);
+                break;
+
+            case SET_INSTALLABLE:
+                checkArgument(op.args.size() == 2,
+                              "SET_INSTALLABLE takes 2 arguments. %s", op);
+                intentId = (IntentId) op.args.get(0);
+                @SuppressWarnings("unchecked")
+                List<Intent> installableIntents = (List<Intent>) op.args.get(1);
+                setInstallableIntents(intentId, installableIntents);
+                break;
+
+            case SET_STATE:
+                checkArgument(op.args.size() == 2,
+                              "SET_STATE takes 2 arguments. %s", op);
+                intent = (Intent) op.args.get(0);
+                IntentState newState = (IntentState) op.args.get(1);
+                setState(intent, newState);
+                break;
+
+            default:
+                break;
+            }
+        }
+        return failed;
+    }
+
+    public static class BatchWrite {
+
+        public enum OpType {
+            CREATE_INTENT,
+            REMOVE_INTENT,
+            SET_STATE,
+            SET_INSTALLABLE,
+            REMOVE_INSTALLED
+        }
+
+        List<Operation> operations = new ArrayList<>();
+
+        public List<Operation> operations() {
+            return Collections.unmodifiableList(operations);
+        }
+
+        public boolean isEmpty() {
+            return operations.isEmpty();
+        }
+
+        public BatchWrite createIntent(Intent intent) {
+            operations.add(Operation.of(OpType.CREATE_INTENT,
+                                        ImmutableList.of(intent)));
+            return this;
+        }
+
+        public BatchWrite removeIntent(IntentId intentId) {
+            operations.add(Operation.of(OpType.REMOVE_INTENT,
+                                        ImmutableList.of(intentId)));
+            return this;
+        }
+
+        public BatchWrite setState(Intent intent, IntentState newState) {
+            operations.add(Operation.of(OpType.SET_STATE,
+                                        ImmutableList.of(intent, newState)));
+            return this;
+        }
+
+        public BatchWrite setInstallableIntents(IntentId intentId, List<Intent> installableIntents) {
+            operations.add(Operation.of(OpType.SET_INSTALLABLE,
+                                        ImmutableList.of(intentId, installableIntents)));
+            return this;
+        }
+
+        public BatchWrite removeInstalledIntents(IntentId intentId) {
+            operations.add(Operation.of(OpType.REMOVE_INSTALLED,
+                                        ImmutableList.of(intentId)));
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("operations", operations)
+                    .toString();
+        }
+
+        public static class Operation {
+            final OpType type;
+            final ImmutableList<Object> args;
+
+            public static Operation of(OpType type, List<Object> args) {
+                return new Operation(type, args);
+            }
+
+            public Operation(OpType type, List<Object> args) {
+                this.type = checkNotNull(type);
+                this.args = ImmutableList.copyOf(args);
+            }
+
+            public OpType type() {
+                return type;
+            }
+
+            public ImmutableList<Object> args() {
+                return args;
+            }
+
+            @SuppressWarnings("unchecked")
+            public <T> T arg(int i) {
+                return (T) args.get(i);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(getClass())
+                        .add("type", type)
+                        .add("args", args)
+                        .toString();
+            }
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
index 8da72cf..3a39193 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
@@ -18,6 +18,9 @@
 import com.codahale.metrics.Timer;
 import com.codahale.metrics.Timer.Context;
 import com.google.common.base.Verify;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -34,22 +37,28 @@
 import org.onlab.onos.net.intent.IntentState;
 import org.onlab.onos.net.intent.IntentStore;
 import org.onlab.onos.net.intent.IntentStoreDelegate;
+import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
 import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.serializers.KryoNamespaces;
 import org.onlab.onos.store.serializers.KryoSerializer;
 import org.onlab.onos.store.serializers.StoreSerializer;
+import org.onlab.onos.store.service.BatchWriteRequest;
+import org.onlab.onos.store.service.BatchWriteRequest.Builder;
+import org.onlab.onos.store.service.BatchWriteResult;
 import org.onlab.onos.store.service.DatabaseAdminService;
 import org.onlab.onos.store.service.DatabaseService;
 import org.onlab.onos.store.service.impl.CMap;
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.onlab.onos.net.intent.IntentState.*;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -70,15 +79,21 @@
     private final Logger log = getLogger(getClass());
 
     // Assumption: IntentId will not have synonyms
+    private static final String INTENTS_TABLE = "intents";
     private CMap<IntentId, Intent> intents;
+
+    private static final String STATES_TABLE = "intent-states";
     private CMap<IntentId, IntentState> states;
 
     // TODO left behind transient state issue: ONOS-103
     // Map to store instance local intermediate state transition
     private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
 
+    private static final String INSTALLABLE_TABLE = "installable-intents";
     private CMap<IntentId, List<Intent>> installable;
 
+    private LoadingCache<IntentId, String> keyCache;
+
     private StoreSerializer serializer;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -137,13 +152,23 @@
             }
         };
 
-        intents = new CMap<>(dbAdminService, dbService, "intents", serializer);
+        keyCache = CacheBuilder.newBuilder()
+                .softValues()
+                .build(new CacheLoader<IntentId, String>() {
 
-        states = new CMap<>(dbAdminService, dbService, "intent-states", serializer);
+                    @Override
+                    public String load(IntentId key) {
+                        return key.toString();
+                    }
+                });
+
+        intents = new IntentIdMap<>(dbAdminService, dbService, INTENTS_TABLE, serializer);
+
+        states = new IntentIdMap<>(dbAdminService, dbService, STATES_TABLE, serializer);
 
         transientStates.clear();
 
-        installable = new CMap<>(dbAdminService, dbService, "installable-intents", serializer);
+        installable = new IntentIdMap<>(dbAdminService, dbService, INSTALLABLE_TABLE, serializer);
 
         log.info("Started");
     }
@@ -351,4 +376,101 @@
             stopTimer(timer);
         }
     }
+
+    protected String strIntentId(IntentId key) {
+        return keyCache.getUnchecked(key);
+    }
+
+    /**
+     * Distributed Map from IntentId to some value.
+     *
+     * @param <V> Map value type
+     */
+    final class IntentIdMap<V> extends CMap<IntentId, V> {
+
+        /**
+         * Creates a IntentIdMap instance.
+         *
+         * @param dbAdminService DatabaseAdminService to use for this instance
+         * @param dbService DatabaseService to use for this instance
+         * @param tableName table which this Map corresponds to
+         * @param serializer Value serializer
+         */
+        public IntentIdMap(DatabaseAdminService dbAdminService,
+                         DatabaseService dbService,
+                         String tableName,
+                         StoreSerializer serializer) {
+            super(dbAdminService, dbService, tableName, serializer);
+        }
+
+        @Override
+        protected String sK(IntentId key) {
+            return strIntentId(key);
+        }
+    }
+
+    @Override
+    public List<Operation> batchWrite(BatchWrite batch) {
+
+        List<Operation> failed = new ArrayList<>();
+        final Builder builder = BatchWriteRequest.newBuilder();
+
+        for (Operation op : batch.operations()) {
+            switch (op.type()) {
+            case CREATE_INTENT:
+                checkArgument(op.args().size() == 1,
+                              "CREATE_INTENT takes 1 argument. %s", op);
+                Intent intent = op.arg(0);
+                builder.putIfAbsent(INTENTS_TABLE, strIntentId(intent.id()), serializer.encode(intent));
+                builder.putIfAbsent(STATES_TABLE, strIntentId(intent.id()), serializer.encode(SUBMITTED));
+                break;
+
+            case REMOVE_INTENT:
+                checkArgument(op.args().size() == 1,
+                              "REMOVE_INTENT takes 1 argument. %s", op);
+                IntentId intentId = (IntentId) op.arg(0);
+                builder.remove(INTENTS_TABLE, strIntentId(intentId));
+                builder.remove(STATES_TABLE, strIntentId(intentId));
+                builder.remove(INSTALLABLE_TABLE, strIntentId(intentId));
+                break;
+
+            case SET_STATE:
+                checkArgument(op.args().size() == 2,
+                              "SET_STATE takes 2 arguments. %s", op);
+                intent = op.arg(0);
+                IntentState newState = op.arg(1);
+                builder.put(STATES_TABLE, strIntentId(intent.id()), serializer.encode(newState));
+                break;
+
+            case SET_INSTALLABLE:
+                checkArgument(op.args().size() == 2,
+                              "SET_INSTALLABLE takes 2 arguments. %s", op);
+                intentId = op.arg(0);
+                List<Intent> installableIntents = op.arg(1);
+                builder.put(INSTALLABLE_TABLE, strIntentId(intentId), serializer.encode(installableIntents));
+                break;
+
+            case REMOVE_INSTALLED:
+                checkArgument(op.args().size() == 1,
+                              "REMOVE_INSTALLED takes 1 argument. %s", op);
+                intentId = op.arg(0);
+                builder.remove(INSTALLABLE_TABLE, strIntentId(intentId));
+                break;
+
+            default:
+                log.warn("Unknown Operation encountered: {}", op);
+                failed.add(op);
+                break;
+            }
+        }
+
+        BatchWriteResult batchWriteResult = dbService.batchWrite(builder.build());
+        if (batchWriteResult.isSuccessful()) {
+            // no-failure (except for invalid input)
+            return failed;
+        } else {
+            // everything failed
+            return batch.operations();
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
index 29bf15f..c458446 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
@@ -18,6 +18,7 @@
 import com.codahale.metrics.Timer;
 import com.codahale.metrics.Timer.Context;
 import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.hazelcast.core.EntryAdapter;
 import com.hazelcast.core.EntryEvent;
@@ -25,6 +26,7 @@
 import com.hazelcast.core.IMap;
 import com.hazelcast.core.Member;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -38,6 +40,7 @@
 import org.onlab.onos.net.intent.IntentId;
 import org.onlab.onos.net.intent.IntentState;
 import org.onlab.onos.net.intent.IntentStore;
+import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
 import org.onlab.onos.net.intent.IntentStoreDelegate;
 import org.onlab.onos.store.hz.AbstractHazelcastStore;
 import org.onlab.onos.store.hz.SMap;
@@ -46,12 +49,16 @@
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.onlab.onos.net.intent.IntentState.*;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -341,6 +348,207 @@
         }
     }
 
+    @Override
+    public List<Operation> batchWrite(BatchWrite batch) {
+        // Hazelcast version will never fail for conditional failure now.
+        List<Operation> failed = new ArrayList<>();
+
+        List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
+
+        for (Operation op : batch.operations()) {
+            switch (op.type()) {
+            case CREATE_INTENT:
+                checkArgument(op.args().size() == 1,
+                              "CREATE_INTENT takes 1 argument. %s", op);
+                Intent intent = op.arg(0);
+                futures.add(Pair.of(op,
+                                    ImmutableList.of(intents.putAsync(intent.id(), intent),
+                                                     states.putAsync(intent.id(), SUBMITTED))));
+                break;
+
+            case REMOVE_INTENT:
+                checkArgument(op.args().size() == 1,
+                              "REMOVE_INTENT takes 1 argument. %s", op);
+                IntentId intentId = (IntentId) op.arg(0);
+                futures.add(Pair.of(op,
+                                    ImmutableList.of(intents.removeAsync(intentId),
+                                                     states.removeAsync(intentId),
+                                                     installable.removeAsync(intentId))));
+                break;
+
+            case SET_STATE:
+                checkArgument(op.args().size() == 2,
+                              "SET_STATE takes 2 arguments. %s", op);
+                intent = op.arg(0);
+                IntentState newState = op.arg(1);
+                futures.add(Pair.of(op,
+                                    ImmutableList.of(states.putAsync(intent.id(), newState))));
+                break;
+
+            case SET_INSTALLABLE:
+                checkArgument(op.args().size() == 2,
+                              "SET_INSTALLABLE takes 2 arguments. %s", op);
+                intentId = op.arg(0);
+                List<Intent> installableIntents = op.arg(1);
+                futures.add(Pair.of(op,
+                                    ImmutableList.of(installable.putAsync(intentId, installableIntents))));
+                break;
+
+            case REMOVE_INSTALLED:
+                checkArgument(op.args().size() == 1,
+                              "REMOVE_INSTALLED takes 1 argument. %s", op);
+                intentId = op.arg(0);
+                futures.add(Pair.of(op,
+                                    ImmutableList.of(installable.removeAsync(intentId))));
+                break;
+
+            default:
+                log.warn("Unknown Operation encountered: {}", op);
+                failed.add(op);
+                break;
+            }
+        }
+
+        // verify result
+        for (Pair<Operation, List<Future<?>>> future : futures) {
+            final Operation op = future.getLeft();
+            final List<Future<?>> subops = future.getRight();
+
+            switch (op.type()) {
+
+            case CREATE_INTENT:
+            {
+                Intent intent = op.arg(0);
+                IntentState newIntentState = SUBMITTED;
+
+                try {
+                    Intent prevIntent = (Intent) subops.get(0).get();
+                    IntentState prevIntentState = (IntentState) subops.get(1).get();
+
+                    if (prevIntent != null || prevIntentState != null) {
+                        log.warn("Overwriting existing Intent: {}@{} with {}@{}",
+                                 prevIntent, prevIntentState,
+                                 intent, newIntentState);
+                    }
+                } catch (InterruptedException e) {
+                    log.error("Batch write was interrupted while processing {}", op,  e);
+                    failed.add(op);
+                    Thread.currentThread().interrupt();
+                } catch (ExecutionException e) {
+                    log.error("Batch write failed processing {}", op,  e);
+                    failed.add(op);
+                }
+                break;
+            }
+
+            case REMOVE_INTENT:
+            {
+                IntentId intentId = op.arg(0);
+
+                try {
+                    Intent prevIntent = (Intent) subops.get(0).get();
+                    IntentState prevIntentState = (IntentState) subops.get(1).get();
+                    @SuppressWarnings("unchecked")
+                    List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
+
+                    if (prevIntent == null) {
+                        log.warn("Intent {} was already removed.", intentId);
+                    }
+                    if (prevIntentState == null) {
+                        log.warn("Intent {} state was already removed", intentId);
+                    }
+                    if (prevInstallable == null) {
+                        log.info("Intent {} installable was already removed", intentId);
+                    }
+                } catch (InterruptedException e) {
+                    log.error("Batch write was interrupted while processing {}", op,  e);
+                    failed.add(op);
+                    Thread.currentThread().interrupt();
+                } catch (ExecutionException e) {
+                    log.error("Batch write failed processing {}", op,  e);
+                    failed.add(op);
+                }
+                break;
+            }
+
+            case SET_STATE:
+            {
+                Intent intent = op.arg(0);
+                IntentId intentId = intent.id();
+                IntentState newState = op.arg(1);
+
+                try {
+                    IntentState prevIntentState = (IntentState) subops.get(0).get();
+                    log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
+                    // TODO sanity check and log?
+                } catch (InterruptedException e) {
+                    log.error("Batch write was interrupted while processing {}", op,  e);
+                    failed.add(op);
+                    Thread.currentThread().interrupt();
+                } catch (ExecutionException e) {
+                    log.error("Batch write failed processing {}", op,  e);
+                    failed.add(op);
+                }
+                break;
+            }
+
+            case SET_INSTALLABLE:
+            {
+                IntentId intentId = op.arg(0);
+                List<Intent> installableIntents = op.arg(1);
+
+                try {
+                    @SuppressWarnings("unchecked")
+                    List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
+
+                    if (prevInstallable != null) {
+                        log.warn("Overwriting Intent {} installable {} -> {}",
+                                 intentId, prevInstallable, installableIntents);
+                    }
+                } catch (InterruptedException e) {
+                    log.error("Batch write was interrupted while processing {}", op,  e);
+                    failed.add(op);
+                    Thread.currentThread().interrupt();
+                } catch (ExecutionException e) {
+                    log.error("Batch write failed processing {}", op,  e);
+                    failed.add(op);
+                }
+                break;
+            }
+
+            case REMOVE_INSTALLED:
+            {
+                IntentId intentId = op.arg(0);
+
+                try {
+                    @SuppressWarnings("unchecked")
+                    List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
+
+                    if (prevInstallable == null) {
+                        log.warn("Intent {} installable was already removed", intentId);
+                    }
+                } catch (InterruptedException e) {
+                    log.error("Batch write was interrupted while processing {}", op,  e);
+                    failed.add(op);
+                    Thread.currentThread().interrupt();
+                } catch (ExecutionException e) {
+                    log.error("Batch write failed processing {}", op,  e);
+                    failed.add(op);
+                }
+                break;
+            }
+
+            default:
+                log.warn("Unknown Operation encountered: {}", op);
+                if (!failed.contains(op)) {
+                    failed.add(op);
+                }
+                break;
+            }
+        }
+        return failed;
+    }
+
     public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
 
         @Override