Description
There's a bug in _consumer.js where the #clearCacheAndResetPositions method seeks to the wrong offset, causing already-processed messages to be reprocessed.
Bug Location
File: _consumer.js
Method: #clearCacheAndResetPositions
The comment on the method clearly states the intended behavior:
/* Seek to stored offset for each topic partition. It's possible that we've
 * consumed messages upto N from the internalClient, but the user has stale'd the cache
 * after consuming just k (< N) messages. We seek back to last consumed offset + 1. */
However, the actual implementation does not add +1 to the offset:
const lastConsumedOffsets = this.#lastConsumedOffsets.get(key);
const topicPartitionOffsets = [
  {
    topic: topicPartition.topic,
    partition: topicPartition.partition,
    offset: lastConsumedOffsets.offset, // Bug: should be lastConsumedOffsets.offset + 1
    leaderEpoch: lastConsumedOffsets.leaderEpoch,
  }
];
seeks.push(this.#seekInternal(topicPartitionOffsets));
Expected Behavior
When cache expiration triggers #clearCacheAndResetPositions, the consumer should seek to lastConsumedOffset + 1 to avoid reprocessing the last successfully processed message.
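A minimal sketch of the expected offset computation, written as a standalone function rather than a patch to the library. The helper name nextSeekTarget and the sample topic and offsets are hypothetical, and the sketch assumes the stored offset is a plain number; if it is kept as a string or BigInt, it would need converting before the addition.

```javascript
// Hypothetical helper, not part of the library: builds the seek target for
// one partition from the last offset the user actually consumed.
function nextSeekTarget(topicPartition, lastConsumed) {
  return {
    topic: topicPartition.topic,
    partition: topicPartition.partition,
    // Resume at the message after the last one that was processed.
    // Assumes a numeric offset; "41" + 1 would yield the string "411".
    offset: lastConsumed.offset + 1,
    leaderEpoch: lastConsumed.leaderEpoch,
  };
}

// Made-up sample values for illustration only.
const target = nextSeekTarget(
  { topic: 'orders', partition: 0 },
  { offset: 41, leaderEpoch: 3 }
);
console.log(target.offset); // 42
```

Keeping the + 1 in one place makes the off-by-one easy to cover with a unit test.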
Actual Behavior
The consumer seeks to lastConsumedOffset, causing the last successfully processed message to be consumed and processed again.
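The effect can be illustrated with a toy model in which the partition log is an array and an offset is an array index. This is only a sketch of the off-by-one, not the consumer's real code path; log, consumeFrom, and the sample messages are invented for the example.

```javascript
// Toy model: the partition log is an array, and an offset is an index into it.
const log = ['m0', 'm1', 'm2', 'm3', 'm4'];

// Returns the messages that would be delivered after seeking to seekOffset.
function consumeFrom(seekOffset) {
  return log.slice(seekOffset);
}

// 'm2' (offset 2) was the last message the user finished processing.
const lastConsumedOffset = 2;

// Seeking to the last consumed offset delivers 'm2' a second time.
const buggy = consumeFrom(lastConsumedOffset);

// Seeking to last consumed offset + 1 resumes at the first unprocessed message.
const fixed = consumeFrom(lastConsumedOffset + 1);

console.log(buggy[0]); // m2
console.log(fixed[0]); // m3
```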