blob: 2e7fe1190348cea0fdca6d1ff9b20502436a1b32 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static com.google.common.base.Preconditions.checkNotNull;
4import static org.slf4j.LoggerFactory.getLogger;
5
6import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.Collection;
9import java.util.HashMap;
10import java.util.HashSet;
11import java.util.LinkedList;
12import java.util.Vector;
13
14import net.kuujo.copycat.cluster.TcpClusterConfig;
15import net.kuujo.copycat.cluster.TcpMember;
16import net.kuujo.copycat.internal.log.ConfigurationEntry;
17import net.kuujo.copycat.internal.log.CopycatEntry;
18import net.kuujo.copycat.internal.log.OperationEntry;
19import net.kuujo.copycat.internal.log.SnapshotEntry;
20import net.kuujo.copycat.protocol.PingRequest;
21import net.kuujo.copycat.protocol.PingResponse;
22import net.kuujo.copycat.protocol.PollRequest;
23import net.kuujo.copycat.protocol.PollResponse;
24import net.kuujo.copycat.protocol.Response.Status;
25import net.kuujo.copycat.protocol.SubmitRequest;
26import net.kuujo.copycat.protocol.SubmitResponse;
27import net.kuujo.copycat.protocol.SyncRequest;
28import net.kuujo.copycat.protocol.SyncResponse;
29import net.kuujo.copycat.spi.protocol.Protocol;
30import net.kuujo.copycat.spi.protocol.ProtocolClient;
31import net.kuujo.copycat.spi.protocol.ProtocolServer;
32
33import org.apache.felix.scr.annotations.Activate;
34import org.apache.felix.scr.annotations.Component;
35import org.apache.felix.scr.annotations.Deactivate;
36import org.apache.felix.scr.annotations.Reference;
37import org.apache.felix.scr.annotations.ReferenceCardinality;
38import org.apache.felix.scr.annotations.Service;
39import org.onlab.onos.cluster.ClusterService;
40import org.onlab.onos.cluster.ControllerNode;
41import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
42import org.onlab.onos.store.cluster.messaging.MessageSubject;
43import org.onlab.onos.store.serializers.ImmutableListSerializer;
44import org.onlab.onos.store.serializers.ImmutableMapSerializer;
45import org.onlab.onos.store.serializers.ImmutableSetSerializer;
46import org.onlab.onos.store.serializers.KryoSerializer;
47import org.onlab.onos.store.service.ReadRequest;
48import org.onlab.onos.store.service.ReadResult;
49import org.onlab.onos.store.service.VersionedValue;
50import org.onlab.onos.store.service.WriteRequest;
51import org.onlab.onos.store.service.WriteResult;
52import org.onlab.util.KryoNamespace;
53import org.slf4j.Logger;
54
55import com.esotericsoftware.kryo.Kryo;
56import com.esotericsoftware.kryo.io.Input;
57import com.esotericsoftware.kryo.serializers.CollectionSerializer;
58import com.google.common.collect.ImmutableList;
59import com.google.common.collect.ImmutableMap;
60import com.google.common.collect.ImmutableSet;
61
62/**
63 * Licensed to the Apache Software Foundation (ASF) under one
64 * or more contributor license agreements. See the NOTICE file
65 * distributed with this work for additional information
66 * regarding copyright ownership. The ASF licenses this file
67 * to you under the Apache License, Version 2.0 (the
68 * "License"); you may not use this file except in compliance
69 * with the License. You may obtain a copy of the License at
70 *
71 * http://www.apache.org/licenses/LICENSE-2.0
72
73 * Unless required by applicable law or agreed to in writing,
74 * software distributed under the License is distributed on an
75 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
76 * KIND, either express or implied. See the License for the
77 * specific language governing permissions and limitations under
78 * the License.
79 */
80
81@Component(immediate = true)
82@Service
83public class ClusterMessagingProtocol implements Protocol<TcpMember> {
84
85 private final Logger log = getLogger(getClass());
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 ClusterService clusterService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 ClusterCommunicationService clusterCommunicator;
92
93 public static final MessageSubject COPYCAT_PING =
94 new MessageSubject("copycat-raft-consensus-ping");
95 public static final MessageSubject COPYCAT_SYNC =
96 new MessageSubject("copycat-raft-consensus-sync");
97 public static final MessageSubject COPYCAT_POLL =
98 new MessageSubject("copycat-raft-consensus-poll");
99 public static final MessageSubject COPYCAT_SUBMIT =
100 new MessageSubject("copycat-raft-consensus-submit");
101
102 private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
103 .register(PingRequest.class)
104 .register(PingResponse.class)
105 .register(PollRequest.class)
106 .register(PollResponse.class)
107 .register(SyncRequest.class)
108 .register(SyncResponse.class)
109 .register(SubmitRequest.class)
110 .register(SubmitResponse.class)
111 .register(Status.class)
112 .register(ConfigurationEntry.class)
113 .register(SnapshotEntry.class)
114 .register(CopycatEntry.class)
115 .register(OperationEntry.class)
116 .register(TcpClusterConfig.class)
117 .register(TcpMember.class)
118 .build();
119
120 private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
121 .register(ReadRequest.class)
122 .register(WriteRequest.class)
123 .register(InternalReadResult.class)
124 .register(InternalWriteResult.class)
125 .register(InternalReadResult.Status.class)
126 .register(WriteResult.class)
127 .register(ReadResult.class)
128 .register(InternalWriteResult.Status.class)
129 .register(VersionedValue.class)
130 .build();
131
132 public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
133 .register(Arrays.asList().getClass(), new CollectionSerializer() {
134 @Override
135 @SuppressWarnings("rawtypes")
136 protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
137 return new ArrayList();
138 }
139 })
140 .register(ImmutableMap.class, new ImmutableMapSerializer())
141 .register(ImmutableList.class, new ImmutableListSerializer())
142 .register(ImmutableSet.class, new ImmutableSetSerializer())
143 .register(
144 Vector.class,
145 ArrayList.class,
146 Arrays.asList().getClass(),
147 HashMap.class,
148 HashSet.class,
149 LinkedList.class,
150 byte[].class)
151 .build();
152
153 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
154 @Override
155 protected void setupKryoPool() {
156 serializerPool = KryoNamespace.newBuilder()
157 .register(COPYCAT)
158 .register(COMMON)
159 .register(DATABASE)
160 .build()
161 .populate(1);
162 }
163 };
164
165 @Activate
166 public void activate() {
167 log.info("Started.");
168 }
169
170 @Deactivate
171 public void deactivate() {
172 log.info("Stopped.");
173 }
174
175 @Override
176 public ProtocolServer createServer(TcpMember member) {
177 return new ClusterMessagingProtocolServer(clusterCommunicator);
178 }
179
180 @Override
181 public ProtocolClient createClient(TcpMember member) {
182 ControllerNode node = getControllerNode(member.host(), member.port());
183 checkNotNull(node, "A valid controller node is expected");
184 return new ClusterMessagingProtocolClient(
185 clusterCommunicator, node);
186 }
187
188 private ControllerNode getControllerNode(String host, int port) {
189 for (ControllerNode node : clusterService.getNodes()) {
190 if (node.ip().toString().equals(host) && node.tcpPort() == port) {
191 return node;
192 }
193 }
194 return null;
195 }
196}