Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
Expand Down Expand Up @@ -197,6 +198,7 @@ public void startProducer() {
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
builderImpl.getConf().setNonPartitionedTopicExpected(true);
builderImpl.getConf().setReplProducer(true);
builderImpl.getConf().setAccessMode(ProducerAccessMode.Shared);
Comment thread
oneby-wang marked this conversation as resolved.
return producerBuilder.createAsync().thenAccept(producer -> {
setProducerAndTriggerReadEntries(producer);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,8 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
try {
switch (producer.getAccessMode()) {
case Shared:
if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
// replication producer must be shared mode
if ((!producer.isRemote()) && (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty())) {
return FutureUtil.failedFuture(
new ProducerBusyException(
"Topic has an existing exclusive producer: " + exclusiveProducerName));
Expand All @@ -817,12 +818,12 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
return FutureUtil.failedFuture(
new ProducerFencedException(
"Topic has an existing exclusive producer: " + exclusiveProducerName));
} else if (!producers.isEmpty()) {
} else if (hasAnyNonReplicatorProducer()) {
return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing shared producers"));
}
return handleTopicEpochForExclusiveProducer(producer);
case ExclusiveWithFencing:
if (hasExclusiveProducer || !producers.isEmpty()) {
if (hasExclusiveProducer || hasAnyNonReplicatorProducer()) {
// clear all waiting producers
// otherwise closing any producer will trigger the promotion
// of the next pending producer
Expand All @@ -836,13 +837,15 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
handle.getKey().close(true);
});
producers.forEach((k, currentProducer) -> {
log.info("[{}] Fencing out producer {}", topic, currentProducer);
currentProducer.close(true);
if (!currentProducer.isRemote()) {
log.info("[{}] Fencing out producer {}", topic, currentProducer);
currentProducer.close(true);
}
});
}
return handleTopicEpochForExclusiveProducer(producer);
case WaitForExclusive: {
if (hasExclusiveProducer || !producers.isEmpty()) {
if (hasExclusiveProducer || (hasAnyNonReplicatorProducer())) {
CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
log.info("[{}] Queuing producer {} since there's already a producer", topic, producer);
waitingExclusiveProducers.add(Pair.of(producer, future));
Expand Down Expand Up @@ -1381,4 +1384,8 @@ public boolean isSystemCursor(String sub) {
return COMPACTION_SUBSCRIPTION.equals(sub)
|| (additionalSystemCursorNames != null && additionalSystemCursorNames.contains(sub));
}

private boolean hasAnyNonReplicatorProducer() {
return !producers.isEmpty() && producers.values().stream().anyMatch(p -> !p.isRemote());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -87,6 +88,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -1858,4 +1860,48 @@ public void testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished() thr
// Verify: all inflight tasks are done.
ensureNoBacklogByInflightTask(getReplicator(topicName));
}

@DataProvider(name = "producerAccessMode")
public Object[][] producerAccessModeProvider() {
return new Object[][]{{ProducerAccessMode.Exclusive}, {ProducerAccessMode.ExclusiveWithFencing},
{ProducerAccessMode.WaitForExclusive}};
}

@Test(dataProvider = "producerAccessMode")
public void testReplicatorProducerWithExclusiveProducer(ProducerAccessMode producerAccessMode) throws Exception {
final String topicName =
BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/exclusive-topic");
final String subscribeName = "subscribe_1";
final byte[] msgValue = "test".getBytes();

// cluster1 replicates topic messages to cluster2 first, so the replicator producer is added in cluster2
// then add an exclusive producer in cluster2
@Cleanup Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
@Cleanup Producer<byte[]> producer2 =
client2.newProducer().topic(topicName).accessMode(producerAccessMode).create();
@Cleanup Consumer<byte[]> consumer2 =
client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe();
producer1.newMessage().value(msgValue).send();
pulsar1.getBrokerService().checkReplicationPolicies();
assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue);
}

@Test(dataProvider = "producerAccessMode")
public void testExclusiveProducerWithReplicatorProducer(ProducerAccessMode producerAccessMode) throws Exception {
final String topicName =
BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/exclusive-topic");
final String subscribeName = "subscribe_1";
final byte[] msgValue = "test".getBytes();

// cluster2 has an exclusive producer first, then cluster1 replicates topic messages to cluster2
@Cleanup Producer<byte[]> producer2 =
client2.newProducer().topic(topicName).accessMode(producerAccessMode).create();
@Cleanup Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
@Cleanup Consumer<byte[]> consumer2 =
client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe();
producer1.newMessage().value(msgValue).send();
pulsar1.getBrokerService().checkReplicationPolicies();
assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue);
}

}