为什么叫你是我的眼,因为你看到的世界,就是我的世界。
PostgreSQL 9.5 的逻辑流复制的功能越来越完善了,针对逻辑流复制,对其他工具也提出了一定的要求,例如我们在使用流复制协议接口(非SQL接口)创建一个逻辑流复制slot的同时,会自动导出创建SLOT时的snapshot,有了这个SNAPSHOT ID,我们才能够将基础数据弄出来,加上从WAL decode出来的信息,从而实现逻辑复制。
pg_dump是一个数据备份工具,目前加入了对snapshot的支持,目的非常明显,主要是配合逻辑复制使用的。当然也可以配合其他复制工具使用,需要导出snapshot。
Allow pg_dump to share a snapshot taken by another session using --snapshot (Simon Riggs, Michael Paquier)
The remote snapshot must have been exported by pg_export_snapshot() or been defined when creating a logical replication slot. This can be used by parallel pg_dump to use a consistent snapshot across pg_dump processes.
测试:
postgres=# begin transaction isolation level repeatable read;
BEGIN

postgres=# select pg_export_snapshot();
 pg_export_snapshot 
--------------------
 0000072C-1
(1 row)

先不要断开这个事务。等备份启动后再关闭即可(不需要等待备份结束)。
pg95@db-172-16-3-150-> pg_dump --snapshot=0000072C-1
使用这个SNAPSHOT导出。
对于逻辑复制,我们需要使用逻辑流复制协议创建slot,然后开启备份。
例子:
vi pg_hba.conf
# replication privilege.
local   replication     postgres                                trust
host    replication     postgres        127.0.0.1/32            trust
host    replication     postgres        ::1/128                 trust

pg_ctl reload

在数据库端使用test_encoding记录逻辑变更到WAL中。
使用流复制协议连接数据库。
pg95@db-172-16-3-150-> psql 'hostaddr=127.0.0.1 port=1922 user=postgres dbname=postgres replication=database' 
psql (9.5devel)
Type "help" for help.
postgres=# CREATE_REPLICATION_SLOT ab12 LOGICAL "/opt/pgsql9.5/lib/test_decoding.so";
 slot_name | consistent_point | snapshot_name |           output_plugin            
-----------+------------------+---------------+------------------------------------
 ab12      | 7/77B59A00       | 00000736-1    | /opt/pgsql9.5/lib/test_decoding.so
(1 row)

使用这个SNAPSHOT导出。
pg95@db-172-16-3-150-> pg_dump --snapshot=00000736-1
另外我们可以使用pg_recvlogical从这个slot开始接收逻辑变更:
pg_recvlogical -S ab12 -d postgres -v  -f - --start
BEGIN 1849
table public.t1: INSERT: id[integer]:1 c1[text]:null c2[integer]:null c3[timestamp without time zone]:null pk[bigint]:12
COMMIT 1849

有了dump(基础备份)+逻辑变更(SQL)+exec sql模块,就可以完成基于SQL的逻辑复制。
使用SQL函数也可以消费slot的变更。
-[ RECORD 9 ]+-----------------------------------
slot_name    | ab12
plugin       | /opt/pgsql9.5/lib/test_decoding.so
slot_type    | logical
datoid       | 13181
database     | postgres
active       | f
active_pid   | 
xmin         | 
catalog_xmin | 1850
restart_lsn  | 7/77B5A3B0

postgres=# select * from pg_replication_slots ;
postgres=# insert into t1 values (1);
INSERT 0 1
postgres=# SELECT * FROM pg_logical_slot_get_changes('ab12', NULL, NULL, 'include-timestamp', '1', 'include-xids', '1');
  location  | xid  |                                                           data                                                 
          
------------+------+----------------------------------------------------------------------------------------------------------------
----------
 7/77B5ABB8 | 1852 | BEGIN 1852
 7/77B5ABB8 | 1852 | table public.t1: INSERT: id[integer]:1 c1[text]:null c2[integer]:null c3[timestamp without time zone]:null pk[b
igint]:15
 7/77B5AD98 | 1852 | COMMIT 1852 (at 2000-01-01 08:00:00+08)
(3 rows)

