blob: d2773b20bd358022b478a5eb7948bd50b101b01c [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +053019import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.CacheLoader;
Andrea Campanella1e573442018-05-17 17:07:13 +020021import com.google.common.cache.LoadingCache;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040022import com.google.common.collect.Maps;
23import io.grpc.ManagedChannel;
24import io.grpc.ManagedChannelBuilder;
25import io.grpc.NameResolverProvider;
26import io.grpc.internal.DnsNameResolverProvider;
Carmelo Cascone44448a52018-06-25 23:36:57 +020027import io.grpc.netty.NettyChannelBuilder;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040028import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
34import org.onosproject.event.AbstractListenerManager;
35import org.onosproject.grpc.api.GrpcChannelId;
36import org.onosproject.grpc.api.GrpcController;
37import org.onosproject.net.DeviceId;
Carmelo Casconee5b28722018-06-22 17:28:28 +020038import org.onosproject.net.device.DeviceAgentEvent;
39import org.onosproject.net.device.DeviceAgentListener;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040040import org.onosproject.p4runtime.api.P4RuntimeClient;
41import org.onosproject.p4runtime.api.P4RuntimeController;
42import org.onosproject.p4runtime.api.P4RuntimeEvent;
43import org.onosproject.p4runtime.api.P4RuntimeEventListener;
Yi Tseng3e7f1452017-10-20 10:31:53 -070044import org.onosproject.store.service.StorageService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040045import org.slf4j.Logger;
46
47import java.io.IOException;
Carmelo Casconee5b28722018-06-22 17:28:28 +020048import java.math.BigInteger;
Andrea Campanella1e573442018-05-17 17:07:13 +020049import java.util.List;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040050import java.util.Map;
Carmelo Casconee5b28722018-06-22 17:28:28 +020051import java.util.concurrent.ConcurrentMap;
52import java.util.concurrent.CopyOnWriteArrayList;
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +053053import java.util.concurrent.TimeUnit;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040054import java.util.concurrent.locks.ReadWriteLock;
55import java.util.concurrent.locks.ReentrantReadWriteLock;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040056
57import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040058import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * P4Runtime controller implementation.
62 */
63@Component(immediate = true)
64@Service
65public class P4RuntimeControllerImpl
66 extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
67 implements P4RuntimeController {
68
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +053069 private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040070 private final Logger log = getLogger(getClass());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040071 private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
Carmelo Cascone44448a52018-06-25 23:36:57 +020072 private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
73 private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
Carmelo Cascone59f57de2017-07-11 19:55:09 -040074 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
Carmelo Casconee5b28722018-06-22 17:28:28 +020075 private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +053076 private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
77 .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
78 .build(new CacheLoader<DeviceId, ReadWriteLock>() {
79 @Override
80 public ReadWriteLock load(DeviceId deviceId) {
81 return new ReentrantReadWriteLock();
82 }
83 });
Carmelo Casconee5b28722018-06-22 17:28:28 +020084 private DistributedElectionIdGenerator electionIdGenerator;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040085
Carmelo Cascone8d99b172017-07-18 17:26:31 -040086 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Cascone44448a52018-06-25 23:36:57 +020087 private GrpcController grpcController;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040088
Yi Tseng3e7f1452017-10-20 10:31:53 -070089 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Cascone44448a52018-06-25 23:36:57 +020090 private StorageService storageService;
Yi Tseng3e7f1452017-10-20 10:31:53 -070091
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040092 @Activate
93 public void activate() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +020094 eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
Carmelo Casconee5b28722018-06-22 17:28:28 +020095 electionIdGenerator = new DistributedElectionIdGenerator(storageService);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040096 log.info("Started");
97 }
98
99
100 @Deactivate
101 public void deactivate() {
102 grpcController = null;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200103 electionIdGenerator.destroy();
104 electionIdGenerator = null;
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200105 eventDispatcher.removeSink(P4RuntimeEvent.class);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400106 log.info("Stopped");
107 }
108
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400109 @Override
Carmelo Cascone44448a52018-06-25 23:36:57 +0200110 public boolean createClient(DeviceId deviceId, String serverAddr,
111 int serverPort, long p4DeviceId) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400112 checkNotNull(deviceId);
Carmelo Cascone44448a52018-06-25 23:36:57 +0200113 checkNotNull(serverAddr);
114
115 ClientKey newKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
116
117 ManagedChannelBuilder channelBuilder = NettyChannelBuilder
118 .forAddress(serverAddr, serverPort)
119 .usePlaintext(true);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400120
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530121 deviceLocks.getUnchecked(deviceId).writeLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400122
123 try {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200124 if (deviceIdToClientKey.containsKey(deviceId)) {
125 final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
126 if (newKey.equals(existingKey)) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200127 log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
128 deviceId, serverAddr, serverPort, p4DeviceId);
Carmelo Cascone44448a52018-06-25 23:36:57 +0200129 return true;
130 } else {
131 throw new IllegalStateException(
132 "A client for the same device ID but different " +
133 "server endpoints already exists");
134 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400135 } else {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200136 log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
137 deviceId, serverAddr, serverPort, p4DeviceId);
Carmelo Cascone44448a52018-06-25 23:36:57 +0200138 return doCreateClient(newKey, channelBuilder);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400139 }
140 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530141 deviceLocks.getUnchecked(deviceId).writeLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400142 }
143 }
144
Carmelo Cascone44448a52018-06-25 23:36:57 +0200145 private boolean doCreateClient(ClientKey clientKey, ManagedChannelBuilder channelBuilder) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400146
Carmelo Cascone44448a52018-06-25 23:36:57 +0200147 GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(),
148 "p4runtime-" + clientKey.p4DeviceId());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400149
150 // Channel defaults.
151 channelBuilder.nameResolverFactory(nameResolverProvider);
152
153 ManagedChannel channel;
154 try {
155 channel = grpcController.connectChannel(channelId, channelBuilder);
156 } catch (IOException e) {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200157 log.warn("Unable to connect to gRPC server of {}: {}",
158 clientKey.deviceId(), e.getMessage());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400159 return false;
160 }
161
Carmelo Cascone44448a52018-06-25 23:36:57 +0200162 P4RuntimeClient client = new P4RuntimeClientImpl(
163 clientKey.deviceId(), clientKey.p4DeviceId(), channel, this);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400164
Carmelo Cascone44448a52018-06-25 23:36:57 +0200165 channelIds.put(clientKey.deviceId(), channelId);
166 deviceIdToClientKey.put(clientKey.deviceId(), clientKey);
167 clientKeyToClient.put(clientKey, client);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400168
169 return true;
170 }
171
172 @Override
173 public P4RuntimeClient getClient(DeviceId deviceId) {
174
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530175 deviceLocks.getUnchecked(deviceId).readLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400176
177 try {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200178 if (!deviceIdToClientKey.containsKey(deviceId)) {
179 return null;
180 } else {
181 return clientKeyToClient.get(deviceIdToClientKey.get(deviceId));
182 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400183 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530184 deviceLocks.getUnchecked(deviceId).readLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400185 }
186 }
187
188 @Override
189 public void removeClient(DeviceId deviceId) {
190
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530191 deviceLocks.getUnchecked(deviceId).writeLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400192 try {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200193 if (deviceIdToClientKey.containsKey(deviceId)) {
194 final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
Carmelo Cascone44448a52018-06-25 23:36:57 +0200195 clientKeyToClient.remove(clientKey).shutdown();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200196 grpcController.disconnectChannel(channelIds.get(deviceId));
Carmelo Cascone44448a52018-06-25 23:36:57 +0200197 deviceIdToClientKey.remove(deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400198 channelIds.remove(deviceId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400199 }
200 } finally {
Andrea Campanellaa74bdba2018-05-15 16:45:00 +0200201 deviceLocks.getUnchecked(deviceId).writeLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400202 }
203 }
204
205 @Override
206 public boolean hasClient(DeviceId deviceId) {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530207 deviceLocks.getUnchecked(deviceId).readLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400208
209 try {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200210 return deviceIdToClientKey.containsKey(deviceId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400211 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530212 deviceLocks.getUnchecked(deviceId).readLock().unlock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400213 }
214 }
215
216 @Override
217 public boolean isReacheable(DeviceId deviceId) {
218
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530219 deviceLocks.getUnchecked(deviceId).readLock().lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400220
221 try {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200222 if (!deviceIdToClientKey.containsKey(deviceId)) {
223 log.debug("No client for {}, can't check for reachability", deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400224 return false;
225 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200226 // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400227 return grpcController.isChannelOpen(channelIds.get(deviceId));
228 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530229 deviceLocks.getUnchecked(deviceId).readLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400230 }
231 }
232
Yi Tseng3e7f1452017-10-20 10:31:53 -0700233 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200234 public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
235 deviceAgentListeners.putIfAbsent(deviceId, new CopyOnWriteArrayList<>());
236 deviceAgentListeners.get(deviceId).add(listener);
Yi Tseng3e7f1452017-10-20 10:31:53 -0700237 }
238
Andrea Campanella1e573442018-05-17 17:07:13 +0200239 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200240 public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
241 deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
242 listeners.remove(listener);
243 return listeners;
Andrea Campanella1e573442018-05-17 17:07:13 +0200244 });
245 }
246
Carmelo Casconee5b28722018-06-22 17:28:28 +0200247 BigInteger newMasterElectionId(DeviceId deviceId) {
248 return electionIdGenerator.generate(deviceId);
Andrea Campanella1e573442018-05-17 17:07:13 +0200249 }
250
Carmelo Cascone44448a52018-06-25 23:36:57 +0200251 void postEvent(P4RuntimeEvent event) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200252 switch (event.type()) {
253 case CHANNEL_EVENT:
254 handleChannelEvent(event);
255 break;
256 case ARBITRATION_RESPONSE:
257 handleArbitrationReply(event);
258 break;
259 default:
260 post(event);
261 break;
Andrea Campanella1e573442018-05-17 17:07:13 +0200262 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400263 }
Carmelo Cascone44448a52018-06-25 23:36:57 +0200264
Carmelo Casconee5b28722018-06-22 17:28:28 +0200265 private void handleChannelEvent(P4RuntimeEvent event) {
266 final ChannelEvent channelEvent = (ChannelEvent) event.subject();
267 final DeviceId deviceId = channelEvent.deviceId();
268 final DeviceAgentEvent.Type agentEventType;
269 switch (channelEvent.type()) {
270 case OPEN:
271 agentEventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
272 break;
273 case CLOSED:
274 agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
275 break;
276 case ERROR:
277 agentEventType = !isReacheable(deviceId)
278 ? DeviceAgentEvent.Type.CHANNEL_CLOSED
279 : DeviceAgentEvent.Type.CHANNEL_ERROR;
280 break;
281 default:
282 log.warn("Unrecognized channel event type {}", channelEvent.type());
283 return;
284 }
285 postDeviceAgentEvent(deviceId, new DeviceAgentEvent(agentEventType, deviceId));
286 }
287
288 private void handleArbitrationReply(P4RuntimeEvent event) {
289 final DeviceId deviceId = event.subject().deviceId();
290 final ArbitrationResponse response = (ArbitrationResponse) event.subject();
291 final DeviceAgentEvent.Type roleType = response.isMaster()
292 ? DeviceAgentEvent.Type.ROLE_MASTER
293 : DeviceAgentEvent.Type.ROLE_STANDBY;
294 postDeviceAgentEvent(deviceId, new DeviceAgentEvent(
295 roleType, response.deviceId()));
296 }
297
298 private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
299 if (deviceAgentListeners.containsKey(deviceId)) {
300 deviceAgentListeners.get(deviceId).forEach(l -> l.event(event));
301 }
302 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400303}