blob: d69ec32326f6e6c21913c572aee33255d89c1e86 [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Yi Tseng82512da2017-08-16 19:46:36 -070022import com.google.common.collect.Multimap;
Carmelo Cascone50d195f2018-09-11 13:26:38 -070023import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040024import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080025import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040026import io.grpc.ManagedChannel;
Carmelo Cascone943c6642018-09-11 13:01:03 -070027import io.grpc.Metadata;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040028import io.grpc.Status;
29import io.grpc.StatusRuntimeException;
Carmelo Cascone943c6642018-09-11 13:01:03 -070030import io.grpc.protobuf.lite.ProtoLiteUtils;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070031import io.grpc.stub.ClientCallStreamObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040032import io.grpc.stub.StreamObserver;
Andrea Campanella288b2732017-07-28 14:16:16 +020033import org.onlab.osgi.DefaultServiceDirectory;
Yi Tseng82512da2017-08-16 19:46:36 -070034import org.onlab.util.Tools;
Yi Tseng2a340f72018-11-02 16:52:47 -070035import org.onosproject.grpc.ctl.AbstractGrpcClient;
Carmelo Cascone87892e22017-11-13 16:01:29 -080036import org.onosproject.net.pi.model.PiActionProfileId;
37import org.onosproject.net.pi.model.PiCounterId;
Carmelo Cascone81929aa2018-04-07 01:38:55 -070038import org.onosproject.net.pi.model.PiMeterId;
Andrea Campanella432f7182017-07-14 18:43:27 +020039import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080040import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020041import org.onosproject.net.pi.runtime.PiActionGroup;
42import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconee44592f2018-09-12 02:24:47 -070043import org.onosproject.net.pi.runtime.PiActionGroupMemberId;
steven308017632e152018-10-20 00:51:08 +080044import org.onosproject.net.pi.runtime.PiCounterCell;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020045import org.onosproject.net.pi.runtime.PiCounterCellId;
Frank Wangd7e3b4b2017-09-24 13:37:54 +090046import org.onosproject.net.pi.runtime.PiMeterCellConfig;
47import org.onosproject.net.pi.runtime.PiMeterCellId;
Carmelo Cascone58136812018-07-19 03:40:16 +020048import org.onosproject.net.pi.runtime.PiMulticastGroupEntry;
Andrea Campanella432f7182017-07-14 18:43:27 +020049import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040050import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080051import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040052import org.onosproject.p4runtime.api.P4RuntimeClient;
Yi Tseng2a340f72018-11-02 16:52:47 -070053import org.onosproject.p4runtime.api.P4RuntimeClientKey;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040054import org.onosproject.p4runtime.api.P4RuntimeEvent;
Carmelo Casconee5b28722018-06-22 17:28:28 +020055import p4.config.v1.P4InfoOuterClass.P4Info;
56import p4.tmp.P4Config;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020057import p4.v1.P4RuntimeGrpc;
58import p4.v1.P4RuntimeOuterClass;
59import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
60import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
61import p4.v1.P4RuntimeOuterClass.Entity;
62import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070063import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
64import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020065import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
Carmelo Cascone58136812018-07-19 03:40:16 +020066import p4.v1.P4RuntimeOuterClass.MulticastGroupEntry;
67import p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020068import p4.v1.P4RuntimeOuterClass.ReadRequest;
69import p4.v1.P4RuntimeOuterClass.ReadResponse;
70import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
71import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
72import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
73import p4.v1.P4RuntimeOuterClass.TableEntry;
74import p4.v1.P4RuntimeOuterClass.Uint128;
75import p4.v1.P4RuntimeOuterClass.Update;
76import p4.v1.P4RuntimeOuterClass.WriteRequest;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040077
Carmelo Casconee5b28722018-06-22 17:28:28 +020078import java.math.BigInteger;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070079import java.net.ConnectException;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070080import java.nio.ByteBuffer;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040081import java.util.Collections;
82import java.util.Iterator;
83import java.util.List;
84import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020085import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020086import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040087import java.util.concurrent.CompletableFuture;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070088import java.util.concurrent.atomic.AtomicBoolean;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040089import java.util.stream.Collectors;
90import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040091
Carmelo Casconed61fdb32017-10-30 10:09:57 -070092import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080093import static java.lang.String.format;
Carmelo Casconee44592f2018-09-12 02:24:47 -070094import static java.util.Collections.singletonList;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020095import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
96import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
Carmelo Cascone58136812018-07-19 03:40:16 +020097import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020098import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Carmelo Casconee5b28722018-06-22 17:28:28 +020099import static p4.v1.P4RuntimeOuterClass.PacketIn;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200100import static p4.v1.P4RuntimeOuterClass.PacketOut;
Carmelo Cascone58136812018-07-19 03:40:16 +0200101import static p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry.TypeCase.MULTICAST_GROUP_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200102import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400103
104/**
105 * Implementation of a P4Runtime client.
106 */
Yi Tseng2a340f72018-11-02 16:52:47 -0700107final class P4RuntimeClientImpl extends AbstractGrpcClient implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400108
// gRPC metadata key "grpc-status-details-bin", marshalled as a
// com.google.rpc.Status message.
private static final Metadata.Key<com.google.rpc.Status> STATUS_DETAILS_KEY =
        Metadata.Key.of("grpc-status-details-bin",
                        ProtoLiteUtils.metadataMarshaller(
                                com.google.rpc.Status.getDefaultInstance()));

// Translation table from this client's write operation types to the
// corresponding P4Runtime Update.Type values.
private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
        WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
        WriteOperationType.INSERT, Update.Type.INSERT,
        WriteOperationType.MODIFY, Update.Type.MODIFY,
        WriteOperationType.DELETE, Update.Type.DELETE
);

// P4Runtime device ID, taken from the client key at construction time.
private final long p4DeviceId;
// Controller used to obtain election IDs and to post events.
private final P4RuntimeControllerImpl controller;
// Blocking stub used for the unary and read RPCs issued by this client.
private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
// Manager of the stream channel; carries arbitration updates and packet-outs.
private StreamChannelManager streamChannelManager;

// Used by this client for write requests.
// Starts at low=1 and is replaced each time an arbitration update is sent.
private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();

// Last mastership status reported by the device in an arbitration response.
private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
130
/**
 * Creates a client for the device identified by the given key, creating
 * both the blocking stub and the stream channel manager on the given
 * channel.
 *
 * @param clientKey  the client key of this client
 * @param channel    gRPC channel
 * @param controller runtime client controller
 */
