Apache Kafka is one of the most popular message brokers used in the industry. Kafka works on a publish-subscribe model and can be used to establish asynchronous communication between microservices. In this article, we discuss how to publish messages to a Kafka topic using a Java application.
There are some key terms used in the context of Kafka:
- Topic: The named channel in Kafka to which messages are published and from which they are consumed.
- Producer: The application that publishes messages to a topic. In our case, the Java application acts as the producer and is responsible for publishing messages to the topic.
- Consumer: The application that reads/subscribes to messages from a Kafka topic. We will use the Kafka-provided console consumer to read messages from the topic.
Creating a Java Application to Publish Messages to Kafka
Requirements:
- JDK 17 or above
- IntelliJ IDEA or any other IDE of your choice
- Apache Kafka
Step 1: Spring Boot Application Initialization
Create a Spring Boot application from Spring Initializr with the configuration shown below.
Add the following two dependencies by clicking the "Add Dependencies" button:
- Spring Web
- Spring for Apache Kafka
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.kafkaDemo</groupId>
    <artifactId>KafkaDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>KafkaDemo</name>
    <description>Publish Message to Kafka using Java</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Step 2: Create the User.java file
We will create a POJO class named User and publish its objects to the Kafka topic. The object will be received from the client through an API endpoint that we expose in Step 5.
User.java:
package com.kafkaDemo.KafkaDemo.models;

import java.io.Serializable;

public class User implements Serializable {

    private String name;
    private String email;
    private Integer age;

    public User() {
    }

    public User(String name, String email, Integer age) {
        this.name = name;
        this.email = email;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}
Step 3: Configuring KafkaTemplate Bean
The Spring Kafka dependency provides the KafkaTemplate bean to publish messages to Kafka. It auto-configures a KafkaTemplate<String, String> bean out of the box, but it also lets us define a KafkaTemplate of our own type. We will configure a KafkaTemplate<String, User> bean. To do so, we will:
- Create a KafkaProducerConfig.java file
- Configure the ProducerFactory and KafkaTemplate beans in it.
Add the code given below to KafkaProducerConfig.java.
KafkaProducerConfig.java:
import com.kafkaDemo.KafkaDemo.models.User;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    // Broker addresses are read from application.properties (see Step 4)
    @Value("${spring.kafka.bootstrap-servers}")
    private List<String> bootstrapServers;

    @Bean("userProducerFactory")
    public ProducerFactory<String, User> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Keys are plain strings; values are serialized to JSON by Spring Kafka's JsonSerializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean("userKafkaTemplate")
    public KafkaTemplate<String, User> userKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Step 4: Adding Properties to application.properties
Add the properties below to the application.properties file. You can provide a list of Kafka broker addresses in the spring.kafka.bootstrap-servers property in comma-separated format.
spring.application.name=KafkaDemo
spring.kafka.bootstrap-servers=localhost:9092
server.port=7070
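For instance, if you were running more than one broker (the second address here is purely hypothetical), the property would look like:
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093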
Step 5: Creating a REST Controller
Create a KafkaController.java file and add the code below to it.
KafkaController.java:
import com.kafkaDemo.KafkaDemo.models.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    private final KafkaTemplate<String, User> kafkaTemplate;

    @Autowired
    public KafkaController(@Qualifier("userKafkaTemplate") KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody User user) {
        if (user == null) {
            return ResponseEntity.badRequest().build();
        }
        // The email is used as the record key, so messages for the same user go to the same partition
        kafkaTemplate.send("userTopic", user.getEmail(), user);
        return ResponseEntity.accepted().body("Message sent to be published");
    }
}
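Note that kafkaTemplate.send() is asynchronous: the controller returns before the broker acknowledges the record. In Spring Kafka 3.x, send() returns a CompletableFuture<SendResult<K, V>>, so if you want to log the outcome you could attach a callback like the sketch below. The log messages are illustrative, not part of the original example:

// Hypothetical variation of the publish call that logs the send outcome.
// Assumes Spring Kafka 3.x, where send() returns a CompletableFuture.
kafkaTemplate.send("userTopic", user.getEmail(), user)
        .whenComplete((result, ex) -> {
            if (ex == null) {
                // Record was acknowledged; log where it landed
                System.out.println("Published to partition " + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset());
            } else {
                // Send failed; a real service would handle or report this
                System.err.println("Failed to publish: " + ex.getMessage());
            }
        });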
Step 6: Creating a topic named userTopic in Kafka
Open a terminal in the bin folder of your Kafka installation and run the command given below. On a Linux or macOS machine, run the .sh shell scripts instead of the Windows .bat batch files. Note that the Kafka broker (started in Step 7) must already be running for topic creation to succeed.
.\kafka-topics.bat --create --topic userTopic --bootstrap-server localhost:9092
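For reference, the Linux/macOS equivalent, run from the bin directory with the same flags, is:
./kafka-topics.sh --create --topic userTopic --bootstrap-server localhost:9092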
Step 7: Starting Apache ZooKeeper, Kafka Broker and Consumer
- First, start ZooKeeper with the command below:
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
- Next, start the Kafka broker. By default, the broker listens on port 9092.
.\kafka-server-start.bat ..\..\config\server.properties
- Finally, start the console consumer provided by Kafka by running the command given below:
.\kafka-console-consumer.bat --topic userTopic --bootstrap-server localhost:9092
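By default, the console consumer only prints records produced after it starts. If you publish messages before starting the consumer, you can replay the existing records with the --from-beginning flag:
.\kafka-console-consumer.bat --topic userTopic --bootstrap-server localhost:9092 --from-beginning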
Step 8: Output Check
Finally, start the Java application, hit the /api/kafka/publish endpoint, and check the output in the console consumer.
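For example, with the application running on port 7070 (as configured in Step 4), a request like the one below, shown here in a Unix-style shell, should make the serialized User appear in the consumer window. The payload values are placeholders:

curl -X POST http://localhost:7070/api/kafka/publish \
     -H "Content-Type: application/json" \
     -d '{"name":"Alice","email":"alice@example.com","age":30}'

The console consumer should then print the JSON form of the message, e.g. {"name":"Alice","email":"alice@example.com","age":30}.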
Conclusion
In conclusion, we’ve walked through the steps of publishing messages to a Kafka topic using a Java application, specifically with the Spring Boot framework. We started by setting up our development environment and dependencies, then created a User model class and configured a KafkaTemplate bean to handle message publishing. We also added necessary properties to the application configuration, built a REST controller to expose an API endpoint for publishing messages, and finally, created and tested a Kafka topic. This comprehensive guide demonstrates how to effectively use Kafka for asynchronous communication between microservices, leveraging Java’s robust ecosystem.
Publish Message to Kafka Using Java – FAQs
What is Zookeeper in Kafka?
ZooKeeper is responsible for managing Kafka brokers and topics, leader election, and other cluster metadata. You have to start ZooKeeper before starting any Kafka broker.
How to create topic in Apache Kafka?
To create a topic in Apache Kafka, open a terminal in the bin folder of your Kafka installation directory and run the command below:
.\kafka-topics.bat --create --topic <topic-name> --bootstrap-server localhost:9092
Replace <topic-name> with your topic name.
How do I handle serialization and deserialization in Kafka?
Kafka provides serializers and deserializers to convert objects to byte arrays and vice versa. For custom objects like the User class, you can use the JsonSerializer and JsonDeserializer provided by Spring Kafka. You configure these in your Kafka producer and consumer configurations, ensuring that messages are properly serialized when sent and deserialized when received.
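For illustration, a consumer-side configuration mirroring the producer setup from Step 3 might look like the sketch below. The bean name, group id, and trusted-packages value are assumptions for this sketch, not part of the tutorial's code:

import com.kafkaDemo.KafkaDemo.models.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, User> userConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-group"); // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Tell JsonDeserializer which packages it is allowed to deserialize into
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.kafkaDemo.KafkaDemo.models");
        // Fall back to User when no type information is present in the record headers
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}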
What should I do if my messages are not being consumed?
If your messages are not being consumed, check the following:
- Ensure your consumer is connected to the correct Kafka broker and topic.
- Verify that there are no network issues preventing the consumer from connecting to the broker.
- Check the consumer group settings; if multiple consumers are in the same group, they will share the load.
- Ensure the consumer's offset is correctly managed; sometimes the consumer might be reading from an offset past which no new messages are available (see the command after this list for inspecting offsets).
- Look at the logs for any errors or warnings that might indicate issues in the consumer configuration or logic.
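A quick way to inspect a group's current offsets and lag is Kafka's consumer-groups tool; replace <group-id> with your consumer group's id:
.\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group <group-id>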