Skip to content

Add Avro payload decoding support to ingestion-kafka plugin#22364

Open
Tarun-kishore wants to merge 5 commits into
opensearch-project:mainfrom
Tarun-kishore:feature/avro-payload-decoder-kafka
Open

Add Avro payload decoding support to ingestion-kafka plugin#22364
Tarun-kishore wants to merge 5 commits into
opensearch-project:mainfrom
Tarun-kishore:feature/avro-payload-decoder-kafka

Conversation

@Tarun-kishore

Copy link
Copy Markdown

Description

Summary

The Kafka ingestion plugin now supports Avro-binary encoded messages via an opt-in
AvroPayloadDecoder. Users configure it by adding avro.* keys to the index's
ingestion_source.param map — no change is required for existing JSON topics.

The schema can be supplied inline (avro.schema) or fetched from a schema registry URL
at index creation time (avro.schema_registry_url). An avro.skip_bytes option strips
framing headers (e.g. the 5-byte Confluent wire format prefix) before decoding.
An avro.msg_field option extracts a nested record as the document source, enabling
envelope-style message layouts where the actual document lives inside a wrapper record.

Changes

  • KafkaPayloadDecoder — new functional interface with a PASSTHROUGH constant;
    used when no avro.* params are present so the existing code path is unaffected.
  • AvroPayloadDecoder — decodes Avro binary to JSON bytes. Handles nested records,
    arrays, maps, bytes (ByteBuffer/GenericFixed), schema registry fetch with
    configurable timeouts, and both plain and Confluent-style schema registry responses.
  • KafkaSourceConfig — extracts and removes avro.* keys from the param map so
    they are never forwarded to the Kafka consumer.
  • KafkaPartitionConsumer — wires the decoder; uses PASSTHROUGH when no Avro
    params are present.
  • build.gradle — applies the shadow plugin to embed Avro and Jackson inside the
    plugin JAR with package relocation, avoiding conflicts with OpenSearch's own JARs.

Dependency approach

Why Avro and Jackson must be bundled

Avro uses Jackson internally to parse schema JSON (Schema.Parser.parse()). Jackson is
also a core dependency of OpenSearch itself and ships in the server's lib/ directory.

A first instinct might be to declare Avro as a regular runtimeOnly dependency and let
it share the server's Jackson at runtime. This does not work for two reasons:

  1. Plugin classloader isolation. OpenSearch loads each plugin in its own isolated
    classloader that has no visibility into the server's lib/ directory. Any class that
    Avro tries to load from Jackson at runtime will result in a NoClassDefFoundError
    unless Jackson is bundled inside the plugin itself.

  2. OpenSearch jar-hell check. Even if isolation were not a concern, OpenSearch runs
    a static jar-hell check at plugin load time that rejects any plugin whose JAR contains
    a class name already present in another JAR on the classpath. Bundling a plain
    (unrelocated) Avro JAR alongside the server's Jackson would not trigger this — but
    bundling a plain Jackson JAR alongside the server's Jackson would, because the same
    fully-qualified class names would appear in two different JARs.

The shadow JAR solution

The Gradle shadow plugin is applied to the ingestion-kafka plugin. It produces a single
"fat" JAR that embeds Avro and Jackson and rewrites their package names at the
bytecode level before the JAR is written:

  • org.apache.avro.**org.opensearch.plugin.kafka.shaded.avro.**
  • com.fasterxml.jackson.**org.opensearch.plugin.kafka.shaded.jackson.**
  • org.slf4j.**org.opensearch.plugin.kafka.shaded.slf4j.**

Because the embedded classes now live under a private package, the jar-hell check sees
no overlap with the server's Jackson or SLF4J JARs. At runtime, Avro's internal
jackson.* references — already rewritten in the bytecode by the shadow plugin — resolve
to the embedded copy, so Schema.Parser.parse() works correctly inside the isolated
plugin classloader.

Test classpath

