blob: 987356b0d8c33b57236ac00a5b01abb9b0c7c443 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.p4runtime.ctl;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolverProvider;
import io.grpc.internal.DnsNameResolverProvider;
import io.grpc.netty.NettyChannelBuilder;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcController;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.ChannelEvent;
import org.onosproject.net.device.ChannelListener;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.onosproject.p4runtime.api.P4RuntimeEventListener;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;

59/**
60 * P4Runtime controller implementation.
61 */
62@Component(immediate = true)
63@Service
64public class P4RuntimeControllerImpl
65 extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
66 implements P4RuntimeController {
67
Yi Tseng3e7f1452017-10-20 10:31:53 -070068 private static final String P4R_ELECTION = "p4runtime-election";
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +053069 private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040070 private final Logger log = getLogger(getClass());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040071 private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
Carmelo Cascone44448a52018-06-25 23:36:57 +020072 private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
73 private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
Carmelo Cascone59f57de2017-07-11 19:55:09 -040074 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
Andrea Campanella1e573442018-05-17 17:07:13 +020075 private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +053076 private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
77 .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
78 .build(new CacheLoader<DeviceId, ReadWriteLock>() {
79 @Override
80 public ReadWriteLock load(DeviceId deviceId) {
81 return new ReentrantReadWriteLock();
82 }
83 });
84
Yi Tseng3e7f1452017-10-20 10:31:53 -070085 private AtomicCounter electionIdGenerator;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040086
Carmelo Cascone8d99b172017-07-18 17:26:31 -040087 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Cascone44448a52018-06-25 23:36:57 +020088 private GrpcController grpcController;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040089
Yi Tseng3e7f1452017-10-20 10:31:53 -070090 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Cascone44448a52018-06-25 23:36:57 +020091 private StorageService storageService;
Yi Tseng3e7f1452017-10-20 10:31:53 -070092
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040093 @Activate
94 public void activate() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +020095 eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
Yi Tseng3e7f1452017-10-20 10:31:53 -070096 electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
97
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040098 log.info("Started");
99 }
100
101
102 @Deactivate
103 public void deactivate() {
104 grpcController = null;
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200105 eventDispatcher.removeSink(P4RuntimeEvent.class);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400106 log.info("Stopped");
107 }
108
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400109 @Override
Carmelo Cascone44448a52018-06-25 23:36:57 +0200110 public boolean createClient(DeviceId deviceId, String serverAddr,
111 int serverPort, long p4DeviceId) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400112 checkNotNull(deviceId);
Carmelo Cascone44448a52018-06-25 23:36:57 +0200113 checkNotNull(serverAddr);
114
115 ClientKey newKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
116
117 ManagedChannelBuilder channelBuilder = NettyChannelBuilder
118 .forAddress(serverAddr, serverPort)
119 .usePlaintext(true);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400120
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530121 deviceLocks.getUnchecked(deviceId).writeLock().lock();
Carmelo Cascone44448a52018-06-25 23:36:57 +0200122 log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
123 deviceId, serverAddr, serverPort, p4DeviceId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400124
125 try {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200126 if (deviceIdToClientKey.containsKey(deviceId)) {
127 final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
128 if (newKey.equals(existingKey)) {
129 return true;
130 } else {
131 throw new IllegalStateException(
132 "A client for the same device ID but different " +
133 "server endpoints already exists");
134 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400135 } else {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200136 return doCreateClient(newKey, channelBuilder);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400137 }
138 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530139 deviceLocks.getUnchecked(deviceId).writeLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400140 }
141 }
142
Carmelo Cascone44448a52018-06-25 23:36:57 +0200143 private boolean doCreateClient(ClientKey clientKey, ManagedChannelBuilder channelBuilder) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400144
Carmelo Cascone44448a52018-06-25 23:36:57 +0200145 GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(),
146 "p4runtime-" + clientKey.p4DeviceId());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400147
148 // Channel defaults.
149 channelBuilder.nameResolverFactory(nameResolverProvider);
150
151 ManagedChannel channel;
152 try {
153 channel = grpcController.connectChannel(channelId, channelBuilder);
154 } catch (IOException e) {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200155 log.warn("Unable to connect to gRPC server of {}: {}",
156 clientKey.deviceId(), e.getMessage());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400157 return false;
158 }
159
Carmelo Cascone44448a52018-06-25 23:36:57 +0200160 P4RuntimeClient client = new P4RuntimeClientImpl(
161 clientKey.deviceId(), clientKey.p4DeviceId(), channel, this);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400162
Carmelo Cascone44448a52018-06-25 23:36:57 +0200163 channelIds.put(clientKey.deviceId(), channelId);
164 deviceIdToClientKey.put(clientKey.deviceId(), clientKey);
165 clientKeyToClient.put(clientKey, client);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400166
167 return true;
168 }
169
170 @Override
171 public P4RuntimeClient getClient(DeviceId deviceId) {
172
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530173 deviceLocks.getUnchecked(deviceId).readLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400174
175 try {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200176 if (!deviceIdToClientKey.containsKey(deviceId)) {
177 return null;
178 } else {
179 return clientKeyToClient.get(deviceIdToClientKey.get(deviceId));
180 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400181 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530182 deviceLocks.getUnchecked(deviceId).readLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400183 }
184 }
185
186 @Override
187 public void removeClient(DeviceId deviceId) {
188
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530189 deviceLocks.getUnchecked(deviceId).writeLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400190
191 try {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200192 if (deviceIdToClientKey.containsKey(deviceId)) {
193 final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400194 grpcController.disconnectChannel(channelIds.get(deviceId));
Carmelo Cascone44448a52018-06-25 23:36:57 +0200195 clientKeyToClient.remove(clientKey).shutdown();
196 deviceIdToClientKey.remove(deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400197 channelIds.remove(deviceId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400198 }
199 } finally {
Andrea Campanellaa74bdba2018-05-15 16:45:00 +0200200 deviceLocks.getUnchecked(deviceId).writeLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400201 }
202 }
203
204 @Override
205 public boolean hasClient(DeviceId deviceId) {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530206 deviceLocks.getUnchecked(deviceId).readLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400207
208 try {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200209 return deviceIdToClientKey.containsKey(deviceId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400210 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530211 deviceLocks.getUnchecked(deviceId).readLock().unlock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400212 }
213 }
214
215 @Override
216 public boolean isReacheable(DeviceId deviceId) {
217
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530218 deviceLocks.getUnchecked(deviceId).readLock().lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400219
220 try {
Carmelo Cascone44448a52018-06-25 23:36:57 +0200221 if (!deviceIdToClientKey.containsKey(deviceId)) {
222 log.debug("No client for {}, can't check for reachability", deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400223 return false;
224 }
225
226 return grpcController.isChannelOpen(channelIds.get(deviceId));
227 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530228 deviceLocks.getUnchecked(deviceId).readLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400229 }
230 }
231
Yi Tseng3e7f1452017-10-20 10:31:53 -0700232 @Override
233 public long getNewMasterElectionId() {
234 return electionIdGenerator.incrementAndGet();
235 }
236
Andrea Campanella1e573442018-05-17 17:07:13 +0200237 @Override
238 public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
239 channelListeners.compute(deviceId, (devId, listeners) -> {
240 List<ChannelListener> newListeners;
241 if (listeners != null) {
242 newListeners = listeners;
243 } else {
244 newListeners = new ArrayList<>();
245 }
246 newListeners.add(listener);
247 return newListeners;
248 });
249 }
250
251 @Override
252 public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
253 channelListeners.compute(deviceId, (devId, listeners) -> {
254 if (listeners != null) {
255 listeners.remove(listener);
256 return listeners;
257 } else {
258 log.debug("Device {} has no listener registered", deviceId);
259 return null;
260 }
261 });
262 }
263
Carmelo Cascone44448a52018-06-25 23:36:57 +0200264 void postEvent(P4RuntimeEvent event) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200265 if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
266 DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
267 DeviceId deviceId = event.subject().deviceId();
268 ChannelEvent channelEvent = null;
269 //If disconnection is already known we propagate it.
270 if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
271 channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
272 channelError.throwable());
273 } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
274 //If we don't know what the error is we check for reachability
275 if (!isReacheable(deviceId)) {
276 //if false the channel has disconnected
277 channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
278 channelError.throwable());
279 } else {
280 // else we propagate the event.
281 channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
282 channelError.throwable());
283 }
284 }
285 //Ignoring CHANNEL_CONNECTED
286 if (channelEvent != null && channelListeners.get(deviceId) != null) {
287 for (ChannelListener listener : channelListeners.get(deviceId)) {
288 listener.event(channelEvent);
289 }
290 }
291 } else {
292 post(event);
293 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400294 }
Carmelo Cascone44448a52018-06-25 23:36:57 +0200295
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400296}