blob: a5c3c130bd3726e6963dfd38107602bd71344cbe [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Yi Tseng82512da2017-08-16 19:46:36 -070022import com.google.common.collect.Multimap;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040023import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080024import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040025import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040026import io.grpc.ManagedChannel;
27import io.grpc.Status;
28import io.grpc.StatusRuntimeException;
29import io.grpc.stub.StreamObserver;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080030import org.apache.commons.lang3.tuple.ImmutablePair;
Andrea Campanella288b2732017-07-28 14:16:16 +020031import org.onlab.osgi.DefaultServiceDirectory;
Carmelo Casconee5b28722018-06-22 17:28:28 +020032import org.onlab.util.SharedExecutors;
Yi Tseng82512da2017-08-16 19:46:36 -070033import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040034import org.onosproject.net.DeviceId;
Carmelo Cascone87892e22017-11-13 16:01:29 -080035import org.onosproject.net.pi.model.PiActionProfileId;
36import org.onosproject.net.pi.model.PiCounterId;
Carmelo Cascone81929aa2018-04-07 01:38:55 -070037import org.onosproject.net.pi.model.PiMeterId;
Andrea Campanella432f7182017-07-14 18:43:27 +020038import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080039import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020040import org.onosproject.net.pi.runtime.PiActionGroup;
41import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020042import org.onosproject.net.pi.runtime.PiCounterCellData;
43import org.onosproject.net.pi.runtime.PiCounterCellId;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080044import org.onosproject.net.pi.runtime.PiEntity;
Frank Wangd7e3b4b2017-09-24 13:37:54 +090045import org.onosproject.net.pi.runtime.PiMeterCellConfig;
46import org.onosproject.net.pi.runtime.PiMeterCellId;
Carmelo Cascone58136812018-07-19 03:40:16 +020047import org.onosproject.net.pi.runtime.PiMulticastGroupEntry;
Andrea Campanella432f7182017-07-14 18:43:27 +020048import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040049import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080050import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040051import org.onosproject.p4runtime.api.P4RuntimeClient;
52import org.onosproject.p4runtime.api.P4RuntimeEvent;
53import org.slf4j.Logger;
Carmelo Casconee5b28722018-06-22 17:28:28 +020054import p4.config.v1.P4InfoOuterClass.P4Info;
55import p4.tmp.P4Config;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020056import p4.v1.P4RuntimeGrpc;
57import p4.v1.P4RuntimeOuterClass;
58import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
59import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
60import p4.v1.P4RuntimeOuterClass.Entity;
61import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
62import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
Carmelo Cascone58136812018-07-19 03:40:16 +020063import p4.v1.P4RuntimeOuterClass.MulticastGroupEntry;
64import p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020065import p4.v1.P4RuntimeOuterClass.ReadRequest;
66import p4.v1.P4RuntimeOuterClass.ReadResponse;
67import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
68import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
69import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
70import p4.v1.P4RuntimeOuterClass.TableEntry;
71import p4.v1.P4RuntimeOuterClass.Uint128;
72import p4.v1.P4RuntimeOuterClass.Update;
73import p4.v1.P4RuntimeOuterClass.WriteRequest;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040074
Carmelo Casconee5b28722018-06-22 17:28:28 +020075import java.math.BigInteger;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070076import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040077import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040078import java.util.Collections;
79import java.util.Iterator;
80import java.util.List;
81import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020082import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020083import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040084import java.util.concurrent.CompletableFuture;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040085import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040086import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040087import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040088import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040089import java.util.concurrent.locks.Lock;
90import java.util.concurrent.locks.ReentrantLock;
91import java.util.function.Supplier;
92import java.util.stream.Collectors;
93import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040094
Carmelo Casconed61fdb32017-10-30 10:09:57 -070095import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -080096import static java.lang.String.format;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040097import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040098import static org.slf4j.LoggerFactory.getLogger;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020099import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
100import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
Carmelo Cascone58136812018-07-19 03:40:16 +0200101import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200102import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200103import static p4.v1.P4RuntimeOuterClass.PacketIn;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200104import static p4.v1.P4RuntimeOuterClass.PacketOut;
Carmelo Cascone58136812018-07-19 03:40:16 +0200105import static p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry.TypeCase.MULTICAST_GROUP_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200106import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400107
108/**
109 * Implementation of a P4Runtime client.
110 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200111final class P4RuntimeClientImpl implements P4RuntimeClient {
112
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200113 // Timeout in seconds to obtain the request lock.
114 private static final int LOCK_TIMEOUT = 60;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400115
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400116 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
117 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
118 WriteOperationType.INSERT, Update.Type.INSERT,
119 WriteOperationType.MODIFY, Update.Type.MODIFY,
120 WriteOperationType.DELETE, Update.Type.DELETE
121 );
122
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400123 private final Logger log = getLogger(getClass());
124
Carmelo Casconee5b28722018-06-22 17:28:28 +0200125 private final Lock requestLock = new ReentrantLock();
126 private final Context.CancellableContext cancellableContext =
127 Context.current().withCancellation();
128
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400129 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200130 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400131 private final P4RuntimeControllerImpl controller;
132 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400133 private final ExecutorService executorService;
134 private final Executor contextExecutor;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400135 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400136
Carmelo Casconee5b28722018-06-22 17:28:28 +0200137 // Used by this client for write requests.
138 private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
Yi Tseng3e7f1452017-10-20 10:31:53 -0700139
Yi Tseng82512da2017-08-16 19:46:36 -0700140 /**
141 * Default constructor.
142 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200143 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700144 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200145 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700146 * @param controller runtime client controller
147 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200148 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
149 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400150 this.deviceId = deviceId;
151 this.p4DeviceId = p4DeviceId;
152 this.controller = controller;
Carmelo Casconea966c342017-07-30 01:56:30 -0400153 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Casconee5b28722018-06-22 17:28:28 +0200154 "onos-p4runtime-client-" + deviceId.toString(), "%d"));
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400155 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200156 //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200157 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200158 this.streamRequestObserver = P4RuntimeGrpc.newStub(channel)
159 .streamChannel(new StreamChannelResponseObserver());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400160 }
161
162 /**
Carmelo Cascone58136812018-07-19 03:40:16 +0200163 * Submits a task for async execution via the given executor. All tasks
164 * submitted with this method will be executed sequentially.
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400165 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200166 private <U> CompletableFuture<U> supplyWithExecutor(
167 Supplier<U> supplier, String opDescription, Executor executor) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400168 return CompletableFuture.supplyAsync(() -> {
169 // TODO: explore a more relaxed locking strategy.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200170 try {
171 if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
172 log.error("LOCK TIMEOUT! This is likely a deadlock, "
173 + "please debug (executing {})",
174 opDescription);
175 throw new IllegalThreadStateException("Lock timeout");
176 }
177 } catch (InterruptedException e) {
178 log.warn("Thread interrupted while waiting for lock (executing {})",
179 opDescription);
Ray Milkeydbd38212018-07-02 09:18:09 -0700180 throw new IllegalStateException(e);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200181 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400182 try {
183 return supplier.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800184 } catch (StatusRuntimeException ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200185 log.warn("Unable to execute {} on {}: {}",
186 opDescription, deviceId, ex.toString());
Yi Tsenge67e1412018-01-31 17:35:20 -0800187 throw ex;
Carmelo Casconea966c342017-07-30 01:56:30 -0400188 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200189 log.error("Exception in client of {}, executing {}",
190 deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400191 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400192 } finally {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200193 requestLock.unlock();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400194 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200195 }, executor);
196 }
197
198 /**
199 * Equivalent of supplyWithExecutor using the gRPC context executor of this
200 * client, such that if the context is cancelled (e.g. client shutdown) the
201 * RPC is automatically cancelled.
202 */
203 private <U> CompletableFuture<U> supplyInContext(
204 Supplier<U> supplier, String opDescription) {
205 return supplyWithExecutor(supplier, opDescription, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400206 }
207
208 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200209 public CompletableFuture<Boolean> start() {
Carmelo Cascone0e427dc2018-08-14 22:15:10 -0700210 return supplyInContext(this::doBecomeMaster,
Carmelo Casconee5b28722018-06-22 17:28:28 +0200211 "start-initStreamChannel");
212 }
213
214 @Override
215 public CompletableFuture<Void> shutdown() {
216 return supplyWithExecutor(this::doShutdown, "shutdown",
217 SharedExecutors.getPoolThreadExecutor());
218 }
219
220 @Override
221 public CompletableFuture<Boolean> becomeMaster() {
222 return supplyInContext(this::doBecomeMaster,
223 "becomeMaster");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400224 }
225
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400226 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700227 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
228 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400229 }
230
231 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400232 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
233 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200234 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
235 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400236 }
237
238 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400239 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200240 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400241 }
242
243 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200244 public CompletableFuture<Collection<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
245 return supplyInContext(() -> doDumpTable(null, pipeconf), "dumpAllTables");
246 }
247
248 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200249 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200250 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200251 }
252
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200253 @Override
254 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
255 PiPipeconf pipeconf) {
256 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
257 "readCounterCells-" + cellIds.hashCode());
258 }
259
260 @Override
261 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
262 PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700263 return supplyInContext(() -> doReadAllCounterCells(counterIds, pipeconf),
264 "readAllCounterCells-" + counterIds.hashCode());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200265 }
266
Yi Tseng82512da2017-08-16 19:46:36 -0700267 @Override
Yi Tseng8d355132018-04-13 01:40:48 +0800268 public CompletableFuture<Boolean> writeActionGroupMembers(PiActionProfileId profileId,
269 Collection<PiActionGroupMember> members,
Yi Tseng82512da2017-08-16 19:46:36 -0700270 WriteOperationType opType,
271 PiPipeconf pipeconf) {
Yi Tseng8d355132018-04-13 01:40:48 +0800272 return supplyInContext(() -> doWriteActionGroupMembers(profileId, members, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700273 "writeActionGroupMembers-" + opType.name());
274 }
275
Yi Tseng8d355132018-04-13 01:40:48 +0800276
Yi Tseng82512da2017-08-16 19:46:36 -0700277 @Override
278 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
279 WriteOperationType opType,
280 PiPipeconf pipeconf) {
281 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
282 "writeActionGroup-" + opType.name());
283 }
284
285 @Override
286 public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
287 PiPipeconf pipeconf) {
288 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
289 "dumpGroups-" + actionProfileId.id());
290 }
291
Yi Tseng3e7f1452017-10-20 10:31:53 -0700292 @Override
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900293 public CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
294
295 return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
296 "writeMeterCells");
297 }
298
299 @Override
Carmelo Cascone58136812018-07-19 03:40:16 +0200300 public CompletableFuture<Boolean> writePreMulticastGroupEntries(
301 Collection<PiMulticastGroupEntry> entries,
302 WriteOperationType opType) {
303 return supplyInContext(() -> doWriteMulticastGroupEntries(entries, opType),
304 "writePreMulticastGroupEntries");
305 }
306
307 @Override
308 public CompletableFuture<Collection<PiMulticastGroupEntry>> readAllMulticastGroupEntries() {
309 return supplyInContext(this::doReadAllMulticastGroupEntries,
310 "readAllMulticastGroupEntries");
311 }
312
313 @Override
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900314 public CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
315 PiPipeconf pipeconf) {
316 return supplyInContext(() -> doReadMeterCells(cellIds, pipeconf),
317 "readMeterCells-" + cellIds.hashCode());
318 }
319
320 @Override
321 public CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
Carmelo Cascone58136812018-07-19 03:40:16 +0200322 PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700323 return supplyInContext(() -> doReadAllMeterCells(meterIds, pipeconf),
324 "readAllMeterCells-" + meterIds.hashCode());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900325 }
Yi Tseng3e7f1452017-10-20 10:31:53 -0700326
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400327 /* Blocking method implementations below */
328
Carmelo Casconee5b28722018-06-22 17:28:28 +0200329 private boolean doBecomeMaster() {
330 final Uint128 newId = bigIntegerToUint128(
331 controller.newMasterElectionId(deviceId));
332 if (sendMasterArbitrationUpdate(newId)) {
333 clientElectionId = newId;
334 return true;
335 }
336 return false;
337 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200338
Carmelo Casconee5b28722018-06-22 17:28:28 +0200339 private boolean sendMasterArbitrationUpdate(Uint128 electionId) {
340 log.info("Sending arbitration update to {}... electionId={}",
Carmelo Cascone58136812018-07-19 03:40:16 +0200341 deviceId, uint128ToBigInteger(electionId));
Yi Tseng3e7f1452017-10-20 10:31:53 -0700342 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200343 streamRequestObserver.onNext(
344 StreamMessageRequest.newBuilder()
345 .setArbitration(
346 MasterArbitrationUpdate
347 .newBuilder()
348 .setDeviceId(p4DeviceId)
349 .setElectionId(electionId)
350 .build())
351 .build());
352 return true;
Yi Tsenge67e1412018-01-31 17:35:20 -0800353 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800354 log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
Yi Tseng3e7f1452017-10-20 10:31:53 -0700355 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200356 return false;
Yi Tseng3e7f1452017-10-20 10:31:53 -0700357 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200358
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700359 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400360
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700361 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
362
363 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400364
365 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
366 if (p4Info == null) {
367 // Problem logged by PipeconfHelper.
368 return false;
369 }
370
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700371 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
372 .newBuilder()
373 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
374 .setReassign(true)
375 .setDeviceData(ByteString.copyFrom(deviceData))
376 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400377
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700378 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
Andrea Campanella0288c872017-08-07 18:32:51 +0200379 .newBuilder()
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700380 .setP4Info(p4Info)
381 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
382 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400383
384 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
385 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100386 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200387 .setElectionId(clientElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400388 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100389 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400390 .build();
391
392 try {
393 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700394 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400395 } catch (StatusRuntimeException ex) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800396 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400397 return false;
398 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400399 }
400
401 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
402 PiPipeconf pipeconf) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800403 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400404 return true;
405 }
406
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700407 Collection<Update> updateMsgs;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800408 try {
409 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
410 .stream()
411 .map(tableEntryMsg ->
412 Update.newBuilder()
413 .setEntity(Entity.newBuilder()
414 .setTableEntry(tableEntryMsg)
415 .build())
416 .setType(UPDATE_TYPES.get(opType))
417 .build())
418 .collect(Collectors.toList());
419 } catch (EncodeException e) {
420 log.error("Unable to encode table entries, aborting {} operation: {}",
421 opType.name(), e.getMessage());
422 return false;
423 }
424
Carmelo Cascone58136812018-07-19 03:40:16 +0200425 return write(updateMsgs, piTableEntries, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400426 }
427
428 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
429
Carmelo Cascone9f007702017-08-24 13:30:51 +0200430 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400431
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400432 int tableId;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200433 if (piTableId == null) {
434 // Dump all tables.
435 tableId = 0;
436 } else {
437 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Carmelo Cascone58136812018-07-19 03:40:16 +0200438 if (browser == null) {
439 log.warn("Unable to get a P4Info browser for pipeconf {}", pipeconf);
440 return Collections.emptyList();
441 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200442 try {
443 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
444 } catch (P4InfoBrowser.NotFoundException e) {
445 log.warn("Unable to dump table: {}", e.getMessage());
446 return Collections.emptyList();
447 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400448 }
449
450 ReadRequest requestMsg = ReadRequest.newBuilder()
451 .setDeviceId(p4DeviceId)
452 .addEntities(Entity.newBuilder()
453 .setTableEntry(TableEntry.newBuilder()
454 .setTableId(tableId)
455 .build())
456 .build())
457 .build();
458
459 Iterator<ReadResponse> responses;
460 try {
461 responses = blockingStub.read(requestMsg);
462 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800463 log.warn("Unable to dump table {} from {}: {}", piTableId, deviceId, e.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400464 return Collections.emptyList();
465 }
466
467 Iterable<ReadResponse> responseIterable = () -> responses;
468 List<TableEntry> tableEntryMsgs = StreamSupport
469 .stream(responseIterable.spliterator(), false)
470 .map(ReadResponse::getEntitiesList)
471 .flatMap(List::stream)
472 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
473 .map(Entity::getTableEntry)
474 .collect(Collectors.toList());
475
Carmelo Cascone9f007702017-08-24 13:30:51 +0200476 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400477
478 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
479 }
480
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200481 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
482 try {
483 //encode the PiPacketOperation into a PacketOut
484 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
485
486 //Build the request
487 StreamMessageRequest packetOutRequest = StreamMessageRequest
488 .newBuilder().setPacket(packetOut).build();
489
490 //Send the request
491 streamRequestObserver.onNext(packetOutRequest);
492
493 } catch (P4InfoBrowser.NotFoundException e) {
494 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
495 log.debug("Exception", e);
496 return false;
497 }
498 return true;
499 }
500
Carmelo Casconea966c342017-07-30 01:56:30 -0400501 private void doPacketIn(PacketIn packetInMsg) {
502
503 // Retrieve the pipeconf for this client's device.
504 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
505 if (pipeconfService == null) {
506 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
507 }
508 final PiPipeconf pipeconf;
509 if (pipeconfService.ofDevice(deviceId).isPresent() &&
510 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
511 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
512 } else {
513 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
514 return;
515 }
516 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800517 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200518 PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200519 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400520 log.debug("Received packet in: {}", event);
521 controller.postEvent(event);
522 }
523
Carmelo Casconee5b28722018-06-22 17:28:28 +0200524 private void doArbitrationResponse(MasterArbitrationUpdate msg) {
525 // From the spec...
526 // - Election_id: The stream RPC with the highest election_id is the
527 // master. Switch populates with the highest election ID it
528 // has received from all connected controllers.
529 // - Status: Switch populates this with OK for the client that is the
530 // master, and with an error status for all other connected clients (at
531 // every mastership change).
532 if (!msg.hasElectionId() || !msg.hasStatus()) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700533 return;
534 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200535 final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
536 log.info("Received arbitration update from {}: isMaster={}, electionId={}",
537 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
538 controller.postEvent(new P4RuntimeEvent(
539 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
540 new ArbitrationResponse(deviceId, isMaster)));
Carmelo Casconea966c342017-07-30 01:56:30 -0400541 }
542
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700543 private Collection<PiCounterCellData> doReadAllCounterCells(
544 Collection<PiCounterId> counterIds, PiPipeconf pipeconf) {
545 return doReadCounterEntities(
546 CounterEntryCodec.readAllCellsEntities(counterIds, pipeconf),
547 pipeconf);
548 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200549
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700550 private Collection<PiCounterCellData> doReadCounterCells(
551 Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
552 return doReadCounterEntities(
553 CounterEntryCodec.encodePiCounterCellIds(cellIds, pipeconf),
554 pipeconf);
555 }
556
557 private Collection<PiCounterCellData> doReadCounterEntities(
558 Collection<Entity> counterEntities, PiPipeconf pipeconf) {
559
560 if (counterEntities.size() == 0) {
561 return Collections.emptyList();
562 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200563
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200564 final ReadRequest request = ReadRequest.newBuilder()
565 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700566 .addAllEntities(counterEntities)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200567 .build();
568
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200569 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200570 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200571 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200572 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800573 log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200574 return Collections.emptyList();
575 }
576
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200577 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200578 .map(ReadResponse::getEntitiesList)
579 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200580 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200581
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700582 return CounterEntryCodec.decodeCounterEntities(entities, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200583 }
584
Yi Tseng8d355132018-04-13 01:40:48 +0800585 private boolean doWriteActionGroupMembers(PiActionProfileId profileId, Collection<PiActionGroupMember> members,
586 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200587 final Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800588
Yi Tseng8d355132018-04-13 01:40:48 +0800589 for (PiActionGroupMember member : members) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800590 try {
Yi Tseng8d355132018-04-13 01:40:48 +0800591 actionProfileMembers.add(ActionProfileMemberEncoder.encode(profileId, member, pipeconf));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800592 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
593 log.warn("Unable to encode group member, aborting {} operation: {} [{}]",
594 opType.name(), e.getMessage(), member.toString());
595 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700596 }
Yi Tseng82512da2017-08-16 19:46:36 -0700597 }
598
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200599 final Collection<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700600 .map(actionProfileMember ->
601 Update.newBuilder()
602 .setEntity(Entity.newBuilder()
603 .setActionProfileMember(actionProfileMember)
604 .build())
605 .setType(UPDATE_TYPES.get(opType))
606 .build())
607 .collect(Collectors.toList());
608
609 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200610 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700611 return true;
612 }
613
Carmelo Cascone58136812018-07-19 03:40:16 +0200614 return write(updateMsgs, members, opType, "group member");
Yi Tseng82512da2017-08-16 19:46:36 -0700615 }
616
617 private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
618 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
619 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200620
621 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700622 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200623 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700624 return Collections.emptySet();
625 }
626
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200627 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700628 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200629 actionProfileId = browser
630 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200631 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200632 .getPreamble()
633 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700634 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200635 log.warn("Unable to dump groups: {}", e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700636 return Collections.emptySet();
637 }
638
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200639 // Prepare read request to read all groups from the given action profile.
640 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700641 .setDeviceId(p4DeviceId)
642 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200643 .setActionProfileGroup(
644 ActionProfileGroup.newBuilder()
645 .setActionProfileId(actionProfileId)
646 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700647 .build())
648 .build();
649
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200650 // Read groups.
651 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700652 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200653 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700654 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800655 log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
Yi Tseng82512da2017-08-16 19:46:36 -0700656 return Collections.emptySet();
657 }
658
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200659 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
660 .map(ReadResponse::getEntitiesList)
661 .flatMap(List::stream)
662 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
663 .map(Entity::getActionProfileGroup)
664 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700665
666 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200667 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700668
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200669 // Returned groups contain only a minimal description of their members.
670 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700671
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200672 // Keep a map of all member IDs for each group ID, will need it later.
673 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
674 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
675 g.getGroupId(),
676 g.getMembersList().stream()
677 .map(ActionProfileGroup.Member::getMemberId)
678 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700679
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200680 // Prepare one big read request to read all members in one shot.
681 final Set<Entity> entityMsgs = groupMsgs.stream()
682 .flatMap(g -> g.getMembersList().stream())
683 .map(ActionProfileGroup.Member::getMemberId)
684 // Prevent issuing many read requests for the same member.
685 .distinct()
686 .map(id -> ActionProfileMember.newBuilder()
687 .setActionProfileId(actionProfileId)
688 .setMemberId(id)
689 .build())
690 .map(m -> Entity.newBuilder()
691 .setActionProfileMember(m)
692 .build())
693 .collect(Collectors.toSet());
694 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
695 .addAllEntities(entityMsgs)
696 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700697
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200698 // Read members.
699 final Iterator<ReadResponse> memberResponses;
700 try {
701 memberResponses = blockingStub.read(memberRequestMsg);
702 } catch (StatusRuntimeException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800703 log.warn("Unable to read members of action profile {} from {}: {}",
704 piActionProfileId, deviceId, e.getMessage());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200705 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700706 }
707
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200708 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
709 Tools.stream(() -> memberResponses)
710 .map(ReadResponse::getEntitiesList)
711 .flatMap(List::stream)
712 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
713 .map(Entity::getActionProfileMember)
714 .forEach(member -> groupIdToMemberIdsMap.asMap()
715 // Get all group IDs that contain this member.
716 .entrySet()
717 .stream()
718 .filter(entry -> entry.getValue().contains(member.getMemberId()))
719 .map(Map.Entry::getKey)
720 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
721
722 log.debug("Retrieved {} group members from action profile {} on {}...",
723 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
724
725 return groupMsgs.stream()
726 .map(groupMsg -> {
727 try {
728 return ActionProfileGroupEncoder.decode(groupMsg,
729 groupIdToMembersMap.get(groupMsg.getGroupId()),
730 pipeconf);
731 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
732 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
733 return null;
734 }
735 })
736 .filter(Objects::nonNull)
737 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700738 }
739
740 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200741 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700742 try {
743 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
744 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800745 log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700746 return false;
747 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200748
Carmelo Cascone58136812018-07-19 03:40:16 +0200749 final Update updateMsg = Update.newBuilder()
750 .setEntity(Entity.newBuilder()
751 .setActionProfileGroup(actionProfileGroup)
752 .build())
753 .setType(UPDATE_TYPES.get(opType))
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200754 .build();
Carmelo Cascone58136812018-07-19 03:40:16 +0200755
756 return write(Collections.singleton(updateMsg), Collections.singleton(group),
757 opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700758 }
759
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700760 private Collection<PiMeterCellConfig> doReadAllMeterCells(
761 Collection<PiMeterId> meterIds, PiPipeconf pipeconf) {
762 return doReadMeterEntities(MeterEntryCodec.readAllCellsEntities(
763 meterIds, pipeconf), pipeconf);
764 }
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900765
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700766 private Collection<PiMeterCellConfig> doReadMeterCells(
767 Collection<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
768
769 final Collection<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900770 .map(cellId -> PiMeterCellConfig.builder()
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700771 .withMeterCellId(cellId)
772 .build())
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900773 .collect(Collectors.toList());
774
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700775 return doReadMeterEntities(MeterEntryCodec.encodePiMeterCellConfigs(
776 piMeterCellConfigs, pipeconf), pipeconf);
777 }
778
779 private Collection<PiMeterCellConfig> doReadMeterEntities(
780 Collection<Entity> entitiesToRead, PiPipeconf pipeconf) {
781
782 if (entitiesToRead.size() == 0) {
783 return Collections.emptyList();
784 }
785
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900786 final ReadRequest request = ReadRequest.newBuilder()
787 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700788 .addAllEntities(entitiesToRead)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900789 .build();
790
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900791 final Iterable<ReadResponse> responses;
792 try {
793 responses = () -> blockingStub.read(request);
794 } catch (StatusRuntimeException e) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700795 log.warn("Unable to read meter cells: {}", e.getMessage());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900796 log.debug("exception", e);
797 return Collections.emptyList();
798 }
799
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700800 List<Entity> responseEntities = StreamSupport
801 .stream(responses.spliterator(), false)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900802 .map(ReadResponse::getEntitiesList)
803 .flatMap(List::stream)
804 .collect(Collectors.toList());
805
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700806 return MeterEntryCodec.decodeMeterEntities(responseEntities, pipeconf);
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900807 }
808
Carmelo Cascone58136812018-07-19 03:40:16 +0200809 private boolean doWriteMeterCells(Collection<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf) {
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900810
Carmelo Cascone58136812018-07-19 03:40:16 +0200811 Collection<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellConfigs, pipeconf)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900812 .stream()
813 .map(meterEntryMsg ->
814 Update.newBuilder()
815 .setEntity(meterEntryMsg)
816 .setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
817 .build())
818 .collect(Collectors.toList());
819
820 if (updateMsgs.size() == 0) {
821 return true;
822 }
823
Carmelo Cascone58136812018-07-19 03:40:16 +0200824 return write(updateMsgs, cellConfigs, WriteOperationType.MODIFY, "meter cell config");
825 }
826
827 private boolean doWriteMulticastGroupEntries(
828 Collection<PiMulticastGroupEntry> entries,
829 WriteOperationType opType) {
830
831 final List<Update> updateMsgs = entries.stream()
832 .map(MulticastGroupEntryCodec::encode)
833 .map(mcMsg -> PacketReplicationEngineEntry.newBuilder()
834 .setMulticastGroupEntry(mcMsg)
835 .build())
836 .map(preMsg -> Entity.newBuilder()
837 .setPacketReplicationEngineEntry(preMsg)
838 .build())
839 .map(entityMsg -> Update.newBuilder()
840 .setEntity(entityMsg)
841 .setType(UPDATE_TYPES.get(opType))
842 .build())
843 .collect(Collectors.toList());
844 return write(updateMsgs, entries, opType, "multicast group entry");
845 }
846
847 private Collection<PiMulticastGroupEntry> doReadAllMulticastGroupEntries() {
848
849 final Entity entity = Entity.newBuilder()
850 .setPacketReplicationEngineEntry(
851 PacketReplicationEngineEntry.newBuilder()
852 .setMulticastGroupEntry(
853 MulticastGroupEntry.newBuilder()
854 .build())
855 .build())
856 .build();
857
858 final ReadRequest req = ReadRequest.newBuilder()
859 .setDeviceId(p4DeviceId)
860 .addEntities(entity)
861 .build();
862
863 Iterator<ReadResponse> responses;
864 try {
865 responses = blockingStub.read(req);
866 } catch (StatusRuntimeException e) {
867 log.warn("Unable to read multicast group entries from {}: {}", deviceId, e.getMessage());
868 return Collections.emptyList();
869 }
870
871 Iterable<ReadResponse> responseIterable = () -> responses;
872 final List<PiMulticastGroupEntry> mcEntries = StreamSupport
873 .stream(responseIterable.spliterator(), false)
874 .map(ReadResponse::getEntitiesList)
875 .flatMap(List::stream)
876 .filter(e -> e.getEntityCase()
877 .equals(PACKET_REPLICATION_ENGINE_ENTRY))
878 .map(Entity::getPacketReplicationEngineEntry)
879 .filter(e -> e.getTypeCase().equals(MULTICAST_GROUP_ENTRY))
880 .map(PacketReplicationEngineEntry::getMulticastGroupEntry)
881 .map(MulticastGroupEntryCodec::decode)
882 .collect(Collectors.toList());
883
884 log.debug("Retrieved {} multicast group entries from {}...",
885 mcEntries.size(), deviceId);
886
887 return mcEntries;
888 }
889
890 private <E extends PiEntity> boolean write(Collection<Update> updates,
891 Collection<E> writeEntities,
892 WriteOperationType opType,
893 String entryType) {
894 try {
895 blockingStub.write(writeRequest(updates));
896 return true;
897 } catch (StatusRuntimeException e) {
898 checkAndLogWriteErrors(writeEntities, e, opType, entryType);
899 return false;
900 }
901 }
902
903 private WriteRequest writeRequest(Iterable<Update> updateMsgs) {
904 return WriteRequest.newBuilder()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900905 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200906 .setElectionId(clientElectionId)
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900907 .addAllUpdates(updateMsgs)
908 .build();
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900909 }
910
Carmelo Casconee5b28722018-06-22 17:28:28 +0200911 private Void doShutdown() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400912 log.info("Shutting down client for {}...", deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200913 if (streamRequestObserver != null) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400914 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200915 streamRequestObserver.onCompleted();
916 } catch (IllegalStateException e) {
917 // Thrown if stream channel is already completed. Can ignore.
918 log.debug("Ignored expection: {}", e);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400919 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200920 cancellableContext.cancel(new InterruptedException(
921 "Requested client shutdown"));
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400922 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200923 this.executorService.shutdown();
924 try {
925 executorService.awaitTermination(5, TimeUnit.SECONDS);
926 } catch (InterruptedException e) {
927 log.warn("Executor service didn't shutdown in time.");
928 Thread.currentThread().interrupt();
929 }
930 return null;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400931 }
932
Carmelo Casconee5b28722018-06-22 17:28:28 +0200933 private <E extends PiEntity> void checkAndLogWriteErrors(
934 Collection<E> writeEntities, StatusRuntimeException ex,
935 WriteOperationType opType, String entryType) {
936
937 checkGrpcException(ex);
938
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800939 List<P4RuntimeOuterClass.Error> errors = null;
940 String description = null;
941 try {
942 errors = extractWriteErrorDetails(ex);
943 } catch (InvalidProtocolBufferException e) {
944 description = ex.getStatus().getDescription();
945 }
946
947 log.warn("Unable to {} {} {}(s) on {}: {}{} (detailed errors might be logged below)",
948 opType.name(), writeEntities.size(), entryType, deviceId,
949 ex.getStatus().getCode().name(),
950 description == null ? "" : " - " + description);
951
952 if (errors == null || errors.isEmpty()) {
953 return;
954 }
955
956 // FIXME: we are assuming entities is an ordered collection, e.g. a list,
957 // and that errors are reported in the same order as the corresponding
958 // written entity. Write RPC methods should be refactored to accept an
959 // order list of entities, instead of a collection.
960 if (errors.size() == writeEntities.size()) {
961 Iterator<E> entityIterator = writeEntities.iterator();
962 errors.stream()
963 .map(e -> ImmutablePair.of(e, entityIterator.next()))
964 .filter(p -> p.left.getCanonicalCode() != Status.OK.getCode().value())
965 .forEach(p -> log.warn("Unable to {} {}: {} [{}]",
966 opType.name(), entryType, parseP4Error(p.getLeft()),
967 p.getRight().toString()));
968 } else {
969 log.error("Unable to reconcile error details to updates " +
970 "(sent {} updates, but device returned {} errors)",
971 entryType, writeEntities.size(), errors.size());
972 errors.stream()
973 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
974 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
Carmelo Cascone58136812018-07-19 03:40:16 +0200975 opType.name(), entryType, parseP4Error(err)));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800976 }
977 }
978
979 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
980 StatusRuntimeException ex) throws InvalidProtocolBufferException {
981 String statusString = ex.getStatus().getDescription();
982 if (statusString == null) {
983 return Collections.emptyList();
984 }
985 com.google.rpc.Status status = com.google.rpc.Status
986 .parseFrom(statusString.getBytes());
987 return status.getDetailsList().stream()
988 .map(any -> {
989 try {
990 return any.unpack(P4RuntimeOuterClass.Error.class);
991 } catch (InvalidProtocolBufferException e) {
992 log.warn("Unable to unpack P4Runtime Error: {}",
993 any.toString());
994 return null;
995 }
996 })
997 .filter(Objects::nonNull)
998 .collect(Collectors.toList());
999
1000 }
1001
1002 private String parseP4Error(P4RuntimeOuterClass.Error err) {
1003 return format("%s %s (%s code %d)%s",
1004 Status.fromCodeValue(err.getCanonicalCode()),
1005 err.getMessage(),
1006 err.getSpace(),
1007 err.getCode(),
1008 err.hasDetails() ? "\n" + err.getDetails().toString() : "");
1009 }
1010
Carmelo Casconee5b28722018-06-22 17:28:28 +02001011 private void checkGrpcException(StatusRuntimeException ex) {
1012 switch (ex.getStatus().getCode()) {
1013 case OK:
1014 break;
1015 case CANCELLED:
1016 break;
1017 case UNKNOWN:
1018 break;
1019 case INVALID_ARGUMENT:
1020 break;
1021 case DEADLINE_EXCEEDED:
1022 break;
1023 case NOT_FOUND:
1024 break;
1025 case ALREADY_EXISTS:
1026 break;
1027 case PERMISSION_DENIED:
1028 // Notify upper layers that this node is not master.
1029 controller.postEvent(new P4RuntimeEvent(
1030 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
1031 new ArbitrationResponse(deviceId, false)));
1032 break;
1033 case RESOURCE_EXHAUSTED:
1034 break;
1035 case FAILED_PRECONDITION:
1036 break;
1037 case ABORTED:
1038 break;
1039 case OUT_OF_RANGE:
1040 break;
1041 case UNIMPLEMENTED:
1042 break;
1043 case INTERNAL:
1044 break;
1045 case UNAVAILABLE:
1046 // Channel might be closed.
1047 controller.postEvent(new P4RuntimeEvent(
1048 P4RuntimeEvent.Type.CHANNEL_EVENT,
1049 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
1050 break;
1051 case DATA_LOSS:
1052 break;
1053 case UNAUTHENTICATED:
1054 break;
1055 default:
1056 break;
1057 }
1058 }
1059
1060 private Uint128 bigIntegerToUint128(BigInteger value) {
1061 final byte[] arr = value.toByteArray();
1062 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
1063 .put(new byte[Long.BYTES * 2 - arr.length])
1064 .put(arr);
1065 bb.rewind();
1066 return Uint128.newBuilder()
1067 .setHigh(bb.getLong())
1068 .setLow(bb.getLong())
1069 .build();
1070 }
1071
1072 private BigInteger uint128ToBigInteger(Uint128 value) {
1073 return new BigInteger(
1074 ByteBuffer.allocate(Long.BYTES * 2)
1075 .putLong(value.getHigh())
1076 .putLong(value.getLow())
1077 .array());
1078 }
1079
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001080 /**
1081 * Handles messages received from the device on the stream channel.
1082 */
Carmelo Casconee5b28722018-06-22 17:28:28 +02001083 private class StreamChannelResponseObserver
1084 implements StreamObserver<StreamMessageResponse> {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001085
1086 @Override
1087 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001088 executorService.submit(() -> doNext(message));
1089 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001090
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001091 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -04001092 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001093 log.debug("Received message on stream channel from {}: {}",
1094 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001095 switch (message.getUpdateCase()) {
1096 case PACKET:
Carmelo Casconea966c342017-07-30 01:56:30 -04001097 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +02001098 return;
Carmelo Casconea966c342017-07-30 01:56:30 -04001099 case ARBITRATION:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001100 doArbitrationResponse(message.getArbitration());
Carmelo Casconea966c342017-07-30 01:56:30 -04001101 return;
1102 default:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001103 log.warn("Unrecognized stream message from {}: {}",
1104 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001105 }
1106 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001107 log.error("Exception while processing stream message from {}",
1108 deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001109 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001110 }
1111
1112 @Override
1113 public void onError(Throwable throwable) {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001114 log.warn("Error on stream channel for {}: {}",
1115 deviceId, Status.fromThrowable(throwable));
1116 controller.postEvent(new P4RuntimeEvent(
1117 P4RuntimeEvent.Type.CHANNEL_EVENT,
1118 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001119 }
1120
1121 @Override
1122 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -04001123 log.warn("Stream channel for {} has completed", deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +02001124 controller.postEvent(new P4RuntimeEvent(
1125 P4RuntimeEvent.Type.CHANNEL_EVENT,
1126 new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001127 }
1128 }
Carmelo Cascone87892e22017-11-13 16:01:29 -08001129}