The shadow plugin's Gradle integration (via OpenSearchTestBasePlugin) strips all
api/runtimeOnly dependencies from the test classpath on the assumption they are
embedded. This would silently remove kafka-clients, snappy-java, and other JARs that
are not embedded. An afterEvaluate block re-adds runtimeClasspath to the test
task's classpath to restore them. The relocated packages inside the shadow JAR use
different class names from the originals still present in runtimeClasspath, so no
jar-hell conflict is introduced.

Testing

  • Unit tests cover inline schema, schema registry fetch, skip_bytes, msg_field
    extraction, wrapper schema substitution, tombstone handling, GenericFixed encoding,
    and JSON escaping edge cases.
  • Integration tests (AvroIngestionFromKafkaIT) use Testcontainers to spin up a real
    Kafka broker and verify end-to-end that Avro messages are decoded and indexed with
    correct field values, that skip_bytes strips headers correctly, and that
    Avro-encoded delete operations propagate as expected.

Related Issues

Resolves #22363

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Adds an AvroPayloadDecoder that converts Avro-binary Kafka messages to
JSON bytes before they reach the indexing pipeline.  Configuration lives
in the existing ingestion_source param map under avro.* keys:

  avro.schema                 – inline Avro JSON schema
  avro.schema_registry_url    – URL to fetch the schema from at startup
  avro.schema_registry_headers.<name>   – HTTP headers for that request
  avro.schema_registry_connect_timeout_ms  – connect timeout (default 10s)
  avro.schema_registry_request_timeout_ms  – request timeout (default 10s)
  avro.skip_bytes             – header bytes to strip before decoding
                                (e.g. 5 for Confluent wire format)
  avro.wrapper_schema / avro.wrapper_field
                              – optional envelope schema with a field
                                substituted at startup by the inner schema
  avro.msg_field              – record field to use as the document source

If no avro.* params are present the existing passthrough path is used
unchanged, so there is no behaviour change for non-Avro topics.

Because Avro requires Jackson to parse schemas, and OpenSearch plugin
classloaders cannot reach the server's lib/ JARs, both Avro and Jackson
are embedded in the plugin JAR via the Gradle shadow plugin and relocated
to a private package (org.opensearch.plugin.kafka.shaded.*).  This keeps
the plugin self-contained and avoids the OpenSearch jar-hell check that
fires when the same fully-qualified class name appears in two JARs.

Signed-off-by: Tarun Kishore <Tarun-kishore@users.noreply.github.com>
@Tarun-kishore Tarun-kishore requested a review from a team as a code owner July 1, 2026 01:51
@github-actions github-actions Bot added enhancement Enhancement or improvement to existing feature or request missing-component labels Jul 1, 2026
@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit be0ecc4.

PathLineSeverityDescription
plugins/ingestion-kafka/build.gradle12highNew build plugin 'com.gradleup.shadow' added. Build plugins execute arbitrary code during compilation. Artifact authenticity cannot be verified from the diff alone; maintainers must confirm this resolves to the expected com.gradleup:shadow artifact and not a namespace-hijacked substitute.
plugins/ingestion-kafka/build.gradle27highNew dependency 'org.apache.avro:avro:1.12.0' added. Apache Avro is a well-known library but artifact authenticity cannot be verified from the diff; maintainers must confirm the SHA1 in avro-1.12.0.jar.sha1 matches the canonical Maven Central artifact.
plugins/ingestion-kafka/build.gradle79highNew dependency 'com.thoughtworks.paranamer:paranamer:2.8' added as runtimeOnly. This reflection/bytecode-inspection library is bundled in the plugin ZIP and executes at runtime. Maintainers must verify the JAR SHA1 against a trusted Maven Central source.
plugins/ingestion-kafka/build.gradle71highThree new Jackson artifacts (jackson-databind:2.22.0, jackson-core:2.22.0, jackson-annotations:2.22) added via the 'shadow' configuration and embedded in the plugin JAR. These replace previously ignored/absent Jackson classes. Maintainers must verify the SHA1 files against trusted Maven Central sources.
plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/AvroPayloadDecoder.java350mediumThe fetchSchema() method creates an HttpClient and issues an HTTP GET to a user-supplied URI (avro.schema_registry_url / avro.wrapper_schema_registry_url) during index initialization. Any OpenSearch user with index-create permission can trigger outbound HTTP requests to arbitrary internal or external endpoints, creating a Server-Side Request Forgery (SSRF) vector against internal network resources.
plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/AvroPayloadDecoder.java362mediumHTTP headers including Authorization tokens (avro.schema_registry_headers.* / avro.wrapper_schema_registry_headers.*) are stored in index settings and passed verbatim to external schema registry requests. Sensitive credentials persisted in index metadata may be exposed via the cluster state API to users with read-metadata permissions.

