ethereum-etl学习3

> ethereumetl stream --start-block 500000 -e block,transaction,log,token_transfer --log-file log.txt \
--provider-uri https://mainnet.infura.io/v3/7aef3f0cd1f64408b163814b22cc643c

​ 实现区块、交易、日志、货币不断地传输到控制台

function stream

def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):"""Streams all data types to console or Google Pub/Sub."""configure_logging(log_file)configure_signals()entity_types = parse_entity_types(entity_types)validate_entity_types(entity_types, output)from ethereumetl.streaming.eth_streamer_adapter import EthStreamerAdapterfrom blockchainetl.streaming.streamer import Streamer# TODO: Implement fallback mechanism for provider uris instead of picking randomlyprovider_uri = pick_random_provider_uri(provider_uri)logging.info('Using ' + provider_uri)streamer_adapter = EthStreamerAdapter(batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),item_exporter=create_item_exporters(output),batch_size=batch_size,max_workers=max_workers,entity_types=entity_types)streamer = Streamer(blockchain_streamer_adapter=streamer_adapter,last_synced_block_file=last_synced_block_file,lag=lag,start_block=start_block,period_seconds=period_seconds,block_batch_size=block_batch_size,pid_file=pid_file)streamer.stream()
主体

1.日志的配置,以及实体类型的校验

2.随机选取一个提供的uri,构建EthStreamerAdapter对象用于封装Streamer对象

3.调用Streamer的stream方法


class EthStreamerAdapter

class EthStreamerAdapter:
构造(部分)

​ item_id_calculator 实例化一个EthItemIdCalculator对象,主要函数

def calculate(self, item):

​ 对各个实体对象(block,transaction,log,token_transfer,trace,contract,token等)进行数据内容的拼接

​ item_timestamp_calculator 实例化一个EthItemTimestampCalculator对象,主要函数

def calculate(self, item):

​ 不同类型的实体对应的时间戳转换为标准时间

方法
    def get_current_block_number(self):w3 = build_web3(self.batch_web3_provider)return int(w3.eth.getBlock("latest").number)
主体

​ 获取当前最新的代码块号

def export_all(self, start_block, end_block):
主体

​ 通过调用前面两篇博客学习的方法获取内容输出到目标位置


class EthStreamerAdapter

class Streamer:
构造(部分)

blockchain_streamer_adapter 对应之前构建的EthStreamerAdapter

last_synced_block 最后一次同步的区块,从默认文件读取

方法
def stream(self):try:if self.pid_file is not None:logging.info('Creating pid file {}'.format(self.pid_file))write_to_file(self.pid_file, str(os.getpid()))self.blockchain_streamer_adapter.open()self._do_stream()finally:self.blockchain_streamer_adapter.close()if self.pid_file is not None:logging.info('Deleting pid file {}'.format(self.pid_file))delete_file(self.pid_file)
主体

​ 如果用户指定了pid_file,那么需要写入文件程序运行的pid。然后通过调用适配器的open方法,open方法将打开输出数据的位置(如postgre,kafka以及其他)的写功能,默认是命令行。然后调用_do_stream方法,最后是关闭输出流,删除对应文件。

def _do_stream(self):while True and (self.end_block is None or self.last_synced_block < self.end_block):synced_blocks = 0try:synced_blocks = self._sync_cycle()except Exception as e:# https://stackoverflow.com/a/4992124/1580227logging.exception('An exception occurred while syncing block data.')if not self.retry_errors:raise eif synced_blocks <= 0:logging.info('Nothing to sync. Sleeping for {} seconds...'.format(self.period_seconds))time.sleep(self.period_seconds)
主体

​ 一个如果末尾块号为空或者最后一个同步块号小于最后一个块号时的while循环,循环内是一个获取当前需要同步的块数,如果需要同步的块数小于等于0,则休眠period_seconds(默认为10s,因为以太坊平均15秒出块)

def _sync_cycle(self):current_block = self.blockchain_streamer_adapter.get_current_block_number()target_block = self._calculate_target_block(current_block, self.last_synced_block)blocks_to_sync = max(target_block - self.last_synced_block, 0)logging.info('Current block {}, target block {}, last synced block {}, blocks to sync {}'.format(current_block, target_block, self.last_synced_block, blocks_to_sync))if blocks_to_sync != 0:self.blockchain_streamer_adapter.export_all(self.last_synced_block + 1, target_block)logging.info('Writing last synced block {}'.format(target_block))write_last_synced_block(self.last_synced_block_file, target_block)self.last_synced_block = target_blockreturn blocks_to_sync
主体

​ 通过适配器获取当前的块号current_block,然后通过计算获得下一个应该获取的目标块号,然后得到当前应该要同步的块数目。如果需要同步的块数目不为0,则通过适配器函数获取需要同步块内的数据。

