ONOS-5866 Dynamic Config: RPC implementation

Change-Id: I1706805fc7b9dba6e323ef9993eeb1dd51490d59
diff --git a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
index 0cda139..fac5073 100644
--- a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
+++ b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
@@ -22,6 +22,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.config.DynamicConfigEvent;
 import org.onosproject.config.DynamicConfigListener;
 import org.onosproject.config.DynamicConfigService;
@@ -29,6 +30,11 @@
 import org.onosproject.config.DynamicConfigStoreDelegate;
 import org.onosproject.config.FailedException;
 import org.onosproject.config.Filter;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Versioned;
 import org.onosproject.yang.model.RpcCaller;
 import org.onosproject.yang.model.RpcCommand;
 import org.onosproject.yang.model.RpcHandler;
@@ -55,19 +61,41 @@
     private final DynamicConfigStoreDelegate storeDelegate = new InternalStoreDelegate();
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DynamicConfigStore store;
+    private ConsistentMap<RpcCommand, RpcHandler> handlerRegistry;
+    private ConsistentMap<Integer, RpcCaller> callerRegistry;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
 
     @Activate
     public void activate() {
         store.setDelegate(storeDelegate);
         eventDispatcher.addSink(DynamicConfigEvent.class, listenerRegistry);
-        log.info("DynamicConfigService Started");
+        log.info("Started");
+        KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
+                .register(KryoNamespaces.BASIC)
+                .register(Class.class)
+                .register(RpcHandler.class)
+                .register(RpcCaller.class)
+                .register(RpcCommand.class)
+                .register(ResourceId.class);
+        callerRegistry = storageService.<Integer, RpcCaller>consistentMapBuilder()
+                .withSerializer(Serializer.using(kryoBuilder.build()))
+                .withName("config-object-store")
+                .withRelaxedReadConsistency()
+                .build();
+        handlerRegistry = storageService.<RpcCommand, RpcHandler>consistentMapBuilder()
+                .withSerializer(Serializer.using(kryoBuilder.build()))
+                .withName("config-object-store")
+                .withRelaxedReadConsistency()
+                .build();
+        log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         store.unsetDelegate(storeDelegate);
         eventDispatcher.removeSink(DynamicConfigEvent.class);
-        log.info("DynamicConfigService Stopped");
+        log.info("Stopped");
     }
 
     public void createNodeRecursive(ResourceId path, DataNode node) {
@@ -103,20 +131,32 @@
     }
 
     public void registerHandler(RpcHandler handler, RpcCommand command) {
-        throw new FailedException("Not yet implemented");
+        handlerRegistry.put(command, handler);
     }
 
     public void unRegisterHandler(RpcHandler handler, RpcCommand command) {
-        //check obj1.getClass().equals(obj2.getClass())
-        throw new FailedException("Not yet implemented");
+        Versioned<RpcHandler> ret = handlerRegistry.get(command);
+        if ((ret == null) || (ret.value() == null)) {
+            throw new FailedException("No registered handler found, cannot unregister");
+        }
+        handlerRegistry.remove(command);
     }
 
     public void invokeRpc(RpcCaller caller, Integer msgId, RpcCommand command, RpcInput input) {
-        throw new FailedException("Not yet implemented");
+        callerRegistry.put(msgId, caller);
+        Versioned<RpcHandler> hndlr = handlerRegistry.get(command);
+        if ((hndlr == null) || (hndlr.value() == null)) {
+            throw new FailedException("No registered handler found, cannot invoke");
+        }
+        hndlr.value().executeRpc(msgId, command, input);
     }
 
     public void rpcResponse(Integer msgId, RpcOutput output) {
-        throw new FailedException("Not yet implemented");
+        Versioned<RpcCaller> caller = callerRegistry.get(msgId);
+        if (caller.value() == null) {
+            throw new FailedException("No registered receiver found, cannot relay response");
+        }
+        caller.value().receiveResponse(msgId, output);
     }
     /**
      * Auxiliary store delegate to receive notification about changes in the store.