blob: 2f97782c87d5c6cc468ec94cbe5438927e775425 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
Madan Jampani9b19a822014-11-04 21:37:13 -08003import static org.slf4j.LoggerFactory.getLogger;
4
Madan Jampani9b19a822014-11-04 21:37:13 -08005import java.util.Vector;
6
7import net.kuujo.copycat.cluster.TcpClusterConfig;
8import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani5ce30252014-11-17 20:53:17 -08009import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani9b19a822014-11-04 21:37:13 -080010import net.kuujo.copycat.internal.log.ConfigurationEntry;
11import net.kuujo.copycat.internal.log.CopycatEntry;
12import net.kuujo.copycat.internal.log.OperationEntry;
13import net.kuujo.copycat.internal.log.SnapshotEntry;
14import net.kuujo.copycat.protocol.PingRequest;
15import net.kuujo.copycat.protocol.PingResponse;
16import net.kuujo.copycat.protocol.PollRequest;
17import net.kuujo.copycat.protocol.PollResponse;
18import net.kuujo.copycat.protocol.Response.Status;
19import net.kuujo.copycat.protocol.SubmitRequest;
20import net.kuujo.copycat.protocol.SubmitResponse;
21import net.kuujo.copycat.protocol.SyncRequest;
22import net.kuujo.copycat.protocol.SyncResponse;
23import net.kuujo.copycat.spi.protocol.Protocol;
24import net.kuujo.copycat.spi.protocol.ProtocolClient;
25import net.kuujo.copycat.spi.protocol.ProtocolServer;
26
27import org.apache.felix.scr.annotations.Activate;
28import org.apache.felix.scr.annotations.Component;
29import org.apache.felix.scr.annotations.Deactivate;
30import org.apache.felix.scr.annotations.Reference;
31import org.apache.felix.scr.annotations.ReferenceCardinality;
32import org.apache.felix.scr.annotations.Service;
33import org.onlab.onos.cluster.ClusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080034import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
35import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080036import org.onlab.onos.store.serializers.KryoNamespaces;
Madan Jampani9b19a822014-11-04 21:37:13 -080037import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080038import org.onlab.onos.store.serializers.StoreSerializer;
39import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
40import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
Madan Jampani9b19a822014-11-04 21:37:13 -080041import org.onlab.util.KryoNamespace;
42import org.slf4j.Logger;
43
Madan Jampani9b19a822014-11-04 21:37:13 -080044/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080045 * ONOS Cluster messaging based Copycat protocol.
Madan Jampani9b19a822014-11-04 21:37:13 -080046 */
Yuta HIGUCHI2103df42014-12-02 10:59:37 -080047@Component(immediate = false)
Madan Jampani9b19a822014-11-04 21:37:13 -080048@Service
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080049public class ClusterMessagingProtocol
50 implements DatabaseProtocolService, Protocol<TcpMember> {
Madan Jampani9b19a822014-11-04 21:37:13 -080051
52 private final Logger log = getLogger(getClass());
53
54 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080055 protected ClusterService clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080056
57 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080058 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani9b19a822014-11-04 21:37:13 -080059
60 public static final MessageSubject COPYCAT_PING =
61 new MessageSubject("copycat-raft-consensus-ping");
62 public static final MessageSubject COPYCAT_SYNC =
63 new MessageSubject("copycat-raft-consensus-sync");
64 public static final MessageSubject COPYCAT_POLL =
65 new MessageSubject("copycat-raft-consensus-poll");
66 public static final MessageSubject COPYCAT_SUBMIT =
67 new MessageSubject("copycat-raft-consensus-submit");
68
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080069 static final int AFTER_COPYCAT = KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50;
70
71 static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
72 .register(KryoNamespaces.API)
73 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Madan Jampani9b19a822014-11-04 21:37:13 -080074 .register(PingRequest.class)
75 .register(PingResponse.class)
76 .register(PollRequest.class)
77 .register(PollResponse.class)
78 .register(SyncRequest.class)
79 .register(SyncResponse.class)
80 .register(SubmitRequest.class)
81 .register(SubmitResponse.class)
82 .register(Status.class)
83 .register(ConfigurationEntry.class)
84 .register(SnapshotEntry.class)
85 .register(CopycatEntry.class)
86 .register(OperationEntry.class)
87 .register(TcpClusterConfig.class)
88 .register(TcpMember.class)
Madan Jampani5ce30252014-11-17 20:53:17 -080089 .register(LeaderElectEvent.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080090 .register(Vector.class)
Madan Jampani9b19a822014-11-04 21:37:13 -080091 .build();
92
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080093 // serializer used for CopyCat Protocol
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080094 public static final StoreSerializer DB_SERIALIZER = new KryoSerializer() {
Madan Jampani9b19a822014-11-04 21:37:13 -080095 @Override
96 protected void setupKryoPool() {
97 serializerPool = KryoNamespace.newBuilder()
98 .register(COPYCAT)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080099 .nextId(AFTER_COPYCAT)
100 // for snapshot
101 .register(State.class)
102 .register(TableMetadata.class)
103 // TODO: Move this out ?
104 .register(TableModificationEvent.class)
105 .register(TableModificationEvent.Type.class)
106 .build();
Madan Jampani9b19a822014-11-04 21:37:13 -0800107 }
108 };
109
110 @Activate
111 public void activate() {
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800112 log.info("Started");
Madan Jampani9b19a822014-11-04 21:37:13 -0800113 }
114
115 @Deactivate
116 public void deactivate() {
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800117 log.info("Stopped");
Madan Jampani9b19a822014-11-04 21:37:13 -0800118 }
119
120 @Override
121 public ProtocolServer createServer(TcpMember member) {
122 return new ClusterMessagingProtocolServer(clusterCommunicator);
123 }
124
125 @Override
126 public ProtocolClient createClient(TcpMember member) {
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800127 return new ClusterMessagingProtocolClient(clusterService,
128 clusterCommunicator,
129 clusterService.getLocalNode(),
130 member);
Madan Jampani9b19a822014-11-04 21:37:13 -0800131 }
Madan Jampani7e634e62014-11-09 21:24:13 -0800132}