【Kafka】SpringBoot 整合 Kafka 以及 @KafkaListener 注解的使用_kafkalistener注解-程序员宅基地

技术标签: spring boot  java  kafka  后端  springboot  

一、前提已经安装好 kafka

我是在 windows 中安装的 Kafka,用于在本地测试用的

Windows 安装 kafka

二、新建 SpringBoot 项目

1、添加项目依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2、添加配置文件 application.properties

配置中用了批量消费


# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=127.0.0.1:9092
#重试次数
spring.kafka.producer.retries=3
#批量发送的消息数量
spring.kafka.producer.batch-size=1000
#32MB的批处理缓冲区
spring.kafka.producer.buffer-memory=33554432
#默认消费者组
spring.kafka.consumer.group-id=crm-microservice-newperformance
#最早未被消费的offset
spring.kafka.consumer.auto-offset-reset=earliest
#批量一次最大拉取数据量
spring.kafka.consumer.max-poll-records=4000
#是否自动提交
spring.kafka.consumer.enable-auto-commit=false
#自动提交时间间隔,单位ms
spring.kafka.consumer.auto-commit-interval=1000
#批消费并发量,小于或等于Topic的分区数
spring.kafka.consumer.batch.concurrency = 3

3、创建一个 KafkaConfiguration 配置类

package com.example.kafkademo.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Frederic.Hu
 * @date 2022/05/25 18:00
 */
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.batch.concurrency}")
    private Integer batchConcurrency;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    /**
     *  生产者配置信息
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     *  生产者工厂
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     *  生产者模板
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     *  消费者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     *  消费者批量工厂
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //设置并发量,小于或等于Topic的分区数
        factory.setConcurrency(batchConcurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        //配置监听手动提交 ack,消费一条数据完后,立即提交
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

    /**
     * 异常处理器
     */
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler(){
        return (message,exception,consumer)->{
            System.out.println("消费异常:"+message.getPayload());
            return null;
        };
    }

}

4、写一个向 Kafka 推送消费的测试类(生产者 producer)

package com.example.kafkademo;

import com.alibaba.fastjson.JSONObject;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.*;

/**
 * @author Frederic.Hu
 * @Description
 * @date 2022/05/25 17:46
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void testSend(){
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("username", "小明");
        map.put("userid", 1);
        map.put("age", 12);
        kafkaTemplate.send("test4", JSONObject.toJSONString(map)).addCallback(success -> {
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            logger.info("产线发送消息到kafka队列成功:{}, offset为:{}", JSONObject.toJSONString(map), offset);
        }, failure -> {
            logger.error("产线发送消息到kafka队列失败:{}, 报错信息为:{}", JSONObject.toJSONString(map), failure.getMessage());
        });
    }

}

5、创建一个消费者(消费者 consumer)

package com.example.kafkademo.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author Frederic.Hu
 * @Description
 * @date 2022/05/25 17:43
 */
@Component
public class BigDataTopicListener {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 监听kafka数据(批量消费)
     * @param consumerRecords
     * @param ack
     */
    @KafkaListener(id = "operation", topics = {"test4"}, containerFactory = "batchFactory", errorHandler="consumerAwareErrorHandler")
    public void batchConsumer(List<ConsumerRecord<?, ?>> consumerRecords, Acknowledgment ack) {
        long start = System.currentTimeMillis();

        //...
        //db.batchSave(consumerRecords);//批量插入或者批量更新数据

        for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
            logger.info("消费的每条数据为:{}", consumerRecord.value());
        }
        //手动提交
        ack.acknowledge();
        logger.info("收到bigData推送的数据,拉取数据量:{},消费时间:{}ms", consumerRecords.size(), (System.currentTimeMillis() - start));
    }

}

6、启动测试类,查看控制台

三、过程中的遇到的一些坑及总结

1、Kafka 中 topic 不存在的话,启动项目会报错

解决办法:启动项目之前,先在 Kafka 中创建好自己定义的 topic 名称,也可以在配置类中写一个自动创建 topic,但是出现一个问题,项目上线每个 Kafka 的集群数都不一样,自动创建 topic 时,分区数和副本数不好设置,设置不合理,启动项目是会报错的。

