异步写介绍

概览

许多应用使用关系型数据库来持久化数据。这种方式有数据延迟和瓶颈问题。GemFire使用Write-Behind技术,以内存的速度访问数据,以异步的形式把数据更新到数据源。

在Write-Behind模式中,更新缓存条目异步地写到后台数据库。这一特性,GemFire使用了独创的队列实现机制,Gateway Queue,网关队列能够关联多个数据区域。网关封装了一个高可靠的队列实现机制,线程根据数据批处理大小,和特定的应用回调来处理队列更新。

GemFire的Gateway Queue 技术本身用来对跨WAN网的分布式缓存数据进行传输和实时更新。跨WAN网的远程集群和本地关系型数据库,GemFire传输数据的方式相同,都是把小量数据,快速更新合并为一个大的数据块。利用TCP/IP 保证可靠传输,跨WAN网异步数据的速率和本地系统同步数据的速率基本相同。

传统关系型数据库基于磁盘的数据传输特点是大数据块/高延时/高吞吐,而GemFire Write-Behind技术是基于内存的,数据传输特点是小数据块/快速更新/低延时/高吞吐,利用不断传输小数据块,持续更新数据来达到低延时和高吞吐量。

GemFire Gateway Queue技术通过维护队列实例,并行地队列化缓存更新,消息周期性地与主备网关队列实例通信,来投递数据更新。

在故障切换方面,如果某个队列更新出现问题,则GemFire可以把更新切换到备用队列上,从而不影响应用正常使用。可以指定多个备用队列来达到队列冗余的目的。

GemFire  Write-behind 特性总结:

1. 异步更新数据到外部数据存储来提高应用的性能。应用直接与数据库进行交互,可能会受到数据库事务控制的影响而降低应用的性能。

2. GemFire Gateway Queue提供高可靠,保证达到零数据丢失,外部数据源故障丝毫不影响应用正常运行。

3. 比关系型数据库支持更多的并发用户,扩展“大事务更新”吞吐量,增强应用的能力。

4.    通过合并来减少数据库负载。相同键的多次更新能够合并成一次更新,只有最后的数据条目写入数据库中。

Gateways通过gateway hub来创建。一个Hub负责管理一个到多个Gateways。你可以配置从多个分布式系统来接受复制事件。比如,一个gateway负责队列化更新事件,把更新事件写到数据库中,同时,另一个gateway负责把更新事件传到另一个数据库中。

每一个gateway管理一个队列。队列被2个以上节点维护,主网关Hub或者多个备网关Hub都有自己的队列。这些队列可以配置到内存中,或者部分溢出到磁盘中,或者所有全部溢出到磁盘中。当然了,这种磁盘写,对于GemFire是没有瓶颈的,我们通过异步磁盘写模式来维护我们的磁盘文件。

GemFire队列溢出磁盘只是内存的一个扩展。与关系型数据库的做法是不同的,关系型数据库把数据写到物理磁盘上,而RondomAccessFile缓冲区更新是永久删除的。GemFire也支持内存数据拷贝备份到磁盘上,来保障高可用。即使在最差的应用场景中,应用选择同步磁盘锁来备份Gateway Queue 状态,GemFire架构也仍然有非常强大的优势: GemFire集群使用各个节点的本地磁盘,这样就给了整个集群聚合的吞吐能力。

在冗余方面,GemFire自动切换队列的故障,通过把当前队列的条目拷贝到备份节点来达到队列的冗余。GemFire会周期性地在备份节点重建冗余队列。当主队列由于异常情况不能使用,备份队列马上接管成为主队列,这一期间是没有数据丢失的。

由于一些技术人员可能不熟悉HA模式:当故障切换期间,在备用队列的一些事件,可能已经被处理了。GemFire标识了这些可能的重复事件允许应用回调。比如,如果事件是一个’create’通告,插入一条记录到数据库,如果事件被标记为’重复’,那么数据库监听器可能忽略’primary key violation’。

当外部数据源长时间不可用,那么持久化是必须的操作。这样当集群宕机或关闭时,也不会出现更新事件丢失的情况,也可以消去不断增长的队列,避免内存溢出。

