Introduction
Apache Kafka is a distributed event streaming system. Kafka handles large volumes of data in a low-latency manner, which makes it a strong choice for microservices architectures. Kafka is widely used for real-time streaming applications, such as payments, finance, monitoring and managing assets, capturing data from sensors, and many more. In this blog I write about how to pass custom objects through Apache Kafka.
What is Apache Kafka
Apache Kafka is an open-source event streaming system that handles large amounts of real-time data and manages messaging, storage, and stream processing. Kafka follows a publish-subscribe model: multiple publishers send data to Kafka, and Kafka delivers that data to multiple consumers. There are a few key components of Kafka you should understand:
- Producer : Sends messages to a Kafka topic.
- Consumer : Reads messages from a Kafka topic.
- Broker : A Kafka server that stores data and serves client requests.
- Cluster : A group of Kafka brokers that work together to manage data.
- Topic : A group of messages categorized by a unique name.
- Partition : A topic is divided into partitions, which allow data to be distributed and processed in parallel.
- Offset : A unique integer that identifies a message's position within a partition.
- Consumer Group : A group of consumers that share a common task.
- Zookeeper : A centralized service that manages cluster configuration and metadata.
Why Use Custom Objects in Kafka
Out of the box, Kafka messages are often simple strings, but in the real world we want to send data as JSON or in another structured format, i.e. as a custom object. That is why we send custom objects to Kafka in this real-time application example. Working with raw strings pushes type errors to runtime; a custom object gives you compile-time type checking and increases the readability of the code.
Prerequisites
- Java 17
- Maven
- Spring Boot Development Environment (here IntelliJ IDEA)
- Kafka
- Zookeeper
Steps to Integrate Kafka with Spring Boot Application
Step 1 : Add the Kafka dependency to your application.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.3.0</version>
</dependency>
Step 2 : Configure Application Properties
Add the Kafka properties to application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
Here group_id is the consumer group name; you can choose any meaningful name. Also note that spring.json.trusted.packages=* trusts every package when deserializing JSON, which is convenient for development; in production it is safer to list only the packages that contain your model classes.
Step 3 : Define a custom object.
Create a class to define the custom object. The JsonDeserializer needs a no-argument constructor to create instances; since User declares no other constructor, Java's implicit default constructor is enough.
public class User {

    private String id;
    private String name;
    private String email;

    // Getters and setters
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    @Override
    public String toString() {
        return "User{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", email='" + email + '\'' +
                '}';
    }
}
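With the JsonSerializer configured in Step 2, this object travels over the wire as JSON. For example, a User posted with illustrative values would appear on the topic roughly as:

{"id":"1","name":"Alice","email":"alice@example.com"}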
Step 4 : Create a service to define a Kafka producer
Create a producer that sends custom objects to the Kafka topic:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class UserProducer {

    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendMessage(User user) {
        kafkaTemplate.send(TOPIC, user);
    }
}
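Note that kafkaTemplate.send() is asynchronous. If you want confirmation that the broker accepted the record, you can attach a callback. The sendMessageWithCallback method below is an illustrative sketch (not part of the original service), assuming spring-kafka 3.x, where send() returns a CompletableFuture<SendResult<K, V>>:

public void sendMessageWithCallback(User user) {
    // whenComplete fires after the broker acknowledges (or rejects) the record
    kafkaTemplate.send(TOPIC, user).whenComplete((result, ex) -> {
        if (ex == null) {
            System.out.println("Sent user " + user.getId() + " to partition "
                    + result.getRecordMetadata().partition()
                    + " at offset " + result.getRecordMetadata().offset());
        } else {
            System.err.println("Failed to send user: " + ex.getMessage());
        }
    });
}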
Step 5 : Create a service to define a Kafka consumer
Create a consumer that reads messages from the Kafka topic. The class must be a Spring bean (here via @Service) for the @KafkaListener annotation to be detected:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class UserConsumer {

    @KafkaListener(topics = "users", groupId = "group_id")
    public void consume(User user) {
        System.out.println("Consumed message: " + user);
    }
}
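If you also want the record metadata (the partition and offset described earlier), the listener can receive the full ConsumerRecord instead of just the payload. A minimal sketch, meant as an alternative to the consume method above (two listeners in the same group on the same topic would split partitions between them):

import org.apache.kafka.clients.consumer.ConsumerRecord;

@KafkaListener(topics = "users", groupId = "group_id")
public void consumeRecord(ConsumerRecord<String, User> record) {
    // record.value() holds the deserialized User; partition and offset come from the broker
    System.out.println("Partition " + record.partition()
            + ", offset " + record.offset()
            + ": " + record.value());
}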
Step 6 : Define the controller
Define an endpoint in the controller to test the producer:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api")
public class Controller {

    @Autowired
    private UserProducer userProducer;

    @PostMapping("/publish")
    public String sendMessage(@RequestBody User user) {
        userProducer.sendMessage(user);
        return "Message published successfully!";
    }
}
Step 7 : Run Kafka and the application
Download Kafka, then start the ZooKeeper and Kafka servers by running the zookeeper-server-start.sh and kafka-server-start.sh scripts.
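From the Kafka installation directory, the standard commands look like this (the config file paths are the defaults that ship with Kafka):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Once both are running, start the Spring Boot application.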
Step 8 : Test the producer using the API with a tool like Postman
Here I test the endpoint using Postman.
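Send a POST request to http://localhost:8080/api/publish (this assumes Spring Boot's default port 8080) with a JSON body such as:

{
  "id": "1",
  "name": "Alice",
  "email": "alice@example.com"
}

The endpoint should respond with "Message published successfully!".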
Step 9 : For check the consumer see logs
Conclusion
You can incorporate custom objects with Kafka in your Spring Boot application to improve its flexibility and efficiency. The step-by-step procedure above makes it simple to integrate custom objects with Kafka. This is especially helpful in microservices for managing data and interacting with other services. Kafka can be used for a variety of tasks, including asset management, handling sensor data, and payment processing.