river-jdbc

Installation

./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.4.0.8/elasticsearch-river-jdbc-1.4.0.8-plugin.zip

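Documentation

Two flavors: river or feeder

The plugin can run as a river in "pull mode" or as a feeder in "push mode". In feeder mode, the plugin runs in a separate JVM and can connect to a remote Elasticsearch cluster.

The plugin can fetch data from several relational database sources in parallel. When indexing into Elasticsearch, multi-threaded bulk mode ensures high throughput.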

Installing and running a river

# Install Elasticsearch
curl -OL https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.4.2.zip
cd $ES_HOME
unzip path/to/elasticsearch-1.4.2.zip
# Install the JDBC plugin
./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.4.0.6/elasticsearch-river-jdbc-1.4.0.6-plugin.zip
# Download the MySQL driver
curl -o mysql-connector-java-5.1.33.zip -L 'http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.33.zip/from/http://cdn.mysql.com/'
cp mysql-connector-java-5.1.33-bin.jar $ES_HOME/plugins/jdbc/
chmod 644 $ES_HOME/plugins/jdbc/*
# Start Elasticsearch
./bin/elasticsearch
# Stop the river
curl -XDELETE 'localhost:9200/_river/my_jdbc_river/'

JDBC plugin parameters

The general form of a JDBC river definition is:

curl -XPUT 'localhost:9200/_river/<rivername>/_meta' -d '{
    <river parameters>
    "type" : "jdbc",
    "jdbc" : {
        <river definition>
    }
}'

For example:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders",
        "index" : "myindex",
        "type" : "mytype",
        ...
    }
}'
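The same registration can also be scripted. The following is a minimal sketch using only the Python standard library. The helper name build_river_request and the default host are illustrative assumptions, not part of the plugin, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_river_request(river_name, jdbc_definition, host="http://localhost:9200"):
    """Build (but do not send) the PUT request that registers a JDBC river.

    build_river_request is a hypothetical helper, not part of the plugin.
    """
    body = {"type": "jdbc", "jdbc": jdbc_definition}
    url = "{}/_river/{}/_meta".format(host, river_name)
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


request = build_river_request(
    "my_jdbc_river",
    {
        "url": "jdbc:mysql://localhost:3306/test",
        "user": "",
        "password": "",
        "sql": "select * from orders",
        "index": "myindex",
        "type": "mytype",
    },
)
print(request.get_method(), request.full_url)

# Sending it requires a running Elasticsearch node with the plugin installed:
# urllib.request.urlopen(request)
```

Building the body from a dictionary avoids the quoting problems that come with embedding JSON in a shell string.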

Multiple river sources are possible if an array is passed to the jdbc field.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    <river parameters>
    "type" : "jdbc",
    "jdbc" : [
        { <river definition 1> },
        { <river definition 2> }
    ]
}'

The concurrency parameter controls how many of these river sources run in parallel.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    <river parameters>
    "concurrency" : 2,
    "type" : "jdbc",
    "jdbc" : [
        { <river definition 1> },
        { <river definition 2> }
    ]
}'

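Parameters outside of the jdbc block

strategy - the strategy of the JDBC plugin. Currently implemented: "simple", "column"

schedule - a single or a list of cron expressions for scheduled execution

threadpoolsize - the thread pool size for scheduled executions

interval - the delay between two river runs

max_bulk_actions - the length of each bulk index request submitted (default: 1000)

max_concurrent_bulk_requests - the maximum number of concurrent bulk requests (default: 2 * number of CPU cores)

max_bulk_volume - the maximum volume of a bulk request (default: 10m)

max_request_wait - the maximum wait time for a bulk request (default: 60s)

flush_interval - the interval for flushing index documents to a bulk action (default: 5s)

Parameters inside of the jdbc block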

url - the JDBC driver URL

user - the JDBC database user

password - the JDBC database password

sql - the SQL statement(s). Either a single string or a list.

"sql" : [
    {
        "statement" : "select ... from ... where a = ?, b = ?, c = ?",
        "parameter" : [ "value for a", "value for b", "value for c" ]
    },
    {
        "statement" : "insert into  ... where a = ?, b = ?, c = ?",
        "parameter" : [ "value for a", "value for b", "value for c" ],
        "write" : "true"
    },
    {
        "statement" : ...
    }
]

sql.statement - the SQL statement

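sql.write - if true, the SQL statement is interpreted as an insert/update statement that needs write access. Default is false

sql.callable - if true, the SQL statement is interpreted as a JDBC CallableStatement for calling stored procedures. Default is false

sql.parameter - bind parameters for the SQL statement, in order. The following special values can be used: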

  • $now - the current timestamp
  • $job - a job counter
  • $count - last number of rows merged
  • $river.name - the river name
  • $last.sql.start - a timestamp value for the time when the last SQL statement started
  • $last.sql.end - a timestamp value for the time when the last SQL statement ended
  • $last.sql.sequence.start - a timestamp value for the time when the last SQL sequence started
  • $last.sql.sequence.end - a timestamp value for the time when the last SQL sequence ended
  • $river.state.started - the timestamp of river start (from river state)
  • $river.state.timestamp - last timestamp of river activity (from river state)
  • $river.state.counter - counter from river state, counts the numbers of runs
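To make the binding order concrete, here is a small sketch that assembles a sql list and checks that each statement has exactly one parameter per ? placeholder. The helper sql_entry and the orders_log table are made up for illustration, and the plugin itself does not necessarily perform this check.

```python
import json


def sql_entry(statement, parameters=(), write=False):
    """Build one entry of the "sql" list (hypothetical helper).

    Parameters are bound in order, so their count must match the number of
    '?' placeholders. Counting '?' is naive: a '?' inside a string literal
    would be miscounted.
    """
    placeholders = statement.count("?")
    if placeholders != len(parameters):
        raise ValueError(
            "{} placeholders but {} parameters".format(placeholders, len(parameters))
        )
    entry = {"statement": statement}
    if parameters:
        entry["parameter"] = list(parameters)
    if write:
        # Written as the string "true", mirroring the example above.
        entry["write"] = "true"
    return entry


sql = [
    # Read statement: the special value is replaced by the river at run time.
    sql_entry(
        "select * from orders where created > ? and customer = ?",
        ["$last.sql.start", "Big"],
    ),
    # Write statement against an invented bookkeeping table.
    sql_entry(
        "update orders_log set job = ?, synced = ?",
        ["$job", "$now"],
        write=True,
    ),
]
print(json.dumps(sql, indent=2))
```

The printed list can be placed under the "sql" key of a river definition.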

locale - the default locale (used for parsing numerical values and the floating point character; the recommended value is "en_US")

timezone - the timezone for JDBC setTimestamp() calls when binding parameters with timestamp values

rounding - rounding mode for parsing numeric values. Possible values "ceiling", "down", "floor", "halfdown", "halfeven", "halfup", "unnecessary", "up"

scale - the precision of parsing numeric values

autocommit - true if each statement should be automatically executed. Default is false

fetchsize - the fetchsize for large result sets, most drivers use this to control the amount of rows in the buffer while iterating through the result set

max_rows - limits the number of rows fetched by a statement; the remaining rows are ignored

max_retries - the number of retries to (re)connect to a database

max_retries_wait - a time value for the time that should be waited between retries. Default is "30s"

resultset_type - the JDBC result set type, can be TYPE_FORWARD_ONLY, TYPE_SCROLL_SENSITIVE, TYPE_SCROLL_INSENSITIVE. Default is TYPE_FORWARD_ONLY

resultset_concurrency - the JDBC result set concurrency, can be CONCUR_READ_ONLY, CONCUR_UPDATABLE. Default is CONCUR_UPDATABLE

ignore_null_values - if NULL values should be ignored when constructing JSON documents. Default is false

prepare_database_metadata - if the driver metadata should be prepared as parameters for access by the river. Default is false

prepare_resultset_metadata - if the result set metadata should be prepared as parameters for access by the river. Default is false

column_name_map - a map of aliases that should be used as a replacement for column names of the database. Useful for Oracle 30 char column name limit. Default is null

query_timeout - a value in seconds for how long an SQL statement is allowed to run before it is considered lost. Default is 1800

connection_properties - a map for the connection properties for driver connection creation. Default is null

index - the Elasticsearch index used for indexing

type - the Elasticsearch type of the index used for indexing

index_settings - optional settings for the Elasticsearch index

type_mapping - optional mapping for the Elasticsearch index type

Default parameter settings

{
    "strategy" : "simple",
    "schedule" : null,
    "interval" : 0L,
    "threadpoolsize" : 4,
    "max_bulk_actions" : 10000,
    "max_concurrent_bulk_requests" : 2 * available CPU cores,
    "max_bulk_volume" : "10m",
    "max_request_wait" : "60s",
    "flush_interval" : "5s",
    "jdbc" : {
        "url" : null,
        "user" : null,
        "password" : null,
        "sql" : null,
        "locale" : Locale.getDefault().toLanguageTag(),
        "timezone" : TimeZone.getDefault(),
        "rounding" : null,
        "scale" : 2,
        "autocommit" : false,
        "fetchsize" : 10, /* MySQL: Integer.MIN */
        "max_rows" : 0,
        "max_retries" : 3,
        "max_retries_wait" : "30s",
        "resultset_type" : "TYPE_FORWARD_ONLY",
        "resultset_concurrency" : "CONCUR_UPDATABLE",
        "ignore_null_values" : false,
        "prepare_database_metadata" : false,
        "prepare_resultset_metadata" : false,
        "column_name_map" : null,
        "query_timeout" : 1800,
        "connection_properties" : null,
        "index" : "jdbc",
        "type" : "jdbc",
        "index_settings" : null,
        "type_mapping" : null
    }
}
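One way to read these defaults: a river definition only needs to state what differs, and everything else falls back to the values listed. The sketch below illustrates that idea with a plain nested dictionary merge. It is not the plugin's code, the helper with_defaults is hypothetical, and only a subset of the defaults is reproduced.

```python
import copy

# A subset of the defaults listed above (not exhaustive).
DEFAULTS = {
    "strategy": "simple",
    "max_bulk_actions": 10000,
    "max_bulk_volume": "10m",
    "max_request_wait": "60s",
    "flush_interval": "5s",
    "jdbc": {
        "scale": 2,
        "autocommit": False,
        "fetchsize": 10,
        "max_rows": 0,
        "max_retries": 3,
        "max_retries_wait": "30s",
        "index": "jdbc",
        "type": "jdbc",
    },
}


def with_defaults(definition, defaults=DEFAULTS):
    """Return a copy of `defaults` overridden by `definition`, merging nested dicts."""
    merged = copy.deepcopy(defaults)
    for key, value in definition.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = with_defaults(value, merged[key])
        else:
            merged[key] = copy.deepcopy(value)
    return merged


effective = with_defaults(
    {
        "type": "jdbc",
        "jdbc": {
            "url": "jdbc:mysql://localhost:3306/test",
            "sql": "select * from orders",
            "index": "myindex",
        },
    }
)
print(effective["jdbc"]["index"], effective["jdbc"]["type"], effective["flush_interval"])
```

Here only the index name is overridden, so the type stays "jdbc" and the flush interval stays "5s".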

Structured objects

One advantage of SQL queries is the join operation: data from many tables can be combined into new tuples.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select \"relations\" as \"_index\", orders.customer as \"_id\", orders.customer as \"contact.customer\", employees.name as \"contact.employee\" from orders left join employees on employees.department = orders.department"
    }
}'

The SQL result is:

mysql> select "relations" as "_index", orders.customer as "_id", orders.customer as "contact.customer", employees.name as "contact.employee"  from orders left join employees on employees.department = orders.department;
+-----------+-------+------------------+------------------+
| _index    | _id   | contact.customer | contact.employee |
+-----------+-------+------------------+------------------+
| relations | Big   | Big              | Smith            |
| relations | Large | Large            | Müller           |
| relations | Large | Large            | Meier            |
| relations | Large | Large            | Schulze          |
| relations | Huge  | Huge             | Müller           |
| relations | Huge  | Huge             | Meier            |
| relations | Huge  | Huge             | Schulze          |
| relations | Good  | Good             | Müller           |
| relations | Good  | Good             | Meier            |
| relations | Good  | Good             | Schulze          |
| relations | Bad   | Bad              | Jones            |
+-----------+-------+------------------+------------------+
11 rows in set (0.00 sec)

The resulting JSON objects are:

index=relations id=Big {"contact":{"employee":"Smith","customer":"Big"}}
index=relations id=Large {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Large"}}
index=relations id=Huge {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Huge"}}
index=relations id=Good {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Good"}}
index=relations id=Bad {"contact":{"employee":"Jones","customer":"Bad"}}
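The mapping from rows to documents can be approximated in a few lines. The sketch below reproduces the behaviour visible in the output above: dotted column names become nested objects, rows sharing an _id collapse into one document, and differing values for the same field are collected into an array. It is an illustration rather than the plugin's implementation, it assumes rows with the same _id arrive consecutively, and the function names are made up.

```python
import json


def set_path(doc, dotted_name, value):
    """Store value under a dotted column name, e.g. 'contact.employee'."""
    *parents, leaf = dotted_name.split(".")
    node = doc
    for part in parents:
        node = node.setdefault(part, {})
    if leaf not in node:
        node[leaf] = value
    elif isinstance(node[leaf], list):
        if value not in node[leaf]:
            node[leaf].append(value)
    elif node[leaf] != value:
        # A second, different value turns the field into an array.
        node[leaf] = [node[leaf], value]


def rows_to_documents(columns, rows):
    """Merge consecutive rows that share _index and _id into one document each."""
    documents = []
    current_key = None
    for row in rows:
        record = dict(zip(columns, row))
        key = (record.pop("_index", None), record.pop("_id", None))
        # Rows without an _id are never merged with their neighbours.
        if key != current_key or key[1] is None:
            documents.append({"index": key[0], "id": key[1], "source": {}})
            current_key = key
        for name, value in record.items():
            set_path(documents[-1]["source"], name, value)
    return documents


columns = ["_index", "_id", "contact.customer", "contact.employee"]
rows = [
    ("relations", "Big", "Big", "Smith"),
    ("relations", "Large", "Large", "Müller"),
    ("relations", "Large", "Large", "Meier"),
    ("relations", "Large", "Large", "Schulze"),
    ("relations", "Bad", "Bad", "Jones"),
]
for doc in rows_to_documents(columns, rows):
    source = json.dumps(doc["source"], ensure_ascii=False)
    print("index={} id={} {}".format(doc["index"], doc["id"], source))
```

Because merging in this sketch depends on neighbouring rows, a query whose rows are not grouped by _id would produce several partial documents for the same id.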

How to fetch a table

The following river dumps a whole table into Elasticsearch. If no _id column is given, IDs are generated automatically.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders"
    }
}'

The result is:

id=<random> {"product":"Apples","created":null,"department":"American Fruits","quantity":1,"customer":"Big"}
id=<random> {"product":"Bananas","created":null,"department":"German Fruits","quantity":1,"customer":"Large"}
id=<random> {"product":"Oranges","created":null,"department":"German Fruits","quantity":2,"customer":"Huge"}
id=<random> {"product":"Apples","created":1338501600000,"department":"German Fruits","quantity":2,"customer":"Good"}
id=<random> {"product":"Oranges","created":1338501600000,"department":"English Fruits","quantity":3,"customer":"Bad"}

How to fetch incremental data

Using timestamps for synchronization is recommended. The following example fetches all product rows that were added since the last river run.

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : [
            {
                "statement" : "select * from \"products\" where \"mytimestamp\" > ?",
                "parameter" : [ "$river.state.last_active_begin" ]
            }
        ],
        "index" : "my_jdbc_river_index",
        "type" : "my_jdbc_river_type"
    }
}
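The effect of the timestamp parameter can be tried out locally without Elasticsearch. This sketch uses Python's built-in sqlite3 module in place of MySQL. The product rows and the last_run value, which stands in for the timestamp the river would substitute from its state, are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('create table "products" ("name" text, "mytimestamp" text)')
conn.executemany(
    'insert into "products" values (?, ?)',
    [
        ("Apples", "2014-12-01 10:00:00"),
        ("Bananas", "2014-12-02 09:30:00"),
        ("Oranges", "2014-12-03 08:15:00"),
    ],
)

# Stand-in for the timestamp the river binds from its state (invented value).
last_run = "2014-12-01 12:00:00"

# Same statement shape as in the river definition: one '?' placeholder and
# one bound parameter. ISO-formatted timestamps compare correctly as text.
new_rows = conn.execute(
    'select * from "products" where "mytimestamp" > ? order by "mytimestamp"',
    (last_run,),
).fetchall()

for name, timestamp in new_rows:
    print(name, timestamp)

# For the next run, this sketch advances the marker to the newest row seen.
# The river instead relies on its own run timestamps.
last_run = new_rows[-1][1] if new_rows else last_run
conn.close()
```

Only the rows newer than the marker are returned, which is what keeps repeated runs from re-indexing the whole table.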

Reposted from: https://www.cnblogs.com/bmaker/p/5472444.html
