Moving from local to netty transport.

Change-Id: Id37af6fa4d0971fd34ed18951196dde47bc4a12d
diff --git a/core/store/primitives/BUCK b/core/store/primitives/BUCK
index 63a9b7d..6fa9630 100644
--- a/core/store/primitives/BUCK
+++ b/core/store/primitives/BUCK
@@ -9,6 +9,12 @@
 TEST_DEPS = [
     '//lib:TEST',
     '//core/api:onos-api-tests',
+    '//lib:netty-transport',
+    '//lib:catalyst-transport',
+    '//lib:netty-handler',
+    '//lib:netty-buffer',
+    '//lib:netty-codec',
+
 ]
 
 osgi_jar_with_tests (
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
index f3345b1..aad27b9 100644
--- a/core/store/primitives/pom.xml
+++ b/core/store/primitives/pom.xml
@@ -73,5 +73,22 @@
             <artifactId>atomix</artifactId>
             <version>1.0.0-rc8</version>
         </dependency>
+
+        <dependency>
+            <groupId>io.atomix.catalyst</groupId>
+            <artifactId>catalyst-netty</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>io.atomix.catalyst</groupId>
+            <artifactId>catalyst-transport</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-junit</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
index 251a7ca..bcaf0fd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
@@ -166,11 +166,13 @@
         public void writeObject(BufferOutput<?> buffer,
                                 Serializer serializer) {
             super.writeObject(buffer, serializer);
+            serializer.writeObject(value, buffer);
         }
 
         @Override
         public void readObject(BufferInput<?> buffer, Serializer serializer) {
             super.readObject(buffer, serializer);
+            value = serializer.readObject(buffer);
         }
     }
 
@@ -548,6 +550,9 @@
      */
     public static class Get extends
             KeyQuery<Versioned<Collection<? extends byte[]>>> {
+        public Get() {
+        }
+
         public Get(String key) {
             super(key);
         }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapState.java
index 5fd9189..f7085e1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapState.java
@@ -19,6 +19,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
@@ -170,16 +171,19 @@
      */
     protected boolean containsValue(Commit<? extends ContainsValue> commit) {
         try {
+            if (backingMap.values().isEmpty()) {
+                return false;
+            }
             Match<byte[]> match = Match.ifValue(commit.operation().value());
             return backingMap
                     .values()
                     .stream()
                     .anyMatch(valueList ->
-                          valueList
-                              .values()
-                              .stream()
-                              .anyMatch(byteValue ->
-                                    match.matches(byteValue)));
+                                      valueList
+                                              .values()
+                                              .stream()
+                                              .anyMatch(byteValue ->
+                                                    match.matches(byteValue)));
         } finally {
             commit.close();
         }
@@ -230,7 +234,7 @@
      */
     protected Set<String> keySet(Commit<? extends KeySet> commit) {
         try {
-            return backingMap.keySet();
+            return ImmutableSet.copyOf(backingMap.keySet());
         } finally {
             commit.close();
         }
@@ -444,7 +448,7 @@
 
         @Override
         public Collection<? extends byte[]> values() {
-            return valueCountdownMap.keySet();
+            return ImmutableSet.copyOf(valueCountdownMap.keySet());
         }
 
         @Override
@@ -747,4 +751,4 @@
             }
         }
     }
