ONOS-5866 Dynamic Config: RPC implementation
Change-Id: I1706805fc7b9dba6e323ef9993eeb1dd51490d59
diff --git a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
index 0cda139..fac5073 100644
--- a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
+++ b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
@@ -22,6 +22,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
import org.onosproject.config.DynamicConfigEvent;
import org.onosproject.config.DynamicConfigListener;
import org.onosproject.config.DynamicConfigService;
@@ -29,6 +30,11 @@
import org.onosproject.config.DynamicConfigStoreDelegate;
import org.onosproject.config.FailedException;
import org.onosproject.config.Filter;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Versioned;
import org.onosproject.yang.model.RpcCaller;
import org.onosproject.yang.model.RpcCommand;
import org.onosproject.yang.model.RpcHandler;
@@ -55,19 +61,41 @@
private final DynamicConfigStoreDelegate storeDelegate = new InternalStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DynamicConfigStore store;
+ private ConsistentMap<RpcCommand, RpcHandler> handlerRegistry;
+ private ConsistentMap<Integer, RpcCaller> callerRegistry;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
@Activate
public void activate() {
store.setDelegate(storeDelegate);
eventDispatcher.addSink(DynamicConfigEvent.class, listenerRegistry);
- log.info("DynamicConfigService Started");
+ log.info("Started");
+ KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
+ .register(KryoNamespaces.BASIC)
+ .register(Class.class)
+ .register(RpcHandler.class)
+ .register(RpcCaller.class)
+ .register(RpcCommand.class)
+ .register(ResourceId.class);
+ callerRegistry = storageService.<Integer, RpcCaller>consistentMapBuilder()
+ .withSerializer(Serializer.using(kryoBuilder.build()))
+ .withName("config-object-store")
+ .withRelaxedReadConsistency()
+ .build();
+ handlerRegistry = storageService.<RpcCommand, RpcHandler>consistentMapBuilder()
+ .withSerializer(Serializer.using(kryoBuilder.build()))
+ .withName("config-object-store")
+ .withRelaxedReadConsistency()
+ .build();
+ log.info("Started");
}
@Deactivate
public void deactivate() {
store.unsetDelegate(storeDelegate);
eventDispatcher.removeSink(DynamicConfigEvent.class);
- log.info("DynamicConfigService Stopped");
+ log.info("Stopped");
}
public void createNodeRecursive(ResourceId path, DataNode node) {
@@ -103,20 +131,32 @@
}
public void registerHandler(RpcHandler handler, RpcCommand command) {
- throw new FailedException("Not yet implemented");
+ handlerRegistry.put(command, handler);
}
public void unRegisterHandler(RpcHandler handler, RpcCommand command) {
- //check obj1.getClass().equals(obj2.getClass())
- throw new FailedException("Not yet implemented");
+ Versioned<RpcHandler> ret = handlerRegistry.get(command);
+ if ((ret == null) || (ret.value() == null)) {
+ throw new FailedException("No registered handler found, cannot unregister");
+ }
+ handlerRegistry.remove(command);
}
public void invokeRpc(RpcCaller caller, Integer msgId, RpcCommand command, RpcInput input) {
- throw new FailedException("Not yet implemented");
+ callerRegistry.put(msgId, caller);
+ Versioned<RpcHandler> hndlr = handlerRegistry.get(command);
+ if ((hndlr == null) || (hndlr.value() == null)) {
+ throw new FailedException("No registered handler found, cannot invoke");
+ }
+ hndlr.value().executeRpc(msgId, command, input);
}
public void rpcResponse(Integer msgId, RpcOutput output) {
- throw new FailedException("Not yet implemented");
+ Versioned<RpcCaller> caller = callerRegistry.get(msgId);
+ if (caller.value() == null) {
+ throw new FailedException("No registered receiver found, cannot relay response");
+ }
+ caller.value().receiveResponse(msgId, output);
}
/**
* Auxiliary store delegate to receive notification about changes in the store.