Add Avro payload decoding support to ingestion-kafka plugin #22364
Tarun-kishore wants to merge 5 commits
Adds an AvroPayloadDecoder that converts Avro-binary Kafka messages to
JSON bytes before they reach the indexing pipeline. Configuration lives
in the existing ingestion_source param map under avro.* keys:
avro.schema – inline Avro JSON schema
avro.schema_registry_url – URL to fetch the schema from at startup
avro.schema_registry_headers.<name> – HTTP headers for that request
avro.schema_registry_connect_timeout_ms – connect timeout (default 10s)
avro.schema_registry_request_timeout_ms – request timeout (default 10s)
avro.skip_bytes – header bytes to strip before decoding (e.g. 5 for Confluent wire format)
avro.wrapper_schema / avro.wrapper_field – optional envelope schema with a field substituted at startup by the inner schema
avro.msg_field – record field to use as the document source
If no avro.* params are present the existing passthrough path is used
unchanged, so there is no behaviour change for non-Avro topics.
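To make the avro.skip_bytes option concrete, here is a minimal sketch of header stripping. It assumes the Confluent wire format layout of one magic byte followed by a 4-byte schema ID; the class name SkipBytes and the choice to reject payloads shorter than the header are assumptions for illustration, not the PR's code.

```java
import java.util.Arrays;

// Hypothetical helper illustrating avro.skip_bytes; not taken from the PR.
final class SkipBytes {
    /**
     * Returns the payload with the first skipBytes bytes removed.
     * Returns null when nothing remains after the header (tombstone).
     * Assumption: a payload shorter than the header is a config/data error.
     */
    static byte[] strip(byte[] raw, int skipBytes) {
        if (skipBytes < 0) {
            throw new IllegalArgumentException("skipBytes must be >= 0");
        }
        if (raw == null || raw.length == skipBytes) {
            return null;
        }
        if (raw.length < skipBytes) {
            throw new IllegalArgumentException("payload shorter than skipBytes");
        }
        return Arrays.copyOfRange(raw, skipBytes, raw.length);
    }
}
```

With skipBytes set to 5, a Confluent-framed message loses its magic byte and schema ID, and only the Avro body is handed to the decoder.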
Because Avro requires Jackson to parse schemas, and OpenSearch plugin
classloaders cannot reach the server's lib/ JARs, both Avro and Jackson
are embedded in the plugin JAR via the Gradle shadow plugin and relocated
to a private package (org.opensearch.plugin.kafka.shaded.*). This keeps
the plugin self-contained and avoids the OpenSearch jar-hell check that
fires when the same fully-qualified class name appears in two JARs.
Signed-off-by: Tarun Kishore <Tarun-kishore@users.noreply.github.com>
PR Code Analyzer ❗AI-powered 'Code-Diff-Analyzer' found issues on commit be0ecc4.
'testcontainers': '1.19.7',
'ducttape': '1.0.8',
'snappy': '1.1.10.7',
'avro': '1.12.0',
https://github.com/opensearch-project/OpenSearch/blob/main/DEVELOPER_GUIDE.md#adding-dependencies
We have to copy the license and library SHAs when we add a new dependency. I see avro licenses added here for reference. Let us do this for all new dependencies.
Added licenses for dependencies
    .timeout(Duration.ofMillis(requestTimeoutMs))
    .GET();
headers.forEach(builder::header);
HttpResponse<String> response = client.send(builder.build(), HttpResponse.BodyHandlers.ofString());
Just curious: have we tested that this connection works? What is the expectation on failures, do we retry?
We have tested the connection, and the node was able to connect to the schema registry. We do not retry on failure, since the error can also be caused by bad configuration. Should we retry in case of failure?
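For reference, a minimal sketch of how the registry request discussed here could be assembled with java.net.http, using separate connect and request timeouts and caller-supplied headers. The class and method names are hypothetical, the example URL is only a placeholder, and no retry is attempted, in line with the reply above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;
import java.util.Map;

// Hypothetical sketch; the PR's AvroPayloadDecoder may structure this differently.
final class RegistryRequestSketch {
    /** Client-level connect timeout (avro.schema_registry_connect_timeout_ms). */
    static HttpClient newClient(long connectTimeoutMs) {
        return HttpClient.newBuilder()
            .connectTimeout(Duration.ofMillis(connectTimeoutMs))
            .build();
    }

    /** Per-request timeout and headers (avro.schema_registry_headers.<name>). */
    static HttpRequest newRequest(String url, Map<String, String> headers, long requestTimeoutMs) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(url.trim()))
            .timeout(Duration.ofMillis(requestTimeoutMs))
            .GET();
        headers.forEach(builder::header);
        return builder.build();
    }
}
```

A caller would pass the result to client.send(request, HttpResponse.BodyHandlers.ofString()) and treat any exception or non-2xx status as a startup failure.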
  lastFetchedOffset = currentOffset;
  KafkaOffset kafkaOffset = new KafkaOffset(currentOffset);
- KafkaMessage message = new KafkaMessage(messageAndOffset.key(), messageAndOffset.value(), messageAndOffset.timestamp());
+ KafkaMessage message = new KafkaMessage(messageAndOffset.key(), payloadDecoder.decode(messageAndOffset.value()), messageAndOffset.timestamp());
If payloadDecoder.decode throws an exception, we skip the remaining messages in the batch. On the next poller loop, it looks like we will start from the next batch, resulting in skipped messages. Can we double check this?
Updated to use a per-message try-catch, so only the bad message is skipped.
this.config = config;
this.partitionId = partitionId;
Map<String, Object> avroParams = config.getAvroParams();
this.payloadDecoder = avroParams.isEmpty() ? KafkaPayloadDecoder.PASSTHROUGH : new AvroPayloadDecoder(avroParams);
If AvroPayloadDecoder fails to initialize, we can leak Kafka connections. There was a recent fix to handle this for initialize failures (ref), but failures here in the constructor are not handled. We will have to move this to the initialize method or think of a better way outside the constructor.
Moved the logic to the initialize method, which closes the consumer on failure.
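The close-on-failure behaviour described in this thread can be sketched as follows. The Consumer interface and createInitialized method are hypothetical stand-ins, not the actual KafkaConsumerFactory or KafkaPartitionConsumer code; the point is only that a failing initialize() releases the resource before the error propagates.

```java
// Hypothetical types for illustration; not the PR's classes.
final class InitOrCloseSketch {
    interface Consumer extends AutoCloseable {
        void initialize() throws Exception;

        @Override
        void close();
    }

    /** Initializes the consumer, closing it if initialization fails. */
    static Consumer createInitialized(Consumer consumer) throws Exception {
        try {
            consumer.initialize(); // e.g. schema fetch, decoder creation
            return consumer;
        } catch (Exception e) {
            try {
                consumer.close(); // avoid leaking the underlying connection
            } catch (RuntimeException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }
}
```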
 * changes to the ingestion pipeline are required.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class AvroIngestionFromKafkaIT extends KafkaIngestionBaseIT {
Let us add some tests on schema evolution and error handling (invalid message, null fields). Let's focus more on schema evolution, testing the different supported formats (including the Confluent-style format) with field addition cases. Along with this, we can also add a test to ensure ingestion works when the Kafka topic has messages with two different schemas (some without the newly added fields, a few with them) to test backward compatibility.
    return value;
}

private static byte[] toJsonBytes(Map<String, Object> map) {
We could reuse XContentBuilder, something like
private static byte[] toJsonBytes(Map<String, Object> map) {
    try {
        return BytesReference.toBytes(BytesReference.bytes(XContentFactory.jsonBuilder().map(map)));
    } catch (IOException e) {
        throw new IllegalArgumentException("Failed to serialize decoded Avro record as JSON", e);
    }
}
We also do a similar mapping here.
We can explore if this cleans up the code, combined with making convertAvroValue schema-aware and converting the Avro types to plain Java types.
Updated to use XContentFactory. Will explore the change to convert Avro types to Java types.
}
// Fall back to Confluent-style {"schema":"<escaped-json>"} wrapper
logger.debug("Direct parse failed ({}), trying Confluent schema-field extraction", e.getMessage());
int keyIdx = body.indexOf("\"schema\"");
Is it possible to use a JSONParser or XContentParser here instead of doing it manually?
Yes, updated to use XContentParser
    logger.debug("Extracted msg_field=[{}] with {} fields", msgField, map.size());
}

byte[] jsonBytes = toJsonBytes(map);
It looks like we are converting the avro record into a Map, then back to json bytes here, and later down in the pipeline again back to a Map. This might need changes downstream as well, but wonder if we can avoid the serialize/parse back and forth? We can add a note if not easy to implement as a TODO.
Added a TODO comment; we can take up this optimization in the future.
have we done any benchmarks on this avro decoder?
my gut feeling/concern is that this is going to be so slow it won't actually be usable in any real world application
    }
}

static Schema substituteInnerSchema(Schema wrapper, Schema inner, String wrapperField) {
Wonder where this will be useful? I was thinking the avro payload will follow the exact format required by the PBI flow, with same field name requirements. Later the message mappers will be able to do the transformations?
Some Kafka topics use an outer envelope record that carries routing/operational metadata alongside the actual document payload. An envelope may be like:
{
"type": "record",
"name": "kafka_envelope",
"fields": [
{ "name": "ts", "type": "double" },
{ "name": "host", "type": "string" },
{ "name": "schema_id", "type": ["null", "int"] },
{ "name": "msg", "type": "null" }
]
}
The msg field is a placeholder: its actual schema is the document schema, which may be owned and versioned separately (e.g. registered independently in the schema registry). avro.wrapper_schema + avro.wrapper_field lets the user configure the envelope and the inner document schema independently. At index creation time they are merged by substituting the inner schema into the field named by avro.wrapper_field. After decoding, avro.msg_field extracts just the inner record as the document source, discarding envelope metadata fields such as ts and host.
Without this, the user would have to maintain one monolithic schema that hardcodes the full inner document definition inside the envelope, which breaks when the inner schema evolves independently under its own registry subject.
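The substitution step can be illustrated with a simplified sketch that works on schemas parsed into plain Java maps. This is an assumption made to keep the example dependency-free: the PR's substituteInnerSchema operates on org.apache.avro.Schema objects, and the class and method below are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, map-based illustration; the real code works on Avro Schema objects.
final class WrapperSubstitutionSketch {
    /** Returns a copy of the wrapper schema whose wrapperField has the inner schema as its type. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> substitute(Map<String, Object> wrapper, Object innerSchema, String wrapperField) {
        List<Map<String, Object>> fields = (List<Map<String, Object>>) wrapper.get("fields");
        List<Map<String, Object>> merged = new ArrayList<>();
        boolean found = false;
        for (Map<String, Object> field : fields) {
            // Copy every attribute (doc, aliases, order) and keep field position.
            Map<String, Object> copy = new LinkedHashMap<>(field);
            if (wrapperField.equals(field.get("name"))) {
                copy.put("type", innerSchema);
                found = true;
            }
            merged.add(copy);
        }
        if (!found) {
            throw new IllegalArgumentException("wrapper field [" + wrapperField + "] not found");
        }
        Map<String, Object> result = new LinkedHashMap<>(wrapper);
        result.put("fields", merged);
        return result;
    }
}
```

Applied to the envelope above with wrapperField set to msg, the placeholder "null" type is replaced by the document schema while ts, host and schema_id are left untouched.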
- Move AvroPayloadDecoder initialization from constructor to initialize() so that failures (e.g. schema registry unreachable) are caught by KafkaConsumerFactory, which closes the Kafka consumer to avoid leaks
- Catch decode exceptions per message in fetch() with error logging and skip, instead of propagating out of the loop. Without this, a single undecodeable message would cause an infinite retry loop on the same batch offset
- Replace hand-rolled toJsonBytes/toJsonString/escapeJson with XContentFactory.jsonBuilder().map() to reuse existing OpenSearch infrastructure and reduce code surface
- Replace manual Confluent-style schema body parsing with XContentParser; rename parseSchemaFromBody to extractSchemaJson and return String so the method can be called from tests without crossing the shadow-JAR relocation boundary
- Add TODO noting the Avro->Map->JSON->Map roundtrip that exists because the ingestion pipeline re-parses JSON bytes in MessageProcessorRunnable
- Add license SHA1 files for avro, jackson-core, jackson-databind, and jackson-annotations (new dependencies introduced by this change)
- Add IT tests for: corrupt message skipped without stopping ingestion, schema evolution with an optional field across a batch
Signed-off-by: Tarun Kishore <Tarun-kishore@users.noreply.github.com>
try {
    value = payloadDecoder.decode(messageAndOffset.value());
} catch (Exception e) {
    logger.error("Failed to decode message at offset {}, skipping: {}", currentOffset, e.getMessage(), e);
Skipping failed messages here does not look correct, as this decision is left to the poller and processor depending on the user-configured DROP/BLOCK strategies. I'm thinking one way to handle this could be to always return the KafkaMessage (marked as a failure if there are decoding errors). Later, when the payload is read from KafkaMessage, we can throw an exception. Alternatively, we could lazily decode the payload when KafkaMessage.getPayload() is called (with caching). This, however, needs more thought and has to handle thread-visibility requirements. Both these approaches should follow the existing DROP/BLOCK strategies.
Please take a look and we can discuss if there are any challenges with this approach or if it doesn't make sense.
i'm also not sure it's a good idea to log errors in the hot loop here. if 1 message errors, chances are all the messages will error, this will probably blow up your system just from the logging
Updated to move the decoding into KafkaMessage.getPayload(), and logging for a failed message now behaves the same as for other failures in pull-based ingestion. As for thread visibility, GenericDatumReader is not thread-safe, so I've removed the cached reader field from AvroPayloadDecoder; a reader is created per decode() call instead.
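A minimal sketch of the lazy-decode idea from this thread: the raw bytes are stored, decoding happens on the first getPayload() call, and a decode failure surfaces to the caller (where a DROP/BLOCK strategy could apply). The class is hypothetical, and the volatile cache is an assumption about how thread visibility might be handled; the PR's KafkaMessage may differ.

```java
import java.util.function.Function;

// Hypothetical message wrapper; not the PR's KafkaMessage.
final class LazyPayloadSketch {
    private final byte[] raw;
    private final Function<byte[], byte[]> decoder;
    // volatile so a result cached by one thread is visible to others
    private volatile byte[] decoded;

    LazyPayloadSketch(byte[] raw, Function<byte[], byte[]> decoder) {
        this.raw = raw;
        this.decoder = decoder;
    }

    /** Decodes on first access; later calls return the cached bytes. */
    byte[] getPayload() {
        byte[] result = decoded;
        if (result == null) {
            result = decoder.apply(raw); // may throw; nothing is cached on failure
            decoded = result;
        }
        return result;
    }
}
```

Two threads racing on the first call may both decode, which is harmless as long as decoding is deterministic. A null result (tombstone) is not cached in this sketch and would be decoded again on each call.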
 * <p>The default no-op implementation is {@link #PASSTHROUGH}, which returns the raw bytes unchanged.
 */
@FunctionalInterface
public interface KafkaPayloadDecoder {
does this need to be limited to kafka?
what if someone wants to use this for kinesis?
Moved the decoder to the poller. If we want to extend a similar approach to Kinesis, PayloadDecoder can now be extended.
- GenericEnumSymbol not handled: enum fields fell through to return value, which Jackson cannot serialize (relocated type). Now calls toString().
- Top-level schema validated as RECORD in constructor. Non-record schemas (enum, array, etc.) previously caused ClassCastException at decode time.
- avro.msg_field validated against schema at construction. A wrong field name previously caused map.get() to return null, silently treating every message as a tombstone and dropping all data. A null value in a valid nullable union field now throws IllegalArgumentException rather than returning null, which previously propagated a null payload into KafkaMessage and caused NullPointerException in the downstream mapper.
- substituteInnerSchema now preserves field order and aliases on the substituted field (previously only doc was copied).
- avro.wrapper_schema now handles arriving as a Map (same as avro.schema), rather than calling Map.toString() and passing invalid JSON to Schema.Parser.
- avro.skip_bytes now accepts Number values (e.g. Integer, Double) in addition to String, avoiding NumberFormatException on Double inputs.
- ByteBuffer.duplicate() now calls rewind() to reset position to 0, ensuring base64 encoding starts from the correct offset.
- NaN and Infinity in float/double fields are replaced with null and logged as a warning, rather than causing JsonGenerationException.
- Schema registry URL is trimmed before URI.create() to prevent IllegalArgumentException on leading/trailing whitespace.
- connect/request timeout values are validated positive via parsePositiveMs, with consistent Number-aware parsing and a clear error message.
- Empty payload after skip_bytes is now returned as null (tombstone) rather than thrown as an exception, matching the documented contract.
- Document schema evolution limitation: GenericDatumReader uses the configured schema as both writer and reader schema. True evolution requires writer schema per-message (e.g. Confluent wire format).
- Document logical types limitation: timestamp/date/time decoded as raw int/long; decimal (bytes/fixed) base64-encoded, not a decimal string.
- Decode errors now respect DROP/BLOCK strategy: KafkaMessage decodes lazily in getPayload() instead of eagerly in fetch(). Previously, a decode failure in fetch() was swallowed with a per-message continue, bypassing the poller's error handling entirely. Now the raw bytes are stored in KafkaMessage and decode() is called when the processor reads the payload, so errors propagate through the existing errorStrategy path. GenericDatumReader is created per decode() call (not cached) so the method is thread-safe when called from processor threads.
Signed-off-by: Tarun Kishore <Tarun-kishore@users.noreply.github.com>
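As an illustration of the Number-aware timeout parsing mentioned in this commit, the sketch below accepts either a Number or a String and rejects non-positive values. Only the name parsePositiveMs comes from the commit message; the signature, the default handling, and the error wording are assumptions.

```java
// Hypothetical signature; only the method name appears in the commit message.
final class TimeoutParsing {
    /** Parses a timeout in milliseconds, falling back to defaultMs when the value is absent. */
    static long parsePositiveMs(String key, Object value, long defaultMs) {
        if (value == null) {
            return defaultMs;
        }
        long ms;
        if (value instanceof Number) {
            // Integer, Long, Double etc. arrive as Number when params come from parsed JSON
            ms = ((Number) value).longValue();
        } else {
            try {
                ms = Long.parseLong(String.valueOf(value).trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(key + " must be a whole number of milliseconds, got [" + value + "]", e);
            }
        }
        if (ms <= 0) {
            throw new IllegalArgumentException(key + " must be positive, got [" + value + "]");
        }
        return ms;
    }
}
```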
- Move all heavy logic (param parsing, schema fetch, HTTP call) out of the constructor into a static factory method AvroPayloadDecoder.create(). The private constructor becomes a trivial field assignment that cannot fail, following the same pattern used for KafkaPartitionConsumer (ref opensearch-project#21974). KafkaPartitionConsumer.initialize() calls create() instead of new().
- Add payloadDecoder=PASSTHROUGH to constructor log for clarity.
- Add getParam helper to reduce repeated params.containsKey/String.valueOf boilerplate in create().
- Move registryUrl/inlineSchema null validation before wrapper schema parsing so config errors fail fast before any schema work.
- Extract schema resolution to a private resolveSchema() method, separating registry fetch path from inline parse path.
- Remove logger.error in decode() catch blocks: logging and re-throwing causes double logging; the error surfaces through the caller's errorStrategy.
- Replace isDebugEnabled guard with logger.debug(Supplier) lambda so the hex preview and record.getSchema().getFullName() calls are skipped entirely when debug logging is off.
- Fix hex preview loop to start at skipBytes directly.
- Avoid converting the full record to a Map when avro.msg_field is set: call record.get(msgField) directly on the GenericRecord and only convert the nested record, skipping envelope fields entirely.
- Use instanceof pattern matching (Java 16) with braces throughout convertAvroValue() for consistency.
- Catch SchemaParseException directly in extractSchemaJson() instead of catching Throwable and checking the type manually.
- Add per-artifact LICENSE and NOTICE files for jackson-core, jackson-databind, and jackson-annotations (replacing the incorrectly named jackson-LICENSE.txt / jackson-NOTICE.txt).
Signed-off-by: Tarun Kishore <Tarun-kishore@users.noreply.github.com>
…o state
Move KafkaPayloadDecoder out of the Kafka plugin into the server's
pollingingest package as PayloadDecoder. The interface is now available
to any ingestion source (Kafka, Kinesis, etc.) without depending on the
Kafka plugin. KafkaPayloadDecoder is deleted; all references updated to
the server interface.
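A source-agnostic decoder interface of the kind described here could look roughly like the following. The name PayloadDecoderSketch and the select helper are hypothetical; only the PASSTHROUGH constant and its "return the raw bytes unchanged" behaviour are taken from the PR.

```java
import java.util.Map;

// Simplified stand-in for the PayloadDecoder interface; names other than
// PASSTHROUGH are assumptions for illustration.
@FunctionalInterface
interface PayloadDecoderSketch {
    /** No-op decoder: returns the raw bytes unchanged. */
    PayloadDecoderSketch PASSTHROUGH = raw -> raw;

    byte[] decode(byte[] raw);

    /** Picks PASSTHROUGH when no avro.* params were configured. */
    static PayloadDecoderSketch select(Map<String, Object> avroParams, PayloadDecoderSketch avroDecoder) {
        return avroParams.isEmpty() ? PASSTHROUGH : avroDecoder;
    }
}
```

Because the interface has no Kafka types in its signature, another ingestion source such as Kinesis could plug in the same decoder.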
Add ThreadLocal reuse for GenericDatumReader, BinaryDecoder, and
GenericRecord in AvroPayloadDecoder. These objects are not thread-safe
so each processor thread gets its own instance, but the same instance is
reused across successive messages on that thread, avoiding per-message
allocation on the hot path.
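The per-thread reuse pattern described in this paragraph can be sketched with ThreadLocal.withInitial. ScratchDecoder below is a hypothetical stand-in for a non-thread-safe object such as GenericDatumReader; it is not Avro code and only demonstrates the reuse mechanics.

```java
// Hypothetical stand-in types; the PR applies this pattern to Avro reader/decoder objects.
final class ThreadLocalReuseSketch {
    /** A reusable, non-thread-safe helper with internal mutable state. */
    static final class ScratchDecoder {
        private final StringBuilder buffer = new StringBuilder();

        String decode(byte[] raw) {
            buffer.setLength(0); // reset state instead of allocating a new buffer
            for (byte b : raw) {
                buffer.append(String.format("%02x", b));
            }
            return buffer.toString();
        }
    }

    // One instance per thread, created lazily on first use by that thread.
    private static final ThreadLocal<ScratchDecoder> DECODER = ThreadLocal.withInitial(ScratchDecoder::new);

    static ScratchDecoder current() {
        return DECODER.get();
    }

    static String decode(byte[] raw) {
        return current().decode(raw);
    }
}
```

Each processor thread keeps its own instance, so no locking is needed, and successive messages on the same thread reuse it.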
Add avro.wrapper_schema_registry_url so the wrapper schema can be fetched
from a schema registry independently of the inner schema. The wrapper
registry may use different auth and timeout settings:
avro.wrapper_schema_registry_url
avro.wrapper_schema_registry_headers.<name>
avro.wrapper_schema_registry_connect_timeout_ms (default 10 000)
avro.wrapper_schema_registry_request_timeout_ms (default 10 000)
avro.wrapper_schema (inline) and avro.wrapper_schema_registry_url are
mutually exclusive. avro.wrapper_field is required when either is set.
resolveHeaders() now takes an explicit prefix so each registry call uses
its own header set. resolveSchema() takes explicit headers and timeouts
instead of extracting them from params internally.
Fix double parse: replace extractSchemaJson (returned String, triggered
two Schema.Parser calls for the registry path) with parseSchemaFromBody
(returns Schema, single parse). Both the direct Avro JSON and Confluent
{"schema":"..."} wrapper formats now parse exactly once. As a
side-effect, avro.schema now also accepts Confluent-format bodies inline.
Signed-off-by: Tarun Kishore <Tarun-kishore@users.noreply.github.com>
eff448a to
be0ecc4
Description
Summary
The Kafka ingestion plugin now supports Avro-binary encoded messages via an opt-in AvroPayloadDecoder. Users configure it by adding avro.* keys to the index's ingestion_source.param map; no change is required for existing JSON topics.
The schema can be supplied inline (avro.schema) or fetched from a schema registry URL at index creation time (avro.schema_registry_url). An avro.skip_bytes option strips framing headers (e.g. the 5-byte Confluent wire format prefix) before decoding.
An avro.msg_field option extracts a nested record as the document source, enabling envelope-style message layouts where the actual document lives inside a wrapper record.
Changes
- KafkaPayloadDecoder: new functional interface with a PASSTHROUGH constant; used when no avro.* params are present so the existing code path is unaffected.
- AvroPayloadDecoder: decodes Avro binary to JSON bytes. Handles nested records, arrays, maps, bytes (ByteBuffer/GenericFixed), schema registry fetch with configurable timeouts, and both plain and Confluent-style schema registry responses.
- KafkaSourceConfig: extracts and removes avro.* keys from the param map so they are never forwarded to the Kafka consumer.
- KafkaPartitionConsumer: wires the decoder; uses PASSTHROUGH when no Avro params are present.
- build.gradle: applies the shadow plugin to embed Avro and Jackson inside the plugin JAR with package relocation, avoiding conflicts with OpenSearch's own JARs.
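The param handling listed for KafkaSourceConfig can be sketched as below: every avro.* entry is moved into its own map so that only the remaining entries are passed on to the Kafka consumer. The class and method names are hypothetical and the PR's implementation may differ.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical helper; not the PR's KafkaSourceConfig.
final class AvroParamSplitter {
    static final String PREFIX = "avro.";

    /** Removes every "avro.*" entry from params and returns the removed entries. */
    static Map<String, Object> extractAvroParams(Map<String, Object> params) {
        Map<String, Object> avro = new HashMap<>();
        Iterator<Map.Entry<String, Object>> it = params.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Object> entry = it.next();
            if (entry.getKey().startsWith(PREFIX)) {
                avro.put(entry.getKey(), entry.getValue());
                it.remove(); // never forwarded to the Kafka consumer config
            }
        }
        return avro;
    }
}
```

An empty result means no Avro settings were supplied, which is the case where the passthrough path applies.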
Dependency approach
Why Avro and Jackson must be bundled
Avro uses Jackson internally to parse schema JSON (Schema.Parser.parse()). Jackson is also a core dependency of OpenSearch itself and ships in the server's lib/ directory.
A first instinct might be to declare Avro as a regular runtimeOnly dependency and let it share the server's Jackson at runtime. This does not work for two reasons:
- Plugin classloader isolation. OpenSearch loads each plugin in its own isolated classloader that has no visibility into the server's lib/ directory. Any class that Avro tries to load from Jackson at runtime will result in a NoClassDefFoundError unless Jackson is bundled inside the plugin itself.
- OpenSearch jar-hell check. Even if isolation were not a concern, OpenSearch runs a static jar-hell check at plugin load time that rejects any plugin whose JAR contains a class name already present in another JAR on the classpath. Bundling a plain (unrelocated) Avro JAR alongside the server's Jackson would not trigger this, but bundling a plain Jackson JAR alongside the server's Jackson would, because the same fully-qualified class names would appear in two different JARs.
The shadow JAR solution
The Gradle shadow plugin is applied to the ingestion-kafka plugin. It produces a single "fat" JAR that embeds Avro and Jackson and rewrites their package names at the bytecode level before the JAR is written:
- org.apache.avro.** → org.opensearch.plugin.kafka.shaded.avro.**
- com.fasterxml.jackson.** → org.opensearch.plugin.kafka.shaded.jackson.**
- org.slf4j.** → org.opensearch.plugin.kafka.shaded.slf4j.**
Because the embedded classes now live under a private package, the jar-hell check sees no overlap with the server's Jackson or SLF4J JARs. At runtime, Avro's internal jackson.* references, already rewritten in the bytecode by the shadow plugin, resolve to the embedded copy, so Schema.Parser.parse() works correctly inside the isolated plugin classloader.
Test classpath
The shadow plugin's Gradle integration (via OpenSearchTestBasePlugin) strips all api/runtimeOnly dependencies from the test classpath on the assumption they are embedded. This would silently remove kafka-clients, snappy-java, and other JARs that are not embedded. An afterEvaluate block re-adds runtimeClasspath to the test task's classpath to restore them. The relocated packages inside the shadow JAR use different class names from the originals still present in runtimeClasspath, so no jar-hell conflict is introduced.
Testing
- Unit tests cover skip_bytes, msg_field extraction, wrapper schema substitution, tombstone handling, GenericFixed encoding, and JSON escaping edge cases.
- Integration tests (AvroIngestionFromKafkaIT) use Testcontainers to spin up a real Kafka broker and verify end-to-end that Avro messages are decoded and indexed with correct field values, that skip_bytes strips headers correctly, and that Avro-encoded delete operations propagate as expected.
Related Issues
Resolves #22363
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.