inflxudb 实时数据库,用来存储大量的实时数据,经过测试十万条数据存储只需要七八十毫秒,速度非常快,查询速度也非常好,吞吐量极高,并且本身具有归档功能(即:编写cq连续查询,可以取一段时间的平均值,自动存储),这样为数据存储/统计,非常友好.

与传统数据库比较

绿色版,无须安装一件运行

链接:https://pan.baidu.com/s/1BxWXkgDhyOFWXbJ2C5nQ7g

提取码:o6l0

概念      MySQL      InfluxDB

数据库(同)  database    database

表(不同)  table  measurement

列(不同)  column  tag(带索引的,非必须)、field(不带索引)、timestemp(唯一主键)

retention policy:数据存储策略(默认策略为autogen)InfluxDB没有删除数据操作,规定数据的保留时间达到清除数据的目的;

常用命令

-- 查看所有的数据库

show databases;

-- 使用特定的数据库

use database_name;

-- 查看所有的measurement

show measurements;

-- 查询10条数据

select*frommeasurement_namelimit10;

-- 数据中的时间字段默认显示的是一个纳秒时间戳,改成可读格式

precisionrfc3339;-- 之后再查询,时间就是rfc3339标准格式

-- 或可以在连接数据库的时候,直接带该参数

influx -precisionrfc3339

-- 查看一个measurement中所有的tag key

show tag keys

-- 查看一个measurement中所有的field key

show field keys

-- 查看一个measurement中所有的保存策略(可以有多个,一个标识为default)

show retention policies;

-- 查看当前数据库的Retention Policies

SHOW RETENTION POLICIESON"testDB"

-- 创建新的Retention Policies

CREATERETENTION POLICY"rp_name"ON"db_name"DURATION30d REPLICATION1DEFAULT

-- rp_name:策略名

-- db_name:具体的数据库名

-- 30d:保存30天,30天之前的数据将被删除

-- 它具有各种时间参数,比如:h(小时),w(星期)

-- REPLICATION 1:副本个数,这里填1就可以了

-- DEFAULT 设为默认的策略

-- 修改策略

ALTERRETENTION POLICY"2_hours"ON"telegraf"DURATION4h DEFAULT

-- 删除策略

dropretention POLICY"2_hours"ON"telegraf"

-- # 创建表

-- # 直接在插入数据的时候指定表名(weather就是表名)

insertweather,altitude=1000,area=北 temperature=11,humidity=-4

-- # 删除表

DROPMEASUREMENT"measurementName"

--  显示用户

SHOW USERS

-- # 创建用户

CREATEUSER"username"WITH PASSWORD'password'

-- # 创建管理员权限的用户

CREATEUSER"username"WITH PASSWORD'password'WITH ALL PRIVILEGES

-- # 删除用户

DROPUSER"username"

-- 创建数据库

CREATEDATABASE mydb

-- 登陆

auth

用户名

密码

-- 插入数据

insertorders,phone=20website=90

-- 查询所有的cq

show continuous queries

-- 删除一个cq

DROPCONTINUOUS QUERY ON

默认端口8086

建表语句

createdatabase water_database

insert calculat_db,createTime="2020-08-20",dataName="A_por_6654"dataValue=23,reportTime="2020-08-20"

insert problem_db,problemName="A_S_234",problemValue=66itemMinValue=23,itemMaxValue=64

insert warndata_db,createTime="2020-08-20",dataName="A_por_6654"dataValue=16,itemWarnValueMin=20,itemWarnValueMax=53,areaCode="ZW001",areaName="中维",areaPointId="adfdsfduiedfijisf",deviceName="中维设备",content="xxx数据项低于最低阈值4"

CREATECONTINUOUS QUERY cq_1mONwater_databaseBEGINSELECTmean(dataValue)ASmeanValueINTOwater_database.rp_ten_year.cal_mean_dbFROMwater_database.rp_ten_year.calculat_dbGROUPBYtime(1m),dataName END

SELECTmean(dataValue)ASmeanValueINTOcal_mean_dbFROMcalculat_dbGROUPBYtime(1m),dataName

