Added built-in TransactionStateSerde for __transaction_state topic deserialization Closes #407#1880
Added built-in TransactionStateSerde for __transaction_state topic deserialization Closes #407#1880axlet-prog wants to merge 4 commits into
TransactionStateSerde for __transaction_state topic deserialization Closes #407#1880Conversation
|
AI Summary The issue adds a built-in |
TransactionStateSerde for __transaction_state topic deserialization
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughThis PR adds built-in serde support for Kafka's ChangesTransaction State Serde
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
TransactionStateSerde for __transaction_state topic deserializationTransactionStateSerde for __transaction_state topic deserialization Closes #407
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@api/src/main/java/io/kafbat/ui/serdes/builtin/TransactionStateSerde.java`:
- Around line 128-129: In TransactionStateSerde, remove the unstructured
System.out.println debug prints in the deserialization/decode path and replace
them with structured logging (e.g., a private static final org.slf4j.Logger
created with LoggerFactory.getLogger(TransactionStateSerde.class)) using
logger.debug(...) for the same messages; add the necessary import for
org.slf4j.Logger and org.slf4j.LoggerFactory and ensure the three occurrences of
System.out.println in the deserialize/decode method are replaced so runtime
output is controlled by the logging framework.
In
`@api/src/test/java/io/kafbat/ui/serdes/builtin/TransactionStateSerdeTest.java`:
- Around line 89-93: Update the test in TransactionStateSerdeTest so it filters
by the exact transactional id instead of merely checking presence of
"transaction_id": replace the containsKey("transaction_id") check with a
comparison that extracts the transaction_id value from keyJson and continues
unless it equals the expected transactionalId (e.g., if
(!transactionalId.equals(keyJson.get("transaction_id"))) continue;), ensuring
any necessary casting to String is applied.
- Line 28: The test imports the Testcontainers-shaded Awaitility; replace the
shaded import with the public Awaitility API by changing the import from
org.testcontainers.shaded.org.awaitility.Awaitility to org.awaitility.Awaitility
in TransactionStateSerdeTest.java (remove the shaded import, add the public
import) and re-run tests to ensure no code-level changes are needed where
Awaitility is used.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 30d743ec-3dee-4af3-9c35-e61d03a7a3e8
📒 Files selected for processing (3)
api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.javaapi/src/main/java/io/kafbat/ui/serdes/builtin/TransactionStateSerde.javaapi/src/test/java/io/kafbat/ui/serdes/builtin/TransactionStateSerdeTest.java
|
@axlet-prog hi, please address coderabbit's comments |
…d key checking logic
|
Hi, @Haarolean i made necessary changes from coderabbit's comments |
No breaking changes.
What changes did you make? (Give an overview)
Added built-in
TransactionStateSerdefor deserializing messages from the__transaction_stateinternal Kafka topic (closes #407).The serde supports:
{"transaction_id": "..."}JSONCOMPACT_STRING/CompactArrayOfand tagged fields section), returning all transaction metadata fields as JSONtransaction_statusfield — numeric byte values are mapped to named enum constants (e.g.4→COMPLETE_COMMIT) via theTransactionStatusenumSchema definitions are based on
TransactionLogKey.jsonandTransactionLogValue.jsonfrom the Kafka 3.9transaction-coordinatormodule.Is there anything you'd like reviewers to focus on?
My implementation was based on __consumer_offsets serde
How Has This Been Tested? (put an "x" (case-sensitive!) next to an item)
Added
TransactionStateSerdeTest(integration test using Testcontainers), which:__transaction_statetransaction_idEMPTY,ONGOING,COMPLETE_COMMIT,COMPLETE_ABORT) are correctly deserialized, including the nullable partition list casesChecklist (put an "x" (case-sensitive!) next to all the items, otherwise the build will fail)
A picture of a cute animal (not mandatory but encouraged)

Summary by CodeRabbit
__transaction_statetopic), exposing transaction lifecycle details and status (including committed and aborted states) in JSON form.