Kafka Producer - Serialization of Additional Message Properties
The OpenLegacy Kafka producer allows users to specify in the asset Kafka message attributes, such as the https://www.geeksforgeeks.org/apache-kafka-message-keys/], partition, timestamp, headers, etc.
Also, the response from the Kafka server includes the partition and the partition and offset numbers.
The output structure of the asset should look like this:
{
"offset" : 0,
"partition" : 0,
"timestamp" : 1749215744393,
"topic" : "test-topic"
}The user can specify these properties using the HUB UI as additional attributes to the asset.
As a result, they will be added to the operation definition properties and used at runtime.

Adding properties to the asset in the Kafka module using the HUB UI
Runtime Properties for the HUB Project Contract
All these properties could be specified as runtime properties inside the HUB project contract. This gives the ability to map values for the runtime properties, for both the input and output values. Using the mapping capabilities, the user can assign static and dynamic values, construct new values based on several input fields, etc.
Serialization properties that are defined as
additional attributesof the asset take precedence over runtime properties specified in the contract.For example, if the
kafka.keyserialization property has a value defined in the asset's additional attributes, this value will override any value assigned to thekafka.keyruntime property in the contract.
The invoke step could change the values of these properties based on the result of sending a message to the Kafka server. For example, the offset value of the sent message.
To enable this functionality, the user needs to select an invoke step, select Asset Configuration in the three dots context menu, and enable the required properties.

Enabling runtime properties in the invoke step → “Asset Configuration” additional menu

Mapping input values to the input runtime properties before executing the invoke step

Mapping output runtime properties to the output values after executing the invoke step
Available Properties
Property | Description |
|---|---|
| It specifies a static value for the Kafka topic to which the message is sent. The value must be a String. When not specified, the value of the Kafka topic could be extracted from the asset’s path value or the Kafka module connection details. |
| Specifies a dynamic value for the Kafka topic. The value must be a String that refers to a field name in the root of the operation's JSON payload. At runtime, the value of this field is used as the topic |
| Specifies a static value for the Kafka message key. The value must be a String. If not specified, the key will be an empty String. |
| Specifies a dynamic value for the Kafka message key. The value must be a String that refers to a field name in the root of the operation's JSON payload. At runtime, the value of this field is used as the key. |
| Specifies static values for the Kafka message headers. The value must be a String of header name to header value pairs, using an equal sign to separate the header name and value and commas to separate the pairs. For example: |
| Specifies a dynamic value for the Kafka message headers. The value must be a String that refers to a field name in the root of the operation's JSON payload. At runtime, the value of this field is used as the headers string. The value of this field must follow the same format as for |
| Specifies a static value for the Kafka message timestamp. The value must be a String representing a Long value. |
| Specifies a dynamic value for the Kafka message timestamp. The value must be a String that refers to a field name in the root of the operation's JSON payload. At runtime, the value of this field is used as the timestamp. |
| Specifies a static value for the Kafka message partition. The value must be a String representing an Integer value. |
| Specifies a dynamic value for the Kafka message partition. The value must be a String that refers to a field name in the root of the operation's JSON payload. At runtime, the value of this field is used as the partition. |
| Specifies whether to remove dynamic fields from the operation's JSON payload before sending the message. The value must be a String representing a Boolean "true" or "false" value. If not specified, the default value is "false". If set to "true", all fields referenced by the *.field properties will be removed from the JSON payload. |
Property Behavior
- Property Prefix - All Kafka producer properties use the prefix kafka.
- Static vs. Dynamic Values - Properties without any suffix specify static values that are used directly. Properties with the suffix
.fieldspecify dynamic values that are extracted from the operation's JSON payload at runtime.- Dynamic Field Removal - When
kafka.remove-dynamic-fieldsis set totrue, any field referenced by a *.field property will be removed from the JSON payload after its value is extracted. This can be useful when the user wants to include metadata in the request but doesn't want it to be part of the actual message sent to Kafka.- Property Precedence - Property precedence order is undefined. It is recommended to use only one type of property (static value or dynamic
.field) for each attribute to avoid unexpected behavior.
Examples
-
Static Values Example - In this example, all Kafka message attributes are set using static values:
kafka.topic="orders" kafka.key="customer123" kafka.headers="region=US,source=webapi" kafka.timestamp="1685964000000" kafka.partition="1"Message body:
{ "orderId": "ORD-001", "customerName": "John Doe", "items": [ { "productId": "P100", "quantity": 2 } ], "totalAmount": 199.99 }Resulting Kafka message attributes:
Attribute Value Topic orders Key customer123 Header 1 region-US Header 2 source=webapi Timestamp 1685964000000 Partition 1 -
Dynamic Values Example - In this example, Kafka message attributes are extracted from fields in the JSON payload:
kafka.topic.field="targetTopic" kafka.key.field="customerId" kafka.headers.field="messageHeaders" kafka.timestamp.field="eventTimestamp" kafka.partition.field="targetPartition" kafka.remove-dynamic-fields="true"Message body:
{ "targetTopic": "customer-events", "customerId": "CUST-456", "messageHeaders": "region=EU,eventType=registration,version=1.0", "eventTimestamp": 1685964123000, "targetPartition": 2, "data": { "customerName": "Jane Smith", "email": "[email protected]", "registrationDate": "2025-06-05" } }After processing (with remove-dynamic-fields=true), the message body becomes:
Attribute Value Topic customer-events Key CUST-456 Header 1 region=EU Header 2 eventType=registration Header 3 version=1.0 Timestamp 1685964123000 Partition 2 -
Mixed Example (Static and Dynamic Values) - You can also mix static and dynamic values:
kafka.topic="notifications" kafka.key.field="userId" kafka.headers="source=mobile-app,version=2.0" kafka.timestamp.field="notificationTime" kafka.partition.field="userPartition"Message body:
{ "userId": "USER-789", "userPartition": 3, "notificationTime": 1685964567000, "notification": { "type": "push", "title": "New Message", "body": "You have a new message from support" } }Resulting Kafka message attributes:
Attribute Value Topic notifications Key USER-789 Header 1 source=mobile-app Header 2 version=2.0 Timestamp 1685964567000 Partition 3
Updated about 9 hours ago
