Skip to content

Added built-in TransactionStateSerde for __transaction_state topic deserialization Closes #407#1880

Open
axlet-prog wants to merge 4 commits into
kafbat:mainfrom
axlet-prog:issues/407
Open

Added built-in TransactionStateSerde for __transaction_state topic deserialization Closes #407#1880
axlet-prog wants to merge 4 commits into
kafbat:mainfrom
axlet-prog:issues/407

Conversation

@axlet-prog

@axlet-prog axlet-prog commented Jun 13, 2026

Copy link
Copy Markdown
  • Breaking change? (if so, please describe the impact and migration path for existing application instances)

No breaking changes.

What changes did you make? (Give an overview)

Added built-in TransactionStateSerde for deserializing messages from the __transaction_state internal Kafka topic (closes #407).

The serde supports:

  • Key deserialization: reads the version prefix and returns {"transaction_id": "..."} JSON
  • Value deserialization: supports both V0 (standard encoding) and V1 (flexible encoding with COMPACT_STRING/CompactArrayOf and tagged fields section), returning all transaction metadata fields as JSON
  • Human-readable transaction_status field — numeric byte values are mapped to named enum constants (e.g. 4COMPLETE_COMMIT) via the TransactionStatus enum

Schema definitions are based on TransactionLogKey.json and TransactionLogValue.json from the Kafka 3.9 transaction-coordinator module.

Is there anything you'd like reviewers to focus on?

My implementation was based on __consumer_offsets serde

How Has This Been Tested? (put an "x" (case-sensitive!) next to an item)

  • No need to
  • Manually (please, describe, if necessary)
  • Unit checks
  • Integration checks
  • Covered by existing automation

Added TransactionStateSerdeTest (integration test using Testcontainers), which:

  • Spins up a real Kafka broker
  • Produces a committed and an aborted transaction
  • Reads raw bytes from __transaction_state
  • Asserts that key deserialization returns the correct transaction_id
  • Asserts that all expected transaction states (EMPTY, ONGOING, COMPLETE_COMMIT, COMPLETE_ABORT) are correctly deserialized, including the nullable partition list cases

Checklist (put an "x" (case-sensitive!) next to all the items, otherwise the build will fail)

  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation (e.g. ENVIRONMENT VARIABLES)
  • My changes generate no new warnings (e.g. Sonar is happy)
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • Any dependent changes have been merged

A picture of a cute animal (not mandatory but encouraged)
image

Summary by CodeRabbit

  • New Features
    • Added built-in deserialization support for Kafka transaction state records (the __transaction_state topic), exposing transaction lifecycle details and status (including committed and aborted states) in JSON form.
    • Automatically registers this topic-specific serde so it’s available wherever topic-related serdes are configured.
  • Tests
    • Added an integration test that writes committed and aborted transactional messages to a real Kafka cluster and verifies correct JSON output and expected partition/status behavior.

@axlet-prog axlet-prog requested a review from a team as a code owner June 13, 2026 17:34
@kapybro kapybro Bot added status/triage/manual Manual triage in progress and removed status/triage/manual Manual triage in progress labels Jun 13, 2026
@kapybro

kapybro Bot commented Jun 13, 2026

Copy link
Copy Markdown

AI Summary

The issue adds a built-in TransactionStateSerde to deserialize messages from Kafka's __transaction_state topic, supporting both key and value deserialization for V0 and V1 formats. The solution includes schema definitions based on Kafka 3.9's transaction-coordinator module and human-readable status mappings (e.g., 4COMPLETE_COMMIT). Testing was done via unit and integration checks, including a Testcontainers-based test that verifies deserialization of transaction states and keys.

@kapybro kapybro Bot changed the title Issues/407 Added built-in TransactionStateSerde for __transaction_state topic deserialization Jun 13, 2026
@kapybro kapybro Bot added area/serde Serialization & Deserialization (plugins) scope/backend Related to backend changes type/feature A brand new feature labels Jun 13, 2026
@coderabbitai

coderabbitai Bot commented Jun 13, 2026

Copy link
Copy Markdown

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a3f14ff7-cd93-4976-beff-3599196d5d3a

📥 Commits

Reviewing files that changed from the base of the PR and between 6073189 and 31917c7.

📒 Files selected for processing (1)
  • api/src/test/java/io/kafbat/ui/serdes/builtin/TransactionStateSerdeTest.java
🚧 Files skipped from review as they are similar to previous changes (1)
  • api/src/test/java/io/kafbat/ui/serdes/builtin/TransactionStateSerdeTest.java

📝 Walkthrough

Walkthrough

This PR adds built-in serde support for Kafka's __transaction_state internal topic. A new TransactionStateSerde class implements key/value deserializers with versioned schema handling and JSON output; the serde is registered in SerdesInitializer; and integration tests validate deserialization behavior against a real Kafka cluster.

Changes

Transaction State Serde

Layer / File(s) Summary
TransactionStateSerde Implementation
api/src/main/java/io/kafbat/ui/serdes/builtin/TransactionStateSerde.java
Jackson JsonMapper with custom SimpleModule converts Kafka Struct to JSON, mapping transaction_status numeric values to enum names. BuiltInSerde routing methods route key vs. value deserialization. Key deserializer parses transaction ID; value deserializer supports two versioned schemas (v0/v1) for producer IDs, partitions, and status. TransactionStatus enum with eight defined states and numeric ID lookup.
Serde Framework Registration
api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java
Import TransactionStateSerde. Call new transactionStateSerde() helper from registerTopicRelatedSerde(). Helper builds a Pattern from TransactionStateSerde.TOPIC_NAME and returns a map with a SerdeInstance for TransactionStateSerde.NAME.
Integration Tests
api/src/test/java/io/kafbat/ui/serdes/builtin/TransactionStateSerdeTest.java
Test lifecycle creates unique Kafka topic and writes one committed and one aborted transaction via transactional producer. canOnlyDeserializeConsumerOffsetsTopic test verifies serde only matches __transaction_state for key/value targets. deserializesMessagesMadeByConsumerActivity test consumes records, deserializes via serde, parses JSON results, and asserts required transaction fields and expected status patterns using bounded Awaitility retries. Helper methods construct byte consumer and transactional producer.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • kafbat/kafka-ui#1856: Both PRs modify the serde initialization layer; the retrieved PR changes SerdeInstance construction/selection via an explicitlyConfigured flag, and the main PR updates SerdesInitializer to register a new topic-specific TransactionStateSerde by adding a new SerdeInstance.

Poem

🐰 Transactions now have light to shine,
A serde maps status, schema, design,
Version zero, version one aligned,
JSON flows where bytes were intertwined.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 30.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly describes the main change: adding a built-in TransactionStateSerde for deserializing the __transaction_state topic, which is the primary objective of this PR.
Linked Issues check ✅ Passed The PR fully implements issue #407 requirements by adding built-in serde support for the __transaction_state topic with key/value deserialization, status mapping, and comprehensive integration tests.
Out of Scope Changes check ✅ Passed All changes are in scope: TransactionStateSerde implementation, integration test for validation, and registration in SerdesInitializer are all necessary for delivering the __transaction_state topic deserialization feature.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@axlet-prog axlet-prog changed the title Added built-in TransactionStateSerde for __transaction_state topic deserialization Added built-in TransactionStateSerde for __transaction_state topic deserialization Closes #407 Jun 13, 2026
@axlet-prog axlet-prog marked this pull request as draft June 13, 2026 17:37
@axlet-prog axlet-prog marked this pull request as ready for review June 13, 2026 17:38

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@api/src/main/java/io/kafbat/ui/serdes/builtin/TransactionStateSerde.java`:
- Around line 128-129: In TransactionStateSerde, remove the unstructured
System.out.println debug prints in the deserialization/decode path and replace
them with structured logging (e.g., a private static final org.slf4j.Logger
created with LoggerFactory.getLogger(TransactionStateSerde.class)) using
logger.debug(...) for the same messages; add the necessary import for
org.slf4j.Logger and org.slf4j.LoggerFactory and ensure the three occurrences of
System.out.println in the deserialize/decode method are replaced so runtime
output is controlled by the logging framework.

In
`@api/src/test/java/io/kafbat/ui/serdes/builtin/TransactionStateSerdeTest.java`:
- Around line 89-93: Update the test in TransactionStateSerdeTest so it filters
by the exact transactional id instead of merely checking presence of
"transaction_id": replace the containsKey("transaction_id") check with a
comparison that extracts the transaction_id value from keyJson and continues
unless it equals the expected transactionalId (e.g., if
(!transactionalId.equals(keyJson.get("transaction_id"))) continue;), ensuring
any necessary casting to String is applied.
- Line 28: The test imports the Testcontainers-shaded Awaitility; replace the
shaded import with the public Awaitility API by changing the import from
org.testcontainers.shaded.org.awaitility.Awaitility to org.awaitility.Awaitility
in TransactionStateSerdeTest.java (remove the shaded import, add the public
import) and re-run tests to ensure no code-level changes are needed where
Awaitility is used.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 30d743ec-3dee-4af3-9c35-e61d03a7a3e8

📥 Commits

Reviewing files that changed from the base of the PR and between c842552 and 4f93b32.

📒 Files selected for processing (3)
  • api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java
  • api/src/main/java/io/kafbat/ui/serdes/builtin/TransactionStateSerde.java
  • api/src/test/java/io/kafbat/ui/serdes/builtin/TransactionStateSerdeTest.java

Comment thread api/src/main/java/io/kafbat/ui/serdes/builtin/TransactionStateSerde.java Outdated
Comment thread api/src/test/java/io/kafbat/ui/serdes/builtin/TransactionStateSerdeTest.java Outdated
@Haarolean

Copy link
Copy Markdown
Member

@axlet-prog hi, please address coderabbit's comments

@axlet-prog

Copy link
Copy Markdown
Author

Hi, @Haarolean i made necessary changes from coderabbit's comments

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/serde Serialization & Deserialization (plugins) scope/backend Related to backend changes type/feature A brand new feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Serde: Support __transaction_state topic

2 participants