SpringBoot从入门到精通系列七:消息组件Kafka_springboot kafka 消息类型-程序员宅基地

技术标签: 消息组件Kafka  日常分享专栏  SpringBoot  

Kafka作为消息组件使用,并不是单纯的消息组件,Kafka被定位成开源的分布式事件流平台。

相关消费kafka消息博客如下:

一、安装Kafka及CMAK

Kafka还依赖于Zookeeper,因此安装Kafka之前需要先安装、运行ZooKeeper。

ZooKeeper和Kafka相关博客:

1.Kafka主题和分区

Kafak的主题只是存储消息的逻辑容器,主题之下会分为若干个分区,分区才是存储消息的物理容器。

简而言之,消息存在于分区中,一个或多个分区组成主题。因此,Kafka的消息组织方式是三级结构:主题->分区->消息。

主题只是消息的逻辑分类,它是发布消息的类别或消息源的名称。分区才是真正存储消息的地方,分区在物理存储层面就是一个个日志文件。kafka配置的log.dirs属性用于指定Kafka日志的存储目录,由于Kafka消息实际存储在分区(日志文件)中,因此该属性是指定Kafka消息的存储目录。

分区文件是一个有序的、不可变的记录序列,序列的数据项可通过下标访问,下标从0开始。分区文件的结构有点类似于不可变的List集合,只不过List集合存储在内存中,而分区文件则持久化地存储在磁盘上。

Kafka默认有一个名为"__consumer_offsets"的主题,该主题是Kafka自动创建的内部主题:位移主题,用于保存Kafka内部的位移信息。

  • Replication:复制因子
  • Total Partitions:总共包含多少个分区
  • Partitions on Broker:当前Broker上的分区的数量
  • Partitions:当前Broker上包含哪些分区
  • Partitions as Leader:当前Broker上的领导者分区的数量
  • Brokers:分布在几个Broker上
  • Brokers Spread %:分布率

2.ISR副本

ISR副本就是Kafka认为与领导者副本的数据同步的副本。根据该定义可以看出,领导者副本天然就是ISR副本,某些情况下,ISR中只要领导者一个副本。

追随者副本怎样的条件下才算ISR副本呢?

  • 领导者副本有10条消息
  • 追随者副本A中只有4条消息
  • 追随者副本B中有6条消息

两个副本都有可能符合ISR标准,也都有可能不符合ISR标准,设置有可能副本A符合ISR标准,副本B不符合ISR标准。

判断一个副本是否符合ISR标准,取决于server.properties文件中的replica.lag.time.max.ms配置参数,该参数的默认值为30000(即30秒),Kafka建议将该参数配置为10~30秒。

  • replica.lag.time.max.ms参数的含义是允许追随这副本滞后于领导者副本的最长时间,比如将该参数设置为15s,意味着只要一个追随者副本滞后于领导者副本的时间不连续超过15s,Kafka就认为该追随者副本符合ISR标准,即使该追随者副本中保存的消息明显少于领导者副本中的消息也没关系。
  • 由于追随者副本总是在不断地尽力从领导者副本中拉取消息,然后写入自己的日志中,如果这个同步速度持续慢于领导者副本的消息写入速度,那么在达到replica.lag.time.max.ms时间后,该追随者副本就会被移出ISR集合。如果该追随者副本后来又慢慢地追上了领导者副本的进度,那么它是能够被重新加入ISR集合的,因此ISR副本也是一个动态调整的集合。

Kafka将所有不符合ISR标准的副本称为非同步副本。通常而言,非同步副本滞后于领导者副本太多,当领导者副本挂掉时,非同步副本不适合被选举为领导者副本,否则会造成数据丢失,这也是Kafka的默认设置。

但是,如果剥夺了非同步副本被选举为领导者副本的资格,则势必会造成可用性降低。比如将复制因子设为4,这意味这一个分区有1个领导者副本和3个追随者副本。当领导者副本挂掉时,有可能这3个追随者副本都不符合ISR标准,那么就没法选出新的领导者副本,这个分区也就不可用了。

因此,在允许一定数据丢失的场景中,也可开启Unclean领导者选举,也就是允许选举非同步副本作为领导者副本——只要将server.properties文件中的unclean.leader.election.enable参数设为true即可。

开启Unclean领导者选举可以提高Kafka的可用性,但可能会造成数据丢失。

3.消息生产者

消息就是Kafka所记录的数据节点,消息在Kafka中又被称为记录(record)或事件(event),用消息来代指Kafka的数据节点。