备注

1 也可以使用linux版的,没有什么差别,他如同大多数nosql数据一样,可以封装一个util 去操作它

2 其中归档功能必须是有实时数据后,才会触发归档(当时写好连续查询一直以为有问题,坑了我好久)

3 influxdb 的分页 SELECT * FROM "calculat_db" limit 20 OFFSET 0  ====>> limit 代表 每页条数  offset 代表 当前页码且从0 开始

下面分享下我项目中百度到的工具类

TsdbService

```

import com.jovision.sfwl.modules.datacenter.influxdb.bean.UniteMetricData;

import org.influxdb.InfluxDB;

import org.influxdb.dto.BatchPoints;

import org.influxdb.dto.Point;

import org.influxdb.dto.QueryResult;

import java.util.Map;

import java.util.concurrent.TimeUnit;

public interface TsdbService {

void createDatabase(String database);

void dropDatabase(String database);

boolean databaseExist(String database);

void createRetentionPolicy();

void createRetentionPolicy(String database, String policyName, String duration, int replication, Boolean isDefault);

void dropRetentionPolicy();

void dropRetentionPolicy(String database, String retentionPolicy);

void createContinuousQuery(String measurement, String extendPolicy);

boolean continuousQueryExists(String measurement);

boolean continuousQueryExists(String database, String cqName);

void dropContinuousQuery(String databaseName, String cqName);

boolean measurementsExists(String measurement);

boolean measurementsExists(String database, String measurement);

QueryResultquery(String command);

QueryResultdataQuery(String command);

void insert(Point point1);

void insert(String measurement, TimeUnit timeUnit, UniteMetricData data);

void batchInsert(BatchPoints batchPoints);

PointpointBuilder(String measurement,

Map tags,

Map fields,

long time,

TimeUnit timeunit);

BatchPointsbuildBatchPoints();

BatchPointsbatchPointsBuilder(String database, InfluxDB.ConsistencyLevel level, TimeUnit precision);

BatchPointsbatchPointsBuilder(String database, InfluxDB.ConsistencyLevel level, TimeUnit precision, String retentionPolicy);

}

```


TsdbServiceImpl


