blob: 43d01e5b8c450f17ecf82ea430ca98795f62935e [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020022import com.google.common.collect.Maps;
Yi Tseng82512da2017-08-16 19:46:36 -070023import com.google.common.collect.Multimap;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040024import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080025import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040026import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040027import io.grpc.ManagedChannel;
28import io.grpc.Status;
29import io.grpc.StatusRuntimeException;
30import io.grpc.stub.StreamObserver;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080031import org.apache.commons.lang3.tuple.ImmutablePair;
Andrea Campanella288b2732017-07-28 14:16:16 +020032import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070033import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040034import org.onosproject.net.DeviceId;
Yi Tseng3e7f1452017-10-20 10:31:53 -070035import org.onosproject.net.MastershipRole;
Andrea Campanella1e573442018-05-17 17:07:13 +020036import org.onosproject.net.device.ChannelEvent;
Carmelo Cascone87892e22017-11-13 16:01:29 -080037import org.onosproject.net.pi.model.PiActionProfileId;
38import org.onosproject.net.pi.model.PiCounterId;
Carmelo Cascone81929aa2018-04-07 01:38:55 -070039import org.onosproject.net.pi.model.PiMeterId;
Andrea Campanella432f7182017-07-14 18:43:27 +020040import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080041import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020042import org.onosproject.net.pi.runtime.PiActionGroup;
43import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020044import org.onosproject.net.pi.runtime.PiCounterCellData;
45import org.onosproject.net.pi.runtime.PiCounterCellId;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080046import org.onosproject.net.pi.runtime.PiEntity;
Frank Wangd7e3b4b2017-09-24 13:37:54 +090047import org.onosproject.net.pi.runtime.PiMeterCellConfig;
48import org.onosproject.net.pi.runtime.PiMeterCellId;
Andrea Campanella432f7182017-07-14 18:43:27 +020049import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040050import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080051import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040052import org.onosproject.p4runtime.api.P4RuntimeClient;
53import org.onosproject.p4runtime.api.P4RuntimeEvent;
54import org.slf4j.Logger;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020055import p4.v1.P4RuntimeGrpc;
56import p4.v1.P4RuntimeOuterClass;
57import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
58import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
59import p4.v1.P4RuntimeOuterClass.Entity;
60import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
61import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
62import p4.v1.P4RuntimeOuterClass.PacketIn;
63import p4.v1.P4RuntimeOuterClass.ReadRequest;
64import p4.v1.P4RuntimeOuterClass.ReadResponse;
65import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
66import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
67import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
68import p4.v1.P4RuntimeOuterClass.TableEntry;
69import p4.v1.P4RuntimeOuterClass.Uint128;
70import p4.v1.P4RuntimeOuterClass.Update;
71import p4.v1.P4RuntimeOuterClass.WriteRequest;
72import p4.config.v1.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040073import p4.tmp.P4Config;
74
Carmelo Casconed61fdb32017-10-30 10:09:57 -070075import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040076import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040077import java.util.Collections;
78import java.util.Iterator;
79import java.util.List;
80import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020081import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020082import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040083import java.util.concurrent.CompletableFuture;
Yi Tseng3e7f1452017-10-20 10:31:53 -070084import java.util.concurrent.ExecutionException;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040085import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040086import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040087import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040088import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040089import java.util.concurrent.locks.Lock;
90import java.util.concurrent.locks.ReentrantLock;
91import java.util.function.Supplier;
92import java.util.stream.Collectors;
93import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040094
Carmelo Casconed61fdb32017-10-30 10:09:57 -070095import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080096import static java.lang.String.format;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040097import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040098import static org.slf4j.LoggerFactory.getLogger;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020099import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
100import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
101import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
102import static p4.v1.P4RuntimeOuterClass.PacketOut;
103import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400104
105/**
106 * Implementation of a P4Runtime client.
107 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400108public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400109
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400110 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
111 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
112 WriteOperationType.INSERT, Update.Type.INSERT,
113 WriteOperationType.MODIFY, Update.Type.MODIFY,
114 WriteOperationType.DELETE, Update.Type.DELETE
115 );
116
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400117 private final Logger log = getLogger(getClass());
118
119 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200120 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400121 private final P4RuntimeControllerImpl controller;
122 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400123 private final Context.CancellableContext cancellableContext;
124 private final ExecutorService executorService;
125 private final Executor contextExecutor;
126 private final Lock writeLock = new ReentrantLock();
127 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400128
Yi Tseng3e7f1452017-10-20 10:31:53 -0700129 private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
130 protected Uint128 p4RuntimeElectionId;
131
Yi Tseng82512da2017-08-16 19:46:36 -0700132 /**
133 * Default constructor.
134 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200135 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700136 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200137 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700138 * @param controller runtime client controller
139 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200140 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
141 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400142 this.deviceId = deviceId;
143 this.p4DeviceId = p4DeviceId;
144 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400145 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400146 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400147 "onos/p4runtime-client-" + deviceId.toString(),
148 deviceId.toString() + "-%d"));
149 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200150 //TODO Investigate deadline or timeout in supplyInContext Method
151 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400152 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
153 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
154 }
155
156 /**
157 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
158 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
159 * <p>
160 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
161 * <p>
162 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200163 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400164 return CompletableFuture.supplyAsync(() -> {
165 // TODO: explore a more relaxed locking strategy.
166 writeLock.lock();
167 try {
168 return supplier.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800169 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800170 log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
Yi Tsenge67e1412018-01-31 17:35:20 -0800171 throw ex;
Carmelo Casconea966c342017-07-30 01:56:30 -0400172 } catch (Throwable ex) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800173 log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400174 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400175 } finally {
176 writeLock.unlock();
177 }
178 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400179 }
180
181 @Override
182 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200183 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400184 }
185
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400186 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700187 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
188 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400189 }
190
191 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400192 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
193 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200194 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
195 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400196 }
197
198 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400199 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200200 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400201 }
202
203 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200204 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200205 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200206 }
207
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200208 @Override
209 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
210 PiPipeconf pipeconf) {
211 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
212 "readCounterCells-" + cellIds.hashCode());
213 }
214
215 @Override
216 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
217 PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700218 return supplyInContext(() -> doReadAllCounterCells(counterIds, pipeconf),
219 "readAllCounterCells-" + counterIds.hashCode());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200220 }
221
Yi Tseng82512da2017-08-16 19:46:36 -0700222 @Override
Yi Tseng8d355132018-04-13 01:40:48 +0800223 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionProfileId profileId,
224 Collection<PiActionGroupMember> members,
Yi Tseng82512da2017-08-16 19:46:36 -0700225 WriteOperationType opType,
226 PiPipeconf pipeconf) {
Yi Tseng8d355132018-04-13 01:40:48 +0800227 return supplyInContext(() -> doWriteActionGroupMembers(profileId, members, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700228 "writeActionGroupMembers-" + opType.name());
229 }
230
Yi Tseng8d355132018-04-13 01:40:48 +0800231
Yi Tseng82512da2017-08-16 19:46:36 -0700232 @Override
233 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
234 WriteOperationType opType,
235 PiPipeconf pipeconf) {
236 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
237 "writeActionGroup-" + opType.name());
238 }
239
240 @Override
241 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
242 PiPipeconf pipeconf) {
243 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
244 "dumpGroups-" + actionProfileId.id());
245 }
246
Yi Tseng3e7f1452017-10-20 10:31:53 -0700247 @Override
248 public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
249 return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
250 }
FrankWang9ea72762018-04-17 15:36:49 +0800251
252 @Override
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900253 public CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
254
255 return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
256 "writeMeterCells");
257 }
258
259 @Override
260 public CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
261 PiPipeconf pipeconf) {
262 return supplyInContext(() -> doReadMeterCells(cellIds, pipeconf),
263 "readMeterCells-" + cellIds.hashCode());
264 }
265
266 @Override
267 public CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
268 PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700269 return supplyInContext(() -> doReadAllMeterCells(meterIds, pipeconf),
270 "readAllMeterCells-" + meterIds.hashCode());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900271 }
Yi Tseng3e7f1452017-10-20 10:31:53 -0700272
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400273 /* Blocking method implementations below */
274
Yi Tseng3e7f1452017-10-20 10:31:53 -0700275 private boolean doArbitrationUpdate() {
Andrea Campanella1e573442018-05-17 17:07:13 +0200276
Yi Tseng3e7f1452017-10-20 10:31:53 -0700277 CompletableFuture<Boolean> result = new CompletableFuture<>();
278 // TODO: currently we use 64-bit Long type for election id, should
279 // we use 128-bit ?
280 long nextElectId = controller.getNewMasterElectionId();
281 Uint128 newElectionId = Uint128.newBuilder()
282 .setLow(nextElectId)
283 .build();
284 MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
285 .setDeviceId(p4DeviceId)
286 .setElectionId(newElectionId)
287 .build();
288 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
289 .setArbitration(arbitrationUpdate)
290 .build();
291 log.debug("Sending arbitration update to {} with election id {}...",
292 deviceId, newElectionId);
293 arbitrationUpdateMap.put(newElectionId, result);
294 try {
295 streamRequestObserver.onNext(requestMsg);
296 return result.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800297 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800298 log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
Yi Tsenge67e1412018-01-31 17:35:20 -0800299 arbitrationUpdateMap.remove(newElectionId);
300 return false;
301 } catch (InterruptedException | ExecutionException e) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700302 log.warn("Arbitration update failed for {} due to {}", deviceId, e);
303 arbitrationUpdateMap.remove(newElectionId);
304 return false;
305 }
306 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400307 private boolean doInitStreamChannel() {
308 // To listen for packets and other events, we need to start the RPC.
309 // Here we do it by sending a master arbitration update.
Yi Tseng3e7f1452017-10-20 10:31:53 -0700310 return doArbitrationUpdate();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400311 }
312
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700313 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400314
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700315 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
316
317 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400318
319 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
320 if (p4Info == null) {
321 // Problem logged by PipeconfHelper.
322 return false;
323 }
324
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700325 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
326 .newBuilder()
327 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
328 .setReassign(true)
329 .setDeviceData(ByteString.copyFrom(deviceData))
330 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400331
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700332 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
Andrea Campanella0288c872017-08-07 18:32:51 +0200333 .newBuilder()
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700334 .setP4Info(p4Info)
335 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
336 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400337
338 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
339 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100340 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700341 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400342 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100343 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400344 .build();
345
346 try {
347 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700348 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400349 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800350 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400351 return false;
352 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400353 }
354
355 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
356 PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400357 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
358
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800359 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400360 return true;
361 }
362
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700363 Collection<Update> updateMsgs;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800364 try {
365 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
366 .stream()
367 .map(tableEntryMsg ->
368 Update.newBuilder()
369 .setEntity(Entity.newBuilder()
370 .setTableEntry(tableEntryMsg)
371 .build())
372 .setType(UPDATE_TYPES.get(opType))
373 .build())
374 .collect(Collectors.toList());
375 } catch (EncodeException e) {
376 log.error("Unable to encode table entries, aborting {} operation: {}",
377 opType.name(), e.getMessage());
378 return false;
379 }
380
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400381 writeRequestBuilder
382 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700383 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400384 .addAllUpdates(updateMsgs)
385 .build();
386
387 try {
388 blockingStub.write(writeRequestBuilder.build());
389 return true;
390 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800391 logWriteErrors(piTableEntries, e, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400392 return false;
393 }
394 }
395
396 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
397
Carmelo Cascone9f007702017-08-24 13:30:51 +0200398 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400399
400 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
401 int tableId;
402 try {
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200403 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400404 } catch (P4InfoBrowser.NotFoundException e) {
405 log.warn("Unable to dump table: {}", e.getMessage());
406 return Collections.emptyList();
407 }
408
409 ReadRequest requestMsg = ReadRequest.newBuilder()
410 .setDeviceId(p4DeviceId)
411 .addEntities(Entity.newBuilder()
412 .setTableEntry(TableEntry.newBuilder()
413 .setTableId(tableId)
414 .build())
415 .build())
416 .build();
417
418 Iterator<ReadResponse> responses;
419 try {
420 responses = blockingStub.read(requestMsg);
421 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800422 log.warn("Unable to dump table {} from {}: {}", piTableId, deviceId, e.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400423 return Collections.emptyList();
424 }
425
426 Iterable<ReadResponse> responseIterable = () -> responses;
427 List<TableEntry> tableEntryMsgs = StreamSupport
428 .stream(responseIterable.spliterator(), false)
429 .map(ReadResponse::getEntitiesList)
430 .flatMap(List::stream)
431 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
432 .map(Entity::getTableEntry)
433 .collect(Collectors.toList());
434
Carmelo Cascone9f007702017-08-24 13:30:51 +0200435 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400436
437 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
438 }
439
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200440 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
441 try {
442 //encode the PiPacketOperation into a PacketOut
443 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
444
445 //Build the request
446 StreamMessageRequest packetOutRequest = StreamMessageRequest
447 .newBuilder().setPacket(packetOut).build();
448
449 //Send the request
450 streamRequestObserver.onNext(packetOutRequest);
451
452 } catch (P4InfoBrowser.NotFoundException e) {
453 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
454 log.debug("Exception", e);
455 return false;
456 }
457 return true;
458 }
459
Carmelo Casconea966c342017-07-30 01:56:30 -0400460 private void doPacketIn(PacketIn packetInMsg) {
461
462 // Retrieve the pipeconf for this client's device.
463 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
464 if (pipeconfService == null) {
465 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
466 }
467 final PiPipeconf pipeconf;
468 if (pipeconfService.ofDevice(deviceId).isPresent() &&
469 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
470 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
471 } else {
472 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
473 return;
474 }
475 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800476 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200477 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
478 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400479 log.debug("Received packet in: {}", event);
480 controller.postEvent(event);
481 }
482
483 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700484 log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
Carmelo Casconea966c342017-07-30 01:56:30 -0400485
Yi Tseng3e7f1452017-10-20 10:31:53 -0700486 Uint128 electionId = arbitrationMsg.getElectionId();
487 CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
488
489 if (mastershipFeature == null) {
490 log.warn("Can't find completable future of election id {}", electionId);
491 return;
492 }
493
494 this.p4RuntimeElectionId = electionId;
495 int statusCode = arbitrationMsg.getStatus().getCode();
496 MastershipRole arbitrationRole;
497 // arbitration update success
498
499 if (statusCode == Status.OK.getCode().value()) {
500 mastershipFeature.complete(true);
501 arbitrationRole = MastershipRole.MASTER;
502 } else {
503 mastershipFeature.complete(false);
504 arbitrationRole = MastershipRole.STANDBY;
505 }
506
Andrea Campanella1e573442018-05-17 17:07:13 +0200507 DefaultArbitration arbitrationEventSubject = new DefaultArbitration(deviceId, arbitrationRole, electionId);
Yi Tseng3e7f1452017-10-20 10:31:53 -0700508 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
509 arbitrationEventSubject);
510 controller.postEvent(event);
Carmelo Casconea966c342017-07-30 01:56:30 -0400511 }
512
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700513 private Collection<PiCounterCellData> doReadAllCounterCells(
514 Collection<PiCounterId> counterIds, PiPipeconf pipeconf) {
515 return doReadCounterEntities(
516 CounterEntryCodec.readAllCellsEntities(counterIds, pipeconf),
517 pipeconf);
518 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200519
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700520 private Collection<PiCounterCellData> doReadCounterCells(
521 Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
522 return doReadCounterEntities(
523 CounterEntryCodec.encodePiCounterCellIds(cellIds, pipeconf),
524 pipeconf);
525 }
526
527 private Collection<PiCounterCellData> doReadCounterEntities(
528 Collection<Entity> counterEntities, PiPipeconf pipeconf) {
529
530 if (counterEntities.size() == 0) {
531 return Collections.emptyList();
532 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200533
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200534 final ReadRequest request = ReadRequest.newBuilder()
535 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700536 .addAllEntities(counterEntities)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200537 .build();
538
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200539 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200540 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200541 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200542 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800543 log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200544 return Collections.emptyList();
545 }
546
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200547 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200548 .map(ReadResponse::getEntitiesList)
549 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200550 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200551
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700552 return CounterEntryCodec.decodeCounterEntities(entities, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200553 }
554
Yi Tseng8d355132018-04-13 01:40:48 +0800555 private boolean doWriteActionGroupMembers(PiActionProfileId profileId, Collection<PiActionGroupMember> members,
556 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200557 final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800558
Yi Tseng8d355132018-04-13 01:40:48 +0800559 for (PiActionGroupMember member : members) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800560 try {
Yi Tseng8d355132018-04-13 01:40:48 +0800561 actionProfileMembers.add(ActionProfileMemberEncoder.encode(profileId, member, pipeconf));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800562 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
563 log.warn("Unable to encode group member, aborting {} operation: {} [{}]",
564 opType.name(), e.getMessage(), member.toString());
565 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700566 }
Yi Tseng82512da2017-08-16 19:46:36 -0700567 }
568
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200569 final Collection<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700570 .map(actionProfileMember ->
571 Update.newBuilder()
572 .setEntity(Entity.newBuilder()
573 .setActionProfileMember(actionProfileMember)
574 .build())
575 .setType(UPDATE_TYPES.get(opType))
576 .build())
577 .collect(Collectors.toList());
578
579 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200580 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700581 return true;
582 }
583
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200584 WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700585 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700586 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200587 .addAllUpdates(updateMsgs)
588 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700589 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200590 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700591 return true;
592 } catch (StatusRuntimeException e) {
Yi Tseng8d355132018-04-13 01:40:48 +0800593 logWriteErrors(members, e, opType, "group member");
Yi Tseng82512da2017-08-16 19:46:36 -0700594 return false;
595 }
596 }
597
598 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
599 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
600 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200601
602 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700603 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200604 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700605 return Collections.emptySet();
606 }
607
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200608 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700609 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200610 actionProfileId = browser
611 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200612 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200613 .getPreamble()
614 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700615 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200616 log.warn("Unable to dump groups: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700617 return Collections.emptySet();
618 }
619
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200620 // Prepare read request to read all groups from the given action profile.
621 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700622 .setDeviceId(p4DeviceId)
623 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200624 .setActionProfileGroup(
625 ActionProfileGroup.newBuilder()
626 .setActionProfileId(actionProfileId)
627 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700628 .build())
629 .build();
630
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200631 // Read groups.
632 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700633 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200634 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700635 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800636 log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700637 return Collections.emptySet();
638 }
639
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200640 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
641 .map(ReadResponse::getEntitiesList)
642 .flatMap(List::stream)
643 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
644 .map(Entity::getActionProfileGroup)
645 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700646
647 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200648 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700649
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200650 // Returned groups contain only a minimal description of their members.
651 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700652
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200653 // Keep a map of all member IDs for each group ID, will need it later.
654 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
655 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
656 g.getGroupId(),
657 g.getMembersList().stream()
658 .map(ActionProfileGroup.Member::getMemberId)
659 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700660
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200661 // Prepare one big read request to read all members in one shot.
662 final Set<Entity> entityMsgs = groupMsgs.stream()
663 .flatMap(g -> g.getMembersList().stream())
664 .map(ActionProfileGroup.Member::getMemberId)
665 // Prevent issuing many read requests for the same member.
666 .distinct()
667 .map(id -> ActionProfileMember.newBuilder()
668 .setActionProfileId(actionProfileId)
669 .setMemberId(id)
670 .build())
671 .map(m -> Entity.newBuilder()
672 .setActionProfileMember(m)
673 .build())
674 .collect(Collectors.toSet());
675 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
676 .addAllEntities(entityMsgs)
677 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700678
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200679 // Read members.
680 final Iterator<ReadResponse> memberResponses;
681 try {
682 memberResponses = blockingStub.read(memberRequestMsg);
683 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800684 log.warn("Unable to read members of action profile {} from {}: {}",
685 piActionProfileId, deviceId, e.getMessage());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200686 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700687 }
688
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200689 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
690 Tools.stream(() -> memberResponses)
691 .map(ReadResponse::getEntitiesList)
692 .flatMap(List::stream)
693 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
694 .map(Entity::getActionProfileMember)
695 .forEach(member -> groupIdToMemberIdsMap.asMap()
696 // Get all group IDs that contain this member.
697 .entrySet()
698 .stream()
699 .filter(entry -> entry.getValue().contains(member.getMemberId()))
700 .map(Map.Entry::getKey)
701 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
702
703 log.debug("Retrieved {} group members from action profile {} on {}...",
704 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
705
706 return groupMsgs.stream()
707 .map(groupMsg -> {
708 try {
709 return ActionProfileGroupEncoder.decode(groupMsg,
710 groupIdToMembersMap.get(groupMsg.getGroupId()),
711 pipeconf);
712 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
713 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
714 return null;
715 }
716 })
717 .filter(Objects::nonNull)
718 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700719 }
720
721 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200722 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700723 try {
724 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
725 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800726 log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700727 return false;
728 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200729
730 final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700731 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700732 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200733 .addUpdates(Update.newBuilder()
734 .setEntity(Entity.newBuilder()
735 .setActionProfileGroup(actionProfileGroup)
736 .build())
737 .setType(UPDATE_TYPES.get(opType))
738 .build())
739 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700740 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200741 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700742 return true;
743 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800744 logWriteErrors(Collections.singleton(group), e, opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700745 return false;
746 }
747 }
748
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700749 private Collection<PiMeterCellConfig> doReadAllMeterCells(
750 Collection<PiMeterId> meterIds, PiPipeconf pipeconf) {
751 return doReadMeterEntities(MeterEntryCodec.readAllCellsEntities(
752 meterIds, pipeconf), pipeconf);
753 }
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900754
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700755 private Collection<PiMeterCellConfig> doReadMeterCells(
756 Collection<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
757
758 final Collection<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900759 .map(cellId -> PiMeterCellConfig.builder()
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700760 .withMeterCellId(cellId)
761 .build())
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900762 .collect(Collectors.toList());
763
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700764 return doReadMeterEntities(MeterEntryCodec.encodePiMeterCellConfigs(
765 piMeterCellConfigs, pipeconf), pipeconf);
766 }
767
768 private Collection<PiMeterCellConfig> doReadMeterEntities(
769 Collection<Entity> entitiesToRead, PiPipeconf pipeconf) {
770
771 if (entitiesToRead.size() == 0) {
772 return Collections.emptyList();
773 }
774
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900775 final ReadRequest request = ReadRequest.newBuilder()
776 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700777 .addAllEntities(entitiesToRead)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900778 .build();
779
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900780 final Iterable<ReadResponse> responses;
781 try {
782 responses = () -> blockingStub.read(request);
783 } catch (StatusRuntimeException e) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700784 log.warn("Unable to read meter cells: {}", e.getMessage());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900785 log.debug("exception", e);
786 return Collections.emptyList();
787 }
788
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700789 List<Entity> responseEntities = StreamSupport
790 .stream(responses.spliterator(), false)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900791 .map(ReadResponse::getEntitiesList)
792 .flatMap(List::stream)
793 .collect(Collectors.toList());
794
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700795 return MeterEntryCodec.decodeMeterEntities(responseEntities, pipeconf);
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900796 }
797
798 private boolean doWriteMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
799
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900800 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
801
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700802 Collection<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellIds, pipeconf)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900803 .stream()
804 .map(meterEntryMsg ->
805 Update.newBuilder()
806 .setEntity(meterEntryMsg)
807 .setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
808 .build())
809 .collect(Collectors.toList());
810
811 if (updateMsgs.size() == 0) {
812 return true;
813 }
814
815 writeRequestBuilder
816 .setDeviceId(p4DeviceId)
817 .setElectionId(p4RuntimeElectionId)
818 .addAllUpdates(updateMsgs)
819 .build();
820 try {
821 blockingStub.write(writeRequestBuilder.build());
822 return true;
823 } catch (StatusRuntimeException e) {
824 log.warn("Unable to write meter entries : {}", e.getMessage());
825 log.debug("exception", e);
826 return false;
827 }
828 }
829
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400830 /**
831 * Returns the internal P4 device ID associated with this client.
832 *
833 * @return P4 device ID
834 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200835 public long p4DeviceId() {
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400836 return p4DeviceId;
837 }
838
839 /**
840 * For testing purpose only. TODO: remove before release.
841 *
842 * @return blocking stub
843 */
844 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
845 return this.blockingStub;
846 }
847
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200848
Andrea Campanella432f7182017-07-14 18:43:27 +0200849 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400850 public void shutdown() {
851
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400852 log.info("Shutting down client for {}...", deviceId);
853
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400854 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400855 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400856 if (streamRequestObserver != null) {
857 streamRequestObserver.onCompleted();
858 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
859 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400860
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400861 this.executorService.shutdown();
862 try {
863 executorService.awaitTermination(5, TimeUnit.SECONDS);
864 } catch (InterruptedException e) {
865 log.warn("Executor service didn't shutdown in time.");
Ray Milkey5c7d4882018-02-05 14:50:39 -0800866 Thread.currentThread().interrupt();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400867 }
868 } finally {
869 writeLock.unlock();
870 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400871 }
872
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800873 private <E extends PiEntity> void logWriteErrors(Collection<E> writeEntities,
874 StatusRuntimeException ex,
875 WriteOperationType opType,
876 String entryType) {
877 List<P4RuntimeOuterClass.Error> errors = null;
878 String description = null;
879 try {
880 errors = extractWriteErrorDetails(ex);
881 } catch (InvalidProtocolBufferException e) {
882 description = ex.getStatus().getDescription();
883 }
884
885 log.warn("Unable to {} {} {}(s) on {}: {}{} (detailed errors might be logged below)",
886 opType.name(), writeEntities.size(), entryType, deviceId,
887 ex.getStatus().getCode().name(),
888 description == null ? "" : " - " + description);
889
890 if (errors == null || errors.isEmpty()) {
891 return;
892 }
893
894 // FIXME: we are assuming entities is an ordered collection, e.g. a list,
895 // and that errors are reported in the same order as the corresponding
896 // written entity. Write RPC methods should be refactored to accept an
897 // order list of entities, instead of a collection.
898 if (errors.size() == writeEntities.size()) {
899 Iterator<E> entityIterator = writeEntities.iterator();
900 errors.stream()
901 .map(e -> ImmutablePair.of(e, entityIterator.next()))
902 .filter(p -> p.left.getCanonicalCode() != Status.OK.getCode().value())
903 .forEach(p -> log.warn("Unable to {} {}: {} [{}]",
904 opType.name(), entryType, parseP4Error(p.getLeft()),
905 p.getRight().toString()));
906 } else {
907 log.error("Unable to reconcile error details to updates " +
908 "(sent {} updates, but device returned {} errors)",
909 entryType, writeEntities.size(), errors.size());
910 errors.stream()
911 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
912 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
913 opType.name(), entryType, parseP4Error(err)));
914 }
915 }
916
917 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
918 StatusRuntimeException ex) throws InvalidProtocolBufferException {
919 String statusString = ex.getStatus().getDescription();
920 if (statusString == null) {
921 return Collections.emptyList();
922 }
923 com.google.rpc.Status status = com.google.rpc.Status
924 .parseFrom(statusString.getBytes());
925 return status.getDetailsList().stream()
926 .map(any -> {
927 try {
928 return any.unpack(P4RuntimeOuterClass.Error.class);
929 } catch (InvalidProtocolBufferException e) {
930 log.warn("Unable to unpack P4Runtime Error: {}",
931 any.toString());
932 return null;
933 }
934 })
935 .filter(Objects::nonNull)
936 .collect(Collectors.toList());
937
938 }
939
940 private String parseP4Error(P4RuntimeOuterClass.Error err) {
941 return format("%s %s (%s code %d)%s",
942 Status.fromCodeValue(err.getCanonicalCode()),
943 err.getMessage(),
944 err.getSpace(),
945 err.getCode(),
946 err.hasDetails() ? "\n" + err.getDetails().toString() : "");
947 }
948
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400949 /**
950 * Handles messages received from the device on the stream channel.
951 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400952 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
953
954 @Override
955 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400956 executorService.submit(() -> doNext(message));
957 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400958
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400959 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400960 try {
Andrea Campanella0288c872017-08-07 18:32:51 +0200961 log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -0400962 switch (message.getUpdateCase()) {
963 case PACKET:
964 // Packet-in
965 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200966 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400967 case ARBITRATION:
968 doArbitrationUpdateFromDevice(message.getArbitration());
969 return;
970 default:
971 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
972 }
973 } catch (Throwable ex) {
974 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400975 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400976 }
977
978 @Override
979 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400980 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
Andrea Campanella1e573442018-05-17 17:07:13 +0200981 controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
982 new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_ERROR,
983 throwable)));
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400984 // FIXME: we might want to recreate the channel.
985 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
986 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400987 }
988
989 @Override
990 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400991 log.warn("Stream channel for {} has completed", deviceId);
Andrea Campanella1e573442018-05-17 17:07:13 +0200992 controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
993 new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_DISCONNECTED,
994 "Stream channel has completed")));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400995 }
996 }
Carmelo Cascone87892e22017-11-13 16:01:29 -0800997}