kafka服务器的版本 需要与我们springboot 版本对应,否则由于kafka版本差异,发送消息时可能出现问题;
kafka 与 springboot 对应关系:
官网:kafka 与 springboot 对应关系.
更老版本的对应如下图:
简而言之:kafka服务器安装的版本 与 我们使用的springboot 版本要对应;不对应就可能出问题,发消息报错、消息发出去主题里没有呀。。。。。。
方案1和方案2 代价较大,方案3 实测即可不受版本影响;
kafka:
bootstrap-servers: 000.000.000.001:9092,000.000.000.002:9092 #<可根据配置修改>
producer:
# 重试次数,默认Integer.MAX_VALUE
retries: 3
# 同一批次内存大小(默认16K)
batch-size: 16384
# 生产者内存缓存区大小(32M)
buffer-memory: 33554432
#ack 0 不等待队列确认; ack 1等待leader 确认; ack -1 等待所有节点确认完成;
acks: 1
consumer:
group-id: test #可根据需求修改
# 关闭自动提交,消费后手动提交
enable-auto-commit: false
# earliest:从头开始消费 latest:从最新的开始消费 默认latest
auto-offset-reset: latest
# key和value反序列化(默认,可以不设置)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# # 消费者并发能力
concurrency: 1 #默认值
# # 设置手动提交的时候,需要设置ackMode
ack-mode: MANUAL
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.Properties;
//获取配置参数;
@Configuration
public class KafkaConfig {
@Value("#{'${spring.kafka.bootstrap-servers}'.split(',')}")
private List<String> bootstrapServers;
@Value("${spring.kafka.producer.retries}")
private int retries;
@Value("${spring.kafka.producer.batch-size}")
private int batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private int bufferMemory;
@Value("${spring.kafka.producer.acks}")
private String acks;
@Bean
public KafkaProducer<String, String> initKafkaTemplate() {
Properties props = new Properties();
//设置接入点,请通过控制台获取对应Topic的接入点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.ACKS_CONFIG,acks);
//构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
//如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
return producer;
}
}
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.Future;
/**
* kafka 配置
* 由于kafka服务端使用的是 0.10.2版本,而我们的框架使用的spring boot版本为2.5.0,
* kafka starter版本中kafka-clients版本为2.7.1;kafka客户端作为生产者发送消息
* 时,会增加请求头,而服务端不支持,导致无法正常生产消息,故kafka作为生产者,不能直
* 接使用boot starter,需使用kafka原生生产者;
*/
@Slf4j
@Component
public class MessageSenderClient {
@Resource
private KafkaProducer kafkaProducer;
/**
* 消息发送至 kafka
*
* @param topic 路由
* @param data 消息内容
*/
public void send(String topic, Object data) {
if (StringUtils.isEmpty(topic) || data == null) {
throw new IllegalStateException("The send message parameter cannot be null");
}
try {
Gson gson=new Gson();
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic,gson.toJson(data));
Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
RecordMetadata recordMetadata = metadataFuture.get();
log.info("MessageSenderClient kafka send Produce ok:{}", gson.toJson(data));
} catch (Exception e) {
log.info("MessageSenderClient kafka send error,", e);
}
}
/**
* 消息发送至 kafka
*
* @param topic
* @param partition 分区
* @param data
*/
public void send(String topic, Integer partition, Object data) {
if (StringUtils.isEmpty(topic) || data == null) {
throw new IllegalStateException("The send message parameter cannot be null");
}
if (partition == null) {
send(topic, data);
}
try {
Gson gson=new Gson();
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, partition, null,gson.toJson(data));
Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
RecordMetadata recordMetadata = metadataFuture.get();
log.info("MessageSenderClient kafka send Produce ok:{}", recordMetadata.toString());
} catch (Exception e) {
log.info("MessageSenderClient kafka send error,", e);
}
}
}
@Slf4j
@SpringBootTest
public class KafkaTest {
@Autowired
private MessageSenderClient kafkaProducer;
/**
* 发消息
*/
@Test
void testKafkaProducer() {
kafkaProducer.send("GroupQueue", "payment发送一条新消息");
}
}
结果:
发送成功;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* kafka 监听者
*/
@Slf4j
@Component
public class KafkaConsumerTest {
@KafkaListener(topics = {
"GroupQueue"})
void onMessage1(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("kafka consumer:" + record.topic() + "-" + record.partition() + "-" + record.value());
String value = record.value();
System.out.println("value:"+value);
/**
* 业务处理代码。。。。。。
*/
//处理完成后,手动提交ack
acknowledgment.acknowledge();
}
}
注意:由于我们kafka配置的从最新的开始消费,所以测试时,记得先启动消费者开启监听,生产者再发次消息;消费者才能收到最新的那条消息;
结果ok:
本文不如不当之处,请各位大佬及时指正;
文章浏览阅读103次。1.安装Docker首先需要安装docker 已經安裝过的兄台,可以忽略这一步,执行第二步windows 安装linux下载安装curl -sSL https://get.docker.com/ | sh设置开机自启sudo systemctl enable docker.servicesudo service docker start|restart|stop2. 安装git clone htt..._docker-compose php7.3
文章浏览阅读789次。原生JS实现原生JS,先使用 select() 选中输入框文本,然后使用 document.execCommand("Copy")复制到剪贴板前提条件select() 方法只对 <input> 和 <textarea> 有效输入框不能有disabled属性输入框不能有hidden属性输入框的 width 和 height 不能为0实现代码HTML<div> <input id="copyInput" type="text" value_js复制hidden内容到剪贴板
文章浏览阅读5.7w次,点赞19次,收藏59次。Zip压缩文件没有解压密码怎么办?在平时,我们难免的需要到浏览器上面寻找一些需要的资源,可往往下载好了zip文件后,解压时却需要密码。真的让人很痛苦,这样设置密码无非是想利用这些资源来赚取利益罢了。那有没有什么办法可以破解zip解压时需要的密码呢?答案当然是有的,下面我就其中一种方法具体演示一遍,希望对大家有所帮助。(第一次写博客,可能讲的没有那么详细,请见谅)首先我们需要下载一个破解软件..._zip没密码怎么打开
文章浏览阅读1.5k次。在离开之际,将自己的电脑系统重新升级了下,在此做如下记录,以便后续查证。1、双系统安装 这里只是简单说下,win10+Ubuntu,这次采用的Ubuntu分区是“boot”+“swap”+“/”的形式,具体可以去参考另一篇博客。2、cuda安装(1)下载安装文件检查自己的电脑配置是否支持cuda加速(一般英伟达系列显卡支持),这里推荐直接去https://d......_ubuntu20.10安装cuda9.0+cudnn7
文章浏览阅读1k次。VMware ESXi 7.0 U3n macOS Unlocker & OEM BIOS (标准版和厂商定制版)_esxi unlocker
文章浏览阅读1.5w次,点赞8次,收藏13次。COMSOL5.4版本几何模型上色comsol中的几何模型默认为灰白色,为了使不同零件在建模过程中具有分辨度、后期图像更直观,可以给不同的零部件添加不同的颜色。添加方法如下。1.导入一个灰白色模型这是一个锥盆在一个球形域内,从图中可以看出纯灰白色的图像很难将两者分开。这是可以考虑给各自赋不同的颜色。在组件中定义显示选择,选中球体的边界,并设置为蓝色,再点击抑制选择突出显示按钮,即可。同样的操作,将里面的锥盆设置为绿色。当然了,配置什么颜色完全取决于各位的审美。本次分享到此结束,谢谢浏览。._comsol几何模型怎么设置颜色
文章浏览阅读2.9k次。J1939多帧_j1939多帧接收拒绝接收
文章浏览阅读1.6k次。比如, 网络不稳定,服务器出错,跨域问题、或者资源本身有问题,导致出现了资源加载失败。就会造成后面需要使用的地方出现。// 便签的名称必须是 'SCRIPT' 与 event 错误的类型不能是 ErrorEvent。标签中资源加载出错,从而执行一些自己想要的操作。'script 加载错误'// 拿到触发错误的标签。_指向其它服务器页面的 加载失败。
文章浏览阅读529次。OpenCV 图像保存格式之一为 cv::Mat, 常用 imread 进行图像读取,用 imwrite 进行图像保存,用 imshow 进行图像显示。_opencv对传入的对象图片进行修改
文章浏览阅读520次。"font-size:18px;">类的方法 1.在加号方法中不能使用实例变量,实例变量是依托于对象存在的只有有了对象,实例变量才会有空间 2.self 谁调用该方法,self就代表谁 当方法是一个减号方法时, 调用该方法的对象是谁,self就代表当前对象 当方法是一个加号方法时, 调用该方法的类是谁,self就代表当前类 3._oc setobject
文章浏览阅读1.8k次。问题描述上传图片后想对文件进行查看发现一直是403错误,到linux下找到上传文件的,发现其群组和其他组没有读的权限,修改一下权限则可以访问。由此判定由权限问题导致的403错误。导致权限不足原因<1> tomcat版本差别高版本的tomcat会出现这种问题解决问题<1> 检查linux-- 查看系统默认配置vim /etc/profilelinux文件访问默认权限与umask有关,如果是002或022就正常没有问题。<2> 检查tomcat[_linux设置java上传文件的权限
文章浏览阅读163次。当机器人接触不同气味时,这个传感器向与之连接的一个电子系统传递不同的电信号,随后,该团队使用机器学习方法创建了一个“气味系统库”,包括柠檬、杏仁糖、天竺葵、和混合气味等,并使用了一个随机森林分类器进行气味分类。研究显示,这台机器人能够嗅出天竺葵、柠檬、杏仁蛋白糖等8种气味,就算混合在一起也能分辨出来,而且灵敏度比气相色谱-质谱法(GC-MS)高10000倍!甚至能检测出0.001微克的挥发性发挥物。以色列特拉维夫大学研究人员把蝗虫触须安装到机器人上,利用蝗虫的灵敏嗅觉将机器人辨别气味的灵敏度提高约1万倍。_“有嗅觉”的机器人能闻出8种气味,灵敏度比普通设备高10000倍