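Modified GossipIntentStore to use EventuallyConsistentMaps.
The intents, intent states and installables are now each held in an EventuallyConsistentMap, replacing the store's own cluster messaging and timestamp handling.
All IntentStore operations are now implemented, including the previously stubbed REMOVE_INTENT and REMOVE_INSTALLED batch operations.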
ONOS-858
Change-Id: I5081805b61c7e25e28707b90093cae12b5a4374b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 4ffd4bf..157d66c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -16,8 +16,6 @@
package org.onosproject.store.intent.impl;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -26,46 +24,29 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.BatchWrite;
import org.onosproject.net.intent.Intent;
-import org.onosproject.net.intent.IntentClockService;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentId;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.store.AbstractStore;
-import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.Timestamped;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
+import org.onosproject.store.impl.EventuallyConsistentMap;
+import org.onosproject.store.impl.EventuallyConsistentMapEvent;
+import org.onosproject.store.impl.EventuallyConsistentMapImpl;
+import org.onosproject.store.impl.EventuallyConsistentMapListener;
+import org.onosproject.store.impl.WallclockClockManager;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.minPriority;
-import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
-import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_ANTI_ENTROPY_ADVERTISEMENT;
-import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_SET_INSTALLABLES_MSG;
-import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_UPDATED_MSG;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -80,20 +61,11 @@
private final Logger log = getLogger(getClass());
- private final ConcurrentMap<IntentId, Intent> intents =
- new ConcurrentHashMap<>();
+ private EventuallyConsistentMap<IntentId, Intent> intents;
- private final ConcurrentMap<IntentId, Timestamped<IntentState>> intentStates
- = new ConcurrentHashMap<>();
+ private EventuallyConsistentMap<IntentId, IntentState> intentStates;
- private final Set<IntentId> withdrawRequestedIntents
- = Sets.newConcurrentHashSet();
-
- private ConcurrentMap<IntentId, Timestamped<List<Intent>>> installables
- = new ConcurrentHashMap<>();
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected IntentClockService intentClockService;
+ private EventuallyConsistentMap<IntentId, List<Intent>> installables;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@@ -101,64 +73,39 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.STORE_COMMON)
- .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
- .register(InternalIntentEvent.class)
- .register(InternalSetInstallablesEvent.class)
- .register(Collections.emptyList().getClass())
- //.register(InternalIntentAntiEntropyEvent.class)
- //.register(IntentAntiEntropyAdvertisement.class)
- .build();
- }
- };
-
- private ExecutorService executor;
-
- private ScheduledExecutorService backgroundExecutor;
-
- // TODO: Make these anti-entropy params configurable
- private long initialDelaySec = 5;
- private long periodSec = 5;
-
@Activate
public void activate() {
- clusterCommunicator.addSubscriber(INTENT_UPDATED_MSG,
- new InternalIntentCreateOrUpdateEventListener());
- clusterCommunicator.addSubscriber(INTENT_SET_INSTALLABLES_MSG,
- new InternalIntentSetInstallablesListener());
- clusterCommunicator.addSubscriber(
- INTENT_ANTI_ENTROPY_ADVERTISEMENT,
- new InternalIntentAntiEntropyAdvertisementListener());
+ KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ // TODO this should be in BASIC namespace
+ .register(Collections.emptyList().getClass());
+ intents = new EventuallyConsistentMapImpl<>("intents", clusterService,
+ clusterCommunicator,
+ intentSerializer,
+ new WallclockClockManager<>());
- executor = Executors.newCachedThreadPool(namedThreads("onos-intent-fg-%d"));
+ intentStates = new EventuallyConsistentMapImpl<>("intent-states",
+ clusterService,
+ clusterCommunicator,
+ intentSerializer,
+ new WallclockClockManager<>());
- backgroundExecutor =
- newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-intent-bg-%d")));
+ installables = new EventuallyConsistentMapImpl<>("intent-installables",
+ clusterService,
+ clusterCommunicator,
+ intentSerializer,
+ new WallclockClockManager<>());
- // start anti-entropy thread
- //backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
- //initialDelaySec, periodSec, TimeUnit.SECONDS);
+ intentStates.addListener(new InternalIntentStatesListener());
log.info("Started");
}
@Deactivate
public void deactivate() {
- executor.shutdownNow();
- backgroundExecutor.shutdownNow();
- try {
- if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
- log.error("Timeout during executor shutdown");
- }
- } catch (InterruptedException e) {
- log.error("Error during executor shutdown", e);
- }
-
- intents.clear();
+ intents.destroy();
+ intentStates.destroy();
+ installables.destroy();
log.info("Stopped");
}
@@ -181,76 +128,19 @@
@Override
public IntentState getIntentState(IntentId intentId) {
- Timestamped<IntentState> state = intentStates.get(intentId);
- if (state != null) {
- return state.value();
- }
- return null;
- }
-
- private IntentEvent setStateInternal(IntentId intentId, IntentState newState, Timestamp timestamp) {
- switch (newState) {
- case WITHDRAW_REQ:
- withdrawRequestedIntents.add(intentId);
- break;
- case INSTALL_REQ:
- case COMPILING:
- case INSTALLING:
- case INSTALLED:
- case RECOMPILING:
- case WITHDRAWING:
- case WITHDRAWN:
- case FAILED:
- synchronized (intentStates) {
- Timestamped<IntentState> existing = intentStates.get(intentId);
- if (existing == null || !existing.isNewer(timestamp)) {
- intentStates.put(intentId, new Timestamped<>(newState, timestamp));
- }
- }
- break;
- default:
- log.warn("Unknown intent state {}", newState);
- break;
- }
-
- try {
- // TODO make sure it's OK if the intent is null
- return IntentEvent.getEvent(newState, intents.get(intentId));
- } catch (IllegalArgumentException e) {
- // Transient states can't be used for events, so don't send one
- return null;
- }
- }
-
- private void setInstallableIntentsInternal(IntentId intentId,
- List<Intent> installableIntents,
- Timestamp timestamp) {
- synchronized (installables) {
- Timestamped<List<Intent>> existing = installables.get(intentId);
- if (existing == null || !existing.isNewer(timestamp)) {
- installables.put(intentId,
- new Timestamped<>(installableIntents, timestamp));
- }
- }
+ return intentStates.get(intentId);
}
@Override
public List<Intent> getInstallableIntents(IntentId intentId) {
- Timestamped<List<Intent>> tInstallables = installables.get(intentId);
- if (tInstallables != null) {
- return tInstallables.value();
- }
- return null;
+ return installables.get(intentId);
}
@Override
public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
- List<IntentEvent> events = Lists.newArrayList();
List<BatchWrite.Operation> failed = new ArrayList<>();
- Timestamp timestamp = null;
-
for (BatchWrite.Operation op : batch.operations()) {
switch (op.type()) {
case CREATE_INTENT:
@@ -258,19 +148,18 @@
"CREATE_INTENT takes 1 argument. %s", op);
Intent intent = op.arg(0);
- timestamp = intentClockService.getTimestamp(intent.id());
- if (createIntentInternal(intent)) {
- events.add(setStateInternal(intent.id(), INSTALL_REQ, timestamp));
- notifyPeers(new InternalIntentEvent(intent.id(), intent,
- INSTALL_REQ, timestamp));
- }
+ intents.put(intent.id(), intent);
+ intentStates.put(intent.id(), INSTALL_REQ);
break;
case REMOVE_INTENT:
checkArgument(op.args().size() == 1,
"REMOVE_INTENT takes 1 argument. %s", op);
- IntentId intentId = (IntentId) op.arg(0);
- // TODO implement
+ IntentId intentId = op.arg(0);
+
+ intents.remove(intentId);
+ intentStates.remove(intentId);
+ installables.remove(intentId);
break;
case SET_STATE:
@@ -279,10 +168,7 @@
intent = op.arg(0);
IntentState newState = op.arg(1);
- timestamp = intentClockService.getTimestamp(intent.id());
- IntentEvent externalEvent = setStateInternal(intent.id(), newState, timestamp);
- events.add(externalEvent);
- notifyPeers(new InternalIntentEvent(intent.id(), null, newState, timestamp));
+ intentStates.put(intent.id(), newState);
break;
case SET_INSTALLABLE:
@@ -291,18 +177,14 @@
intentId = op.arg(0);
List<Intent> installableIntents = op.arg(1);
- timestamp = intentClockService.getTimestamp(intentId);
- setInstallableIntentsInternal(
- intentId, installableIntents, timestamp);
-
- notifyPeers(new InternalSetInstallablesEvent(intentId, installableIntents, timestamp));
+ installables.put(intentId, installableIntents);
break;
case REMOVE_INSTALLED:
checkArgument(op.args().size() == 1,
"REMOVE_INSTALLED takes 1 argument. %s", op);
intentId = op.arg(0);
- // TODO implement
+ installables.remove(intentId);
break;
default:
log.warn("Unknown Operation encountered: {}", op);
@@ -311,121 +193,34 @@
}
}
- notifyDelegate(events);
return failed;
}
- private boolean createIntentInternal(Intent intent) {
- Intent oldValue = intents.putIfAbsent(intent.id(), intent);
- if (oldValue == null) {
- return true;
- }
-
- log.warn("Intent ID {} already in store, throwing new update away",
- intent.id());
- return false;
- }
-
- private void notifyPeers(InternalIntentEvent event) {
- broadcastMessage(INTENT_UPDATED_MSG, event);
- }
-
- private void notifyPeers(InternalSetInstallablesEvent event) {
- broadcastMessage(INTENT_SET_INSTALLABLES_MSG, event);
- }
-
- private void broadcastMessage(MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
- }
-
- private void unicastMessage(NodeId peer,
- MessageSubject subject,
- Object event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, peer);
- }
-
private void notifyDelegateIfNotNull(IntentEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
- private final class InternalIntentCreateOrUpdateEventListener
- implements ClusterMessageHandler {
+ private final class InternalIntentStatesListener implements
+ EventuallyConsistentMapListener<IntentId, IntentState> {
@Override
- public void handle(ClusterMessage message) {
+ public void event(
+ EventuallyConsistentMapEvent<IntentId, IntentState> event) {
+ if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
+ IntentEvent externalEvent;
+ Intent intent = intents.get(event.key()); // TODO OK if this is null?
- log.debug("Received intent update event from peer: {}", message.sender());
- InternalIntentEvent event = SERIALIZER.decode(message.payload());
-
- IntentId intentId = event.intentId();
- Intent intent = event.intent();
- IntentState state = event.state();
- Timestamp timestamp = event.timestamp();
-
- executor.submit(() -> {
try {
- switch (state) {
- case INSTALL_REQ:
- createIntentInternal(intent);
- // Fallthrough to setStateInternal for INSTALL_REQ
- default:
- notifyDelegateIfNotNull(setStateInternal(intentId, state, timestamp));
- break;
- }
- } catch (Exception e) {
- log.warn("Exception thrown handling intent create or update", e);
+ externalEvent = IntentEvent.getEvent(event.value(), intent);
+ } catch (IllegalArgumentException e) {
+ externalEvent = null;
}
- });
+
+ notifyDelegateIfNotNull(externalEvent);
+ }
}
}
- private final class InternalIntentSetInstallablesListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- log.debug("Received intent set installables event from peer: {}", message.sender());
- InternalSetInstallablesEvent event = SERIALIZER.decode(message.payload());
-
- IntentId intentId = event.intentId();
- List<Intent> installables = event.installables();
- Timestamp timestamp = event.timestamp();
-
- executor.submit(() -> {
- try {
- setInstallableIntentsInternal(intentId, installables, timestamp);
- } catch (Exception e) {
- log.warn("Exception thrown handling intent set installables", e);
- }
- });
- }
- }
-
- private final class InternalIntentAntiEntropyAdvertisementListener
- implements ClusterMessageHandler {
-
- @Override
- public void handle(ClusterMessage message) {
- log.trace("Received intent Anti-Entropy advertisement from peer: {}", message.sender());
- // TODO implement
- //IntentAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- backgroundExecutor.submit(() -> {
- try {
- log.debug("something");
- //handleAntiEntropyAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown handling intent advertisements", e);
- }
- });
- }
- }
}