异步写实现 – 代码和配置

应用可以在任一节点配置Gateway Hub。每一个数据区域能够配置启用,路由事件到Gateway Hub。多Hub被用来并行分发事件到外部数据源。比如,新的订单能够被并行路由到生产端。

所有相关的数据区域能配置使用一个Hub在外部数据源保存事件更新,避免数据完整性冲突。

Gateway listener

GatewayEventLister 接口定义了一个方法: public boolean processEvents(List events);

List events 包含GatewayEvents。

监听器接口实现

下面是一个GatewayEventLister的简单实现。

package com.example.listener;
import com.example.common.Data;
import com.example.db.DBPersistence;import com. gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.util.GatewayEvent;
import com.gemstone.gemfire.cache.util.GatewayEventListener;
import com.gemstone.gemfire.cache.Operation;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class TestGatewayListener implements GatewayEventListener, Declarable {public boolean processEvents(List events) {for (Iterator i = events.iterator(); i.hasNext(); ) {GatewayEvent ge = (GatewayEvent) i.next();process(ge);}return true;}public void init(Properties p) {}protected void process(GatewayEvent ge) {String key = (String) ge.getKey();Data data = (Data) ge.getDeserializedValue();PreparedStatement pstmt=null;Connection conn = DBPersistence.getConnection();try {if ( ge.getOperation().equals(Operation.UPDATE)){pstmt = conn.prepareStatement("Update Data set value = ?, price = ? where id = ?");pstmt.setString(1, data.getValue());pstmt.setInt(2, data.getPrice());pstmt.setString(3, key);}else if ( ge.getOperation().equals(Operation.CREATE)){pstmt = conn.prepareStatement("Insert Data (id, value, price) values (?, ?, ?)");pstmt.setString(1, key);pstmt.setString(2, data.getValue());pstmt.setInt(3, data.getPrice());}else if ( ge.getOperation().equals(Operation.DESTROY)){pstmt = conn.prepareStatement("Delete from Data where key =>");pstmt.setString(1, key);}if ( pstmt != null){pstmt.execute();pstmt.close();}}catch (SQLException se) {se.printStackTrace();throw new CacheWriterException ( se );}}}

以上的GatewayEventListener接受回调,和操作类型,根据这些信息来更新数据库。这个事件接受一个包含键值对的参数。

Data Class定义如下:

package com.example.common;import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;import com.gemstone.gemfire.DataSerializable;
import com.gemstone.gemfire.DataSerializer;public class Data implements DataSerializable {private int id;private String value;private int price;public Data()     {}public Data(int i, String s, int p){id = i;value = s;price =p;}public void setId(int i){id=i;}public int getId(){return id;}public void setValue(String s){value = s;}public String getValue(){return value;}public int getPrice(){return price;}public void setPrice(int p){price=p;}public void fromData(DataInput in) throws IOException, ClassNotFoundException{try {id = DataSerializer.readPrimitiveInt ( in );value = DataSerializer.readString(in);price = DataSerializer.readPrimitiveInt(in);} catch ( Exception e ) {System.out.println ( "Deserialize Error: " + e );e.printStackTrace();}}public void toData(DataOutput out) throws IOException{DataSerializer.writePrimitiveInt(id, out);DataSerializer.writeString(value, out);DataSerializer.writePrimitiveInt(price, out);}
}

配置网关

Cache cache = Cache.getAnyInstance();
GatewayHub hub = cache.addGatewayHub("DBWriterHub", \-1);
hub.setStartupPolicy(GatewayHub.STARTUP_POLICY_PRIMARY);
Gateway gateway = hub.addGateway("DBWriter");
gateway.addListener(new TestGatewayListener());
 
// Start the gateway hub using the start method.
hub.start();

数据区域路由事件到Gateway Hubs:

RegionAttributes ra = AttributesFactory.create();ra.setEnableGateway(true);
ra.setGatewayHubId("DBWriterHub");
 
... set other attributes and create the region ...

XML配置

<gateway-hub id="DBWriterHub" port="-1" startup-policy="primary"><gateway id="DBWriter"><gateway-listener><class-name>com.example.listener.TestGatewayListener</class-name></gateway-listener></gateway>
</gateway-hub>

