7.SpringBoot整合Kafka_扛麻袋的少年的博客-程序员宅基地

技术标签: # Kafka  

1.引入Maven依赖

<!-- springboot整合kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

2.application.yml配置文件

   新增如下配置:

spring:
  #Kafka配置
  kafka:
    producer:
      # 发送端brokers 集群
      bootstrap-servers: 192.168.204.201:9092,192.168.204.202:9092,192.168.204.203:9092
      # 发送端 id
      client-id: producerDemo
      # 发送端确认模式
      acks: -1
      # 发送失败重试次数
      retries: 3
      # 批处理条数,当多个记录被发送至统一分区时,producer对于同一个分区来说,会按照 batch.size 的大小进行统一收集,批量发送
      batch-size: 4096
      # 33554432 即32MB的批处理缓冲区
      buffer-memory: 40960
      # key 序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value 序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # 与 batch.size 配合使用。延迟统一收集,产生聚合,然后批量发送至broker
        linger.ms: 10
    consumer:
      # 消费端 brokers 集群
      bootstrap-servers: 192.168.204.201:9092,192.168.204.202:9092,192.168.204.203:9092
      # 消费者 group.id 组ID
      group-id: test-group
      # 消费者消费消息后,进行自动提交
      enable-auto-commit: true
      # 自动提交的频率(与 enable.auto.commit = true 属性配合使用)
      auto-commit-interval: 1000
      # 新的groupid,是否从头开始消费
      auto-offset-reset: earliest
      # key 反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value 反序列化
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.Controller类 ----生产&消费

@Controller
@RequestMapping("kafka")
public class KafkaController {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 发送消息
     */
    @RequestMapping("producer.htm")
    @ResponseBody
    public void getUser(ModelAndView view){
        kafkaTemplate.send("testTopic","消息发送");
        kafkaTemplate.send("testTopic1","消息发送1");
        kafkaTemplate.send("testTopic2","消息发送2");
    }

    /**
     * 消费testTopic中的消息
     */
    @KafkaListener(topics = {"testTopic"})
    public void topicMessage(ConsumerRecord<?, ?> record,String content){
        System.out.println("消息:"+ content);
        System.out.println("消息被消费.+++++++++++Topic:"+ record.topic() + ",+++++++++++++Message:" + record.value() );
    }

    /**
     * 消费testTopic1中的消息
     */
    @KafkaListener(topics = {"testTopic1"})
    public void topicMessage1(String content){
        System.out.println("消息被消费1:"+ content);
    }

    /**
     * 消费testTopic2中的消息
     */
    @KafkaListener(topics = {"testTopic2"})
    public void topicMessage2(String content){
        System.out.println("消息被消费2:"+ content);
    }

}

至此,SpringBoot 整合 Kafka 总结完毕。

如果本文对你有所帮助,那就给我点个赞呗 ^_^

END


如需了解其他方式Spring整合Kafka,请移步

  1.【基于XML方式】Spring整合Kafka

  2.【基于注解方式】Spring整合Kafka

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/lzb348110175/article/details/100700051

智能推荐

Python文件UnicodeDecodeError: invalid start byte / illegal multibyte sequence_suoluo_2020的博客-程序员宅基地

Python代码中的中文读写经常出问题,究其原因,主要在codec上。例如,如下两个错误:UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa3 in position 98: invalid start byte或:'gbk' codec can't decode byte 0xfe in position 16172: illegal multibyte sequence原因:open文件时指定了encoding 类型,但在文

python拼接ts文件成mp3格式_利用python爬虫通过m3u8文件下载ts视频_weixin_39742065的博客-程序员宅基地

什么是m3u8文件M3U8文件是指UTF-8编码格式的M3U文件。M3U文件是记录了一个索引纯文本文件,打开它时播放软件并不是播放它,而是根据它的索引找到对应的音视频文件的网络地址进行在线播放。原视频数据分割为很多个TS流,每个TS流的地址记录在m3u8文件列表中比如我这里有一个m3u8文件,文件内容如下:#EXTM3U#EXT-X-VERSION:3#EXT-X-MEDIA-SEQUENCE:0...

