# -*- coding: utf-8 -*-

"""

作者:陈龙

日期:2016-7-22

功能:oracle数据库到ES的数据同步

"""

import os

import sys

import datetime, time

# import fcntl

import threading

import pyes  # 引入pyes模块,ES接口

import cx_Oracle  # 引入cx_Oracle模块,Oracle接口

os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'  # 中文编码

reload(sys)  # 默认编码设置为utf-8

sys.setdefaultencoding('utf-8')

# 创建ES连接 并返回连接参数

def connect_ES(addr):

try:

global conn

conn = pyes.ES(addr)  # 链接ES '127.0.0.1:9200'

print 'ES连接成功'

return conn

except:

print 'ES连接错误'

pass

# 创建ES映射mapping 注意各各个字段的类型

def create_ESmapping():

global spiderInfo_mapping, involveVideo_mapping, involveCeefax_mapping,keyWord_mapping,sensitiveWord_mapping

spiderInfo_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},

'tableId': {'index': 'not_analyzed', 'type': 'integer'},

'title': {'index': 'analyzed', 'type': 'string'},

'author': {'index': 'not_analyzed', 'type': 'string'},

'content': {'index': 'analyzed', 'type': 'string'},

'publishTime': {'index': 'not_analyzed', 'type': 'string'},

'browseNum': {'index': 'not_analyzed', 'type': 'integer'},

'commentNum': {'index': 'not_analyzed', 'type': 'integer'},

'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 除去涉我部分内容的ES映射结构

involveVideo_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},

'tableId': {'index': 'not_analyzed', 'type': 'integer'},

'title': {'index': 'analyzed', 'type': 'string'},

'author': {'index': 'not_analyzed', 'type': 'string'},

'summary': {'index': 'analyzed', 'type': 'string'},

'publishTime': {'index': 'not_analyzed', 'type': 'string'},

'url': {'index': 'not_analyzed', 'type': 'string'},

'imgUrl': {'index': 'not_analyzed', 'type': 'string'},

'ranking': {'index': 'not_analyzed', 'type': 'integer'},

'playNum': {'index': 'not_analyzed', 'type': 'integer'},

'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 涉我视音频内容的ES映射结构

involveCeefax_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},

'tableId': {'index': 'not_analyzed', 'type': 'integer'},

'title': {'index': 'analyzed', 'type': 'string'},

'author': {'index': 'not_analyzed', 'type': 'string'},

'content': {'index': 'analyzed', 'type': 'string'},

'publishTime': {'index': 'not_analyzed', 'type': 'string'},

'keyWords': {'index': 'not_analyzed', 'type': 'string'},

'popularity': {'index': 'not_analyzed', 'type': 'integer'},

'url': {'index': 'not_analyzed', 'type': 'string'},

'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 涉我图文资讯内容的ES映射结构

keyWord_mapping = {'id':{'index': 'not_analyzed', 'type': 'integer'},

'keywords':{'index': 'not_analyzed', 'type': 'string'}}

sensitiveWord_mapping = {'id':{'index': 'not_analyzed', 'type': 'integer'},

'sensitiveType':{'index': 'not_analyzed', 'type': 'string'},

'sensitiveTopic': {'index': 'not_analyzed', 'type': 'string'},

'sensitiveWords': {'index': 'not_analyzed', 'type': 'string'}}

# 创建ES相关索引和索引下的type

def create_ESindex(ES_index, index_type1,index_type2,index_type3,index_type4,index_type5):

if conn.indices.exists_index(ES_index):

pass

else:

conn.indices.create_index(ES_index)  # 如果所有Str不存在,则创建Str索引

create_ESmapping()

conn.indices.put_mapping(index_type1, {'properties': spiderInfo_mapping},[ES_index])  # 在索引pom下创建spiderInfo的_type  "spiderInfo"

conn.indices.put_mapping(index_type2, {'properties': involveVideo_mapping},[ES_index])  # 在索引pom下创建involveVideo的_type  "involveVideo"

conn.indices.put_mapping(index_type3, {'properties': involveCeefax_mapping},[ES_index])  # 在索引pom下创建involveCeefax的_type  "involveCeefax"

