ONOS-6835
Dynamic Config: RPC dispatcher implementation

Change-Id: I46827b0037af42cab42ec8095a25a7ec44e2a8ca
diff --git a/apps/config/src/main/java/org/onosproject/config/DynamicConfigService.java b/apps/config/src/main/java/org/onosproject/config/DynamicConfigService.java
index 72e9d8b..cf25cb5 100755
--- a/apps/config/src/main/java/org/onosproject/config/DynamicConfigService.java
+++ b/apps/config/src/main/java/org/onosproject/config/DynamicConfigService.java
@@ -31,6 +31,7 @@
 @Beta
 public interface DynamicConfigService
         extends ListenerService<DynamicConfigEvent, DynamicConfigListener> {
+
     /**
      * Creates a new node in the dynamic config store.
      * This method would throw an exception if there is a node with the same
diff --git a/apps/config/src/main/java/org/onosproject/config/ResourceIdParser.java b/apps/config/src/main/java/org/onosproject/config/ResourceIdParser.java
index 6bcdecc..3b11bcf 100755
--- a/apps/config/src/main/java/org/onosproject/config/ResourceIdParser.java
+++ b/apps/config/src/main/java/org/onosproject/config/ResourceIdParser.java
@@ -176,6 +176,17 @@
         return bldr.toString();
     }
 
+    public static String[] getService(ResourceId path) {
+        String[] res = null;
+        if (path == null) {
+            return res;
+        }
+        int last = path.nodeKeys().size() - 1;
+        res[0] = path.nodeKeys().get(last - 1).schemaId().name();
+        res[1] = path.nodeKeys().get(last).schemaId().name();
+        return res;
+    }
+
     private static void parseLeafList(LeafListKey key, StringBuilder bldr) {
         bldr.append(key.schemaId().name());
         bldr.append(NM_SEP);
diff --git a/apps/config/src/main/java/org/onosproject/config/RpcExecutor.java b/apps/config/src/main/java/org/onosproject/config/RpcExecutor.java
new file mode 100755
index 0000000..525e84f
--- /dev/null
+++ b/apps/config/src/main/java/org/onosproject/config/RpcExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.config;
+
+import org.onosproject.yang.model.RpcInput;
+import org.onosproject.yang.model.RpcOutput;
+import org.onosproject.yang.model.RpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.function.Supplier;
+
+/**
+ * Rpc Executor.
+ */
+public final class RpcExecutor implements Supplier<RpcOutput> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private RpcService handler;
+    private String msgId;
+    private RpcInput input;
+    private String rpcName;
+    int svcId;
+
+    /**
+     * Creates an RpcExecutor.
+     *
+     * @param handler rpc handler
+     * @param svcId svcId
+     * @param rpcName  rpcName
+     * @param msgId rpc msgId
+     * @param input rpc input
+     */
+    public RpcExecutor(RpcService handler, int svcId, String rpcName, String msgId, RpcInput input) {
+        this.handler  = handler;
+        this.rpcName = rpcName;
+        this.svcId = svcId;
+        this.msgId = msgId;
+        this.input = input;
+    }
+
+    @Override
+    public RpcOutput get() {
+        RpcOutput ret;
+        try {
+            ret = (RpcOutput) handler.getClass().getInterfaces()[svcId].
+                    getMethod(rpcName, RpcInput.class).invoke(handler, input);
+        } catch (NoSuchMethodException | IllegalAccessException |
+                InvocationTargetException | IllegalArgumentException e) {
+            throw new FailedException(e.getMessage() + ", request:" + msgId);
+        }
+        ret.messageId(msgId);
+        return ret;
+    }
+}
\ No newline at end of file
diff --git a/apps/config/src/main/java/org/onosproject/config/RpcMessageId.java b/apps/config/src/main/java/org/onosproject/config/RpcMessageId.java
new file mode 100755
index 0000000..febb048
--- /dev/null
+++ b/apps/config/src/main/java/org/onosproject/config/RpcMessageId.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.config;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Rpc Message Identifier.
+ */
+public final class RpcMessageId {
+    private static AtomicLong count = new AtomicLong();
+    private RpcMessageId() {
+    }
+    /**
+     * Generates a new rpc message identifier.
+     *
+     * @return rpc message identifier
+     */
+    public static String generate() {
+        return String.valueOf(count.getAndIncrement());
+    }
+}
\ No newline at end of file
diff --git a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
index 64d4592..b45600a 100644
--- a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
+++ b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
@@ -15,17 +15,16 @@
  */
 package org.onosproject.config.impl;
 
-import com.google.common.annotations.Beta;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
+import org.onosproject.config.ResourceIdParser;
+import org.onosproject.config.RpcExecutor;
+import org.onosproject.config.RpcMessageId;
 import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.StorageService;
 import org.onosproject.config.DynamicConfigEvent;
 import org.onosproject.config.DynamicConfigListener;
 import org.onosproject.config.DynamicConfigService;
