blob: cd988fd0547b834d806d45921c8bb974837b56cd [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020022import com.google.common.collect.Maps;
Yi Tseng82512da2017-08-16 19:46:36 -070023import com.google.common.collect.Multimap;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040024import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080025import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040026import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040027import io.grpc.ManagedChannel;
28import io.grpc.Status;
29import io.grpc.StatusRuntimeException;
30import io.grpc.stub.StreamObserver;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080031import org.apache.commons.lang3.tuple.ImmutablePair;
Andrea Campanella288b2732017-07-28 14:16:16 +020032import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070033import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040034import org.onosproject.net.DeviceId;
Yi Tseng3e7f1452017-10-20 10:31:53 -070035import org.onosproject.net.MastershipRole;
Carmelo Cascone87892e22017-11-13 16:01:29 -080036import org.onosproject.net.pi.model.PiActionProfileId;
37import org.onosproject.net.pi.model.PiCounterId;
Carmelo Cascone81929aa2018-04-07 01:38:55 -070038import org.onosproject.net.pi.model.PiMeterId;
Andrea Campanella432f7182017-07-14 18:43:27 +020039import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080040import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020041import org.onosproject.net.pi.runtime.PiActionGroup;
42import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020043import org.onosproject.net.pi.runtime.PiCounterCellData;
44import org.onosproject.net.pi.runtime.PiCounterCellId;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080045import org.onosproject.net.pi.runtime.PiEntity;
Frank Wangd7e3b4b2017-09-24 13:37:54 +090046import org.onosproject.net.pi.runtime.PiMeterCellConfig;
47import org.onosproject.net.pi.runtime.PiMeterCellId;
Andrea Campanella432f7182017-07-14 18:43:27 +020048import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040049import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080050import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040051import org.onosproject.p4runtime.api.P4RuntimeClient;
52import org.onosproject.p4runtime.api.P4RuntimeEvent;
53import org.slf4j.Logger;
54import p4.P4RuntimeGrpc;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080055import p4.P4RuntimeOuterClass;
Yi Tseng82512da2017-08-16 19:46:36 -070056import p4.P4RuntimeOuterClass.ActionProfileGroup;
57import p4.P4RuntimeOuterClass.ActionProfileMember;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040058import p4.P4RuntimeOuterClass.Entity;
59import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
60import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
61import p4.P4RuntimeOuterClass.PacketIn;
62import p4.P4RuntimeOuterClass.ReadRequest;
63import p4.P4RuntimeOuterClass.ReadResponse;
64import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
65import p4.P4RuntimeOuterClass.StreamMessageRequest;
66import p4.P4RuntimeOuterClass.StreamMessageResponse;
67import p4.P4RuntimeOuterClass.TableEntry;
Yi Tseng3e7f1452017-10-20 10:31:53 -070068import p4.P4RuntimeOuterClass.Uint128;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040069import p4.P4RuntimeOuterClass.Update;
70import p4.P4RuntimeOuterClass.WriteRequest;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020071import p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040072import p4.tmp.P4Config;
73
Carmelo Casconed61fdb32017-10-30 10:09:57 -070074import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040075import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040076import java.util.Collections;
77import java.util.Iterator;
78import java.util.List;
79import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020080import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020081import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040082import java.util.concurrent.CompletableFuture;
Yi Tseng3e7f1452017-10-20 10:31:53 -070083import java.util.concurrent.ExecutionException;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040084import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040085import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040086import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040087import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040088import java.util.concurrent.locks.Lock;
89import java.util.concurrent.locks.ReentrantLock;
90import java.util.function.Supplier;
91import java.util.stream.Collectors;
92import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040093
Carmelo Casconed61fdb32017-10-30 10:09:57 -070094import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080095import static java.lang.String.format;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040096import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040097import static org.slf4j.LoggerFactory.getLogger;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070098import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
99import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
100import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200101import static p4.P4RuntimeOuterClass.PacketOut;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400102import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
103
104/**
105 * Implementation of a P4Runtime client.
106 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400107public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400108
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400109 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
110 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
111 WriteOperationType.INSERT, Update.Type.INSERT,
112 WriteOperationType.MODIFY, Update.Type.MODIFY,
113 WriteOperationType.DELETE, Update.Type.DELETE
114 );
115
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400116 private final Logger log = getLogger(getClass());
117
118 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200119 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400120 private final P4RuntimeControllerImpl controller;
121 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400122 private final Context.CancellableContext cancellableContext;
123 private final ExecutorService executorService;
124 private final Executor contextExecutor;
125 private final Lock writeLock = new ReentrantLock();
126 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400127
Yi Tseng3e7f1452017-10-20 10:31:53 -0700128 private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
129 protected Uint128 p4RuntimeElectionId;
130
Yi Tseng82512da2017-08-16 19:46:36 -0700131 /**
132 * Default constructor.
133 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200134 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700135 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200136 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700137 * @param controller runtime client controller
138 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200139 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
140 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400141 this.deviceId = deviceId;
142 this.p4DeviceId = p4DeviceId;
143 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400144 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400145 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400146 "onos/p4runtime-client-" + deviceId.toString(),
147 deviceId.toString() + "-%d"));
148 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200149 //TODO Investigate deadline or timeout in supplyInContext Method
150 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400151 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
152 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
153 }
154
155 /**
156 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
157 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
158 * <p>
159 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
160 * <p>
161 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200162 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400163 return CompletableFuture.supplyAsync(() -> {
164 // TODO: explore a more relaxed locking strategy.
165 writeLock.lock();
166 try {
167 return supplier.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800168 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800169 log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
Yi Tsenge67e1412018-01-31 17:35:20 -0800170 throw ex;
Carmelo Casconea966c342017-07-30 01:56:30 -0400171 } catch (Throwable ex) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800172 log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400173 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400174 } finally {
175 writeLock.unlock();
176 }
177 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400178 }
179
180 @Override
181 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200182 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400183 }
184
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400185 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700186 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
187 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400188 }
189
190 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400191 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
192 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200193 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
194 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400195 }
196
197 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400198 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200199 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400200 }
201
202 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200203 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200204 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200205 }
206
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200207 @Override
208 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
209 PiPipeconf pipeconf) {
210 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
211 "readCounterCells-" + cellIds.hashCode());
212 }
213
214 @Override
215 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
216 PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700217 return supplyInContext(() -> doReadAllCounterCells(counterIds, pipeconf),
218 "readAllCounterCells-" + counterIds.hashCode());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200219 }
220
Yi Tseng82512da2017-08-16 19:46:36 -0700221 @Override
222 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionGroup group,
Yi Tseng82512da2017-08-16 19:46:36 -0700223 WriteOperationType opType,
224 PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200225 return supplyInContext(() -> doWriteActionGroupMembers(group, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700226 "writeActionGroupMembers-" + opType.name());
227 }
228
229 @Override
230 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
231 WriteOperationType opType,
232 PiPipeconf pipeconf) {
233 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
234 "writeActionGroup-" + opType.name());
235 }
236
237 @Override
238 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
239 PiPipeconf pipeconf) {
240 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
241 "dumpGroups-" + actionProfileId.id());
242 }
243
Yi Tseng3e7f1452017-10-20 10:31:53 -0700244 @Override
245 public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
246 return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
247 }
FrankWang616d9342018-04-17 15:36:49 +0800248
249 @Override
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900250 public CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
251
252 return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
253 "writeMeterCells");
254 }
255
256 @Override
257 public CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
258 PiPipeconf pipeconf) {
259 return supplyInContext(() -> doReadMeterCells(cellIds, pipeconf),
260 "readMeterCells-" + cellIds.hashCode());
261 }
262
263 @Override
264 public CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
265 PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700266 return supplyInContext(() -> doReadAllMeterCells(meterIds, pipeconf),
267 "readAllMeterCells-" + meterIds.hashCode());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900268 }
Yi Tseng3e7f1452017-10-20 10:31:53 -0700269
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400270 /* Blocking method implementations below */
271
Yi Tseng3e7f1452017-10-20 10:31:53 -0700272 private boolean doArbitrationUpdate() {
273 CompletableFuture<Boolean> result = new CompletableFuture<>();
274 // TODO: currently we use 64-bit Long type for election id, should
275 // we use 128-bit ?
276 long nextElectId = controller.getNewMasterElectionId();
277 Uint128 newElectionId = Uint128.newBuilder()
278 .setLow(nextElectId)
279 .build();
280 MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
281 .setDeviceId(p4DeviceId)
282 .setElectionId(newElectionId)
283 .build();
284 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
285 .setArbitration(arbitrationUpdate)
286 .build();
287 log.debug("Sending arbitration update to {} with election id {}...",
288 deviceId, newElectionId);
289 arbitrationUpdateMap.put(newElectionId, result);
290 try {
291 streamRequestObserver.onNext(requestMsg);
292 return result.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800293 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800294 log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
Yi Tsenge67e1412018-01-31 17:35:20 -0800295 arbitrationUpdateMap.remove(newElectionId);
296 return false;
297 } catch (InterruptedException | ExecutionException e) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700298 log.warn("Arbitration update failed for {} due to {}", deviceId, e);
299 arbitrationUpdateMap.remove(newElectionId);
300 return false;
301 }
302 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400303 private boolean doInitStreamChannel() {
304 // To listen for packets and other events, we need to start the RPC.
305 // Here we do it by sending a master arbitration update.
Yi Tseng3e7f1452017-10-20 10:31:53 -0700306 return doArbitrationUpdate();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400307 }
308
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700309 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400310
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700311 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
312
313 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400314
315 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
316 if (p4Info == null) {
317 // Problem logged by PipeconfHelper.
318 return false;
319 }
320
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700321 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
322 .newBuilder()
323 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
324 .setReassign(true)
325 .setDeviceData(ByteString.copyFrom(deviceData))
326 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400327
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700328 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
Andrea Campanella0288c872017-08-07 18:32:51 +0200329 .newBuilder()
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700330 .setP4Info(p4Info)
331 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
332 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400333
334 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
335 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100336 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700337 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400338 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100339 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400340 .build();
341
342 try {
343 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700344 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400345 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800346 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400347 return false;
348 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400349 }
350
351 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
352 PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400353 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
354
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800355 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400356 return true;
357 }
358
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700359 Collection<Update> updateMsgs;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800360 try {
361 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
362 .stream()
363 .map(tableEntryMsg ->
364 Update.newBuilder()
365 .setEntity(Entity.newBuilder()
366 .setTableEntry(tableEntryMsg)
367 .build())
368 .setType(UPDATE_TYPES.get(opType))
369 .build())
370 .collect(Collectors.toList());
371 } catch (EncodeException e) {
372 log.error("Unable to encode table entries, aborting {} operation: {}",
373 opType.name(), e.getMessage());
374 return false;
375 }
376
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400377 writeRequestBuilder
378 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700379 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400380 .addAllUpdates(updateMsgs)
381 .build();
382
383 try {
384 blockingStub.write(writeRequestBuilder.build());
385 return true;
386 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800387 logWriteErrors(piTableEntries, e, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400388 return false;
389 }
390 }
391
392 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
393
Carmelo Cascone9f007702017-08-24 13:30:51 +0200394 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400395
396 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
397 int tableId;
398 try {
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200399 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400400 } catch (P4InfoBrowser.NotFoundException e) {
401 log.warn("Unable to dump table: {}", e.getMessage());
402 return Collections.emptyList();
403 }
404
405 ReadRequest requestMsg = ReadRequest.newBuilder()
406 .setDeviceId(p4DeviceId)
407 .addEntities(Entity.newBuilder()
408 .setTableEntry(TableEntry.newBuilder()
409 .setTableId(tableId)
410 .build())
411 .build())
412 .build();
413
414 Iterator<ReadResponse> responses;
415 try {
416 responses = blockingStub.read(requestMsg);
417 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800418 log.warn("Unable to dump table {} from {}: {}", piTableId, deviceId, e.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400419 return Collections.emptyList();
420 }
421
422 Iterable<ReadResponse> responseIterable = () -> responses;
423 List<TableEntry> tableEntryMsgs = StreamSupport
424 .stream(responseIterable.spliterator(), false)
425 .map(ReadResponse::getEntitiesList)
426 .flatMap(List::stream)
427 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
428 .map(Entity::getTableEntry)
429 .collect(Collectors.toList());
430
Carmelo Cascone9f007702017-08-24 13:30:51 +0200431 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400432
433 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
434 }
435
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200436 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
437 try {
438 //encode the PiPacketOperation into a PacketOut
439 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
440
441 //Build the request
442 StreamMessageRequest packetOutRequest = StreamMessageRequest
443 .newBuilder().setPacket(packetOut).build();
444
445 //Send the request
446 streamRequestObserver.onNext(packetOutRequest);
447
448 } catch (P4InfoBrowser.NotFoundException e) {
449 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
450 log.debug("Exception", e);
451 return false;
452 }
453 return true;
454 }
455
Carmelo Casconea966c342017-07-30 01:56:30 -0400456 private void doPacketIn(PacketIn packetInMsg) {
457
458 // Retrieve the pipeconf for this client's device.
459 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
460 if (pipeconfService == null) {
461 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
462 }
463 final PiPipeconf pipeconf;
464 if (pipeconfService.ofDevice(deviceId).isPresent() &&
465 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
466 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
467 } else {
468 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
469 return;
470 }
471 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800472 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200473 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
474 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400475 log.debug("Received packet in: {}", event);
476 controller.postEvent(event);
477 }
478
479 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700480 log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
Carmelo Casconea966c342017-07-30 01:56:30 -0400481
Yi Tseng3e7f1452017-10-20 10:31:53 -0700482 Uint128 electionId = arbitrationMsg.getElectionId();
483 CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
484
485 if (mastershipFeature == null) {
486 log.warn("Can't find completable future of election id {}", electionId);
487 return;
488 }
489
490 this.p4RuntimeElectionId = electionId;
491 int statusCode = arbitrationMsg.getStatus().getCode();
492 MastershipRole arbitrationRole;
493 // arbitration update success
494
495 if (statusCode == Status.OK.getCode().value()) {
496 mastershipFeature.complete(true);
497 arbitrationRole = MastershipRole.MASTER;
498 } else {
499 mastershipFeature.complete(false);
500 arbitrationRole = MastershipRole.STANDBY;
501 }
502
503 DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
504 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
505 arbitrationEventSubject);
506 controller.postEvent(event);
Carmelo Casconea966c342017-07-30 01:56:30 -0400507 }
508
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700509 private Collection<PiCounterCellData> doReadAllCounterCells(
510 Collection<PiCounterId> counterIds, PiPipeconf pipeconf) {
511 return doReadCounterEntities(
512 CounterEntryCodec.readAllCellsEntities(counterIds, pipeconf),
513 pipeconf);
514 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200515
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700516 private Collection<PiCounterCellData> doReadCounterCells(
517 Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
518 return doReadCounterEntities(
519 CounterEntryCodec.encodePiCounterCellIds(cellIds, pipeconf),
520 pipeconf);
521 }
522
523 private Collection<PiCounterCellData> doReadCounterEntities(
524 Collection<Entity> counterEntities, PiPipeconf pipeconf) {
525
526 if (counterEntities.size() == 0) {
527 return Collections.emptyList();
528 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200529
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200530 final ReadRequest request = ReadRequest.newBuilder()
531 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700532 .addAllEntities(counterEntities)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200533 .build();
534
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200535 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200536 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200537 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200538 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800539 log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200540 return Collections.emptyList();
541 }
542
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200543 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200544 .map(ReadResponse::getEntitiesList)
545 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200546 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200547
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700548 return CounterEntryCodec.decodeCounterEntities(entities, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200549 }
550
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200551 private boolean doWriteActionGroupMembers(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200552 final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800553
554 for (PiActionGroupMember member : group.members()) {
555 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200556 actionProfileMembers.add(ActionProfileMemberEncoder.encode(group, member, pipeconf));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800557 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
558 log.warn("Unable to encode group member, aborting {} operation: {} [{}]",
559 opType.name(), e.getMessage(), member.toString());
560 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700561 }
Yi Tseng82512da2017-08-16 19:46:36 -0700562 }
563
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200564 final Collection<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700565 .map(actionProfileMember ->
566 Update.newBuilder()
567 .setEntity(Entity.newBuilder()
568 .setActionProfileMember(actionProfileMember)
569 .build())
570 .setType(UPDATE_TYPES.get(opType))
571 .build())
572 .collect(Collectors.toList());
573
574 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200575 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700576 return true;
577 }
578
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200579 WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700580 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700581 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200582 .addAllUpdates(updateMsgs)
583 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700584 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200585 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700586 return true;
587 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800588 logWriteErrors(group.members(), e, opType, "group member");
Yi Tseng82512da2017-08-16 19:46:36 -0700589 return false;
590 }
591 }
592
593 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
594 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
595 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200596
597 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700598 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200599 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700600 return Collections.emptySet();
601 }
602
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200603 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700604 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200605 actionProfileId = browser
606 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200607 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200608 .getPreamble()
609 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700610 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200611 log.warn("Unable to dump groups: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700612 return Collections.emptySet();
613 }
614
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200615 // Prepare read request to read all groups from the given action profile.
616 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700617 .setDeviceId(p4DeviceId)
618 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200619 .setActionProfileGroup(
620 ActionProfileGroup.newBuilder()
621 .setActionProfileId(actionProfileId)
622 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700623 .build())
624 .build();
625
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200626 // Read groups.
627 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700628 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200629 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700630 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800631 log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700632 return Collections.emptySet();
633 }
634
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200635 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
636 .map(ReadResponse::getEntitiesList)
637 .flatMap(List::stream)
638 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
639 .map(Entity::getActionProfileGroup)
640 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700641
642 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200643 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700644
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200645 // Returned groups contain only a minimal description of their members.
646 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700647
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200648 // Keep a map of all member IDs for each group ID, will need it later.
649 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
650 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
651 g.getGroupId(),
652 g.getMembersList().stream()
653 .map(ActionProfileGroup.Member::getMemberId)
654 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700655
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200656 // Prepare one big read request to read all members in one shot.
657 final Set<Entity> entityMsgs = groupMsgs.stream()
658 .flatMap(g -> g.getMembersList().stream())
659 .map(ActionProfileGroup.Member::getMemberId)
660 // Prevent issuing many read requests for the same member.
661 .distinct()
662 .map(id -> ActionProfileMember.newBuilder()
663 .setActionProfileId(actionProfileId)
664 .setMemberId(id)
665 .build())
666 .map(m -> Entity.newBuilder()
667 .setActionProfileMember(m)
668 .build())
669 .collect(Collectors.toSet());
670 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
671 .addAllEntities(entityMsgs)
672 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700673
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200674 // Read members.
675 final Iterator<ReadResponse> memberResponses;
676 try {
677 memberResponses = blockingStub.read(memberRequestMsg);
678 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800679 log.warn("Unable to read members of action profile {} from {}: {}",
680 piActionProfileId, deviceId, e.getMessage());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200681 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700682 }
683
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200684 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
685 Tools.stream(() -> memberResponses)
686 .map(ReadResponse::getEntitiesList)
687 .flatMap(List::stream)
688 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
689 .map(Entity::getActionProfileMember)
690 .forEach(member -> groupIdToMemberIdsMap.asMap()
691 // Get all group IDs that contain this member.
692 .entrySet()
693 .stream()
694 .filter(entry -> entry.getValue().contains(member.getMemberId()))
695 .map(Map.Entry::getKey)
696 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
697
698 log.debug("Retrieved {} group members from action profile {} on {}...",
699 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
700
701 return groupMsgs.stream()
702 .map(groupMsg -> {
703 try {
704 return ActionProfileGroupEncoder.decode(groupMsg,
705 groupIdToMembersMap.get(groupMsg.getGroupId()),
706 pipeconf);
707 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
708 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
709 return null;
710 }
711 })
712 .filter(Objects::nonNull)
713 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700714 }
715
716 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200717 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700718 try {
719 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
720 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800721 log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700722 return false;
723 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200724
725 final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700726 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700727 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200728 .addUpdates(Update.newBuilder()
729 .setEntity(Entity.newBuilder()
730 .setActionProfileGroup(actionProfileGroup)
731 .build())
732 .setType(UPDATE_TYPES.get(opType))
733 .build())
734 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700735 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200736 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700737 return true;
738 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800739 logWriteErrors(Collections.singleton(group), e, opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700740 return false;
741 }
742 }
743
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700744 private Collection<PiMeterCellConfig> doReadAllMeterCells(
745 Collection<PiMeterId> meterIds, PiPipeconf pipeconf) {
746 return doReadMeterEntities(MeterEntryCodec.readAllCellsEntities(
747 meterIds, pipeconf), pipeconf);
748 }
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900749
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700750 private Collection<PiMeterCellConfig> doReadMeterCells(
751 Collection<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
752
753 final Collection<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900754 .map(cellId -> PiMeterCellConfig.builder()
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700755 .withMeterCellId(cellId)
756 .build())
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900757 .collect(Collectors.toList());
758
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700759 return doReadMeterEntities(MeterEntryCodec.encodePiMeterCellConfigs(
760 piMeterCellConfigs, pipeconf), pipeconf);
761 }
762
763 private Collection<PiMeterCellConfig> doReadMeterEntities(
764 Collection<Entity> entitiesToRead, PiPipeconf pipeconf) {
765
766 if (entitiesToRead.size() == 0) {
767 return Collections.emptyList();
768 }
769
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900770 final ReadRequest request = ReadRequest.newBuilder()
771 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700772 .addAllEntities(entitiesToRead)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900773 .build();
774
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900775 final Iterable<ReadResponse> responses;
776 try {
777 responses = () -> blockingStub.read(request);
778 } catch (StatusRuntimeException e) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700779 log.warn("Unable to read meter cells: {}", e.getMessage());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900780 log.debug("exception", e);
781 return Collections.emptyList();
782 }
783
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700784 List<Entity> responseEntities = StreamSupport
785 .stream(responses.spliterator(), false)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900786 .map(ReadResponse::getEntitiesList)
787 .flatMap(List::stream)
788 .collect(Collectors.toList());
789
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700790 return MeterEntryCodec.decodeMeterEntities(responseEntities, pipeconf);
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900791 }
792
793 private boolean doWriteMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
794
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900795 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
796
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700797 Collection<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellIds, pipeconf)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900798 .stream()
799 .map(meterEntryMsg ->
800 Update.newBuilder()
801 .setEntity(meterEntryMsg)
802 .setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
803 .build())
804 .collect(Collectors.toList());
805
806 if (updateMsgs.size() == 0) {
807 return true;
808 }
809
810 writeRequestBuilder
811 .setDeviceId(p4DeviceId)
812 .setElectionId(p4RuntimeElectionId)
813 .addAllUpdates(updateMsgs)
814 .build();
815 try {
816 blockingStub.write(writeRequestBuilder.build());
817 return true;
818 } catch (StatusRuntimeException e) {
819 log.warn("Unable to write meter entries : {}", e.getMessage());
820 log.debug("exception", e);
821 return false;
822 }
823 }
824
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400825 /**
826 * Returns the internal P4 device ID associated with this client.
827 *
828 * @return P4 device ID
829 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200830 public long p4DeviceId() {
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400831 return p4DeviceId;
832 }
833
834 /**
835 * For testing purpose only. TODO: remove before release.
836 *
837 * @return blocking stub
838 */
839 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
840 return this.blockingStub;
841 }
842
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200843
Andrea Campanella432f7182017-07-14 18:43:27 +0200844 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400845 public void shutdown() {
846
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400847 log.info("Shutting down client for {}...", deviceId);
848
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400849 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400850 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400851 if (streamRequestObserver != null) {
852 streamRequestObserver.onCompleted();
853 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
854 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400855
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400856 this.executorService.shutdown();
857 try {
858 executorService.awaitTermination(5, TimeUnit.SECONDS);
859 } catch (InterruptedException e) {
860 log.warn("Executor service didn't shutdown in time.");
Ray Milkey5c7d4882018-02-05 14:50:39 -0800861 Thread.currentThread().interrupt();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400862 }
863 } finally {
864 writeLock.unlock();
865 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400866 }
867
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800868 private <E extends PiEntity> void logWriteErrors(Collection<E> writeEntities,
869 StatusRuntimeException ex,
870 WriteOperationType opType,
871 String entryType) {
872 List<P4RuntimeOuterClass.Error> errors = null;
873 String description = null;
874 try {
875 errors = extractWriteErrorDetails(ex);
876 } catch (InvalidProtocolBufferException e) {
877 description = ex.getStatus().getDescription();
878 }
879
880 log.warn("Unable to {} {} {}(s) on {}: {}{} (detailed errors might be logged below)",
881 opType.name(), writeEntities.size(), entryType, deviceId,
882 ex.getStatus().getCode().name(),
883 description == null ? "" : " - " + description);
884
885 if (errors == null || errors.isEmpty()) {
886 return;
887 }
888
889 // FIXME: we are assuming entities is an ordered collection, e.g. a list,
890 // and that errors are reported in the same order as the corresponding
891 // written entity. Write RPC methods should be refactored to accept an
892 // order list of entities, instead of a collection.
893 if (errors.size() == writeEntities.size()) {
894 Iterator<E> entityIterator = writeEntities.iterator();
895 errors.stream()
896 .map(e -> ImmutablePair.of(e, entityIterator.next()))
897 .filter(p -> p.left.getCanonicalCode() != Status.OK.getCode().value())
898 .forEach(p -> log.warn("Unable to {} {}: {} [{}]",
899 opType.name(), entryType, parseP4Error(p.getLeft()),
900 p.getRight().toString()));
901 } else {
902 log.error("Unable to reconcile error details to updates " +
903 "(sent {} updates, but device returned {} errors)",
904 entryType, writeEntities.size(), errors.size());
905 errors.stream()
906 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
907 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
908 opType.name(), entryType, parseP4Error(err)));
909 }
910 }
911
912 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
913 StatusRuntimeException ex) throws InvalidProtocolBufferException {
914 String statusString = ex.getStatus().getDescription();
915 if (statusString == null) {
916 return Collections.emptyList();
917 }
918 com.google.rpc.Status status = com.google.rpc.Status
919 .parseFrom(statusString.getBytes());
920 return status.getDetailsList().stream()
921 .map(any -> {
922 try {
923 return any.unpack(P4RuntimeOuterClass.Error.class);
924 } catch (InvalidProtocolBufferException e) {
925 log.warn("Unable to unpack P4Runtime Error: {}",
926 any.toString());
927 return null;
928 }
929 })
930 .filter(Objects::nonNull)
931 .collect(Collectors.toList());
932
933 }
934
935 private String parseP4Error(P4RuntimeOuterClass.Error err) {
936 return format("%s %s (%s code %d)%s",
937 Status.fromCodeValue(err.getCanonicalCode()),
938 err.getMessage(),
939 err.getSpace(),
940 err.getCode(),
941 err.hasDetails() ? "\n" + err.getDetails().toString() : "");
942 }
943
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400944 /**
945 * Handles messages received from the device on the stream channel.
946 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400947 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
948
949 @Override
950 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400951 executorService.submit(() -> doNext(message));
952 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400953
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400954 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400955 try {
Andrea Campanella0288c872017-08-07 18:32:51 +0200956 log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -0400957 switch (message.getUpdateCase()) {
958 case PACKET:
959 // Packet-in
960 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200961 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400962 case ARBITRATION:
963 doArbitrationUpdateFromDevice(message.getArbitration());
964 return;
965 default:
966 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
967 }
968 } catch (Throwable ex) {
969 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400970 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400971 }
972
973 @Override
974 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400975 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
976 // FIXME: we might want to recreate the channel.
977 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
978 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400979 }
980
981 @Override
982 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400983 log.warn("Stream channel for {} has completed", deviceId);
984 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400985 }
986 }
Carmelo Cascone87892e22017-11-13 16:01:29 -0800987}