From e9bd61399a4b8bf47feac3b65d0e32e93d12fd77 Mon Sep 17 00:00:00 2001 From: gavingaozhangmin Date: Wed, 1 Dec 2021 10:34:25 +0800 Subject: [PATCH 1/2] add newSqlClientConf method for DynamicTableSink --- .../pulsar/internal/PulsarClientUtils.java | 28 +++++++++++++++++++ .../pulsar/table/PulsarDynamicTableSink.java | 2 +- .../table/PulsarDynamicTableSource.java | 2 +- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java index e5ef180f..6f2d5aad 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java @@ -18,15 +18,20 @@ package org.apache.flink.streaming.connectors.pulsar.internal; +import org.apache.flink.shaded.curator4.com.google.common.collect.Maps; + import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; +import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; /** Utility to create Pulsar Admin Client from adminUrl and clientConfigurationData. */ public class PulsarClientUtils { @@ -65,4 +70,27 @@ public static ClientConfigurationData newClientConf(String serviceUrl, Propertie } return clientConf; } + + public static ClientConfigurationData newSqlClientConf(String serviceUrl, + Properties properties) { + Map clientConfData = getClientParams(Maps.fromProperties(properties)); + ClientConfigurationData clientConf = new ClientConfigurationData(); + clientConf = + ConfigurationDataUtils.loadData( + clientConfData, clientConf, ClientConfigurationData.class); + clientConf.setServiceUrl(serviceUrl); + return clientConf; + } + + public static Map getClientParams(Map parameters) { + return parameters.keySet().stream() + .filter(k -> k.startsWith(PulsarOptions.PULSAR_CLIENT_OPTION_KEY_PREFIX)) + .collect( + Collectors.toMap( + k -> + k.substring( + PulsarOptions.PULSAR_CLIENT_OPTION_KEY_PREFIX + .length()), + k -> parameters.get(k))); + } } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSink.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSink.java index 99d69cf5..6feca79d 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSink.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSink.java @@ -237,7 +237,7 @@ private SinkFunction createPulsarSink( Properties properties, PulsarSerializationSchema pulsarSerializer) { final ClientConfigurationData configurationData = - PulsarClientUtils.newClientConf(serviceUrl, properties); + PulsarClientUtils.newSqlClientConf(serviceUrl, properties); return new FlinkPulsarSink( adminUrl, Optional.ofNullable(topic), diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSource.java index 528d13f0..90301bd5 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSource.java @@ -217,7 +217,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { createPulsarDeserialization( keyDeserialization, valueDeserialization, producedTypeInfo); final ClientConfigurationData clientConfigurationData = - PulsarClientUtils.newClientConf(serviceUrl, properties); + PulsarClientUtils.newSqlClientConf(serviceUrl, properties); FlinkPulsarSource source = new FlinkPulsarSource<>( adminUrl, clientConfigurationData, deserializationSchema, properties); From 86ef42445dd1921708bb2b171f23d99132fbc796 Mon Sep 17 00:00:00 2001 From: gavingaozhangmin Date: Wed, 1 Dec 2021 10:39:01 +0800 Subject: [PATCH 2/2] fix unit error --- .../connectors/pulsar/internal/PulsarClientUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java index 6f2d5aad..34807601 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java @@ -71,8 +71,8 @@ public static ClientConfigurationData newClientConf(String serviceUrl, Propertie return clientConf; } - public static ClientConfigurationData newSqlClientConf(String serviceUrl, - Properties properties) { + public static ClientConfigurationData newSqlClientConf( + String serviceUrl, Properties properties) { Map clientConfData = getClientParams(Maps.fromProperties(properties)); ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf =