使用 Java 在 Apache Kafka 中创建带有 Key 的生产者

Apache Kafka 是一种发布-订阅消息系统。消息系统允许你在进程、应用程序和服务器之间发送消息。Apache Kafka 是一种软件,我们可以在其中定义主题(Topic,一个主题可能是一个类别)并对其进行进一步处理。点击此处阅读更多关于 Kafka 的内容:什么是 Apache Kafka 及其工作原理Kafka 生产者 负责将数据写入主题,而主题由多个分区组成。现在,Kafka 中的生产者将根据你的消息自动知道要写入哪个 Broker 和分区;如果你的集群中发生 Kafka Broker 故障,生产者将自动从中恢复,这使得 Kafka 具有弹性,也是 Kafka 如今如此优秀和被广泛使用的原因。在本文中,我们将讨论如何使用 Java 创建带有 Key 的 Apache Kafka 生产者的分步实现。

分步实现

步骤 1:在 IntelliJ 中创建一个新的 Apache Kafka 项目

要使用 Java 和 Maven 在 IntelliJ 中创建一个新的 Apache Kafka 项目,请参考如何使用 Java 和 Maven 在 IntelliJ 中创建 Apache Kafka 项目。

步骤 2:安装并运行 Apache Kafka

要在本地系统中安装并运行 Apache Kafka,请参考 如何安装并运行 Apache Kafka

步骤 3:创建带有 Key 的生产者

首先,我们必须创建生产者属性。要创建生产者属性,请参考下面的代码片段。

创建生产者属性:

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

创建生产者:

KafkaProducer producer = new KafkaProducer(properties);

创建带有 Key 的生产者记录:

String topic = "demo_topic";
String value = "hello_world " + i;
String key = "id_" + i;

ProducerRecord record =
          new ProducerRecord(topic, key, value);

带有回调函数的 Java 生产者:

producer.send(record, (recordMetadata, e) -> {
                // 每次记录成功发送或抛出异常时执行
                if (e == null) {
                    logger.info("Received new metadata. 
" +
                            "Topic: " + recordMetadata.topic() + "
" +
                            "Partition: " + recordMetadata.partition() + "
" +
                            "Offset: " + recordMetadata.partition() + "
");
                } else {
                    logger.error("Error while producing ", e);
                }
            }).get(); // 阻塞 .send() 以使其同步

刷新并关闭生产者:

producer.flush();
producer.close();

下面是完整的代码。代码内部添加了注释,以便更详细地理解代码。

Java


package org.kafkademo.basics;

import org.apache.kafka.clients.producer.*;

import org.apache.kafka.common.serialization.StringSerializer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.Properties;

import java.util.concurrent.ExecutionException;

public class KafkaProducerWithKeyDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {

Logger logger = LoggerFactory.getLogger(KafkaProducerWithKeyDemo.class);

String bootstrapServer = "127.0.0.1:9092";

// Create Producer Properties

Properties properties = new Properties();

properties.setProperty(ProducerConfig.BOOTSTRAPSERVERSCONFIG, bootstrapServer);

properties.setProperty(ProducerConfig.KEYSERIALIZERCLASS_CONFIG, StringSerializer.class.getName());

properties.setProperty(ProducerConfig.VALUESERIALIZERCLASS_CONFIG, StringSerializer.class.getName());

// Create the Producer

KafkaProducer producer = new KafkaProducer(properties);

for (int i = 0; i < 10; i++) {

String topic = "demo_topic";

String value = "hello_world " + i;

String key = "id_" + i;

// Log the Key

logger.info("Key: " + key);

// Create a Producer Record with Key

ProducerRecord record =

new ProducerRecord(topic, key, value);

// Java Producer with Callback

producer.send(record, (recordMetadata, e) -> {

// Executes every time a record successfully sent

// or an exception is thrown

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。如需转载,请注明文章出处豆丁博客和来源网址。https://shluqu.cn/22631.html
点赞
0.00 平均评分 (0% 分数) - 0