Updates to ECM interface
Change-Id: Ie0cae42ac2b361cf3b94e5047c157cb0945f4209
Adding origin to IntentData and use it to pick GossipIntentStore peer
Change-Id: I50e9621a69a35ec02b8c8dd79cc926591e5a73e9
diff --git a/cli/src/main/java/org/onosproject/cli/net/IntentRemoveCommand.java b/cli/src/main/java/org/onosproject/cli/net/IntentRemoveCommand.java
index 0820958..ef42c32 100644
--- a/cli/src/main/java/org/onosproject/cli/net/IntentRemoveCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/IntentRemoveCommand.java
@@ -84,14 +84,13 @@
Key key = Key.of(new BigInteger(id, 16).longValue(), appId);
Intent intent = intentService.getIntent(key);
-
if (intent != null) {
// set up latch and listener to track uninstall progress
CountDownLatch latch = new CountDownLatch(1);
IntentListener listener = (IntentEvent event) -> {
if (Objects.equals(event.subject().key(), key) &&
- (event.type() == IntentEvent.Type.WITHDRAWN
- || event.type() == IntentEvent.Type.WITHDRAWN)) {
+ (event.type() == IntentEvent.Type.WITHDRAWN ||
+ event.type() == IntentEvent.Type.FAILED)) {
latch.countDown();
}
};
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentData.java b/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
index f1776a9..1bfa0dc 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
@@ -17,6 +17,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
+import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import java.util.List;
@@ -32,6 +33,7 @@
private IntentState state;
private Timestamp version;
+ private NodeId origin;
private List<Intent> installables;
@@ -61,6 +63,19 @@
return version;
}
+ /**
+ * Sets the origin, which is the node that created the instance.
+ *
+ * @param origin origin instance
+ */
+ public void setOrigin(NodeId origin) {
+ this.origin = origin;
+ }
+
+ public NodeId origin() {
+ return origin;
+ }
+
public void setState(IntentState newState) {
this.state = newState;
}
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index c33d2ea..cbf2398 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -16,11 +16,9 @@
package org.onosproject.store.cluster.messaging;
import com.google.common.util.concurrent.ListenableFuture;
-
import org.onosproject.cluster.NodeId;
import java.io.IOException;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
// TODO: remove IOExceptions?
@@ -51,9 +49,8 @@
* @param message message to send
* @param toNodeId node identifier
* @return true if the message was sent successfully; false otherwise.
- * @throws IOException when I/O exception of some sort has occurred
*/
- boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException;
+ boolean unicast(ClusterMessage message, NodeId toNodeId);
/**
* Multicast a message to a set of controller nodes.
@@ -62,7 +59,7 @@
* @param nodeIds recipient node identifiers
* @return true if the message was sent successfully to all nodes in the group; false otherwise.
*/
- boolean multicast(ClusterMessage message, Set<NodeId> nodeIds);
+ boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds);
/**
* Sends a message synchronously.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 59671c0..42a79b3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -16,7 +16,6 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.util.concurrent.ListenableFuture;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -39,7 +38,6 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
@@ -107,7 +105,7 @@
}
@Override
- public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
+ public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
byte[] payload = message.getBytes();
@@ -120,8 +118,8 @@
}
@Override
- public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
- return unicast(message.subject(), message.getBytes(), toNodeId);
+ public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+ return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
}
private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
@@ -137,7 +135,6 @@
}
}
-
private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
try {
return unicast(subject, payload, toNodeId);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 56d01a0..06a60d5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -305,14 +304,13 @@
ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
SERIALIZER.encode(deviceInjectedEvent));
- try {
- clusterCommunicator.unicast(clusterMessage, deviceNode);
- } catch (IOException e) {
- log.warn("Failed to process injected device id: {} desc: {} " +
- "(cluster messaging failed: {})",
- deviceId, deviceDescription, e);
- }
-
+ // TODO check unicast return value
+ clusterCommunicator.unicast(clusterMessage, deviceNode);
+ /* error log:
+ log.warn("Failed to process injected device id: {} desc: {} " +
+ "(cluster messaging failed: {})",
+ deviceId, deviceDescription, e);
+ */
}
return deviceEvent;
@@ -556,13 +554,14 @@
PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
ClusterMessage clusterMessage = new ClusterMessage(
localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
- try {
- clusterCommunicator.unicast(clusterMessage, deviceNode);
- } catch (IOException e) {
- log.warn("Failed to process injected ports of device id: {} " +
- "(cluster messaging failed: {})",
- deviceId, e);
- }
+
+ //TODO check unicast return value
+ clusterCommunicator.unicast(clusterMessage, deviceNode);
+ /* error log:
+ log.warn("Failed to process injected ports of device id: {} " +
+ "(cluster messaging failed: {})",
+ deviceId, e);
+ */
}
return deviceEvents == null ? Collections.emptyList() : deviceEvents;
@@ -842,13 +841,13 @@
DEVICE_REMOVE_REQ,
SERIALIZER.encode(deviceId));
- try {
- clusterCommunicator.unicast(message, master);
- } catch (IOException e) {
- log.error("Failed to forward {} remove request to {}", deviceId, master, e);
- }
+ // TODO check unicast return value
+ clusterCommunicator.unicast(message, master);
+ /* error log:
+ log.error("Failed to forward {} remove request to {}", deviceId, master, e);
+ */
- // event will be triggered after master processes it.
+ // event will be triggered after master processes it.
return null;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8a178cb..8f99d0e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -35,7 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -51,6 +50,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -87,6 +87,7 @@
private final ExecutorService executor;
private final ScheduledExecutorService backgroundExecutor;
+ private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
private ExecutorService broadcastMessageExecutor;
@@ -140,14 +141,18 @@
* both K and V
* @param clockService a clock service able to generate timestamps
* for K
+ * @param peerUpdateFunction function that provides a set of nodes to immediately
+ * update to when there writes to the map
*/
public EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
- ClockService<K, V> clockService) {
+ ClockService<K, V> clockService,
+ BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
+ this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
serializer = createSerializer(checkNotNull(serializerBuilder));
destroyedMessage = mapName + ERROR_DESTROYED;
@@ -189,6 +194,34 @@
new InternalAntiEntropyListener(), backgroundExecutor);
}
+ /**
+ * Creates a new eventually consistent map shared amongst multiple instances.
+ * <p>
+ * Take a look at the other constructor for usage information. The only difference
+ * is that a BiFunction is provided that returns all nodes in the cluster, so
+ * all nodes will be sent write updates immediately.
+ * </p>
+ *
+ * @param mapName a String identifier for the map.
+ * @param clusterService the cluster service
+ * @param clusterCommunicator the cluster communications service
+ * @param serializerBuilder a Kryo namespace builder that can serialize
+ * both K and V
+ * @param clockService a clock service able to generate timestamps
+ * for K
+ */
+ public EventuallyConsistentMapImpl(String mapName,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ KryoNamespace.Builder serializerBuilder,
+ ClockService<K, V> clockService) {
+ this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
+ (key, value) -> clusterService.getNodes().stream()
+ .map(ControllerNode::id)
+ .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+ .collect(Collectors.toList()));
+ }
+
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
return new KryoSerializer() {
@Override
@@ -270,11 +303,10 @@
Timestamp timestamp = clockService.getTimestamp(key, value);
if (putInternal(key, value, timestamp)) {
- notifyPeers(new InternalPutEvent<>(key, value, timestamp));
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, key, value);
- notifyListeners(externalEvent);
+ notifyPeers(new InternalPutEvent<>(key, value, timestamp),
+ peerUpdateFunction.apply(key, value));
+ notifyListeners(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.PUT, key, value));
}
}
@@ -318,11 +350,10 @@
Timestamp timestamp = clockService.getTimestamp(key, null);
if (removeInternal(key, timestamp)) {
- notifyPeers(new InternalRemoveEvent<>(key, timestamp));
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, null);
- notifyListeners(externalEvent);
+ notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+ peerUpdateFunction.apply(key, null));
+ notifyListeners(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, key, null));
}
}
@@ -364,11 +395,10 @@
Timestamp timestamp = clockService.getTimestamp(key, value);
if (removeInternal(key, timestamp)) {
- notifyPeers(new InternalRemoveEvent<>(key, timestamp));
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, value);
- notifyListeners(externalEvent);
+ notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+ peerUpdateFunction.apply(key, value));
+ notifyListeners(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, key, value));
}
}
@@ -393,7 +423,7 @@
}
if (!updates.isEmpty()) {
- notifyPeers(new InternalPutEvent<>(updates));
+ broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
for (PutEntry<K, V> entry : updates) {
EventuallyConsistentMapEvent<K, V> externalEvent =
@@ -421,7 +451,7 @@
}
if (!removed.isEmpty()) {
- notifyPeers(new InternalRemoveEvent<>(removed));
+ broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
for (RemoveEntry<K> entry : removed) {
EventuallyConsistentMapEvent<K, V> externalEvent
@@ -493,16 +523,26 @@
}
}
- private void notifyPeers(InternalPutEvent event) {
+ private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
// FIXME extremely memory expensive when we are overrun
// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
- broadcastMessage(updateMessageSubject, event);
+ multicastMessage(updateMessageSubject, event, peers);
}
- private void notifyPeers(InternalRemoveEvent event) {
+ private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
// FIXME extremely memory expensive when we are overrun
// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
- broadcastMessage(removeMessageSubject, event);
+ multicastMessage(removeMessageSubject, event, peers);
+ }
+
+ private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
+ // FIXME can we parallelize the serialization... use the caller???
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ serializer.encode(event));
+ broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
+// clusterCommunicator.broadcast(message);
}
private void broadcastMessage(MessageSubject subject, Object event) {
@@ -515,14 +555,13 @@
// clusterCommunicator.broadcast(message);
}
- private void unicastMessage(NodeId peer,
- MessageSubject subject,
- Object event) throws IOException {
+ private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
serializer.encode(event));
- clusterCommunicator.unicast(message, peer);
+// clusterCommunicator.unicast(message, peer);
+ broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
}
private boolean underHighLoad() {
@@ -567,11 +606,9 @@
AntiEntropyAdvertisement<K> ad = createAdvertisement();
- try {
- unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
- } catch (IOException e) {
- log.debug("Failed to send anti-entropy advertisement to {}", peer);
- }
+ // TODO check the return value?
+ unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
+ // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
} catch (Exception e) {
// Catch all exceptions to avoid scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
@@ -607,14 +644,9 @@
// Send the advertisement back if this peer is out-of-sync
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
- try {
- unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
- } catch (IOException e) {
- log.debug(
- "Failed to send reactive anti-entropy advertisement to {}",
- sender);
- }
-
+ // TODO check the return value?
+ unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
+ // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
break;
}
}
@@ -669,12 +701,10 @@
// Send all updates to the peer at once
if (!updatesToSend.isEmpty()) {
- try {
- unicastMessage(sender, updateMessageSubject,
- new InternalPutEvent<>(updatesToSend));
- } catch (IOException e) {
- log.warn("Failed to send advertisement response", e);
- }
+ // TODO check the return value?
+ unicastMessage(sender, updateMessageSubject,
+ new InternalPutEvent<>(updatesToSend));
+ //error log: log.warn("Failed to send advertisement response", e);
}
return externalEvents;
@@ -707,12 +737,10 @@
// Send all removes to the peer at once
if (!removesToSend.isEmpty()) {
- try {
- unicastMessage(sender, removeMessageSubject,
- new InternalRemoveEvent<>(removesToSend));
- } catch (IOException e) {
- log.warn("Failed to send advertisement response", e);
- }
+ // TODO check the return value
+ unicastMessage(sender, removeMessageSubject,
+ new InternalRemoveEvent<>(removesToSend));
+ // error log: log.warn("Failed to send advertisement response", e);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 80f2914..450bb3d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -29,6 +29,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
@@ -138,7 +139,8 @@
private ExecutorService messageHandlingExecutor;
private final ExecutorService backupExecutors =
- Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
+ BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
+ //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
private boolean syncBackup = false;
@@ -385,12 +387,8 @@
SERIALIZER.encode(operation));
- try {
-
- clusterCommunicator.unicast(message, replicaInfo.master().get());
-
- } catch (IOException e) {
- log.warn("Failed to storeBatch: {}", e.getMessage());
+ if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
+ log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
Set<FlowRule> allFailures = operation.getOperations().stream()
.map(op -> op.target())
@@ -401,7 +399,6 @@
new CompletedBatchOperation(false, allFailures, deviceId)));
return;
}
-
}
private void storeBatchInternal(FlowRuleBatchOperation operation) {
@@ -576,15 +573,13 @@
if (nodeId == null) {
notifyDelegate(event);
} else {
- try {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- REMOTE_APPLY_COMPLETED,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, nodeId);
- } catch (IOException e) {
- log.warn("Failed to respond to peer for batch operation result");
- }
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ REMOTE_APPLY_COMPLETED,
+ SERIALIZER.encode(event));
+ // TODO check unicast return value
+ clusterCommunicator.unicast(message, nodeId);
+ //error log: log.warn("Failed to respond to peer for batch operation result");
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index bbdbdb7..2bc992b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.intent.impl;
+import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -23,6 +24,8 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
@@ -41,7 +44,10 @@
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import static org.onosproject.net.intent.IntentState.*;
@@ -51,6 +57,8 @@
* Manages inventory of Intents in a distributed data store that uses optimistic
* replication and gossip based techniques.
*/
+//FIXME we should listen for leadership changes. if the local instance has just
+// ... become a leader, scan the pending map and process those
@Component(immediate = false, enabled = true)
@Service
public class GossipIntentStore
@@ -86,15 +94,17 @@
clusterService,
clusterCommunicator,
intentSerializer,
- new IntentDataLogicalClockManager<>());
+ new IntentDataLogicalClockManager<>(),
+ (key, intentData) -> getPeerNodes(key, intentData));
pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
clusterService,
clusterCommunicator,
intentSerializer, // TODO
- new IntentDataClockManager<>());
+ new IntentDataClockManager<>(),
+ (key, intentData) -> getPeerNodes(key, intentData));
- currentMap.addListener(new InternalIntentStatesListener());
+ currentMap.addListener(new InternalCurrentListener());
pendingMap.addListener(new InternalPendingListener());
log.info("Started");
@@ -226,7 +236,6 @@
@Override
public void write(IntentData newData) {
IntentData currentData = currentMap.get(newData.key());
-
if (isUpdateAcceptable(currentData, newData)) {
// Only the master is modifying the current state. Therefore assume
// this always succeeds
@@ -239,6 +248,34 @@
}
}
+ private Iterable<NodeId> getPeerNodes(Key key, IntentData data) {
+ NodeId master = partitionService.getLeader(key);
+ NodeId origin = (data != null) ? data.origin() : null;
+ NodeId me = clusterService.getLocalNode().id();
+ boolean isMaster = Objects.equals(master, me);
+ boolean isOrigin = Objects.equals(origin, me);
+ if (isMaster && isOrigin) {
+ return ImmutableList.of(getRandomNode());
+ } else if (isMaster) {
+ return ImmutableList.of(origin);
+ } else if (isOrigin) {
+ return ImmutableList.of(master);
+ } else {
+ // FIXME: why are we here? log error?
+ return ImmutableList.of(master);
+ }
+ }
+
+ private NodeId getRandomNode() {
+ List<NodeId> nodes = clusterService.getNodes().stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toCollection(ArrayList::new));
+ Collections.shuffle(nodes);
+ // FIXME check if self
+ // FIXME verify nodes.size() > 0
+ return nodes.get(0);
+ }
+
@Override
public void batchWrite(Iterable<IntentData> updates) {
updates.forEach(this::write);
@@ -263,6 +300,7 @@
if (data.version() == null) {
data.setVersion(new WallClockTimestamp());
}
+ data.setOrigin(clusterService.getLocalNode().id());
pendingMap.put(data.key(), copyData(data));
}
@@ -292,7 +330,7 @@
}
}
- private final class InternalIntentStatesListener implements
+ private final class InternalCurrentListener implements
EventuallyConsistentMapListener<Key, IntentData> {
@Override
public void event(
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index 07b89fb..5fa26a7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -29,13 +29,13 @@
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Key;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Objects;
-
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -109,8 +109,13 @@
@Override
public boolean isMine(Key intentKey) {
- return Objects.equal(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
- clusterService.getLocalNode().id());
+ return Objects.equals(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
+ clusterService.getLocalNode().id());
+ }
+
+ @Override
+ public NodeId getLeader(Key intentKey) {
+ return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
}
private void doRelinquish() {
@@ -171,7 +176,7 @@
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
- if (Objects.equal(leadership.leader(), clusterService.getLocalNode().id()) &&
+ if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
// See if we need to let some partitions go
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
index eaeabab1..2ee4434 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.intent.impl;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Key;
/**
@@ -31,5 +32,13 @@
*/
boolean isMine(Key intentKey);
+ /**
+ * Returns the leader for a particular key.
+ *
+ * @param intentKey intent key to query
+ * @return the leader node
+ */
+ NodeId getLeader(Key intentKey);
+
// TODO add API for rebalancing partitions
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index d555069..75cb96c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -337,13 +337,13 @@
ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
- try {
- clusterCommunicator.unicast(linkInjectedMessage, dstNode);
- } catch (IOException e) {
- log.warn("Failed to process link update between src: {} and dst: {} " +
- "(cluster messaging failed: {})",
- linkDescription.src(), linkDescription.dst(), e);
- }
+ // TODO check unicast return value
+ clusterCommunicator.unicast(linkInjectedMessage, dstNode);
+ /* error log:
+ log.warn("Failed to process link update between src: {} and dst: {} " +
+ "(cluster messaging failed: {})",
+ linkDescription.src(), linkDescription.dst(), e);
+ */
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index eb0c38a..45b540d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,19 +15,13 @@
*/
package org.onosproject.store.packet.impl;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
@@ -43,9 +37,14 @@
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
-import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* Distributed packet store implementation allowing packets to be sent to
* remote instances.
@@ -118,12 +117,10 @@
return;
}
- try {
- communicationService.unicast(new ClusterMessage(
- myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
- } catch (IOException e) {
- log.warn("Failed to send packet-out to {}", master);
- }
+ // TODO check unicast return value
+ communicationService.unicast(new ClusterMessage(
+ myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
+ // error log: log.warn("Failed to send packet-out to {}", master);
}
/**
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index e6670de..5ed6384 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -16,9 +16,9 @@
package org.onosproject.store.ecmap;
import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -53,10 +53,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static junit.framework.TestCase.assertFalse;
import static org.easymock.EasyMock.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
/**
* Unit tests for EventuallyConsistentMapImpl.
@@ -119,8 +116,8 @@
@Before
public void setUp() throws Exception {
clusterService = createMock(ClusterService.class);
- expect(clusterService.getLocalNode()).andReturn(self)
- .anyTimes();
+ expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
+ expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
replay(clusterService);
clusterCommunicator = createMock(ClusterCommunicationService.class);
@@ -163,7 +160,7 @@
@Test
public void testSize() throws Exception {
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
assertEquals(0, ecMap.size());
ecMap.put(KEY1, VALUE1);
@@ -184,7 +181,7 @@
@Test
public void testIsEmpty() throws Exception {
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
assertTrue(ecMap.isEmpty());
ecMap.put(KEY1, VALUE1);
@@ -195,7 +192,7 @@
@Test
public void testContainsKey() throws Exception {
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
assertFalse(ecMap.containsKey(KEY1));
ecMap.put(KEY1, VALUE1);
@@ -207,7 +204,7 @@
@Test
public void testContainsValue() throws Exception {
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
assertFalse(ecMap.containsValue(VALUE1));
ecMap.put(KEY1, VALUE1);
@@ -222,7 +219,7 @@
@Test
public void testGet() throws Exception {
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
CountDownLatch latch;
@@ -278,7 +275,7 @@
ecMap.addListener(listener);
// Set up expected internal message to be broadcast to peers on first put
- expectSpecificMessage(generatePutMessage(KEY1, VALUE1, clockService
+ expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
.peekAtNextTimestamp()), clusterCommunicator);
// Put first value
@@ -289,7 +286,7 @@
verify(clusterCommunicator);
// Set up expected internal message to be broadcast to peers on second put
- expectSpecificMessage(generatePutMessage(
+ expectSpecificMulticastMessage(generatePutMessage(
KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
// Update same key to a new value
@@ -332,14 +329,14 @@
ecMap.addListener(listener);
// Put in an initial value
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
ecMap.put(KEY1, VALUE1);
assertEquals(VALUE1, ecMap.get(KEY1));
// Remove the value and check the correct internal cluster messages
// are sent
- expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- clusterCommunicator);
+ expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+ clusterCommunicator);
ecMap.remove(KEY1);
assertNull(ecMap.get(KEY1));
@@ -349,8 +346,8 @@
// Remove the same value again. Even though the value is no longer in
// the map, we expect that the tombstone is updated and another remove
// event is sent to the cluster and external listeners.
- expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- clusterCommunicator);
+ expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+ clusterCommunicator);
ecMap.remove(KEY1);
assertNull(ecMap.get(KEY1));
@@ -359,7 +356,7 @@
// Put in a new value for us to try and remove
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
ecMap.put(KEY2, VALUE2);
@@ -400,8 +397,8 @@
ecMap.addListener(listener);
// Expect a multi-update inter-instance message
- expectSpecificMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
- clusterCommunicator);
+ expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
+ clusterCommunicator);
Map<String, String> putAllValues = new HashMap<>();
putAllValues.put(KEY1, VALUE1);
@@ -434,12 +431,12 @@
verify(clusterCommunicator);
// Put some items in the map
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
ecMap.put(KEY1, VALUE1);
ecMap.put(KEY2, VALUE2);
ecMap.addListener(listener);
- expectSpecificMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
+ expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
ecMap.clear();
@@ -449,7 +446,7 @@
@Test
public void testKeySet() throws Exception {
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
assertTrue(ecMap.keySet().isEmpty());
@@ -482,7 +479,7 @@
@Test
public void testValues() throws Exception {
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
assertTrue(ecMap.values().isEmpty());
@@ -520,7 +517,7 @@
@Test
public void testEntrySet() throws Exception {
- expectAnyMessage(clusterCommunicator);
+ expectPeerMessage(clusterCommunicator);
assertTrue(ecMap.entrySet().isEmpty());
@@ -658,21 +655,52 @@
* @param m message we expect to be sent
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
- private static void expectSpecificMessage(ClusterMessage m,
- ClusterCommunicationService clusterCommunicator) {
+ private static void expectSpecificBroadcastMessage(ClusterMessage m,
+ ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
expect(clusterCommunicator.broadcast(m)).andReturn(true);
replay(clusterCommunicator);
}
/**
- * Sets up a mock ClusterCommunicationService to expect any cluster message
+ * Sets up a mock ClusterCommunicationService to expect a specific cluster
+ * message to be multicast to the cluster.
+ *
+ * @param m message we expect to be sent
+ * @param clusterCommunicator a mock ClusterCommunicationService to set up
+ */
+ private static void expectSpecificMulticastMessage(ClusterMessage m,
+ ClusterCommunicationService clusterCommunicator) {
+ reset(clusterCommunicator);
+ expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
+ replay(clusterCommunicator);
+ }
+
+
+ /**
+ * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
* that is sent to it. This is useful for unit tests where we aren't
* interested in testing the messaging component.
*
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
- private void expectAnyMessage(ClusterCommunicationService clusterCommunicator) {
+ private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
+ reset(clusterCommunicator);
+ expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
+ anyObject(Iterable.class)))
+ .andReturn(true)
+ .anyTimes();
+ replay(clusterCommunicator);
+ }
+
+ /**
+ * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
+ * that is sent to it. This is useful for unit tests where we aren't
+ * interested in testing the messaging component.
+ *
+ * @param clusterCommunicator a mock ClusterCommunicationService to set up
+ */
+ private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
.andReturn(true)
@@ -700,13 +728,12 @@
}
@Override
- public boolean unicast(ClusterMessage message, NodeId toNodeId)
- throws IOException {
+ public boolean unicast(ClusterMessage message, NodeId toNodeId) {
return false;
}
@Override
- public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) {
+ public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
return false;
}
diff --git a/utils/misc/src/test/java/org/onlab/util/BlockingBooleanTest.java b/utils/misc/src/test/java/org/onlab/util/BlockingBooleanTest.java
index 087b692..2d8b688 100644
--- a/utils/misc/src/test/java/org/onlab/util/BlockingBooleanTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/BlockingBooleanTest.java
@@ -31,6 +31,8 @@
*/
public class BlockingBooleanTest {
+ private static final int TIMEOUT = 100; //ms
+
@Test
public void basics() {
BlockingBoolean b = new BlockingBoolean(false);
@@ -60,7 +62,7 @@
}
b.set(value);
try {
- assertTrue(latch.await(10, TimeUnit.MILLISECONDS));
+ assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
fail();
}
@@ -92,7 +94,7 @@
}
});
try {
- assertTrue(latch.await(10, TimeUnit.MILLISECONDS));
+ assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
fail();
}
@@ -124,14 +126,14 @@
});
}
try {
- assertTrue(sameLatch.await(10, TimeUnit.MILLISECONDS));
+ assertTrue(sameLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
assertEquals(waitLatch.getCount(), numThreads / 2);
} catch (InterruptedException e) {
fail();
}
b.set(true);
try {
- assertTrue(waitLatch.await(10, TimeUnit.MILLISECONDS));
+ assertTrue(waitLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
fail();
}
@@ -156,7 +158,7 @@
}
});
try {
- assertTrue(latch.await(10, TimeUnit.MILLISECONDS));
+ assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
fail();
}