blob: bb7f8750ecb8db674fd0f60a067c637373da707e [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.workflow.impl;
17
import org.onosproject.core.ApplicationId;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.workflow.api.DefaultWorkflowDescription;
import org.onosproject.workflow.api.RpcDescription;
import org.onosproject.workflow.api.WorkflowService;
import org.onosproject.workflow.api.WorkflowException;
import org.onosproject.workflow.api.DefaultWorkplaceDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
38
39public class WorkflowNetConfigListener implements NetworkConfigListener {
40
41 private static final Logger log = LoggerFactory.getLogger(WorkflowNetConfigListener.class);
42
43 public static final String CONFIG_KEY = "workflow";
44 public static final String EXECUTOR_GROUPNAME = "onos/workflow-netcfg";
45 public static final String EXECUTOR_PATTERN = "netcfg-event-handler";
46
47 private final ConfigFactory<ApplicationId, WorkflowNetConfig> configFactory =
48 new ConfigFactory<ApplicationId, WorkflowNetConfig>(
49 SubjectFactories.APP_SUBJECT_FACTORY, WorkflowNetConfig.class, CONFIG_KEY) {
50 @Override
51 public WorkflowNetConfig createConfig() {
52 return new WorkflowNetConfig();
53 }
54 };
55
56 private final WorkflowService workflowService;
57
58 private final ScheduledExecutorService executor =
59 newSingleThreadScheduledExecutor(groupedThreads(EXECUTOR_GROUPNAME, EXECUTOR_PATTERN));
60
61 public WorkflowNetConfigListener(WorkflowService workflowService) {
62 this.workflowService = workflowService;
63 }
64
65 public ConfigFactory<ApplicationId, WorkflowNetConfig> getConfigFactory() {
66 return configFactory;
67 }
68
69 @Override
70 public boolean isRelevant(NetworkConfigEvent event) {
jaegonkime0f45b52018-10-09 20:23:26 +090071 return event.config().isPresent() && event.config().get() instanceof WorkflowNetConfig;
jaegonkim6a7b5242018-09-12 23:09:42 +090072 }
73
74 @Override
75 public void event(NetworkConfigEvent event) {
76 log.info("Configuration event: {}", event);
77 switch (event.type()) {
78 case CONFIG_ADDED:
79 case CONFIG_UPDATED:
80 if (!event.config().isPresent()) {
81 log.error("No configuration found");
82 return;
83 }
84 WorkflowNetConfig config = (WorkflowNetConfig) event.config().get();
85
86 //Single thread executor(locking is not required)
87 executor.execute(new Handler(workflowService, config));
88 break;
89 default:
90 break;
91 }
92 }
93
94 public static class Handler implements Runnable {
95
96 private WorkflowService workflowService;
97 private WorkflowNetConfig config;
98
99 public Handler(WorkflowService workflowService, WorkflowNetConfig config) {
100 this.workflowService = workflowService;
101 this.config = config;
102 }
103
104 @Override
105 public void run() {
106
107 try {
108 Collection<RpcDescription> rpcs = config.getRpcDescriptions();
109 log.info("" + rpcs);
110 for (RpcDescription rpc : rpcs) {
111 if (!rpcMap.containsKey(rpc.op())) {
112 log.error("Invalid RPC: {}", rpc);
113 continue;
114 }
115
116 rpcMap.get(rpc.op()).apply(this.workflowService, rpc);
117 }
118 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900119 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900120 }
121 }
122 }
123
124 @FunctionalInterface
125 public interface RpcCall {
126 void apply(WorkflowService workflowService, RpcDescription rpcDesc) throws WorkflowException;
127 }
128
129 private static Map<String, RpcCall> rpcMap = new HashMap<>();
130 static {
131 rpcMap.put("workplace.create",
132 (service, desc) -> service.createWorkplace(DefaultWorkplaceDescription.valueOf(desc.params())));
133 rpcMap.put("workplace.remove",
134 (service, desc) -> service.removeWorkplace(DefaultWorkplaceDescription.valueOf(desc.params())));
135 rpcMap.put("workflow.invoke",
136 (service, desc) -> service.invokeWorkflow(desc.params()));
137 rpcMap.put("workflow.terminate",
138 (service, desc) -> service.terminateWorkflow(DefaultWorkflowDescription.valueOf(desc.params())));
139 }
140}