Apache Kafka is the distributed event streaming platform capable of handling high throughput, low latency, and fault tolerance. One of the common tasks when working with Kafka is determining the number of messages on a specific topic. This article will guide you through the process of using Java to retrieve the number of messages in the Kafka topic.
To get the number of messages in the Kafka topic, we can use the AdminClient provided by Kafka. This client allows you to fetch the beginning and end offsets of each partition in the topic. By subtracting the beginning offset from the end offset. We can determine the number of messages in each partition and summing these values gives you the total number of messages in the topic.
Implementation to Get the Number of Messages in a Topic in Apache KafkaStep 1: Create a Maven ProjectCreate a new maven project using IntelliJ Idea and add the following dependencies to the project.
Dependency:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.0</version> </dependency> </dependencies>
After project creation done, the the folder structure in the IDE will look like the below image:

Step 2: Create the KafkaTopicMessageCount ClassCreate the KafkaTopicMessageCount class to interact with the Kafka broker:
Java
package com.gfg;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
* KafkaTopicMessageCount class is used to calculate the number of messages in a Kafka topic.
* It connects to a Kafka broker, retrieves offsets for each partition of the topic, and calculates the message count.
*/
public class KafkaTopicMessageCount {
/**
* Main method to execute the message count calculation.
*
* @param args command line arguments
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Define the topic to be analyzed
String topic = "my-new-topic";
// Set up properties for the AdminClient
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Create an AdminClient with the specified properties
try (AdminClient adminClient = AdminClient.create(properties)) {
// Get the message counts for the specified topic
Map<TopicPartition, Long> messageCounts = getMessageCountsForTopic(adminClient, topic);
// Print the message counts for each partition
messageCounts.forEach((tp, count) -> System.out.println("Partition: " + tp.partition() + ", Message Count: " + count));
}
}
/**
* Retrieves the message counts for each partition of the specified topic.
*
* @param adminClient the AdminClient instance to interact with the Kafka broker
* @param topic the name of the Kafka topic
* @return a map of TopicPartition to message count
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
*/
public static Map<TopicPartition, Long> getMessageCountsForTopic(AdminClient adminClient, String topic) throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> messageCounts = new HashMap<>();
// Fetch the beginning offsets for each partition
List<TopicPartition> partitions = getTopicPartitions(adminClient, topic);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = adminClient.listOffsets(
partitions.stream().collect(HashMap::new, (m, v) -> m.put(v, OffsetSpec.earliest()), HashMap::putAll)
).all().get();
// Fetch the end offsets for each partition
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = adminClient.listOffsets(
partitions.stream().collect(HashMap::new, (m, v) -> m.put(v, OffsetSpec.latest()), HashMap::putAll)
).all().get();
// Calculate the number of messages in each partition
for (TopicPartition partition : partitions) {
long beginningOffset = beginningOffsets.get(partition).offset();
long endOffset = endOffsets.get(partition).offset();
long messageCount = endOffset - beginningOffset;
messageCounts.put(partition, messageCount);
}
return messageCounts;
}
/**
* Retrieves the list of TopicPartitions for the specified topic.
*
* @param adminClient the AdminClient instance to interact with the Kafka broker
* @param topic the name of the Kafka topic
* @return a list of TopicPartitions
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
*/
private static List<TopicPartition> getTopicPartitions(AdminClient adminClient, String topic) throws ExecutionException, InterruptedException {
// Describe the topic to get information about its partitions
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topic));
TopicDescription topicDescription = describeTopicsResult.all().get().get(topic);
List<TopicPartition> partitions = new ArrayList<>();
// Create a list of TopicPartitions from the topic description
for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
partitions.add(new TopicPartition(topic, partitionInfo.partition()));
}
return partitions;
}
}
Step 3: Add the KafkaClient DependencyOpen the pom.xml file and add the KafkaClient dependency to the project.
XML
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gfg</groupId>
<artifactId>kafka-topics</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
</dependencies>
</project>
Step 4: Run the applicationRun the application, it will display the number of messages in each partition of the specified topic.
 This example demonstrates the accurate count of the messages currently in the each partition of the specified topic in the Apache Kafka.
|