blob: a06d67e9c08c7d3350fb74a2e0299c7d85b90109 [file] [log] [blame]
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.p4runtime.ctl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.lite.ProtoLiteUtils;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.model.PiActionProfileId;
import org.onosproject.net.pi.model.PiCounterId;
import org.onosproject.net.pi.model.PiMeterId;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiTableId;
import org.onosproject.net.pi.runtime.PiActionGroup;
import org.onosproject.net.pi.runtime.PiActionGroupMember;
import org.onosproject.net.pi.runtime.PiActionGroupMemberId;
import org.onosproject.net.pi.runtime.PiCounterCell;
import org.onosproject.net.pi.runtime.PiCounterCellId;
import org.onosproject.net.pi.runtime.PiMeterCellConfig;
import org.onosproject.net.pi.runtime.PiMeterCellId;
import org.onosproject.net.pi.runtime.PiMulticastGroupEntry;
import org.onosproject.net.pi.runtime.PiPacketOperation;
import org.onosproject.net.pi.runtime.PiTableEntry;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.slf4j.Logger;
import p4.config.v1.P4InfoOuterClass.P4Info;
import p4.tmp.P4Config;
import p4.v1.P4RuntimeGrpc;
import p4.v1.P4RuntimeOuterClass;
import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
import p4.v1.P4RuntimeOuterClass.Entity;
import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
import p4.v1.P4RuntimeOuterClass.MulticastGroupEntry;
import p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry;
import p4.v1.P4RuntimeOuterClass.ReadRequest;
import p4.v1.P4RuntimeOuterClass.ReadResponse;
import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
import p4.v1.P4RuntimeOuterClass.TableEntry;
import p4.v1.P4RuntimeOuterClass.Uint128;
import p4.v1.P4RuntimeOuterClass.Update;
import p4.v1.P4RuntimeOuterClass.WriteRequest;
import java.math.BigInteger;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
import static p4.v1.P4RuntimeOuterClass.PacketIn;
import static p4.v1.P4RuntimeOuterClass.PacketOut;
import static p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry.TypeCase.MULTICAST_GROUP_ENTRY;
import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
/**
* Implementation of a P4Runtime client.
*/
final class P4RuntimeClientImpl implements P4RuntimeClient {
// Timeout in seconds to obtain the request lock.
private static final int LOCK_TIMEOUT = 60;
private static final Metadata.Key<com.google.rpc.Status> STATUS_DETAILS_KEY =
Metadata.Key.of("grpc-status-details-bin",
ProtoLiteUtils.metadataMarshaller(
com.google.rpc.Status.getDefaultInstance()));
private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
WriteOperationType.INSERT, Update.Type.INSERT,
WriteOperationType.MODIFY, Update.Type.MODIFY,
WriteOperationType.DELETE, Update.Type.DELETE
);
private final Logger log = getLogger(getClass());
private final Lock requestLock = new ReentrantLock();
private final Context.CancellableContext cancellableContext =
Context.current().withCancellation();
private final DeviceId deviceId;
private final long p4DeviceId;
private final P4RuntimeControllerImpl controller;
private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
private final ExecutorService executorService;
private final Executor contextExecutor;
private StreamChannelManager streamChannelManager;
// Used by this client for write requests.
private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
/**
* Default constructor.
*
* @param deviceId the ONOS device id
* @param p4DeviceId the P4 device id
* @param channel gRPC channel
* @param controller runtime client controller
*/
P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
P4RuntimeControllerImpl controller) {
this.deviceId = deviceId;
this.p4DeviceId = p4DeviceId;
this.controller = controller;
this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
"onos-p4runtime-client-" + deviceId.toString(), "%d"));
this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
//TODO Investigate use of stub deadlines instead of timeout in supplyInContext
this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
this.streamChannelManager = new StreamChannelManager(channel);
}
/**
* Submits a task for async execution via the given executor. All tasks
* submitted with this method will be executed sequentially.
*/
private <U> CompletableFuture<U> supplyWithExecutor(
Supplier<U> supplier, String opDescription, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
// TODO: explore a more relaxed locking strategy.
try {
if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
log.error("LOCK TIMEOUT! This is likely a deadlock, "
+ "please debug (executing {})",
opDescription);
throw new IllegalThreadStateException("Lock timeout");
}
} catch (InterruptedException e) {
log.warn("Thread interrupted while waiting for lock (executing {})",
opDescription);
throw new IllegalStateException(e);
}
try {
return supplier.get();
} catch (StatusRuntimeException ex) {
log.warn("Unable to execute {} on {}: {}",
opDescription, deviceId, ex.toString());
throw ex;
} catch (Throwable ex) {
log.error("Exception in client of {}, executing {}",
deviceId, opDescription, ex);
throw ex;
} finally {
requestLock.unlock();
}
}, executor);
}
/**
* Equivalent of supplyWithExecutor using the gRPC context executor of this
* client, such that if the context is cancelled (e.g. client shutdown) the
* RPC is automatically cancelled.
*/
private <U> CompletableFuture<U> supplyInContext(
Supplier<U> supplier, String opDescription) {
return supplyWithExecutor(supplier, opDescription, contextExecutor);
}
@Override
public CompletableFuture<Boolean> startStreamChannel() {
return supplyInContext(() -> sendMasterArbitrationUpdate(false),
"start-initStreamChannel");
}
@Override
public CompletableFuture<Void> shutdown() {
return supplyWithExecutor(this::doShutdown, "shutdown",
SharedExecutors.getPoolThreadExecutor());
}
@Override
public CompletableFuture<Boolean> becomeMaster() {
return supplyInContext(() -> sendMasterArbitrationUpdate(true),
"becomeMaster");
}
@Override
public boolean isMaster() {
return streamChannelManager.isOpen() && isClientMaster.get();
}
@Override
public boolean isStreamChannelOpen() {
return streamChannelManager.isOpen();
}
@Override
public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
}
@Override
public boolean isPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
return doIsPipelineConfigSet(pipeconf, deviceData);
}
@Override
public CompletableFuture<Boolean> writeTableEntries(List<PiTableEntry> piTableEntries,
WriteOperationType opType, PiPipeconf pipeconf) {
return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
"writeTableEntries-" + opType.name());
}
@Override
public CompletableFuture<List<PiTableEntry>> dumpTables(
Set<PiTableId> piTableIds, boolean defaultEntries, PiPipeconf pipeconf) {
return supplyInContext(() -> doDumpTables(piTableIds, defaultEntries, pipeconf),
"dumpTables-" + piTableIds.hashCode());
}
@Override
public CompletableFuture<List<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
return supplyInContext(() -> doDumpTables(null, false, pipeconf), "dumpAllTables");
}
@Override
public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
}
@Override
public CompletableFuture<List<PiCounterCell>> readCounterCells(Set<PiCounterCellId> cellIds,
PiPipeconf pipeconf) {
return supplyInContext(() -> doReadCounterCells(Lists.newArrayList(cellIds), pipeconf),
"readCounterCells-" + cellIds.hashCode());
}
@Override
public CompletableFuture<List<PiCounterCell>> readAllCounterCells(Set<PiCounterId> counterIds,
PiPipeconf pipeconf) {
return supplyInContext(() -> doReadAllCounterCells(Lists.newArrayList(counterIds), pipeconf),
"readAllCounterCells-" + counterIds.hashCode());
}
@Override
public CompletableFuture<Boolean> writeActionGroupMembers(List<PiActionGroupMember> members,
WriteOperationType opType,
PiPipeconf pipeconf) {
return supplyInContext(() -> doWriteActionGroupMembers(members, opType, pipeconf),
"writeActionGroupMembers-" + opType.name());
}
@Override
public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
WriteOperationType opType,
PiPipeconf pipeconf) {
return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf),
"writeActionGroup-" + opType.name());
}
@Override
public CompletableFuture<List<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
PiPipeconf pipeconf) {
return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf),
"dumpGroups-" + actionProfileId.id());
}
@Override
public CompletableFuture<List<PiActionGroupMemberId>> dumpActionProfileMemberIds(
PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
return supplyInContext(() -> doDumpActionProfileMemberIds(actionProfileId, pipeconf),
"dumpActionProfileMemberIds-" + actionProfileId.id());
}
@Override
public CompletableFuture<List<PiActionGroupMemberId>> removeActionProfileMembers(
PiActionProfileId actionProfileId,
List<PiActionGroupMemberId> memberIds,
PiPipeconf pipeconf) {
return supplyInContext(
() -> doRemoveActionProfileMembers(actionProfileId, memberIds, pipeconf),
"cleanupActionProfileMembers-" + actionProfileId.id());
}
@Override
public CompletableFuture<Boolean> writeMeterCells(List<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
"writeMeterCells");
}
@Override
public CompletableFuture<Boolean> writePreMulticastGroupEntries(
List<PiMulticastGroupEntry> entries,
WriteOperationType opType) {
return supplyInContext(() -> doWriteMulticastGroupEntries(entries, opType),
"writePreMulticastGroupEntries");
}
@Override
public CompletableFuture<List<PiMulticastGroupEntry>> readAllMulticastGroupEntries() {
return supplyInContext(this::doReadAllMulticastGroupEntries,
"readAllMulticastGroupEntries");
}
@Override
public CompletableFuture<List<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
PiPipeconf pipeconf) {
return supplyInContext(() -> doReadMeterCells(Lists.newArrayList(cellIds), pipeconf),
"readMeterCells-" + cellIds.hashCode());
}
@Override
public CompletableFuture<List<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
PiPipeconf pipeconf) {
return supplyInContext(() -> doReadAllMeterCells(Lists.newArrayList(meterIds), pipeconf),
"readAllMeterCells-" + meterIds.hashCode());
}
/* Blocking method implementations below */
private boolean sendMasterArbitrationUpdate(boolean asMaster) {
BigInteger newId = controller.newMasterElectionId(deviceId);
if (asMaster) {
// Becoming master is a race. Here we increase our chances of win
// against other ONOS nodes in the cluster that are calling start()
// (which is used to start the stream RPC session, not to become
// master).
newId = newId.add(BigInteger.valueOf(1000));
}
final Uint128 idMsg = bigIntegerToUint128(
controller.newMasterElectionId(deviceId));
log.debug("Sending arbitration update to {}... electionId={}",
deviceId, newId);
streamChannelManager.send(
StreamMessageRequest.newBuilder()
.setArbitration(
MasterArbitrationUpdate
.newBuilder()
.setDeviceId(p4DeviceId)
.setElectionId(idMsg)
.build())
.build());
clientElectionId = idMsg;
return true;
}
private ForwardingPipelineConfig getPipelineConfig(
PiPipeconf pipeconf, ByteBuffer deviceData) {
P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
if (p4Info == null) {
// Problem logged by PipeconfHelper.
return null;
}
ForwardingPipelineConfig.Cookie pipeconfCookie = ForwardingPipelineConfig.Cookie
.newBuilder()
.setCookie(pipeconf.fingerprint())
.build();
// FIXME: This is specific to PI P4Runtime implementation.
P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
.newBuilder()
.setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
.setReassign(true)
.setDeviceData(ByteString.copyFrom(deviceData))
.build();
return ForwardingPipelineConfig
.newBuilder()
.setP4Info(p4Info)
.setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
.setCookie(pipeconfCookie)
.build();
}
private boolean doIsPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
GetForwardingPipelineConfigRequest request = GetForwardingPipelineConfigRequest
.newBuilder()
.setDeviceId(p4DeviceId)
.setResponseType(GetForwardingPipelineConfigRequest
.ResponseType.COOKIE_ONLY)
.build();
GetForwardingPipelineConfigResponse resp;
try {
resp = this.blockingStub
.getForwardingPipelineConfig(request);
} catch (StatusRuntimeException ex) {
checkGrpcException(ex);
// FAILED_PRECONDITION means that a pipeline config was not set in
// the first place. Don't bother logging.
if (!ex.getStatus().getCode()
.equals(Status.FAILED_PRECONDITION.getCode())) {
log.warn("Unable to get pipeline config from {}: {}",
deviceId, ex.getMessage());
}
return false;
}
if (!resp.getConfig().hasCookie()) {
log.warn("{} returned GetForwardingPipelineConfigResponse " +
"with 'cookie' field unset",
deviceId);
return false;
}
return resp.getConfig().getCookie().getCookie() == pipeconf.fingerprint();
}
private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
log.info("Setting pipeline config for {} to {}...", deviceId, pipeconf.id());
checkNotNull(deviceData, "deviceData cannot be null");
ForwardingPipelineConfig pipelineConfig = getPipelineConfig(pipeconf, deviceData);
if (pipelineConfig == null) {
// Error logged in getPipelineConfig()
return false;
}
SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
.newBuilder()
.setDeviceId(p4DeviceId)
.setElectionId(clientElectionId)
.setAction(VERIFY_AND_COMMIT)
.setConfig(pipelineConfig)
.build();
try {
//noinspection ResultOfMethodCallIgnored
this.blockingStub.setForwardingPipelineConfig(request);
return true;
} catch (StatusRuntimeException ex) {
checkGrpcException(ex);
log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
return false;
}
}
private boolean doWriteTableEntries(List<PiTableEntry> piTableEntries, WriteOperationType opType,
PiPipeconf pipeconf) {
if (piTableEntries.size() == 0) {
return true;
}
List<Update> updateMsgs;
try {
updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
.stream()
.map(tableEntryMsg ->
Update.newBuilder()
.setEntity(Entity.newBuilder()
.setTableEntry(tableEntryMsg)
.build())
.setType(UPDATE_TYPES.get(opType))
.build())
.collect(Collectors.toList());
} catch (EncodeException e) {
log.error("Unable to encode table entries, aborting {} operation: {}",
opType.name(), e.getMessage());
return false;
}
return write(updateMsgs, piTableEntries, opType, "table entry");
}
private List<PiTableEntry> doDumpTables(
Set<PiTableId> piTableIds, boolean defaultEntries, PiPipeconf pipeconf) {
log.debug("Dumping tables {} from {} (pipeconf {})...",
piTableIds, deviceId, pipeconf.id());
Set<Integer> tableIds = Sets.newHashSet();
if (piTableIds == null) {
// Dump all tables.
tableIds.add(0);
} else {
P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
if (browser == null) {
log.warn("Unable to get a P4Info browser for pipeconf {}", pipeconf);
return Collections.emptyList();
}
piTableIds.forEach(piTableId -> {
try {
tableIds.add(browser.tables().getByName(piTableId.id()).getPreamble().getId());
} catch (P4InfoBrowser.NotFoundException e) {
log.warn("Unable to dump table {}: {}", piTableId, e.getMessage());
}
});
}
if (tableIds.isEmpty()) {
return Collections.emptyList();
}
ReadRequest.Builder requestMsgBuilder = ReadRequest.newBuilder()
.setDeviceId(p4DeviceId);
tableIds.forEach(tableId -> requestMsgBuilder.addEntities(
Entity.newBuilder()
.setTableEntry(
TableEntry.newBuilder()
.setTableId(tableId)
.setIsDefaultAction(defaultEntries)
.setCounterData(P4RuntimeOuterClass.CounterData.getDefaultInstance())
.build())
.build())
.build());
Iterator<ReadResponse> responses;
try {
responses = blockingStub.read(requestMsgBuilder.build());
} catch (StatusRuntimeException e) {
checkGrpcException(e);
log.warn("Unable to dump tables from {}: {}", deviceId, e.getMessage());
return Collections.emptyList();
}
Iterable<ReadResponse> responseIterable = () -> responses;
List<TableEntry> tableEntryMsgs = StreamSupport
.stream(responseIterable.spliterator(), false)
.map(ReadResponse::getEntitiesList)
.flatMap(List::stream)
.filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
.map(Entity::getTableEntry)
.collect(Collectors.toList());
log.debug("Retrieved {} entries from {} tables on {}...",
tableEntryMsgs.size(), tableIds.size(), deviceId);
return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
}
private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
try {
//encode the PiPacketOperation into a PacketOut
PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
//Build the request
StreamMessageRequest packetOutRequest = StreamMessageRequest
.newBuilder().setPacket(packetOut).build();
//Send the request
streamChannelManager.send(packetOutRequest);
} catch (P4InfoBrowser.NotFoundException e) {
log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
log.debug("Exception", e);
return false;
}
return true;
}
private void doPacketIn(PacketIn packetInMsg) {
// Retrieve the pipeconf for this client's device.
PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
if (pipeconfService == null) {
throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
}
final PiPipeconf pipeconf;
if (pipeconfService.ofDevice(deviceId).isPresent() &&
pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
} else {
log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
return;
}
// Decode packet message and post event.
PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
log.debug("Received packet in: {}", event);
controller.postEvent(event);
}
private void doArbitrationResponse(MasterArbitrationUpdate msg) {
// From the spec...
// - Election_id: The stream RPC with the highest election_id is the
// master. Switch populates with the highest election ID it
// has received from all connected controllers.
// - Status: Switch populates this with OK for the client that is the
// master, and with an error status for all other connected clients (at
// every mastership change).
if (!msg.hasElectionId() || !msg.hasStatus()) {
return;
}
final boolean isMaster =
msg.getStatus().getCode() == Status.OK.getCode().value();
log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
controller.postEvent(new P4RuntimeEvent(
P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
new ArbitrationResponse(deviceId, isMaster)));
isClientMaster.set(isMaster);
}
private List<PiCounterCell> doReadAllCounterCells(
List<PiCounterId> counterIds, PiPipeconf pipeconf) {
return doReadCounterEntities(
CounterEntryCodec.readAllCellsEntities(counterIds, pipeconf),
pipeconf);
}
private List<PiCounterCell> doReadCounterCells(
List<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
return doReadCounterEntities(
CounterEntryCodec.encodePiCounterCellIds(cellIds, pipeconf),
pipeconf);
}
private List<PiCounterCell> doReadCounterEntities(
List<Entity> counterEntities, PiPipeconf pipeconf) {
if (counterEntities.size() == 0) {
return Collections.emptyList();
}
final ReadRequest request = ReadRequest.newBuilder()
.setDeviceId(p4DeviceId)
.addAllEntities(counterEntities)
.build();
final Iterable<ReadResponse> responses;
try {
responses = () -> blockingStub.read(request);
} catch (StatusRuntimeException e) {
checkGrpcException(e);
log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
return Collections.emptyList();
}
List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
.map(ReadResponse::getEntitiesList)
.flatMap(List::stream)
.collect(Collectors.toList());
return CounterEntryCodec.decodeCounterEntities(entities, pipeconf);
}
private boolean doWriteActionGroupMembers(List<PiActionGroupMember> members,
WriteOperationType opType, PiPipeconf pipeconf) {
final List<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
for (PiActionGroupMember member : members) {
try {
actionProfileMembers.add(ActionProfileMemberEncoder.encode(member, pipeconf));
} catch (EncodeException | P4InfoBrowser.NotFoundException e) {
log.warn("Unable to encode group member, aborting {} operation: {} [{}]",
opType.name(), e.getMessage(), member.toString());
return false;
}
}
final List<Update> updateMsgs = actionProfileMembers.stream()
.map(actionProfileMember ->
Update.newBuilder()
.setEntity(Entity.newBuilder()
.setActionProfileMember(actionProfileMember)
.build())
.setType(UPDATE_TYPES.get(opType))
.build())
.collect(Collectors.toList());
if (updateMsgs.size() == 0) {
// Nothing to update.
return true;
}
return write(updateMsgs, members, opType, "group member");
}
private List<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
piActionProfileId.id(), deviceId, pipeconf.id());
final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
if (browser == null) {
log.warn("Unable to get a P4Info browser for pipeconf {}, aborting dump action profile", pipeconf);
return Collections.emptyList();
}
final int actionProfileId;
try {
actionProfileId = browser
.actionProfiles()
.getByName(piActionProfileId.id())
.getPreamble()
.getId();
} catch (P4InfoBrowser.NotFoundException e) {
log.warn("Unable to dump groups: {}", e.getMessage());
return Collections.emptyList();
}
// Prepare read request to read all groups from the given action profile.
final ReadRequest groupRequestMsg = ReadRequest.newBuilder()
.setDeviceId(p4DeviceId)
.addEntities(Entity.newBuilder()
.setActionProfileGroup(
ActionProfileGroup.newBuilder()
.setActionProfileId(actionProfileId)
.build())
.build())
.build();
// Read groups.
final Iterator<ReadResponse> groupResponses;
try {
groupResponses = blockingStub.read(groupRequestMsg);
} catch (StatusRuntimeException e) {
checkGrpcException(e);
log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
return Collections.emptyList();
}
final List<ActionProfileGroup> groupMsgs = Tools.stream(() -> groupResponses)
.map(ReadResponse::getEntitiesList)
.flatMap(List::stream)
.filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
.map(Entity::getActionProfileGroup)
.collect(Collectors.toList());
log.debug("Retrieved {} groups from action profile {} on {}...",
groupMsgs.size(), piActionProfileId.id(), deviceId);
// Returned groups contain only a minimal description of their members.
// We need to issue a new request to get the full description of each member.
// Keep a map of all member IDs for each group ID, will need it later.
final Multimap<Integer, Integer> groupIdToMemberIdsMap = HashMultimap.create();
groupMsgs.forEach(g -> groupIdToMemberIdsMap.putAll(
g.getGroupId(),
g.getMembersList().stream()
.map(ActionProfileGroup.Member::getMemberId)
.collect(Collectors.toList())));
// Prepare one big read request to read all members in one shot.
final Set<Entity> entityMsgs = groupMsgs.stream()
.flatMap(g -> g.getMembersList().stream())
.map(ActionProfileGroup.Member::getMemberId)
// Prevent issuing many read requests for the same member.
.distinct()
.map(id -> ActionProfileMember.newBuilder()
.setActionProfileId(actionProfileId)
.setMemberId(id)
.build())
.map(m -> Entity.newBuilder()
.setActionProfileMember(m)
.build())
.collect(Collectors.toSet());
final ReadRequest memberRequestMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
.addAllEntities(entityMsgs)
.build();
// Read members.
final Iterator<ReadResponse> memberResponses;
try {
memberResponses = blockingStub.read(memberRequestMsg);
} catch (StatusRuntimeException e) {
checkGrpcException(e);
log.warn("Unable to read members of action profile {} from {}: {}",
piActionProfileId, deviceId, e.getMessage());
return Collections.emptyList();
}
final Multimap<Integer, ActionProfileMember> groupIdToMembersMap = HashMultimap.create();
Tools.stream(() -> memberResponses)
.map(ReadResponse::getEntitiesList)
.flatMap(List::stream)
.filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
.map(Entity::getActionProfileMember)
.forEach(member -> groupIdToMemberIdsMap.asMap()
// Get all group IDs that contain this member.
.entrySet()
.stream()
.filter(entry -> entry.getValue().contains(member.getMemberId()))
.map(Map.Entry::getKey)
.forEach(gid -> groupIdToMembersMap.put(gid, member)));
log.debug("Retrieved {} group members from action profile {} on {}...",
groupIdToMembersMap.size(), piActionProfileId.id(), deviceId);
return groupMsgs.stream()
.map(groupMsg -> {
try {
return ActionProfileGroupEncoder.decode(groupMsg,
groupIdToMembersMap.get(groupMsg.getGroupId()),
pipeconf);
} catch (P4InfoBrowser.NotFoundException | EncodeException e) {
log.warn("Unable to decode group: {}\n {}", e.getMessage(), groupMsg);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private List<PiActionGroupMemberId> doDumpActionProfileMemberIds(
PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
if (browser == null) {
log.warn("Unable to get a P4Info browser for pipeconf {}, " +
"aborting cleanup of action profile members",
pipeconf);
return Collections.emptyList();
}
final int p4ActProfId;
try {
p4ActProfId = browser
.actionProfiles()
.getByName(actionProfileId.id())
.getPreamble()
.getId();
} catch (P4InfoBrowser.NotFoundException e) {
log.warn("Unable to cleanup action profile members: {}", e.getMessage());
return Collections.emptyList();
}
final ReadRequest memberRequestMsg = ReadRequest.newBuilder()
.setDeviceId(p4DeviceId)
.addEntities(Entity.newBuilder().setActionProfileMember(
ActionProfileMember.newBuilder()
.setActionProfileId(p4ActProfId)
.build()).build())
.build();
// Read members.
final Iterator<ReadResponse> memberResponses;
try {
memberResponses = blockingStub.read(memberRequestMsg);
} catch (StatusRuntimeException e) {
checkGrpcException(e);
log.warn("Unable to read members of action profile {} from {}: {}",
actionProfileId, deviceId, e.getMessage());
return Collections.emptyList();
}
return Tools.stream(() -> memberResponses)
.map(ReadResponse::getEntitiesList)
.flatMap(List::stream)
.filter(e -> e.getEntityCase() == ACTION_PROFILE_MEMBER)
.map(Entity::getActionProfileMember)
// Perhaps not needed, but better to double check to avoid
// removing members of other groups.
.filter(m -> m.getActionProfileId() == p4ActProfId)
.map(ActionProfileMember::getMemberId)
.map(PiActionGroupMemberId::of)
.collect(Collectors.toList());
}
private List<PiActionGroupMemberId> doRemoveActionProfileMembers(
PiActionProfileId actionProfileId,
List<PiActionGroupMemberId> memberIds,
PiPipeconf pipeconf) {
if (memberIds.isEmpty()) {
return Collections.emptyList();
}
final P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
if (browser == null) {
log.warn("Unable to get a P4Info browser for pipeconf {}, " +
"aborting cleanup of action profile members",
pipeconf);
return Collections.emptyList();
}
final int p4ActProfId;
try {
p4ActProfId = browser.actionProfiles()
.getByName(actionProfileId.id()).getPreamble().getId();
} catch (P4InfoBrowser.NotFoundException e) {
log.warn("Unable to cleanup action profile members: {}", e.getMessage());
return Collections.emptyList();
}
final List<Update> updateMsgs = memberIds.stream()
.map(m -> ActionProfileMember.newBuilder()
.setActionProfileId(p4ActProfId)
.setMemberId(m.id()).build())
.map(m -> Entity.newBuilder().setActionProfileMember(m).build())
.map(e -> Update.newBuilder().setEntity(e)
.setType(Update.Type.DELETE).build())
.collect(Collectors.toList());
log.debug("Removing {} members of action profile '{}'...",
memberIds.size(), actionProfileId);
return writeAndReturnSuccessEntities(
updateMsgs, memberIds, WriteOperationType.DELETE,
"action profile members");
}
private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
final ActionProfileGroup actionProfileGroup;
try {
actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
} catch (EncodeException | P4InfoBrowser.NotFoundException e) {
log.warn("Unable to encode group, aborting {} operation: {}", e.getMessage(), opType.name());
return false;
}
final Update updateMsg = Update.newBuilder()
.setEntity(Entity.newBuilder()
.setActionProfileGroup(actionProfileGroup)
.build())
.setType(UPDATE_TYPES.get(opType))
.build();
return write(singletonList(updateMsg), singletonList(group),
opType, "group");
}
private List<PiMeterCellConfig> doReadAllMeterCells(
List<PiMeterId> meterIds, PiPipeconf pipeconf) {
return doReadMeterEntities(MeterEntryCodec.readAllCellsEntities(
meterIds, pipeconf), pipeconf);
}
private List<PiMeterCellConfig> doReadMeterCells(
List<PiMeterCellId> cellIds, PiPipeconf pipeconf) {
final List<PiMeterCellConfig> piMeterCellConfigs = cellIds.stream()
.map(cellId -> PiMeterCellConfig.builder()
.withMeterCellId(cellId)
.build())
.collect(Collectors.toList());
return doReadMeterEntities(MeterEntryCodec.encodePiMeterCellConfigs(
piMeterCellConfigs, pipeconf), pipeconf);
}
private List<PiMeterCellConfig> doReadMeterEntities(
List<Entity> entitiesToRead, PiPipeconf pipeconf) {
if (entitiesToRead.size() == 0) {
return Collections.emptyList();
}
final ReadRequest request = ReadRequest.newBuilder()
.setDeviceId(p4DeviceId)
.addAllEntities(entitiesToRead)
.build();
final Iterable<ReadResponse> responses;
try {
responses = () -> blockingStub.read(request);
} catch (StatusRuntimeException e) {
checkGrpcException(e);
log.warn("Unable to read meter cells: {}", e.getMessage());
log.debug("exception", e);
return Collections.emptyList();
}
List<Entity> responseEntities = StreamSupport
.stream(responses.spliterator(), false)
.map(ReadResponse::getEntitiesList)
.flatMap(List::stream)
.collect(Collectors.toList());
return MeterEntryCodec.decodeMeterEntities(responseEntities, pipeconf);
}
private boolean doWriteMeterCells(List<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf) {
List<Update> updateMsgs = MeterEntryCodec.encodePiMeterCellConfigs(cellConfigs, pipeconf)
.stream()
.map(meterEntryMsg ->
Update.newBuilder()
.setEntity(meterEntryMsg)
.setType(UPDATE_TYPES.get(WriteOperationType.MODIFY))
.build())
.collect(Collectors.toList());
if (updateMsgs.size() == 0) {
return true;
}
return write(updateMsgs, cellConfigs, WriteOperationType.MODIFY, "meter cell config");
}
private boolean doWriteMulticastGroupEntries(
List<PiMulticastGroupEntry> entries,
WriteOperationType opType) {
final List<Update> updateMsgs = entries.stream()
.map(piEntry -> {
try {
return MulticastGroupEntryCodec.encode(piEntry);
} catch (EncodeException e) {
log.warn("Unable to encode PiMulticastGroupEntry: {}", e.getMessage());
return null;
}
})
.filter(Objects::nonNull)
.map(mcMsg -> PacketReplicationEngineEntry.newBuilder()
.setMulticastGroupEntry(mcMsg)
.build())
.map(preMsg -> Entity.newBuilder()
.setPacketReplicationEngineEntry(preMsg)
.build())
.map(entityMsg -> Update.newBuilder()
.setEntity(entityMsg)
.setType(UPDATE_TYPES.get(opType))
.build())
.collect(Collectors.toList());
return write(updateMsgs, entries, opType, "multicast group entry");
}
private List<PiMulticastGroupEntry> doReadAllMulticastGroupEntries() {
final Entity entity = Entity.newBuilder()
.setPacketReplicationEngineEntry(
PacketReplicationEngineEntry.newBuilder()
.setMulticastGroupEntry(
MulticastGroupEntry.newBuilder()
.build())
.build())
.build();
final ReadRequest req = ReadRequest.newBuilder()
.setDeviceId(p4DeviceId)
.addEntities(entity)
.build();
Iterator<ReadResponse> responses;
try {
responses = blockingStub.read(req);
} catch (StatusRuntimeException e) {
checkGrpcException(e);
log.warn("Unable to read multicast group entries from {}: {}", deviceId, e.getMessage());
return Collections.emptyList();
}
Iterable<ReadResponse> responseIterable = () -> responses;
final List<PiMulticastGroupEntry> mcEntries = StreamSupport
.stream(responseIterable.spliterator(), false)
.map(ReadResponse::getEntitiesList)
.flatMap(List::stream)
.filter(e -> e.getEntityCase()
.equals(PACKET_REPLICATION_ENGINE_ENTRY))
.map(Entity::getPacketReplicationEngineEntry)
.filter(e -> e.getTypeCase().equals(MULTICAST_GROUP_ENTRY))
.map(PacketReplicationEngineEntry::getMulticastGroupEntry)
.map(MulticastGroupEntryCodec::decode)
.collect(Collectors.toList());
log.debug("Retrieved {} multicast group entries from {}...",
mcEntries.size(), deviceId);
return mcEntries;
}
private <T> boolean write(List<Update> updates,
List<T> writeEntities,
WriteOperationType opType,
String entryType) {
// True if all entities were successfully written.
return writeAndReturnSuccessEntities(updates, writeEntities, opType, entryType)
.size() == writeEntities.size();
}
private <T> List<T> writeAndReturnSuccessEntities(
List<Update> updates, List<T> writeEntities,
WriteOperationType opType, String entryType) {
if (updates.isEmpty()) {
return Collections.emptyList();
}
if (updates.size() != writeEntities.size()) {
log.error("Cannot perform {} operation, provided {} " +
"update messages for {} {} - BUG?",
opType, updates.size(), writeEntities.size(), entryType);
return Collections.emptyList();
}
try {
//noinspection ResultOfMethodCallIgnored
blockingStub.write(writeRequest(updates));
return writeEntities;
} catch (StatusRuntimeException e) {
return checkAndLogWriteErrors(writeEntities, e, opType, entryType);
}
}
private WriteRequest writeRequest(Iterable<Update> updateMsgs) {
return WriteRequest.newBuilder()
.setDeviceId(p4DeviceId)
.setElectionId(clientElectionId)
.addAllUpdates(updateMsgs)
.build();
}
private Void doShutdown() {
log.debug("Shutting down client for {}...", deviceId);
streamChannelManager.complete();
cancellableContext.cancel(new InterruptedException(
"Requested client shutdown"));
this.executorService.shutdownNow();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Executor service didn't shutdown in time.");
Thread.currentThread().interrupt();
}
return null;
}
// Returns the collection of succesfully write entities.
private <T> List<T> checkAndLogWriteErrors(
List<T> writeEntities, StatusRuntimeException ex,
WriteOperationType opType, String entryType) {
checkGrpcException(ex);
final List<P4RuntimeOuterClass.Error> errors = extractWriteErrorDetails(ex);
if (errors.isEmpty()) {
final String description = ex.getStatus().getDescription();
log.warn("Unable to {} {} {}(s) on {}: {}",
opType.name(), writeEntities.size(), entryType, deviceId,
ex.getStatus().getCode().name(),
description == null ? "" : " - " + description);
return Collections.emptyList();
}
if (errors.size() == writeEntities.size()) {
List<T> okEntities = Lists.newArrayList();
Iterator<T> entityIterator = writeEntities.iterator();
for (P4RuntimeOuterClass.Error error : errors) {
T entity = entityIterator.next();
if (error.getCanonicalCode() != Status.OK.getCode().value()) {
log.warn("Unable to {} {} on {}: {} [{}]",
opType.name(), entryType, deviceId,
parseP4Error(error), entity.toString());
} else {
okEntities.add(entity);
}
}
return okEntities;
} else {
log.warn("Unable to reconcile error details to updates " +
"(sent {} updates, but device returned {} errors)",
entryType, writeEntities.size(), errors.size());
errors.stream()
.filter(err -> err.getCanonicalCode() != Status.OK.getCode().value())
.forEach(err -> log.warn("Unable to {} {} (unknown): {}",
opType.name(), entryType, parseP4Error(err)));
return Collections.emptyList();
}
}
private List<P4RuntimeOuterClass.Error> extractWriteErrorDetails(
StatusRuntimeException ex) {
if (!ex.getTrailers().containsKey(STATUS_DETAILS_KEY)) {
return Collections.emptyList();
}
com.google.rpc.Status status = ex.getTrailers().get(STATUS_DETAILS_KEY);
if (status == null) {
return Collections.emptyList();
}
return status.getDetailsList().stream()
.map(any -> {
try {
return any.unpack(P4RuntimeOuterClass.Error.class);
} catch (InvalidProtocolBufferException e) {
log.warn("Unable to unpack P4Runtime Error: {}",
any.toString());
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private String parseP4Error(P4RuntimeOuterClass.Error err) {
return format("%s %s%s (%s:%d)",
Status.fromCodeValue(err.getCanonicalCode()).getCode(),
err.getMessage(),
err.hasDetails() ? ", " + err.getDetails().toString() : "",
err.getSpace(),
err.getCode());
}
private void checkGrpcException(StatusRuntimeException ex) {
switch (ex.getStatus().getCode()) {
case OK:
break;
case CANCELLED:
break;
case UNKNOWN:
break;
case INVALID_ARGUMENT:
break;
case DEADLINE_EXCEEDED:
break;
case NOT_FOUND:
break;
case ALREADY_EXISTS:
break;
case PERMISSION_DENIED:
// Notify upper layers that this node is not master.
controller.postEvent(new P4RuntimeEvent(
P4RuntimeEvent.Type.PERMISSION_DENIED,
new BaseP4RuntimeEventSubject(deviceId)));
break;
case RESOURCE_EXHAUSTED:
break;
case FAILED_PRECONDITION:
break;
case ABORTED:
break;
case OUT_OF_RANGE:
break;
case UNIMPLEMENTED:
break;
case INTERNAL:
break;
case UNAVAILABLE:
// Channel might be closed.
controller.postEvent(new P4RuntimeEvent(
P4RuntimeEvent.Type.CHANNEL_EVENT,
new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
break;
case DATA_LOSS:
break;
case UNAUTHENTICATED:
break;
default:
break;
}
}
private Uint128 bigIntegerToUint128(BigInteger value) {
final byte[] arr = value.toByteArray();
final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
.put(new byte[Long.BYTES * 2 - arr.length])
.put(arr);
bb.rewind();
return Uint128.newBuilder()
.setHigh(bb.getLong())
.setLow(bb.getLong())
.build();
}
private BigInteger uint128ToBigInteger(Uint128 value) {
return new BigInteger(
ByteBuffer.allocate(Long.BYTES * 2)
.putLong(value.getHigh())
.putLong(value.getLow())
.array());
}
/**
* A manager for the P4Runtime stream channel that opportunistically creates
* new stream RCP stubs (e.g. when one fails because of errors) and posts
* channel events via the P4Runtime controller.
*/
private final class StreamChannelManager {
private final ManagedChannel channel;
private final AtomicBoolean open;
private final StreamObserver<StreamMessageResponse> responseObserver;
private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
private StreamChannelManager(ManagedChannel channel) {
this.channel = channel;
this.responseObserver = new InternalStreamResponseObserver(this);
this.open = new AtomicBoolean(false);
}
private void initIfRequired() {
if (requestObserver == null) {
log.debug("Creating new stream channel for {}...", deviceId);
requestObserver =
(ClientCallStreamObserver<StreamMessageRequest>)
P4RuntimeGrpc.newStub(channel)
.streamChannel(responseObserver);
open.set(false);
}
}
public boolean send(StreamMessageRequest value) {
synchronized (this) {
initIfRequired();
try {
requestObserver.onNext(value);
// FIXME
// signalOpen();
return true;
} catch (Throwable ex) {
if (ex instanceof StatusRuntimeException) {
log.warn("Unable to send {} to {}: {}",
value.getUpdateCase().toString(), deviceId, ex.getMessage());
} else {
log.warn(format(
"Exception while sending %s to %s",
value.getUpdateCase().toString(), deviceId), ex);
}
complete();
return false;
}
}
}
public void complete() {
synchronized (this) {
signalClosed();
if (requestObserver != null) {
requestObserver.onCompleted();
requestObserver.cancel("Terminated", null);
requestObserver = null;
}
}
}
void signalOpen() {
synchronized (this) {
final boolean wasOpen = open.getAndSet(true);
if (!wasOpen) {
controller.postEvent(new P4RuntimeEvent(
P4RuntimeEvent.Type.CHANNEL_EVENT,
new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
}
}
}
void signalClosed() {
synchronized (this) {
final boolean wasOpen = open.getAndSet(false);
if (wasOpen) {
controller.postEvent(new P4RuntimeEvent(
P4RuntimeEvent.Type.CHANNEL_EVENT,
new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
}
}
}
public boolean isOpen() {
return open.get();
}
}
/**
* Handles messages received from the device on the stream channel.
*/
private final class InternalStreamResponseObserver
implements StreamObserver<StreamMessageResponse> {
private final StreamChannelManager streamChannelManager;
private InternalStreamResponseObserver(
StreamChannelManager streamChannelManager) {
this.streamChannelManager = streamChannelManager;
}
@Override
public void onNext(StreamMessageResponse message) {
streamChannelManager.signalOpen();
executorService.submit(() -> doNext(message));
}
private void doNext(StreamMessageResponse message) {
try {
log.debug("Received message on stream channel from {}: {}",
deviceId, message.getUpdateCase());
switch (message.getUpdateCase()) {
case PACKET:
doPacketIn(message.getPacket());
return;
case ARBITRATION:
doArbitrationResponse(message.getArbitration());
return;
default:
log.warn("Unrecognized stream message from {}: {}",
deviceId, message.getUpdateCase());
}
} catch (Throwable ex) {
log.error("Exception while processing stream message from {}",
deviceId, ex);
}
}
@Override
public void onError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) throwable;
if (sre.getStatus().getCause() instanceof ConnectException) {
log.warn("Device {} is unreachable ({})",
deviceId, sre.getCause().getMessage());
} else {
log.warn("Received error on stream channel for {}: {}",
deviceId, throwable.getMessage());
}
} else {
log.warn(format("Received exception on stream channel for %s",
deviceId), throwable);
}
streamChannelManager.complete();
}
@Override
public void onCompleted() {
log.warn("Stream channel for {} has completed", deviceId);
streamChannelManager.complete();
}
}
}