Adding BoundedThreadPool and BlockingBoolean
Updating EventuallyConsistentMap to use BoundedThreadPool for broadcast threads,
and disabling anti-entropy for now.
Change-Id: Id1bfcdaf1d0a19745fe7336e4ac9eaf649871d5d
diff --git a/utils/misc/src/main/java/org/onlab/util/BlockingBoolean.java b/utils/misc/src/main/java/org/onlab/util/BlockingBoolean.java
new file mode 100644
index 0000000..cffdc1a
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/BlockingBoolean.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * Mutable boolean that allows threads to wait for a specified value.
+ * <p>
+ * The value is stored in the synchronizer state as {@code 1} (true) or
+ * {@code 0} (false); waiters acquire in shared mode, passing the value
+ * they are waiting for as the acquire argument.
+ * </p>
+ * <p>
+ * NOTE(review): a queued waiter for one value appears able to hold up
+ * waiters queued behind it for the opposite value, because the synchronizer
+ * wakes queued threads in FIFO order; confirm before mixing {@code await(true)}
+ * and {@code await(false)} callers on one instance.
+ * </p>
+ */
+public class BlockingBoolean extends AbstractQueuedSynchronizer {
+
+    private static final int TRUE = 1;
+    private static final int FALSE = 0;
+
+    /**
+     * Creates a new blocking boolean with the specified value.
+     *
+     * @param value the starting value
+     */
+    public BlockingBoolean(boolean value) {
+        setState(toState(value));
+    }
+
+    /**
+     * Causes the current thread to wait until the boolean equals the specified
+     * value unless the thread is {@linkplain Thread#interrupt interrupted}.
+     *
+     * @param value specified value
+     * @throws InterruptedException if the current thread is interrupted
+     *         while waiting
+     */
+    public void await(boolean value) throws InterruptedException {
+        acquireSharedInterruptibly(toState(value));
+    }
+
+    /**
+     * Causes the current thread to wait until the boolean equals the specified
+     * value unless the thread is {@linkplain Thread#interrupt interrupted},
+     * or the specified waiting time elapses.
+     *
+     * @param value specified value
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the {@code timeout} argument
+     * @return {@code true} if the boolean equalled the specified value and
+     *         {@code false} if the waiting time elapsed first
+     * @throws InterruptedException if the current thread is interrupted
+     *         while waiting
+     */
+    public boolean await(boolean value, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        return tryAcquireSharedNanos(toState(value), unit.toNanos(timeout));
+    }
+
+    /**
+     * Succeeds only while the current state equals the awaited state.
+     *
+     * @param acquires the awaited state, {@code TRUE} or {@code FALSE}
+     * @return a positive value if the state matches, a negative value otherwise
+     */
+    @Override
+    protected int tryAcquireShared(int acquires) {
+        return (getState() == acquires) ? 1 : -1;
+    }
+
+    /**
+     * Sets the value of the blocking boolean.
+     *
+     * @param value new value
+     */
+    public void set(boolean value) {
+        releaseShared(toState(value));
+    }
+
+    /**
+     * Gets the value of the blocking boolean.
+     *
+     * @return current value
+     */
+    public boolean get() {
+        return getState() == TRUE;
+    }
+
+    /**
+     * Stores the new state and reports whether waiters should be signalled.
+     *
+     * @param releases the state to store, {@code TRUE} or {@code FALSE}
+     * @return {@code true} if this call changed the state
+     */
+    @Override
+    protected boolean tryReleaseShared(int releases) {
+        // Signal on state change only
+        int state = getState();
+        if (state == releases) {
+            return false;
+        }
+        // A failed CAS means another thread changed the state first; assuming the
+        // state is only ever TRUE or FALSE, it then already equals the requested one.
+        return compareAndSetState(state, releases);
+    }
+
+    // Maps a boolean onto the integer synchronizer state.
+    private static int toState(boolean value) {
+        return value ? TRUE : FALSE;
+    }
+
+}
diff --git a/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java b/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java
new file mode 100644
index 0000000..6fb1c90
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of ThreadPoolExecutor that bounds the work queue.
+ * <p>
+ * When a new job would exceed the queue bound, the submitting thread waits
+ * for up to one second for the load to drop, and the job is then run on the
+ * caller's thread rather than on a thread from the pool.
+ * </p>
+ */
+public final class BoundedThreadPool extends ThreadPoolExecutor {
+
+    private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);
+
+    protected static int maxQueueSize = 80_000; //TODO tune this value
+    private static final long STATS_INTERVAL = 5_000; //ms
+    // Fraction of free queue capacity below which the pool counts as loaded
+    private static final double HIGH_LOAD_THRESHOLD = 0.2;
+
+    private final BlockingBoolean underHighLoad;
+
+    //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
+    private final Counter submitted = new Counter();
+    private final Counter taken = new Counter();
+
+    // Time (ms since epoch) at which the statistics were last logged
+    private final AtomicLong lastPrinted = new AtomicLong(0L);
+
+    private BoundedThreadPool(int numberOfThreads,
+                              ThreadFactory threadFactory) {
+        super(numberOfThreads, numberOfThreads,
+              0L, TimeUnit.MILLISECONDS,
+              new LinkedBlockingQueue<>(maxQueueSize),
+              threadFactory,
+              new CallerFeedbackPolicy());
+        underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load();
+    }
+
+    /**
+     * Returns a single-thread, bounded executor service.
+     *
+     * @param threadFactory thread factory for the worker thread.
+     * @return the bounded thread pool
+     */
+    public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
+        return new BoundedThreadPool(1, threadFactory);
+    }
+
+    /**
+     * Returns a fixed-size, bounded executor service.
+     *
+     * @param numberOfThreads number of worker threads in the pool
+     * @param threadFactory thread factory for the worker threads.
+     * @return the bounded thread pool
+     */
+    public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
+        return new BoundedThreadPool(numberOfThreads, threadFactory);
+    }
+
+    // The submit() variants below do not touch the submitted counter: the
+    // inherited implementations pass the wrapped task to execute(Runnable),
+    // which records it, so counting here as well would count each job twice.
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        return super.submit(task);
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return super.submit(task, result);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return super.submit(task);
+    }
+
+    /**
+     * Counts the job as submitted and hands it to the pool.
+     *
+     * @param command the job to run
+     */
+    @Override
+    public void execute(Runnable command) {
+        submitted.add(1);
+        super.execute(command);
+    }
+
+    /**
+     * Counts the job as taken, then refreshes the statistics log and the
+     * load flag before the job runs.
+     *
+     * @param t the thread that will run the job
+     * @param r the job about to be run
+     */
+    @Override
+    protected void beforeExecute(Thread t, Runnable r) {
+        super.beforeExecute(t, r);
+        taken.add(1);
+        periodicallyPrintStats();
+        updateLoad();
+    }
+
+    // TODO schedule this with a fixed delay from a scheduled executor
+    // Logs queue size and counter throughput at most once per STATS_INTERVAL.
+    private void periodicallyPrintStats() {
+        long now = System.currentTimeMillis();
+        long prev = lastPrinted.get();
+        if (now - prev > STATS_INTERVAL) {
+            // The CAS lets only one of several racing threads log and reset
+            if (lastPrinted.compareAndSet(prev, now)) {
+                log.warn("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
+                         getQueue().size(),
+                         submitted.throughput(), taken.throughput());
+                submitted.reset();
+                taken.reset();
+            }
+        }
+    }
+
+    // TODO consider updating load whenever queue changes
+    // Raises the load flag while less than HIGH_LOAD_THRESHOLD of the queue is free.
+    private void updateLoad() {
+        double freeFraction = getQueue().remainingCapacity() / (double) maxQueueSize;
+        underHighLoad.set(freeFraction < HIGH_LOAD_THRESHOLD);
+    }
+
+    /**
+     * Feedback policy that delays the caller's thread until the executor's work
+     * queue falls below a threshold, then runs the job on the caller's thread.
+     */
+    private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {
+
+        private final BlockingBoolean underLoad = new BlockingBoolean(false);
+
+        /**
+         * Returns the flag that rejected submitters wait on.
+         *
+         * @return the load flag
+         */
+        public BlockingBoolean load() {
+            return underLoad;
+        }
+
+        /**
+         * Executes task r in the caller's thread, unless the executor
+         * has been shut down, in which case the task is discarded.
+         * Before running the task, the caller waits for up to 1 second for
+         * the load flag to become false.
+         *
+         * @param r the runnable task requested to be executed
+         * @param e the executor attempting to execute this task
+         */
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+            if (e.isShutdown()) {
+                return;
+            }
+
+            // Wait for up to 1 second while the queue drains...
+            boolean interrupted = false;
+            try {
+                if (underLoad.await(false, 1, TimeUnit.SECONDS)) {
+                    log.debug("Load dropped while waiting on {}. Proceeding with work...",
+                              Thread.currentThread().getName());
+                } else {
+                    log.info("Waited for 1 second on {}. Proceeding with work...",
+                             Thread.currentThread().getName());
+                }
+            } catch (InterruptedException exception) {
+                log.debug("Got exception waiting for notification:", exception);
+                interrupted = true;
+            }
+
+            try {
+                // Do the work on the submitter's thread
+                r.run();
+            } finally {
+                if (interrupted) {
+                    // Give the interrupt back to the submitter once the job is done
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file