From 2eea44a0a8fd4cde898225e8d9c19fac195df030 Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Fri, 31 Oct 2025 08:31:25 +0800 Subject: [PATCH 1/4] [fix][broker] Fix replicator producer can not share with other exclusive producers --- .../broker/service/AbstractReplicator.java | 2 ++ .../pulsar/broker/service/AbstractTopic.java | 19 +++++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index f996d328090ca..5fd5eb874ff55 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ProducerBuilderImpl; @@ -197,6 +198,7 @@ public void startProducer() { ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; builderImpl.getConf().setNonPartitionedTopicExpected(true); builderImpl.getConf().setReplProducer(true); + builderImpl.getConf().setAccessMode(ProducerAccessMode.Shared); return producerBuilder.createAsync().thenAccept(producer -> { setProducerAndTriggerReadEntries(producer); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 24bce1e39bb8b..a652b5d29ec5f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -803,7 +803,8 @@ protected CompletableFuture> incrementTopicEpochIfNeeded(Producer try { switch (producer.getAccessMode()) { case Shared: - if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) { + // replication producer must be shared mode + if ((!producer.isRemote()) && (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty())) { return FutureUtil.failedFuture( new ProducerBusyException( "Topic has an existing exclusive producer: " + exclusiveProducerName)); @@ -817,12 +818,12 @@ protected CompletableFuture> incrementTopicEpochIfNeeded(Producer return FutureUtil.failedFuture( new ProducerFencedException( "Topic has an existing exclusive producer: " + exclusiveProducerName)); - } else if (!producers.isEmpty()) { + } else if (hasAnyNonReplicatorProducer()) { return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing shared producers")); } return handleTopicEpochForExclusiveProducer(producer); case ExclusiveWithFencing: - if (hasExclusiveProducer || !producers.isEmpty()) { + if (hasExclusiveProducer || hasAnyNonReplicatorProducer()) { // clear all waiting producers // otherwise closing any producer will trigger the promotion // of the next pending producer @@ -836,13 +837,15 @@ protected CompletableFuture> incrementTopicEpochIfNeeded(Producer handle.getKey().close(true); }); producers.forEach((k, currentProducer) -> { - log.info("[{}] Fencing out producer {}", topic, currentProducer); - currentProducer.close(true); + if (!currentProducer.isRemote()) { + log.info("[{}] Fencing out producer {}", topic, currentProducer); + currentProducer.close(true); + } }); } return handleTopicEpochForExclusiveProducer(producer); case WaitForExclusive: { - if (hasExclusiveProducer || !producers.isEmpty()) { + if (hasExclusiveProducer || (hasAnyNonReplicatorProducer())) { CompletableFuture> future = new CompletableFuture<>(); log.info("[{}] Queuing producer {} since there's already a producer", topic, producer); waitingExclusiveProducers.add(Pair.of(producer, future)); @@ -1381,4 +1384,8 @@ public boolean isSystemCursor(String sub) { return COMPACTION_SUBSCRIPTION.equals(sub) || (additionalSystemCursorNames != null && additionalSystemCursorNames.contains(sub)); } + + private boolean hasAnyNonReplicatorProducer() { + return !producers.isEmpty() && !producers.values().stream().allMatch(Producer::isRemote); + } } From 51d41ec4082b9bd2953169a4d6c14496e23a07a6 Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Fri, 31 Oct 2025 13:26:31 +0800 Subject: [PATCH 2/4] add producer exclusive mode test --- .../broker/service/OneWayReplicatorTest.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index d7b8d71a5a489..e38f47ef5d470 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -63,6 +63,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.AllArgsConstructor; +import lombok.Cleanup; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -87,6 +88,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -1858,4 +1860,29 @@ public void testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished() thr // Verify: all inflight tasks are done. ensureNoBacklogByInflightTask(getReplicator(topicName)); } + + @DataProvider(name = "producerAccessMode") + public Object[][] producerAccessModeProvider() { + return new Object[][]{{ProducerAccessMode.Exclusive}, {ProducerAccessMode.ExclusiveWithFencing}, + {ProducerAccessMode.WaitForExclusive}}; + } + + @Test(dataProvider = "producerAccessMode") + public void testReplicatorProducerWithExclusiveAccessMode(ProducerAccessMode producerAccessMode) throws Exception { + final String topicName = + BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/exclusive-topic"); + final String subscribeName = "subscribe_1"; + final byte[] msgValue = "test".getBytes(); + + // cluster1 replicates exclusive-topic messages to cluster2, but cluster2 has an exclusive producer. + @Cleanup Producer producer1 = client1.newProducer().topic(topicName).create(); + @Cleanup Producer producer2 = + client2.newProducer().topic(topicName).accessMode(producerAccessMode).create(); + @Cleanup Consumer consumer2 = + client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe(); + producer1.newMessage().value(msgValue).send(); + pulsar1.getBrokerService().checkReplicationPolicies(); + assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue); + } + } From 3fa1e77cb713deb83aecf514f0cf7903c5e04a76 Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Fri, 31 Oct 2025 21:31:15 +0800 Subject: [PATCH 3/4] modify hasAnyNonReplicatorProducer --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index a652b5d29ec5f..4dfcab71ac748 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1386,6 +1386,6 @@ public boolean isSystemCursor(String sub) { } private boolean hasAnyNonReplicatorProducer() { - return !producers.isEmpty() && !producers.values().stream().allMatch(Producer::isRemote); + return !producers.isEmpty() && producers.values().stream().anyMatch(p -> !p.isRemote()); } } From 0a026cc592e19e3e79805de3b98f75384c45e5aa Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Sat, 22 Nov 2025 08:00:17 +0800 Subject: [PATCH 4/4] add testExclusiveProducerWithReplicatorProducer test --- .../broker/service/OneWayReplicatorTest.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index e38f47ef5d470..030725570642c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -1868,13 +1868,14 @@ public Object[][] producerAccessModeProvider() { } @Test(dataProvider = "producerAccessMode") - public void testReplicatorProducerWithExclusiveAccessMode(ProducerAccessMode producerAccessMode) throws Exception { + public void testReplicatorProducerWithExclusiveProducer(ProducerAccessMode producerAccessMode) throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/exclusive-topic"); final String subscribeName = "subscribe_1"; final byte[] msgValue = "test".getBytes(); - // cluster1 replicates exclusive-topic messages to cluster2, but cluster2 has an exclusive producer. + // cluster1 replicates topic messages to cluster2 first, so the replicator producer is added in cluster2 + // then add an exclusive producer in cluster2 @Cleanup Producer producer1 = client1.newProducer().topic(topicName).create(); @Cleanup Producer producer2 = client2.newProducer().topic(topicName).accessMode(producerAccessMode).create(); @@ -1885,4 +1886,22 @@ public void testReplicatorProducerWithExclusiveAccessMode(ProducerAccessMode pro assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue); } + @Test(dataProvider = "producerAccessMode") + public void testExclusiveProducerWithReplicatorProducer(ProducerAccessMode producerAccessMode) throws Exception { + final String topicName = + BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/exclusive-topic"); + final String subscribeName = "subscribe_1"; + final byte[] msgValue = "test".getBytes(); + + // cluster2 has an exclusive producer first, then cluster1 replicates topic messages to cluster2 + @Cleanup Producer producer2 = + client2.newProducer().topic(topicName).accessMode(producerAccessMode).create(); + @Cleanup Producer producer1 = client1.newProducer().topic(topicName).create(); + @Cleanup Consumer consumer2 = + client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe(); + producer1.newMessage().value(msgValue).send(); + pulsar1.getBrokerService().checkReplicationPolicies(); + assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue); + } + }