```

import com.jovision.sfwl.modules.datacenter.influxdb.bean.UniteMetricData;

import org.influxdb.BatchOptions;

import org.influxdb.InfluxDB;

import org.influxdb.InfluxDBFactory;

import org.influxdb.dto.*;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

@Service("tsdbService")

public class TsdbServiceImplimplements TsdbService {

private static final Loggerlogger = LoggerFactory.getLogger(TsdbServiceImpl.class);

private static final InfluxDB.ConsistencyLevelCONSISTENCY_LEVEL = InfluxDB.ConsistencyLevel.ANY;

private static final TimeUnitPRECESION = TimeUnit.SECONDS;

@Value("${tsdb.server.url}")

private Stringurl;

/**

* 用户名

*/

@Value("${tsdb.server.username}")

private Stringusername;

/**

* 密码

*/

@Value("${tsdb.server.password}")

private Stringpassword;

/**

* 数据库

*/

@Value("${tsdb.server.database}")

private Stringdatabase;

/**

* 保留策略

*/

@Value("${tsdb.server.retentionpolicy}")

private StringretentionPolicy;

private InfluxDBinfluxDB;

@PostConstruct

public void init() {

/* List serverAddressList = new ArrayList<>();

for (String host : hosts.split(",")) {

serverAddressList.add(String.format("%s:%s", host, port));

}

influxDB = InfluxDBFactory.connect(serverAddressList, username, password);

*/

influxDB = InfluxDBFactory.connect(url, username, password);

try {

// 如果指定的数据库不存在,则新建一个新的数据库,并新建一个默认的数据保留规则

if (!this.databaseExist(database)) {

createDatabase(database);

createRetentionPolicy();

}

}catch (Exception e) {

// 该数据库可能设置动态代理,不支持创建数据库

logger.error("[TsdbService] occur error when init tsdb, err msg: {}", e);

}finally {

influxDB.setRetentionPolicy(retentionPolicy);

}

influxDB.setLogLevel(InfluxDB.LogLevel.NONE);

// Flush every 1000 Points, at least every 100ms

// bufferLimit represents the maximum number of points can stored in the retry buffer

// exceptionHandler represents a consumer function to handle asynchronous errors

// threadFactory represents the ThreadFactory instance to be used

influxDB.enableBatch(BatchOptions.DEFAULTS

.actions(1000)

.flushDuration(100)

.bufferLimit(10)

.exceptionHandler((points, e) -> {

List target =new ArrayList<>();

points.forEach(target::add);

String msg = String.format("failed to write points:%s\n", target.toString().substring(0, 10000));

logger.error(msg, e);

})

.threadFactory(

Executors.defaultThreadFactory()

));

}

/**

* 测试连接是否正常

*

* @return true 正常

*/

public boolean ping() {

boolean isConnected =false;

Pong pong;

try {

pong =influxDB.ping();

if (pong !=null) {

isConnected =true;

}

}catch (Exception e) {

e.printStackTrace();

}

return isConnected;

}

@Override

public void createDatabase(String database) {

influxDB.query(new Query("CREATE DATABASE " + database, ""));

}

@Override

public void dropDatabase(String database) {

influxDB.query(new Query("DROP DATABASE " + database, ""));

}

@Override

public boolean databaseExist(String database) {

return influxDB.databaseExists(database);

}

@Override

public void createRetentionPolicy() {

String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",

"default_policy", database, "90d", 3);

this.query(command);

}

/**

* @return void

* @Description

* @Author 寂寞旅行

* @Date 10:48 2020/8/20

* @Param [database, policyName, duration, replication, isDefault]

* 数据库    策略名        时间      1            是否为默认策略

**/

@Override

public void createRetentionPolicy(String database, String policyName, String duration, int replication, Boolean isDefault) {

String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,

database, duration, replication);

if (isDefault) {

sql = sql.concat(" DEFAULT");

}

this.query(sql);

}

@Override

public void dropRetentionPolicy() {

this.dropRetentionPolicy(database, retentionPolicy);

}

@Override

public void dropRetentionPolicy(String database, String retentionPolicy) {

String sql = String.format("DROP RETENTION POLICY %s ON %s", retentionPolicy, database);

this.query(sql);

}

@Override

public void createContinuousQuery(String measurement, String extendPolicy) {

String cqName = String.format("cq_%s", measurement);

String originMeasurement = String.format("%s.%s.%s", database, retentionPolicy, measurement);

String cqMeasurement = String.format("%s.%s.%s_hour", database, extendPolicy, measurement);

String sql = String.format("CREATE CONTINUOUS QUERY \"%s\" ON %s RESAMPLE EVERY 1h FOR 2h BEGIN SELECT MEAN(*) INTO %s FROM %s GROUP BY time(1h),* FILL(none) END",

cqName, database, cqMeasurement, originMeasurement);

this.query(sql);

}

@Override

public boolean continuousQueryExists(String measurement) {

String cqName = String.format("cq_%s", measurement);

return continuousQueryExists(database, cqName);

}

@Override

public boolean continuousQueryExists(String database, String cqName) {

String sql ="SHOW CONTINUOUS QUERIES";

QueryResult result = query(sql);

List seriesList = result.getResults().get(0).getSeries();

if (seriesList !=null) {

for (QueryResult.Series series : seriesList) {

if (database.equals(series.getName())) {

List> continuousQueryList = series.getValues();

if (continuousQueryList ==null) {

return false;

}else {

for (List queryResult : continuousQueryList) {

if (cqName.equals(queryResult.get(0))) {

return true;

}

}

}

}

}

}

return false;

}

@Override

public void dropContinuousQuery(String databaseName, String cqName) {

String sql = String.format("DROP CONTINUOUS QUERY %s ON %s", cqName, databaseName);

QueryResult result = query(sql);

}

@Override

public boolean measurementsExists(String measurement) {

return measurementsExists(database, measurement);

}

@Override

public boolean measurementsExists(String database, String measurement) {

String sql = String.format("SHOW MEASUREMENTS ON %s", database);

QueryResult result = query(sql);

if (result !=null) {

List seriesList = result.getResults().get(0).getSeries();

if (seriesList !=null) {

QueryResult.Series series = seriesList.get(0);

List> valueList = series.getValues();

for (List value : valueList) {

if (measurement.equals(value.get(0))) {

return true;

}

}

}

}

return false;

}

@Override

public QueryResultquery(String command) {

return influxDB.query(new Query(command, database));

}

@Override

public QueryResultdataQuery(String command) {

return influxDB.query(new Query(command, database), TimeUnit.MILLISECONDS);

}

@Override

public void insert(Point point1) {

influxDB.write(point1);

}

@Override

public void insert(String measurement, TimeUnit timeUnit, UniteMetricData data) {

timeUnit = timeUnit ==null ? TimeUnit.MILLISECONDS : timeUnit;

Point point = pointBuilder(measurement, data.getTags(), data.getFields(), data.getTimestamp(), timeUnit);

influxDB.write(database, retentionPolicy, point);

}

@Override

public void batchInsert(BatchPoints batchPoints) {

influxDB.write(batchPoints);

}

@Override

public PointpointBuilder(String measurement,

Map tags,

Map fields,

long time,

TimeUnit timeunit) {

Point point = Point.measurement(measurement).time(time, timeunit).tag(tags).fields(fields).build();

return point;

}

@Override

public BatchPointsbuildBatchPoints() {

return this.batchPointsBuilder(database, CONSISTENCY_LEVEL, PRECESION);

}

@Override

public BatchPointsbatchPointsBuilder(String database, InfluxDB.ConsistencyLevel level, TimeUnit precision) {

return batchPointsBuilder(database, level, precision, null);

}

@Override

public BatchPointsbatchPointsBuilder(String database, InfluxDB.ConsistencyLevel level, TimeUnit precision, String retentionPolicy) {

return BatchPoints.database(database).consistency(level).precision(precision).retentionPolicy(retentionPolicy).build();

}

}

```


