关于使用kafka的个人总结_使用kafka与的结论_yjh314的博客-程序员宅基地

技术标签: kafka  

    本篇文章算是对几个月工作的一个总结吧,接触了几个月的kafka和flume,感觉自己也只能算是了解kafka的阶段,接下来也打算深入研究下kafka的原理,下面写的算是对工作中使用到的kafka方面的东西做个总结吧!

     

kafka概述

(有空可以浏览一下http://orchome.com)

      当前kafka版本为0.10.0.0。

      关于Kafka需要了解的几个关键词:

      ①Producer

          发布消息的对象(flume)      

      ②Consumer

          订阅发布的消息并处理消息的对象(spark)

      ③Broker

          Producer把生产的消息“推”到Broker中,Consumer可以从Broker中订阅一个  或多个topic,并从Broker中“拉”消息进行消费

      ④Topic

          Kafka中把消息分门别类,每一类消息是一个topic

      ⑤Partition

          Kafka把每个Topic分成多个partition,每个partition中的消息是有序的,partition中的每条消息都会被分配一个有序的id(offset) 

 

Topic & Partition

   1.kafka把一个topic分为1个或以上的partition,每个partition在物理上对应一个文件夹,

    

文件夹中存储的是该partition的所有消息以及一个索引文件。(以第一个消息的offset命名)   

 

   

   2.每一个分区都是一个顺序的消息队列,并且可以持续的添加。分区中的每条消息都被分配了一个序列号,即偏移量(64字节的offset),在每个分区中的偏移量都是唯一的

 

   3.在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition(通过实现kafka.producer.Partitioner接口)。key相同的消息会被发送并存储到同一个partition里

 

   4.每一条消息被发送到broker时,会根据分区的规则选择被存储到哪一个partition在创建topic时可以在config/server.properties中指定这个partition的数量

    

 

  

offset

   1.消费者所持有的仅有的元数据就是这个偏移量,也就是消费者在这个log中的位置。 这个偏移量由消费者控制。

 

   2.当消费者消费消息的时候,偏移量会线性的增加。实际偏移量由消费者控制,消费者可以将偏移量重置为之前的一个偏移量,重新读取消息。

 

 

具体使用:

 

开启kafka前先要开启zookeeper

 

1.bin/zkServer.sh start

2.开启kafka server

   bin/kafka-server-start  config/server.properties

3.测试时可以先开kafka的producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test--broker-list是kafka所在主机

  再开启kafka的consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

接着就可以在producer中发送消息

 

 

Kafka配置文件:

几个关键配置:

broker.id=0

listeners=PLAINTEXT://172.16.49.173:9092

port=9092

log.dirs=kafka-logs

zookeeper.connect=localhost:2181

listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,在使用java发送数据时可能会抛出异常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因为在没有配置advertised.host.name 的情况下,Kafka并没有广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的也可以在远端客户端配置hosts

Zookeeper.connect中zk集群地址以“,”分隔

集群的配置

与单机的配置差不多,只是listeners的配置需要改为集群服务的IP地址和采用的端口。

关于kafka的监控:

  可以使用kafkaOffSetMonitor

  (https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar)

将KafkaOffsetMonitor-assembly-0.2.0.jar上传到服务器后,可以新建一个脚本用于启动该应用。脚本内容如下:

java -cp KafkaOffsetMonitor-assembly-0.2.0.jar

    com.quantifind.kafka.offsetapp.OffsetGetterWeb

--zk zk1:2181,zk2:2181,zk3:2181     

--port 8088

--refresh 10.seconds     

--retain 2.days

 

   zk集群

 

Consumer消费情况

(左上角显示的是topic生产速率,右上角是consumer消费速率,蓝色的表示当前Topic中的Message数目,灰色的表示当前Consumer消费的offset位置,红色的表示蓝色灰色的差值,即当前Consumer滞后于Producer的message数目)


 

       这里有一个坑,默认情况下Producer往一个不存在的Topic发送message时会自动创建这个Topic。由于在这个封装中,有同时传递message和topic的情况,如果调用方法时传入的参数反了,将会在Kafka集群中自动创建Topic。在正常情况下,应该是先把Topic根据需要创建好,然后Producer往该Topic发送Message,最好把Kafka这个默认自动创建Topic的功能关掉。 
  如果真的不小心创建了多余的Topic,在删除时,会出现“marked for deletion”提示,只是将该topic标记为删除,使用list命令仍然能看到。如果需要调整这两个功能的话,在server.properties中配置如下两个参数:

auto.create.topics.enable   默认值true

delete.topic.enable   默认值false

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/yjh314/article/details/78296357

智能推荐

n进制下计算求n的值_I一W一I的博客-程序员宅基地

n进制下计算求n的值例1:在n进制下,15×4=112,求n。A.5 B.6 C.7 D.8解:方法一:把每一项都转换成十进制,然后进行计算。(1 × n + 5) × 4 = n² + n + 2经计算求得:n = 6,则选择B。方法二:先转换成十进制化简后,等号两侧%n。原式:(1 × n + 5) × 4 = n² + n + 2化简:4n + 20 = n² + n + ...

Java Pattern和Matcher用法_假装你是大灰狼的博客-程序员宅基地

java.util.regex是一个用正则表达式所订制的模式来对字符串进行匹配工作的类库包。它包括两个类:Pattern和Matcher。Pattern,一个Pattern是一个正则表达式经编译后的表现模式。Matcher,一个Matcher对象是一个状态机器,它依据Pattern对象...

SOA学习之初_yy6060_yy6060的博客-程序员宅基地

根据公司项目的需要,这两天正在恶补SOA概念性的一些知识,发现这东西可真是博大精深啊,涉及各方面的知识,有网络、架构、各种协议、实现方式等。 看的这本书的名字叫做:使用JAVA WEB服务构建SOA。其实这本书主要讲述的是如何实现SOA,因此对于我这个没怎么接触过SOA的

kubernetes-sidecar方式日志收集(filebeat->kafka)_filebeat sidecar 读取std_only°夏至besos的博客-程序员宅基地

version: filebeat:7.3.0容器日志的默认存储方式是:json-file1. docker 所有日志实际存放路径为:/var/lib/docker/containers/ ,日志文件的名称跟k8s的命名方式并没有关联;2. 为了方便管理(使用deployment,container的名称来命名日志),会创建一些links,指向日志实际存储路径 /var/lib/docker/containers/ ; a.在 /var/log/pods/pod_uuid/ 目录下为每个pod..

在线答题小程序V2.0_小肥羊在线答题小程序学习_小肥羊同学的博客-程序员宅基地

本文主要记录开源项目在线答题小程序更新历史,对于一个好的开源项目,不仅仅代码要漂亮,更主要的是要有清晰的开发过程记录,详细的更新历史记录,所以很多东西都要学习。在线答题小程序V2.0相较于V1.0主要有以下几点更新1、部分界面优化2、选择题增加对多选的支持,同时增加填空题和判断题3、将答题模块原生的radio组件用样式重写了,为了支持选择题的多选模式,从体验上跟原生的radio并无区分目前将该版本定义为V2.0,算是一个比较完善的版本,不管从界面还是交互流程..

随便推点

Newman的安装使用_“danman安装”_潘正恒的博客-程序员宅基地

Newman是postman命令行的执行工具,可以在命令行执行postman的脚本Node.js开发的postman命令行newman为node.js的第三方库,所以要安装Newman,首先要安装node.js环境安装Newman的方法:第一步:先装Node.jsNode.js官网:https://nodejs.org/en/下载直接安装,默认安装方式安装完成后,命令行输入node...

ASPxGridView控件单元格数据设置不换行_苦命书生的博客-程序员宅基地

ASPxGridView控件单元格数据设置不换行今天遇见这样一个问题,aspxgridview控件的数据不换行,看起来很丑在此我们需要添加样式使得文本处在同一行上,aspxgridview再引用样式就可以了 <style type="text/css"> .sl { white-space: nowrap; } </style>效果如下:...

Python 绘制局部放大图_python局部放大图_Eloik的博客-程序员宅基地

请务必先看此文章: 【Python】Matplotlib局部放大图画法这篇文章已经非常详细,但是没有数据,所以自己生成了一些数据,以及对部分代码进行了函数封装,以便于二次使用。import matplotlib.pyplot as pltfrom matplotlib.patches import ConnectionPatchimport numpy as npdef zone_and_linked(ax,axins,zone_left,zone_right,x,y,linked='bott

2021-05-08 QObject::connect: No such slot QSpinBox::(setValue(int))_qq_43361844的博客-程序员宅基地

#include "widget.h"#include <QApplication>#include<QSpinBox>#include<QSlider>#include<QHBoxLayout>int main(int argc, char *argv[]){ QApplication a(argc, argv); QWidget *window = new QWidget; window->s.

python拼接嵌套json_Python 之 Json序列化嵌套类_weixin_39637363的博客-程序员宅基地

想要用python自已手动序列化嵌套类,就要明白两个问题:1.Json是什么?2.Json支持什么类型?答案显而易见Json就是嵌套对象Json在python中支持列表,字典(当然也支持int,string.....,不过说这个也没多大必要)很好,等等,列表,字典?我们在python中学过列表,字典,字典列表,列表字典,字典字典,那,我们可不可以把类对象转化为这些呢?我可以很确定的告诉你,可以,并...