Updates to ECM interface
Change-Id: Ie0cae42ac2b361cf3b94e5047c157cb0945f4209
Add origin to IntentData and use it to pick the GossipIntentStore peer
Change-Id: I50e9621a69a35ec02b8c8dd79cc926591e5a73e9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 59671c0..42a79b3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -16,7 +16,6 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.util.concurrent.ListenableFuture;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -39,7 +38,6 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
@@ -107,7 +105,7 @@
}
@Override
- public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
+ public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
byte[] payload = message.getBytes();
@@ -120,8 +118,8 @@
}
@Override
- public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
- return unicast(message.subject(), message.getBytes(), toNodeId);
+ public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+ return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
}
private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
@@ -137,7 +135,6 @@
}
}
-
private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
try {
return unicast(subject, payload, toNodeId);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 56d01a0..06a60d5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -305,14 +304,13 @@
ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
SERIALIZER.encode(deviceInjectedEvent));
- try {
- clusterCommunicator.unicast(clusterMessage, deviceNode);
- } catch (IOException e) {
- log.warn("Failed to process injected device id: {} desc: {} " +
- "(cluster messaging failed: {})",
- deviceId, deviceDescription, e);
- }
-
+ // TODO check unicast return value
+ clusterCommunicator.unicast(clusterMessage, deviceNode);
+ /* error log:
+ log.warn("Failed to process injected device id: {} desc: {} " +
+ "(cluster messaging failed: {})",
+ deviceId, deviceDescription, e);
+ */
}
return deviceEvent;
@@ -556,13 +554,14 @@
PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
ClusterMessage clusterMessage = new ClusterMessage(
localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
- try {
- clusterCommunicator.unicast(clusterMessage, deviceNode);
- } catch (IOException e) {
- log.warn("Failed to process injected ports of device id: {} " +
- "(cluster messaging failed: {})",
- deviceId, e);
- }
+
+ //TODO check unicast return value
+ clusterCommunicator.unicast(clusterMessage, deviceNode);
+ /* error log:
+ log.warn("Failed to process injected ports of device id: {} " +
+ "(cluster messaging failed: {})",
+ deviceId, e);
+ */
}
return deviceEvents == null ? Collections.emptyList() : deviceEvents;
@@ -842,13 +841,13 @@
DEVICE_REMOVE_REQ,
SERIALIZER.encode(deviceId));
- try {
- clusterCommunicator.unicast(message, master);
- } catch (IOException e) {
- log.error("Failed to forward {} remove request to {}", deviceId, master, e);
- }
+ // TODO check unicast return value
+ clusterCommunicator.unicast(message, master);
+ /* error log:
+ log.error("Failed to forward {} remove request to {}", deviceId, master, e);
+ */
- // event will be triggered after master processes it.
+ // event will be triggered after master processes it.
return null;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8a178cb..8f99d0e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -35,7 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -51,6 +50,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -87,6 +87,7 @@
private final ExecutorService executor;
private final ScheduledExecutorService backgroundExecutor;
+ private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
private ExecutorService broadcastMessageExecutor;
@@ -140,14 +141,18 @@
* both K and V
* @param clockService a clock service able to generate timestamps
* for K
+ * @param peerUpdateFunction function that provides the set of nodes to update
+ * immediately when there are writes to the map
*/
public EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
- ClockService<K, V> clockService) {
+ ClockService<K, V> clockService,
+ BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
+ this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
serializer = createSerializer(checkNotNull(serializerBuilder));
destroyedMessage = mapName + ERROR_DESTROYED;
@@ -189,6 +194,34 @@
new InternalAntiEntropyListener(), backgroundExecutor);
}
+ /**
+ * Creates a new eventually consistent map shared amongst multiple instances.
+ * <p>
+ * Take a look at the other constructor for usage information. The only difference
+ * is that a BiFunction is provided that returns every node in the cluster except
+ * the local one, so all peers will be sent write updates immediately.
+ * </p>
+ *
+ * @param mapName a String identifier for the map.
+ * @param clusterService the cluster service
+ * @param clusterCommunicator the cluster communications service
+ * @param serializerBuilder a Kryo namespace builder that can serialize
+ * both K and V
+ * @param clockService a clock service able to generate timestamps
+ * for K
+ */
+ public EventuallyConsistentMapImpl(String mapName,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ KryoNamespace.Builder serializerBuilder,
+ ClockService<K, V> clockService) {
+ this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
+ (key, value) -> clusterService.getNodes().stream()
+ .map(ControllerNode::id)
+ .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+ .collect(Collectors.toList()));
+ }
+
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
return new KryoSerializer() {
@Override
@@ -270,11 +303,10 @@
Timestamp timestamp = clockService.getTimestamp(key, value);
if (putInternal(key, value, timestamp)) {
- notifyPeers(new InternalPutEvent<>(key, value, timestamp));
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, key, value);
- notifyListeners(externalEvent);
+ notifyPeers(new InternalPutEvent<>(key, value, timestamp),
+ peerUpdateFunction.apply(key, value));
+ notifyListeners(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.PUT, key, value));
}
}
@@ -318,11 +350,10 @@
Timestamp timestamp = clockService.getTimestamp(key, null);
if (removeInternal(key, timestamp)) {
- notifyPeers(new InternalRemoveEvent<>(key, timestamp));
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, null);
- notifyListeners(externalEvent);
+ notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+ peerUpdateFunction.apply(key, null));
+ notifyListeners(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, key, null));
}
}
@@ -364,11 +395,10 @@
Timestamp timestamp = clockService.getTimestamp(key, value);
if (removeInternal(key, timestamp)) {
- notifyPeers(new InternalRemoveEvent<>(key, timestamp));
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, value);
- notifyListeners(externalEvent);
+ notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+ peerUpdateFunction.apply(key, value));
+ notifyListeners(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, key, value));
}
}
@@ -393,7 +423,7 @@
}
if (!updates.isEmpty()) {
- notifyPeers(new InternalPutEvent<>(updates));
+ broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
for (PutEntry<K, V> entry : updates) {
EventuallyConsistentMapEvent<K, V> externalEvent =
@@ -421,7 +451,7 @@
}
if (!removed.isEmpty()) {
- notifyPeers(new InternalRemoveEvent<>(removed));
+ broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
for (RemoveEntry<K> entry : removed) {
EventuallyConsistentMapEvent<K, V> externalEvent
@@ -493,16 +523,26 @@
}
}
- private void notifyPeers(InternalPutEvent event) {
+ private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
// FIXME extremely memory expensive when we are overrun
// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
- broadcastMessage(updateMessageSubject, event);
+ multicastMessage(updateMessageSubject, event, peers);
}
- private void notifyPeers(InternalRemoveEvent event) {
+ private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
// FIXME extremely memory expensive when we are overrun
// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
- broadcastMessage(removeMessageSubject, event);
+ multicastMessage(removeMessageSubject, event, peers);
+ }
+
+ private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
+ // FIXME can we parallelize the serialization... use the caller???
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ serializer.encode(event));
+ broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
+// clusterCommunicator.broadcast(message);
}
private void broadcastMessage(MessageSubject subject, Object event) {
@@ -515,14 +555,13 @@
// clusterCommunicator.broadcast(message);
}
- private void unicastMessage(NodeId peer,
- MessageSubject subject,
- Object event) throws IOException {
+ private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
serializer.encode(event));
- clusterCommunicator.unicast(message, peer);
+// clusterCommunicator.unicast(message, peer);
+ broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
}
private boolean underHighLoad() {
@@ -567,11 +606,9 @@
AntiEntropyAdvertisement<K> ad = createAdvertisement();
- try {
- unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
- } catch (IOException e) {
- log.debug("Failed to send anti-entropy advertisement to {}", peer);
- }
+ // TODO check the return value?
+ unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
+ // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
} catch (Exception e) {
// Catch all exceptions to avoid scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
@@ -607,14 +644,9 @@
// Send the advertisement back if this peer is out-of-sync
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
- try {
- unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
- } catch (IOException e) {
- log.debug(
- "Failed to send reactive anti-entropy advertisement to {}",
- sender);
- }
-
+ // TODO check the return value?
+ unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
+ // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
break;
}
}
@@ -669,12 +701,10 @@
// Send all updates to the peer at once
if (!updatesToSend.isEmpty()) {
- try {
- unicastMessage(sender, updateMessageSubject,
- new InternalPutEvent<>(updatesToSend));
- } catch (IOException e) {
- log.warn("Failed to send advertisement response", e);
- }
+ // TODO check the return value?
+ unicastMessage(sender, updateMessageSubject,
+ new InternalPutEvent<>(updatesToSend));
+ //error log: log.warn("Failed to send advertisement response", e);
}
return externalEvents;
@@ -707,12 +737,10 @@
// Send all removes to the peer at once
if (!removesToSend.isEmpty()) {
- try {
- unicastMessage(sender, removeMessageSubject,
- new InternalRemoveEvent<>(removesToSend));
- } catch (IOException e) {
- log.warn("Failed to send advertisement response", e);
- }
+ // TODO check the return value
+ unicastMessage(sender, removeMessageSubject,
+ new InternalRemoveEvent<>(removesToSend));
+ // error log: log.warn("Failed to send advertisement response", e);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 80f2914..450bb3d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -29,6 +29,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
@@ -138,7 +139,8 @@
private ExecutorService messageHandlingExecutor;
private final ExecutorService backupExecutors =
- Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
+ BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
+ //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
private boolean syncBackup = false;
@@ -385,12 +387,8 @@
SERIALIZER.encode(operation));
- try {
-
- clusterCommunicator.unicast(message, replicaInfo.master().get());
-
- } catch (IOException e) {
- log.warn("Failed to storeBatch: {}", e.getMessage());
+ if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
+ log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
Set<FlowRule> allFailures = operation.getOperations().stream()
.map(op -> op.target())
@@ -401,7 +399,6 @@
new CompletedBatchOperation(false, allFailures, deviceId)));
return;
}
-
}
private void storeBatchInternal(FlowRuleBatchOperation operation) {
@@ -576,15 +573,13 @@
if (nodeId == null) {
notifyDelegate(event);
} else {
- try {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- REMOTE_APPLY_COMPLETED,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, nodeId);
- } catch (IOException e) {
- log.warn("Failed to respond to peer for batch operation result");
- }
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ REMOTE_APPLY_COMPLETED,
+ SERIALIZER.encode(event));
+ // TODO check unicast return value
+ clusterCommunicator.unicast(message, nodeId);
+ //error log: log.warn("Failed to respond to peer for batch operation result");
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index bbdbdb7..2bc992b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.intent.impl;
+import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -23,6 +24,8 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
@@ -41,7 +44,10 @@
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import static org.onosproject.net.intent.IntentState.*;
@@ -51,6 +57,8 @@
* Manages inventory of Intents in a distributed data store that uses optimistic
* replication and gossip based techniques.
*/
+//FIXME we should listen for leadership changes. If the local instance has just
+// ... become a leader, scan the pending map and process its entries
@Component(immediate = false, enabled = true)
@Service
public class GossipIntentStore
@@ -86,15 +94,17 @@
clusterService,
clusterCommunicator,
intentSerializer,
- new IntentDataLogicalClockManager<>());
+ new IntentDataLogicalClockManager<>(),
+ (key, intentData) -> getPeerNodes(key, intentData));
pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
clusterService,
clusterCommunicator,
intentSerializer, // TODO
- new IntentDataClockManager<>());
+ new IntentDataClockManager<>(),
+ (key, intentData) -> getPeerNodes(key, intentData));
- currentMap.addListener(new InternalIntentStatesListener());
+ currentMap.addListener(new InternalCurrentListener());
pendingMap.addListener(new InternalPendingListener());
log.info("Started");
@@ -226,7 +236,6 @@
@Override
public void write(IntentData newData) {
IntentData currentData = currentMap.get(newData.key());
-
if (isUpdateAcceptable(currentData, newData)) {
// Only the master is modifying the current state. Therefore assume
// this always succeeds
@@ -239,6 +248,34 @@
}
}
+    // Picks the peer(s) to update immediately; a null target yields no peers
+    // because ImmutableList.of(null) would throw NullPointerException.
+    private Iterable<NodeId> getPeerNodes(Key key, IntentData data) {
+        NodeId master = partitionService.getLeader(key);
+        NodeId origin = (data != null) ? data.origin() : null;
+        NodeId me = clusterService.getLocalNode().id();
+        boolean isMaster = Objects.equals(master, me);
+        NodeId target;
+        if (isMaster && Objects.equals(origin, me)) {
+            target = getRandomNode();
+        } else {
+            // master -> origin; otherwise -> master (FIXME: neither master nor origin is unexpected)
+            target = isMaster ? origin : master;
+        }
+        return target == null ? ImmutableList.<NodeId>of() : ImmutableList.of(target);
+    }
+
+    // Returns a random node other than the local one, or null if there is none.
+    private NodeId getRandomNode() {
+        NodeId me = clusterService.getLocalNode().id();
+        List<NodeId> nodes = clusterService.getNodes().stream()
+                .map(ControllerNode::id)
+                .filter(id -> !id.equals(me))
+                .collect(Collectors.toCollection(ArrayList::new));
+        Collections.shuffle(nodes);
+        return nodes.isEmpty() ? null : nodes.get(0);
+    }
+
@Override
public void batchWrite(Iterable<IntentData> updates) {
updates.forEach(this::write);
@@ -263,6 +300,7 @@
if (data.version() == null) {
data.setVersion(new WallClockTimestamp());
}
+ data.setOrigin(clusterService.getLocalNode().id());
pendingMap.put(data.key(), copyData(data));
}
@@ -292,7 +330,7 @@
}
}
- private final class InternalIntentStatesListener implements
+ private final class InternalCurrentListener implements
EventuallyConsistentMapListener<Key, IntentData> {
@Override
public void event(
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index 07b89fb..5fa26a7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -29,13 +29,13 @@
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Key;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Objects;
-
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -109,8 +109,13 @@
@Override
public boolean isMine(Key intentKey) {
- return Objects.equal(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
- clusterService.getLocalNode().id());
+ return Objects.equals(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
+ clusterService.getLocalNode().id());
+ }
+
+ @Override
+ public NodeId getLeader(Key intentKey) {
+ return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
}
private void doRelinquish() {
@@ -171,7 +176,7 @@
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
- if (Objects.equal(leadership.leader(), clusterService.getLocalNode().id()) &&
+ if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
// See if we need to let some partitions go
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
index eaeabab1..2ee4434 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.intent.impl;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Key;
/**
@@ -31,5 +32,13 @@
*/
boolean isMine(Key intentKey);
+ /**
+ * Returns the leader for a particular key.
+ *
+ * @param intentKey intent key to query
+ * @return the leader node
+ */
+ NodeId getLeader(Key intentKey);
+
// TODO add API for rebalancing partitions
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index d555069..75cb96c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -337,13 +337,13 @@
ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
- try {
- clusterCommunicator.unicast(linkInjectedMessage, dstNode);
- } catch (IOException e) {
- log.warn("Failed to process link update between src: {} and dst: {} " +
- "(cluster messaging failed: {})",
- linkDescription.src(), linkDescription.dst(), e);
- }
+ // TODO check unicast return value
+ clusterCommunicator.unicast(linkInjectedMessage, dstNode);
+ /* error log:
+ log.warn("Failed to process link update between src: {} and dst: {} " +
+ "(cluster messaging failed: {})",
+ linkDescription.src(), linkDescription.dst(), e);
+ */
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index eb0c38a..45b540d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,19 +15,13 @@
*/
package org.onosproject.store.packet.impl;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
@@ -43,9 +37,14 @@
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
-import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* Distributed packet store implementation allowing packets to be sent to
* remote instances.
@@ -118,12 +117,10 @@
return;
}
- try {
- communicationService.unicast(new ClusterMessage(
- myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
- } catch (IOException e) {
- log.warn("Failed to send packet-out to {}", master);
- }
+ // TODO check unicast return value
+ communicationService.unicast(new ClusterMessage(
+ myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
+ // error log: log.warn("Failed to send packet-out to {}", master);
}
/**