adding sender-side accumulator to ecmap
Change-Id: I63de27131c067c07b41ca311b14ef3ac85b6ae3e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java
new file mode 100644
index 0000000..4a87b41
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.ecmap;
+
+import org.onosproject.store.Timestamp;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Base class for events in an EventuallyConsistentMap.
+ */
+public abstract class AbstractEntry<K, V> implements Comparable<AbstractEntry<K, V>> {
+ private final K key;
+ private final Timestamp timestamp;
+
+ /**
+ * Creates a new put entry.
+ *
+ * @param key key of the entry
+ * @param timestamp timestamp of the put event
+ */
+ public AbstractEntry(K key, Timestamp timestamp) {
+ this.key = checkNotNull(key);
+ this.timestamp = checkNotNull(timestamp);
+ }
+
+ // Needed for serialization.
+ @SuppressWarnings("unused")
+ protected AbstractEntry() {
+ this.key = null;
+ this.timestamp = null;
+ }
+
+ /**
+ * Returns the key of the entry.
+ *
+ * @return the key
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * Returns the timestamp of the event.
+ *
+ * @return the timestamp
+ */
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public int compareTo(AbstractEntry<K, V> o) {
+ return this.timestamp.compareTo(o.timestamp);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8f99d0e..c2c46fc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -15,9 +15,13 @@
*/
package org.onosproject.store.ecmap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.AbstractAccumulator;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
import org.onosproject.cluster.ClusterService;
@@ -42,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -78,7 +83,6 @@
private final ClockService<K, V> clockService;
private final MessageSubject updateMessageSubject;
- private final MessageSubject removeMessageSubject;
private final MessageSubject antiEntropyAdvertisementSubject;
private final Set<EventuallyConsistentMapListener<K, V>> listeners
@@ -87,9 +91,10 @@
private final ExecutorService executor;
private final ScheduledExecutorService backgroundExecutor;
- private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
+ private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
- private ExecutorService broadcastMessageExecutor;
+ private ExecutorService communicationExecutor;
+ private Map<NodeId, EventAccumulator> senderPending;
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
@@ -149,7 +154,7 @@
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
ClockService<K, V> clockService,
- BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
+ BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
@@ -168,27 +173,23 @@
// sending executor; should be capped
//TODO make # of threads configurable
- broadcastMessageExecutor = //newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
- newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-notify"));
+ //TODO this probably doesn't need to be bounded anymore
+ communicationExecutor =
+ newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+ senderPending = Maps.newConcurrentMap();
backgroundExecutor =
- //FIXME anti-entropy can take >60 seconds and it blocks fg workers
- // ... dropping minPriority to try to help until this can be parallel
- newSingleThreadScheduledExecutor(//minPriority(
- groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
+ newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
// start anti-entropy thread
- //TODO disable anti-entropy for now in testing (it is unstable)
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec,
TimeUnit.SECONDS);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
- new InternalPutEventListener(), executor);
- removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
- clusterCommunicator.addSubscriber(removeMessageSubject,
- new InternalRemoveEventListener(), executor);
+ new InternalEventListener(), executor);
+
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
new InternalAntiEntropyListener(), backgroundExecutor);
@@ -232,8 +233,6 @@
.register(PutEntry.class)
.register(RemoveEntry.class)
.register(ArrayList.class)
- .register(InternalPutEvent.class)
- .register(InternalRemoveEvent.class)
.register(AntiEntropyAdvertisement.class)
.register(HashMap.class)
.build();
@@ -250,7 +249,7 @@
*/
public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
checkNotNull(executor, "Null executor");
- broadcastMessageExecutor = executor;
+ communicationExecutor = executor;
return this;
}
@@ -303,7 +302,7 @@
Timestamp timestamp = clockService.getTimestamp(key, value);
if (putInternal(key, value, timestamp)) {
- notifyPeers(new InternalPutEvent<>(key, value, timestamp),
+ notifyPeers(new PutEntry<>(key, value, timestamp),
peerUpdateFunction.apply(key, value));
notifyListeners(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.PUT, key, value));
@@ -350,7 +349,7 @@
Timestamp timestamp = clockService.getTimestamp(key, null);
if (removeInternal(key, timestamp)) {
- notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+ notifyPeers(new RemoveEntry<>(key, timestamp),
peerUpdateFunction.apply(key, null));
notifyListeners(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, key, null));
@@ -395,7 +394,7 @@
Timestamp timestamp = clockService.getTimestamp(key, value);
if (removeInternal(key, timestamp)) {
- notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+ notifyPeers(new RemoveEntry<>(key, timestamp),
peerUpdateFunction.apply(key, value));
notifyListeners(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, key, value));
@@ -405,75 +404,24 @@
@Override
public void putAll(Map<? extends K, ? extends V> m) {
checkState(!destroyed, destroyedMessage);
-
- List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
-
- for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
- K key = entry.getKey();
- V value = entry.getValue();
-
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(value, ERROR_NULL_VALUE);
-
- Timestamp timestamp = clockService.getTimestamp(key, value);
-
- if (putInternal(key, value, timestamp)) {
- updates.add(new PutEntry<>(key, value, timestamp));
- }
- }
-
- if (!updates.isEmpty()) {
- broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
-
- for (PutEntry<K, V> entry : updates) {
- EventuallyConsistentMapEvent<K, V> externalEvent =
- new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, entry.key(),
- entry.value());
- notifyListeners(externalEvent);
- }
- }
+ m.forEach(this::put);
}
@Override
public void clear() {
checkState(!destroyed, destroyedMessage);
-
- List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
-
- for (K key : items.keySet()) {
- // TODO also this is not applicable if value is important for timestamp?
- Timestamp timestamp = clockService.getTimestamp(key, null);
-
- if (removeInternal(key, timestamp)) {
- removed.add(new RemoveEntry<>(key, timestamp));
- }
- }
-
- if (!removed.isEmpty()) {
- broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
-
- for (RemoveEntry<K> entry : removed) {
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
- null);
- notifyListeners(externalEvent);
- }
- }
+ items.forEach((key, value) -> remove(key));
}
@Override
public Set<K> keySet() {
checkState(!destroyed, destroyedMessage);
-
return items.keySet();
}
@Override
public Collection<V> values() {
checkState(!destroyed, destroyedMessage);
-
return items.values().stream()
.map(Timestamped::value)
.collect(Collectors.toList());
@@ -508,12 +456,11 @@
executor.shutdown();
backgroundExecutor.shutdown();
- broadcastMessageExecutor.shutdown();
+ communicationExecutor.shutdown();
listeners.clear();
clusterCommunicator.removeSubscriber(updateMessageSubject);
- clusterCommunicator.removeSubscriber(removeMessageSubject);
clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
}
@@ -523,45 +470,32 @@
}
}
- private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
- // FIXME extremely memory expensive when we are overrun
-// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
- multicastMessage(updateMessageSubject, event, peers);
+ private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
+ queueUpdate(event, peers);
}
- private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
- // FIXME extremely memory expensive when we are overrun
-// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
- multicastMessage(removeMessageSubject, event, peers);
+ private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
+ queueUpdate(event, peers);
}
- private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
- // FIXME can we parallelize the serialization... use the caller???
+ private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
+ if (peers == null) {
+ // we have no friends :(
+ return;
+ }
+ peers.forEach(node ->
+ senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+ );
+ }
+
+ private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
serializer.encode(event));
- broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
-// clusterCommunicator.broadcast(message);
- }
-
- private void broadcastMessage(MessageSubject subject, Object event) {
- // FIXME can we parallelize the serialization... use the caller???
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- serializer.encode(event));
- broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
-// clusterCommunicator.broadcast(message);
- }
-
- private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- serializer.encode(event));
-// clusterCommunicator.unicast(message, peer);
- broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
+ return clusterCommunicator.unicast(message, peer);
+ // Note: we had this flipped before...
+// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
}
private boolean underHighLoad() {
@@ -606,9 +540,9 @@
AntiEntropyAdvertisement<K> ad = createAdvertisement();
- // TODO check the return value?
- unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
- // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
+ if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
+ log.debug("Failed to send anti-entropy advertisement to {}", peer);
+ }
} catch (Exception e) {
// Catch all exceptions to avoid scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
@@ -644,9 +578,9 @@
// Send the advertisement back if this peer is out-of-sync
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
- // TODO check the return value?
- unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
- // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
+ if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
+ log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
+ }
break;
}
}
@@ -670,8 +604,6 @@
= new LinkedList<>();
final NodeId sender = ad.sender();
- final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
-
for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
K key = item.getKey();
Timestamped<V> localValue = item.getValue();
@@ -683,9 +615,8 @@
if (remoteTimestamp == null || localValue
.isNewerThan(remoteTimestamp)) {
// local value is more recent, push to sender
- updatesToSend
- .add(new PutEntry<>(key, localValue.value(),
- localValue.timestamp()));
+ queueUpdate(new PutEntry<>(key, localValue.value(),
+ localValue.timestamp()), ImmutableList.of(sender));
}
Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
@@ -699,14 +630,6 @@
}
}
- // Send all updates to the peer at once
- if (!updatesToSend.isEmpty()) {
- // TODO check the return value?
- unicastMessage(sender, updateMessageSubject,
- new InternalPutEvent<>(updatesToSend));
- //error log: log.warn("Failed to send advertisement response", e);
- }
-
return externalEvents;
}
@@ -720,8 +643,6 @@
private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
final NodeId sender = ad.sender();
- final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
-
for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
K key = dead.getKey();
Timestamp localDeadTimestamp = dead.getValue();
@@ -730,18 +651,9 @@
if (remoteLiveTimestamp != null
&& localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
// sender has zombie, push remove
- removesToSend
- .add(new RemoveEntry<>(key, localDeadTimestamp));
+ queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
}
}
-
- // Send all removes to the peer at once
- if (!removesToSend.isEmpty()) {
- // TODO check the return value
- unicastMessage(sender, removeMessageSubject,
- new InternalRemoveEvent<>(removesToSend));
- // error log: log.warn("Failed to send advertisement response", e);
- }
}
/**
@@ -800,25 +712,44 @@
}
}
- private final class InternalPutEventListener implements
+ private final class InternalEventListener implements
ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
- log.debug("Received put event from peer: {}", message.sender());
- InternalPutEvent<K, V> event = serializer.decode(message.payload());
+ log.debug("Received update event from peer: {}", message.sender());
+ Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
try {
- for (PutEntry<K, V> entry : event.entries()) {
- K key = entry.key();
- V value = entry.value();
- Timestamp timestamp = entry.timestamp();
+ // TODO clean this for loop up
+ for (AbstractEntry<K, V> entry : events) {
+ final K key = entry.key();
+ final V value;
+ final Timestamp timestamp = entry.timestamp();
+ final EventuallyConsistentMapEvent.Type type;
+ if (entry instanceof PutEntry) {
+ PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
+ value = putEntry.value();
+ type = EventuallyConsistentMapEvent.Type.PUT;
+ } else if (entry instanceof RemoveEntry) {
+ type = EventuallyConsistentMapEvent.Type.REMOVE;
+ value = null;
+ } else {
+ throw new IllegalStateException("Unknown entry type " + entry.getClass());
+ }
- if (putInternal(key, value, timestamp)) {
- EventuallyConsistentMapEvent<K, V> externalEvent =
- new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, key,
- value);
- notifyListeners(externalEvent);
+ boolean success;
+ switch (type) {
+ case PUT:
+ success = putInternal(key, value, timestamp);
+ break;
+ case REMOVE:
+ success = removeInternal(key, timestamp);
+ break;
+ default:
+ success = false;
+ }
+ if (success) {
+ notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
}
}
} catch (Exception e) {
@@ -827,29 +758,35 @@
}
}
- private final class InternalRemoveEventListener implements
- ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- log.debug("Received remove event from peer: {}", message.sender());
- InternalRemoveEvent<K> event = serializer.decode(message.payload());
- try {
- for (RemoveEntry<K> entry : event.entries()) {
- K key = entry.key();
- Timestamp timestamp = entry.timestamp();
+ // TODO pull this into the class if this gets pulled out...
+ private static final int DEFAULT_MAX_EVENTS = 1000;
+ private static final int DEFAULT_MAX_IDLE_MS = 10;
+ private static final int DEFAULT_MAX_BATCH_MS = 50;
+ private static final Timer TIMER = new Timer("onos-ecm-sender-events");
- if (removeInternal(key, timestamp)) {
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE,
- key, null);
- notifyListeners(externalEvent);
- }
+ private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
+
+ private final NodeId peer;
+
+ private EventAccumulator(NodeId peer) {
+ super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
+ this.peer = peer;
+ }
+
+ @Override
+ public void processItems(List<AbstractEntry<K, V>> items) {
+ Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
+ items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
+ oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
+ )
+ );
+ communicationExecutor.submit(() -> {
+ try {
+ unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
+ } catch (Exception e) {
+ log.warn("broadcast error", e);
}
- } catch (Exception e) {
- log.warn("Exception thrown handling remove", e);
- }
+ });
}
}
-
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalPutEvent.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalPutEvent.java
deleted file mode 100644
index 950d320..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalPutEvent.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import com.google.common.collect.ImmutableList;
-import org.onosproject.store.Timestamp;
-
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Internal inter-instance event used by EventuallyConsistentMap for PUT events.
- */
-final class InternalPutEvent<K, V> {
- private final List<PutEntry<K, V>> entries;
-
- /**
- * Creates a put event for a single key.
- *
- * @param key key the event concerns
- * @param value value of the key
- * @param timestamp timestamp of the event
- */
- public InternalPutEvent(K key, V value, Timestamp timestamp) {
- entries = ImmutableList.of(new PutEntry<>(key, value, timestamp));
- }
-
- /**
- * Creates a put event for multiple keys.
- *
- * @param entries list of put entries to send an event for
- */
- public InternalPutEvent(List<PutEntry<K, V>> entries) {
- this.entries = checkNotNull(entries);
- }
-
- // Needed for serialization.
- @SuppressWarnings("unused")
- private InternalPutEvent() {
- entries = null;
- }
-
- /**
- * Returns the list of put entries this event concerns.
- *
- * @return list of put entries
- */
- public List<PutEntry<K, V>> entries() {
- return entries;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalRemoveEvent.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalRemoveEvent.java
deleted file mode 100644
index aa9a78e1..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalRemoveEvent.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import com.google.common.collect.ImmutableList;
-import org.onosproject.store.Timestamp;
-
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Internal inter-instance event used by EventuallyConsistentMap for REMOVE
- * events.
- */
-final class InternalRemoveEvent<K> {
- private final List<RemoveEntry<K>> entries;
-
- /**
- * Creates a remove event for a single key.
- *
- * @param key key the event concerns
- * @param timestamp timestamp of the event
- */
- public InternalRemoveEvent(K key, Timestamp timestamp) {
- entries = ImmutableList.of(new RemoveEntry<>(key, timestamp));
- }
-
- /**
- * Creates a remove event for multiple keys.
- *
- * @param entries list of remove entries to send an event for
- */
- public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
- this.entries = checkNotNull(entries);
- }
-
- // Needed for serialization.
- @SuppressWarnings("unused")
- private InternalRemoveEvent() {
- entries = null;
- }
-
- /**
- * Returns the list of remove entries this event concerns.
- *
- * @return list of remove entries
- */
- public List<RemoveEntry<K>> entries() {
- return entries;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java
index bb86961..ddb4ae9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java
@@ -23,10 +23,8 @@
/**
* Describes a single put event in an EventuallyConsistentMap.
*/
-final class PutEntry<K, V> {
- private final K key;
+final class PutEntry<K, V> extends AbstractEntry<K, V> {
private final V value;
- private final Timestamp timestamp;
/**
* Creates a new put entry.
@@ -36,26 +34,15 @@
* @param timestamp timestamp of the put event
*/
public PutEntry(K key, V value, Timestamp timestamp) {
- this.key = checkNotNull(key);
+ super(key, timestamp);
this.value = checkNotNull(value);
- this.timestamp = checkNotNull(timestamp);
}
// Needed for serialization.
@SuppressWarnings("unused")
private PutEntry() {
- this.key = null;
+ super();
this.value = null;
- this.timestamp = null;
- }
-
- /**
- * Returns the key of the entry.
- *
- * @return the key
- */
- public K key() {
- return key;
}
/**
@@ -67,21 +54,12 @@
return value;
}
- /**
- * Returns the timestamp of the event.
- *
- * @return the timestamp
- */
- public Timestamp timestamp() {
- return timestamp;
- }
-
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("key", key)
+ .add("key", key())
.add("value", value)
- .add("timestamp", timestamp)
+ .add("timestamp", timestamp())
.toString();
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java
index 7d34796..18b0986 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java
@@ -18,15 +18,10 @@
import com.google.common.base.MoreObjects;
import org.onosproject.store.Timestamp;
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* Describes a single remove event in an EventuallyConsistentMap.
*/
-final class RemoveEntry<K> {
- private final K key;
- private final Timestamp timestamp;
-
+final class RemoveEntry<K, V> extends AbstractEntry<K, V> {
/**
* Creates a new remove entry.
*
@@ -34,40 +29,20 @@
* @param timestamp timestamp of the remove event
*/
public RemoveEntry(K key, Timestamp timestamp) {
- this.key = checkNotNull(key);
- this.timestamp = checkNotNull(timestamp);
+ super(key, timestamp);
}
// Needed for serialization.
@SuppressWarnings("unused")
private RemoveEntry() {
- this.key = null;
- this.timestamp = null;
- }
-
- /**
- * Returns the key of the entry.
- *
- * @return the key
- */
- public K key() {
- return key;
- }
-
- /**
- * Returns the timestamp of the event.
- *
- * @return the timestamp
- */
- public Timestamp timestamp() {
- return timestamp;
+ super();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("key", key)
- .add("timestamp", timestamp)
+ .add("key", key())
+ .add("timestamp", timestamp())
.toString();
}
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 5ed6384..f7c9323 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
@@ -67,10 +68,8 @@
private SequentialClockService<String, String> clockService;
private static final String MAP_NAME = "test";
- private static final MessageSubject PUT_MESSAGE_SUBJECT
+ private static final MessageSubject UPDATE_MESSAGE_SUBJECT
= new MessageSubject("ecm-" + MAP_NAME + "-update");
- private static final MessageSubject REMOVE_MESSAGE_SUBJECT
- = new MessageSubject("ecm-" + MAP_NAME + "-remove");
private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
= new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
@@ -82,8 +81,7 @@
private final ControllerNode self =
new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
- private ClusterMessageHandler putHandler;
- private ClusterMessageHandler removeHandler;
+ private ClusterMessageHandler updateHandler;
private ClusterMessageHandler antiEntropyHandler;
/*
@@ -105,8 +103,6 @@
.register(PutEntry.class)
.register(RemoveEntry.class)
.register(ArrayList.class)
- .register(InternalPutEvent.class)
- .register(InternalRemoveEvent.class)
.register(AntiEntropyAdvertisement.class)
.register(HashMap.class)
.build();
@@ -237,7 +233,7 @@
ecMap.addListener(new TestListener(latch));
assertNull(ecMap.get(KEY2));
- putHandler.handle(message);
+ updateHandler.handle(message);
assertTrue("External listener never got notified of internal event",
latch.await(100, TimeUnit.MILLISECONDS));
assertEquals(VALUE2, ecMap.get(KEY2));
@@ -254,7 +250,7 @@
latch = new CountDownLatch(1);
ecMap.addListener(new TestListener(latch));
- removeHandler.handle(removeMessage);
+ updateHandler.handle(removeMessage);
assertTrue("External listener never got notified of internal event",
latch.await(100, TimeUnit.MILLISECONDS));
assertNull(ecMap.get(KEY1));
@@ -568,8 +564,7 @@
@Test
public void testDestroy() throws Exception {
- clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT);
- clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
+ clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
replay(clusterCommunicator);
@@ -594,12 +589,11 @@
}
private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
- InternalPutEvent<String, String> event =
- new InternalPutEvent<>(key, value, timestamp);
+ PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
return new ClusterMessage(
- clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event));
+ clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+ SERIALIZER.encode(Lists.newArrayList(event)));
}
private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
@@ -614,38 +608,35 @@
list.add(pe1);
list.add(pe2);
- InternalPutEvent<String, String> event = new InternalPutEvent<>(list);
return new ClusterMessage(
- clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event));
+ clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+ SERIALIZER.encode(list));
}
private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
- InternalRemoveEvent<String> event = new InternalRemoveEvent<>(key, timestamp);
+ RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
return new ClusterMessage(
- clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
- SERIALIZER.encode(event));
+ clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+ SERIALIZER.encode(Lists.newArrayList(event)));
}
private ClusterMessage generateRemoveMessage(String key1, String key2) {
- ArrayList<RemoveEntry<String>> list = new ArrayList<>();
+ ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
Timestamp timestamp2 = clockService.peek(2);
- RemoveEntry<String> re1 = new RemoveEntry<>(key1, timestamp1);
- RemoveEntry<String> re2 = new RemoveEntry<>(key2, timestamp2);
+ RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
+ RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
list.add(re1);
list.add(re2);
- InternalRemoveEvent<String> event = new InternalRemoveEvent<>(list);
-
return new ClusterMessage(
- clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
- SERIALIZER.encode(event));
+ clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+ SERIALIZER.encode(list));
}
/**
@@ -655,10 +646,14 @@
* @param m message we expect to be sent
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
+ //FIXME rename
private static void expectSpecificBroadcastMessage(ClusterMessage m,
ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(m)).andReturn(true);
+// expect(clusterCommunicator.broadcast(m)).andReturn(true);
+ expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
+ .andReturn(true)
+ .anyTimes();
replay(clusterCommunicator);
}
@@ -669,10 +664,14 @@
* @param m message we expect to be sent
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
+ //FIXME rename
private static void expectSpecificMulticastMessage(ClusterMessage m,
ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
- expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
+// expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
+ expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
+ .andReturn(true)
+ .anyTimes();
replay(clusterCommunicator);
}
@@ -684,10 +683,13 @@
*
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
+ //FIXME rename
private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
- expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
- anyObject(Iterable.class)))
+// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
+// anyObject(Iterable.class)))
+ expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
+ anyObject(NodeId.class)))
.andReturn(true)
.anyTimes();
replay(clusterCommunicator);
@@ -700,9 +702,13 @@
*
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
+ //FIXME rename
private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
+// expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
+// .andReturn(true)
+// .anyTimes();
+ expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
.andReturn(true)
.anyTimes();
replay(clusterCommunicator);
@@ -747,10 +753,8 @@
@Override
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber) {
- if (subject.equals(PUT_MESSAGE_SUBJECT)) {
- putHandler = subscriber;
- } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
- removeHandler = subscriber;
+ if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
+ updateHandler = subscriber;
} else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
antiEntropyHandler = subscriber;
} else {
@@ -762,10 +766,8 @@
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber,
ExecutorService executor) {
- if (subject.equals(PUT_MESSAGE_SUBJECT)) {
- putHandler = subscriber;
- } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
- removeHandler = subscriber;
+ if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
+ updateHandler = subscriber;
} else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
antiEntropyHandler = subscriber;
} else {