为缓存配置多Gateway Hubs

多Gateway Hubs能够在单缓存中定义。这一特性关联'GatewayHub Startup Policy'和'Region to GatewayHub Distribution',跨多个VMs来传播Gateway负载。

代码配置

Cache cache = Cache.getAnyInstance();GatewayHub hub1 = cache.addGatewayHub("OrderDBWriter", \-1);
hub1.setStartupPolicy(GatewayHub.STARTUP_POLICY_PRIMARY);
Gateway gateway1 = hub1.addGateway("OrderDBWriter");
gateway1.addListener(new OrderDBGatewayListener());
hub1.start();
 
GatewayHub hub2 = cache.addGatewayHub("ProductDBWriter", \-1);
hub2.setStartupPolicy(GatewayHub.STARTUP_POLICY_SECONDARY);
Gateway gateway2 = hub2.addGateway("ProductDBWriter");
gateway2.addListener(new ProductDBGatewayListener());
hub2.start();

XML配置

<gateway-hub id="OrderDBWriter" port="-1" startup-policy="primary">
   <gateway id="OrderDBWriter">
      <gateway-listener>
         <class-name>OrderDBGatewayListener</class-name>
      </gateway-listener>
   </gateway>
</gateway-hub>
 
<gateway-hub id="ProductDBWriter" port="-1" startup-policy="secondary">
   <gateway id="ProductDBWriter">
     <gateway-listener>
        <class-name>ProductDBGatewayListener</class-name>
     </gateway-listener>
   </gateway>
</gateway-hub>

GatewayHub 启动策略

GatewayHubs有三种启动策略: primary, secondary 和 none。启动策略none 是缺省的。在这个模式中,GatewayHub尝试获得一个特定的分布式锁。如果成功,那么就是 primary。如果不成功,就变成secondary。如果GatewayHub被设定成primary,尝试获得分布式锁。如果失败的话就抛出GatewayException。GatewayHub在这种情况下不启动。如果GatewayHub被设定成secondary,尝试获得分布式锁。如果锁已经被其他GatewayHub获得,他就释放这个锁,继续获得锁,每60秒一次。如果失败的话就抛出GatewayException。在60秒期间,GatewayHub的状态是secondary。

代码配置


Cache cache = Cache.getAnyInstance();
GatewayHub hub = cache.addGatewayHub("OrderDBWriter", -1);
hub.setStartupPolicy(GatewayHub.STARTUP_POLICY_SECONDARY);
Gateway gateway = hub.addGateway("OrderDBWriter");
gateway.addListener(new OrderDBGatewayListener());
hub.start();

XML配置

区域到GatewayHub 分布

一个区域能够指定GatewayHub去分布他的事件。使用’hub-id’属性。如果’hub-id’区域属性被设定,然后任何的Region事件被分布到带有此hub-id的GatewayHub上。这样允许负载在不同的VM上传播。比如,区域1能够配置成分布事件到GatewayHub1,区域2能够配置成分布事件到GatewayHub2。如果GatewayHub1在VM1上是primary的,GatewayHub1在相同VM上是secondary的,同时在VM2上正好相反,那么VM1将要分布区域1事件,VM2将要分布区域2事件。

代码配置

Cache cache = Cache.getAnyInstance();
AttributesFactory factory = new AttributesFactory();
factory.setGatewayHubId("OrderDBWriter");
RegionAttributes attributes = factory.createRegionAttributes();
Region region = cache.createRegion("Orders", attributes);

XML配置

<region name="Orders"><region-attributes scope="local" enable-gateway="true" hub-id="OrderDBWriter"/>
</region>

同步写

当一个用户请求一个数据条目时,比如条目key1,在内存中已经不存在了,如果同步读开启了,GemFire将要从外部数据源载入需要的条目key1。我们通过在区域上定义一个data loader 来开启同步读数据功能。Loader 在get操作中调用,把值返回给调用线程。Cache loader行为的依赖于区域的类型。

在分区区域中的数据载入