从存储上看,消息就是存储在分区文件(有点类似于List集合)中的一个数据项,消息具有key、value、时间戳和可选的元数据头。

消息示例:
key:“java”
value: “a new book”
timestamp: “2:06 p.m.”

消息生产者向消息主题发送消息,这些消息将会被分发到该主题下的分区中保存,主题下的每条消息只会被保存在一个领导者分区中,而不会在多个领导者分区中保存多份。

分区的主要目的就是实现负载均衡,可以将同一个主题的不同分区房子不同节点上,因此对消息的读写操作也都是针对分区这个粒度进行的。所以,每个节点都能独立地处理各自分区的读、写请求,通过添加新节点即可很方便地提高Kafka的吞吐量。

当消息生产者发送一条消息时,会按如下方式来决定该消息被分发到哪个分区:

  • 如果在发送消息时指定了分区,则消息被分到指定的分区
  • 如果在发送消息时没有指定分区,但消息的key不为空,则基于key的hashCode来选择一个分区
  • 如果既没有指定分区,且消息的key也为空,则用round-robin(轮询)策略来选择一个分区。

round-robin策略就是指按顺序来分发消息,比如一个主题有P0、P1、P2三个分区,那么第一条消息被分发到P0分区,第二条消息被分发到P1分区,第三条消息被分发到P2分区…

4.消费者与消费者组

消费者用于从消息主题读取消息。

Kafka的消息主题与JMS、AMQP的消息队列是不同的:

  • JMS、AMQP消息队列中的消息只能被消费一次,当消息被消费时,这条消息就会被移出队列。
  • 但Kafka主题中的消息完全可以被多次重复消费,甚至可以从指定下标处开始读取消息

从某种角度来看,Kafka主题中的消息会在一段时间内被持久化保存,客户端(消费者)可根据需要反复地读取它们。Kafka主题中的消息默认保存时间为7天,这个默认保存时间可通过server.properties文件中的如下配置进行修改:

#设置主题中消息的默认保存时间
log.retention.hours=168

当消息过期之后,Kafka可以对消息进行两种处理:delete或compact。

  • delete表示直接删除过期消息
  • compact表示对消息进行压缩整理

通过server.properties文件中的如下配置来设置对过期消息的处理策略:

#设置删除过期消息
log.cleanup.policy=delete

如果想修改某个主题下的保存时间,可专门配置该主题的retention.ms属性。修改指定主题的额外属性,推荐使用kafka-config.bat命令。

该命令指定如下常用选项:

  • –alter:修改
  • –describe:显示。该选项与–alter选项只能选择其中之一
  • –add-config:指定要添加的配置属性,该选项的值应该符合"k1=v1,k2=[v1,v2,v2],k3=v3"的形式
  • –delete-config:指定要删除的配置属性,该选项的值应该符合"k1,k2"的形式
  • –bootstrap-server:指定要连接的服务器
  • –entity-type:指定要配置的实体类型,该选项可支持Topic(主题)、clients(客户端)、users(用户)、brokers(代理)、broker-loggers(代理日志)这些值。
  • –entity-name:指定要配置的实体名称,该选项与–entity-type结合使用,用于指定主题名、客户端ID、用户名、Broker ID。

以下命令将test1主题的retention.ms属性设为10个小时:

kafka-config.sh --alter --bootstrap-server localhost:9092
--entity-type topics
--entity-name test1
--add-config retention.ms=3600000

kafka采用轮询机制来检测消息是否过期,意味着即使某些消息已经过期,但只要轮询机制还没有处理到这些过期消息,就会依然保留在该主题下。
Kafka轮询检查的时间间隔也在server.properties文件中设置,该文件中包含如下配置:

#设置对过期消息进行轮询检查的间隔时间为5分钟
log.retention.check.interval.ms=300000

上面配置指定了对过期消息进行轮询检查的间隔时间为5分钟,意味着每5分钟就会检查一次消息是否过期。

一个消费者组包含多个消费者实例。同一个消费者组内的所有消费者共享一个公告的ID,这个ID被称为组ID。

  • 一个消费者组内的多个消费者实例一起协调消费主题所包含的全部分区。
  • 每个分区只能由同一个消费者组内的一个消费者实例来消费,但一个消费者实例可负责消费多个分区。

在同一个消费者组内,每个分区只能由一个消费者实例来负责消费,这意味着同一个消费者组内的多个消费者实例不可能消费相同的消息,这就是典型的P2P消息模型。

由于消费者组之间彼此独立,互不影响,能订阅相同的主题而互不干涉。如果消费者实例属于不同的消费者组,这就是典型的Pub消息模型。

