If you have any questions, or a more complete solution, please join the group I created (568752806) so we can discuss it together.

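Computer configuration

One ordinary personal desktop PC, configured as follows:

Processor: Intel(R) Core(TM) i5-4750 CPU @ 3.2GHz, 4 cores

Memory: 32 GB total, Kingston HyperX DDR3 1866, 8 GB x 4

SSD: GLOWAY STK512GS3-S7, usable capacity 476 GB

Motherboard: Z97-PRO Wi-Fi ac

Network adapters: Broadcom 802.11ac

Intel(R) Ethernet Connection(2) I218-V

Operating system: Windows 10 Education, 64-bit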

Environment setup

VMware Workstation 12 Pro    version 12.5.7 build-5813279

Create 4 CentOS 7 virtual machines

Hostname          IP                Memory    Disk size    Purpose

contoso.org       192.168.10.20     10G       100G         PHP, for developers

mariadb1.org      192.168.10.101    4G        80G          MariaDB, db1

mariadb2.org      192.168.10.102    4G        80G          MariaDB, db2

rabbitmq1.org     192.168.10.105    6G        80G          RabbitMQ, message storage


On host 192.168.10.20, configure the mapping between IP addresses and the custom domain names

[root@contoso ~]# cat  >  /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4  contoso.org
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.10.20 user.corp.contoso.org
192.168.10.20 corp.contoso.org
192.168.10.20 contoso.org
192.168.10.20 contoso
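
A quick way to confirm the mappings above is to look the names up in the file itself. The following is only a minimal sketch: lookup_host is a hypothetical helper (not part of the original setup) that prints the first IP address a hosts-format file assigns to a name. Because it returns the first match from the top of the file, a name that also appears on the 127.0.0.1 line is reported with that address.

```shell
#!/usr/bin/env bash
# lookup_host FILE NAME
# Hypothetical helper: print the first IP that FILE (hosts format) maps NAME to.
# Exits non-zero when NAME is not listed.
lookup_host() {
    awk -v name="$2" '
        { sub(/#.*/, "") }                  # drop trailing comments
        {
            for (i = 2; i <= NF; i++)       # field 1 is the IP, the rest are names
                if ($i == name) { print $1; found = 1; exit }
        }
        END { exit (found ? 0 : 1) }
    ' "$1"
}

# Report every name used by the virtual hosts in this article.
for name in contoso.org corp.contoso.org user.corp.contoso.org contoso; do
    if ip=$(lookup_host /etc/hosts "$name"); then
        echo "$name -> $ip"
    else
        echo "$name is not mapped in /etc/hosts"
    fi
done
```

On the running system, getent hosts contoso.org gives the resolver's own answer and can be compared with the output above.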

Tune the kernel configuration

[root@contoso ~]# cat >> /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536

[root@contoso ~]# ulimit -n 65536

[root@contoso ~]# cat > /etc/sysctl.conf
# sysctl settings are defined through files in
# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.
#
# Vendors settings live in /usr/lib/sysctl.d/.
# To override a whole file, create a new file with the same in
# /etc/sysctl.d/ and put new settings there. To override
# only specific settings, add a file with a lexically later
# name in /etc/sysctl.d/ and put new settings there.
#
# For more information, see sysctl.conf(5) and sysctl.d(5).
net.ipv4.tcp_syn_retries = 1
net.ipv4.tcp_synack_retries = 1
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 15
net.ipv4.tcp_retries2 = 5
net.ipv4.tcp_fin_timeout = 2
net.ipv4.tcp_max_tw_buckets = 36000
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_max_orphans = 32768
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_max_syn_backlog = 16384
net.ipv4.tcp_wmem = 8192 131072 16777216
net.ipv4.tcp_rmem = 32768 131072 16777216
net.ipv4.tcp_mem = 786432 1048576 1572864
net.ipv4.ip_local_port_range = 1024  65000

[root@contoso ~]# sysctl -p
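
After sysctl -p it is worth checking that the running kernel really holds the values written to the file. The sketch below assumes the settings live in /etc/sysctl.conf as shown above; conf_value is a hypothetical helper that reads one key from a sysctl.conf-style file, and the loop compares it with what sysctl -n reports. Note that entries in limits.conf apply to new login sessions, while ulimit -n only changes the current shell.

```shell
#!/usr/bin/env bash
# conf_value FILE KEY
# Hypothetical helper: print the value assigned to KEY in a sysctl.conf-style file.
# The last assignment wins; runs of whitespace inside the value are collapsed.
conf_value() {
    awk -v key="$2" '
        /^[ \t]*[#;]/ { next }              # skip comment lines
        index($0, "=") == 0 { next }        # skip lines without an assignment
        {
            k = substr($0, 1, index($0, "=") - 1)
            gsub(/[ \t]/, "", k)
            if (k == key) {
                v = substr($0, index($0, "=") + 1)
                gsub(/^[ \t]+|[ \t]+$/, "", v)
                gsub(/[ \t]+/, " ", v)
                val = v; found = 1
            }
        }
        END { if (found) print val; exit (found ? 0 : 1) }
    ' "$1"
}

for key in net.ipv4.tcp_fin_timeout net.ipv4.tcp_max_syn_backlog net.ipv4.ip_local_port_range; do
    want=$(conf_value /etc/sysctl.conf "$key" 2>/dev/null) || { echo "$key: not set in /etc/sysctl.conf"; continue; }
    have=$(sysctl -n "$key" 2>/dev/null | tr -s ' \t' ' ' | sed 's/ $//')
    if [ "$want" = "$have" ]; then
        echo "$key: ok ($have)"
    else
        echo "$key: file says '$want', kernel reports '$have'"
    fi
done

echo "open-file limit of this shell: $(ulimit -n)"
```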

Install the PHP runtime. Note that PHP is installed from the administrator account myth rather than from a root login, so yum is run through sudo.

[myth@contoso ~]$ sudo yum install https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm

[myth@contoso ~]$ sudo yum install http://rpms.remirepo.net/enterprise/remi-release-7.rpm

[myth@contoso ~]$ sudo yum --enablerepo=remi-php71,remi,epel -y install php php-devel php-mysql php-fpm php-pecl-xdebug php-gd php-intl php-freetype php-mcrypt php-mbstring php-pecl-memcached php-pecl-redis php-pecl-swoole

Configure the remote debugging server

[myth@contoso ~]$ sudo tee /etc/php.d/15-xdebug.ini > /dev/null
; Enable xdebug extension module
zend_extension=xdebug.so
xdebug.remote_autostart=1
xdebug.remote_enable=1
xdebug.remote_connect_back=1
xdebug.remote_port=9001
xdebug.remote_handler=dbgp
; see http://xdebug.org/docs/all_settings

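Apache tuning: change the maximum number of concurrent connections. The values below are for a large site; halve them for a medium-sized site, and adjust further if problems appear.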
[root@contoso ~]# ll /etc/httpd/conf.modules.d
total 32
-rw-r--r-- 1 root root 3739 Apr 12 21:50 00-base.conf
-rw-r--r-- 1 root root  139 Apr 12 21:50 00-dav.conf
-rw-r--r-- 1 root root   41 Apr 12 21:50 00-lua.conf
-rw-r--r-- 1 root root  950 Aug 30 03:43 00-mpm.conf
-rw-r--r-- 1 root root  957 Apr 12 21:50 00-proxy.conf
-rw-r--r-- 1 root root   88 Apr 12 21:50 00-systemd.conf
-rw-r--r-- 1 root root  451 Apr 12 21:50 01-cgi.conf
-rw-r--r-- 1 root root  423 Aug  2 18:21 15-php.conf
[root@contoso ~]# cat  >  /etc/httpd/conf.modules.d/00-mpm.conf
# Select the MPM module which should be used by uncommenting exactly
# one of the following LoadModule lines:

# prefork MPM: Implements a non-threaded, pre-forking web server
# See: http://httpd.apache.org/docs/2.4/mod/prefork.html
LoadModule mpm_prefork_module modules/mod_mpm_prefork.so
<IfModule mpm_prefork_module>
  StartServers        100
  MinSpareServers     100
  MaxSpareServers     120
  ServerLimit         3000
  MaxRequestWorkers   3000
  MaxConnectionsPerChild 500000
</IfModule>

# worker MPM: Multi-Processing Module implementing a hybrid
# multi-threaded multi-process web server
# See: http://httpd.apache.org/docs/2.4/mod/worker.html
#
#LoadModule mpm_worker_module modules/mod_mpm_worker.so

# event MPM: A variant of the worker MPM with the goal of consuming
# threads only for connections with active processing
# See: http://httpd.apache.org/docs/2.4/mod/event.html
#
#LoadModule mpm_event_module modules/mod_mpm_event.so

[root@contoso ~]#
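
Whether a prefork limit of 3000 (or half of it) fits a given machine depends on how much memory each httpd process uses. A common rule of thumb, offered here only as a rough sketch, divides the memory set aside for Apache by the average resident size of one worker. The 8192 MB budget and the 30 MB fallback are assumptions chosen for illustration, not values from the original setup, and both helper names are hypothetical. MaxClients is the older name of the directive that Apache 2.4 calls MaxRequestWorkers.

```shell
#!/usr/bin/env bash
# estimate_max_clients RAM_MB PER_PROCESS_MB
# Hypothetical helper: how many prefork workers fit into RAM_MB (integer division).
estimate_max_clients() {
    echo $(( $1 / $2 ))
}

# avg_httpd_rss_mb
# Average resident size of the running httpd processes in MB (uses procps "ps -C").
# Falls back to an assumed 30 MB when no httpd process can be inspected.
avg_httpd_rss_mb() {
    ps -C httpd -o rss= 2>/dev/null |
        awk '{ sum += $1; n++ } END { if (n) printf "%d\n", sum / n / 1024; else print 30 }'
}

budget_mb=8192                      # assumption: memory reserved for Apache on the 10G VM
per_mb=$(avg_httpd_rss_mb)
[ "$per_mb" -ge 1 ] || per_mb=1     # guard against division by zero

echo "average httpd process: ${per_mb} MB"
echo "suggested upper bound for MaxClients: $(estimate_max_clients "$budget_mb" "$per_mb")"
```

Measuring the process size under real load gives a much better figure than the fallback, since PHP extensions change the footprint considerably.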

On host 192.168.10.101, configure the mapping between IP addresses and the custom domain names

[root@mariadb1 ~]# cat  >  /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.10.101 mariadb1.org
[root@mariadb1 ~]#

Tune the kernel configuration

[root@mariadb1 ~]# cat >> /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536

[root@mariadb1 ~]# ulimit -n 65536

[root@mariadb1 ~]# cat > /etc/sysctl.conf
# sysctl settings are defined through files in
# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.
#
# Vendors settings live in /usr/lib/sysctl.d/.
# To override a whole file, create a new file with the same in
# /etc/sysctl.d/ and put new settings there. To override
# only specific settings, add a file with a lexically later
# name in /etc/sysctl.d/ and put new settings there.
#
# For more information, see sysctl.conf(5) and sysctl.d(5).
net.ipv4.tcp_syn_retries = 1
net.ipv4.tcp_synack_retries = 1
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 15
net.ipv4.tcp_retries2 = 5
net.ipv4.tcp_fin_timeout = 2
net.ipv4.tcp_max_tw_buckets = 36000
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_max_orphans = 32768
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_max_syn_backlog = 16384
net.ipv4.tcp_wmem = 8192 131072 16777216
net.ipv4.tcp_rmem = 32768 131072 16777216
net.ipv4.tcp_mem = 786432 1048576 1572864
net.ipv4.ip_local_port_range = 1024  65000

[root@mariadb1 ~]# sysctl -p

Install MariaDB

The baseurl given on the official site downloads too slowly

[root@mariadb1 ~]# cat > /etc/yum.repos.d/MariaDB.repo
# MariaDB 10.2 CentOS repository list - created 2017-07-08 12:50 UTC
# http://downloads.mariadb.org/mariadb/repositories/
[mariadb]
name = MariaDB
baseurl = http://yum.mariadb.org/10.2/centos7-amd64
gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB
gpgcheck=1

The baseurl of the Tsinghua mirror downloads much faster

[root@mariadb1 ~]# cat > /etc/yum.repos.d/MariaDB.repo
# MariaDB 10.2 CentOS repository list - created 2017-07-08 12:50 UTC
# http://downloads.mariadb.org/mariadb/repositories/
[mariadb]
name = MariaDB
baseurl = https://mirrors.tuna.tsinghua.edu.cn/mariadb/mariadb-10.2.7/yum/centos7-amd64/
gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB
gpgcheck=1

[root@mariadb1 ~]# yum install -y MariaDB-server MariaDB-client

Next, configure the character encoding the database server uses for stored data, and the storage paths of the general query log, the slow query log, and the binlog. None of these features are enabled by default.

[root@mariadb1 ~]# mkdir -p /var/log/mariadb && touch /var/log/mariadb/queries.log && touch /var/log/mariadb/mariadb-error.log && touch /var/log/mariadb/mariadb-slow.log && touch /var/log/mariadb/mariadb-log-bin && touch /var/log/mariadb/mariadb-log-bin.index && chown -R mysql:mysql /var/log/mariadb && chmod 755 /var/log/mariadb && chmod 666 /var/log/mariadb/*
[root@mariadb1 ~]#
[root@mariadb1 ~]# cat > /etc/my.cnf.d/server.cnf
#
# These groups are read by MariaDB server.
# Use it for options that only the server (but not clients) should see
#
# See the examples of server my.cnf files in /usr/share/mysql/
#

# this is read by the standalone daemon and embedded servers
[server]

# this is only for the mysqld standalone daemon
[mysqld]
character-set-server=utf8
lower-case-table-names=1

log-bin=/var/log/mariadb/mariadb-log-bin
log-bin-index=/var/log/mariadb/mariadb-log-bin.index
log-error=/var/log/mariadb/mariadb-error.log

general-log=ON
general-log-file=/var/log/mariadb/queries.log
log-output=file

slow-query-log=ON
slow-query-log-file=/var/log/mariadb/mariadb-slow.log
long_query_time=1

#
# * Galera-related settings
#
[galera]
# Mandatory settings
#wsrep_on=ON
#wsrep_provider=
#wsrep_cluster_address=
#binlog_format=row
#default_storage_engine=InnoDB
#innodb_autoinc_lock_mode=2
#
# Allow server to accept connections on all interfaces.
#
#bind-address=0.0.0.0
#
# Optional setting
#wsrep_slave_threads=1
#innodb_flush_log_at_trx_commit=0

# this is only for embedded server
[embedded]

# This group is only read by MariaDB servers, not by MySQL.
# If you use the same .cnf file for MySQL and MariaDB,
# you can put MariaDB-only options here
[mariadb]

# This group is only read by MariaDB-10.1 servers.
# If you use the same .cnf file for MariaDB of different versions,
# use this group for options that older servers don't understand
[mariadb-10.1]

[root@mariadb1 ~]#
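
Before restarting the server it can help to read the log settings back out of the [mysqld] group and make sure they are what was intended. This is a minimal sketch under the assumption that the file is /etc/my.cnf.d/server.cnf as above; ini_value is a hypothetical helper. It treats dashes and underscores in option names as equivalent, as MariaDB itself does in option files.

```shell
#!/usr/bin/env bash
# ini_value FILE SECTION KEY
# Hypothetical helper: print the value of KEY inside [SECTION] of an option file.
# Dashes and underscores in option names are treated as the same character.
ini_value() {
    awk -v section="$2" -v key="$3" '
        function norm(s) { gsub(/[ \t]/, "", s); gsub(/-/, "_", s); return s }
        /^[ \t]*[#;]/ { next }                          # comment lines
        /^[ \t]*\[/ {                                   # group header such as [mysqld]
            cur = $0
            gsub(/[ \t]/, "", cur); sub(/^\[/, "", cur); sub(/\].*$/, "", cur)
            next
        }
        cur == section && index($0, "=") {
            k = norm(substr($0, 1, index($0, "=") - 1))
            if (k == norm(key)) {
                v = substr($0, index($0, "=") + 1)
                gsub(/^[ \t]+|[ \t]+$/, "", v)
                val = v; found = 1
            }
        }
        END { if (found) print val; exit (found ? 0 : 1) }
    ' "$1"
}

cnf=/etc/my.cnf.d/server.cnf
if [ -r "$cnf" ]; then
    for key in log-bin log-error general-log general-log-file slow-query-log slow-query-log-file long_query_time; do
        if value=$(ini_value "$cnf" mysqld "$key"); then
            echo "$key = $value"
        else
            echo "$key is not set in [mysqld]"
        fi
    done
else
    echo "$cnf not found on this machine"
fi
```

Once the server is running, SHOW VARIABLES LIKE 'slow_query_log%' in the mysql client shows the values actually in effect.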

On host 192.168.10.102, configure the mapping between IP addresses and the custom domain names

[root@mariadb2 ~]# cat  >  /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.10.102 mariadb2.org
[root@mariadb2 ~]#

Tune the kernel configuration

[root@mariadb2 ~]# cat >> /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536

[root@mariadb2 ~]# ulimit -n 65536

[root@mariadb2 ~]# cat > /etc/sysctl.conf
# sysctl settings are defined through files in
# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.
#
# Vendors settings live in /usr/lib/sysctl.d/.
# To override a whole file, create a new file with the same in
# /etc/sysctl.d/ and put new settings there. To override
# only specific settings, add a file with a lexically later
# name in /etc/sysctl.d/ and put new settings there.
#
# For more information, see sysctl.conf(5) and sysctl.d(5).
net.ipv4.tcp_syn_retries = 1
net.ipv4.tcp_synack_retries = 1
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 15
net.ipv4.tcp_retries2 = 5
net.ipv4.tcp_fin_timeout = 2
net.ipv4.tcp_max_tw_buckets = 36000
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_max_orphans = 32768
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_max_syn_backlog = 16384
net.ipv4.tcp_wmem = 8192 131072 16777216
net.ipv4.tcp_rmem = 32768 131072 16777216
net.ipv4.tcp_mem = 786432 1048576 1572864
net.ipv4.ip_local_port_range = 1024  65000

[root@mariadb2 ~]# sysctl -p

Install MariaDB

The baseurl given on the official site downloads too slowly

[root@mariadb2 ~]# cat > /etc/yum.repos.d/MariaDB.repo
# MariaDB 10.2 CentOS repository list - created 2017-07-08 12:50 UTC
# http://downloads.mariadb.org/mariadb/repositories/
[mariadb]
name = MariaDB
baseurl = http://yum.mariadb.org/10.2/centos7-amd64
gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB
gpgcheck=1

The baseurl of the Tsinghua mirror downloads much faster

[root@mariadb2 ~]# cat > /etc/yum.repos.d/MariaDB.repo
# MariaDB 10.2 CentOS repository list - created 2017-07-08 12:50 UTC
# http://downloads.mariadb.org/mariadb/repositories/
[mariadb]
name = MariaDB
baseurl = https://mirrors.tuna.tsinghua.edu.cn/mariadb/mariadb-10.2.7/yum/centos7-amd64/
gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB
gpgcheck=1

[root@mariadb2 ~]# yum install -y MariaDB-server MariaDB-client

Next, configure the character encoding the database server uses for stored data, and the storage paths of the general query log, the slow query log, and the binlog. None of these features are enabled by default.

[root@mariadb2 ~]# mkdir -p /var/log/mariadb && touch /var/log/mariadb/queries.log && touch /var/log/mariadb/mariadb-error.log && touch /var/log/mariadb/mariadb-slow.log && touch /var/log/mariadb/mariadb-log-bin && touch /var/log/mariadb/mariadb-log-bin.index && chown -R mysql:mysql /var/log/mariadb && chmod 755 /var/log/mariadb && chmod 666 /var/log/mariadb/*
[root@mariadb2 ~]#
[root@mariadb2 ~]# cat > /etc/my.cnf.d/server.cnf
#
# These groups are read by MariaDB server.
# Use it for options that only the server (but not clients) should see
#
# See the examples of server my.cnf files in /usr/share/mysql/
#

# this is read by the standalone daemon and embedded servers
[server]

# this is only for the mysqld standalone daemon
[mysqld]
character-set-server=utf8
lower-case-table-names=1

log-bin=/var/log/mariadb/mariadb-log-bin
log-bin-index=/var/log/mariadb/mariadb-log-bin.index
log-error=/var/log/mariadb/mariadb-error.log

general-log=ON
general-log-file=/var/log/mariadb/queries.log
log-output=file

slow-query-log=ON
slow-query-log-file=/var/log/mariadb/mariadb-slow.log
long_query_time=1

#
# * Galera-related settings
#
[galera]
# Mandatory settings
#wsrep_on=ON
#wsrep_provider=
#wsrep_cluster_address=
#binlog_format=row
#default_storage_engine=InnoDB
#innodb_autoinc_lock_mode=2
#
# Allow server to accept connections on all interfaces.
#
#bind-address=0.0.0.0
#
# Optional setting
#wsrep_slave_threads=1
#innodb_flush_log_at_trx_commit=0

# this is only for embedded server
[embedded]

# This group is only read by MariaDB servers, not by MySQL.
# If you use the same .cnf file for MySQL and MariaDB,
# you can put MariaDB-only options here
[mariadb]

# This group is only read by MariaDB-10.1 servers.
# If you use the same .cnf file for MariaDB of different versions,
# use this group for options that older servers don't understand
[mariadb-10.1]

[root@mariadb2 ~]#

On host 192.168.10.105, configure the mapping between IP addresses and the custom domain names

[root@rabbitmq1 ~]#  cat  >  /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.10.105 rabbitmq1.org
[root@rabbitmq1 ~]#

Tune the kernel configuration

[root@rabbitmq1 ~]# cat >> /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536

[root@rabbitmq1 ~]# ulimit -n 65536

[root@rabbitmq1 ~]# cat > /etc/sysctl.conf
# sysctl settings are defined through files in
# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.
#
# Vendors settings live in /usr/lib/sysctl.d/.
# To override a whole file, create a new file with the same in
# /etc/sysctl.d/ and put new settings there. To override
# only specific settings, add a file with a lexically later
# name in /etc/sysctl.d/ and put new settings there.
#
# For more information, see sysctl.conf(5) and sysctl.d(5).
net.ipv4.tcp_syn_retries = 1
net.ipv4.tcp_synack_retries = 1
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 15
net.ipv4.tcp_retries2 = 5
net.ipv4.tcp_fin_timeout = 2
net.ipv4.tcp_max_tw_buckets = 36000
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_max_orphans = 32768
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_max_syn_backlog = 16384
net.ipv4.tcp_wmem = 8192 131072 16777216
net.ipv4.tcp_rmem = 32768 131072 16777216
net.ipv4.tcp_mem = 786432 1048576 1572864
net.ipv4.ip_local_port_range = 1024  65000

[root@rabbitmq1 ~]# sysctl -p

To install the message server, follow "Installing RabbitMQ on CentOS 7": http://blog.csdn.net/zhengzizhi/article/details/77018658

On host 192.168.10.20, configure the virtual hosts:

[root@contoso ~]# cat  >  /etc/httpd/conf.d/httpd-vhosts.conf
<Directory "/home/myth/www/think">
        Options +Indexes +FollowSymLinks
        AllowOverride All
        Require all granted
</Directory>

<VirtualHost *:80>
    ServerAdmin zhengzizhi@126.com
    DocumentRoot "/home/myth/www/think/public"
    ServerName contoso.org
    ServerAlias contoso.org
    ErrorLog "/home/myth/log/httpd/contoso-error_log"
    CustomLog "/home/myth/log/httpd/contoso-access_log" common
</VirtualHost>

<Directory "/home/myth/www/think">
        Options +Indexes +FollowSymLinks
        AllowOverride All
        Require all granted
</Directory>

<VirtualHost *:80>
    ServerAdmin zhengzizhi@126.com
    DocumentRoot "/home/myth/www/think/public"
    ServerName corp.contoso.org
    ServerAlias corp.contoso.org
    ErrorLog "/home/myth/log/httpd/corp-contoso-error_log"
    CustomLog "/home/myth/log/httpd/corp-contoso-access_log" common
</VirtualHost>

<Directory "/home/myth/www/think">
        Options +Indexes +FollowSymLinks
        AllowOverride All
        Require all granted
</Directory>

<VirtualHost *:80>
    ServerAdmin zhengzizhi@126.com
    DocumentRoot "/home/myth/www/think/public"
    ServerName user.corp.contoso.org
    ServerAlias user.corp.contoso.org
    ErrorLog "/home/myth/log/httpd/user-corp-contoso-error_log"
    CustomLog "/home/myth/log/httpd/user-corp-contoso-access_log" common
</VirtualHost>

[root@contoso ~]#

[myth@contoso ~]$ mkdir -p /home/myth/log/httpd   # Note: the log directory must be created in advance

[root@contoso ~]#  sed -i -- 's/^#ServerName www.example.com:80/ServerName contoso.org:80/g'  /etc/httpd/conf/httpd.conf
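
Once the virtual hosts are in place, each ServerName can be exercised with its own Host header. The sketch below is only an illustration: vhost_names is a hypothetical helper that lists the distinct ServerName values in a configuration file, and the loop sends one request per name with curl. It assumes httpd has been reloaded and listens on 192.168.10.20 port 80.

```shell
#!/usr/bin/env bash
# vhost_names FILE
# Hypothetical helper: print the distinct ServerName values declared in FILE, sorted.
vhost_names() {
    awk 'tolower($1) == "servername" { print $2 }' "$1" | sort -u
}

conf=/etc/httpd/conf.d/httpd-vhosts.conf
if [ -r "$conf" ]; then
    apachectl configtest                        # syntax check before any reload
    for name in $(vhost_names "$conf"); do
        code=$(curl -s -o /dev/null --max-time 5 -w '%{http_code}' -H "Host: $name" http://192.168.10.20/)
        echo "$name -> HTTP $code"
    done
else
    echo "$conf not found on this machine"
fi
```

A name that falls through to the wrong site usually shows up in httpd -S, which prints how Apache has parsed the virtual host table.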

Here is the consumer implementation, think\apps\index\controller\Consumer.php:

<?php
/**
 * Author: zhengzizhi@126.com
 * Date: Qixi Festival, 2017
 */
namespace app\index\controller;

use PDO;
use think\Db;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class Consumer
{
    private $connection;
    private $channel;

    public function __construct()
    {
    }

    /**
     * Listens for incoming messages.
     *
     * Important: before updating the consumer code for debugging, stop the running consumer listener first.
     *
     * [root@rabbitmq1 ~]# rabbitmqctl delete_vhost / && rabbitmqctl add_vhost / && rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'
     * [root@rabbitmq1 ~]# systemctl restart rabbitmq-server && rabbitmqadmin list exchanges
     *
     * [root@contoso ~]# cd /home/myth/www/think && php public/index.php index/Consumer/listen
     */
    public function listen()
    {
        $this->connection = new AMQPStreamConnection('192.168.10.105', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        /**
         * Declares queue, creates if needed
         *
         * @param string $queue
         * @param bool $passive
         * @param bool $durable
         * @param bool $exclusive
         * @param bool $auto_delete
         * @param bool $nowait
         * @param array $arguments
         * @param int $ticket
         * @return mixed|null
         */
        $this->channel->queue_declare(
            'bank.transfers1', #queue - the queue name, should be unique
            false,             #passive - false: don't just check whether a queue with the same name exists
            true,              #durable - true: the queue survives server restarts
            false,             #exclusive - false: the queue may be accessed by other channels
            false              #auto_delete - false: the queue is not deleted once the channel is closed
        );

        /**
         * Declares exchange
         *
         * @param string $exchange
         * @param string $type
         * @param bool $passive
         * @param bool $durable
         * @param bool $auto_delete
         * @param bool $internal
         * @param bool $nowait
         * @param array $arguments
         * @param int $ticket
         * @return mixed|null
         */
        $this->channel->exchange_declare(
            'corp1.fanout', #exchange - the exchange name (corp1.fanout)
            'fanout',       #type - the exchange type (fanout)
            false,          #passive - false: don't just check whether an exchange with the same name exists
            true,           #durable - true: the exchange survives server restarts
            false           #auto_delete - false: the exchange is not deleted once the channel is closed
        );

        /**
         * Binds queue to an exchange
         *
         * @param string $queue
         * @param string $exchange
         * @param string $routing_key
         * @param bool $nowait
         * @param array $arguments
         * @param int $ticket
         * @return mixed|null
         */
        $this->channel->queue_bind('bank.transfers1', 'corp1.fanout');

        /**
         * Specifies QoS
         * don't dispatch a new message to a worker until it has processed and
         * acknowledged the previous one. Instead, it will dispatch it to the
         * next worker that is not still busy.
         *
         * @param int $prefetch_size
         * @param int $prefetch_count
         * @param bool $a_global
         * @return mixed
         */
        $this->channel->basic_qos(
            null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
            1,      #prefetch count - prefetch window in terms of whole messages
            null    #a_global - null: the QoS settings apply per consumer
                    #a_global - true: the QoS settings apply per channel
        );

        /**
         * Starts a queue consumer
         *
         * @param string $queue
         * @param string $consumer_tag
         * @param bool $no_local
         * @param bool $no_ack
         * @param bool $exclusive
         * @param bool $nowait
         * @param callback|null $callback
         * @param int|null $ticket
         * @param array $arguments
         * @return mixed|string
         */
        $this->channel->basic_consume(
            'bank.transfers1',     #queue - get the messages from the queue (bank.transfers1)
            '',                    #consumer_tag - consumer identifier
            false,                 #no_local - false
            false,                 #no_ack - false: acks turned on, true: acks turned off.
                                   #  The worker sends a proper acknowledgment once it is done with a task
            false,                 #exclusive - false: the queue (bank.transfers1) may be accessed by all connections
            false,                 #nowait - false: wait for the server's response
            [$this, 'callback1']   #callback - a PHP callback
        );

        $this->channel->queue_declare(
            'bank.transfers2', #queue - the queue name, should be unique
            false,             #passive - false: don't just check whether a queue with the same name exists
            true,              #durable - true: the queue survives server restarts
            false,             #exclusive - false: the queue may be accessed by other channels
            false              #auto_delete - false: the queue is not deleted once the channel is closed
        );
        $this->channel->exchange_declare(
            'corp2.fanout', #exchange - the exchange name (corp2.fanout)
            'fanout',       #type - the exchange type (fanout)
            false,          #passive - false: don't just check whether an exchange with the same name exists
            true,           #durable - true: the exchange survives server restarts
            false           #auto_delete - false: the exchange is not deleted once the channel is closed
        );
        $this->channel->queue_bind('bank.transfers2', 'corp2.fanout');
        $this->channel->basic_consume(
            'bank.transfers2',     #queue - get the messages from the queue (bank.transfers2)
            '',                    #consumer_tag - consumer identifier
            false,                 #no_local - false
            false,                 #no_ack - false: acks turned on, true: acks turned off.
                                   #  The worker sends a proper acknowledgment once it is done with a task
            false,                 #exclusive - false: the queue (bank.transfers2) may be accessed by all connections
            false,                 #nowait - false: wait for the server's response
            [$this, 'callback2']   #callback - a PHP callback
        );

        // 'Consuming from queue';
        # Loop as long as the channel has callbacks registered
        # After 10 seconds there will be a timeout exception
        # $channel->wait(null, false, 10)
        while (count($this->channel->callbacks)) {
            // 'Waiting for incoming messages'
            $this->channel->wait();
        }
        $this->channel->close();
        $this->connection->close();
    }

    /**
     * Executes when a message is received.
     *
     * @param AMQPMessage $req
     */
    public function callback1(AMQPMessage $req)
    {
        $account = json_decode($req->body);
        $msg_id = $req->get('correlation_id');
        $user_from = $account->user_from;
        $amount = $account->amount;
        $status = $account->status;
        $user_to = $account->user_to;
        $_isSuccess = 1;
        if ($account->status == 0) {
            $status = 1;
            $db2 = Db::connect('db2');
            $db2->startTrans();
            try {
                $cnt = $db2->query('SELECT COUNT(*) AS cnt FROM think_message_apply a WHERE a.msg_id = ?', [$msg_id]);
                if ($cnt[0] == ['cnt' => 0]) {
                    $db2->execute('INSERT INTO think_message_apply(msg_id,user_from,amount,status,user_to)VALUES(?,?,?,?,?)', [$msg_id, $user_from, $amount, $status, $user_to]);
                }
                $db2->commit();
            } catch (\Exception $e) {
                $db2->rollback();
                $_isSuccess = 0;
            }
        }
        if ($_isSuccess == 1 && $account->status == 0) {
            $status = 1;
            /*
             * Creating a reply message with the same correlation id as the incoming message
             */
            $msg = new AMQPMessage(
                json_encode([
                    'success' => true,
                    'data' => [
                        'msg_id' => $msg_id,
                        'user_from' => $user_from,
                        'amount' => $amount,
                        'status' => $status,
                        'user_to' => $user_to,
                    ],
                ]), #message
                ['correlation_id' => $msg_id]
            );
            /**
             * Publishes a message to the same channel from the incoming message
             *
             * @param AMQPMessage $msg
             * @param string $exchange
             * @param string $routing_key
             * @param bool $mandatory
             * @param bool $immediate
             * @param int $ticket
             */
            $req->delivery_info['channel']->basic_publish(
                $msg,                   #msg
                '',                     #exchange
                $req->get('reply_to')   #routing_key
            ); // reply to the producer (the message sender) that the credit was recorded successfully
            /**
             * Acknowledges one or more messages to delivery_tag
             * If a consumer dies without sending an acknowledgement the AMQP broker
             * will redeliver it to another consumer or, if none are available at the
             * time, the broker will wait until at least one consumer is registered
             * for the same queue before attempting redelivery
             *
             * @param string $delivery_tag
             * @param bool $multiple
             */
            $req->delivery_info['channel']->basic_ack(
                $req->delivery_info['delivery_tag'] #delivery_tag = '1'
            ); // acknowledge the request now that the reply has been sent
        } else {
            $status = 0;
            /*
             * Creating a reply message with the same correlation id as the incoming message
             */
            $msg = new AMQPMessage(
                json_encode([
                    'success' => false,
                    'data' => [
                        'msg_id' => $msg_id,
                        'user_from' => $user_from,
                        'amount' => $amount,
                        'status' => $status,
                        'user_to' => $user_to,
                    ],
                ]), #message
                ['correlation_id' => $msg_id]
            );
            // Publish the reply on the channel the incoming message arrived on (parameters as documented above)
            $req->delivery_info['channel']->basic_publish(
                $msg,                   #msg
                '',                     #exchange
                $req->get('reply_to')   #routing_key
            );
            // Acknowledge the incoming message (see the note on redelivery above)
            $req->delivery_info['channel']->basic_ack(
                $req->delivery_info['delivery_tag'] #delivery_tag = '1'
            );
        }
    }

    /**
     * Executes when a message is received.
     *
     * @param AMQPMessage $req
     */
    public function callback2(AMQPMessage $req)
    {
        $account = json_decode($req->body);
        $msg_id = $req->get('correlation_id');
        $user_from = $account->user_from;
        $amount = $account->amount;
        $status = $account->status;
        $user_to = $account->user_to;
        $_isSuccess = 1;
        if ($account->status == 1) {
            $status = 2;
            $db2 = Db::connect('db2');
            $db2->startTrans();
            try {
                $db2->execute('UPDATE think_account a SET a.amount = a.amount + ? WHERE a.user_id = ?', [$amount, $user_to]);
                $db2->execute('UPDATE think_message_apply b SET b.STATUS = ? WHERE b.msg_id = ?', [$status, $msg_id]);
                $db2->commit();
            } catch (\Exception $e) {
                $db2->rollback();
                $_isSuccess = 0;
            }
        }
        if ($_isSuccess == 1 && $account->status == 1) {
            $status = 2;
            /*
             * Creating a reply message with the same correlation id as the incoming message
             */
            $msg = new AMQPMessage(
                json_encode([
                    'success' => true,
                    'data' => [
                        'msg_id' => $msg_id,
                        'user_from' => $user_from,
                        'amount' => $amount,
                        'status' => $status,
                        'user_to' => $user_to,
                    ],
                ]), #message
                ['correlation_id' => $msg_id]
            );
            // Publish the reply on the channel the incoming message arrived on (parameters as documented in callback1)
            $req->delivery_info['channel']->basic_publish(
                $msg,                   #msg
                '',                     #exchange
                $req->get('reply_to')   #routing_key
            ); // reply to the producer that the credit succeeded
            // Acknowledge the incoming message (see the note on redelivery in callback1)
            $req->delivery_info['channel']->basic_ack(
                $req->delivery_info['delivery_tag'] #delivery_tag = '2'
            ); // acknowledge the request now that the reply has been sent
        } else {
            $status = 1;
            /*
             * Creating a reply message with the same correlation id as the incoming message
             */
            $msg = new AMQPMessage(
                json_encode([
                    'success' => false,
                    'data' => [
                        'msg_id' => $msg_id,
                        'user_from' => $user_from,
                        'amount' => $amount,
                        'status' => $status,
                        'user_to' => $user_to,
                    ],
                ]), #message
                ['correlation_id' => $msg_id]
            );
            // Publish the reply on the channel the incoming message arrived on (parameters as documented in callback1)
            $req->delivery_info['channel']->basic_publish(
                $msg,                   #msg
                '',                     #exchange
                $req->get('reply_to')   #routing_key
            );
            // Acknowledge the incoming message (see the note on redelivery in callback1)
            $req->delivery_info['channel']->basic_ack(
                $req->delivery_info['delivery_tag'] #delivery_tag = '2'
            ); // acknowledge the request now that the reply has been sent
        }
    }
}

Here is the producer implementation, think\apps\index\controller\Producer.php:

<?php
/**
 * Author: zhengzizhi@126.com
 * Date: Qixi Festival, 2017
 */
namespace app\index\controller;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\Request;
use think\Db;
use PDO;

class Producer
{
    private $connection;
    private $channel;
    private $callback1_queue;
    private $callback2_queue;
    private $pass_msg1 = true;
    private $pass_msg2 = true;
    /**
     * @var string
     */
    private $response;
    private $waiting1;
    private $waiting2;
    /**
     * @var string
     */
    private $corr_id;
    private $suffix1 = ['01','02','03','04','05','06','07','08','09'];
    private $suffix2 = ['10','20','30','40','50','60','70','80','90'];

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('192.168.10.105', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
    }

    /**
     * @return string
     *
     * [root@contoso ~]# chown -R apache:apache /home/myth/www/think/apps/bank-data/data.csv && ll /home/myth/www/think/apps
     * [root@contoso ~]# chmod -R 0755 /home/myth/www/think/apps/bank-data && ll /home/myth/www/think/apps/bank-data
     * [root@contoso ~]# cat /home/myth/www/think/apps/bank-data/data.csv
     * [root@contoso ~]# cat /dev/null > /home/myth/www/think/apps/bank-data/data.csv
     *
     * [root@mariadbxxx ~]# cat /dev/null > /var/log/mariadb/queries.log && cat /dev/null > /var/log/mariadb/mariadb-slow.log && cat /dev/null > /var/log/mariadb/mariadb-error.log
     * [root@mariadbxxx ~]# mysql -uroot -p123456 -h127.0.0.1 -e "reset master"
     *
     * GET http://contoso.org/index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2
     *
     * [root@rabbitmq1 ~]# rabbitmqadmin list bindings
     * [root@rabbitmq1 ~]# rabbitmqadmin list queues
     * [root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers1 requeue=true count=30
     * [root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers2 requeue=true count=10
     *
     * [myth@contoso ~]$ ab -r -t 7200 -s 7200 -k -n 100000 -c 500 "http://contoso.org/index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2"
     */
    public function transfer(Request $request)
    {
        $this->response = null;
        $msg_id = session_create_id(); //uniqid();
        //$index = random_int(1,9);
        /*
         * $this->corr_id has a value like 53e26b393313a
         */
        $this->corr_id = $msg_id;
        $user_from = $request->param('account.user_from');
        $amount = $request->param('account.amount');
        $status = $request->param('account.status');
        $user_to = $request->param('account.user_to');
        $account = [
            'msg_id' => $msg_id,
            'user_from' => $user_from,
            'amount' => $amount,
            'status' => $status,
            'user_to' => $user_to,
        ];

        $this->channel->exchange_declare("corp1.fanout", 'fanout', false, true, false);
        list($this->callback1_queue,, ) = $this->channel->queue_declare(
            '',     #queue $msg_id.$this->suffix1[$index]
            false,  #passive
            true,   #durable
            true,   #exclusive
            false   #auto delete
        );
        $this->channel->queue_declare(
            "bank.transfers1", #queue
            false,             #passive
            true,              #durable
            false,             #exclusive
            false              #auto delete
        );
        $this->channel->queue_bind('', "corp1.fanout");
        $this->channel->queue_bind("bank.transfers1", "corp1.fanout");
        /*
         * create a message with two properties: reply_to, which is set to the
         * callback queue and correlation_id, which is set to a unique value for
         * every request
         */
        $msg1 = new AMQPMessage(
            json_encode($account), #body
            [
                'correlation_id' => $this->corr_id,
                'reply_to' => $this->callback1_queue,
                'delivery_mode' => 2,
            ] #properties
        );
        /**
         * Publishes a message
         *
         * @param AMQPMessage $msg
         * @param string $exchange
         * @param string $routing_key
         * @param bool $mandatory
         * @param bool $immediate
         * @param int $ticket
         */
        $this->channel->basic_publish(
            $msg1,                  #message
            "corp1.fanout",         #exchange
            "bank.transfers1",      #routing key
            true,                   #mandatory
            false
        );
        $this->channel->basic_consume(
            $this->callback1_queue,  #queue
            '',                      #consumer_tag = amq.ctag-bzBXVZr5iF7R16bq1NYgYw
            false,                   #no local
            false,                   #no ack
            false,                   #exclusive
            false,                   #no wait
            [$this, 'onCallback1']   #callback
        );
        $this->waiting1 = false;
        while (!$this->waiting1) {
            $this->channel->wait();
        }
        if ($this->pass_msg1 == false) {
            $this->channel->close();
            $this->connection->close();
            return $this->response;
        }

        $this->channel->exchange_declare("corp2.fanout", 'fanout', false, true, false);
        list($this->callback2_queue,, ) = $this->channel->queue_declare(
            '',     #queue $msg_id.$this->suffix2[$index]
            false,  #passive
            true,   #durable
            true,   #exclusive
            false   #auto delete
        );
        $this->channel->queue_declare(
            "bank.transfers2",   #queue
            false,               #passive
            true,                #durable
            false,               #exclusive
            false                #auto delete
        );
        $this->channel->queue_bind('', "corp2.fanout");
        $this->channel->queue_bind("bank.transfers2", "corp2.fanout");
        $account['status'] = 1;
        $msg2 = new AMQPMessage(
            json_encode($account), #body
            [
                'correlation_id' => $this->corr_id,
                'reply_to' => $this->callback2_queue,
                'delivery_mode' => 2,
            ] #properties
        );
        $this->channel->basic_publish(
            $msg2,                  #message
            "corp2.fanout",         #exchange
            "bank.transfers2",      #routing key
            true,                   #mandatory
            false
        );
        $this->channel->basic_consume(
            $this->callback2_queue,  #queue
            '',                      #consumer_tag = amq.ctag-bzBXVZr5iF7R16bq1NYgYw
            false,                   #no local
            false,                   #no ack
            false,                   #exclusive
            false,                   #no wait
            [$this, 'onCallback2']   #callback
        );
        $this->waiting2 = false;
        while (!$this->waiting2) {
            $this->channel->wait();
        }
        $this->channel->close();
        $this->connection->close();
        return $this->response;
    }

    /**
     * When a message appears, it checks the correlation_id property. If it
     * matches the value from the request it returns the response to the
     * application.
     *
     * @param AMQPMessage $rep
     */
    public function onCallback1(AMQPMessage $rep)
    {
        if ($rep->get('correlation_id') == $this->corr_id) {
            $this->waiting1 = true;
            $body = json_decode($rep->body);
            if ($body->success == true) {
                $msg_id = $body->data->msg_id;
                $user_from = $body->data->user_from;
                $amount = $body->data->amount;
                $status = $body->data->status;
                $user_to = $body->data->user_to;
                if ($status == 1) {
                    $db1 = Db::connect('db1');
                    $db1->startTrans();
                    try {
                        $db1->execute('INSERT  INTO think_message_supply(msg_id,user_from,amount,status,user_to) VALUES (?,?,?,?,?)', [$msg_id, $user_from, $amount, $status, $user_to]);
                        $db1->commit();
                    } catch (\Exception $e) {
                        $db1->rollback();
                        $this->response = json(['success' => 0, 'msg' => "First Transaction db1 has failed."]);
                        $this->pass_msg1 = false;
                    }
                }
            } else {
                $this->response = json(['success' => 0, 'msg' => "First Transaction db2 has failed."]);
                $this->pass_msg1 = false;
            }
        }
    }

    /**
     * When a message appears, it checks the correlation_id property. If it
     * matches the value from the request it returns the response to the
     * application.
     *
     * @param AMQPMessage $rep
     */
    public function onCallback2(AMQPMessage $rep)
    {
        if ($rep->get('correlation_id') == $this->corr_id) {
            $this->waiting2 = true;
            $body = json_decode($rep->body);
            if ($body->success == true) {
                $msg_id = $body->data->msg_id;
                $user_id = $body->data->user_from;
                $amount = $body->data->amount;
                $status = $body->data->status;
                if ($status == 2) {
                    $db1 = Db::connect('db1');
                    $db1->startTrans();
                    try {
                        $db1->execute('UPDATE think_message_supply a SET a.status = ? WHERE a.msg_id = ?', [$status, $msg_id]);
                        $db1->execute('UPDATE think_account b SET b.amount = b.amount - ? WHERE b.user_id = ?', [$amount, $user_id]);
                        $db1->commit();
                    } catch (\Exception $e) {
                        $db1->rollback();
                        file_put_contents(APP_PATH.'bank-data/data.csv', $rep->body.'  
##  '.date('Y-m-d H:i:s',time()).PHP_EOL, FILE_APPEND);$this->response =  json(['success'=>1,'msg'=>"Avoid to rededuct money,now you must contact our customer."]);$this->pass_msg2 = false;}if($this->pass_msg2 == true){$this->response = json(['success'=>1,'msg'=>'Bank Transaction is successfull.']);}} } else {$this->response =  json(['success'=>0,'msg'=>"Second Transaction db2 has failed."]);}}}}
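
The two callbacks above rely on one rule: a reply counts only if its correlation_id equals the id of the request being waited on. The following is a minimal Python sketch of that rule, with no RabbitMQ involved. The class name ReplyWaiter, the in-memory list standing in for the callback queue, and the use of uuid for the id are all assumptions made for illustration; the PHP code uses session_create_id() and a real exclusive queue.

```python
import uuid


class ReplyWaiter:
    """Hypothetical stand-in for the waiting1/waiting2 flags in the controller."""

    def __init__(self):
        self.corr_id = uuid.uuid4().hex  # unique per request, plays the role of $msg_id
        self.done = False
        self.response = None

    def on_reply(self, correlation_id, body):
        # Ignore replies that belong to some other request.
        if correlation_id != self.corr_id:
            return
        self.done = True
        self.response = body


waiter = ReplyWaiter()

# Simulated callback queue: one stale reply, then the reply we are waiting for.
replies = [
    ("some-other-id", {"success": True, "data": {"status": 9}}),
    (waiter.corr_id, {"success": True, "data": {"status": 1}}),
]

for corr_id, body in replies:
    if waiter.done:
        break
    waiter.on_reply(corr_id, body)

print(waiter.done, waiter.response["data"]["status"])
```

The stale reply is dropped without ending the wait, which is the same behaviour as the `while(!$this->waiting1)` loop: it keeps calling wait() until a reply with the matching id arrives.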

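The flowcharts below show, in detail, how the distributed database code based on real-time messages executes. Follow the circled sequence numbers along the arrows to read the flow:

The images are large and get scaled down on the web page, which makes them blurry; copy the two flowcharts below into an image viewer to read them at full size.

As an aside, the parameter values in the files listed below are closely tied to system performance: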

[root@contoso ~]# ll /proc/sys/net/ipv4
total 0
-rw-r--r-- 1 root root 0 Aug 29 21:50 cipso_cache_bucket_size
-rw-r--r-- 1 root root 0 Aug 29 21:50 cipso_cache_enable
-rw-r--r-- 1 root root 0 Aug 29 21:50 cipso_rbm_optfmt
-rw-r--r-- 1 root root 0 Aug 29 21:50 cipso_rbm_strictvalid
dr-xr-xr-x 1 root root 0 Aug 28 22:24 conf
-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_echo_ignore_all
-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_echo_ignore_broadcasts
-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_errors_use_inbound_ifaddr
-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_ignore_bogus_error_responses
-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_ratelimit
-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_ratemask
-rw-r--r-- 1 root root 0 Aug 29 21:50 igmp_max_memberships
-rw-r--r-- 1 root root 0 Aug 29 21:50 igmp_max_msf
-rw-r--r-- 1 root root 0 Aug 29 21:50 igmp_qrv
-rw-r--r-- 1 root root 0 Aug 29 21:50 inet_peer_maxttl
-rw-r--r-- 1 root root 0 Aug 29 21:50 inet_peer_minttl
-rw-r--r-- 1 root root 0 Aug 29 21:50 inet_peer_threshold
-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_default_ttl
-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_dynaddr
-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_early_demux
-rw-r--r-- 1 root root 0 Aug 28 22:24 ip_forward
-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_forward_use_pmtu
-rw-r--r-- 1 root root 0 Aug 29 21:50 ipfrag_high_thresh
-rw-r--r-- 1 root root 0 Aug 29 21:50 ipfrag_low_thresh
-rw-r--r-- 1 root root 0 Aug 29 21:50 ipfrag_max_dist
-rw-r--r-- 1 root root 0 Aug 29 21:50 ipfrag_secret_interval
-rw-r--r-- 1 root root 0 Aug 29 21:50 ipfrag_time
-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_local_port_range
-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_local_reserved_ports
-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_nonlocal_bind
-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_no_pmtu_disc
dr-xr-xr-x 1 root root 0 Aug 29 21:50 neigh
-rw-r--r-- 1 root root 0 Aug 29 21:50 ping_group_range
dr-xr-xr-x 1 root root 0 Aug 29 21:50 route
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_abort_on_overflow
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_adv_win_scale
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_allowed_congestion_control
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_app_win
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_autocorking
-r--r--r-- 1 root root 0 Aug 29 21:50 tcp_available_congestion_control
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_base_mss
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_challenge_ack_limit
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_congestion_control
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_dsack
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_early_retrans
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_ecn
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_fack
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_fastopen
-rw------- 1 root root 0 Aug 29 21:50 tcp_fastopen_key
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_fin_timeout
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_frto
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_invalid_ratelimit
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_keepalive_intvl
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_keepalive_probes
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_keepalive_time
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_limit_output_bytes
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_low_latency
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_max_orphans
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_max_ssthresh
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_max_syn_backlog
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_max_tw_buckets
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_mem
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_min_tso_segs
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_moderate_rcvbuf
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_mtu_probing
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_no_metrics_save
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_notsent_lowat
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_orphan_retries
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_reordering
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_retrans_collapse
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_retries1
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_retries2
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_rfc1337
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_rmem
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_sack
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_slow_start_after_idle
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_stdurg
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_synack_retries
-rw-r--r-- 1 root root 0 Aug 28 22:24 tcp_syncookies
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_syn_retries
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_thin_dupack
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_thin_linear_timeouts
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_timestamps
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_tso_win_divisor
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_tw_recycle
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_tw_reuse
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_window_scaling
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_wmem
-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_workaround_signed_windows
-rw-r--r-- 1 root root 0 Aug 29 21:50 udp_mem
-rw-r--r-- 1 root root 0 Aug 29 21:50 udp_rmem_min
-rw-r--r-- 1 root root 0 Aug 29 21:50 udp_wmem_min
-rw-r--r-- 1 root root 0 Aug 29 21:50 xfrm4_gc_thresh
[root@contoso ~]#

cat /proc/sys/net/ipv4/tcp_syn_retries        ## default 6
cat /proc/sys/net/ipv4/tcp_synack_retries    ## default 5
cat /proc/sys/net/ipv4/tcp_keepalive_time    ## default 7200
cat /proc/sys/net/ipv4/tcp_keepalive_probes    ## default 9
cat /proc/sys/net/ipv4/tcp_keepalive_intvl    ## default 75
cat /proc/sys/net/ipv4/tcp_retries2        ## default 15
cat /proc/sys/net/ipv4/tcp_fin_timeout        ## default 60
cat /proc/sys/net/ipv4/tcp_max_tw_buckets    ## default 65536
cat /proc/sys/net/ipv4/tcp_tw_recycle        ## default 0
cat /proc/sys/net/ipv4/tcp_tw_reuse        ## default 0
cat /proc/sys/net/ipv4/tcp_max_orphans        ## default 65536
cat /proc/sys/net/ipv4/tcp_syncookies        ## default 0
cat /proc/sys/net/ipv4/tcp_max_syn_backlog    ## default 512
cat /proc/sys/net/ipv4/tcp_wmem            ## default 4096    16384   4194304
cat /proc/sys/net/ipv4/tcp_rmem            ## default 4096    87380   6291456
cat /proc/sys/net/ipv4/tcp_mem            ## default 378486  504649  756972
cat /proc/sys/net/ipv4/ip_local_port_range    ## default 32768   60999
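
Each file read with cat above corresponds to a dotted sysctl name: the dots become directory separators under /proc/sys. Here is a small Python sketch of that mapping. The helper names sysctl_path and read_sysctl are made up for this example, and the simple split on dots assumes the key contains no network interface name with a dot in it.

```python
from pathlib import Path

PROC_SYS = Path("/proc/sys")


def sysctl_path(name: str) -> Path:
    """Map a dotted sysctl name to its file under /proc/sys."""
    return PROC_SYS.joinpath(*name.split("."))


def read_sysctl(name: str):
    """Return the current value as a string, or None if the key is absent."""
    try:
        return sysctl_path(name).read_text().strip()
    except OSError:
        return None  # key missing (module not loaded) or not running on Linux


for key in (
    "net.ipv4.tcp_syn_retries",
    "net.ipv4.tcp_keepalive_time",
    "net.core.somaxconn",
):
    print(f"{key} -> {sysctl_path(key)} = {read_sysctl(key)}")
```

The same rule shows where a key really lives: net.core.somaxconn is the file /proc/sys/net/core/somaxconn, not something under /proc/sys/net/ipv4.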

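The connection-tracking entries below only exist once the firewall is running (starting firewalld normally loads the nf_conntrack module). The two net.core entries are always present. None of them live under /proc/sys/net/ipv4 on CentOS 7; the older ip_conntrack_* names refer to the same nf_conntrack settings.
systemctl start firewalld    # start the firewall service

cat /proc/sys/net/netfilter/nf_conntrack_max    ## default (not recorded)
cat /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established    ## default (not recorded)
cat /proc/sys/net/core/somaxconn    ## default (not recorded)
cat /proc/sys/net/core/netdev_max_backlog    ## default (not recorded)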

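The tuned kernel parameters are shown below. They were already applied earlier, so do not run the following commands again. Writing a config file with cat (paste the content, then press Ctrl+D) is far more convenient than vim, and I really like working this way.

They are pasted here once more in one place, so they are easy to find later: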

[root@contoso ~]# cat > /etc/sysctl.conf
# sysctl settings are defined through files in
# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.
#
# Vendors settings live in /usr/lib/sysctl.d/.
# To override a whole file, create a new file with the same in
# /etc/sysctl.d/ and put new settings there. To override
# only specific settings, add a file with a lexically later
# name in /etc/sysctl.d/ and put new settings there.
#
# For more information, see sysctl.conf(5) and sysctl.d(5).
net.ipv4.tcp_syn_retries = 1
net.ipv4.tcp_synack_retries = 1
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 15
net.ipv4.tcp_retries2 = 5
net.ipv4.tcp_fin_timeout = 2
net.ipv4.tcp_max_tw_buckets = 36000
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_max_orphans = 32768
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_max_syn_backlog = 16384
net.ipv4.tcp_wmem = 8192 131072 16777216
net.ipv4.tcp_rmem = 32768 131072 16777216
net.ipv4.tcp_mem = 786432 1048576 1572864
net.ipv4.ip_local_port_range = 1024  65000

[root@contoso ~]# sysctl -p
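
To get a feel for the sizes in this file, keep in mind that tcp_rmem and tcp_wmem are given in bytes while tcp_mem is given in memory pages. The sketch below parses the three lines and converts them. The parser name parse_sysctl_conf is invented for this example, and PAGE_SIZE = 4096 is an assumption (the usual page size on x86_64, not something stated in this article).

```python
PAGE_SIZE = 4096  # assumption: 4 KiB pages, the usual size on x86_64


def parse_sysctl_conf(text):
    """Parse 'key = v1 v2 ...' lines, skipping blanks and # or ; comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", ";")):
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.split()
    return settings


conf = """
# values copied from the sysctl.conf above
net.ipv4.tcp_wmem = 8192 131072 16777216
net.ipv4.tcp_rmem = 32768 131072 16777216
net.ipv4.tcp_mem = 786432 1048576 1572864
"""

settings = parse_sysctl_conf(conf)

# tcp_mem: low / pressure / high thresholds, in pages -> GiB
tcp_mem_gib = [int(p) * PAGE_SIZE / 2**30 for p in settings["net.ipv4.tcp_mem"]]
# tcp_rmem: min / default / max per-socket receive buffer, in bytes -> MiB
rmem_max_mib = int(settings["net.ipv4.tcp_rmem"][2]) / 2**20

print(tcp_mem_gib)   # [3.0, 4.0, 6.0]
print(rmem_max_mib)  # 16.0
```

Under that page-size assumption the tcp_mem thresholds come to 3, 4 and 6 GiB, which is worth comparing against the memory given to each virtual machine.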

[root@contoso ~]# cat > /etc/security/limits.conf
# /etc/security/limits.conf
#
#This file sets the resource limits for the users logged in via PAM.
#It does not affect resource limits of the system services.
#
#Also note that configuration files in /etc/security/limits.d directory,
#which are read in alphabetical order, override the settings in this
#file in case the domain is the same or more specific.
#That means for example that setting a limit for wildcard domain here
#can be overriden with a wildcard setting in a config file in the
#subdirectory, but a user specific setting here can be overriden only
#with a user specific setting in the subdirectory.
#
#Each line describes a limit for a user in the form:
#
#<domain>        <type>  <item>  <value>
#
#Where:
#<domain> can be:
#        - a user name
#        - a group name, with @group syntax
#        - the wildcard *, for default entry
#        - the wildcard %, can be also used with %group syntax,
#                 for maxlogin limit
#
#<type> can have the two values:
#        - "soft" for enforcing the soft limits
#        - "hard" for enforcing hard limits
#
#<item> can be one of the following:
#        - core - limits the core file size (KB)
#        - data - max data size (KB)
#        - fsize - maximum filesize (KB)
#        - memlock - max locked-in-memory address space (KB)
#        - nofile - max number of open file descriptors
#        - rss - max resident set size (KB)
#        - stack - max stack size (KB)
#        - cpu - max CPU time (MIN)
#        - nproc - max number of processes
#        - as - address space limit (KB)
#        - maxlogins - max number of logins for this user
#        - maxsyslogins - max number of logins on the system
#        - priority - the priority to run user process with
#        - locks - max number of file locks the user can hold
#        - sigpending - max number of pending signals
#        - msgqueue - max memory used by POSIX message queues (bytes)
#        - nice - max nice priority allowed to raise to values: [-20, 19]
#        - rtprio - max realtime priority
#
#<domain>      <type>  <item>         <value>
#

#*               soft    core            0
#*               hard    rss             10000
#@student        hard    nproc           20
#@faculty        soft    nproc           20
#@faculty        hard    nproc           50
#ftp             hard    nproc           0
#@student        -       maxlogins       4

# End of file

* soft nofile 65536
* hard nofile 65536

[root@contoso ~]#  ulimit -n 65536
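
The comments in limits.conf itself state that the file applies to users logged in via PAM and does not affect system services, and ulimit -n only changes the current shell. So it can be useful to check what limit a given process actually received. A minimal Python sketch using the standard resource module follows; the helper names nofile_limits and meets_target are made up for this example, and 65536 is simply the target value used above.

```python
import resource


def nofile_limits():
    """Return (soft, hard) open-file limits of the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def meets_target(limit, target=65536):
    """True if a limit is unlimited or at least the target value."""
    return limit == resource.RLIM_INFINITY or limit >= target


soft, hard = nofile_limits()
print(f"soft={soft} hard={hard}")
print("soft limit OK" if meets_target(soft) else "soft limit below 65536")
```

Running it from a fresh login shell shows whether the nofile lines took effect for that session.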

Next comes the maximum concurrency of the httpd server. It was already configured earlier, so skip this step here:

[root@contoso ~]# cat  > /etc/httpd/conf.modules.d/00-mpm.conf
# Select the MPM module which should be used by uncommenting exactly
# one of the following LoadModule lines:

# prefork MPM: Implements a non-threaded, pre-forking web server
# See: http://httpd.apache.org/docs/2.4/mod/prefork.html
LoadModule mpm_prefork_module modules/mod_mpm_prefork.so
<IfModule mpm_prefork_module>
  StartServers        100
  MinSpareServers     100
  MaxSpareServers     120  
  ServerLimit         3000
  MaxClients          3000
  MaxRequestsPerChild 500000
</IfModule>
# worker MPM: Multi-Processing Module implementing a hybrid
# multi-threaded multi-process web server
# See: http://httpd.apache.org/docs/2.4/mod/worker.html
#
#LoadModule mpm_worker_module modules/mod_mpm_worker.so

# event MPM: A variant of the worker MPM with the goal of consuming
# threads only for connections with active processing
# See: http://httpd.apache.org/docs/2.4/mod/event.html
#
#LoadModule mpm_event_module modules/mod_mpm_event.so

[root@contoso ~]#

Create database db1 and initialize the test tables

CREATE DATABASE IF NOT EXISTS `db1` DEFAULT CHARACTER SET utf8 ;

USE `db1`;

DROP TABLE IF EXISTS `think_account`;

CREATE TABLE `think_account` (
  `user_id` int(4) NOT NULL,
  `amount` decimal(18,2) DEFAULT NULL,
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert  into `think_account`(`user_id`,`amount`) values (1,'9999851572743.00'),(3,'9999999999999.00'),(5,'9999999999999.00');

DROP TABLE IF EXISTS `think_message_supply`;

CREATE TABLE `think_message_supply` (
  `msg_id` varchar(40) NOT NULL,
  `user_from` int(11) DEFAULT NULL,
  `amount` decimal(18,2) DEFAULT NULL,
  `status` tinyint(4) DEFAULT 0,
  `user_to` int(11) DEFAULT NULL,
  `time` timestamp NULL DEFAULT current_timestamp(),
  PRIMARY KEY (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Create database db2 and initialize the test tables

CREATE DATABASE IF NOT EXISTS `db2` DEFAULT CHARACTER SET utf8 ;

USE `db2`;

DROP TABLE IF EXISTS `think_account`;

CREATE TABLE `think_account` (
  `user_id` int(4) NOT NULL,
  `amount` decimal(18,2) DEFAULT NULL,
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert  into `think_account`(`user_id`,`amount`) values (2,'0.00'),(4,'500.00'),(6,'500.00');

DROP TABLE IF EXISTS `think_message_apply`;

CREATE TABLE `think_message_apply` (
  `msg_id` varchar(40) NOT NULL,
  `user_from` int(11) DEFAULT NULL,
  `amount` decimal(18,2) DEFAULT NULL,
  `status` tinyint(4) DEFAULT 0,
  `user_to` int(11) DEFAULT NULL,
  `time` timestamp NULL DEFAULT current_timestamp(),
  PRIMARY KEY (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
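
Because msg_id is the primary key of think_message_apply, the same message cannot be recorded twice. The sketch below shows how that can make crediting the target account idempotent. It rests on several assumptions that are not taken from this article: that the consumer inserts the message row and credits the account in one transaction, that it stores status 2, and that Python's built-in sqlite3 is an acceptable stand-in for MariaDB. The function name apply_transfer is hypothetical.

```python
import sqlite3


def apply_transfer(conn, msg_id, user_from, amount, user_to):
    """Credit user_to once per msg_id; return False if msg_id was already applied."""
    try:
        with conn:  # one transaction: commit on success, roll back on error
            conn.execute(
                "INSERT INTO think_message_apply"
                "(msg_id, user_from, amount, status, user_to) VALUES (?, ?, ?, ?, ?)",
                (msg_id, user_from, amount, 2, user_to),  # status 2 is an assumption
            )
            conn.execute(
                "UPDATE think_account SET amount = amount + ? WHERE user_id = ?",
                (amount, user_to),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate msg_id: the primary key rejected the insert


# In-memory tables that mirror the db2 schema in simplified form.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE think_account (user_id INTEGER PRIMARY KEY, amount NUMERIC)")
conn.execute(
    "CREATE TABLE think_message_apply (msg_id TEXT PRIMARY KEY, user_from INTEGER,"
    " amount NUMERIC, status INTEGER DEFAULT 0, user_to INTEGER)"
)
conn.execute("INSERT INTO think_account VALUES (2, 0)")
conn.commit()

first = apply_transfer(conn, "msg-1", 1, 1024, 2)
second = apply_transfer(conn, "msg-1", 1, 1024, 2)  # redelivery of the same message
balance = conn.execute("SELECT amount FROM think_account WHERE user_id = 2").fetchone()[0]
print(first, second, balance)
```

The second call fails on the primary key before the balance is touched, so a redelivered message does not credit the account twice.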

Load testing with ab:

First log in to host 192.168.10.20 as root and start the consumer listener with the following command:

[root@contoso ~]# cd /home/myth/www/think && php public/index.php index/Consumer/listen

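Test with 500 concurrent connections. This may be a little high for a virtual machine environment, because some requests were lost (ab hit socket receive errors and closed the affected connections, so those requests effectively were never sent successfully).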

[myth@contoso ~]$ ab  -r -t 7200 -s 7200 -k -n 100000 -c 500 "http://contoso.org/index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2"
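
The request URL passes the transfer as bracketed query parameters (account[user_from], account[amount] and so on), which the controller reads as account.user_from, account.amount, etc. If you want to generate such URLs for other accounts or amounts, a Python sketch could look like this. The function name transfer_url is made up; the base URL and parameter names are the ones used in the ab command above.

```python
from urllib.parse import urlencode

BASE = "http://contoso.org/index/producer/transfer"


def transfer_url(user_from, amount, user_to, status=0, base=BASE):
    """Build the transfer URL with account[...] query parameters."""
    params = [
        ("account[user_from]", user_from),
        ("account[amount]", amount),
        ("account[status]", status),
        ("account[user_to]", user_to),
    ]
    # safe="[]" keeps the brackets literal instead of percent-encoding them
    return base + "?" + urlencode(params, safe="[]")


print(transfer_url(1, 1024, 2))
```

For user_from=1, amount=1024, user_to=2 this produces exactly the URL passed to ab.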

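A snapshot of the queues on the message server during the run:

Real-time system performance:

Number of httpd processes started. I did not count them directly: I pasted all httpd processes printed in the SecureCRT terminal into Notepad++ and took a scrolling screenshot with FastStone Capture. Total processes = line 623 - line 3 + 1 = 621 httpd processes. That matches 120 idle spare processes (MaxSpareServers is 120) + 500 concurrent requests + the 1 parent httpd started by default = exactly 621 httpd processes.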
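
The process count can be checked with a few lines of arithmetic. The line numbers 3 and 623 and the 500 concurrent requests come from the text above. Taking the idle count as 120, the MaxSpareServers value from 00-mpm.conf, is my reading of the numbers: the sum only reaches 621 with 120 idle processes, not with 100.

```python
# Lines 3..623 of the pasted process list each hold one httpd process.
first_line, last_line = 3, 623
listed = last_line - first_line + 1

max_spare_servers = 120   # MaxSpareServers in 00-mpm.conf (interpretation, see above)
concurrency = 500         # ab -c 500
parent = 1                # the httpd parent process

expected = max_spare_servers + concurrency + parent

print(listed, expected, listed == expected)  # 621 621 True
```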

Delete all data on the default message server and recreate the default vhost so that it is identical to a fresh installation:

[root@rabbitmq1 ~]# rabbitmqctl delete_vhost / && rabbitmqctl add_vhost / && rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'
Deleting vhost "/"
Creating vhost "/"
Setting permissions for user "guest" in vhost "/"
[root@rabbitmq1 ~]# systemctl restart rabbitmq-server && rabbitmqadmin list exchanges
+--------------------+---------+
|        name        |  type   |
+--------------------+---------+
|                    | direct  |
| amq.direct         | direct  |
| amq.fanout         | fanout  |
| amq.headers        | headers |
| amq.match          | headers |
| amq.rabbitmq.log   | topic   |
| amq.rabbitmq.trace | topic   |
| amq.topic          | topic   |
+--------------------+---------+
[root@rabbitmq1 ~]#

The file is read and written by apache, the user that httpd runs as, so read/write permission on data.csv is assigned to the apache user.

If you look closely, the runtime folder is likewise writable only by apache (not even root can write to it), so keep that in mind. The commands in the PHP source comments are not there for decoration:

an experienced reader should recognize them as terminal commands that must be run.

[root@contoso ~]# chown -R apache:apache /home/myth/www/think/apps/bank-data/data.csv && ll /home/myth/www/think/apps
[root@contoso ~]# chmod -R 0755 /home/myth/www/think/apps/bank-data && ll /home/myth/www/think/apps/bank-data
[root@contoso ~]# cat /home/myth/www/think/apps/bank-data/data.csv

GET http://contoso.org/index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2

[root@rabbitmq1 ~]# rabbitmqadmin list bindings
[root@rabbitmq1 ~]# rabbitmqadmin list queues
[root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers1 requeue=true count=30
[root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers2 requeue=true count=10
