Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析_spring kafka生产者源码-程序员宅基地

技术标签: spring  KafkaListener  【MQ-Apache Kafka】  kafka  


在这里插入图片描述


概述

【依赖】

	<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
	</dependency>

【配置】

#kafka
spring.kafka.bootstrap-servers=10.11.114.247:9092
spring.kafka.producer.acks=1
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer


spring.kafka.consumer.group-id=zfprocessor_group
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.artisan.common.entity.messages
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.fetch-min-size=10
spring.kafka.consumer.fetch-max-wait=10000ms

spring.kafka.listener.missing-topics-fatal=false
spring.kafka.listener.type=batch
spring.kafka.listener.ack-mode=manual


logging.level.org.springframework.kafka=ERROR
logging.level.org.apache.kafka=ERROR

Spring-kafka生产者源码流程

ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPICA.TOPIC, messageMock);

主要的源码流程如下

在这里插入图片描述


Spring-kafka消费者源码流程(@EnableKafka@KafkaListener

消费的话,比较复杂

 @KafkaListener(topics = TOPICA.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPICA.TOPIC)
    public void onMessage(MessageMock messageMock){
    
        logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);
    }

划重点,主要关注

在这里插入图片描述


Flow

在这里插入图片描述

作为引子,我们继续来梳理下源码

在这里插入图片描述

继续

在这里插入图片描述
继续

在这里插入图片描述
KafkaBootstrapConfiguration的主要功能是创建两个bean

KafkaListenerAnnotationBeanPostProcessor

实现了如下接口

implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton

主要功能就是监听@KafkaListener注解 。 bean的后置处理器 需要重写 postProcessAfterInitialization

@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
    
		   // 获取对应的class
			Class<?> targetClass = AopUtils.getTargetClass(bean);
			// 查找类是否有@KafkaListener注解
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			final List<Method> multiMethods = new ArrayList<>();
			// 查找类中方法上是否有对应的@KafkaListener注解,
			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
    
						Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
						return (!listenerMethods.isEmpty() ? listenerMethods : null);
					});
			if (hasClassLevelListeners) {
    
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						(ReflectionUtils.MethodFilter) method ->
								AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
				multiMethods.addAll(methodsWithHandler);
			}
			if (annotatedMethods.isEmpty()) {
    
				this.nonAnnotatedClasses.add(bean.getClass());
				this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
			}
			else {
    
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
    
					Method method = entry.getKey();
					for (KafkaListener listener : entry.getValue()) {
    
						// 处理@KafkaListener注解   重点看 
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
			}
			if (hasClassLevelListeners) {
    
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}

重点方法

	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}

继续 processListener

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
			Object bean, Object adminTarget, String beanName) {
    

		String beanRef = kafkaListener.beanRef();
		if (StringUtils.hasText(beanRef)) {
    
			this.listenerScope.addListener(beanRef, bean);
		}
		// 构建 endpoint
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
		endpoint.setTopics(resolveTopics(kafkaListener));
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
		String group = kafkaListener.containerGroup();
		if (StringUtils.hasText(group)) {
    
			Object resolvedGroup = resolveExpression(group);
			if (resolvedGroup instanceof String) {
    
				endpoint.setGroup((String) resolvedGroup);
			}
		}
		String concurrency = kafkaListener.concurrency();
		if (StringUtils.hasText(concurrency)) {
    
			endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
		}
		String autoStartup = kafkaListener.autoStartup();
		if (StringUtils.hasText(autoStartup)) {
    
			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
		}
		resolveKafkaProperties(endpoint, kafkaListener.properties());
		endpoint.setSplitIterables(kafkaListener.splitIterables());

		KafkaListenerContainerFactory<?> factory = null;
		String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
		if (StringUtils.hasText(containerFactoryBeanName)) {
    
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
			try {
    
				factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
			}
			catch (NoSuchBeanDefinitionException ex) {
    
				throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
						+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
						+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
			}
		}

		endpoint.setBeanFactory(this.beanFactory);
		String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
		if (StringUtils.hasText(errorHandlerBeanName)) {
    
			endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
		}
		// 将endpoint注册到registrar
		this.registrar.registerEndpoint(endpoint, factory);
		if (StringUtils.hasText(beanRef)) {
    
			this.listenerScope.removeListener(beanRef);
		}
	}

继续看 registerEndpoint

	public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    
		Assert.notNull(endpoint, "Endpoint must be set");
		Assert.hasText(endpoint.getId(), "Endpoint id must be set");
		// Factory may be null, we defer the resolution right before actually creating the container
		// 把endpoint封装为KafkaListenerEndpointDescriptor
		KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
		synchronized (this.endpointDescriptors) {
    
			if (this.startImmediately) {
     // Register and start immediately
				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
						resolveContainerFactory(descriptor), true);
			}
			else {
    
			   // 将descriptor添加到endpointDescriptors
				this.endpointDescriptors.add(descriptor);
			}
		}
	}

