blob: 45b540d1dd72f40de75eb82180f0fa5913aec4e9 [file] [log] [blame]
Jonathan Hart4f60f982014-10-27 08:11:17 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Jonathan Hart4f60f982014-10-27 08:11:17 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
23import org.apache.felix.scr.annotations.Service;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080024import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080025import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.mastership.MastershipService;
28import org.onosproject.net.packet.OutboundPacket;
29import org.onosproject.net.packet.PacketEvent;
30import org.onosproject.net.packet.PacketEvent.Type;
31import org.onosproject.net.packet.PacketStore;
32import org.onosproject.net.packet.PacketStoreDelegate;
33import org.onosproject.store.AbstractStore;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35import org.onosproject.store.cluster.messaging.ClusterMessage;
36import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
37import org.onosproject.store.cluster.messaging.MessageSubject;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart4f60f982014-10-27 08:11:17 -070040import org.slf4j.Logger;
41
Brian O'Connor5eb77c82015-03-02 18:09:39 -080042import java.util.concurrent.ExecutorService;
43import java.util.concurrent.Executors;
44
45import static org.onlab.util.Tools.groupedThreads;
46import static org.slf4j.LoggerFactory.getLogger;
47
Jonathan Hart4f60f982014-10-27 08:11:17 -070048/**
49 * Distributed packet store implementation allowing packets to be sent to
50 * remote instances.
51 */
52@Component(immediate = true)
53@Service
54public class DistributedPacketStore
55 extends AbstractStore<PacketEvent, PacketStoreDelegate>
56 implements PacketStore {
57
58 private final Logger log = getLogger(getClass());
59
Madan Jampani2af244a2015-02-22 13:12:01 -080060 // TODO: make this configurable.
61 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
62
Jonathan Hart4f60f982014-10-27 08:11:17 -070063 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 private MastershipService mastershipService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 private ClusterService clusterService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 private ClusterCommunicationService communicationService;
71
72 private static final MessageSubject PACKET_OUT_SUBJECT =
73 new MessageSubject("packet-out");
74
75 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
76 @Override
77 protected void setupKryoPool() {
78 serializerPool = KryoNamespace.newBuilder()
79 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080080 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
81 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -070082 }
83 };
84
Madan Jampani2af244a2015-02-22 13:12:01 -080085 private ExecutorService messageHandlingExecutor;
86
Jonathan Hart4f60f982014-10-27 08:11:17 -070087 @Activate
88 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -080089 messageHandlingExecutor = Executors.newFixedThreadPool(
90 MESSAGE_HANDLER_THREAD_POOL_SIZE,
Madan Jampani6b5b7172015-02-23 13:02:26 -080091 groupedThreads("onos/store/packet", "message-handlers"));
Jonathan Hart4f60f982014-10-27 08:11:17 -070092
93 communicationService.addSubscriber(
Madan Jampani2af244a2015-02-22 13:12:01 -080094 PACKET_OUT_SUBJECT, new InternalClusterMessageHandler(), messageHandlingExecutor);
95
96 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -070097 }
98
99 @Deactivate
100 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800101 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
102 messageHandlingExecutor.shutdown();
Jonathan Hart4f60f982014-10-27 08:11:17 -0700103 log.info("Stopped");
104 }
105
106 @Override
107 public void emit(OutboundPacket packet) {
108 NodeId myId = clusterService.getLocalNode().id();
109 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
110
Jonathan Hart7466d612014-11-24 17:09:53 -0800111 if (master == null) {
112 return;
113 }
114
Jonathan Hart4f60f982014-10-27 08:11:17 -0700115 if (myId.equals(master)) {
116 notifyDelegate(new PacketEvent(Type.EMIT, packet));
117 return;
118 }
119
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800120 // TODO check unicast return value
121 communicationService.unicast(new ClusterMessage(
122 myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
123 // error log: log.warn("Failed to send packet-out to {}", master);
Jonathan Hart4f60f982014-10-27 08:11:17 -0700124 }
125
126 /**
127 * Handles incoming cluster messages.
128 */
129 private class InternalClusterMessageHandler implements ClusterMessageHandler {
130 @Override
131 public void handle(ClusterMessage message) {
132 if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
133 log.warn("Received message with wrong subject: {}", message);
134 }
135
136 OutboundPacket packet = SERIALIZER.decode(message.payload());
137 notifyDelegate(new PacketEvent(Type.EMIT, packet));
138 }
139 }
140
141}