conn.indices.put_mapping(index_type4, {'properties': keyWord_mapping}, [ES_index])

conn.indices.put_mapping(index_type5, {'properties': sensitiveWord_mapping}, [ES_index])

# conn.ensure_index

# 创建数据库连接 并返回连接参数

def connect_Oracle(name, password, address):

try:

global conn1

# conn1 = cx_Oracle.connect('c##chenlong','1234567890','localhost:1521/ORCL') #链接本地数据库

conn1 = cx_Oracle.connect(name, password, address)  # 链接远程数据库 "pom","Bohui@123","172.17.7.118:1521/ORCL"

print 'Oracle连接成功'

return conn1

except:

print 'ES数据同步脚本连接不上数据库,请检查connect参数是否正确,或者模块版本是否匹配'

pass

def fetch_account(accountcode):  # 取两个‘_’之间的账号名称

end = accountcode.find('_')

return accountcode[0:end].strip()

# 根据表的个数创建不同的对象

# 从记录文档中读取各个表的记录ID,判断各个表的ID是否有变化

# 分别读取各个表中的相关数据

# 读取各个表的ID与记录的ID(记录在文本或者数据库中)并判断

"""def read_compare_ID():

global tuple_tableName_IdNum

global cur

tuple_tableName_IdNum = {}

tablename = []

cur = conn1.cursor()

result1 = cur.execute("select * from tabs")  ##执行数据库操作 读取各个表名

row = result1.fetchall()

for x in row:

tablename.append(x[0])  # 将表名取出并赋值给tablename数组

result2 = cur.execute('select {}_ID  from {}'.format(x[0], x[0]))

ID_num = result2.fetchall()

tuple_tableName_IdNum[x[0]] = ID_num"""

def readOracle_writeES(tableName, ES_index, index_type):

global cc

cur = conn1.cursor()

#result_AlltableNames = cur.execute("select * from tabs")

result_latestId = cur.execute("select max({}_Id) from {} ".format(tableName,tableName))

num1 = result_latestId.fetchone() #当前表中的最大ID

print '当前表中的最大ID{}'.format(num1[0])

result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName.upper())) #通过数据库表拿到更新的ID tablename 都转化成大写

num2 = result_rememberId.fetchone() #上次记录的更新ID

print '上次记录的更新ID{}'.format(num2[0])

if tableName.upper() == 'T_SOCIAL':

while num2[0]

result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,likeNum,forwardNum,commentNum,accountCode from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))

result_tuple1 = result_readOracle.fetchall()  #之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率

for i in result_tuple1:  #一条一条写入ES,这个速度太慢,改进 通过bulk接口导入

aa= (i[5]+i[6])

bb=  (i[7]+i[8])

if conn.index(

{'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),

'content': unicode(i[3]), 'publishTime': str(i[4]), 'browseNum': aa,

'commentNum':bb, 'dataType':fetch_account(i[9])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0] #如果写入成功才赋值

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))

conn1.commit()

result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读{}写成功".format(tableName,index_type)

if tableName.upper() == 'T_HOTSEARCH':

while num2[0]

