blob: ac6d9f01cbfd40cbf9aa75ab445c3f7dbbfb7883 [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020022import com.google.common.collect.Maps;
Yi Tseng82512da2017-08-16 19:46:36 -070023import com.google.common.collect.Multimap;
Carmelo Cascone7f75be42017-09-07 14:37:02 +020024import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040025import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080026import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040027import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040028import io.grpc.ManagedChannel;
29import io.grpc.Status;
30import io.grpc.StatusRuntimeException;
31import io.grpc.stub.StreamObserver;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080032import org.apache.commons.lang3.tuple.ImmutablePair;
Andrea Campanella288b2732017-07-28 14:16:16 +020033import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070034import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040035import org.onosproject.net.DeviceId;
Yi Tseng3e7f1452017-10-20 10:31:53 -070036import org.onosproject.net.MastershipRole;
Carmelo Cascone87892e22017-11-13 16:01:29 -080037import org.onosproject.net.pi.model.PiActionProfileId;
38import org.onosproject.net.pi.model.PiCounterId;
39import org.onosproject.net.pi.model.PiCounterType;
Andrea Campanella432f7182017-07-14 18:43:27 +020040import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080041import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020042import org.onosproject.net.pi.runtime.PiActionGroup;
43import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020044import org.onosproject.net.pi.runtime.PiCounterCellData;
45import org.onosproject.net.pi.runtime.PiCounterCellId;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080046import org.onosproject.net.pi.runtime.PiEntity;
Andrea Campanella432f7182017-07-14 18:43:27 +020047import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040048import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080049import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040050import org.onosproject.p4runtime.api.P4RuntimeClient;
51import org.onosproject.p4runtime.api.P4RuntimeEvent;
52import org.slf4j.Logger;
53import p4.P4RuntimeGrpc;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080054import p4.P4RuntimeOuterClass;
Yi Tseng82512da2017-08-16 19:46:36 -070055import p4.P4RuntimeOuterClass.ActionProfileGroup;
56import p4.P4RuntimeOuterClass.ActionProfileMember;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040057import p4.P4RuntimeOuterClass.Entity;
58import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
59import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
60import p4.P4RuntimeOuterClass.PacketIn;
61import p4.P4RuntimeOuterClass.ReadRequest;
62import p4.P4RuntimeOuterClass.ReadResponse;
63import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
64import p4.P4RuntimeOuterClass.StreamMessageRequest;
65import p4.P4RuntimeOuterClass.StreamMessageResponse;
66import p4.P4RuntimeOuterClass.TableEntry;
Yi Tseng3e7f1452017-10-20 10:31:53 -070067import p4.P4RuntimeOuterClass.Uint128;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040068import p4.P4RuntimeOuterClass.Update;
69import p4.P4RuntimeOuterClass.WriteRequest;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020070import p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040071import p4.tmp.P4Config;
72
Carmelo Casconed61fdb32017-10-30 10:09:57 -070073import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040074import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040075import java.util.Collections;
76import java.util.Iterator;
77import java.util.List;
78import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020079import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020080import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040081import java.util.concurrent.CompletableFuture;
Yi Tseng3e7f1452017-10-20 10:31:53 -070082import java.util.concurrent.ExecutionException;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040083import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040084import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040085import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040086import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040087import java.util.concurrent.locks.Lock;
88import java.util.concurrent.locks.ReentrantLock;
89import java.util.function.Supplier;
90import java.util.stream.Collectors;
91import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040092
Carmelo Casconed61fdb32017-10-30 10:09:57 -070093import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080094import static java.lang.String.format;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040095import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040096import static org.slf4j.LoggerFactory.getLogger;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070097import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
98import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
99import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200100import static p4.P4RuntimeOuterClass.PacketOut;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400101import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
102
103/**
104 * Implementation of a P4Runtime client.
105 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400106public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400107
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400108 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
109 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
110 WriteOperationType.INSERT, Update.Type.INSERT,
111 WriteOperationType.MODIFY, Update.Type.MODIFY,
112 WriteOperationType.DELETE, Update.Type.DELETE
113 );
114
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400115 private final Logger log = getLogger(getClass());
116
117 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200118 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400119 private final P4RuntimeControllerImpl controller;
120 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400121 private final Context.CancellableContext cancellableContext;
122 private final ExecutorService executorService;
123 private final Executor contextExecutor;
124 private final Lock writeLock = new ReentrantLock();
125 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400126
Yi Tseng3e7f1452017-10-20 10:31:53 -0700127 private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
128 protected Uint128 p4RuntimeElectionId;
129
Yi Tseng82512da2017-08-16 19:46:36 -0700130 /**
131 * Default constructor.
132 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200133 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700134 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200135 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700136 * @param controller runtime client controller
137 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200138 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
139 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400140 this.deviceId = deviceId;
141 this.p4DeviceId = p4DeviceId;
142 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400143 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400144 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400145 "onos/p4runtime-client-" + deviceId.toString(),
146 deviceId.toString() + "-%d"));
147 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200148 //TODO Investigate deadline or timeout in supplyInContext Method
149 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400150 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
151 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
152 }
153
154 /**
155 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
156 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
157 * <p>
158 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
159 * <p>
160 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200161 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400162 return CompletableFuture.supplyAsync(() -> {
163 // TODO: explore a more relaxed locking strategy.
164 writeLock.lock();
165 try {
166 return supplier.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800167 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800168 log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
Yi Tsenge67e1412018-01-31 17:35:20 -0800169 throw ex;
Carmelo Casconea966c342017-07-30 01:56:30 -0400170 } catch (Throwable ex) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800171 log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400172 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400173 } finally {
174 writeLock.unlock();
175 }
176 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400177 }
178
179 @Override
180 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200181 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400182 }
183
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400184 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700185 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
186 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400187 }
188
189 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400190 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
191 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200192 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
193 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400194 }
195
196 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400197 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200198 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400199 }
200
201 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200202 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200203 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200204 }
205
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200206 @Override
207 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
208 PiPipeconf pipeconf) {
209 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
210 "readCounterCells-" + cellIds.hashCode());
211 }
212
213 @Override
214 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
215 PiPipeconf pipeconf) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200216
217 /*
218 From p4runtime.proto, the scope of a ReadRequest is defined as follows:
219 CounterEntry:
220 - All counter cells for all meters if counter_id = 0 (default).
221 - All counter cells for given counter_id if index = 0 (default).
222 DirectCounterEntry:
223 - All counter cells for all meters if counter_id = 0 (default).
224 - All counter cells for given counter_id if table_entry.match is empty.
225 */
226
227 Set<PiCounterCellId> cellIds = Sets.newHashSet();
228
229 for (PiCounterId counterId : counterIds) {
Carmelo Cascone87892e22017-11-13 16:01:29 -0800230 if (!pipeconf.pipelineModel().counter(counterId).isPresent()) {
231 log.warn("Unable to find counter '{}' in pipeline model", counterId);
232 continue;
233 }
234 PiCounterType counterType = pipeconf.pipelineModel().counter(counterId).get().counterType();
235 switch (counterType) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200236 case INDIRECT:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800237 cellIds.add(PiCounterCellId.ofIndirect(counterId, 0));
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200238 break;
239 case DIRECT:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800240 cellIds.add(PiCounterCellId.ofDirect(counterId, PiTableEntry.EMTPY));
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200241 break;
242 default:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800243 log.warn("Unrecognized PI counter type '{}'", counterType);
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200244 }
245 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200246
247 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
248 "readAllCounterCells-" + cellIds.hashCode());
249 }
250
Yi Tseng82512da2017-08-16 19:46:36 -0700251 @Override
252 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionGroup group,
Yi Tseng82512da2017-08-16 19:46:36 -0700253 WriteOperationType opType,
254 PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200255 return supplyInContext(() -> doWriteActionGroupMembers(group, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700256 "writeActionGroupMembers-" + opType.name());
257 }
258
259 @Override
260 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
261 WriteOperationType opType,
262 PiPipeconf pipeconf) {
263 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
264 "writeActionGroup-" + opType.name());
265 }
266
267 @Override
268 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
269 PiPipeconf pipeconf) {
270 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
271 "dumpGroups-" + actionProfileId.id());
272 }
273
Yi Tseng3e7f1452017-10-20 10:31:53 -0700274 @Override
275 public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
276 return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
277 }
278
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400279 /* Blocking method implementations below */
280
Yi Tseng3e7f1452017-10-20 10:31:53 -0700281 private boolean doArbitrationUpdate() {
282 CompletableFuture<Boolean> result = new CompletableFuture<>();
283 // TODO: currently we use 64-bit Long type for election id, should
284 // we use 128-bit ?
285 long nextElectId = controller.getNewMasterElectionId();
286 Uint128 newElectionId = Uint128.newBuilder()
287 .setLow(nextElectId)
288 .build();
289 MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
290 .setDeviceId(p4DeviceId)
291 .setElectionId(newElectionId)
292 .build();
293 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
294 .setArbitration(arbitrationUpdate)
295 .build();
296 log.debug("Sending arbitration update to {} with election id {}...",
297 deviceId, newElectionId);
298 arbitrationUpdateMap.put(newElectionId, result);
299 try {
300 streamRequestObserver.onNext(requestMsg);
301 return result.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800302 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800303 log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
Yi Tsenge67e1412018-01-31 17:35:20 -0800304 arbitrationUpdateMap.remove(newElectionId);
305 return false;
306 } catch (InterruptedException | ExecutionException e) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700307 log.warn("Arbitration update failed for {} due to {}", deviceId, e);
308 arbitrationUpdateMap.remove(newElectionId);
309 return false;
310 }
311 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400312 private boolean doInitStreamChannel() {
313 // To listen for packets and other events, we need to start the RPC.
314 // Here we do it by sending a master arbitration update.
Yi Tseng3e7f1452017-10-20 10:31:53 -0700315 return doArbitrationUpdate();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400316 }
317
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700318 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400319
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700320 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
321
322 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400323
324 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
325 if (p4Info == null) {
326 // Problem logged by PipeconfHelper.
327 return false;
328 }
329
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700330 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
331 .newBuilder()
332 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
333 .setReassign(true)
334 .setDeviceData(ByteString.copyFrom(deviceData))
335 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400336
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700337 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
Andrea Campanella0288c872017-08-07 18:32:51 +0200338 .newBuilder()
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700339 .setP4Info(p4Info)
340 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
341 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400342
343 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
344 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100345 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700346 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400347 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100348 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400349 .build();
350
351 try {
352 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700353 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400354 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800355 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400356 return false;
357 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400358 }
359
360 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
361 PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400362 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
363
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800364 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400365 return true;
366 }
367
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800368 Collection<Update> updateMsgs = null;
369 try {
370 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
371 .stream()
372 .map(tableEntryMsg ->
373 Update.newBuilder()
374 .setEntity(Entity.newBuilder()
375 .setTableEntry(tableEntryMsg)
376 .build())
377 .setType(UPDATE_TYPES.get(opType))
378 .build())
379 .collect(Collectors.toList());
380 } catch (EncodeException e) {
381 log.error("Unable to encode table entries, aborting {} operation: {}",
382 opType.name(), e.getMessage());
383 return false;
384 }
385
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400386 writeRequestBuilder
387 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700388 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400389 .addAllUpdates(updateMsgs)
390 .build();
391
392 try {
393 blockingStub.write(writeRequestBuilder.build());
394 return true;
395 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800396 logWriteErrors(piTableEntries, e, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400397 return false;
398 }
399 }
400
401 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
402
Carmelo Cascone9f007702017-08-24 13:30:51 +0200403 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400404
405 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
406 int tableId;
407 try {
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200408 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400409 } catch (P4InfoBrowser.NotFoundException e) {
410 log.warn("Unable to dump table: {}", e.getMessage());
411 return Collections.emptyList();
412 }
413
414 ReadRequest requestMsg = ReadRequest.newBuilder()
415 .setDeviceId(p4DeviceId)
416 .addEntities(Entity.newBuilder()
417 .setTableEntry(TableEntry.newBuilder()
418 .setTableId(tableId)
419 .build())
420 .build())
421 .build();
422
423 Iterator<ReadResponse> responses;
424 try {
425 responses = blockingStub.read(requestMsg);
426 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800427 log.warn("Unable to dump table {} from {}: {}", piTableId, deviceId, e.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400428 return Collections.emptyList();
429 }
430
431 Iterable<ReadResponse> responseIterable = () -> responses;
432 List<TableEntry> tableEntryMsgs = StreamSupport
433 .stream(responseIterable.spliterator(), false)
434 .map(ReadResponse::getEntitiesList)
435 .flatMap(List::stream)
436 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
437 .map(Entity::getTableEntry)
438 .collect(Collectors.toList());
439
Carmelo Cascone9f007702017-08-24 13:30:51 +0200440 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400441
442 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
443 }
444
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200445 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
446 try {
447 //encode the PiPacketOperation into a PacketOut
448 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
449
450 //Build the request
451 StreamMessageRequest packetOutRequest = StreamMessageRequest
452 .newBuilder().setPacket(packetOut).build();
453
454 //Send the request
455 streamRequestObserver.onNext(packetOutRequest);
456
457 } catch (P4InfoBrowser.NotFoundException e) {
458 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
459 log.debug("Exception", e);
460 return false;
461 }
462 return true;
463 }
464
Carmelo Casconea966c342017-07-30 01:56:30 -0400465 private void doPacketIn(PacketIn packetInMsg) {
466
467 // Retrieve the pipeconf for this client's device.
468 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
469 if (pipeconfService == null) {
470 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
471 }
472 final PiPipeconf pipeconf;
473 if (pipeconfService.ofDevice(deviceId).isPresent() &&
474 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
475 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
476 } else {
477 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
478 return;
479 }
480 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800481 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200482 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
483 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400484 log.debug("Received packet in: {}", event);
485 controller.postEvent(event);
486 }
487
488 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700489 log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
Carmelo Casconea966c342017-07-30 01:56:30 -0400490
Yi Tseng3e7f1452017-10-20 10:31:53 -0700491 Uint128 electionId = arbitrationMsg.getElectionId();
492 CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
493
494 if (mastershipFeature == null) {
495 log.warn("Can't find completable future of election id {}", electionId);
496 return;
497 }
498
499 this.p4RuntimeElectionId = electionId;
500 int statusCode = arbitrationMsg.getStatus().getCode();
501 MastershipRole arbitrationRole;
502 // arbitration update success
503
504 if (statusCode == Status.OK.getCode().value()) {
505 mastershipFeature.complete(true);
506 arbitrationRole = MastershipRole.MASTER;
507 } else {
508 mastershipFeature.complete(false);
509 arbitrationRole = MastershipRole.STANDBY;
510 }
511
512 DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
513 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
514 arbitrationEventSubject);
515 controller.postEvent(event);
Carmelo Casconea966c342017-07-30 01:56:30 -0400516 }
517
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200518 private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
519
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200520 // We use this map to remember the original PI counter IDs of the returned response.
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200521 final Map<Integer, PiCounterId> counterIdMap = Maps.newHashMap();
522
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200523 final ReadRequest request = ReadRequest.newBuilder()
524 .setDeviceId(p4DeviceId)
525 .addAllEntities(CounterEntryCodec.encodePiCounterCellIds(cellIds, counterIdMap, pipeconf))
526 .build();
527
528 if (request.getEntitiesList().size() == 0) {
529 return Collections.emptyList();
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200530 }
531
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200532 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200533 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200534 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200535 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800536 log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200537 return Collections.emptyList();
538 }
539
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200540 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200541 .map(ReadResponse::getEntitiesList)
542 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200543 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200544
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200545 return CounterEntryCodec.decodeCounterEntities(entities, counterIdMap, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200546 }
547
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200548 private boolean doWriteActionGroupMembers(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200549 final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800550
551 for (PiActionGroupMember member : group.members()) {
552 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200553 actionProfileMembers.add(ActionProfileMemberEncoder.encode(group, member, pipeconf));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800554 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
555 log.warn("Unable to encode group member, aborting {} operation: {} [{}]",
556 opType.name(), e.getMessage(), member.toString());
557 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700558 }
Yi Tseng82512da2017-08-16 19:46:36 -0700559 }
560
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200561 final Collection<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700562 .map(actionProfileMember ->
563 Update.newBuilder()
564 .setEntity(Entity.newBuilder()
565 .setActionProfileMember(actionProfileMember)
566 .build())
567 .setType(UPDATE_TYPES.get(opType))
568 .build())
569 .collect(Collectors.toList());
570
571 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200572 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700573 return true;
574 }
575
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200576 WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700577 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700578 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200579 .addAllUpdates(updateMsgs)
580 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700581 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200582 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700583 return true;
584 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800585 logWriteErrors(group.members(), e, opType, "group member");
Yi Tseng82512da2017-08-16 19:46:36 -0700586 return false;
587 }
588 }
589
590 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
591 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
592 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200593
594 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700595 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200596 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700597 return Collections.emptySet();
598 }
599
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200600 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700601 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200602 actionProfileId = browser
603 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200604 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200605 .getPreamble()
606 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700607 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200608 log.warn("Unable to dump groups: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700609 return Collections.emptySet();
610 }
611
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200612 // Prepare read request to read all groups from the given action profile.
613 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700614 .setDeviceId(p4DeviceId)
615 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200616 .setActionProfileGroup(
617 ActionProfileGroup.newBuilder()
618 .setActionProfileId(actionProfileId)
619 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700620 .build())
621 .build();
622
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200623 // Read groups.
624 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700625 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200626 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700627 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800628 log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700629 return Collections.emptySet();
630 }
631
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200632 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
633 .map(ReadResponse::getEntitiesList)
634 .flatMap(List::stream)
635 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
636 .map(Entity::getActionProfileGroup)
637 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700638
639 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200640 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700641
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200642 // Returned groups contain only a minimal description of their members.
643 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700644
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200645 // Keep a map of all member IDs for each group ID, will need it later.
646 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
647 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
648 g.getGroupId(),
649 g.getMembersList().stream()
650 .map(ActionProfileGroup.Member::getMemberId)
651 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700652
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200653 // Prepare one big read request to read all members in one shot.
654 final Set<Entity> entityMsgs = groupMsgs.stream()
655 .flatMap(g -> g.getMembersList().stream())
656 .map(ActionProfileGroup.Member::getMemberId)
657 // Prevent issuing many read requests for the same member.
658 .distinct()
659 .map(id -> ActionProfileMember.newBuilder()
660 .setActionProfileId(actionProfileId)
661 .setMemberId(id)
662 .build())
663 .map(m -> Entity.newBuilder()
664 .setActionProfileMember(m)
665 .build())
666 .collect(Collectors.toSet());
667 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
668 .addAllEntities(entityMsgs)
669 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700670
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200671 // Read members.
672 final Iterator<ReadResponse> memberResponses;
673 try {
674 memberResponses = blockingStub.read(memberRequestMsg);
675 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800676 log.warn("Unable to read members of action profile {} from {}: {}",
677 piActionProfileId, deviceId, e.getMessage());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200678 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700679 }
680
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200681 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
682 Tools.stream(() -> memberResponses)
683 .map(ReadResponse::getEntitiesList)
684 .flatMap(List::stream)
685 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
686 .map(Entity::getActionProfileMember)
687 .forEach(member -> groupIdToMemberIdsMap.asMap()
688 // Get all group IDs that contain this member.
689 .entrySet()
690 .stream()
691 .filter(entry -> entry.getValue().contains(member.getMemberId()))
692 .map(Map.Entry::getKey)
693 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
694
695 log.debug("Retrieved {} group members from action profile {} on {}...",
696 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
697
698 return groupMsgs.stream()
699 .map(groupMsg -> {
700 try {
701 return ActionProfileGroupEncoder.decode(groupMsg,
702 groupIdToMembersMap.get(groupMsg.getGroupId()),
703 pipeconf);
704 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
705 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
706 return null;
707 }
708 })
709 .filter(Objects::nonNull)
710 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700711 }
712
713 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200714 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700715 try {
716 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
717 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800718 log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700719 return false;
720 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200721
722 final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700723 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700724 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200725 .addUpdates(Update.newBuilder()
726 .setEntity(Entity.newBuilder()
727 .setActionProfileGroup(actionProfileGroup)
728 .build())
729 .setType(UPDATE_TYPES.get(opType))
730 .build())
731 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700732 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200733 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700734 return true;
735 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800736 logWriteErrors(Collections.singleton(group), e, opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700737 return false;
738 }
739 }
740
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400741 /**
742 * Returns the internal P4 device ID associated with this client.
743 *
744 * @return P4 device ID
745 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200746 public long p4DeviceId() {
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400747 return p4DeviceId;
748 }
749
750 /**
751 * For testing purpose only. TODO: remove before release.
752 *
753 * @return blocking stub
754 */
755 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
756 return this.blockingStub;
757 }
758
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200759
Andrea Campanella432f7182017-07-14 18:43:27 +0200760 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400761 public void shutdown() {
762
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400763 log.info("Shutting down client for {}...", deviceId);
764
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400765 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400766 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400767 if (streamRequestObserver != null) {
768 streamRequestObserver.onCompleted();
769 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
770 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400771
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400772 this.executorService.shutdown();
773 try {
774 executorService.awaitTermination(5, TimeUnit.SECONDS);
775 } catch (InterruptedException e) {
776 log.warn("Executor service didn't shutdown in time.");
Ray Milkey5c7d4882018-02-05 14:50:39 -0800777 Thread.currentThread().interrupt();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400778 }
779 } finally {
780 writeLock.unlock();
781 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400782 }
783
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800784 private <E extends PiEntity> void logWriteErrors(Collection<E> writeEntities,
785 StatusRuntimeException ex,
786 WriteOperationType opType,
787 String entryType) {
788 List<P4RuntimeOuterClass.Error> errors = null;
789 String description = null;
790 try {
791 errors = extractWriteErrorDetails(ex);
792 } catch (InvalidProtocolBufferException e) {
793 description = ex.getStatus().getDescription();
794 }
795
796 log.warn("Unable to {} {} {}(s) on {}: {}{} (detailed errors might be logged below)",
797 opType.name(), writeEntities.size(), entryType, deviceId,
798 ex.getStatus().getCode().name(),
799 description == null ? "" : " - " + description);
800
801 if (errors == null || errors.isEmpty()) {
802 return;
803 }
804
805 // FIXME: we are assuming entities is an ordered collection, e.g. a list,
806 // and that errors are reported in the same order as the corresponding
807 // written entity. Write RPC methods should be refactored to accept an
808 // order list of entities, instead of a collection.
809 if (errors.size() == writeEntities.size()) {
810 Iterator<E> entityIterator = writeEntities.iterator();
811 errors.stream()
812 .map(e -> ImmutablePair.of(e, entityIterator.next()))
813 .filter(p -> p.left.getCanonicalCode() != Status.OK.getCode().value())
814 .forEach(p -> log.warn("Unable to {} {}: {} [{}]",
815 opType.name(), entryType, parseP4Error(p.getLeft()),
816 p.getRight().toString()));
817 } else {
818 log.error("Unable to reconcile error details to updates " +
819 "(sent {} updates, but device returned {} errors)",
820 entryType, writeEntities.size(), errors.size());
821 errors.stream()
822 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
823 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
824 opType.name(), entryType, parseP4Error(err)));
825 }
826 }
827
828 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
829 StatusRuntimeException ex) throws InvalidProtocolBufferException {
830 String statusString = ex.getStatus().getDescription();
831 if (statusString == null) {
832 return Collections.emptyList();
833 }
834 com.google.rpc.Status status = com.google.rpc.Status
835 .parseFrom(statusString.getBytes());
836 return status.getDetailsList().stream()
837 .map(any -> {
838 try {
839 return any.unpack(P4RuntimeOuterClass.Error.class);
840 } catch (InvalidProtocolBufferException e) {
841 log.warn("Unable to unpack P4Runtime Error: {}",
842 any.toString());
843 return null;
844 }
845 })
846 .filter(Objects::nonNull)
847 .collect(Collectors.toList());
848
849 }
850
851 private String parseP4Error(P4RuntimeOuterClass.Error err) {
852 return format("%s %s (%s code %d)%s",
853 Status.fromCodeValue(err.getCanonicalCode()),
854 err.getMessage(),
855 err.getSpace(),
856 err.getCode(),
857 err.hasDetails() ? "\n" + err.getDetails().toString() : "");
858 }
859
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400860 /**
861 * Handles messages received from the device on the stream channel.
862 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400863 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
864
865 @Override
866 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400867 executorService.submit(() -> doNext(message));
868 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400869
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400870 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400871 try {
Andrea Campanella0288c872017-08-07 18:32:51 +0200872 log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -0400873 switch (message.getUpdateCase()) {
874 case PACKET:
875 // Packet-in
876 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200877 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400878 case ARBITRATION:
879 doArbitrationUpdateFromDevice(message.getArbitration());
880 return;
881 default:
882 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
883 }
884 } catch (Throwable ex) {
885 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400886 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400887 }
888
889 @Override
890 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400891 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
892 // FIXME: we might want to recreate the channel.
893 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
894 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400895 }
896
897 @Override
898 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400899 log.warn("Stream channel for {} has completed", deviceId);
900 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400901 }
902 }
Carmelo Cascone87892e22017-11-13 16:01:29 -0800903}