blob: 123b7275a1d2387585c3dc72b7b569711e570266 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.onos.store.packet.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19
20import java.io.IOException;
21
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onlab.onos.cluster.ClusterService;
29import org.onlab.onos.cluster.NodeId;
30import org.onlab.onos.mastership.MastershipService;
31import org.onlab.onos.net.packet.OutboundPacket;
32import org.onlab.onos.net.packet.PacketEvent;
33import org.onlab.onos.net.packet.PacketEvent.Type;
34import org.onlab.onos.net.packet.PacketStore;
35import org.onlab.onos.net.packet.PacketStoreDelegate;
36import org.onlab.onos.store.AbstractStore;
37import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
40import org.onlab.onos.store.cluster.messaging.MessageSubject;
41import org.onlab.onos.store.serializers.KryoNamespaces;
42import org.onlab.onos.store.serializers.KryoSerializer;
43import org.onlab.util.KryoNamespace;
44import org.slf4j.Logger;
45
46/**
47 * Distributed packet store implementation allowing packets to be sent to
48 * remote instances.
49 */
50@Component(immediate = true)
51@Service
52public class DistributedPacketStore
53 extends AbstractStore<PacketEvent, PacketStoreDelegate>
54 implements PacketStore {
55
56 private final Logger log = getLogger(getClass());
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 private MastershipService mastershipService;
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 private ClusterService clusterService;
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 private ClusterCommunicationService communicationService;
66
67 private static final MessageSubject PACKET_OUT_SUBJECT =
68 new MessageSubject("packet-out");
69
70 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
71 @Override
72 protected void setupKryoPool() {
73 serializerPool = KryoNamespace.newBuilder()
74 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080075 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
76 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -070077 }
78 };
79
80 @Activate
81 public void activate() {
82 log.info("Started");
83
84 communicationService.addSubscriber(
85 PACKET_OUT_SUBJECT, new InternalClusterMessageHandler());
86 }
87
88 @Deactivate
89 public void deactivate() {
90 log.info("Stopped");
91 }
92
93 @Override
94 public void emit(OutboundPacket packet) {
95 NodeId myId = clusterService.getLocalNode().id();
96 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
97
Jonathan Hart7466d612014-11-24 17:09:53 -080098 if (master == null) {
99 return;
100 }
101
Jonathan Hart4f60f982014-10-27 08:11:17 -0700102 if (myId.equals(master)) {
103 notifyDelegate(new PacketEvent(Type.EMIT, packet));
104 return;
105 }
106
107 try {
108 communicationService.unicast(new ClusterMessage(
109 myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
110 } catch (IOException e) {
111 log.warn("Failed to send packet-out to {}", master);
112 }
113 }
114
115 /**
116 * Handles incoming cluster messages.
117 */
118 private class InternalClusterMessageHandler implements ClusterMessageHandler {
119 @Override
120 public void handle(ClusterMessage message) {
121 if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
122 log.warn("Received message with wrong subject: {}", message);
123 }
124
125 OutboundPacket packet = SERIALIZER.decode(message.payload());
126 notifyDelegate(new PacketEvent(Type.EMIT, packet));
127 }
128 }
129
130}