Kafka仅仅使用消费者组这种机制,就实现了传统消息引擎P2P和Pub-Sub两种消息模型。

5.Kafka分配分区策略

在理想情况下,消费者组中的消费者实例数恰好等于该组所订阅主题的分区总数,这样每个消费者实例就恰好负责消费一个分区,否则,可能出现如下两种情况:

  • 消费者实例数大于所订阅主题的分区总数:此时一个消费者实例要负责消费一个分区,但会由消费者实例处于空闲状态。
  • 消费者实例数小于所订阅主题的分区总数:此时一个消费者实例要负责消费多个分区。

Kafka为消费者实例提供了3种分配分区的策略:

  • range策略
  • round-robin策略
  • sticky策略

1)range策略:
range策略是基于每个主题单独分配分区的,大致步骤如下:

  • 将每个主题的分区按数字顺序进行排列,消费者实例则按消费者名称的字典顺序进行排列。
  • 用主题的分区总数除消费者实例总数,恰好能除尽,则每个消费者实例都分得相同数量的分区,如果除不尽,则排在前面的几个消费者实例将会分得额外的分区。

2)round-robin策略:
round-robin策略会把所有订阅主题的所有分区按顺序排列,然后采用轮询方式依次分给各消费者实例。

一般来说,如果消费者组内所有消费者实例所订阅的主题是相同的,那么使用round-robin策略能带来更公平的分配方案,否则使用range策略的效果更好。

3)sticky策略:
sticky策略主要用于处理重平衡需求,重平衡就是指重新为消费者实例分配分区的过程。比如以下3种情况就会触发重平衡:

  • 消费者组中的消费者实例数发生变化。比如有新的消费者实例加入消费者组,或者有消费者实例退出消费者组。
  • 订阅的主题数发送改变。当消费者组以正则表达式的方式订阅主题时,符合正则表达式的主题可能会动态地变化。
  • 订阅主题的分区数发送改变。当主题的分区数增加时,必须为之分配消费者实例来处理它。

当触发重新平衡处理时,使用range策略或round-robin策略,Kafka会彻底抛弃原有的分配方案,对变化后的消费者实例、分区进行彻底的重新分配。

sticky策略则有效地避免了上述两种策略的缺点:sticky策略会尽力维持之前的分配方案,只对改动部分进行最小的再分配,因此通常认为sticky策略在处理重平衡时具有最佳的性能。

二、使用Kafka核心API

Kafka包含如下5个核心API:

  • Producer API(生产者API):应用程序通过该API向主题发布消息。
  • Consumer API(消费者API):应用程序通过该API订阅一个或多个主题,并从订阅的主题中拉取消息(记录)
  • Streams Processor API(流API):应用程序可通过该API来实现流处理器,将一个主题的消息导流到另一个主题,并能对消息进行任意自定义的转换。
  • Connector API(连接器API):应用程序可通过该API来实现连接器,这些连接器不断地从源系统或应用程序将数据导入Kafka,也将Kafka消息不断地导入某个接收系统或应用程序。
  • Admin API(管理API):应用程序可通过该API管理和检查主题、Broker、其他Kafka实体。

生产者API的核心类是KafkaProducer,提供了一个send()方法来发送消息,该send()方法需要传入一个ProducerRecord<K,V>对象,该对象代表一条消息(记录),该对象定义了如下构造器:

  • ProducerRecord(String topic,Integer partition,K key,V value):创建一条发送到指定主题和指定分区的信息
  • ProducerRecord(String topic,Integer partition,K key,V value,Iterable
    headers):创建一条发送到指定主题和指定分区的信息,且包含多个消息头。
  • ProducerRecord(String topic,Integer partition,Long timestamp,K key,V value):创建一条发送到指定主题和指定分区的消息,且使用给定的时间戳
  • ProducerRecord(String topic,Integer partition,Long timestamp,K key,V value,Iterable<Header> headers):创建一条发送到指定主题和指定分区的消息,使用给定的时间戳,且包含多个消息头。
  • ProducerRecord(String topic,K key,V value):创建一条发送到指定主题的消息。
  • ProducerRecord(String topic,V value):创建一条发送到指定主题、不带key的消息。

使用生产者API发送消息很简单,基本只要两步:

  • 创建KafkaProducer对象,在创建该对象时要传入Properties对象,用于对该生产者进行配置
  • 调用KafkaProducer对象的send()方法发送消息,调用ProducerRecord的构造器即可创建不同的消息。

1.创建kafka的pom.xml