The table above displays the top 10 most important findings.

Total: 6 | Critical: 0 | High: 4 | Medium: 2 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

'testcontainers': '1.19.7',
'ducttape': '1.0.8',
'snappy': '1.1.10.7',
'avro': '1.12.0',

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/opensearch-project/OpenSearch/blob/main/DEVELOPER_GUIDE.md#adding-dependencies

We have to copy the license and library SHAs when we add a new dependency. I see avro licenses added here for reference. Let us do this for all new dependencies.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added licenses for dependencies

.timeout(Duration.ofMillis(requestTimeoutMs))
.GET();
headers.forEach(builder::header);
HttpResponse<String> response = client.send(builder.build(), HttpResponse.BodyHandlers.ofString());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious - have we tested this connection if it works? What is the expectation on failures, do we retry?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have tested the connection and the node was able to connect to schema registry. We do not retry the error as the error can be because of bad configs too. Should we retry in case of failure?

lastFetchedOffset = currentOffset;
KafkaOffset kafkaOffset = new KafkaOffset(currentOffset);
KafkaMessage message = new KafkaMessage(messageAndOffset.key(), messageAndOffset.value(), messageAndOffset.timestamp());
KafkaMessage message = new KafkaMessage(messageAndOffset.key(), payloadDecoder.decode(messageAndOffset.value()), messageAndOffset.timestamp());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If payloadDecoder.decode throws an exception, we skip the remaining messages in the batch. On the next poller loop, it looks like we will start from the next batch resulting in skipping messages. Can we double check this?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to have per message try-catch, so only bad message will be skipped

this.config = config;
this.partitionId = partitionId;
Map<String, Object> avroParams = config.getAvroParams();
this.payloadDecoder = avroParams.isEmpty() ? KafkaPayloadDecoder.PASSTHROUGH : new AvroPayloadDecoder(avroParams);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If AvroPayloadDecoder fails to be initialized, we can leak Kafka connections. There was a recent fix to handle this for initialize failures (ref), but failures here in the constructor is not handled. We will have to move this to the initialize method or think or a better way outside the constructor.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the logic to initialize method which closes the consumer on failure.

* changes to the ingestion pipeline are required.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class AvroIngestionFromKafkaIT extends KafkaIngestionBaseIT {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us add some tests on schema evolution, error handling (invalid message, null fields). Let's focus more on schema evolution, testing the different supported formats (including the confluent style format) with field addition cases. Along with this, we can also add a test to ensure ingestion works when the kafka topic has 2 different schema messages (without newly added fields + few with new fields) to test backward compatibility.

return value;
}

private static byte[] toJsonBytes(Map<String, Object> map) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could reuse XContentBuilder, something like

private static byte[] toJsonBytes(Map<String, Object> map) {
      try {
          return BytesReference.toBytes(BytesReference.bytes(XContentFactory.jsonBuilder().map(map)));
      } catch (IOException e) {
          throw new IllegalArgumentException("Failed to serialize decoded Avro record as JSON", e);
      }
  }

We also do a similar mapping here.

We can explore if this cleans up the code, combining with convertAvroValue being schema aware and coverting the avro types to plain Java types.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use XContentFactory. Will explore change to convert avro type to Java type

}
// Fall back to Confluent-style {"schema":"<escaped-json>"} wrapper
logger.debug("Direct parse failed ({}), trying Confluent schema-field extraction", e.getMessage());
int keyIdx = body.indexOf("\"schema\"");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use a JSONParser or XContentParser here instead of doing it manually?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, updated to use XContentParser

