Finished implementation of GossipIntentStore based on new API and semantics.
Change-Id: I1a71d075e5d34ab7b9f7c2533d389235d6da1d9a
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 2fc6fd8..17439ef 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -37,7 +37,8 @@
import org.onosproject.store.impl.EventuallyConsistentMapEvent;
import org.onosproject.store.impl.EventuallyConsistentMapImpl;
import org.onosproject.store.impl.EventuallyConsistentMapListener;
-import org.onosproject.store.impl.WallclockClockManager;
+import org.onosproject.store.impl.MultiValuedTimestamp;
+import org.onosproject.store.impl.SystemClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
@@ -58,12 +59,6 @@
private final Logger log = getLogger(getClass());
- /*private EventuallyConsistentMap<IntentId, Intent> intents;
-
- private EventuallyConsistentMap<IntentId, IntentState> intentStates;
-
- private EventuallyConsistentMap<IntentId, List<Intent>> installables;*/
-
// Map of intent key => current intent state
private EventuallyConsistentMap<Key, IntentData> currentState;
@@ -82,36 +77,22 @@
@Activate
public void activate() {
KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API);
- /*intents = new EventuallyConsistentMapImpl<>("intents", clusterService,
- clusterCommunicator,
- intentSerializer,
- new WallclockClockManager<>());
-
- intentStates = new EventuallyConsistentMapImpl<>("intent-states",
- clusterService,
- clusterCommunicator,
- intentSerializer,
- new WallclockClockManager<>());
-
- installables = new EventuallyConsistentMapImpl<>("intent-installables",
- clusterService,
- clusterCommunicator,
- intentSerializer,
- new WallclockClockManager<>());
- */
+ .register(KryoNamespaces.API)
+ .register(IntentData.class)
+ .register(MultiValuedTimestamp.class)
+ .register(SystemClockTimestamp.class);
currentState = new EventuallyConsistentMapImpl<>("intent-current",
clusterService,
clusterCommunicator,
intentSerializer,
- new WallclockClockManager<>());
+ new IntentDataLogicalClockManager<>());
pending = new EventuallyConsistentMapImpl<>("intent-pending",
clusterService,
clusterCommunicator,
intentSerializer, // TODO
- new WallclockClockManager<>());
+ new IntentDataClockManager<>());
currentState.addListener(new InternalIntentStatesListener());
pending.addListener(new InternalPendingListener());
@@ -121,10 +102,6 @@
@Deactivate
public void deactivate() {
-
- /*intents.destroy();
- intentStates.destroy();
- installables.destroy();*/
currentState.destroy();
pending.destroy();
@@ -133,7 +110,6 @@
@Override
public long getIntentCount() {
- //return intents.size();
return currentState.size();
}
@@ -146,99 +122,45 @@
@Override
public IntentState getIntentState(Key intentKey) {
- // TODO: implement this
- return IntentState.FAILED;
+ IntentData data = currentState.get(intentKey);
+ if (data != null) {
+ return data.state();
+ }
+ return null;
}
@Override
public List<Intent> getInstallableIntents(Key intentKey) {
- // TODO: implement this or delete class
+ IntentData data = currentState.get(intentKey);
+ if (data != null) {
+ return data.installables();
+ }
return null;
- /*
- return installables.get(intentId);
- */
}
@Override
public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
- /*
- List<BatchWrite.Operation> failed = new ArrayList<>();
-
- for (BatchWrite.Operation op : batch.operations()) {
- switch (op.type()) {
- case CREATE_INTENT:
- checkArgument(op.args().size() == 1,
- "CREATE_INTENT takes 1 argument. %s", op);
- Intent intent = op.arg(0);
-
- intents.put(intent.id(), intent);
- intentStates.put(intent.id(), INSTALL_REQ);
-
- // TODO remove from pending?
-
-
- break;
- case REMOVE_INTENT:
- checkArgument(op.args().size() == 1,
- "REMOVE_INTENT takes 1 argument. %s", op);
- IntentId intentId = op.arg(0);
-
- intents.remove(intentId);
- intentStates.remove(intentId);
- installables.remove(intentId);
-
- break;
- case SET_STATE:
- checkArgument(op.args().size() == 2,
- "SET_STATE takes 2 arguments. %s", op);
- intent = op.arg(0);
- IntentState newState = op.arg(1);
-
- intentStates.put(intent.id(), newState);
-
- break;
- case SET_INSTALLABLE:
- checkArgument(op.args().size() == 2,
- "SET_INSTALLABLE takes 2 arguments. %s", op);
- intentId = op.arg(0);
- List<Intent> installableIntents = op.arg(1);
-
- installables.put(intentId, installableIntents);
-
- break;
- case REMOVE_INSTALLED:
- checkArgument(op.args().size() == 1,
- "REMOVE_INSTALLED takes 1 argument. %s", op);
- intentId = op.arg(0);
- installables.remove(intentId);
- break;
- default:
- log.warn("Unknown Operation encountered: {}", op);
- failed.add(op);
- break;
- }
- }
-
- return failed;
- */
+ // Deprecated
return null;
}
@Override
public void write(IntentData newData) {
+ log.debug("writing intent {}", newData);
+
// Only the master is modifying the current state. Therefore assume
// this always succeeds
currentState.put(newData.key(), newData);
// if current.put succeeded
- //pending.remove(newData.key(), newData);
+ pending.remove(newData.key(), newData);
- try {
+ /*try {
notifyDelegate(IntentEvent.getEvent(newData));
} catch (IllegalArgumentException e) {
//no-op
log.trace("ignore this exception: {}", e);
- }
+ }*/
}
@Override
@@ -262,14 +184,17 @@
@Override
public void addPending(IntentData data) {
+ log.debug("new call to pending {}", data);
+ if (data.version() == null) {
+ log.debug("updating timestamp");
+ data.setVersion(new SystemClockTimestamp());
+ }
pending.put(data.key(), data);
}
@Override
public boolean isMaster(Intent intent) {
- // TODO
- //return partitionService.isMine(intent.key());
- return false;
+ return partitionService.isMine(intent.key());
}
private void notifyDelegateIfNotNull(IntentEvent event) {
@@ -284,18 +209,16 @@
public void event(
EventuallyConsistentMapEvent<Key, IntentData> event) {
if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
- // TODO check event send logic
IntentEvent externalEvent;
- IntentData intentData = currentState.get(event.key()); // TODO OK if this is null?
+ IntentData intentData = event.value();
- /*
try {
- externalEvent = IntentEvent.getEvent(event.value(), intent);
+ externalEvent = IntentEvent.getEvent(intentData.state(), intentData.intent());
} catch (IllegalArgumentException e) {
externalEvent = null;
}
- notifyDelegateIfNotNull(externalEvent);*/
+ notifyDelegateIfNotNull(externalEvent);
}
}
}
@@ -314,6 +237,13 @@
delegate.process(event.value());
}
}
+
+ try {
+ notifyDelegate(IntentEvent.getEvent(event.value()));
+ } catch (IllegalArgumentException e) {
+ //no-op
+ log.trace("ignore this exception: {}", e);
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java
index 85c78dd..ef15fe5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java
@@ -20,11 +20,11 @@
import org.onosproject.store.impl.ClockService;
/**
- * ClockService that generates timestamps based on IntentData versions.
+ * ClockService that uses IntentData versions as timestamps.
*/
-public class IntentDataClockManager implements ClockService<IntentData> {
+public class IntentDataClockManager<K> implements ClockService<K, IntentData> {
@Override
- public Timestamp getTimestamp(IntentData data) {
- return null;
+ public Timestamp getTimestamp(K key, IntentData intentData) {
+ return intentData.version();
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java
new file mode 100644
index 0000000..950cfde
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.intent.impl;
+
+import org.onosproject.net.intent.IntentData;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.impl.MultiValuedTimestamp;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * ClockService that generates logical timestamps based on IntentData versions.
+ */
+public class IntentDataLogicalClockManager<K> implements ClockService<K, IntentData> {
+
+ private final AtomicLong sequenceNumber = new AtomicLong(0);
+
+ @Override
+ public Timestamp getTimestamp(K key, IntentData intentData) {
+ return new MultiValuedTimestamp(intentData.version(), sequenceNumber.getAndIncrement());
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java
index 3b3e538..1ed2e4f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java
@@ -24,14 +24,14 @@
* processed by a single ONOS instance at a time.
*/
public class PartitionId {
- private final int id;
+ private final long id;
/**
* Creates a new partition ID.
*
* @param id the partition ID
*/
- PartitionId(int id) {
+ PartitionId(long id) {
this.id = id;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index eb831c5..a709a13 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -26,6 +26,7 @@
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
+import org.onosproject.net.intent.Key;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,8 +34,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* Manages the assignment of intent keyspace partitions to instances.
*/
@@ -75,14 +74,15 @@
leadershipService.removeListener(leaderListener);
}
- private PartitionId getPartitionForKey(String intentKey) {
- return new PartitionId(intentKey.hashCode() % NUM_PARTITIONS);
+ private PartitionId getPartitionForKey(Key intentKey) {
+ log.debug("Getting partition for {}: {}", intentKey,
+ new PartitionId(Math.abs(intentKey.hash()) % NUM_PARTITIONS));
+ return new PartitionId(Math.abs(intentKey.hash()) % NUM_PARTITIONS);
}
@Override
- public boolean isMine(String intentKey) {
- return checkNotNull(
- myPartitions.contains(getPartitionForKey(intentKey)));
+ public boolean isMine(Key intentKey) {
+ return myPartitions.contains(getPartitionForKey(intentKey));
}
private final class InternalLeadershipListener implements LeadershipEventListener {
@@ -115,7 +115,6 @@
myPartitions.remove(new PartitionId(partitionId));
}
}
-
}
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
index 1062b65..eaeabab1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.store.intent.impl;
+import org.onosproject.net.intent.Key;
+
/**
* Service for interacting with the partition-to-instance assignments.
*/
@@ -27,7 +29,7 @@
* @param intentKey intent key to query
* @return true if the key is owned by this instance, otherwise false
*/
- boolean isMine(String intentKey);
+ boolean isMine(Key intentKey);
// TODO add API for rebalancing partitions
}