MeterManager: Use Executor to call provider to mitigate DistributedMeterStore timeouts
Change-Id: Ifc25d50d97829c347b3be65cb95848406efdc46d
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
index 2db8ec2..f7418ab 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
@@ -19,6 +19,7 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
@@ -54,8 +55,12 @@
import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -68,6 +73,16 @@
implements MeterService, MeterProviderRegistry {
private static final String METERCOUNTERIDENTIFIER = "meter-id-counter-%s";
+ private static final String NUM_THREAD = "numThreads";
+ private static final String WORKER_PATTERN = "installer-%d";
+ private static final String GROUP_THREAD_NAME = "onos/meter";
+
+ private static final int DEFAULT_NUM_THREADS = 4;
+ @Property(name = NUM_THREAD,
+ intValue = DEFAULT_NUM_THREADS,
+ label = "Number of worker threads")
+ private int numThreads = DEFAULT_NUM_THREADS;
+
private final Logger log = getLogger(getClass());
private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
@@ -85,6 +100,8 @@
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
+ private ExecutorService executorService;
+
@Activate
public void activate() {
store.setDelegate(delegate);
@@ -104,6 +121,9 @@
});
};
+
+ executorService = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
log.info("Started");
}
@@ -111,6 +131,7 @@
public void deactivate() {
store.unsetDelegate(delegate);
eventDispatcher.removeSink(MeterEvent.class);
+ executorService.shutdown();
log.info("Stopped");
}
@@ -308,15 +329,14 @@
@Override
public void notify(MeterEvent event) {
DeviceId deviceId = event.subject().deviceId();
- MeterProvider p = getProvider(event.subject().deviceId());
switch (event.type()) {
case METER_ADD_REQ:
- p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
- MeterOperation.Type.ADD));
+ executorService.execute(new MeterInstaller(deviceId, event.subject(),
+ MeterOperation.Type.ADD));
break;
case METER_REM_REQ:
- p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
- MeterOperation.Type.REMOVE));
+ executorService.execute(new MeterInstaller(deviceId, event.subject(),
+ MeterOperation.Type.REMOVE));
break;
case METER_ADDED:
log.info("Meter added {}", event.subject());
@@ -332,5 +352,29 @@
}
}
+ /**
+ * Task that passes the meter down to the provider.
+ */
+ private class MeterInstaller implements Runnable {
+ private final DeviceId deviceId;
+ private final Meter meter;
+ private final MeterOperation.Type op;
+
+ public MeterInstaller(DeviceId deviceId, Meter meter, MeterOperation.Type op) {
+ this.deviceId = checkNotNull(deviceId);
+ this.meter = checkNotNull(meter);
+ this.op = checkNotNull(op);
+ }
+
+ @Override
+ public void run() {
+ MeterProvider p = getProvider(this.deviceId);
+ if (p == null) {
+ log.error("Unable to recover {}'s provider", deviceId);
+ return;
+ }
+ p.performMeterOperation(deviceId, new MeterOperation(meter, op));
+ }
+ }
}