Web3j监听功能代码研究

高洪涛 2021-03-19

本周深入研究了web3j工具包实现以太坊的监听功能,实现了交易监听和代币监听的方法,对监听过程中的常见问题进行了处理,本文就是对这部分开发经验的总结。

1 web3j 版本

Web3官网:https://www.web3labs.com/web3j-sdk

Docs: https://docs.web3j.io/latest/quickstart/

我使用了3个版本的web3j, 3.6、4.5.5、4.8.4,分别进行说明。

1.1 Web3j 3.6版本

3.6版本可以实现各种监听。来源已经搞不清楚了,是一个文件夹,包含有许多jar文件。

这个文件夹打包链接:

使用时需要添加到编译路径中。

1.2 4.5.5版本

我下载了4.5.5版本工具包,只有一个文件,里面包含了各种jar包:console-4.5.5-all.jar

经过测试,该包可以正常的完成查询和交易,但是无法监听,提示缺少rxjava相关jar,我就没有再折腾,放弃了。

1.3 4.8.4版本

该版本可以实现监听。从官网自动下载,maven工程中添加依赖:

<dependency>

<groupId>org.web3j</groupId>

<artifactId>core</artifactId>

<version>4.8.4</version>

</dependency>

要实现监听,还需要添加另一个依赖:

<dependency>

<groupId>io.reactivex.rxjava3</groupId>

<artifactId>rxjava</artifactId>

<version>3.0.11</version>

</dependency>

使用到了json需要添加依赖:

<!-- JSONObject对象依赖的jar包 开始 -->

<dependency>

<groupId>commons-beanutils</groupId>

<artifactId>commons-beanutils</artifactId>

<version>1.9.3</version>

</dependency>

<dependency>

<groupId>commons-collections</groupId>

<artifactId>commons-collections</artifactId>

<version>3.2.1</version>

</dependency>

<dependency>

<groupId>commons-lang</groupId>

<artifactId>commons-lang</artifactId>

<version>2.6</version>

</dependency>

<dependency>

<groupId>commons-logging</groupId>

<artifactId>commons-logging</artifactId>

<version>1.1.1</version>

</dependency>

<dependency>

<groupId>net.sf.ezmorph</groupId>

<artifactId>ezmorph</artifactId>

<version>1.0.6</version>

</dependency>

<dependency>

<groupId>net.sf.json-lib</groupId>

<artifactId>json-lib</artifactId>

<version>2.2.3</version>

<classifier>jdk15</classifier>

<!-- jdk版本 -->

</dependency>

<!-- Json依赖架包下载结束 -->

2 3.6版本监听

我认为监听有3种类型,分别是:

代币监听:监听ERC20代币交易,从startBlock区块开始监听token转账事件

重放交易:监听过往交易,需要指定开始和结束区块号

交易监听:从当前区块开始监听交易

其中交易监听收到的交易事件最多,包含了代币交易。代币监听优点是直接过滤指定的代币转账事件,用起来方便。重放交易是查询历史交易记录,可以针对某段时间查询交易。

2.1 代币监听

一般步骤:

  1. 指定监听事件event;
  2. 指定过滤器filter,包含起始区块,代币合约列表;
  3. 启动监听,
  4. 检查交易地址是否是自己需要的,是的话就调用具体的事件处理函数。

说明: 再检查交易地址是否是自己需要的这一步,一般做法是采用地址字符串比较,这样非常费时间,我把关注的地址保存在hashMap中,查找时直接调用htAddress.containsKey(fromAddress),这样速度最快。

public List<String> contracts;  //代币合约地址列表,可以存放多个地址

public Subscription tokenSubscription;   //token事件订阅对象

public Subscription ethMissSubscription; //ETH交易空档事件订阅对象

public Subscription ethSubscription;     //ETH交易事件订阅对象

/*启动监听, 从startBlock区块开始监听token转账事件

代币监听会出现的问题: 如果启动区块距离当前区块稍远,非常可能的情况是中间出现的交易太多,监视代码内部出现空指针异常。

如果监听启动时接近当前区块问题出现概率小。

*/

