参考: https://www.cnblogs.com/swordfall/p/10527423.html
flink 流处理写入数据到hbase. 采用的是批量写入(500条数据写入一次)。
HBaseWriter.java
package com.flink;
import com.flink.model.DeviceData;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
*
* 写入HBase
* 继承RichSinkFunction重写父类方法
*
* 写入hbase时500条flush一次, 批量插入, 使用的是writeBufferSize
*/
class HBaseWriter extends RichSinkFunction<DeviceData>{
private static final Logger logger = LoggerFactory.getLogger(HBaseWriter.class);
private static org.apache.hadoop.conf.Configuration configuration;
private static Connection connection = null;
private static BufferedMutator mutator;
private static int count = 0;
@Override
public void open(Configuration parameters) throws Exception {
configuration = HBaseConfiguration.create();
configuration.set("hbase.master", "192.168.3.101:60020");
configuration.set("hbase.zookeeper.quorum", "192.168.3.101");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t1"));
params.writeBufferSize(2 * 1024 * 1024);
mutator = connection.getBufferedMutator(params);
}
@Override
public void close() throws IOException {
if (mutator != null) {
mutator.close();
}
if (connection != null) {
connection.close();
}
}
@Override
public void invoke(DeviceData values, Context context) throws Exception {
//Date 1970-01-06 11:45:55 to 445555000
long unixTimestamp= 0;
try {
String gatherTime = values.GatherTime;
//毫秒和秒分开处理
if (gatherTime.length() > 20) {
long ms = Long.parseLong(gatherTime.substring(20, 23));
Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(gatherTime);
unixTimestamp = date.getTime() + ms;
} else {
Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(gatherTime);
unixTimestamp = date.getTime();
}
} catch (ParseException e) {
e.printStackTrace();
}
String RowKey = values.MachID + String.valueOf(unixTimestamp);
String Key = values.OperationValue;
String Value = values.OperationData;
System.out.println("Column Family=f1, RowKey=" + RowKey + ", Key=" + Key + " ,Value=" + Value);
Put put = new Put(RowKey.getBytes());
put.addColumn("f1".getBytes(), Key.getBytes(), Value.getBytes());
mutator.mutate(put);
//每满500条刷新一下数据
if (count >= 500){
mutator.flush();
count = 0;
}
count = count + 1;
}
}
Main.java
//写入hbase
dataStream.addSink(new HBaseWriter());
DeviceData.java
package com.flink.model;
/**
* 设备数据的数据结构
*/
class DeviceData {
String compID;
String machID;
String Type;
String gateMac;
String operationValue;
String operationData;
String gatherTime;
}
文章浏览阅读2.8k次。AndroidStudio 升级到 4.2.2 版本后,没有代码高亮了,很蛋疼。解决办法是:点开上方的 File,先勾选 Power Save Mode 再取消就可以了。_android studio 高亮
文章浏览阅读1k次。使用swift4.0整合Unity出现[ valueForUndefinedKey:]: this class is not key value coding-compliant for the key unity.'在对应属性前加@objc 即可。或者调回swift3.2版本_forundefinedkey swift4
文章浏览阅读1.3k次。http auto-config="true" access-denied-page="/common/403.htm"> intercept-url pattern="/login.**" access="IS_AUTHENTICATED_ANONYMOUSLY"/> form-login login-page="/login.jsp" defau_springsecurity 设置cookie失效时间
文章浏览阅读1.1k次。继上篇内部拦截法需求还是跟上篇一样。只不过这次用外部拦截法来解决;只要在父容器添加如下代码就可以解决了滑动冲突,很简单,套模板就行 // 分别记录上次滑动的坐标(onInterceptTouchEvent) private int mLastXIntercept = 0; private int mLastYIntercept = 0; @Override public bo_viewpage2外部拦截事件
文章浏览阅读2.5k次,点赞7次,收藏9次。本文章系作者原创,未经许可,不得转载。汇编 堆栈 变量存储 指针先说栈的概念,栈其实也是一种。。。。。先说内存的概念吧。。。。。额 先说计算机吧,简单来说的话,可以把计算机理解成由CPU,内存,硬盘组成,而CPU内部又包括一种叫做内部寄存器的东西,包括 数据寄存器: AX,BX,CX,DX; 段寄存器: CS,DS,ES,SS; 指针与变址寄存器SP,BP,SI,DI; ..._汇编语言栈指针
文章浏览阅读1w次,点赞14次,收藏56次。转载自 架构师之路:从码农到架构师你差了哪些 Web应用,最常见的研发语言是Java和PHP。 后端服务,最常见的研发语言是Java和C/C++。 大数据,最常见的研发语言是Java和Python。 可以说,Java是现阶段中国互联网公司中,覆盖度最广的研发语言,掌握了Java技术体系,不管在成熟的大公司,快速发展的公司,还是创业阶段的公司,都能有立足之地。有..._web架构师
文章浏览阅读7.3k次,点赞6次,收藏36次。超级简单的Python爬虫入门教程(非常详细),通俗易懂,看一遍就会了_爬虫python入门
文章浏览阅读1.2k次。您的代码存在一些问题。首先,您在此处显示的两个模型是not等效的:尽管您将scikit-learn LogisticRegression设置为fit_intercept=True(这是默认设置),但您并没有这样做statsmodels一;来自statsmodels docs:默认情况下不包括拦截器,用户应添加。参见statsmodels.tools.add_constant。另一个问题是,尽管您处..._sm fit(method
文章浏览阅读518次。一、sfml官网下载32位的版本 一样的设置,64位的版本我没有成功,用不了。二、三、四以下这些内容拷贝过去:sfml-graphics-d.libsfml-window-d.libsfml-system-d.libsfml-audio-d.lib..._vsllfqm
文章浏览阅读2.7k次。由于工作需要,要做一个类似bc2的文本比较工具,用红色字体标明不同的地方,研究了半天,自己写了一个简易版的。文本比较的规则是1.先比较文本的行数,2.再比较对应行的字符串的长度3.再比较每一个字符串是否相同。具体代码如下:其中m_basestr和m_mergestr里面存放是待比较的字符串int basecount=m_basestr.GetLength(); int mergec_byoned compare 字符串比较算法
文章浏览阅读79次。xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">org.apacheapache174.0.0org.apache.atlasapache-atlas3.0.0-SNAPSHOTMetadata Management and Data Govern..._atlas.pom
文章浏览阅读1.5k次。C语言中有可以产生随机数据的函数,需要添加 stdlib. h头文件与time.h头文件。首先在main函数开头加上“ srand(unsigned)time(NULL));",这个语句将生成随机数的种子(不懂也没关系,只要记住这个语句,并且知道 srand是初始化随机种子用的即可)。然后,在需要使用随机数的地方使用 rand()函数。下面是一段生成十个随机数的代码:程序代码:#incl..._随机数