[ONOS-6594] Upgrade to Atomix 2.0.0
Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
new file mode 100644
index 0000000..6d7007a
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.session.impl.RaftSessionContext;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.protocols.raft.storage.snapshot.Snapshot;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.storage.StorageLevel;
+import io.atomix.time.WallClockTimestamp;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.mock;
+import static org.junit.Assert.assertEquals;
+import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.PUT;
+
+/**
+ * Tests that {@link AtomixAtomicCounterMapService} state survives a snapshot followed by an install.
+ */
+public class AtomixAtomicCounterMapServiceTest {
+ @Test
+ public void testSnapshot() throws Exception {
+ SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
+ .withPrefix("test")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .build());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ // Apply a PUT of "foo" -> 1 directly to a fresh service instance.
+ AtomixAtomicCounterMapService service = new AtomixAtomicCounterMapService();
+ service.put(new DefaultCommit<>(
+ 2, // presumably the commit index (same value passed to newSnapshot) - TODO confirm
+ PUT,
+ new AtomixAtomicCounterMapOperations.Put("foo", 1),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ // Write the service state into the snapshot; the writer is closed by try-with-resources.
+ try (SnapshotWriter writer = snapshot.openWriter()) {
+ service.snapshot(writer);
+ }
+ // Complete the snapshot before it is read back.
+ snapshot.complete();
+ // Discard the populated service and install the snapshot into a brand-new instance.
+ service = new AtomixAtomicCounterMapService();
+ try (SnapshotReader reader = snapshot.openReader()) {
+ service.install(reader);
+ }
+ // The new instance must return the value that was put before the snapshot was taken.
+ long value = service.get(new DefaultCommit<>(
+ 2,
+ GET,
+ new AtomixAtomicCounterMapOperations.Get("foo"),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ assertEquals(1, value);
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapTest.java
index 63db592..ee44b29 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapTest.java
@@ -15,9 +15,8 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import io.atomix.resource.ResourceType;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
@@ -26,21 +25,16 @@
/**
* Unit test for {@code AtomixCounterMap}.
*/
-public class AtomixAtomicCounterMapTest extends AtomixTestBase {
+public class AtomixAtomicCounterMapTest extends AtomixTestBase<AtomixAtomicCounterMap> {
- @BeforeClass
- public static void preTestSetup() throws Throwable {
- createCopycatServers(3);
- }
-
- @AfterClass
- public static void postTestCleanup() throws Exception {
- clearTests();
+ @Override
+ protected RaftService createService() {
+ return new AtomixAtomicCounterMapService();
}
@Override
- protected ResourceType resourceType() {
- return new ResourceType(AtomixAtomicCounterMap.class);
+ protected AtomixAtomicCounterMap createPrimitive(RaftProxy proxy) {
+ return new AtomixAtomicCounterMap(proxy);
}
/**
@@ -48,8 +42,7 @@
*/
@Test
public void testBasicCounterMapOperations() throws Throwable {
- AtomixAtomicCounterMap map = createAtomixClient().getResource("testBasicCounterMapOperationMap",
- AtomixAtomicCounterMap.class).join();
+ AtomixAtomicCounterMap map = newPrimitive("testBasicCounterMapOperationMap");
map.isEmpty().thenAccept(isEmpty -> {
assertTrue(isEmpty);
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
new file mode 100644
index 0000000..098c193
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.session.impl.RaftSessionContext;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.protocols.raft.storage.snapshot.Snapshot;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.storage.StorageLevel;
+import io.atomix.time.WallClockTimestamp;
+import org.junit.Test;
+import org.onlab.util.Match;
+import org.onosproject.store.service.Versioned;
+
+import static org.easymock.EasyMock.mock;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
+
+/**
+ * Tests that {@link AtomixConsistentMapService} state survives a snapshot followed by an install.
+ */
+public class AtomixConsistentMapServiceTest {
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSnapshot() throws Exception {
+ SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
+ .withPrefix("test")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .build());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ // Store "Hello world!" under key "foo" by applying an UPDATE_AND_GET commit to a fresh service.
+ AtomixConsistentMapService service = new AtomixConsistentMapService();
+ service.updateAndGet(new DefaultCommit<>(
+ 2, // presumably the commit index (same value passed to newSnapshot) - TODO confirm
+ UPDATE_AND_GET,
+ new AtomixConsistentMapOperations.UpdateAndGet("foo", "Hello world!".getBytes(), Match.ANY, Match.ANY),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ // Write the service state into the snapshot; the writer is closed by try-with-resources.
+ try (SnapshotWriter writer = snapshot.openWriter()) {
+ service.snapshot(writer);
+ }
+ // Complete the snapshot before it is read back.
+ snapshot.complete();
+ // Discard the populated service and install the snapshot into a brand-new instance.
+ service = new AtomixConsistentMapService();
+ try (SnapshotReader reader = snapshot.openReader()) {
+ service.install(reader);
+ }
+ // The new instance must return the same bytes that were stored before the snapshot was taken.
+ Versioned<byte[]> value = service.get(new DefaultCommit<>(
+ 2,
+ GET,
+ new AtomixConsistentMapOperations.Get("foo"),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ assertNotNull(value);
+ assertArrayEquals("Hello world!".getBytes(), value.value());
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 284b57b..e858b3f 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -15,11 +15,18 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
-import io.atomix.resource.ResourceType;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.MapUpdate;
@@ -30,14 +37,6 @@
import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
-import java.util.Arrays;
-import java.util.ConcurrentModificationException;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletionException;
-import java.util.stream.Collectors;
-
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -51,20 +50,16 @@
/**
* Unit tests for {@link AtomixConsistentMap}.
*/
-public class AtomixConsistentMapTest extends AtomixTestBase {
+public class AtomixConsistentMapTest extends AtomixTestBase<AtomixConsistentMap> {
- @BeforeClass
- public static void preTestSetup() throws Throwable {
- createCopycatServers(3);
- }
-
- @AfterClass
- public static void postTestCleanup() throws Exception {
- clearTests();
- }
@Override
- protected ResourceType resourceType() {
- return new ResourceType(AtomixConsistentMap.class);
+ protected RaftService createService() {
+ return new AtomixConsistentMapService();
+ }
+
+ @Override
+ protected AtomixConsistentMap createPrimitive(RaftProxy proxy) {
+ return new AtomixConsistentMap(proxy);
}
/**
@@ -119,8 +114,7 @@
final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
- AtomixConsistentMap map = createAtomixClient().getResource("testBasicMapOperationMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap map = newPrimitive("testBasicMapOperationMap");
map.isEmpty().thenAccept(result -> {
assertTrue(result);
@@ -249,8 +243,7 @@
final byte[] value2 = Tools.getBytesUtf8("value2");
final byte[] value3 = Tools.getBytesUtf8("value3");
- AtomixConsistentMap map = createAtomixClient().getResource("testMapComputeOperationsMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap map = newPrimitive("testMapComputeOperationsMap");
map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
@@ -287,8 +280,7 @@
final byte[] value2 = Tools.getBytesUtf8("value2");
final byte[] value3 = Tools.getBytesUtf8("value3");
- AtomixConsistentMap map = createAtomixClient().getResource("testMapListenerMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap map = newPrimitive("testMapListenerMap");
TestMapEventListener listener = new TestMapEventListener();
// add listener; insert new value into map and verify an INSERT event is received.
@@ -343,8 +335,7 @@
}
protected void transactionPrepareTests() throws Throwable {
- AtomixConsistentMap map = createAtomixClient().getResource("testPrepareTestsMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap map = newPrimitive("testPrepareTestsMap");
TransactionId transactionId1 = TransactionId.from("tx1");
TransactionId transactionId2 = TransactionId.from("tx2");
@@ -420,8 +411,7 @@
final byte[] value1 = Tools.getBytesUtf8("value1");
final byte[] value2 = Tools.getBytesUtf8("value2");
- AtomixConsistentMap map = createAtomixClient().getResource("testCommitTestsMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap map = newPrimitive("testCommitTestsMap");
TestMapEventListener listener = new TestMapEventListener();
map.addListener(listener).join();
@@ -521,8 +511,7 @@
final byte[] value1 = Tools.getBytesUtf8("value1");
final byte[] value2 = Tools.getBytesUtf8("value2");
- AtomixConsistentMap map = createAtomixClient().getResource("testTransactionRollbackTestsMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap map = newPrimitive("testTransactionRollbackTestsMap");
TestMapEventListener listener = new TestMapEventListener();
map.addListener(listener).join();
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
new file mode 100644
index 0000000..836c08c
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.session.impl.RaftSessionContext;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.protocols.raft.storage.snapshot.Snapshot;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.storage.StorageLevel;
+import io.atomix.time.WallClockTimestamp;
+import org.junit.Test;
+import org.onlab.util.Match;
+import org.onosproject.store.service.Versioned;
+
+import static org.easymock.EasyMock.mock;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
+
+/**
+ * Tests that {@link AtomixConsistentSetMultimapService} state survives a snapshot followed by an install.
+ */
+public class AtomixConsistentSetMultimapServiceTest {
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSnapshot() throws Exception {
+ SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
+ .withPrefix("test")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .build());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ // Store a single value, "Hello world!", under key "foo" by applying a PUT commit to a fresh service.
+ AtomixConsistentSetMultimapService service = new AtomixConsistentSetMultimapService();
+ service.put(new DefaultCommit<>(
+ 2, // presumably the commit index (same value passed to newSnapshot) - TODO confirm
+ PUT,
+ new AtomixConsistentSetMultimapOperations.Put(
+ "foo", Arrays.asList("Hello world!".getBytes()), Match.ANY),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ // Write the service state into the snapshot; the writer is closed by try-with-resources.
+ try (SnapshotWriter writer = snapshot.openWriter()) {
+ service.snapshot(writer);
+ }
+ // Complete the snapshot before it is read back.
+ snapshot.complete();
+ // Discard the populated service and install the snapshot into a brand-new instance.
+ service = new AtomixConsistentSetMultimapService();
+ try (SnapshotReader reader = snapshot.openReader()) {
+ service.install(reader);
+ }
+ // The new instance must return exactly one value for "foo", equal to the bytes stored earlier.
+ Versioned<Collection<? extends byte[]>> value = service.get(new DefaultCommit<>(
+ 2,
+ GET,
+ new AtomixConsistentSetMultimapOperations.Get("foo"),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ assertNotNull(value);
+ assertEquals(1, value.value().size());
+ assertArrayEquals("Hello world!".getBytes(), value.value().iterator().next());
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
index b1ec1f8..2d01912 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
@@ -16,22 +16,21 @@
package org.onosproject.store.primitives.resources.impl;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multiset;
-import com.google.common.collect.TreeMultiset;
-import io.atomix.resource.ResourceType;
-import org.apache.commons.collections.keyvalue.DefaultMapEntry;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onlab.util.Tools;
-
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.TreeMultiset;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
+import org.apache.commons.collections.keyvalue.DefaultMapEntry;
+import org.junit.Test;
+import org.onlab.util.Tools;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -39,7 +38,7 @@
/**
* Tests the {@link AtomixConsistentSetMultimap}.
*/
-public class AtomixConsistentSetMultimapTest extends AtomixTestBase {
+public class AtomixConsistentSetMultimapTest extends AtomixTestBase<AtomixConsistentSetMultimap> {
private final String keyOne = "hello";
private final String keyTwo = "goodbye";
private final String keyThree = "foo";
@@ -55,19 +54,14 @@
valueThree,
valueFour);
- @BeforeClass
- public static void preTestSetup() throws Throwable {
- createCopycatServers(3);
- }
-
- @AfterClass
- public static void postTestCleanup() throws Exception {
- clearTests();
+ @Override
+ protected RaftService createService() {
+ return new AtomixConsistentSetMultimapService();
}
@Override
- protected ResourceType resourceType() {
- return new ResourceType(AtomixConsistentSetMultimap.class);
+ protected AtomixConsistentSetMultimap createPrimitive(RaftProxy proxy) {
+ return new AtomixConsistentSetMultimap(proxy);
}
/**
@@ -154,9 +148,10 @@
});
});
+ final String[] removedKey = new String[1];
+
//Test behavior after removals
allValues.forEach(value -> {
- final String[] removedKey = new String[1];
allKeys.forEach(key -> {
map.remove(key, value)
.thenAccept(result -> assertTrue(result)).join();
@@ -164,11 +159,12 @@
.thenAccept(result -> assertFalse(result)).join();
removedKey[0] = key;
});
- //Check that contains key works properly for removed keys
- map.containsKey(removedKey[0])
- .thenAccept(result -> assertFalse(result));
});
+ //Check that containsKey is false for a removed key; join() so a failed assertion fails the test
+ map.containsKey(removedKey[0])
+ .thenAccept(result -> assertFalse(result)).join();
+
//Check that contains value works correctly for removed values
allValues.forEach(value -> {
map.containsValue(value)
@@ -403,9 +399,7 @@
private AtomixConsistentSetMultimap createResource(String mapName) {
try {
- AtomixConsistentSetMultimap map = createAtomixClient().
- getResource(mapName, AtomixConsistentSetMultimap.class)
- .join();
+ AtomixConsistentSetMultimap map = newPrimitive(mapName);
return map;
} catch (Throwable e) {
throw new RuntimeException(e.toString());
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapServiceTest.java
new file mode 100644
index 0000000..04698e0
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapServiceTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.session.impl.RaftSessionContext;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.protocols.raft.storage.snapshot.Snapshot;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.storage.StorageLevel;
+import io.atomix.time.WallClockTimestamp;
+import org.junit.Test;
+import org.onlab.util.Match;
+import org.onosproject.store.service.Versioned;
+
+import static org.easymock.EasyMock.mock;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UPDATE_AND_GET;
+
+/**
+ * Tests that {@link AtomixConsistentTreeMapService} state survives a snapshot followed by an install.
+ */
+public class AtomixConsistentTreeMapServiceTest {
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSnapshot() throws Exception {
+ SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
+ .withPrefix("test")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .build());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ // Store "Hello world!" under key "foo" by applying an UPDATE_AND_GET commit to a fresh service.
+ AtomixConsistentTreeMapService service = new AtomixConsistentTreeMapService();
+ service.updateAndGet(new DefaultCommit<>(
+ 2, // presumably the commit index (same value passed to newSnapshot) - TODO confirm
+ UPDATE_AND_GET,
+ new AtomixConsistentTreeMapOperations.UpdateAndGet(
+ "foo", "Hello world!".getBytes(), Match.ANY, Match.ANY),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ // Write the service state into the snapshot; the writer is closed by try-with-resources.
+ try (SnapshotWriter writer = snapshot.openWriter()) {
+ service.snapshot(writer);
+ }
+ // Complete the snapshot before it is read back.
+ snapshot.complete();
+ // Discard the populated service and install the snapshot into a brand-new instance.
+ service = new AtomixConsistentTreeMapService();
+ try (SnapshotReader reader = snapshot.openReader()) {
+ service.install(reader);
+ }
+ // The new instance must return the same bytes that were stored before the snapshot was taken.
+ Versioned<byte[]> value = service.get(new DefaultCommit<>(
+ 2,
+ GET,
+ new AtomixConsistentTreeMapOperations.Get("foo"),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ assertNotNull(value);
+ assertArrayEquals("Hello world!".getBytes(), value.value());
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
index bbaaf57..f1de625 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
@@ -15,16 +15,6 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import io.atomix.resource.ResourceType;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onlab.util.Tools;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -33,6 +23,15 @@
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
+import org.junit.Test;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -44,7 +43,7 @@
/**
* Unit tests for {@link AtomixConsistentTreeMap}.
*/
-public class AtomixConsistentTreeMapTest extends AtomixTestBase {
+public class AtomixConsistentTreeMapTest extends AtomixTestBase<AtomixConsistentTreeMap> {
private final String keyFour = "hello";
private final String keyThree = "goodbye";
private final String keyTwo = "foo";
@@ -60,19 +59,15 @@
valueTwo,
valueThree,
valueFour);
- @BeforeClass
- public static void preTestSetup() throws Throwable {
- createCopycatServers(3);
- }
- @AfterClass
- public static void postTestCleanup() throws Throwable {
- clearTests();
+ @Override
+ protected RaftService createService() {
+ return new AtomixConsistentTreeMapService();
}
@Override
- protected ResourceType resourceType() {
- return new ResourceType(AtomixConsistentTreeMap.class);
+ protected AtomixConsistentTreeMap createPrimitive(RaftProxy proxy) {
+ return new AtomixConsistentTreeMap(proxy);
}
/**
@@ -359,7 +354,9 @@
map.ceilingKey(keyOne).thenAccept(result -> assertNull(result))
.join();
map.higherKey(keyOne).thenAccept(result -> assertNull(result)).join();
- map.delete().join();
+
+ // TODO: delete() is not supported
+ //map.delete().join();
allKeys.forEach(key -> map.put(
key, allValues.get(allKeys.indexOf(key)))
@@ -481,15 +478,14 @@
map.higherKey(keyFour).thenAccept(
result -> assertNull(result))
.join();
- map.delete().join();
+ // TODO: delete() is not supported
+ //map.delete().join();
}
private AtomixConsistentTreeMap createResource(String mapName) {
try {
- AtomixConsistentTreeMap map = createAtomixClient().
- getResource(mapName, AtomixConsistentTreeMap.class)
- .join();
+ AtomixConsistentTreeMap map = newPrimitive(mapName);
return map;
} catch (Throwable e) {
throw new RuntimeException(e.toString());
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
new file mode 100644
index 0000000..707aa91
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.session.impl.RaftSessionContext;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.protocols.raft.storage.snapshot.Snapshot;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.storage.StorageLevel;
+import io.atomix.time.WallClockTimestamp;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.mock;
+import static org.junit.Assert.assertEquals;
+import static org.onosproject.store.primitives.resources.impl.AtomixCounterOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixCounterOperations.SET;
+
+/**
+ * Tests that {@link AtomixCounterService} state survives a snapshot followed by an install.
+ */
+public class AtomixCounterServiceTest {
+ @Test
+ public void testSnapshot() throws Exception {
+ SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
+ .withPrefix("test")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .build());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ // Set the counter to 1 by applying a SET commit directly to a fresh service instance.
+ AtomixCounterService service = new AtomixCounterService();
+ service.set(new DefaultCommit<>(
+ 2, // presumably the commit index (same value passed to newSnapshot) - TODO confirm
+ SET,
+ new AtomixCounterOperations.Set(1L),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ // Write the service state into the snapshot; the writer is closed by try-with-resources.
+ try (SnapshotWriter writer = snapshot.openWriter()) {
+ service.snapshot(writer);
+ }
+ // Complete the snapshot before it is read back.
+ snapshot.complete();
+ // Discard the populated service and install the snapshot into a brand-new instance.
+ service = new AtomixCounterService();
+ try (SnapshotReader reader = snapshot.openReader()) {
+ service.install(reader);
+ }
+ // The new instance must report the counter value that was set before the snapshot was taken.
+ long value = service.get(new DefaultCommit<>(
+ 2,
+ GET,
+ null, // the GET commit is built with no operation payload
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ assertEquals(1, value);
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterTest.java
similarity index 69%
rename from core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
rename to core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterTest.java
index eb835cd..2e334d6 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterTest.java
@@ -15,35 +15,26 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import io.atomix.Atomix;
-import io.atomix.resource.ResourceType;
-import io.atomix.variables.DistributedLong;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-/**git s
+/**
* Unit tests for {@link AtomixCounter}.
*/
-public class AtomixLongTest extends AtomixTestBase {
-
- @BeforeClass
- public static void preTestSetup() throws Throwable {
- createCopycatServers(3);
- }
-
- @AfterClass
- public static void postTestCleanup() throws Exception {
- clearTests();
+public class AtomixCounterTest extends AtomixTestBase<AtomixCounter> {
+ @Override
+ protected RaftService createService() {
+ return new AtomixCounterService();
}
@Override
- protected ResourceType resourceType() {
- return new ResourceType(DistributedLong.class);
+ protected AtomixCounter createPrimitive(RaftProxy proxy) {
+ return new AtomixCounter(proxy);
}
@Test
@@ -52,9 +43,7 @@
}
protected void basicOperationsTest() throws Throwable {
- Atomix atomix = createAtomixClient();
- AtomixCounter along = new AtomixCounter("test-long-basic-operations",
- atomix.getLong("test-long").join());
+ AtomixCounter along = newPrimitive("test-counter-basic-operations");
assertEquals(0, along.get().join().longValue());
assertEquals(1, along.incrementAndGet().join().longValue());
along.set(100).join();
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
new file mode 100644
index 0000000..8e1ae91
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Optional;
+
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.session.impl.RaftSessionContext;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.protocols.raft.storage.snapshot.Snapshot;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.storage.StorageLevel;
+import io.atomix.time.WallClockTimestamp;
+import org.junit.Test;
+import org.onlab.util.Match;
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.Versioned;
+
+import static org.easymock.EasyMock.mock;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
+
+/**
+ * Unit test for {@link AtomixDocumentTreeService} snapshot and install.
+ */
+public class AtomixDocumentTreeServiceTest {
+ @Test
+ public void testSnapshot() throws Exception {
+ SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
+ .withPrefix("test")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .build());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ // Apply an UPDATE commit carrying "Hello world!" for path root|foo to a fresh service.
+ AtomixDocumentTreeService service = new AtomixDocumentTreeService();
+ service.update(new DefaultCommit<>(
+ 2,
+ UPDATE,
+ new AtomixDocumentTreeOperations.Update(
+ DocumentPath.from("root|foo"),
+ Optional.of("Hello world!".getBytes()),
+ Match.any(),
+ Match.ifNull()),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ // Have the service write its state through the snapshot's writer.
+ try (SnapshotWriter writer = snapshot.openWriter()) {
+ service.snapshot(writer);
+ }
+
+ snapshot.complete();
+ // Replace the service with a fresh instance and install the snapshot into it.
+ service = new AtomixDocumentTreeService();
+ try (SnapshotReader reader = snapshot.openReader()) {
+ service.install(reader);
+ }
+ // A GET for root|foo on the fresh instance must return the bytes written before the snapshot.
+ Versioned<byte[]> value = service.get(new DefaultCommit<>(
+ 2,
+ GET,
+ new AtomixDocumentTreeOperations.Get(DocumentPath.from("root|foo")),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ assertNotNull(value);
+ assertArrayEquals("Hello world!".getBytes(), value.value());
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
index dbf8c66..ec81c41 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
@@ -16,22 +16,14 @@
package org.onosproject.store.primitives.resources.impl;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import io.atomix.AtomixClient;
-import io.atomix.resource.ResourceType;
-
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import com.google.common.base.Throwables;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
import org.junit.Test;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
@@ -40,32 +32,34 @@
import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.Versioned;
-import com.google.common.base.Throwables;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Unit tests for {@link AtomixDocumentTree}.
*/
-public class AtomixDocumentTreeTest extends AtomixTestBase {
- @BeforeClass
- public static void preTestSetup() throws Throwable {
- createCopycatServers(3);
+public class AtomixDocumentTreeTest extends AtomixTestBase<AtomixDocumentTree> {
+
+ @Override
+ protected RaftService createService() {
+ return new AtomixDocumentTreeService();
}
- @AfterClass
- public static void postTestCleanup() throws Exception {
- clearTests();
- }
@Override
- protected ResourceType resourceType() {
- return new ResourceType(AtomixDocumentTree.class);
+ protected AtomixDocumentTree createPrimitive(RaftProxy proxy) {
+ return new AtomixDocumentTree(proxy);
}
+
/**
* Tests queries (get and getChildren).
*/
@Test
public void testQueries() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
Versioned<byte[]> root = tree.get(path("root")).join();
assertEquals(1, root.version());
assertNull(root.value());
@@ -76,8 +70,7 @@
*/
@Test
public void testCreate() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
tree.create(path("root.a"), "a".getBytes()).join();
tree.create(path("root.a.b"), "ab".getBytes()).join();
tree.create(path("root.a.c"), "ac".getBytes()).join();
@@ -100,8 +93,7 @@
*/
@Test
public void testRecursiveCreate() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
tree.createRecursive(path("root.a.b.c"), "abc".getBytes()).join();
Versioned<byte[]> a = tree.get(path("root.a")).join();
assertArrayEquals(null, a.value());
@@ -118,8 +110,7 @@
*/
@Test
public void testSet() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
tree.create(path("root.a"), "a".getBytes()).join();
tree.create(path("root.a.b"), "ab".getBytes()).join();
tree.create(path("root.a.c"), "ac".getBytes()).join();
@@ -146,8 +137,7 @@
*/
@Test
public void testReplaceVersion() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
tree.create(path("root.a"), "a".getBytes()).join();
tree.create(path("root.a.b"), "ab".getBytes()).join();
tree.create(path("root.a.c"), "ac".getBytes()).join();
@@ -168,8 +158,7 @@
*/
@Test
public void testReplaceValue() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
tree.create(path("root.a"), "a".getBytes()).join();
tree.create(path("root.a.b"), "ab".getBytes()).join();
tree.create(path("root.a.c"), "ac".getBytes()).join();
@@ -190,8 +179,7 @@
*/
@Test
public void testRemove() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
tree.create(path("root.a"), "a".getBytes()).join();
tree.create(path("root.a.b"), "ab".getBytes()).join();
tree.create(path("root.a.c"), "ac".getBytes()).join();
@@ -219,8 +207,7 @@
*/
@Test
public void testRemoveFailures() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
tree.create(path("root.a"), "a".getBytes()).join();
tree.create(path("root.a.b"), "ab".getBytes()).join();
tree.create(path("root.a.c"), "ac".getBytes()).join();
@@ -252,8 +239,7 @@
*/
@Test
public void testCreateFailures() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
try {
tree.create(path("root.a.c"), "ac".getBytes()).join();
fail();
@@ -267,8 +253,7 @@
*/
@Test
public void testSetFailures() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
try {
tree.set(path("root.a.c"), "ac".getBytes()).join();
fail();
@@ -282,8 +267,7 @@
*/
@Test
public void testGetChildren() throws Throwable {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
tree.create(path("root.a"), "a".getBytes()).join();
tree.create(path("root.a.b"), "ab".getBytes()).join();
tree.create(path("root.a.c"), "ac".getBytes()).join();
@@ -309,8 +293,7 @@
*/
@Test
public void testClear() {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
tree.create(path("root.a"), "a".getBytes()).join();
tree.create(path("root.a.b"), "ab".getBytes()).join();
tree.create(path("root.a.c"), "ac".getBytes()).join();
@@ -324,8 +307,7 @@
*/
@Test
public void testNotifications() throws Exception {
- AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
- AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree = newPrimitive(UUID.randomUUID().toString());
TestEventListener listener = new TestEventListener();
// add listener; create a node in the tree and verify an CREATED event is received.
@@ -359,12 +341,9 @@
@Test
public void testFilteredNotifications() throws Throwable {
- AtomixClient client1 = createAtomixClient();
- AtomixClient client2 = createAtomixClient();
-
String treeName = UUID.randomUUID().toString();
- AtomixDocumentTree tree1 = client1.getResource(treeName, AtomixDocumentTree.class).join();
- AtomixDocumentTree tree2 = client2.getResource(treeName, AtomixDocumentTree.class).join();
+ AtomixDocumentTree tree1 = newPrimitive(treeName);
+ AtomixDocumentTree tree2 = newPrimitive(treeName);
TestEventListener listener1a = new TestEventListener(3);
TestEventListener listener1ab = new TestEventListener(2);
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixIdGeneratorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixIdGeneratorTest.java
index 1fbc464..2a9ca02 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixIdGeneratorTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixIdGeneratorTest.java
@@ -17,10 +17,8 @@
import java.util.concurrent.CompletableFuture;
-import io.atomix.resource.ResourceType;
-import io.atomix.variables.DistributedLong;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -28,21 +26,16 @@
/**
* Unit test for {@code AtomixIdGenerator}.
*/
-public class AtomixIdGeneratorTest extends AtomixTestBase {
+public class AtomixIdGeneratorTest extends AtomixTestBase<AtomixCounter> {
- @BeforeClass
- public static void preTestSetup() throws Throwable {
- createCopycatServers(3);
- }
-
- @AfterClass
- public static void postTestCleanup() throws Exception {
- clearTests();
+ @Override
+ protected RaftService createService() {
+ return new AtomixCounterService();
}
@Override
- protected ResourceType resourceType() {
- return new ResourceType(DistributedLong.class);
+ protected AtomixCounter createPrimitive(RaftProxy proxy) {
+ return new AtomixCounter(proxy);
}
/**
@@ -50,10 +43,8 @@
*/
@Test
public void testNextId() throws Throwable {
- AtomixIdGenerator idGenerator1 = new AtomixIdGenerator("testNextId",
- createAtomixClient().getLong("testNextId").join());
- AtomixIdGenerator idGenerator2 = new AtomixIdGenerator("testNextId",
- createAtomixClient().getLong("testNextId").join());
+ AtomixIdGenerator idGenerator1 = new AtomixIdGenerator(newPrimitive("testNextId"));
+ AtomixIdGenerator idGenerator2 = new AtomixIdGenerator(newPrimitive("testNextId"));
CompletableFuture<Long> future11 = idGenerator1.nextId();
CompletableFuture<Long> future12 = idGenerator1.nextId();
@@ -82,10 +73,8 @@
*/
@Test
public void testNextIdBatchRollover() throws Throwable {
- AtomixIdGenerator idGenerator1 = new AtomixIdGenerator("testNextIdBatchRollover",
- createAtomixClient().getLong("testNextIdBatchRollover").join(), 2);
- AtomixIdGenerator idGenerator2 = new AtomixIdGenerator("testNextIdBatchRollover",
- createAtomixClient().getLong("testNextIdBatchRollover").join(), 2);
+ AtomixIdGenerator idGenerator1 = new AtomixIdGenerator(newPrimitive("testNextIdBatchRollover"), 2);
+ AtomixIdGenerator idGenerator2 = new AtomixIdGenerator(newPrimitive("testNextIdBatchRollover"), 2);
CompletableFuture<Long> future11 = idGenerator1.nextId();
CompletableFuture<Long> future12 = idGenerator1.nextId();
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
new file mode 100644
index 0000000..d3b6343
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.impl.RaftServerContext;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.ServiceType;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
+import io.atomix.protocols.raft.session.SessionId;
+import io.atomix.protocols.raft.session.impl.RaftSessionContext;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.protocols.raft.storage.snapshot.Snapshot;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.storage.StorageLevel;
+import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.ThreadContext;
+import org.junit.Test;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_LEADERSHIP;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.RUN;
+import static org.onosproject.store.service.DistributedPrimitive.Type.LEADER_ELECTOR;
+
+/**
+ * Unit test for {@link AtomixLeaderElectorService} snapshot and install.
+ */
+public class AtomixLeaderElectorServiceTest {
+ @Test
+ public void testSnapshot() throws Exception {
+ SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
+ .withPrefix("test")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .build());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+ // Mocked service context passed to init(); each stubbed getter may be called any number of times.
+ DefaultServiceContext context = mock(DefaultServiceContext.class);
+ expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
+ expect(context.serviceName()).andReturn("test").anyTimes();
+ expect(context.serviceId()).andReturn(ServiceId.from(1)).anyTimes();
+ expect(context.executor()).andReturn(mock(ThreadContext.class)).anyTimes();
+
+ RaftServerContext server = mock(RaftServerContext.class);
+ expect(server.getProtocol()).andReturn(mock(RaftServerProtocol.class));
+
+ replay(context, server);
+
+ AtomixLeaderElectorService service = new AtomixLeaderElectorService();
+ service.init(context);
+ // Apply Run("test", nodeId) using a RaftSessionContext constructed on top of the mocks above.
+ NodeId nodeId = NodeId.nodeId("1");
+ service.run(new DefaultCommit<>(
+ 2,
+ RUN,
+ new AtomixLeaderElectorOperations.Run("test", nodeId),
+ new RaftSessionContext(
+ SessionId.from(1),
+ MemberId.from("1"),
+ "test",
+ ServiceType.from(LEADER_ELECTOR.name()),
+ ReadConsistency.LINEARIZABLE,
+ 5000,
+ context,
+ server),
+ System.currentTimeMillis()));
+ // Have the service write its state through the snapshot's writer.
+ try (SnapshotWriter writer = snapshot.openWriter()) {
+ service.snapshot(writer);
+ }
+
+ snapshot.complete();
+ // Install the snapshot into a fresh service instance initialized with the same context.
+ service = new AtomixLeaderElectorService();
+ service.init(context);
+
+ try (SnapshotReader reader = snapshot.openReader()) {
+ service.install(reader);
+ }
+ // The fresh instance must report nodeId as the leader for "test" (expected value goes first).
+ Leadership value = service.getLeadership(new DefaultCommit<>(
+ 2,
+ GET_LEADERSHIP,
+ new AtomixLeaderElectorOperations.GetLeadership("test"),
+ mock(RaftSessionContext.class),
+ System.currentTimeMillis()));
+ assertNotNull(value);
+ assertEquals(nodeId, value.leader().nodeId());
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
index 00df1fc..4bd6d7f 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -15,21 +15,18 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import io.atomix.Atomix;
-import io.atomix.AtomixClient;
-import io.atomix.resource.ResourceType;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.Change;
-
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
+import org.junit.Test;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -37,25 +34,20 @@
/**
* Unit tests for {@link AtomixLeaderElector}.
*/
-public class AtomixLeaderElectorTest extends AtomixTestBase {
+public class AtomixLeaderElectorTest extends AtomixTestBase<AtomixLeaderElector> {
NodeId node1 = new NodeId("node1");
NodeId node2 = new NodeId("node2");
NodeId node3 = new NodeId("node3");
- @BeforeClass
- public static void preTestSetup() throws Throwable {
- createCopycatServers(3);
- }
-
- @AfterClass
- public static void postTestCleanup() throws Exception {
- clearTests();
+ @Override
+ protected RaftService createService() {
+ return new AtomixLeaderElectorService();
}
@Override
- protected ResourceType resourceType() {
- return new ResourceType(AtomixLeaderElector.class);
+ protected AtomixLeaderElector createPrimitive(RaftProxy proxy) {
+ return new AtomixLeaderElector(proxy);
}
@Test
@@ -64,18 +56,15 @@
}
private void leaderElectorRunTests() throws Throwable {
- Atomix client1 = createAtomixClient();
- AtomixLeaderElector elector1 = client1.getResource("test-elector-run",
- AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector1 = newPrimitive("test-elector-run");
elector1.run("foo", node1).thenAccept(result -> {
assertEquals(node1, result.leaderNodeId());
assertEquals(1, result.leader().term());
assertEquals(1, result.candidates().size());
assertEquals(node1, result.candidates().get(0));
}).join();
- Atomix client2 = createAtomixClient();
- AtomixLeaderElector elector2 = client2.getResource("test-elector-run",
- AtomixLeaderElector.class).join();
+
+ AtomixLeaderElector elector2 = newPrimitive("test-elector-run");
elector2.run("foo", node2).thenAccept(result -> {
assertEquals(node1, result.leaderNodeId());
assertEquals(1, result.leader().term());
@@ -91,13 +80,9 @@
}
private void leaderElectorWithdrawTests() throws Throwable {
- Atomix client1 = createAtomixClient();
- AtomixLeaderElector elector1 = client1.getResource("test-elector-withdraw",
- AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector1 = newPrimitive("test-elector-withdraw");
elector1.run("foo", node1).join();
- Atomix client2 = createAtomixClient();
- AtomixLeaderElector elector2 = client2.getResource("test-elector-withdraw",
- AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector2 = newPrimitive("test-elector-withdraw");
elector2.run("foo", node2).join();
LeaderEventListener listener1 = new LeaderEventListener();
@@ -121,6 +106,14 @@
assertEquals(1, result.newValue().candidates().size());
assertEquals(node2, result.newValue().candidates().get(0));
}).join();
+
+ Leadership leadership1 = elector1.getLeadership("foo").join();
+ assertEquals(node2, leadership1.leader().nodeId());
+ assertEquals(1, leadership1.candidates().size());
+
+ Leadership leadership2 = elector2.getLeadership("foo").join();
+ assertEquals(node2, leadership2.leader().nodeId());
+ assertEquals(1, leadership2.candidates().size());
}
@Test
@@ -129,15 +122,9 @@
}
private void leaderElectorAnointTests() throws Throwable {
- Atomix client1 = createAtomixClient();
- AtomixLeaderElector elector1 = client1.getResource("test-elector-anoint",
- AtomixLeaderElector.class).join();
- Atomix client2 = createAtomixClient();
- AtomixLeaderElector elector2 = client2.getResource("test-elector-anoint",
- AtomixLeaderElector.class).join();
- Atomix client3 = createAtomixClient();
- AtomixLeaderElector elector3 = client3.getResource("test-elector-anoint",
- AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector1 = newPrimitive("test-elector-anoint");
+ AtomixLeaderElector elector2 = newPrimitive("test-elector-anoint");
+ AtomixLeaderElector elector3 = newPrimitive("test-elector-anoint");
elector1.run("foo", node1).join();
elector2.run("foo", node2).join();
@@ -185,15 +172,9 @@
}
private void leaderElectorPromoteTests() throws Throwable {
- AtomixClient client1 = createAtomixClient();
- AtomixLeaderElector elector1 = client1.getResource("test-elector-promote",
- AtomixLeaderElector.class).join();
- AtomixClient client2 = createAtomixClient();
- AtomixLeaderElector elector2 = client2.getResource("test-elector-promote",
- AtomixLeaderElector.class).join();
- AtomixClient client3 = createAtomixClient();
- AtomixLeaderElector elector3 = client3.getResource("test-elector-promote",
- AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector1 = newPrimitive("test-elector-promote");
+ AtomixLeaderElector elector2 = newPrimitive("test-elector-promote");
+ AtomixLeaderElector elector3 = newPrimitive("test-elector-promote");
elector1.run("foo", node1).join();
elector2.run("foo", node2).join();
@@ -245,17 +226,13 @@
}
private void leaderElectorLeaderSessionCloseTests() throws Throwable {
- AtomixClient client1 = createAtomixClient();
- AtomixLeaderElector elector1 = client1.getResource("test-elector-leader-session-close",
- AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector1 = newPrimitive("test-elector-leader-session-close");
elector1.run("foo", node1).join();
- Atomix client2 = createAtomixClient();
- AtomixLeaderElector elector2 = client2.getResource("test-elector-leader-session-close",
- AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector2 = newPrimitive("test-elector-leader-session-close");
LeaderEventListener listener = new LeaderEventListener();
elector2.run("foo", node2).join();
elector2.addChangeListener(listener).join();
- client1.close();
+ elector1.proxy.close();
listener.nextEvent().thenAccept(result -> {
assertEquals(node2, result.newValue().leaderNodeId());
assertEquals(1, result.newValue().candidates().size());
@@ -269,17 +246,13 @@
}
private void leaderElectorNonLeaderSessionCloseTests() throws Throwable {
- Atomix client1 = createAtomixClient();
- AtomixLeaderElector elector1 = client1.getResource("test-elector-non-leader-session-close",
- AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector1 = newPrimitive("test-elector-non-leader-session-close");
elector1.run("foo", node1).join();
- AtomixClient client2 = createAtomixClient();
- AtomixLeaderElector elector2 = client2.getResource("test-elector-non-leader-session-close",
- AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector2 = newPrimitive("test-elector-non-leader-session-close");
LeaderEventListener listener = new LeaderEventListener();
elector2.run("foo", node2).join();
elector1.addChangeListener(listener).join();
- client2.close().join();
+ elector2.proxy.close().join();
listener.nextEvent().thenAccept(result -> {
assertEquals(node1, result.newValue().leaderNodeId());
assertEquals(1, result.newValue().candidates().size());
@@ -293,12 +266,8 @@
}
private void leaderElectorQueryTests() throws Throwable {
- Atomix client1 = createAtomixClient();
- Atomix client2 = createAtomixClient();
- AtomixLeaderElector elector1 = client1.getResource("test-elector-query",
- AtomixLeaderElector.class).join();
- AtomixLeaderElector elector2 = client2.getResource("test-elector-query",
- AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector1 = newPrimitive("test-elector-query");
+ AtomixLeaderElector elector2 = newPrimitive("test-elector-query");
elector1.run("foo", node1).join();
elector2.run("foo", node2).join();
elector2.run("bar", node2).join();
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index f7a5007..7073bb5 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -15,133 +15,490 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import com.google.common.util.concurrent.Uninterruptibles;
-
-import io.atomix.AtomixClient;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.local.LocalServerRegistry;
-import io.atomix.catalyst.transport.netty.NettyTransport;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.copycat.server.CopycatServer;
-import io.atomix.copycat.server.storage.Storage;
-import io.atomix.copycat.server.storage.StorageLevel;
-import io.atomix.manager.internal.ResourceManagerState;
-import io.atomix.resource.ResourceType;
-import org.onlab.junit.TestTools;
-import org.onosproject.store.primitives.impl.CatalystSerializers;
-
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import io.atomix.protocols.raft.RaftClient;
+import io.atomix.protocols.raft.RaftError;
+import io.atomix.protocols.raft.RaftServer;
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.cluster.RaftMember;
+import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
+import io.atomix.protocols.raft.event.RaftEvent;
+import io.atomix.protocols.raft.event.impl.DefaultEventType;
+import io.atomix.protocols.raft.operation.OperationType;
+import io.atomix.protocols.raft.operation.RaftOperation;
+import io.atomix.protocols.raft.operation.impl.DefaultOperationId;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.RaftResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.proxy.CommunicationStrategy;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
+import io.atomix.protocols.raft.session.SessionId;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
+import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
+import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
+import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
+import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
+import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
+import io.atomix.protocols.raft.storage.system.Configuration;
+import io.atomix.storage.StorageLevel;
+import org.junit.After;
+import org.junit.Before;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.impl.RaftClientCommunicator;
+import org.onosproject.store.primitives.impl.RaftServerCommunicator;
+import org.onosproject.store.service.Serializer;
/**
* Base class for various Atomix tests.
+ *
+ * @param <T> the Raft primitive type being tested
*/
-public abstract class AtomixTestBase {
- protected static LocalServerRegistry registry = new LocalServerRegistry();
- protected static List<Address> members = new ArrayList<>();
- protected static List<CopycatClient> copycatClients = new ArrayList<>();
- protected static List<CopycatServer> copycatServers = new ArrayList<>();
- protected static List<AtomixClient> atomixClients = new ArrayList<>();
- protected static List<CopycatServer> atomixServers = new ArrayList<>();
- protected static Serializer serializer = CatalystSerializers.getSerializer();
- protected static AtomicInteger port = new AtomicInteger(49200);
+public abstract class AtomixTestBase<T extends AbstractRaftPrimitive> {
+
+ /**
+ * Serializer for Raft protocol traffic in these tests.
+ * <p>
+ * The same instance is handed to the RaftServerCommunicator of every test server and to the
+ * RaftClientCommunicator of every test client. Each type is registered exactly once.
+ */
+ private static final Serializer PROTOCOL_SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+ .register(OpenSessionRequest.class)
+ .register(OpenSessionResponse.class)
+ .register(CloseSessionRequest.class)
+ .register(CloseSessionResponse.class)
+ .register(KeepAliveRequest.class)
+ .register(KeepAliveResponse.class)
+ .register(QueryRequest.class)
+ .register(QueryResponse.class)
+ .register(CommandRequest.class)
+ .register(CommandResponse.class)
+ .register(MetadataRequest.class)
+ .register(MetadataResponse.class)
+ .register(JoinRequest.class)
+ .register(JoinResponse.class)
+ .register(LeaveRequest.class)
+ .register(LeaveResponse.class)
+ .register(ConfigureRequest.class)
+ .register(ConfigureResponse.class)
+ .register(ReconfigureRequest.class)
+ .register(ReconfigureResponse.class)
+ .register(InstallRequest.class)
+ .register(InstallResponse.class)
+ .register(PollRequest.class)
+ .register(PollResponse.class)
+ .register(VoteRequest.class)
+ .register(VoteResponse.class)
+ .register(AppendRequest.class)
+ .register(AppendResponse.class)
+ .register(PublishRequest.class)
+ .register(ResetRequest.class)
+ .register(RaftResponse.Status.class)
+ .register(RaftError.class)
+ .register(RaftError.Type.class)
+ .register(ReadConsistency.class)
+ .register(byte[].class)
+ .register(long[].class)
+ .register(CloseSessionEntry.class)
+ .register(CommandEntry.class)
+ .register(ConfigurationEntry.class)
+ .register(InitializeEntry.class)
+ .register(KeepAliveEntry.class)
+ .register(MetadataEntry.class)
+ .register(OpenSessionEntry.class)
+ .register(QueryEntry.class)
+ .register(RaftOperation.class)
+ .register(RaftEvent.class)
+ .register(DefaultEventType.class)
+ .register(DefaultOperationId.class)
+ .register(OperationType.class)
+ .register(ArrayList.class)
+ .register(LinkedList.class)
+ .register(Collections.emptyList().getClass())
+ .register(HashSet.class)
+ .register(DefaultRaftMember.class)
+ .register(MemberId.class)
+ .register(SessionId.class)
+ .register(RaftMember.Type.class)
+ .register(RaftMember.Status.class)
+ .register(Instant.class)
+ .register(Configuration.class)
+ .register(AtomixAtomicCounterMapOperations.class)
+ .register(AtomixConsistentMapEvents.class)
+ .register(AtomixConsistentMapOperations.class)
+ .register(AtomixConsistentSetMultimapOperations.class)
+ .register(AtomixConsistentSetMultimapEvents.class)
+ .register(AtomixConsistentTreeMapEvents.class)
+ .register(AtomixConsistentTreeMapOperations.class)
+ .register(AtomixCounterOperations.class)
+ .register(AtomixDocumentTreeEvents.class)
+ .register(AtomixDocumentTreeOperations.class)
+ .register(AtomixLeaderElectorEvents.class)
+ .register(AtomixLeaderElectorOperations.class)
+ .register(AtomixWorkQueueEvents.class)
+ .register(AtomixWorkQueueOperations.class)
+ .build());
+
+ /**
+ * Serializer for Raft storage in these tests.
+ * <p>
+ * Wrapped in an AtomixSerializerAdapter and passed to the RaftStorage builder of every test server.
+ */
+ private static final Serializer STORAGE_SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+ .register(CloseSessionEntry.class)
+ .register(CommandEntry.class)
+ .register(ConfigurationEntry.class)
+ .register(InitializeEntry.class)
+ .register(KeepAliveEntry.class)
+ .register(MetadataEntry.class)
+ .register(OpenSessionEntry.class)
+ .register(QueryEntry.class)
+ .register(RaftOperation.class)
+ .register(ReadConsistency.class)
+ .register(AtomixAtomicCounterMapOperations.class)
+ .register(AtomixConsistentMapOperations.class)
+ .register(AtomixConsistentSetMultimapOperations.class)
+ .register(AtomixConsistentTreeMapOperations.class)
+ .register(AtomixCounterOperations.class)
+ .register(AtomixDocumentTreeOperations.class)
+ .register(AtomixLeaderElectorOperations.class)
+ .register(AtomixWorkQueueOperations.class)
+ .register(ArrayList.class)
+ .register(HashSet.class)
+ .register(DefaultRaftMember.class)
+ .register(MemberId.class)
+ .register(RaftMember.Type.class)
+ .register(RaftMember.Status.class)
+ .register(Instant.class)
+ .register(Configuration.class)
+ .register(byte[].class)
+ .register(long[].class)
+ .build());
+
+ // Source of per-node communication services; replaced with a fresh factory in prepare().
+ protected TestClusterCommunicationServiceFactory communicationServiceFactory;
+ // Server members; filled by createServers() and emptied in prepare().
+ protected List<RaftMember> members = Lists.newCopyOnWriteArrayList();
+ // Clients created by createClient(); closed in shutdown().
+ protected List<RaftClient> clients = Lists.newCopyOnWriteArrayList();
+ // Servers built by createServer(); stopped in shutdown().
+ protected List<RaftServer> servers = Lists.newCopyOnWriteArrayList();
+ // Counter behind nextMemberId(); server and client ids are drawn from the same sequence.
+ protected int nextId;
/**
- * Creates a new resource state machine.
+ * Creates the primitive service.
+ * <p>
+ * Passed to every test server as the factory for the "test" service type.
*
- * @return A new resource state machine.
+ * @return the primitive service
*/
- protected abstract ResourceType resourceType();
+ protected abstract RaftService createService();
+
+ /**
+ * Builds a primitive on top of a freshly connected client.
+ * <p>
+ * A new Raft client is created, a proxy for the "test" service type is opened under the given
+ * name using {@link #readConsistency()} and {@link #communicationStrategy()}, and the opened
+ * proxy is handed to {@link #createPrimitive(RaftProxy)}. Blocks until the proxy is open.
+ *
+ * @param name the primitive name
+ * @return the primitive instance
+ */
+ protected T newPrimitive(String name) {
+ RaftProxy openedProxy = createClient()
+ .newProxyBuilder()
+ .withName(name)
+ .withServiceType("test")
+ .withReadConsistency(readConsistency())
+ .withCommunicationStrategy(communicationStrategy())
+ .build()
+ .open()
+ .join();
+ return createPrimitive(openedProxy);
+ }
+
+ /**
+ * Creates a new primitive instance.
+ * <p>
+ * Called by newPrimitive(String) with the proxy it has just opened.
+ *
+ * @param proxy the primitive proxy
+ * @return the primitive instance
+ */
+ protected abstract T createPrimitive(RaftProxy proxy);
+
+ /**
+ * Returns the proxy read consistency.
+ * <p>
+ * Used by newPrimitive(String) when building the proxy. This base implementation returns
+ * LINEARIZABLE; being protected and non-final, it can be overridden by a test class.
+ *
+ * @return the primitive read consistency
+ */
+ protected ReadConsistency readConsistency() {
+ return ReadConsistency.LINEARIZABLE;
+ }
+
+ /**
+ * Returns the proxy communication strategy.
+ * <p>
+ * Used by newPrimitive(String) when building the proxy. This base implementation returns
+ * LEADER; being protected and non-final, it can be overridden by a test class.
+ *
+ * @return the primitive communication strategy
+ */
+ protected CommunicationStrategy communicationStrategy() {
+ return CommunicationStrategy.LEADER;
+ }
+
+ /**
+ * Test fixture setup: empties the member, client and server lists, installs a fresh
+ * communication service factory and creates three servers.
+ */
+ @Before
+ public void prepare() {
+ members.clear();
+ clients.clear();
+ servers.clear();
+ // The factory must exist before createServers(), which asks it for one service per server.
+ communicationServiceFactory = new TestClusterCommunicationServiceFactory();
+ createServers(3);
+ }
+
+ /**
+ * Test fixture teardown: delegates to shutdown() to release clients, servers and test files.
+ */
+ @After
+ public void cleanup() {
+ shutdown();
+ }
+
+ /**
+ * Shuts down clients and servers and deletes the {@code target/primitives/} directory.
+ * <p>
+ * Cleanup is best effort: a failure while releasing one resource is ignored so that the
+ * remaining ones are still released. If the thread is interrupted while waiting, the
+ * interrupt status is restored once cleanup has finished instead of being lost.
+ */
+ private void shutdown() {
+ boolean interrupted = false;
+ try {
+ for (RaftClient client : clients) {
+ try {
+ client.close().get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Remember the interrupt but keep cleaning up; it is restored below.
+ interrupted = true;
+ } catch (Exception ignored) {
+ // Best effort: keep closing the remaining clients.
+ }
+ }
+
+ for (RaftServer server : servers) {
+ try {
+ if (server.isRunning()) {
+ server.shutdown().get(10, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e) {
+ interrupted = true;
+ } catch (Exception ignored) {
+ // Best effort: keep stopping the remaining servers.
+ }
+ }
+
+ deleteStorageDirectory();
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Recursively deletes the {@code target/primitives/} directory if it exists.
+ * I/O failures are ignored because this is test cleanup only.
+ */
+ private void deleteStorageDirectory() {
+ Path directory = Paths.get("target/primitives/");
+ if (!Files.exists(directory)) {
+ return;
+ }
+ try {
+ Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ // Children were removed in visitFile, so the directory is empty by now.
+ Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ } catch (IOException ignored) {
+ // Best effort: leftover files under target/ do not affect the test outcome.
+ }
+ }
+
+ /**
+ * Advances the id counter and wraps its new value in a member identifier.
+ *
+ * @return the member identifier built from the incremented counter
+ */
+ private MemberId nextMemberId() {
+ nextId += 1;
+ return MemberId.from(Integer.toString(nextId));
+ }
/**
- * Returns the next server address.
+ * Returns the next server member.
+ * <p>
+ * The member is a TestMember carrying a newly allocated member id and the given type.
*
+ * @param type The startup member type.
- * @return The next server address.
+ * @return The next server member.
*/
- private static Address nextAddress() {
- Address address = new Address("127.0.0.1",
- TestTools.findAvailablePort(port.getAndIncrement()));
- members.add(address);
- return address;
+ private RaftMember nextMember(RaftMember.Type type) {
+ return new TestMember(nextMemberId(), type);
}
/**
- * Creates a set of Copycat servers.
+ * Creates a set of Raft servers.
+ * <p>
+ * Registers {@code nodes} new active members, builds one server per member, bootstraps each
+ * server with the ids of all known members and waits up to 30 seconds for every bootstrap
+ * to complete.
+ *
+ * @param nodes the number of servers to create
+ * @return the servers created by this call
+ * @throws IllegalStateException if not every server finished bootstrapping within the timeout
+ * @throws RuntimeException if the calling thread is interrupted while waiting
*/
- protected static List<CopycatServer> createCopycatServers(int nodes)
- throws Throwable {
- List<CopycatServer> servers = new ArrayList<>();
-
- List<Address> members = new ArrayList<>();
+ protected List<RaftServer> createServers(int nodes) {
+ List<RaftServer> servers = new ArrayList<>();
for (int i = 0; i < nodes; i++) {
- Address address = nextAddress();
- members.add(address);
- CopycatServer server = createCopycatServer(address);
- if (members.size() <= 1) {
- server.bootstrap().join();
- } else {
- server.join(members).join();
- }
+ members.add(nextMember(RaftMember.Type.ACTIVE));
+ }
+
+ CountDownLatch latch = new CountDownLatch(nodes);
+ for (int i = 0; i < nodes; i++) {
+ RaftServer server = createServer(members.get(i));
+ server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList()))
+ .thenRun(latch::countDown);
servers.add(server);
}
+ boolean bootstrapped;
+ try {
+ bootstrapped = latch.await(30000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status before propagating so callers can still observe it.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ if (!bootstrapped) {
+ // Fail here rather than let the test run against a cluster that never formed.
+ throw new IllegalStateException(String.format(
+ "Timed out waiting for servers to bootstrap: %d of %d still pending",
+ latch.getCount(), nodes));
+ }
+
return servers;
}
/**
- * Creates a Copycat server.
+ * Creates a Raft server.
+ * <p>
+ * The server is built with in-memory storage, registers createService() under the "test"
+ * service type and is added to the servers list. It is built here but not bootstrapped.
+ *
+ * @param member the member whose id and type the server is built for
+ * @return the built server
*/
- protected static CopycatServer createCopycatServer(Address address) {
- CopycatServer server = CopycatServer.builder(address)
- .withTransport(NettyTransport.builder().withThreads(1).build())
- .withStorage(Storage.builder()
- .withStorageLevel(StorageLevel.MEMORY)
- .build())
- .withStateMachine(ResourceManagerState::new)
- .withSerializer(serializer.clone())
- .build();
- copycatServers.add(server);
+ private RaftServer createServer(RaftMember member) {
+ RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
+ .withType(member.getType())
+ .withProtocol(new RaftServerCommunicator(
+ PartitionId.from(1),
+ PROTOCOL_SERIALIZER,
+ communicationServiceFactory.newCommunicationService(NodeId.nodeId(member.memberId().id()))))
+ .withStorage(RaftStorage.newBuilder()
+ .withStorageLevel(StorageLevel.MEMORY)
+ .withDirectory(new File(String.format("target/primitives/%s", member.memberId())))
+ .withSerializer(new AtomixSerializerAdapter(STORAGE_SERIALIZER))
+ .withMaxSegmentSize(1024 * 1024)
+ .build())
+ .addService("test", this::createService);
+
+ RaftServer server = builder.build();
+ servers.add(server);
return server;
}
- public static void clearTests() throws Exception {
- registry = new LocalServerRegistry();
- members = new ArrayList<>();
+ /**
+ * Creates a Raft client.
+ * <p>
+ * The client gets its own member id, connects to all current server members (blocking until
+ * the connection completes) and is added to the clients list.
+ *
+ * @return the connected client
+ */
+ private RaftClient createClient() {
+ MemberId memberId = nextMemberId();
+ RaftClient client = RaftClient.newBuilder()
+ .withMemberId(memberId)
+ .withProtocol(new RaftClientCommunicator(
+ PartitionId.from(1),
+ PROTOCOL_SERIALIZER,
+ communicationServiceFactory.newCommunicationService(NodeId.nodeId(memberId.id()))))
+ .build();
- CompletableFuture<Void> closeClients =
- CompletableFuture.allOf(atomixClients.stream()
- .map(AtomixClient::close)
- .toArray(CompletableFuture[]::new));
- closeClients.join();
-
- CompletableFuture<Void> closeServers =
- CompletableFuture.allOf(copycatServers.stream()
- .map(CopycatServer::shutdown)
- .toArray(CompletableFuture[]::new));
- closeServers.join();
-
- atomixClients.clear();
- copycatServers.clear();
+ client.connect(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).join();
+ clients.add(client);
+ return client;
}
-
/**
- * Creates a Atomix client.
+ * Test member.
+ * <p>
+ * Minimal RaftMember holding only an id and a type. Listener registration is a no-op, the
+ * status is always AVAILABLE, and the membership-changing operations (promote, demote,
+ * remove) are not supported: they return a failed future rather than {@code null}.
*/
- protected AtomixClient createAtomixClient() {
- CountDownLatch latch = new CountDownLatch(1);
- AtomixClient client = AtomixClient.builder()
- .withTransport(NettyTransport.builder().withThreads(1).build())
- .withSerializer(serializer.clone())
- .build();
- client.connect(members).thenRun(latch::countDown);
- atomixClients.add(client);
- Uninterruptibles.awaitUninterruptibly(latch);
- return client;
+ public static class TestMember implements RaftMember {
+ private final MemberId memberId;
+ private final Type type;
+
+ public TestMember(MemberId memberId, Type type) {
+ this.memberId = memberId;
+ this.type = type;
+ }
+
+ /**
+ * Returns a future that has already failed with an UnsupportedOperationException.
+ *
+ * @param operation the name of the unsupported operation, used in the exception message
+ * @return the failed future
+ */
+ private static CompletableFuture<Void> unsupported(String operation) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(
+ new UnsupportedOperationException(operation + " is not supported by TestMember"));
+ return future;
+ }
+
+ @Override
+ public MemberId memberId() {
+ return memberId;
+ }
+
+ @Override
+ public int hash() {
+ return memberId.hashCode();
+ }
+
+ @Override
+ public Type getType() {
+ return type;
+ }
+
+ @Override
+ public void addTypeChangeListener(Consumer<Type> listener) {
+ // No-op: the type of a test member never changes.
+ }
+
+ @Override
+ public void removeTypeChangeListener(Consumer<Type> listener) {
+ // No-op: listeners are never registered.
+ }
+
+ @Override
+ public Status getStatus() {
+ return Status.AVAILABLE;
+ }
+
+ @Override
+ public Instant getLastUpdated() {
+ return Instant.now();
+ }
+
+ @Override
+ public void addStatusChangeListener(Consumer<Status> listener) {
+ // No-op: the status of a test member never changes.
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<Status> listener) {
+ // No-op: listeners are never registered.
+ }
+
+ @Override
+ public CompletableFuture<Void> promote() {
+ return unsupported("promote");
+ }
+
+ @Override
+ public CompletableFuture<Void> promote(Type type) {
+ return unsupported("promote");
+ }
+
+ @Override
+ public CompletableFuture<Void> demote() {
+ return unsupported("demote");
+ }
+
+ @Override
+ public CompletableFuture<Void> demote(Type type) {
+ return unsupported("demote");
+ }
+
+ @Override
+ public CompletableFuture<Void> remove() {
+ return unsupported("remove");
+ }
}
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
new file mode 100644
index 0000000..44ebd52
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.impl.RaftServerContext;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.ServiceType;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
+import io.atomix.protocols.raft.session.SessionId;
+import io.atomix.protocols.raft.session.impl.RaftSessionContext;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.protocols.raft.storage.snapshot.Snapshot;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import io.atomix.storage.StorageLevel;
+import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.ThreadContext;
+import org.junit.Test;
+import org.onosproject.store.service.Task;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.ADD;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.TAKE;
+import static org.onosproject.store.service.DistributedPrimitive.Type.WORK_QUEUE;
+
+/**
+ * Work queue service test.
+ * <p>
+ * Exercises AtomixWorkQueueService directly, without a Raft cluster: the service context and
+ * server context are EasyMock mocks and commits are constructed by hand.
+ */
+public class AtomixWorkQueueServiceTest {
+ /**
+ * Adds one task to a service, snapshots it, installs the snapshot into a fresh service
+ * instance and checks that the task can be taken from the restored service.
+ */
+ @Test
+ public void testSnapshot() throws Exception {
+ SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
+ .withPrefix("test")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .build());
+ Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+
+ // Record the mock expectations; they take effect once replay() is called below.
+ DefaultServiceContext context = mock(DefaultServiceContext.class);
+ expect(context.serviceType()).andReturn(ServiceType.from(WORK_QUEUE.name())).anyTimes();
+ expect(context.serviceName()).andReturn("test").anyTimes();
+ expect(context.serviceId()).andReturn(ServiceId.from(1)).anyTimes();
+ expect(context.executor()).andReturn(mock(ThreadContext.class)).anyTimes();
+
+ RaftServerContext server = mock(RaftServerContext.class);
+ expect(server.getProtocol()).andReturn(mock(RaftServerProtocol.class));
+
+ replay(context, server);
+
+ // The same session is used for both the ADD and the TAKE commit.
+ RaftSessionContext session = new RaftSessionContext(
+ SessionId.from(1),
+ MemberId.from("1"),
+ "test",
+ ServiceType.from(WORK_QUEUE.name()),
+ ReadConsistency.LINEARIZABLE,
+ 5000,
+ context,
+ server);
+
+ AtomixWorkQueueService service = new AtomixWorkQueueService();
+ service.init(context);
+
+ service.add(new DefaultCommit<>(
+ 2,
+ ADD,
+ new AtomixWorkQueueOperations.Add(Arrays.asList("Hello world!".getBytes())),
+ session,
+ System.currentTimeMillis()));
+
+ try (SnapshotWriter writer = snapshot.openWriter()) {
+ service.snapshot(writer);
+ }
+
+ snapshot.complete();
+
+ // Start from a brand-new service so any state seen below can only come from the snapshot.
+ service = new AtomixWorkQueueService();
+ service.init(context);
+
+ try (SnapshotReader reader = snapshot.openReader()) {
+ service.install(reader);
+ }
+
+ Collection<Task<byte[]>> value = service.take(new DefaultCommit<>(
+ 2,
+ TAKE,
+ new AtomixWorkQueueOperations.Take(1),
+ session,
+ System.currentTimeMillis()));
+ assertNotNull(value);
+ assertEquals(1, value.size());
+ assertArrayEquals("Hello world!".getBytes(), value.iterator().next().payload());
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
index 3675373..5357b04 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
@@ -15,13 +15,6 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import io.atomix.Atomix;
-import io.atomix.AtomixClient;
-import io.atomix.resource.ResourceType;
-
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
@@ -30,48 +23,43 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;
-import com.google.common.util.concurrent.Uninterruptibles;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link AtomixWorkQueue}.
*/
-public class AtomixWorkQueueTest extends AtomixTestBase {
-
+public class AtomixWorkQueueTest extends AtomixTestBase<AtomixWorkQueue> {
private static final Duration DEFAULT_PROCESSING_TIME = Duration.ofMillis(100);
private static final byte[] DEFAULT_PAYLOAD = "hello world".getBytes();
- @BeforeClass
- public static void preTestSetup() throws Throwable {
- createCopycatServers(1);
- }
-
- @AfterClass
- public static void postTestCleanup() throws Exception {
- clearTests();
+ @Override
+ protected RaftService createService() {
+ return new AtomixWorkQueueService();
}
@Override
- protected ResourceType resourceType() {
- return new ResourceType(AtomixWorkQueue.class);
+ protected AtomixWorkQueue createPrimitive(RaftProxy proxy) {
+ return new AtomixWorkQueue(proxy);
}
@Test
public void testAdd() throws Throwable {
String queueName = UUID.randomUUID().toString();
- Atomix atomix1 = createAtomixClient();
- AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue1 = newPrimitive(queueName);
byte[] item = DEFAULT_PAYLOAD;
queue1.addOne(item).join();
- Atomix atomix2 = createAtomixClient();
- AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue2 = newPrimitive(queueName);
byte[] task2 = DEFAULT_PAYLOAD;
queue2.addOne(task2).join();
@@ -84,8 +72,7 @@
@Test
public void testAddMultiple() throws Throwable {
String queueName = UUID.randomUUID().toString();
- Atomix atomix1 = createAtomixClient();
- AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue1 = newPrimitive(queueName);
byte[] item1 = DEFAULT_PAYLOAD;
byte[] item2 = DEFAULT_PAYLOAD;
queue1.addMultiple(Arrays.asList(item1, item2)).join();
@@ -99,13 +86,11 @@
@Test
public void testTakeAndComplete() throws Throwable {
String queueName = UUID.randomUUID().toString();
- Atomix atomix1 = createAtomixClient();
- AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue1 = newPrimitive(queueName);
byte[] item1 = DEFAULT_PAYLOAD;
queue1.addOne(item1).join();
- Atomix atomix2 = createAtomixClient();
- AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue2 = newPrimitive(queueName);
Task<byte[]> removedTask = queue2.take().join();
WorkQueueStats stats = queue2.stats().join();
@@ -128,13 +113,11 @@
@Test
public void testUnexpectedClientClose() throws Throwable {
String queueName = UUID.randomUUID().toString();
- Atomix atomix1 = createAtomixClient();
- AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue1 = newPrimitive(queueName);
byte[] item1 = DEFAULT_PAYLOAD;
queue1.addOne(item1).join();
- AtomixClient atomix2 = createAtomixClient();
- AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue2 = newPrimitive(queueName);
queue2.take().join();
WorkQueueStats stats = queue1.stats().join();
@@ -142,7 +125,7 @@
assertEquals(1, stats.totalInProgress());
assertEquals(0, stats.totalCompleted());
- atomix2.close().join();
+ queue2.proxy.close().join();
stats = queue1.stats().join();
assertEquals(1, stats.totalPending());
@@ -153,15 +136,13 @@
@Test
public void testAutomaticTaskProcessing() throws Throwable {
String queueName = UUID.randomUUID().toString();
- Atomix atomix1 = createAtomixClient();
- AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue1 = newPrimitive(queueName);
Executor executor = Executors.newSingleThreadExecutor();
CountDownLatch latch1 = new CountDownLatch(1);
queue1.registerTaskProcessor(s -> latch1.countDown(), 2, executor);
- AtomixClient atomix2 = createAtomixClient();
- AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue2 = newPrimitive(queueName);
byte[] item1 = DEFAULT_PAYLOAD;
queue2.addOne(item1).join();
@@ -189,13 +170,11 @@
@Test
public void testDestroy() {
String queueName = UUID.randomUUID().toString();
- Atomix atomix1 = createAtomixClient();
- AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue1 = newPrimitive(queueName);
byte[] item = DEFAULT_PAYLOAD;
queue1.addOne(item).join();
- Atomix atomix2 = createAtomixClient();
- AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue2 = newPrimitive(queueName);
byte[] task2 = DEFAULT_PAYLOAD;
queue2.addOne(task2).join();
@@ -215,8 +194,7 @@
@Test
public void testCompleteAttemptWithIncorrectSession() {
String queueName = UUID.randomUUID().toString();
- Atomix atomix1 = createAtomixClient();
- AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue1 = newPrimitive(queueName);
byte[] item = DEFAULT_PAYLOAD;
queue1.addOne(item).join();
@@ -224,8 +202,7 @@
String taskId = task.taskId();
// Create another client and get a handle to the same queue.
- Atomix atomix2 = createAtomixClient();
- AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+ AtomixWorkQueue queue2 = newPrimitive(queueName);
// Attempt completing the task with new client and verify task is not completed
queue2.complete(taskId).join();
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java
new file mode 100644
index 0000000..bb4b78c
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import com.google.common.collect.Maps;
+import io.atomix.utils.concurrent.Futures;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.cluster.messaging.MessagingException;
+
+/**
+ * Cluster communication service implementation used for testing.
+ * Instances sharing one node map deliver messages to each other through direct in-process calls,
+ * and each instance keeps at most one subscriber per subject.
+ */
+public class TestClusterCommunicationService implements ClusterCommunicationService {
+    private final NodeId localNodeId;
+    private final Map<NodeId, TestClusterCommunicationService> nodes;
+    private final Map<MessageSubject, Function<byte[], CompletableFuture<byte[]>>> subscribers =
+            Maps.newConcurrentMap();
+
+    /**
+     * Creates the service and registers it in the given node map under its own identifier.
+     *
+     * @param localNodeId identifier of the node this service represents
+     * @param nodes map of peer services that messages are delivered through; this instance is added to it
+     */
+    public TestClusterCommunicationService(NodeId localNodeId, Map<NodeId, TestClusterCommunicationService> nodes) {
+        this.localNodeId = localNodeId;
+        this.nodes = nodes;
+        nodes.put(localNodeId, this);
+    }
+
+    @Override
+    public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+        nodes.forEach((nodeId, node) -> {
+            if (!nodeId.equals(localNodeId)) {
+                node.handle(subject, encoder.apply(message));
+            }
+        });
+    }
+
+    @Override
+    public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+        nodes.values().forEach(node -> node.handle(subject, encoder.apply(message)));
+    }
+
+    @Override
+    public <M> CompletableFuture<Void> unicast(
+            M message, MessageSubject subject, Function<M, byte[]> encoder, NodeId toNodeId) {
+        TestClusterCommunicationService node = nodes.get(toNodeId);
+        if (node != null) {
+            node.handle(subject, encoder.apply(message));
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public <M> void multicast(M message, MessageSubject subject, Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+        // Match targets on the node identifier key: the map values are services and never occur in nodeIds.
+        nodes.entrySet().stream()
+                .filter(entry -> nodeIds.contains(entry.getKey()))
+                .forEach(entry -> entry.getValue().handle(subject, encoder.apply(message)));
+    }
+
+    @Override
+    public <M, R> CompletableFuture<R> sendAndReceive(
+            M message, MessageSubject subject, Function<M, byte[]> encoder,
+            Function<byte[], R> decoder, NodeId toNodeId) {
+        TestClusterCommunicationService node = nodes.get(toNodeId);
+        if (node == null) {
+            return Futures.exceptionalFuture(new MessagingException.NoRemoteHandler());
+        }
+        return node.handle(subject, encoder.apply(message)).thenApply(decoder);
+    }
+
+    private CompletableFuture<byte[]> handle(MessageSubject subject, byte[] message) {
+        Function<byte[], CompletableFuture<byte[]>> subscriber = subscribers.get(subject);
+        if (subscriber != null) {
+            return subscriber.apply(message);
+        }
+        return Futures.exceptionalFuture(new MessagingException.NoRemoteHandler());
+    }
+
+    @Override
+    public <M, R> void addSubscriber(
+            MessageSubject subject, Function<byte[], M> decoder, Function<M, R> handler,
+            Function<R, byte[]> encoder, Executor executor) {
+        subscribers.put(subject, message -> {
+            CompletableFuture<byte[]> future = new CompletableFuture<>();
+            executor.execute(() -> {
+                try {
+                    future.complete(encoder.apply(handler.apply(decoder.apply(message))));
+                } catch (Exception e) {
+                    future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+                }
+            });
+            return future;
+        });
+    }
+
+    @Override
+    public <M, R> void addSubscriber(
+            MessageSubject subject, Function<byte[], M> decoder,
+            Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+        subscribers.put(subject, message -> {
+            CompletableFuture<byte[]> future = new CompletableFuture<>();
+            try {
+                // Encode inside the chain so an encoder failure also completes the reply exceptionally.
+                handler.apply(decoder.apply(message)).thenApply(encoder).whenComplete((reply, error) -> {
+                    if (error == null) {
+                        future.complete(reply);
+                    } else {
+                        future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+                    }
+                });
+            } catch (Exception e) {
+                future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+            }
+            return future;
+        });
+    }
+
+    @Override
+    public <M> void addSubscriber(
+            MessageSubject subject, Function<byte[], M> decoder, Consumer<M> handler, Executor executor) {
+        subscribers.put(subject, message -> {
+            CompletableFuture<byte[]> future = new CompletableFuture<>();
+            // Run the handler on the supplied executor, as the Function-based overload above does.
+            executor.execute(() -> {
+                try {
+                    handler.accept(decoder.apply(message));
+                    future.complete(null);
+                } catch (Exception e) {
+                    future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+                }
+            });
+            return future;
+        });
+    }
+
+    @Override
+    public void removeSubscriber(MessageSubject subject) {
+        subscribers.remove(subject);
+    }
+
+    @Override
+    public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationServiceFactory.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationServiceFactory.java
new file mode 100644
index 0000000..819c9c3
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationServiceFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+
+/**
+ * Factory for test cluster communication services that all share a single node registry.
+ */
+public class TestClusterCommunicationServiceFactory {
+    private final Map<NodeId, TestClusterCommunicationService> registry = Maps.newConcurrentMap();
+
+    /**
+     * Instantiates the communication service of one node, backed by this factory's registry.
+     *
+     * @param localNodeId identifier of the node that will own the service
+     * @return a new communication service bound to {@code localNodeId}
+     */
+    public ClusterCommunicationService newCommunicationService(NodeId localNodeId) {
+        return new TestClusterCommunicationService(localNodeId, registry);
+    }
+}