logger.debug("Extracted msg_field=[{}] with {} fields", msgField, map.size());
}

byte[] jsonBytes = toJsonBytes(map);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we are converting the avro record into a Map, then back to json bytes here, and later down in the pipeline again back to a Map. This might need changes downstream as well, but wonder if we can avoid the serialize/parse back and forth? We can add a note if not easy to implement as a TODO.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a TODO comment, we can take up this optimization in future

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have we done any benchmarks on this avro decoder?
my gut feeling/concern is that this is going to be so slow it won't actually be usable in any real world application

}
}

static Schema substituteInnerSchema(Schema wrapper, Schema inner, String wrapperField) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder where this will be useful? I was thinking the avro payload will follow the exact format required by the PBI flow, with same field name requirements. Later the message mappers will be able to do the transformations?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some Kafka topics use an outer envelope record that carries routing/operational metadata alongside the actual document payload. An envelope may be like:

  {
    "type": "record",
    "name": "kafka_envelope",
    "fields": [
      { "name": "ts",        "type": "double"          },
      { "name": "host",      "type": "string"          },
      { "name": "schema_id", "type": ["null", "int"]   },
      { "name": "msg",       "type": "null"            }
    ]
  }

The msg field is a placeholder — its actual schema is the document schema, which may be owned and versioned separately (e.g. registered independently in the schema registry). avro.wrapper_schema + avro.wrapper_field lets the user configure the envelope and the inner document schema independently. At index creation time they are merged by substituting the inner schema into the named wrapper_field. After decoding, avro.msg_field extracts just the inner record as the document source, discarding the envelope metadata fields like ts, host, dc etc.
Without this, the user would have to maintain one monolithic schema that hardcodes the full inner document definition inside the envelope, which breaks cleanly when the inner schema evolves independently under its own registry subject.

@varunbharadwaj varunbharadwaj added Indexing Indexing, Bulk Indexing and anything related to indexing and removed missing-component labels Jul 1, 2026
- Move AvroPayloadDecoder initialization from constructor to initialize()
  so that failures (e.g. schema registry unreachable) are caught by
  KafkaConsumerFactory, which closes the Kafka consumer to avoid leaks

- Catch decode exceptions per message in fetch() with error logging and
  skip, instead of propagating out of the loop. Without this, a single
  undecodeable message would cause an infinite retry loop on the same
  batch offset

- Replace hand-rolled toJsonBytes/toJsonString/escapeJson with
  XContentFactory.jsonBuilder().map() to reuse existing OpenSearch
  infrastructure and reduce code surface

- Replace manual Confluent-style schema body parsing with XContentParser;
  rename parseSchemaFromBody to extractSchemaJson and return String so
  the method can be called from tests without crossing the shadow-JAR
  relocation boundary

- Add TODO noting the Avro->Map->JSON->Map roundtrip that exists because
  the ingestion pipeline re-parses JSON bytes in MessageProcessorRunnable

- Add license SHA1 files for avro, jackson-core, jackson-databind, and
  jackson-annotations (new dependencies introduced by this change)

- Add IT tests for: corrupt message skipped without stopping ingestion,
  schema evolution with an optional field across a batch

