摘要:

Flink log connector是阿里云日志服务推出的,用于对接Flink的工具,包含两块,分别是消费者和生产者,消费者用于从日志服务中读数据,支持exactly once语义,生产者用于将数据写到日志服务中,该Connector隐藏了日志服务的一些概念,比如Shard的分裂合并等,用户在使用时只需要专注在自己的业务逻辑即可。

阿里云日志服务是针对实时数据一站式服务,用户只需要将精力集中在分析上,过程中数据采集、对接各种存储计算、数据索引和查询等琐碎工作等都可以交给日志服务完成。

日志服务中最基础的功能是LogHub,支持数据实时采集与消费,实时消费家族除 Spark Streaming、Storm、StreamCompute(Blink外),目前新增Flink啦。

Flink Connector

Flink log connector是阿里云日志服务提供的,用于对接flink的工具,包括两部分,消费者(Consumer)和生产者(Producer)。

消费者用于从日志服务中读取数据,支持exactly once语义,支持shard负载均衡.
生产者用于将数据写入日志服务,使用connector时,需要在项目中添加maven依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.3.2</version>
</dependency>
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>flink-log-connector</artifactId><version>0.1.3</version>
</dependency>
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>2.5.0</version>
</dependency><dependency><groupId>com.aliyun.openservices</groupId><artifactId>aliyun-log</artifactId><version>0.6.10</version></dependency>
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>log-loghub-producer</artifactId><version>0.1.8</version>
</dependency>复制代码

代码:Github

用法

  1. 请参考日志服务文档,正确创建Logstore。
  2. 如果使用子账号访问,请确认正确设置了LogStore的RAM策略。参考授权RAM子用户访问日志服务资源。

1. Log Consumer

在Connector中, 类FlinkLogConsumer提供了订阅日志服务中某一个LogStore的能力,实现了exactly once语义,在使用时,用户无需关心LogStore中shard数
量的变化,consumer会自动感知。

flink中每一个子任务负责消费LogStore中部分shard,如果LogStore中shard发生split或者merge,子任务消费的shard也会随之改变。

1.1 配置启动参数

Properties configProps = new Properties();
// 设置访问日志服务的域名
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// 设置访问ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// 设置日志服务的project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// 设置日志服务的LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// 设置消费日志服务起始位置
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// 设置日志服务的消息反序列化方法
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RawLogGroupList> logTestStream = env.addSource(new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));复制代码

上面是一个简单的消费示例,我们使用java.util.Properties作为配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,flink stream的子任务数量和日志服务LogStore中的shard数量是独立的,如果shard数量多于子任务数量,每个子任务不重复的消费多个shard,如果少于,

那么部分子任务就会空闲,等到新的shard产生。

1.2 设置消费起始位置

Flink log consumer支持设置shard的消费起始位置,通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消费从shard的头尾或者某个特定时间开始消费,具体取值如下:

  • Consts.LOG_BEGIN_CURSOR: 表示从shard的头开始消费,也就是从shard中最旧的数据开始消费。
  • Consts.LOG_END_CURSOR: 表示从shard的尾开始,也就是从shard中最新的数据开始消费。
  • UnixTimestamp: 一个整型数值的字符串,用1970-01-01到现在的秒数表示, 含义是消费shard中这个时间点之后的数据。

三种取值举例如下:

configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");复制代码

1.3 监控:消费进度(可选)

Flink log consumer支持设置消费进度监控,所谓消费进度就是获取每一个shard实时的消费位置,这个位置使用时间戳表示,详细概念可以参考
文档消费组-查看状态,[消费组-监控报警
](help.aliyun.com/document_de…。

configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);复制代码

注意上面代码是可选的,如果设置了,consumer会首先创建consumerGroup,如果已经存在,则什么都不做,consumer中的snapshot会自动同步到日志服务的consumerGroup中,用户可以在日志服务的控制台查看consumer的消费进度。

1.4 容灾和exactly once语义支持

当打开Flink的checkpointing功能时,Flink log consumer会周期性的将每个shard的消费进度保存起来,当作业失败时,flink会恢复log consumer,并
从保存的最新的checkpoint开始消费。

写checkpoint的周期定义了当发生失败时,最多多少的数据会被回溯,也就是重新消费,使用代码如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启flink exactly once语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 每5s保存一次checkpoint
env.enableCheckpointing(5000);复制代码

更多Flink checkpoint的细节请参考Flink官方文档Checkpoints。

1.5 补充材料:关联 API与权限设置