-}
+ }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
index 7ee481c..1bd06a1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -182,6 +182,18 @@
                     .add("nodeId", nodeId)
                     .toString();
         }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(nodeId, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            nodeId = serializer.readObject(buffer);
+        }
     }
 
     /**
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index 894cc79..6281672 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -16,6 +16,8 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import static org.slf4j.LoggerFactory.getLogger;
+
+import com.google.common.collect.ImmutableSet;
 import io.atomix.copycat.server.session.ServerSession;
 import io.atomix.copycat.server.Commit;
 import io.atomix.copycat.server.Snapshottable;
@@ -287,10 +289,10 @@
     public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
         try {
             NodeId nodeId = commit.operation().nodeId();
-            return Maps.filterEntries(elections, e -> {
+            return ImmutableSet.copyOf(Maps.filterEntries(elections, e -> {
                 Leader leader = leadership(e.getKey()).leader();
                 return leader != null && leader.nodeId().equals(nodeId);
-            }).keySet();
+            }).keySet());
         } finally {
             commit.close();
         }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapTest.java
index aad068a..04fcdbc 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapTest.java
@@ -20,27 +20,18 @@
 import com.google.common.collect.Multiset;
 import com.google.common.collect.TreeMultiset;
 import com.google.common.io.Files;
-
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.local.LocalTransport;
-import io.atomix.copycat.server.CopycatServer;
-import io.atomix.copycat.server.storage.Storage;
-import io.atomix.copycat.server.storage.StorageLevel;
-import io.atomix.manager.internal.ResourceManagerState;
 import io.atomix.resource.ResourceType;
-
 import org.apache.commons.collections.keyvalue.DefaultMapEntry;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.onlab.util.Tools;
 
 import java.io.File;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -49,7 +40,6 @@
 /**
  * Tests the {@link AsyncConsistentSetMultimap}.
  */
-@Ignore
 public class AsyncConsistentSetMultimapTest extends AtomixTestBase {
     private final File testDir = Files.createTempDir();
     private final String keyOne = "hello";
@@ -66,6 +56,7 @@
                                                               valueTwo,
                                                               valueThree,
                                                               valueFour);
+    private final AtomicInteger port = new AtomicInteger(49200);
 
     @Override
     protected ResourceType resourceType() {
@@ -411,7 +402,6 @@
         clearTests();
     }
 
-
     private AsyncConsistentSetMultimap createResource(int clusterSize) {
         try {
             createCopycatServers(clusterSize);
@@ -424,24 +414,6 @@
         }
     }
 
-    @Override
-    protected CopycatServer createCopycatServer(Address address) {
-        CopycatServer server = CopycatServer.builder(address)
-                .withTransport(new LocalTransport(registry))
-                .withStorage(Storage.builder()
-                                     .withStorageLevel(StorageLevel.MEMORY)
-                                     .withDirectory(testDir + "/" + address.port())
-                                     .build())
-                .withStateMachine(ResourceManagerState::new)
-                .withSerializer(serializer.clone())
-                .withHeartbeatInterval(Duration.ofMillis(25))
-                .withElectionTimeout(Duration.ofMillis(50))
-                .withSessionTimeout(Duration.ofMillis(100))
-                .build();
-        copycatServers.add(server);
-        return server;
-    }
-
     /**
      * Returns two arrays contain the same set of elements,
      * regardless of order.
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 0ec21f7..df44ecf 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -15,19 +15,9 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
 import io.atomix.resource.ResourceType;
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.*;
-
-import java.util.Arrays;
-import java.util.ConcurrentModificationException;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletionException;
-import java.util.stream.Collectors;
-
-import org.junit.Ignore;
 import org.junit.Test;
 import org.onlab.util.Tools;
 import org.onosproject.store.primitives.MapUpdate;
@@ -37,13 +27,27 @@
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Unit tests for {@link AtomixConsistentMap}.
  */
