AWS实战:AWS Kinesis Data Firehose
简介
Amazon Kinesis Data Firehose 是一项提取、转换、加载 (ETL) 服务,可以将串流数据以可靠方式捕获、转换和提供到数据湖、数据存储和分析服务中。
Amazon Kinesis Data Firehose 是将流数据加载到数据存储和分析工具的最简单方式。Kinesis Data Firehose 是一项完全托管式服务,让您可以轻松地从数十万个来源中捕获、转换大量流数据,并将其加载到 Amazon S3、Amazon Redshift、Amazon OpenSearch Service、Kinesis Data Analytics、通用 HTTP 终端节点,以及 Datadog、New Relic、MongoDB 和 Splunk 等的服务提供商中,从而获得近乎实时的分析与见解。
功能
- 从数据源读取数据,常用的数据来源包括以下几种
- 使用aws sdk提供的firehose接口
- putRecord
- putRecordBatch
- Amazon Kinesis Data Stream
- Amazon Kinesis Agent
- 一个java应用,可以安装在服务器上
- 指定需要监控的文件以及目标Kinesis Data Firehose delivery stream
- agent会自动从指定的文件收集数据发送到 Kinesis Data Firehose delivery stream
- CloudWatch Logs
- 创建 CloudWatch Logs subscription,并指定filter逻辑
- 当指定的log event产生时会发送log events到Kinesis Data Firehose
- CloudWatch Events
- 将Kinesis Data Firehose delivery stream作为CloudWatch Event rule的target
- 当event发生时,会将event数据发送到delivery stream
- AWS IoT
- 创建AWS IOT action
- 选择 Send messages to an Amazon Kinesis Firehose stream
- 使用aws sdk提供的firehose接口
- 对数据进行转换(可选)
- 可以利用lambda function对源数据进行转换
- 将数据存储到目标对象,常用的目标对象包括以下几种
- Amazon S3
- Amazon Redshift
- OpenSearch Service
- HTTP Endpoint
- Datadog
- Honeycomb
- Coralogix
- Dynatrace for
- LogicMonitor
- MongoDB Cloud
- New Relic
- Splunk
- Sumo Logic
case1:从AWS Kinesis Data Stream读取数据发送到Http Endpoint
项目地址:aws-kinesis-example/http-desitination at master · JessicaWin/aws-kinesis-example · GitHub
Lambda prodcer & Lambda consumer:nodejs
'use strict'
const AWS = require('aws-sdk');module.exports.sendDataToKinesisDataStream = async (event, context) => {const kinesis = new AWS.Kinesis();var params = {Data: `${JSON.stringify(event)}`,PartitionKey: 'test',StreamName: process.env.STREAM_NAME};let data = await kinesis.putRecord(params).promise();console.log(`${JSON.stringify(data)}`);
}module.exports.consumeDataFromKinesisDataStream = (event, context) => {console.log(`${JSON.stringify(event)}`);const eventBody = JSON.parse(event.body);for (let record of eventBody.records) {const data = JSON.parse(Buffer.from(record.data, 'base64').toString("utf8"));console.log(data);}const response = {};response.requestId = eventBody.requestId;response.timestamp = eventBody.timestamp;context.succeed(response);
}
AWS Resources
service: aws-kinesis-exampleprovider:name: awsregion: ${opt:region, 'ap-southeast-1'}stage: ${opt:stage, 'develop'}stackName: ${self:provider.stage}-${self:service}runtime: nodejs14.xmemorySize: 1024versionFunctions: falseiam:role:name: ${self:provider.stage}_KinesisLambdaRolemanagedPolicies:- arn:aws:iam::aws:policy/AdministratorAccessresources:Parameters:DeliveryStreamName:Type: StringDefault: ${self:provider.stage}-test-kinesis-delivery-streamResources:TestStream:Type: AWS::Kinesis::StreamProperties:Name: ${self:provider.stage}-test-kinesis-data-streamRetentionPeriodHours: 24ShardCount: 1S3DestinationBucket:Type: AWS::S3::BucketProperties:BucketName: !Sub "${self:provider.stage}-test-kinesis-destination"CorsConfiguration:CorsRules:- AllowedHeaders: ["*"]AllowedMethods: [GET, PUT, HEAD, POST, DELETE]AllowedOrigins: ["*"]KinesisDataStreamReadPolicy:Type: AWS::IAM::ManagedPolicyProperties:ManagedPolicyName: !Sub "${self:provider.stage}_KinesisDataStreamReadPolicy"PolicyDocument:Version: "2012-10-17"Statement:- Effect: AllowAction:- kinesis:DescribeStream- kinesis:PutRecord- kinesis:PutRecords- kinesis:GetShardIterator- kinesis:GetRecords- kinesis:DescribeStreamSummary- kinesis:RegisterStreamConsumerResource:- !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${self:provider.stage}-test-kinesis-data-stream"KinesisDataStreamReadRole:Type: AWS::IAM::RoleProperties:RoleName: !Sub "${self:provider.stage}_KinesisDataStreamReadRole"AssumeRolePolicyDocument:Version: "2012-10-17"Statement:- Effect: AllowPrincipal:Service:- firehose.amazonaws.comAction: sts:AssumeRoleManagedPolicyArns:- !Ref KinesisDataStreamReadPolicyFirehoseExecutionS3Policy:Type: AWS::IAM::ManagedPolicyProperties:ManagedPolicyName: !Sub "${self:provider.stage}_FirehoseExecutionS3Policy"PolicyDocument:Version: "2012-10-17"Statement:- Effect: AllowAction:- s3:PutObject- s3:GetObject- s3:ListBucketMultipartUploads- s3:AbortMultipartUpload- s3:PutbucketLogging- s3:PutObjectVersionAcl- s3:PutBucketAcl- s3:PutBucketPolicy- s3:ListBucket- s3:GetBucketLocation- s3:PutObjectAclResource:- !Sub "arn:aws:s3:::${self:provider.stage}-test-kinesis-destination/*"- !Sub "arn:aws:s3:::${self:provider.stage}-test-kinesis-destination"KinesisDataFirehoseDeliveryRole:Type: AWS::IAM::RoleProperties:RoleName: !Sub "${self:provider.stage}_KinesisDataFirehoseDeliveryRole"AssumeRolePolicyDocument:Version: "2012-10-17"Statement:- Effect: AllowPrincipal:Service:- firehose.amazonaws.comAction: sts:AssumeRoleManagedPolicyArns:- !Ref FirehoseExecutionS3PolicyDeliverystream:Type: AWS::KinesisFirehose::DeliveryStreamProperties:DeliveryStreamName: !Ref DeliveryStreamNameDeliveryStreamType: KinesisStreamAsSourceKinesisStreamSourceConfiguration:KinesisStreamARN: !GetAtt TestStream.ArnRoleARN: !GetAtt KinesisDataStreamReadRole.ArnHttpEndpointDestinationConfiguration:EndpointConfiguration:Name: api-gateway-urlUrl:!Join ["",["https://",!Ref HttpApi,".execute-api.",!Ref AWS::Region,".",!Ref AWS::URLSuffix,!Sub "/${self:provider.stage}/kinesis/consumer",],]CloudWatchLoggingOptions:Enabled: trueLogGroupName:!Join ["", [/aws/kinesisfirehose/, !Ref DeliveryStreamName]]LogStreamName: DestinationDeliveryBufferingHints:IntervalInSeconds: 60SizeInMBs: 5RequestConfiguration:ContentEncoding: NONERetryOptions:DurationInSeconds: 60RoleARN: !GetAtt KinesisDataFirehoseDeliveryRole.ArnS3BackupMode: FailedDataOnlyS3Configuration:BucketARN: !GetAtt S3DestinationBucket.ArnRoleARN: !GetAtt KinesisDataFirehoseDeliveryRole.ArnCloudWatchLoggingOptions:Enabled: trueLogGroupName:!Join ["", [/aws/kinesisfirehose/, !Ref DeliveryStreamName]]LogStreamName: BackupDeliveryErrorOutputPrefix: error/functions:KinesisDataStreamProducer:handler: handler.sendDataToKinesisDataStreamname: ${self:provider.stage}-${self:service}-data-producerenvironment:STREAM_NAME: !Ref TestStreamKinesisDataStreamConsumer:handler: handler.consumeDataFromKinesisDataStreamname: ${self:provider.stage}-${self:service}-data-consumerevents:- httpApi:path: /${self:provider.stage}/kinesis/consumermethod: POST
case2:从AWS Kinesis Data Stream读取数据发送到S3
项目地址:
https://github.com/JessicaWin/aws-kinesis-example/tree/master/s3-destination
Lambda prodcer & Lambda consumer:nodejs
'use strict'
const AWS = require('aws-sdk');module.exports.sendDataToKinesisDataStream = async (event, context) => {const kinesis = new AWS.Kinesis();var params = {Data: `${JSON.stringify(event)}`,PartitionKey: 'test',StreamName: process.env.STREAM_NAME};let data = await kinesis.putRecord(params).promise();console.log(`${JSON.stringify(data)}`);
}
AWS Resources
service: aws-kinesis-exampleprovider:name: awsregion: ${opt:region, 'ap-southeast-1'}stage: ${opt:stage, 'develop'}stackName: ${self:provider.stage}-${self:service}runtime: nodejs14.xmemorySize: 1024versionFunctions: falseiam:role:name: ${self:provider.stage}_KinesisLambdaRolemanagedPolicies:- arn:aws:iam::aws:policy/AdministratorAccessresources:Parameters:DeliveryStreamName:Type: StringDefault: ${self:provider.stage}-test-kinesis-delivery-stream-s3Resources:TestStream:Type: AWS::Kinesis::StreamProperties:Name: ${self:provider.stage}-test-kinesis-data-streamRetentionPeriodHours: 24ShardCount: 1S3DestinationBucket:Type: AWS::S3::BucketProperties:BucketName: !Sub "${self:provider.stage}-test-kinesis-destination"CorsConfiguration:CorsRules:- AllowedHeaders: ["*"]AllowedMethods: [GET, PUT, HEAD, POST, DELETE]AllowedOrigins: ["*"]KinesisDataStreamReadPolicy:Type: AWS::IAM::ManagedPolicyProperties:ManagedPolicyName: !Sub "${self:provider.stage}_KinesisDataStreamReadPolicy"PolicyDocument:Version: "2012-10-17"Statement:- Effect: AllowAction:- kinesis:DescribeStream- kinesis:PutRecord- kinesis:PutRecords- kinesis:GetShardIterator- kinesis:GetRecords- kinesis:DescribeStreamSummary- kinesis:RegisterStreamConsumerResource:- !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${self:provider.stage}-test-kinesis-data-stream"KinesisDataStreamReadRole:Type: AWS::IAM::RoleProperties:RoleName: !Sub "${self:provider.stage}_KinesisDataStreamReadRole"AssumeRolePolicyDocument:Version: "2012-10-17"Statement:- Effect: AllowPrincipal:Service:- firehose.amazonaws.comAction: sts:AssumeRoleManagedPolicyArns:- !Ref KinesisDataStreamReadPolicyKinesisDataFirehorseDeliveryPolicy:Type: AWS::IAM::ManagedPolicyProperties:ManagedPolicyName: !Sub "${self:provider.stage}_KinesisDataFirehorseDeliveryPolicy"PolicyDocument:Version: "2012-10-17"Statement:- Effect: AllowAction:- "lambda:*"Resource:- "*"FirehoseExecutionS3Policy:Type: AWS::IAM::ManagedPolicyProperties:ManagedPolicyName: !Sub "${self:provider.stage}_FirehoseExecutionS3Policy"PolicyDocument:Version: "2012-10-17"Statement:- Effect: AllowAction:- s3:PutObject- s3:GetObject- s3:ListBucketMultipartUploads- s3:AbortMultipartUpload- s3:PutbucketLogging- s3:PutObjectVersionAcl- s3:PutBucketAcl- s3:PutBucketPolicy- s3:ListBucket- s3:GetBucketLocation- s3:PutObjectAclResource:- !Sub "arn:aws:s3:::${self:provider.stage}-test-kinesis-destination/*"- !Sub "arn:aws:s3:::${self:provider.stage}-test-kinesis-destination"KinesisDataFirehorseDeliveryRole:Type: AWS::IAM::RoleProperties:RoleName: !Sub "${self:provider.stage}_KinesisDataFirehorseDeliveryRole"AssumeRolePolicyDocument:Version: "2012-10-17"Statement:- Effect: AllowPrincipal:Service:- firehose.amazonaws.comAction: sts:AssumeRoleManagedPolicyArns:- !Ref KinesisDataFirehorseDeliveryPolicy- !Ref FirehoseExecutionS3PolicyDeliverystream:Type: AWS::KinesisFirehose::DeliveryStreamProperties:DeliveryStreamName: !Ref DeliveryStreamNameDeliveryStreamType: KinesisStreamAsSourceKinesisStreamSourceConfiguration:KinesisStreamARN: !GetAtt TestStream.ArnRoleARN: !GetAtt KinesisDataStreamReadRole.ArnS3DestinationConfiguration:BucketARN: !GetAtt S3DestinationBucket.ArnRoleARN: !GetAtt KinesisDataFirehorseDeliveryRole.ArnBufferingHints:IntervalInSeconds: 60SizeInMBs: 5ErrorOutputPrefix: error/Prefix: success/CloudWatchLoggingOptions:Enabled: trueLogGroupName:!Join ["", [/aws/kinesisfirehose/, !Ref DeliveryStreamName]]LogStreamName: DestinationDeliveryfunctions:KinesisDataStreamProducer:handler: handler.sendDataToKinesisDataStreamname: ${self:provider.stage}-${self:service}-data-producerenvironment:STREAM_NAME: !Ref TestStream
AWS实战:AWS Kinesis Data Firehose相关推荐
- aws iam php,AWS实战 - IAM角色的简单使用
介绍 简单来说,IAM角色是一组权限的集合,IAM用户或者AWS服务可以临时性代入这个角色,获得角色所拥有的权限:AWS官方定义如下: IAM角色类似于用户,因为它是一个AWS实体,该实体具有确定其在 ...
- AWS实战:Aurora到Redshift数据同步
什么是Aurora Amazon Aurora是一种基于云且完全托管关系型数据库服务,与MySQL 和 PostgreSQL 数据库兼容,完全托管意味着自动对数据库进行管理,包括管理数据备份.硬件配置 ...
- 使用Kinesis Data Analytics进行实时数据处理
kinesis Part 1 of a multi-part series around analysing Flanders' traffic whilst leveraging the power ...
- R语言data.table导入数据实战:把data.frame数据转化为data.table数据
R语言data.table导入数据实战:把data.frame数据转化为data.table数据 目录 R语言data.table导入数据实战:把data.frame数据转化为data.table数据 ...
- R语言将dataframe宽表转化为长表实战:使用data.table、使用tidyr包gather函数、使用reshape2包
R语言将dataframe宽表转化为长表实战:使用data.table.使用tidyr包gather函数.使用reshape2包 目录
- aws mysql迁移_AWS Data Migration Service-AWS数据库迁移服务-AWS中国区域
AWS Database Migration Service 可帮助您快速并安全地将数据库迁移至 AWS.源数据库在迁移过程中可继续正常运行,从而最大程度地减少依赖该数据库的应用程序的停机时间.AWS ...
- 实战 | AWS自动化启停、镜像备份、补丁作业
新钛云服已为您服务1260天 EC2自动启停 1.打开Systems Manager控制台,然后选择更改管理下面的 维护时段 2.点击 创建维护时段 ① 描述 cron(00 22 ? * * *) ...
- aws创建html网页,AWS: 在AWS上创建一个网站,综合运用(Lambda + Api Gateway + Dynamodb + S3)...
简介 本文将创建一个微型网站,以达到综合运用AWS服务的目的: 1.Dynamodb:一种完全托管的NoSQL数据库服务 2.Lambda:实现具体的业务逻辑,基于python3编写,它会调用dyna ...
- 【AWS】 AWS Free Usage Tier
AWS: Amazon Web Services 主页: http://aws.amazon.com/free/ TREC的微博Track需要下载大量的twitter数据,去问了厦大的林琛老师,老师介 ...
- 如何简单实用AWS的 AWS Educate Starter Account 账号玩转aws云
亚马逊云科技-全球领先的大数据和云计算服务以及云解决方案提供商亚马逊云科技--亚马逊公司旗下云计算服务平台,为全球客户提供整套基础设施和云解决方案.以弹性云服务器.云存储.数据库.机器学习为主的安全. ...
最新文章
- CentOS7安装配置VSFTP
- hdu 2112 HDU Today 最短路(Dijkstra算法)
- sqlalchemy外键和relationship查询
- 【从入门到放弃-Java】并发编程-NIO-Channel
- python制作加密工具_Python制作钉钉加密/解密工具
- java如何画百分比圆环_canvas绘制旋转的圆环百分比进度条
- 不属于jsp构成元素_JSP构成元素-JSP基础
- workbench拓扑优化教程_轻量化及拓扑优化软件GENESIS介绍——可集成到Workbench
- 风控人必知必会的征信知识
- 关于日期控件被模态框遮盖的问题解析
- Windows 7 系统封装文字版 精简教程笔记!
- vue实现微信分享链接 生成卡片
- 第二届亚太应用经济学会博硕士论文研讨会长沙落幕
- python分组统计excel数据_python中excel数据分组处理
- Deepin下安装日文输入法
- Android监听系统输入法键盘弹出显示与隐藏事件
- threw ‘java.lang.NullPointerException‘ exception // toString()
- 键盘可以实现向计算机输入数据判断,计算机应用基础_学习指南.docx
- 服务器里的文件怎么删除
- git进阶 | 01 - git基础操作进阶
热门文章
- 计算机网络路由器的配置连接不上,路由器安装设置好后电脑还是不能上网解决办法...
- 简单的学生网页作业源码 基于web在线餐饮网站的设计与实现——蛋糕甜品店铺(html css javascript)
- iOS从零开始,使用Swift:探索基础框架
- 计算机时间校对更改原因,计算机时间校准方法
- 立方单位换算计算机,立方进率换算(立方进率单位换算表)
- 华为手机2.3亿出货量背后,一场技术+市场的胜利
- 北京药监局考试计算机操作,考科一电脑操作
- powershell免杀思路分析(过某60和某绒)
- 十道解分式方程及答案_10道解分式方程练习题及答案.doc
- python金融衍生品_Python 金融数据分析:单一风险衍生品估值丨数析学院