blob: a20a0e6b39986563673c6f3b775e5e6fb4ad23fb [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Yi Tseng82512da2017-08-16 19:46:36 -070022import com.google.common.collect.Multimap;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040023import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080024import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040025import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040026import io.grpc.ManagedChannel;
27import io.grpc.Status;
28import io.grpc.StatusRuntimeException;
29import io.grpc.stub.StreamObserver;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080030import org.apache.commons.lang3.tuple.ImmutablePair;
Andrea Campanella288b2732017-07-28 14:16:16 +020031import org.onlab.osgi.DefaultServiceDirectory;
Carmelo Casconee5b28722018-06-22 17:28:28 +020032import org.onlab.util.SharedExecutors;
Yi Tseng82512da2017-08-16 19:46:36 -070033import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040034import org.onosproject.net.DeviceId;
Carmelo Cascone87892e22017-11-13 16:01:29 -080035import org.onosproject.net.pi.model.PiActionProfileId;
36import org.onosproject.net.pi.model.PiCounterId;
Carmelo Cascone81929aa2018-04-07 01:38:55 -070037import org.onosproject.net.pi.model.PiMeterId;
Andrea Campanella432f7182017-07-14 18:43:27 +020038import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080039import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020040import org.onosproject.net.pi.runtime.PiActionGroup;
41import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020042import org.onosproject.net.pi.runtime.PiCounterCellData;
43import org.onosproject.net.pi.runtime.PiCounterCellId;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080044import org.onosproject.net.pi.runtime.PiEntity;
Frank Wangd7e3b4b2017-09-24 13:37:54 +090045import org.onosproject.net.pi.runtime.PiMeterCellConfig;
46import org.onosproject.net.pi.runtime.PiMeterCellId;
Andrea Campanella432f7182017-07-14 18:43:27 +020047import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040048import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080049import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040050import org.onosproject.p4runtime.api.P4RuntimeClient;
51import org.onosproject.p4runtime.api.P4RuntimeEvent;
52import org.slf4j.Logger;
Carmelo Casconee5b28722018-06-22 17:28:28 +020053import p4.config.v1.P4InfoOuterClass.P4Info;
54import p4.tmp.P4Config;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020055import p4.v1.P4RuntimeGrpc;
56import p4.v1.P4RuntimeOuterClass;
57import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
58import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
59import p4.v1.P4RuntimeOuterClass.Entity;
60import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
61import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020062import p4.v1.P4RuntimeOuterClass.ReadRequest;
63import p4.v1.P4RuntimeOuterClass.ReadResponse;
64import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
65import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
66import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
67import p4.v1.P4RuntimeOuterClass.TableEntry;
68import p4.v1.P4RuntimeOuterClass.Uint128;
69import p4.v1.P4RuntimeOuterClass.Update;
70import p4.v1.P4RuntimeOuterClass.WriteRequest;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040071
Carmelo Casconee5b28722018-06-22 17:28:28 +020072import java.math.BigInteger;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070073import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040074import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040075import java.util.Collections;
76import java.util.Iterator;
77import java.util.List;
78import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020079import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020080import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040081import java.util.concurrent.CompletableFuture;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040082import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040083import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040084import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040085import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040086import java.util.concurrent.locks.Lock;
87import java.util.concurrent.locks.ReentrantLock;
88import java.util.function.Supplier;
89import java.util.stream.Collectors;
90import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040091
Carmelo Casconed61fdb32017-10-30 10:09:57 -070092import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080093import static java.lang.String.format;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040094import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040095import static org.slf4j.LoggerFactory.getLogger;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020096import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
97import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
98import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Carmelo Casconee5b28722018-06-22 17:28:28 +020099import static p4.v1.P4RuntimeOuterClass.PacketIn;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200100import static p4.v1.P4RuntimeOuterClass.PacketOut;
101import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400102
103/**
104 * Implementation of a P4Runtime client.
105 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200106final class P4RuntimeClientImpl implements P4RuntimeClient {
107
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200108 // Timeout in seconds to obtain the request lock.
109 private static final int LOCK_TIMEOUT = 60;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400110
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400111 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
112 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
113 WriteOperationType.INSERT, Update.Type.INSERT,
114 WriteOperationType.MODIFY, Update.Type.MODIFY,
115 WriteOperationType.DELETE, Update.Type.DELETE
116 );
117
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400118 private final Logger log = getLogger(getClass());
119
Carmelo Casconee5b28722018-06-22 17:28:28 +0200120 private final Lock requestLock = new ReentrantLock();
121 private final Context.CancellableContext cancellableContext =
122 Context.current().withCancellation();
123
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400124 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200125 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400126 private final P4RuntimeControllerImpl controller;
127 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400128 private final ExecutorService executorService;
129 private final Executor contextExecutor;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400130 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400131
Carmelo Casconee5b28722018-06-22 17:28:28 +0200132 // Used by this client for write requests.
133 private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
Yi Tseng3e7f1452017-10-20 10:31:53 -0700134
Yi Tseng82512da2017-08-16 19:46:36 -0700135 /**
136 * Default constructor.
137 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200138 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700139 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200140 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700141 * @param controller runtime client controller
142 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200143 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
144 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400145 this.deviceId = deviceId;
146 this.p4DeviceId = p4DeviceId;
147 this.controller = controller;
Carmelo Casconea966c342017-07-30 01:56:30 -0400148 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Casconee5b28722018-06-22 17:28:28 +0200149 "onos-p4runtime-client-" + deviceId.toString(), "%d"));
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400150 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200151 //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200152 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200153 this.streamRequestObserver = P4RuntimeGrpc.newStub(channel)
154 .streamChannel(new StreamChannelResponseObserver());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400155 }
156
157 /**
Carmelo Casconee5b28722018-06-22 17:28:28 +0200158 * Submits a task for async execution via the given executor.
159 * All tasks submitted with this method will be executed sequentially.
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400160 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200161 private <U> CompletableFuture<U> supplyWithExecutor(
162 Supplier<U> supplier, String opDescription, Executor executor) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400163 return CompletableFuture.supplyAsync(() -> {
164 // TODO: explore a more relaxed locking strategy.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200165 try {
166 if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
167 log.error("LOCK TIMEOUT! This is likely a deadlock, "
168 + "please debug (executing {})",
169 opDescription);
170 throw new IllegalThreadStateException("Lock timeout");
171 }
172 } catch (InterruptedException e) {
173 log.warn("Thread interrupted while waiting for lock (executing {})",
174 opDescription);
Ray Milkeydbd38212018-07-02 09:18:09 -0700175 throw new IllegalStateException(e);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200176 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400177 try {
178 return supplier.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800179 } catch (StatusRuntimeException ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200180 log.warn("Unable to execute {} on {}: {}",
181 opDescription, deviceId, ex.toString());
Yi Tsenge67e1412018-01-31 17:35:20 -0800182 throw ex;
Carmelo Casconea966c342017-07-30 01:56:30 -0400183 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200184 log.error("Exception in client of {}, executing {}",
185 deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400186 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400187 } finally {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200188 requestLock.unlock();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400189 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200190 }, executor);
191 }
192
193 /**
194 * Equivalent of supplyWithExecutor using the gRPC context executor of this
195 * client, such that if the context is cancelled (e.g. client shutdown) the
196 * RPC is automatically cancelled.
197 */
198 private <U> CompletableFuture<U> supplyInContext(
199 Supplier<U> supplier, String opDescription) {
200 return supplyWithExecutor(supplier, opDescription, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400201 }
202
203 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200204 public CompletableFuture<Boolean> start() {
205 return supplyInContext(this::doInitStreamChannel,
206 "start-initStreamChannel");
207 }
208
209 @Override
210 public CompletableFuture<Void> shutdown() {
211 return supplyWithExecutor(this::doShutdown, "shutdown",
212 SharedExecutors.getPoolThreadExecutor());
213 }
214
215 @Override
216 public CompletableFuture<Boolean> becomeMaster() {
217 return supplyInContext(this::doBecomeMaster,
218 "becomeMaster");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400219 }
220
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400221 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700222 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
223 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400224 }
225
226 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400227 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
228 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200229 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
230 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400231 }
232
233 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400234 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200235 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400236 }
237
238 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200239 public CompletableFuture<Collection<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
240 return supplyInContext(() -> doDumpTable(null, pipeconf), "dumpAllTables");
241 }
242
243 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200244 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200245 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200246 }
247
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200248 @Override
249 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
250 PiPipeconf pipeconf) {
251 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
252 "readCounterCells-" + cellIds.hashCode());
253 }
254
255 @Override
256 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
257 PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700258 return supplyInContext(() -> doReadAllCounterCells(counterIds, pipeconf),
259 "readAllCounterCells-" + counterIds.hashCode());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200260 }
261
Yi Tseng82512da2017-08-16 19:46:36 -0700262 @Override
Yi Tseng8d355132018-04-13 01:40:48 +0800263 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionProfileId profileId,
264 Collection<PiActionGroupMember> members,
Yi Tseng82512da2017-08-16 19:46:36 -0700265 WriteOperationType opType,
266 PiPipeconf pipeconf) {
Yi Tseng8d355132018-04-13 01:40:48 +0800267 return supplyInContext(() -> doWriteActionGroupMembers(profileId, members, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700268 "writeActionGroupMembers-" + opType.name());
269 }
270
Yi Tseng8d355132018-04-13 01:40:48 +0800271
Yi Tseng82512da2017-08-16 19:46:36 -0700272 @Override
273 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
274 WriteOperationType opType,
275 PiPipeconf pipeconf) {
276 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
277 "writeActionGroup-" + opType.name());
278 }
279
280 @Override
281 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
282 PiPipeconf pipeconf) {
283 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
284 "dumpGroups-" + actionProfileId.id());
285 }
286
Yi Tseng3e7f1452017-10-20 10:31:53 -0700287 @Override
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900288 public CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
289
290 return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
291 "writeMeterCells");
292 }
293
294 @Override
295 public CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
296 PiPipeconf pipeconf) {
297 return supplyInContext(() -> doReadMeterCells(cellIds, pipeconf),
298 "readMeterCells-" + cellIds.hashCode());
299 }
300
301 @Override
302 public CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
303 PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700304 return supplyInContext(() -> doReadAllMeterCells(meterIds, pipeconf),
305 "readAllMeterCells-" + meterIds.hashCode());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900306 }
Yi Tseng3e7f1452017-10-20 10:31:53 -0700307
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400308 /* Blocking method implementations below */
309
Carmelo Casconee5b28722018-06-22 17:28:28 +0200310 private boolean doBecomeMaster() {
311 final Uint128 newId = bigIntegerToUint128(
312 controller.newMasterElectionId(deviceId));
313 if (sendMasterArbitrationUpdate(newId)) {
314 clientElectionId = newId;
315 return true;
316 }
317 return false;
318 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200319
Carmelo Casconee5b28722018-06-22 17:28:28 +0200320 private boolean sendMasterArbitrationUpdate(Uint128 electionId) {
321 log.info("Sending arbitration update to {}... electionId={}",
322 deviceId, uint128ToBigInteger(electionId));
Yi Tseng3e7f1452017-10-20 10:31:53 -0700323 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200324 streamRequestObserver.onNext(
325 StreamMessageRequest.newBuilder()
326 .setArbitration(
327 MasterArbitrationUpdate
328 .newBuilder()
329 .setDeviceId(p4DeviceId)
330 .setElectionId(electionId)
331 .build())
332 .build());
333 return true;
Yi Tsenge67e1412018-01-31 17:35:20 -0800334 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800335 log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
Yi Tseng3e7f1452017-10-20 10:31:53 -0700336 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200337 return false;
Yi Tseng3e7f1452017-10-20 10:31:53 -0700338 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200339
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400340 private boolean doInitStreamChannel() {
341 // To listen for packets and other events, we need to start the RPC.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200342 // Here we send an empty StreamMessageRequest.
343 try {
344 log.info("Starting stream channel with {}...", deviceId);
345 streamRequestObserver.onNext(StreamMessageRequest.newBuilder().build());
346 return true;
347 } catch (StatusRuntimeException e) {
348 log.error("Unable to start stream channel with {}: {}",
349 deviceId, e.getMessage());
350 return false;
351 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400352 }
353
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700354 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400355
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700356 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
357
358 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400359
360 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
361 if (p4Info == null) {
362 // Problem logged by PipeconfHelper.
363 return false;
364 }
365
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700366 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
367 .newBuilder()
368 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
369 .setReassign(true)
370 .setDeviceData(ByteString.copyFrom(deviceData))
371 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400372
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700373 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
Andrea Campanella0288c872017-08-07 18:32:51 +0200374 .newBuilder()
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700375 .setP4Info(p4Info)
376 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
377 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400378
379 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
380 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100381 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200382 .setElectionId(clientElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400383 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100384 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400385 .build();
386
387 try {
388 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700389 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400390 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800391 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400392 return false;
393 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400394 }
395
396 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
397 PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400398 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
399
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800400 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400401 return true;
402 }
403
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700404 Collection<Update> updateMsgs;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800405 try {
406 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
407 .stream()
408 .map(tableEntryMsg ->
409 Update.newBuilder()
410 .setEntity(Entity.newBuilder()
411 .setTableEntry(tableEntryMsg)
412 .build())
413 .setType(UPDATE_TYPES.get(opType))
414 .build())
415 .collect(Collectors.toList());
416 } catch (EncodeException e) {
417 log.error("Unable to encode table entries, aborting {} operation: {}",
418 opType.name(), e.getMessage());
419 return false;
420 }
421
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400422 writeRequestBuilder
423 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200424 .setElectionId(clientElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400425 .addAllUpdates(updateMsgs)
426 .build();
427
428 try {
429 blockingStub.write(writeRequestBuilder.build());
430 return true;
431 } catch (StatusRuntimeException e) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200432 checkAndLogWriteErrors(piTableEntries, e, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400433 return false;
434 }
435 }
436
437 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
438
Carmelo Cascone9f007702017-08-24 13:30:51 +0200439 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400440
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400441 int tableId;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200442 if (piTableId == null) {
443 // Dump all tables.
444 tableId = 0;
445 } else {
446 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
447 try {
448 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
449 } catch (P4InfoBrowser.NotFoundException e) {
450 log.warn("Unable to dump table: {}", e.getMessage());
451 return Collections.emptyList();
452 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400453 }
454
455 ReadRequest requestMsg = ReadRequest.newBuilder()
456 .setDeviceId(p4DeviceId)
457 .addEntities(Entity.newBuilder()
458 .setTableEntry(TableEntry.newBuilder()
459 .setTableId(tableId)
460 .build())
461 .build())
462 .build();
463
464 Iterator<ReadResponse> responses;
465 try {
466 responses = blockingStub.read(requestMsg);
467 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800468 log.warn("Unable to dump table {} from {}: {}", piTableId, deviceId, e.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400469 return Collections.emptyList();
470 }
471
472 Iterable<ReadResponse> responseIterable = () -> responses;
473 List<TableEntry> tableEntryMsgs = StreamSupport
474 .stream(responseIterable.spliterator(), false)
475 .map(ReadResponse::getEntitiesList)
476 .flatMap(List::stream)
477 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
478 .map(Entity::getTableEntry)
479 .collect(Collectors.toList());
480
Carmelo Cascone9f007702017-08-24 13:30:51 +0200481 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400482
483 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
484 }
485
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200486 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
487 try {
488 //encode the PiPacketOperation into a PacketOut
489 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
490
491 //Build the request
492 StreamMessageRequest packetOutRequest = StreamMessageRequest
493 .newBuilder().setPacket(packetOut).build();
494
495 //Send the request
496 streamRequestObserver.onNext(packetOutRequest);
497
498 } catch (P4InfoBrowser.NotFoundException e) {
499 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
500 log.debug("Exception", e);
501 return false;
502 }
503 return true;
504 }
505
Carmelo Casconea966c342017-07-30 01:56:30 -0400506 private void doPacketIn(PacketIn packetInMsg) {
507
508 // Retrieve the pipeconf for this client's device.
509 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
510 if (pipeconfService == null) {
511 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
512 }
513 final PiPipeconf pipeconf;
514 if (pipeconfService.ofDevice(deviceId).isPresent() &&
515 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
516 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
517 } else {
518 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
519 return;
520 }
521 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800522 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200523 PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200524 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400525 log.debug("Received packet in: {}", event);
526 controller.postEvent(event);
527 }
528
Carmelo Casconee5b28722018-06-22 17:28:28 +0200529 private void doArbitrationResponse(MasterArbitrationUpdate msg) {
530 // From the spec...
531 // - Election_id: The stream RPC with the highest election_id is the
532 // master. Switch populates with the highest election ID it
533 // has received from all connected controllers.
534 // - Status: Switch populates this with OK for the client that is the
535 // master, and with an error status for all other connected clients (at
536 // every mastership change).
537 if (!msg.hasElectionId() || !msg.hasStatus()) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700538 return;
539 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200540 final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
541 log.info("Received arbitration update from {}: isMaster={}, electionId={}",
542 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
543 controller.postEvent(new P4RuntimeEvent(
544 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
545 new ArbitrationResponse(deviceId, isMaster)));
Carmelo Casconea966c342017-07-30 01:56:30 -0400546 }
547
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700548 private Collection<PiCounterCellData> doReadAllCounterCells(
549 Collection<PiCounterId> counterIds, PiPipeconf pipeconf) {
550 return doReadCounterEntities(
551 CounterEntryCodec.readAllCellsEntities(counterIds, pipeconf),
552 pipeconf);
553 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200554
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700555 private Collection<PiCounterCellData> doReadCounterCells(
556 Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
557 return doReadCounterEntities(
558 CounterEntryCodec.encodePiCounterCellIds(cellIds, pipeconf),
559 pipeconf);
560 }
561
562 private Collection<PiCounterCellData> doReadCounterEntities(
563 Collection<Entity> counterEntities, PiPipeconf pipeconf) {
564
565 if (counterEntities.size() == 0) {
566 return Collections.emptyList();
567 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200568
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200569 final ReadRequest request = ReadRequest.newBuilder()
570 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700571 .addAllEntities(counterEntities)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200572 .build();
573
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200574 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200575 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200576 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200577 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800578 log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200579 return Collections.emptyList();
580 }
581
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200582 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200583 .map(ReadResponse::getEntitiesList)
584 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200585 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200586
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700587 return CounterEntryCodec.decodeCounterEntities(entities, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200588 }
589
Yi Tseng8d355132018-04-13 01:40:48 +0800590 private boolean doWriteActionGroupMembers(PiActionProfileId profileId, Collection<PiActionGroupMember> members,
591 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200592 final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800593
Yi Tseng8d355132018-04-13 01:40:48 +0800594 for (PiActionGroupMember member : members) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800595 try {
Yi Tseng8d355132018-04-13 01:40:48 +0800596 actionProfileMembers.add(ActionProfileMemberEncoder.encode(profileId, member, pipeconf));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800597 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
598 log.warn("Unable to encode group member, aborting {} operation: {} [{}]",
599 opType.name(), e.getMessage(), member.toString());
600 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700601 }
Yi Tseng82512da2017-08-16 19:46:36 -0700602 }
603
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200604 final Collection<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700605 .map(actionProfileMember ->
606 Update.newBuilder()
607 .setEntity(Entity.newBuilder()
608 .setActionProfileMember(actionProfileMember)
609 .build())
610 .setType(UPDATE_TYPES.get(opType))
611 .build())
612 .collect(Collectors.toList());
613
614 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200615 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700616 return true;
617 }
618
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200619 WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700620 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200621 .setElectionId(clientElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200622 .addAllUpdates(updateMsgs)
623 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700624 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200625 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700626 return true;
627 } catch (StatusRuntimeException e) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200628 checkAndLogWriteErrors(members, e, opType, "group member");
Yi Tseng82512da2017-08-16 19:46:36 -0700629 return false;
630 }
631 }
632
633 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
634 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
635 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200636
637 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700638 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200639 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700640 return Collections.emptySet();
641 }
642
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200643 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700644 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200645 actionProfileId = browser
646 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200647 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200648 .getPreamble()
649 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700650 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200651 log.warn("Unable to dump groups: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700652 return Collections.emptySet();
653 }
654
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200655 // Prepare read request to read all groups from the given action profile.
656 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700657 .setDeviceId(p4DeviceId)
658 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200659 .setActionProfileGroup(
660 ActionProfileGroup.newBuilder()
661 .setActionProfileId(actionProfileId)
662 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700663 .build())
664 .build();
665
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200666 // Read groups.
667 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700668 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200669 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700670 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800671 log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700672 return Collections.emptySet();
673 }
674
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200675 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
676 .map(ReadResponse::getEntitiesList)
677 .flatMap(List::stream)
678 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
679 .map(Entity::getActionProfileGroup)
680 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700681
682 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200683 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700684
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200685 // Returned groups contain only a minimal description of their members.
686 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700687
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200688 // Keep a map of all member IDs for each group ID, will need it later.
689 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
690 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
691 g.getGroupId(),
692 g.getMembersList().stream()
693 .map(ActionProfileGroup.Member::getMemberId)
694 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700695
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200696 // Prepare one big read request to read all members in one shot.
697 final Set<Entity> entityMsgs = groupMsgs.stream()
698 .flatMap(g -> g.getMembersList().stream())
699 .map(ActionProfileGroup.Member::getMemberId)
700 // Prevent issuing many read requests for the same member.
701 .distinct()
702 .map(id -> ActionProfileMember.newBuilder()
703 .setActionProfileId(actionProfileId)
704 .setMemberId(id)
705 .build())
706 .map(m -> Entity.newBuilder()
707 .setActionProfileMember(m)
708 .build())
709 .collect(Collectors.toSet());
710 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
711 .addAllEntities(entityMsgs)
712 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700713
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200714 // Read members.
715 final Iterator<ReadResponse> memberResponses;
716 try {
717 memberResponses = blockingStub.read(memberRequestMsg);
718 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800719 log.warn("Unable to read members of action profile {} from {}: {}",
720 piActionProfileId, deviceId, e.getMessage());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200721 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700722 }
723
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200724 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
725 Tools.stream(() -> memberResponses)
726 .map(ReadResponse::getEntitiesList)
727 .flatMap(List::stream)
728 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
729 .map(Entity::getActionProfileMember)
730 .forEach(member -> groupIdToMemberIdsMap.asMap()
731 // Get all group IDs that contain this member.
732 .entrySet()
733 .stream()
734 .filter(entry -> entry.getValue().contains(member.getMemberId()))
735 .map(Map.Entry::getKey)
736 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
737
738 log.debug("Retrieved {} group members from action profile {} on {}...",
739 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
740
741 return groupMsgs.stream()
742 .map(groupMsg -> {
743 try {
744 return ActionProfileGroupEncoder.decode(groupMsg,
745 groupIdToMembersMap.get(groupMsg.getGroupId()),
746 pipeconf);
747 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
748 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
749 return null;
750 }
751 })
752 .filter(Objects::nonNull)
753 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700754 }
755
756 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200757 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700758 try {
759 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
760 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800761 log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700762 return false;
763 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200764
765 final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700766 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200767 .setElectionId(clientElectionId)
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200768 .addUpdates(Update.newBuilder()
769 .setEntity(Entity.newBuilder()
770 .setActionProfileGroup(actionProfileGroup)
771 .build())
772 .setType(UPDATE_TYPES.get(opType))
773 .build())
774 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700775 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200776 blockingStub.write(writeRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700777 return true;
778 } catch (StatusRuntimeException e) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200779 checkAndLogWriteErrors(Collections.singleton(group), e, opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700780 return false;
781 }
782 }
783
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700784 private Collection<PiMeterCellConfig> doReadAllMeterCells(
785 Collection<PiMeterId> meterIds, PiPipeconf pipeconf) {
786 return doReadMeterEntities(MeterEntryCodec.readAllCellsEntities(
787 meterIds, pipeconf), pipeconf);
788 }
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900789
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700790 private Collection<PiMeterCellConfig> doReadMeterCells(
791 Collection<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
792
793 final Collection<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900794 .map(cellId -> PiMeterCellConfig.builder()
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700795 .withMeterCellId(cellId)
796 .build())
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900797 .collect(Collectors.toList());
798
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700799 return doReadMeterEntities(MeterEntryCodec.encodePiMeterCellConfigs(
800 piMeterCellConfigs, pipeconf), pipeconf);
801 }
802
803 private Collection<PiMeterCellConfig> doReadMeterEntities(
804 Collection<Entity> entitiesToRead, PiPipeconf pipeconf) {
805
806 if (entitiesToRead.size() == 0) {
807 return Collections.emptyList();
808 }
809
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900810 final ReadRequest request = ReadRequest.newBuilder()
811 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700812 .addAllEntities(entitiesToRead)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900813 .build();
814
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900815 final Iterable<ReadResponse> responses;
816 try {
817 responses = () -> blockingStub.read(request);
818 } catch (StatusRuntimeException e) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700819 log.warn("Unable to read meter cells: {}", e.getMessage());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900820 log.debug("exception", e);
821 return Collections.emptyList();
822 }
823
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700824 List<Entity> responseEntities = StreamSupport
825 .stream(responses.spliterator(), false)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900826 .map(ReadResponse::getEntitiesList)
827 .flatMap(List::stream)
828 .collect(Collectors.toList());
829
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700830 return MeterEntryCodec.decodeMeterEntities(responseEntities, pipeconf);
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900831 }
832
833 private boolean doWriteMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
834
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900835 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
836
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700837 Collection<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellIds, pipeconf)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900838 .stream()
839 .map(meterEntryMsg ->
840 Update.newBuilder()
841 .setEntity(meterEntryMsg)
842 .setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
843 .build())
844 .collect(Collectors.toList());
845
846 if (updateMsgs.size() == 0) {
847 return true;
848 }
849
850 writeRequestBuilder
851 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200852 .setElectionId(clientElectionId)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900853 .addAllUpdates(updateMsgs)
854 .build();
855 try {
856 blockingStub.write(writeRequestBuilder.build());
857 return true;
858 } catch (StatusRuntimeException e) {
859 log.warn("Unable to write meter entries : {}", e.getMessage());
860 log.debug("exception", e);
861 return false;
862 }
863 }
864
Carmelo Casconee5b28722018-06-22 17:28:28 +0200865 private Void doShutdown() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400866 log.info("Shutting down client for {}...", deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200867 if (streamRequestObserver != null) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400868 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200869 streamRequestObserver.onCompleted();
870 } catch (IllegalStateException e) {
871 // Thrown if stream channel is already completed. Can ignore.
872 log.debug("Ignored expection: {}", e);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400873 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200874 cancellableContext.cancel(new InterruptedException(
875 "Requested client shutdown"));
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400876 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200877 this.executorService.shutdown();
878 try {
879 executorService.awaitTermination(5, TimeUnit.SECONDS);
880 } catch (InterruptedException e) {
881 log.warn("Executor service didn't shutdown in time.");
882 Thread.currentThread().interrupt();
883 }
884 return null;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400885 }
886
Carmelo Casconee5b28722018-06-22 17:28:28 +0200887 private <E extends PiEntity> void checkAndLogWriteErrors(
888 Collection<E> writeEntities, StatusRuntimeException ex,
889 WriteOperationType opType, String entryType) {
890
891 checkGrpcException(ex);
892
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800893 List<P4RuntimeOuterClass.Error> errors = null;
894 String description = null;
895 try {
896 errors = extractWriteErrorDetails(ex);
897 } catch (InvalidProtocolBufferException e) {
898 description = ex.getStatus().getDescription();
899 }
900
901 log.warn("Unable to {} {} {}(s) on {}: {}{} (detailed errors might be logged below)",
902 opType.name(), writeEntities.size(), entryType, deviceId,
903 ex.getStatus().getCode().name(),
904 description == null ? "" : " - " + description);
905
906 if (errors == null || errors.isEmpty()) {
907 return;
908 }
909
910 // FIXME: we are assuming entities is an ordered collection, e.g. a list,
911 // and that errors are reported in the same order as the corresponding
912 // written entity. Write RPC methods should be refactored to accept an
913 // order list of entities, instead of a collection.
914 if (errors.size() == writeEntities.size()) {
915 Iterator<E> entityIterator = writeEntities.iterator();
916 errors.stream()
917 .map(e -> ImmutablePair.of(e, entityIterator.next()))
918 .filter(p -> p.left.getCanonicalCode() != Status.OK.getCode().value())
919 .forEach(p -> log.warn("Unable to {} {}: {} [{}]",
920 opType.name(), entryType, parseP4Error(p.getLeft()),
921 p.getRight().toString()));
922 } else {
923 log.error("Unable to reconcile error details to updates " +
924 "(sent {} updates, but device returned {} errors)",
925 entryType, writeEntities.size(), errors.size());
926 errors.stream()
927 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
928 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
929 opType.name(), entryType, parseP4Error(err)));
930 }
931 }
932
933 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
934 StatusRuntimeException ex) throws InvalidProtocolBufferException {
935 String statusString = ex.getStatus().getDescription();
936 if (statusString == null) {
937 return Collections.emptyList();
938 }
939 com.google.rpc.Status status = com.google.rpc.Status
940 .parseFrom(statusString.getBytes());
941 return status.getDetailsList().stream()
942 .map(any -> {
943 try {
944 return any.unpack(P4RuntimeOuterClass.Error.class);
945 } catch (InvalidProtocolBufferException e) {
946 log.warn("Unable to unpack P4Runtime Error: {}",
947 any.toString());
948 return null;
949 }
950 })
951 .filter(Objects::nonNull)
952 .collect(Collectors.toList());
953
954 }
955
956 private String parseP4Error(P4RuntimeOuterClass.Error err) {
957 return format("%s %s (%s code %d)%s",
958 Status.fromCodeValue(err.getCanonicalCode()),
959 err.getMessage(),
960 err.getSpace(),
961 err.getCode(),
962 err.hasDetails() ? "\n" + err.getDetails().toString() : "");
963 }
964
Carmelo Casconee5b28722018-06-22 17:28:28 +0200965 private void checkGrpcException(StatusRuntimeException ex) {
966 switch (ex.getStatus().getCode()) {
967 case OK:
968 break;
969 case CANCELLED:
970 break;
971 case UNKNOWN:
972 break;
973 case INVALID_ARGUMENT:
974 break;
975 case DEADLINE_EXCEEDED:
976 break;
977 case NOT_FOUND:
978 break;
979 case ALREADY_EXISTS:
980 break;
981 case PERMISSION_DENIED:
982 // Notify upper layers that this node is not master.
983 controller.postEvent(new P4RuntimeEvent(
984 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
985 new ArbitrationResponse(deviceId, false)));
986 break;
987 case RESOURCE_EXHAUSTED:
988 break;
989 case FAILED_PRECONDITION:
990 break;
991 case ABORTED:
992 break;
993 case OUT_OF_RANGE:
994 break;
995 case UNIMPLEMENTED:
996 break;
997 case INTERNAL:
998 break;
999 case UNAVAILABLE:
1000 // Channel might be closed.
1001 controller.postEvent(new P4RuntimeEvent(
1002 P4RuntimeEvent.Type.CHANNEL_EVENT,
1003 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
1004 break;
1005 case DATA_LOSS:
1006 break;
1007 case UNAUTHENTICATED:
1008 break;
1009 default:
1010 break;
1011 }
1012 }
1013
1014 private Uint128 bigIntegerToUint128(BigInteger value) {
1015 final byte[] arr = value.toByteArray();
1016 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
1017 .put(new byte[Long.BYTES * 2 - arr.length])
1018 .put(arr);
1019 bb.rewind();
1020 return Uint128.newBuilder()
1021 .setHigh(bb.getLong())
1022 .setLow(bb.getLong())
1023 .build();
1024 }
1025
1026 private BigInteger uint128ToBigInteger(Uint128 value) {
1027 return new BigInteger(
1028 ByteBuffer.allocate(Long.BYTES * 2)
1029 .putLong(value.getHigh())
1030 .putLong(value.getLow())
1031 .array());
1032 }
1033
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001034 /**
1035 * Handles messages received from the device on the stream channel.
1036 */
Carmelo Casconee5b28722018-06-22 17:28:28 +02001037 private class StreamChannelResponseObserver
1038 implements StreamObserver<StreamMessageResponse> {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001039
1040 @Override
1041 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001042 executorService.submit(() -> doNext(message));
1043 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001044
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001045 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -04001046 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001047 log.debug("Received message on stream channel from {}: {}",
1048 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001049 switch (message.getUpdateCase()) {
1050 case PACKET:
Carmelo Casconea966c342017-07-30 01:56:30 -04001051 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +02001052 return;
Carmelo Casconea966c342017-07-30 01:56:30 -04001053 case ARBITRATION:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001054 doArbitrationResponse(message.getArbitration());
Carmelo Casconea966c342017-07-30 01:56:30 -04001055 return;
1056 default:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001057 log.warn("Unrecognized stream message from {}: {}",
1058 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001059 }
1060 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001061 log.error("Exception while processing stream message from {}",
1062 deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001063 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001064 }
1065
1066 @Override
1067 public void onError(Throwable throwable) {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001068 log.warn("Error on stream channel for {}: {}",
1069 deviceId, Status.fromThrowable(throwable));
1070 controller.postEvent(new P4RuntimeEvent(
1071 P4RuntimeEvent.Type.CHANNEL_EVENT,
1072 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001073 }
1074
1075 @Override
1076 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -04001077 log.warn("Stream channel for {} has completed", deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +02001078 controller.postEvent(new P4RuntimeEvent(
1079 P4RuntimeEvent.Type.CHANNEL_EVENT,
1080 new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001081 }
1082 }
Carmelo Cascone87892e22017-11-13 16:01:29 -08001083}