Spring Boot Kafka 消费者示例

Spring Boot 是 Java 编程语言中最流行、使用最广泛的框架之一。它是一个基于微服务的框架,使用 Spring Boot 构建生产就绪的应用程序所需的时间非常少。Spring Boot 让我们可以轻松创建独立的、基于 Spring 的生产级应用程序,这些程序我们可以“直接运行”。下面,让我们列举一下 Spring Boot 的主要特性:

  • 创建独立的 Spring 应用程序
  • 直接嵌入 Tomcat、Jetty 或 Undertow
  • 提供“起步”依赖项以简化构建配置
  • 尽可能自动配置 Spring 和第三方库
  • 提供生产就绪的功能,如健康检查、指标和外部化配置
  • 几乎不需要代码生成,也不需要 XML 配置

Apache Kafka 是一个发布-订阅消息系统。消息系统允许我们在进程、应用程序和服务器之间发送消息。广义上讲,Apache Kafka 是一种可以定义主题(一个主题可能是一个类别)并进一步处理的软件。应用程序可以连接到这个系统并将消息传输到主题上。消息可以包含任何类型的信息,从个人博客上的任何事件,到可以触发任何其他事件的非常简单的文本消息。在这里,我们将讨论如何使用 Spring Boot 从 Kafka 主题消费消息并将它们显示在我们的控制台中(这里的前提是已安装 Kafka)。

!image

示例:

> 前提条件:请确保你已经在本地机器上安装了 Apache Kafka,你需要了解 如何在 Windows 上安装并运行 Apache Kafka?

第 1 步: 访问这个 链接 并创建一个 Spring Boot 项目。将“Spring for Apache Kafka”依赖项添加到你的 Spring Boot 项目中。

!image

第 2 步: 创建一个名为 KafkaConfig 的配置文件。下面是 KafkaConfig.java 文件的代码。

// Java Program to Illustrate Kafka Configuration

package com.amiya.kafka.apachekafkaconsumer.config;

// Importing required classes
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Annotations
@EnableKafka
@Configuration

// Class
public class KafkaConfig {

    @Bean
    public ConsumerFactory consumerFactory()
    {

        // Creating a Map of string-object pairs
        Map config = new HashMap();

        // Adding the Configuration
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                   "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,
                   "group_id");
        config.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        config.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);

        return new DefaultKafkaConsumerFactory(config);
    }

    // Creating a Listener
    public ConcurrentKafkaListenerContainerFactory
    concurrentKafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory factory
            = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

第 3 步: 创建一个名为 KafkaConsumer 的消费者文件。

// Java Program to Illustrate Kafka Consumer

package com.amiya.kafka.apachekafkaconsumer.consumer;

// Importing required classes
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component

// Class
public class KafkaConsumer {

    @KafkaListener(topics = "NewTopic",
                   groupId = "group_id")

    // Method
    public void
    consume(String message)
    {
        // Print statement
        System.out.println("message = " + message);
    }
}

第 4 步: 现在我们需要做以下事情,以便使用 Spring Boot 从 Kafka 主题消费消息:

  • 运行 Apache Zookeeper 服务器
  • 运行 Apache Kafka 服务器
  • 从 Kafka 主题发送消息

运行你的 Apac

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。如需转载,请注明文章出处豆丁博客和来源网址。https://shluqu.cn/53231.html
点赞
0.00 平均评分 (0% 分数) - 0