kafka-springboot从搭建到使用_Blue-SkyWalker的博客-程序员宅基地

技术标签: java  MessageQueue  zookeeper  kafka  easy  docker  

Docker+kafka+springboot

 

前言:我们暂时先搭建单机的zookeeper和kafka,后面会持续更新伪分布式集群构建;网上很多资料都残缺不全,对于新手来说是一件非常痛苦事,我们直接步入正题吧

 

1拉取zookeeper和kafka镜像

我拉取的是wurstmeister/kafka(维护较多)和zookeeper这两个镜像;当然也可以自己下载客户端打包成docker 镜像,但是我个人觉得自己打包意义不大。大家跟紧我的脚步,别跑丢了

docker pull wurstmeister/kafka

docker pull zookeeper

拉取成功后查看镜像

docker images

2.运行镜像

2.1先运行zookeeper

docker run --name zk -p 2181:2181 -d your-docker-id

your-docker-id是docker images 下获取到的本地docker 镜像id,zk通常使用的是2181端口

zookeeper启动起来后

2.2运行kafka

由于我的是阿里云主机,需要在kafka实现注册时将公网ip暴露给zookeeper,我这里直接更该镜像kafka的配置

find / -name server.properties

得到如下

修改如下

主要添加两行

listeners=PLAINTEXT://:9092

advertised.listeners=PLAINTEXT://公网ip:9092

再运行kafka

docker run

--name kafka -p 9092:9092

-e KAFKA_ADVERTISED_HOST_NAME=172.17.120.202

-e KAFKA_CREATE_TOPICS="test:1:1"

-e KAFKA_ZOOKEEPER_CONNECT=172.17.120.202:2181

-d  your-docker-id

-e是制定运行配置环境

KAFKA_ADVERTISED_HOST_NAME是内网ip

KAFKA_CREATE_TOPICS 是创建一个topic为test的队列test将会有1个分区和1个副本

KAFKA_ZOOKEEPER_CONNECT是我们的zk地址,是我们指定的根路径

通过日志我们可以看到kafka已经正常启动并且创建了topic-test的队列

docker logs -f kafka

3 springboot整合

3.1添加maven

这里注意maven-kafka版本要与你的kafka版本对应下,我的kafka版本比较新,使用的最新版

可以进入kafka容器,在opt/kafka/lib目录下,看到

这种文件名,前面的2.12是语言版本,后面的2.1.0是kafka版本<!--kafka-->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.core</groupId>
   <artifactId>jackson-databind</artifactId>
   <version>2.9.8</version>
</dependency>
<dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>fastjson</artifactId>
   <version>1.2.47</version>
</dependency>

3.2添加配置文件

server.port=8080
logging.level.root=info
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=kafka地址(我这里使用的公网地址):9092
#=============== provider  =======================
spring.kafka.producer.retries=1
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer  =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=0
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.3创建测试生产者

@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    public void send(){
        for (int i = 0 ;i < 10 ; i++){
            Message message = new Message(i);
            System.out.println("发送第" + i +"消息");
            kafkaTemplate.send("test",JSONObject.toJSONString(message));
        }
    }
}

3.4创建测试消费者

@Component
public class KafkaTestComsumer {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?,?> consumerRecord){
        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        if(kafkaMessage.isPresent()){
            //得到Optional实例中的值
            Object message = kafkaMessage.get();
            System.err.println("消费消息:"+message);
        }
    }
}

3.5测试代码及测试结果

@Autowired
KafkaProducer kafkaProducer;
@GetMapping("/test")
@Test
public String test(Animal animal){
    kafkaProducer.send();
    return "sucess";
}

 

到此完美结束。

后言:我这个2.1.0是目前非常新的kafka版本,自带了zookeeper,不过我们通常有自己的zk,所以我这里顺便弄了个zk,如果还有什么问题我们可以一起讨论,附一个apache-kafka地址-

http://kafka.apache.org/quickstart

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_32092505/article/details/87101353

智能推荐

设置BAT批处理窗口显示颜色_亦游的博客-程序员宅基地_bat 颜色

@echo offcolor 2eecho ___________________________________________________________echo 设置默认的控制台前景和背景颜色(color)。 echo 指定控制台输出的颜色属性 echo 颜色

Java中可变长参数的使用及注意事项_暗夜螃蟹的博客-程序员宅基地

来自:http://www.cnblogs.com/lanxuezaipiao/p/3190673.html在Java5 中提供了变长参数(varargs),也就是在方法定义中可以使用个数不确定的参数,对于同一方法可以使用不同个数的参数调用,例如print("hello");print("hello","lisi");print("hello","张三", "alexia");下面介绍如何定义可

18-12-19 美国7-11连锁店_Grape_orz的博客-程序员宅基地

