blob: 47ebe68591973b79a5c8adefde6d3133b682a7bc [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +053019import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.CacheLoader;
Andrea Campanella1e573442018-05-17 17:07:13 +020021import com.google.common.cache.LoadingCache;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040022import com.google.common.collect.Maps;
23import io.grpc.ManagedChannel;
24import io.grpc.ManagedChannelBuilder;
25import io.grpc.NameResolverProvider;
26import io.grpc.internal.DnsNameResolverProvider;
27import org.apache.felix.scr.annotations.Activate;
28import org.apache.felix.scr.annotations.Component;
29import org.apache.felix.scr.annotations.Deactivate;
30import org.apache.felix.scr.annotations.Reference;
31import org.apache.felix.scr.annotations.ReferenceCardinality;
32import org.apache.felix.scr.annotations.Service;
33import org.onosproject.event.AbstractListenerManager;
34import org.onosproject.grpc.api.GrpcChannelId;
35import org.onosproject.grpc.api.GrpcController;
36import org.onosproject.net.DeviceId;
Andrea Campanella1e573442018-05-17 17:07:13 +020037import org.onosproject.net.device.ChannelEvent;
38import org.onosproject.net.device.ChannelListener;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040039import org.onosproject.p4runtime.api.P4RuntimeClient;
40import org.onosproject.p4runtime.api.P4RuntimeController;
41import org.onosproject.p4runtime.api.P4RuntimeEvent;
42import org.onosproject.p4runtime.api.P4RuntimeEventListener;
Yi Tseng3e7f1452017-10-20 10:31:53 -070043import org.onosproject.store.service.AtomicCounter;
44import org.onosproject.store.service.StorageService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040045import org.slf4j.Logger;
46
47import java.io.IOException;
Andrea Campanella1e573442018-05-17 17:07:13 +020048import java.util.ArrayList;
49import java.util.List;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040050import java.util.Map;
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +053051import java.util.concurrent.TimeUnit;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040052import java.util.concurrent.locks.ReadWriteLock;
53import java.util.concurrent.locks.ReentrantReadWriteLock;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040054
55import static com.google.common.base.Preconditions.checkNotNull;
56import static java.lang.String.format;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040057import static org.slf4j.LoggerFactory.getLogger;
58
59/**
60 * P4Runtime controller implementation.
61 */
62@Component(immediate = true)
63@Service
64public class P4RuntimeControllerImpl
65 extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
66 implements P4RuntimeController {
67
Yi Tseng3e7f1452017-10-20 10:31:53 -070068 private static final String P4R_ELECTION = "p4runtime-election";
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +053069 private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040070 private final Logger log = getLogger(getClass());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040071 private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
Carmelo Cascone59f57de2017-07-11 19:55:09 -040072 private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
73 private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
Andrea Campanella1e573442018-05-17 17:07:13 +020074 private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +053075 private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
76 .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
77 .build(new CacheLoader<DeviceId, ReadWriteLock>() {
78 @Override
79 public ReadWriteLock load(DeviceId deviceId) {
80 return new ReentrantReadWriteLock();
81 }
82 });
83
Yi Tseng3e7f1452017-10-20 10:31:53 -070084 private AtomicCounter electionIdGenerator;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040085
Carmelo Cascone8d99b172017-07-18 17:26:31 -040086 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconec8e84982017-07-26 15:34:42 -040087 public GrpcController grpcController;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040088
Yi Tseng3e7f1452017-10-20 10:31:53 -070089 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 public StorageService storageService;
91
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040092 @Activate
93 public void activate() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +020094 eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
Yi Tseng3e7f1452017-10-20 10:31:53 -070095 electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
96
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040097 log.info("Started");
98 }
99
100
101 @Deactivate
102 public void deactivate() {
103 grpcController = null;
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200104 eventDispatcher.removeSink(P4RuntimeEvent.class);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400105 log.info("Stopped");
106 }
107
108
109 @Override
Carmelo Casconef423bec2017-08-30 01:56:25 +0200110 public boolean createClient(DeviceId deviceId, long p4DeviceId, ManagedChannelBuilder channelBuilder) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400111 checkNotNull(deviceId);
112 checkNotNull(channelBuilder);
113
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530114 deviceLocks.getUnchecked(deviceId).writeLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400115 log.info("Creating client for {} (with internal device id {})...", deviceId, p4DeviceId);
116
117 try {
118 if (clients.containsKey(deviceId)) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200119 // TODO might want to consider a more fine-grained check such as same port/p4DeviceId
120 log.warn("A client already exists for {}", deviceId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400121 throw new IllegalStateException(format("A client already exists for %s", deviceId));
122 } else {
123 return doCreateClient(deviceId, p4DeviceId, channelBuilder);
124 }
125 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530126 deviceLocks.getUnchecked(deviceId).writeLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400127 }
128 }
129
Carmelo Casconef423bec2017-08-30 01:56:25 +0200130 private boolean doCreateClient(DeviceId deviceId, long p4DeviceId, ManagedChannelBuilder channelBuilder) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400131
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400132 GrpcChannelId channelId = GrpcChannelId.of(deviceId, "p4runtime");
133
134 // Channel defaults.
135 channelBuilder.nameResolverFactory(nameResolverProvider);
136
137 ManagedChannel channel;
138 try {
139 channel = grpcController.connectChannel(channelId, channelBuilder);
140 } catch (IOException e) {
141 log.warn("Unable to connect to gRPC server of {}: {}", deviceId, e.getMessage());
142 return false;
143 }
144
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400145 P4RuntimeClient client = new P4RuntimeClientImpl(deviceId, p4DeviceId, channel, this);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400146
147 channelIds.put(deviceId, channelId);
148 clients.put(deviceId, client);
149
150 return true;
151 }
152
153 @Override
154 public P4RuntimeClient getClient(DeviceId deviceId) {
155
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530156 deviceLocks.getUnchecked(deviceId).readLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400157
158 try {
159 return clients.get(deviceId);
160 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530161 deviceLocks.getUnchecked(deviceId).readLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400162 }
163 }
164
165 @Override
166 public void removeClient(DeviceId deviceId) {
167
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530168 deviceLocks.getUnchecked(deviceId).writeLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400169
170 try {
171 if (clients.containsKey(deviceId)) {
172 clients.get(deviceId).shutdown();
173 grpcController.disconnectChannel(channelIds.get(deviceId));
174 clients.remove(deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400175 channelIds.remove(deviceId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400176 }
177 } finally {
Andrea Campanellaa74bdba2018-05-15 16:45:00 +0200178 deviceLocks.getUnchecked(deviceId).writeLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400179 }
180 }
181
182 @Override
183 public boolean hasClient(DeviceId deviceId) {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530184 deviceLocks.getUnchecked(deviceId).readLock().lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400185
186 try {
187 return clients.containsKey(deviceId);
188 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530189 deviceLocks.getUnchecked(deviceId).readLock().unlock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400190 }
191 }
192
193 @Override
194 public boolean isReacheable(DeviceId deviceId) {
195
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530196 deviceLocks.getUnchecked(deviceId).readLock().lock();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400197
198 try {
199 if (!clients.containsKey(deviceId)) {
200 log.warn("No client for {}, can't check for reachability", deviceId);
201 return false;
202 }
203
204 return grpcController.isChannelOpen(channelIds.get(deviceId));
205 } finally {
Manjunath Vanaraj59ad6572017-12-26 11:10:57 +0530206 deviceLocks.getUnchecked(deviceId).readLock().unlock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400207 }
208 }
209
Yi Tseng3e7f1452017-10-20 10:31:53 -0700210 @Override
211 public long getNewMasterElectionId() {
212 return electionIdGenerator.incrementAndGet();
213 }
214
Andrea Campanella1e573442018-05-17 17:07:13 +0200215 @Override
216 public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
217 channelListeners.compute(deviceId, (devId, listeners) -> {
218 List<ChannelListener> newListeners;
219 if (listeners != null) {
220 newListeners = listeners;
221 } else {
222 newListeners = new ArrayList<>();
223 }
224 newListeners.add(listener);
225 return newListeners;
226 });
227 }
228
229 @Override
230 public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
231 channelListeners.compute(deviceId, (devId, listeners) -> {
232 if (listeners != null) {
233 listeners.remove(listener);
234 return listeners;
235 } else {
236 log.debug("Device {} has no listener registered", deviceId);
237 return null;
238 }
239 });
240 }
241
Yi Tseng82512da2017-08-16 19:46:36 -0700242 public void postEvent(P4RuntimeEvent event) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200243 if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
244 DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
245 DeviceId deviceId = event.subject().deviceId();
246 ChannelEvent channelEvent = null;
247 //If disconnection is already known we propagate it.
248 if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
249 channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
250 channelError.throwable());
251 } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
252 //If we don't know what the error is we check for reachability
253 if (!isReacheable(deviceId)) {
254 //if false the channel has disconnected
255 channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
256 channelError.throwable());
257 } else {
258 // else we propagate the event.
259 channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
260 channelError.throwable());
261 }
262 }
263 //Ignoring CHANNEL_CONNECTED
264 if (channelEvent != null && channelListeners.get(deviceId) != null) {
265 for (ChannelListener listener : channelListeners.get(deviceId)) {
266 listener.event(channelEvent);
267 }
268 }
269 } else {
270 post(event);
271 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400272 }
273}