blob: 0449b8a6b1aa129210ada1e21dc675520659bb5a [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.concurrent.CompletableFuture;
6
7import net.kuujo.copycat.protocol.PingRequest;
8import net.kuujo.copycat.protocol.PollRequest;
9import net.kuujo.copycat.protocol.RequestHandler;
10import net.kuujo.copycat.protocol.SubmitRequest;
11import net.kuujo.copycat.protocol.SyncRequest;
12import net.kuujo.copycat.spi.protocol.ProtocolServer;
13
14import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
15import org.onlab.onos.store.cluster.messaging.ClusterMessage;
16import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
17import org.slf4j.Logger;
18
19/**
20 * Licensed to the Apache Software Foundation (ASF) under one
21 * or more contributor license agreements. See the NOTICE file
22 * distributed with this work for additional information
23 * regarding copyright ownership. The ASF licenses this file
24 * to you under the Apache License, Version 2.0 (the
25 * "License"); you may not use this file except in compliance
26 * with the License. You may obtain a copy of the License at
27 *
28 * http://www.apache.org/licenses/LICENSE-2.0
29
30 * Unless required by applicable law or agreed to in writing,
31 * software distributed under the License is distributed on an
32 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
33 * KIND, either express or implied. See the License for the
34 * specific language governing permissions and limitations under
35 * the License.
36 */
37
38public class ClusterMessagingProtocolServer implements ProtocolServer {
39
40 private final Logger log = getLogger(getClass());
41 private RequestHandler handler;
42
43 public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
44
45 clusterCommunicator.addSubscriber(
46 ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
47 clusterCommunicator.addSubscriber(
48 ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
49 clusterCommunicator.addSubscriber(
50 ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
51 clusterCommunicator.addSubscriber(
52 ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
53 }
54
55 @Override
56 public void requestHandler(RequestHandler handler) {
57 this.handler = handler;
58 }
59
60 @Override
61 public CompletableFuture<Void> listen() {
62 return CompletableFuture.completedFuture(null);
63 }
64
65 @Override
66 public CompletableFuture<Void> close() {
67 return CompletableFuture.completedFuture(null);
68 }
69
70 private class CopycatMessageHandler<T> implements ClusterMessageHandler {
71
72 @Override
73 public void handle(ClusterMessage message) {
74 T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
75 if (request.getClass().equals(PingRequest.class)) {
76 handler.ping((PingRequest) request).whenComplete((response, error) -> {
77 try {
78 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
79 } catch (Exception e) {
80 log.error("Failed to respond to ping request", e);
81 }
82 });
83 } else if (request.getClass().equals(PollRequest.class)) {
84 handler.poll((PollRequest) request).whenComplete((response, error) -> {
85 try {
86 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
87 } catch (Exception e) {
88 log.error("Failed to respond to poll request", e);
89 }
90 });
91 } else if (request.getClass().equals(SyncRequest.class)) {
92 handler.sync((SyncRequest) request).whenComplete((response, error) -> {
93 try {
94 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
95 } catch (Exception e) {
96 log.error("Failed to respond to sync request", e);
97 }
98 });
99 } else if (request.getClass().equals(SubmitRequest.class)) {
100 handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
101 try {
102 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
103 } catch (Exception e) {
104 log.error("Failed to respond to submit request", e);
105 }
106 });
107 }
108 }
109 }
110}