一、介绍

  1. flume自带的Http Source可以通过Http Post接收事件。

  2. 场景:对于有些应用程序环境,它可能不能部署Flume SDK及其依赖项,或客户端代码倾向于通过HTTP而不是Flume的PRC发送数据的情况,此时HTTP SOURCE可以用来将数据接收到Flume中。

  3. 从客户端的角度看,HTTP SOURCE表现的像web服务器一样能接收flume事件

二、参数

配置参数 默认值 描述
type   http (org.apache.fluem.source.httpSource)
bind   绑定的IP地址或主机名
port   绑定的端口号
enableSSL false  
keystore   使用的keystore文件的路径
keystorePassword   能够进入keystore的密码
handler JSONHandler HTTP SOURCE使用的处理程序类
handler.*   传给处理程序类的任何参数 可以 通过使用此参数(*)配置传入
  1. 为了安全传输,http source也支持SSL,SSL支持的相关例子可以参见我的关于flume之Avro Source博客

  2. Flume 事件使用一个可插拔的“handler”程序来实现转换,它必须实现的HTTPSourceHandler接口。此处理程序需要一个HttpServletRequest和返回一个flume 事件列表。默认是:JSONHandler。

    例如:xxx.handler=com.dxz.flume_demo.source.HTTPSourceXmlHandler

  3. 自定义的handler如果想传入参数,可以使用handler.*配置

    如:xxx.handler.myparam=zhangsan

  4. 如果配置中没有指定处理程序,HTTP SOURCE将使用与Flume绑定的处理程序,即:JSONHandler,它能处理JSON格式的事件。每个事件可以包含包装为数组的几个事件,尽管Source写入的管道可能有限制的事务能力。

    处理程序接受UTF-8,UTF-16,UTF-32编码的JSON格式的数据,并且将它转换成一个列表的事件。

    格式:

    [ { "headers":{"":"","":""
                     },
         "body":"the first event"
       },
       { "headers":{"":"","":""
                     },
         "body":"the second event"
       }
       
    ]

配置文件http_source.conf

a1.sources=r1
a1.sinks=k1
a1.channels=c1  a1.sources.r1.type=http
a1.sources.r1.bind=localhost
a1.sources.r1.port=50000
a1.sources.r1.channels=c1  a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1  a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100  

启动:cd到bin目录下执行

flume-ng.cmd agent -conf ../conf -conf-file ../conf/http_source.conf -name a1 -property flume.root.logger=INFO,console

3) 测试:

$ curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]'  http://192.168.1.102:50000

4) 服务器端结果

2.http source handler自定义例子

假定xml请求格式,期望格式如下:

<events><event><headers><header1>value1</header1></headers><body>test</body></event><event><headers><header1>value1</header1></headers><body>test2</body></event></events>

现在要求flume http source可以处理这种请求的xml格式

操作步骤如下:

1)建立maven工程,pom.xml文件如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.dxz</groupId><artifactId>flume-demo</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>flume-demo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency>  <groupId>org.apache.flume</groupId>  <artifactId>flume-ng-core</artifactId>  <version>1.6.0</version>  <scope>compile</scope>  </dependency>  </dependencies>
</project>

2)开发代码 ,自定义handler类