Kafka Clients依赖包括生产者API、消费者API、管理API

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.crazyit</groupId>
	<artifactId>kafka_test</artifactId>
	<version>1.0-SNAPSHOT</version>
	<name>kafka_test</name>

	<properties>
		<!-- 定义所使用的Java版本和源代码所用的字符集 -->
		<maven.compiler.source>11</maven.compiler.source>
		<maven.compiler.target>11</maven.compiler.target>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<!-- 添加Kafka Clients依赖,包括Producer、Comsumer和Admin API -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.7.0</version>
		</dependency>
		<!-- 添加Kafka Streams依赖,包括Stream Processor API -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
			<version>2.7.0</version>
		</dependency>
	</dependencies>
</project>

2.生产者程序

程序先发送了50条key为"fkjava"的消息,意味着50条消息都进入一个分区,程序后发送的50条不带key消息,意味着50条消息会被轮询进入该主题的3个分区。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer
{
    
	public final static String TOPIC = "test1";

	public static void main(String[] args)
	{
    
		var props = new Properties();
		// 指定Kafka的节点地址
		props.put("bootstrap.servers",
				"localhost:9092,localhost:9093,localhost:9094");
		// 指定确认机制,默认值是0。
		props.put("acks", "all");  // ①
		// 指定发送失败后的重试次数
		props.put("retries", 0);
		// 当多条消息要发送到同一分区时,生产者将尝试对多条消息进行批处理,
		// 从而减少网络请求数,这有助于提高客户机和服务器的性能。
		// 该参数控制默认的批处理的数据大小
		props.put("batch.size", 16384);
		// 指定消息key的序列化器
		props.put("key.serializer", StringSerializer.class.getName());
		// 指定消息value的序列化器
		props.put("value.serializer", StringSerializer.class.getName());
		try (
				// 创建消息生产者
				var producer = new KafkaProducer<String, String>(props))
		{
    
			for (var messageNo = 1; messageNo < 101; messageNo++)
			{
    
				var msg = "你好,这是第" + messageNo + "条消息";
				if (messageNo < 51)
				{
    
					// 发送带消息
					producer.send(new ProducerRecord<>(TOPIC, "fkjava", msg));
				} else
				{
    
					// 发送不带key的消息
					producer.send(new ProducerRecord<>(TOPIC, msg));
				}
				// 每生产了20条消息输出一次
				if (messageNo % 20 == 0)
				{
    
					System.out.println("发送的信息:" + msg);
				}
			}
		}
	}
}
  • acks=0:表示生产者不会等待Kafka的确认响应
  • acks=1:表示只要领导者分区已将消息写入本地日志文件,Kafka就会向生产者发送确认响应,无须等待集群中其他机器的确认。
  • acks=all:表示领导者分区会等待所有追随者分区都同步完成后才发送确认响应,这种确认机制可确保消息不会丢失,这是最强的可用性保证。

3.消费者

消费者API的核心类是KafkaConsumer,提供了如下常用方法:

  • subscribe(Collection topics):订阅主题
  • subscribe(Pattern pattern);订阅符合给定的正则表达式的所有主题
  • subscription():返回该消费者所订阅的主题集合
  • unsubscribe():取消订阅
  • close():关闭消费者
  • poll(Duration timeout):拉取消息
  • assign(Collection partitions):手动为该消费者分配分区
  • assignment():返回分配给该消费者的分区集合
  • commitAsync():异步提交offset
  • commitSync():同步提交offset

如果开启了自动提交offset,无须调用commitAsync()或commitSync()方法进行手动提交。自动提交offset比较方便,但手动提交offset则更准确,消费者程序真正被处理后再手动提交offset。

  • enforceRebalance():强制执行重平衡
  • seek(TopicPartition partition,long offset):跳到指定的offset处,即下一条消息从offset处开始拉取。
  • seekToBeginning(Collection<TopicPartition> partitions):跳到指定分区的开始处
  • seekToEnd(Collection<TopicPartition> partitions):跳到指定分区的结尾处
  • position(TopicPartition partition):返回指定分区当前的offset

KafkaConsumer的poll()方法用于拉取消息,该方法返回一个ConsumerRecords<K,V>对象,通过该对象可迭代访问ConsumerRecord对象,每个ConsumerRecord对象都代表一条消息。

使用消费者API拉取消息基本只要3步:

  • 创建KafkaConsumer对象,在创建对象时要传入Properties对象,用于对该消费者进行配置
  • 调用KafkaConsumer对象的poll()方法拉取消息,该方法返回ConsumerRecords
  • 对ConsumerRecords执行迭代,即可获取拉取到的每条消息

