blob: a06d67e9c08c7d3350fb74a2e0299c7d85b90109 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone87b9b392017-10-02 18:33:20 +020019import com.google.common.collect.HashMultimap;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040020import com.google.common.collect.ImmutableMap;
Yi Tseng82512da2017-08-16 19:46:36 -070021import com.google.common.collect.Lists;
Yi Tseng82512da2017-08-16 19:46:36 -070022import com.google.common.collect.Multimap;
Carmelo Cascone50d195f2018-09-11 13:26:38 -070023import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040024import com.google.protobuf.ByteString;
Yi Tsenge67e1412018-01-31 17:35:20 -080025import com.google.protobuf.InvalidProtocolBufferException;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040026import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040027import io.grpc.ManagedChannel;
Carmelo Cascone943c6642018-09-11 13:01:03 -070028import io.grpc.Metadata;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040029import io.grpc.Status;
30import io.grpc.StatusRuntimeException;
Carmelo Cascone943c6642018-09-11 13:01:03 -070031import io.grpc.protobuf.lite.ProtoLiteUtils;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070032import io.grpc.stub.ClientCallStreamObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040033import io.grpc.stub.StreamObserver;
Andrea Campanella288b2732017-07-28 14:16:16 +020034import org.onlab.osgi.DefaultServiceDirectory;
Carmelo Casconee5b28722018-06-22 17:28:28 +020035import org.onlab.util.SharedExecutors;
Yi Tseng82512da2017-08-16 19:46:36 -070036import org.onlab.util.Tools;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040037import org.onosproject.net.DeviceId;
Carmelo Cascone87892e22017-11-13 16:01:29 -080038import org.onosproject.net.pi.model.PiActionProfileId;
39import org.onosproject.net.pi.model.PiCounterId;
Carmelo Cascone81929aa2018-04-07 01:38:55 -070040import org.onosproject.net.pi.model.PiMeterId;
Andrea Campanella432f7182017-07-14 18:43:27 +020041import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone87892e22017-11-13 16:01:29 -080042import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020043import org.onosproject.net.pi.runtime.PiActionGroup;
44import org.onosproject.net.pi.runtime.PiActionGroupMember;
Carmelo Casconee44592f2018-09-12 02:24:47 -070045import org.onosproject.net.pi.runtime.PiActionGroupMemberId;
steven308017632e152018-10-20 00:51:08 +080046import org.onosproject.net.pi.runtime.PiCounterCell;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020047import org.onosproject.net.pi.runtime.PiCounterCellId;
Frank Wangd7e3b4b2017-09-24 13:37:54 +090048import org.onosproject.net.pi.runtime.PiMeterCellConfig;
49import org.onosproject.net.pi.runtime.PiMeterCellId;
Carmelo Cascone58136812018-07-19 03:40:16 +020050import org.onosproject.net.pi.runtime.PiMulticastGroupEntry;
Andrea Campanella432f7182017-07-14 18:43:27 +020051import org.onosproject.net.pi.runtime.PiPacketOperation;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040052import org.onosproject.net.pi.runtime.PiTableEntry;
Yi Tsenge67e1412018-01-31 17:35:20 -080053import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040054import org.onosproject.p4runtime.api.P4RuntimeClient;
55import org.onosproject.p4runtime.api.P4RuntimeEvent;
56import org.slf4j.Logger;
Carmelo Casconee5b28722018-06-22 17:28:28 +020057import p4.config.v1.P4InfoOuterClass.P4Info;
58import p4.tmp.P4Config;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020059import p4.v1.P4RuntimeGrpc;
60import p4.v1.P4RuntimeOuterClass;
61import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
62import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
63import p4.v1.P4RuntimeOuterClass.Entity;
64import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070065import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
66import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020067import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
Carmelo Cascone58136812018-07-19 03:40:16 +020068import p4.v1.P4RuntimeOuterClass.MulticastGroupEntry;
69import p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry;
Carmelo Cascone6af4e172018-06-15 16:01:30 +020070import p4.v1.P4RuntimeOuterClass.ReadRequest;
71import p4.v1.P4RuntimeOuterClass.ReadResponse;
72import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
73import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
74import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
75import p4.v1.P4RuntimeOuterClass.TableEntry;
76import p4.v1.P4RuntimeOuterClass.Uint128;
77import p4.v1.P4RuntimeOuterClass.Update;
78import p4.v1.P4RuntimeOuterClass.WriteRequest;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040079
Carmelo Casconee5b28722018-06-22 17:28:28 +020080import java.math.BigInteger;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070081import java.net.ConnectException;
Carmelo Casconed61fdb32017-10-30 10:09:57 -070082import java.nio.ByteBuffer;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040083import java.util.Collections;
84import java.util.Iterator;
85import java.util.List;
86import java.util.Map;
Carmelo Cascone87b9b392017-10-02 18:33:20 +020087import java.util.Objects;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020088import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040089import java.util.concurrent.CompletableFuture;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040090import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040091import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040092import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040093import java.util.concurrent.TimeUnit;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070094import java.util.concurrent.atomic.AtomicBoolean;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040095import java.util.concurrent.locks.Lock;
96import java.util.concurrent.locks.ReentrantLock;
97import java.util.function.Supplier;
98import java.util.stream.Collectors;
99import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400100
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700101import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800102import static java.lang.String.format;
Carmelo Casconee44592f2018-09-12 02:24:47 -0700103import static java.util.Collections.singletonList;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400104import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400105import static org.slf4j.LoggerFactory.getLogger;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200106import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
107import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
Carmelo Cascone58136812018-07-19 03:40:16 +0200108import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200109import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200110import static p4.v1.P4RuntimeOuterClass.PacketIn;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200111import static p4.v1.P4RuntimeOuterClass.PacketOut;
Carmelo Cascone58136812018-07-19 03:40:16 +0200112import static p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry.TypeCase.MULTICAST_GROUP_ENTRY;
Carmelo Cascone6af4e172018-06-15 16:01:30 +0200113import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400114
115/**
116 * Implementation of a P4Runtime client.
117 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200118final class P4RuntimeClientImpl implements P4RuntimeClient {
119
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200120 // Timeout in seconds to obtain the request lock.
121 private static final int LOCK_TIMEOUT = 60;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400122
Carmelo Cascone943c6642018-09-11 13:01:03 -0700123 private static final Metadata.Key<com.google.rpc.Status> STATUS_DETAILS_KEY =
124 Metadata.Key.of("grpc-status-details-bin",
125 ProtoLiteUtils.metadataMarshaller(
126 com.google.rpc.Status.getDefaultInstance()));
127
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400128 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
129 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
130 WriteOperationType.INSERT, Update.Type.INSERT,
131 WriteOperationType.MODIFY, Update.Type.MODIFY,
132 WriteOperationType.DELETE, Update.Type.DELETE
133 );
134
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400135 private final Logger log = getLogger(getClass());
136
Carmelo Casconee5b28722018-06-22 17:28:28 +0200137 private final Lock requestLock = new ReentrantLock();
138 private final Context.CancellableContext cancellableContext =
139 Context.current().withCancellation();
140
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400141 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200142 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400143 private final P4RuntimeControllerImpl controller;
144 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400145 private final ExecutorService executorService;
146 private final Executor contextExecutor;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700147 private StreamChannelManager streamChannelManager;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400148
Carmelo Casconee5b28722018-06-22 17:28:28 +0200149 // Used by this client for write requests.
150 private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
Yi Tseng3e7f1452017-10-20 10:31:53 -0700151
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700152 private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
153
Yi Tseng82512da2017-08-16 19:46:36 -0700154 /**
155 * Default constructor.
156 *
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200157 * @param deviceId the ONOS device id
Yi Tseng82512da2017-08-16 19:46:36 -0700158 * @param p4DeviceId the P4 device id
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200159 * @param channel gRPC channel
Yi Tseng82512da2017-08-16 19:46:36 -0700160 * @param controller runtime client controller
161 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200162 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
163 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400164 this.deviceId = deviceId;
165 this.p4DeviceId = p4DeviceId;
166 this.controller = controller;
Carmelo Casconea966c342017-07-30 01:56:30 -0400167 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Casconee5b28722018-06-22 17:28:28 +0200168 "onos-p4runtime-client-" + deviceId.toString(), "%d"));
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400169 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200170 //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200171 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700172 this.streamChannelManager = new StreamChannelManager(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400173 }
174
175 /**
Carmelo Cascone58136812018-07-19 03:40:16 +0200176 * Submits a task for async execution via the given executor. All tasks
177 * submitted with this method will be executed sequentially.
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400178 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200179 private <U> CompletableFuture<U> supplyWithExecutor(
180 Supplier<U> supplier, String opDescription, Executor executor) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400181 return CompletableFuture.supplyAsync(() -> {
182 // TODO: explore a more relaxed locking strategy.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200183 try {
184 if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
185 log.error("LOCK TIMEOUT! This is likely a deadlock, "
186 + "please debug (executing {})",
187 opDescription);
188 throw new IllegalThreadStateException("Lock timeout");
189 }
190 } catch (InterruptedException e) {
191 log.warn("Thread interrupted while waiting for lock (executing {})",
192 opDescription);
Ray Milkeydbd38212018-07-02 09:18:09 -0700193 throw new IllegalStateException(e);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200194 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400195 try {
196 return supplier.get();
Yi Tsenge67e1412018-01-31 17:35:20 -0800197 } catch (StatusRuntimeException ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200198 log.warn("Unable to execute {} on {}: {}",
199 opDescription, deviceId, ex.toString());
Yi Tsenge67e1412018-01-31 17:35:20 -0800200 throw ex;
Carmelo Casconea966c342017-07-30 01:56:30 -0400201 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200202 log.error("Exception in client of {}, executing {}",
203 deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400204 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400205 } finally {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200206 requestLock.unlock();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400207 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200208 }, executor);
209 }
210
211 /**
212 * Equivalent of supplyWithExecutor using the gRPC context executor of this
213 * client, such that if the context is cancelled (e.g. client shutdown) the
214 * RPC is automatically cancelled.
215 */
216 private <U> CompletableFuture<U> supplyInContext(
217 Supplier<U> supplier, String opDescription) {
218 return supplyWithExecutor(supplier, opDescription, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400219 }
220
221 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700222 public CompletableFuture<Boolean> startStreamChannel() {
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700223 return supplyInContext(() -> sendMasterArbitrationUpdate(false),
Carmelo Casconee5b28722018-06-22 17:28:28 +0200224 "start-initStreamChannel");
225 }
226
227 @Override
228 public CompletableFuture<Void> shutdown() {
229 return supplyWithExecutor(this::doShutdown, "shutdown",
230 SharedExecutors.getPoolThreadExecutor());
231 }
232
233 @Override
234 public CompletableFuture<Boolean> becomeMaster() {
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700235 return supplyInContext(() -> sendMasterArbitrationUpdate(true),
Carmelo Casconee5b28722018-06-22 17:28:28 +0200236 "becomeMaster");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400237 }
238
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400239 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700240 public boolean isMaster() {
241 return streamChannelManager.isOpen() && isClientMaster.get();
242 }
243
244 @Override
245 public boolean isStreamChannelOpen() {
246 return streamChannelManager.isOpen();
247 }
248
249 @Override
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700250 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
251 return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400252 }
253
254 @Override
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700255 public boolean isPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
256 return doIsPipelineConfigSet(pipeconf, deviceData);
257 }
258
259 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700260 public CompletableFuture<Boolean> writeTableEntries(List<PiTableEntry> piTableEntries,
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400261 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200262 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
263 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400264 }
265
266 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700267 public CompletableFuture<List<PiTableEntry>> dumpTables(
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700268 Set<PiTableId> piTableIds, boolean defaultEntries, PiPipeconf pipeconf) {
269 return supplyInContext(() -> doDumpTables(piTableIds, defaultEntries, pipeconf),
270 "dumpTables-" + piTableIds.hashCode());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400271 }
272
273 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700274 public CompletableFuture<List<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700275 return supplyInContext(() -> doDumpTables(null, false, pipeconf), "dumpAllTables");
Carmelo Casconee5b28722018-06-22 17:28:28 +0200276 }
277
278 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200279 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200280 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200281 }
282
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200283 @Override
steven308017632e152018-10-20 00:51:08 +0800284 public CompletableFuture<List<PiCounterCell>> readCounterCells(Set<PiCounterCellId> cellIds,
285 PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700286 return supplyInContext(() -> doReadCounterCells(Lists.newArrayList(cellIds), pipeconf),
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200287 "readCounterCells-" + cellIds.hashCode());
288 }
289
290 @Override
steven308017632e152018-10-20 00:51:08 +0800291 public CompletableFuture<List<PiCounterCell>> readAllCounterCells(Set<PiCounterId> counterIds,
292 PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700293 return supplyInContext(() -> doReadAllCounterCells(Lists.newArrayList(counterIds), pipeconf),
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700294 "readAllCounterCells-" + counterIds.hashCode());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200295 }
296
Yi Tseng82512da2017-08-16 19:46:36 -0700297 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700298 public CompletableFuture<Boolean> writeActionGroupMembers(List<PiActionGroupMember> members,
Yi Tseng82512da2017-08-16 19:46:36 -0700299 WriteOperationType opType,
300 PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700301 return supplyInContext(() -> doWriteActionGroupMembers(members, opType, pipeconf),
Yi Tseng82512da2017-08-16 19:46:36 -0700302 "writeActionGroupMembers-" + opType.name());
303 }
304
Yi Tseng8d355132018-04-13 01:40:48 +0800305
Yi Tseng82512da2017-08-16 19:46:36 -0700306 @Override
307 public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
308 WriteOperationType opType,
309 PiPipeconf pipeconf) {
310 return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
311 "writeActionGroup-" + opType.name());
312 }
313
314 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700315 public CompletableFuture<List<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
316 PiPipeconf pipeconf) {
Yi Tseng82512da2017-08-16 19:46:36 -0700317 return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
318 "dumpGroups-" + actionProfileId.id());
319 }
320
Yi Tseng3e7f1452017-10-20 10:31:53 -0700321 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700322 public CompletableFuture<List<PiActionGroupMemberId>> dumpActionProfileMemberIds(
323 PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
324 return supplyInContext(() -> doDumpActionProfileMemberIds(actionProfileId, pipeconf),
325 "dumpActionProfileMemberIds-" + actionProfileId.id());
326 }
327
328 @Override
329 public CompletableFuture<List<PiActionGroupMemberId>> removeActionProfileMembers(
330 PiActionProfileId actionProfileId,
331 List<PiActionGroupMemberId> memberIds,
332 PiPipeconf pipeconf) {
333 return supplyInContext(
334 () -> doRemoveActionProfileMembers(actionProfileId, memberIds, pipeconf),
335 "cleanupActionProfileMembers-" + actionProfileId.id());
336 }
337
338 @Override
339 public CompletableFuture<Boolean> writeMeterCells(List<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900340
341 return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
342 "writeMeterCells");
343 }
344
345 @Override
Carmelo Cascone58136812018-07-19 03:40:16 +0200346 public CompletableFuture<Boolean> writePreMulticastGroupEntries(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700347 List<PiMulticastGroupEntry> entries,
Carmelo Cascone58136812018-07-19 03:40:16 +0200348 WriteOperationType opType) {
349 return supplyInContext(() -> doWriteMulticastGroupEntries(entries, opType),
350 "writePreMulticastGroupEntries");
351 }
352
353 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700354 public CompletableFuture<List<PiMulticastGroupEntry>> readAllMulticastGroupEntries() {
Carmelo Cascone58136812018-07-19 03:40:16 +0200355 return supplyInContext(this::doReadAllMulticastGroupEntries,
356 "readAllMulticastGroupEntries");
357 }
358
359 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700360 public CompletableFuture<List<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
361 PiPipeconf pipeconf) {
362 return supplyInContext(() -> doReadMeterCells(Lists.newArrayList(cellIds), pipeconf),
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900363 "readMeterCells-" + cellIds.hashCode());
364 }
365
366 @Override
Carmelo Casconee44592f2018-09-12 02:24:47 -0700367 public CompletableFuture<List<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
368 PiPipeconf pipeconf) {
369 return supplyInContext(() -> doReadAllMeterCells(Lists.newArrayList(meterIds), pipeconf),
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700370 "readAllMeterCells-" + meterIds.hashCode());
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900371 }
Yi Tseng3e7f1452017-10-20 10:31:53 -0700372
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400373 /* Blocking method implementations below */
374
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700375 private boolean sendMasterArbitrationUpdate(boolean asMaster) {
376 BigInteger newId = controller.newMasterElectionId(deviceId);
377 if (asMaster) {
378 // Becoming master is a race. Here we increase our chances of win
379 // against other ONOS nodes in the cluster that are calling start()
380 // (which is used to start the stream RPC session, not to become
381 // master).
382 newId = newId.add(BigInteger.valueOf(1000));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200383 }
Carmelo Cascone85d72ef2018-08-23 19:06:29 -0700384 final Uint128 idMsg = bigIntegerToUint128(
385 controller.newMasterElectionId(deviceId));
Andrea Campanella1e573442018-05-17 17:07:13 +0200386
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700387 log.debug("Sending arbitration update to {}... electionId={}",
388 deviceId, newId);
389
390 streamChannelManager.send(
391 StreamMessageRequest.newBuilder()
392 .setArbitration(
393 MasterArbitrationUpdate
394 .newBuilder()
395 .setDeviceId(p4DeviceId)
396 .setElectionId(idMsg)
397 .build())
398 .build());
399 clientElectionId = idMsg;
400 return true;
401 }
402
403 private ForwardingPipelineConfig getPipelineConfig(
404 PiPipeconf pipeconf, ByteBuffer deviceData) {
405 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
406 if (p4Info == null) {
407 // Problem logged by PipeconfHelper.
408 return null;
Yi Tseng3e7f1452017-10-20 10:31:53 -0700409 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700410
ghj050452092a48bf2018-10-22 10:50:41 -0700411 ForwardingPipelineConfig.Cookie pipeconfCookie = ForwardingPipelineConfig.Cookie
412 .newBuilder()
ghj0504520ec1a4202018-10-22 10:50:41 -0700413 .setCookie(pipeconf.fingerprint())
ghj050452092a48bf2018-10-22 10:50:41 -0700414 .build();
415
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700416 // FIXME: This is specific to PI P4Runtime implementation.
417 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
418 .newBuilder()
419 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
420 .setReassign(true)
421 .setDeviceData(ByteString.copyFrom(deviceData))
422 .build();
423
424 return ForwardingPipelineConfig
425 .newBuilder()
426 .setP4Info(p4Info)
427 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
ghj050452092a48bf2018-10-22 10:50:41 -0700428 .setCookie(pipeconfCookie)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700429 .build();
430 }
431
432 private boolean doIsPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
433
434 GetForwardingPipelineConfigRequest request = GetForwardingPipelineConfigRequest
435 .newBuilder()
436 .setDeviceId(p4DeviceId)
ghj050452092a48bf2018-10-22 10:50:41 -0700437 .setResponseType(GetForwardingPipelineConfigRequest
438 .ResponseType.COOKIE_ONLY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700439 .build();
440
441 GetForwardingPipelineConfigResponse resp;
442 try {
443 resp = this.blockingStub
444 .getForwardingPipelineConfig(request);
445 } catch (StatusRuntimeException ex) {
446 checkGrpcException(ex);
447 // FAILED_PRECONDITION means that a pipeline config was not set in
448 // the first place. Don't bother logging.
449 if (!ex.getStatus().getCode()
450 .equals(Status.FAILED_PRECONDITION.getCode())) {
451 log.warn("Unable to get pipeline config from {}: {}",
452 deviceId, ex.getMessage());
453 }
454 return false;
455 }
ghj050452092a48bf2018-10-22 10:50:41 -0700456 if (!resp.getConfig().hasCookie()) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700457 log.warn("{} returned GetForwardingPipelineConfigResponse " +
ghj050452092a48bf2018-10-22 10:50:41 -0700458 "with 'cookie' field unset",
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700459 deviceId);
460 return false;
461 }
ghj050452092a48bf2018-10-22 10:50:41 -0700462
ghj0504520ec1a4202018-10-22 10:50:41 -0700463 return resp.getConfig().getCookie().getCookie() == pipeconf.fingerprint();
Yi Tseng3e7f1452017-10-20 10:31:53 -0700464 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200465
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700466 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400467
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700468 log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
469
470 checkNotNull(deviceData, "deviceData cannot be null");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400471
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700472 ForwardingPipelineConfig pipelineConfig = getPipelineConfig(pipeconf, deviceData);
473
474 if (pipelineConfig == null) {
475 // Error logged in getPipelineConfig()
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400476 return false;
477 }
478
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400479 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
480 .newBuilder()
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100481 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200482 .setElectionId(clientElectionId)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400483 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella8bcd5862017-12-11 11:34:45 +0100484 .setConfig(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400485 .build();
486
487 try {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700488 //noinspection ResultOfMethodCallIgnored
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400489 this.blockingStub.setForwardingPipelineConfig(request);
Carmelo Casconed61fdb32017-10-30 10:09:57 -0700490 return true;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400491 } catch (StatusRuntimeException ex) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700492 checkGrpcException(ex);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800493 log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400494 return false;
495 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400496 }
497
Carmelo Casconee44592f2018-09-12 02:24:47 -0700498 private boolean doWriteTableEntries(List<PiTableEntry> piTableEntries, WriteOperationType opType,
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400499 PiPipeconf pipeconf) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800500 if (piTableEntries.size() == 0) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400501 return true;
502 }
503
Carmelo Casconee44592f2018-09-12 02:24:47 -0700504 List<Update> updateMsgs;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800505 try {
506 updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
507 .stream()
508 .map(tableEntryMsg ->
509 Update.newBuilder()
510 .setEntity(Entity.newBuilder()
511 .setTableEntry(tableEntryMsg)
512 .build())
513 .setType(UPDATE_TYPES.get(opType))
514 .build())
515 .collect(Collectors.toList());
516 } catch (EncodeException e) {
517 log.error("Unable to encode table entries, aborting {} operation: {}",
518 opType.name(), e.getMessage());
519 return false;
520 }
521
Carmelo Cascone58136812018-07-19 03:40:16 +0200522 return write(updateMsgs, piTableEntries, opType, "table entry");
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400523 }
524
Carmelo Casconee44592f2018-09-12 02:24:47 -0700525 private List<PiTableEntry> doDumpTables(
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700526 Set<PiTableId> piTableIds, boolean defaultEntries, PiPipeconf pipeconf) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400527
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700528 log.debug("Dumping tables {} from {} (pipeconf {})...",
529 piTableIds, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400530
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700531 Set<Integer> tableIds = Sets.newHashSet();
532 if (piTableIds == null) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200533 // Dump all tables.
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700534 tableIds.add(0);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200535 } else {
536 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Carmelo Cascone58136812018-07-19 03:40:16 +0200537 if (browser == null) {
538 log.warn("Unable to get a P4Info browser for pipeconf {}", pipeconf);
539 return Collections.emptyList();
540 }
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700541 piTableIds.forEach(piTableId -> {
542 try {
543 tableIds.add(browser.tables().getByName(piTableId.id()).getPreamble().getId());
544 } catch (P4InfoBrowser.NotFoundException e) {
545 log.warn("Unable to dump table {}: {}", piTableId, e.getMessage());
546 }
547 });
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400548 }
549
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700550 if (tableIds.isEmpty()) {
551 return Collections.emptyList();
552 }
553
554 ReadRequest.Builder requestMsgBuilder = ReadRequest.newBuilder()
555 .setDeviceId(p4DeviceId);
556 tableIds.forEach(tableId -> requestMsgBuilder.addEntities(
557 Entity.newBuilder()
558 .setTableEntry(
559 TableEntry.newBuilder()
560 .setTableId(tableId)
561 .setIsDefaultAction(defaultEntries)
steven308017632e152018-10-20 00:51:08 +0800562 .setCounterData(P4RuntimeOuterClass.CounterData.getDefaultInstance())
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700563 .build())
564 .build())
565 .build());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400566
567 Iterator<ReadResponse> responses;
568 try {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700569 responses = blockingStub.read(requestMsgBuilder.build());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400570 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700571 checkGrpcException(e);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700572 log.warn("Unable to dump tables from {}: {}", deviceId, e.getMessage());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400573 return Collections.emptyList();
574 }
575
576 Iterable<ReadResponse> responseIterable = () -> responses;
577 List<TableEntry> tableEntryMsgs = StreamSupport
578 .stream(responseIterable.spliterator(), false)
579 .map(ReadResponse::getEntitiesList)
580 .flatMap(List::stream)
581 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
582 .map(Entity::getTableEntry)
583 .collect(Collectors.toList());
584
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700585 log.debug("Retrieved {} entries from {} tables on {}...",
586 tableEntryMsgs.size(), tableIds.size(), deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400587
588 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
589 }
590
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200591 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
592 try {
593 //encode the PiPacketOperation into a PacketOut
594 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
595
596 //Build the request
597 StreamMessageRequest packetOutRequest = StreamMessageRequest
598 .newBuilder().setPacket(packetOut).build();
599
600 //Send the request
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700601 streamChannelManager.send(packetOutRequest);
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200602
603 } catch (P4InfoBrowser.NotFoundException e) {
604 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
605 log.debug("Exception", e);
606 return false;
607 }
608 return true;
609 }
610
Carmelo Casconea966c342017-07-30 01:56:30 -0400611 private void doPacketIn(PacketIn packetInMsg) {
612
613 // Retrieve the pipeconf for this client's device.
614 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
615 if (pipeconfService == null) {
616 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
617 }
618 final PiPipeconf pipeconf;
619 if (pipeconfService.ofDevice(deviceId).isPresent() &&
620 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
621 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
622 } else {
623 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
624 return;
625 }
626 // Decode packet message and post event.
Carmelo Cascone87892e22017-11-13 16:01:29 -0800627 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200628 PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200629 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400630 log.debug("Received packet in: {}", event);
631 controller.postEvent(event);
632 }
633
Carmelo Casconee5b28722018-06-22 17:28:28 +0200634 private void doArbitrationResponse(MasterArbitrationUpdate msg) {
635 // From the spec...
636 // - Election_id: The stream RPC with the highest election_id is the
637 // master. Switch populates with the highest election ID it
638 // has received from all connected controllers.
639 // - Status: Switch populates this with OK for the client that is the
640 // master, and with an error status for all other connected clients (at
641 // every mastership change).
642 if (!msg.hasElectionId() || !msg.hasStatus()) {
Yi Tseng3e7f1452017-10-20 10:31:53 -0700643 return;
644 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700645 final boolean isMaster =
646 msg.getStatus().getCode() == Status.OK.getCode().value();
647 log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
648 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200649 controller.postEvent(new P4RuntimeEvent(
650 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
651 new ArbitrationResponse(deviceId, isMaster)));
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700652 isClientMaster.set(isMaster);
Carmelo Casconea966c342017-07-30 01:56:30 -0400653 }
654
steven308017632e152018-10-20 00:51:08 +0800655 private List<PiCounterCell> doReadAllCounterCells(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700656 List<PiCounterId> counterIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700657 return doReadCounterEntities(
658 CounterEntryCodec.readAllCellsEntities(counterIds, pipeconf),
659 pipeconf);
660 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200661
steven308017632e152018-10-20 00:51:08 +0800662 private List<PiCounterCell> doReadCounterCells(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700663 List<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700664 return doReadCounterEntities(
665 CounterEntryCodec.encodePiCounterCellIds(cellIds, pipeconf),
666 pipeconf);
667 }
668
steven308017632e152018-10-20 00:51:08 +0800669 private List<PiCounterCell> doReadCounterEntities(
Carmelo Casconee44592f2018-09-12 02:24:47 -0700670 List<Entity> counterEntities, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700671
672 if (counterEntities.size() == 0) {
673 return Collections.emptyList();
674 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200675
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200676 final ReadRequest request = ReadRequest.newBuilder()
677 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700678 .addAllEntities(counterEntities)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200679 .build();
680
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200681 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200682 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200683 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200684 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700685 checkGrpcException(e);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800686 log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200687 return Collections.emptyList();
688 }
689
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200690 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200691 .map(ReadResponse::getEntitiesList)
692 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200693 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200694
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700695 return CounterEntryCodec.decodeCounterEntities(entities, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200696 }
697
Carmelo Casconee44592f2018-09-12 02:24:47 -0700698 private boolean doWriteActionGroupMembers(List<PiActionGroupMember> members,
Yi Tseng8d355132018-04-13 01:40:48 +0800699 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700700 final List<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800701
Yi Tseng8d355132018-04-13 01:40:48 +0800702 for (PiActionGroupMember member : members) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800703 try {
Carmelo Casconee44592f2018-09-12 02:24:47 -0700704 actionProfileMembers.add(ActionProfileMemberEncoder.encode(member, pipeconf));
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800705 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
706 log.warn("Unable to encode group member, aborting {} operation: {} [{}]",
707 opType.name(), e.getMessage(), member.toString());
708 return false;
Yi Tseng82512da2017-08-16 19:46:36 -0700709 }
Yi Tseng82512da2017-08-16 19:46:36 -0700710 }
711
Carmelo Casconee44592f2018-09-12 02:24:47 -0700712 final List<Update> updateMsgs = actionProfileMembers.stream()
Yi Tseng82512da2017-08-16 19:46:36 -0700713 .map(actionProfileMember ->
714 Update.newBuilder()
715 .setEntity(Entity.newBuilder()
716 .setActionProfileMember(actionProfileMember)
717 .build())
718 .setType(UPDATE_TYPES.get(opType))
719 .build())
720 .collect(Collectors.toList());
721
722 if (updateMsgs.size() == 0) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200723 // Nothing to update.
Yi Tseng82512da2017-08-16 19:46:36 -0700724 return true;
725 }
726
Carmelo Cascone58136812018-07-19 03:40:16 +0200727 return write(updateMsgs, members, opType, "group member");
Yi Tseng82512da2017-08-16 19:46:36 -0700728 }
729
Carmelo Casconee44592f2018-09-12 02:24:47 -0700730 private List<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
Yi Tseng82512da2017-08-16 19:46:36 -0700731 log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
732 piActionProfileId.id(), deviceId, pipeconf.id());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200733
734 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
Yi Tseng82512da2017-08-16 19:46:36 -0700735 if (browser == null) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200736 log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700737 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700738 }
739
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200740 final int actionProfileId;
Yi Tseng82512da2017-08-16 19:46:36 -0700741 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200742 actionProfileId = browser
743 .actionProfiles()
Carmelo Casconecb0a49c2017-10-03 14:32:23 +0200744 .getByName(piActionProfileId.id())
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200745 .getPreamble()
746 .getId();
Yi Tseng82512da2017-08-16 19:46:36 -0700747 } catch (P4InfoBrowser.NotFoundException e) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200748 log.warn("Unable to dump groups: {}", e.getMessage());
Carmelo Casconee44592f2018-09-12 02:24:47 -0700749 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700750 }
751
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200752 // Prepare read request to read all groups from the given action profile.
753 final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
Yi Tseng82512da2017-08-16 19:46:36 -0700754 .setDeviceId(p4DeviceId)
755 .addEntities(Entity.newBuilder()
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200756 .setActionProfileGroup(
757 ActionProfileGroup.newBuilder()
758 .setActionProfileId(actionProfileId)
759 .build())
Yi Tseng82512da2017-08-16 19:46:36 -0700760 .build())
761 .build();
762
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200763 // Read groups.
764 final Iterator<ReadResponse> groupResponses;
Yi Tseng82512da2017-08-16 19:46:36 -0700765 try {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200766 groupResponses = blockingStub.read(groupRequestMsg);
Yi Tseng82512da2017-08-16 19:46:36 -0700767 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700768 checkGrpcException(e);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800769 log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
Carmelo Casconee44592f2018-09-12 02:24:47 -0700770 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700771 }
772
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200773 final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
774 .map(ReadResponse::getEntitiesList)
775 .flatMap(List::stream)
776 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
777 .map(Entity::getActionProfileGroup)
778 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700779
780 log.debug("Retrieved {} groups from action profile {} on {}...",
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200781 groupMsgs.size(), piActionProfileId.id(), deviceId);
Yi Tseng82512da2017-08-16 19:46:36 -0700782
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200783 // Returned groups contain only a minimal description of their members.
784 // We need to issue a new request to get the full description of each member.
Yi Tseng82512da2017-08-16 19:46:36 -0700785
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200786 // Keep a map of all member IDs for each group ID, will need it later.
787 final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
788 groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
789 g.getGroupId(),
790 g.getMembersList().stream()
791 .map(ActionProfileGroup.Member::getMemberId)
792 .collect(Collectors.toList())));
Yi Tseng82512da2017-08-16 19:46:36 -0700793
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200794 // Prepare one big read request to read all members in one shot.
795 final Set<Entity> entityMsgs = groupMsgs.stream()
796 .flatMap(g -> g.getMembersList().stream())
797 .map(ActionProfileGroup.Member::getMemberId)
798 // Prevent issuing many read requests for the same member.
799 .distinct()
800 .map(id -> ActionProfileMember.newBuilder()
801 .setActionProfileId(actionProfileId)
802 .setMemberId(id)
803 .build())
804 .map(m -> Entity.newBuilder()
805 .setActionProfileMember(m)
806 .build())
807 .collect(Collectors.toSet());
808 final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
809 .addAllEntities(entityMsgs)
810 .build();
Yi Tseng82512da2017-08-16 19:46:36 -0700811
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200812 // Read members.
813 final Iterator<ReadResponse> memberResponses;
814 try {
815 memberResponses = blockingStub.read(memberRequestMsg);
816 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700817 checkGrpcException(e);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800818 log.warn("Unable to read members of action profile {} from {}: {}",
819 piActionProfileId, deviceId, e.getMessage());
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200820 return Collections.emptyList();
Yi Tseng82512da2017-08-16 19:46:36 -0700821 }
822
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200823 final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
824 Tools.stream(() -> memberResponses)
825 .map(ReadResponse::getEntitiesList)
826 .flatMap(List::stream)
827 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
828 .map(Entity::getActionProfileMember)
829 .forEach(member -> groupIdToMemberIdsMap.asMap()
830 // Get all group IDs that contain this member.
831 .entrySet()
832 .stream()
833 .filter(entry -> entry.getValue().contains(member.getMemberId()))
834 .map(Map.Entry::getKey)
835 .forEach(gid -> groupIdToMembersMap.put(gid, member)));
836
837 log.debug("Retrieved {} group members from action profile {} on {}...",
838 groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
839
840 return groupMsgs.stream()
841 .map(groupMsg -> {
842 try {
843 return ActionProfileGroupEncoder.decode(groupMsg,
844 groupIdToMembersMap.get(groupMsg.getGroupId()),
845 pipeconf);
846 } catch (P4InfoBrowser.NotFoundException | EncodeException e) {
847 log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
848 return null;
849 }
850 })
851 .filter(Objects::nonNull)
852 .collect(Collectors.toList());
Yi Tseng82512da2017-08-16 19:46:36 -0700853 }
854
Carmelo Casconee44592f2018-09-12 02:24:47 -0700855 private List<PiActionGroupMemberId> doDumpActionProfileMemberIds(
856 PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
857
858 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
859 if (browser == null) {
860 log.warn("Unable to get a P4Info browser for pipeconf {}, " +
861 "aborting cleanup of action profile members",
862 pipeconf);
863 return Collections.emptyList();
864 }
865
866 final int p4ActProfId;
867 try {
868 p4ActProfId = browser
869 .actionProfiles()
870 .getByName(actionProfileId.id())
871 .getPreamble()
872 .getId();
873 } catch (P4InfoBrowser.NotFoundException e) {
874 log.warn("Unable to cleanup action profile members: {}", e.getMessage());
875 return Collections.emptyList();
876 }
877
878 final ReadRequest memberRequestMsg = ReadRequest.newBuilder()
879 .setDeviceId(p4DeviceId)
880 .addEntities(Entity.newBuilder().setActionProfileMember(
881 ActionProfileMember.newBuilder()
882 .setActionProfileId(p4ActProfId)
883 .build()).build())
884 .build();
885
886 // Read members.
887 final Iterator<ReadResponse> memberResponses;
888 try {
889 memberResponses = blockingStub.read(memberRequestMsg);
890 } catch (StatusRuntimeException e) {
891 checkGrpcException(e);
892 log.warn("Unable to read members of action profile {} from {}: {}",
893 actionProfileId, deviceId, e.getMessage());
894 return Collections.emptyList();
895 }
896
897 return Tools.stream(() -> memberResponses)
898 .map(ReadResponse::getEntitiesList)
899 .flatMap(List::stream)
900 .filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
901 .map(Entity::getActionProfileMember)
902 // Perhaps not needed, but better to double check to avoid
903 // removing members of other groups.
904 .filter(m -> m.getActionProfileId() == p4ActProfId)
905 .map(ActionProfileMember::getMemberId)
906 .map(PiActionGroupMemberId::of)
907 .collect(Collectors.toList());
908 }
909
910 private List<PiActionGroupMemberId> doRemoveActionProfileMembers(
911 PiActionProfileId actionProfileId,
912 List<PiActionGroupMemberId> memberIds,
913 PiPipeconf pipeconf) {
914
915 if (memberIds.isEmpty()) {
916 return Collections.emptyList();
917 }
918
919 final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
920 if (browser == null) {
921 log.warn("Unable to get a P4Info browser for pipeconf {}, " +
922 "aborting cleanup of action profile members",
923 pipeconf);
924 return Collections.emptyList();
925 }
926
927 final int p4ActProfId;
928 try {
929 p4ActProfId = browser.actionProfiles()
930 .getByName(actionProfileId.id()).getPreamble().getId();
931 } catch (P4InfoBrowser.NotFoundException e) {
932 log.warn("Unable to cleanup action profile members: {}", e.getMessage());
933 return Collections.emptyList();
934 }
935
936 final List<Update> updateMsgs = memberIds.stream()
937 .map(m -> ActionProfileMember.newBuilder()
938 .setActionProfileId(p4ActProfId)
939 .setMemberId(m.id()).build())
940 .map(m -> Entity.newBuilder().setActionProfileMember(m).build())
941 .map(e -> Update.newBuilder().setEntity(e)
942 .setType(Update.Type.DELETE).build())
943 .collect(Collectors.toList());
944
945 log.debug("Removing {} members of action profile '{}'...",
946 memberIds.size(), actionProfileId);
947
948 return writeAndReturnSuccessEntities(
949 updateMsgs, memberIds, WriteOperationType.DELETE,
950 "action profile members");
951 }
952
Yi Tseng82512da2017-08-16 19:46:36 -0700953 private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200954 final ActionProfileGroup actionProfileGroup;
Yi Tseng82512da2017-08-16 19:46:36 -0700955 try {
956 actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
957 } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -0800958 log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
Yi Tseng82512da2017-08-16 19:46:36 -0700959 return false;
960 }
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200961
Carmelo Cascone58136812018-07-19 03:40:16 +0200962 final Update updateMsg = Update.newBuilder()
963 .setEntity(Entity.newBuilder()
964 .setActionProfileGroup(actionProfileGroup)
965 .build())
966 .setType(UPDATE_TYPES.get(opType))
Carmelo Cascone87b9b392017-10-02 18:33:20 +0200967 .build();
Carmelo Cascone58136812018-07-19 03:40:16 +0200968
Carmelo Casconee44592f2018-09-12 02:24:47 -0700969 return write(singletonList(updateMsg), singletonList(group),
Carmelo Cascone58136812018-07-19 03:40:16 +0200970 opType, "group");
Yi Tseng82512da2017-08-16 19:46:36 -0700971 }
972
Carmelo Casconee44592f2018-09-12 02:24:47 -0700973 private List<PiMeterCellConfig> doReadAllMeterCells(
974 List<PiMeterId> meterIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700975 return doReadMeterEntities(MeterEntryCodec.readAllCellsEntities(
976 meterIds, pipeconf), pipeconf);
977 }
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900978
Carmelo Casconee44592f2018-09-12 02:24:47 -0700979 private List<PiMeterCellConfig> doReadMeterCells(
980 List<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700981
Carmelo Casconee44592f2018-09-12 02:24:47 -0700982 final List<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900983 .map(cellId -> PiMeterCellConfig.builder()
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700984 .withMeterCellId(cellId)
985 .build())
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900986 .collect(Collectors.toList());
987
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700988 return doReadMeterEntities(MeterEntryCodec.encodePiMeterCellConfigs(
989 piMeterCellConfigs, pipeconf), pipeconf);
990 }
991
Carmelo Casconee44592f2018-09-12 02:24:47 -0700992 private List<PiMeterCellConfig> doReadMeterEntities(
993 List<Entity> entitiesToRead, PiPipeconf pipeconf) {
Carmelo Cascone81929aa2018-04-07 01:38:55 -0700994
995 if (entitiesToRead.size() == 0) {
996 return Collections.emptyList();
997 }
998
Frank Wangd7e3b4b2017-09-24 13:37:54 +0900999 final ReadRequest request = ReadRequest.newBuilder()
1000 .setDeviceId(p4DeviceId)
Carmelo Cascone81929aa2018-04-07 01:38:55 -07001001 .addAllEntities(entitiesToRead)
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001002 .build();
1003
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001004 final Iterable<ReadResponse> responses;
1005 try {
1006 responses = () -> blockingStub.read(request);
1007 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001008 checkGrpcException(e);
Carmelo Cascone81929aa2018-04-07 01:38:55 -07001009 log.warn("Unable to read meter cells: {}", e.getMessage());
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001010 log.debug("exception", e);
1011 return Collections.emptyList();
1012 }
1013
Carmelo Cascone81929aa2018-04-07 01:38:55 -07001014 List<Entity> responseEntities = StreamSupport
1015 .stream(responses.spliterator(), false)
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001016 .map(ReadResponse::getEntitiesList)
1017 .flatMap(List::stream)
1018 .collect(Collectors.toList());
1019
Carmelo Cascone81929aa2018-04-07 01:38:55 -07001020 return MeterEntryCodec.decodeMeterEntities(responseEntities, pipeconf);
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001021 }
1022
Carmelo Casconee44592f2018-09-12 02:24:47 -07001023 private boolean doWriteMeterCells(List<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf) {
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001024
Carmelo Casconee44592f2018-09-12 02:24:47 -07001025 List<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellConfigs, pipeconf)
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001026 .stream()
1027 .map(meterEntryMsg ->
1028 Update.newBuilder()
1029 .setEntity(meterEntryMsg)
1030 .setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
1031 .build())
1032 .collect(Collectors.toList());
1033
1034 if (updateMsgs.size() == 0) {
1035 return true;
1036 }
1037
Carmelo Cascone58136812018-07-19 03:40:16 +02001038 return write(updateMsgs, cellConfigs, WriteOperationType.MODIFY, "meter cell config");
1039 }
1040
1041 private boolean doWriteMulticastGroupEntries(
Carmelo Casconee44592f2018-09-12 02:24:47 -07001042 List<PiMulticastGroupEntry> entries,
Carmelo Cascone58136812018-07-19 03:40:16 +02001043 WriteOperationType opType) {
1044
1045 final List<Update> updateMsgs = entries.stream()
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -07001046 .map(piEntry -> {
1047 try {
1048 return MulticastGroupEntryCodec.encode(piEntry);
1049 } catch (EncodeException e) {
1050 log.warn("Unable to encode PiMulticastGroupEntry: {}", e.getMessage());
1051 return null;
1052 }
1053 })
1054 .filter(Objects::nonNull)
Carmelo Cascone58136812018-07-19 03:40:16 +02001055 .map(mcMsg -> PacketReplicationEngineEntry.newBuilder()
1056 .setMulticastGroupEntry(mcMsg)
1057 .build())
1058 .map(preMsg -> Entity.newBuilder()
1059 .setPacketReplicationEngineEntry(preMsg)
1060 .build())
1061 .map(entityMsg -> Update.newBuilder()
1062 .setEntity(entityMsg)
1063 .setType(UPDATE_TYPES.get(opType))
1064 .build())
1065 .collect(Collectors.toList());
1066 return write(updateMsgs, entries, opType, "multicast group entry");
1067 }
1068
Carmelo Casconee44592f2018-09-12 02:24:47 -07001069 private List<PiMulticastGroupEntry> doReadAllMulticastGroupEntries() {
Carmelo Cascone58136812018-07-19 03:40:16 +02001070
1071 final Entity entity = Entity.newBuilder()
1072 .setPacketReplicationEngineEntry(
1073 PacketReplicationEngineEntry.newBuilder()
1074 .setMulticastGroupEntry(
1075 MulticastGroupEntry.newBuilder()
1076 .build())
1077 .build())
1078 .build();
1079
1080 final ReadRequest req = ReadRequest.newBuilder()
1081 .setDeviceId(p4DeviceId)
1082 .addEntities(entity)
1083 .build();
1084
1085 Iterator<ReadResponse> responses;
1086 try {
1087 responses = blockingStub.read(req);
1088 } catch (StatusRuntimeException e) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001089 checkGrpcException(e);
Carmelo Cascone58136812018-07-19 03:40:16 +02001090 log.warn("Unable to read multicast group entries from {}: {}", deviceId, e.getMessage());
1091 return Collections.emptyList();
1092 }
1093
1094 Iterable<ReadResponse> responseIterable = () -> responses;
1095 final List<PiMulticastGroupEntry> mcEntries = StreamSupport
1096 .stream(responseIterable.spliterator(), false)
1097 .map(ReadResponse::getEntitiesList)
1098 .flatMap(List::stream)
1099 .filter(e -> e.getEntityCase()
1100 .equals(PACKET_REPLICATION_ENGINE_ENTRY))
1101 .map(Entity::getPacketReplicationEngineEntry)
1102 .filter(e -> e.getTypeCase().equals(MULTICAST_GROUP_ENTRY))
1103 .map(PacketReplicationEngineEntry::getMulticastGroupEntry)
1104 .map(MulticastGroupEntryCodec::decode)
1105 .collect(Collectors.toList());
1106
1107 log.debug("Retrieved {} multicast group entries from {}...",
1108 mcEntries.size(), deviceId);
1109
1110 return mcEntries;
1111 }
1112
Carmelo Casconee44592f2018-09-12 02:24:47 -07001113 private <T> boolean write(List<Update> updates,
1114 List<T> writeEntities,
1115 WriteOperationType opType,
1116 String entryType) {
1117 // True if all entities were successfully written.
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -07001118 return writeAndReturnSuccessEntities(updates, writeEntities, opType, entryType)
1119 .size() == writeEntities.size();
Carmelo Casconee44592f2018-09-12 02:24:47 -07001120 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001121
Carmelo Casconee44592f2018-09-12 02:24:47 -07001122 private <T> List<T> writeAndReturnSuccessEntities(
1123 List<Update> updates, List<T> writeEntities,
1124 WriteOperationType opType, String entryType) {
Carmelo Cascone03ae0ac2018-10-11 08:31:59 -07001125 if (updates.isEmpty()) {
1126 return Collections.emptyList();
1127 }
1128 if (updates.size() != writeEntities.size()) {
1129 log.error("Cannot perform {} operation, provided {} " +
1130 "update messages for {} {} - BUG?",
1131 opType, updates.size(), writeEntities.size(), entryType);
1132 return Collections.emptyList();
1133 }
Carmelo Casconee44592f2018-09-12 02:24:47 -07001134 try {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001135 //noinspection ResultOfMethodCallIgnored
Carmelo Cascone58136812018-07-19 03:40:16 +02001136 blockingStub.write(writeRequest(updates));
Carmelo Casconee44592f2018-09-12 02:24:47 -07001137 return writeEntities;
Carmelo Cascone58136812018-07-19 03:40:16 +02001138 } catch (StatusRuntimeException e) {
Carmelo Casconee44592f2018-09-12 02:24:47 -07001139 return checkAndLogWriteErrors(writeEntities, e, opType, entryType);
Carmelo Cascone58136812018-07-19 03:40:16 +02001140 }
1141 }
1142
1143 private WriteRequest writeRequest(Iterable<Update> updateMsgs) {
1144 return WriteRequest.newBuilder()
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001145 .setDeviceId(p4DeviceId)
Carmelo Casconee5b28722018-06-22 17:28:28 +02001146 .setElectionId(clientElectionId)
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001147 .addAllUpdates(updateMsgs)
1148 .build();
Frank Wangd7e3b4b2017-09-24 13:37:54 +09001149 }
1150
Carmelo Casconee5b28722018-06-22 17:28:28 +02001151 private Void doShutdown() {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001152 log.debug("Shutting down client for {}...", deviceId);
1153 streamChannelManager.complete();
1154 cancellableContext.cancel(new InterruptedException(
1155 "Requested client shutdown"));
1156 this.executorService.shutdownNow();
Carmelo Casconee5b28722018-06-22 17:28:28 +02001157 try {
1158 executorService.awaitTermination(5, TimeUnit.SECONDS);
1159 } catch (InterruptedException e) {
1160 log.warn("Executor service didn't shutdown in time.");
1161 Thread.currentThread().interrupt();
1162 }
1163 return null;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001164 }
1165
Carmelo Casconee44592f2018-09-12 02:24:47 -07001166 // Returns the collection of succesfully write entities.
1167 private <T> List<T> checkAndLogWriteErrors(
1168 List<T> writeEntities, StatusRuntimeException ex,
Carmelo Casconee5b28722018-06-22 17:28:28 +02001169 WriteOperationType opType, String entryType) {
1170
1171 checkGrpcException(ex);
1172
Carmelo Cascone943c6642018-09-11 13:01:03 -07001173 final List<P4RuntimeOuterClass.Error> errors = extractWriteErrorDetails(ex);
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001174
Carmelo Cascone943c6642018-09-11 13:01:03 -07001175 if (errors.isEmpty()) {
1176 final String description = ex.getStatus().getDescription();
1177 log.warn("Unable to {} {} {}(s) on {}: {}",
Carmelo Cascone50d195f2018-09-11 13:26:38 -07001178 opType.name(), writeEntities.size(), entryType, deviceId,
1179 ex.getStatus().getCode().name(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001180 description == null ? "" : " - " + description);
Carmelo Casconee44592f2018-09-12 02:24:47 -07001181 return Collections.emptyList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001182 }
1183
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001184 if (errors.size() == writeEntities.size()) {
Carmelo Casconee44592f2018-09-12 02:24:47 -07001185 List<T> okEntities = Lists.newArrayList();
1186 Iterator<T> entityIterator = writeEntities.iterator();
1187 for (P4RuntimeOuterClass.Error error : errors) {
1188 T entity = entityIterator.next();
1189 if (error.getCanonicalCode() != Status.OK.getCode().value()) {
1190 log.warn("Unable to {} {} on {}: {} [{}]",
1191 opType.name(), entryType, deviceId,
1192 parseP4Error(error), entity.toString());
1193 } else {
1194 okEntities.add(entity);
1195 }
1196 }
1197 return okEntities;
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001198 } else {
Carmelo Cascone943c6642018-09-11 13:01:03 -07001199 log.warn("Unable to reconcile error details to updates " +
Carmelo Casconee44592f2018-09-12 02:24:47 -07001200 "(sent {} updates, but device returned {} errors)",
1201 entryType, writeEntities.size(), errors.size());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001202 errors.stream()
1203 .filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
1204 .forEach(err -> log.warn("Unable to {} {} (unknown): {}",
Carmelo Cascone58136812018-07-19 03:40:16 +02001205 opType.name(), entryType, parseP4Error(err)));
Carmelo Casconee44592f2018-09-12 02:24:47 -07001206 return Collections.emptyList();
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001207 }
1208 }
1209
1210 private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
Carmelo Cascone943c6642018-09-11 13:01:03 -07001211 StatusRuntimeException ex) {
1212 if (!ex.getTrailers().containsKey(STATUS_DETAILS_KEY)) {
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001213 return Collections.emptyList();
1214 }
Carmelo Cascone943c6642018-09-11 13:01:03 -07001215 com.google.rpc.Status status = ex.getTrailers().get(STATUS_DETAILS_KEY);
1216 if (status == null) {
1217 return Collections.emptyList();
1218 }
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001219 return status.getDetailsList().stream()
1220 .map(any -> {
1221 try {
1222 return any.unpack(P4RuntimeOuterClass.Error.class);
1223 } catch (InvalidProtocolBufferException e) {
1224 log.warn("Unable to unpack P4Runtime Error: {}",
1225 any.toString());
1226 return null;
1227 }
1228 })
1229 .filter(Objects::nonNull)
1230 .collect(Collectors.toList());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001231 }
1232
1233 private String parseP4Error(P4RuntimeOuterClass.Error err) {
Carmelo Cascone943c6642018-09-11 13:01:03 -07001234 return format("%s %s%s (%s:%d)",
1235 Status.fromCodeValue(err.getCanonicalCode()).getCode(),
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001236 err.getMessage(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001237 err.hasDetails() ? ", " + err.getDetails().toString() : "",
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001238 err.getSpace(),
Carmelo Cascone943c6642018-09-11 13:01:03 -07001239 err.getCode());
Carmelo Cascone5bc7e102018-02-18 18:27:55 -08001240 }
1241
Carmelo Casconee5b28722018-06-22 17:28:28 +02001242 private void checkGrpcException(StatusRuntimeException ex) {
1243 switch (ex.getStatus().getCode()) {
1244 case OK:
1245 break;
1246 case CANCELLED:
1247 break;
1248 case UNKNOWN:
1249 break;
1250 case INVALID_ARGUMENT:
1251 break;
1252 case DEADLINE_EXCEEDED:
1253 break;
1254 case NOT_FOUND:
1255 break;
1256 case ALREADY_EXISTS:
1257 break;
1258 case PERMISSION_DENIED:
1259 // Notify upper layers that this node is not master.
1260 controller.postEvent(new P4RuntimeEvent(
Carmelo Casconede3b6842018-09-05 17:45:10 -07001261 P4RuntimeEvent.Type.PERMISSION_DENIED,
1262 new BaseP4RuntimeEventSubject(deviceId)));
Carmelo Casconee5b28722018-06-22 17:28:28 +02001263 break;
1264 case RESOURCE_EXHAUSTED:
1265 break;
1266 case FAILED_PRECONDITION:
1267 break;
1268 case ABORTED:
1269 break;
1270 case OUT_OF_RANGE:
1271 break;
1272 case UNIMPLEMENTED:
1273 break;
1274 case INTERNAL:
1275 break;
1276 case UNAVAILABLE:
1277 // Channel might be closed.
1278 controller.postEvent(new P4RuntimeEvent(
1279 P4RuntimeEvent.Type.CHANNEL_EVENT,
1280 new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
1281 break;
1282 case DATA_LOSS:
1283 break;
1284 case UNAUTHENTICATED:
1285 break;
1286 default:
1287 break;
1288 }
1289 }
1290
1291 private Uint128 bigIntegerToUint128(BigInteger value) {
1292 final byte[] arr = value.toByteArray();
1293 final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
1294 .put(new byte[Long.BYTES * 2 - arr.length])
1295 .put(arr);
1296 bb.rewind();
1297 return Uint128.newBuilder()
1298 .setHigh(bb.getLong())
1299 .setLow(bb.getLong())
1300 .build();
1301 }
1302
1303 private BigInteger uint128ToBigInteger(Uint128 value) {
1304 return new BigInteger(
1305 ByteBuffer.allocate(Long.BYTES * 2)
1306 .putLong(value.getHigh())
1307 .putLong(value.getLow())
1308 .array());
1309 }
1310
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001311 /**
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001312 * A manager for the P4Runtime stream channel that opportunistically creates
1313 * new stream RCP stubs (e.g. when one fails because of errors) and posts
1314 * channel events via the P4Runtime controller.
1315 */
1316 private final class StreamChannelManager {
1317
1318 private final ManagedChannel channel;
1319 private final AtomicBoolean open;
1320 private final StreamObserver<StreamMessageResponse> responseObserver;
1321 private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
1322
1323 private StreamChannelManager(ManagedChannel channel) {
1324 this.channel = channel;
1325 this.responseObserver = new InternalStreamResponseObserver(this);
1326 this.open = new AtomicBoolean(false);
1327 }
1328
1329 private void initIfRequired() {
1330 if (requestObserver == null) {
1331 log.debug("Creating new stream channel for {}...", deviceId);
1332 requestObserver =
1333 (ClientCallStreamObserver<StreamMessageRequest>)
1334 P4RuntimeGrpc.newStub(channel)
1335 .streamChannel(responseObserver);
1336 open.set(false);
1337 }
1338 }
1339
1340 public boolean send(StreamMessageRequest value) {
1341 synchronized (this) {
1342 initIfRequired();
1343 try {
1344 requestObserver.onNext(value);
1345 // FIXME
1346 // signalOpen();
1347 return true;
1348 } catch (Throwable ex) {
1349 if (ex instanceof StatusRuntimeException) {
1350 log.warn("Unable to send {} to {}: {}",
1351 value.getUpdateCase().toString(), deviceId, ex.getMessage());
1352 } else {
1353 log.warn(format(
1354 "Exception while sending %s to %s",
1355 value.getUpdateCase().toString(), deviceId), ex);
1356 }
1357 complete();
1358 return false;
1359 }
1360 }
1361 }
1362
1363 public void complete() {
1364 synchronized (this) {
1365 signalClosed();
1366 if (requestObserver != null) {
1367 requestObserver.onCompleted();
1368 requestObserver.cancel("Terminated", null);
1369 requestObserver = null;
1370 }
1371 }
1372 }
1373
1374 void signalOpen() {
1375 synchronized (this) {
1376 final boolean wasOpen = open.getAndSet(true);
1377 if (!wasOpen) {
1378 controller.postEvent(new P4RuntimeEvent(
1379 P4RuntimeEvent.Type.CHANNEL_EVENT,
1380 new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
1381 }
1382 }
1383 }
1384
1385 void signalClosed() {
1386 synchronized (this) {
1387 final boolean wasOpen = open.getAndSet(false);
1388 if (wasOpen) {
1389 controller.postEvent(new P4RuntimeEvent(
1390 P4RuntimeEvent.Type.CHANNEL_EVENT,
1391 new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
1392 }
1393 }
1394 }
1395
1396 public boolean isOpen() {
1397 return open.get();
1398 }
1399 }
1400
1401 /**
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001402 * Handles messages received from the device on the stream channel.
1403 */
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001404 private final class InternalStreamResponseObserver
Carmelo Casconee5b28722018-06-22 17:28:28 +02001405 implements StreamObserver<StreamMessageResponse> {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001406
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001407 private final StreamChannelManager streamChannelManager;
1408
1409 private InternalStreamResponseObserver(
1410 StreamChannelManager streamChannelManager) {
1411 this.streamChannelManager = streamChannelManager;
1412 }
1413
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001414 @Override
1415 public void onNext(StreamMessageResponse message) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001416 streamChannelManager.signalOpen();
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001417 executorService.submit(() -> doNext(message));
1418 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001419
Carmelo Cascone8d99b172017-07-18 17:26:31 -04001420 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -04001421 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001422 log.debug("Received message on stream channel from {}: {}",
1423 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001424 switch (message.getUpdateCase()) {
1425 case PACKET:
Carmelo Casconea966c342017-07-30 01:56:30 -04001426 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +02001427 return;
Carmelo Casconea966c342017-07-30 01:56:30 -04001428 case ARBITRATION:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001429 doArbitrationResponse(message.getArbitration());
Carmelo Casconea966c342017-07-30 01:56:30 -04001430 return;
1431 default:
Carmelo Casconee5b28722018-06-22 17:28:28 +02001432 log.warn("Unrecognized stream message from {}: {}",
1433 deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -04001434 }
1435 } catch (Throwable ex) {
Carmelo Casconee5b28722018-06-22 17:28:28 +02001436 log.error("Exception while processing stream message from {}",
1437 deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001438 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001439 }
1440
1441 @Override
1442 public void onError(Throwable throwable) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001443 if (throwable instanceof StatusRuntimeException) {
1444 StatusRuntimeException sre = (StatusRuntimeException) throwable;
1445 if (sre.getStatus().getCause() instanceof ConnectException) {
1446 log.warn("Device {} is unreachable ({})",
1447 deviceId, sre.getCause().getMessage());
1448 } else {
1449 log.warn("Received error on stream channel for {}: {}",
1450 deviceId, throwable.getMessage());
1451 }
1452 } else {
1453 log.warn(format("Received exception on stream channel for %s",
1454 deviceId), throwable);
1455 }
1456 streamChannelManager.complete();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001457 }
1458
1459 @Override
1460 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -04001461 log.warn("Stream channel for {} has completed", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001462 streamChannelManager.complete();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001463 }
1464 }
Carmelo Cascone87892e22017-11-13 16:01:29 -08001465}