Simplified ECMap implmentation by merging items and tombstones maps
Change-Id: If4253722d91c35a7e57dec3c2fceb216d14a7314
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java
deleted file mode 100644
index 68d51d4..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import java.util.Objects;
-
-import org.onosproject.store.Timestamp;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base class for events in an EventuallyConsistentMap.
- */
-public abstract class AbstractEntry<K, V> implements Comparable<AbstractEntry<K, V>> {
- private final K key;
- private final Timestamp timestamp;
-
- /**
- * Creates a new put entry.
- *
- * @param key key of the entry
- * @param timestamp timestamp of the put event
- */
- public AbstractEntry(K key, Timestamp timestamp) {
- this.key = checkNotNull(key);
- this.timestamp = checkNotNull(timestamp);
- }
-
- // Needed for serialization.
- @SuppressWarnings("unused")
- protected AbstractEntry() {
- this.key = null;
- this.timestamp = null;
- }
-
- /**
- * Returns the key of the entry.
- *
- * @return the key
- */
- public K key() {
- return key;
- }
-
- /**
- * Returns the timestamp of the event.
- *
- * @return the timestamp
- */
- public Timestamp timestamp() {
- return timestamp;
- }
-
- @Override
- public int compareTo(AbstractEntry<K, V> o) {
- return this.timestamp.compareTo(o.timestamp);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(timestamp);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o instanceof AbstractEntry) {
- final AbstractEntry that = (AbstractEntry) o;
- return this.timestamp.equals(that.timestamp);
- }
- return false;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java
index 23b2dfc..d783fe2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java
@@ -16,11 +16,11 @@
package org.onosproject.store.ecmap;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+
import org.onosproject.cluster.NodeId;
-import org.onosproject.store.Timestamp;
import java.util.Map;
-
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -29,22 +29,18 @@
public class AntiEntropyAdvertisement<K> {
private final NodeId sender;
- private final Map<K, Timestamp> timestamps;
- private final Map<K, Timestamp> tombstones;
+ private final Map<K, MapValue.Digest> digest;
/**
* Creates a new anti entropy advertisement message.
*
* @param sender the sender's node ID
- * @param timestamps map of item key to timestamp for current items
- * @param tombstones map of item key to timestamp for removed items
+ * @param digest for map entries
*/
public AntiEntropyAdvertisement(NodeId sender,
- Map<K, Timestamp> timestamps,
- Map<K, Timestamp> tombstones) {
+ Map<K, MapValue.Digest> digest) {
this.sender = checkNotNull(sender);
- this.timestamps = checkNotNull(timestamps);
- this.tombstones = checkNotNull(tombstones);
+ this.digest = ImmutableMap.copyOf(checkNotNull(digest));
}
/**
@@ -57,36 +53,19 @@
}
/**
- * Returns the map of current item timestamps.
+ * Returns the digest for map entries.
*
- * @return current item timestamps
+ * @return mapping from key to associated digest
*/
- public Map<K, Timestamp> timestamps() {
- return timestamps;
- }
-
- /**
- * Returns the map of removed item timestamps.
- *
- * @return removed item timestamps
- */
- public Map<K, Timestamp> tombstones() {
- return tombstones;
- }
-
- // For serializer
- @SuppressWarnings("unused")
- private AntiEntropyAdvertisement() {
- this.sender = null;
- this.timestamps = null;
- this.tombstones = null;
+ public Map<K, MapValue.Digest> digest() {
+ return digest;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("timestampsSize", timestamps.size())
- .add("tombstonesSize", tombstones.size())
+ .add("sender", sender)
+ .add("totalEntries", digest.size())
.toString();
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 118ef78..1fd27d3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -18,9 +18,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.KryoNamespace;
@@ -30,12 +29,10 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.LogicalTimestamp;
-import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.service.WallClockTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
@@ -43,30 +40,27 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+
import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.isNull;
-import static java.util.Objects.nonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
@@ -80,12 +74,12 @@
private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
- private final ConcurrentMap<K, Timestamped<V>> items;
- private final ConcurrentMap<K, Timestamp> removedItems;
+ private final Map<K, MapValue<V>> items;
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
+ private final NodeId localNodeId;
private final BiFunction<K, V, Timestamp> timestampProvider;
@@ -93,7 +87,7 @@
private final MessageSubject antiEntropyAdvertisementSubject;
private final Set<EventuallyConsistentMapListener<K, V>> listeners
- = new CopyOnWriteArraySet<>();
+ = Sets.newCopyOnWriteArraySet();
private final ExecutorService executor;
private final ScheduledExecutorService backgroundExecutor;
@@ -162,13 +156,13 @@
TimeUnit antiEntropyTimeUnit,
boolean convergeFaster,
boolean persistent) {
- items = new ConcurrentHashMap<>();
- removedItems = new ConcurrentHashMap<>();
+ items = Maps.newConcurrentMap();
senderPending = Maps.newConcurrentMap();
destroyedMessage = mapName + ERROR_DESTROYED;
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
+ this.localNodeId = clusterService.getLocalNode().id();
this.serializer = createSerializer(serializerBuilder);
@@ -179,7 +173,7 @@
} else {
this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
.map(ControllerNode::id)
- .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+ .filter(nodeId -> !nodeId.equals(localNodeId))
.collect(Collectors.toList());
}
@@ -210,7 +204,7 @@
newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
- persistentStore.readInto(items, removedItems);
+ persistentStore.readInto(items);
} else {
this.persistentStore = null;
}
@@ -223,17 +217,21 @@
}
// start anti-entropy thread
- this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
initialDelaySec, antiEntropyPeriod,
antiEntropyTimeUnit);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
- new InternalEventListener(), this.executor);
+ serializer::decode,
+ this::processUpdates,
+ this.executor);
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
- new InternalAntiEntropyListener(), this.backgroundExecutor);
+ serializer::decode,
+ this::handleAntiEntropyAdvertisement,
+ this.backgroundExecutor);
this.tombstonesDisabled = tombstonesDisabled;
this.lightweightAntiEntropy = !convergeFaster;
@@ -245,14 +243,13 @@
protected void setupKryoPool() {
// Add the map's internal helper classes to the user-supplied serializer
serializerPool = builder
+ .register(KryoNamespaces.BASIC)
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
- .register(PutEntry.class)
- .register(RemoveEntry.class)
- .register(ArrayList.class)
.register(AntiEntropyAdvertisement.class)
- .register(HashMap.class)
- .register(Timestamped.class)
+ .register(UpdateEntry.class)
+ .register(MapValue.class)
+ .register(MapValue.Digest.class)
.build();
}
};
@@ -261,29 +258,31 @@
@Override
public int size() {
checkState(!destroyed, destroyedMessage);
- return items.size();
+ // TODO: Maintain a separate counter for tracking live elements in map.
+ return Maps.filterValues(items, MapValue::isAlive).size();
}
@Override
public boolean isEmpty() {
checkState(!destroyed, destroyedMessage);
- return items.isEmpty();
+ return size() == 0;
}
@Override
public boolean containsKey(K key) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
- return items.containsKey(key);
+ return get(key) != null;
}
@Override
public boolean containsValue(V value) {
checkState(!destroyed, destroyedMessage);
checkNotNull(value, ERROR_NULL_VALUE);
-
- return items.values().stream()
- .anyMatch(timestamped -> timestamped.value().equals(value));
+ return items.values()
+ .stream()
+ .filter(MapValue::isAlive)
+ .anyMatch(v -> v.get().equals(value));
}
@Override
@@ -291,11 +290,8 @@
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
- Timestamped<V> value = items.get(key);
- if (value != null) {
- return value.value();
- }
- return null;
+ MapValue<V> value = items.get(key);
+ return (value == null || value.isTombstone()) ? null : value.get();
}
@Override
@@ -304,123 +300,18 @@
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- Timestamp timestamp = timestampProvider.apply(key, value);
-
- if (putInternal(key, value, timestamp)) {
- notifyPeers(new PutEntry<>(key, value, timestamp),
- peerUpdateFunction.apply(key, value));
- notifyListeners(new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, key, value));
+ MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
+ if (updateInternal(key, newValue)) {
+ notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
+ notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
}
}
- private boolean putInternal(K key, V value, Timestamp timestamp) {
- counter.incrementCount();
- Timestamp removed = removedItems.get(key);
- if (removed != null && removed.isNewerThan(timestamp)) {
- log.debug("ecmap - removed was newer {}", value);
- return false;
- }
-
- final MutableBoolean updated = new MutableBoolean(false);
-
- items.compute(key, (k, existing) -> {
- if (existing != null && existing.isNewerThan(timestamp)) {
- updated.setFalse();
- return existing;
- } else {
- updated.setTrue();
- return new Timestamped<>(value, timestamp);
- }
- });
-
- boolean success = updated.booleanValue();
- if (!success) {
- log.debug("ecmap - existing was newer {}", value);
- }
-
- if (success && removed != null) {
- removedItems.remove(key, removed);
- }
-
- if (success && persistent) {
- persistentStore.put(key, value, timestamp);
- }
-
- return success;
- }
-
@Override
public V remove(K key) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
-
- // TODO prevent calls here if value is important for timestamp
- Timestamp timestamp = timestampProvider.apply(key, null);
-
- Optional<V> removedValue = removeInternal(key, timestamp);
- if (removedValue == null) {
- return null;
- }
- notifyPeers(new RemoveEntry<>(key, timestamp),
- peerUpdateFunction.apply(key, null));
- notifyListeners(new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, removedValue.orElse(null)));
-
- return removedValue.orElse(null);
- }
-
- /**
- * Returns null if the timestamp is for a outdated request i.e.
- * the value is the map is more recent or a tombstone exists with a
- * more recent timestamp.
- * Returns non-empty optional if a value was indeed removed from the map.
- * Returns empty optional if map did not contain a value for the key but the existing
- * tombstone is older than this timestamp.
- * @param key key
- * @param timestamp timestamp for remove request
- * @return Optional value.
- */
- private Optional<V> removeInternal(K key, Timestamp timestamp) {
- if (timestamp == null) {
- return null;
- }
-
- counter.incrementCount();
- final AtomicReference<Optional<V>> removedValue = new AtomicReference<>(null);
- items.compute(key, (k, existing) -> {
- if (existing != null && existing.isNewerThan(timestamp)) {
- return existing;
- } else {
- removedValue.set(existing == null ? Optional.empty() : Optional.of(existing.value()));
- return null;
- }
- });
-
- if (isNull(removedValue.get())) {
- return null;
- }
-
- boolean updatedTombstone = false;
-
- if (!tombstonesDisabled) {
- Timestamp removedTimestamp = removedItems.get(key);
- if (removedTimestamp == null) {
- //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
- updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
- } else if (timestamp.isNewerThan(removedTimestamp)) {
- updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
- }
- }
-
- if (persistent) {
- persistentStore.remove(key, timestamp);
- }
-
- if (tombstonesDisabled || updatedTombstone) {
- return removedValue.get();
- }
- return null;
+ return removeInternal(key, Optional.empty());
}
@Override
@@ -428,15 +319,34 @@
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
+ removeInternal(key, Optional.of(value));
+ }
- Timestamp timestamp = timestampProvider.apply(key, value);
+ private V removeInternal(K key, Optional<V> value) {
+ checkState(!destroyed, destroyedMessage);
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
- if (nonNull(removeInternal(key, timestamp))) {
- notifyPeers(new RemoveEntry<>(key, timestamp),
- peerUpdateFunction.apply(key, value));
- notifyListeners(new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, value));
+ MapValue<V> newValue = new MapValue<>(null, timestampProvider.apply(key, value.orElse(null)));
+ AtomicBoolean updated = new AtomicBoolean(false);
+ AtomicReference<V> previousValue = new AtomicReference<>();
+ items.compute(key, (k, existing) -> {
+ if (existing != null && existing.isAlive()) {
+ updated.set(!value.isPresent() || value.get().equals(existing.get()));
+ previousValue.set(existing.get());
+ }
+ updated.set(existing == null || newValue.isNewerThan(existing));
+ return updated.get() ? newValue : existing;
+ });
+ if (updated.get()) {
+ notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, previousValue.get()));
+ notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
+ if (persistent) {
+ persistentStore.update(key, newValue);
+ }
+ return previousValue.get();
}
+ return null;
}
@Override
@@ -448,30 +358,59 @@
@Override
public void clear() {
checkState(!destroyed, destroyedMessage);
- items.forEach((key, value) -> remove(key));
+ Maps.filterValues(items, MapValue::isAlive)
+ .forEach((k, v) -> remove(k));
}
@Override
public Set<K> keySet() {
checkState(!destroyed, destroyedMessage);
- return items.keySet();
+ return Maps.filterValues(items, MapValue::isAlive)
+ .keySet();
}
@Override
public Collection<V> values() {
checkState(!destroyed, destroyedMessage);
- return items.values().stream()
- .map(Timestamped::value)
- .collect(Collectors.toList());
+ return Maps.filterValues(items, MapValue::isAlive)
+ .values()
+ .stream()
+ .map(MapValue::get)
+ .collect(Collectors.toList());
}
@Override
public Set<Map.Entry<K, V>> entrySet() {
checkState(!destroyed, destroyedMessage);
+ return Maps.filterValues(items, MapValue::isAlive)
+ .entrySet()
+ .stream()
+ .map(e -> Pair.of(e.getKey(), e.getValue().get()))
+ .collect(Collectors.toSet());
+ }
- return items.entrySet().stream()
- .map(e -> Pair.of(e.getKey(), e.getValue().value()))
- .collect(Collectors.toSet());
+ /**
+ * Returns true if newValue was accepted i.e. map is updated.
+ * @param key key
+ * @param newValue proposed new value
+ * @return true if update happened; false if map already contains a more recent value for the key
+ */
+ private boolean updateInternal(K key, MapValue<V> newValue) {
+ AtomicBoolean updated = new AtomicBoolean(false);
+ items.compute(key, (k, existing) -> {
+ if (existing == null || newValue.isNewerThan(existing)) {
+ updated.set(true);
+ if (newValue.isTombstone()) {
+ return tombstonesDisabled ? null : newValue;
+ }
+ return newValue;
+ }
+ return existing;
+ });
+ if (updated.get() && persistent) {
+ persistentStore.update(key, newValue);
+ }
+ return updated.get();
}
@Override
@@ -503,26 +442,20 @@
}
private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
- for (EventuallyConsistentMapListener<K, V> listener : listeners) {
- listener.event(event);
- }
+ listeners.forEach(listener -> listener.event(event));
}
- private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
+ private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
queueUpdate(event, peers);
}
- private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
- queueUpdate(event, peers);
- }
-
- private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
+ private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
if (peers == null) {
// we have no friends :(
return;
}
peers.forEach(node ->
- senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+ senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
);
}
@@ -530,276 +463,107 @@
return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
}
- private final class SendAdvertisementTask implements Runnable {
- @Override
- public void run() {
- if (Thread.currentThread().isInterrupted()) {
- log.info("Interrupted, quitting");
- return;
- }
-
+ private void sendAdvertisement() {
+ try {
if (underHighLoad() || destroyed) {
return;
}
-
- try {
- final NodeId self = clusterService.getLocalNode().id();
- Set<ControllerNode> nodes = clusterService.getNodes();
-
- List<NodeId> nodeIds = nodes.stream()
- .map(ControllerNode::id)
- .collect(Collectors.toList());
-
- if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
- log.trace("No other peers in the cluster.");
- return;
- }
-
- NodeId peer;
- do {
- int idx = RandomUtils.nextInt(0, nodeIds.size());
- peer = nodeIds.get(idx);
- } while (peer.equals(self));
-
- if (Thread.currentThread().isInterrupted()) {
- log.info("Interrupted, quitting");
- return;
- }
-
- AntiEntropyAdvertisement<K> ad = createAdvertisement();
- NodeId destination = peer;
- clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
- .whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to send anti-entropy advertisement to {}", destination);
- }
- });
-
- } catch (Exception e) {
- // Catch all exceptions to avoid scheduled task being suppressed.
- log.error("Exception thrown while sending advertisement", e);
- }
+ pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
+ } catch (Exception e) {
+ // Catch all exceptions to avoid scheduled task being suppressed.
+ log.error("Exception thrown while sending advertisement", e);
}
}
+ private Optional<NodeId> pickRandomActivePeer() {
+ List<NodeId> activePeers = clusterService.getNodes()
+ .stream()
+ .filter(node -> !localNodeId.equals(node))
+ .map(ControllerNode::id)
+ .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
+ .collect(Collectors.toList());
+ Collections.shuffle(activePeers);
+ return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
+ }
+
+ private void sendAdvertisementToPeer(NodeId peer) {
+ clusterCommunicator.unicast(createAdvertisement(),
+ antiEntropyAdvertisementSubject,
+ serializer::encode,
+ peer)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to send anti-entropy advertisement to {}", peer);
+ }
+ });
+ }
+
+
private AntiEntropyAdvertisement<K> createAdvertisement() {
- final NodeId self = clusterService.getLocalNode().id();
-
- Map<K, Timestamp> timestamps = new HashMap<>(items.size());
-
- items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
-
- Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
-
- return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
+ return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
}
private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
- List<EventuallyConsistentMapEvent<K, V>> externalEvents;
+ if (destroyed || underHighLoad()) {
+ return;
+ }
+ try {
+ antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
- externalEvents = antiEntropyCheckLocalItems(ad);
-
- antiEntropyCheckLocalRemoved(ad);
-
- if (!lightweightAntiEntropy) {
- externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
-
- // if remote ad has something unknown, actively sync
- for (K key : ad.timestamps().keySet()) {
- if (!items.containsKey(key)) {
+ if (!lightweightAntiEntropy) {
+ Set<K> missingKeys = Sets.difference(items.keySet(), ad.digest().keySet());
+ // if remote ad has something unknown, actively sync
+ if (missingKeys.size() > 0) {
// Send the advertisement back if this peer is out-of-sync
- final NodeId sender = ad.sender();
- AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-
- clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
- .whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to send reactive "
- + "anti-entropy advertisement to {}", sender);
- }
- });
- break;
+ // TODO: Send ad for missing keys and for entries that are stale
+ sendAdvertisementToPeer(ad.sender());
}
}
+ } catch (Exception e) {
+ log.warn("Error handling anti-entropy advertisement", e);
}
- externalEvents.forEach(this::notifyListeners);
}
/**
- * Checks if any of the remote's live items or tombstones are out of date
- * according to our local live item list, or if our live items are out of
- * date according to the remote's tombstone list.
- * If the local copy is more recent, it will be pushed to the remote. If the
- * remote has a more recent remove, we apply that to the local state.
- *
- * @param ad remote anti-entropy advertisement
- * @return list of external events relating to local operations performed
+ * Processes anti-entropy ad from peer by taking following actions:
+ * 1. If peer has an old entry, updates peer.
+ * 2. If peer indicates an entry is removed and has a more recent
+ * timestamp than the local entry, update local state.
*/
private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
AntiEntropyAdvertisement<K> ad) {
- final List<EventuallyConsistentMapEvent<K, V>> externalEvents
- = new LinkedList<>();
+ final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
final NodeId sender = ad.sender();
-
- for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
- K key = item.getKey();
- Timestamped<V> localValue = item.getValue();
-
- Timestamp remoteTimestamp = ad.timestamps().get(key);
- if (remoteTimestamp == null) {
- remoteTimestamp = ad.tombstones().get(key);
- }
- if (remoteTimestamp == null || localValue
- .isNewerThan(remoteTimestamp)) {
+ items.forEach((key, localValue) -> {
+ MapValue.Digest remoteValueDigest = ad.digest().get(key);
+ if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
// local value is more recent, push to sender
- queueUpdate(new PutEntry<>(key, localValue.value(),
- localValue.timestamp()), ImmutableList.of(sender));
- }
-
- Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
- if (remoteDeadTimestamp != null &&
- remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
- // sender has a more recent remove
- if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
- externalEvents.add(new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, null));
+ queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
+ } else {
+ if (remoteValueDigest.isTombstone()
+ && remoteValueDigest.timestamp().isNewerThan(localValue.timestamp())) {
+ if (updateInternal(key, new MapValue<>(null, remoteValueDigest.timestamp()))) {
+ externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, null));
+ }
}
}
- }
-
+ });
return externalEvents;
}
- /**
- * Checks if any items in the remote live list are out of date according
- * to our tombstone list. If we find we have a more up to date tombstone,
- * we'll send it to the remote.
- *
- * @param ad remote anti-entropy advertisement
- */
- private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
- final NodeId sender = ad.sender();
-
- for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
- K key = dead.getKey();
- Timestamp localDeadTimestamp = dead.getValue();
-
- Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
- if (remoteLiveTimestamp != null
- && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
- // sender has zombie, push remove
- queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
- }
+ private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
+ if (destroyed) {
+ return;
}
- }
+ updates.forEach(update -> {
+ final K key = update.key();
+ final MapValue<V> value = update.value();
- /**
- * Checks if any of the local live items are out of date according to the
- * remote's tombstone advertisements. If we find a local item is out of date,
- * we'll apply the remove operation to the local state.
- *
- * @param ad remote anti-entropy advertisement
- * @return list of external events relating to local operations performed
- */
- private List<EventuallyConsistentMapEvent<K, V>>
- antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
- final List<EventuallyConsistentMapEvent<K, V>> externalEvents
- = new LinkedList<>();
-
- for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
- K key = remoteDead.getKey();
- Timestamp remoteDeadTimestamp = remoteDead.getValue();
-
- Timestamped<V> local = items.get(key);
- Timestamp localDead = removedItems.get(key);
- if (local != null && remoteDeadTimestamp.isNewerThan(
- local.timestamp())) {
- // If the remote has a more recent tombstone than either our local
- // value, then do a remove with their timestamp
- if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
- externalEvents.add(new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, null));
- }
- } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
- localDead)) {
- // If the remote has a more recent tombstone than us, update ours
- // to their timestamp
- removeInternal(key, remoteDeadTimestamp);
+ if (updateInternal(key, value)) {
+ final EventuallyConsistentMapEvent.Type type = value.isTombstone() ? REMOVE : PUT;
+ notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value.get()));
}
- }
-
- return externalEvents;
- }
-
- private final class InternalAntiEntropyListener
- implements ClusterMessageHandler {
-
- @Override
- public void handle(ClusterMessage message) {
- log.trace("Received anti-entropy advertisement from peer: {}",
- message.sender());
- AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
- try {
- if (!underHighLoad()) {
- handleAntiEntropyAdvertisement(advertisement);
- }
- } catch (Exception e) {
- log.warn("Exception thrown handling advertisements", e);
- }
- }
- }
-
- private final class InternalEventListener implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- if (destroyed) {
- return;
- }
-
- log.debug("Received update event from peer: {}", message.sender());
- Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
-
- try {
- // TODO clean this for loop up
- for (AbstractEntry<K, V> entry : events) {
- final K key = entry.key();
- V value;
- final Timestamp timestamp = entry.timestamp();
- final EventuallyConsistentMapEvent.Type type;
- if (entry instanceof PutEntry) {
- PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
- value = putEntry.value();
- type = EventuallyConsistentMapEvent.Type.PUT;
- } else if (entry instanceof RemoveEntry) {
- type = EventuallyConsistentMapEvent.Type.REMOVE;
- value = null;
- } else {
- throw new IllegalStateException("Unknown entry type " + entry.getClass());
- }
-
- boolean success;
- switch (type) {
- case PUT:
- success = putInternal(key, value, timestamp);
- break;
- case REMOVE:
- Optional<V> removedValue = removeInternal(key, timestamp);
- success = removedValue != null;
- if (success) {
- value = removedValue.orElse(null);
- }
- break;
- default:
- success = false;
- }
- if (success) {
- notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
- }
- }
- } catch (Exception e) {
- log.warn("Exception thrown handling put", e);
- }
- }
+ });
}
// TODO pull this into the class if this gets pulled out...
@@ -808,7 +572,7 @@
private static final int DEFAULT_MAX_BATCH_MS = 50;
private static final Timer TIMER = new Timer("onos-ecm-sender-events");
- private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
+ private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
private final NodeId peer;
@@ -818,23 +582,21 @@
}
@Override
- public void processItems(List<AbstractEntry<K, V>> items) {
- Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
- items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
- oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
- )
- );
+ public void processItems(List<UpdateEntry<K, V>> items) {
+ Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
+ items.forEach(item -> map.compute(item.key(), (key, existing) ->
+ existing == null || item.compareTo(existing) > 0 ? item : existing));
communicationExecutor.submit(() -> {
- clusterCommunicator.unicast(Lists.newArrayList(map.values()),
+ clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
updateMessageSubject,
serializer::encode,
peer)
.whenComplete((result, error) -> {
if (error != null) {
- log.debug("Failed to send to {}", peer);
+ log.debug("Failed to send to {}", peer, error);
}
});
});
}
}
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java
index f803bb8..d1ada8f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java
@@ -16,13 +16,10 @@
package org.onosproject.store.ecmap;
-import org.apache.commons.lang3.mutable.MutableBoolean;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Hasher;
import org.mapdb.Serializer;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoSerializer;
import java.io.File;
@@ -42,7 +39,6 @@
private final DB database;
private final Map<byte[], byte[]> items;
- private final Map<byte[], byte[]> tombstones;
/**
* Creates a new MapDB based persistent store.
@@ -65,102 +61,32 @@
.valueSerializer(Serializer.BYTE_ARRAY)
.hasher(Hasher.BYTE_ARRAY)
.makeOrGet();
-
- tombstones = database.createHashMap("tombstones")
- .keySerializer(Serializer.BYTE_ARRAY)
- .valueSerializer(Serializer.BYTE_ARRAY)
- .hasher(Hasher.BYTE_ARRAY)
- .makeOrGet();
}
@Override
- public void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones) {
+ public void readInto(Map<K, MapValue<V>> items) {
this.items.forEach((keyBytes, valueBytes) ->
items.put(serializer.decode(keyBytes),
- serializer.decode(valueBytes)));
-
- this.tombstones.forEach((keyBytes, valueBytes) ->
- tombstones.put(serializer.decode(keyBytes),
- serializer.decode(valueBytes)));
+ serializer.decode(valueBytes)));
}
@Override
- public void put(K key, V value, Timestamp timestamp) {
- executor.submit(() -> putInternal(key, value, timestamp));
+ public void update(K key, MapValue<V> value) {
+ executor.submit(() -> updateInternal(key, value));
}
- private void putInternal(K key, V value, Timestamp timestamp) {
+ private void updateInternal(K key, MapValue<V> newValue) {
byte[] keyBytes = serializer.encode(key);
- byte[] removedBytes = tombstones.get(keyBytes);
-
- Timestamp removed = removedBytes == null ? null :
- serializer.decode(removedBytes);
- if (removed != null && removed.isNewerThan(timestamp)) {
- return;
- }
-
- final MutableBoolean updated = new MutableBoolean(false);
items.compute(keyBytes, (k, existingBytes) -> {
- Timestamped<V> existing = existingBytes == null ? null :
+ MapValue<V> existing = existingBytes == null ? null :
serializer.decode(existingBytes);
- if (existing != null && existing.isNewerThan(timestamp)) {
- updated.setFalse();
- return existingBytes;
+ if (existing == null || newValue.isNewerThan(existing)) {
+ return serializer.encode(newValue);
} else {
- updated.setTrue();
- return serializer.encode(new Timestamped<>(value, timestamp));
+ return existingBytes;
}
});
-
- boolean success = updated.booleanValue();
-
- if (success && removed != null) {
- tombstones.remove(keyBytes, removedBytes);
- }
-
database.commit();
}
-
- @Override
- public void remove(K key, Timestamp timestamp) {
- executor.submit(() -> removeInternal(key, timestamp));
- }
-
- private void removeInternal(K key, Timestamp timestamp) {
- byte[] keyBytes = serializer.encode(key);
-
- final MutableBoolean updated = new MutableBoolean(false);
-
- items.compute(keyBytes, (k, existingBytes) -> {
- Timestamp existing = existingBytes == null ? null :
- serializer.decode(existingBytes);
- if (existing != null && existing.isNewerThan(timestamp)) {
- updated.setFalse();
- return existingBytes;
- } else {
- updated.setTrue();
- // remove from items map
- return null;
- }
- });
-
- if (!updated.booleanValue()) {
- return;
- }
-
- byte[] timestampBytes = serializer.encode(timestamp);
- byte[] removedBytes = tombstones.get(keyBytes);
-
- Timestamp removedTimestamp = removedBytes == null ? null :
- serializer.decode(removedBytes);
- if (removedTimestamp == null) {
- tombstones.putIfAbsent(keyBytes, timestampBytes);
- } else if (timestamp.isNewerThan(removedTimestamp)) {
- tombstones.replace(keyBytes, removedBytes, timestampBytes);
- }
-
- database.commit();
- }
-
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapValue.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapValue.java
new file mode 100644
index 0000000..9225561
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapValue.java
@@ -0,0 +1,99 @@
+package org.onosproject.store.ecmap;
+
+import org.onosproject.store.Timestamp;
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of a value in EventuallyConsistentMap.
+ *
+ * @param <V> value type
+ */
+public class MapValue<V> implements Comparable<MapValue<V>> {
+ private final Timestamp timestamp;
+ private final V value;
+
+ public MapValue(V value, Timestamp timestamp) {
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ public boolean isTombstone() {
+ return value == null;
+ }
+
+ public boolean isAlive() {
+ return value != null;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ public V get() {
+ return value;
+ }
+
+ @Override
+ public int compareTo(MapValue<V> o) {
+ return this.timestamp.compareTo(o.timestamp);
+ }
+
+ public boolean isNewerThan(MapValue<V> other) {
+ return timestamp.isNewerThan(other.timestamp);
+ }
+
+ public boolean isNewerThan(Timestamp timestamp) {
+ return timestamp.isNewerThan(timestamp);
+ }
+
+ public Digest digest() {
+ return new Digest(timestamp, isTombstone());
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("timestamp", timestamp)
+ .add("value", value)
+ .toString();
+ }
+
+ @SuppressWarnings("unused")
+ private MapValue() {
+ this.timestamp = null;
+ this.value = null;
+ }
+
+ /**
+ * Digest or summary of a MapValue for use during Anti-Entropy exchanges.
+ */
+ public static class Digest {
+ private final Timestamp timestamp;
+ private final boolean isTombstone;
+
+ public Digest(Timestamp timestamp, boolean isTombstone) {
+ this.timestamp = timestamp;
+ this.isTombstone = isTombstone;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ public boolean isTombstone() {
+ return isTombstone;
+ }
+
+ public boolean isNewerThan(Digest other) {
+ return timestamp.isNewerThan(other.timestamp);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("timestamp", timestamp)
+ .add("isTombstone", isTombstone)
+ .toString();
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java
index b945f93..302f7f8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java
@@ -16,9 +16,6 @@
package org.onosproject.store.ecmap;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.impl.Timestamped;
-
import java.util.Map;
/**
@@ -30,24 +27,14 @@
* Read the contents of the disk into the given maps.
*
* @param items items map
- * @param tombstones tombstones map
*/
- void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones);
+ void readInto(Map<K, MapValue<V>> items);
/**
- * Puts a new key,value pair into the map on disk.
+ * Updates a key,value pair in the persistent store.
*
* @param key the key
* @param value the value
- * @param timestamp the timestamp of the update
*/
- void put(K key, V value, Timestamp timestamp);
-
- /**
- * Removes a key from the map on disk.
- *
- * @param key the key
- * @param timestamp the timestamp of the update
- */
- void remove(K key, Timestamp timestamp);
+ void update(K key, MapValue<V> value);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java
deleted file mode 100644
index 18b0986..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import com.google.common.base.MoreObjects;
-import org.onosproject.store.Timestamp;
-
-/**
- * Describes a single remove event in an EventuallyConsistentMap.
- */
-final class RemoveEntry<K, V> extends AbstractEntry<K, V> {
- /**
- * Creates a new remove entry.
- *
- * @param key key of the entry
- * @param timestamp timestamp of the remove event
- */
- public RemoveEntry(K key, Timestamp timestamp) {
- super(key, timestamp);
- }
-
- // Needed for serialization.
- @SuppressWarnings("unused")
- private RemoveEntry() {
- super();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("key", key())
- .add("timestamp", timestamp())
- .toString();
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/UpdateEntry.java
similarity index 66%
rename from core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java
rename to core/store/dist/src/main/java/org/onosproject/store/ecmap/UpdateEntry.java
index ddb4ae9..41eb3a2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/UpdateEntry.java
@@ -15,34 +15,35 @@
*/
package org.onosproject.store.ecmap;
-import com.google.common.base.MoreObjects;
-import org.onosproject.store.Timestamp;
-
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.MoreObjects;
+
/**
- * Describes a single put event in an EventuallyConsistentMap.
+ * Describes a single update event in an EventuallyConsistentMap.
*/
-final class PutEntry<K, V> extends AbstractEntry<K, V> {
- private final V value;
+final class UpdateEntry<K, V> implements Comparable<UpdateEntry<K, V>> {
+ private final K key;
+ private final MapValue<V> value;
/**
- * Creates a new put entry.
+ * Creates a new update entry.
*
* @param key key of the entry
* @param value value of the entry
- * @param timestamp timestamp of the put event
*/
- public PutEntry(K key, V value, Timestamp timestamp) {
- super(key, timestamp);
+ public UpdateEntry(K key, MapValue<V> value) {
+ this.key = checkNotNull(key);
this.value = checkNotNull(value);
}
- // Needed for serialization.
- @SuppressWarnings("unused")
- private PutEntry() {
- super();
- this.value = null;
+ /**
+ * Returns the key.
+ *
+ * @return the key
+ */
+ public K key() {
+ return key;
}
/**
@@ -50,16 +51,26 @@
*
* @return the value
*/
- public V value() {
+ public MapValue<V> value() {
return value;
}
@Override
+ public int compareTo(UpdateEntry<K, V> o) {
+ return this.value.timestamp().compareTo(o.value.timestamp());
+ }
+
+ @Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("key", key())
.add("value", value)
- .add("timestamp", timestamp())
.toString();
}
+
+ @SuppressWarnings("unused")
+ private UpdateEntry() {
+ this.key = null;
+ this.value = null;
+ }
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 28be8dc..57943ad 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -16,8 +16,8 @@
package org.onosproject.store.ecmap;
import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
@@ -32,7 +32,6 @@
import org.onosproject.event.AbstractEvent;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.LogicalTimestamp;
@@ -44,11 +43,13 @@
import org.onosproject.store.service.EventuallyConsistentMapListener;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -89,8 +90,8 @@
private final ControllerNode self =
new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
- private ClusterMessageHandler updateHandler;
- private ClusterMessageHandler antiEntropyHandler;
+ private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
+ private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
/*
* Serialization is a bit tricky here. We need to serialize in the tests
@@ -109,11 +110,10 @@
// Below is the classes that the map internally registers
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
- .register(PutEntry.class)
- .register(RemoveEntry.class)
.register(ArrayList.class)
.register(AntiEntropyAdvertisement.class)
.register(HashMap.class)
+ .register(Optional.class)
.build();
}
};
@@ -131,9 +131,9 @@
// delegate to our ClusterCommunicationService implementation. This
// allows us to get a reference to the map's internal cluster message
// handlers so we can induce events coming in from a peer.
- clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
- anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
- expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
+ clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
+ anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
+ expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
replay(clusterCommunicator);
@@ -237,15 +237,15 @@
assertEquals(VALUE1, ecMap.get(KEY1));
// Remote put
- ClusterMessage message
- = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
+ List<UpdateEntry<String, String>> message
+ = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
// Create a latch so we know when the put operation has finished
latch = new CountDownLatch(1);
ecMap.addListener(new TestListener(latch));
assertNull(ecMap.get(KEY2));
- updateHandler.handle(message);
+ updateHandler.accept(message);
assertTrue("External listener never got notified of internal event",
latch.await(100, TimeUnit.MILLISECONDS));
assertEquals(VALUE2, ecMap.get(KEY2));
@@ -255,14 +255,13 @@
assertNull(ecMap.get(KEY2));
// Remote remove
- ClusterMessage removeMessage
- = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
+ message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
// Create a latch so we know when the remove operation has finished
latch = new CountDownLatch(1);
ecMap.addListener(new TestListener(latch));
- updateHandler.handle(removeMessage);
+ updateHandler.accept(message);
assertTrue("External listener never got notified of internal event",
latch.await(100, TimeUnit.MILLISECONDS));
assertNull(ecMap.get(KEY1));
@@ -601,49 +600,35 @@
}
}
- private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
- PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
-
- return new ClusterMessage(
- clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
- SERIALIZER.encode(Lists.newArrayList(event)));
+ private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
+ return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
}
- private List<PutEntry<String, String>> generatePutMessage(
+ private List<UpdateEntry<String, String>> generatePutMessage(
String key1, String value1, String key2, String value2) {
- ArrayList<PutEntry<String, String>> list = new ArrayList<>();
+ List<UpdateEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
Timestamp timestamp2 = clockService.peek(2);
- PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
- PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
-
- list.add(pe1);
- list.add(pe2);
+ list.add(generatePutMessage(key1, value1, timestamp1));
+ list.add(generatePutMessage(key2, value2, timestamp2));
return list;
}
- private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
- RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
-
- return new ClusterMessage(
- clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
- SERIALIZER.encode(Lists.newArrayList(event)));
+ private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
+ return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
}
- private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
- ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
+ private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
+ List<UpdateEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
Timestamp timestamp2 = clockService.peek(2);
- RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
- RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
-
- list.add(re1);
- list.add(re2);
+ list.add(generateRemoveMessage(key1, timestamp1));
+ list.add(generateRemoveMessage(key2, timestamp2));
return list;
}
@@ -737,13 +722,6 @@
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber,
ExecutorService executor) {
- if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
- updateHandler = subscriber;
- } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
- antiEntropyHandler = subscriber;
- } else {
- throw new RuntimeException("Unexpected message subject " + subject.toString());
- }
}
@Override
@@ -793,6 +771,13 @@
public <M> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder, Consumer<M> handler,
Executor executor) {
+ if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
+ updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
+ } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
+ antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
+ } else {
+ throw new RuntimeException("Unexpected message subject " + subject.toString());
+ }
}
}