Made intent perf app multi-threaded; doesn't seem to help.
Made Jono's changes to ECM per Madan's suggestion.
Added cell beast.
Re-enabled anti-entropy.
Added ability to push bits through test proxy for faster upload.
Change-Id: I1455d6d443a697d7a3973c88cb81bfdac0e1dd7f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 4404d98..19e77bb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -46,6 +46,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -66,7 +67,6 @@
private final Map<K, Timestamped<V>> items;
private final Map<K, Timestamp> removedItems;
- private final String mapName;
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
@@ -88,6 +88,7 @@
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
+ private final String destroyedMessage;
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
@@ -98,18 +99,20 @@
/**
* Creates a new eventually consistent map shared amongst multiple instances.
- *
+ * <p>
* Each map is identified by a string map name. EventuallyConsistentMapImpl
* objects in different JVMs that use the same map name will form a
* distributed map across JVMs (provided the cluster service is aware of
* both nodes).
- *
+ * </p>
+ * <p>
* The client is expected to provide an
* {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
* will be stored in this map have been registered (including referenced
* classes). This serializer will be used to serialize both K and V for
* inter-node notifications.
- *
+ * </p>
+ * <p>
* The client must provide an {@link org.onosproject.store.impl.ClockService}
* which can generate timestamps for a given key. The clock service is free
* to generate timestamps however it wishes, however these timestamps will
@@ -117,6 +120,7 @@
* to ensure updates are properly ordered for the use case (i.e. in some
* cases wallclock time will suffice, whereas in other cases logical time
* will be necessary).
+ * </p>
*
* @param mapName a String identifier for the map.
* @param clusterService the cluster service
@@ -131,12 +135,11 @@
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
ClockService<K, V> clockService) {
-
- this.mapName = checkNotNull(mapName);
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
serializer = createSerializer(checkNotNull(serializerBuilder));
+ destroyedMessage = mapName + ERROR_DESTROYED;
this.clockService = checkNotNull(clockService);
@@ -153,10 +156,9 @@
groupedThreads("onos/ecm", mapName + "-bg-%d")));
// start anti-entropy thread
- //FIXME need to re-enable
-// backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
-// initialDelaySec, periodSec,
-// TimeUnit.SECONDS);
+ backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ initialDelaySec, periodSec,
+ TimeUnit.SECONDS);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
@@ -191,6 +193,7 @@
/**
* Sets the executor to use for broadcasting messages and returns this
* instance for method chaining.
+ *
* @param executor executor service
* @return this instance
*/
@@ -202,26 +205,26 @@
@Override
public int size() {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
return items.size();
}
@Override
public boolean isEmpty() {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
return items.isEmpty();
}
@Override
public boolean containsKey(K key) {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
return items.containsKey(key);
}
@Override
public boolean containsValue(V value) {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
checkNotNull(value, ERROR_NULL_VALUE);
return items.values().stream()
@@ -230,7 +233,7 @@
@Override
public V get(K key) {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
Timestamped<V> value = items.get(key);
@@ -242,7 +245,7 @@
@Override
public void put(K key, V value) {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
@@ -284,7 +287,7 @@
@Override
public void remove(K key) {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
// TODO prevent calls here if value is important for timestamp
@@ -321,7 +324,7 @@
@Override
public void remove(K key, V value) {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
@@ -338,7 +341,7 @@
@Override
public void putAll(Map<? extends K, ? extends V> m) {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
@@ -362,8 +365,8 @@
for (PutEntry<K, V> entry : updates) {
EventuallyConsistentMapEvent<K, V> externalEvent =
new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, entry.key(),
- entry.value());
+ EventuallyConsistentMapEvent.Type.PUT, entry.key(),
+ entry.value());
notifyListeners(externalEvent);
}
}
@@ -371,7 +374,7 @@
@Override
public void clear() {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
@@ -399,14 +402,14 @@
@Override
public Set<K> keySet() {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
return items.keySet();
}
@Override
public Collection<V> values() {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
return items.values().stream()
.map(Timestamped::value)
@@ -415,7 +418,7 @@
@Override
public Set<Map.Entry<K, V>> entrySet() {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
return items.entrySet().stream()
.map(e -> Pair.of(e.getKey(), e.getValue().value()))
@@ -424,14 +427,14 @@
@Override
public void addListener(EventuallyConsistentMapListener<K, V> listener) {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
listeners.add(checkNotNull(listener));
}
@Override
public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
- checkState(!destroyed, mapName + ERROR_DESTROYED);
+ checkState(!destroyed, destroyedMessage);
listeners.remove(checkNotNull(listener));
}
@@ -502,7 +505,7 @@
Set<ControllerNode> nodes = clusterService.getNodes();
List<NodeId> nodeIds = nodes.stream()
- .map(node -> node.id())
+ .map(ControllerNode::id)
.collect(Collectors.toList());
if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
@@ -689,7 +692,7 @@
*/
// Guarded by synchronized (this)
private List<EventuallyConsistentMapEvent<K, V>>
- antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
+ antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
final List<EventuallyConsistentMapEvent<K, V>> externalEvents
= new LinkedList<>();
@@ -752,8 +755,8 @@
if (putInternal(key, value, timestamp)) {
EventuallyConsistentMapEvent<K, V> externalEvent =
new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, key,
- value);
+ EventuallyConsistentMapEvent.Type.PUT, key,
+ value);
notifyListeners(externalEvent);
}
}