blob: 36f9f5428a73f096380706083784296035eda6d0 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Jonathan Hart4f60f982014-10-27 08:11:17 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
23import org.apache.felix.scr.annotations.Service;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080024import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080025import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.mastership.MastershipService;
28import org.onosproject.net.packet.OutboundPacket;
29import org.onosproject.net.packet.PacketEvent;
30import org.onosproject.net.packet.PacketEvent.Type;
31import org.onosproject.net.packet.PacketStore;
32import org.onosproject.net.packet.PacketStoreDelegate;
33import org.onosproject.store.AbstractStore;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35import org.onosproject.store.cluster.messaging.ClusterMessage;
36import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
37import org.onosproject.store.cluster.messaging.MessageSubject;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart4f60f982014-10-27 08:11:17 -070040import org.slf4j.Logger;
41
Brian O'Connor5eb77c82015-03-02 18:09:39 -080042import java.util.concurrent.ExecutorService;
43import java.util.concurrent.Executors;
44
45import static org.onlab.util.Tools.groupedThreads;
46import static org.slf4j.LoggerFactory.getLogger;
47
Jonathan Hart4f60f982014-10-27 08:11:17 -070048/**
49 * Distributed packet store implementation allowing packets to be sent to
50 * remote instances.
51 */
52@Component(immediate = true)
53@Service
54public class DistributedPacketStore
55 extends AbstractStore<PacketEvent, PacketStoreDelegate>
56 implements PacketStore {
57
58 private final Logger log = getLogger(getClass());
59
Madan Jampani2af244a2015-02-22 13:12:01 -080060 // TODO: make this configurable.
61 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
62
Jonathan Hart4f60f982014-10-27 08:11:17 -070063 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070064 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070065
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070067 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070068
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070070 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070071
72 private static final MessageSubject PACKET_OUT_SUBJECT =
73 new MessageSubject("packet-out");
74
75 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
76 @Override
77 protected void setupKryoPool() {
78 serializerPool = KryoNamespace.newBuilder()
79 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080080 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
81 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -070082 }
83 };
84
Madan Jampani2af244a2015-02-22 13:12:01 -080085 private ExecutorService messageHandlingExecutor;
86
Jonathan Hart4f60f982014-10-27 08:11:17 -070087 @Activate
88 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -080089 messageHandlingExecutor = Executors.newFixedThreadPool(
90 MESSAGE_HANDLER_THREAD_POOL_SIZE,
Madan Jampani6b5b7172015-02-23 13:02:26 -080091 groupedThreads("onos/store/packet", "message-handlers"));
Jonathan Hart4f60f982014-10-27 08:11:17 -070092
Thomas Vachuskaff965232015-03-17 14:10:52 -070093 communicationService.addSubscriber(PACKET_OUT_SUBJECT,
94 new InternalClusterMessageHandler(),
95 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -080096
97 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -070098 }
99
100 @Deactivate
101 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800102 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
103 messageHandlingExecutor.shutdown();
Jonathan Hart4f60f982014-10-27 08:11:17 -0700104 log.info("Stopped");
105 }
106
107 @Override
108 public void emit(OutboundPacket packet) {
109 NodeId myId = clusterService.getLocalNode().id();
110 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
111
Jonathan Hart7466d612014-11-24 17:09:53 -0800112 if (master == null) {
113 return;
114 }
115
Jonathan Hart4f60f982014-10-27 08:11:17 -0700116 if (myId.equals(master)) {
117 notifyDelegate(new PacketEvent(Type.EMIT, packet));
118 return;
119 }
120
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800121 // TODO check unicast return value
Thomas Vachuskaff965232015-03-17 14:10:52 -0700122 communicationService.unicast(new ClusterMessage(myId, PACKET_OUT_SUBJECT,
123 SERIALIZER.encode(packet)),
124 master);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800125 // error log: log.warn("Failed to send packet-out to {}", master);
Jonathan Hart4f60f982014-10-27 08:11:17 -0700126 }
127
128 /**
129 * Handles incoming cluster messages.
130 */
131 private class InternalClusterMessageHandler implements ClusterMessageHandler {
132 @Override
133 public void handle(ClusterMessage message) {
134 if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
135 log.warn("Received message with wrong subject: {}", message);
136 }
137
138 OutboundPacket packet = SERIALIZER.decode(message.payload());
139 notifyDelegate(new PacketEvent(Type.EMIT, packet));
140 }
141 }
142
143}