2、生产者生产消息是否成功怎么看?

解决办法:kafkaTemplate 提供了一个回调方法 addCallback,我们可以在回调方法中监控消息是否发送成功或失败时做补偿处理。

3、消费者消费消息报错了怎么办?

解决办法:新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用 @Bean 注入,BeanName 默认就是方法名,然后我们将这个异常处理器的 BeanName 放到 @KafkaListener 注解的 errorHandler 属性里面,当监听抛出异常的时候,则会自动调用异常处理器。

4、消费不同的 topic 中的数据,消费者组(group id)如果用的是同一个,消费时会报错的

解决办法:@KafkaListener 中的 id 监听器使用不同的名称,如果配置文属性配置了默认消费组(group id),注解中的 监听器 id 会覆盖默认的消费组(group id)。

5、重复消费了数据,怎么办?

原因:消费者宕机、重启或者被强行 kill 进程,导致消费者消费的 offset 没有提交。或者消费后的数据,当 offset 还没有提交时,Partition 就断开连接。

解决办法:我目前项目中,是消费的数据插入到 MySQL 中的,如果重复消费了,插入到数据库中的时候,会查询该主键已经在数据库存在,则更新该条数据。

四、参考文档

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/yuxiangdeming/article/details/125004280

智能推荐

PKI常见证书格式和转换_pkcs#10 pfx-程序员宅基地

文章浏览阅读1.1k次。PKCS 全称是 Public-Key Cryptography Standards ,是由 RSA 实验室与其它安全系统开发商为促进公钥密码的发展而制订的一系列标准,PKCS 目前共发布过 15 个标准。 常用的有:PKCS#7 Cryptographic Message Syntax StandardPKCS#10 Certification Request StandardP..._pkcs#10 pfx

pickle模块有关_pickle文件用txt方式打开之后影响吗-程序员宅基地

文章浏览阅读358次。读书笔记: 参考 :http://blog.csdn.net/junwei0206/article/details/45014103 pickle模块的用法: 总体使用方法类似于文件的读写,都要遵循打开–>读/写–>关闭 写入: 首先打开:pickle_file = open(‘my_list.pkl’,’wb’)#打开名为my_list.pkl的文件,以二进制的方式进行写入 写入:pi_pickle文件用txt方式打开之后影响吗

C经典书籍笔记——C陷阱与缺陷③(语意陷阱)_复制指针不会复制指针所指向的是一句-程序员宅基地