ConsumerA:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerA
{
    
	// 定义消费的主题
	public final static String TOPIC = "test1";
	// 定义该消费者实例所属的组ID
	private static final String GROUPID = "groupA";
	private static KafkaConsumer<String, String> consumer;

	public static void main(String[] args) throws InterruptedException
	{
    
		// 启动一条新线程来处理程序退出
		new Thread(() ->
		{
    
			var scanner = new Scanner(System.in);
			if (scanner.nextLine().equals(":exit"))
			{
    
				if (consumer != null)
				{
    
					// 取消订阅
					consumer.unsubscribe();
					// 关闭消费者
					consumer.close();
				}
				System.exit(0);
			}
		}).start();
		var props = new Properties();
		// 指定Kafka的节点地址
		props.put("bootstrap.servers",
				"localhost:9092,localhost:9093,localhost:9094");
		// 指定消费者组ID
		props.put("group.id", GROUPID);
		// 设置是否自动提交offset
		props.put("enable.auto.commit", "true");
		// 设置自动提交offset的时间间隔
		props.put("auto.commit.interval.ms", "1000");
		// session超时时长
		props.put("session.timeout.ms", "30000");
		// 程序读取消息的初始offset
		props.put("auto.offset.reset", "latest");
		// 指定消息key的反序列化器
		props.put("key.deserializer", StringDeserializer.class.getName());
		// 指定消息value的反序列化器
		props.put("value.deserializer", StringDeserializer.class.getName());
		consumer = new KafkaConsumer<>(props);
		// 订阅主题
		consumer.subscribe(Arrays.asList(TOPIC));
		System.out.println("---------开始消费---------");
		while (true)
		{
    
			// 拉取消息
			ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofMillis(100));
			if (null != msgList && msgList.count() > 0)
			{
    
				// 遍历取得的消息
				for (ConsumerRecord<String, String> record : msgList)
				{
    
					System.out.println("收到消息: key = " + record.key() + ", value = "
							+ record.value() + " offset = " + record.offset());
				}
			} else
			{
    
				Thread.sleep(1000);
			}
		}
	}
}

ConsumerB:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;

public class ConsumerB
{
    
	// 定义消费的主题
	public final static String TOPIC = "test1";
	// 定义该消费者实例所属的组ID
	private static final String GROUPID = "groupA";
	private static KafkaConsumer<String, String> consumer;

	public static void main(String[] args) throws InterruptedException
	{
    
		// 启动一条新线程来处理程序退出
		new Thread(() ->
		{
    
			var scanner = new Scanner(System.in);
			if (scanner.nextLine().equals(":exit"))
			{
    
				if (consumer != null)
				{
    
					// 取消订阅
					consumer.unsubscribe();
					// 关闭消费者
					consumer.close();
				}
				System.exit(0);
			}
		}).start();
		var props = new Properties();
		// 指定Kafka的节点地址
		props.put("bootstrap.servers",
				"localhost:9092,localhost:9093,localhost:9094");
		// 指定消费者组ID
		props.put("group.id", GROUPID);
		// 设置是否自动提交offset
		props.put("enable.auto.commit", "true");
		// 设置自动提交offset的时间间隔
		props.put("auto.commit.interval.ms", "1000");
		// session超时时长
		props.put("session.timeout.ms", "30000");
		// 程序读取消息的初始offset
		props.put("auto.offset.reset", "latest");
		// 指定消息key的反序列化器
		props.put("key.deserializer", StringDeserializer.class.getName());
		// 指定消息value的反序列化器
		props.put("value.deserializer", StringDeserializer.class.getName());
		consumer = new KafkaConsumer<>(props);
		// 订阅主题
		consumer.subscribe(Arrays.asList(TOPIC));
		System.out.println("---------开始消费---------");
		while (true)
		{
    
			// 拉取消息
			ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofMillis(100));
			if (null != msgList && msgList.count() > 0)
			{
    
				// 遍历取得的消息
				for (ConsumerRecord<String, String> record : msgList)
				{
    
					System.out.println("收到消息: key = " + record.key() + ", value = "
							+ record.value() + " offset = " + record.offset());
				}
			} else
			{
    
				Thread.sleep(1000);
			}
		}
	}
}
  • new KafkaConsumer<>(props);创建了KafkaConsumer
  • consumer.poll调用了KafkaConsumer的poll()方法拉取消息
  • for (ConsumerRecord<String, String> record : msgList)对ConsumerRecords执行迭代,访问拉取到的消息
  • 程序为消费者配置了auto.offset.reset=latest,两个消费者只能拉取运行期间收取到的消息

