IntentStore: add batch write API
Change-Id: I9d397e9dc3dc6e9ccd21ac6ddacaece79214c470
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
index 8da72cf..3a39193 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
@@ -18,6 +18,9 @@
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.google.common.base.Verify;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
@@ -34,22 +37,28 @@
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStoreDelegate;
+import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.StoreSerializer;
+import org.onlab.onos.store.service.BatchWriteRequest;
+import org.onlab.onos.store.service.BatchWriteRequest.Builder;
+import org.onlab.onos.store.service.BatchWriteResult;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.impl.CMap;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.onlab.onos.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
@@ -70,15 +79,21 @@
private final Logger log = getLogger(getClass());
// Assumption: IntentId will not have synonyms
+ private static final String INTENTS_TABLE = "intents";
private CMap<IntentId, Intent> intents;
+
+ private static final String STATES_TABLE = "intent-states";
private CMap<IntentId, IntentState> states;
// TODO left behind transient state issue: ONOS-103
// Map to store instance local intermediate state transition
private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
+ private static final String INSTALLABLE_TABLE = "installable-intents";
private CMap<IntentId, List<Intent>> installable;
+ private LoadingCache<IntentId, String> keyCache;
+
private StoreSerializer serializer;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -137,13 +152,23 @@
}
};
- intents = new CMap<>(dbAdminService, dbService, "intents", serializer);
+ keyCache = CacheBuilder.newBuilder()
+ .softValues()
+ .build(new CacheLoader<IntentId, String>() {
- states = new CMap<>(dbAdminService, dbService, "intent-states", serializer);
+ @Override
+ public String load(IntentId key) {
+ return key.toString();
+ }
+ });
+
+ intents = new IntentIdMap<>(dbAdminService, dbService, INTENTS_TABLE, serializer);
+
+ states = new IntentIdMap<>(dbAdminService, dbService, STATES_TABLE, serializer);
transientStates.clear();
- installable = new CMap<>(dbAdminService, dbService, "installable-intents", serializer);
+ installable = new IntentIdMap<>(dbAdminService, dbService, INSTALLABLE_TABLE, serializer);
log.info("Started");
}
@@ -351,4 +376,101 @@
stopTimer(timer);
}
}
+
+ protected String strIntentId(IntentId key) {
+ return keyCache.getUnchecked(key);
+ }
+
+ /**
+ * Distributed Map from IntentId to some value.
+ *
+ * @param <V> Map value type
+ */
+ final class IntentIdMap<V> extends CMap<IntentId, V> {
+
+ /**
+ * Creates a IntentIdMap instance.
+ *
+ * @param dbAdminService DatabaseAdminService to use for this instance
+ * @param dbService DatabaseService to use for this instance
+ * @param tableName table which this Map corresponds to
+ * @param serializer Value serializer
+ */
+ public IntentIdMap(DatabaseAdminService dbAdminService,
+ DatabaseService dbService,
+ String tableName,
+ StoreSerializer serializer) {
+ super(dbAdminService, dbService, tableName, serializer);
+ }
+
+ @Override
+ protected String sK(IntentId key) {
+ return strIntentId(key);
+ }
+ }
+
+ @Override
+ public List<Operation> batchWrite(BatchWrite batch) {
+
+ List<Operation> failed = new ArrayList<>();
+ final Builder builder = BatchWriteRequest.newBuilder();
+
+ for (Operation op : batch.operations()) {
+ switch (op.type()) {
+ case CREATE_INTENT:
+ checkArgument(op.args().size() == 1,
+ "CREATE_INTENT takes 1 argument. %s", op);
+ Intent intent = op.arg(0);
+ builder.putIfAbsent(INTENTS_TABLE, strIntentId(intent.id()), serializer.encode(intent));
+ builder.putIfAbsent(STATES_TABLE, strIntentId(intent.id()), serializer.encode(SUBMITTED));
+ break;
+
+ case REMOVE_INTENT:
+ checkArgument(op.args().size() == 1,
+ "REMOVE_INTENT takes 1 argument. %s", op);
+ IntentId intentId = (IntentId) op.arg(0);
+ builder.remove(INTENTS_TABLE, strIntentId(intentId));
+ builder.remove(STATES_TABLE, strIntentId(intentId));
+ builder.remove(INSTALLABLE_TABLE, strIntentId(intentId));
+ break;
+
+ case SET_STATE:
+ checkArgument(op.args().size() == 2,
+ "SET_STATE takes 2 arguments. %s", op);
+ intent = op.arg(0);
+ IntentState newState = op.arg(1);
+ builder.put(STATES_TABLE, strIntentId(intent.id()), serializer.encode(newState));
+ break;
+
+ case SET_INSTALLABLE:
+ checkArgument(op.args().size() == 2,
+ "SET_INSTALLABLE takes 2 arguments. %s", op);
+ intentId = op.arg(0);
+ List<Intent> installableIntents = op.arg(1);
+ builder.put(INSTALLABLE_TABLE, strIntentId(intentId), serializer.encode(installableIntents));
+ break;
+
+ case REMOVE_INSTALLED:
+ checkArgument(op.args().size() == 1,
+ "REMOVE_INSTALLED takes 1 argument. %s", op);
+ intentId = op.arg(0);
+ builder.remove(INSTALLABLE_TABLE, strIntentId(intentId));
+ break;
+
+ default:
+ log.warn("Unknown Operation encountered: {}", op);
+ failed.add(op);
+ break;
+ }
+ }
+
+ BatchWriteResult batchWriteResult = dbService.batchWrite(builder.build());
+ if (batchWriteResult.isSuccessful()) {
+ // no-failure (except for invalid input)
+ return failed;
+ } else {
+ // everything failed
+ return batch.operations();
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
index 29bf15f..c458446 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
@@ -18,6 +18,7 @@
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
@@ -25,6 +26,7 @@
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -38,6 +40,7 @@
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.IntentStore;
+import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.onlab.onos.store.hz.AbstractHazelcastStore;
import org.onlab.onos.store.hz.SMap;
@@ -46,12 +49,16 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.onlab.onos.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
@@ -341,6 +348,207 @@
}
}
+ @Override
+ public List<Operation> batchWrite(BatchWrite batch) {
+ // Hazelcast version will never fail for conditional failure now.
+ List<Operation> failed = new ArrayList<>();
+
+ List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
+
+ for (Operation op : batch.operations()) {
+ switch (op.type()) {
+ case CREATE_INTENT:
+ checkArgument(op.args().size() == 1,
+ "CREATE_INTENT takes 1 argument. %s", op);
+ Intent intent = op.arg(0);
+ futures.add(Pair.of(op,
+ ImmutableList.of(intents.putAsync(intent.id(), intent),
+ states.putAsync(intent.id(), SUBMITTED))));
+ break;
+
+ case REMOVE_INTENT:
+ checkArgument(op.args().size() == 1,
+ "REMOVE_INTENT takes 1 argument. %s", op);
+ IntentId intentId = (IntentId) op.arg(0);
+ futures.add(Pair.of(op,
+ ImmutableList.of(intents.removeAsync(intentId),
+ states.removeAsync(intentId),
+ installable.removeAsync(intentId))));
+ break;
+
+ case SET_STATE:
+ checkArgument(op.args().size() == 2,
+ "SET_STATE takes 2 arguments. %s", op);
+ intent = op.arg(0);
+ IntentState newState = op.arg(1);
+ futures.add(Pair.of(op,
+ ImmutableList.of(states.putAsync(intent.id(), newState))));
+ break;
+
+ case SET_INSTALLABLE:
+ checkArgument(op.args().size() == 2,
+ "SET_INSTALLABLE takes 2 arguments. %s", op);
+ intentId = op.arg(0);
+ List<Intent> installableIntents = op.arg(1);
+ futures.add(Pair.of(op,
+ ImmutableList.of(installable.putAsync(intentId, installableIntents))));
+ break;
+
+ case REMOVE_INSTALLED:
+ checkArgument(op.args().size() == 1,
+ "REMOVE_INSTALLED takes 1 argument. %s", op);
+ intentId = op.arg(0);
+ futures.add(Pair.of(op,
+ ImmutableList.of(installable.removeAsync(intentId))));
+ break;
+
+ default:
+ log.warn("Unknown Operation encountered: {}", op);
+ failed.add(op);
+ break;
+ }
+ }
+
+ // verify result
+ for (Pair<Operation, List<Future<?>>> future : futures) {
+ final Operation op = future.getLeft();
+ final List<Future<?>> subops = future.getRight();
+
+ switch (op.type()) {
+
+ case CREATE_INTENT:
+ {
+ Intent intent = op.arg(0);
+ IntentState newIntentState = SUBMITTED;
+
+ try {
+ Intent prevIntent = (Intent) subops.get(0).get();
+ IntentState prevIntentState = (IntentState) subops.get(1).get();
+
+ if (prevIntent != null || prevIntentState != null) {
+ log.warn("Overwriting existing Intent: {}@{} with {}@{}",
+ prevIntent, prevIntentState,
+ intent, newIntentState);
+ }
+ } catch (InterruptedException e) {
+ log.error("Batch write was interrupted while processing {}", op, e);
+ failed.add(op);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Batch write failed processing {}", op, e);
+ failed.add(op);
+ }
+ break;
+ }
+
+ case REMOVE_INTENT:
+ {
+ IntentId intentId = op.arg(0);
+
+ try {
+ Intent prevIntent = (Intent) subops.get(0).get();
+ IntentState prevIntentState = (IntentState) subops.get(1).get();
+ @SuppressWarnings("unchecked")
+ List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
+
+ if (prevIntent == null) {
+ log.warn("Intent {} was already removed.", intentId);
+ }
+ if (prevIntentState == null) {
+ log.warn("Intent {} state was already removed", intentId);
+ }
+ if (prevInstallable == null) {
+ log.info("Intent {} installable was already removed", intentId);
+ }
+ } catch (InterruptedException e) {
+ log.error("Batch write was interrupted while processing {}", op, e);
+ failed.add(op);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Batch write failed processing {}", op, e);
+ failed.add(op);
+ }
+ break;
+ }
+
+ case SET_STATE:
+ {
+ Intent intent = op.arg(0);
+ IntentId intentId = intent.id();
+ IntentState newState = op.arg(1);
+
+ try {
+ IntentState prevIntentState = (IntentState) subops.get(0).get();
+ log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
+ // TODO sanity check and log?
+ } catch (InterruptedException e) {
+ log.error("Batch write was interrupted while processing {}", op, e);
+ failed.add(op);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Batch write failed processing {}", op, e);
+ failed.add(op);
+ }
+ break;
+ }
+
+ case SET_INSTALLABLE:
+ {
+ IntentId intentId = op.arg(0);
+ List<Intent> installableIntents = op.arg(1);
+
+ try {
+ @SuppressWarnings("unchecked")
+ List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
+
+ if (prevInstallable != null) {
+ log.warn("Overwriting Intent {} installable {} -> {}",
+ intentId, prevInstallable, installableIntents);
+ }
+ } catch (InterruptedException e) {
+ log.error("Batch write was interrupted while processing {}", op, e);
+ failed.add(op);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Batch write failed processing {}", op, e);
+ failed.add(op);
+ }
+ break;
+ }
+
+ case REMOVE_INSTALLED:
+ {
+ IntentId intentId = op.arg(0);
+
+ try {
+ @SuppressWarnings("unchecked")
+ List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
+
+ if (prevInstallable == null) {
+ log.warn("Intent {} installable was already removed", intentId);
+ }
+ } catch (InterruptedException e) {
+ log.error("Batch write was interrupted while processing {}", op, e);
+ failed.add(op);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Batch write failed processing {}", op, e);
+ failed.add(op);
+ }
+ break;
+ }
+
+ default:
+ log.warn("Unknown Operation encountered: {}", op);
+ if (!failed.contains(op)) {
+ failed.add(op);
+ }
+ break;
+ }
+ }
+ return failed;
+ }
+
public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
@Override