-@Ignore
 public class AtomixConsistentMapTest extends AtomixTestBase {
 
     @Override
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
index 6fb064b..f6be9f7 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -20,7 +20,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -36,7 +35,6 @@
 /**
  * Unit tests for {@link AtomixLeaderElector}.
  */
-@Ignore
 public class AtomixLeaderElectorTest extends AtomixTestBase {
 
     NodeId node1 = new NodeId("node1");
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
index 49625b6..ae7bfee 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
@@ -15,19 +15,18 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
-import static org.junit.Assert.*;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
 import io.atomix.Atomix;
 import io.atomix.resource.ResourceType;
 import io.atomix.variables.DistributedLong;
+import org.junit.Test;
 
-/**
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**git s
  * Unit tests for {@link AtomixCounter}.
  */
-@Ignore
 public class AtomixLongTest extends AtomixTestBase {
 
     @Override
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index 82265ad..01c33ec 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -15,17 +15,22 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import io.atomix.AtomixClient;
 import io.atomix.catalyst.serializer.Serializer;
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.local.LocalServerRegistry;
-import io.atomix.catalyst.transport.local.LocalTransport;
+import io.atomix.catalyst.transport.netty.NettyTransport;
 import io.atomix.copycat.client.CopycatClient;
 import io.atomix.copycat.server.CopycatServer;
 import io.atomix.copycat.server.storage.Storage;
 import io.atomix.copycat.server.storage.StorageLevel;
 import io.atomix.manager.internal.ResourceManagerState;
 import io.atomix.resource.ResourceType;
+import org.junit.After;
+import org.junit.Before;
+import org.onlab.junit.TestTools;
+import org.onosproject.store.primitives.impl.CatalystSerializers;
 
 import java.io.File;
 import java.io.IOException;
@@ -35,12 +40,7 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-
-import org.junit.After;
-import org.junit.Before;
-import org.onosproject.store.primitives.impl.CatalystSerializers;
-
-import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Base class for various Atomix* tests.
@@ -48,7 +48,7 @@
 public abstract class AtomixTestBase {
     private static final File TEST_DIR = new File("target/test-logs");
     protected LocalServerRegistry registry;
-    protected int port;
+    protected final AtomicInteger port = new AtomicInteger(49200);
     protected List<Address> members;
     protected List<CopycatClient> copycatClients = new ArrayList<>();
     protected List<CopycatServer> copycatServers = new ArrayList<>();
@@ -69,7 +69,8 @@
      * @return The next server address.
      */
     private Address nextAddress() {
-        Address address = new Address("localhost", port++);
+        Address address = new Address("127.0.0.1",
+                          TestTools.findAvailablePort(port.getAndIncrement()));
         members.add(address);
         return address;
     }
@@ -82,13 +83,16 @@
         List<CopycatServer> servers = new ArrayList<>();
 
         List<Address> members = new ArrayList<>();
-        for (int i = 0; i < nodes; i++) {
-            members.add(nextAddress());
-        }
 
         for (int i = 0; i < nodes; i++) {
-            CopycatServer server = createCopycatServer(members.get(i));
-            server.bootstrap(members).thenRun(latch::countDown);
+            Address address = nextAddress();
+            members.add(address);
+            CopycatServer server = createCopycatServer(address);
+            if (members.size() <= 1) {
+                server.bootstrap().thenRun(latch::countDown).join();
+            } else {
+                server.join(members).thenRun(latch::countDown);
+            }
             servers.add(server);
         }
 
@@ -102,11 +106,10 @@
      */
     protected CopycatServer createCopycatServer(Address address) {
         CopycatServer server = CopycatServer.builder(address)
-                .withTransport(new LocalTransport(registry))
+                .withTransport(NettyTransport.builder().withThreads(1).build())
                 .withStorage(Storage.builder()
-                        .withStorageLevel(StorageLevel.DISK)
-                        .withDirectory(TEST_DIR + "/" + address.port())
-                        .build())
+                             .withStorageLevel(StorageLevel.MEMORY)
+                             .build())
                 .withStateMachine(ResourceManagerState::new)
                 .withSerializer(serializer.clone())
                 .withHeartbeatInterval(Duration.ofMillis(25))
@@ -122,7 +125,6 @@
     public void clearTests() throws Exception {
         registry = new LocalServerRegistry();
         members = new ArrayList<>();
-        port = 5000;
 
         CompletableFuture<Void> closeClients =
                 CompletableFuture.allOf(atomixClients.stream()
@@ -165,7 +167,7 @@
     protected AtomixClient createAtomixClient() {
         CountDownLatch latch = new CountDownLatch(1);
         AtomixClient client = AtomixClient.builder()
-                .withTransport(new LocalTransport(registry))
+                .withTransport(NettyTransport.builder().withThreads(1).build())
                 .withSerializer(serializer.clone())
                 .build();
         client.connect(members).thenRun(latch::countDown);