blob: 35e28c7f00c5292b8cf576d0650f184554eda1b7 [file] [log] [blame]
Jonathan Hart4f60f982014-10-27 08:11:17 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Madan Jampani2af244a2015-02-22 13:12:01 -080018import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart4f60f982014-10-27 08:11:17 -070019import static org.slf4j.LoggerFactory.getLogger;
20
21import java.io.IOException;
Madan Jampani2af244a2015-02-22 13:12:01 -080022import java.util.concurrent.ExecutorService;
23import java.util.concurrent.Executors;
Jonathan Hart4f60f982014-10-27 08:11:17 -070024
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080031import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.mastership.MastershipService;
34import org.onosproject.net.packet.OutboundPacket;
35import org.onosproject.net.packet.PacketEvent;
36import org.onosproject.net.packet.PacketEvent.Type;
37import org.onosproject.net.packet.PacketStore;
38import org.onosproject.net.packet.PacketStoreDelegate;
39import org.onosproject.store.AbstractStore;
40import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
41import org.onosproject.store.cluster.messaging.ClusterMessage;
42import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
43import org.onosproject.store.cluster.messaging.MessageSubject;
44import org.onosproject.store.serializers.KryoNamespaces;
45import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart4f60f982014-10-27 08:11:17 -070046import org.onlab.util.KryoNamespace;
47import org.slf4j.Logger;
48
49/**
50 * Distributed packet store implementation allowing packets to be sent to
51 * remote instances.
52 */
53@Component(immediate = true)
54@Service
55public class DistributedPacketStore
56 extends AbstractStore<PacketEvent, PacketStoreDelegate>
57 implements PacketStore {
58
59 private final Logger log = getLogger(getClass());
60
Madan Jampani2af244a2015-02-22 13:12:01 -080061 // TODO: make this configurable.
62 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
63
Jonathan Hart4f60f982014-10-27 08:11:17 -070064 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 private MastershipService mastershipService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 private ClusterService clusterService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 private ClusterCommunicationService communicationService;
72
73 private static final MessageSubject PACKET_OUT_SUBJECT =
74 new MessageSubject("packet-out");
75
76 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
77 @Override
78 protected void setupKryoPool() {
79 serializerPool = KryoNamespace.newBuilder()
80 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080081 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
82 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -070083 }
84 };
85
Madan Jampani2af244a2015-02-22 13:12:01 -080086 private ExecutorService messageHandlingExecutor;
87
Jonathan Hart4f60f982014-10-27 08:11:17 -070088 @Activate
89 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -080090 messageHandlingExecutor = Executors.newFixedThreadPool(
91 MESSAGE_HANDLER_THREAD_POOL_SIZE,
92 groupedThreads("onos/flow", "message-handlers"));
Jonathan Hart4f60f982014-10-27 08:11:17 -070093
94 communicationService.addSubscriber(
Madan Jampani2af244a2015-02-22 13:12:01 -080095 PACKET_OUT_SUBJECT, new InternalClusterMessageHandler(), messageHandlingExecutor);
96
97 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -070098 }
99
100 @Deactivate
101 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800102 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
103 messageHandlingExecutor.shutdown();
Jonathan Hart4f60f982014-10-27 08:11:17 -0700104 log.info("Stopped");
105 }
106
107 @Override
108 public void emit(OutboundPacket packet) {
109 NodeId myId = clusterService.getLocalNode().id();
110 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
111
Jonathan Hart7466d612014-11-24 17:09:53 -0800112 if (master == null) {
113 return;
114 }
115
Jonathan Hart4f60f982014-10-27 08:11:17 -0700116 if (myId.equals(master)) {
117 notifyDelegate(new PacketEvent(Type.EMIT, packet));
118 return;
119 }
120
121 try {
122 communicationService.unicast(new ClusterMessage(
123 myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
124 } catch (IOException e) {
125 log.warn("Failed to send packet-out to {}", master);
126 }
127 }
128
129 /**
130 * Handles incoming cluster messages.
131 */
132 private class InternalClusterMessageHandler implements ClusterMessageHandler {
133 @Override
134 public void handle(ClusterMessage message) {
135 if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
136 log.warn("Received message with wrong subject: {}", message);
137 }
138
139 OutboundPacket packet = SERIALIZER.decode(message.payload());
140 notifyDelegate(new PacketEvent(Type.EMIT, packet));
141 }
142 }
143
144}