一、需求分析

注意: 本环境使用 elasticsearch 7.0版本开发,切勿低于此版本

mysql 表结构

有一张表,记录的数据特别的多,需要将7天前的记录,插入到Elasticsearch中,并删除原有表7天前的记录。

表结构如下:

CREATE TABLE `historic_records` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`user_id` varchar(50) NOT NULL DEFAULT '' COMMENT '用户id',`time` bigint(20) NOT NULL DEFAULT '0' COMMENT '上线/下线时间',`create_time` bigint(20) NOT NULL DEFAULT '0' COMMENT '创建时间',`update_time` bigint(20) NOT NULL DEFAULT '0' COMMENT '更新时间',`online_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '在线状态 默认1 0 离线 1 在线',`status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '软删除标志:0-已删除;1-正常',PRIMARY KEY (`id`),KEY `user_id` (`user_id`),KEY `order_index` (`time`,`create_time`,`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='历史记录表';

View Code

查询sql:

select * from historic_records where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000

删除sql:

delete from historic_records where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000

ES中的一些概念

index(索引)

相当于mysql中的数据库

type(类型)

相当于mysql中的一张表

document(文档)

相当于mysql中的一行(一条记录)

field(域)

相当于mysql中的一列(一个字段)

节点

一个服务器,由一个名字来标识

集群

一个或多个节点组织在一起

分片

将一份数据划分为多小份的能力,允许水平分割和扩展容量。多个分片可以响应请求,提高性能和吞吐量。

副本

复制数据,一个节点出问题时,其余节点可以顶上。

倒排索引

可参考 https://www.elastic.co/guide/cn/elasticsearch/guide/current/inverted-index.html

es数据结构

设定映射,规定好各个字段及其数据类型,便于es更好地进行管理。根据mysql表结构,映射如下:

# 创建映射
_index_mappings = {"settings": {"index": {"number_of_shards": 3,"number_of_replicas": 1}},"mappings": {# self.index_type : {"properties": {"id": {"type": "long"},"loid": {"type": "keyword"},"mac": {"type": "keyword"},"time": {"type": "date","format": "epoch_millis"},"create_time": {"type": "date","format": "epoch_millis"},"update_time": {"type": "date","format": "epoch_millis"},"online_status": {"type": "short"},"status": {"type": "short"}}# }
    }
}

View Code

解释:

索引设置,都在 settings{...} 中

number_of_shards
每个索引的主分片数,默认值是 5 。这个配置在索引创建后不能修改。

number_of_replicas
每个主分片的副本数,默认值是 1 。对于活动的索引库,这个配置可以随时修改。

映射配置,都在mappings{...} 中

属性设置,都在 properties{...} 中

Elasticsearch 支持 如下简单域类型:

  • 字符串: string
  • 整数 : byteshortintegerlong
  • 浮点数: floatdouble
  • 布尔型: boolean
  • 日期: date

仔细看上面的mysql 表结构

由于 id 的类型是 bigint(20),那么在es中就是 long,表示长整形。

user_id 的类型是 varchar(50) ,在es中,有2中,分别是 text和 keyword。

这2种,是有区别的。text 会创建全文索引,支持模糊搜索。而keyword则不会,必须精确搜索才行。

由于 user_id不需要模糊搜索,因此 设置 keyword才是合理的。

create_time 虽然类型是 bigint(20),但是它存储在mysql里面,表示时间戳。

因此es中就是data,时间格式为:epoch_millis,表示微秒时间戳。

online_status 的类型是tinyint(1),在es中是 short,表示短的数字

三、elasticsearch和kibana搭建

elasticsearch

新建目录elasticsearch

mkdir /opt/elasticsearch-7.1.1

目录结构如下:

./
├── dockerfile
├── elasticsearch-7.1.1-amd64.deb
├── run.sh
└── sources.list

dockerfile

FROM ubuntu:16.04
# 修改更新源为阿里云
ADD sources.list /etc/apt/sources.list
ADD elasticsearch-7.1.1-amd64.deb ./
# 安装jdk和elasticsearch
RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all && dpkg -i elasticsearch-7.1.1-amd64.deb && rm -rf elasticsearch-7.1.1-amd64.deb
EXPOSE 9200
# 添加启动脚本
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]

View Code

run.sh

#!/bin/bash
set -e# 添加时区
TZ=Asia/Shanghai
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone# 覆盖配置文件
cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak
echo "transport.host: localhost
transport.tcp.port: 9300
http.port: 9200
network.host: 0.0.0.0" >> /etc/elasticsearch/elasticsearch.yml

# 修改启动文件,去掉-d参数,避免后台运行
sed -i 72's@-d -p $PID_FILE@-p $PID_FILE@g' /etc/init.d/elasticsearch# 启动elasticsearch,要hold住,否则容器启动就退出了!
/etc/init.d/elasticsearch start

View Code

sources.list

deb http://mirrors.aliyun.com/ubuntu/ xenial main restricted
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main restricted
deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb http://mirrors.aliyun.com/ubuntu/ xenial multiverse
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates multiverse
deb http://mirrors.aliyun.com/ubuntu/ xenial-backports main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu xenial-security main restricted
deb http://mirrors.aliyun.com/ubuntu xenial-security universe
deb http://mirrors.aliyun.com/ubuntu xenial-security multiverse

View Code

生成镜像

docker build -t elasticsearch-7.1.1 .

启动容器

docker run -d -it --restart=always -p 9200:9200 elasticsearch-7.1.1

访问页面

kibana

新建目录kibana

mkdir /opt/kibana-7.1.1

目录结构如下:

./
├── dockerfile
├── kibana-7.1.1-amd64.deb
└── run.sh

dockerfile

FROM ubuntu:16.04
ADD kibana-7.1.1-amd64.deb ./
# 安装jdk和elasticsearch
RUN dpkg -i kibana-7.1.1-amd64.deb && rm -rf kibana-7.1.1-amd64.deb
EXPOSE 5601
# 添加启动脚本
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]

View Code

run.sh

#!/bin/bash# 添加时区
TZ=Asia/Shanghai
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone#elasticsearch="192.168.91.128"
if [ -z $elasticsearch ];thenecho "elasticsearch参数为空!比如: 192.168.91.128"exit
fi# 修改配置文件
# 修改监听地址
sed -i '7s@#server.host: "localhost"@server.host: "0.0.0.0"@g' /etc/kibana/kibana.yml
# 删除行,并添加一行内容
sed -i '28d' /etc/kibana/kibana.yml
sed -i "N;28 i elasticsearch.hosts: ["http://$elasticsearch:9200"]" /etc/kibana/kibana.yml

# 启动
/usr/share/kibana/bin/kibana "-c /etc/kibana/kibana.yml"

View Code

生成镜像

docker build -t kibana-7.1.1 .

启动镜像

docker run -d -it --restart=always -p 5601:5601 -e elasticsearch=192.168.10.104 kibana-7.1.1

访问页面

二、查询mysql数据

为了方便操作 mysql,封装了一个mysql工具类,用来查询和更新数据。

mysql.py

#!/usr/bin/env python3
# coding: utf-8import pymysqlclass Mysql(object):# mysql 端口号,注意:必须是int类型def __init__(self,host,user,passwd,db_name,port=3306):self.host = hostself.user = userself.passwd = passwdself.db_name = db_nameself.port = portdef select(self,sql):"""执行sql命令:param sql: 命令:return: 元祖"""try:conn = pymysql.connect(host=self.host,user=self.user,passwd=self.passwd,port=self.port,database=self.db_name,charset='utf8',cursorclass=pymysql.cursors.DictCursor)cur = conn.cursor()  # 创建游标cur.execute(sql)  # 执行sql命令res = cur.fetchall()  # 获取执行的返回结果
            cur.close()conn.close()  # 关闭mysql 连接return resexcept Exception as e:print(e)return Falsedef update(self,sql):"""更新操作,比如insert, delete,update:param sql: sql命令:return: bool"""try:conn = pymysql.connect(host=self.host,user=self.user,passwd=self.passwd,port=self.port,database=self.db_name,)cur = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 创建游标# conn.cursor()# print("ip: {} insert 执行命令: {}".format(self.host,sql))sta = cur.execute(sql)  # 执行sql命令,返回影响的行数# print("sta",sta,type(sta))#res = cur.fetchall()  # 获取执行的返回结果if isinstance(sta,int):  # 判断返回结果, 是数字就是正常的#print('插入记录 Done')pass# write_log('正常,远程执行sql: %s 成功'%sql, "green")else:write_log('错误,远程执行sql: %s 失败'%sql, "red")return Falseconn.commit()  # 主动提交,否则执行sql不生效
            cur.close()conn.close()  # 关闭mysql 连接return staexcept Exception as e:print(e)# write_log('错误,远程mysql执行命令: {} 异常'.format(sql), "red")return False

View Code

使用时,就简单了。导入这个类,调用相关方法。

mysql_test.py

from mysql import Mysqlhost = "192.168.0.179"
user = "sdn_db"
passwd = "Sdn@ujmyhn"
db_name = "terminalservice"
port = 3306sql = "select * from terminals_record_0 where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000"
res = Mysql(host,user,passwd,db_name,port).select(sql)
print(res)

View Code

三、完整代码

由于时间关系,代码不一一解释了。附上完整代码:

./
├── conf.py
├── es_bulk.py
├── README.md
├── requirements.txt
└── utils├── common.py└── mysql.py

conf.py

#!/usr/bin/env python3
# coding: utf-8
"""
配置文件,用于mysql和elasticsearch
"""
import os
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # 项目根目录# mysql
HOST = "192.168.0.136"
USER = "root"
PASSWD = "123456"
DB_NAME = "terminal"
PORT = 3306# elasticsearch
INDEX_NAME = "historic_records"
INDEX_TYPE = "_doc"
ES_IP = "192.169.3.133"MAXIMUM = 100  # 一次性插入多少条

View Code

es_bulk.py

#!/usr/bin/env python3
# coding: utf-8import time
from elasticsearch import Elasticsearch
from elasticsearch import helpersimport conf
from utils.mysql import Mysql
from utils.common import write_log,valid_ip,check_tcpclass ElasticObj:def __init__(self,timeout=3600):''':param timeout: 超时时间'''self.index_name = conf.INDEX_NAME  # 索引名称self.index_type = conf.INDEX_TYPE  # 索引类型self.es_ip = conf.ES_IP  # es ip# 无用户名密码状态self.es = Elasticsearch([self.es_ip], port=9200, timeout=timeout)# 用户名密码状态# self.es = Elasticsearch([self.es_ip], http_auth=('esadm', 'mdase123'), port=9200, timeout=timeout)def create_index(self):'''创建索引:return: bool'''# 创建映射_index_mappings = {# 索引配置"settings": {"index": {"number_of_shards": 3,  # 分片数"number_of_replicas": 1  # 副本数
                }},# 设置字段"mappings": {"properties": {"id": {"type": "long"},"loid": {"type": "keyword"},"mac": {"type": "keyword"},"time": {"type": "date","format": "epoch_millis"},"create_time": {"type": "date","format": "epoch_millis"},"update_time": {"type": "date","format": "epoch_millis"},"online_status": {"type": "short"},"status": {"type": "short"}}}}# 判断索引不存在时if self.es.indices.exists(index=self.index_name) is not True:# 创建索引res = self.es.indices.create(index=self.index_name, body=_index_mappings)# print(res)if not res:write_log("错误,创建索引{}失败".format(self.index_name),"red")return Falsewrite_log("正常,创建索引{}成功".format(self.index_name), "green")return Trueelse:write_log("正常,索引{}已存在".format(self.index_name), "green")return Truedef bulk_insert(self,table,data_list):"""批量写入数据:param table: 表名:param data_list: 数据列表[{'online_status': 1,'update_time': 1556073035327,'create_time': 1556073035327,'id': 1, 'status': 1,'time': 1556073035327,'loid': '100010000123','mac': '60:45:cb:87:c9:93'},...]:return: bool"""# 批量插入start_time = time.time()  # 开始时间actions = []  # 临时数据列表i = 0  # 计数值try:# 循环数据列表for data in data_list:action = {"_index": self.index_name,"_type": self.index_type,#"_id": i,  #_id 也可以默认生成,不赋值"_source": {'id': data['id'],'user_id': data['user_id'],'time': data['time'],'create_time': data['create_time'],'online_status': data['online_status'],'status': data['status'],}}i += 1actions.append(action)  # 添加到列表if len(action) == conf.MAXIMUM:  # 列表数量达到100时helpers.bulk(self.es, actions)  # 批量插入数据del actions[0:len(action)]  # 删除列表元素if i > 0:  # 不足100时,插入剩余数据
                helpers.bulk(self.es, actions)end_time = time.time()  # 结束时间t = round((end_time - start_time),2)  # 计算耗时# print('本次共写入{}条数据,用时{}s'.format(i, t))write_log("正常,{} 表写入ES {}条数据,用时{}s".format(table,i, t), "green")return Trueexcept Exception as e:print(e)return Falsedef has_table(self,db_name,target_table):"""远程表是否存在:return: bool"""mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)sql = "select count(1) from {}.{}".format(db_name, target_table)res = mysql_obj.select(sql)# print("表是否存在",res,type(res))if res is False:write_log("错误,远程表 {}.{} 不存在".format(db_name,target_table),"red")return Falseelse:return Truedef has_conf(self):"""判断配置文件中的mysql和es 端口是否正常:return:"""if not valid_ip(conf.HOST):write_log("错误,MySQL IP配置不正确","red")return Falseif not valid_ip(conf.ES_IP):write_log("错误,ES IP配置不正确","red")return Falseif not check_tcp(conf.HOST,conf.PORT):write_log("错误,MySQL {} 端口不可达".format(conf.PORT),"red")return Falseif not check_tcp(conf.ES_IP,9200):write_log("错误,ES 9200 端口不可达","red")return Falsereturn Truedef read_mysql_es(self):"""读取7天的记录,并写入es:return: bool"""# 判断配置文件中的mysql和es 端口是否正常if not self.has_conf():# print(1)return False# 创建索引if self.create_index() is False:# print(2)return Falsemax = conf.MAXIMUM  # 一次性查询多少条
        flag_list = []  # 标志位列表mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)for i in range(64):  # 写入64张表# 判断表是否存在res = self.has_table(conf.DB_NAME,'historic_record_%s'%i)if not res:flag_list.append(False)return Falseid = 0  # 每一次查询后的最大idwhile True:# 查询数据sql = "select * from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (i, id, max)# print(sql)data_list = mysql_obj.select(sql)# print(data_list)if not data_list:  # 当结果为空时,结束循环write_log("警告,执行sql: %s 记录为空,无需写入es" %(sql), "yellow")break  # 跳出循环
last_row = data_list[-1]  # 最后一行记录# print(last_row)id = last_row['id']  # 修改最大id
res = self.bulk_insert('historic_record_%s' % i, data_list)if not res:write_log("错误,historic_record_%s 写入ES 失败"%i,"red")flag_list.append(False)return Falseif False in flag_list:write_log("错误,historic_record 部分表写入ES错误,请查看上文","red")return Falsewrite_log("正常,historic_record 64张表全部写入ES成功", "green")return Truedef delete_record(self):"""删除7天的表数据:return: bool"""max = conf.MAXIMUM  # 一次性查询多少条flag_list = []mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)for i in range(64):  # 64张表# 判断表是否存在res = self.has_table(conf.DB_NAME, 'historic_record_%s' % i)if not res:flag_list.append(False)return False### 先查询数据id = 0  # 每一次查询后的最大idwhile True:# 查询数据sql = "select * from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (i, id, max)# print(sql)data_list = mysql_obj.select(sql)# print(data_list)if not data_list:  # 当结果为空时,结束循环write_log("警告,执行sql: %s 记录为空,无需删除" % sql, "yellow")break  # 跳出循环### 再删除数据sql = "delete from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (i, id, max)# print(sql)res = mysql_obj.update(sql)if res is False:write_log("错误,删除 historic_record_%s 记录失败" % i, "red")flag_list.append(False)breakelse:write_log("正常,删除 historic_record_%s 记录成功" % i, "green")last_row = data_list[-1]  # 最后一行记录# print(last_row)id = last_row['id']  # 修改最大idif False in flag_list:write_log("错误,删除 historic_record 部分表失败,请查看上文", "red")return Falsewrite_log("正常,删除 historic_record 64张表记录全部成功", "green")def main(self):self.read_mysql_es()self.delete_record()ElasticObj().main()  # 执行主程序

View Code

common.py

#!/usr/bin/env python3
# coding: utf-8
"""
共有的方法
"""import sys
import iodef setup_io():  # 设置默认屏幕输出为utf-8编码sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
setup_io()import os
import time
import conf
import socket
import subprocess
import ipaddress
from multiprocessing import cpu_countdef write_log(content,colour='white',skip=False):"""写入日志文件:param content: 写入内容:param colour: 颜色:param skip: 是否跳过打印时间:return:"""# 颜色代码colour_dict = {'red': 31,  # 红色'green': 32,  # 绿色'yellow': 33,  # 黄色'blue': 34,  # 蓝色'purple_red': 35,  # 紫红色'bluish_blue': 36, # 浅蓝色'white': 37,  # 白色
    }choice = colour_dict.get(colour)  # 选择颜色
path = os.path.join(conf.BASE_DIR,"output.log") # 日志文件with open(path, mode='a+', encoding='utf-8') as f:if skip is False:  # 不跳过打印时间时content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + contentinfo = "\033[1;{};1m{}\033[0m".format(choice, content)print(info)f.write(content+"\n")def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list:param cmd: linux命令:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒:param skip: 是否跳过超时:return: list"""p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)# print(p)# timeout = 1  # 超时时间t_beginning = time.time()  # 开始时间# seconds_passed = 0  # 执行时间while True:if p.poll() is not None:breakseconds_passed = time.time() - t_beginningif timeout and seconds_passed > timeout:p.terminate()# raise TimeoutError(cmd, timeout)if not skip:# self.res.code = 500# print('命令: {},执行超时!'.format(cmd))write_log('错误, 命令: {},本地执行超时!'.format(cmd),"red")# return self.res.__dict__return False# return '命令: {},执行超时!'.format(cmd)# result = p.stdout.read().decode('utf-8').strip()  # 命令运行结果# print("result",result)# self.res.data = result# return self.res.__dict__result = p.stdout.readlines()return resultdef valid_ip(ip):"""验证ip是否有效,比如192.168.1.256是一个不存在的ip:return: bool"""try:# 判断 python 版本if sys.version_info[0] == 2:ipaddress.ip_address(ip.strip().decode("utf-8"))elif sys.version_info[0] == 3:# ipaddress.ip_address(bytes(ip.strip().encode("utf-8")))
            ipaddress.ip_address(ip)return Trueexcept Exception as e:print(e)return Falsedef check_tcp(ip, port, timeout=1):"""检测tcp端口:param ip: ip地址:param port: 端口号:param timeout: 超时时间:return: bool"""flag = Falsetry:socket.setdefaulttimeout(timeout)  # 整个socket层设置超时时间cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)address = (str(ip), int(port))status = cs.connect_ex((address))  # 开始连接
        cs.settimeout(timeout)if not status:flag = Truereturn flagexcept Exception as e:print(e)return flagCOROUTINE_NUMBER = cpu_count()  # 协程池数量,根据cpu核心数来开,避免cpu飙高

View Code

mysql.py

#!/usr/bin/env python3
# coding: utf-8import pymysql
from utils.common import write_logclass Mysql(object):# mysql 端口号,注意:必须是int类型def __init__(self,host,user,passwd,db_name,port=3306):self.host = hostself.user = userself.passwd = passwdself.db_name = db_nameself.port = portdef select(self,sql):"""执行sql命令:param sql: 命令:return: 元祖"""try:# print(host,self.user,self.passwd,self.port,self.db_name)conn = pymysql.connect(host=self.host,user=self.user,passwd=self.passwd,port=self.port,database=self.db_name,charset='utf8',cursorclass=pymysql.cursors.DictCursor)cur = conn.cursor()  # 创建游标# conn.cursor()cur.execute(sql)  # 执行sql命令res = cur.fetchall()  # 获取执行的返回结果
            cur.close()conn.close()  # 关闭mysql 连接return resexcept Exception as e:print(e)return Falsedef update(self,sql):"""更新操作,比如insert, delete,update:param sql: sql命令:return: bool"""try:conn = pymysql.connect(host=self.host,user=self.user,passwd=self.passwd,port=self.port,database=self.db_name,)cur = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 创建游标# conn.cursor()# print("ip: {} insert 执行命令: {}".format(self.host,sql))sta = cur.execute(sql)  # 执行sql命令,返回影响的行数# print("sta",sta,type(sta))#res = cur.fetchall()  # 获取执行的返回结果if isinstance(sta,int):  # 判断返回结果, 是数字就是正常的#print('插入记录 Done')pass# write_log('正常,远程执行sql: %s 成功'%sql, "green")else:write_log('错误,远程执行sql: %s 失败'%sql, "red")return Falseconn.commit()  # 主动提交,否则执行sql不生效
            cur.close()conn.close()  # 关闭mysql 连接#Migration.flag_list.append(True)return staexcept Exception as e:print(e)# write_log('错误,远程mysql执行命令: {} 异常'.format(sql), "red")# Migration.flag_list.append(False)return False

View Code

requirements.txt

PyMySQL==0.9.2
elasticsearch==6.3.1

README.md

## 说明
终端历史记录表,写入到elasticsearch中。主要将(terminal.historic_record_0~63) 这64张表的7天前数据写入到elasticsearch中并删除 64张表的7天前记录`注意: 本环境使用 elasticsearch 7.0版本开发,切勿低于此版本`## 配置说明
`conf.py` 是环境配置主要修改 以下信息
```python
# mysql
HOST = "192.168.0.136"
USER = "root"
PASSWD = "123456"
DB_NAME = "terminal"
PORT = 3306# elasticsearch
INDEX_NAME = "historic_record"
INDEX_TYPE = "_doc"
ES_IP = "192.169.3.133"
```请根据实际情况修改以上变量## 运行说明
## 一键执行,迁移相关所有表
`python es_bulk.py`## 查看结果
结果会输出到`output.log`文件,直接查看即可!登录到`kibana`,查看数据是否存在<br/>
<br/>
Copyright (c) 2019-present, xiao You

View Code

注意:如果是es 6.x的版本,创建索引,需要增加 index_type,否则会报错。

比如:

# 创建映射
_index_mappings = {# 索引配置"settings": {"index": {"number_of_shards": 3,  # 分片数"number_of_replicas": 1  # 副本数
        }},# 设置字段"mappings": {self.index_type: {"properties": {"id": {"type": "long"},"loid": {"type": "keyword"},"mac": {"type": "keyword"},"time": {"type": "date","format": "epoch_millis"},"create_time": {"type": "date","format": "epoch_millis"},"update_time": {"type": "date","format": "epoch_millis"},"online_status": {"type": "short"},"status": {"type": "short"}}}}
}

本文参考链接:

https://www.cnblogs.com/aaanthony/p/7380662.html

https://blog.csdn.net/m0_37673307/article/details/81153700

转载于:https://www.cnblogs.com/xiao987334176/p/10885295.html

python MySQL 插入Elasticsearch相关推荐

  1. Python MySQL 插入表

    Python MySQL 插入表 - 吴吃辣 - 博客园 Python MySQL 插入表 章节 Python MySQL 入门 Python MySQL 创建数据库 Python MySQL 创建表 ...

  2. python mysql 插入_Python向MySQL插入数据

    我在尝试使用python连接器将值插入mysql时遇到了一个问题. 问题是,我试图在mysql中将输入作为值传递,但是输入被添加为表的名称而不是字段的值.谁能让我现在做错了什么?在 我的代码是:imp ...

  3. python+mysql插入数据

    mysql数据处理之插入数据 目的 实现数据插入到mysql,比如我们需要插入1w+数据到数据,可以通过这种方式插入,或者也可以将python 自动化的数据,报存到数据库中. 1.连接数据库 impo ...

  4. Python MySQL示例教程

    Welcome to Python MySQL example tutorial. MySQL is one of the most widely used database and python p ...

  5. ❤️ 万字Python MySQL从入门到精通详细教程❤️ 再也不用担心学不会数据库了❤️

    文章目录 前言 ⭐集合三万字基础教程⭐ 一.SQL详细教程 二.mysql入门详细教程 ⭐转python mysql⭐ 三.Python MySQL入门连接 3.1基本环境准备 3.2连接 四.Pyt ...

  6. python mysql批量insert数据、返回id_Python3 操作 MySQL 插入一条数据并返回主键 id的实例...

    Python 中貌似并没有直接返回插入数据 id 的操作(反正我是没找到),但是我们可以变通一下,找到最新插入的数据 #!/usr/bin/env python3 # -*- coding: UTF- ...

  7. 【python解决SQLAlchemy+MySQL插入数据时报警告Warning: (1366, “Incorrect string value: ‘\\xD6\\xD0\\xB9\\xFA\\xB】

    python解决SQLAlchemy+MySQL插入数据时报警告Warning: (1366, "Incorrect string value: '\xD6\xD0\xB9\xFA\xB1\ ...

  8. Python连接mysql,插入数据时不报错,但是没有插入进去

    Python连接mysql,插入数据时不报错,但是没有插入进去在connect方法中,设置 autocommit =True conn=pymysql.connect(host=host_db,use ...

  9. python操作mysql插入数据

    python操作mysql插入数据 首先安装pymysql这个库 pycharm连接数据库 操作mysql语句 连接数据库 插入数据 由于有时候,数据存在excel表格中,需要借助python去读取数 ...

最新文章

  1. .NET下数据访问层+webform前台 技术大比拼
  2. 判断力有时候有多么重要?懂得选择多么重要?
  3. fiddler 工具作用和使用场景
  4. 热烈庆祝排名进入5000
  5. Windows开机运行程序
  6. Linux卸载JDK的方法
  7. 酷派5890刷recovery详细教程
  8. DJ4 组合逻辑电路与138译码器
  9. revit二次开发概念_Revit二次开发那些事儿
  10. 小游戏---java版2048(2048 go go go)
  11. 虚拟服务器开启打印端口号,打印机服务器虚拟端口设置方法
  12. 计算机不能正常开机怎么处理,电脑不能正常启动的原因和处理步骤
  13. python学习笔记(八)传递任意数量的实参
  14. Unity游戏开发程序员学习线路图及技能提升指南
  15. 计算机专业优势及就业前景,女生学习计算机专业的优势及就业前景
  16. 三款正射图合并软件性能对比
  17. 航空中的QNH QNE QFE分别都是什么
  18. 2019中兴校招流程回顾总结
  19. java生成短信验证码_Java随机生成手机短信验证码的方法
  20. 人到中年——IT男择业感悟

热门文章

  1. 使用com.sun.imageio.plugins.png.PNGMetadata读取图片的元数据 1
  2. Jerry的ABAP, Java和JavaScript乱炖
  3. java异常的总接口_重构:Java特别的接口修改:在throws子句中添加一个异常?
  4. python可以写安卓应用吗_python可以编写android程序吗?
  5. 最长递增子序列Python解法
  6. oracle 日志切换太频繁,关于oracle日志切换的问题
  7. c语言double字母,C语言double和float 实例分析
  8. c++ 私有内部类_Java内部类新解,你没有见过的船新版本
  9. 处于计算机学科的基础地位,谈谈离散数学在计算机学科中的地位和作用(原稿)...
  10. MATLAB 长度和像素_MATLAB——单车道NaSch模型