UniteMetricData


```

import lombok.Data;

import java.util.Map;

@Data

public class UniteMetricData {

private static final long serialVersionUID =8968059029015805484L;

private Maptags;

private Mapfields;

private long timestamp;

public UniteMetricData(Map tags, Map fields, long timestamp) {

this.tags = tags;

this.fields = fields;

this.timestamp = timestamp;

}

public MapgetTags() {

return tags;

}

public void setTags(Map tags) {

this.tags = tags;

}

public MapgetFields() {

return fields;

}

public void setFields(Map fields) {

this.fields = fields;

}

public long getTimestamp() {

return timestamp;

}

public void setTimestamp(long timestamp) {

this.timestamp = timestamp;

}

}

```

linux下安装

```

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.5.3.x86_64.rpm

sudo yum localinstall influxdb-1.5.3.x86_64.rpm

```

安装好后更改influx.conf配置 使其加入用户认证过程

vim ****influx.conf

更改 auth-enabled-false 改为 auth-enabled = true

保存退出(esc +:wq )

然后启动ifnlux

```

sudo service influxdb start

```

2020-09-10 influxdb相关推荐

  1. 2020/09/10华为发布会

    源码:https://gitee.com/openharmony 五大根服务 支付 广告 浏览器 地图引擎 搜索引擎 视频回放

  2. 软件测评师知识点(2020.09.10)

    背靠背测试 背靠背测试方式是指同一功能分别由两组人员在互不交流的情况执行测试工作,是保证测试质量的有效手段. 程序编译 程序在编译的过程中,包含了词法分析.语法分析和语义分析等阶段 其中词法分析从左到 ...

  3. (十七:2020.09.10)nnUNet最全问题收录(9.10更新)

    来这里寻找你的答案! 一.写在前面 二.GITHUB ISSUE I. 使用上的问题: #477 <3D nnUNet支持FP16量化吗?> #474 <ImportError: c ...

  4. 百度与吉利联合制造智能电动汽车;霍尼韦尔2020年度10大创新科技揭晓 | 美通企业日报...

    今日看点:毕马威与香港特许秘书公会携手发布报告<风险管理调查:迈向成功的助力>.百度与吉利联合制造智能电动汽车.包头茂业万豪酒店开业.爱茉莉太平洋展示CES 2021创新奖获奖技术,科勒推 ...

  5. 计算机专业学生教师节礼物,2020教师节10种实用不贵礼物推荐,最后一件老师收到最开心...

    2020教师节10种实用不贵礼物推荐,最后一件老师收到最开心 2020-09-09 09:40:10 0点赞 0收藏 0评论 教师节礼物,不需要花里胡哨,应该以实用精致为主. 最好是日常好物,能让老师 ...

  6. RDKit | 基于RDKit(≥2020.09.1)的相似图绘制新方法

    导入库 from rdkit import Chem from rdkit.Chem import Draw from rdkit.Chem.Draw import SimilarityMaps fr ...

  7. 2020 年 10 月程序员工资统计,终于涨了!

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源:blog.csdn.net/juwikuang/a ...

  8. python现在第几版-2020 年10月编程语言排行榜,Python 排名逼近第二

    原标题:2020 年10月编程语言排行榜,Python 排名逼近第二 来源:菜鸟教程 TIOBE 2020 年 10 月份的编程语言排行榜已经公布,官方的标题是: Python 排名逼近第二. TIO ...

  9. 微软正式发布Windows 10 2020年10月更新

    让大家等待多时的Windows 10 2020年10月更新终于来了,这也是今年微软为Win10准备的最后一个重大更新. Windows 10 2020年10月更新就是之前多次说道的Windows 10 ...

  10. 比特币早期投资家:没有人能够阻止其发展 TechWeb 09-27 09:10 凤凰科技讯 据CNBC网站北京时间9月27日报道,风险投资家、“Social+Capital”基金创始人Chamath

    比特币早期投资家:没有人能够阻止其发展 TechWeb 09-27 09:10 凤凰科技讯 据CNBC网站北京时间9月27日报道,风险投资家."Social+Capital"基金创 ...

