Kafka producer and consumer using the Java client API:
Here is an example of a basic Kafka producer and consumer using the Java client API:
Kafka Producer Example:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// Set Kafka broker and topic details
String bootstrapServers = "localhost:9092";
String topic = "my-topic";
// Set Kafka producer properties
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Create Kafka producer
Producer<String, String> producer = new KafkaProducer<>(properties);
// Create a message
String key = "my-key";
String value = "Hello, Kafka!";
// Create a Kafka record
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
// Send the message asynchronously
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("Error while producing message: " + exception.getMessage());
} else {
System.out.println("Message sent successfully! Topic: " +
metadata.topic() + ", Partition: " + metadata.partition() +
", Offset: " + metadata.offset());
}
}
});
// Close the producer
producer.close();
}
}
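The send() call above is asynchronous and reports the result through the callback. If you would rather block until the broker acknowledges the write, you can wait on the Future that send() returns. Below is a minimal sketch of that variant, reusing the same broker, topic, and serializer settings as above; the class name KafkaSyncProducerExample is only illustrative.
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaSyncProducerExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my-topic", "my-key", "Hello, Kafka!");
            // send() returns a Future<RecordMetadata>; get() blocks until the broker responds
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Message acknowledged. Partition: " + metadata.partition() +
                    ", Offset: " + metadata.offset());
        }
    }
}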
Kafka Consumer Example:
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// Set Kafka broker and topic details
String bootstrapServers = "localhost:9092";
String topic = "my-topic";
// Set Kafka consumer properties
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "my-consumer-group");
// Create Kafka consumer
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
// Subscribe to the topic
consumer.subscribe(Collections.singletonList(topic));
// Poll for new messages
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: Key = " + record.key() +
", Value = " + record.value() +
", Topic = " + record.topic() +
", Partition = " + record.partition() +
", Offset = " + record.offset());
}
// Manually commit the offsets after processing the records
consumer.commitSync();
}
}
}
In this example, the producer sends a single message with a key-value pair to a Kafka topic. The consumer subscribes to the same topic and continuously polls for new messages, printing the received records.
Make sure a Kafka broker is running on localhost:9092 and that a topic named "my-topic" exists before running the code.
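If the topic does not exist yet (and automatic topic creation is disabled on the broker), you can create it with the kafka-topics command-line tool or programmatically through the Kafka AdminClient. The following is a minimal sketch of the programmatic route, assuming the same localhost:9092 broker and a single-partition topic with replication factor 1; the class name CreateTopicExample is only illustrative.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(properties)) {
            // Create "my-topic" with 1 partition and replication factor 1
            // (fails with a TopicExistsException if the topic is already there)
            NewTopic topic = new NewTopic("my-topic", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
            System.out.println("Topic created: my-topic");
        }
    }
}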
To compile and run the Kafka producer and consumer, you will need the Kafka client library (the org.apache.kafka:kafka-clients artifact) in your project dependencies. You can download it from the Apache Kafka website or include it as a Maven/Gradle dependency.
Remember to adjust the Kafka broker details, topic names, and serialization/deserialization settings based on your Kafka setup and data format.
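For example, if your keys were numeric rather than strings, you would swap in a Long serializer on the producer side (and the matching LongDeserializer on the consumer side). The sketch below shows the producer-side change, written with the ProducerConfig constants instead of raw property strings; the class name LongKeyProducerExample is only illustrative.
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class LongKeyProducerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Long keys instead of String keys; the consumer must use LongDeserializer for keys
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (Producer<Long, String> producer = new KafkaProducer<>(properties)) {
            producer.send(new ProducerRecord<>("my-topic", 42L, "Hello, Kafka with a Long key!"));
        }
    }
}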
This is a basic example to demonstrate Kafka producer and consumer functionality. Kafka provides many advanced features like partitioning, offset management, consumer groups, and more, which you can explore as you delve deeper into Kafka.
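As a small taste of the offset management mentioned above, a consumer can skip group subscription entirely and read a specific partition from a chosen position using assign() together with seek() or seekToBeginning(). The sketch below assumes the same broker and topic and reads partition 0 from the earliest available offset; the class name PartitionSeekExample is only illustrative.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class PartitionSeekExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // No consumer group is used here, so disable offset auto-commit
        properties.put("enable.auto.commit", "false");
        try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Manually assign partition 0 of the topic instead of subscribing with a group
            TopicPartition partition = new TopicPartition("my-topic", 0);
            consumer.assign(Collections.singletonList(partition));
            // Rewind to the earliest available offset in that partition
            consumer.seekToBeginning(Collections.singletonList(partition));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Offset " + record.offset() + ": " + record.value());
            }
        }
    }
}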