blob: 430211f5bc86194b7da136a9486f08408e646c64 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.databaseperf;
17
18import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
19import static org.onlab.util.Tools.delay;
20import static org.onlab.util.Tools.groupedThreads;
21import static org.slf4j.LoggerFactory.getLogger;
22
23import java.util.Timer;
24import java.util.TimerTask;
25import java.util.UUID;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
28import java.util.concurrent.TimeUnit;
29import java.util.concurrent.atomic.AtomicInteger;
30import java.util.stream.IntStream;
31
32import org.apache.commons.lang.math.RandomUtils;
33import org.apache.felix.scr.annotations.Activate;
34import org.apache.felix.scr.annotations.Component;
35import org.apache.felix.scr.annotations.Deactivate;
36import org.apache.felix.scr.annotations.Reference;
37import org.onlab.util.KryoNamespace;
38import org.onosproject.cluster.ClusterService;
39import org.onosproject.cluster.ControllerNode;
40import org.onosproject.core.ApplicationId;
41import org.onosproject.core.CoreService;
42import org.onosproject.store.service.ConsistentMap;
43import org.onosproject.store.service.Serializer;
44import org.onosproject.store.service.StorageService;
45import org.slf4j.Logger;
46
47/**
48 * Application to measure partitioned database performance.
49 */
50@Component(immediate = true)
51public class DatabasePerfInstaller {
52
53 private final Logger log = getLogger(getClass());
54
55 @Reference(cardinality = MANDATORY_UNARY)
56 protected CoreService coreService;
57
58 @Reference(cardinality = MANDATORY_UNARY)
59 protected ClusterService clusterService;
60
61 @Reference(cardinality = MANDATORY_UNARY)
62 protected StorageService storageService;
63
64 private boolean stopped;
65
66 private ApplicationId appId;
67
68 private static final long REPORT_PERIOD = 5000L; //ms
69 private Timer reportTimer;
70
71 private AtomicInteger successCount = new AtomicInteger(0);
72 private AtomicInteger failureCount = new AtomicInteger(0);
73
74 private ConsistentMap<String, String> cmap;
75
76 private ControllerNode localNode;
77
78 private long reportStartTime = System.currentTimeMillis();
79
80 private static final int NUM_TASK_THREADS = 2;
81 private ExecutorService taskExecutor;
82
83 private static final Serializer SERIALIZER = new Serializer() {
84
85 KryoNamespace kryo = new KryoNamespace.Builder().build();
86
87 @Override
88 public <T> byte[] encode(T object) {
89 return kryo.serialize(object);
90 }
91
92 @Override
93 public <T> T decode(byte[] bytes) {
94 return kryo.deserialize(bytes);
95 }
96
97 };
98
99 @Activate
100 public void activate() {
101 localNode = clusterService.getLocalNode();
102 String nodeId = localNode.ip().toString();
103 appId = coreService.registerApplication("org.onosproject.nettyperf."
104 + nodeId);
105
106 cmap = storageService.createConsistentMap("onos-app-database-perf-test-map", SERIALIZER);
107 taskExecutor = Executors.newFixedThreadPool(NUM_TASK_THREADS, groupedThreads("onos/database-perf", "worker"));
108 log.info("Started with Application ID {}", appId.id());
109 start();
110 }
111
112 @Deactivate
113 public void deactivate() {
114 stop();
115 log.info("Stopped");
116 }
117
118 public void start() {
119 long delay = System.currentTimeMillis() % REPORT_PERIOD;
120 reportTimer = new Timer("onos-netty-perf-reporter");
121 reportTimer.scheduleAtFixedRate(new TimerTask() {
122 @Override
123 public void run() {
124 report();
125 }
126 }, delay, REPORT_PERIOD);
127
128 stopped = false;
129 IntStream.range(0, NUM_TASK_THREADS).forEach(i -> {
130 taskExecutor.submit(() -> {
131 delay(2000); // take a breath to start
132 while (!stopped) {
133 performDBOperation();
134 delay(2); // take a breather
135 }
136 });
137 });
138 }
139
140 private void performDBOperation() {
141 String key = String.format("test%d", RandomUtils.nextInt(1000));
142 try {
143 if (RandomUtils.nextBoolean()) {
144 cmap.put(key, UUID.randomUUID().toString());
145 } else {
146 cmap.get(key);
147 }
148 successCount.incrementAndGet();
149 } catch (Exception e) {
150 failureCount.incrementAndGet();
151 }
152 }
153
154 private void report() {
155 long delta = System.currentTimeMillis() - reportStartTime;
156 if (delta > 0) {
157 int rate = (int) Math.round(((successCount.get() * 1000.0) / delta));
158 log.info("Passed: {}, Failed: {}, Rate: {}",
159 successCount.getAndSet(0), failureCount.getAndSet(0), rate);
160 reportStartTime = System.currentTimeMillis();
161 }
162 }
163
164 public void stop() {
165 reportTimer.cancel();
166 reportTimer = null;
167 stopped = true;
168 try {
169 taskExecutor.awaitTermination(5, TimeUnit.SECONDS);
170 } catch (InterruptedException e) {
171 log.warn("Failed to stop worker.");
172 }
173 }
174}