Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
index 9855498..468017f 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
@@ -1,8 +1,8 @@
package org.onlab.onos.net.intent;
-import java.util.concurrent.Future;
+import java.util.List;
-import org.onlab.onos.net.flow.CompletedBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
/**
* Abstraction of entity capable of installing intents to the environment.
@@ -14,7 +14,7 @@
* @param intent intent to be installed
* @throws IntentException if issues are encountered while installing the intent
*/
- Future<CompletedBatchOperation> install(T intent);
+ List<FlowRuleBatchOperation> install(T intent);
/**
* Uninstalls the specified intent from the environment.
@@ -22,5 +22,5 @@
* @param intent intent to be uninstalled
* @throws IntentException if issues are encountered while uninstalling the intent
*/
- Future<CompletedBatchOperation> uninstall(T intent);
+ List<FlowRuleBatchOperation> uninstall(T intent);
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/resource/Bandwidth.java b/core/api/src/main/java/org/onlab/onos/net/resource/Bandwidth.java
new file mode 100644
index 0000000..3c4eebe
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/resource/Bandwidth.java
@@ -0,0 +1,58 @@
+package org.onlab.onos.net.resource;
+
+import java.util.Objects;
+
+/**
+ * Immutable representation of a bandwidth link resource.
+ */
+public final class Bandwidth extends LinkResource {
+
+ private final double bandwidth;
+
+ /**
+ * Creates a new instance with the given bandwidth; use {@link #valueOf(double)}.
+ *
+ * @param bandwidth bandwidth value to be assigned
+ */
+ private Bandwidth(double bandwidth) {
+ this.bandwidth = bandwidth;
+ }
+
+ /**
+ * Returns a new instance holding the given bandwidth.
+ *
+ * @param bandwidth bandwidth value to be assigned
+ * @return {@link Bandwidth} instance with the given bandwidth
+ */
+ public static Bandwidth valueOf(double bandwidth) {
+ return new Bandwidth(bandwidth);
+ }
+
+ /**
+ * Returns the bandwidth as a double value.
+ *
+ * @return the bandwidth as a double value
+ */
+ public double toDouble() {
+ return bandwidth;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Bandwidth) {
+ Bandwidth that = (Bandwidth) obj;
+ return Objects.equals(this.bandwidth, that.bandwidth);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(this.bandwidth);
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(this.bandwidth);
+ }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/resource/Lambda.java b/core/api/src/main/java/org/onlab/onos/net/resource/Lambda.java
new file mode 100644
index 0000000..86ea08e
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/resource/Lambda.java
@@ -0,0 +1,58 @@
+package org.onlab.onos.net.resource;
+
+import java.util.Objects;
+
+/**
+ * Immutable representation of a lambda link resource.
+ */
+public final class Lambda extends LinkResource {
+
+ private final int lambda;
+
+ /**
+ * Creates a new instance with the given lambda; use {@link #valueOf(int)}.
+ *
+ * @param lambda lambda value to be assigned
+ */
+ private Lambda(int lambda) {
+ this.lambda = lambda;
+ }
+
+ /**
+ * Returns a new instance holding the given lambda.
+ *
+ * @param lambda lambda value to be assigned
+ * @return {@link Lambda} instance with the given lambda
+ */
+ public static Lambda valueOf(int lambda) {
+ return new Lambda(lambda);
+ }
+
+ /**
+ * Returns the lambda as an int value.
+ * @return the lambda as an int value
+ */
+ public int toInt() {
+ return lambda;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Lambda) {
+ Lambda that = (Lambda) obj;
+ return Objects.equals(this.lambda, that.lambda);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(this.lambda);
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(this.lambda);
+ }
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/resource/LinkResource.java b/core/api/src/main/java/org/onlab/onos/net/resource/LinkResource.java
new file mode 100644
index 0000000..7605501
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/resource/LinkResource.java
@@ -0,0 +1,8 @@
+package org.onlab.onos.net.resource;
+
+/**
+ * Abstraction of a link resource, e.g. {@link Bandwidth} or {@link Lambda}.
+ */
+public abstract class LinkResource {
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/resource/LinkResourceService.java b/core/api/src/main/java/org/onlab/onos/net/resource/LinkResourceService.java
new file mode 100644
index 0000000..d2736b6
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/resource/LinkResourceService.java
@@ -0,0 +1,60 @@
+package org.onlab.onos.net.resource;
+
+import java.util.Map;
+
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.PathIntent;
+
+/**
+ * Service for allocating and releasing link resources.
+ */
+public interface LinkResourceService {
+
+ /**
+ * Allocates resources along the path.
+ * <p>
+ * Tries to allocate the given resources on the links along the path
+ * specified by the given intent.
+ *
+ * @param res resources to be allocated
+ * @param intent the intent that specifies the path
+ */
+ void allocateResource(LinkResources res, PathIntent intent);
+
+ /**
+ * Releases resources along the path.
+ *
+ * @param intentId ID of the intent that specifies the path
+ */
+ void releaseResource(IntentId intentId);
+
+ /**
+ * Returns all resources allocated on each link.
+ *
+ * @return per-link map of allocated resources, keyed by {@link IntentId}
+ */
+ Map<Link, Map<IntentId, LinkResources>> allocatedResources();
+
+ /**
+ * Returns all resources allocated on the given link.
+ *
+ * @param link a target link
+ * @return resources allocated on the target link, keyed by {@link IntentId}
+ */
+ Map<IntentId, LinkResources> allocatedResources(Link link);
+
+ /**
+ * Returns the available resources for each link.
+ *
+ * @return available resources, keyed by link
+ */
+ Map<Link, LinkResources> availableResources();
+
+ /**
+ * Returns the available resources for the given link.
+ * @param link a target link
+ * @return available resources for the target link
+ */
+ LinkResources availableResources(Link link);
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/resource/LinkResources.java b/core/api/src/main/java/org/onlab/onos/net/resource/LinkResources.java
new file mode 100644
index 0000000..6eadb56
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/resource/LinkResources.java
@@ -0,0 +1,48 @@
+package org.onlab.onos.net.resource;
+
+import java.util.Set;
+
+/**
+ * Abstraction of the resources of a link.
+ */
+public interface LinkResources {
+
+ /**
+ * Returns the resources as a set of {@link LinkResource}s.
+ *
+ * @return a set of {@link LinkResource}s
+ */
+ Set<LinkResource> resources();
+
+ /**
+ * Builder of {@link LinkResources}.
+ */
+ public interface Builder {
+
+ /**
+ * Adds bandwidth resource.
+ * <p>
+ * This operation adds the given bandwidth to the previously added
+ * bandwidth and generates a single bandwidth resource.
+ *
+ * @param bandwidth bandwidth value to be added
+ * @return self
+ */
+ public Builder addBandwidth(double bandwidth);
+
+ /**
+ * Adds lambda resource.
+ *
+ * @param lambda lambda value to be added
+ * @return self
+ */
+ public Builder addLambda(int lambda);
+
+ /**
+ * Builds an immutable {@link LinkResources} instance.
+ *
+ * @return link resources
+ */
+ public LinkResources build();
+ }
+}
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java b/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
index 163a056..a51746d 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
@@ -14,12 +14,11 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.onlab.onos.net.flow.CompletedBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
/**
* Suite of tests for the intent service contract.
@@ -298,7 +297,7 @@
}
@Override
- public Future<CompletedBatchOperation> install(TestInstallableIntent intent) {
+ public List<FlowRuleBatchOperation> install(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("install failed by design");
}
@@ -306,7 +305,7 @@
}
@Override
- public Future<CompletedBatchOperation> uninstall(TestInstallableIntent intent) {
+ public List<FlowRuleBatchOperation> uninstall(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("remove failed by design");
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 5824996..cb807d5 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -34,6 +34,8 @@
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.intent.InstallableIntent;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
@@ -90,6 +92,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowRuleService flowRuleService;
+
@Activate
public void activate() {
store.setDelegate(delegate);
@@ -283,7 +288,7 @@
// Indicate that the intent is entering the installing phase.
store.setState(intent, INSTALLING);
- List<Future<CompletedBatchOperation>> installFutures = Lists.newArrayList();
+ List<FlowRuleBatchOperation> installWork = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
@@ -291,13 +296,13 @@
registerSubclassInstallerIfNeeded(installable);
trackerService.addTrackedResources(intent.id(),
installable.requiredLinks());
- Future<CompletedBatchOperation> future = getInstaller(installable).install(installable);
- installFutures.add(future);
+ List<FlowRuleBatchOperation> batch = getInstaller(installable).install(installable);
+ installWork.addAll(batch);
}
}
// FIXME we have to wait for the installable intents
//eventDispatcher.post(store.setState(intent, INSTALLED));
- monitorExecutor.execute(new IntentInstallMonitor(intent, installFutures, INSTALLED));
+ monitorExecutor.execute(new IntentInstallMonitor(intent, installWork, INSTALLED));
} catch (Exception e) {
log.warn("Unable to install intent {} due to: {}", intent.id(), e);
uninstallIntent(intent, RECOMPILING);
@@ -369,16 +374,16 @@
* @param intent intent to be uninstalled
*/
private void uninstallIntent(Intent intent, IntentState nextState) {
- List<Future<CompletedBatchOperation>> uninstallFutures = Lists.newArrayList();
+ List<FlowRuleBatchOperation> uninstallWork = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
for (InstallableIntent installable : installables) {
- Future<CompletedBatchOperation> future = getInstaller(installable).uninstall(installable);
- uninstallFutures.add(future);
+ List<FlowRuleBatchOperation> batches = getInstaller(installable).uninstall(installable);
+ uninstallWork.addAll(batches);
}
}
- monitorExecutor.execute(new IntentInstallMonitor(intent, uninstallFutures, nextState));
+ monitorExecutor.execute(new IntentInstallMonitor(intent, uninstallWork, nextState));
} catch (IntentException e) {
log.warn("Unable to uninstall intent {} due to: {}", intent.id(), e);
}
@@ -495,17 +500,27 @@
private class IntentInstallMonitor implements Runnable {
private final Intent intent;
+ private final List<FlowRuleBatchOperation> work;
private final List<Future<CompletedBatchOperation>> futures;
private final IntentState nextState;
public IntentInstallMonitor(Intent intent,
- List<Future<CompletedBatchOperation>> futures, IntentState nextState) {
+ List<FlowRuleBatchOperation> work,
+ IntentState nextState) {
this.intent = intent;
- this.futures = futures;
+ this.work = work;
+ // TODO how many Futures can be outstanding? one?
+ this.futures = Lists.newLinkedList();
this.nextState = nextState;
+
+ // TODO kick off the first batch here; NOTE(review): applyNextBatch() returns null when work is empty, so guard the add below
+ futures.add(applyNextBatch());
}
- private void updateIntent(Intent intent) {
+ /**
+ * Update the intent store with the next status for this intent.
+ */
+ private void updateIntent() {
if (nextState == RECOMPILING) {
executor.execute(new IntentTask(nextState, intent));
} else if (nextState == INSTALLED || nextState == WITHDRAWN) {
@@ -515,22 +530,55 @@
}
}
- @Override
- public void run() {
+ /**
+ * Removes the first pending batch from the work list and applies it.
+ *
+ * @return future of the applied batch, or null if no work remains
+ */
+ private Future<CompletedBatchOperation> applyNextBatch() {
+ if (work.isEmpty()) {
+ return null;
+ }
+ FlowRuleBatchOperation batch = work.remove(0);
+ return flowRuleService.applyBatch(batch);
+ }
+
+ /**
+ * Iterate through the pending futures, and remove them when they have completed.
+ */
+ private void processFutures() {
+ List<Future<CompletedBatchOperation>> newFutures = Lists.newArrayList();
for (Iterator<Future<CompletedBatchOperation>> i = futures.iterator(); i.hasNext();) {
Future<CompletedBatchOperation> future = i.next();
try {
// TODO: we may want to get the future here and go back to the future.
CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
- // TODO check if future succeeded and if not report fail items
+ if (completed.isSuccess()) {
+ Future<CompletedBatchOperation> newFuture = applyNextBatch();
+ if (newFuture != null) {
+ // we'll add this later so that we don't get a ConcurrentModificationException
+ newFutures.add(newFuture);
+ }
+ } else {
+ // TODO check if future succeeded and if not report fail items
+ log.warn("Failed items: {}", completed.failedItems());
+ // TODO revert....
+ //uninstallIntent(intent, RECOMPILING);
+ }
i.remove();
-
} catch (TimeoutException | InterruptedException | ExecutionException te) {
log.debug("Intallations of intent {} is still pending", intent);
}
}
+ futures.addAll(newFutures);
+ }
+
+ @Override
+ public void run() {
+ processFutures();
if (futures.isEmpty()) {
- updateIntent(intent);
+ // woohoo! we are done!
+ updateIntent();
} else {
// resubmit ourselves if we are not done yet
monitorExecutor.submit(this);
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/LinkCollectionIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/LinkCollectionIntentInstaller.java
index ec668dc..2deb837 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/LinkCollectionIntentInstaller.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/LinkCollectionIntentInstaller.java
@@ -4,7 +4,6 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.List;
-import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -16,14 +15,12 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
-import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.IntentExtensionService;
@@ -47,9 +44,6 @@
protected IntentExtensionService intentManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected FlowRuleService flowRuleService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private ApplicationId appId;
@@ -65,18 +59,8 @@
intentManager.unregisterInstaller(PathIntent.class);
}
- /**
- * Apply a list of FlowRules.
- *
- * @param rules rules to apply
- */
- private Future<CompletedBatchOperation> applyBatch(List<FlowRuleBatchEntry> rules) {
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
- return flowRuleService.applyBatch(batch);
- }
-
@Override
- public Future<CompletedBatchOperation> install(LinkCollectionIntent intent) {
+ public List<FlowRuleBatchOperation> install(LinkCollectionIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
@@ -92,11 +76,11 @@
intent.egressPoint().deviceId(),
intent.egressPoint().port()));
- return applyBatch(rules);
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
@Override
- public Future<CompletedBatchOperation> uninstall(LinkCollectionIntent intent) {
+ public List<FlowRuleBatchOperation> uninstall(LinkCollectionIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
@@ -113,7 +97,7 @@
intent.egressPoint().deviceId(),
intent.egressPoint().port()));
- return applyBatch(rules);
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
/**
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
index dd6d7e5..0baea5a 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
@@ -5,7 +5,6 @@
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -16,14 +15,12 @@
import org.onlab.onos.CoreService;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Link;
-import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
-import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.IntentExtensionService;
@@ -45,9 +42,6 @@
protected IntentExtensionService intentManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected FlowRuleService flowRuleService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private ApplicationId appId;
@@ -63,31 +57,14 @@
intentManager.unregisterInstaller(PathIntent.class);
}
- /**
- * Apply a list of FlowRules.
- *
- * @param rules rules to apply
- */
- private Future<CompletedBatchOperation> applyBatch(List<FlowRuleBatchEntry> rules) {
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
- Future<CompletedBatchOperation> future = flowRuleService.applyBatch(batch);
- return future;
-// try {
-// //FIXME don't do this here
-// future.get();
-// } catch (InterruptedException | ExecutionException e) {
-// // TODO Auto-generated catch block
-// e.printStackTrace();
-// }
- }
-
@Override
- public Future<CompletedBatchOperation> install(PathIntent intent) {
+ public List<FlowRuleBatchOperation> install(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
+ // TODO Generate multiple batches
while (links.hasNext()) {
builder.matchInport(prev.port());
Link link = links.next();
@@ -100,18 +77,17 @@
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
prev = link.dst();
}
-
- return applyBatch(rules);
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
@Override
- public Future<CompletedBatchOperation> uninstall(PathIntent intent) {
+ public List<FlowRuleBatchOperation> uninstall(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
-
+ // TODO Generate multiple batches
while (links.hasNext()) {
builder.matchInport(prev.port());
Link link = links.next();
@@ -123,7 +99,7 @@
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
prev = link.dst();
}
- return applyBatch(rules);
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
// TODO refactor below this line... ----------------------------
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 55d8b1a..13aca9a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -22,10 +22,8 @@
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.serializers.MessageSubjectSerializer;
import org.onlab.util.KryoNamespace;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 5a51ad4..f88bcdb 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -17,6 +17,8 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -41,7 +43,8 @@
private final int port;
private final Endpoint localEp;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
- private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private EventLoopGroup workerGroup;
+ private Class<? extends Channel> channelClass;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
@@ -52,6 +55,17 @@
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+ // TODO: make this configurable; tries the epoll event loop first and falls back to NIO if it cannot be created.
+ private void initEventLoopGroup() {
+ try {
+ workerGroup = new EpollEventLoopGroup();
+ channelClass = EpollSocketChannel.class;
+ } catch (Throwable t) {
+ workerGroup = new NioEventLoopGroup();
+ channelClass = NioSocketChannel.class;
+ }
+ }
+
public NettyMessagingService() {
// TODO: Default port should be configurable.
this(8080);
@@ -71,6 +85,7 @@
public void activate() throws Exception {
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
+ initEventLoopGroup();
startAcceptingConnections();
}
@@ -173,7 +188,7 @@
bootstrap.group(workerGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
- bootstrap.channel(NioSocketChannel.class);
+ bootstrap.channel(channelClass);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.