Adding BoundedThreadPool and BlockingBoolean

Updating EventuallyConsistentMap to use BoundedThreadPool for broadcast threads,
and disabling anti-entropy for now.

Change-Id: Id1bfcdaf1d0a19745fe7336e4ac9eaf649871d5d
diff --git a/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java b/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java
new file mode 100644
index 0000000..6fb1c90
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of ThreadPoolExecutor that bounds the work queue.
+ * <p>
+ * When a new job would exceed the queue bound, the submitting thread is held
+ * back for up to one second while the queue drains, and the job is then run
+ * on the submitter's thread rather than on a thread from the pool.
+ * </p>
+ */
+public final class BoundedThreadPool extends ThreadPoolExecutor {
+
+    private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);
+
+    // Queue capacity handed to each pool at construction time.
+    protected static int maxQueueSize = 80_000; //TODO tune this value
+    private static final long STATS_INTERVAL = 5_000; //ms
+    // The pool is flagged as under high load when less than this fraction of the queue is free.
+    private static final double HIGH_LOAD_FREE_RATIO = 0.2;
+
+    private final BlockingBoolean underHighLoad;
+    // Capacity of this pool's own queue; maxQueueSize is mutable and may differ later.
+    private final int queueCapacity;
+
+    //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
+    private final Counter submitted = new Counter();
+    private final Counter taken = new Counter();
+
+    // TODO schedule this with a fixed delay from a scheduled executor
+    private final AtomicLong lastPrinted = new AtomicLong(0L);
+
+    private BoundedThreadPool(int numberOfThreads,
+                              ThreadFactory threadFactory) {
+        super(numberOfThreads, numberOfThreads,
+              0L, TimeUnit.MILLISECONDS,
+              new LinkedBlockingQueue<>(maxQueueSize),
+              threadFactory,
+              new CallerFeedbackPolicy());
+        // The handler installed just above is always a CallerFeedbackPolicy.
+        underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load();
+        // Nothing has been queued yet, so the remaining capacity is the total capacity.
+        queueCapacity = getQueue().remainingCapacity();
+    }
+
+    /**
+     * Returns a single-thread, bounded executor service.
+     *
+     * @param threadFactory thread factory for the worker thread.
+     * @return the bounded thread pool
+     */
+    public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
+        return new BoundedThreadPool(1, threadFactory);
+    }
+
+    /**
+     * Returns a fixed-size, bounded executor service.
+     *
+     * @param numberOfThreads number of worker threads in the pool
+     * @param threadFactory thread factory for the worker threads.
+     * @return the bounded thread pool
+     */
+    public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
+        return new BoundedThreadPool(numberOfThreads, threadFactory);
+    }
+
+    /**
+     * Counts the job as submitted and hands it to the underlying executor.
+     * <p>
+     * The inherited {@code submit} variants wrap their {@link Runnable} or
+     * {@link Callable} in a {@link Future} and pass it to this method, so
+     * counting here (and only here) counts every job exactly once.
+     * </p>
+     *
+     * @param command the job to run
+     */
+    @Override
+    public void execute(Runnable command) {
+        submitted.add(1);
+        super.execute(command);
+    }
+
+    /**
+     * Counts the job as taken, then refreshes the statistics log and the load flag.
+     *
+     * @param t the thread that will run the job
+     * @param r the job about to be run
+     */
+    @Override
+    protected void beforeExecute(Thread t, Runnable r) {
+        super.beforeExecute(t, r);
+        taken.add(1);
+        periodicallyPrintStats();
+        updateLoad();
+    }
+
+    // Logs queue size and throughput at most once per STATS_INTERVAL, then resets the counters.
+    private void periodicallyPrintStats() {
+        long now = System.currentTimeMillis();
+        long prev = lastPrinted.get();
+        if (now - prev > STATS_INTERVAL) {
+            // Only the thread that wins the CAS prints, so concurrent workers do not log twice.
+            if (lastPrinted.compareAndSet(prev, now)) {
+                log.warn("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
+                         getQueue().size(),
+                         submitted.throughput(), taken.throughput());
+                submitted.reset();
+                taken.reset();
+            }
+        }
+    }
+
+    // TODO consider updating load whenever queue changes
+    private void updateLoad() {
+        double freeRatio = getQueue().remainingCapacity() / (double) queueCapacity;
+        underHighLoad.set(freeRatio < HIGH_LOAD_FREE_RATIO);
+    }
+
+    /**
+     * Feedback policy that delays the caller's thread until the executor's work
+     * queue falls below a threshold, then runs the job on the caller's thread.
+     */
+    private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {
+
+        private final BlockingBoolean underLoad = new BlockingBoolean(false);
+
+        public BlockingBoolean load() {
+            return underLoad;
+        }
+
+        /**
+         * Executes task r in the caller's thread, unless the executor
+         * has been shut down, in which case the task is discarded.
+         * <p>
+         * The caller first waits for up to one second for the load flag to
+         * clear. If the wait is interrupted the task is still run, and the
+         * caller's interrupt status is restored afterwards.
+         * </p>
+         *
+         * @param r the runnable task requested to be executed
+         * @param e the executor attempting to execute this task
+         */
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+            if (e.isShutdown()) {
+                return;
+            }
+            boolean interrupted = false;
+            try {
+                // Wait for up to 1 second while the queue drains...
+                boolean notified = false;
+                try {
+                    notified = underLoad.await(false, 1, TimeUnit.SECONDS);
+                } catch (InterruptedException exception) {
+                    interrupted = true;
+                    log.debug("Got exception waiting for notification:", exception);
+                }
+                if (notified) {
+                    log.debug("Load dropped while waiting on {}. Proceeding with work...",
+                              Thread.currentThread().getName());
+                } else if (!interrupted) {
+                    log.info("Waited for 1 second on {}. Proceeding with work...",
+                             Thread.currentThread().getName());
+                }
+                // Do the work on the submitter's thread
+                r.run();
+            } finally {
+                // Re-assert the interrupt only after the task ran, so the task itself is unaffected.
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file