result_readOracle = cur.execute("select {}_ID,accountCode,title,publishTime from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))

result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率

for i in result_tuple1:  #一条一条写入ES,这个速度太慢,改进 通过bulk接口导入

if conn.index(

{'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': '','content': '', 'publishTime': str(i[3]), 'browseNum': 0,

'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))

conn1.commit()

result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读{}写成功".format(tableName, index_type)

if tableName.upper() == 'T_VIDEO_HOT':

while num2[0]

result_readOracle = cur.execute("select {}_ID,accountCode,title,Author,publishTime from {} where {}_ID > {} and rownum<=40 ".format(tableName,tableName,tableName,num2[0]))

result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率

for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?

if conn.index(

{'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),

'content': '', 'publishTime': str(i[4]), 'browseNum': 0,

'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))

conn1.commit()

result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读写成功".format(tableName)

if tableName.upper() == 'T_PRESS':

while num2[0]

result_readOracle = cur.execute(

"select {}_ID,accountCode,title,Author,PublishDate,Content from {} where {}_ID > {} and rownum<=40 ".format(

tableName, tableName, tableName, num2[0]))

result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率

for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?

if conn.index(

{'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),

'content': unicode(i[5]), 'publishTime': str(i[4]), 'browseNum': 0,

'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))

conn1.commit()

result_rememberId = cur.execute(

"select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读写成功".format(tableName)

if tableName.upper() == 'T_INDUSTRY':

while num2[0]

result_readOracle = cur.execute(

"select {}_ID,accountCode,title,Author,PublishTime,Content,BrowseNum from {} where {}_ID > {} and rownum<=40 ".format(

tableName, tableName, tableName, num2[0]))

result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率

for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?

if conn.index(

{'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),

'content': unicode(i[5]), 'publishTime': str(i[4]), 'browseNum': i[6],

'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True) : # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))

conn1.commit()

result_rememberId = cur.execute(

"select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读写成功".format(tableName)

if tableName.upper() == 'T_SOCIAL_SITESEARCH':

while num2[0]

result_readOracle = cur.execute('select {}_ID,title,author,content,publishTime,keyWords,browseNum,likeNum,forwardNum,commentNum,url,accountCode from {} where ({}_ID > {})'.format(tableName, tableName, tableName, num2[0]))

result_tuple1 = result_readOracle.fetchmany(50)  #因为数据量太大,超过了变量的内存空间,所以一次性取40条

for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?

popularity = (i[6] + i[7] + i[8] * 2 + i[9] * 2)

if conn.index(

{'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),

'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(i[5]),

'popularity':popularity,'url': i[10],

'dataType':fetch_account(i[11])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))

conn1.commit()

result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读写成功".format(tableName)

if tableName.upper() == 'T_REALTIME_NEWS':

while num2[0]

result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))

result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率

for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?

popularity = (i[5] + i[6] * 2)

if conn.index(

{'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),

'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(''),

'popularity':popularity,'url': i[8],'dataType':fetch_account(i[7])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))

conn1.commit()

result_rememberId = cur.execute(

"select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读{}写成功".format(tableName, index_type)

if tableName.upper() == 'T_KEY_NEWS':

while num2[0]

result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))

result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率

for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?

popularity = (i[5] + i[6] * 2)

if conn.index(

{'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),

'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(''),

'popularity':popularity,'url': i[8],'dataType':fetch_account(i[7])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))

conn1.commit()

result_rememberId = cur.execute(

"select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读{}写成功".format(tableName, index_type)

if tableName.upper() == 'T_LOCAL_NEWS':

while num2[0]

result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))

result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率

for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?

popularity = (i[5] + i[6] * 2)

if conn.index(

{'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),

'content': unicode(i[3]), 'publishTime': str(i[4]), 'keyWords': unicode(''),

'popularity': popularity, 'url': i[8], 'dataType': fetch_account(i[7])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))

conn1.commit()

result_rememberId = cur.execute(

"select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读{}写成功".format(tableName, index_type)

if tableName.upper() == 'T_VIDEO_SITESEARCH':

while num2[0]

result_readOracle = cur.execute("select {}_ID,accountCode,title,Author,publishTime,url,imgUrl,playNum,keyWords from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))

result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率

for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?

if conn.index(

{

'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]), 'author': unicode(i[3]),

'summary': unicode('0'), 'publishTime': str(i[4]), 'browseNum': i[7],'url':i[5],'imgUrl':i[6],'ranking':0,

'playNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))

conn1.commit()

result_rememberId = cur.execute(

"select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读{}写成功".format(tableName,index_type)

if tableName.upper() == 'T_BASE_KEYWORDS':

while num2[0]

result_readOracle = cur.execute('select {}_ID,keywords from {} where {}_ID > {} and rownum<=50'.format(tableName, tableName, tableName, num2[0]))

result_tuple1 = result_readOracle.fetchall()  #因为数据量太大,超过了变量的内存空间,所以一次性取40条

for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?

if conn.index({'id': i[0], 'keywords': i[1]}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo

cc += 1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))

conn1.commit()

result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读写成功".format(tableName)

if tableName.upper() == 'T_BASE_SENSITIVEWORDS':

while num2[0]

result_readOracle = cur.execute('select {}_ID,SensitiveType,SensitiveTopic,SensitiveWords from {} where {}_ID > {} and rownum<=50'.format(tableName, tableName, tableName,num2[0]))

result_tuple1 = result_readOracle.fetchall()  # 因为数据量太大,超过了变量的内存空间,所以一次性取40条

for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?

if conn.index({'id':i[0],

'sensitiveType':unicode(i[1]),

'sensitiveTopic': unicode(i[2]),

'sensitiveWords':unicode(i[3])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo

cc +=1

print 'bulk导入后的ID:{}'.format(i[0])

rememberId = i[0]

cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))

conn1.commit()

result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID

num2 = result_rememberId.fetchone()

print "{}读写成功".format(tableName)

else:

pass

def ww(a):

while True:

print a

time.sleep(0.5)  #用于多线程的一个实验函数

if __name__ == "__main__":

cc = 0

connect_ES('172.17.5.66:9200')

# conn.indices.delete_index('_all')  # 清除所有索引

create_ESindex("pom", "spiderInfo", "involveVideo", "involveCeefax","keyWord","sensitiveWord")

connect_Oracle("pom", "Bohui@123", "172.17.7.118:1521/ORCL")

# thread.start_new_thread(readOracle_writeES,("T_SOCIAL","pom","spiderInfo"),)#创建一个多线程

# thread.start_new_thread(readOracle_writeES,("T_SOCIAL_SITESEARCH", "pom", "spiderInfo"),)#创建一个多线程

mm = time.clock()

readOracle_writeES("T_SOCIAL", "pom", "spiderInfo") #表名虽然在程序中设置了转化为大写,但是还是全大写比较好

readOracle_writeES("T_HOTSEARCH", "pom", "spiderInfo")

readOracle_writeES("T_VIDEO_HOT", "pom", "spiderInfo")

readOracle_writeES("T_PRESS", "pom", "spiderInfo")

readOracle_writeES("T_INDUSTRY", "pom", "spiderInfo")

readOracle_writeES("T_VIDEO_SITESEARCH", "pom", "involveVideo")

readOracle_writeES("T_REALTIME_NEWS", "pom", "involveCeefax")

readOracle_writeES("T_KEY_NEWS", "pom", "involveCeefax")

readOracle_writeES("T_LOCAL_NEWS", "pom", "involveCeefax")

readOracle_writeES("T_SOCIAL_SITESEARCH", "pom", "involveCeefax")

readOracle_writeES("T_BASE_KEYWORDS", "pom", "keyWord")

readOracle_writeES("T_BASE_SENSITIVEWORDS", "pom", "sensitiveWord")

nn = time.clock()

# conn.indices.close_index('pom')

conn1.close()

print '数据写入耗时:{}  成功写入数据{}条'.format(nn-mm,cc)

#实验多线程

"""

while a

conn.index(

{'tableName': 'T_base_account', 'type': '1', 'tableId': '123', 'title': unicode('陈龙'), 'author': 'ABC',

'content': 'ABC', 'publishTime': '12:00:00', 'browseNum': '12', 'commentNum': '12', 'dataType': '1'},

"pom", "spiderInfo", )  # 将数据写入索引pom的spiderInfo

a += 1

print time.ctime()

"""

"""

threads = []

t1 = threading.Thread(target=readOracle_writeES,args=("T_SOCIAL","pom","spiderInfo"))

threads.append(t1)

#t3 = threading.Thread(target=ww,args=(10,))

#threads.append(t3)

#t2 = threading.Thread(target=readOracle_writeES,args=("T_SOCIAL_SITESEARCH", "pom", "spiderInfo"))

#threads.append(t2)

print time.ctime()

for t in threads:

t.setDaemon(True)

t.start()

t.join()

"""

es和oracle,Oracle和Elasticsearch数据同步相关推荐

  1. 如何实现Oracle数据库之间的数据同步?

    我们都知道,在Oracle数据库的管理与开发工作中,总会存在着一些表数据和基础资料数据,这时需要有效的将这些数据库进行同步合并,有没有什么简单的方法可以实现Oracle数据库之间的数据同步呢?在此诚恺 ...

  2. redis和oracle同步方案,redis与oracle之间怎么实现数据同步?

    redis与oracle之间怎么实现数据同步? 更新时间:2019-03-12 16:02 最满意答案 没有直接同步的方法,这个依赖于你的架构设计. 插入时同步,比如先更新了oracle,再更新red ...

  3. elasticsearch 数据类型_基于 MySQL Binlog 的 Elasticsearch 数据同步实践

    来源;马蜂窝 一.背景 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存 ...

  4. 基于 MySQL Binlog 的 Elasticsearch 数据同步实践

    一.为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存储业务数 ...

  5. 基于 MySQL Binlog 的 Elasticsearch 数据同步实践 原

    一.背景 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存储业务数据可以 ...

  6. Oracle到MySQL实时数据同步CloudCanal实战

    简述 CloudCanal 2.1.0.x 版本开始支持 Oracle 作为源端的数据迁移同步能力,目前邀请测试中. 本文通过 Oracle 到 MySQL 的数据迁移同步案例简要介绍这个源端的能力. ...

  7. 使用kafka connect 实现从oracle到kafka的数据同步

    1.登陆Oracle: [oracle@localhost ~]$ lsnrctl status [oracle@localhost ~]$ lsnrctl start [oracle@localho ...

  8. oracle两表同步java代码,利用DBLink+JOB实现两个Oracle数据库之间的数据同步

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 第三步:建立JOB任务,定时同步数据在PL/SQL的command window输入以下语句: begin sys.dbms_job.submit(job ...

  9. 两个oracle数据库外网同步,利用DBLink+JOB实现两个Oracle数据库之间的数据同步

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 第三步:建立JOB任务,定时同步数据在PL/SQL的command window输入以下语句: begin sys.dbms_job.submit(job ...

最新文章

  1. 纪念数学家、系统与控制学家关肇直院士《泛函分析》
  2. java小程序源码_【小程序源码分享】基于Java开发的物业管理系统!
  3. Java ist reverse_charist.js响应
  4. WEBBASE篇: 第五篇, CSS知识3
  5. 机器学习案例——生态系统蒸散速率预测
  6. ssl1746-商务旅行【tarjan,LCA】
  7. gdb调试的基本使用
  8. linux挂接u盘视频,LINUX挂接U盘
  9. 比较牛逼的答题卡扫描算法
  10. sql 如何查询上次的记录_学会SQL并不难,小白学习记录之五(多表查询)
  11. java操作地理位置信息
  12. window7安装sqlserver2000企业版
  13. 修改linux下默认的python版本
  14. 【C语言】指针的算术运算
  15. QQ登录之后自动弹出“QQ网吧”怎么屏蔽?
  16. 这些题你hold住吗?
  17. 每周一喂丨图说WiFi安全
  18. 【Python3】抓取Github吉祥物Octocat昵图并下载到本地
  19. 关于STM32F105/107时钟配置详解
  20. 力天创见智慧商业解决方案

热门文章

  1. ❤️Spring的声明式事务
  2. npm install 时候报错 gifsicle@5.2.0 postinstall: `node lib/install.js`
  3. matlab 贝叶斯信息标准_Matlab中贝叶斯(bayes)分类器实现分类
  4. sql server insert 锁表_SQL Server的insert执行的秘密(下) 带外键的insert分析
  5. Halcon 4点单标相机外参
  6. vue cli 解决跨域 线上 nginx 反向代理配置
  7. LR:Code-29723 Error: Failed to deliver a p2p message from parent to child process, reason。。。
  8. HTML5的新的结构元素介绍
  9. 【转载】Java NIO学习
  10. linux 适配电脑内核,Linux内核实践 如何添加网络协议[三]:实现 -电脑资料