1. pom.xml

Note this dependency:

        <dependency>
            <groupId>com.alibaba.flink</groupId>
            <artifactId>datahub-connector</artifactId>
            <version>0.1-SNAPSHOT</version>
            <classifier>jar-with-dependencies</classifier>
        </dependency>

This dependency has to be installed into your local Maven repository by hand; for the exact steps, see the Alibaba Cloud doc 读取DataHub数据示例 (Read DataHub data example, Realtime Compute for Apache Flink).
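For reference, a typical install command looks like the following (a sketch: the jar file name and path depend on where you built or downloaded the connector):

mvn install:install-file \
  -Dfile=datahub-connector-0.1-SNAPSHOT-jar-with-dependencies.jar \
  -DgroupId=com.alibaba.flink \
  -DartifactId=datahub-connector \
  -Dversion=0.1-SNAPSHOT \
  -Dclassifier=jar-with-dependencies \
  -Dpackaging=jar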

You can also refer to my own note: Blink-DataHub connector Maven依赖转化 (CSDN blog).

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.alibaba.blink</groupId>
    <artifactId>blink-udx-3.x</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <blink.version>blink-3.3.0</blink.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${blink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${blink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>${blink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.datahub</groupId>
            <artifactId>aliyun-sdk-datahub</artifactId>
            <version>2.12.2-public</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jcl-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jul-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Only this dependency needs to be packaged into the job jar -->
        <dependency>
            <groupId>com.alibaba.flink</groupId>
            <artifactId>datahub-connector</artifactId>
            <version>0.1-SNAPSHOT</version>
            <classifier>jar-with-dependencies</classifier>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.alibaba.blink.demo.stream_demo</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Develop the entry-point class

package com.alibaba.blink.demo;

import com.alibaba.blink.mojo.Record;
import com.alibaba.blink.sink.HoloSink;
import com.alibaba.flink.connectors.datahub.datastream.source.DatahubSourceFunction;
import com.aliyun.datahub.client.model.RecordEntry;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;

public class stream_demo {

    // Intranet (VPC) endpoint.
    private static String endPoint = "your endpoint";
    // private static String endPoint = "public endpoint"; // internet access (not needed if the intranet endpoint is set)
    private static String projectName = "your projectName";
    private static String topicSourceName = "your topicSourceName";
    private static String accessId = "your accessId";
    private static String accessKey = "your accessKey";
    // Timestamp (ms) of the start position for consumption.
    private static Long datahubStartInMs = 0L;
    private static Long datahubEndInMs = Long.MAX_VALUE;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<List<RecordEntry>> listDataStreamSource = env.addSource(
                new DatahubSourceFunction(endPoint, projectName, topicSourceName,
                        accessId, accessKey, datahubStartInMs, datahubEndInMs,
                        20L, 1000L, 1000));

        SingleOutputStreamOperator<Record> result = listDataStreamSource
                .flatMap((FlatMapFunction<List<RecordEntry>, Record>) (ls, collector) -> {
                    for (RecordEntry recordEntry : ls) {
                        collector.collect(new Record(recordEntry));
                    }
                })
                .returns(Record.class)
                .filter(s -> s.getOneid() != null)
                .filter(s -> s.getEvent() != null);

        // result.print();
        result.addSink(new HoloSink()).setParallelism(10);
        env.execute();
    }
}
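The credentials above are hardcoded for brevity. A minimal sketch of keeping them out of the source, using Flink's standard ParameterTool (the argument names below are my own choice, not part of the original):

package com.alibaba.blink.demo;

import org.apache.flink.api.java.utils.ParameterTool;

// Hypothetical variant: pass connection settings as program arguments, e.g.
// --endpoint xxx --project xxx --topic xxx --accessId xxx --accessKey xxx
public class stream_demo_args {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        String endPoint = params.getRequired("endpoint");
        String projectName = params.getRequired("project");
        String topicSourceName = params.getRequired("topic");
        String accessId = params.getRequired("accessId");
        String accessKey = params.getRequired("accessKey");
        // ...then build the DatahubSourceFunction exactly as in stream_demo above.
    }
}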

3. Develop the custom HoloSink

package com.alibaba.blink.sink;

import com.alibaba.blink.mojo.Record;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.UUID;

public class HoloSink extends RichSinkFunction<Record> {

    private static String url = "jdbc:postgresql://ip:port/database?tcpKeepAlive=true";
    private static String username = "username";
    private static String password = "password";
    private static String postgresdriver = "org.postgresql.Driver";

    private Connection connection;
    private ThreadLocal<PreparedStatement> pstmt;
    private ThreadLocal<PreparedStatement> querymt;
    private ThreadLocal<PreparedStatement> updatemt;

    private Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName(postgresdriver);
            conn = DriverManager.getConnection(url, username, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.connection = getConnection();
        this.pstmt = new ThreadLocal<>();
        this.querymt = new ThreadLocal<>();
        this.updatemt = new ThreadLocal<>();
    }

    @Override
    public void invoke(Record record, Context context) throws Exception {
        if (record == null || record.getOneid() == null) {
            System.out.println("record is null!!!");
            return;
        }
        // Lazily prepare one statement of each kind per task thread.
        if (this.querymt.get() == null)
            this.querymt.set(this.connection.prepareStatement(
                    "select oneid,event from holo_sink where oneid=?"));
        if (this.updatemt.get() == null)
            this.updatemt.set(this.connection.prepareStatement(
                    "update holo_sink set event=? where oneid=?"));
        if (this.pstmt.get() == null)
            this.pstmt.set(this.connection.prepareStatement(
                    "insert into holo_sink(oneid,event) values (?,?)"));

        this.querymt.get().setString(1, record.getOneid());
        ResultSet resultSet = this.querymt.get().executeQuery();
        if (resultSet.next()) {
            // The oneid already exists: update its event.
            this.updatemt.get().setString(1, record.getEvent());
            this.updatemt.get().setString(2, record.getOneid());
            this.updatemt.get().executeUpdate();
            System.out.println("update " + record + ",threadId:" + Thread.currentThread().getId());
        } else {
            // The oneid does not exist: insert a row with a freshly generated oneid.
            this.pstmt.get().setString(1, UUID.randomUUID().toString());
            this.pstmt.get().setString(2, record.getEvent());
            this.pstmt.get().executeUpdate();
            System.out.println("insert " + record + ",threadId:" + Thread.currentThread().getId());
        }
        resultSet.close();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (this.pstmt.get() != null) this.pstmt.get().close();
        if (this.querymt.get() != null) this.querymt.get().close();
        if (this.updatemt.get() != null) this.updatemt.get().close();
        if (this.connection != null) this.connection.close();
    }
}
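Two round trips per record (a select followed by an update or insert) can become a bottleneck, and parallel sink instances can race between the check and the write. As an alternative, here is a minimal sketch that does the same work in one statement with a PostgreSQL-style upsert, assuming holo_sink declares oneid as its primary key (the DDL is not shown in this post). Note one deliberate difference from HoloSink above: it keeps the incoming oneid on insert instead of generating a fresh UUID.

package com.alibaba.blink.sink;

import com.alibaba.blink.mojo.Record;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Hypothetical variant of HoloSink: one upsert per record instead of
// select + (update | insert). Requires a primary key on holo_sink(oneid).
public class HoloUpsertSink extends RichSinkFunction<Record> {

    private Connection connection;
    private PreparedStatement upsert;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("org.postgresql.Driver");
        // Same placeholder connection settings as HoloSink above.
        connection = DriverManager.getConnection(
                "jdbc:postgresql://ip:port/database?tcpKeepAlive=true", "username", "password");
        upsert = connection.prepareStatement(
                "insert into holo_sink(oneid, event) values (?, ?) "
                        + "on conflict (oneid) do update set event = excluded.event");
    }

    @Override
    public void invoke(Record record, Context context) throws Exception {
        if (record == null || record.getOneid() == null) {
            return;
        }
        upsert.setString(1, record.getOneid());
        upsert.setString(2, record.getEvent());
        upsert.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (upsert != null) upsert.close();
        if (connection != null) connection.close();
    }
}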

4. Custom POJO class

package com.alibaba.blink.mojo;

import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;

public class Record {

    private String oneid;
    private String event;

    public Record() {
    }

    public Record(String oneid, String event) {
        this.oneid = oneid;
        this.event = event;
    }

    public Record(RecordEntry recordEntry) {
        this.oneid = getString(recordEntry, "oneid");
        this.event = getString(recordEntry, "event");
    }

    // Return null (not the string "null") for a missing field, so that the
    // null filters in stream_demo actually drop incomplete records.
    private String getString(RecordEntry recordEntry, String field) {
        Object o = ((TupleRecordData) recordEntry.getRecordData()).getField(field);
        return o == null ? null : o.toString();
    }

    public String getOneid() {
        return oneid;
    }

    public void setOneid(String oneid) {
        this.oneid = oneid;
    }

    public String getEvent() {
        return event;
    }

    public void setEvent(String event) {
        this.event = event;
    }

    @Override
    public String toString() {
        return "Record{" + "oneid='" + oneid + '\'' + ", event='" + event + '\'' + '}';
    }
}
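Since junit is already a test-scoped dependency in the pom, a couple of minimal checks of the POJO can live under src/test/java (a sketch; the test class and its expectations are my own, not from the original post):

package com.alibaba.blink.mojo;

import org.junit.Assert;
import org.junit.Test;

public class RecordTest {

    @Test
    public void toStringContainsBothFields() {
        Record record = new Record("abc-123", "click");
        Assert.assertEquals("Record{oneid='abc-123', event='click'}", record.toString());
    }

    @Test
    public void settersOverwriteValues() {
        Record record = new Record();
        record.setOneid("abc-123");
        record.setEvent("view");
        Assert.assertEquals("abc-123", record.getOneid());
        Assert.assertEquals("view", record.getEvent());
    }
}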

5. Helper class: DataHub writer

package com.alibaba.blink.datahub;

import com.alibaba.blink.utils.producer_with_random;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.PutRecordsResult;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;

import java.util.ArrayList;
import java.util.List;

public class datahubwriter {

    public static void main(String[] args) throws InterruptedException {
        // Endpoint for your region (the example here is China East 1); fill in as appropriate.
        String endpoint = "endpoint";
        String accessId = "accessId";
        String accessKey = "accessKey";

        // Create the DatahubClient. The boolean enables binary transfer,
        // supported by server version 2.12+; if this fails on Apsara Stack
        // (专有云), try setting it to false.
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(new DatahubConfig(endpoint,
                        new AliyunAccount(accessId, accessKey), true))
                // HttpConfig is optional; LZ4 compression is recommended for reads and writes.
                .setHttpConfig(new HttpConfig()
                        .setCompressType(HttpConfig.CompressType.LZ4)
                        .setConnTimeout(10000))
                .build();

        String project = "projectName";
        String topic = "topic";
        int retryTimes = 10;
        tupleExample(datahubClient, project, topic, retryTimes);
    }

    // Write Tuple-type records.
    public static void tupleExample(DatahubClient datahubClient, String project,
                                    String topic, int retryTimes) throws InterruptedException {
        // Fetch the topic schema.
        RecordSchema recordSchema = datahubClient.getTopic(project, topic).getRecordSchema();

        // Generate 100 records.
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int i = 0; i < 100; ++i) {
            RecordEntry recordEntry = new RecordEntry();
            // Optional per-record attributes (e.g. ip, host name); writing works without them.
            recordEntry.addAttribute("key2", "value2");
            if (i % 10 == 0) {
                Thread.sleep(1000);
            }
            String records = producer_with_random.get_records();
            // A random value plus a timestamp, just to have a changing payload.
            String v = "" + (Math.random() * 100 + 1) + System.currentTimeMillis();
            System.out.println(records + "=>" + v);

            TupleRecordData data = new TupleRecordData(recordSchema);
            data.setField("oneid", records);
            data.setField("event", v);
            recordEntry.setRecordData(data);
            if (recordEntries.add(recordEntry)) {
                System.out.println("record generated");
            }
        }

        try {
            PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
            if (result.getFailedRecordCount() > 0) {
                retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
            }
        } catch (DatahubClientException e) {
            System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
        }
    }

    // Retry mechanism: re-send failed records up to retryTimes attempts.
    public static void retry(DatahubClient client, List<RecordEntry> records,
                             int retryTimes, String project, String topic) {
        while (retryTimes-- > 0) {
            PutRecordsResult recordsResult = client.putRecords(project, topic, records);
            if (recordsResult.getFailedRecordCount() == 0) {
                return;
            }
            records = recordsResult.getFailedRecords();
        }
        System.out.println("retryFailure");
    }
}

6. Utility class

package com.alibaba.blink.utils;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class producer_with_random {

    static Map<Integer, String> map = new HashMap<Integer, String>() {{
        put(1, "0000048e-fb42-4898-ad1c-4b2aca517165");
        put(2, "00000842-448e-415b-808a-1f3682c64b1b");
        put(3, "00000d2d-7c7b-46b7-bd99-2e224cd194f9");
        put(4, "0000110b-efb9-4d03-9438-f4e58dbd6f11");
        put(5, "00001245-aa6c-4ddc-94be-b99ee7b51deb");
        put(6, "00001406-c496-4bdf-9fbc-76bbbc053423");
        put(7, "00001755-c010-49ef-b063-313bf0833fc1");
        put(8, "00001a92-8c10-4a73-9480-13941e2483db");
        put(9, "00001b17-8fff-440f-b9eb-5c9a5477d533");
        put(10, "00001d09-66ad-4ad7-a042-c8bdc578676d");
        put(11, "00001d67-d753-43d9-a087-2556548ca7c3");
        put(12, "00001da8-0290-4bed-8872-8a014543e91d");
        put(13, "00001f8d-6aa6-4483-9c75-b4c98c2d05cb");
        put(14, "000022b8-623d-404b-9a56-4f9243adc32c");
        put(15, "000023f4-c9e5-43fd-aa33-56551d6aebee");
        put(16, "00002651-df13-4cb1-a107-93623e18734c");
        put(17, "000033a1-6a4f-4b95-ae5a-40b7f8616498");
        put(18, "00003d5f-6360-4344-94c1-d67bedbd1709");
        put(19, "00003dcc-5b4c-4519-af41-484704079680");
        put(20, String.valueOf(UUID.randomUUID()));
    }};

    public static String get_records() {
        // Keys run from 1 to 20, so draw from [1, 20]; the original
        // (int) (Math.random() * 21 + 1) could yield 21 and return null.
        int random = (int) (Math.random() * 20 + 1);
        System.out.println(random);
        return map.get(random);
    }

    public static void main(String[] args) {
        System.out.println(get_records());
    }
}

7. Package and upload
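With the maven-assembly-plugin bound to the package phase in the pom above, a regular Maven build produces the fat jar; by default it lands at target/blink-udx-3.x-1.0-SNAPSHOT-jar-with-dependencies.jar, and that is the file to upload to the Realtime Compute console:

mvn clean package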

8. Create the DataStream job

9. Job details

-- Fully qualified main class name; required. Example: com.alibaba.realtimecompute.DatastreamExample
blink.main.class=com.alibaba.blink.demo.stream_demo
-- Resource name of the JAR containing the main class; required when there are multiple JARs. Example: blink_datastream.jar
--blink.main.jar=${resource name of the main-class jar}
-- Default state backend settings; take effect when the job code does not declare one explicitly
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
-- Default checkpoint settings; take effect when the job code does not declare them explicitly
blink.checkpoint.interval.ms=180000
-- Project-level configuration is enabled by default
--disable.project.config=false
-- Custom parameters; for how to read them in code, see:
--https://help.aliyun.com/document_detail/127758.html?spm=a2c4g.11174283.6.677.61fb1e49NJoWTR

Other optional settings

-- Enable window miniBatch in version 3.2 and later (window miniBatch is off by default in 3.2+).
sql.exec.mini-batch.window.enabled=true
-- Exactly-once semantics (the equivalent DataStream API calls are sketched after this list).
blink.checkpoint.mode=EXACTLY_ONCE
-- Checkpoint interval, in milliseconds.
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
-- Realtime Compute 2.0+ uses niagara as the state backend; also set the state TTL, in milliseconds.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
-- Enable a 5-second microBatch in Realtime Compute 2.0+ (not needed for window functions).
blink.microBatch.allowLatencyMs=5000
-- Latency allowed for the job as a whole.
blink.miniBatch.allowLatencyMs=5000
-- Optimization for dual-stream join nodes.
blink.miniBatch.join.enabled=true
-- Size of a single batch.
blink.miniBatch.size=20000
-- Local aggregation optimization; on by default in Realtime Compute 2.0+, must be enabled manually in 1.6.4.
blink.localAgg.enabled=true
-- Enable partial aggregation in Realtime Compute 2.0+ to speed up count distinct.
blink.partialAgg.enabled=true
-- union all optimization.
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
-- GC tuning (do not set this when the source table is SLS).
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
-- Time zone.
blink.job.timeZone=Asia/Shanghai
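These job-level settings act as defaults when the code declares nothing. For comparison, a minimal sketch of declaring the same checkpoint behavior explicitly in DataStream code, using standard Flink APIs (explicit declarations take precedence over the config defaults):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 180 s between checkpoints, exactly-once mode, 600 s checkpoint timeout.
        env.enableCheckpointing(180000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(600000);
    }
}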

10. Submit, publish, and start

11. Resource files

alibaba-flink-connectors-flink-1.5.2-compatible.zip (CSDN download)

blink_udx_3x-master.zip (CSDN download)

datahub-demo-master.zip (CSDN download)
