Kafka producer and consumer using the Java client API

Category: Apache Kafka | By Prasad Bonam | Last updated: 2023-07-12 05:41:32


Here is an example of a basic Kafka producer and consumer using the Java client API:

Kafka Producer Example:

```java
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // Set Kafka broker and topic details
        String bootstrapServers = "localhost:9092";
        String topic = "my-topic";

        // Set Kafka producer properties
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create Kafka producer
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // Create a message
        String key = "my-key";
        String value = "Hello, Kafka!";

        // Create a Kafka record
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        // Send the message asynchronously
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("Error while producing message: " + exception.getMessage());
                } else {
                    System.out.println("Message sent successfully! Topic: " + metadata.topic()
                            + ", Partition: " + metadata.partition()
                            + ", Offset: " + metadata.offset());
                }
            }
        });

        // Close the producer
        producer.close();
    }
}
```

Kafka Consumer Example:

```java
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // Set Kafka broker and topic details
        String bootstrapServers = "localhost:9092";
        String topic = "my-topic";

        // Set Kafka consumer properties
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "my-consumer-group");
        // Disable auto-commit so the explicit commitSync() below controls offset commits
        properties.put("enable.auto.commit", "false");

        // Create Kafka consumer
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));

        // Poll for new messages
        while (true) {
            // Wait up to one second for records; poll(long) is deprecated in favor of poll(Duration)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: Key = " + record.key()
                        + ", Value = " + record.value()
                        + ", Topic = " + record.topic()
                        + ", Partition = " + record.partition()
                        + ", Offset = " + record.offset());
            }

            // Manually commit the offsets after processing the records
            consumer.commitSync();
        }
    }
}
```

In this example, the producer sends a single message with a key-value pair to a Kafka topic. The consumer subscribes to the same topic and continuously polls for new messages, printing the received records.

Before running the code, make sure a Kafka broker is running on localhost:9092 and that a topic named "my-topic" exists.
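
If you are unsure whether the broker is up, a quick pre-flight check can save some confusion. The following is a minimal sketch using only the Java standard library; `BrokerCheck` and `isListening` are hypothetical names for this illustration and are not part of the Kafka client API. It only confirms that something accepts TCP connections on the given port, not that the listener is actually Kafka or that the topic exists.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical pre-flight helper; not part of the Kafka client API.
final class BrokerCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port match the bootstrap server assumed throughout this article.
        boolean up = isListening("localhost", 9092, 2000);
        System.out.println(up
                ? "Something is listening on localhost:9092"
                : "Nothing is listening on localhost:9092; start the broker first");
    }
}
```

A successful check still leaves topic creation to you, so treat this as a first sanity test rather than a full health check.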

To run the Kafka producer and consumer, you need the Kafka client library (kafka-clients) on your project's classpath. You can download it from the Apache Kafka website or declare it as a Maven or Gradle dependency (org.apache.kafka:kafka-clients).

Remember to adjust the Kafka broker details, topic names, and serialization/deserialization settings based on your Kafka setup and data format.
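
One way to make these settings adjustable without editing the code is to read them from the environment and fall back to the values used in this article. The sketch below assumes this approach; the class name `ProducerSettings` and the environment variable names (`KAFKA_BOOTSTRAP_SERVERS`, `KAFKA_KEY_SERIALIZER`, `KAFKA_VALUE_SERIALIZER`, `KAFKA_TOPIC`) are made up for illustration, while the property keys are the standard producer configuration names used above.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper; the environment variable names are assumptions for this sketch.
final class ProducerSettings {

    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    // Builds producer properties from the given environment map,
    // falling back to the defaults used in the article's example.
    static Properties fromEnvironment(Map<String, String> env) {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                env.getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"));
        props.put("key.serializer",
                env.getOrDefault("KAFKA_KEY_SERIALIZER", STRING_SERIALIZER));
        props.put("value.serializer",
                env.getOrDefault("KAFKA_VALUE_SERIALIZER", STRING_SERIALIZER));
        return props;
    }

    // Resolves the topic name the same way.
    static String topic(Map<String, String> env) {
        return env.getOrDefault("KAFKA_TOPIC", "my-topic");
    }

    public static void main(String[] args) {
        Properties props = fromEnvironment(System.getenv());
        System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));
        System.out.println("topic = " + topic(System.getenv()));
    }
}
```

The resulting `Properties` object could be passed to `new KafkaProducer<>(...)` in place of the hard-coded one; a consumer variant would swap in the deserializer keys and add `group.id`.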

This is a basic example to demonstrate Kafka producer and consumer functionality. Kafka provides many advanced features like partitioning, offset management, consumer groups, and more, which you can explore as you delve deeper into Kafka.
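
As a first taste of partitioning: when a record has a key, Kafka's default partitioner hashes the serialized key and takes the result modulo the number of partitions, so records with the same key land on the same partition. The sketch below illustrates only that idea. Kafka uses a murmur2 hash; `Arrays.hashCode` is a stand-in here, so the partition numbers it prints will not match a real cluster. The class name `KeyPartitionSketch` and the partition count of 3 are assumptions for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: Arrays.hashCode stands in for Kafka's murmur2 hash,
// so the resulting partition numbers differ from what a real producer picks.
final class KeyPartitionSketch {

    // Maps a key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result non-negative even for negative hash values.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count for "my-topic"
        for (String key : new String[] {"my-key", "my-key", "other-key"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Because the mapping depends on the partition count, adding partitions to a topic later changes where a given key is routed.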

