

./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/






curl -OL https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.4.2.zip
cd $ES_HOME
unzip path/to/elasticsearch-1.4.2.zip
# 安装JDBC插件
./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/
# 下载mysql driver
curl -o mysql-connector-java-5.1.33.zip -L 'http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.33.zip/from/http://cdn.mysql.com/'
cp mysql-connector-java-5.1.33-bin.jar $ES_HOME/plugins/jdbc/
chmod 644 $ES_HOME/plugins/jdbc/*
# 启动elasticsearch
curl -XDELETE 'localhost:9200/_river/my_jdbc_river/'



curl -XPUT 'localhost:9200/_river/<rivername>/_meta' -d '{
    <river parameters>
    "type" : "jdbc",
    "jdbc" : {
        <river definition>
    }
}'


curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders",
        "index" : "myindex",
        "type" : "mytype",
        ...
    }
}'


curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    <river parameters>
    "type" : "jdbc",
    "jdbc" : [
        { <river definition 1> },
        { <river definition 2> }
    ]
}'


curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    <river parameters>
    "concurrency" : 2,
    "type" : "jdbc",
    "jdbc" : [
        { <river definition 1> },
        { <river definition 2> }
    ]
}'


strategy - JDBC插件的策略。当前的实现有 "simple" 和 "column"

schedule - a single or a list of cron expressions for scheduled execution

threadpoolsize - scheduled executions的线程池大小

interval - 两次river运行之间的延迟时间

max_bulk_actions - 每个bulk索引请求提交的长度(默认是10000)

max_concurrent_bulk_requests - bulk请求的并行数量(默认是2*cpu core)

max_bulk_volume - 一个bulk请求的最大容量(默认是10m)

max_request_wait - 一个bulk请求最大的等待时间(默认是60s)

flush_interval - flushing索引文档到bulk action的间隔时间


url - the JDBC driver URL

user - the JDBC database user

password - the JDBC database password

sql - SQL语句。既可以是一个字符串也可以是一个列表。

"sql" : [
    {
        "statement" : "select ... from ... where a = ?, b = ?, c = ?",
        "parameter" : [ "value for a", "value for b", "value for c" ]
    },
    {
        "statement" : "insert into  ... where a = ?, b = ?, c = ?",
        "parameter" : [ "value for a", "value for b", "value for c" ],
        "write" : "true"
    },
    {
        "statement" : ...
    }
]

sql.statement - the SQL statement

sql.write - 如果为true,SQL语句解释为一个insert/update语句,该语句需要写权限。默认为false

sql.callable - 如果为true,SQL语句解释为一个CallableStatement,用于调用存储过程。默认为false

sql.parameter - 绑定参数到SQL语句。可以用到一些指定的值

  • $now - the current timestamp
  • $job - a job counter
  • $count - last number of rows merged
  • $river.name - the river name
  • $last.sql.start - a timestamp value for the time when the last SQL statement started
  • $last.sql.end - a timestamp value for the time when the last SQL statement ended
  • $last.sql.sequence.start - a timestamp value for the time when the last SQL sequence started
  • $last.sql.sequence.end - a timestamp value for the time when the last SQL sequence ended
  • $river.state.started - the timestamp of river start (from river state)
  • $river.state.timestamp - last timestamp of river activity (from river state)
  • $river.state.counter - counter from river state, counts the numbers of runs

locale - the default locale (used for parsing numerical values and the floating-point character; the recommended value is "en_US")

timezone - the timezone for JDBC setTimestamp() calls when binding parameters with timestamp values

rounding - rounding mode for parsing numeric values. Possible values "ceiling", "down", "floor", "halfdown", "halfeven", "halfup", "unnecessary", "up"

scale - the precision of parsing numeric values

autocommit - true if each statement should be automatically executed. Default is false

fetchsize - the fetchsize for large result sets, most drivers use this to control the amount of rows in the buffer while iterating through the result set

max_rows - limit the number of rows fetched by a statement; the remaining rows are ignored

max_retries - the number of retries to (re)connect to a database

max_retries_wait - a time value for the time that should be waited between retries. Default is "30s"


resultset_concurrency - the JDBC result set concurrency, can be CONCUR_READ_ONLY, CONCUR_UPDATABLE. Default is CONCUR_UPDATABLE

ignore_null_values - if NULL values should be ignored when constructing JSON documents. Default is false

prepare_database_metadata - if the driver metadata should be prepared as parameters for access by the river. Default is false

prepare_resultset_metadata - if the result set metadata should be prepared as parameters for access by the river. Default is false

column_name_map - a map of aliases that should be used as a replacement for column names of the database. Useful for Oracle 30 char column name limit. Default is null

query_timeout - a value in seconds for how long an SQL statement is allowed to execute before it is considered lost. Default is 1800

connection_properties - a map for the connection properties for driver connection creation. Default is null

index - the Elasticsearch index used for indexing

type - the Elasticsearch type of the index used for indexing

index_settings - optional settings for the Elasticsearch index

type_mapping - optional mapping for the Elasticsearch index type


{
    "strategy" : "simple",
    "schedule" : null,
    "interval" : 0L,
    "threadpoolsize" : 4,
    "max_bulk_actions" : 10000,
    "max_concurrent_bulk_requests" : 2 * available CPU cores,
    "max_bulk_volume" : "10m",
    "max_request_wait" : "60s",
    "flush_interval" : "5s",
    "jdbc" : {
        "url" : null,
        "user" : null,
        "password" : null,
        "sql" : null,
        "locale" : Locale.getDefault().toLanguageTag(),
        "timezone" : TimeZone.getDefault(),
        "rounding" : null,
        "scale" : 2,
        "autocommit" : false,
        "fetchsize" : 10, /* MySQL: Integer.MIN */
        "max_rows" : 0,
        "max_retries" : 3,
        "max_retries_wait" : "30s",
        "resultset_type" : "TYPE_FORWARD_ONLY",
        "resultset_concurrency" : "CONCUR_UPDATABLE",
        "ignore_null_values" : false,
        "prepare_database_metadata" : false,
        "prepare_resultset_metadata" : false,
        "column_name_map" : null,
        "query_timeout" : 1800,
        "connection_properties" : null,
        "index" : "jdbc",
        "type" : "jdbc",
        "index_settings" : null,
        "type_mapping" : null
    }
}



curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select \"relations\" as \"_index\", orders.customer as \"_id\", orders.customer as \"contact.customer\", employees.name as \"contact.employee\" from orders left join employees on employees.department = orders.department"
    }
}'


mysql> select "relations" as "_index", orders.customer as "_id", orders.customer as "contact.customer", employees.name as "contact.employee"  from orders left join employees on employees.department = orders.department;
| _index    | _id   | contact.customer | contact.employee |
| relations | Big   | Big              | Smith            |
| relations | Large | Large            | Müller           |
| relations | Large | Large            | Meier            |
| relations | Large | Large            | Schulze          |
| relations | Huge  | Huge             | Müller           |
| relations | Huge  | Huge             | Meier            |
| relations | Huge  | Huge             | Schulze          |
| relations | Good  | Good             | Müller           |
| relations | Good  | Good             | Meier            |
| relations | Good  | Good             | Schulze          |
| relations | Bad   | Bad              | Jones            |
11 rows in set (0.00 sec)


index=relations id=Big {"contact":{"employee":"Smith","customer":"Big"}}
index=relations id=Large {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Large"}}
index=relations id=Huge {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Huge"}}
index=relations id=Good {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Good"}}
index=relations id=Bad {"contact":{"employee":"Jones","customer":"Bad"}}



curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders"
    }
}'


id=<random> {"product":"Apples","created":null,"department":"American Fruits","quantity":1,"customer":"Big"}
id=<random> {"product":"Bananas","created":null,"department":"German Fruits","quantity":1,"customer":"Large"}
id=<random> {"product":"Oranges","created":null,"department":"German Fruits","quantity":2,"customer":"Huge"}
id=<random> {"product":"Apples","created":1338501600000,"department":"German Fruits","quantity":2,"customer":"Good"}
id=<random> {"product":"Oranges","created":1338501600000,"department":"English Fruits","quantity":3,"customer":"Bad"}



{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : [
            {
                "statement" : "select * from \"products\" where \"mytimestamp\" > ?",
                "parameter" : [ "$river.state.last_active_begin" ]
            }
        ],
        "index" : "my_jdbc_river_index",
        "type" : "my_jdbc_river_type"
    }
}