ConsumerA和ConsumerB中的GROUPID使用相同的字符串,此时ConsumerA和ConsumerB模拟的是P2P消息模型:

  • 先运行ConsumerA和ConsumerB两个程序,再运行前面的Producer程序,所有key为fkjava的消息只会由一个消费者处理,因为所以key为fkjava的消息都在同一个分区中,没有key的消息则会分别由两个消费者处理,因为没有key的消息会被轮询分配到不同分区中。

ConsumerA和ConsumerB中的GroupID使用不同的字符串,此时ConsumerA和ConsumerB模拟的是Pub-Sub消息模型:

  • 先运行ConsumerA和ConsumerB两个程序,然后再运行前面的Producer程序,ConsumerA和ConsumerB都可拉取到完全相同的100条消息。

三、SpringBoot对kafka的支持

1.配置Kafka服务器的节点地址和生产者相关配置

# 配置Kafka默认的节点地址
spring.kafka.bootstrap-servers=\
localhost:9092,localhost:9093,localhost:9094
# 指定生产者的确认机制
spring.kafka.producer.acks=all
# 指定生产者发送失败后的重试次数
spring.kafka.producer.retries=0
# 指定生产者批处理的数据大小
spring.kafka.producer.batch-size=16384
# 指定生产者的消息key的序列化器
spring.kafka.producer.key-serializer=\
org.apache.kafka.common.serialization.StringSerializer
# 指定生产者的消息value的序列化器
spring.kafka.producer.value-serializer=\
org.apache.kafka.common.serialization.StringSerializer

# 指定默认的消费者组ID
spring.kafka.consumer.group-id=defaultGroup
# 设置消费者是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 设置消费者自动提交offset的时间间隔
spring.kafka.consumer.auto-commit-interval=1000
# 程序读取消息的初始offset
spring.kafka.consumer.auto-offset-reset=latest
# 指定消息key的反序列化器
spring.kafka.consumer.key-deserializer=\
org.apache.kafka.common.serialization.StringDeserializer
# 指定消息value的反序列化器
spring.kafka.consumer.value-deserializer=\
org.apache.kafka.common.serialization.StringDeserializer
# session超时时长
spring.kafka.consumer.properties[session.timeout.ms]=30000

server.port=8081

# 设置监听器的确认模式
spring.kafka.listener.ack-mode=batch

# 指定Streams API的应用ID
spring.kafka.streams.application-id=spring-pipe
# 指定应用启动时自动创建流
spring.kafka.streams.auto-startup=true
# 指定消息key默认的序列化和反序列化器
spring.kafka.streams.properties[default.key.serde]=\
org.apache.kafka.common.serialization.Serdes$StringSerde
# 指定消息value默认的序列化和反序列化器
spring.kafka.streams.properties[default.value.serde]=\
org.apache.kafka.common.serialization.Serdes$StringSerde
  • 为消费者配置了sessio.timeout.ms属性,该配置将会被直接传给kafka消费者,作用是控制消费者Session的超时时长

2.添加Spring Kafka依赖pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<!-- 指定继承spring-boot-starter-parent POM文件 -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.2</version>
		<relativePath/>
	</parent>

	<groupId>org.crazyit</groupId>
	<artifactId>kafka_boot</artifactId>
	<version>1.0-SNAPSHOT</version>
	<name>kafka_boot</name>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>11</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- 添加Spring Kafka依赖 -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<!-- 添加Kafka Streams依赖,包括Stream Processor API -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- 定义Spring Boot Maven插件,可用于运行Spring Boot应用 -->
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

3.发送消息

SpringBoot可以将自动配置的KafkaTemplate注入任意组件,接下来该组件调用Kafka Template的send()方法即可发送消息。

以下Service组件调用了KafkaTemplate来发送消息。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.Objects;

@Service
public class MessageService
{
    
	public static final String TOPIC = "test1";
	private final KafkaTemplate<String, String> kafkaTemplate;

	@Autowired
	public MessageService(KafkaTemplate<String, String> kafkaTemplate)
	{
    
		this.kafkaTemplate = kafkaTemplate;
	}

	public void produce(String key, String message)
	{
    
		if (Objects.nonNull(key))
		{
    
			// 发送消息
			this.kafkaTemplate.send(TOPIC, key, message);
		} else
		{
    
			// 发送不带key的消息
			this.kafkaTemplate.send(TOPIC, message);
		}
	}
}

4.控制器类调用Service组件

为了让程序能调用上面Service组件的方法,本例提供了一个控制器类来调用Service组件的方法。
\app\controller\HelloController.java

