技术标签: zookeeper kafka java-zookeeper
环境: kafka-2.2.1、zookeeper-3.6.3
kafka给我们提供了SASL/SCRAM模式,将SASL、ACL规则信息存储到zookeeper中,并且通过KafkaClientAdmin
Api,新增、编辑、删除规则,其特性如下
应用发送、消费实现动态身份认证和授权
基于kafka SASL/SCRAM模式,客户端会在建立连接进行SASL身份验证,在消费发送时,进行ACL鉴权
安全认证代码无侵入性、兼容原生kafka api
有两种配置方式,通过根据-Djava.security.auth.login.config
指定jaas配置文件,并且配置producer.properties
/consumer.properties
开启SASL配置,兼容原生kafka api
账号级别区分:管理账号有最高权限、业务账号限定资源权限
可通过配置kafka集群超级管理员,通过超级管理员账号登录是不进行ACL鉴权。因此超级管理员可以操作所有的资源权限
安全认证支持动态开启、关闭
kafka提供SASL和ACL管理的api,通过这些api可以新增、修改、查询、删除SASL和ACL策略。通过配置策略,可以实现安全策略开启、关闭
在集群初始化时,配置开启SASL、ACL,包括
添加SASL、ACL规则
alice
为用户名,应用scret-1
为密码的SASL认证信息。并且给这个用户授予消息写入权限客户端配置SASL配置,并发送消息
在properties中配置SASL账号密码。或者指定jaas配置文件
服务端认证、鉴权
在客户端与服务端建立连接时,对客户端alice进行身份认证(SASL),认证通过后,在发送消息时检查资源(主题)是否授权(ACL)给改用户alice
zookeeper与broker认证、鉴权流程(SASL/SCRAM + ACL)
第一步:集群配置
Zookeeer SASL配置
修改zoo.cfg配置,开启SASL
...
sessionRequireClientSASLAuth=true
zookeeper.sasl.client=true
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
sessionRequireClientSASLAuth
:默认为false, 当设置为true 时,Zookeeper 服务器将只接受通过 SASL 向服务器进行身份验证来自客户端的连接和请求。使用 SASL 配置但身份验证失败(即使用无效凭据)将无法与服务器建立会话
zookeeper.sasl.client
: 将值设置为false以禁用SASL身份验证。默认值为true
authProvider.1
: 自定义ZooKeeper 添加新类型的SASL身份验证方案。指定的类需要实现AuthenticationProvider接口。zookeeper提供实现: org.apache.zookeeper.server.auth.SASLAuthenticationProvider
新增JAAS配置文件:zoo_jaas.conf
,用于定义认证身份信息。我们使用的LoginModule实现为: org.apache.zookeeper.server.auth.DigestLoginModule
,它是基于明文用户、密码的形式进行身份认证
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zookeeper"
password="zookeepersecret”
user_kafka="kafkasecret";
};
指定jaas配置文件
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/Users/vhicool/zookeeper/conf/zoo_jaas.conf"
启动zookeeper节点
./bin/zkServer.sh start-foreground
Kafka SASL配置
创建证书
创建证书
在配置kafka broker之前,我们需要新增broker账号,这一步是必须的,需要在broker重启应用SASL+ACL配置前执行。这个username=admin,password=adminsecret,用了进行broker直接的SASL认证
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=adminsecret]' --entity-type users --entity-name admin
服务端
用户认证信息配置 jaas.conf
,其中KafkaServer
是服务端用来broker与broker直接的SASL认证,Client
是kafka broker作为客户端与zookeeper服务端直接的认证。与SASL/PALIN模式不同的是KafkaServer
不能指定kafka客户端用户认证账号,而是通过bin/kafka-configs.sh
来配置。在Client
中的LoginModule同样是明文用户、密码认证,和zookeeper的org.apache.zookeeper.server.auth.DigestLoginModule
对应
//broker与broker
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="adminsecret";
};
//broker与zookeeper
Client{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafkasecret";
};
指定kafka环境变量,是SASL生效
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
server.properties
配置,通过指定super.users
指定超级用户,超级用户是不需要进行ACL鉴权,可作为管理员账号
#SASL/SCRAM
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-25
#ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
第二部:新增认证、授权规则
kafka cli工具连接
除了使用kafka自带cli,同时也可以使用kafka通过的api去维护:KafkaClientAdmin
新增kafka client JAAS认证文件:kafka_client_jaas.conf
//kafka client与zookeeper SASL
Client{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafkasecret";
};
// kafka client与broker SASL
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="adminsecret";
};
新增broker启动环境变量,指定JAAS文件
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
通过命令工具新增主题
验证:kafka client与zookeeper SASL
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-1
给用户alice SASL认证
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' --entity-type users --entity-name alice
给主题ACL授权
授予用户alice
对主题topic-1
生产者权限。涉及验证:kafka client与zookeeper SASL(此处我们并没有开启zookeeper ACL,因此不需要zookeeper ACL权限验证)
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --producer --topic topic-1
测试发送消息
涉及验证: kafka client与broker SASL+ACL
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=SCRAM-SHA-256
第三步:客户端配置
在用户alice配置SASL认证,并且授权主题topic-1
发送权限后,客户端通过原生api发送消息
Properties kafkaProperties = new Properties();
kafkaProperties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
kafkaProperties.put("bootstrap.servers", "localhost:9092");
//SASL配置
kafkaProperties.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"alice\" password=\"alice-secret\";");
kafkaProperties.put("security.protocol","SASL_PLAINTEXT");
kafkaProperties.put("sasl.mechanism","SCRAM-SHA-256");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProperties);
ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>("topic-1", null, null, "test message", null);
producer.send(record).get();
本地文件存储
PLAIN模式
SASL/PLAIN是一种简单的用户名/密码认证机制,通常与TLS一起用于加密以实现安全认证。他的SASL认证是存储在本地JAAS文件,因此如果需要实现动态身份认证,需要拓展SASL认证接口
zookeeper存储
SASL/SCRAM
它解决了与执行用户名/密码认证的传统机制(如PLAIN和DIGEST-MD5)的安全问题。 RFC 5802中定义了该机制。Kafka支持SCRAM-SHA-256和SCRAM-SHA-512,它们可以与TLS一起使用以执行安全认证。用户名用作配置ACL等的认证主体。Kafka中的默认SCRAM实现在Zookeeper中存储SCRAM凭据,适用于Zookeepher位于专用网络上的Kafka安装
外部认证服务器存储
SASL/Kerberos
kafka默认SASL通过静态JAAS文件配
从Kafka 2.0版开始,可以通过使用配置选项sasl.server.callback.handler.class
和sasl.client.callback.handler.class
配置自己的回调处理程序来从外部源获取用户名和密码,从而避免在磁盘上存储明文密码。
sasl.server.callback.handler.class
实现 AuthenticateCallbackHandler 接口的 SASL 服务器回调处理程序类的完全限定名称。服务器回调处理程序必须以侦听器前缀和小写的 SASL 机制名称为前缀。例如,listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler
sasl.client.callback.handler.class
实现 AuthenticateCallbackHandler 接口的 SASL 客户端回调处理程序类的完全限定名称。
kafka默认ACL存储在zookeeper
自定义扩展外部存储
通过在客户端配置authorizer.class.name,可自定义实现存储位置。kafka默认是通过zookeeper存储ACL规则,实现类为:kafka.security.auth.SimpleAclAuthorizer
。如果我们要扩展ACL存储,需要自定义认证类并实现kafka.security.auth.Authorizer
接口,包括实现authorize、addAcls、removeAcls、getAcls
等方法
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
文章浏览阅读2.5k次。人人开源 获取定时任务CronTrigger出现异常报错信息:org.springframework.beans.factory.UnsatisfiedDependencyException:Error creating bean with name ‘scheduleJobController’: Unsatisfieddependency expressed through field ‘scheduleJobService’; nestedexception is org.springf
文章浏览阅读1.5k次。收取邮件有两种协议,POP3和IMAP,POP3相对于IMAP功能较少无法对邮件进行更深层次的操作,因此本文使用IMAP协议收取邮件。python提供了很多收邮件的模块,本文使用imaplib来接收邮件。前提需要在邮件箱的设置中打开允许IMAP,不同邮件开启全方式不同,具体开启方式自行百度。(本文使用outlook,默认开启)难点获得邮件不难,难点是如何解析邮件的内容,由于邮件内容的编码是不固定的..._python接收邮件內容
文章浏览阅读1.3w次,点赞25次,收藏396次。目录我们可以用zabbix监控哪些zabbix的主动监控与被动监控配置zabbix自定义监控流程安全组是什么,限制了3306的入规则,客户端还能访问吗Nagio监控?服务器一般需要监控哪些项目? 凭借这些项目如何判断服务器的瓶颈?zabbix监控mysql的io情况是否正常的流程监控MySQL主从同步是否异常,如果异常,则发送短信或者邮件给管理员。zabbix如何修改其中监控的一台服务器中内存阈值信息,比如正常内存使用到了 80%报**警,我想修改为 60%报警...
文章浏览阅读325次。Python 是一门上手快、优雅简洁的编程语言,其多范式、丰富的标准库和第三方库能够让编程人员把精力集中在逻辑和思维方法上,而不用去担心复杂语法、类型系统等外在因素,从而高效地达成自己的编程目标。Python 抽象层次非常高,这帮助我们更好更快地完成编程,但也屏蔽了很多细节,程序员也无法精确控制计算机底层的资源,代码性能优化就变得比较复杂。很多资深的程序员可能会觉得 Python 性能不够好,无法编写高性能的程序,其实这句话也不全对。对于计算密集型的程序,Python 可以通过扩展的形式使得核心计算直接调用_asyncio网络
文章浏览阅读2.1k次,点赞2次,收藏2次。笔记型博客之spring-声明式事务_spring容器事务
文章浏览阅读861次,点赞15次,收藏19次。总之,1444-DYN04-01RA AB 控制器传感器是一款功能强大、可靠稳定的传感器,适用于各种旋转和往复机械的监测和控制。它的应用可以提高机械的运行效率和安全性,为工业生产和能源领域的发展提供有力支持。此外,1444-DYN04-01RA AB 控制器传感器还具有友好的人机界面,可以方便地进行操作和维护。1444-DYN04-01RA AB 控制器传感器是一款用于监测旋转和往复机械的传感器,可以测量振动和压力等动态输入以及轴位移、偏心和活塞杆沉降等静态输入。_1444-dyn04-01ra手册
文章浏览阅读8.2k次,点赞38次,收藏18次。本文介绍Ubuntu如何安装gcc。_apt-get gcc
文章浏览阅读9.2k次。Sass 是 CSS 的改进版本,您现在就可以在 React 项目中使用它。_react 使用sass
文章浏览阅读211次。使用 Flutter 从头开始写一个 App是一件轻松惬意的事情。但是对于成熟产品来说,完全摒弃原有 App 的历史沉淀,全面转向 Flutter 并不现实。用 Flutter 去统一 iOS/Android 技术栈,把它作为已有原生 App 的扩展,然后通过逐步试验有序推进从而提升终端开发效率,可能才是现阶段 Flutter 最有效的集成方式。那么,Flutter 工程与原生工程该如何组织管理?..._flutter 打包ios 是原生app吗
文章浏览阅读1.1k次。忘了发了,来迟了Misc应该算是签到来得早不如来的巧CyzCC_loves_LOL小脑洞+老考点,理解一下其实是放进百度翻译然后一下看出来了s = 'D0g3isthepAssword'flag = ''for i in range(len(s)): tmp = ord(s[i])-3 if(tmp<65): flag += chr(tmp+26) else: flag += chr(tmp)print(flag)密码_chr(ord(tmp)+26)
文章浏览阅读7.3w次,点赞90次,收藏216次。由于Pytorch不像TensorFlow有谷歌巨头做维护,很多功能并没有很高级的封装,比如说没有tf.one_hot函数。本篇介绍将一个mini batch的label向量变成形状为[batch size, class numbers]的one hot编码的两种方法,涉及到tensor.scatter_tensor.index_select使用scatter_获得one hot 编码..._label变为onehot
文章浏览阅读3.7k次。与大家分享一套影视级调色预设 Triune Color Cinematic LUTs V2。兼容ps,fcpx,pr,ae,lr等后期视频和图像处理软件,帮助大家轻松实现电影级调色,让作品获得更震撼、更高级的色彩表现。原文及下载地址:https://www.mac69.com/cj/1287.html影视级调色预设 Triune Color Cinematic LUTs V2 官方介绍..._triune color cinematic luts