blob: d06999e9e47d588b657ab6c3affe73bc1898de46 [file] [log] [blame]
package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.NettyMessagingService;
import org.slf4j.Logger;
/**
* {@link NettyMessagingService} based Copycat protocol server.
*/
public class NettyProtocolServer implements ProtocolServer {
private final Logger log = getLogger(getClass());
private final NettyMessagingService messagingService;
private RequestHandler handler;
public NettyProtocolServer(TcpMember member) {
messagingService = new NettyMessagingService(member.host(), member.port());
messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
}
protected NettyMessagingService getNettyMessagingService() {
return messagingService;
}
@Override
public void requestHandler(RequestHandler handler) {
this.handler = handler;
}
@Override
public CompletableFuture<Void> listen() {
try {
messagingService.activate();
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}
@Override
public CompletableFuture<Void> close() {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
messagingService.deactivate();
future.complete(null);
return future;
} catch (Exception e) {
future.completeExceptionally(e);
return future;
}
}
private class CopycatMessageHandler<T> implements MessageHandler {
@Override
public void handle(Message message) throws IOException {
T request = NettyProtocol.SERIALIZER.decode(message.payload());
if (request.getClass().equals(PingRequest.class)) {
handler.ping((PingRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to ping request", e);
}
});
} else if (request.getClass().equals(PollRequest.class)) {
handler.poll((PollRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to poll request", e);
}
});
} else if (request.getClass().equals(SyncRequest.class)) {
handler.sync((SyncRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to sync request", e);
}
});
} else if (request.getClass().equals(SubmitRequest.class)) {
handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to submit request", e);
}
});
}
}
}
}