diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index d698a568d9f4a..1e1badf1a60d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1856,6 +1856,9 @@ protected PulsarAdminBuilder getCreateAdminClientBuilder() + ", webServiceAddress: " + webServiceAddress); } PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl); + // most of the admin request requires to make zk-call so, keep the max read-timeout based on + // zk-operation timeout. Put it before loading brokerClient_ prefix config, so user can override it + builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); // Apply all arbitrary configuration. This must be called before setting any fields annotated as // @Secret on the ClientConfigurationData object because of the way they are serialized. @@ -1887,9 +1890,6 @@ protected PulsarAdminBuilder getCreateAdminClientBuilder() .enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled()); } - // most of the admin request requires to make zk-call so, keep the max read-timeout based on - // zk-operation timeout - builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); return builder; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e4b2f33502378..0d964b02a5fb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1653,6 +1653,10 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c PulsarAdminBuilder builder = PulsarAdmin.builder(); ServiceConfiguration conf = pulsar.getConfig(); + // most of the admin request requires to make zk-call so, keep the max read-timeout based on + // zk-operation timeout. Put it before loading brokerClient_ prefix config, so user can override it + builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + // Apply all arbitrary configuration. This must be called before setting any fields annotated as // @Secret on the ClientConfigurationData object because of the way they are serialized. // See https://github.com/apache/pulsar/issues/8509 for more information. @@ -1708,10 +1712,6 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c ); } - // most of the admin request requires to make zk-call so, keep the max read-timeout based on - // zk-operation timeout - builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); - PulsarAdmin adminClient = builder.build(); log.info("created admin with url {} ", adminApiUrl); return adminClient; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index acb53fede2e64..6a85a681b603c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.net.URL; import java.util.Map; -import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; @@ -148,8 +147,6 @@ public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigDa ClientBuilder clientBuilder = ClientBuilder.newBuilder() .withConfig(httpConfig) - .connectTimeout(this.clientConfigData.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS) - .readTimeout(this.clientConfigData.getReadTimeoutMs(), TimeUnit.MILLISECONDS) .register(JacksonConfigurator.class).register(JacksonFeature.class); boolean useTls = clientConfigData.getServiceUrl().startsWith("https://"); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java index 4bd361d803f59..6a76daae3b8a6 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -53,7 +53,6 @@ import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import javax.ws.rs.ProcessingException; -import javax.ws.rs.client.Client; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response.Status; import lombok.Data; @@ -63,7 +62,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; import org.apache.pulsar.PulsarVersion; -import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; @@ -89,7 +87,6 @@ import org.asynchttpclient.SslEngineFactory; import org.asynchttpclient.channel.DefaultKeepAliveStrategy; import org.asynchttpclient.uri.Uri; -import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.client.ClientRequest; import org.glassfish.jersey.client.ClientResponse; import org.glassfish.jersey.client.spi.AsyncConnectorCallback; @@ -122,13 +119,10 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { @Setter private boolean followRedirects = true; - public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds, + public AsyncHttpConnector(ClientConfigurationData conf, int autoCertRefreshTimeSeconds, boolean acceptGzipCompression) { - this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT), - (int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT), - PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000, - autoCertRefreshTimeSeconds, - conf, acceptGzipCompression, null); + this(conf.getConnectionTimeoutMs(), conf.getReadTimeoutMs(), conf.getRequestTimeoutMs(), + autoCertRefreshTimeSeconds, conf, acceptGzipCompression, null); } @SneakyThrows @@ -220,7 +214,6 @@ private void configureAsyncHttpClientConfig(ClientConfigurationData conf, int co confBuilder.setCookieStore(null); confBuilder.setUseProxyProperties(true); confBuilder.setFollowRedirect(false); - confBuilder.setRequestTimeout(conf.getRequestTimeoutMs()); confBuilder.setConnectTimeout(connectTimeoutMs); confBuilder.setReadTimeout(readTimeoutMs); confBuilder.setUserAgent(String.format("Pulsar-Java-v%s%s", diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java index 2f1e8f798a6c6..ee28322bfc112 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java @@ -47,7 +47,7 @@ public AsyncHttpConnectorProvider(ClientConfigurationData conf, int autoCertRefr @Override public Connector getConnector(Client client, Configuration runtimeConfig) { if (connector == null) { - connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds, acceptGzipCompression); + connector = new AsyncHttpConnector(conf, autoCertRefreshTimeSeconds, acceptGzipCompression); connector.setFollowRedirects(followRedirects); } return connector; diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java index 27c6fd96079f5..8b87435499417 100644 --- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java @@ -18,12 +18,27 @@ */ package org.apache.pulsar.client.admin.internal; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.ok; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.stubbing.Scenario; +import java.util.List; +import lombok.Cleanup; import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; /** @@ -31,6 +46,27 @@ */ public class PulsarAdminImplTest { + WireMockServer server; + + @BeforeClass(alwaysRun = true) + void beforeClass() { + server = new WireMockServer(WireMockConfiguration.wireMockConfig() + .port(0)); + server.start(); + } + + @AfterClass(alwaysRun = true) + void afterClass() { + if (server != null) { + server.stop(); + } + } + + @BeforeMethod + public void beforeEach() { + server.resetAll(); + } + @Test public void testAuthDisabledWhenAuthNotSpecifiedAnywhere() { assertThat(createAdminAndGetAuth(new ClientConfigurationData())) @@ -51,4 +87,166 @@ private Authentication createAdminAndGetAuth(ClientConfigurationData conf) { return admin.auth; } } + + @Test + @SneakyThrows + public void testPulsarAdminAsyncHttpConnectorSuccessWithoutRetry() { + int readTimeoutMs = 5000; + int requestTimeoutMs = 5000; + int serverDelayMs = 3000; + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-success-without-retry") + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo("success") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + conf.setConnectionTimeoutMs(2000); + conf.setReadTimeoutMs(readTimeoutMs); + conf.setRequestTimeoutMs(requestTimeoutMs); + + @Cleanup + PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null); + List clusters = admin.clusters().getClusters(); + assertThat(clusters).containsOnly("test-cluster"); + + server.verify(1, getRequestedFor(urlEqualTo("/admin/v2/clusters"))); + String scenarioState = server.getAllScenarios().getScenarios().stream() + .filter(scenario -> "read-success-without-retry".equals(scenario.getName())).findFirst().get() + .getState(); + assertEquals(scenarioState, "success"); + } + + @Test + @SneakyThrows + public void testPulsarAdminAsyncHttpConnectorTimeoutWithoutRetry() { + int readTimeoutMs = 5000; + int requestTimeoutMs = 5000; + int serverDelayMs = 8000; + + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-timeout-without-retry") + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo("end") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + conf.setConnectionTimeoutMs(2000); + conf.setReadTimeoutMs(readTimeoutMs); + conf.setRequestTimeoutMs(requestTimeoutMs); + + @Cleanup + PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null); + Assert.expectThrows(PulsarAdminException.TimeoutException.class, () -> { + admin.clusters().getClusters(); + }); + + server.verify(1, getRequestedFor(urlEqualTo("/admin/v2/clusters"))); + String scenarioState = server.getAllScenarios().getScenarios().stream() + .filter(scenario -> "read-timeout-without-retry".equals(scenario.getName())).findFirst().get() + .getState(); + assertEquals(scenarioState, "end"); + } + + @Test + @SneakyThrows + public void testPulsarAdminAsyncHttpConnectorSuccessWithRetry() { + int readTimeoutMs = 5000; + int requestTimeoutMs = 10000; + int serverDelayMs = 7000; + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-success-with-retry") + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo("first-call") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-success-with-retry") + .whenScenarioStateIs("first-call") + .willSetStateTo("success") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]"))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + conf.setConnectionTimeoutMs(2000); + conf.setReadTimeoutMs(readTimeoutMs); + conf.setRequestTimeoutMs(requestTimeoutMs); + + @Cleanup + PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null); + List clusters = admin.clusters().getClusters(); + assertThat(clusters).containsOnly("test-cluster"); + + server.verify(2, getRequestedFor(urlEqualTo("/admin/v2/clusters"))); + String scenarioState = server.getAllScenarios().getScenarios().stream() + .filter(scenario -> "read-success-with-retry".equals(scenario.getName())).findFirst().get() + .getState(); + assertEquals(scenarioState, "success"); + } + + @Test + @SneakyThrows + public void testPulsarAdminAsyncHttpConnectorTimeoutWithRetry() { + int readTimeoutMs = 5000; + int requestTimeoutMs = 14000; + int serverDelayMs = 6000; + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-timeout-with-retry") + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo("first-call") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-timeout-with-retry") + .whenScenarioStateIs("first-call") + .willSetStateTo("second-call") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-timeout-with-retry") + .whenScenarioStateIs("second-call") + .willSetStateTo("end") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + conf.setConnectionTimeoutMs(2000); + conf.setReadTimeoutMs(readTimeoutMs); + conf.setRequestTimeoutMs(requestTimeoutMs); + + @Cleanup + PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null); + Assert.expectThrows(PulsarAdminException.TimeoutException.class, () -> { + admin.clusters().getClusters(); + }); + + server.verify(3, getRequestedFor(urlEqualTo("/admin/v2/clusters"))); + String scenarioState = server.getAllScenarios().getScenarios().stream() + .filter(scenario -> "read-timeout-with-retry".equals(scenario.getName())).findFirst().get() + .getState(); + assertEquals(scenarioState, "end"); + } + }