Implement Atomix AsyncAtomicCounterMap, AtomicCounterMap and state machine.
Change-Id: Ifd7f60ae8dcfe7239e034a92654b4ef30ffe46ae
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultAtomicCounterMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultAtomicCounterMap.java
new file mode 100644
index 0000000..10b213a
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultAtomicCounterMap.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives;
+
+import com.google.common.base.Throwables;
+import org.onosproject.store.service.AsyncAtomicCounterMap;
+import org.onosproject.store.service.AtomicCounterMap;
+import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.Synchronous;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Default implementation of {@code AtomicCounterMap}.
+ *
+ * @param <K> map key type
+ */
+public class DefaultAtomicCounterMap<K> extends Synchronous<AsyncAtomicCounterMap<K>> implements AtomicCounterMap<K> {
+
+ private final AsyncAtomicCounterMap<K> asyncCounterMap;
+ private final long operationTimeoutMillis;
+
+ public DefaultAtomicCounterMap(AsyncAtomicCounterMap<K> asyncCounterMap, long operationTimeoutMillis) {
+ super(asyncCounterMap);
+ this.asyncCounterMap = asyncCounterMap;
+ this.operationTimeoutMillis = operationTimeoutMillis;
+ }
+
+ @Override
+ public long incrementAndGet(K key) {
+ return complete(asyncCounterMap.incrementAndGet(key));
+ }
+
+ @Override
+ public long decrementAndGet(K key) {
+ return complete(asyncCounterMap.decrementAndGet(key));
+ }
+
+ @Override
+ public long getAndIncrement(K key) {
+ return complete(asyncCounterMap.getAndIncrement(key));
+ }
+
+ @Override
+ public long getAndDecrement(K key) {
+ return complete(asyncCounterMap.getAndDecrement(key));
+ }
+
+ @Override
+ public long addAndGet(K key, long delta) {
+ return complete(asyncCounterMap.addAndGet(key, delta));
+ }
+
+ @Override
+ public long getAndAdd(K key, long delta) {
+ return complete(asyncCounterMap.getAndAdd(key, delta));
+ }
+
+ @Override
+ public long get(K key) {
+ return complete(asyncCounterMap.get(key));
+ }
+
+ @Override
+ public long put(K key, long newValue) {
+ return complete(asyncCounterMap.put(key, newValue));
+ }
+
+ @Override
+ public long putIfAbsent(K key, long newValue) {
+ return complete(asyncCounterMap.putIfAbsent(key, newValue));
+ }
+
+ @Override
+ public boolean replace(K key, long expectedOldValue, long newValue) {
+ return complete(asyncCounterMap.replace(key, expectedOldValue, newValue));
+ }
+
+ @Override
+ public long remove(K key) {
+ return complete(asyncCounterMap.remove(key));
+ }
+
+ @Override
+ public boolean remove(K key, long value) {
+ return complete(asyncCounterMap.remove(key, value));
+ }
+
+ @Override
+ public int size() {
+ return complete(asyncCounterMap.size());
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return complete(asyncCounterMap.isEmpty());
+ }
+
+ @Override
+ public void clear() {
+ complete(asyncCounterMap.clear());
+ }
+
+ private <T> T complete(CompletableFuture<T> future) {
+ try {
+ return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ConsistentMapException.Interrupted();
+ } catch (TimeoutException e) {
+ throw new ConsistentMapException.Timeout(name());
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause());
+ throw new ConsistentMapException(e.getCause());
+ }
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
index 2e986c4..d034261 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
@@ -15,9 +15,8 @@
*/
package org.onosproject.store.primitives;
-import java.util.Set;
-
import org.onosproject.store.service.AsyncAtomicCounter;
+import org.onosproject.store.service.AsyncAtomicCounterMap;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
@@ -28,6 +27,8 @@
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WorkQueue;
+import java.util.Set;
+
/**
* Interface for entity that can create instances of different distributed primitives.
*/
@@ -68,6 +69,17 @@
String name, Serializer serializer);
/**
+ * Creates a new {@code AsyncAtomicCounterMap}.
+ *
+ * @param name counter map name
+ * @param serializer serializer to use for serializing/deserializing keys
+ * @param <K> key type
+ * @return atomic counter map
+ */
+ <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
+ String name, Serializer serializer);
+
+ /**
* Creates a new {@code AsyncAtomicCounter}.
*
* @param name counter name
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounterMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounterMap.java
index 6ac096f..e4a5408 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounterMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounterMap.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.store.service;
+import org.onosproject.store.primitives.DefaultAtomicCounterMap;
+
import java.util.concurrent.CompletableFuture;
/**
@@ -139,4 +141,44 @@
* @return true if the value was removed, false otherwise
*/
CompletableFuture<Boolean> remove(K key, long value);
+
+ /**
+ * Returns the number of entries in the map.
+ *
+ * @return the number of entries in the map
+ */
+ CompletableFuture<Integer> size();
+
+ /**
+ * Returns a boolean indicating whether the map is empty.
+ *
+ * @return true if the map is empty, false otherwise
+ */
+ CompletableFuture<Boolean> isEmpty();
+
+ /**
+ * Removes all entries from the map.
+ *
+ * @return void
+ */
+ CompletableFuture<Void> clear();
+
+ /**
+ * Returns a new {@link AtomicCounterMap} that is backed by this instance.
+ *
+ * @return new {@code AtomicCounterMap} instance
+ */
+ default AtomicCounterMap<K> asAtomicCounterMap() {
+ return asAtomicCounterMap(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS);
+ }
+
+ /**
+ * Returns a new {@link AtomicCounterMap} that is backed by this instance.
+ *
+ * @param timeoutMillis timeout duration for the returned ConsistentMap operations
+ * @return new {@code AtomicCounterMap} instance
+ */
+ default AtomicCounterMap<K> asAtomicCounterMap(long timeoutMillis) {
+ return new DefaultAtomicCounterMap<>(this, timeoutMillis);
+ }
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicCounterMap.java b/core/api/src/main/java/org/onosproject/store/service/AtomicCounterMap.java
index f6ab5d5..1d56946 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicCounterMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicCounterMap.java
@@ -93,7 +93,6 @@
*/
long put(K key, long newValue);
-
/**
* If key is not already associated with a value or if key is associated with
* zero, associate it with newValue. Returns the previous value associated with
@@ -137,4 +136,23 @@
* @return true if the value was removed, false otherwise
*/
boolean remove(K key, long value);
+
+ /**
+ * Returns the number of entries in the map.
+ *
+ * @return the number of entries in the map, including {@code 0} values
+ */
+ int size();
+
+ /**
+ * If the map is empty, returns true, otherwise false.
+ *
+ * @return true if the map is empty.
+ */
+ boolean isEmpty();
+
+ /**
+ * Clears all entries from the map.
+ */
+ void clear();
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicCounterMapBuilder.java b/core/api/src/main/java/org/onosproject/store/service/AtomicCounterMapBuilder.java
index 56934f4..48c897b3 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicCounterMapBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicCounterMapBuilder.java
@@ -20,9 +20,18 @@
/**
* Builder for AtomicCounterMap.
*/
-public abstract class AtomicCounterMapBuilder
- extends DistributedPrimitiveBuilder<AtomicCounterMapBuilder, AsyncAtomicCounterMap> {
+public abstract class AtomicCounterMapBuilder<K>
+ extends DistributedPrimitiveBuilder<AtomicCounterMapBuilder<K>, AtomicCounterMap<K>> {
public AtomicCounterMapBuilder() {
super(DistributedPrimitive.Type.COUNTER_MAP);
}
-}
\ No newline at end of file
+
+ /**
+ * Builds an async atomic counter map based on the configuration options
+ * supplied to this builder.
+ *
+ * @return new async atomic counter map
+ * @throws java.lang.RuntimeException if a mandatory parameter is missing
+ */
+ public abstract AsyncAtomicCounterMap<K> buildAsyncMap();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index d532be2..e034e57 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -69,6 +69,14 @@
<K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder();
/**
+ * Creates a new {@code AtomicCounterMapBuilder}.
+ *
+ * @param <K> key type
+ * @return builder for an atomic counter map
+ */
+ <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder();
+
+ /**
* Creates a new DistributedSetBuilder.
*
* @param <E> set element type
diff --git a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
index c44d3c7..b68958f 100644
--- a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
@@ -90,8 +90,14 @@
public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
return null;
}
+
@Override
public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
return null;
}
+
+ @Override
+ public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+ return null;
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
new file mode 100644
index 0000000..309220a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.service.AsyncAtomicCounterMap;
+import org.onosproject.store.service.AtomicCounterMap;
+import org.onosproject.store.service.AtomicCounterMapBuilder;
+
+/**
+ * Default {@code AtomicCounterMapBuilder}.
+ */
+public class DefaultAtomicCounterMapBuilder<K> extends AtomicCounterMapBuilder<K> {
+
+ private final DistributedPrimitiveCreator primitiveCreator;
+
+ public DefaultAtomicCounterMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
+ this.primitiveCreator = primitiveCreator;
+ }
+
+ @Override
+ public AsyncAtomicCounterMap<K> buildAsyncMap() {
+ return primitiveCreator.newAsyncAtomicCounterMap(name(), serializer());
+ }
+
+ @Override
+ public AtomicCounterMap<K> build() {
+ return buildAsyncMap().asAtomicCounterMap();
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
index 520b9a6..42fb57c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
@@ -15,13 +15,14 @@
*/
package org.onosproject.store.primitives.impl;
-import java.util.function.Function;
-
+import org.onosproject.store.service.AsyncAtomicCounterMap;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
+import java.util.function.Function;
+
/**
* Misc utilities for working with {@code DistributedPrimitive}s.
*/
@@ -77,6 +78,22 @@
}
/**
+ * Creates an instance of {@code AsyncAtomicCounterMap} that transforms key types.
+ *
+ * @param map backing map
+ * @param keyEncoder transformer for key type of returned map to key type of input map
+ * @param keyDecoder transformer for key type of input map to key type of returned map
+ * @param <K1> returned map key type
+ * @param <K2> input map key type
+ * @return new counter map
+ */
+ public static <K1, K2> AsyncAtomicCounterMap<K1> newTranscodingAtomicCounterMap(AsyncAtomicCounterMap<K2> map,
+ Function<K1, K2> keyEncoder,
+ Function<K2, K1> keyDecoder) {
+ return new TranscodingAsyncAtomicCounterMap<>(map, keyEncoder, keyDecoder);
+ }
+
+ /**
* Creates an instance of {@code AsyncConsistentMap} that transforms operations inputs and applies them
* to corresponding operation in a different typed map and returns the output after reverse transforming it.
*
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 78447a8..6dd2322 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -15,16 +15,16 @@
*/
package org.onosproject.store.primitives.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.service.AsyncAtomicCounter;
+import org.onosproject.store.service.AsyncAtomicCounterMap;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
@@ -35,12 +35,12 @@
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WorkQueue;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.Hashing;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* {@code DistributedPrimitiveCreator} that federates responsibility for creating
@@ -89,6 +89,11 @@
}
@Override
+ public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
+ return getCreator(name).newAsyncAtomicCounterMap(name, serializer);
+ }
+
+ @Override
public AsyncAtomicCounter newAsyncCounter(String name) {
return getCreator(name).newAsyncCounter(name);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index e2820a2..48d9b7e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -47,6 +47,7 @@
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AtomicCounterBuilder;
+import org.onosproject.store.service.AtomicCounterMapBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
@@ -156,6 +157,12 @@
}
@Override
+ public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultAtomicCounterMapBuilder<>(federatedPrimitiveCreator);
+ }
+
+ @Override
public <E> DistributedSetBuilder<E> setBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 80ab702..4218b6e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -15,7 +15,8 @@
*/
package org.onosproject.store.primitives.impl;
-import static org.slf4j.LoggerFactory.getLogger;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import io.atomix.AtomixClient;
import io.atomix.catalyst.transport.Transport;
import io.atomix.copycat.client.ConnectionStrategies;
@@ -29,15 +30,9 @@
import io.atomix.resource.ResourceRegistry;
import io.atomix.resource.ResourceType;
import io.atomix.variables.DistributedLong;
-
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
import org.onlab.util.HexString;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMap;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimap;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
@@ -47,6 +42,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicCounter;
+import org.onosproject.store.service.AsyncAtomicCounterMap;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
@@ -60,8 +56,13 @@
import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.slf4j.LoggerFactory.getLogger;
/**
* StoragePartition client.
@@ -140,10 +141,10 @@
}
};
AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.<K, V, String, byte[]>newTranscodingMap(rawMap,
- key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(HexString.fromHexString(string)),
- value -> value == null ? null : serializer.encode(value),
- bytes -> serializer.decode(bytes));
+ key -> HexString.toHexString(serializer.encode(key)),
+ string -> serializer.decode(HexString.fromHexString(string)),
+ value -> value == null ? null : serializer.encode(value),
+ bytes -> serializer.decode(bytes));
return transcodedMap;
}
@@ -166,9 +167,9 @@
};
AsyncConsistentTreeMap<V> transcodedMap =
DistributedPrimitives.<V, byte[]>newTranscodingTreeMap(
- rawMap,
- value -> value == null ? null : serializer.encode(value),
- bytes -> serializer.decode(bytes));
+ rawMap,
+ value -> value == null ? null : serializer.encode(value),
+ bytes -> serializer.decode(bytes));
return transcodedMap;
}
@@ -210,6 +211,19 @@
}
@Override
+ public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
+ AtomixAtomicCounterMap atomixAtomicCounterMap =
+ client.getResource(name, AtomixAtomicCounterMap.class)
+ .join();
+ AsyncAtomicCounterMap<K> transcodedMap =
+ DistributedPrimitives.<K, String>newTranscodingAtomicCounterMap(
+ atomixAtomicCounterMap,
+ key -> HexString.toHexString(serializer.encode(key)),
+ string -> serializer.decode(HexString.fromHexString(string)));
+ return transcodedMap;
+ }
+
+ @Override
public AsyncAtomicCounter newAsyncCounter(String name) {
DistributedLong distributedLong = client.getLong(name).join();
return new AtomixCounter(name, distributedLong);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicCounterMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicCounterMap.java
new file mode 100644
index 0000000..69a3a08
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicCounterMap.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicCounterMap;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * An {@code AsyncAtomicCounterMap} that transcodes keys.
+ */
+public class TranscodingAsyncAtomicCounterMap<K1, K2> implements AsyncAtomicCounterMap<K1> {
+ private final AsyncAtomicCounterMap<K2> backingMap;
+ private final Function<K1, K2> keyEncoder;
+ private final Function<K2, K1> keyDecoder;
+
+ public TranscodingAsyncAtomicCounterMap(AsyncAtomicCounterMap<K2> backingMap,
+ Function<K1, K2> keyEncoder, Function<K2, K1> keyDecoder) {
+ this.backingMap = backingMap;
+ this.keyEncoder = k -> k == null ? null : keyEncoder.apply(k);
+ this.keyDecoder = k -> k == null ? null : keyDecoder.apply(k);
+ }
+
+ @Override
+ public String name() {
+ return backingMap.name();
+ }
+
+ @Override
+ public CompletableFuture<Long> incrementAndGet(K1 key) {
+ try {
+ return backingMap.incrementAndGet(keyEncoder.apply(key));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Long> decrementAndGet(K1 key) {
+ try {
+ return backingMap.decrementAndGet(keyEncoder.apply(key));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndIncrement(K1 key) {
+ try {
+ return backingMap.getAndIncrement(keyEncoder.apply(key));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndDecrement(K1 key) {
+ try {
+ return backingMap.getAndDecrement(keyEncoder.apply(key));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Long> addAndGet(K1 key, long delta) {
+ try {
+ return backingMap.addAndGet(keyEncoder.apply(key), delta);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndAdd(K1 key, long delta) {
+ try {
+ return backingMap.getAndAdd(keyEncoder.apply(key), delta);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Long> get(K1 key) {
+ try {
+ return backingMap.get(keyEncoder.apply(key));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Long> put(K1 key, long newValue) {
+ try {
+ return backingMap.put(keyEncoder.apply(key), newValue);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Long> putIfAbsent(K1 key, long newValue) {
+ try {
+ return backingMap.putIfAbsent(keyEncoder.apply(key), newValue);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(K1 key, long expectedOldValue, long newValue) {
+ try {
+ return backingMap.replace(keyEncoder.apply(key), expectedOldValue, newValue);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Long> remove(K1 key) {
+ try {
+ return backingMap.remove(keyEncoder.apply(key));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(K1 key, long value) {
+ try {
+ return backingMap.remove(keyEncoder.apply(key), value);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ try {
+ return backingMap.size();
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ try {
+ return backingMap.isEmpty();
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ try {
+ return backingMap.clear();
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMap.java
new file mode 100644
index 0000000..ad1de2e
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMap.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.AbstractResource;
+import io.atomix.resource.ResourceTypeInfo;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.AddAndGet;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Clear;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.DecrementAndGet;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.GetAndAdd;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.GetAndDecrement;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.GetAndIncrement;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.IncrementAndGet;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.IsEmpty;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Put;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.PutIfAbsent;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Remove;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.RemoveValue;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Replace;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Size;
+import org.onosproject.store.service.AsyncAtomicCounterMap;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code AsyncAtomicCounterMap} implementation backed by Atomix.
+ */
+@ResourceTypeInfo(id = -157, factory = AtomixAtomicCounterMapFactory.class)
+public class AtomixAtomicCounterMap extends AbstractResource<AtomixAtomicCounterMap>
+ implements AsyncAtomicCounterMap<String> {
+
+ public AtomixAtomicCounterMap(CopycatClient client, Properties options) {
+ super(client, options);
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Long> incrementAndGet(String key) {
+ return client.submit(new IncrementAndGet(key));
+ }
+
+ @Override
+ public CompletableFuture<Long> decrementAndGet(String key) {
+ return client.submit(new DecrementAndGet(key));
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndIncrement(String key) {
+ return client.submit(new GetAndIncrement(key));
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndDecrement(String key) {
+ return client.submit(new GetAndDecrement(key));
+ }
+
+ @Override
+ public CompletableFuture<Long> addAndGet(String key, long delta) {
+ return client.submit(new AddAndGet(key, delta));
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndAdd(String key, long delta) {
+ return client.submit(new GetAndAdd(key, delta));
+ }
+
+ @Override
+ public CompletableFuture<Long> get(String key) {
+ return client.submit(new Get(key));
+ }
+
+ @Override
+ public CompletableFuture<Long> put(String key, long newValue) {
+ return client.submit(new Put(key, newValue));
+ }
+
+ @Override
+ public CompletableFuture<Long> putIfAbsent(String key, long newValue) {
+ return client.submit(new PutIfAbsent(key, newValue));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(String key, long expectedOldValue, long newValue) {
+ return client.submit(new Replace(key, expectedOldValue, newValue));
+ }
+
+ @Override
+ public CompletableFuture<Long> remove(String key) {
+ return client.submit(new Remove(key));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(String key, long value) {
+ return client.submit(new RemoveValue(key, value));
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return client.submit(new Size());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ return client.submit(new IsEmpty());
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ return client.submit(new Clear());
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapCommands.java
new file mode 100644
index 0000000..3a7f696
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapCommands.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
+import io.atomix.copycat.Command;
+import io.atomix.copycat.Query;
+
+/**
+ * Atomic counter map commands.
+ */
+public final class AtomixAtomicCounterMapCommands {
+ private AtomixAtomicCounterMapCommands() {
+ }
+
+ public abstract static class AtomicCounterMapCommand<V> implements Command<V>, CatalystSerializable {
+ @Override
+ public CompactionMode compaction() {
+ return CompactionMode.SNAPSHOT;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+ }
+
+ public abstract static class AtomicCounterMapQuery<V> implements Query<V>, CatalystSerializable {
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+ }
+
+ public abstract static class KeyCommand<V> extends AtomicCounterMapCommand<V> {
+ private String key;
+
+ public KeyCommand() {
+ }
+
+ public KeyCommand(String key) {
+ this.key = key;
+ }
+
+ public String key() {
+ return key;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ buffer.writeString(key);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ key = buffer.readString();
+ }
+ }
+
+ public abstract static class KeyQuery<V> extends AtomicCounterMapQuery<V> {
+ private String key;
+
+ public KeyQuery() {
+ }
+
+ public KeyQuery(String key) {
+ this.key = key;
+ }
+
+ public String key() {
+ return key;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ buffer.writeString(key);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ key = buffer.readString();
+ }
+ }
+
+ public static class KeyValueCommand<V> extends KeyCommand<V> {
+ private long value;
+
+ public KeyValueCommand() {
+ }
+
+ public KeyValueCommand(String key, long value) {
+ super(key);
+ this.value = value;
+ }
+
+ public long value() {
+ return value;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ buffer.writeLong(value);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ value = buffer.readLong();
+ }
+ }
+
+ public static class Get extends KeyQuery<Long> {
+ public Get() {
+ }
+
+ public Get(String key) {
+ super(key);
+ }
+ }
+
+ public static class Put extends KeyValueCommand<Long> {
+ public Put() {
+ }
+
+ public Put(String key, long value) {
+ super(key, value);
+ }
+ }
+
+ public static class PutIfAbsent extends KeyValueCommand<Long> {
+ public PutIfAbsent() {
+ }
+
+ public PutIfAbsent(String key, long value) {
+ super(key, value);
+ }
+ }
+
+ public static class Replace extends KeyCommand<Boolean> {
+ private long replace;
+ private long value;
+
+ public Replace() {
+ }
+
+ public Replace(String key, long replace, long value) {
+ super(key);
+ this.replace = replace;
+ this.value = value;
+ }
+
+ public long replace() {
+ return replace;
+ }
+
+ public long value() {
+ return value;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ buffer.writeLong(replace);
+ buffer.writeLong(value);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ replace = buffer.readLong();
+ value = buffer.readLong();
+ }
+ }
+
+ public static class Remove extends KeyCommand<Long> {
+ public Remove() {
+ }
+
+ public Remove(String key) {
+ super(key);
+ }
+ }
+
+ public static class RemoveValue extends KeyValueCommand<Boolean> {
+ public RemoveValue() {
+ }
+
+ public RemoveValue(String key, long value) {
+ super(key, value);
+ }
+ }
+
+ public static class IncrementAndGet extends KeyCommand<Long> {
+ public IncrementAndGet() {
+ }
+
+ public IncrementAndGet(String key) {
+ super(key);
+ }
+ }
+
+ public static class DecrementAndGet extends KeyCommand<Long> {
+ public DecrementAndGet(String key) {
+ super(key);
+ }
+
+ public DecrementAndGet() {
+ }
+ }
+
+ public static class GetAndIncrement extends KeyCommand<Long> {
+ public GetAndIncrement() {
+ }
+
+ public GetAndIncrement(String key) {
+ super(key);
+ }
+ }
+
+ public static class GetAndDecrement extends KeyCommand<Long> {
+ public GetAndDecrement() {
+ }
+
+ public GetAndDecrement(String key) {
+ super(key);
+ }
+ }
+
+ public abstract static class DeltaCommand extends KeyCommand<Long> {
+ private long delta;
+
+ public DeltaCommand() {
+ }
+
+ public DeltaCommand(String key, long delta) {
+ super(key);
+ this.delta = delta;
+ }
+
+ public long delta() {
+ return delta;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ buffer.writeLong(delta);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ delta = buffer.readLong();
+ }
+ }
+
+ public static class AddAndGet extends DeltaCommand {
+ public AddAndGet() {
+ }
+
+ public AddAndGet(String key, long delta) {
+ super(key, delta);
+ }
+ }
+
+ public static class GetAndAdd extends DeltaCommand {
+ public GetAndAdd() {
+ }
+
+ public GetAndAdd(String key, long delta) {
+ super(key, delta);
+ }
+ }
+
+ public static class Size extends AtomicCounterMapQuery<Integer> {
+ }
+
+ public static class IsEmpty extends AtomicCounterMapQuery<Boolean> {
+ }
+
+ public static class Clear extends AtomicCounterMapCommand<Void> {
+ }
+
+ /**
+ * Counter map command type resolver.
+ */
+ public static class TypeResolver implements SerializableTypeResolver {
+ @Override
+ public void resolve(SerializerRegistry registry) {
+ registry.register(Get.class, -790);
+ registry.register(Put.class, -791);
+ registry.register(PutIfAbsent.class, -792);
+ registry.register(Replace.class, -793);
+ registry.register(Remove.class, -794);
+ registry.register(RemoveValue.class, -795);
+ registry.register(IncrementAndGet.class, -796);
+ registry.register(DecrementAndGet.class, -797);
+ registry.register(GetAndIncrement.class, -798);
+ registry.register(GetAndDecrement.class, -799);
+ registry.register(AddAndGet.class, -800);
+ registry.register(GetAndAdd.class, -801);
+ registry.register(Size.class, -801);
+ registry.register(IsEmpty.class, -801);
+ registry.register(Clear.class, -801);
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapFactory.java
new file mode 100644
index 0000000..4caf68e
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.ResourceFactory;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.Properties;
+
+/**
+ * Atomic counter map factory.
+ */
+public class AtomixAtomicCounterMapFactory implements ResourceFactory<AtomixAtomicCounterMap> {
+
+ @Override
+ public SerializableTypeResolver createSerializableTypeResolver() {
+ return new AtomixAtomicCounterMapCommands.TypeResolver();
+ }
+
+ @Override
+ public ResourceStateMachine createStateMachine(Properties config) {
+ return new AtomixAtomicCounterMapState(config);
+ }
+
+ @Override
+ public AtomixAtomicCounterMap createInstance(CopycatClient client, Properties options) {
+ return new AtomixAtomicCounterMap(client, options);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapState.java
new file mode 100644
index 0000000..1aed4a8
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapState.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.AddAndGet;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Clear;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.DecrementAndGet;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.GetAndAdd;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.GetAndDecrement;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.GetAndIncrement;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.IncrementAndGet;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.IsEmpty;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Put;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.PutIfAbsent;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Remove;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.RemoveValue;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Replace;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Size;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Atomic counter map state for Atomix.
+ * <p>
+ * The counter map state is implemented as a snapshottable state machine. Snapshots are necessary
+ * since incremental compaction is impractical for counters where the value of a counter is the sum
+ * of all its increments. Note that this snapshotting large state machines may risk blocking of the
+ * Raft cluster with the current implementation of snapshotting in Copycat.
+ */
+public class AtomixAtomicCounterMapState extends ResourceStateMachine implements Snapshottable {
+ private Map<String, Long> map = new HashMap<>();
+
+ public AtomixAtomicCounterMapState(Properties config) {
+ super(config);
+ }
+
+ @Override
+ protected void configure(StateMachineExecutor executor) {
+ executor.register(Put.class, this::put);
+ executor.register(PutIfAbsent.class, this::putIfAbsent);
+ executor.register(Get.class, this::get);
+ executor.register(Replace.class, this::replace);
+ executor.register(Remove.class, this::remove);
+ executor.register(RemoveValue.class, this::removeValue);
+ executor.register(GetAndIncrement.class, this::getAndIncrement);
+ executor.register(GetAndDecrement.class, this::getAndDecrement);
+ executor.register(IncrementAndGet.class, this::incrementAndGet);
+ executor.register(DecrementAndGet.class, this::decrementAndGet);
+ executor.register(AddAndGet.class, this::addAndGet);
+ executor.register(GetAndAdd.class, this::getAndAdd);
+ executor.register(Size.class, this::size);
+ executor.register(IsEmpty.class, this::isEmpty);
+ executor.register(Clear.class, this::clear);
+ }
+
+ @Override
+ public void snapshot(SnapshotWriter writer) {
+ writer.writeObject(map);
+ }
+
+ @Override
+ public void install(SnapshotReader reader) {
+ map = reader.readObject();
+ }
+
+ /**
+ * Returns the primitive value for the given primitive wrapper.
+ */
+ private long primitive(Long value) {
+ if (value != null) {
+ return value;
+ } else {
+ return 0;
+ }
+ }
+
+ /**
+ * Handles a {@link Put} command which implements {@link AtomixAtomicCounterMap#put(String, long)}.
+ *
+ * @param commit put commit
+ * @return put result
+ */
+ protected long put(Commit<Put> commit) {
+ try {
+ return primitive(map.put(commit.operation().key(), commit.operation().value()));
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link PutIfAbsent} command which implements {@link AtomixAtomicCounterMap#putIfAbsent(String, long)}.
+ *
+ * @param commit putIfAbsent commit
+ * @return putIfAbsent result
+ */
+ protected long putIfAbsent(Commit<PutIfAbsent> commit) {
+ try {
+ return primitive(map.putIfAbsent(commit.operation().key(), commit.operation().value()));
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link Get} query which implements {@link AtomixAtomicCounterMap#get(String)}}.
+ *
+ * @param commit get commit
+ * @return get result
+ */
+ protected long get(Commit<Get> commit) {
+ try {
+ return primitive(map.get(commit.operation().key()));
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link Replace} command which implements {@link AtomixAtomicCounterMap#replace(String, long, long)}.
+ *
+ * @param commit replace commit
+ * @return replace result
+ */
+ protected boolean replace(Commit<Replace> commit) {
+ try {
+ Long value = map.get(commit.operation().key());
+ if (value == null) {
+ if (commit.operation().replace() == 0) {
+ map.put(commit.operation().key(), commit.operation().value());
+ return true;
+ } else {
+ return false;
+ }
+ } else if (value == commit.operation().replace()) {
+ map.put(commit.operation().key(), commit.operation().value());
+ return true;
+ }
+ return false;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link Remove} command which implements {@link AtomixAtomicCounterMap#remove(String)}.
+ *
+ * @param commit remove commit
+ * @return remove result
+ */
+ protected long remove(Commit<Remove> commit) {
+ try {
+ return primitive(map.remove(commit.operation().key()));
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link RemoveValue} command which implements {@link AtomixAtomicCounterMap#remove(String, long)}.
+ *
+ * @param commit removeValue commit
+ * @return removeValue result
+ */
+ protected boolean removeValue(Commit<RemoveValue> commit) {
+ try {
+ Long value = map.get(commit.operation().key());
+ if (value == null) {
+ if (commit.operation().value() == 0) {
+ map.remove(commit.operation().key());
+ return true;
+ }
+ return false;
+ } else if (value == commit.operation().value()) {
+ map.remove(commit.operation().key());
+ return true;
+ }
+ return false;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link GetAndIncrement} command which implements
+ * {@link AtomixAtomicCounterMap#getAndIncrement(String)}.
+ *
+ * @param commit getAndIncrement commit
+ * @return getAndIncrement result
+ */
+ protected long getAndIncrement(Commit<GetAndIncrement> commit) {
+ try {
+ long value = primitive(map.get(commit.operation().key()));
+ map.put(commit.operation().key(), value + 1);
+ return value;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link GetAndDecrement} command which implements
+ * {@link AtomixAtomicCounterMap#getAndDecrement(String)}.
+ *
+ * @param commit getAndDecrement commit
+ * @return getAndDecrement result
+ */
+ protected long getAndDecrement(Commit<GetAndDecrement> commit) {
+ try {
+ long value = primitive(map.get(commit.operation().key()));
+ map.put(commit.operation().key(), value - 1);
+ return value;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link IncrementAndGet} command which implements
+ * {@link AtomixAtomicCounterMap#incrementAndGet(String)}.
+ *
+ * @param commit incrementAndGet commit
+ * @return incrementAndGet result
+ */
+ protected long incrementAndGet(Commit<IncrementAndGet> commit) {
+ try {
+ long value = primitive(map.get(commit.operation().key()));
+ map.put(commit.operation().key(), ++value);
+ return value;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link DecrementAndGet} command which implements
+ * {@link AtomixAtomicCounterMap#decrementAndGet(String)}.
+ *
+ * @param commit decrementAndGet commit
+ * @return decrementAndGet result
+ */
+ protected long decrementAndGet(Commit<DecrementAndGet> commit) {
+ try {
+ long value = primitive(map.get(commit.operation().key()));
+ map.put(commit.operation().key(), --value);
+ return value;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link AddAndGet} command which implements {@link AtomixAtomicCounterMap#addAndGet(String, long)}.
+ *
+ * @param commit addAndGet commit
+ * @return addAndGet result
+ */
+ protected long addAndGet(Commit<AddAndGet> commit) {
+ try {
+ long value = primitive(map.get(commit.operation().key()));
+ value += commit.operation().delta();
+ map.put(commit.operation().key(), value);
+ return value;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link GetAndAdd} command which implements {@link AtomixAtomicCounterMap#getAndAdd(String, long)}.
+ *
+ * @param commit getAndAdd commit
+ * @return getAndAdd result
+ */
+ protected long getAndAdd(Commit<GetAndAdd> commit) {
+ try {
+ long value = primitive(map.get(commit.operation().key()));
+ map.put(commit.operation().key(), value + commit.operation().delta());
+ return value;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link Size} query which implements {@link AtomixAtomicCounterMap#size()}.
+ *
+ * @param commit size commit
+ * @return size result
+ */
+ protected int size(Commit<Size> commit) {
+ try {
+ return map.size();
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles an {@link IsEmpty} query which implements {@link AtomixAtomicCounterMap#isEmpty()}.
+ *
+ * @param commit isEmpty commit
+ * @return isEmpty result
+ */
+ protected boolean isEmpty(Commit<IsEmpty> commit) {
+ try {
+ return map.isEmpty();
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a {@link Clear} command which implements {@link AtomixAtomicCounterMap#clear()}.
+ *
+ * @param commit clear commit
+ */
+ protected void clear(Commit<Clear> commit) {
+ try {
+ map.clear();
+ } finally {
+ commit.close();
+ }
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapTest.java
new file mode 100644
index 0000000..63db592
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapTest.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.resource.ResourceType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@code AtomixCounterMap}.
+ */
+public class AtomixAtomicCounterMapTest extends AtomixTestBase {
+
+ @BeforeClass
+ public static void preTestSetup() throws Throwable {
+ createCopycatServers(3);
+ }
+
+ @AfterClass
+ public static void postTestCleanup() throws Exception {
+ clearTests();
+ }
+
+ @Override
+ protected ResourceType resourceType() {
+ return new ResourceType(AtomixAtomicCounterMap.class);
+ }
+
+ /**
+ * Tests basic counter map operations.
+ */
+ @Test
+ public void testBasicCounterMapOperations() throws Throwable {
+ AtomixAtomicCounterMap map = createAtomixClient().getResource("testBasicCounterMapOperationMap",
+ AtomixAtomicCounterMap.class).join();
+
+ map.isEmpty().thenAccept(isEmpty -> {
+ assertTrue(isEmpty);
+ }).join();
+
+ map.size().thenAccept(size -> {
+ assertTrue(size == 0);
+ }).join();
+
+ map.put("foo", 2).thenAccept(value -> {
+ assertTrue(value == 0);
+ }).join();
+
+ map.incrementAndGet("foo").thenAccept(value -> {
+ assertTrue(value == 3);
+ }).join();
+
+ map.getAndIncrement("foo").thenAccept(value -> {
+ assertTrue(value == 3);
+ }).join();
+
+ map.get("foo").thenAccept(value -> {
+ assertTrue(value == 4);
+ }).join();
+
+ map.getAndDecrement("foo").thenAccept(value -> {
+ assertTrue(value == 4);
+ }).join();
+
+ map.decrementAndGet("foo").thenAccept(value -> {
+ assertTrue(value == 2);
+ }).join();
+
+ map.size().thenAccept(size -> {
+ assertTrue(size == 1);
+ }).join();
+
+ map.isEmpty().thenAccept(isEmpty -> {
+ assertFalse(isEmpty);
+ }).join();
+
+ map.clear().join();
+
+ map.isEmpty().thenAccept(isEmpty -> {
+ assertTrue(isEmpty);
+ }).join();
+
+ map.size().thenAccept(size -> {
+ assertTrue(size == 0);
+ }).join();
+
+ map.get("foo").thenAccept(value -> {
+ assertTrue(value == 0);
+ }).join();
+
+ map.incrementAndGet("bar").thenAccept(value -> {
+ assertTrue(value == 1);
+ }).join();
+
+ map.addAndGet("bar", 2).thenAccept(value -> {
+ assertTrue(value == 3);
+ }).join();
+
+ map.getAndAdd("bar", 3).thenAccept(value -> {
+ assertTrue(value == 3);
+ }).join();
+
+ map.get("bar").thenAccept(value -> {
+ assertTrue(value == 6);
+ }).join();
+
+ map.putIfAbsent("bar", 1).thenAccept(value -> {
+ assertTrue(value == 6);
+ }).join();
+
+ map.replace("bar", 6, 1).thenAccept(succeeded -> {
+ assertTrue(succeeded);
+ }).join();
+
+ map.replace("bar", 6, 1).thenAccept(succeeded -> {
+ assertFalse(succeeded);
+ }).join();
+
+ map.size().thenAccept(size -> {
+ assertTrue(size == 1);
+ }).join();
+
+ map.remove("bar").thenAccept(value -> {
+ assertTrue(value == 1);
+ }).join();
+
+ map.size().thenAccept(size -> {
+ assertTrue(size == 0);
+ }).join();
+
+ map.put("baz", 3).thenAccept(value -> {
+ assertTrue(value == 0);
+ }).join();
+
+ map.remove("baz", 2).thenAccept(removed -> {
+ assertFalse(removed);
+ }).join();
+
+ map.put("baz", 2).thenAccept(value -> {
+ assertTrue(value == 3);
+ }).join();
+
+ map.remove("baz", 2).thenAccept(removed -> {
+ assertTrue(removed);
+ }).join();
+
+ map.isEmpty().thenAccept(isEmpty -> {
+ assertTrue(isEmpty);
+ }).join();
+
+ map.replace("baz", 0, 5).thenAccept(replaced -> {
+ assertTrue(replaced);
+ }).join();
+
+ map.get("baz").thenAccept(value -> {
+ assertTrue(value == 5);
+ }).join();
+ }
+}
diff --git a/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java b/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
index b77faac..4cc756e 100644
--- a/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
+++ b/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
@@ -19,6 +19,7 @@
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AtomicCounterBuilder;
+import org.onosproject.store.service.AtomicCounterMapBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMultimapBuilder;
@@ -108,7 +109,13 @@
public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
return null;
}
+
public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
return null;
}
+
+ @Override
+ public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+ return null;
+ }
}