import org.crazyit.app.service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController
{
    
	private final MessageService messService;

	public HelloController(MessageService messService)
	{
    
		this.messService = messService;
	}

	@GetMapping("/produce/{key}/{message}")
	public String produce(@PathVariable String message,
			@PathVariable(required = false) String key)
	{
    
		messService.produce(key, message);
		return "发送消息";
	}
	@GetMapping("/produce/{message}")
	public String produce(@PathVariable String message)
	{
    
		messService.produce(null, message);
		return "发送消息";
	}
}

该控制器类定义了两个处理方法,分别用于发送带key的消息和不带key的消息。

5.接收消息

SpringBoot会自动将@KafkaListener注解修饰的方法注册为消息监听器。没有显示地通过containerFactory属性指定监听器容器工厂(KafkaListenerContainerFactory),SpringBoot会在容器中自动配置一个ConcurrentKafkaListenerContainerFactory Bean作为监听器容器工厂。

定义一个监听消息队列的监听器TopicListener1

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TopicListener1
{
    
	@KafkaListener(topics = "test1", groupId="groupA")
	public void processMessage(ConsumerRecord<String, String> message)
	{
    
		System.out.println("从test1收到消息,其key为:" + message.key()
			+ ",其value为:" + message.value());
	}
}
  • processMessage()方法使用了@KafkaListener(topics=“test1”,groupId=“groupA”)注解修饰,表明该方法将会监听test1主题,且该监听器属于groupA消费者组。

定义一个监听消息队列的监听器TopicListener2

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TopicListener2
{
    
	@KafkaListener(topics = "test1", groupId="groupB")
	public void processMessage(ConsumerRecord<String, String> message)
	{
    
		System.out.println("从test1收到消息,其key为:" + message.key()
				+ ",其value为:" + message.value());
	}
}
  • TopicListener1和TopicListener2所属的消费者组不同,因此这两个消费者模拟的是Pub-Sub消息模型

如果要定义更多的监听器容器工厂或者覆盖默认的监听器工厂,则可通过SpringBoot提供的ConcurrentKafkaListenerContainerFactory来实现,可对ConcurrentKafkaListenerContainerFactory进行与自动配置相同的设置。

例如以下配置片段:

@Configuration(proxyBeanMethods = false)
static class KafkaConfiguration
{
    
	@Bean
	public ConcurrentKafkaListenerContainerFactory myFactory(
		ConcurrentKafkaListenerContainerFactoryConfigure configurer,
		ConsumerFactory consumerFactory)

	{
    
		//创建ConcurrentKafkaListenerContainerFactory实例
		ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
		//使用与自动配置相同的属性来配置监听器容器工厂
		configurer.configure(factory,consumerFactory);
		//下面可以对ConcurrentKafkaListenerContainerFactory进行额外的设置
		....
		return factory;
	}
}

有了自定义的监听器容器工厂后,可通过@KafkaListener注解的containerFactory属性来指定使用自定义的监听器容器工厂。例如如下代码:

@KafkaListener(topics = "test1", containerFactory="myFactory")

6.主程序

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App
{
    
	public static void main(String[] args)
	{
    
		SpringApplication.run(App.class, args);
	}
}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zhengzaifeidelushang/article/details/120592403

智能推荐

kafka启动和主题操作命令_启动kafka获取指定主题-程序员宅基地

文章浏览阅读461次。启动kafka启动 kafka自带的 zookeeper:在bin目录下: ./zookeeper-server-start.sh …/config/zookeeper.properties 1>/dev/null 2>&1 &(标注的这一句是为了不输出启动日志)启动 kafka集群:(这里是伪分布式下的启动,加入-daemon以精灵线程启动,不打印日志)bin/kafka-server-start.sh -daemon config/server.prope._启动kafka获取指定主题

lombok的初步认识_lombok是什么需要集成么-程序员宅基地

文章浏览阅读116次。前言我们一般在定义javaBean的时候会有很多属性,而这些属性我们会生成getter/setter方法,如果属性很多的话,那么getter/setter方法就会有大量的代码,这样会非常臃肿,有没有什么办法可以解决这个问题,可以使用lombok来消除臃肿的getter/setter方法,让我们不用自己定义,让lombok给我们生成。1、lombok介绍Lombok是一种实用工具,可用来帮助开发人员消除Java的冗长代码,尤其是对于简单的Java对象(POJO)。它通过注释实现这一目的。通过在开发环境中_lombok是什么需要集成么

基于高通MSM8953平台的android系统SGM41511充电IC驱动开发_sgm 充电ic-程序员宅基地