文章浏览阅读463次,点赞26次,收藏10次。一、前言二、字符指针结论一:复制指针并不会复制指针所指向的内容。两个指针所指向位置相同,实际为同一个指针。结论而:开辟两个数组,即使两个数组内容相同,地址也绝不相同。三、边界计算与不对称边界1.经典错误①int main(){ int i = 0; int arr[10] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; for (i = 0; i < 13; i++) { arr[i] = 0; printf("haha"._复制指针不会复制指针所指向的是一句

一文读懂“数据分发服务DDS”(Data Distribution Service,RTPS,OMG)_omg dds-程序员宅基地

文章浏览阅读3.1w次,点赞174次,收藏460次。DDS 数据分发服务(Data Distribution Service)数据分发服务DDS(DataDistributionService)是对象管理组织(OMG)在HLA及CORBA等标准的基础上制定的新一代分布式实时通信中间件技术规范,DDS采用发布/订阅体系架构,强调以数据为中心,提供丰富的QoS服务质量策略,能保障数据进行实时、高效、灵活地分发,可满足各种分布式实时通信应用需求。DDS..._omg dds

鸿蒙用于物联网,任正非:鸿蒙开发初衷是用于物联网-程序员宅基地

文章浏览阅读133次。近日,华为社区发布了关于华为创始人任正非在接受外媒采访的内容,其中任正非表示,开发设计鸿蒙的初衷是用于物联网,比如工业控制、无人驾驶……来支撑使用,我们现在首先使用在手表、在智能8K大屏、车联网上。7月9日,华为“心声社区”发布华为创始人任正非6月18日接受法国《观点》周刊的采访实录。在采访中,任正非谈到关于华为自研操作系统的相关信息。任正非称,鸿蒙操作系统是一个面向确定时延系统的操作系统,对诸如..._鸿蒙开发需要硬件物联网吗

java 父类对象赋值给子类_typehelper.fathertochild-程序员宅基地

文章浏览阅读5.3k次。方式一:private void fatherToChild(Object father, Object child) { if (!(child.getClass().getSuperclass() == father.getClass())) { try { throw new Exception(child + "不是" + father + "的子类");..._typehelper.fathertochild

随便推点

鸿蒙手机系统会是什么样的,[财经]鸿蒙发布会主要内容2021新消息解读:OS手机操作系统是什么样的? - 南方财富网...-程序员宅基地

文章浏览阅读110次。2021 年 6 月 2 日,在备受瞩目的 HarmonyOS 2 及华为全场景新品发布会上,华为正式发布了数款出厂搭载鸿蒙 OS(HarmonyOS)的智能手机新品:Mate 40 系列、Mate X2 和 Nova 8 Pro 的新版本——同时,华为 P50 系列手机新品也在发布会上正式亮相。对于 P50 系列,余承东表示:这是一款伟大的产品,伟大的产品值得期待。除了上述智能手机新品,华为也在..._鸿蒙手机系统内容是什么样的

mybatis和Hibernate的区别_mybatis和hibernate的区别图-程序员宅基地

文章浏览阅读220次。1.什么是HibernateHibernate是关系型映射框架(ORM),程序员只要定义好实体到数据库的映射关系,即可通过Hibernate提供的方法完成持久层的操作,不需要对sql熟练掌握,Hibernate会根据存储逻辑,自动生成对应的sql,并调用jdbc接口加以执行2.什么是MybatisMybatis是持久层框架,侧重于实体和sql之间的映射关系..._mybatis和hibernate的区别图

pycharm安装库及安装失败解决办法_file 'c:\users\15394\pycharmprojects\pythonproject-程序员宅基地

文章浏览阅读9.8k次,点赞13次,收藏27次。pycharm安装包的步骤为稍等片刻,就安装好了,可以通过调用cmd(window+R,再输入cmd),输入python -m pip list即可查看安装的包。但是我在安装过程中出现了错误,无论是采用cmd安装还是pycharm安装库都不行,在网上查找之后,发现使用的是虚拟环境下的解释器,下面介绍如何将虚拟环境的解释器改成安装python真实路径的解释器。找到路径就可以了。..._file 'c:\users\15394\pycharmprojects\pythonproject\test.py' already exists

PHP文件操作入门_file_use_include_path-程序员宅基地

文章浏览阅读513次。文件读写1、读取文件file_get_contents():将文件中的内容读取到一个字符串中file_get_contents($filename,$use_include_path,$context,$offset,$maxlen);$filename:指定要读取的文件路径;其他参数为可选参数$use_include_path,若想在php.ini中配置的 include_path路..._file_use_include_path

如何利用SQLyog数据库管理工具生成各表的关系视图_sqlyog er图-程序员宅基地

文章浏览阅读3.8w次,点赞21次,收藏38次。 工具: 1. SQLyog-12.0.9-0.x64 2. Mysql5.7 mysql5.7直接去官网下载就可以了。SQLyog的破解版,在这个网址有http://download.csdn.net/download/whdxkcx/10027742。 过程: 1.打..._sqlyog er图

编程开发需知的24项_bestcssbuttongenerat-程序员宅基地

文章浏览阅读182次。1、element-ui一种好用的ui库 用户特别多,element遇到的问题基本都可以百度找到解决办法(要知道你遇到的问题别人也会遇到)2、vxe-table 一种强大的table组件库(真的很强大,这个是专门针对表格开发的库,功能很全也很强大,唯一不好的是用户不多,可能会遇到解决不了的bug,但我相信这个库一定会火)3、Vant - 轻量、可靠的移动端 Vue 组件库 (一般争对..._bestcssbuttongenerat