在美国有一个连锁店叫7-11店,因为这个店以前是早上7点开门晚上11点关门。有一天,一个顾客在商店里挑了四件商品,然后到付款处去交钱。营业员拿起计算器,按了一些键,然后说:总共是$7.11。这个顾客开玩笑说:难道因为你们的店名叫7-11,所以我就要付$7.11吗?这个店员没有听出来这是一个玩笑,回答说:当然不是,我把这四件商品的价格相乘才得出这个结果的!顾客一听非常吃惊:你怎么把...

SpringBoot小新手。_weixin_30361641的博客-程序员宅基地

2018-09-27最近在学习SpringBoot:教材先是在https://start.spring.io/下载了工程。demo.zip下载之后,导入Eclipse工程,pom.xml里面的&lt;parent&gt;结果是有红波浪线的。提示是打不到库,然后就我在maven的安装根目录下D:\apache-maven-3.5.4\conf\settings.xml在里面配置了新的...

多线程--volatile关键字_z-xiaoyao的博客-程序员宅基地

多线程中的volatile关键字,有以下特性volatile:1、强制从公共堆栈中取得变量的值,而不是从线程私有数据栈中取得变量的值2、程安全包含原子性和可见性两个方面,java同步机制都是围绕这两个方面来确保线程安全的3、volatile本身并不处理数据的原子性,而是强制对数据的读写及时影响到主内存4、volatile关键字解决的是变量读取时的可见性问题,但无法保证原子性,对于多个线程访问一个实...

matlab程序求一个正交的相似变换矩阵,图像的等距变换,相似变换,仿射变换,射影变换及其matlab实现..._weixin_39940788的博客-程序员宅基地

第二次写CSDN文档,上一篇的排版实在太烂了,于是决定认真学习一下markdown的语法。好了,废话不多说,今天,我们学习一下图像(2维平面)到图像(2维平面)的四种变换,等距变换,相似变换,仿射变换,投影变换 首先介绍它的原理,最后介绍matlab的实现1.数学基础射影变换矩阵H属于射影群PL(n)中的一个,仿射群是由PL(3)中最后一行为(0,0,1)的矩阵组成的子群,包括仿射群,欧式群,其中...

随便推点

linux shell 定时备份mysql数据库_胡子哥502的博客-程序员宅基地

安装 crontab检测是否已安装 crontab# crontab-bash: crontab: command not found 那说明没有安装 crontab 开始安装:# yum -y install vixie-cron等待安装完成即可.编写备份数据库shellvim /usr/java/shell/bak_mys

python xml.etree.ElementTree模块_weixin_30437847的博客-程序员宅基地

使用的XML文件如下:file.xml&lt;?xml version="1.0"?&gt;&lt;data name="ming"&gt; &lt;country name="Singapore"&gt; &lt;rank&gt;4&lt;/rank&gt; &lt;year&gt;2011&lt;/year&gt; &...

css类名可以用变量吗,如何使用变量内容作为css类名?_端木团团的博客-程序员宅基地

echo("var answersC = answer-table-$qrow-&gt;id;");echo("var toggleC = toggle-table-$qrow-&gt;id;");?&gt;jQuery('toggleC').click(function() {jQuery('answersC').slideToggle('fast');});我没有一个静态的css类名,而是有一...

20170617Windows10_02_线程传参、时间片及线程启动退出_蒲公英24的博客-程序员宅基地

线程线程间参数传递:1:子线程必须在主线程结束之前结束才是正常的退出,否则都有可能出现问题。主线程的结束意味着进程的结束。主线程在结束的时候会强制关闭未关闭的子线程,这样就可能导致很多线程的数据没清理。2:线程创建立即执行,立即执行具体先执行子线程的还是CreateThread后面的函数是不一定的,都有可能。子线程创建的子线程,如果传递参数要使用,那么可以传递全局变量或者new的变量,

Linux下jdk的安装与卸载_eric_han1987的博客-程序员宅基地

1.卸载默认的jdk两种方法:(1)卸载系统自带的jdk版本:查看自带的jdk:   #rpm -qa|grep gcj看到如下信息:   libgcj-4.1.2-44.el5   java-1.4.2-gcj-compat-1.4.2.0-40jpp.115使用rpm -e --nodeps 命令删除上面查找的内容:   #rpm -e –nodeps java

渗透测试 必学的靶场_H.U.C _TongXin的博客-程序员宅基地

1.DVWA可以说是入坑必刷靶机了,没有之一。(很多高校的入坑课的第一节都是搭建 DVWA)还是简单介绍一下吧。Damn Vulnerable Web Application (DVWA),其主要目标是帮助安全专业人员在法律环境中测试他们的技能和工具,帮助 Web 开发人员更好地了解保护 Web 应用程序的过程,并帮助教师/学生在课堂环境中教授/学习 Web 应用程序安全性。传送门:https://github.com/ethicalhack3r/DVWA/archive/master.zip2.P