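Logical streaming replication in PostgreSQL 9.5 keeps maturing, and it places new requirements on other tools. For example, when a logical replication slot is created through the streaming replication protocol interface (not the SQL interface), the snapshot taken at slot creation is exported automatically. With that snapshot ID we can extract the base data and then apply the changes decoded from WAL on top of it, which together give us logical replication.
pg_dump is a backup tool that now supports snapshots. The purpose is clear: it is mainly meant to work together with logical replication. It can also be combined with other replication tools, provided they export a snapshot.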
Allow pg_dump to share a snapshot taken by another session using --snapshot (Simon Riggs, Michael Paquier)
The remote snapshot must have been exported by pg_export_snapshot() or been defined when creating a logical replication slot. This can be used by parallel pg_dump to use a consistent snapshot across pg_dump processes.
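Note, however, that the changes in a slot are consumed only once, so each slot serves one consumer. If you have several consumers, use several slots, unless your application is designed so that multiple consumers can share one slot.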
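As a rough sketch of how an exported snapshot ID might be handed to pg_dump, the Python helper below builds the command line. The helper name, the database name, and the output path are illustrative assumptions; only the --snapshot and --file options belong to pg_dump itself. The snapshot ID is the one that appears in the server log sample in this post.

```python
import shlex


def build_pg_dump_command(snapshot_id, dbname, outfile):
    """Build a pg_dump argv list that dumps `dbname` as of an exported snapshot.

    Hypothetical helper for illustration; it only assembles the arguments.
    """
    if not snapshot_id:
        raise ValueError("snapshot_id must not be empty")
    return [
        "pg_dump",
        "--snapshot=" + snapshot_id,  # share the snapshot exported by another session
        "--file=" + outfile,          # where to write the dump
        dbname,
    ]


# Assumed values: database "postgres" and an output path under /tmp.
cmd = build_pg_dump_command("00000732-1", "postgres", "/tmp/base.sql")
print(shlex.join(cmd))
```

The command has to run while the exporting session still holds the snapshot. For a snapshot exported by CREATE_REPLICATION_SLOT, the protocol documentation states that it is valid only until the next command is executed on that replication connection or the connection is closed.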
1. PostgreSQL 9.5 adds a parameter, log_replication_commands. When it is set to on, the commands of the streaming replication protocol are recorded in the server log.
2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,1,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: IDENTIFY_SYSTEM",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,2,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: CREATE_REPLICATION_SLOT ""ab1"" LOGICAL ""test_decoding""",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,3,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,4,"idle",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"exported logical decoding snapshot: ""00000732-1"" with 0 transaction IDs",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,5,"idle in transaction",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"received replication command: START_REPLICATION SLOT ""ab1"" LOGICAL 7/77B58F50",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,6,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"starting logical decoding for slot ""ab1""","streaming transactions committing after 7/77B58F50, reading WAL from 7/77B58F18",,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,7,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"
2. For details of the streaming replication protocol, see
http://www.postgresql.org/docs/devel/static/protocol-replication.html
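The snapshot ID reported in the csvlog records above can also be picked out programmatically. The following is a minimal sketch, assuming the log is in csvlog format with the message in the 14th column, as in the sample; the function name and the regular expression are my own and are not part of PostgreSQL.

```python
import csv
import io
import re

# Matches the message text seen in the sample log record.
SNAPSHOT_RE = re.compile(r'exported logical decoding snapshot: "([^"]+)"')

# Two records copied from the sample above.
SAMPLE = r'''2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,2,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: CREATE_REPLICATION_SLOT ""ab1"" LOGICAL ""test_decoding""",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,4,"idle",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"exported logical decoding snapshot: ""00000732-1"" with 0 transaction IDs",,,,,,,,,"pg_recvlogical"
'''


def find_exported_snapshots(csvlog_text):
    """Return the snapshot IDs found in csvlog-formatted log text."""
    snapshots = []
    for row in csv.reader(io.StringIO(csvlog_text)):
        # The message is the 14th column (index 13) in the sample records.
        if len(row) <= 13:
            continue
        match = SNAPSHOT_RE.search(row[13])
        if match:
            snapshots.append(match.group(1))
    return snapshots


print(find_exported_snapshots(SAMPLE))
```

The csv module takes care of the doubled quotes inside the message field, so the regular expression only has to deal with the unescaped text.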
1. http://blog.163.com/digoal@126/blog/static/163877040201326829943/
2. http://www.postgresql.org/docs/devel/static/protocol-replication.html
5. Options supported by test_decoding, from contrib/test_decoding/test_decoding.c
foreach(option, ctx->output_plugin_options)
{
    DefElem    *elem = lfirst(option);

    Assert(elem->arg == NULL || IsA(elem->arg, String));

    if (strcmp(elem->defname, "include-xids") == 0)
    {
        /* if option does not provide a value, it means its value is true */
        if (elem->arg == NULL)
            data->include_xids = true;
        else if (!parse_bool(strVal(elem->arg), &data->include_xids))
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("could not parse value \"%s\" for parameter \"%s\"",
                            strVal(elem->arg), elem->defname)));
    }
    else if (strcmp(elem->defname, "include-timestamp") == 0)
    {
        if (elem->arg == NULL)
            data->include_timestamp = true;
        else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("could not parse value \"%s\" for parameter \"%s\"",
                            strVal(elem->arg), elem->defname)));
    }
    else if (strcmp(elem->defname, "force-binary") == 0)
    {
        bool        force_binary;

        if (elem->arg == NULL)
            continue;
        else if (!parse_bool(strVal(elem->arg), &force_binary))
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("could not parse value \"%s\" for parameter \"%s\"",
                            strVal(elem->arg), elem->defname)));

        if (force_binary)
            opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
    }
    else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
    {
        if (elem->arg == NULL)
            data->skip_empty_xacts = true;
        else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("could not parse value \"%s\" for parameter \"%s\"",
                            strVal(elem->arg), elem->defname)));
    }
    else if (strcmp(elem->defname, "only-local") == 0)
    {
        if (elem->arg == NULL)
            data->only_local = true;
        else if (!parse_bool(strVal(elem->arg), &data->only_local))
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("could not parse value \"%s\" for parameter \"%s\"",
                            strVal(elem->arg), elem->defname)));
    }
    else
    {
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("option \"%s\" = \"%s\" is unknown",
                        elem->defname,
                        elem->arg ? strVal(elem->arg) : "(null)")));
    }
}
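To make the semantics of that loop easier to see, here is a small Python model of it. This is only a sketch and not the plugin's code: the function names are mine, the boolean parser is a simplified stand-in that accepts full spellings only (PostgreSQL's parse_bool is more lenient), and the result contains only the options that were explicitly passed.

```python
# The four options that the C code treats as plain booleans.
BOOL_OPTIONS = ("include-xids", "include-timestamp", "skip-empty-xacts", "only-local")

_TRUE = {"true", "on", "yes", "1"}
_FALSE = {"false", "off", "no", "0"}


def parse_bool_simple(text, name):
    """Simplified stand-in for parse_bool: full spellings only, case-insensitive."""
    lowered = text.lower()
    if lowered in _TRUE:
        return True
    if lowered in _FALSE:
        return False
    raise ValueError('could not parse value "%s" for parameter "%s"' % (text, name))


def model_test_decoding_options(options):
    """Model the option loop; `options` is a list of (name, value-or-None) pairs."""
    settings = {}
    for name, value in options:
        if name in BOOL_OPTIONS:
            # An option given without a value means true.
            settings[name] = True if value is None else parse_bool_simple(value, name)
        elif name == "force-binary":
            # Without a value the option is skipped; it only has an effect when true.
            if value is not None and parse_bool_simple(value, name):
                settings["force-binary"] = True
        else:
            shown = value if value is not None else "(null)"
            raise ValueError('option "%s" = "%s" is unknown' % (name, shown))
    return settings


print(model_test_decoding_options(
    [("include-xids", None), ("skip-empty-xacts", "off"), ("force-binary", None)]
))
```

The detail worth noticing is the asymmetry: a bare include-xids turns the option on, while a bare force-binary is ignored.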