由于要处理大量数据,已分区的区域支持分区载入数据。每一个cache loader只载入本地分区定义的数据。如果配置了数据冗余,只要本地分区持有主拷贝那么数据就能载入。分区载入需要Cache Loader安装在每一个分区上。

如果你使用JDBC连接,那么每一个分区必须连接到数据源。如图1所示,三个成员需要三个JDBC连接,此图与图2做比较,三个成员共享一个JDBC连接。

分区需要更多的JDBC连接

在非分区区域中的数据载入

在非分区区域中,Cache Loader可用于其他所有成员中。Loaders常常只在缓存的子集中定义。如果需要一个Loader,所有可用的Loader被调用,使用开销最小的Loader,直至数据被载入或者所有Loader被尝试调用过。

在接下来的图中,分布式系统的成员运行在不同的机器上。数据载入从M1开始运行。事件监听器只接受在本地缓存中的回调。

在非分区区域中的数据载入

在非分区区域中,Cache Loader可用于其他所有成员中。Loaders常常只在缓存的子集中定义。如果需要一个Loader,所有可用的Loader被调用,使用开销最小的Loader,直至数据被载入或者所有Loader被尝试调用过。

在接下来的图中,分布式系统的成员运行在不同的机器上。数据载入从M1开始运行。事件监听器只接受在本地缓存中的回调。

图2 非分区区域中的Cache Loader

带有Local Scope区域的数据载入

带有Local Scope区域中,Cache Loader只在定义的成员中可用。无论值是否在本地缓存中存在Loader都被调用。

实现 Cache Loader

实现Cache Loader需要在cache.xml 文件中声明Loader。或者在代码中实现com.gemstone.GemFire.cache.Declarable。

CacheLoader 接口有一个单独的回调方法,load,你的应用在任何时候都可以调用这个方法来从缓存外边查询数据。有关更详细的信息,请看相关Java 文档。但是你不能在load()方法中直接调用Region方法。这样做将导致Cache Loader 阻塞,这样会损伤分布式系统的性能。

XML代码

<region name="exampleRegion"><region-attributes><cache-loader><class-name>quickstart.SimpleCacheLoader</class-name></cache-loader></region-attributes>
</region>

代码实现

public class SimpleCacheLoader implements CacheLoader, Declarable {PreparedStatement pstmt=null;Connection conn = null;public Object load(LoaderHelper helper) {String key = (String) helper.getKey();System.out.println(" Loader called to retrieve value for " + key);ResultSet rs = null;// Create a value using the suffix number of the key (key1, key2, etc.)try {pstmt = conn.prepareStatement("Select Data where id = ?");pstmt.setString(1, key);rs = pstmt.execute();} catch (SQLException se) {se.printStackTrace();throw new CacheLoaderException ( se );}return rs;}public void close() { // do nothing }public void init(Properties props) {conn = DriverManager.getConnection(URL, props);}
}

带有数据库连接的CacheLoader 声明

<cache-loader><class-name>com.company.data.DatabaseLoader</class-name><parameter name="URL"><string>jdbc:cloudscape:rmi:MyData</string></parameter>
</cache-loader>

GemFire 异步写和同步读相关推荐

  1. Qt ModbusTCP ModbusRTU 使用同步读和异步写

    使用Qt自带的库开发,添加相关头文件 #include <QModbusTcpClient> #include <QModbusReply> #include <QSer ...

  2. 【FPGA】单端口RAM的设计(异步读、异步写)

