Skip to content

Commit ca4506d

Browse files
authored
Merge pull request #8450 from apache/port/clojure-tests-phase5
Port Clojure tests to Java: metrics_test, nimbus_auth_test (Phase 5)
2 parents 06ea406 + 383ee5d commit ca4506d

3 files changed

Lines changed: 610 additions & 0 deletions

File tree

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.storm.metric;
20+
21+
import java.util.Collection;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.TimeUnit;
27+
28+
import org.apache.storm.Config;
29+
import org.apache.storm.LocalCluster;
30+
import org.apache.storm.Testing;
31+
import org.apache.storm.Thrift;
32+
import org.apache.storm.metric.api.CountMetric;
33+
import org.apache.storm.task.OutputCollector;
34+
import org.apache.storm.task.TopologyContext;
35+
import org.apache.storm.testing.FeederSpout;
36+
import org.apache.storm.topology.OutputFieldsDeclarer;
37+
import org.apache.storm.topology.base.BaseRichBolt;
38+
import org.apache.storm.tuple.Fields;
39+
import org.apache.storm.tuple.Tuple;
40+
import org.apache.storm.utils.Utils;
41+
import org.awaitility.Awaitility;
42+
import org.awaitility.core.ConditionTimeoutException;
43+
import org.hamcrest.CoreMatchers;
44+
import org.junit.jupiter.api.Test;
45+
46+
import static org.junit.jupiter.api.Assertions.*;
47+
48+
/**
49+
* Integration tests for custom metrics with FakeMetricConsumer.
50+
*
51+
* Ported from storm-core/test/clj/org/apache/storm/metrics_test.clj
52+
*/
53+
public class MetricsIntegrationTest {
54+
55+
/**
56+
* Bolt that acks every tuple and increments a custom CountMetric.
57+
*/
58+
static class CountAcksBolt extends BaseRichBolt {
59+
private OutputCollector collector;
60+
private CountMetric customMetric;
61+
62+
@Override
63+
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
64+
this.collector = collector;
65+
this.customMetric = new CountMetric();
66+
context.registerMetric("my-custom-metric", customMetric, 5);
67+
}
68+
69+
@Override
70+
public void execute(Tuple tuple) {
71+
customMetric.incr();
72+
collector.ack(tuple);
73+
}
74+
75+
@Override
76+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
77+
}
78+
}
79+
80+
private static Map<String, Object> metricsConf() {
81+
Map<String, Object> conf = new HashMap<>();
82+
conf.put(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER,
83+
List.of(Map.of("class", "org.apache.storm.metric.FakeMetricConsumer")));
84+
conf.put("storm.zookeeper.connection.timeout", 30000);
85+
conf.put("storm.zookeeper.session.timeout", 60000);
86+
return conf;
87+
}
88+
89+
private static void waitForAtLeastNBuckets(int n, String compId, String metricName,
90+
LocalCluster cluster) throws Exception {
91+
Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
92+
() -> {
93+
Map<Integer, Collection<Object>> taskIdToBuckets =
94+
FakeMetricConsumer.getTaskIdToBuckets(compId, metricName);
95+
if (n != 0 && taskIdToBuckets == null) {
96+
return true;
97+
}
98+
if (taskIdToBuckets == null) {
99+
return false;
100+
}
101+
for (Collection<Object> buckets : taskIdToBuckets.values()) {
102+
if (buckets.size() < n) {
103+
return true;
104+
}
105+
}
106+
return false;
107+
},
108+
() -> {
109+
try {
110+
cluster.advanceClusterTime(1);
111+
} catch (InterruptedException e) {
112+
Thread.currentThread().interrupt();
113+
}
114+
});
115+
}
116+
117+
private static List<Object> lookupBuckets(String compId, String metricName) {
118+
Map<Integer, Collection<Object>> taskIdToBuckets =
119+
FakeMetricConsumer.getTaskIdToBuckets(compId, metricName);
120+
if (taskIdToBuckets == null || taskIdToBuckets.isEmpty()) {
121+
return List.of();
122+
}
123+
Collection<Object> buckets = taskIdToBuckets.values().iterator().next();
124+
return List.copyOf(buckets);
125+
}
126+
127+
private static void assertMetricRunningSum(String compId, String metricName,
128+
long expected, int minBuckets,
129+
LocalCluster cluster) throws Exception {
130+
waitForAtLeastNBuckets(minBuckets, compId, metricName, cluster);
131+
try {
132+
Awaitility.with()
133+
.pollInterval(10, TimeUnit.MILLISECONDS)
134+
.conditionEvaluationListener(condition -> {
135+
try {
136+
cluster.advanceClusterTime(1);
137+
} catch (InterruptedException e) {
138+
Thread.currentThread().interrupt();
139+
}
140+
})
141+
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
142+
.until((Callable<Long>) () -> {
143+
List<Object> buckets = lookupBuckets(compId, metricName);
144+
long sum = 0;
145+
for (Object b : buckets) {
146+
sum += ((Number) b).longValue();
147+
}
148+
return sum;
149+
}, CoreMatchers.equalTo(expected));
150+
} catch (ConditionTimeoutException e) {
151+
throw new AssertionError(e.getMessage());
152+
}
153+
}
154+
155+
@Test
156+
public void testCustomMetric() throws Exception {
157+
try (LocalCluster cluster = new LocalCluster.Builder()
158+
.withSimulatedTime()
159+
.withDaemonConf(metricsConf())
160+
.build()) {
161+
162+
FeederSpout feeder = new FeederSpout(new Fields("field1"));
163+
var topology = Thrift.buildTopology(
164+
Map.of("1", Thrift.prepareSpoutDetails(feeder)),
165+
Map.of("2", Thrift.prepareBoltDetails(
166+
Map.of(Utils.getGlobalStreamId("1", null), Thrift.prepareGlobalGrouping()),
167+
new CountAcksBolt())));
168+
169+
cluster.submitTopology("metrics-tester", Map.of(), topology);
170+
171+
feeder.feed(List.of("a"), 1);
172+
cluster.advanceClusterTime(6);
173+
assertMetricRunningSum("2", "my-custom-metric", 1, 1, cluster);
174+
175+
cluster.advanceClusterTime(5);
176+
assertMetricRunningSum("2", "my-custom-metric", 1, 2, cluster);
177+
178+
cluster.advanceClusterTime(20);
179+
assertMetricRunningSum("2", "my-custom-metric", 1, 6, cluster);
180+
181+
feeder.feed(List.of("b"), 2);
182+
feeder.feed(List.of("c"), 3);
183+
cluster.advanceClusterTime(5);
184+
assertMetricRunningSum("2", "my-custom-metric", 3, 7, cluster);
185+
}
186+
}
187+
188+
@Test
189+
public void testCustomMetricWithMultiTasks() throws Exception {
190+
try (LocalCluster cluster = new LocalCluster.Builder()
191+
.withSimulatedTime()
192+
.withDaemonConf(metricsConf())
193+
.build()) {
194+
195+
FeederSpout feeder = new FeederSpout(new Fields("field1"));
196+
Map<String, Object> boltConf = new HashMap<>();
197+
boltConf.put(Config.TOPOLOGY_TASKS, 2);
198+
199+
var topology = Thrift.buildTopology(
200+
Map.of("1", Thrift.prepareSpoutDetails(feeder)),
201+
Map.of("2", Thrift.prepareBoltDetails(
202+
Map.of(Utils.getGlobalStreamId("1", null), Thrift.prepareAllGrouping()),
203+
new CountAcksBolt(), 1, boltConf)));
204+
205+
cluster.submitTopology("metrics-tester-with-multitasks", Map.of(), topology);
206+
207+
feeder.feed(List.of("a"), 1);
208+
cluster.advanceClusterTime(6);
209+
assertMetricRunningSum("2", "my-custom-metric", 1, 1, cluster);
210+
211+
cluster.advanceClusterTime(5);
212+
assertMetricRunningSum("2", "my-custom-metric", 1, 2, cluster);
213+
214+
cluster.advanceClusterTime(20);
215+
assertMetricRunningSum("2", "my-custom-metric", 1, 6, cluster);
216+
217+
feeder.feed(List.of("b"), 2);
218+
feeder.feed(List.of("c"), 3);
219+
cluster.advanceClusterTime(5);
220+
assertMetricRunningSum("2", "my-custom-metric", 3, 7, cluster);
221+
}
222+
}
223+
}

0 commit comments

Comments
 (0)