Android录制小视频(仿微信小视频)_jsmeli的博客-程序员宅基地

Android视频录制一、概述日常生活中,录制一些视频已经渐渐成为一种习惯,当然这对于我们技术来说并没有什么影响,因为无论大家用不用,你都需要开发,这只是需求制定者–PM应该关心的事情,我们需要关心的是视频开发的过程以及难点还有会碰上什么坑,这才是技术应该想的事情。不过,市场上面的视频以及直播的App确实也是与日俱增,蝌蚪音客、美拍、小影,小咖秀,快手等等。这类App的技术难点基本都是在音视频处理这

python2和python3共存环境变量_Win7和win10下python3和python2同时安装并解决pip共存问题..._weixin_39849548的博客-程序员宅基地

特别说明,本文是在Windows64位系统下进行的,32位系统请下载相应版本的安装包,安装方法类似。使用python开发,环境有Python2和 python3 两种,有时候需要两种环境切换使用,下面提供详细教程一份。1、下载python3和python2选择Downloads---&gt;Windows,点击进入就可以看到寻找想要的python版本本文选择的是:安装scrapypip3 inst...

Python基础迭代器iter() 和 next()。_孙小星的博客-程序员宅基地

Python3 迭代器与生成器迭代器迭代是Python最强大的功能之一,是访问集合元素的一种方式。。迭代器是一个可以记住遍历的位置的对象。迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不会后退。迭代器有两个基本的方法:iter() 和 next()。字符串,列表或元组对象都可用于创建迭代器:实例(Py

关于JAVA核心技术(卷一)读后的思考(用户自定义类,静态域和静态方法的思考以及方法参数)..._dianjian2253的博客-程序员宅基地

用户自定义类:这部分并没有太过于困难的部分,借由代码进行复习:Employee类的定义:package com.java.EmployeeTest;import java.time.*;public class Employee { private String name; private double salary; private LocalDate h...

随便推点

根据URL加载图片并付给bitmap(微信分享中使用)_流火星空的博客-程序员宅基地

原文网址: http://xiaxveliang.blog.163.com/blog/static/297080342013827768676/

【每周一文】Learning Classifiers from Only Positive and Unlabeled Data(2008)_下一步的博客-程序员宅基地

概述本文也是用于求解PULearning问题,并且提出在样本满足一定分布情况下,根据正样本和未标记样本作为负样本训练出来的分类模型,预测出来的结果和该样本属于正样本的概率存在一个固定常数系数。 根据该假设提出两种模型训练思路,能够得到最好的效果。问题求解对于给定的样本数据引入第三个随机变量S表示该样本是否被标记过,则p(x,y,s)满足一定的概率分布。 变量含义x为样本特征数据

yum安装模糊查询所需要的软件包方法_fishinhouse的博客-程序员宅基地_yum查询需要的包

yum安装卸载命令,yum install 'package_name'yum remove 'package_name'yum查询想要安装的包yum search 'keyword'例如yum search messenger模糊搜索本地yum包yum list | grep packageName列出所有可安装的软件包yum list列出所有可更新的软件包yum list updates列出所...

记录-插入节点insertBefore()报错_启舟亢的博客-程序员宅基地

记录今天在插入节点问题上出现了报错Uncaught TypeError: Failed to execute ‘insertBefore‘ on ‘Node‘: 2 arguments required, but only 1 present.在之前都是用appendChild,突发想用insertBefore,就报错了,感觉去看了一下文档。node.insertBefore(newn...

Linux TCP/IP调优-Linux内核参数注释_weixin_30518397的博客-程序员宅基地

固定文件的内核参数下列文件所在目录:/proc/sys/net/ipv4/名称默认值建议值描述tcpsyn_retries51对于一个新建连接,内核要发送多少个SYN连接请求才决定放弃。不应该大于255,默认值是5,对应于180毫秒左右时间。(对于大负载而物理通信良好的网络来说,这个值偏高,可以修改为2。这个...

mysql常用语句_weixin_43460393的博客-程序员宅基地

create database name;/*创建数据库文件*/show databases;drop database name;use name

推荐文章

热门文章

相关标签