Signed-off-by: Tarun Kishore <Tarun-kishore@users.noreply.github.com>
try {
value = payloadDecoder.decode(messageAndOffset.value());
} catch (Exception e) {
logger.error("Failed to decode message at offset {}, skipping: {}", currentOffset, e.getMessage(), e);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skipping failed messages here does not look correct, as this decision is left to the poller and processor depending on user configured DROP/BLOCK strategies. I'm thinking one way to handle this could be to always return the KafkaMessage (marked as failure if there are decoding errors). Later when the payload is read from KafkaMessage, we can throw an exception. Alternatively, we could lazily decode the payload when KafkaMessage.getPayload() is called (with caching). This however needs more thinking and handle thread visibility requirements. Both these approaches should follow the existing DROP/BLOCK strategies.

Please take a look and we can discuss if there are any challenges with this approach or if it doesn't make sense.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm also not sure it's a good idea to log errors in the hot loop here. if 1 message errors, chances are all the messages will error, this will probably blow up your system just from the logging

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to move the decoding in KafkaMessage.getPayload(). and the logging for failed message would have the same behaviour as other failures in pull based ingestion. As for thread visibility, GenericDatumReader is not thread-safe, so I've removed the cached reader field from AvroPayloadDecoder and it's created one per decode() call instead.

try {
value = payloadDecoder.decode(messageAndOffset.value());
} catch (Exception e) {
logger.error("Failed to decode message at offset {}, skipping: {}", currentOffset, e.getMessage(), e);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm also not sure it's a good idea to log errors in the hot loop here. if 1 message errors, chances are all the messages will error, this will probably blow up your system just from the logging

* <p>The default no-op implementation is {@link #PASSTHROUGH}, which returns the raw bytes unchanged.
*/
@FunctionalInterface
public interface KafkaPayloadDecoder {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be limited to kafka?
what if someone wants to use this for kinesis?

@Tarun-kishore Tarun-kishore Jul 2, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved a decoder to poller. So now if we want to extend similar approach to kinesis, PayloadDecoder can be extended

logger.debug("Extracted msg_field=[{}] with {} fields", msgField, map.size());
}

byte[] jsonBytes = toJsonBytes(map);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have we done any benchmarks on this avro decoder?
my gut feeling/concern is that this is going to be so slow it won't actually be usable in any real world application

- GenericEnumSymbol not handled: enum fields fell through to return value,
  which Jackson cannot serialize (relocated type). Now calls toString().

- Top-level schema validated as RECORD in constructor. Non-record schemas
  (enum, array, etc.) previously caused ClassCastException at decode time.

- avro.msg_field validated against schema at construction. A wrong field
  name previously caused map.get() to return null, silently treating every
  message as a tombstone and dropping all data. A null value in a valid
  nullable union field now throws IllegalArgumentException rather than
  returning null, which previously propagated a null payload into
  KafkaMessage and caused NullPointerException in the downstream mapper.

- substituteInnerSchema now preserves field order and aliases on the
  substituted field (previously only doc was copied).

- avro.wrapper_schema now handles arriving as a Map (same as avro.schema),
  rather than calling Map.toString() and passing invalid JSON to Schema.Parser.

- avro.skip_bytes now accepts Number values (e.g. Integer, Double) in
  addition to String, avoiding NumberFormatException on Double inputs.

- ByteBuffer.duplicate() now calls rewind() to reset position to 0,
  ensuring base64 encoding starts from the correct offset.

- NaN and Infinity in float/double fields are replaced with null and
  logged as a warning, rather than causing JsonGenerationException.

- Schema registry URL is trimmed before URI.create() to prevent
  IllegalArgumentException on leading/trailing whitespace.

- connect/request timeout values are validated positive via parsePositiveMs,
  with consistent Number-aware parsing and a clear error message.

- Empty payload after skip_bytes is now returned as null (tombstone)
  rather than thrown as an exception, matching the documented contract.

- Document schema evolution limitation: GenericDatumReader uses the
  configured schema as both writer and reader schema. True evolution
  requires writer schema per-message (e.g. Confluent wire format).

- Document logical types limitation: timestamp/date/time decoded as raw
  int/long; decimal (bytes/fixed) base64-encoded, not a decimal string.

- Decode errors now respect DROP/BLOCK strategy: KafkaMessage decodes
  lazily in getPayload() instead of eagerly in fetch(). Previously, a
  decode failure in fetch() was swallowed with a per-message continue,
  bypassing the poller's error handling entirely. Now the raw bytes are
  stored in KafkaMessage and decode() is called when the processor reads
  the payload — errors propagate through the existing errorStrategy path.
  GenericDatumReader is created per decode() call (not cached) so the
  method is thread-safe when called from processor threads.

Signed-off-by: Tarun Kishore <Tarun-kishore@users.noreply.github.com>
- Move all heavy logic (param parsing, schema fetch, HTTP call) out of the
  constructor into a static factory method AvroPayloadDecoder.create(). The
  private constructor becomes a trivial field assignment that cannot fail,
  following the same pattern used for KafkaPartitionConsumer (ref opensearch-project#21974).
  KafkaPartitionConsumer.initialize() calls create() instead of new().

- Add payloadDecoder=PASSTHROUGH to constructor log for clarity.

- Add getParam helper to reduce repeated params.containsKey/String.valueOf
  boilerplate in create().

- Move registryUrl/inlineSchema null validation before wrapper schema
  parsing so config errors fail fast before any schema work.

- Extract schema resolution to a private resolveSchema() method, separating
  registry fetch path from inline parse path.

- Remove logger.error in decode() catch blocks — logging and re-throwing
  causes double logging; the error surfaces through the caller's errorStrategy.

- Replace isDebugEnabled guard with logger.debug(Supplier) lambda so the
  hex preview and record.getSchema().getFullName() calls are skipped entirely
  when debug logging is off.

- Fix hex preview loop to start at skipBytes directly.

- Avoid converting the full record to a Map when avro.msg_field is set:
  call record.get(msgField) directly on the GenericRecord and only convert
  the nested record, skipping envelope fields entirely.

- Use instanceof pattern matching (Java 16) with braces throughout
  convertAvroValue() for consistency.

- Catch SchemaParseException directly in extractSchemaJson() instead of
  catching Throwable and checking the type manually.

- Add per-artifact LICENSE and NOTICE files for jackson-core,
  jackson-databind, and jackson-annotations (replacing the incorrectly
  named jackson-LICENSE.txt / jackson-NOTICE.txt).

Signed-off-by: Tarun Kishore <Tarun-kishore@users.noreply.github.com>
…o state

Move KafkaPayloadDecoder out of the Kafka plugin into the server's
pollingingest package as PayloadDecoder. The interface is now available
to any ingestion source (Kafka, Kinesis, etc.) without depending on the
Kafka plugin. KafkaPayloadDecoder is deleted; all references updated to
the server interface.

Add ThreadLocal reuse for GenericDatumReader, BinaryDecoder, and
GenericRecord in AvroPayloadDecoder. These objects are not thread-safe
so each processor thread gets its own instance, but the same instance is
reused across successive messages on that thread, avoiding per-message
allocation on the hot path.

Add avro.wrapper_schema_registry_url so the wrapper schema can be fetched
from a schema registry independently of the inner schema. The wrapper
registry may use different auth and timeout settings:
  avro.wrapper_schema_registry_url
  avro.wrapper_schema_registry_headers.<name>
  avro.wrapper_schema_registry_connect_timeout_ms  (default 10 000)
  avro.wrapper_schema_registry_request_timeout_ms  (default 10 000)

avro.wrapper_schema (inline) and avro.wrapper_schema_registry_url are
mutually exclusive. avro.wrapper_field is required when either is set.
resolveHeaders() now takes an explicit prefix so each registry call uses
its own header set. resolveSchema() takes explicit headers and timeouts
instead of extracting them from params internally.

Fix double parse: replace extractSchemaJson (returned String, triggered
two Schema.Parser calls for the registry path) with parseSchemaFromBody
(returns Schema, single parse). Both the direct Avro JSON and Confluent
{"schema":"..."} wrapper formats now parse exactly once. As a
side-effect, avro.schema now also accepts Confluent-format bodies inline.

Signed-off-by: Tarun Kishore <Tarun-kishore@users.noreply.github.com>
@Tarun-kishore Tarun-kishore force-pushed the feature/avro-payload-decoder-kafka branch from eff448a to be0ecc4 Compare July 2, 2026 06:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement Enhancement or improvement to existing feature or request Indexing Indexing, Bulk Indexing and anything related to indexing

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature Request] Support Avro-encoded messages in the ingestion-kafka plugin

3 participants