blob: b35b999a41f679e981bde77e808258f4e69bb594 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl;
18
19import com.google.common.collect.Maps;
Carmelo Cascone158b8c42018-07-04 19:42:37 +020020import com.google.common.util.concurrent.Striped;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040021import io.grpc.ManagedChannel;
22import io.grpc.ManagedChannelBuilder;
23import io.grpc.NameResolverProvider;
24import io.grpc.internal.DnsNameResolverProvider;
Carmelo Cascone44448a52018-06-25 23:36:57 +020025import io.grpc.netty.NettyChannelBuilder;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onosproject.event.AbstractListenerManager;
33import org.onosproject.grpc.api.GrpcChannelId;
34import org.onosproject.grpc.api.GrpcController;
35import org.onosproject.net.DeviceId;
Carmelo Casconee5b28722018-06-22 17:28:28 +020036import org.onosproject.net.device.DeviceAgentEvent;
37import org.onosproject.net.device.DeviceAgentListener;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040038import org.onosproject.p4runtime.api.P4RuntimeClient;
39import org.onosproject.p4runtime.api.P4RuntimeController;
40import org.onosproject.p4runtime.api.P4RuntimeEvent;
41import org.onosproject.p4runtime.api.P4RuntimeEventListener;
Yi Tseng3e7f1452017-10-20 10:31:53 -070042import org.onosproject.store.service.StorageService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040043import org.slf4j.Logger;
44
45import java.io.IOException;
Carmelo Casconee5b28722018-06-22 17:28:28 +020046import java.math.BigInteger;
Andrea Campanella1e573442018-05-17 17:07:13 +020047import java.util.List;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040048import java.util.Map;
Carmelo Casconee5b28722018-06-22 17:28:28 +020049import java.util.concurrent.ConcurrentMap;
50import java.util.concurrent.CopyOnWriteArrayList;
Carmelo Cascone158b8c42018-07-04 19:42:37 +020051import java.util.concurrent.locks.Lock;
52import java.util.function.Supplier;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040053
Carmelo Cascone158b8c42018-07-04 19:42:37 +020054import static com.google.common.base.Preconditions.checkArgument;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040055import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040056import static org.slf4j.LoggerFactory.getLogger;
57
58/**
59 * P4Runtime controller implementation.
60 */
61@Component(immediate = true)
62@Service
63public class P4RuntimeControllerImpl
64 extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
65 implements P4RuntimeController {
66
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040067 private final Logger log = getLogger(getClass());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040068 private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
Carmelo Cascone158b8c42018-07-04 19:42:37 +020069
70 private final Map<DeviceId, ClientKey> clientKeys = Maps.newHashMap();
71 private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
Carmelo Cascone59f57de2017-07-11 19:55:09 -040072 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
Carmelo Cascone158b8c42018-07-04 19:42:37 +020073
Carmelo Casconee5b28722018-06-22 17:28:28 +020074 private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
Carmelo Cascone158b8c42018-07-04 19:42:37 +020075 private final Striped<Lock> stripedLocks = Striped.lock(30);
76
Carmelo Casconee5b28722018-06-22 17:28:28 +020077 private DistributedElectionIdGenerator electionIdGenerator;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040078
Carmelo Cascone8d99b172017-07-18 17:26:31 -040079 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Cascone44448a52018-06-25 23:36:57 +020080 private GrpcController grpcController;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040081
Yi Tseng3e7f1452017-10-20 10:31:53 -070082 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Cascone44448a52018-06-25 23:36:57 +020083 private StorageService storageService;
Yi Tseng3e7f1452017-10-20 10:31:53 -070084
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040085 @Activate
86 public void activate() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +020087 eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
Carmelo Casconee5b28722018-06-22 17:28:28 +020088 electionIdGenerator = new DistributedElectionIdGenerator(storageService);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040089 log.info("Started");
90 }
91
92
93 @Deactivate
94 public void deactivate() {
Carmelo Cascone158b8c42018-07-04 19:42:37 +020095 clientKeys.keySet().forEach(this::removeClient);
96 clientKeys.clear();
97 clients.clear();
98 channelIds.clear();
99 deviceAgentListeners.clear();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400100 grpcController = null;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200101 electionIdGenerator.destroy();
102 electionIdGenerator = null;
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200103 eventDispatcher.removeSink(P4RuntimeEvent.class);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400104 log.info("Stopped");
105 }
106
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400107 @Override
Carmelo Cascone44448a52018-06-25 23:36:57 +0200108 public boolean createClient(DeviceId deviceId, String serverAddr,
109 int serverPort, long p4DeviceId) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400110 checkNotNull(deviceId);
Carmelo Cascone44448a52018-06-25 23:36:57 +0200111 checkNotNull(serverAddr);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200112 checkArgument(serverPort > 0, "Invalid server port");
Carmelo Cascone44448a52018-06-25 23:36:57 +0200113
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200114 return withDeviceLock(() -> doCreateClient(
115 deviceId, serverAddr, serverPort, p4DeviceId), deviceId);
116 }
117
118 private boolean doCreateClient(DeviceId deviceId, String serverAddr,
119 int serverPort, long p4DeviceId) {
120
121 ClientKey clientKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
122
123 if (clientKeys.containsKey(deviceId)) {
124 final ClientKey existingKey = clientKeys.get(deviceId);
125 if (clientKey.equals(existingKey)) {
126 log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
127 deviceId, serverAddr, serverPort, p4DeviceId);
128 return true;
129 } else {
130 throw new IllegalStateException(
131 "A client for the same device ID but different " +
132 "server endpoints already exists");
133 }
134 }
135
136 log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
137 deviceId, serverAddr, serverPort, p4DeviceId);
138
139 GrpcChannelId channelId = GrpcChannelId.of(
140 clientKey.deviceId(), "p4runtime-" + clientKey);
Carmelo Cascone44448a52018-06-25 23:36:57 +0200141
142 ManagedChannelBuilder channelBuilder = NettyChannelBuilder
143 .forAddress(serverAddr, serverPort)
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200144 .usePlaintext(true)
145 .nameResolverFactory(nameResolverProvider);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400146
147 ManagedChannel channel;
148 try {
149 channel = grpcController.connectChannel(channelId, channelBuilder);
150 } catch (IOException e) {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200151 log.warn("Unable to connect to gRPC server of {}: {}",
152 clientKey.deviceId(), e.getMessage());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400153 return false;
154 }
155
Carmelo Cascone44448a52018-06-25 23:36:57 +0200156 P4RuntimeClient client = new P4RuntimeClientImpl(
157 clientKey.deviceId(), clientKey.p4DeviceId(), channel, this);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400158
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200159 clientKeys.put(clientKey.deviceId(), clientKey);
160 clients.put(clientKey, client);
Carmelo Cascone44448a52018-06-25 23:36:57 +0200161 channelIds.put(clientKey.deviceId(), channelId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400162
163 return true;
164 }
165
166 @Override
167 public P4RuntimeClient getClient(DeviceId deviceId) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200168 if (deviceId == null) {
169 return null;
170 }
171 return withDeviceLock(() -> doGetClient(deviceId), deviceId);
172 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400173
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200174 private P4RuntimeClient doGetClient(DeviceId deviceId) {
175 if (!clientKeys.containsKey(deviceId)) {
176 return null;
177 } else {
178 return clients.get(clientKeys.get(deviceId));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400179 }
180 }
181
182 @Override
183 public void removeClient(DeviceId deviceId) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200184 if (deviceId == null) {
185 return;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400186 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200187 withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
188 }
189
190 private Void doRemoveClient(DeviceId deviceId) {
191 if (clientKeys.containsKey(deviceId)) {
192 final ClientKey clientKey = clientKeys.get(deviceId);
193 clients.get(clientKey).shutdown();
194 grpcController.disconnectChannel(channelIds.get(deviceId));
195 clientKeys.remove(deviceId);
196 clients.remove(clientKey);
197 channelIds.remove(deviceId);
198 }
199 return null;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400200 }
201
202 @Override
203 public boolean hasClient(DeviceId deviceId) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200204 return clientKeys.containsKey(deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400205 }
206
207 @Override
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200208 public boolean isReachable(DeviceId deviceId) {
209 if (deviceId == null) {
210 return false;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400211 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200212 return withDeviceLock(() -> doIsReacheable(deviceId), deviceId);
213 }
214
215 private boolean doIsReacheable(DeviceId deviceId) {
216 // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
217 if (!clientKeys.containsKey(deviceId)) {
218 log.debug("No client for {}, can't check for reachability", deviceId);
219 return false;
220 }
221 return grpcController.isChannelOpen(channelIds.get(deviceId));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400222 }
223
Yi Tseng3e7f1452017-10-20 10:31:53 -0700224 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200225 public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
226 deviceAgentListeners.putIfAbsent(deviceId, new CopyOnWriteArrayList<>());
227 deviceAgentListeners.get(deviceId).add(listener);
Yi Tseng3e7f1452017-10-20 10:31:53 -0700228 }
229
Andrea Campanella1e573442018-05-17 17:07:13 +0200230 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200231 public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
232 deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
233 listeners.remove(listener);
234 return listeners;
Andrea Campanella1e573442018-05-17 17:07:13 +0200235 });
236 }
237
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200238 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
239 final Lock lock = stripedLocks.get(deviceId);
240 lock.lock();
241 try {
242 return task.get();
243 } finally {
244 lock.unlock();
245 }
246 }
247
Carmelo Casconee5b28722018-06-22 17:28:28 +0200248 BigInteger newMasterElectionId(DeviceId deviceId) {
249 return electionIdGenerator.generate(deviceId);
Andrea Campanella1e573442018-05-17 17:07:13 +0200250 }
251
Carmelo Cascone44448a52018-06-25 23:36:57 +0200252 void postEvent(P4RuntimeEvent event) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200253 switch (event.type()) {
254 case CHANNEL_EVENT:
255 handleChannelEvent(event);
256 break;
257 case ARBITRATION_RESPONSE:
258 handleArbitrationReply(event);
259 break;
260 default:
261 post(event);
262 break;
Andrea Campanella1e573442018-05-17 17:07:13 +0200263 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400264 }
Carmelo Cascone44448a52018-06-25 23:36:57 +0200265
Carmelo Casconee5b28722018-06-22 17:28:28 +0200266 private void handleChannelEvent(P4RuntimeEvent event) {
267 final ChannelEvent channelEvent = (ChannelEvent) event.subject();
268 final DeviceId deviceId = channelEvent.deviceId();
269 final DeviceAgentEvent.Type agentEventType;
270 switch (channelEvent.type()) {
271 case OPEN:
272 agentEventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
273 break;
274 case CLOSED:
275 agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
276 break;
277 case ERROR:
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200278 agentEventType = !isReachable(deviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200279 ? DeviceAgentEvent.Type.CHANNEL_CLOSED
280 : DeviceAgentEvent.Type.CHANNEL_ERROR;
281 break;
282 default:
283 log.warn("Unrecognized channel event type {}", channelEvent.type());
284 return;
285 }
286 postDeviceAgentEvent(deviceId, new DeviceAgentEvent(agentEventType, deviceId));
287 }
288
289 private void handleArbitrationReply(P4RuntimeEvent event) {
290 final DeviceId deviceId = event.subject().deviceId();
291 final ArbitrationResponse response = (ArbitrationResponse) event.subject();
292 final DeviceAgentEvent.Type roleType = response.isMaster()
293 ? DeviceAgentEvent.Type.ROLE_MASTER
294 : DeviceAgentEvent.Type.ROLE_STANDBY;
295 postDeviceAgentEvent(deviceId, new DeviceAgentEvent(
296 roleType, response.deviceId()));
297 }
298
299 private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
300 if (deviceAgentListeners.containsKey(deviceId)) {
301 deviceAgentListeners.get(deviceId).forEach(l -> l.event(event));
302 }
303 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400304}