Made intent perf app multi-threaded; doesn't seem to help.
Made Jono's changes to ECM per Madan's suggesion.
Added cell beast.
Re-enabled anti-entropy.
Added ability to push bits through test proxy for faster upload.

Change-Id: I1455d6d443a697d7a3973c88cb81bfdac0e1dd7f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 4404d98..19e77bb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -46,6 +46,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -66,7 +67,6 @@
     private final Map<K, Timestamped<V>> items;
     private final Map<K, Timestamp> removedItems;
 
-    private final String mapName;
     private final ClusterService clusterService;
     private final ClusterCommunicationService clusterCommunicator;
     private final KryoSerializer serializer;
@@ -88,6 +88,7 @@
 
     private volatile boolean destroyed = false;
     private static final String ERROR_DESTROYED = " map is already destroyed";
+    private final String destroyedMessage;
 
     private static final String ERROR_NULL_KEY = "Key cannot be null";
     private static final String ERROR_NULL_VALUE = "Null values are not allowed";
@@ -98,18 +99,20 @@
 
     /**
      * Creates a new eventually consistent map shared amongst multiple instances.
-     *
+     * <p>
      * Each map is identified by a string map name. EventuallyConsistentMapImpl
      * objects in different JVMs that use the same map name will form a
      * distributed map across JVMs (provided the cluster service is aware of
      * both nodes).
-     *
+     * </p>
+     * <p>
      * The client is expected to provide an
      * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
      * will be stored in this map have been registered (including referenced
      * classes). This serializer will be used to serialize both K and V for
      * inter-node notifications.
-     *
+     * </p>
+     * <p>
      * The client must provide an {@link org.onosproject.store.impl.ClockService}
      * which can generate timestamps for a given key. The clock service is free
      * to generate timestamps however it wishes, however these timestamps will
@@ -117,6 +120,7 @@
      * to ensure updates are properly ordered for the use case (i.e. in some
      * cases wallclock time will suffice, whereas in other cases logical time
      * will be necessary).
+     * </p>
      *
      * @param mapName             a String identifier for the map.
      * @param clusterService      the cluster service
@@ -131,12 +135,11 @@
                                        ClusterCommunicationService clusterCommunicator,
                                        KryoNamespace.Builder serializerBuilder,
                                        ClockService<K, V> clockService) {
-
-        this.mapName = checkNotNull(mapName);
         this.clusterService = checkNotNull(clusterService);
         this.clusterCommunicator = checkNotNull(clusterCommunicator);
 
         serializer = createSerializer(checkNotNull(serializerBuilder));
+        destroyedMessage = mapName + ERROR_DESTROYED;
 
         this.clockService = checkNotNull(clockService);
 
@@ -153,10 +156,9 @@
                         groupedThreads("onos/ecm", mapName + "-bg-%d")));
 
         // start anti-entropy thread
-        //FIXME need to re-enable
-//        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
-//                                               initialDelaySec, periodSec,
-//                                               TimeUnit.SECONDS);
+        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+                                               initialDelaySec, periodSec,
+                                               TimeUnit.SECONDS);
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
@@ -191,6 +193,7 @@
     /**
      * Sets the executor to use for broadcasting messages and returns this
      * instance for method chaining.
+     *
      * @param executor executor service
      * @return this instance
      */
@@ -202,26 +205,26 @@
 
     @Override
     public int size() {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
         return items.size();
     }
 
     @Override
     public boolean isEmpty() {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
         return items.isEmpty();
     }
 
     @Override
     public boolean containsKey(K key) {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
         return items.containsKey(key);
     }
 
     @Override
     public boolean containsValue(V value) {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
         checkNotNull(value, ERROR_NULL_VALUE);
 
         return items.values().stream()
@@ -230,7 +233,7 @@
 
     @Override
     public V get(K key) {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
 
         Timestamped<V> value = items.get(key);
@@ -242,7 +245,7 @@
 
     @Override
     public void put(K key, V value) {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
 
@@ -284,7 +287,7 @@
 
     @Override
     public void remove(K key) {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
 
         // TODO prevent calls here if value is important for timestamp
@@ -321,7 +324,7 @@
 
     @Override
     public void remove(K key, V value) {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
 
@@ -338,7 +341,7 @@
 
     @Override
     public void putAll(Map<? extends K, ? extends V> m) {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
 
         List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
 
@@ -362,8 +365,8 @@
             for (PutEntry<K, V> entry : updates) {
                 EventuallyConsistentMapEvent<K, V> externalEvent =
                         new EventuallyConsistentMapEvent<>(
-                        EventuallyConsistentMapEvent.Type.PUT, entry.key(),
-                        entry.value());
+                                EventuallyConsistentMapEvent.Type.PUT, entry.key(),
+                                entry.value());
                 notifyListeners(externalEvent);
             }
         }
@@ -371,7 +374,7 @@
 
     @Override
     public void clear() {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
 
         List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
 
@@ -399,14 +402,14 @@
 
     @Override
     public Set<K> keySet() {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
 
         return items.keySet();
     }
 
     @Override
     public Collection<V> values() {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
 
         return items.values().stream()
                 .map(Timestamped::value)
@@ -415,7 +418,7 @@
 
     @Override
     public Set<Map.Entry<K, V>> entrySet() {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
 
         return items.entrySet().stream()
                 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
@@ -424,14 +427,14 @@
 
     @Override
     public void addListener(EventuallyConsistentMapListener<K, V> listener) {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
 
         listeners.add(checkNotNull(listener));
     }
 
     @Override
     public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
-        checkState(!destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, destroyedMessage);
 
         listeners.remove(checkNotNull(listener));
     }
@@ -502,7 +505,7 @@
                 Set<ControllerNode> nodes = clusterService.getNodes();
 
                 List<NodeId> nodeIds = nodes.stream()
-                        .map(node -> node.id())
+                        .map(ControllerNode::id)
                         .collect(Collectors.toList());
 
                 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
@@ -689,7 +692,7 @@
      */
     // Guarded by synchronized (this)
     private List<EventuallyConsistentMapEvent<K, V>>
-            antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
+    antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
         final List<EventuallyConsistentMapEvent<K, V>> externalEvents
                 = new LinkedList<>();
 
@@ -752,8 +755,8 @@
                         if (putInternal(key, value, timestamp)) {
                             EventuallyConsistentMapEvent<K, V> externalEvent =
                                     new EventuallyConsistentMapEvent<>(
-                                    EventuallyConsistentMapEvent.Type.PUT, key,
-                                    value);
+                                            EventuallyConsistentMapEvent.Type.PUT, key,
+                                            value);
                             notifyListeners(externalEvent);
                         }
                     }