最新文章

  1. 数据结构--树和二叉树
  2. post传值php取不到数据,post请求中的参数形式和form-data提交数据时取不到的问题...
  3. python爬虫教程视频下载-利用Python网络爬虫获取电影天堂视频下载链接【详细教程】...
  4. 有关数据结构基础知识(数据结构 严蔚敏版)
  5. 机器学习中的度量—— 向量距离
  6. 《Android UI基础教程》——1.2节Android 应用程序的基本结构
  7. mysql 查询近几天的数据
  8. can3--socketcan之mcp251x.c
  9. 如何从iTunes里取得移动设备的uuid
  10. PyTorch中使用指定的GPU
  11. Mr.J-- 图片墙动画效果
  12. nginx 499 502 413 404 处理
  13. 北京涛思数据获得 Pre A 轮融资,专注时序空间大数据领域
  14. php 中PHP_EOL使用
  15. python logging打印终端_想知道Python如何在终端上打印表格吗?两行代码告诉你!
  16. java函数式编程入口_Java中的函数式编程
  17. PAT 7-14 公路村村通
  18. python熵权法求权重
  19. 从JDK 6升级到JDK 7过程中遇到的一个问题
  20. 数据流-移动超平面(HyperPlane)构造

热门文章

  1. 计算最大公约数和最小公倍数被Java程序员用代码写出来啦
  2. Java 处理SFTP使用代理进行上传下载
  3. DataTables中文使用说明
  4. 怎么把动图放到word里_如何word里插入会动的gif图
  5. linux命令解压文件到指定目录并覆盖,你不知道的Linux解压命令
  6. Node EE方案 -- Rockerjs在微店的建设与发展
  7. mysql json数组字符串转数组_数组、对象、字符串转JSON的函数
  8. 两行代码实现图片压缩
  9. 分布式事务解决方案之TCC
  10. mybatis中的ResultMap使用