这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql
为例,这里代码分为两种,事务和非事务
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
* @Author: J
* @Version: 1.0
* @CreateTime: 2023/8/2
* @Description: 测试
**/
public class FlinkJdbcSink {
public static void main(String[] args) throws Exception {
// 构建流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());
// 构建jdbc sink
SinkFunction<CustomizeBean> jdbcSink = JdbcSink.sink(
"insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句
new JdbcStatementBuilder<CustomizeBean>() {
@Override
public void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {
pStmt.setString(1, customizeBean.getName());
pStmt.setInt(2, customizeBean.getAge());
pStmt.setString(3, customizeBean.getGender());
pStmt.setString(4, customizeBean.getHobbit());
}
}, // 字段映射配置,这部分就和常规的java api差不多了
JdbcExecutionOptions.builder()
.withBatchSize(10) // 批次大小,条数
.withBatchIntervalMs(5000) // 批次最大等待时间
.withMaxRetries(1) // 重复次数
.build(), // 写入参数配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.jdbc.Driver")
.withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false")
.withUsername("root")
.withPassword("password")
.build() // jdbc信息配置
);
// 添加jdbc sink
customizeSource.addSink(jdbcSink);
env.execute();
}
}
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;
import javax.sql.XADataSource;
/**
* @Author: J
* @Version: 1.0
* @CreateTime: 2023/8/2
* @Description: 测试
**/
public class FlinkJdbcSink {
public static void main(String[] args) throws Exception {
// 构建流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());
// 每20秒作为checkpoint的一个周期
env.enableCheckpointing(20000);
// 两次checkpoint间隔最少是10秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
// 程序取消或者停止时不删除checkpoint
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// checkpoint必须在60秒结束,否则将丢弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只能有一个checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 设置EXACTLY_ONCE语义,默认就是这个
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoint存储位置
env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
// 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpoint
SinkFunction<CustomizeBean> exactlyOneJdbcSink = JdbcSink.exactlyOnceSink(
"insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句
(JdbcStatementBuilder<CustomizeBean>) (pStmt, customizeBean) -> {
pStmt.setString(1, customizeBean.getName());
pStmt.setInt(2, customizeBean.getAge());
pStmt.setString(3, customizeBean.getGender());
pStmt.setString(4, customizeBean.getHobbit());
}, // 字段映射配置,这部分就和常规的java api差不多了
JdbcExecutionOptions.builder()
.withMaxRetries(0) // 设置重复次数
.withBatchSize(25) // 设置批次大小,数据条数
.withBatchIntervalMs(1000) // 批次最大等待时间
.build(),
JdbcExactlyOnceOptions.builder()
// 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的
.withTransactionPerConnection(true)
.build(),
(SerializableSupplier<XADataSource>) () -> {
// XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接
MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置url
mysqlXADataSource.setUser("root"); // 设置用户
mysqlXADataSource.setPassword("password"); // 设置密码
return mysqlXADataSource;
}
);
// 添加jdbc sink
customizeSource.addSink(exactlyOneJdbcSink);
env.execute();
}
}
<!-- 在原有的依赖中加入下面两个内容 -->
<!-- JDBC connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- mysql驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
文章浏览阅读1k次。通过使用ajax方法跨域请求是浏览器所不允许的,浏览器出于安全考虑是禁止的。警告信息如下:不过jQuery对跨域问题也有解决方案,使用jsonp的方式解决,方法如下:$.ajax({ async:false, url: 'http://www.mysite.com/demo.do', // 跨域URL ty..._nginx不停的xhr
文章浏览阅读2k次。关于在 Oracle 中配置 extproc 以访问 ST_Geometry,也就是我们所说的 使用空间SQL 的方法,官方文档链接如下。http://desktop.arcgis.com/zh-cn/arcmap/latest/manage-data/gdbs-in-oracle/configure-oracle-extproc.htm其实简单总结一下,主要就分为以下几个步骤。..._extproc
文章浏览阅读1.5w次。linux下没有上面的两个函数,需要使用函数 mbstowcs和wcstombsmbstowcs将多字节编码转换为宽字节编码wcstombs将宽字节编码转换为多字节编码这两个函数,转换过程中受到系统编码类型的影响,需要通过设置来设定转换前和转换后的编码类型。通过函数setlocale进行系统编码的设置。linux下输入命名locale -a查看系统支持的编码_linux c++ gbk->utf8
文章浏览阅读750次。今天准备从生产库向测试库进行数据导入,结果在imp导入的时候遇到“ IMP-00009:导出文件异常结束” 错误,google一下,发现可能有如下原因导致imp的数据太大,没有写buffer和commit两个数据库字符集不同从低版本exp的dmp文件,向高版本imp导出的dmp文件出错传输dmp文件时,文件损坏解决办法:imp时指定..._imp-00009导出文件异常结束
文章浏览阅读143次。当下是一个大数据的时代,各个行业都离不开数据的支持。因此,网络爬虫就应运而生。网络爬虫当下最为火热的是Python,Python开发爬虫相对简单,而且功能库相当完善,力压众多开发语言。本次教程我们爬取前程无忧的招聘信息来分析Python程序员需要掌握那些编程技术。首先在谷歌浏览器打开前程无忧的首页,按F12打开浏览器的开发者工具。浏览器开发者工具是用于捕捉网站的请求信息,通过分析请求信息可以了解请..._初级python程序员能力要求
文章浏览阅读7.6k次,点赞2次,收藏6次。@Service标注的bean,类名:ABDemoService查看源码后发现,原来是经过一个特殊处理:当类的名字是以两个或以上的大写字母开头的话,bean的名字会与类名保持一致public class AnnotationBeanNameGenerator implements BeanNameGenerator { private static final String C..._@service beanname
文章浏览阅读6.9w次,点赞73次,收藏463次。1.前序创建#include<stdio.h>#include<string.h>#include<stdlib.h>#include<malloc.h>#include<iostream>#include<stack>#include<queue>using namespace std;typed_二叉树的建立
文章浏览阅读7.1k次。在Asp.net上使用Excel导出功能,如果文件名出现中文,便会以乱码视之。 解决方法: fileName = HttpUtility.UrlEncode(fileName, System.Text.Encoding.UTF8);_asp.net utf8 导出中文字符乱码
文章浏览阅读2.1k次,点赞4次,收藏23次。第一次实验 词法分析实验报告设计思想词法分析的主要任务是根据文法的词汇表以及对应约定的编码进行一定的识别,找出文件中所有的合法的单词,并给出一定的信息作为最后的结果,用于后续语法分析程序的使用;本实验针对 PL/0 语言 的文法、词汇表编写一个词法分析程序,对于每个单词根据词汇表输出: (单词种类, 单词的值) 二元对。词汇表:种别编码单词符号助记符0beginb..._对pl/0作以下修改扩充。增加单词
文章浏览阅读773次。我在使用adb.exe时遇到了麻烦.我想使用与bash相同的adb.exe shell提示符,所以我决定更改默认的bash二进制文件(当然二进制文件是交叉编译的,一切都很完美)更改bash二进制文件遵循以下顺序> adb remount> adb push bash / system / bin /> adb shell> cd / system / bin> chm..._adb shell mv 权限
文章浏览阅读6.8k次,点赞12次,收藏125次。1. 单目相机标定引言相机标定已经研究多年,标定的算法可以分为基于摄影测量的标定和自标定。其中,应用最为广泛的还是张正友标定法。这是一种简单灵活、高鲁棒性、低成本的相机标定算法。仅需要一台相机和一块平面标定板构建相机标定系统,在标定过程中,相机拍摄多个角度下(至少两个角度,推荐10~20个角度)的标定板图像(相机和标定板都可以移动),即可对相机的内外参数进行标定。下面介绍张氏标定法(以下也这么称呼)的原理。原理相机模型和单应矩阵相机标定,就是对相机的内外参数进行计算的过程,从而得到物体到图像的投影_相机-投影仪标定
文章浏览阅读2.2k次。文章目录Wayland 架构Wayland 渲染Wayland的 硬件支持简 述: 翻译一篇关于和 wayland 有关的技术文章, 其英文标题为Wayland Architecture .Wayland 架构若是想要更好的理解 Wayland 架构及其与 X (X11 or X Window System) 结构;一种很好的方法是将事件从输入设备就开始跟踪, 查看期间所有的屏幕上出现的变化。这就是我们现在对 X 的理解。 内核是从一个输入设备中获取一个事件,并通过 evdev 输入_wayland