但是请注意,slot中的信息只会消费一次,所以一个slot对应一个消费者,如果有多个消费者,请使用多个slot。除非你的应用适合多个消费者使用一个SLOT.
[其他]
1. PostgreSQL 9.5 新增了一个参数log_replication_commands = on,打开的话在日志中会记录流复制协议的命令。
例如:
2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,1,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"re
ceived replication command: IDENTIFY_SYSTEM",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,2,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: CREATE_REPLICATION_SLOT ""ab1"" LOGICAL ""test_decoding""",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,3,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,4,"idle",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"exported logical decoding snapshot: ""00000732-1"" with 0 transaction IDs",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,5,"idle in transaction",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"received replication command: START_REPLICATION SLOT ""ab1"" LOGICAL 7/77B58F50",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,6,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"starting logical decoding for slot ""ab1""","streaming transactions committing after 7/77B58F50, reading WAL from 7/77B58F18",,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,7,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"

2. 流复制协议详见
http://www.postgresql.org/docs/devel/static/protocol-replication.html
[参考]
1. http://blog.163.com/digoal@126/blog/static/163877040201326829943/
2. http://www.postgresql.org/docs/devel/static/protocol-replication.html
3. http://www.postgresql.org/docs/devel/static/test-decoding.html
4. http://www.postgresql.org/docs/devel/static/protocol-replication.html
5. test_decoding支持的options, contrib/test_decoding/test_decoding.c
include-xids
include-timestamp
force-binary
skip-empty-xacts
only-local
如下:
        foreach(option, ctx->output_plugin_options)
        {
                DefElem    *elem = lfirst(option);
                Assert(elem->arg == NULL || IsA(elem->arg, String));
                if (strcmp(elem->defname, "include-xids") == 0)
                {
                        /* if option does not provide a value, it means its value is true */
                        if (elem->arg == NULL)
                                data->include_xids = true;
                        else if (!parse_bool(strVal(elem->arg), &data->include_xids))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else if (strcmp(elem->defname, "include-timestamp") == 0)
                {
                        if (elem->arg == NULL)
                                data->include_timestamp = true;
                        else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else if (strcmp(elem->defname, "force-binary") == 0)
                {
                        bool            force_binary;
                        if (elem->arg == NULL)
                                continue;
                        else if (!parse_bool(strVal(elem->arg), &force_binary))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));

                        if (force_binary)
                                opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
                }
                else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
                {
                        if (elem->arg == NULL)
                                data->skip_empty_xacts = true;
                        else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else if (strcmp(elem->defname, "only-local") == 0)
                {
                        if (elem->arg == NULL)
                                data->only_local = true;
                        else if (!parse_bool(strVal(elem->arg), &data->only_local))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else
                {
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                         errmsg("option \"%s\" = \"%s\" is unknown",
                                                        elem->defname,
                                                        elem->arg ? strVal(elem->arg) : "(null)")));
                }

6. src/backend/replication/walsender.c
创建逻辑slot时,自动创建snapshot。
/*
 * Create a new replication slot.
 */
static void
CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{

......
        if (cmd->kind == REPLICATION_KIND_LOGICAL)
        {

......
                /*
                 * Export a plain (not of the snapbuild.c type) snapshot to the user
                 * that can be imported into another session.
                 */
                snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);

......

注意在执行replication命令时,会先释放snapshot,因此务必在这之前将这个snapshot 给pg_dump导入。
/*
 * Execute an incoming replication command.
 */
void
exec_replication_command(const char *cmd_string)
{

......
        /*
         * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
         * command arrives. Clean up the old stuff if there's anything.
         */
        SnapBuildClearExportedSnapshot();

......

PostgreSQL 9.5 pg_dump新特性 你是我的眼相关推荐

  1. PostgreSQL 13 Beta 1 发布,大量新特性

    功能性 PostgreSQL 13 中有许多新功能可以帮助提高 PostgreSQL 的整体性能,同时使开发应用程序变得更加容易. B 树索引(PostgreSQL 的标准索引)在处理重复数据方面得到 ...

  2. PostgreSQL 覆盖 Oracle 18c 重大新特性

    标签 PostgreSQL , Oracle , 兼容性 , 18c 背景 Oracle 18c 发布了诸多新特性,其中大部分Oracle 18c的重大特性,在早期版本的PostgreSQL数据库已支 ...

  3. PostgreSQL 15 新特性解读 | 墨天轮优质文章合集

    5月19日,PostgreSQL 全球开发组宣布 PostgreSQL 15 的第一个 beta 版本,这一新版本在开发者体验.性能表现等方面都有提升.为了帮助大家更快速了解到PostgreSQL 1 ...

  4. PostgreSQL 14 版本发布,快来看看有哪些新特性!

    文章目录 性能增强 数据类型和 SQL 管理功能 复制和恢复 安全增强 更多特性 大家好!我是只谈技术不剪发的 Tony 老师. PostgreSQL 全球开发组于 2021-05-20 发布了 Po ...

  5. PostgreSQL 11 新特性之 PL/pgSQL 增强

    文章目录 PostgreSQL 11 增加了一个新的编程对象,存储过程(PROCEDURE).它与存储函数类似,但是没有返回值.存储过程还支持事务,参考文章"PostgreSQL 11 新特 ...

  6. 探索PostgreSQL 14新特性--SEARCH和CYCLE

    探索PostgreSQL 14新特性--SEARCH和CYCLE PG14的SEARCH和CYCLE新功能大大简化了递归查询的方式,本文给出一些基于旅行计划的示例. 创建数据库 本文示例基于任何PG1 ...

  7. mysql8导入 psc 没有数据_新特性解读 | MySQL 8.0.22 任意格式数据导入

    作者:杨涛涛 资深数据库专家,专研 MySQL 十余年.擅长 MySQL.PostgreSQL.MongoDB 等开源数据库相关的备份恢复.SQL 调优.监控运维.高可用架构设计等.目前任职于爱可生, ...

  8. mysql 5.7_MySQL 5.7新特性介绍

    1. 介绍身处MySQL这个圈子,能够切身地感受到大家对MySQL 5.7的期待和热情,似乎每个人都迫不及待的想要了解.学习和使用MySQL 5.7.那么,我们不禁要问,MySQL 5.7到底做了哪些 ...

  9. 最好的浏览器排行榜_PG是最好的数据库;TiDB 4.0前瞻;SequoiaDB高可用原理;20c DG新特性... 数据库周刊第18期...

    热门资讯 1. 2020年4月数据库流行度排行:MySQL 成事实王者,国产openGauss引期待 [摘要]2020年4月 DB-Engines 数据库流行度排行出炉.在本月的排行榜上,Oracle ...

最新文章

  1. Python 根据地址获取经纬度及求距离
  2. C#总结项目《影院售票系统》编写总结二
  3. 堆栈溢出回答了我们不知道的Java首要问题
  4. 使用JSF的面向服务的UI
  5. MyBatis的CRUD操作
  6. python修改文件linux编码格式,使用python的chardet库获得文件编码并修改编码
  7. 三星530换固态硬盘_也许是目前性价比最高的固态硬盘!三星870 QVO快速体验
  8. struts2拦截器实现登录拦截
  9. OpenID实现多系统整合的用户同步解决方案
  10. 苏宁大数据怎么运营_苏宁首个无人店大数据曝光,其消费人群是如何分布的?...
  11. c语言如何算字节,C语言中结构字节的计算方法
  12. 深入理解计算机系统-cachelab
  13. 机器学习SVM——实验报告
  14. 利用PowerDesigner逆向工程抓取数据模型
  15. 商城项目介绍以及ES6的新语法
  16. 另一个游戏的启示——富爸爸之现金流游戏
  17. 简单绕过chrome(谷歌游览器) 查看已保存的密码
  18. VoIP网络电话回音产生的原因分析
  19. 自学王爽老师汇编语言 检测点6.1
  20. MT4开发文档之 Server API: Hooks and Trade Functions

热门文章

  1. java执行完main就结束了吗_为什么main方法中执行完第一个方法完之后,后面的代码都不执行了?...
  2. 计算机调剂名额多的考研学校,避免调剂被刷,2020年考研调剂最容易成功的4类院校,提前了解!...
  3. 客户端产生CLOSE_WAIT状态的解决方案
  4. Vue中子组件如何向父组件传递数据?
  5. win7下hosts文件位置
  6. Spring Boot基础学习笔记21:自定义用户认证
  7. Spring Boot基础学习笔记16:项目打包部署
  8. 安卓学习笔记39:浏览网页、网页与安卓通信
  9. 【BZOJ1146】网络管理,整体二分
  10. 【codevs1037】取数游戏,博弈