public void startTransferListen_Token(BigInteger startBlock) {

// 要监听的合约事件

Event event = new Event("Transfer",

Arrays.asList(

new TypeReference<Address>() {},

new TypeReference<Address>() {},

new TypeReference<Uint>(){}));

//过滤器

EthFilter filter = new EthFilter(

DefaultBlockParameter.valueOf(startBlock),

DefaultBlockParameterName.LATEST,

contracts);

filter.addSingleTopic(EventEncoder.encode(event));

//注册监听,解析日志中的事件

block_TokenSub = startBlock.intValue();

tokenSubscription = web3j.ethLogObservable(filter).subscribe(log -> {

block_TokenSub = log.getBlockNumber().intValue();

String token = log.getAddress();  //这是Token合约地址

String txHash = log.getTransactionHash();

List<String> topics = log.getTopics();  // 提取转账记录

String fromAddress = "0x"+topics.get(1).substring(26);

String toAddress = "0x"+topics.get(2).substring(26);

System.out.println("  ---token ="+token+",  txHash ="+txHash);

//检查发送地址、接收地址是否属于系统用户, 不是系统用户就不予处理

if(htAddress.containsKey(fromAddress) || htAddress.containsKey(toAddress)) {

String value1 = log.getData();

BigInteger big = new BigInteger(value1.substring(2), 16);

BigDecimal value = Convert.fromWei(big.toString(), Convert.Unit.ETHER);

//                    System.out.println("value="+value);

String timestamp = "";

try {

EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(log.getBlockNumber()), false).send();

timestamp = String.valueOf(ethBlock.getBlock().getTimestamp());

} catch (IOException e) {

System.out.println("Block timestamp get failure,block number is {}" + log.getBlockNumber());

System.out.println("Block timestamp get failure,{}"+  e.getMessage());

}

//执行关键的回调函数

callBack_Token(token,txHash,fromAddress,toAddress,value,timestamp);

}

}, error->{

System.out.println(" ### tokenSubscription   error= "+ error);

error.printStackTrace();

});

System.out.println("tokenSubscription ="+tokenSubscription);

System.out.println(tokenSubscription.isUnsubscribed());

}

2.2 重放交易

重放交易功能很重要,尤其涉及充币业务时,如果充币运行服务器停机维护,那么在此期间的代币充值就无法知晓造成遗漏损失。解决方法时充币运行服务器实时记录自己监听的区块高度,记录在数据库中,下次启动时查找这个区块到最新区块之间的交易。

说明: 当指定的区块交易重放完毕,该监听就自动终止。ethMissSubscription.isUnsubscribed()返回值就是false。

//启动监听以太坊上的过往交易

public void startReplayListen_ETH(BigInteger startBlockNum) {

System.out.println("  startReplayListen_ETH:  startBlockNum="+startBlockNum);

//回放空档期间的交易

BigInteger currentBlockNum=null;

try {

//获取当前区块号

currentBlockNum = web3j.ethBlockNumber().send().getBlockNumber();

System.out.println("  000 currentBlockNum= "+currentBlockNum.intValue());

if(startBlockNum.compareTo(currentBlockNum) > 0) {

return;  //测试曾经出现 currentBlockNum得到错误数字,比startBlockNum还小,这时不能启动监听

}

} catch (IOException e) {

// TODO Auto-generated catch block

System.out.println("  111 getBlockNumber() Error: ");

e.printStackTrace();

return;   //出现异常不能启动监听

}

//创建开始与结束区块, 重放这段时间内的交易,防止遗漏

DefaultBlockParameter startBlock = new DefaultBlockParameterNumber(startBlockNum);

DefaultBlockParameter endBlock = new DefaultBlockParameterNumber(currentBlockNum);

System.out.println("[ startTransferListen_ETH:  miss  startBlock="+startBlockNum+", endBlock="+currentBlockNum+"]");

block_EthMissSub = startBlockNum.intValue();

ethMissSubscription = web3j.replayTransactionsObservable(startBlock, endBlock)

.subscribe(tx -> {

//更新检查过的区块高度

block_EthMissSub = tx.getBlockNumber().intValue();

System.out.println("  ---replayPastTransactionsFlowable    block_EthMissSub = "+block_EthMissSub);

String fromAddress = tx.getFrom();

String toAddress   = tx.getTo();

//                  System.out.println("toAddress="+toAddress);

if(htAddress.containsKey(fromAddress) || htAddress.containsKey(toAddress)) {  //发现了指定地址上的交易

String txHash = tx.getHash();

BigDecimal value = Convert.fromWei(tx.getValue().toString(), Convert.Unit.ETHER);

String timestamp = "";

try {

EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(tx.getBlockNumber()), false).send();

timestamp = String.valueOf(ethBlock.getBlock().getTimestamp());

} catch (IOException e) {

System.out.println("Block timestamp get failure,block number is {}" + tx.getBlockNumber());

System.out.println("Block timestamp get failure,{}"+  e.getMessage());

}

// 监听以太坊上是否有系统生成地址的交易

callBack_ETH(txHash,fromAddress,toAddress,value,timestamp);

}

}, error->{

System.out.println("   ### replayPastTransactionsFlowable  error= "+ error);

error.printStackTrace();

});

}