@@ -37,10 +36,15 @@
 import org.onosproject.yang.model.RpcOutput;
 import org.onosproject.yang.model.DataNode;
 import org.onosproject.yang.model.ResourceId;
-//TODO import org.onosproject.yang.model.RpcRegistry;
-//TODO import org.onosproject.yang.model.RpcService;
+import org.onosproject.yang.model.RpcRegistry;
+import org.onosproject.yang.model.RpcService;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 
 import org.slf4j.Logger;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -49,43 +53,29 @@
  * Implementation of the Dynamic Config Service.
  *
  */
-@Beta
 @Component(immediate = true)
 @Service
 public class DynamicConfigManager
         extends AbstractListenerManager<DynamicConfigEvent, DynamicConfigListener>
-        implements DynamicConfigService {
+        implements DynamicConfigService, RpcRegistry {
     private final Logger log = getLogger(getClass());
     private final DynamicConfigStoreDelegate storeDelegate = new InternalStoreDelegate();
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DynamicConfigStore store;
-    //TODO after 14420 is merged
-    //private ConsistentMap<RpcService, RpcHandler> handlerRegistry;
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected StorageService storageService;
+    private ConcurrentHashMap<String, RpcService> handlerRegistry = new ConcurrentHashMap<>();
 
     @Activate
     public void activate() {
         store.setDelegate(storeDelegate);
         eventDispatcher.addSink(DynamicConfigEvent.class, listenerRegistry);
         log.info("Started");
-        KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
-                .register(KryoNamespaces.BASIC)
-                .register(Class.class)
-                .register(ResourceId.class);
-        //TODO after 14420 is merged
-        /*handlerRegistry = storageService.<RpcService, RpcHandler>consistentMapBuilder()
-                .withSerializer(Serializer.using(kryoBuilder.build()))
-                .withName("config-object-store")
-                .withRelaxedReadConsistency()
-                .build();*/
-        log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         store.unsetDelegate(storeDelegate);
         eventDispatcher.removeSink(DynamicConfigEvent.class);
+        handlerRegistry.clear();
         log.info("Stopped");
     }
 
@@ -113,22 +103,57 @@
         return store.nodeExist(path).join();
     }
 
-    //TODO after RPC abstractions are merged; else will lead to build failure
-    /*public void registerHandler(RpcHandler handler, RpcCommand command) {
-        handlerRegistry.put(command, handler);
+    public Set<RpcService> getRpcServices() {
+        Set<RpcService> res = new HashSet();
+        for (Map.Entry<String, RpcService> e : handlerRegistry.entrySet()) {
+            res.add(e.getValue());
+        }
+        return res;
     }
 
-    public void unRegisterHandler(RpcHandler handler, RpcCommand command) {
-        Versioned<RpcHandler> ret = handlerRegistry.get(command);
-        if ((ret == null) || (ret.value() == null)) {
-            throw new FailedException("No registered handler found, cannot unregister");
+    public RpcService getRpcService(Class<? extends RpcService> intfc) {
+        return handlerRegistry.get(intfc.getSimpleName());
+    }
+
+    public void registerRpcService(RpcService handler) {
+        for (Class<?> intfc : handler.getClass().getInterfaces()) {
+            if (RpcService.class.isAssignableFrom(intfc)) {
+                handlerRegistry.put(intfc.getSimpleName(), handler);
+            }
         }
-        handlerRegistry.remove(command);
-    }*/
+    }
+
+    public void unregisterRpcService(RpcService handler) {
+        for (Class<?> intfc : handler.getClass().getInterfaces()) {
+            if (RpcService.class.isAssignableFrom(intfc)) {
+                String key = intfc.getSimpleName();
+                if (handlerRegistry.get(key) == null) {
+                    throw new FailedException("No registered handler found, cannot unregister");
+                }
+                handlerRegistry.remove(key);
+            }
+        }
+    }
+
+    private int getSvcId(RpcService handler, String srvc) {
+        Class<?>[] intfcs = handler.getClass().getInterfaces();
+        for (int i = 0; i < intfcs.length; i++) {
+            if (intfcs[i].getSimpleName().compareTo(srvc) == 0) {
+                return i;
+            }
+        }
+        throw new FailedException("No handler found, cannot invoke");
+    }
 
     public CompletableFuture<RpcOutput> invokeRpc(ResourceId id, RpcInput input) {
-        //TODO after RPC abstractions are merged; else will lead to build failure
-        throw new FailedException("Not yet implemented");
+        String[] ctxt = ResourceIdParser.getService(id);
+        RpcService handler = handlerRegistry.get(ctxt[0]);
+        if (handler == null) {
+            throw new FailedException("No registered handler found, cannot invoke");
+        }
+        return CompletableFuture.supplyAsync(
+                new RpcExecutor(handler, getSvcId(handler, ctxt[0]), ctxt[1], RpcMessageId.generate(), input),
+                Executors.newSingleThreadExecutor());
     }
 
     /**