package com.dxz.flume_demo.source;import com.google.common.base.Preconditions;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.http.HTTPBadRequestException;
import org.apache.flume.source.http.HTTPSourceHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;import javax.servlet.http.HttpServletRequest;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class HTTPSourceXMLHandler implements HTTPSourceHandler {private final String ROOT = "events";private final String EVENT_TAG = "event";private final String HEADERS_TAG = "headers";private final String BODY_TAG = "body";private final String CONF_INSERT_TIMESTAMP = "insertTimestamp";private final String TIMESTAMP_HEADER = "timestamp";private final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();// Document builders are not thread-safe.// So make sure we have one for each thread.private final ThreadLocal<DocumentBuilder> docBuilder = new ThreadLocal<DocumentBuilder>();private boolean insertTimestamp;private static final Logger LOG = LoggerFactory.getLogger(HTTPSourceXMLHandler.class);public List<Event> getEvents(HttpServletRequest httpServletRequest) throws HTTPBadRequestException, Exception {if (docBuilder.get() == null) {docBuilder.set(documentBuilderFactory.newDocumentBuilder());}Document doc;final List<Event> events;try {doc = docBuilder.get().parse(httpServletRequest.getInputStream());Element root = doc.getDocumentElement();root.normalize();// Verify that the root element is "events"
            Preconditions.checkState(ROOT.equalsIgnoreCase(root.getTagName()));NodeList nodes = root.getElementsByTagName(EVENT_TAG);LOG.info("get nodes={}", nodes);int eventCount = nodes.getLength();events = new ArrayList<Event>(eventCount);for (int i = 0; i < eventCount; i++) {Element event = (Element) nodes.item(i);// Get all headers. If there are multiple header sections,// combine them.NodeList headerNodes = event.getElementsByTagName(HEADERS_TAG);Map<String, String> eventHeaders = new HashMap<String, String>();for (int j = 0; j < headerNodes.getLength(); j++) {Node headerNode = headerNodes.item(j);NodeList headers = headerNode.getChildNodes();for (int k = 0; k < headers.getLength(); k++) {Node header = headers.item(k);// Read only element nodesif (header.getNodeType() != Node.ELEMENT_NODE) {continue;}// Make sure a header is inserted only once,// else the event is malformedPreconditions.checkState(!eventHeaders.containsKey(header.getNodeName()),"Header expected only once " + header.getNodeName());eventHeaders.put(header.getNodeName(), header.getTextContent());}}Node body = event.getElementsByTagName(BODY_TAG).item(0);if (insertTimestamp) {eventHeaders.put(TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis()));}System.out.println("httpServletRequest.getCharacterEncoding()="+httpServletRequest.getCharacterEncoding());System.out.println("body.getTextContent()=" + body.getTextContent());events.add(EventBuilder.withBody(body.getTextContent().getBytes(Charset.defaultCharset()), eventHeaders));}} catch (SAXException ex) {throw new HTTPBadRequestException("Request could not be parsed into valid XML", ex);} catch (Exception ex) {throw new HTTPBadRequestException("Request is not in expected format. " + "Please refer documentation for expected format.", ex);}return events;}public void configure(Context context) {insertTimestamp = context.getBoolean(CONF_INSERT_TIMESTAMP, false);}
}

3)在该工程的flume-demo目录下执行命令mvn package,会将该工程打成jar包,会生产target目录,从中找到flume-demo.jar,将其拷贝到flume的lib目录下

4)flume配置文件:http_source_xml.conf

a1.sources=r1
a1.sinks=k1
a1.channels=c1  a1.sources.r1.type=http
a1.sources.r1.bind=localhost
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
a1.sources.r1.handler=com.dxz.flume_demo.source.HTTPSourceXMLHandler
a1.sources.r1.insertTimestamp=truea1.sinks.k1.type=logger
a1.sinks.k1.channel=c1  a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

5)启动服务

flume-ng.cmd agent -conf ../conf -conf-file ../conf/http_source_xml.conf -name a1 -property flume.root.logger=INFO,console

6)测试:

7)结果:

转自:

https://blog.csdn.net/liuxiao723846/article/details/63342490

转载于:https://www.cnblogs.com/duanxz/p/9177917.html