P4RuntimeClientImpl(P4RuntimeClientKey clientKey, ManagedChannel channel,
                    P4RuntimeControllerImpl controller) {
    super(clientKey, channel);

    this.controller = controller;
    this.p4DeviceId = clientKey.p4DeviceId();

    //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
    this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
    this.streamChannelManager = new StreamChannelManager(channel);
}
149
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400150 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700151 public CompletableFuture<Boolean> startStreamChannel() {
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700152 return supplyInContext(() -> sendMasterArbitrationUpdate(false),
Carmelo Casconee5b28722018-06-22 17:28:28 +0200153 "start-initStreamChannel");
154 }
155
156 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200157 public CompletableFuture<Boolean> becomeMaster() {
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700158 return supplyInContext(() -> sendMasterArbitrationUpdate(true),
Carmelo Casconee5b28722018-06-22 17:28:28 +0200159 "becomeMaster");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400160 }
161
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400162 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700163 public boolean isMaster() {
164 return streamChannelManager.isOpen() && isClientMaster.get();
165 }
166
167 @Override
168 public boolean isStreamChannelOpen() {
169 return streamChannelManager.isOpen();
170 }
171
172 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700173 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
174 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400175 }
176
177 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700178 public boolean isPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
179 return doIsPipelineConfigSet(pipeconf, deviceData);
180 }
181
182 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700183 public CompletableFuture<Boolean> writeTableEntries(List<PiTableEntry> piTableEntries,
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400184 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200185 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
186 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400187 }
188
189 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700190 public CompletableFuture<List<PiTableEntry>> dumpTables(
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700191 Set<PiTableId> piTableIds, boolean defaultEntries, PiPipeconf pipeconf) {
192 return supplyInContext(() -> doDumpTables(piTableIds, defaultEntries, pipeconf),
193 "dumpTables-" + piTableIds.hashCode());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400194 }
195
196 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700197 public CompletableFuture<List<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700198 return supplyInContext(() -> doDumpTables(null, false, pipeconf), "dumpAllTables");
Carmelo Casconee5b28722018-06-22 17:28:28 +0200199 }
200
201 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200202 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200203 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200204 }
205
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200206 @Override
steven308017632e152018-10-20 00:51:08 +0800207 public CompletableFuture<List<PiCounterCell>> readCounterCells(Set<PiCounterCellId> cellIds,
208 PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700209 return supplyInContext(() -> doReadCounterCells(Lists.newArrayList(cellIds), pipeconf),
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200210 "readCounterCells-" + cellIds.hashCode());
211 }
212
213 @Override
steven308017632e152018-10-20 00:51:08 +0800214 public CompletableFuture<List<PiCounterCell>> readAllCounterCells(Set<PiCounterId> counterIds,
215 PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700216 return supplyInContext(() -> doReadAllCounterCells(Lists.newArrayList(counterIds), pipeconf),
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700217 "readAllCounterCells-" + counterIds.hashCode());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200218 }
219
Yi Tseng82512da2017-08-16 19:46:36 -0700220 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700221 public CompletableFuture<Boolean> writeActionGroupMembers(List<PiActionGroupMember> members,
Yi Tseng82512da2017-08-16 19:46:36 -0700222 WriteOperationType opType,
223 PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700224 return supplyInContext(() -> doWriteActionGroupMembers(members, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700225 "writeActionGroupMembers-" + opType.name());
226 }
227
Yi Tseng8d355132018-04-13 01:40:48 +0800228
Yi Tseng82512da2017-08-16 19:46:36 -0700229 @Override
230 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
231 WriteOperationType opType,
ghj0504520ed7340c2018-10-26 13:06:35 -0700232 PiPipeconf pipeconf,
233 int maxMemberSize) {
234 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf, maxMemberSize),
Yi Tseng82512da2017-08-16 19:46:36 -0700235 "writeActionGroup-" + opType.name());
236 }
237
238 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700239 public CompletableFuture<List<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
240 PiPipeconf pipeconf) {
Yi Tseng82512da2017-08-16 19:46:36 -0700241 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
242 "dumpGroups-" + actionProfileId.id());
243 }
244
Yi Tseng3e7f1452017-10-20 10:31:53 -0700245 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700246 public CompletableFuture<List<PiActionGroupMemberId>> dumpActionProfileMemberIds(
247 PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
248 return supplyInContext(() -> doDumpActionProfileMemberIds(actionProfileId, pipeconf),
249 "dumpActionProfileMemberIds-" + actionProfileId.id());
250 }
251
252 @Override
253 public CompletableFuture<List<PiActionGroupMemberId>> removeActionProfileMembers(
254 PiActionProfileId actionProfileId,
255 List<PiActionGroupMemberId> memberIds,
256 PiPipeconf pipeconf) {
257 return supplyInContext(
258 () -> doRemoveActionProfileMembers(actionProfileId, memberIds, pipeconf),
259 "cleanupActionProfileMembers-" + actionProfileId.id());
260 }
261
262 @Override
263 public CompletableFuture<Boolean> writeMeterCells(List<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900264
265 return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
266 "writeMeterCells");
267 }
268
269 @Override
Carmelo Cascone58136812018-07-19 03:40:16 +0200270 public CompletableFuture<Boolean> writePreMulticastGroupEntries(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700271 List<PiMulticastGroupEntry> entries,
Carmelo Cascone58136812018-07-19 03:40:16 +0200272 WriteOperationType opType) {
273 return supplyInContext(() -> doWriteMulticastGroupEntries(entries, opType),
274 "writePreMulticastGroupEntries");
275 }
276
277 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700278 public CompletableFuture<List<PiMulticastGroupEntry>> readAllMulticastGroupEntries() {
Carmelo Cascone58136812018-07-19 03:40:16 +0200279 return supplyInContext(this::doReadAllMulticastGroupEntries,
280 "readAllMulticastGroupEntries");
281 }
282
283 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700284 public CompletableFuture<List<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
285 PiPipeconf pipeconf) {
286 return supplyInContext(() -> doReadMeterCells(Lists.newArrayList(cellIds), pipeconf),
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900287 "readMeterCells-" + cellIds.hashCode());
288 }
289
290 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700291 public CompletableFuture<List<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
292 PiPipeconf pipeconf) {
293 return supplyInContext(() -> doReadAllMeterCells(Lists.newArrayList(meterIds), pipeconf),
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700294 "readAllMeterCells-" + meterIds.hashCode());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900295 }
Yi Tseng3e7f1452017-10-20 10:31:53 -0700296
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400297 /* Blocking method implementations below */
298
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700299 private boolean sendMasterArbitrationUpdate(boolean asMaster) {
300 BigInteger newId = controller.newMasterElectionId(deviceId);
301 if (asMaster) {
302 // Becoming master is a race. Here we increase our chances of win
303 // against other ONOS nodes in the cluster that are calling start()
304 // (which is used to start the stream RPC session, not to become
305 // master).
306 newId = newId.add(BigInteger.valueOf(1000));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200307 }
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700308 final Uint128 idMsg = bigIntegerToUint128(
309 controller.newMasterElectionId(deviceId));
Andrea Campanella1e573442018-05-17 17:07:13 +0200310
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700311 log.debug("Sending arbitration update to {}... electionId={}",
312 deviceId, newId);
313
314 streamChannelManager.send(
315 StreamMessageRequest.newBuilder()
316 .setArbitration(
317 MasterArbitrationUpdate
318 .newBuilder()
319 .setDeviceId(p4DeviceId)
320 .setElectionId(idMsg)
321 .build())
322 .build());
323 clientElectionId = idMsg;
324 return true;
325 }
326
327 private ForwardingPipelineConfig getPipelineConfig(
328 PiPipeconf pipeconf, ByteBuffer deviceData) {
329 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
330 if (p4Info == null) {
331 // Problem logged by PipeconfHelper.
332 return null;
Yi Tseng3e7f1452017-10-20 10:31:53 -0700333 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700334
ghj050452092a48bf2018-10-22 10:50:41 -0700335 ForwardingPipelineConfig.Cookie pipeconfCookie = ForwardingPipelineConfig.Cookie
336 .newBuilder()
ghj0504520ec1a4202018-10-22 10:50:41 -0700337 .setCookie(pipeconf.fingerprint())
ghj050452092a48bf2018-10-22 10:50:41 -0700338 .build();
339
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700340 // FIXME: This is specific to PI P4Runtime implementation.
341 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
342 .newBuilder()
343 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
344 .setReassign(true)
345 .setDeviceData(ByteString.copyFrom(deviceData))
346 .build();
347
348 return ForwardingPipelineConfig
349 .newBuilder()
350 .setP4Info(p4Info)
351 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
ghj050452092a48bf2018-10-22 10:50:41 -0700352 .setCookie(pipeconfCookie)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700353 .build();
354 }
355
356 private boolean doIsPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
357
358 GetForwardingPipelineConfigRequest request = GetForwardingPipelineConfigRequest
359 .newBuilder()
360 .setDeviceId(p4DeviceId)
ghj050452092a48bf2018-10-22 10:50:41 -0700361 .setResponseType(GetForwardingPipelineConfigRequest
362 .ResponseType.COOKIE_ONLY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700363 .build();
364
365 GetForwardingPipelineConfigResponse resp;
366 try {
367 resp = this.blockingStub
368 .getForwardingPipelineConfig(request);
369 } catch (StatusRuntimeException ex) {
370 checkGrpcException(ex);
371 // FAILED_PRECONDITION means that a pipeline config was not set in
372 // the first place. Don't bother logging.
373 if (!ex.getStatus().getCode()
374 .equals(Status.FAILED_PRECONDITION.getCode())) {
375 log.warn("Unable to get pipeline config from {}: {}",
376 deviceId, ex.getMessage());
377 }
378 return false;
379 }
ghj050452092a48bf2018-10-22 10:50:41 -0700380 if (!resp.getConfig().hasCookie()) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700381 log.warn("{} returned GetForwardingPipelineConfigResponse " +
ghj050452092a48bf2018-10-22 10:50:41 -0700382 "with 'cookie' field unset",
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700383 deviceId);
384 return false;
385 }
ghj050452092a48bf2018-10-22 10:50:41 -0700386
ghj0504520ec1a4202018-10-22 10:50:41 -0700387 return resp.getConfig().getCookie().getCookie() == pipeconf.fingerprint();
Yi Tseng3e7f1452017-10-20 10:31:53 -0700388 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200389
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700390 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400391
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700392 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
393
394 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400395
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700396 ForwardingPipelineConfig pipelineConfig = getPipelineConfig(pipeconf, deviceData);
397
398 if (pipelineConfig == null) {
399 // Error logged in getPipelineConfig()
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400400 return false;
401 }
402
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400403 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
404 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100405 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200406 .setElectionId(clientElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400407 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100408 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400409 .build();
410
411 try {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700412 //noinspection ResultOfMethodCallIgnored
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400413 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700414 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400415 } catch (StatusRuntimeException ex) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700416 checkGrpcException(ex);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800417 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400418 return false;
419 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400420 }
421
Carmelo Casconee44592f2018-09-12 02:24:47 -0700422 private boolean doWriteTableEntries(List<PiTableEntry> piTableEntries, WriteOperationType opType,
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400423 PiPipeconf pipeconf) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800424 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400425 return true;
426 }
427
Carmelo Casconee44592f2018-09-12 02:24:47 -0700428 List<Update> updateMsgs;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800429 try {
430 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
431 .stream()
432 .map(tableEntryMsg ->
433 Update.newBuilder()
434 .setEntity(Entity.newBuilder()
435 .setTableEntry(tableEntryMsg)
436 .build())
437 .setType(UPDATE_TYPES.get(opType))
438 .build())
439 .collect(Collectors.toList());
440 } catch (EncodeException e) {
441 log.error("Unable to encode table entries, aborting {} operation: {}",
442 opType.name(), e.getMessage());
443 return false;
444 }
445
Carmelo Cascone58136812018-07-19 03:40:16 +0200446 return write(updateMsgs, piTableEntries, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400447 }
448
Carmelo Casconee44592f2018-09-12 02:24:47 -0700449 private List<PiTableEntry> doDumpTables(
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700450 Set<PiTableId> piTableIds, boolean defaultEntries, PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400451
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700452 log.debug("Dumping tables {} from {} (pipeconf {})...",
453 piTableIds, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400454
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700455 Set<Integer> tableIds = Sets.newHashSet();
456 if (piTableIds == null) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200457 // Dump all tables.
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700458 tableIds.add(0);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200459 } else {
460 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Carmelo Cascone58136812018-07-19 03:40:16 +0200461 if (browser == null) {
462 log.warn("Unable to get a P4Info browser for pipeconf {}", pipeconf);
463 return Collections.emptyList();
464 }
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700465 piTableIds.forEach(piTableId -> {
466 try {
467 tableIds.add(browser.tables().getByName(piTableId.id()).getPreamble().getId());
468 } catch (P4InfoBrowser.NotFoundException e) {
469 log.warn("Unable to dump table {}: {}", piTableId, e.getMessage());
470 }
471 });
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400472 }
473
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700474 if (tableIds.isEmpty()) {
475 return Collections.emptyList();
476 }
477
478 ReadRequest.Builder requestMsgBuilder = ReadRequest.newBuilder()
479 .setDeviceId(p4DeviceId);
480 tableIds.forEach(tableId -> requestMsgBuilder.addEntities(
481 Entity.newBuilder()
482 .setTableEntry(
483 TableEntry.newBuilder()
484 .setTableId(tableId)
485 .setIsDefaultAction(defaultEntries)
steven308017632e152018-10-20 00:51:08 +0800486 .setCounterData(P4RuntimeOuterClass.CounterData.getDefaultInstance())
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700487 .build())
488 .build())
489 .build());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400490
491 Iterator<ReadResponse> responses;
492 try {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700493 responses = blockingStub.read(requestMsgBuilder.build());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400494 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700495 checkGrpcException(e);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700496 log.warn("Unable to dump tables from {}: {}", deviceId, e.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400497 return Collections.emptyList();
498 }
499
500 Iterable<ReadResponse> responseIterable = () -> responses;
501 List<TableEntry> tableEntryMsgs = StreamSupport
502 .stream(responseIterable.spliterator(), false)
503 .map(ReadResponse::getEntitiesList)
504 .flatMap(List::stream)
505 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
506 .map(Entity::getTableEntry)
507 .collect(Collectors.toList());
508
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700509 log.debug("Retrieved {} entries from {} tables on {}...",
510 tableEntryMsgs.size(), tableIds.size(), deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400511
512 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
513 }
514
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200515 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
516 try {
517 //encode the PiPacketOperation into a PacketOut
518 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
519
520 //Build the request
521 StreamMessageRequest packetOutRequest = StreamMessageRequest
522 .newBuilder().setPacket(packetOut).build();
523
524 //Send the request
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700525 streamChannelManager.send(packetOutRequest);
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200526
527 } catch (P4InfoBrowser.NotFoundException e) {
528 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
529 log.debug("Exception", e);
530 return false;
531 }
532 return true;
533 }
534
Carmelo Casconea966c342017-07-30 01:56:30 -0400535 private void doPacketIn(PacketIn packetInMsg) {
536
537 // Retrieve the pipeconf for this client's device.
538 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
539 if (pipeconfService == null) {
540 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
541 }
542 final PiPipeconf pipeconf;
543 if (pipeconfService.ofDevice(deviceId).isPresent() &&
544 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
545 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
546 } else {
547 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
548 return;
549 }
550 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800551 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200552 PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200553 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400554 log.debug("Received packet in: {}", event);
555 controller.postEvent(event);
556 }
557
Carmelo Casconee5b28722018-06-22 17:28:28 +0200558 private void doArbitrationResponse(MasterArbitrationUpdate msg) {
559 // From the spec...
560 // - Election_id: The stream RPC with the highest election_id is the
561 // master. Switch populates with the highest election ID it
562 // has received from all connected controllers.
563 // - Status: Switch populates this with OK for the client that is the
564 // master, and with an error status for all other connected clients (at
565 // every mastership change).
566 if (!msg.hasElectionId() || !msg.hasStatus()) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700567 return;
568 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700569 final boolean isMaster =
570 msg.getStatus().getCode() == Status.OK.getCode().value();
571 log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
572 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200573 controller.postEvent(new P4RuntimeEvent(
574 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
575 new ArbitrationResponse(deviceId, isMaster)));
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700576 isClientMaster.set(isMaster);
Carmelo Casconea966c342017-07-30 01:56:30 -0400577 }
578
steven308017632e152018-10-20 00:51:08 +0800579 private List<PiCounterCell> doReadAllCounterCells(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700580 List<PiCounterId> counterIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700581 return doReadCounterEntities(
582 CounterEntryCodec.readAllCellsEntities(counterIds, pipeconf),
583 pipeconf);
584 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200585
steven308017632e152018-10-20 00:51:08 +0800586 private List<PiCounterCell> doReadCounterCells(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700587 List<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700588 return doReadCounterEntities(
589 CounterEntryCodec.encodePiCounterCellIds(cellIds, pipeconf),
590 pipeconf);
591 }
592
steven308017632e152018-10-20 00:51:08 +0800593 private List<PiCounterCell> doReadCounterEntities(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700594 List<Entity> counterEntities, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700595
596 if (counterEntities.size() == 0) {
597 return Collections.emptyList();
598 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200599
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200600 final ReadRequest request = ReadRequest.newBuilder()
601 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700602 .addAllEntities(counterEntities)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200603 .build();
604
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200605 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200606 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200607 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200608 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700609 checkGrpcException(e);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800610 log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200611 return Collections.emptyList();
612 }
613
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200614 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200615 .map(ReadResponse::getEntitiesList)
616 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200617 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200618
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700619 return CounterEntryCodec.decodeCounterEntities(entities, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200620 }
621
Carmelo Casconee44592f2018-09-12 02:24:47 -0700622 private boolean doWriteActionGroupMembers(List<PiActionGroupMember> members,
Yi Tseng8d355132018-04-13 01:40:48 +0800623 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700624 final List<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800625
Yi Tseng8d355132018-04-13 01:40:48 +0800626 for (PiActionGroupMember member : members) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800627 try {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700628 actionProfileMembers.add(ActionProfileMemberEncoder.encode(member, pipeconf));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800629 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
630 log.warn("Unable to encode group member, aborting {} operation: {} [{}]",
631 opType.name(), e.getMessage(), member.toString());
632 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700633 }
Yi Tseng82512da2017-08-16 19:46:36 -0700634 }
635
Carmelo Casconee44592f2018-09-12 02:24:47 -0700636 final List<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700637 .map(actionProfileMember ->
638 Update.newBuilder()
639 .setEntity(Entity.newBuilder()
640 .setActionProfileMember(actionProfileMember)
641 .build())
642 .setType(UPDATE_TYPES.get(opType))
643 .build())
644 .collect(Collectors.toList());
645
646 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200647 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700648 return true;
649 }
650
Carmelo Cascone58136812018-07-19 03:40:16 +0200651 return write(updateMsgs, members, opType, "group member");
Yi Tseng82512da2017-08-16 19:46:36 -0700652 }
653
Carmelo Casconee44592f2018-09-12 02:24:47 -0700654 private List<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
Yi Tseng82512da2017-08-16 19:46:36 -0700655 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
656 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200657
658 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700659 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200660 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700661 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700662 }
663
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200664 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700665 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200666 actionProfileId = browser
667 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200668 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200669 .getPreamble()
670 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700671 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200672 log.warn("Unable to dump groups: {}", e.getMessage());
Carmelo Casconee44592f2018-09-12 02:24:47 -0700673 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700674 }
675
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200676 // Prepare read request to read all groups from the given action profile.
677 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700678 .setDeviceId(p4DeviceId)
679 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200680 .setActionProfileGroup(
681 ActionProfileGroup.newBuilder()
682 .setActionProfileId(actionProfileId)
683 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700684 .build())
685 .build();
686
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200687 // Read groups.
688 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700689 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200690 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700691 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700692 checkGrpcException(e);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800693 log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
Carmelo Casconee44592f2018-09-12 02:24:47 -0700694 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700695 }
696
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200697 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
698 .map(ReadResponse::getEntitiesList)
699 .flatMap(List::stream)
700 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
701 .map(Entity::getActionProfileGroup)
702 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700703
704 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200705 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700706
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200707 // Returned groups contain only a minimal description of their members.
708 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700709
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200710 // Keep a map of all member IDs for each group ID, will need it later.
711 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
712 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
713 g.getGroupId(),
714 g.getMembersList().stream()
715 .map(ActionProfileGroup.Member::getMemberId)
716 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700717
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200718 // Prepare one big read request to read all members in one shot.
719 final Set<Entity> entityMsgs = groupMsgs.stream()
720 .flatMap(g -> g.getMembersList().stream())
721 .map(ActionProfileGroup.Member::getMemberId)
722 // Prevent issuing many read requests for the same member.
723 .distinct()
724 .map(id -> ActionProfileMember.newBuilder()
725 .setActionProfileId(actionProfileId)
726 .setMemberId(id)
727 .build())
728 .map(m -> Entity.newBuilder()
729 .setActionProfileMember(m)
730 .build())
731 .collect(Collectors.toSet());
732 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
733 .addAllEntities(entityMsgs)
734 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700735
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200736 // Read members.
737 final Iterator<ReadResponse> memberResponses;
738 try {
739 memberResponses = blockingStub.read(memberRequestMsg);
740 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700741 checkGrpcException(e);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800742 log.warn("Unable to read members of action profile {} from {}: {}",
743 piActionProfileId, deviceId, e.getMessage());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200744 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700745 }
746
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200747 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
748 Tools.stream(() -> memberResponses)
749 .map(ReadResponse::getEntitiesList)
750 .flatMap(List::stream)
751 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
752 .map(Entity::getActionProfileMember)
753 .forEach(member -> groupIdToMemberIdsMap.asMap()
754 // Get all group IDs that contain this member.
755 .entrySet()
756 .stream()
757 .filter(entry -> entry.getValue().contains(member.getMemberId()))
758 .map(Map.Entry::getKey)
759 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
760
761 log.debug("Retrieved {} group members from action profile {} on {}...",
762 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
763
764 return groupMsgs.stream()
765 .map(groupMsg -> {
766 try {
767 return ActionProfileGroupEncoder.decode(groupMsg,
768 groupIdToMembersMap.get(groupMsg.getGroupId()),
769 pipeconf);
770 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
771 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
772 return null;
773 }
774 })
775 .filter(Objects::nonNull)
776 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700777 }
778
Carmelo Casconee44592f2018-09-12 02:24:47 -0700779 private List<PiActionGroupMemberId> doDumpActionProfileMemberIds(
780 PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
781
782 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
783 if (browser == null) {
784 log.warn("Unable to get a P4Info browser for pipeconf {}, " +
785 "aborting cleanup of action profile members",
786 pipeconf);
787 return Collections.emptyList();
788 }
789
790 final int p4ActProfId;
791 try {
792 p4ActProfId = browser
793 .actionProfiles()
794 .getByName(actionProfileId.id())
795 .getPreamble()
796 .getId();
797 } catch (P4InfoBrowser.NotFoundException e) {
798 log.warn("Unable to cleanup action profile members: {}", e.getMessage());
799 return Collections.emptyList();
800 }
801
802 final ReadRequest memberRequestMsg = ReadRequest.newBuilder()
803 .setDeviceId(p4DeviceId)
804 .addEntities(Entity.newBuilder().setActionProfileMember(
805 ActionProfileMember.newBuilder()
806 .setActionProfileId(p4ActProfId)
807 .build()).build())
808 .build();
809
810 // Read members.
811 final Iterator<ReadResponse> memberResponses;
812 try {
813 memberResponses = blockingStub.read(memberRequestMsg);
814 } catch (StatusRuntimeException e) {
815 checkGrpcException(e);
816 log.warn("Unable to read members of action profile {} from {}: {}",
817 actionProfileId, deviceId, e.getMessage());
818 return Collections.emptyList();
819 }
820
821 return Tools.stream(() -> memberResponses)
822 .map(ReadResponse::getEntitiesList)
823 .flatMap(List::stream)
824 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
825 .map(Entity::getActionProfileMember)
826 // Perhaps not needed, but better to double check to avoid
827 // removing members of other groups.
828 .filter(m -> m.getActionProfileId() == p4ActProfId)
829 .map(ActionProfileMember::getMemberId)
830 .map(PiActionGroupMemberId::of)
831 .collect(Collectors.toList());
832 }
833
834 private List<PiActionGroupMemberId> doRemoveActionProfileMembers(
835 PiActionProfileId actionProfileId,
836 List<PiActionGroupMemberId> memberIds,
837 PiPipeconf pipeconf) {
838
839 if (memberIds.isEmpty()) {
840 return Collections.emptyList();
841 }
842
843 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
844 if (browser == null) {
845 log.warn("Unable to get a P4Info browser for pipeconf {}, " +
846 "aborting cleanup of action profile members",
847 pipeconf);
848 return Collections.emptyList();
849 }
850
851 final int p4ActProfId;
852 try {
853 p4ActProfId = browser.actionProfiles()
854 .getByName(actionProfileId.id()).getPreamble().getId();
855 } catch (P4InfoBrowser.NotFoundException e) {
856 log.warn("Unable to cleanup action profile members: {}", e.getMessage());
857 return Collections.emptyList();
858 }
859
860 final List<Update> updateMsgs = memberIds.stream()
861 .map(m -> ActionProfileMember.newBuilder()
862 .setActionProfileId(p4ActProfId)
863 .setMemberId(m.id()).build())
864 .map(m -> Entity.newBuilder().setActionProfileMember(m).build())
865 .map(e -> Update.newBuilder().setEntity(e)
866 .setType(Update.Type.DELETE).build())
867 .collect(Collectors.toList());
868
869 log.debug("Removing {} members of action profile '{}'...",
870 memberIds.size(), actionProfileId);
871
872 return writeAndReturnSuccessEntities(
873 updateMsgs, memberIds, WriteOperationType.DELETE,
874 "action profile members");
875 }
876
ghj0504520ed7340c2018-10-26 13:06:35 -0700877 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf,
878 int maxMemberSize) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200879 final ActionProfileGroup actionProfileGroup;
ghj0504520ed7340c2018-10-26 13:06:35 -0700880 if (opType == P4RuntimeClient.WriteOperationType.INSERT && maxMemberSize < group.members().size()) {
881 log.warn("Unable to encode group, since group member larger than maximum member size");
882 return false;
883 }
Yi Tseng82512da2017-08-16 19:46:36 -0700884 try {
ghj0504520ed7340c2018-10-26 13:06:35 -0700885 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf, maxMemberSize);
Yi Tseng82512da2017-08-16 19:46:36 -0700886 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800887 log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700888 return false;
889 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200890
Carmelo Cascone58136812018-07-19 03:40:16 +0200891 final Update updateMsg = Update.newBuilder()
892 .setEntity(Entity.newBuilder()
893 .setActionProfileGroup(actionProfileGroup)
894 .build())
895 .setType(UPDATE_TYPES.get(opType))
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200896 .build();
Carmelo Cascone58136812018-07-19 03:40:16 +0200897
Carmelo Casconee44592f2018-09-12 02:24:47 -0700898 return write(singletonList(updateMsg), singletonList(group),
Carmelo Cascone58136812018-07-19 03:40:16 +0200899 opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700900 }
901
Carmelo Casconee44592f2018-09-12 02:24:47 -0700902 private List<PiMeterCellConfig> doReadAllMeterCells(
903 List<PiMeterId> meterIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700904 return doReadMeterEntities(MeterEntryCodec.readAllCellsEntities(
905 meterIds, pipeconf), pipeconf);
906 }
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900907
Carmelo Casconee44592f2018-09-12 02:24:47 -0700908 private List<PiMeterCellConfig> doReadMeterCells(
909 List<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700910
Carmelo Casconee44592f2018-09-12 02:24:47 -0700911 final List<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900912 .map(cellId -> PiMeterCellConfig.builder()
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700913 .withMeterCellId(cellId)
914 .build())
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900915 .collect(Collectors.toList());
916
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700917 return doReadMeterEntities(MeterEntryCodec.encodePiMeterCellConfigs(
918 piMeterCellConfigs, pipeconf), pipeconf);
919 }
920
Carmelo Casconee44592f2018-09-12 02:24:47 -0700921 private List<PiMeterCellConfig> doReadMeterEntities(
922 List<Entity> entitiesToRead, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700923
924 if (entitiesToRead.size() == 0) {
925 return Collections.emptyList();
926 }
927
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900928 final ReadRequest request = ReadRequest.newBuilder()
929 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700930 .addAllEntities(entitiesToRead)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900931 .build();
932
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900933 final Iterable<ReadResponse> responses;
934 try {
935 responses = () -> blockingStub.read(request);
936 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700937 checkGrpcException(e);
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700938 log.warn("Unable to read meter cells: {}", e.getMessage());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900939 log.debug("exception", e);
940 return Collections.emptyList();
941 }
942
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700943 List<Entity> responseEntities = StreamSupport
944 .stream(responses.spliterator(), false)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900945 .map(ReadResponse::getEntitiesList)
946 .flatMap(List::stream)
947 .collect(Collectors.toList());
948
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700949 return MeterEntryCodec.decodeMeterEntities(responseEntities, pipeconf);
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900950 }
951
Carmelo Casconee44592f2018-09-12 02:24:47 -0700952 private boolean doWriteMeterCells(List<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf) {
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900953
Carmelo Casconee44592f2018-09-12 02:24:47 -0700954 List<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellConfigs, pipeconf)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900955 .stream()
956 .map(meterEntryMsg ->
957 Update.newBuilder()
958 .setEntity(meterEntryMsg)
959 .setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
960 .build())
961 .collect(Collectors.toList());
962
963 if (updateMsgs.size() == 0) {
964 return true;
965 }
966
Carmelo Cascone58136812018-07-19 03:40:16 +0200967 return write(updateMsgs, cellConfigs, WriteOperationType.MODIFY, "meter cell config");
968 }
969
970 private boolean doWriteMulticastGroupEntries(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700971 List<PiMulticastGroupEntry> entries,
Carmelo Cascone58136812018-07-19 03:40:16 +0200972 WriteOperationType opType) {
973
974 final List<Update> updateMsgs = entries.stream()
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -0700975 .map(piEntry -> {
976 try {
977 return MulticastGroupEntryCodec.encode(piEntry);
978 } catch (EncodeException e) {
979 log.warn("Unable to encode PiMulticastGroupEntry: {}", e.getMessage());
980 return null;
981 }
982 })
983 .filter(Objects::nonNull)
Carmelo Cascone58136812018-07-19 03:40:16 +0200984 .map(mcMsg -> PacketReplicationEngineEntry.newBuilder()
985 .setMulticastGroupEntry(mcMsg)
986 .build())
987 .map(preMsg -> Entity.newBuilder()
988 .setPacketReplicationEngineEntry(preMsg)
989 .build())
990 .map(entityMsg -> Update.newBuilder()
991 .setEntity(entityMsg)
992 .setType(UPDATE_TYPES.get(opType))
993 .build())
994 .collect(Collectors.toList());
995 return write(updateMsgs, entries, opType, "multicast group entry");
996 }
997
Carmelo Casconee44592f2018-09-12 02:24:47 -0700998 private List<PiMulticastGroupEntry> doReadAllMulticastGroupEntries() {
Carmelo Cascone58136812018-07-19 03:40:16 +0200999
1000 final Entity entity = Entity.newBuilder()
1001 .setPacketReplicationEngineEntry(
1002 PacketReplicationEngineEntry.newBuilder()
1003 .setMulticastGroupEntry(
1004 MulticastGroupEntry.newBuilder()
1005 .build())
1006 .build())
1007 .build();
1008
1009 final ReadRequest req = ReadRequest.newBuilder()
1010 .setDeviceId(p4DeviceId)
1011 .addEntities(entity)
1012 .build();
1013
1014 Iterator<ReadResponse> responses;
1015 try {
1016 responses = blockingStub.read(req);
1017 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001018 checkGrpcException(e);
Carmelo Cascone58136812018-07-19 03:40:16 +02001019 log.warn("Unable to read multicast group entries from {}: {}", deviceId, e.getMessage());
1020 return Collections.emptyList();
1021 }
1022
1023 Iterable<ReadResponse> responseIterable = () -> responses;
1024 final List<PiMulticastGroupEntry> mcEntries = StreamSupport
1025 .stream(responseIterable.spliterator(), false)
1026 .map(ReadResponse::getEntitiesList)
1027 .flatMap(List::stream)
1028 .filter(e -> e.getEntityCase()
1029 .equals(PACKET_REPLICATION_ENGINE_ENTRY))
1030 .map(Entity::getPacketReplicationEngineEntry)
1031 .filter(e -> e.getTypeCase().equals(MULTICAST_GROUP_ENTRY))
1032 .map(PacketReplicationEngineEntry::getMulticastGroupEntry)
1033 .map(MulticastGroupEntryCodec::decode)
1034 .collect(Collectors.toList());
1035
1036 log.debug("Retrieved {} multicast group entries from {}...",
1037 mcEntries.size(), deviceId);
1038
1039 return mcEntries;
1040 }
1041
Carmelo Casconee44592f2018-09-12 02:24:47 -07001042 private <T> boolean write(List<Update> updates,
1043 List<T> writeEntities,
1044 WriteOperationType opType,
1045 String entryType) {
1046 // True if all entities were successfully written.
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -07001047 return writeAndReturnSuccessEntities(updates, writeEntities, opType, entryType)
1048 .size() == writeEntities.size();
Carmelo Casconee44592f2018-09-12 02:24:47 -07001049 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001050
Carmelo Casconee44592f2018-09-12 02:24:47 -07001051 private <T> List<T> writeAndReturnSuccessEntities(
1052 List<Update> updates, List<T> writeEntities,
1053 WriteOperationType opType, String entryType) {
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -07001054 if (updates.isEmpty()) {
1055 return Collections.emptyList();
1056 }
1057 if (updates.size() != writeEntities.size()) {
1058 log.error("Cannot perform {} operation, provided {} " +
1059 "update messages for {} {} - BUG?",
1060 opType, updates.size(), writeEntities.size(), entryType);
1061 return Collections.emptyList();
1062 }
Carmelo Casconee44592f2018-09-12 02:24:47 -07001063 try {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001064 //noinspection ResultOfMethodCallIgnored
Carmelo Cascone58136812018-07-19 03:40:16 +02001065 blockingStub.write(writeRequest(updates));
Carmelo Casconee44592f2018-09-12 02:24:47 -07001066 return writeEntities;
Carmelo Cascone58136812018-07-19 03:40:16 +02001067 } catch (StatusRuntimeException e) {
Carmelo Casconee44592f2018-09-12 02:24:47 -07001068 return checkAndLogWriteErrors(writeEntities, e, opType, entryType);
Carmelo Cascone58136812018-07-19 03:40:16 +02001069 }
1070 }
1071
1072 private WriteRequest writeRequest(Iterable<Update> updateMsgs) {
1073 return WriteRequest.newBuilder()
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001074 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +02001075 .setElectionId(clientElectionId)
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001076 .addAllUpdates(updateMsgs)
1077 .build();
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001078 }
1079
Yi Tseng2a340f72018-11-02 16:52:47 -07001080 protected Void doShutdown() {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001081 streamChannelManager.complete();
Yi Tseng2a340f72018-11-02 16:52:47 -07001082 return super.doShutdown();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001083 }
1084
Carmelo Casconee44592f2018-09-12 02:24:47 -07001085 // Returns the collection of succesfully write entities.
1086 private <T> List<T> checkAndLogWriteErrors(
1087 List<T> writeEntities, StatusRuntimeException ex,
Carmelo Casconee5b28722018-06-22 17:28:28 +02001088 WriteOperationType opType, String entryType) {
1089
1090 checkGrpcException(ex);
1091
Carmelo Cascone943c6642018-09-11 13:01:03 -07001092 final List<P4RuntimeOuterClass.Error> errors = extractWriteErrorDetails(ex);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001093
Carmelo Cascone943c6642018-09-11 13:01:03 -07001094 if (errors.isEmpty()) {
1095 final String description = ex.getStatus().getDescription();
1096 log.warn("Unable to {} {} {}(s) on {}: {}",
Carmelo Cascone50d195f2018-09-11 13:26:38 -07001097 opType.name(), writeEntities.size(), entryType, deviceId,
1098 ex.getStatus().getCode().name(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001099 description == null ? "" : " - " + description);
Carmelo Casconee44592f2018-09-12 02:24:47 -07001100 return Collections.emptyList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001101 }
1102
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001103 if (errors.size() == writeEntities.size()) {
Carmelo Casconee44592f2018-09-12 02:24:47 -07001104 List<T> okEntities = Lists.newArrayList();
1105 Iterator<T> entityIterator = writeEntities.iterator();
1106 for (P4RuntimeOuterClass.Error error : errors) {
1107 T entity = entityIterator.next();
1108 if (error.getCanonicalCode() != Status.OK.getCode().value()) {
1109 log.warn("Unable to {} {} on {}: {} [{}]",
1110 opType.name(), entryType, deviceId,
1111 parseP4Error(error), entity.toString());
1112 } else {
1113 okEntities.add(entity);
1114 }
1115 }
1116 return okEntities;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001117 } else {
Carmelo Casconeb5324e72018-11-25 02:26:32 -08001118 log.warn("Unable to reconcile error details to {} updates " +
Carmelo Casconee44592f2018-09-12 02:24:47 -07001119 "(sent {} updates, but device returned {} errors)",
1120 entryType, writeEntities.size(), errors.size());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001121 errors.stream()
1122 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
1123 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
Carmelo Cascone58136812018-07-19 03:40:16 +02001124 opType.name(), entryType, parseP4Error(err)));
Carmelo Casconee44592f2018-09-12 02:24:47 -07001125 return Collections.emptyList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001126 }
1127 }
1128
1129 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
Carmelo Cascone943c6642018-09-11 13:01:03 -07001130 StatusRuntimeException ex) {
1131 if (!ex.getTrailers().containsKey(STATUS_DETAILS_KEY)) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001132 return Collections.emptyList();
1133 }
Carmelo Cascone943c6642018-09-11 13:01:03 -07001134 com.google.rpc.Status status = ex.getTrailers().get(STATUS_DETAILS_KEY);
1135 if (status == null) {
1136 return Collections.emptyList();
1137 }
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001138 return status.getDetailsList().stream()
1139 .map(any -> {
1140 try {
1141 return any.unpack(P4RuntimeOuterClass.Error.class);
1142 } catch (InvalidProtocolBufferException e) {
1143 log.warn("Unable to unpack P4Runtime Error: {}",
1144 any.toString());
1145 return null;
1146 }
1147 })
1148 .filter(Objects::nonNull)
1149 .collect(Collectors.toList());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001150 }
1151
1152 private String parseP4Error(P4RuntimeOuterClass.Error err) {
Carmelo Cascone943c6642018-09-11 13:01:03 -07001153 return format("%s %s%s (%s:%d)",
1154 Status.fromCodeValue(err.getCanonicalCode()).getCode(),
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001155 err.getMessage(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001156 err.hasDetails() ? ", " + err.getDetails().toString() : "",
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001157 err.getSpace(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001158 err.getCode());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001159 }
1160
Carmelo Casconee5b28722018-06-22 17:28:28 +02001161 private void checkGrpcException(StatusRuntimeException ex) {
1162 switch (ex.getStatus().getCode()) {
1163 case OK:
1164 break;
1165 case CANCELLED:
1166 break;
1167 case UNKNOWN:
1168 break;
1169 case INVALID_ARGUMENT:
1170 break;
1171 case DEADLINE_EXCEEDED:
1172 break;
1173 case NOT_FOUND:
1174 break;
1175 case ALREADY_EXISTS:
1176 break;
1177 case PERMISSION_DENIED:
1178 // Notify upper layers that this node is not master.
1179 controller.postEvent(new P4RuntimeEvent(
Carmelo Casconede3b6842018-09-05 17:45:10 -07001180 P4RuntimeEvent.Type.PERMISSION_DENIED,
1181 new BaseP4RuntimeEventSubject(deviceId)));
Carmelo Casconee5b28722018-06-22 17:28:28 +02001182 break;
1183 case RESOURCE_EXHAUSTED:
1184 break;
1185 case FAILED_PRECONDITION:
1186 break;
1187 case ABORTED:
1188 break;
1189 case OUT_OF_RANGE:
1190 break;
1191 case UNIMPLEMENTED:
1192 break;
1193 case INTERNAL:
1194 break;
1195 case UNAVAILABLE:
1196 // Channel might be closed.
1197 controller.postEvent(new P4RuntimeEvent(
1198 P4RuntimeEvent.Type.CHANNEL_EVENT,
1199 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
1200 break;
1201 case DATA_LOSS:
1202 break;
1203 case UNAUTHENTICATED:
1204 break;
1205 default:
1206 break;
1207 }
1208 }
1209
1210 private Uint128 bigIntegerToUint128(BigInteger value) {
1211 final byte[] arr = value.toByteArray();
1212 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
1213 .put(new byte[Long.BYTES * 2 - arr.length])
1214 .put(arr);
1215 bb.rewind();
1216 return Uint128.newBuilder()
1217 .setHigh(bb.getLong())
1218 .setLow(bb.getLong())
1219 .build();
1220 }
1221
1222 private BigInteger uint128ToBigInteger(Uint128 value) {
1223 return new BigInteger(
1224 ByteBuffer.allocate(Long.BYTES * 2)
1225 .putLong(value.getHigh())
1226 .putLong(value.getLow())
1227 .array());
1228 }
1229
/**
 * A manager for the P4Runtime stream channel that opportunistically creates
 * new stream RPC stubs (e.g. when one fails because of errors) and posts
 * channel events via the P4Runtime controller.
 */
1235 private final class StreamChannelManager {
1236
1237 private final ManagedChannel channel;
1238 private final AtomicBoolean open;
1239 private final StreamObserver<StreamMessageResponse> responseObserver;
1240 private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
1241
1242 private StreamChannelManager(ManagedChannel channel) {
1243 this.channel = channel;
1244 this.responseObserver = new InternalStreamResponseObserver(this);
1245 this.open = new AtomicBoolean(false);
1246 }
1247
1248 private void initIfRequired() {
1249 if (requestObserver == null) {
1250 log.debug("Creating new stream channel for {}...", deviceId);
1251 requestObserver =
1252 (ClientCallStreamObserver<StreamMessageRequest>)
1253 P4RuntimeGrpc.newStub(channel)
1254 .streamChannel(responseObserver);
1255 open.set(false);
1256 }
1257 }
1258
1259 public boolean send(StreamMessageRequest value) {
1260 synchronized (this) {
1261 initIfRequired();
1262 try {
1263 requestObserver.onNext(value);
1264 // FIXME
1265 // signalOpen();
1266 return true;
1267 } catch (Throwable ex) {
1268 if (ex instanceof StatusRuntimeException) {
1269 log.warn("Unable to send {} to {}: {}",
1270 value.getUpdateCase().toString(), deviceId, ex.getMessage());
1271 } else {
1272 log.warn(format(
1273 "Exception while sending %s to %s",
1274 value.getUpdateCase().toString(), deviceId), ex);
1275 }
1276 complete();
1277 return false;
1278 }
1279 }
1280 }
1281
1282 public void complete() {
1283 synchronized (this) {
1284 signalClosed();
1285 if (requestObserver != null) {
1286 requestObserver.onCompleted();
1287 requestObserver.cancel("Terminated", null);
1288 requestObserver = null;
1289 }
1290 }
1291 }
1292
1293 void signalOpen() {
1294 synchronized (this) {
1295 final boolean wasOpen = open.getAndSet(true);
1296 if (!wasOpen) {
1297 controller.postEvent(new P4RuntimeEvent(
1298 P4RuntimeEvent.Type.CHANNEL_EVENT,
1299 new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
1300 }
1301 }
1302 }
1303
1304 void signalClosed() {
1305 synchronized (this) {
1306 final boolean wasOpen = open.getAndSet(false);
1307 if (wasOpen) {
1308 controller.postEvent(new P4RuntimeEvent(
1309 P4RuntimeEvent.Type.CHANNEL_EVENT,
1310 new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
1311 }
1312 }
1313 }
1314
1315 public boolean isOpen() {
1316 return open.get();
1317 }
1318 }
1319
1320 /**
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001321 * Handles messages received from the device on the stream channel.
1322 */
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001323 private final class InternalStreamResponseObserver
Carmelo Casconee5b28722018-06-22 17:28:28 +02001324 implements StreamObserver<StreamMessageResponse> {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001325
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001326 private final StreamChannelManager streamChannelManager;
1327
1328 private InternalStreamResponseObserver(
1329 StreamChannelManager streamChannelManager) {
1330 this.streamChannelManager = streamChannelManager;
1331 }
1332
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001333 @Override
1334 public void onNext(StreamMessageResponse message) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001335 streamChannelManager.signalOpen();
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001336 executorService.submit(() -> doNext(message));
1337 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001338
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001339 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -04001340 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001341 log.debug("Received message on stream channel from {}: {}",
1342 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001343 switch (message.getUpdateCase()) {
1344 case PACKET:
Carmelo Casconea966c342017-07-30 01:56:30 -04001345 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +02001346 return;
Carmelo Casconea966c342017-07-30 01:56:30 -04001347 case ARBITRATION:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001348 doArbitrationResponse(message.getArbitration());
Carmelo Casconea966c342017-07-30 01:56:30 -04001349 return;
1350 default:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001351 log.warn("Unrecognized stream message from {}: {}",
1352 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001353 }
1354 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001355 log.error("Exception while processing stream message from {}",
1356 deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001357 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001358 }
1359
1360 @Override
1361 public void onError(Throwable throwable) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001362 if (throwable instanceof StatusRuntimeException) {
1363 StatusRuntimeException sre = (StatusRuntimeException) throwable;
1364 if (sre.getStatus().getCause() instanceof ConnectException) {
1365 log.warn("Device {} is unreachable ({})",
1366 deviceId, sre.getCause().getMessage());
1367 } else {
1368 log.warn("Received error on stream channel for {}: {}",
1369 deviceId, throwable.getMessage());
1370 }
1371 } else {
1372 log.warn(format("Received exception on stream channel for %s",
1373 deviceId), throwable);
1374 }
1375 streamChannelManager.complete();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001376 }
1377
1378 @Override
1379 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -04001380 log.warn("Stream channel for {} has completed", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001381 streamChannelManager.complete();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001382 }
1383 }
Carmelo Cascone87892e22017-11-13 16:01:29 -08001384}