    前面有博文写了同步读写和异步读.同步写的单端口RAM设计: [FPGA]单端口RAM的设计(同步读.同步写) [FPGA]单端口RAM的设计(异步读.同步写) 这篇博文讲异步读写: 在博文:[FPGA ...

  3. 关于异步IO与同步IO的写操作区别

    最近这两天都在看IO相关的知识点.一开始太凌乱,太杂,不过终于整理清楚了.觉得杂乱是因为一开始以为异步IO等于非阻塞IO,这完全是两个概念, LINUX下的异步IO有两类,一类为glibc AIO,这 ...

  4. mysql异步查询 java_java 手写并发框架(一)异步查询转同步的 7 种实现方式

    序言 本节将学习一下如何实现异步查询转同步的方式,共计介绍了 7 种常见的实现方式. 思维导图如下: 异步转同步 业务需求 有些接口查询反馈结果是异步返回的,无法立刻获取查询结果. 比如业务开发中我们 ...

  5. 并发查询_java 手写并发框架(一)异步查询转同步的7种实现方式

    序言 本节将学习一下如何实现异步查询转同步的方式,共计介绍了 7 种常见的实现方式. 思维导图如下: 思维导图 异步转同步 业务需求 有些接口查询反馈结果是异步返回的,无法立刻获取查询结果. 比如业务 ...

  6. 实验九:采用异步方式实现文件读/写

    一:实验目的 (1)了解Windows系统异步文件读/写的概念. (2)熟悉Windows系统文件读/写相关的API. (3)掌握采用异步方式实现文件读/写的相关参数设置. 二:实验准备知识:文件异步 ...

  7. 实验九 使用异步方式实现文件读\写

    实验九 使用异步方式实现文件读\写 一.实验目的 了解Windows系统异步文件读/写的概念. 熟悉Windows系统文件读/写相关的API. 掌握采用异步方式实现文件读/写的相关参数设置. 二.实验 ...

  8. mysql 5.5 主从同步问题_MySQL 5.5 主从复制异步、半同步以及注意事项详解

    大纲 一.前言 二.Mysql 基础知识 三.Mysql 复制(Replication) 四.Mysql 复制(Replication)类型 五.Mysql 主从复制基本步骤 六.Mysql 主从复制 ...

  9. MySQL 5.5 主从复制异步、半同步以及注意事项详解

    大纲 一.前言 二.Mysql 基础知识 三.Mysql 复制(Replication) 四.Mysql 复制(Replication)类型 五.Mysql 主从复制基本步骤 六.Mysql 主从复制 ...

最新文章

  1. android webview实例,android WebView 使用实例
  2. batchupdate写法_Mybatis中使用updateBatch进行批量更新
  3. 数学建模学习笔记——相关性分析
  4. 广联达2018模板算量步骤_工程人必须掌握:这9份软件算量教程+24份算量计算表,无偿分享...
  5. android自定义滤镜,【Android】自定义View那点事(三)ColorFilter篇
  6. 微机原理 寻址方式 及基于EMU8086的用例
  7. 2022最新版影视小程序源码支持josn官解+卡密系统
  8. Git系列教程(一)——git版本管理
  9. kettle连接oracle汉字乱码
  10. canvas实现简单画板
  11. 彩虹易支付程序源码php,彩虹易支付聚合支付源码全解全网程序附对接接口教程...
  12. 重新定义高端存储架构,华为Dorado V6树立全闪存新标杆
  13. oracle 格式化命令,format 命令的输入规则 - Oracle Solaris 管理:设备和文件系统
  14. 软件工程专业毕业计算机水平,软件工程专业很“霸气”,不管985还是“二本”,毕业几乎都高薪就业...
  15. H.264——H.264的基本介绍
  16. 《精要主义》读书笔记(一)
  17. 杨振宁 莫言 范曾 《开讲啦》
  18. linux小说编辑,Fade In Pro——剧本小说编辑软件
  19. matlab添加旁白,在MATLAB中向已知信号添加高斯白噪声 (转载)
  20. ajax $什么意思,请教一下Ajax 的$('')是什么意思?

热门文章

  1. 热乎乎的宇宙头条校招前端面经
  2. 2021-06-24CTF-攻防世界-WEB新手练习区(12题入门题)
  3. 赵小楼《天道》《遥远的救世主》深度解析(65)实事求是的辩证思维(应试教育学不到的思维)
  4. linux mod jk.so,linux - mod_jk无法连接Apache和tomcat - SO中文参考 - www.soinside.com
  5. MySQL数据库入门【最详细的入门教学】
  6. PHP 微信支付v3签名生成
  7. 杰理之EQ drc 限幅器、多带限幅器、压缩器、多带压缩器调节【篇】
  8. keras非线性回归代码专题
  9. ffmpeg+h264_nvenc+vs2019配置编译
  10. Docker - 编译安装nginx镜像