Updates to ECM interface
Change-Id: Ie0cae42ac2b361cf3b94e5047c157cb0945f4209
Adding origin to IntentData and use it to pick GossipIntentStore peer
Change-Id: I50e9621a69a35ec02b8c8dd79cc926591e5a73e9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8a178cb..8f99d0e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -35,7 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -51,6 +50,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -87,6 +87,7 @@
private final ExecutorService executor;
private final ScheduledExecutorService backgroundExecutor;
+ private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
private ExecutorService broadcastMessageExecutor;
@@ -140,14 +141,18 @@
* both K and V
* @param clockService a clock service able to generate timestamps
* for K
+     * @param peerUpdateFunction function that provides a set of nodes to immediately
+     *                           update when there are writes to the map
*/
public EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
- ClockService<K, V> clockService) {
+ ClockService<K, V> clockService,
+ BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
+ this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
serializer = createSerializer(checkNotNull(serializerBuilder));
destroyedMessage = mapName + ERROR_DESTROYED;
@@ -189,6 +194,34 @@
new InternalAntiEntropyListener(), backgroundExecutor);
}
+ /**
+ * Creates a new eventually consistent map shared amongst multiple instances.
+ * <p>
+ * Take a look at the other constructor for usage information. The only difference
+ * is that a BiFunction is provided that returns all nodes in the cluster, so
+ * all nodes will be sent write updates immediately.
+ * </p>
+ *
+ * @param mapName a String identifier for the map.
+ * @param clusterService the cluster service
+ * @param clusterCommunicator the cluster communications service
+ * @param serializerBuilder a Kryo namespace builder that can serialize
+ * both K and V
+ * @param clockService a clock service able to generate timestamps
+ * for K
+ */
+ public EventuallyConsistentMapImpl(String mapName,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ KryoNamespace.Builder serializerBuilder,
+ ClockService<K, V> clockService) {
+ this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
+ (key, value) -> clusterService.getNodes().stream()
+ .map(ControllerNode::id)
+ .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+ .collect(Collectors.toList()));
+ }
+
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
return new KryoSerializer() {
@Override
@@ -270,11 +303,10 @@
Timestamp timestamp = clockService.getTimestamp(key, value);
if (putInternal(key, value, timestamp)) {
- notifyPeers(new InternalPutEvent<>(key, value, timestamp));
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, key, value);
- notifyListeners(externalEvent);
+ notifyPeers(new InternalPutEvent<>(key, value, timestamp),
+ peerUpdateFunction.apply(key, value));
+ notifyListeners(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.PUT, key, value));
}
}
@@ -318,11 +350,10 @@
Timestamp timestamp = clockService.getTimestamp(key, null);
if (removeInternal(key, timestamp)) {
- notifyPeers(new InternalRemoveEvent<>(key, timestamp));
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, null);
- notifyListeners(externalEvent);
+ notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+ peerUpdateFunction.apply(key, null));
+ notifyListeners(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, key, null));
}
}
@@ -364,11 +395,10 @@
Timestamp timestamp = clockService.getTimestamp(key, value);
if (removeInternal(key, timestamp)) {
- notifyPeers(new InternalRemoveEvent<>(key, timestamp));
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, value);
- notifyListeners(externalEvent);
+ notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+ peerUpdateFunction.apply(key, value));
+ notifyListeners(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, key, value));
}
}
@@ -393,7 +423,7 @@
}
if (!updates.isEmpty()) {
- notifyPeers(new InternalPutEvent<>(updates));
+ broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
for (PutEntry<K, V> entry : updates) {
EventuallyConsistentMapEvent<K, V> externalEvent =
@@ -421,7 +451,7 @@
}
if (!removed.isEmpty()) {
- notifyPeers(new InternalRemoveEvent<>(removed));
+ broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
for (RemoveEntry<K> entry : removed) {
EventuallyConsistentMapEvent<K, V> externalEvent
@@ -493,16 +523,26 @@
}
}
- private void notifyPeers(InternalPutEvent event) {
+ private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
// FIXME extremely memory expensive when we are overrun
// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
- broadcastMessage(updateMessageSubject, event);
+ multicastMessage(updateMessageSubject, event, peers);
}
- private void notifyPeers(InternalRemoveEvent event) {
+ private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
// FIXME extremely memory expensive when we are overrun
// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
- broadcastMessage(removeMessageSubject, event);
+ multicastMessage(removeMessageSubject, event, peers);
+ }
+
+ private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
+ // FIXME can we parallelize the serialization... use the caller???
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ serializer.encode(event));
+ broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
+// clusterCommunicator.broadcast(message);
}
private void broadcastMessage(MessageSubject subject, Object event) {
@@ -515,14 +555,13 @@
// clusterCommunicator.broadcast(message);
}
- private void unicastMessage(NodeId peer,
- MessageSubject subject,
- Object event) throws IOException {
+ private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
serializer.encode(event));
- clusterCommunicator.unicast(message, peer);
+// clusterCommunicator.unicast(message, peer);
+ broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
}
private boolean underHighLoad() {
@@ -567,11 +606,9 @@
AntiEntropyAdvertisement<K> ad = createAdvertisement();
- try {
- unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
- } catch (IOException e) {
- log.debug("Failed to send anti-entropy advertisement to {}", peer);
- }
+ // TODO check the return value?
+ unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
+ // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
} catch (Exception e) {
// Catch all exceptions to avoid scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
@@ -607,14 +644,9 @@
// Send the advertisement back if this peer is out-of-sync
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
- try {
- unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
- } catch (IOException e) {
- log.debug(
- "Failed to send reactive anti-entropy advertisement to {}",
- sender);
- }
-
+ // TODO check the return value?
+ unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
+ // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
break;
}
}
@@ -669,12 +701,10 @@
// Send all updates to the peer at once
if (!updatesToSend.isEmpty()) {
- try {
- unicastMessage(sender, updateMessageSubject,
- new InternalPutEvent<>(updatesToSend));
- } catch (IOException e) {
- log.warn("Failed to send advertisement response", e);
- }
+ // TODO check the return value?
+ unicastMessage(sender, updateMessageSubject,
+ new InternalPutEvent<>(updatesToSend));
+ //error log: log.warn("Failed to send advertisement response", e);
}
return externalEvents;
@@ -707,12 +737,10 @@
// Send all removes to the peer at once
if (!removesToSend.isEmpty()) {
- try {
- unicastMessage(sender, removeMessageSubject,
- new InternalRemoveEvent<>(removesToSend));
- } catch (IOException e) {
- log.warn("Failed to send advertisement response", e);
- }
+ // TODO check the return value
+ unicastMessage(sender, removeMessageSubject,
+ new InternalRemoveEvent<>(removesToSend));
+ // error log: log.warn("Failed to send advertisement response", e);
}
}