ExecutorService with somewhat predictable thread assignment.

- ExecutorService which allows the caller or the Task to
  express hint about which Thread it needs to be executed.

Change-Id: If1cc58f6b2369bb5afce4f402c195eacedf67f05
diff --git a/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java b/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java
new file mode 100644
index 0000000..836d3ae
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java
@@ -0,0 +1,333 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * (Somewhat) predictable ExecutorService.
+ * <p>
+ * ExecutorService which behaves similar to the one created by
+ * {@link Executors#newFixedThreadPool(int, ThreadFactory)},
+ * but assigns command to specific thread based on
+ * it's {@link PickyTask#hint()}, {@link Object#hashCode()}, or hint value explicitly
+ * specified when the command was passed to this {@link PredictableExecutor}.
+ */
+public class PredictableExecutor
+        extends AbstractExecutorService
+        implements ExecutorService {
+
+    private final List<ExecutorService> backends;
+
+    /**
+     * Creates {@link PredictableExecutor} instance.
+     *
+     * @param buckets number of buckets or 0 to match available processors
+     * @param threadFactory {@link ThreadFactory} to use to create threads
+     * @return {@link PredictableExecutor}
+     */
+    public static PredictableExecutor newPredictableExecutor(int buckets, ThreadFactory threadFactory)  {
+        return new PredictableExecutor(buckets, threadFactory);
+    }
+
+    /**
+     * Creates {@link PredictableExecutor} instance.
+     *
+     * @param buckets number of buckets or 0 to match available processors
+     * @param threadFactory {@link ThreadFactory} to use to create threads
+     */
+    public PredictableExecutor(int buckets, ThreadFactory threadFactory) {
+        checkArgument(buckets >= 0, "number of buckets must be non zero");
+        checkNotNull(threadFactory);
+        if (buckets == 0) {
+            buckets = Runtime.getRuntime().availableProcessors();
+        }
+        this.backends = new ArrayList<>(buckets);
+
+        for (int i = 0; i < buckets; ++i) {
+            this.backends.add(backendExecutorService(threadFactory));
+        }
+    }
+
+    /**
+     * Creates {@link PredictableExecutor} instance with
+     * bucket size set to number of available processors.
+     *
+     * @param threadFactory {@link ThreadFactory} to use to create threads
+     */
+    public PredictableExecutor(ThreadFactory threadFactory) {
+        this(0, threadFactory);
+    }
+
+    /**
+     * Creates a single thread {@link ExecutorService} to use in the backend.
+     *
+     * @param threadFactory {@link ThreadFactory} to use to create threads
+     * @return single thread {@link ExecutorService}
+     */
+    protected ExecutorService backendExecutorService(ThreadFactory threadFactory) {
+        return Executors.newSingleThreadExecutor(threadFactory);
+    }
+
+
+    /**
+     * Executes given command at some time in the future.
+     *
+     * @param command the {@link Runnable} task
+     * @param hint value to pick thread to run on.
+     */
+    public void execute(Runnable command, int hint) {
+        int index = Math.abs(hint) % backends.size();
+        backends.get(index).execute(command);
+    }
+
+    /**
+     * Executes given command at some time in the future.
+     *
+     * @param command the {@link Runnable} task
+     * @param hintFunction Function to compute hint value
+     */
+    public void execute(Runnable command, Function<Runnable, Integer> hintFunction) {
+        execute(command, hintFunction.apply(command));
+    }
+
+
+    private static int hint(Runnable command) {
+        if (command instanceof PickyTask) {
+            return ((PickyTask) command).hint();
+        } else {
+            return Objects.hashCode(command);
+        }
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        execute(command, PredictableExecutor::hint);
+    }
+
+    @Override
+    public void shutdown() {
+        backends.stream().forEach(ExecutorService::shutdown);
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return backends.stream()
+                .map(ExecutorService::shutdownNow)
+                .flatMap(List::stream)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return backends.stream().allMatch(ExecutorService::isShutdown);
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return backends.stream().allMatch(ExecutorService::isTerminated);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Note: It'll try, but is not assured that the method will return by specified timeout.
+     */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException {
+
+        final Duration timeoutD = Duration.of(unit.toMillis(timeout), ChronoUnit.MILLIS);
+        final Instant start = Instant.now();
+
+        return backends.parallelStream()
+                .filter(es -> !es.isTerminated())
+                .map(es -> {
+                    long timeoutMs = timeoutD.minus(Duration.between(Instant.now(), start)).toMillis();
+                    try {
+                        return es.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        return false;
+                    }
+                })
+                .allMatch(result -> result);
+    }
+
+    @Override
+    protected <T> PickyFutureTask<T> newTaskFor(Callable<T> callable) {
+        return new PickyFutureTask<>(callable);
+    }
+
+    @Override
+    protected <T> PickyFutureTask<T> newTaskFor(Runnable runnable, T value) {
+        return new PickyFutureTask<>(runnable, value);
+    }
+
+    /**
+     * {@link Runnable} also implementing {@link PickyTask}.
+     */
+    public static interface PickyRunnable extends PickyTask, Runnable { }
+
+    /**
+     * {@link Callable} also implementing {@link PickyTask}.
+     *
+     * @param <T> result type
+     */
+    public static interface PickyCallable<T> extends PickyTask, Callable<T> { }
+
+    /**
+     * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint.
+     *
+     * @param runnable {@link Runnable}
+     * @param hint hint value
+     * @return {@link PickyRunnable}
+     */
+    public static PickyRunnable picky(Runnable runnable, int hint) {
+        return picky(runnable, (r) -> hint);
+    }
+
+    /**
+     * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint.
+     *
+     * @param runnable {@link Runnable}
+     * @param hint hint function
+     * @return {@link PickyRunnable}
+     */
+    public static PickyRunnable picky(Runnable runnable, Function<Runnable, Integer> hint) {
+        checkNotNull(runnable);
+        checkNotNull(hint);
+        return new PickyRunnable() {
+
+            @Override
+            public void run() {
+                runnable.run();
+            }
+
+            @Override
+            public int hint() {
+                return hint.apply(runnable);
+            }
+        };
+    }
+
+    /**
+     * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint.
+     *
+     * @param callable {@link Callable}
+     * @param hint hint value
+     * @return {@link PickyCallable}
+     */
+    public static <T> PickyCallable<T> picky(Callable<T> callable, int hint) {
+        return picky(callable, (c) -> hint);
+    }
+
+    /**
+     * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint.
+     *
+     * @param callable {@link Callable}
+     * @param hint hint function
+     * @return {@link PickyCallable}
+     */
+    public static <T> PickyCallable<T> picky(Callable<T> callable, Function<Callable<T>, Integer> hint) {
+        checkNotNull(callable);
+        checkNotNull(hint);
+        return new PickyCallable<T>() {
+
+            @Override
+            public T call() throws Exception {
+                return callable.call();
+            }
+
+            @Override
+            public int hint() {
+                return hint.apply(callable);
+            }
+
+        };
+    }
+
+    /**
+     * Abstraction to give a task a way to express it's preference to run on
+     * certain thread.
+     */
+    public static interface PickyTask {
+
+        /**
+         * Returns hint for choosing which Thread to run this task on.
+         *
+         * @return hint value
+         */
+        int hint();
+    }
+
+    /**
+     * A {@link FutureTask} implementing {@link PickyTask}.
+     * <p>
+     * Note: if the wrapped {@link Callable} or {@link Runnable} was an instance of
+     * {@link PickyTask}, it will use {@link PickyTask#hint()} value, if not use {@link Object#hashCode()}.
+     *
+     * @param <T> result type.
+     */
+    public static class PickyFutureTask<T>
+        extends FutureTask<T>
+        implements PickyTask {
+
+        private final Object runnableOrCallable;
+
+        /**
+         * Same as {@link FutureTask#FutureTask(Runnable, Object)}.
+         */
+        public PickyFutureTask(Runnable runnable, T value) {
+            super(runnable, value);
+            runnableOrCallable = checkNotNull(runnable);
+        }
+
+        /**
+         * Same as {@link FutureTask#FutureTask(Callable)}.
+         */
+        public PickyFutureTask(Callable<T> callable) {
+            super(callable);
+            runnableOrCallable = checkNotNull(callable);
+        }
+
+        @Override
+        public int hint() {
+            if (runnableOrCallable instanceof PickyTask) {
+                return ((PickyTask) runnableOrCallable).hint();
+            } else {
+                return runnableOrCallable.hashCode();
+            }
+        }
+    }
+}
diff --git a/utils/misc/src/test/java/org/onlab/util/PredictableExecutorTest.java b/utils/misc/src/test/java/org/onlab/util/PredictableExecutorTest.java
new file mode 100644
index 0000000..5a2d6e5
--- /dev/null
+++ b/utils/misc/src/test/java/org/onlab/util/PredictableExecutorTest.java
@@ -0,0 +1,124 @@
+package org.onlab.util;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.util.PredictableExecutor.PickyRunnable;
+import com.google.common.testing.EqualsTester;
+
+public class PredictableExecutorTest {
+
+    private PredictableExecutor pexecutor;
+    private ExecutorService executor;
+
+    @Before
+    public void setUp() {
+        pexecutor = new PredictableExecutor(3, Tools.namedThreads("Thread-%d"));
+        executor = pexecutor;
+    }
+
+    @After
+    public void tearDown() {
+        pexecutor.shutdownNow();
+    }
+
+    @Test
+    public void test() throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(7);
+        AtomicReference<String> hintValue0 = new AtomicReference<>("");
+        AtomicReference<String> hintValue1 = new AtomicReference<>("");
+        AtomicReference<String> hintFunction0 = new AtomicReference<>("");
+        AtomicReference<String> pickyRunnable0 = new AtomicReference<>("");
+        AtomicReference<String> pickyRunnable1 = new AtomicReference<>("");
+        AtomicReference<String> pickyCallable0 = new AtomicReference<>("");
+        AtomicReference<String> hashCode0 = new AtomicReference<>("");
+
+        pexecutor.execute(() -> {
+            hintValue0.set(Thread.currentThread().getName());
+            latch.countDown();
+        }, 0);
+
+        pexecutor.execute(() -> {
+            hintValue1.set(Thread.currentThread().getName());
+            latch.countDown();
+        }, 1);
+
+        pexecutor.execute(() -> {
+            hintFunction0.set(Thread.currentThread().getName());
+            latch.countDown();
+        }, (runnable) -> 0);
+
+        pexecutor.execute(new PickyRunnable() {
+
+            @Override
+            public void run() {
+                pickyRunnable0.set(Thread.currentThread().getName());
+                latch.countDown();
+            }
+
+            @Override
+            public int hint() {
+                return 0;
+            }
+        });
+
+        executor.execute(new PickyRunnable() {
+
+            @Override
+            public void run() {
+                pickyRunnable1.set(Thread.currentThread().getName());
+                latch.countDown();
+            }
+
+            @Override
+            public int hint() {
+                return 1;
+            }
+        });
+
+        Callable<Void> callable = new Callable<Void>() {
+            @Override
+            public Void call() {
+                pickyCallable0.set(Thread.currentThread().getName());
+                latch.countDown();
+                return null;
+            }
+        };
+
+        executor.submit(PredictableExecutor.picky(callable, 0));
+
+
+        executor.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                hashCode0.set(Thread.currentThread().getName());
+                latch.countDown();
+
+            }
+
+            @Override
+            public int hashCode() {
+                return 0;
+            }
+        });
+
+        latch.await(1, TimeUnit.SECONDS);
+
+        new EqualsTester()
+            .addEqualityGroup(hintValue0.get(),
+                              hintFunction0.get(),
+                              pickyRunnable0.get(),
+                              pickyCallable0.get(),
+                              hashCode0.get())
+            .addEqualityGroup(hintValue1.get(),
+                              pickyRunnable1.get())
+            .testEquals();
+    }
+}