Flink log consumer 会用到的阿里云日志服务接口如下:

  • GetCursorOrData

    用于从shard中拉数据, 注意频繁的调用该接口可能会导致数据超过日志服务的shard quota, 可以通过ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH
    控制接口调用的时间间隔和每次调用拉取的日志数量,shard的quota参考文章[shard简介](https://help.aliyun.com/document_detail/28976.html).复制代码
    configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
    configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");复制代码
  • ListShards

     用于获取logStore中所有的shard列表,获取shard状态等.如果您的shard经常发生分裂合并,可以通过调整接口的调用周期来及时发现shard的变化。复制代码
    // 设置每30s调用一次ListShards
    configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");复制代码
  • CreateConsumerGroup

    该接口调用只有当设置消费进度监控时才会发生,功能是创建consumerGroup,用于同步checkpoint。复制代码
  • ConsumerGroupUpdateCheckPoint

    该接口用户将flink的snapshot同步到日志服务的consumerGroup中。
    复制代码

子用户使用Flink log consumer需要授权如下几个RAM Policy:

2. Log Producer

注意producer只支持Flink at-least-once语义,这就意味着在发生作业失败的情况下,写入日志服务中的数据有可能会重复,但是绝对不会丢失。

用法示例如下,我们将模拟产生的字符串写入日志服务:

// 将数据序列化成日志服务的数据格式
class SimpleLogSerializer implements LogSerializationSchema<String> {public RawLogGroup serialize(String element) {RawLogGroup rlg = new RawLogGroup();RawLog rl = new RawLog();rl.setTime((int)(System.currentTimeMillis() / 1000));rl.addContent("message", element);rlg.addLog(rl);return rlg;}
}
public class ProducerSample {public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";public static String sAccessKeyId = "";public static String sAccessKey = "";public static String sProject = "ali-cn-hangzhou-sls-admin";public static String sLogstore = "test-flink-producer";private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);public static void main(String[] args) throws Exception {final ParameterTool params = ParameterTool.fromArgs(args);final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(params);env.setParallelism(3);DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());Properties configProps = new Properties();// 设置访问日志服务的域名configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);// 设置访问日志服务的akconfigProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);// 设置日志写入的日志服务projectconfigProps.put(ConfigConstants.LOG_PROJECT, sProject);// 设置日志写入的日志服务logStoreconfigProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);simpleStringStream.addSink(logProducer);env.execute("flink log producer");}// 模拟产生日志public static class EventsGenerator implements SourceFunction<String> {private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {long seq = 0;while (running) {Thread.sleep(10);ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));}}@Overridepublic void cancel() {running = false;}}
}复制代码

2.1 初始化

Producer初始化主要需要做两件事情:

  • 初始化配置参数Properties, 这一步和Consumer类似, Producer有一些定制的参数,一般情况下使用默认值即可,特殊场景可以考虑定制:

    // 用于发送数据的io线程的数量,默认是8
    ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
    // 该值定义日志数据被缓存发送的时间,默认是3000
    ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
    // 缓存发送的包中日志的数量,默认是4096
    ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
    // 缓存发送的包的大小,默认是3Mb
    ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
    // 作业可以使用的内存总的大小,默认是100Mb
    ConfigConstants.LOG_MEM_POOL_BYTES复制代码
    上述参数不是必选参数,用户可以不设置,直接使用默认值。复制代码
  • 重载LogSerializationSchema,定义将数据序列化成RawLogGroup的方法。

    RawLogGroup是log的集合,每个字段的含义可以参考文档[日志数据模型](https://help.aliyun.com/document_detail/29054.html)。
    复制代码

如果用户需要使用日志服务的shardHashKey功能,指定数据写到某一个shard中,可以使用LogPartitioner产生数据的hashKey,用法例子如下:

FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
logProducer.setCustomPartitioner(new LogPartitioner<String>() {// 生成32位hash值public String getHashKey(String element) {try {MessageDigest md = MessageDigest.getInstance("MD5");md.update(element.getBytes());String hash = new BigInteger(1, md.digest()).toString(16);while(hash.length() < 32) hash = "0" + hash;return hash;} catch (NoSuchAlgorithmException e) {}return  "0000000000000000000000000000000000000000000000000000000000000000";}});复制代码

注意LogPartitioner是可选的,不设置情况下, 数据会随机写入某一个shard。

2.2 权限设置:RAM Policy

Producer依赖日志服务的API写数据,如下:

  • log:PostLogStoreLogs
  • log:ListShards

当RAM子用户使用Producer时,需要对上述两个API进行授权:

日志服务Flink Connector《支持Exactly Once》相关推荐

  1. 阿里云日志服务发布历史

    发布历史 2017/9 新功能 支持JDBC协议:通过SQL92标准语法对日志进行查询分析 2017/8 性能优化 对底层存储进行深度优化,分析性能提升1000倍,做到真正实时日志分析.参见场景:Ng ...

  2. 安卓使用阿里云的日志服务

    安卓日志阿里云的日志服务 一.说明 二.操作步骤 1. 在阿里云上开一个日志服务的project 2. 新建一个Logstore 3. 新建数据接入 4. 新建 accesskey 5. 下载sdk ...

  3. Flink清洗日志服务SLS的数据并求ACUPCU

    上文说到为什么使用Flink实时消费阿里云日志服务SLS的数据,并把阿里云上Flink消费SLS的代码粘贴到本地,做了相关修改之后成功把整个流程跑通了.但仅仅这样是不够的,从控制台上面输出的数据来看是 ...

  4. 日志服务发布Windows Logtail,完整支持两大平台

    Windows Logtail发布 日志服务支持客户端.网页.Syslog.SDK/API等方式接入数据,参考. Linux平台上的Logtail客户端拥有优秀的性能和良好的接入体验,在解决跨平台兼容 ...

  5. flink jdbc connector支持clickhouse

    1.业务背景 业务需求把数据写入clickhouse,同时还需要支持主键更新.目前使用的开源flink1.11版本是不支持clickhouse的,项目中使用的是flink sql 所以需要对源代码进行 ...

  6. 日志服务(SLS)集成 Spark 流计算实战

    前言 日志服务作为一站式的日志的采集与分析平台,提供了各种用户场景的日志采集能力,通过日志服务提供的各种与·与SDK,采集客户端(Logtail),Producer,用户可以非常容易的把各种数据源中的 ...

  7. 自建ELK vs 日志服务(SLS)全方位对比

    简介 提到日志实时分析,很多人都会想到很火的ELK Stack(Elastic/Logstash/Kibana)来搭建.ELK方案开源,在社区中有大量的内容和使用案例. 阿里云日志服务产品在新版中增强 ...

  8. 云原生架构下日志服务数据预处理

    简介:本篇实践将以某家国际教育机构为例,为大家详细介绍云原生架构下日志服务数据预处理以及对应的解决方案和最佳实践操作手册,方便用户快速对号入座,解决云原生架构下的常见日志难题. 直达最佳实践:[htt ...

  9. 日志服务与SIEM(如Splunk)集成方案实战

    背景信息 目标 本文主要介绍如何让阿里云日志服务与您的SIEM方案(如Splunk)对接, 以便确保阿里云上的所有法规.审计.与其他相关日志能够导入到您的安全运维中心(SOC)中. 名词解释 LOG( ...

最新文章

  1. git restore用法
  2. junit4 assert类中的assert方法总结
  3. 关于eclipse项目红色感叹号的解决办法
  4. mysql 线性表_线性表之顺序存储,基本操作
  5. WPF地区选择控件(内附下载地址)
  6. 整数分解为若干项之和
  7. SELECT list is not in GROUP BY clause and contains nonaggregated column---Linux工作笔记049
  8. idea中HTML可以打debug吗,你真的会用idea进行debug吗?idea实用debug教程
  9. ZWrite 和ZTest
  10. 计算二叉树的叶子结点个数
  11. 【Pycharm】笔记内容010:记录Pycharm报错“Can not find 程序所在目录 或者Can not run program...“的问题解决
  12. 开发一个app多少钱啊?
  13. 对于法线贴图(Normal Map) 的深入研究
  14. 打印机多少钱一台?购买打印机打印速度快吗
  15. 选课系统源码html,高校选课系统 - WEB源码|源代码 - 源码中国
  16. 计算机上如何设置访问密码忘了怎么办,电脑设置的密码忘记了怎么办
  17. 小程序云开发之--微信公众号文章采集篇
  18. 做一个广告业务后台需要几天,5天吗?不,用PhalApi开源框架,1天就能做好
  19. html 名人名言源代码,基于JQuery及AJAX实现名人名言随机生成器_咋地 _前端开发者...
  20. JavaScript 编程精解 中文第三版 二十、Node.js

热门文章

  1. 关于ognl+struts-tag与el+jstl互相代替,以及el和jstl的学习笔记
  2. vs2005 打sp1补丁失败的解决办法
  3. 【C++】多线程与异步编程【四】
  4. ASP.NET2.0 GridView小技巧汇粹
  5. String和常量池
  6. 利用JNI技术在Android中调用C++形式的OpenGL ES 2.0函数
  7. 【ubuntu】在ubuntu下无法输出拼音输入法中的中括号“【” 和 “】”的解决方法
  8. 图片转字符 android,转字符图app下载-转字符图 安卓版v2.4-PC6安卓网
  9. 商淘多b2b2c商城系统怎么在个人电脑上安装_社交电商系统开发是否有价值?
  10. 【java】Maven工程引入各种jar包的功能