Spring Boot 是 Java 编程语言中最流行、使用最广泛的框架之一。它是一个基于微服务的框架,使用 Spring Boot 构建生产就绪的应用程序所需的时间非常少。Spring Boot 让我们可以轻松创建独立的、基于 Spring 的生产级应用程序,这些程序我们可以“直接运行”。下面,让我们列举一下 Spring Boot 的主要特性:
- 创建独立的 Spring 应用程序
- 直接嵌入 Tomcat、Jetty 或 Undertow
- 提供“起步”依赖项以简化构建配置
- 尽可能自动配置 Spring 和第三方库
- 提供生产就绪的功能,如健康检查、指标和外部化配置
- 几乎不需要代码生成,也不需要 XML 配置
Apache Kafka 是一个发布-订阅消息系统。消息系统允许我们在进程、应用程序和服务器之间发送消息。广义上讲,Apache Kafka 是一种可以定义主题(一个主题可能是一个类别)并进一步处理的软件。应用程序可以连接到这个系统并将消息传输到主题上。消息可以包含任何类型的信息,从个人博客上的任何事件,到可以触发任何其他事件的非常简单的文本消息。在这里,我们将讨论如何使用 Spring Boot 从 Kafka 主题消费消息并将它们显示在我们的控制台中(这里的前提是已安装 Kafka)。
示例:
> 前提条件:请确保你已经在本地机器上安装了 Apache Kafka,你需要了解 如何在 Windows 上安装并运行 Apache Kafka?
第 1 步: 访问这个 链接 并创建一个 Spring Boot 项目。将“Spring for Apache Kafka”依赖项添加到你的 Spring Boot 项目中。
第 2 步: 创建一个名为 KafkaConfig 的配置文件。下面是 KafkaConfig.java 文件的代码。
// Java Program to Illustrate Kafka Configuration
package com.amiya.kafka.apachekafkaconsumer.config;
// Importing required classes
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
// Annotations
@EnableKafka
@Configuration
// Class
public class KafkaConfig {
@Bean
public ConsumerFactory consumerFactory()
{
// Creating a Map of string-object pairs
Map config = new HashMap();
// Adding the Configuration
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,
"group_id");
config.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
config.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory(config);
}
// Creating a Listener
public ConcurrentKafkaListenerContainerFactory
concurrentKafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory factory
= new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
第 3 步: 创建一个名为 KafkaConsumer 的消费者文件。
// Java Program to Illustrate Kafka Consumer
package com.amiya.kafka.apachekafkaconsumer.consumer;
// Importing required classes
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
// Class
public class KafkaConsumer {
@KafkaListener(topics = "NewTopic",
groupId = "group_id")
// Method
public void
consume(String message)
{
// Print statement
System.out.println("message = " + message);
}
}
第 4 步: 现在我们需要做以下事情,以便使用 Spring Boot 从 Kafka 主题消费消息:
- 运行 Apache Zookeeper 服务器
- 运行 Apache Kafka 服务器
- 从 Kafka 主题发送消息
运行你的 Apac