GemFire 异步写和同步读
异步写介绍
概览
许多应用使用关系型数据库来持久化数据。这种方式有数据延迟和瓶颈问题。GemFire使用Write-Behind技术,以内存的速度访问数据,以异步的形式把数据更新到数据源。
在Write-Behind模式中,更新缓存条目异步地写到后台数据库。这一特性,GemFire使用了独创的队列实现机制,Gateway Queue,网关队列能够关联多个数据区域。网关封装了一个高可靠的队列实现机制,线程根据数据批处理大小,和特定的应用回调来处理队列更新。
GemFire的Gateway Queue 技术本身用来对跨WAN网的分布式缓存数据进行传输和实时更新。跨WAN网的远程集群和本地关系型数据库,GemFire传输数据的方式相同,都是把小量数据,快速更新合并为一个大的数据块。利用TCP/IP 保证可靠传输,跨WAN网异步数据的速率和本地系统同步数据的速率基本相同。
传统关系型数据库基于磁盘的数据传输特点是大数据块/高延时/高吞吐,而GemFire Write-Behind技术是基于内存的,数据传输特点是小数据块/快速更新/低延时/高吞吐,利用不断传输小数据块,持续更新数据来达到低延时和高吞吐量。
GemFire Gateway Queue技术通过维护队列实例,并行地队列化缓存更新,消息周期性地与主备网关队列实例通信,来投递数据更新。
在故障切换方面,如果某个队列更新出现问题,则GemFire可以把更新切换到备用队列上,从而不影响应用正常使用。可以指定多个备用队列来达到队列冗余的目的。
GemFire Write-behind 特性总结:
1. 异步更新数据到外部数据存储来提高应用的性能。应用直接与数据库进行交互,可能会受到数据库事务控制的影响而降低应用的性能。
2. GemFire Gateway Queue提供高可靠,保证达到零数据丢失,外部数据源故障丝毫不影响应用正常运行。
3. 比关系型数据库支持更多的并发用户,扩展“大事务更新”吞吐量,增强应用的能力。
4. 通过合并来减少数据库负载。相同键的多次更新能够合并成一次更新,只有最后的数据条目写入数据库中。
Gateways通过gateway hub来创建。一个Hub负责管理一个到多个Gateways。你可以配置从多个分布式系统来接受复制事件。比如,一个gateway负责队列化更新事件,把更新事件写到数据库中,同时,另一个gateway负责把更新事件传到另一个数据库中。
每一个gateway管理一个队列。队列被2个以上节点维护,主网关Hub或者多个备网关Hub都有自己的队列。这些队列可以配置到内存中,或者部分溢出到磁盘中,或者所有全部溢出到磁盘中。当然了,这种磁盘写,对于GemFire是没有瓶颈的,我们通过异步磁盘写模式来维护我们的磁盘文件。
GemFire队列溢出磁盘只是内存的一个扩展。与关系型数据库的做法是不同的,关系型数据库把数据写到物理磁盘上,而RondomAccessFile缓冲区更新是永久删除的。GemFire也支持内存数据拷贝备份到磁盘上,来保障高可用。即使在最差的应用场景中,应用选择同步磁盘锁来备份Gateway Queue 状态,GemFire架构也仍然有非常强大的优势: GemFire集群使用各个节点的本地磁盘,这样就给了整个集群聚合的吞吐能力。
在冗余方面,GemFire自动切换队列的故障,通过把当前队列的条目拷贝到备份节点来达到队列的冗余。GemFire会周期性地在备份节点重建冗余队列。当主队列由于异常情况不能使用,备份队列马上接管成为主队列,这一期间是没有数据丢失的。
由于一些技术人员可能不熟悉HA模式:当故障切换期间,在备用队列的一些事件,可能已经被处理了。GemFire标识了这些可能的重复事件允许应用回调。比如,如果事件是一个’create’通告,插入一条记录到数据库,如果事件被标记为’重复’,那么数据库监听器可能忽略’primary key violation’。
当外部数据源长时间不可用,那么持久化是必须的操作。这样当集群宕机或关闭时,也不会出现更新事件丢失的情况,也可以消去不断增长的队列,避免内存溢出。
异步写实现 – 代码和配置
应用可以在任一节点配置Gateway Hub。每一个数据区域能够配置启用,路由事件到Gateway Hub。多Hub被用来并行分发事件到外部数据源。比如,新的订单能够被并行路由到生产端。
所有相关的数据区域能配置使用一个Hub在外部数据源保存事件更新,避免数据完整性冲突。
Gateway listener
GatewayEventLister 接口定义了一个方法: public boolean processEvents(List events);
List events 包含GatewayEvents。
监听器接口实现
下面是一个GatewayEventLister的简单实现。
package com.example.listener;
import com.example.common.Data;
import com.example.db.DBPersistence;import com. gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.util.GatewayEvent;
import com.gemstone.gemfire.cache.util.GatewayEventListener;
import com.gemstone.gemfire.cache.Operation;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class TestGatewayListener implements GatewayEventListener, Declarable {public boolean processEvents(List events) {for (Iterator i = events.iterator(); i.hasNext(); ) {GatewayEvent ge = (GatewayEvent) i.next();process(ge);}return true;}public void init(Properties p) {}protected void process(GatewayEvent ge) {String key = (String) ge.getKey();Data data = (Data) ge.getDeserializedValue();PreparedStatement pstmt=null;Connection conn = DBPersistence.getConnection();try {if ( ge.getOperation().equals(Operation.UPDATE)){pstmt = conn.prepareStatement("Update Data set value = ?, price = ? where id = ?");pstmt.setString(1, data.getValue());pstmt.setInt(2, data.getPrice());pstmt.setString(3, key);}else if ( ge.getOperation().equals(Operation.CREATE)){pstmt = conn.prepareStatement("Insert Data (id, value, price) values (?, ?, ?)");pstmt.setString(1, key);pstmt.setString(2, data.getValue());pstmt.setInt(3, data.getPrice());}else if ( ge.getOperation().equals(Operation.DESTROY)){pstmt = conn.prepareStatement("Delete from Data where key =>");pstmt.setString(1, key);}if ( pstmt != null){pstmt.execute();pstmt.close();}}catch (SQLException se) {se.printStackTrace();throw new CacheWriterException ( se );}}}
以上的GatewayEventListener接受回调,和操作类型,根据这些信息来更新数据库。这个事件接受一个包含键值对的参数。
Data Class定义如下:
package com.example.common;import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;import com.gemstone.gemfire.DataSerializable;
import com.gemstone.gemfire.DataSerializer;public class Data implements DataSerializable {private int id;private String value;private int price;public Data() {}public Data(int i, String s, int p){id = i;value = s;price =p;}public void setId(int i){id=i;}public int getId(){return id;}public void setValue(String s){value = s;}public String getValue(){return value;}public int getPrice(){return price;}public void setPrice(int p){price=p;}public void fromData(DataInput in) throws IOException, ClassNotFoundException{try {id = DataSerializer.readPrimitiveInt ( in );value = DataSerializer.readString(in);price = DataSerializer.readPrimitiveInt(in);} catch ( Exception e ) {System.out.println ( "Deserialize Error: " + e );e.printStackTrace();}}public void toData(DataOutput out) throws IOException{DataSerializer.writePrimitiveInt(id, out);DataSerializer.writeString(value, out);DataSerializer.writePrimitiveInt(price, out);}
}
配置网关
Cache cache = Cache.getAnyInstance();
GatewayHub hub = cache.addGatewayHub("DBWriterHub", \-1);
hub.setStartupPolicy(GatewayHub.STARTUP_POLICY_PRIMARY);
Gateway gateway = hub.addGateway("DBWriter");
gateway.addListener(new TestGatewayListener());
// Start the gateway hub using the start method.
hub.start();
数据区域路由事件到Gateway Hubs:
RegionAttributes ra = AttributesFactory.create();ra.setEnableGateway(true);
ra.setGatewayHubId("DBWriterHub");
... set other attributes and create the region ...
XML配置
<gateway-hub id="DBWriterHub" port="-1" startup-policy="primary"><gateway id="DBWriter"><gateway-listener><class-name>com.example.listener.TestGatewayListener</class-name></gateway-listener></gateway>
</gateway-hub>
为缓存配置多Gateway Hubs
多Gateway Hubs能够在单缓存中定义。这一特性关联'GatewayHub Startup Policy'和'Region to GatewayHub Distribution',跨多个VMs来传播Gateway负载。
代码配置
Cache cache = Cache.getAnyInstance();GatewayHub hub1 = cache.addGatewayHub("OrderDBWriter", \-1);
hub1.setStartupPolicy(GatewayHub.STARTUP_POLICY_PRIMARY);
Gateway gateway1 = hub1.addGateway("OrderDBWriter");
gateway1.addListener(new OrderDBGatewayListener());
hub1.start();
GatewayHub hub2 = cache.addGatewayHub("ProductDBWriter", \-1);
hub2.setStartupPolicy(GatewayHub.STARTUP_POLICY_SECONDARY);
Gateway gateway2 = hub2.addGateway("ProductDBWriter");
gateway2.addListener(new ProductDBGatewayListener());
hub2.start();
XML配置
<gateway-hub id="OrderDBWriter" port="-1" startup-policy="primary">
<gateway id="OrderDBWriter">
<gateway-listener>
<class-name>OrderDBGatewayListener</class-name>
</gateway-listener>
</gateway>
</gateway-hub>
<gateway-hub id="ProductDBWriter" port="-1" startup-policy="secondary">
<gateway id="ProductDBWriter">
<gateway-listener>
<class-name>ProductDBGatewayListener</class-name>
</gateway-listener>
</gateway>
</gateway-hub>
GatewayHub 启动策略
GatewayHubs有三种启动策略: primary, secondary 和 none。启动策略none 是缺省的。在这个模式中,GatewayHub尝试获得一个特定的分布式锁。如果成功,那么就是 primary。如果不成功,就变成secondary。如果GatewayHub被设定成primary,尝试获得分布式锁。如果失败的话就抛出GatewayException。GatewayHub在这种情况下不启动。如果GatewayHub被设定成secondary,尝试获得分布式锁。如果锁已经被其他GatewayHub获得,他就释放这个锁,继续获得锁,每60秒一次。如果失败的话就抛出GatewayException。在60秒期间,GatewayHub的状态是secondary。
代码配置
Cache cache = Cache.getAnyInstance();
GatewayHub hub = cache.addGatewayHub("OrderDBWriter", -1);
hub.setStartupPolicy(GatewayHub.STARTUP_POLICY_SECONDARY);
Gateway gateway = hub.addGateway("OrderDBWriter");
gateway.addListener(new OrderDBGatewayListener());
hub.start();
XML配置
区域到GatewayHub 分布
一个区域能够指定GatewayHub去分布他的事件。使用’hub-id’属性。如果’hub-id’区域属性被设定,然后任何的Region事件被分布到带有此hub-id的GatewayHub上。这样允许负载在不同的VM上传播。比如,区域1能够配置成分布事件到GatewayHub1,区域2能够配置成分布事件到GatewayHub2。如果GatewayHub1在VM1上是primary的,GatewayHub1在相同VM上是secondary的,同时在VM2上正好相反,那么VM1将要分布区域1事件,VM2将要分布区域2事件。
代码配置
Cache cache = Cache.getAnyInstance();
AttributesFactory factory = new AttributesFactory();
factory.setGatewayHubId("OrderDBWriter");
RegionAttributes attributes = factory.createRegionAttributes();
Region region = cache.createRegion("Orders", attributes);
XML配置
<region name="Orders"><region-attributes scope="local" enable-gateway="true" hub-id="OrderDBWriter"/>
</region>
同步写
当一个用户请求一个数据条目时,比如条目key1,在内存中已经不存在了,如果同步读开启了,GemFire将要从外部数据源载入需要的条目key1。我们通过在区域上定义一个data loader 来开启同步读数据功能。Loader 在get操作中调用,把值返回给调用线程。Cache loader行为的依赖于区域的类型。
在分区区域中的数据载入
由于要处理大量数据,已分区的区域支持分区载入数据。每一个cache loader只载入本地分区定义的数据。如果配置了数据冗余,只要本地分区持有主拷贝那么数据就能载入。分区载入需要Cache Loader安装在每一个分区上。
如果你使用JDBC连接,那么每一个分区必须连接到数据源。如图1所示,三个成员需要三个JDBC连接,此图与图2做比较,三个成员共享一个JDBC连接。
分区需要更多的JDBC连接
在非分区区域中的数据载入
在非分区区域中,Cache Loader可用于其他所有成员中。Loaders常常只在缓存的子集中定义。如果需要一个Loader,所有可用的Loader被调用,使用开销最小的Loader,直至数据被载入或者所有Loader被尝试调用过。
在接下来的图中,分布式系统的成员运行在不同的机器上。数据载入从M1开始运行。事件监听器只接受在本地缓存中的回调。
在非分区区域中的数据载入
在非分区区域中,Cache Loader可用于其他所有成员中。Loaders常常只在缓存的子集中定义。如果需要一个Loader,所有可用的Loader被调用,使用开销最小的Loader,直至数据被载入或者所有Loader被尝试调用过。
在接下来的图中,分布式系统的成员运行在不同的机器上。数据载入从M1开始运行。事件监听器只接受在本地缓存中的回调。
图2 非分区区域中的Cache Loader
带有Local Scope区域的数据载入
带有Local Scope区域中,Cache Loader只在定义的成员中可用。无论值是否在本地缓存中存在Loader都被调用。
实现 Cache Loader
实现Cache Loader需要在cache.xml 文件中声明Loader。或者在代码中实现com.gemstone.GemFire.cache.Declarable。
CacheLoader 接口有一个单独的回调方法,load,你的应用在任何时候都可以调用这个方法来从缓存外边查询数据。有关更详细的信息,请看相关Java 文档。但是你不能在load()方法中直接调用Region方法。这样做将导致Cache Loader 阻塞,这样会损伤分布式系统的性能。
XML代码
<region name="exampleRegion"><region-attributes><cache-loader><class-name>quickstart.SimpleCacheLoader</class-name></cache-loader></region-attributes>
</region>
代码实现
public class SimpleCacheLoader implements CacheLoader, Declarable {PreparedStatement pstmt=null;Connection conn = null;public Object load(LoaderHelper helper) {String key = (String) helper.getKey();System.out.println(" Loader called to retrieve value for " + key);ResultSet rs = null;// Create a value using the suffix number of the key (key1, key2, etc.)try {pstmt = conn.prepareStatement("Select Data where id = ?");pstmt.setString(1, key);rs = pstmt.execute();} catch (SQLException se) {se.printStackTrace();throw new CacheLoaderException ( se );}return rs;}public void close() { // do nothing }public void init(Properties props) {conn = DriverManager.getConnection(URL, props);}
}
带有数据库连接的CacheLoader 声明
<cache-loader><class-name>com.company.data.DatabaseLoader</class-name><parameter name="URL"><string>jdbc:cloudscape:rmi:MyData</string></parameter>
</cache-loader>
GemFire 异步写和同步读相关推荐
- Qt ModbusTCP ModbusRTU 使用同步读和异步写
使用Qt自带的库开发,添加相关头文件 #include <QModbusTcpClient> #include <QModbusReply> #include <QSer ...
- 【FPGA】单端口RAM的设计(异步读、异步写)
前面有博文写了同步读写和异步读.同步写的单端口RAM设计: [FPGA]单端口RAM的设计(同步读.同步写) [FPGA]单端口RAM的设计(异步读.同步写) 这篇博文讲异步读写: 在博文:[FPGA ...
- 关于异步IO与同步IO的写操作区别
最近这两天都在看IO相关的知识点.一开始太凌乱,太杂,不过终于整理清楚了.觉得杂乱是因为一开始以为异步IO等于非阻塞IO,这完全是两个概念, LINUX下的异步IO有两类,一类为glibc AIO,这 ...
- mysql异步查询 java_java 手写并发框架(一)异步查询转同步的 7 种实现方式
序言 本节将学习一下如何实现异步查询转同步的方式,共计介绍了 7 种常见的实现方式. 思维导图如下: 异步转同步 业务需求 有些接口查询反馈结果是异步返回的,无法立刻获取查询结果. 比如业务开发中我们 ...
- 并发查询_java 手写并发框架(一)异步查询转同步的7种实现方式
序言 本节将学习一下如何实现异步查询转同步的方式,共计介绍了 7 种常见的实现方式. 思维导图如下: 思维导图 异步转同步 业务需求 有些接口查询反馈结果是异步返回的,无法立刻获取查询结果. 比如业务 ...
- 实验九:采用异步方式实现文件读/写
一:实验目的 (1)了解Windows系统异步文件读/写的概念. (2)熟悉Windows系统文件读/写相关的API. (3)掌握采用异步方式实现文件读/写的相关参数设置. 二:实验准备知识:文件异步 ...
- 实验九 使用异步方式实现文件读\写
实验九 使用异步方式实现文件读\写 一.实验目的 了解Windows系统异步文件读/写的概念. 熟悉Windows系统文件读/写相关的API. 掌握采用异步方式实现文件读/写的相关参数设置. 二.实验 ...
- mysql 5.5 主从同步问题_MySQL 5.5 主从复制异步、半同步以及注意事项详解
大纲 一.前言 二.Mysql 基础知识 三.Mysql 复制(Replication) 四.Mysql 复制(Replication)类型 五.Mysql 主从复制基本步骤 六.Mysql 主从复制 ...
- MySQL 5.5 主从复制异步、半同步以及注意事项详解
大纲 一.前言 二.Mysql 基础知识 三.Mysql 复制(Replication) 四.Mysql 复制(Replication)类型 五.Mysql 主从复制基本步骤 六.Mysql 主从复制 ...
最新文章
- android webview实例,android WebView 使用实例
- batchupdate写法_Mybatis中使用updateBatch进行批量更新
- 数学建模学习笔记——相关性分析
- 广联达2018模板算量步骤_工程人必须掌握:这9份软件算量教程+24份算量计算表,无偿分享...
- android自定义滤镜,【Android】自定义View那点事(三)ColorFilter篇
- 微机原理 寻址方式 及基于EMU8086的用例
- 2022最新版影视小程序源码支持josn官解+卡密系统
- Git系列教程(一)——git版本管理
- kettle连接oracle汉字乱码
- canvas实现简单画板
- 彩虹易支付程序源码php,彩虹易支付聚合支付源码全解全网程序附对接接口教程...
- 重新定义高端存储架构,华为Dorado V6树立全闪存新标杆
- oracle 格式化命令,format 命令的输入规则 - Oracle Solaris 管理:设备和文件系统
- 软件工程专业毕业计算机水平,软件工程专业很“霸气”,不管985还是“二本”,毕业几乎都高薪就业...
- H.264——H.264的基本介绍
- 《精要主义》读书笔记(一)
- 杨振宁 莫言 范曾 《开讲啦》
- linux小说编辑,Fade In Pro——剧本小说编辑软件
- matlab添加旁白,在MATLAB中向已知信号添加高斯白噪声 (转载)
- ajax $什么意思,请教一下Ajax 的$('')是什么意思?
热门文章
- 热乎乎的宇宙头条校招前端面经
- 2021-06-24CTF-攻防世界-WEB新手练习区(12题入门题)
- 赵小楼《天道》《遥远的救世主》深度解析(65)实事求是的辩证思维(应试教育学不到的思维)
- linux mod jk.so,linux - mod_jk无法连接Apache和tomcat - SO中文参考 - www.soinside.com
- MySQL数据库入门【最详细的入门教学】
- PHP 微信支付v3签名生成
- 杰理之EQ drc 限幅器、多带限幅器、压缩器、多带压缩器调节【篇】
- keras非线性回归代码专题
- ffmpeg+h264_nvenc+vs2019配置编译
- Docker - 编译安装nginx镜像