springboot整合kafka,kafka消息过滤_kafka tag-程序员宅基地

技术标签: 零碎总结  spring boot  java  kafka  

需求

出于成本考虑,我们只使用了两个topic,测试环境一个,线上一个,然后大家的消息都发到同一个topic里,所以在消费时,就需要实现消息过滤。kafka是没有tag等功能的,所以过滤只能在消费端实现。下面直接上代码

代码

配置文件

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # kafka连接接地址
    #    client-id: # 发送请求时传给服务器的id
    consumer:
      topic: TEST_XXX
      bootstrap-servers: 127.0.0.1:9092 # 会覆盖spring.kafka.bootstrap-servers 配置
      group-id: TEST_GROUP # 消费者所属消息组
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化key的类
      value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer # 反序列化value的类

消费过滤代码:

    /**
     * 消息过滤
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
    
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //设置可以丢弃消息  配合RecordFilterStrategy使用
        factory.setAckDiscarded(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
    
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
    
                String data = (String) consumerRecord.value();
                log.info("filterContainerFactory filter : "+data);
                if (data.indexOf("test") != -1) {
    
                    return false;
                }
                //返回true将会被丢弃
                return true;
            }
        });
        return factory;
    }

实际消费代码:

 @KafkaListener(topics = "${spring.kafka.consumer.topic}",groupId = "${spring.kafka.consumer.group-id}",containerFactory = "filterContainerFactory")
    public void consumerDatabase(String data, Acknowledgment ack){
    
    log.info("group1接收到消息时间:{}",dateTime);
   //业务处理代码
    //手动提交偏移量
    ack.acknowledge();
}

原理:消息过滤器

消息过滤器可以在消息抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据再交由KafkaListener处理。

配置消息只需要为监听容器工厂配置一个RecordFilterStrategy(消息过滤策略),

返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u013036688/article/details/122108550

智能推荐

级联样式表_级联样式表| 第三部分-程序员宅基地

文章浏览阅读173次。级联样式表 CSS-难以成熟 (CSS — Difficult to maturation)Unlike software, the CSS specifications are developed by successive versions, which would allow a browser to refer to a particular version. CSS was devel..._级联样式表是哪年产生的

sql server学习笔记——批处理语句、存储过程_sql的批处理-程序员宅基地

文章浏览阅读1.7k次。目录批处理语句1、批处理语句简介示例一:示例二:存储过程一、什么是存储过程1、存储过程的简介2、存储过程包含的内容3、存储过程的优点4、存储过程的分类系统存储过程:用户定义存储过程5、常用的系统储存过程(1)一般常用的存储过程(2)xp_cmdshell二、创建存储过程1、定义存储过程的语法2、不带参数的存储过程3、带参数..._sql的批处理

css代码的定位及浮动

上次,我们解除了css的内外边距、鼠标悬停及其练习。现在我们学习css元素练习和定位。

一种简单的蒙特卡洛树搜索并行化方法-程序员宅基地

文章浏览阅读2.2k次,点赞4次,收藏6次。本文提出了 WU-UCT, 一种新颖的并行 MCTS 算法, 通过监控未观察样本的数量来解决并行化过程中统计数据过时的问题. 基于新设计的统计数据, 它有正确地修正了UCT节点选择策略, 实现了有效的探索和利用的权衡. _树搜索并行化

python多页爬取page_python爬虫实现爬取同一个网站的多页数据的实例讲解-程序员宅基地

文章浏览阅读3.2k次。对于一个网站的图片、文字音视频等,如果我们一个个的下载,不仅浪费时间,而且很容易出错。Python爬虫帮助我们获取需要的数据,这个数据是可以快速批量的获取。本文小编带领大家通过python爬虫获取获取总页数并更改url的方法,实现爬取同一个网站的多页数据。一、爬虫的目的从网上获取对你有需要的数据二、爬虫过程1、获取url(网址)。2、发出请求,获得响应。3、提取数据。4、保存数据。三、爬虫功能可以..._python 爬虫 get_page_size

《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL_flink+kafka统计数据后插入mysql-程序员宅基地

文章浏览阅读2.1k次。本文原创地址是: http://www.54tianzhisheng.cn/2019/01/15/Flink-MySQL-sink/ , 未经允许禁止转载。前言之前其实在 《从0到1学习Flink》—— 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星球里有朋友叫我: 写个从 kafka 中读取数据,..._flink+kafka统计数据后插入mysql

随便推点

还在用PPT做组织架构图?公司都在用的架构图软件是什么?_书本里印刷的结构图是用什么软件做的-程序员宅基地

文章浏览阅读3.1k次。还在用PPT、Word和Excel画企业组织结构图吗?对于人力资源的同事来说,画组织结构图是一键非常头疼的事情,尤其是对于一些大公司和人员变动较大的公司来说,需要经常更换组织结构图,每次变动都要耗费大量的时间和精力去重新绘图。其实绘制织结构图很简单,之所以难是因为没有找对工具和方法!今天小编就教你如何用亿图图示轻松绘制一个既美观又专业的组织结构图!下图是一个简单的组织结构图例子,小编就以此为例,详细讲解一下好看清晰、实用的公司组织结构图是怎么画出来的。1、新建组织结构图2、创建组织结构_书本里印刷的结构图是用什么软件做的

ESP32-C3 BLE5.0 扩展蓝牙名称长度的流程_蓝牙广播名称过长-程序员宅基地

文章浏览阅读1.8k次,点赞4次,收藏5次。BLE5.0 扩展蓝牙名称长度_蓝牙广播名称过长

centos8安装NVIDIA显卡驱动,docker模式运行机器学习_centos8安装显卡驱动-程序员宅基地

文章浏览阅读3.5k次。centos8安装NVIDIA显卡驱动,docker模式运行机器学习_centos8安装显卡驱动

利用优先级拥抱需求变更_需求优先级反复变化-程序员宅基地

文章浏览阅读2.4k次。需求变更这件事,每个开发人员都遇到过,每个产品经理也都遇到过。 以前,我们会追求需求不变更,但无论是产品型团队还是项目型团队,需求不变更都是天方夜谈,不可能实现的。即使把需求变更的成本提得很高,流程搞得很复杂,又要填变更单,又要几级经理审批,又要需求评审,依然无法避免。 于是,团队的目标变成了少变更,希望尽量少的变更既能满足业务的需要,又能减少开发团队的反感。但‘少’是个相对的概念_需求优先级反复变化

素数筛法_筛法求素数-程序员宅基地

文章浏览阅读2.3k次。一,素数筛法如果需要用到素数表,要么硬编码,要么自己求出前若干项素数。硬编码适合小表,大表只能自己求,而最常见的就是筛法。_筛法求素数

【深度长文】细思极恐的YouTube可跳过广告-程序员宅基地

文章浏览阅读1.8k次。【预警】:Youtube的可跳过广告尽管极其精妙(看完你就知道了),但其实并没有到细思极恐的程度;同时本文也并非深度长文,长是比较长,深不深度就另说了!作为一个广告PM,这种标题党行为都是为了优化点击率。开篇就明确文章属标题党,如今像我这样的良心作者已经不多了,快猛戳关注啊!有一些互联网产品:你认为它很简单、甚至很愚蠢,然而实际上它的设计却无比精妙、细思极恐,YouTube的5秒可跳过的贴片广

推荐文章

热门文章

相关标签