总的来看: 得到一个含有KafkaListener基本信息的Endpoint,将Endpoint被封装到KafkaListenerEndpointDescriptor,KafkaListenerEndpointDescriptor被添加到KafkaListenerEndpointRegistrar.endpointDescriptors中,至此这部分的流程结束了,感觉没有下文呀。

在这里插入图片描述


KafkaListenerEndpointRegistrar.endpointDescriptors 这个List中的数据怎么用呢?

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
    }

KafkaListenerEndpointRegistrar 实现了 InitializingBean 接口,重写 afterPropertiesSet,该方法会在bean实例化完成后执行

	@Override
	public void afterPropertiesSet() {
    
		registerAllEndpoints();
	}

继续 registerAllEndpoints();

	protected void registerAllEndpoints() {
    
		synchronized (this.endpointDescriptors) {
    
		// 遍历KafkaListenerEndpointDescriptor 
			for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
    
			   	// 注册 
				this.endpointRegistry.registerListenerContainer(
						descriptor.endpoint, resolveContainerFactory(descriptor));
			}
			this.startImmediately = true;  // trigger immediate startup
		}
	}

继续

	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    
		registerListenerContainer(endpoint, factory, false);
	}

go

public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
			boolean startImmediately) {
    

		Assert.notNull(endpoint, "Endpoint must not be null");
		Assert.notNull(factory, "Factory must not be null");

		String id = endpoint.getId();
		Assert.hasText(id, "Endpoint id must not be empty");
		synchronized (this.listenerContainers) {
    
			Assert.state(!this.listenerContainers.containsKey(id),
					"Another endpoint is already registered with id '" + id + "'");   
	      // 创建Endpoint对应的MessageListenerContainer,将创建好的MessageListenerContainer放入listenerContainers
			MessageListenerContainer container = createListenerContainer(endpoint, factory);
			this.listenerContainers.put(id, container);
			// 如果KafkaListener注解中有对应的group信息,则将container添加到对应的group中
			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
    
				List<MessageListenerContainer> containerGroup;
				if (this.applicationContext.containsBean(endpoint.getGroup())) {
    
					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
				}
				else {
    
					containerGroup = new ArrayList<MessageListenerContainer>();
					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
				}
				containerGroup.add(container);
			}
			if (startImmediately) {
    
				startIfNecessary(container);
			}
		}
	}

在这里插入图片描述

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/yangshangwei/article/details/115359111

智能推荐

使用JDBC连接数据库出现 The server time zone value ‘�й���׼ʱ��‘ is unrecognized or represents more than one解决方案_jdbc.properties timezone-程序员宅基地

文章浏览阅读553次。在 jdbc.properties 文件中的 url 后面加上 ?serverTimezone=UTC加入之前的jdbc.properties文件:user=rootpassword=12345678url=jdbc:mysql://localhost:3306/testdriverClass=com.mysql.cj.jdbc.Driver加入之后:user=rootpassword=12345678url=jdbc:mysql://localhost:3306/test?serv_jdbc.properties timezone

计算机图形学孔令德基础知识,计算机图形学基础教程孔令德答案-程序员宅基地

文章浏览阅读1.4k次。计算机图形学基础教程孔令德答案【篇一:大学计算机图形学课程设】息科学与工程学院课程设计任务书题目:小组成员:巴春华、焦国栋成员学号:专业班级:计算机科学与技术、2009级本2班课程:计算机图形学指导教师:燕孝飞职称:讲师完成时间: 2011年12 月----2011年 12 月枣庄学院信息科学与工程学院制2011年12 月20日课程设计任务书及成绩评定12【篇二:计算机动画】第一篇《计算机图形学》..._计算机图形学基础教程 孔令德 答案

python xlwings追加数据_大数据分析Python库xlwings提升Excel工作效率教程-程序员宅基地

文章浏览阅读1k次。原标题:大数据分析Python库xlwings提升Excel工作效率教程Excel在当今的企业中非常非常普遍。在AAA教育,我们通常建议出于很多原因使用代码,并且我们的许多数据科学课程旨在教授数据分析和数据科学的有效编码。但是,无论您偏爱使用大数据分析Python的程度如何,最终,有时都需要使用Excel来展示您的发现或共享数据。但这并不意味着仍然无法享受大数据分析Python的某些效率!实际上,..._xlwings通过索引添加数据

java8u211_jre864位u211-程序员宅基地

文章浏览阅读911次。iefans为用户提供的jre8 64位是针对64位windows平台而开发的java运行环境软件,全称为java se runtime environment 8,包括Java虚拟机、Java核心类库和支持文件,不包含开发工具--编译器、调试器和其它工具。jre需要辅助软件--JavaPlug-in--以便在浏览器中运行applet。本次小编带来的是jre8 64位官方版下载,版本小号u211版..._jre8是什么

