Apache Kafka 是一种发布-订阅消息系统。消息系统允许你在进程、应用程序和服务器之间发送消息。Apache Kafka 是一种软件,我们可以在其中定义主题(Topic,一个主题可能是一个类别)并对其进行进一步处理。点击此处阅读更多关于 Kafka 的内容:什么是 Apache Kafka 及其工作原理。Kafka 生产者 负责将数据写入主题,而主题由多个分区组成。现在,Kafka 中的生产者将根据你的消息自动知道要写入哪个 Broker 和分区;如果你的集群中发生 Kafka Broker 故障,生产者将自动从中恢复,这使得 Kafka 具有弹性,也是 Kafka 如今如此优秀和被广泛使用的原因。在本文中,我们将讨论如何使用 Java 创建带有 Key 的 Apache Kafka 生产者的分步实现。
分步实现
步骤 1:在 IntelliJ 中创建一个新的 Apache Kafka 项目
要使用 Java 和 Maven 在 IntelliJ 中创建一个新的 Apache Kafka 项目,请参考如何使用 Java 和 Maven 在 IntelliJ 中创建 Apache Kafka 项目。
步骤 2:安装并运行 Apache Kafka
要在本地系统中安装并运行 Apache Kafka,请参考 如何安装并运行 Apache Kafka。
步骤 3:创建带有 Key 的生产者
首先,我们必须创建生产者属性。要创建生产者属性,请参考下面的代码片段。
创建生产者属性:
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
创建生产者:
KafkaProducer producer = new KafkaProducer(properties);
创建带有 Key 的生产者记录:
String topic = "demo_topic";
String value = "hello_world " + i;
String key = "id_" + i;
ProducerRecord record =
new ProducerRecord(topic, key, value);
带有回调函数的 Java 生产者:
producer.send(record, (recordMetadata, e) -> {
// 每次记录成功发送或抛出异常时执行
if (e == null) {
logger.info("Received new metadata.
" +
"Topic: " + recordMetadata.topic() + "
" +
"Partition: " + recordMetadata.partition() + "
" +
"Offset: " + recordMetadata.partition() + "
");
} else {
logger.error("Error while producing ", e);
}
}).get(); // 阻塞 .send() 以使其同步
刷新并关闭生产者:
producer.flush();
producer.close();
下面是完整的代码。代码内部添加了注释,以便更详细地理解代码。
Java
“
package org.kafkademo.basics;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerWithKeyDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Logger logger = LoggerFactory.getLogger(KafkaProducerWithKeyDemo.class);
String bootstrapServer = "127.0.0.1:9092";
// Create Producer Properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAPSERVERSCONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEYSERIALIZERCLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUESERIALIZERCLASS_CONFIG, StringSerializer.class.getName());
// Create the Producer
KafkaProducer producer = new KafkaProducer(properties);
for (int i = 0; i < 10; i++) {
String topic = "demo_topic";
String value = "hello_world " + i;
String key = "id_" + i;
// Log the Key
logger.info("Key: " + key);
// Create a Producer Record with Key
ProducerRecord record =
new ProducerRecord(topic, key, value);
// Java Producer with Callback
producer.send(record, (recordMetadata, e) -> {
// Executes every time a record successfully sent
// or an exception is thrown