ethereum-etl学习3相关推荐

  1. ETL学习之四:SQL Server Integration Services入门

    ETL学习之四:SQL Server Integration Services入门 SSIS就是微软在SQL SERVER2005上对DTS的升级,不得不说,微软在BI上是花了很大功夫的,包括提供了S ...

  2. ETL学习心得:探求数据仓库关键环节ETL的本质【转】

    ETL学习心得:探求数据仓库关键环节ETL的本质 做数据仓库系统,ETL是关键的一环.说大了,ETL是数据整合解决方案,说小了,就是倒数据的工具.回忆 一下工作这么些年来,处理数据迁移.转换的工作倒还 ...

  3. ETL学习总结(2)——ETL数据集成工具之kettle、sqoop、datax、streamSets 比较

    前言 对于数据集成类应用,通常会采用ETL工具辅助完成.ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract).交互转换(transfo ...

  4. ETL学习总结(1)——ETL 十大功能特性详解

    前言 Kettle是一款国外开源的ETL工具,纯java编写,可以在Window.Linux.Unix上运行.说白了就是,很有必要去理解一般ETL工具必备的特性和功能,这样才更好的掌握Kettle的使 ...

  5. ETL学习-前期准备

    一.ETL (一).ETL是什么 ETL,Extract-Transform-Load的缩写,中文名为数据抽取.转换和加载.ETL 代表提取.转换和加载,是数据工程师用从不同来源提取数据.将数据转换为 ...

  6. ETL学习之五:创建新的 Integration Services 项目

    在 Microsoft SQL Server 2005 Integration Services (SSIS) 中创建包的第一步就是创建一个 Integration Services 项目.此项目包含 ...

  7. 【从零开始的大数据学习】Flink官方教程学习笔记(一)

    Flink官方教程学习笔记 学习资源 基础Scala语法 Scala数据结构专题 声明变量 代码块 函数(function) 方法(methods) Traits (接口) class(类) tupl ...

  8. 视频教程-Informatica PowerCenter 10.2 权威指南中文版-ETL

    Informatica PowerCenter 10.2 权威指南中文版 2018 -- 今天 派客动力 CTO 2007-2018 Informatica 中国区技术总监 <Informati ...

  9. 以太坊开发者工具的最新清单

    以太坊开发者工具的最新终极清单,用于在以太坊上开发应用程序的可用工具,组件,框架和平台的指南. 对于任何开发者,无论你是一个睁大眼睛的Web3新手还是一个头发灰白的OG加密无政府主义技术霸主,Gith ...

  10. 100个数据分析常用指标和术语

    大家好,我是辰哥~ 有个朋友是金融行业产品经理,最近在对已有的站内用户做分层与标签分类,需要对用户进行聚类分析.一般从事数据分析行业的朋友对这类词并不陌生,但是像市场运营人员就会把这类些名词概念搞混, ...

最新文章

  1. 工具分享-自动生成正则表达式的各种代码,附带正则表达式介绍
  2. GPT-3距离下一代AI生态平台还有多远?
  3. Python 代码性能优化技巧
  4. mysql 5.6到percona 5.6小版本升级
  5. 静态方法和实例化方法的区别 -转载
  6. 抖音怎么设置保存路径_抖音限时可见视频怎么弄 设置限时可见作品方法
  7. Oracle存储使用情况,我收藏的oracle中一些分析空间使用情况的存储过程!
  8. vue 拖拽(笔记)
  9. iPhone开发之Rotation
  10. Chapter 15 配置服务器存储和群集 第1课
  11. html中怎么在横线中加字_传说中仓颉造字,汉字是怎么演变来的?
  12. 跨域解决方案CROS最简单演示——JSP演示示例
  13. 一个女人不收拾厨房,卫生间便池也不刷,为什么老公也不嫌弃?
  14. [NOI 2014]起床困难综合症
  15. sd卡卡槽_SD卡面包板插槽DIY图解
  16. 密码学写作论文排版操作手册,latex模板cryptocode
  17. 【C++】atomic简介
  18. NVIDIA Jetson TK1学习与开发(四):一些细节问题
  19. java中singleton_java中singleton的几种实现方式
  20. 瑞星卡卡升级出现错误:kmon.dll.zip,错误代码:0x0/0x0(chech)

热门文章

  1. .NET Core 和 ASP.NET 5 RC1 发布
  2. 「镁客早报」未来中国数据量将超美国;巴菲特四季度股票资产缩水380亿美元,减持苹果甲骨文...
  3. 库克逼腾讯分成30%遭拒,苹果APP Store或将微信下架!
  4. 面试官角度观察到的程序员技能瓶颈,同时给出突破瓶颈的建议 ...
  5. 计网PPT 第五章 运输层
  6. python基础编程小实例2——绝对温标
  7. 如何编程访问(读,写)Revit项目信息
  8. vivo V5s的USB调试模式在哪里,打开vivo V5sUSB调试模式的经验
  9. 【gpt】免费部署个人gpt平台(无需tz)
  10. 快速阅读等三种读书方法