kasp技术原理_KASP基因分型-程序员宅基地

文章浏览阅读5k次。KASP基因分型介绍KASP(Kompetitive Allele-Specific PCR),即竞争性等位基因特异性PCR,原理上与TaqMan检测法类似,都是基于终端荧光信号的读取判断,每孔反应都是采用双色荧光检测一个SNP位点的两种基因型,不同的SNP对应着不同的荧光信号。KASP技术与TaqMan法类似,它与TaqMan技术不同的是,它不需要每个SNP位点都合成特异的荧光引物,它基于独特的..._kasp是什么

华为p50预装鸿蒙系统,华为p50会不会预装鸿蒙系统_华为p50会预装鸿蒙系统吗-程序员宅基地

文章浏览阅读154次。华为现在比较火的还真就是新开发的鸿蒙系统了,那么在即将上市的华为p50手机上会不会预装鸿蒙系统呢?接下来我们就来一起了解一下华为官方发布的最新消息吧。1.华为p50最新消息相信大家都知道,随着华为鸿蒙OS系统转正日期临近,似乎全网的花粉们都在关注华为鸿蒙OS系统优化、生态建设等等,直接忽略了不断延期发布的华为P50手机,如今华为P50系列手机终于传来了最新的好消息,在经过一系列方案修改以后,终于被..._华为手机p50直接预装鸿蒙系统

随便推点

python用什么软件编程好-初学python编程,有哪些不错的软件值得一用?-程序员宅基地

文章浏览阅读2.1k次。Python编程的软件其实许多,作为一门面向大众的编程言语,许多修正器都有对应的Python插件,当然,也有特地的PythonIDE软件,下面我简单引见几个不错的Python编程软件,既有修正器,也有IDE,感兴味的朋友可以本人下载查验一下:1.VSCode:这是一个轻量级的代码修正器,由微软规划研发,免费、开源、跨途径,轻盈活络,界面精练,支撑常见的自动补全、语法提示、代码高亮、Git等功用,插..._python入门学什么好

pytorch一步一步在VGG16上训练自己的数据集_torch vgg训练自己的数据集-程序员宅基地

文章浏览阅读3.2w次,点赞30次,收藏307次。准备数据集及加载,ImageFolder在很多机器学习或者深度学习的任务中,往往我们要提供自己的图片。也就是说我们的数据集不是预先处理好的,像mnist,cifar10等它已经给你处理好了,更多的是原始的图片。比如我们以猫狗分类为例。在data文件下,有两个分别为train和val的文件夹。然后train下是cat和dog两个文件夹,里面存的是自己的图片数据,val文件夹同train。这样我们的..._torch vgg训练自己的数据集

毕业论文管理系统设计与实现(论文+源码)_kaic_论文系统设计法-程序员宅基地

文章浏览阅读968次。论文+系统+远程调试+重复率低+二次开发+毕业设计_论文系统设计法

在python2与python3中转义字符_Python 炫技操作:五种 Python 转义表示法-程序员宅基地

文章浏览阅读134次。1. 为什么要有转义?ASCII 表中一共有 128 个字符。这里面有我们非常熟悉的字母、数字、标点符号,这些都可以从我们的键盘中输出。除此之外,还有一些非常特殊的字符,这些字符,我通常很难用键盘上的找到,比如制表符、响铃这种。为了能将那些特殊字符都能写入到字符串变量中,就规定了一个用于转义的字符 \ ,有了这个字符,你在字符串中看的字符,print 出来后就不一定你原来看到的了。举个例子>..._pytyhon2、python3对%转义吗

java jar 文件 路径问题_「问答」解决jar包运行时相对路径问题-程序员宅基地

文章浏览阅读1.3k次。我这几天需要做一个Java程序,需要通过jar的形式运行,还要生成文件。最终这个程序是要给被人用的,可能那个用的人还不懂代码。于是我面临一个问题:生成的文件一定不能存绝对路径。刚开始我想得很简单,打绝对路径改成相对路径不就行了吗?于是有了这样的代码:String path = "../test.txt";File file = new File(path);……这个写法本身并没有问题,直接运行代码..._jar启动文件路径中存在!

微信读书vscode插件_曾经我以为 VSCode 是程序员专属的工具,直到发现了这些……...-程序员宅基地

文章浏览阅读598次。如果你知道 VSCode,一说起它,你可能第一个想到的就是把它当做一个代码编辑器,而它的界面应该可能大概率是这样的——如果你恰好又是个程序员,那你可能经常会用到它,不管是 Python、JS 还是 C++ 等各种语言对应的文件,都可以用它来进行简单的编辑和整理,甚至是运行和 debug......但是今天要讲的显然不是这些,经过小美的多方研究,发现了即使是对于大多数并不了解 VSCode,也完全不..._vscode weixin read

推荐文章

热门文章

相关标签