Remove unused Netty messaging service
Change-Id: Ib56561295215ad2b7e775716a7b651bbde525559
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
deleted file mode 100644
index f59a3ed..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-/**
- * State transitions a decoder goes through as it is decoding an incoming message.
- */
-public enum DecoderState {
- READ_TYPE,
- READ_PREAMBLE,
- READ_LOGICAL_TIME,
- READ_LOGICAL_COUNTER,
- READ_MESSAGE_ID,
- READ_SENDER_IP_VERSION,
- READ_SENDER_IP,
- READ_SENDER_PORT,
- READ_SUBJECT_LENGTH,
- READ_SUBJECT,
- READ_STATUS,
- READ_CONTENT_LENGTH,
- READ_CONTENT
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
deleted file mode 100644
index 14489d5..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import org.onosproject.core.HybridLogicalTime;
-
-/**
- * Base class for internal messages.
- */
-public abstract class InternalMessage {
-
- /**
- * Internal message type.
- */
- public enum Type {
- REQUEST(1),
- REPLY(2);
-
- private final int id;
-
- Type(int id) {
- this.id = id;
- }
-
- /**
- * Returns the unique message type ID.
- *
- * @return the unique message type ID.
- */
- public int id() {
- return id;
- }
-
- /**
- * Returns the message type enum associated with the given ID.
- *
- * @param id the type ID.
- * @return the type enum for the given ID.
- */
- public static Type forId(int id) {
- switch (id) {
- case 1:
- return REQUEST;
- case 2:
- return REPLY;
- default:
- throw new IllegalArgumentException("Unknown status ID " + id);
- }
- }
- }
-
- private final int preamble;
- private final HybridLogicalTime time;
- private final long id;
- private final byte[] payload;
-
- protected InternalMessage(int preamble,
- HybridLogicalTime time,
- long id,
- byte[] payload) {
- this.preamble = preamble;
- this.time = time;
- this.id = id;
- this.payload = payload;
- }
-
- public abstract Type type();
-
- public boolean isRequest() {
- return type() == Type.REQUEST;
- }
-
- public boolean isReply() {
- return type() == Type.REPLY;
- }
-
- public HybridLogicalTime time() {
- return time;
- }
-
- public int preamble() {
- return preamble;
- }
-
- public long id() {
- return id;
- }
-
- public byte[] payload() {
- return payload;
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java
deleted file mode 100644
index fc5706b..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import com.google.common.base.MoreObjects;
-import org.onlab.util.ByteArraySizeHashPrinter;
-import org.onosproject.core.HybridLogicalTime;
-
-/**
- * Internal reply message.
- */
-public final class InternalReply extends InternalMessage {
-
- /**
- * Message status.
- */
- public enum Status {
-
- // NOTE: For backwards compatibility enum constant IDs should not be changed.
-
- /**
- * All ok.
- */
- OK(0),
-
- /**
- * Response status signifying no registered handler.
- */
- ERROR_NO_HANDLER(1),
-
- /**
- * Response status signifying an exception handling the message.
- */
- ERROR_HANDLER_EXCEPTION(2),
-
- /**
- * Response status signifying invalid message structure.
- */
- PROTOCOL_EXCEPTION(3);
-
- private final int id;
-
- Status(int id) {
- this.id = id;
- }
-
- /**
- * Returns the unique status ID.
- *
- * @return the unique status ID.
- */
- public int id() {
- return id;
- }
-
- /**
- * Returns the status enum associated with the given ID.
- *
- * @param id the status ID.
- * @return the status enum for the given ID.
- */
- public static Status forId(int id) {
- switch (id) {
- case 0:
- return OK;
- case 1:
- return ERROR_NO_HANDLER;
- case 2:
- return ERROR_HANDLER_EXCEPTION;
- case 3:
- return PROTOCOL_EXCEPTION;
- default:
- throw new IllegalArgumentException("Unknown status ID " + id);
- }
- }
- }
-
- private final Status status;
-
- public InternalReply(int preamble,
- HybridLogicalTime time,
- long id,
- Status status) {
- this(preamble, time, id, new byte[0], status);
- }
-
- public InternalReply(int preamble,
- HybridLogicalTime time,
- long id,
- byte[] payload,
- Status status) {
- super(preamble, time, id, payload);
- this.status = status;
- }
-
- @Override
- public Type type() {
- return Type.REPLY;
- }
-
- public Status status() {
- return status;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("time", time())
- .add("id", id())
- .add("status", status())
- .add("payload", ByteArraySizeHashPrinter.of(payload()))
- .toString();
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java
deleted file mode 100644
index a432fcb..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import com.google.common.base.MoreObjects;
-
-import org.onlab.util.ByteArraySizeHashPrinter;
-import org.onosproject.core.HybridLogicalTime;
-import org.onosproject.store.cluster.messaging.Endpoint;
-
-/**
- * Internal request message.
- */
-public final class InternalRequest extends InternalMessage {
- private final Endpoint sender;
- private final String subject;
-
- public InternalRequest(int preamble,
- HybridLogicalTime time,
- long id,
- Endpoint sender,
- String subject,
- byte[] payload) {
- super(preamble, time, id, payload);
- this.sender = sender;
- this.subject = subject;
- }
-
- @Override
- public Type type() {
- return Type.REQUEST;
- }
-
- public String subject() {
- return subject;
- }
-
- public Endpoint sender() {
- return sender;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("time", time())
- .add("id", id())
- .add("subject", subject)
- .add("sender", sender)
- .add("payload", ByteArraySizeHashPrinter.of(payload()))
- .toString();
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
deleted file mode 100644
index 4b9ef6c..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import com.google.common.base.Charsets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ReplayingDecoder;
-
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpAddress.Version;
-import org.onosproject.core.HybridLogicalTime;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Decoder for inbound messages.
- */
-public class MessageDecoder extends ReplayingDecoder<DecoderState> {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private Version ipVersion;
- private IpAddress senderIp;
- private int senderPort;
-
- private InternalMessage.Type type;
- private int preamble;
- private long logicalTime;
- private long logicalCounter;
- private long messageId;
- private int contentLength;
- private byte[] content;
- private int subjectLength;
- private String subject;
- private InternalReply.Status status;
-
- public MessageDecoder() {
- super(DecoderState.READ_SENDER_IP_VERSION);
- }
-
- @Override
- @SuppressWarnings("squid:S128") // suppress switch fall through warning
- protected void decode(
- ChannelHandlerContext context,
- ByteBuf buffer,
- List<Object> out) throws Exception {
-
- switch (state()) {
- case READ_SENDER_IP_VERSION:
- ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
- checkpoint(DecoderState.READ_SENDER_IP);
- // FALLTHROUGH
- case READ_SENDER_IP:
- byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
- buffer.readBytes(octets);
- senderIp = IpAddress.valueOf(ipVersion, octets);
- checkpoint(DecoderState.READ_SENDER_PORT);
- // FALLTHROUGH
- case READ_SENDER_PORT:
- senderPort = buffer.readInt();
- checkpoint(DecoderState.READ_TYPE);
- // FALLTHROUGH
- case READ_TYPE:
- type = InternalMessage.Type.forId(buffer.readByte());
- checkpoint(DecoderState.READ_PREAMBLE);
- // FALLTHROUGH
- case READ_PREAMBLE:
- preamble = buffer.readInt();
- checkpoint(DecoderState.READ_LOGICAL_TIME);
- // FALLTHROUGH
- case READ_LOGICAL_TIME:
- logicalTime = buffer.readLong();
- checkpoint(DecoderState.READ_LOGICAL_COUNTER);
- // FALLTHROUGH
- case READ_LOGICAL_COUNTER:
- logicalCounter = buffer.readLong();
- checkpoint(DecoderState.READ_MESSAGE_ID);
- // FALLTHROUGH
- case READ_MESSAGE_ID:
- messageId = buffer.readLong();
- checkpoint(DecoderState.READ_CONTENT_LENGTH);
- // FALLTHROUGH
- case READ_CONTENT_LENGTH:
- contentLength = buffer.readInt();
- checkpoint(DecoderState.READ_CONTENT);
- // FALLTHROUGH
- case READ_CONTENT:
- if (contentLength > 0) {
- //TODO Perform a sanity check on the size before allocating
- content = new byte[contentLength];
- buffer.readBytes(content);
- } else {
- content = new byte[0];
- }
-
- switch (type) {
- case REQUEST:
- checkpoint(DecoderState.READ_SUBJECT_LENGTH);
- break;
- case REPLY:
- checkpoint(DecoderState.READ_STATUS);
- break;
- default:
- checkState(false, "Must not be here");
- }
- break;
- default:
- break;
- }
-
- switch (type) {
- case REQUEST:
- switch (state()) {
- case READ_SUBJECT_LENGTH:
- subjectLength = buffer.readShort();
- checkpoint(DecoderState.READ_SUBJECT);
- // FALLTHROUGH
- case READ_SUBJECT:
- byte[] messageTypeBytes = new byte[subjectLength];
- buffer.readBytes(messageTypeBytes);
- subject = new String(messageTypeBytes, Charsets.UTF_8);
- InternalRequest message = new InternalRequest(preamble,
- new HybridLogicalTime(logicalTime, logicalCounter),
- messageId,
- new Endpoint(senderIp, senderPort),
- subject,
- content);
- out.add(message);
- checkpoint(DecoderState.READ_TYPE);
- break;
- default:
- break;
- }
- break;
- case REPLY:
- switch (state()) {
- case READ_STATUS:
- status = InternalReply.Status.forId(buffer.readByte());
- InternalReply message = new InternalReply(preamble,
- new HybridLogicalTime(logicalTime, logicalCounter),
- messageId,
- content,
- status);
- out.add(message);
- checkpoint(DecoderState.READ_TYPE);
- break;
- default:
- break;
- }
- break;
- default:
- checkState(false, "Must not be here");
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
- log.error("Exception inside channel handling pipeline.", cause);
- context.close();
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
deleted file mode 100644
index 2542088..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import java.io.IOException;
-
-import com.google.common.base.Charsets;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpAddress.Version;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Encode InternalMessage out into a byte buffer.
- */
-public class MessageEncoder extends MessageToByteEncoder<Object> {
-// Effectively MessageToByteEncoder<InternalMessage>,
-// had to specify <Object> to avoid Class Loader not being able to find some classes.
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final Endpoint endpoint;
- private final int preamble;
- private boolean endpointWritten;
-
- public MessageEncoder(Endpoint endpoint, int preamble) {
- super();
- this.endpoint = endpoint;
- this.preamble = preamble;
- }
-
- @Override
- protected void encode(
- ChannelHandlerContext context,
- Object rawMessage,
- ByteBuf out) throws Exception {
- if (rawMessage instanceof InternalRequest) {
- encodeRequest((InternalRequest) rawMessage, out);
- } else if (rawMessage instanceof InternalReply) {
- encodeReply((InternalReply) rawMessage, out);
- }
- }
-
- private void encodeMessage(InternalMessage message, ByteBuf out) {
- // If the endpoint hasn't been written to the channel, write it.
- if (!endpointWritten) {
- IpAddress senderIp = endpoint.host();
- if (senderIp.version() == Version.INET) {
- out.writeByte(0);
- } else {
- out.writeByte(1);
- }
- out.writeBytes(senderIp.toOctets());
-
- // write sender port
- out.writeInt(endpoint.port());
-
- endpointWritten = true;
- }
-
- out.writeByte(message.type().id());
- out.writeInt(this.preamble);
-
- // write time
- out.writeLong(message.time().logicalTime());
- out.writeLong(message.time().logicalCounter());
-
- // write message id
- out.writeLong(message.id());
-
- byte[] payload = message.payload();
-
- // write payload length
- out.writeInt(payload.length);
-
- // write payload.
- out.writeBytes(payload);
- }
-
- private void encodeRequest(InternalRequest request, ByteBuf out) {
- encodeMessage(request, out);
-
- byte[] messageTypeBytes = request.subject().getBytes(Charsets.UTF_8);
-
- // write length of message type
- out.writeShort(messageTypeBytes.length);
-
- // write message type bytes
- out.writeBytes(messageTypeBytes);
-
- }
-
- private void encodeReply(InternalReply reply, ByteBuf out) {
- encodeMessage(reply, out);
-
- // write message status value
- out.writeByte(reply.status().id());
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
- if (cause instanceof IOException) {
- log.debug("IOException inside channel handling pipeline.", cause);
- } else {
- log.error("non-IOException inside channel handling pipeline.", cause);
- }
- context.close();
- }
-
- // Effectively same result as one generated by MessageToByteEncoder<InternalMessage>
- @Override
- public final boolean acceptOutboundMessage(Object msg) throws Exception {
- return msg instanceof InternalMessage;
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
deleted file mode 100644
index c98c72c..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ /dev/null
@@ -1,1086 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.net.ConnectException;
-import java.security.KeyStore;
-import java.security.MessageDigest;
-import java.security.PublicKey;
-import java.security.cert.Certificate;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.StringJoiner;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
-import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.core.HybridLogicalClockService;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingException;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
-
-/**
- * Netty based MessagingService.
- */
-@Component(enabled = false)
-@Service
-public class NettyMessagingManager implements MessagingService {
- private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
- private static final long TIMEOUT_INTERVAL = 50;
- private static final int WINDOW_SIZE = 60;
- private static final int WINDOW_UPDATE_SAMPLE_SIZE = 100;
- private static final long WINDOW_UPDATE_MILLIS = 10000;
- private static final int MIN_SAMPLES = 25;
- private static final int MIN_STANDARD_DEVIATION = 100;
- private static final int PHI_FAILURE_THRESHOLD = 12;
- private static final long MIN_TIMEOUT_MILLIS = 100;
- private static final long MAX_TIMEOUT_MILLIS = 5000;
- private static final int CHANNEL_POOL_SIZE = 8;
-
- private static final byte[] EMPTY_PAYLOAD = new byte[0];
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final LocalClientConnection localClientConnection = new LocalClientConnection();
- private final LocalServerConnection localServerConnection = new LocalServerConnection(null);
-
- //TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
- private static final String CONFIG_DIR = "../config";
- private static final String KS_FILE_NAME = "onos.jks";
- private static final File DEFAULT_KS_FILE = new File(CONFIG_DIR, KS_FILE_NAME);
- private static final String DEFAULT_KS_PASSWORD = "changeit";
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected HybridLogicalClockService clockService;
-
- private Endpoint localEndpoint;
- private int preamble;
- private final AtomicBoolean started = new AtomicBoolean(false);
- private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
- private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
- private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
- private final AtomicLong messageIdGenerator = new AtomicLong(0);
-
- private ScheduledFuture<?> timeoutFuture;
-
- private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();
-
- private EventLoopGroup serverGroup;
- private EventLoopGroup clientGroup;
- private Class<? extends ServerChannel> serverChannelClass;
- private Class<? extends Channel> clientChannelClass;
- private ScheduledExecutorService timeoutExecutor;
-
- protected static final boolean TLS_ENABLED = true;
- protected static final boolean TLS_DISABLED = false;
- protected boolean enableNettyTls = TLS_ENABLED;
-
- protected TrustManagerFactory trustManager;
- protected KeyManagerFactory keyManager;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Activate
- public void activate() throws InterruptedException {
- ControllerNode localNode = clusterMetadataService.getLocalNode();
- getTlsParameters();
-
- if (started.get()) {
- log.warn("Already running at local endpoint: {}", localEndpoint);
- return;
- }
- this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
- this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
- initEventLoopGroup();
- startAcceptingConnections();
- timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("NettyMessagingEvt", "timeout", log));
- timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
- this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
- started.set(true);
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- if (started.get()) {
- serverGroup.shutdownGracefully();
- clientGroup.shutdownGracefully();
- timeoutFuture.cancel(false);
- timeoutExecutor.shutdown();
- started.set(false);
- }
- log.info("Stopped");
- }
-
- private void getTlsParameters() {
- // default is TLS enabled unless key stores cannot be loaded
- enableNettyTls = Boolean.parseBoolean(System.getProperty("enableNettyTLS", Boolean.toString(TLS_ENABLED)));
-
- if (enableNettyTls) {
- enableNettyTls = loadKeyStores();
- }
- }
-
- private boolean loadKeyStores() {
- // Maintain a local copy of the trust and key managers in case anything goes wrong
- TrustManagerFactory tmf;
- KeyManagerFactory kmf;
- try {
- String ksLocation = System.getProperty("javax.net.ssl.keyStore", DEFAULT_KS_FILE.toString());
- String tsLocation = System.getProperty("javax.net.ssl.trustStore", DEFAULT_KS_FILE.toString());
- char[] ksPwd = System.getProperty("javax.net.ssl.keyStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
- char[] tsPwd = System.getProperty("javax.net.ssl.trustStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
-
- tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- KeyStore ts = KeyStore.getInstance("JKS");
- ts.load(new FileInputStream(tsLocation), tsPwd);
- tmf.init(ts);
-
- kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(new FileInputStream(ksLocation), ksPwd);
- kmf.init(ks, ksPwd);
- if (log.isInfoEnabled()) {
- logKeyStore(ks, ksLocation, ksPwd);
- }
- } catch (FileNotFoundException e) {
- log.warn("Disabling TLS for intra-cluster messaging; Could not load cluster key store: {}", e.getMessage());
- return TLS_DISABLED;
- } catch (Exception e) {
- //TODO we might want to catch exceptions more specifically
- log.error("Error loading key store; disabling TLS for intra-cluster messaging", e);
- return TLS_DISABLED;
- }
- this.trustManager = tmf;
- this.keyManager = kmf;
- return TLS_ENABLED;
- }
-
- private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
- if (log.isInfoEnabled()) {
- log.info("Loaded cluster key store from: {}", ksLocation);
- try {
- for (Enumeration<String> e = ks.aliases(); e.hasMoreElements();) {
- String alias = e.nextElement();
- Certificate cert = ks.getCertificate(alias);
- if (cert == null) {
- log.info("No certificate for alias {}", alias);
- continue;
- }
- PublicKey key = cert.getPublicKey();
- // Compute the certificate's fingerprint (use the key if certificate cannot be found)
- MessageDigest digest = MessageDigest.getInstance("SHA1");
- digest.update(key.getEncoded());
- StringJoiner fingerprint = new StringJoiner(":");
- for (byte b : digest.digest()) {
- fingerprint.add(String.format("%02X", b));
- }
- log.info("{} -> {}", alias, fingerprint);
- }
- } catch (Exception e) {
- log.warn("Unable to print contents of key store: {}", ksLocation, e);
- }
- }
- }
-
- private void initEventLoopGroup() {
- // try Epoll first and if that does work, use nio.
- try {
- clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
- serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
- serverChannelClass = EpollServerSocketChannel.class;
- clientChannelClass = EpollSocketChannel.class;
- return;
- } catch (Throwable e) {
- log.debug("Failed to initialize native (epoll) transport. "
- + "Reason: {}. Proceeding with nio.", e.getMessage());
- }
- clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
- serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
- serverChannelClass = NioServerSocketChannel.class;
- clientChannelClass = NioSocketChannel.class;
- }
-
- /**
- * Times out response callbacks.
- */
- private void timeoutAllCallbacks() {
- // Iterate through all connections and time out callbacks.
- localClientConnection.timeoutCallbacks();
- for (RemoteClientConnection connection : clientConnections.values()) {
- connection.timeoutCallbacks();
- }
- }
-
- @Override
- public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
- checkPermission(CLUSTER_WRITE);
- InternalRequest message = new InternalRequest(preamble,
- clockService.timeNow(),
- messageIdGenerator.incrementAndGet(),
- localEndpoint,
- type,
- payload);
- return executeOnPooledConnection(ep, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
- checkPermission(CLUSTER_WRITE);
- return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
- checkPermission(CLUSTER_WRITE);
- long messageId = messageIdGenerator.incrementAndGet();
- InternalRequest message = new InternalRequest(preamble,
- clockService.timeNow(),
- messageId,
- localEndpoint,
- type,
- payload);
- return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
- }
-
- private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
- return channels.computeIfAbsent(endpoint, e -> {
- List<CompletableFuture<Channel>> defaultList = new ArrayList<>(CHANNEL_POOL_SIZE);
- for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
- defaultList.add(null);
- }
- return Lists.newCopyOnWriteArrayList(defaultList);
- });
- }
-
- private int getChannelOffset(String messageType) {
- return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
- }
-
- private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
- List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
- int offset = getChannelOffset(messageType);
-
- CompletableFuture<Channel> channelFuture = channelPool.get(offset);
- if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
- synchronized (channelPool) {
- channelFuture = channelPool.get(offset);
- if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
- channelFuture = openChannel(endpoint);
- channelPool.set(offset, channelFuture);
- }
- }
- }
-
- CompletableFuture<Channel> future = new CompletableFuture<>();
- final CompletableFuture<Channel> finalFuture = channelFuture;
- finalFuture.whenComplete((channel, error) -> {
- if (error == null) {
- if (!channel.isActive()) {
- CompletableFuture<Channel> currentFuture;
- synchronized (channelPool) {
- currentFuture = channelPool.get(offset);
- if (currentFuture == finalFuture) {
- channelPool.set(offset, null);
- }
- }
-
- ClientConnection connection = clientConnections.remove(channel);
- if (connection != null) {
- connection.close();
- }
-
- if (currentFuture == finalFuture) {
- getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
- if (recursiveError == null) {
- future.complete(recursiveResult);
- } else {
- future.completeExceptionally(recursiveError);
- }
- });
- } else {
- currentFuture.whenComplete((recursiveResult, recursiveError) -> {
- if (recursiveError == null) {
- future.complete(recursiveResult);
- } else {
- future.completeExceptionally(recursiveError);
- }
- });
- }
- } else {
- future.complete(channel);
- }
- } else {
- future.completeExceptionally(error);
- }
- });
- return future;
- }
-
- private <T> CompletableFuture<T> executeOnPooledConnection(
- Endpoint endpoint,
- String type,
- Function<ClientConnection, CompletableFuture<T>> callback,
- Executor executor) {
- CompletableFuture<T> future = new CompletableFuture<T>();
- executeOnPooledConnection(endpoint, type, callback, executor, future);
- return future;
- }
-
- private <T> void executeOnPooledConnection(
- Endpoint endpoint,
- String type,
- Function<ClientConnection, CompletableFuture<T>> callback,
- Executor executor,
- CompletableFuture<T> future) {
- if (endpoint.equals(localEndpoint)) {
- callback.apply(localClientConnection).whenComplete((result, error) -> {
- if (error == null) {
- executor.execute(() -> future.complete(result));
- } else {
- executor.execute(() -> future.completeExceptionally(error));
- }
- });
- return;
- }
-
- getChannel(endpoint, type).whenComplete((channel, channelError) -> {
- if (channelError == null) {
- ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
- callback.apply(connection).whenComplete((result, sendError) -> {
- if (sendError == null) {
- executor.execute(() -> future.complete(result));
- } else {
- Throwable cause = Throwables.getRootCause(sendError);
- if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
- channel.close().addListener(f -> {
- connection.close();
- clientConnections.remove(channel);
- });
- }
- executor.execute(() -> future.completeExceptionally(sendError));
- }
- });
- } else {
- executor.execute(() -> future.completeExceptionally(channelError));
- }
- });
- }
-
- @Override
- public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
- checkPermission(CLUSTER_WRITE);
- handlers.put(type, (message, connection) -> executor.execute(() ->
- handler.accept(message.sender(), message.payload())));
- }
-
- @Override
- public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
- checkPermission(CLUSTER_WRITE);
- handlers.put(type, (message, connection) -> executor.execute(() -> {
- byte[] responsePayload = null;
- InternalReply.Status status = InternalReply.Status.OK;
- try {
- responsePayload = handler.apply(message.sender(), message.payload());
- } catch (Exception e) {
- log.debug("An error occurred in a message handler: {}", e);
- status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
- }
- connection.reply(message, status, Optional.ofNullable(responsePayload));
- }));
- }
-
- @Override
- public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
- checkPermission(CLUSTER_WRITE);
- handlers.put(type, (message, connection) -> {
- handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
- InternalReply.Status status;
- if (error == null) {
- status = InternalReply.Status.OK;
- } else {
- log.debug("An error occurred in a message handler: {}", error);
- status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
- }
- connection.reply(message, status, Optional.ofNullable(result));
- });
- });
- }
-
- @Override
- public void unregisterHandler(String type) {
- checkPermission(CLUSTER_WRITE);
- handlers.remove(type);
- }
-
- private Bootstrap bootstrapClient(Endpoint endpoint) {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
- new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
- bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
- bootstrap.group(clientGroup);
- bootstrap.channel(clientChannelClass);
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
- bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
- if (enableNettyTls) {
- bootstrap.handler(new SslClientCommunicationChannelInitializer());
- } else {
- bootstrap.handler(new BasicChannelInitializer());
- }
- return bootstrap;
- }
-
- private void startAcceptingConnections() throws InterruptedException {
- ServerBootstrap b = new ServerBootstrap();
- b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
- new WriteBufferWaterMark(8 * 1024, 32 * 1024));
- b.option(ChannelOption.SO_RCVBUF, 1048576);
- b.childOption(ChannelOption.SO_KEEPALIVE, true);
- b.childOption(ChannelOption.TCP_NODELAY, true);
- b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- b.group(serverGroup, clientGroup);
- b.channel(serverChannelClass);
- if (enableNettyTls) {
- b.childHandler(new SslServerCommunicationChannelInitializer());
- } else {
- b.childHandler(new BasicChannelInitializer());
- }
- b.option(ChannelOption.SO_BACKLOG, 128);
- b.childOption(ChannelOption.SO_KEEPALIVE, true);
-
- // Bind and start to accept incoming connections.
- b.bind(localEndpoint.port()).sync().addListener(future -> {
- if (future.isSuccess()) {
- log.info("{} accepting incoming connections on port {}",
- localEndpoint.host(), localEndpoint.port());
- } else {
- log.warn("{} failed to bind to port {} due to {}",
- localEndpoint.host(), localEndpoint.port(), future.cause());
- }
- });
- }
-
- private CompletableFuture<Channel> openChannel(Endpoint ep) {
- Bootstrap bootstrap = bootstrapClient(ep);
- CompletableFuture<Channel> retFuture = new CompletableFuture<>();
- ChannelFuture f = bootstrap.connect();
-
- f.addListener(future -> {
- if (future.isSuccess()) {
- retFuture.complete(f.channel());
- } else {
- retFuture.completeExceptionally(future.cause());
- }
- });
- log.debug("Established a new connection to {}", ep);
- return retFuture;
- }
-
- /**
- * Channel initializer for TLS servers.
- */
- private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- SSLContext serverContext = SSLContext.getInstance("TLS");
- serverContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
-
- SSLEngine serverSslEngine = serverContext.createSSLEngine();
-
- serverSslEngine.setNeedClientAuth(true);
- serverSslEngine.setUseClientMode(false);
- serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
- serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
- serverSslEngine.setEnableSessionCreation(true);
-
- channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
- .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
- .addLast("decoder", new MessageDecoder())
- .addLast("handler", dispatcher);
- }
- }
-
- /**
- * Channel initializer for TLS clients.
- */
- private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- SSLContext clientContext = SSLContext.getInstance("TLS");
- clientContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
-
- SSLEngine clientSslEngine = clientContext.createSSLEngine();
-
- clientSslEngine.setUseClientMode(true);
- clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
- clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
- clientSslEngine.setEnableSessionCreation(true);
-
- channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
- .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
- .addLast("decoder", new MessageDecoder())
- .addLast("handler", dispatcher);
- }
- }
-
- /**
- * Channel initializer for basic connections.
- */
- private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- channel.pipeline()
- .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
- .addLast("decoder", new MessageDecoder())
- .addLast("handler", dispatcher);
- }
- }
-
- /**
- * Channel inbound handler that dispatches messages to the appropriate handler.
- */
- @ChannelHandler.Sharable
- private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
- // Effectively SimpleChannelInboundHandler<InternalMessage>,
- // had to specify <Object> to avoid Class Loader not being able to find some classes.
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
- InternalMessage message = (InternalMessage) rawMessage;
- try {
- if (message.isRequest()) {
- RemoteServerConnection connection =
- serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
- connection.dispatch((InternalRequest) message);
- } else {
- RemoteClientConnection connection =
- clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
- connection.dispatch((InternalReply) message);
- }
- } catch (RejectedExecutionException e) {
- log.warn("Unable to dispatch message due to {}", e.getMessage());
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
- log.error("Exception inside channel handling pipeline.", cause);
-
- RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
- if (clientConnection != null) {
- clientConnection.close();
- }
-
- RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
- if (serverConnection != null) {
- serverConnection.close();
- }
- context.close();
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext context) throws Exception {
- RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
- if (clientConnection != null) {
- clientConnection.close();
- }
-
- RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
- if (serverConnection != null) {
- serverConnection.close();
- }
- context.close();
- }
-
- /**
- * Returns true if the given message should be handled.
- *
- * @param msg inbound message
- * @return true if {@code msg} is {@link InternalMessage} instance.
- * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
- */
- @Override
- public final boolean acceptInboundMessage(Object msg) {
- return msg instanceof InternalMessage;
- }
- }
-
- /**
- * Wraps a {@link CompletableFuture} and tracks its type and creation time.
- */
- private final class Callback {
- private final String type;
- private final CompletableFuture<byte[]> future;
- private final long time = System.currentTimeMillis();
-
- Callback(String type, CompletableFuture<byte[]> future) {
- this.type = type;
- this.future = future;
- }
-
- public void complete(byte[] value) {
- future.complete(value);
- }
-
- public void completeExceptionally(Throwable error) {
- future.completeExceptionally(error);
- }
- }
-
- /**
- * Represents the client side of a connection to a local or remote server.
- */
- private interface ClientConnection {
-
- /**
- * Sends a message to the other side of the connection.
- *
- * @param message the message to send
- * @return a completable future to be completed once the message has been sent
- */
- CompletableFuture<Void> sendAsync(InternalRequest message);
-
- /**
- * Sends a message to the other side of the connection, awaiting a reply.
- *
- * @param message the message to send
- * @return a completable future to be completed once a reply is received or the request times out
- */
- CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
-
- /**
- * Closes the connection.
- */
- default void close() {
- }
- }
-
- /**
- * Represents the server side of a connection.
- */
- private interface ServerConnection {
-
- /**
- * Sends a reply to the other side of the connection.
- *
- * @param message the message to which to reply
- * @param status the reply status
- * @param payload the response payload
- */
- void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
-
- /**
- * Closes the connection.
- */
- default void close() {
- }
- }
-
- /**
- * Remote connection implementation.
- */
- private abstract class AbstractClientConnection implements ClientConnection {
- private final Map<Long, Callback> futures = Maps.newConcurrentMap();
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder()
- .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
- .build();
-
- /**
- * Times out callbacks for this connection.
- */
- void timeoutCallbacks() {
- // Store the current time.
- long currentTime = System.currentTimeMillis();
-
- // Iterate through future callbacks and time out callbacks that have been alive
- // longer than the current timeout according to the message type.
- Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
- while (iterator.hasNext()) {
- Callback callback = iterator.next().getValue();
- try {
- RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
- long elapsedTime = currentTime - callback.time;
- if (elapsedTime > MAX_TIMEOUT_MILLIS ||
- (elapsedTime > MIN_TIMEOUT_MILLIS && requestMonitor.isTimedOut(elapsedTime))) {
- iterator.remove();
- requestMonitor.addReplyTime(elapsedTime);
- callback.completeExceptionally(
- new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
- }
- } catch (ExecutionException e) {
- throw new AssertionError();
- }
- }
- }
-
- protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
- futures.put(id, new Callback(subject, future));
- }
-
- protected Callback completeCallback(long id) {
- Callback callback = futures.remove(id);
- if (callback != null) {
- try {
- RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
- requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time);
- } catch (ExecutionException e) {
- throw new AssertionError();
- }
- }
- return callback;
- }
-
- protected Callback failCallback(long id) {
- return futures.remove(id);
- }
-
- @Override
- public void close() {
- if (closed.compareAndSet(false, true)) {
- for (Callback callback : futures.values()) {
- callback.completeExceptionally(new ConnectException());
- }
- }
- }
- }
-
- /**
- * Local connection implementation.
- */
- private final class LocalClientConnection extends AbstractClientConnection {
- @Override
- public CompletableFuture<Void> sendAsync(InternalRequest message) {
- BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
- if (handler != null) {
- handler.accept(message, localServerConnection);
- } else {
- log.debug("No handler for message type {} from {}", message.type(), message.sender());
- }
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
- CompletableFuture<byte[]> future = new CompletableFuture<>();
- future.whenComplete((r, e) -> completeCallback(message.id()));
- registerCallback(message.id(), message.subject(), future);
- BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
- if (handler != null) {
- handler.accept(message, new LocalServerConnection(future));
- } else {
- log.debug("No handler for message type {} from {}", message.type(), message.sender());
- new LocalServerConnection(future)
- .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
- }
- return future;
- }
- }
-
- /**
- * Local server connection.
- */
- private final class LocalServerConnection implements ServerConnection {
- private final CompletableFuture<byte[]> future;
-
- LocalServerConnection(CompletableFuture<byte[]> future) {
- this.future = future;
- }
-
- @Override
- public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
- if (future != null) {
- if (status == InternalReply.Status.OK) {
- future.complete(payload.orElse(EMPTY_PAYLOAD));
- } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
- future.completeExceptionally(new MessagingException.NoRemoteHandler());
- } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
- future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
- } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
- future.completeExceptionally(new MessagingException.ProtocolException());
- }
- }
- }
- }
-
- /**
- * Remote connection implementation.
- */
- private final class RemoteClientConnection extends AbstractClientConnection {
- private final Channel channel;
-
- RemoteClientConnection(Channel channel) {
- this.channel = channel;
- }
-
- @Override
- public CompletableFuture<Void> sendAsync(InternalRequest message) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- channel.writeAndFlush(message).addListener(channelFuture -> {
- if (!channelFuture.isSuccess()) {
- future.completeExceptionally(channelFuture.cause());
- } else {
- future.complete(null);
- }
- });
- return future;
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
- CompletableFuture<byte[]> future = new CompletableFuture<>();
- registerCallback(message.id(), message.subject(), future);
- channel.writeAndFlush(message).addListener(channelFuture -> {
- if (!channelFuture.isSuccess()) {
- Callback callback = failCallback(message.id());
- if (callback != null) {
- callback.completeExceptionally(channelFuture.cause());
- }
- }
- });
- return future;
- }
-
- /**
- * Dispatches a message to a local handler.
- *
- * @param message the message to dispatch
- */
- private void dispatch(InternalReply message) {
- if (message.preamble() != preamble) {
- log.debug("Received {} with invalid preamble", message.type());
- return;
- }
-
- clockService.recordEventTime(message.time());
-
- Callback callback = completeCallback(message.id());
- if (callback != null) {
- if (message.status() == InternalReply.Status.OK) {
- callback.complete(message.payload());
- } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
- callback.completeExceptionally(new MessagingException.NoRemoteHandler());
- } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
- callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
- } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
- callback.completeExceptionally(new MessagingException.ProtocolException());
- }
- } else {
- log.debug("Received a reply for message id:[{}] "
- + "but was unable to locate the"
- + " request handle", message.id());
- }
- }
- }
-
- /**
- * Remote server connection.
- */
- private final class RemoteServerConnection implements ServerConnection {
- private final Channel channel;
-
- RemoteServerConnection(Channel channel) {
- this.channel = channel;
- }
-
- /**
- * Dispatches a message to a local handler.
- *
- * @param message the message to dispatch
- */
- private void dispatch(InternalRequest message) {
- if (message.preamble() != preamble) {
- log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
- reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
- return;
- }
-
- clockService.recordEventTime(message.time());
-
- BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
- if (handler != null) {
- handler.accept(message, this);
- } else {
- log.debug("No handler for message type {} from {}", message.type(), message.sender());
- reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
- }
- }
-
- @Override
- public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
- InternalReply response = new InternalReply(preamble,
- clockService.timeNow(),
- message.id(),
- payload.orElse(EMPTY_PAYLOAD),
- status);
- channel.writeAndFlush(response);
- }
- }
-
- /**
- * Request-reply timeout history tracker.
- */
- private static final class RequestMonitor {
- private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
- private final AtomicLong max = new AtomicLong();
- private volatile int replyCount;
- private volatile long lastUpdate = System.currentTimeMillis();
-
- /**
- * Adds a reply time to the history.
- *
- * @param replyTime the reply time to add to the history
- */
- void addReplyTime(long replyTime) {
- max.accumulateAndGet(replyTime, Math::max);
-
- // If at least WINDOW_UPDATE_SAMPLE_SIZE response times have been recorded, and at least
- // WINDOW_UPDATE_MILLIS have passed since the last update, record the maximum response time in the samples.
- int replyCount = ++this.replyCount;
- if (replyCount >= WINDOW_UPDATE_SAMPLE_SIZE
- && System.currentTimeMillis() - lastUpdate > WINDOW_UPDATE_MILLIS) {
- synchronized (this) {
- if (System.currentTimeMillis() - lastUpdate > WINDOW_UPDATE_MILLIS) {
- long lastMax = max.get();
- if (lastMax > 0) {
- samples.addValue(lastMax);
- lastUpdate = System.currentTimeMillis();
- this.replyCount = 0;
- max.set(0);
- }
- }
- }
- }
- }
-
- /**
- * Returns a boolean indicating whether the given request should be timed out according to the elapsed time.
- *
- * @param elapsedTime the elapsed request time
- * @return indicates whether the request should be timed out
- */
- boolean isTimedOut(long elapsedTime) {
- return samples.getN() == WINDOW_SIZE && phi(elapsedTime) >= PHI_FAILURE_THRESHOLD;
- }
-
- /**
- * Compute phi for the specified node id.
- *
- * @param elapsedTime the duration since the request was sent
- * @return phi value
- */
- private double phi(long elapsedTime) {
- if (samples.getN() < MIN_SAMPLES) {
- return 0.0;
- }
- return computePhi(samples, elapsedTime);
- }
-
- /**
- * Computes the phi value from the given samples.
- *
- * @param samples the samples from which to compute phi
- * @param elapsedTime the duration since the request was sent
- * @return phi
- */
- private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
- double meanMillis = samples.getMean();
- double y = (elapsedTime - meanMillis) / Math.max(samples.getStandardDeviation(), MIN_STANDARD_DEVIATION);
- double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
- if (elapsedTime > meanMillis) {
- return -Math.log10(e / (1.0 + e));
- } else {
- return -Math.log10(1.0 - 1.0 / (1.0 + e));
- }
- }
- }
-}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
deleted file mode 100644
index 8cfa7ff..0000000
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.ClusterMetadata;
-import org.onosproject.cluster.ClusterMetadataEventListener;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.HybridLogicalClockService;
-import org.onosproject.core.HybridLogicalTime;
-import org.onosproject.net.provider.ProviderId;
-import org.onosproject.store.cluster.messaging.Endpoint;
-
-import java.net.ConnectException;
-import java.util.Arrays;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.onlab.junit.TestTools.findAvailablePort;
-
-/**
- * Unit tests for NettyMessaging.
- */
-public class NettyMessagingManagerTest {
-
- HybridLogicalClockService testClockService = new HybridLogicalClockService() {
- AtomicLong counter = new AtomicLong();
- @Override
- public HybridLogicalTime timeNow() {
- return new HybridLogicalTime(counter.incrementAndGet(), 0);
- }
-
- @Override
- public void recordEventTime(HybridLogicalTime time) {
- }
- };
-
- NettyMessagingManager netty1;
- NettyMessagingManager netty2;
-
- private static final String DUMMY_NAME = "node";
- private static final String IP_STRING = "127.0.0.1";
-
- Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
- Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
- Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
-
- @Before
- public void setUp() throws Exception {
- ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
- netty1 = new NettyMessagingManager();
- netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
- netty1.clockService = testClockService;
- netty1.activate();
-
- ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
- netty2 = new NettyMessagingManager();
- netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
- netty2.clockService = testClockService;
- netty2.activate();
- }
-
- /**
- * Returns a random String to be used as a test subject.
- * @return string
- */
- private String nextSubject() {
- return UUID.randomUUID().toString();
- }
-
- @After
- public void tearDown() throws Exception {
- if (netty1 != null) {
- netty1.deactivate();
- }
-
- if (netty2 != null) {
- netty2.deactivate();
- }
- }
-
- @Test
- public void testSendAsync() {
- String subject = nextSubject();
- CountDownLatch latch1 = new CountDownLatch(1);
- CompletableFuture<Void> response = netty1.sendAsync(ep2, subject, "hello world".getBytes());
- response.whenComplete((r, e) -> {
- assertNull(e);
- latch1.countDown();
- });
- Uninterruptibles.awaitUninterruptibly(latch1);
-
- CountDownLatch latch2 = new CountDownLatch(1);
- response = netty1.sendAsync(invalidEndPoint, subject, "hello world".getBytes());
- response.whenComplete((r, e) -> {
- assertNotNull(e);
- assertTrue(e instanceof ConnectException);
- latch2.countDown();
- });
- Uninterruptibles.awaitUninterruptibly(latch2);
- }
-
- @Test
- @Ignore // FIXME disabled on 9/29/16 due to random failures
- public void testSendAndReceive() {
- String subject = nextSubject();
- AtomicBoolean handlerInvoked = new AtomicBoolean(false);
- AtomicReference<byte[]> request = new AtomicReference<>();
- AtomicReference<Endpoint> sender = new AtomicReference<>();
-
- BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
- handlerInvoked.set(true);
- sender.set(ep);
- request.set(data);
- return "hello there".getBytes();
- };
- netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
-
- CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, subject, "hello world".getBytes());
- assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
- assertTrue(handlerInvoked.get());
- assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
- assertEquals(ep1, sender.get());
- }
-
- @Test
- public void testDefaultTimeout() {
- String subject = nextSubject();
- BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> new CompletableFuture<>();
- netty2.registerHandler(subject, handler);
-
- try {
- netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
- fail();
- } catch (CompletionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
- }
-
- @Test
- public void testDynamicTimeout() {
- String subject = nextSubject();
- AtomicInteger counter = new AtomicInteger();
- BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> {
- if (counter.incrementAndGet() <= 50) {
- return CompletableFuture.completedFuture(new byte[0]);
- } else {
- return new CompletableFuture<>();
- }
- };
- netty2.registerHandler(subject, handler);
-
- for (int i = 0; i < 50; i++) {
- netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
- }
- try {
- netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
- fail();
- } catch (CompletionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
- }
-
- /*
- * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
- * and response completion occurs on the expected thread.
- */
- @Test
- @Ignore
- public void testSendAndReceiveWithExecutor() {
- String subject = nextSubject();
- ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
- ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
- AtomicReference<String> handlerThreadName = new AtomicReference<>();
- AtomicReference<String> completionThreadName = new AtomicReference<>();
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
- handlerThreadName.set(Thread.currentThread().getName());
- try {
- latch.await();
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- fail("InterruptedException");
- }
- return "hello there".getBytes();
- };
- netty2.registerHandler(subject, handler, handlerExecutor);
-
- CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
- subject,
- "hello world".getBytes(),
- completionExecutor);
- response.whenComplete((r, e) -> {
- completionThreadName.set(Thread.currentThread().getName());
- });
- latch.countDown();
-
- // Verify that the message was request handling and response completion happens on the correct thread.
- assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
- assertEquals("completion-thread", completionThreadName.get());
- assertEquals("handler-thread", handlerThreadName.get());
- }
-
- private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
- return new ClusterMetadataService() {
- @Override
- public ClusterMetadata getClusterMetadata() {
- return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
- name, getLocalNode(), Sets.newHashSet(), Sets.newHashSet());
- }
-
- @Override
- public ControllerNode getLocalNode() {
- return new ControllerNode() {
- @Override
- public NodeId id() {
- return null;
- }
-
- @Override
- public String host() {
- return ipAddress;
- }
-
- @Override
- public IpAddress ip() {
- return IpAddress.valueOf(ipAddress);
- }
-
- @Override
- public IpAddress ip(boolean resolve) {
- return ip();
- }
-
- @Override
- public int tcpPort() {
- return ep.port();
- }
- };
- }
-
- @Override
- public void addListener(ClusterMetadataEventListener listener) {}
-
- @Override
- public void removeListener(ClusterMetadataEventListener listener) {}
- };
- }
-}