ONOS-6835
Dynamic Config: RPC dispatcher implementation

Change-Id: I46827b0037af42cab42ec8095a25a7ec44e2a8ca
diff --git a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
index 64d4592..b45600a 100644
--- a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
+++ b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
@@ -15,17 +15,16 @@
  */
 package org.onosproject.config.impl;
 
-import com.google.common.annotations.Beta;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
+import org.onosproject.config.ResourceIdParser;
+import org.onosproject.config.RpcExecutor;
+import org.onosproject.config.RpcMessageId;
 import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.StorageService;
 import org.onosproject.config.DynamicConfigEvent;
 import org.onosproject.config.DynamicConfigListener;
 import org.onosproject.config.DynamicConfigService;
@@ -37,10 +36,15 @@
 import org.onosproject.yang.model.RpcOutput;
 import org.onosproject.yang.model.DataNode;
 import org.onosproject.yang.model.ResourceId;
-//TODO import org.onosproject.yang.model.RpcRegistry;
-//TODO import org.onosproject.yang.model.RpcService;
+import org.onosproject.yang.model.RpcRegistry;
+import org.onosproject.yang.model.RpcService;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 
 import org.slf4j.Logger;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -49,43 +53,29 @@
  * Implementation of the Dynamic Config Service.
  *
  */
-@Beta
 @Component(immediate = true)
 @Service
 public class DynamicConfigManager
         extends AbstractListenerManager<DynamicConfigEvent, DynamicConfigListener>
-        implements DynamicConfigService {
+        implements DynamicConfigService, RpcRegistry {
     private final Logger log = getLogger(getClass());
     private final DynamicConfigStoreDelegate storeDelegate = new InternalStoreDelegate();
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DynamicConfigStore store;
-    //TODO after 14420 is merged
-    //private ConsistentMap<RpcService, RpcHandler> handlerRegistry;
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected StorageService storageService;
+    private ConcurrentHashMap<String, RpcService> handlerRegistry = new ConcurrentHashMap<>();
 
     @Activate
     public void activate() {
         store.setDelegate(storeDelegate);
         eventDispatcher.addSink(DynamicConfigEvent.class, listenerRegistry);
         log.info("Started");
-        KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
-                .register(KryoNamespaces.BASIC)
-                .register(Class.class)
-                .register(ResourceId.class);
-        //TODO after 14420 is merged
-        /*handlerRegistry = storageService.<RpcService, RpcHandler>consistentMapBuilder()
-                .withSerializer(Serializer.using(kryoBuilder.build()))
-                .withName("config-object-store")
-                .withRelaxedReadConsistency()
-                .build();*/
-        log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         store.unsetDelegate(storeDelegate);
         eventDispatcher.removeSink(DynamicConfigEvent.class);
+        handlerRegistry.clear();
         log.info("Stopped");
     }
 
@@ -113,22 +103,57 @@
         return store.nodeExist(path).join();
     }
 
-    //TODO after RPC abstractions are merged; else will lead to build failure
-    /*public void registerHandler(RpcHandler handler, RpcCommand command) {
-        handlerRegistry.put(command, handler);
+    public Set<RpcService> getRpcServices() {
+        Set<RpcService> res = new HashSet();
+        for (Map.Entry<String, RpcService> e : handlerRegistry.entrySet()) {
+            res.add(e.getValue());
+        }
+        return res;
     }
 
-    public void unRegisterHandler(RpcHandler handler, RpcCommand command) {
-        Versioned<RpcHandler> ret = handlerRegistry.get(command);
-        if ((ret == null) || (ret.value() == null)) {
-            throw new FailedException("No registered handler found, cannot unregister");
+    public RpcService getRpcService(Class<? extends RpcService> intfc) {
+        return handlerRegistry.get(intfc.getSimpleName());
+    }
+
+    public void registerRpcService(RpcService handler) {
+        for (Class<?> intfc : handler.getClass().getInterfaces()) {
+            if (RpcService.class.isAssignableFrom(intfc)) {
+                handlerRegistry.put(intfc.getSimpleName(), handler);
+            }
         }
-        handlerRegistry.remove(command);
-    }*/
+    }
+
+    public void unregisterRpcService(RpcService handler) {
+        for (Class<?> intfc : handler.getClass().getInterfaces()) {
+            if (RpcService.class.isAssignableFrom(intfc)) {
+                String key = intfc.getSimpleName();
+                if (handlerRegistry.get(key) == null) {
+                    throw new FailedException("No registered handler found, cannot unregister");
+                }
+                handlerRegistry.remove(key);
+            }
+        }
+    }
+
+    private int getSvcId(RpcService handler, String srvc) {
+        Class<?>[] intfcs = handler.getClass().getInterfaces();
+        for (int i = 0; i < intfcs.length; i++) {
+            if (intfcs[i].getSimpleName().compareTo(srvc) == 0) {
+                return i;
+            }
+        }
+        throw new FailedException("No handler found, cannot invoke");
+    }
 
     public CompletableFuture<RpcOutput> invokeRpc(ResourceId id, RpcInput input) {
-        //TODO after RPC abstractions are merged; else will lead to build failure
-        throw new FailedException("Not yet implemented");
+        String[] ctxt = ResourceIdParser.getService(id);
+        RpcService handler = handlerRegistry.get(ctxt[0]);
+        if (handler == null) {
+            throw new FailedException("No registered handler found, cannot invoke");
+        }
+        return CompletableFuture.supplyAsync(
+                new RpcExecutor(handler, getSvcId(handler, ctxt[0]), ctxt[1], RpcMessageId.generate(), input),
+                Executors.newSingleThreadExecutor());
     }
 
     /**