通过开发一个Kafka Connect增加对Kafka Connect的认识
Kafka Connect是生产者、消费者的一种特殊使用场景,一般用于构建数据管道完成数据的导入导出功能。
Kafka Connect核心概念(Connector)
Source:Source负责导入数据到Kafka
Sink:Sink负责从Kafka导出数据
Kafka Connect还有两个重要概念:Task和Worker。
Task是Kafka Connect数据模型的主角,每一个Connector都会协调一系列的Task去执行任务,Connector可以把一项工作分割成许多Task,然后把Task分发到各个Worker去执行任务(分布式模式下),Task不保存自己的状态信息,而是交给特定的Kafka主题去保存。Connector和Task都是逻辑工作单位,必须安排在进程中执行,而在Kafka Connect中,这些进程就是Worker。
Kafka Connect提供了以下特性:
通用性:规范化其他数据系统与Kafka的集成,简化了连接器的开发、部署和管理
支持独立模式(standalone)和分布式模式(distributed)
REST接口:使用REST API提交和管理Connector
自动位移管理:自动管理位移提交,不需要开发人员干预,降低了开发成本
分布式和可扩展性:Kafka Connect基于现有的组管理协议来实现扩展Kafka Connect集群
流式计算/批处理的集成
1.启动zk+kafka(以下两条命令在两个窗口执行)
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
注:kafka路径不能过深,否则启动失败
2.修改连接器配置
source配置,修改file配置项为自己的数据源路径(./config/connect-file-source.properties
)
# 连接器名称
name=local-file-source
# 连接器类
connector.class=FileStreamSource
# 最大任务数
tasks.max=1
# 从file指定的数据源导入数据到kafka(注意路径分隔符,否则可能找不到文件)
file=e:\\test\\source\\source.txt
# 从数据源读取的数据写入到此处指定的topic中
topic=connect-test
sink配置,修改file配置项为自己数据导出文件存放路径(./config/connect-file-sink.properties
)
# 连接器名称
name=local-file-sink
# 连接器类
connector.class=FileStreamSink
# 最大任务数
tasks.max=1
# 将kafka的数据导出到file指定的文件中
file=e:/test/sink/sink.txt
# 需要导出的kafka消息的topic列表
topics=connect-test
3.启动Source和Sink
.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-file-source.properties .\config\connect-file-sink.properties
4.测试 往e:\\test\\source\\source.txt
写入数据,然后查看e:/test/sink/sink.txt
中是否有刚刚写入的内容
代码
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-connect</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 将依赖打包到jar包中-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
QueryDataFromDB.java
package com.example.dao;
import com.example.entity.SourceEntity;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class QueryDataFromDB {
// MySQL 8.0 以下版本 - JDBC 驱动及数据库 URL
// private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
// private static final String DB_URL = "jdbc:mysql://localhost:3306/kafka_connect";
// MySQL 8.0 以上版本 - JDBC 驱动及数据库 URL
private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
private static final String DB_URL = "jdbc:mysql://localhost:3306/kafka_connect?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC";
// 数据库的用户名与密码,需要根据自己的设置
private static final String USER = "root";
private static final String PASS = "root";
private static volatile Connection CONN = null;
private static volatile Statement STMT = null;
public static void init() {
if (CONN == null && STMT == null) {
synchronized (QueryDataFromDB.class) {
if (CONN == null && STMT == null) {
try {
// 注册 JDBC 驱动
Class.forName(JDBC_DRIVER);
// 打开链接
CONN = DriverManager.getConnection(DB_URL, USER, PASS);
STMT = CONN.createStatement();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
public static List<SourceEntity> query(String table, Long id) {
if (STMT == null) {
init();
}
String sql = "SELECT id, message FROM " + table + " where id > " + id;
List<SourceEntity> list = new ArrayList<>();
try {
ResultSet rs = STMT.executeQuery(sql);
while (rs.next()) {
SourceEntity entity = new SourceEntity();
entity.setId(rs.getLong("id"));
entity.setMessage(rs.getString("message"));
list.add(entity);
}
rs.close();
} catch (SQLException e) {
e.printStackTrace();
stop();
}
return list;
}
public static void stop() {
try {
STMT.close();
CONN.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
SourceEntity.java
package com.example.entity;
import lombok.Data;
@Data
public class SourceEntity {
private Long id;
private String message;
}
MySourceConnector.java
package com.example.source;
import com.example.dao.QueryDataFromDB;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MySourceConnector extends SourceConnector {
private static final Logger LOG = LoggerFactory.getLogger(MySourceConnector.class);
// topic配置项
public static final String TOPIC_KEY = "topic";
// table配置项
public static final String TABLE_KEY = "table";
// timeout配置项
public static final String TIMEOUT_KEY = "timeout";
// 默认table值
public static final String DEFAULT_TABLE = "source";
// 默认timeout值
public static final Long DEFAULT_TIMEOUT = 10000l;
private String topic;
private String table;
private long timeout = DEFAULT_TIMEOUT;
@Override
public String version() {
String version = AppInfoParser.getVersion();
LOG.info("------------1");
return version;
}
@Override
public ConfigDef config() {
LOG.info("------------2");
return new ConfigDef()
.define(TABLE_KEY, Type.STRING, Importance.HIGH, "db table")
.define(TOPIC_KEY, Type.STRING, Importance.HIGH, "The topic to publish data to");
}
@Override
public void start(Map<String, String> props) {
QueryDataFromDB.init();
LOG.info("------------3");
for (Map.Entry<String, String> entry : props.entrySet()) {
LOG.info("****************************" + entry.getKey() + " : " + entry.getValue());
}
topic = props.get(TOPIC_KEY);
if (topic == null || topic.trim().isEmpty()) {
throw new ConnectException("消息topic和数据库表名不能为空");
}
table = props.get(TABLE_KEY);
if (table == null || table.trim().isEmpty()) {
table = DEFAULT_TABLE;
props.put(TABLE_KEY, DEFAULT_TABLE);
}
if (props.get(TIMEOUT_KEY) == null) {
timeout = DEFAULT_TIMEOUT;
props.put(TIMEOUT_KEY, DEFAULT_TABLE);
}
}
@Override
public Class<? extends Task> taskClass() {
LOG.info("------------4");
return MySourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
LOG.info("------------5");
LOG.info("------------maxTasks is {}", maxTasks);
if (maxTasks != 1) {
LOG.warn("当前connector不支持多任务!");
}
Map<String, String> config = new HashMap<>();
config.put(TOPIC_KEY, topic);
config.put(TABLE_KEY, table);
config.put(TIMEOUT_KEY, String.valueOf(timeout));
ArrayList<Map<String, String>> configs = new ArrayList<>();
configs.add(config);
return configs;
}
@Override
public void stop() {
QueryDataFromDB.stop();
LOG.info("------------10");
}
}
MySourceTask.java
package com.example.source;
import com.alibaba.fastjson.JSON;
import com.example.dao.QueryDataFromDB;
import com.example.entity.SourceEntity;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MySourceTask extends SourceTask {
private static final Logger LOG = LoggerFactory.getLogger(MySourceTask.class);
// 数据库表名字段
public static final String TABLE_FIELD = "table";
// 偏移量字段
public static final String POSITION_FIELD = "position";
// 值的数据格式
private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
// kafka消息topic
private String topic = null;
// 数据库表名
private String table;
// 数据库查询频率
private long timeout = MySourceConnector.DEFAULT_TIMEOUT;
// 保存当前偏移量
private Long position = null;
@Override
public String version() {
LOG.info("------------6");
return new MySourceConnector().version();
}
@Override
public void start(Map<String, String> props) {
LOG.info("------------7");
table = props.get(MySourceConnector.TABLE_KEY);
topic = props.get(MySourceConnector.TOPIC_KEY);
timeout = Long.parseLong(props.get(MySourceConnector.TIMEOUT_KEY));
}
@Override
public List<SourceRecord> poll() {
LOG.info("------------8");
// 刚开始启动时,从kafka拉取上次从数据源最后一次读取的偏移量
if (position == null) {
Map<String, Object> offset = context.offsetStorageReader().offset(getKey(table));
if (offset != null) {
position = (long) offset.get(POSITION_FIELD);
} else {
position = -1l;
}
}
LOG.info("---------------------position is " + position);
List<SourceEntity> datas = QueryDataFromDB.query(table, position);
List<SourceRecord> records = new ArrayList<>();
for (SourceEntity data : datas) {
// kafka根据SourceRecord的timestamp对kafka中的偏移量进行更新,
// 我们以数据库表的自增主键id进行读取,所以这里将最大的id保存到kafka,防止重复读取
position = Math.max(position, data.getId());
String dataStr = JSON.toJSONString(data);
LOG.info("------------------------------------" + dataStr);
// SourceRecord record = new SourceRecord(getKey(table), getValue(data.getId()), topic, VALUE_SCHEMA, dataStr);
SourceRecord record = new SourceRecord(getKey(table), getValue(position), topic,
null, null, null, VALUE_SCHEMA, dataStr, System.currentTimeMillis());
records.add(record);
}
// 每x秒读取一次数据库
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return records;
}
@Override
public void stop() {
LOG.info("------------9");
}
private Map<String, String> getKey(String table) {
return Collections.singletonMap(TABLE_FIELD, table);
}
private Map<String, Long> getValue(Long pos) {
return Collections.singletonMap(POSITION_FIELD, pos);
}
}
MySinkConnector.java
package com.example.sink;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MySinkConnector extends SinkConnector {
private static final Logger LOG = LoggerFactory.getLogger(MySinkConnector.class);
// 保存Sink数据的文件的配置项
public static final String FILE_KEY = "file";
// 保存Sink数据的文件
private String filename;
@Override
public String version() {
LOG.info("============1");
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
LOG.info("============2");
return new ConfigDef()
.define(FILE_KEY, Type.STRING, Importance.HIGH, "Destination filename.");
}
@Override
public void start(Map<String, String> props) {
LOG.info("============3");
filename = props.get(FILE_KEY);
}
@Override
public Class<? extends Task> taskClass() {
LOG.info("============4");
return MySinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
LOG.info("============5");
LOG.info("============maxTasks is {}", maxTasks);
ArrayList<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<>();
if (filename != null)
config.put(FILE_KEY, filename);
configs.add(config);
}
return configs;
}
@Override
public void stop() {
LOG.info("============10");
}
}
MySinkTask.java
package com.example.sink;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
public class MySinkTask extends SinkTask {
private static final Logger LOG = LoggerFactory.getLogger(MySinkTask.class);
// 保存Sink数据的文件
private String filename;
// 用于输出数据到文件的流对象
private PrintStream outputStream;
@Override
public String version() {
LOG.info("============6");
return new MySinkConnector().version();
}
@Override
public void start(Map<String, String> props) {
LOG.info("============7");
filename = props.get(MySinkConnector.FILE_KEY);
if (filename == null) {
throw new ConnectException("Sink文件不能为空");
}
try {
outputStream = new PrintStream(new FileOutputStream(filename, true), false, StandardCharsets.UTF_8.name());
} catch (FileNotFoundException | UnsupportedEncodingException e) {
throw new ConnectException("无法找到或新建Sink文件", e);
}
}
@Override
public void put(Collection<SinkRecord> sinkRecords) {
LOG.info("============8");
for (SinkRecord record : sinkRecords) {
LOG.info("==========================" + record.value());
outputStream.println(record.value());
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
LOG.info("============8");
outputStream.flush();
}
@Override
public void stop() {
LOG.info("============9");
if (outputStream != null && outputStream != System.out) {
outputStream.close();
}
}
}
启动
1.执行mvn package
打包出jar包
2.将jar包kafka-connect-1.0-SNAPSHOT-jar-with-dependencies.jar
拷贝到kafka的libs
目录下
注:xxx-with-dependencies.jar
才包含了第三方依赖,需要使用这个jar包
3.启动zk+kafka(在kafka根目录执行以下命令,以下两条命令在两个窗口执行)
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
注:kafka路径不能过深,否则启动失败
4.以单机模式启动kafka connect(在kafka根目录执行以下命令)
.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-file-source.properties .\config\connect-file-sink.properties
注:启动时必须带至少一个连接器配置,所以这里直接借用启动kafka自带连接器的启动脚本
5.添加Source和Sink连接器
添加Source
接口:http://localhost:8083/connectors
接口类型:POST
请求体(application/json):
{ "name": "my-source-connector", "config": { "connector.class": "com.example.source.MySourceConnector", "topic": "my_connector_topic", "table":"source", "timeout":5000 } }
添加Sink
接口:http://localhost:8083/connectors
接口类型:POST
请求体(application/json):
{ "name": "my-sink-connector", "config": { "connector.class": "com.example.sink.MySinkConnector", "topics": "my_connector_topic", "file": "E:/test/my/sink.txt" } }
6.测试自定义连接器
往数据库插入数据
INSERT INTO `kafka_connect`.`source` (`id`, `message`) VALUES (null, 'xxx message');
查看E:/test/my/sink.txt
中是否有新添加到mysql的数据
文章浏览阅读83次。原标题:Oracle 12c新特性系列专题-安徽Oracle授权认证中心 随着Oracle database 12c的普及,数据库管理员 (DBA) 的角色也随之发生了转变。 Oracle 12c数据库对 DBA 而言是下一代数据管理。它让 DBA 可以摆脱单调的日常管理任务,能够专注于如何从数据中获取更多价值。未来我们会推出基于Oracle12c的技术文章,帮助DBA尽快掌握新一代数据库的新特性..._ilm add policy row store compress advanced row after
文章浏览阅读150次。问题及代码:*Copyright(c)2016,烟台大学计算机与控制工程学院 *All right reserved. *文件名称:负数把正数赶出队列.cpp *作者:张冰 *完成日期;2016年10月09日 *版本号;v1.0 * *问题描述: 设从键盘输入一整数序列a1,a2,…an,试编程实现: 当ai>0时,ai进队,当ai<0时,将队首元素出队,当ai
文章浏览阅读376次。贪心+构造
文章浏览阅读150次。本文讲的是Linux命名空间学习教程(二) IPC,【编者的话】Docker核心解决的问题是利用LXC来实现类似VM的功能,从而利用更加节省的硬件资源提供给用户更多的计算资源。而 LXC所实现的隔离性主要是来自内核的命名空间, 其中pid、net、ipc、mnt、uts 等命名空间将容器的进程、网络、消息、文件系统和hostname 隔离开。本文是Li..._主机的 ipc 命名空间
文章浏览阅读2w次,点赞5次,收藏7次。在设备上强制安装apk。在app已有的情况下使用-r参数在app版本低于现有版本使用-d参数命令adb install -r -d xxx.apk_adb绕过安装程序强制安装app
文章浏览阅读1.6w次。图片摘自别人错误提示如下:Unable to load R3 module D:\Program Files\Oracle\VirtualBox/VBoxDD.dll(VBoxDD):GetLastError=1790(VERR_UNRESOLVED_ERROR)意思是:出现这样的原因应该是跟windows系统的[主题文件被破解]的有关,我也不知道原因为_unresolved (unknown) host platform error. (verr_unresolved_error)
文章浏览阅读2.9w次,点赞12次,收藏63次。感谢原文:https://blog.csdn.net/abc5382334/article/details/24260817感谢原文:https://blog.csdn.net/jiaqingge/article/details/52564348Html CSS的三种链接方式css文本的链接方式有三种:分别是内联定义、链入内部css、和链入外部css1.代码为:<html>..._html链接css代码
文章浏览阅读625次。近几年,蓝牙耳机市场发展迅速,越来越多的消费者希望抛弃线缆,更自由地听音乐,对于运动人士来说,蓝牙耳机的便携性显得尤为重要。但目前市面上的大多数蓝牙耳机实际上都是“有线”的,运动过程中产生的听诊器效应会严重影响听歌的感受。而在“真无线”耳机领域,除了苹果的AirPods外,可供选择的产品并不多,而AirPods又不是为运动场景打造的,防水能力非常差。那么对于喜欢运动又想要“自由”的朋友来说,有没有一款产品能够满足他们的需求呢?下面这十款小编专门为大家搜罗的蓝牙耳机或许就能找到适合的!网红击音F1_适合游戏与运动的高音质蓝牙耳机
文章浏览阅读1k次,点赞6次,收藏7次。在本篇博文中,我们在 iOS 17 beta 4(SwiftUI 5.0)测试版中发现了 SwiftUI 视图首次显示时状态的改变会导致动画“副作用”的问题,并提出多种解决方案。
文章浏览阅读1.9k次。 在 上篇文章–Flutter 实现支持上拉加载和下拉刷新的 ListView 中,我们最终实现的效果是在 listView 上面留下了一段空白,本意是用来加载轮播图的,于是今天就开发了一下,希望能给各位灵感。一 、效果如下说一下大体思路 其实图片展示是用的 PageView ,然后,下面的指示器 是用的 TabPageSelector ,当然整体是用 Stack 包裹起来的。1、..._flutter pageview轮播图 site:csdn.net
文章浏览阅读241次。1.类只有两种访问权限:public,可以被所有包中的类访问; 缺省,只能当前包(当前文件夹)中的类访问2.类成员的访问控制权限-- public 可以被跨类,跨包(package)访问-- private 可以修饰数据成员,构造方法,方法,被它修饰的成员只能被本类自已访问,不能被子类访问-- protected ..._非公共类实际只有两种访问权限
文章浏览阅读601次,点赞25次,收藏5次。想要高效清理电脑磁盘垃圾,我们需要利用专业的清理垃圾软件,CleanMyMac X便是其中表现出众的软件之一,利用CleanMyMac X的清理系统垃圾功能,可以释放更多的磁盘存储空间。2.扫描所有文件后,扫描结果会将所有体积较大的视频文件和文件包进行分类罗列,用户可以根据文件类型、文件大小以及访问时间进行选择性删除。2.打开CleanMyMac X软件,在左侧功能栏中分别设置【智能扫描】、【清理】、【保护】、【速度】、【应用程序】、【文件】等六大功能。3.选择【清理】-【系统垃圾】,然后点击【扫描】。