文章浏览阅读551次。创建/kernel/msm-4.9/drivers/power/supply/sgm41511_charger.c。_sgm 充电ic

Linux下MySQL8.0版本忘记密码_linux下mysql8.0忘记密码-程序员宅基地

文章浏览阅读5.8k次,点赞17次,收藏27次。具体步骤如下:1、修改MySQL的配置文件(默认为/etc/my.cnf),在[mysqld]下添加一行skip-grant-tables2、保存配置文件后,重启MySQL服务service mysqld restart3、再次进入MySQL命令行mysql -uroot -p4.1、输入密码时直接回车,就会进入MySQL数据库了,这个时候按照常规流程修改root密码即可。M..._linux下mysql8.0忘记密码

redhat mysql_Redhat下安装mysql-程序员宅基地

文章浏览阅读532次。1.1 检查数据库使用的字符集检查数据库使用的字符集是否都是utf8,如不是,请修改字符集为utf8。步骤 1以root用户登录数据库服务器。步骤 2登录MySQL数据库。mysql-uroot-p123456步骤 3输入命令查看字符集。SHOWVARIABLESLIKE'character_set_%';l如果显示如下则表示所使用的字符集都是utf8,不需要再修改。l如果显示..._redhat 4.8.5-36安装mysql

spring中各个模块的作用_springdao模块的作用-程序员宅基地

文章浏览阅读575次。spring 有可能成为所有企业应用程序的一站式服务点,然而,Spring 是模块化的,允许你挑选和选择适用于你的模块,不必要把剩余部分也引入。下面的部分对在 Spring 框架中所有可用的模块给出了详细的介绍。Spring 框架提供约 20 个模块,可以根据应用程序的要求来使用。核心容器核心容器由核心,Bean,上下文和表达式语言模块组成,它们的细节如下:_springdao模块的作用

随便推点

二、电脑C盘空间的优化-程序员宅基地

文章浏览阅读1.8k次。软件下载文件的存放位置设置日常生活中所使用的软件,很多的缓存文件夹,下载文件夹都会在安装时便被默认设置在C盘的目录下,时间久了,随着文件的缓存越来越多,C盘剩余的空间也越来越少,如果系统出了故障需要重装系统时,很多文件存在C盘里,随着系统的重装当中某些重要的数据也会丢失。本篇便来教导大家,如何避免此事件的发生,优化C盘的存储空间。微信微信几乎是电脑中必装的软件了,随着聊天记录越来越多,特别在工作中有很多文件需要发送接收,这些文件,如果没有设置它的缓存位置,那么都会存储在你的C盘里的某个文件夹下打开微

Android实训案例(一)——计算器的运算逻辑-程序员宅基地

文章浏览阅读432次。Android实训案例(一)——计算器的运算逻辑 应一个朋友的邀请,叫我写一个计算器,开始觉得,就一个计算器嘛,很简单的,但是写着写着发现自己写出来的逻辑真不严谨,于是搜索了一下,看到mk(没有打广告….)上有视频,于是看了下他的逻辑,以前还真是小瞧计算器了,计算器要是高级一点的处理起来更加繁琐,这里就做了一个简单一点的,这次..._android的手机计算机实训原理

二、Storm入门之Hello Storm-程序员宅基地

文章浏览阅读74次。2019独角兽企业重金招聘Python工程师标准>>> ..._hello storm

Java url链接生成二维码_java实现链接变二维码 不导包-程序员宅基地

文章浏览阅读4.1k次,点赞3次,收藏16次。1、添加依赖2、添加二维码生成工具类3、测试_java实现链接变二维码 不导包

CentOS 连接Windows共享的桥接网络_centos 命令连接win10共享网络-程序员宅基地

文章浏览阅读1k次。解决的问题:有一台CentOS 7.6 Linux台式计算机,没有无线网络;想共享笔记本连接的无线网络,笔记本配置无线和有线以太网。解决方法:笔记本连接好WIFI; 笔记本以太网设置自动获得IP,自动获得DNS服务器..._centos 命令连接win10共享网络

C++中的 istringstream_c++中的istringstream函数-程序员宅基地

文章浏览阅读184次。今天在看别人的C++程序源码的时候,看到了一个比较有用的C++的输入输出控制类。因为之前没有使用过,故特地学习了一下,在此总结。程序源码中使用了这样的语句:ifstream fin(fname);int a,b,c;string s;getline(fin, s);istringstream ss(s);ss &gt;&gt; a&gt;&gt;b&gt;&gt;c;将读..._c++中的istringstream函数

推荐文章

热门文章

相关标签