Pulling PartitionService into API and making IntentPerfInstaller configurable
Change-Id: I9fde28986b6714c0ca4d635d5a3699094e2f0081
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index 80606267..a89445d 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -15,21 +15,28 @@
*/
package org.onosproject.intentperf;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.onlab.packet.MacAddress;
import org.onlab.util.Counter;
+import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
-import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
@@ -41,12 +48,12 @@
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.intent.PointToPointIntent;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -55,6 +62,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
@@ -71,16 +79,37 @@
@Component(immediate = true)
public class IntentPerfInstaller {
- //FIXME make this configurable
- private static final int NUM_WORKERS = 1;
- private static final int NUM_KEYS = 40_000;
-
- public static final int START_DELAY = 5_000; // ms
- private static final int REPORT_PERIOD = 5_000; //ms
- private static final int GOAL_CYCLE_PERIOD = 1_000; //ms
-
private final Logger log = getLogger(getClass());
+ private static final int DEFAULT_NUM_WORKERS = 1;
+
+ private static final int DEFAULT_NUM_KEYS = 40_000;
+ private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1_000; //ms
+
+ private static final int DEFAULT_NUM_NEIGHBORS = 0;
+
+ private static final int START_DELAY = 5_000; // ms
+ private static final int REPORT_PERIOD = 5_000; //ms
+
+ //FIXME add path length
+
+ @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
+ label = "Number of keys (i.e. unique intents) to generate per instance")
+ private int numKeys = DEFAULT_NUM_KEYS;
+
+ //TODO implement numWorkers property
+// @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
+// label = "Number of installer threads per instance")
+// private int numWokers = DEFAULT_NUM_WORKERS;
+
+ @Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
+ label = "Goal for cycle period (in ms)")
+ private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
+
+ @Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
+ label = "Number of neighbors to generate intents for")
+ private int numNeighbors = DEFAULT_NUM_NEIGHBORS;
+
@Reference(cardinality = MANDATORY_UNARY)
protected CoreService coreService;
@@ -93,6 +122,15 @@
@Reference(cardinality = MANDATORY_UNARY)
protected DeviceService deviceService;
+ @Reference(cardinality = MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = MANDATORY_UNARY)
+ protected PartitionService partitionService;
+
+ @Reference(cardinality = MANDATORY_UNARY)
+ protected ComponentConfigService configService;
+
private ExecutorService workers;
private ApplicationId appId;
private Listener listener;
@@ -105,12 +143,20 @@
@Activate
public void activate() {
+ configService.registerProperties(getClass());
+
String nodeId = clusterService.getLocalNode().ip().toString();
appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId);
reportTimer = new Timer("onos-intent-perf-reporter");
- workers = Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
- log.info("Started with Application ID {}", appId.id());
+ workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
+
+
+ // disable flow backups for testing
+ log.info("flow props: {}",
+ configService.getProperties("org.onosproject.store.flow.impl.DistributedFlowRuleStore"));
+ configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
+ "backupEnabled", "false");
// Schedule delayed start
reportTimer.schedule(new TimerTask() {
@@ -123,11 +169,22 @@
@Deactivate
public void deactivate() {
+ configService.unregisterProperties(getClass(), false);
stop();
- log.info("Stopped");
+ }
+
+ //FIXME add modified
+
+ private void logConfig(String prefix) {
+ log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
+ prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
}
public void start() {
+ // TODO perhaps move to start(), but need to call before logConfig
+ // adjust numNeighbors and generate list of neighbors
+ numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
+
// perhaps we want to prime before listening...
// we will need to discard the first few results for priming and warmup
listener = new Listener();
@@ -144,25 +201,62 @@
// Submit workers
stopped = false;
- Set<Device> devices = new HashSet<>();
- for (int i = 0; i < NUM_WORKERS; i++) {
- workers.submit(new Submitter(createIntents(NUM_KEYS, 2, lastKey, devices)));
+ for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
+ workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
}
+ logConfig("Started");
}
public void stop() {
+ stopped = true;
if (listener != null) {
reportTimer.cancel();
intentService.removeListener(listener);
listener = null;
reportTimer = null;
}
- stopped = true;
try {
workers.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Failed to stop worker", e);
}
+ log.info("Stopped");
+ }
+
+ private List<NodeId> getNeighbors() {
+ List<NodeId> nodes = clusterService.getNodes().stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toCollection(ArrayList::new));
+ // sort neighbors by id
+ Collections.sort(nodes, (node1, node2) ->
+ node1.toString().compareTo(node2.toString()));
+ // rotate the local node to index 0
+ Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
+ log.info("neighbors (raw): {}", nodes); //TODO remove
+ // generate the sub-list that will contain local node and selected neighbors
+ nodes = nodes.subList(0, numNeighbors + 1);
+ log.info("neighbors: {}", nodes); //TODO remove
+ return nodes;
+ }
+
+
+ private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
+ // choose a random device for which this node is master
+ List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
+ Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));
+
+ //FIXME we currently ignore the path length and always use the same device
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchEthDst(MacAddress.valueOf(mac)).build();
+ TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
+ ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
+ ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));
+
+ return new PointToPointIntent(appId, key,
+ selector, treatment,
+ ingress, egress,
+ Collections.emptyList(),
+ Intent.DEFAULT_INTENT_PRIORITY);
}
/**
@@ -171,58 +265,55 @@
* @param numberOfKeys number of intents
* @param pathLength path depth
* @param firstKey first key to attempt
- * @param devices set of previously utilized devices @return set of intents
+ * @return set of intents
*/
- private Set<Intent> createIntents(int numberOfKeys, int pathLength,
- int firstKey, Set<Device> devices) {
- Iterator<Device> deviceItr = deviceService.getAvailableDevices().iterator();
- Set<Intent> result = new HashSet<>();
+ private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
+ //Set<Intent> result = new HashSet<>();
- Device ingressDevice = null;
- while (deviceItr.hasNext()) {
- Device device = deviceItr.next();
- if (deviceService.getRole(device.id()) == MastershipRole.MASTER &&
- !devices.contains(device)) {
- ingressDevice = device;
- devices.add(device);
- break;
- }
- }
- checkState(ingressDevice != null, "There are no local devices");
+ List<NodeId> neighbors = getNeighbors();
- // prefix based on node id
- long prefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
+ Multimap<NodeId, Device> devices = ArrayListMultimap.create();
+ deviceService.getAvailableDevices().forEach(device ->
+ devices.put(mastershipService.getMasterFor(device.id()), device));
+
+ // ensure that we have at least one device per neighbor
+ neighbors.forEach(node ->
+ checkState(devices.get(node).size() > 0,
+ "There are no devices for {}", node));
+
+
+ // TODO pull this outside so that createIntent can use it
+ // prefix based on node id for keys generated on this instance
+ long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
+
+ int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
+ Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
+
for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
- Key key = Key.of(prefix + k, appId);
- //FIXME comment this out for spread of keys
- if (!intentService.isLocal(key)) {
- // Bail if the key is not local
+ Key key = Key.of(keyPrefix + k, appId);
+
+ NodeId leader = partitionService.getLeader(key);
+ if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
+ // Bail if we are not sending to this node or we have enough for this node
continue;
}
-
- //FIXME we currently ignore the path length and always use the same device
- TrafficSelector selector = DefaultTrafficSelector.builder()
- .matchEthDst(MacAddress.valueOf(count)).build();
- TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
- ConnectPoint ingress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(1));
- ConnectPoint egress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(2));
-
- Intent intent = new PointToPointIntent(appId, key,
- selector, treatment,
- ingress, egress,
- Collections.emptyList(),
- Intent.DEFAULT_INTENT_PRIORITY);
- result.add(intent);
+ intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));
// Bump up the counter and remember this as the last key used.
count++;
lastKey = k;
- if (lastKey % 1000 == 0) {
- log.info("Building intents... {} (attempt: {})", lastKey, count);
+ if (count % 1000 == 0) {
+ log.info("Building intents... {} (attempt: {})", count, lastKey);
}
}
+ checkState(intents.values().size() == numberOfKeys,
+ "Generated wrong number of intents");
log.info("Created {} intents", numberOfKeys);
- return result;
+
+ //FIXME remove this
+ intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
+
+ return Sets.newHashSet(intents.values());
}
// Submits intent operations.
@@ -237,8 +328,8 @@
private Submitter(Set<Intent> intents) {
this.intents = intents;
- lastCount = NUM_KEYS / 4;
- lastDuration = 1000; // 1 second
+ lastCount = numKeys / 4;
+ lastDuration = 1_000; // 1 second
}
@Override
@@ -247,6 +338,7 @@
while (!stopped) {
cycle();
}
+ clear();
}
private Iterable<Intent> subset(Set<Intent> intents) {
@@ -282,6 +374,10 @@
}
}
+ private void clear() {
+ submitted.forEach(this::withdraw);
+ }
+
// Runs a single operation cycle.
private void cycle() {
//TODO consider running without rate adjustment
@@ -292,11 +388,11 @@
subset(withdrawn).forEach(this::submit);
long delta = currentTimeMillis() - start;
- if (delta > GOAL_CYCLE_PERIOD * 3 || delta < 0) {
+ if (delta > cyclePeriod * 3 || delta < 0) {
log.warn("Cycle took {} ms", delta);
}
- int difference = GOAL_CYCLE_PERIOD - (int) delta;
+ int difference = cyclePeriod - (int) delta;
if (difference > 0) {
delay(difference);
}
@@ -307,9 +403,10 @@
int cycleCount = 0;
private void adjustRates() {
//FIXME need to iron out the rate adjustment
+ //FIXME we should taper the adjustments over time
if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
- lastDuration <= GOAL_CYCLE_PERIOD) {
+ lastDuration <= cyclePeriod) {
lastCount = Math.min(lastCount + 1000, intents.size() / 2);
} else {
lastCount *= 0.8;
@@ -324,8 +421,8 @@
// Event listener to monitor throughput.
final class Listener implements IntentListener {
- private Map<IntentEvent.Type, Counter> counters;
private final Counter runningTotal = new Counter();
+ private volatile Map<IntentEvent.Type, Counter> counters;
private volatile double processedThroughput = 0;
private volatile double requestThroughput = 0;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java b/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
similarity index 93%
rename from core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
rename to core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
index 2ee4434..e963abc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
@@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.store.intent.impl;
+package org.onosproject.net.intent;
import org.onosproject.cluster.NodeId;
-import org.onosproject.net.intent.Key;
/**
* Service for interacting with the partition-to-instance assignments.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 717c9fe..2b59fae 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -33,6 +33,7 @@
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMap;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index 5fa26a7..72590aa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -31,6 +31,7 @@
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;