blob: 609c5c5530d4ea09b5939a2db700e1539af77f9a [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020022import com.google.common.collect.Maps;
Yi Tseng82512da2017-08-16 19:46:36 -070023import com.google.common.collect.Multimap;
Carmelo Cascone7f75be42017-09-07 14:37:02 +020024import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040025import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080026import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040027import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040028import io.grpc.ManagedChannel;
29import io.grpc.Status;
30import io.grpc.StatusRuntimeException;
31import io.grpc.stub.StreamObserver;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080032import org.apache.commons.lang3.tuple.ImmutablePair;
Andrea Campanella288b2732017-07-28 14:16:16 +020033import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070034import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040035import org.onosproject.net.DeviceId;
Yi Tseng3e7f1452017-10-20 10:31:53 -070036import org.onosproject.net.MastershipRole;
Carmelo Cascone87892e22017-11-13 16:01:29 -080037import org.onosproject.net.pi.model.PiActionProfileId;
38import org.onosproject.net.pi.model.PiCounterId;
39import org.onosproject.net.pi.model.PiCounterType;
Andrea Campanella432f7182017-07-14 18:43:27 +020040import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080041import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020042import org.onosproject.net.pi.runtime.PiActionGroup;
43import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020044import org.onosproject.net.pi.runtime.PiCounterCellData;
45import org.onosproject.net.pi.runtime.PiCounterCellId;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080046import org.onosproject.net.pi.runtime.PiEntity;
Frank Wangd7e3b4b2017-09-24 13:37:54 +090047import org.onosproject.net.pi.runtime.PiMeterCellConfig;
48import org.onosproject.net.pi.runtime.PiMeterCellId;
49import org.onosproject.net.pi.model.PiMeterType;
50import org.onosproject.net.pi.model.PiMeterId;
Andrea Campanella432f7182017-07-14 18:43:27 +020051import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040052import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080053import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040054import org.onosproject.p4runtime.api.P4RuntimeClient;
55import org.onosproject.p4runtime.api.P4RuntimeEvent;
56import org.slf4j.Logger;
57import p4.P4RuntimeGrpc;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080058import p4.P4RuntimeOuterClass;
Yi Tseng82512da2017-08-16 19:46:36 -070059import p4.P4RuntimeOuterClass.ActionProfileGroup;
60import p4.P4RuntimeOuterClass.ActionProfileMember;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040061import p4.P4RuntimeOuterClass.Entity;
62import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
63import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
64import p4.P4RuntimeOuterClass.PacketIn;
65import p4.P4RuntimeOuterClass.ReadRequest;
66import p4.P4RuntimeOuterClass.ReadResponse;
67import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
68import p4.P4RuntimeOuterClass.StreamMessageRequest;
69import p4.P4RuntimeOuterClass.StreamMessageResponse;
70import p4.P4RuntimeOuterClass.TableEntry;
Yi Tseng3e7f1452017-10-20 10:31:53 -070071import p4.P4RuntimeOuterClass.Uint128;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040072import p4.P4RuntimeOuterClass.Update;
73import p4.P4RuntimeOuterClass.WriteRequest;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020074import p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040075import p4.tmp.P4Config;
76
Carmelo Casconed61fdb32017-10-30 10:09:57 -070077import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040078import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040079import java.util.Collections;
80import java.util.Iterator;
81import java.util.List;
82import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020083import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020084import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040085import java.util.concurrent.CompletableFuture;
Yi Tseng3e7f1452017-10-20 10:31:53 -070086import java.util.concurrent.ExecutionException;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040087import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040088import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040089import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040090import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040091import java.util.concurrent.locks.Lock;
92import java.util.concurrent.locks.ReentrantLock;
93import java.util.function.Supplier;
94import java.util.stream.Collectors;
95import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040096
Carmelo Casconed61fdb32017-10-30 10:09:57 -070097import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080098import static java.lang.String.format;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040099import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400100import static org.slf4j.LoggerFactory.getLogger;
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700101import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
102import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
103import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200104import static p4.P4RuntimeOuterClass.PacketOut;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400105import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
106
107/**
108 * Implementation of a P4Runtime client.
109 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400110public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400111
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400112 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
113 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
114 WriteOperationType.INSERT, Update.Type.INSERT,
115 WriteOperationType.MODIFY, Update.Type.MODIFY,
116 WriteOperationType.DELETE, Update.Type.DELETE
117 );
118
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400119 private final Logger log = getLogger(getClass());
120
121 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200122 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400123 private final P4RuntimeControllerImpl controller;
124 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400125 private final Context.CancellableContext cancellableContext;
126 private final ExecutorService executorService;
127 private final Executor contextExecutor;
128 private final Lock writeLock = new ReentrantLock();
129 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400130
Yi Tseng3e7f1452017-10-20 10:31:53 -0700131 private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
132 protected Uint128 p4RuntimeElectionId;
133
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900134 private static final long DEFAULT_INDEX = 0;
135
Yi Tseng82512da2017-08-16 19:46:36 -0700136 /**
137 * Default constructor.
138 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200139 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700140 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200141 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700142 * @param controller runtime client controller
143 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200144 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
145 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400146 this.deviceId = deviceId;
147 this.p4DeviceId = p4DeviceId;
148 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400149 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400150 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400151 "onos/p4runtime-client-" + deviceId.toString(),
152 deviceId.toString() + "-%d"));
153 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200154 //TODO Investigate deadline or timeout in supplyInContext Method
155 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400156 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
157 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
158 }
159
160 /**
161 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
162 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
163 * <p>
164 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
165 * <p>
166 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200167 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400168 return CompletableFuture.supplyAsync(() -> {
169 // TODO: explore a more relaxed locking strategy.
170 writeLock.lock();
171 try {
172 return supplier.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800173 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800174 log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
Yi Tsenge67e1412018-01-31 17:35:20 -0800175 throw ex;
Carmelo Casconea966c342017-07-30 01:56:30 -0400176 } catch (Throwable ex) {
Yi Tsenge67e1412018-01-31 17:35:20 -0800177 log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400178 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400179 } finally {
180 writeLock.unlock();
181 }
182 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400183 }
184
185 @Override
186 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200187 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400188 }
189
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400190 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700191 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
192 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400193 }
194
195 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400196 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
197 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200198 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
199 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400200 }
201
202 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400203 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200204 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400205 }
206
207 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200208 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200209 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200210 }
211
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200212 @Override
213 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
214 PiPipeconf pipeconf) {
215 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
216 "readCounterCells-" + cellIds.hashCode());
217 }
218
219 @Override
220 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
221 PiPipeconf pipeconf) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200222
223 /*
224 From p4runtime.proto, the scope of a ReadRequest is defined as follows:
225 CounterEntry:
226 - All counter cells for all meters if counter_id = 0 (default).
227 - All counter cells for given counter_id if index = 0 (default).
228 DirectCounterEntry:
229 - All counter cells for all meters if counter_id = 0 (default).
230 - All counter cells for given counter_id if table_entry.match is empty.
231 */
232
233 Set<PiCounterCellId> cellIds = Sets.newHashSet();
234
235 for (PiCounterId counterId : counterIds) {
Carmelo Cascone87892e22017-11-13 16:01:29 -0800236 if (!pipeconf.pipelineModel().counter(counterId).isPresent()) {
237 log.warn("Unable to find counter '{}' in pipeline model", counterId);
238 continue;
239 }
240 PiCounterType counterType = pipeconf.pipelineModel().counter(counterId).get().counterType();
241 switch (counterType) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200242 case INDIRECT:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800243 cellIds.add(PiCounterCellId.ofIndirect(counterId, 0));
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200244 break;
245 case DIRECT:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800246 cellIds.add(PiCounterCellId.ofDirect(counterId, PiTableEntry.EMTPY));
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200247 break;
248 default:
Carmelo Cascone87892e22017-11-13 16:01:29 -0800249 log.warn("Unrecognized PI counter type '{}'", counterType);
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200250 }
251 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200252
253 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
254 "readAllCounterCells-" + cellIds.hashCode());
255 }
256
Yi Tseng82512da2017-08-16 19:46:36 -0700257 @Override
258 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionGroup group,
Yi Tseng82512da2017-08-16 19:46:36 -0700259 WriteOperationType opType,
260 PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200261 return supplyInContext(() -> doWriteActionGroupMembers(group, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700262 "writeActionGroupMembers-" + opType.name());
263 }
264
265 @Override
266 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
267 WriteOperationType opType,
268 PiPipeconf pipeconf) {
269 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
270 "writeActionGroup-" + opType.name());
271 }
272
273 @Override
274 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
275 PiPipeconf pipeconf) {
276 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
277 "dumpGroups-" + actionProfileId.id());
278 }
279
Yi Tseng3e7f1452017-10-20 10:31:53 -0700280 @Override
281 public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
282 return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
283 }
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900284 public CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
285
286 return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
287 "writeMeterCells");
288 }
289
290 @Override
291 public CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
292 PiPipeconf pipeconf) {
293 return supplyInContext(() -> doReadMeterCells(cellIds, pipeconf),
294 "readMeterCells-" + cellIds.hashCode());
295 }
296
297 @Override
298 public CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
299 PiPipeconf pipeconf) {
300
301 /*
302 From p4runtime.proto, the scope of a ReadRequest is defined as follows:
303 MeterEntry:
304 - All meter cells for all meters if meter_id = 0 (default).
305 - All meter cells for given meter_id if index = 0 (default).
306 DirectCounterEntry:
307 - All meter cells for all meters if meter_id = 0 (default).
308 - All meter cells for given meter_id if table_entry.match is empty.
309 */
310
311 Set<PiMeterCellId> cellIds = Sets.newHashSet();
312 for (PiMeterId meterId : meterIds) {
313 PiMeterType meterType = pipeconf.pipelineModel().meter(meterId).get().meterType();
314 switch (meterType) {
315 case INDIRECT:
316 cellIds.add(PiMeterCellId.ofIndirect(meterId, DEFAULT_INDEX));
317 break;
318 case DIRECT:
319 cellIds.add(PiMeterCellId.ofDirect(meterId, PiTableEntry.EMTPY));
320 break;
321 default:
322 log.warn("Unrecognized PI meter type '{}'", meterType);
323 }
324 }
325
326 return supplyInContext(() -> doReadMeterCells(cellIds, pipeconf),
327 "readAllMeterCells-" + cellIds.hashCode());
328 }
Yi Tseng3e7f1452017-10-20 10:31:53 -0700329
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400330 /* Blocking method implementations below */
331
Yi Tseng3e7f1452017-10-20 10:31:53 -0700332 private boolean doArbitrationUpdate() {
333 CompletableFuture<Boolean> result = new CompletableFuture<>();
334 // TODO: currently we use 64-bit Long type for election id, should
335 // we use 128-bit ?
336 long nextElectId = controller.getNewMasterElectionId();
337 Uint128 newElectionId = Uint128.newBuilder()
338 .setLow(nextElectId)
339 .build();
340 MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
341 .setDeviceId(p4DeviceId)
342 .setElectionId(newElectionId)
343 .build();
344 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
345 .setArbitration(arbitrationUpdate)
346 .build();
347 log.debug("Sending arbitration update to {} with election id {}...",
348 deviceId, newElectionId);
349 arbitrationUpdateMap.put(newElectionId, result);
350 try {
351 streamRequestObserver.onNext(requestMsg);
352 return result.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800353 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800354 log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
Yi Tsenge67e1412018-01-31 17:35:20 -0800355 arbitrationUpdateMap.remove(newElectionId);
356 return false;
357 } catch (InterruptedException | ExecutionException e) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700358 log.warn("Arbitration update failed for {} due to {}", deviceId, e);
359 arbitrationUpdateMap.remove(newElectionId);
360 return false;
361 }
362 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400363 private boolean doInitStreamChannel() {
364 // To listen for packets and other events, we need to start the RPC.
365 // Here we do it by sending a master arbitration update.
Yi Tseng3e7f1452017-10-20 10:31:53 -0700366 return doArbitrationUpdate();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400367 }
368
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700369 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400370
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700371 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
372
373 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400374
375 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
376 if (p4Info == null) {
377 // Problem logged by PipeconfHelper.
378 return false;
379 }
380
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700381 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
382 .newBuilder()
383 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
384 .setReassign(true)
385 .setDeviceData(ByteString.copyFrom(deviceData))
386 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400387
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700388 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
Andrea Campanella0288c872017-08-07 18:32:51 +0200389 .newBuilder()
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700390 .setP4Info(p4Info)
391 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
392 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400393
394 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
395 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100396 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700397 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400398 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100399 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400400 .build();
401
402 try {
403 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700404 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400405 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800406 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400407 return false;
408 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400409 }
410
411 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
412 PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400413 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
414
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800415 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400416 return true;
417 }
418
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800419 Collection<Update> updateMsgs = null;
420 try {
421 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
422 .stream()
423 .map(tableEntryMsg ->
424 Update.newBuilder()
425 .setEntity(Entity.newBuilder()
426 .setTableEntry(tableEntryMsg)
427 .build())
428 .setType(UPDATE_TYPES.get(opType))
429 .build())
430 .collect(Collectors.toList());
431 } catch (EncodeException e) {
432 log.error("Unable to encode table entries, aborting {} operation: {}",
433 opType.name(), e.getMessage());
434 return false;
435 }
436
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400437 writeRequestBuilder
438 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700439 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400440 .addAllUpdates(updateMsgs)
441 .build();
442
443 try {
444 blockingStub.write(writeRequestBuilder.build());
445 return true;
446 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800447 logWriteErrors(piTableEntries, e, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400448 return false;
449 }
450 }
451
452 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
453
Carmelo Cascone9f007702017-08-24 13:30:51 +0200454 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400455
456 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
457 int tableId;
458 try {
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200459 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400460 } catch (P4InfoBrowser.NotFoundException e) {
461 log.warn("Unable to dump table: {}", e.getMessage());
462 return Collections.emptyList();
463 }
464
465 ReadRequest requestMsg = ReadRequest.newBuilder()
466 .setDeviceId(p4DeviceId)
467 .addEntities(Entity.newBuilder()
468 .setTableEntry(TableEntry.newBuilder()
469 .setTableId(tableId)
470 .build())
471 .build())
472 .build();
473
474 Iterator<ReadResponse> responses;
475 try {
476 responses = blockingStub.read(requestMsg);
477 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800478 log.warn("Unable to dump table {} from {}: {}", piTableId, deviceId, e.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400479 return Collections.emptyList();
480 }
481
482 Iterable<ReadResponse> responseIterable = () -> responses;
483 List<TableEntry> tableEntryMsgs = StreamSupport
484 .stream(responseIterable.spliterator(), false)
485 .map(ReadResponse::getEntitiesList)
486 .flatMap(List::stream)
487 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
488 .map(Entity::getTableEntry)
489 .collect(Collectors.toList());
490
Carmelo Cascone9f007702017-08-24 13:30:51 +0200491 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400492
493 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
494 }
495
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200496 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
497 try {
498 //encode the PiPacketOperation into a PacketOut
499 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
500
501 //Build the request
502 StreamMessageRequest packetOutRequest = StreamMessageRequest
503 .newBuilder().setPacket(packetOut).build();
504
505 //Send the request
506 streamRequestObserver.onNext(packetOutRequest);
507
508 } catch (P4InfoBrowser.NotFoundException e) {
509 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
510 log.debug("Exception", e);
511 return false;
512 }
513 return true;
514 }
515
Carmelo Casconea966c342017-07-30 01:56:30 -0400516 private void doPacketIn(PacketIn packetInMsg) {
517
518 // Retrieve the pipeconf for this client's device.
519 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
520 if (pipeconfService == null) {
521 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
522 }
523 final PiPipeconf pipeconf;
524 if (pipeconfService.ofDevice(deviceId).isPresent() &&
525 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
526 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
527 } else {
528 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
529 return;
530 }
531 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800532 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200533 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
534 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400535 log.debug("Received packet in: {}", event);
536 controller.postEvent(event);
537 }
538
539 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700540 log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
Carmelo Casconea966c342017-07-30 01:56:30 -0400541
Yi Tseng3e7f1452017-10-20 10:31:53 -0700542 Uint128 electionId = arbitrationMsg.getElectionId();
543 CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
544
545 if (mastershipFeature == null) {
546 log.warn("Can't find completable future of election id {}", electionId);
547 return;
548 }
549
550 this.p4RuntimeElectionId = electionId;
551 int statusCode = arbitrationMsg.getStatus().getCode();
552 MastershipRole arbitrationRole;
553 // arbitration update success
554
555 if (statusCode == Status.OK.getCode().value()) {
556 mastershipFeature.complete(true);
557 arbitrationRole = MastershipRole.MASTER;
558 } else {
559 mastershipFeature.complete(false);
560 arbitrationRole = MastershipRole.STANDBY;
561 }
562
563 DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
564 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
565 arbitrationEventSubject);
566 controller.postEvent(event);
Carmelo Casconea966c342017-07-30 01:56:30 -0400567 }
568
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200569 private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
570
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200571 // We use this map to remember the original PI counter IDs of the returned response.
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200572 final Map<Integer, PiCounterId> counterIdMap = Maps.newHashMap();
573
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200574 final ReadRequest request = ReadRequest.newBuilder()
575 .setDeviceId(p4DeviceId)
576 .addAllEntities(CounterEntryCodec.encodePiCounterCellIds(cellIds, counterIdMap, pipeconf))
577 .build();
578
579 if (request.getEntitiesList().size() == 0) {
580 return Collections.emptyList();
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200581 }
582
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200583 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200584 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200585 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200586 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800587 log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200588 return Collections.emptyList();
589 }
590
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200591 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200592 .map(ReadResponse::getEntitiesList)
593 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200594 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200595
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200596 return CounterEntryCodec.decodeCounterEntities(entities, counterIdMap, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200597 }
598
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200599 private boolean doWriteActionGroupMembers(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200600 final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800601
602 for (PiActionGroupMember member : group.members()) {
603 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200604 actionProfileMembers.add(ActionProfileMemberEncoder.encode(group, member, pipeconf));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800605 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
606 log.warn("Unable to encode group member, aborting {} operation: {} [{}]",
607 opType.name(), e.getMessage(), member.toString());
608 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700609 }
Yi Tseng82512da2017-08-16 19:46:36 -0700610 }
611
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200612 final Collection<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700613 .map(actionProfileMember ->
614 Update.newBuilder()
615 .setEntity(Entity.newBuilder()
616 .setActionProfileMember(actionProfileMember)
617 .build())
618 .setType(UPDATE_TYPES.get(opType))
619 .build())
620 .collect(Collectors.toList());
621
622 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200623 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700624 return true;
625 }
626
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200627 WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700628 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700629 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200630 .addAllUpdates(updateMsgs)
631 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700632 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200633 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700634 return true;
635 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800636 logWriteErrors(group.members(), e, opType, "group member");
Yi Tseng82512da2017-08-16 19:46:36 -0700637 return false;
638 }
639 }
640
641 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
642 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
643 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200644
645 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700646 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200647 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700648 return Collections.emptySet();
649 }
650
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200651 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700652 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200653 actionProfileId = browser
654 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200655 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200656 .getPreamble()
657 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700658 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200659 log.warn("Unable to dump groups: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700660 return Collections.emptySet();
661 }
662
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200663 // Prepare read request to read all groups from the given action profile.
664 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700665 .setDeviceId(p4DeviceId)
666 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200667 .setActionProfileGroup(
668 ActionProfileGroup.newBuilder()
669 .setActionProfileId(actionProfileId)
670 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700671 .build())
672 .build();
673
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200674 // Read groups.
675 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700676 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200677 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700678 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800679 log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700680 return Collections.emptySet();
681 }
682
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200683 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
684 .map(ReadResponse::getEntitiesList)
685 .flatMap(List::stream)
686 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
687 .map(Entity::getActionProfileGroup)
688 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700689
690 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200691 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700692
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200693 // Returned groups contain only a minimal description of their members.
694 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700695
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200696 // Keep a map of all member IDs for each group ID, will need it later.
697 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
698 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
699 g.getGroupId(),
700 g.getMembersList().stream()
701 .map(ActionProfileGroup.Member::getMemberId)
702 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700703
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200704 // Prepare one big read request to read all members in one shot.
705 final Set<Entity> entityMsgs = groupMsgs.stream()
706 .flatMap(g -> g.getMembersList().stream())
707 .map(ActionProfileGroup.Member::getMemberId)
708 // Prevent issuing many read requests for the same member.
709 .distinct()
710 .map(id -> ActionProfileMember.newBuilder()
711 .setActionProfileId(actionProfileId)
712 .setMemberId(id)
713 .build())
714 .map(m -> Entity.newBuilder()
715 .setActionProfileMember(m)
716 .build())
717 .collect(Collectors.toSet());
718 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
719 .addAllEntities(entityMsgs)
720 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700721
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200722 // Read members.
723 final Iterator<ReadResponse> memberResponses;
724 try {
725 memberResponses = blockingStub.read(memberRequestMsg);
726 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800727 log.warn("Unable to read members of action profile {} from {}: {}",
728 piActionProfileId, deviceId, e.getMessage());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200729 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700730 }
731
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200732 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
733 Tools.stream(() -> memberResponses)
734 .map(ReadResponse::getEntitiesList)
735 .flatMap(List::stream)
736 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
737 .map(Entity::getActionProfileMember)
738 .forEach(member -> groupIdToMemberIdsMap.asMap()
739 // Get all group IDs that contain this member.
740 .entrySet()
741 .stream()
742 .filter(entry -> entry.getValue().contains(member.getMemberId()))
743 .map(Map.Entry::getKey)
744 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
745
746 log.debug("Retrieved {} group members from action profile {} on {}...",
747 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
748
749 return groupMsgs.stream()
750 .map(groupMsg -> {
751 try {
752 return ActionProfileGroupEncoder.decode(groupMsg,
753 groupIdToMembersMap.get(groupMsg.getGroupId()),
754 pipeconf);
755 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
756 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
757 return null;
758 }
759 })
760 .filter(Objects::nonNull)
761 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700762 }
763
764 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200765 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700766 try {
767 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
768 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800769 log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700770 return false;
771 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200772
773 final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700774 .setDeviceId(p4DeviceId)
Yi Tseng3e7f1452017-10-20 10:31:53 -0700775 .setElectionId(p4RuntimeElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200776 .addUpdates(Update.newBuilder()
777 .setEntity(Entity.newBuilder()
778 .setActionProfileGroup(actionProfileGroup)
779 .build())
780 .setType(UPDATE_TYPES.get(opType))
781 .build())
782 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700783 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200784 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700785 return true;
786 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800787 logWriteErrors(Collections.singleton(group), e, opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700788 return false;
789 }
790 }
791
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900792 private Collection<PiMeterCellConfig> doReadMeterCells(Collection<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
793
794 // We use this map to remember the original PI meter IDs of the returned response.
795 Map<Integer, PiMeterId> meterIdMap = Maps.newHashMap();
796 Collection<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
797 .map(cellId -> PiMeterCellConfig.builder()
798 .withMeterCellId(cellId).build())
799 .collect(Collectors.toList());
800
801 final ReadRequest request = ReadRequest.newBuilder()
802 .setDeviceId(p4DeviceId)
803 .addAllEntities(MeterEntryCodec.encodePiMeterCellConfigs(piMeterCellConfigs, meterIdMap, pipeconf))
804 .build();
805
806 if (request.getEntitiesList().size() == 0) {
807 return Collections.emptyList();
808 }
809
810 final Iterable<ReadResponse> responses;
811 try {
812 responses = () -> blockingStub.read(request);
813 } catch (StatusRuntimeException e) {
814 log.warn("Unable to read meters config: {}", e.getMessage());
815 log.debug("exception", e);
816 return Collections.emptyList();
817 }
818
819 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
820 .map(ReadResponse::getEntitiesList)
821 .flatMap(List::stream)
822 .collect(Collectors.toList());
823
824 return MeterEntryCodec.decodeMeterEntities(entities, meterIdMap, pipeconf);
825 }
826
827 private boolean doWriteMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
828
829 final Map<Integer, PiMeterId> meterIdMap = Maps.newHashMap();
830 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
831
832 Collection<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellIds, meterIdMap, pipeconf)
833 .stream()
834 .map(meterEntryMsg ->
835 Update.newBuilder()
836 .setEntity(meterEntryMsg)
837 .setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
838 .build())
839 .collect(Collectors.toList());
840
841 if (updateMsgs.size() == 0) {
842 return true;
843 }
844
845 writeRequestBuilder
846 .setDeviceId(p4DeviceId)
847 .setElectionId(p4RuntimeElectionId)
848 .addAllUpdates(updateMsgs)
849 .build();
850 try {
851 blockingStub.write(writeRequestBuilder.build());
852 return true;
853 } catch (StatusRuntimeException e) {
854 log.warn("Unable to write meter entries : {}", e.getMessage());
855 log.debug("exception", e);
856 return false;
857 }
858 }
859
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400860 /**
861 * Returns the internal P4 device ID associated with this client.
862 *
863 * @return P4 device ID
864 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200865 public long p4DeviceId() {
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400866 return p4DeviceId;
867 }
868
869 /**
870 * For testing purpose only. TODO: remove before release.
871 *
872 * @return blocking stub
873 */
874 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
875 return this.blockingStub;
876 }
877
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200878
Andrea Campanella432f7182017-07-14 18:43:27 +0200879 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400880 public void shutdown() {
881
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400882 log.info("Shutting down client for {}...", deviceId);
883
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400884 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400885 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400886 if (streamRequestObserver != null) {
887 streamRequestObserver.onCompleted();
888 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
889 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400890
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400891 this.executorService.shutdown();
892 try {
893 executorService.awaitTermination(5, TimeUnit.SECONDS);
894 } catch (InterruptedException e) {
895 log.warn("Executor service didn't shutdown in time.");
Ray Milkey5c7d4882018-02-05 14:50:39 -0800896 Thread.currentThread().interrupt();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400897 }
898 } finally {
899 writeLock.unlock();
900 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400901 }
902
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800903 private <E extends PiEntity> void logWriteErrors(Collection<E> writeEntities,
904 StatusRuntimeException ex,
905 WriteOperationType opType,
906 String entryType) {
907 List<P4RuntimeOuterClass.Error> errors = null;
908 String description = null;
909 try {
910 errors = extractWriteErrorDetails(ex);
911 } catch (InvalidProtocolBufferException e) {
912 description = ex.getStatus().getDescription();
913 }
914
915 log.warn("Unable to {} {} {}(s) on {}: {}{} (detailed errors might be logged below)",
916 opType.name(), writeEntities.size(), entryType, deviceId,
917 ex.getStatus().getCode().name(),
918 description == null ? "" : " - " + description);
919
920 if (errors == null || errors.isEmpty()) {
921 return;
922 }
923
924 // FIXME: we are assuming entities is an ordered collection, e.g. a list,
925 // and that errors are reported in the same order as the corresponding
926 // written entity. Write RPC methods should be refactored to accept an
927 // order list of entities, instead of a collection.
928 if (errors.size() == writeEntities.size()) {
929 Iterator<E> entityIterator = writeEntities.iterator();
930 errors.stream()
931 .map(e -> ImmutablePair.of(e, entityIterator.next()))
932 .filter(p -> p.left.getCanonicalCode() != Status.OK.getCode().value())
933 .forEach(p -> log.warn("Unable to {} {}: {} [{}]",
934 opType.name(), entryType, parseP4Error(p.getLeft()),
935 p.getRight().toString()));
936 } else {
937 log.error("Unable to reconcile error details to updates " +
938 "(sent {} updates, but device returned {} errors)",
939 entryType, writeEntities.size(), errors.size());
940 errors.stream()
941 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
942 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
943 opType.name(), entryType, parseP4Error(err)));
944 }
945 }
946
947 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
948 StatusRuntimeException ex) throws InvalidProtocolBufferException {
949 String statusString = ex.getStatus().getDescription();
950 if (statusString == null) {
951 return Collections.emptyList();
952 }
953 com.google.rpc.Status status = com.google.rpc.Status
954 .parseFrom(statusString.getBytes());
955 return status.getDetailsList().stream()
956 .map(any -> {
957 try {
958 return any.unpack(P4RuntimeOuterClass.Error.class);
959 } catch (InvalidProtocolBufferException e) {
960 log.warn("Unable to unpack P4Runtime Error: {}",
961 any.toString());
962 return null;
963 }
964 })
965 .filter(Objects::nonNull)
966 .collect(Collectors.toList());
967
968 }
969
970 private String parseP4Error(P4RuntimeOuterClass.Error err) {
971 return format("%s %s (%s code %d)%s",
972 Status.fromCodeValue(err.getCanonicalCode()),
973 err.getMessage(),
974 err.getSpace(),
975 err.getCode(),
976 err.hasDetails() ? "\n" + err.getDetails().toString() : "");
977 }
978
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400979 /**
980 * Handles messages received from the device on the stream channel.
981 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400982 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
983
984 @Override
985 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400986 executorService.submit(() -> doNext(message));
987 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400988
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400989 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400990 try {
Andrea Campanella0288c872017-08-07 18:32:51 +0200991 log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -0400992 switch (message.getUpdateCase()) {
993 case PACKET:
994 // Packet-in
995 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200996 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400997 case ARBITRATION:
998 doArbitrationUpdateFromDevice(message.getArbitration());
999 return;
1000 default:
1001 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
1002 }
1003 } catch (Throwable ex) {
1004 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001005 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001006 }
1007
1008 @Override
1009 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -04001010 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
1011 // FIXME: we might want to recreate the channel.
1012 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
1013 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001014 }
1015
1016 @Override
1017 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -04001018 log.warn("Stream channel for {} has completed", deviceId);
1019 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001020 }
1021 }
Carmelo Cascone87892e22017-11-13 16:01:29 -08001022}