2.3 交易监听

这种方式监听每一笔交易,以太坊上交易量太大,只能自己过滤出关注的交易进行处理。要尽可能的快速处理。可以考虑线程池模型进行处理。

//启动监听以太坊上的交易

public void startTransactionListen_ETH() {

//监听当前区块以后的交易

ethSubscription = web3j.transactionObservable().subscribe(tx -> {

//更新检查过的区块高度

block_EthSub = tx.getBlockNumber().intValue();

System.out.println("  ---transactionFlowable  block_EthSub = "+block_EthSub);

String txHash = tx.getHash();

String fromAddress = tx.getFrom();

String toAddress = tx.getTo();

if(htAddress.containsKey(fromAddress) || htAddress.containsKey(toAddress)) {  //发现了指定地址上的交易

BigDecimal value = Convert.fromWei(tx.getValue().toString(), Convert.Unit.ETHER);

String timestamp = "";

try {

EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(tx.getBlockNumber()), false).send();

timestamp = String.valueOf(ethBlock.getBlock().getTimestamp());

} catch (IOException e) {

System.out.println("Block timestamp get failure,block number is {}" + tx.getBlockNumber());

System.out.println("Block timestamp get failure,{}"+  e.getMessage());

}

// 监听以太坊上是否有系统生成地址的交易

callBack_ETH(txHash,fromAddress,toAddress,value,timestamp);

}

}, error->{

System.out.println("   ### transactionFlowable  error= "+ error);

error.printStackTrace();

});

}

最后回调函数示例:

//token转账事件的处理函数

public void  callBack_Token(String token, String txHash, String from, String to, BigDecimal value, String timestamp) {

System.out.println("----callBack_Token:");

System.out.println("    token = "+token);

System.out.println("    txHash = "+token);

System.out.println("    from = "+from);

System.out.println("    to = "+to);

System.out.println("    value = "+value.doubleValue());

}

3 4.8.4版本监听

版本升级后原来的监听函数改变了,用法如下:

public Disposable  tokenSubscription;   //token事件订阅对象, 如果监视启动成功,isDisposed()返回false;否则监视失败返回true

public Disposable  ethMissSubscription; //ETH交易空档事件订阅对象

public Disposable  ethSubscription;     //ETH交易事件订阅对象

tokenSubscription = web3j.ethLogFlowable(filter)

.subscribe(log -> {……});

ethMissSubscription = web3j.replayPastTransactionsFlowable(startBlock, endBlock)

.subscribe(tx -> {……});

ethSubscription = web3j.transactionFlowable()

.subscribe(tx -> {……});

判断监听对象是否运行:

tokenSubscription.isDisposed()

原来通过监听对象取消监听:

ethSubscription.cancel();

现在没有这个方法啦, 就是不能主动停止监听啦。

4 常见问题

4.1 监听无法启动

指定监听开始区块高度后,出现启动监听失败,监听对象为false。原因未知,我多次实践经验:

开始区块距离最新区块越远越容易失败;

一个开始区块启动监视成功,以后该区块重新监听也大概率成功,小概率失败;

即使监听成功,持续运行期间内部常常出现空指针异常,可能导致监视停止运行;

对于监听成功启动后出现的停止运行问题,我的做法是另开一个线程专门检查监听对象的状态,一旦发现停止运行就立即重新启动监听,该方法有效。

-----End-----

