MeterManager: Use Executor to call provider to mitigate DistributedMeterStore timeouts

Change-Id: Ifc25d50d97829c347b3be65cb95848406efdc46d
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
index 2db8ec2..f7418ab 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
@@ -19,6 +19,7 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
@@ -54,8 +55,12 @@
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -68,6 +73,16 @@
         implements MeterService, MeterProviderRegistry {
 
     private static final String METERCOUNTERIDENTIFIER = "meter-id-counter-%s";
+    private static final String NUM_THREAD = "numThreads";
+    private static final String WORKER_PATTERN = "installer-%d";
+    private static final String GROUP_THREAD_NAME = "onos/meter";
+
+    private static final int DEFAULT_NUM_THREADS = 4;
+    @Property(name = NUM_THREAD,
+            intValue = DEFAULT_NUM_THREADS,
+            label = "Number of worker threads")
+    private int numThreads = DEFAULT_NUM_THREADS;
+
     private final Logger log = getLogger(getClass());
     private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
 
@@ -85,6 +100,8 @@
 
     private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
 
+    private ExecutorService executorService;
+
     @Activate
     public void activate() {
         store.setDelegate(delegate);
@@ -104,6 +121,9 @@
                 });
 
             };
+
+        executorService = newFixedThreadPool(numThreads,
+                groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
         log.info("Started");
     }
 
@@ -111,6 +131,7 @@
     public void deactivate() {
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(MeterEvent.class);
+        executorService.shutdown();
         log.info("Stopped");
     }
 
@@ -308,15 +329,14 @@
         @Override
         public void notify(MeterEvent event) {
             DeviceId deviceId = event.subject().deviceId();
-            MeterProvider p = getProvider(event.subject().deviceId());
             switch (event.type()) {
                 case METER_ADD_REQ:
-                    p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
-                                                                         MeterOperation.Type.ADD));
+                    executorService.execute(new MeterInstaller(deviceId, event.subject(),
+                            MeterOperation.Type.ADD));
                     break;
                 case METER_REM_REQ:
-                    p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
-                                                                         MeterOperation.Type.REMOVE));
+                    executorService.execute(new MeterInstaller(deviceId, event.subject(),
+                            MeterOperation.Type.REMOVE));
                     break;
                 case METER_ADDED:
                     log.info("Meter added {}", event.subject());
@@ -332,5 +352,29 @@
 
         }
     }
+    /**
+     * Task that passes the meter down to the provider.
+     */
+    private class MeterInstaller implements Runnable {
+        private final DeviceId deviceId;
+        private final Meter meter;
+        private final MeterOperation.Type op;
+
+        public MeterInstaller(DeviceId deviceId, Meter meter, MeterOperation.Type op) {
+            this.deviceId = checkNotNull(deviceId);
+            this.meter = checkNotNull(meter);
+            this.op = checkNotNull(op);
+        }
+
+        @Override
+        public void run() {
+            MeterProvider p = getProvider(this.deviceId);
+            if (p == null) {
+                log.error("Unable to recover {}'s provider", deviceId);
+                return;
+            }
+            p.performMeterOperation(deviceId, new MeterOperation(meter, op));
+        }
+    }
 
 }