flume http source示例讲解相关推荐

  1. sublime c语言如何编译,Sublime Text 3 实现C语言代码的编译和运行(示例讲解)

    Sublime Text 3是一款优秀的代码编辑软件.界面简洁,轻巧快速,很受大家的欢迎. 最近开始用他来编辑数据结构的代码,这就需要在新建编译系统. 具体方法如下: 首先: 接下来是关键的一步,将以 ...

  2. sublime3编程c语言,Sublime Text 3 实现C语言代码的编译和运行(示例讲解)

    Sublime Text 3 实现C语言代码的编译和运行(示例讲解) 发布时间:2020-10-14 12:48:35 来源:脚本之家 阅读:107 作者:jingxian Sublime Text ...

  3. 大数据——Flume组件Source、Channel和Sink具体使用

    Flume组件Source.Channel和Sink使用说明 Flume Sources Avro Source 配置范例 Thrift Source 配置范例 Exec Source 配置范例 JM ...

  4. python自动登录教程_python实现校园网自动登录的示例讲解

    因为最近想用树莓派搞个远程监控系统,又因为学校的网需要从网页登录而树莓派又不方便搞个显示器带着,所以寻思着搞个能够自动登录校园网的脚本程序,省去了每次都要打开浏览器输入账号密码的烦恼. 1.工具 火狐 ...

  5. python自动登录校园网 密码_python实现校园网自动登录的示例讲解

    因为最近想用树莓派搞个远程监控系统,又因为学校的网需要从网页登录而树莓派又不方便搞个显示器带着,所以寻思着搞个能够自动登录校园网的脚本程序,省去了每次都要打开浏览器输入账号密码的烦恼. 1.工具 火狐 ...

  6. php渲染页面简单例子,微信小程序如何渲染html内容(示例讲解)

    本篇文章给大家带来的内容是关于微信小程序如何渲染html内容(示例讲解),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 最近又做了一个新的小程序关于物流订单查询 遇到了一个小问题:数 ...

  7. python校园网登录_python实现校园网自动登录的示例讲解

    python实现校园网自动登录的示例讲解 来源:中文源码网    浏览: 次    日期:2018年9月2日 [下载文档:  python实现校园网自动登录的示例讲解.txt ] (友情提示:右键点上 ...

  8. html兄弟选择器怎么用,CSS的相邻兄弟选择器用法示例讲解

    对于有相同父元素的相邻HTML元素查找便可以使用CSS的相邻兄弟选择器,这里我们就来看一下CSS的相邻兄弟选择器用法简单讲解: 可选择紧接在另一个元素后的元素,且二者有相同的父级元素 下面代码中,it ...

  9. composer php中如何执行,php中composer如何实现类的自动加载(示例讲解)

    本篇文章给大家带来的内容是关于php中composer如何实现类的自动加载(示例讲解),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 我们在开发项目中会经常用到第三方的类库插件,但是 ...

  10. python创建数据库的sql语句_对python插入数据库和生成插入sql的示例讲解

    如下所示: #-*- encoding:utf-8 -*- import csv import sys,os import pymysql def read_csv(filename): ''' 读取 ...

最新文章

  1. markdown编辑器的小建议
  2. 在蓄电池管理系统中计算机应用,汽车电器与电子技术.docx
  3. Oracle Enterprise Manager 11g: Empowering IT to Drive Business Value
  4. 中如何直接使用方法返回的值_java基础-2-方法、面向对象
  5. Juniper 210 密码清不掉_三分钟学会如何找回mysql密码
  6. C#结构体和字节数组的转换
  7. zsh关于.zprofile .zlogin .zshrc .zshenv文件中环境变量的加载
  8. windows server 2008 添加磁盘
  9. Android开发之数据库Sqlite
  10. SQL Server 不允许保存更改的解决方法
  11. Lodash兼容IE6~IE8
  12. FFmpeg+SDL纯语音播放器
  13. css裁剪图片 clip-path
  14. 雷电3接口能干嘛_【分享帖】3块钱一斤的水泥能干嘛?600多万网友看完后都跪了:水泥竟然这么神…...
  15. Mixpanel 可视化ABTest分析 —— iOS篇
  16. 【HAL库系列】0.STM32CubeIDE介绍
  17. vue路由传参的三种方式/含页面刷新参数丢失解决方案(详细)
  18. 拼多多API接口大全
  19. 大学计算机专业找对象,单身率最高的大学专业是什么?这5个专业为什么成脱单最难专业...
  20. UCK商学院《当区块链遇见UCK》人物专访——张伟杰:区块链给90后带来新的机遇

热门文章

  1. php伪随机数 ctf,[GWCTF 2019]枯燥的抽奖
  2. python独一无二的路_独一无二的Python基础学习——可用作面试
  3. 详解Linux防火墙iptables禁IP与解封IP常用命令
  4. scrcpy能显示不能控制
  5. pycharm的todo和fixme标记,标志为今后再做和bug点
  6. 学习python3(一)
  7. 关于int main(int argc,char* argv[])详解
  8. UVA12265-Selling Land(细节处理)
  9. 欧几里德算法(模板)
  10. java内部类之成员内部类实例