Web3j监听功能代码研究相关推荐

  1. freeswitch实现监听_Freeswitch监听功能--单向监听

    标签: 1.监听函数:eavesdrop lua脚本中调用监听: local cmd = "originate loopback/3333 &eavesdrop("..uu ...

  2. 基于uniapp开发的SUPOIN(销邦) PDA使用广播扫码监听功能

    广播扫码监听功能 扫码方案 摄像头 激光 封装组件 手持 PDA 是 Android 平台,其扫码的解决方案也有摄像头和激光扫描等多种解决方案,这里结合 uni-app 的开发特性,记录一下具体的实现 ...

  3. Android监听作用,Android开发之CheckBox的简单使用与监听功能示例

    本文实例讲述了Android开发之CheckBox的简单使用与监听功能.分享给大家供大家参考,具体如下: activity_main.xml android:layout_width="ma ...

  4. web3j监听合约logs事件

    以太坊使用web3j监听特定的logs,用法. 更多区块链技术与应用分类: 区块链应用    区块链开发 以太坊 | Fabric | BCOS | 密码技术 | 共识算法 | 比特币 | 其他链 通 ...

  5. FreeSwtich的监听功能

    FreeSwitch 提供了一个APP实现通道的监听功能,它的名字是:eavesdrop. 如果要持续监听某个用户,那么要用mod_spy里实现. 用法 Usage eavesdrop [<uu ...

  6. linux epoll机制对TCP 客户端和服务端的监听C代码通用框架实现

    1 TCP简介 tcp是一种基于流的应用层协议,其"可靠的数据传输"实现的原理就是,"拥塞控制"的滑动窗口机制,该机制包含的算法主要有"慢启动&quo ...

  7. php 监听端口数据客户端ip_PHP做端口监听示例代码

    1,PHP端口监听之服务器端 复制代码 代码示例: // Server // 设置错误处理 error_reporting(E_ALL); // 设置运行时间 set_time_limit(0); / ...

  8. php 实现mqtt 订阅监听功能出错_IoT物联网实现M2M设备之间联动实战

    M2M(即Machine-to-Machine)是一种端对端通信技术.IoT物联网平台支持使用规则引擎的Topic转发功能,实现M2M通信,您不用担心高并发场景下的稳定通信.低延时等技术难点,也不需要 ...

  9. 基于uniapp开发的ZEBRA(斑马) PDA使用广播扫码监听功能(文件配置+插件使用)

    一.首先进行DataWedge的配置 以ZEBRA TC52为例 1.点击桌面的DataWedge软件 2.点击第一项Profile0(default)进行配置 3.勾选"配置文件已启用&q ...

最新文章

  1. 用python爬取一个人所有信息_python实战===爬取所有微信好友的信息
  2. 对复杂业务组件在实际开发过程中被调用的反思
  3. boost::lexicographical_compare相关的测试程序
  4. php72w redis,docker php7安装php-redis
  5. linux里查看所有用户和用户组
  6. OJ1064: 加密字符(C语言)
  7. Windows Server 2016多用户同时登录远程桌面
  8. c# abstract抽象类与继承类子类的构造函数_base
  9. 推荐 ADO.NET Entity Framework (EDM) 相关技术文章
  10. angular6项目中使用echarts图表的方法(有一个坑,引用报错)
  11. Atitit.ide代码块折叠插件 eclipse
  12. mysql中勒索病毒的防护措施_勒索病毒防御措施
  13. HTML——使用表格制作个人简历
  14. python发送邮箱_利用Python自动发送电子邮件
  15. Mac彻底删除mysql,重新安装mysql,修改mysql用户权限
  16. 四大国际快递时效、优势对比
  17. python 显示表格数据_python显示excel表格数据-怎么用python读取excel表格的数据
  18. JAVA虚拟机的安装以及JAVA的环境配置
  19. we8iso8859p1 java_jdbc连oracle数据库,输出到页面上是乱码,请问怎么解决?
  20. 数据可视化:数据可视化的意义

热门文章

  1. 给入行新人的一点忠告
  2. 是不是所有行业都适合定制开发APP
  3. Raspi-config 设置解析
  4. 软件进度管理的基本原则和进度安排
  5. markdown编辑环境搭建
  6. 如何创建可引导的 macOS 安装器
  7. 瞎聊机器学习——LR(Logistic Regression)逻辑斯蒂回归(一)
  8. app把信息添加到mysql_10) 十分钟学会android--app数据保存三种方式
  9. 数学建模MATLAB代码知识点集合
  10. 微软Bing的AI人工只能对话体验名额申请教程