Datax的整体框架我们已经大体了解。这次来分析一下reader到writer中间数据的传输层。

这次采取另外一种方式,我们把代码抽取,自己实现一个通道

1-首先是定义一个接口代表传输的每一条数据public interface Record {

public void addColumn(Column column);

public void setColumn(int i, final Column column);

public Column getColumn(int i);

public String toString();

public int getColumnNumber();

public int getByteSize();

public int getMemorySize();

}

2-然后会定义一个抽象类 代表每一条数据中的每一列数据public abstract class Column {

private Type type;

private Object rawData;

private int byteSize;

public Column(final Object object, final Type type, int byteSize) {

this.rawData = object;

this.type = type;

this.byteSize = byteSize;

}

public Object getRawData() {

return this.rawData;

}

public Type getType() {

return this.type;

}

public int getByteSize() {

return this.byteSize;

}

protected void setType(Type type) {

this.type = type;

}

protected void setRawData(Object rawData) {

this.rawData = rawData;

}

protected void setByteSize(int byteSize) {

this.byteSize = byteSize;

}

public abstract Long asLong();

public abstract Double asDouble();

public abstract String asString();

public abstract Date asDate();

public abstract byte[] asBytes();

public abstract Boolean asBoolean();

public abstract BigDecimal asBigDecimal();

public abstract BigInteger asBigInteger();

@Override

public String toString() {

return JSON.toJSONString(this);

}

public enum Type {

BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES

}

}

具体实现类如:public class StringColumn extends Column {

public StringColumn() {

this((String) null);

}

public StringColumn(final String rawData) {

super(rawData, Column.Type.STRING, (null == rawData ? 0 : rawData

.length()));

}

@Override

public Long asLong() {

return null;

}

@Override

public Double asDouble() {

return null;

}

@Override

public String asString() {

if (null == this.getRawData()) {

return null;

}

return (String) this.getRawData();

}

@Override

public Date asDate() {

return null;

}

@Override

public byte[] asBytes() {

return new byte[0];

}

@Override

public Boolean asBoolean() {

return null;

}

@Override

public BigDecimal asBigDecimal() {

return null;

}

@Override

public BigInteger asBigInteger() {

return null;

}

}

注意每个实现保存数据的同时,会计算数据的大小,后面限制时候使用

3-定义一个类实现Record,主要是对reader或者writer一行数据的增加和获取public class DefaultRecord implements Record {

private static final int RECORD_AVERGAE_COLUMN_NUMBER = 16;

private List columns;

private int byteSize;

// 首先是Record本身需要的内存

private int memorySize = ClassSize.DefaultRecordHead;

public DefaultRecord() {

this.columns = new ArrayList(RECORD_AVERGAE_COLUMN_NUMBER);

}

@Override

public void addColumn(Column column) {

columns.add(column);

incrByteSize(column);

}

@Override

public Column getColumn(int i) {

if (i = columns.size()) {

return null;

}

return columns.get(i);

}

@Override

public void setColumn(int i, final Column column) {

if (i

throw new RuntimeException("不能给index小于0的column设置值");

}

if (i >= columns.size()) {

expandCapacity(i + 1);

}

decrByteSize(getColumn(i));

this.columns.set(i, column);

incrByteSize(getColumn(i));

}

@Override

public String toString() {

Map json = new HashMap();

json.put("size", this.getColumnNumber());

json.put("data", this.columns);

return JSON.toJSONString(json);

}

@Override

public int getColumnNumber() {

return this.columns.size();

}

@Override

public int getByteSize() {

return byteSize;

}

public int getMemorySize(){

return memorySize;

}

private void decrByteSize(final Column column) {

if (null == column) {

return;

}

byteSize -= column.getByteSize();

//内存的占用是column对象的头 再加实际大小

memorySize = memorySize -  ClassSize.ColumnHead - column.getByteSize();

}

private void incrByteSize(final Column column) {

if (null == column) {

return;

}

byteSize += column.getByteSize();

//内存的占用是column对象的头 再加实际大小

memorySize = memorySize + ClassSize.ColumnHead + column.getByteSize();

}

private void expandCapacity(int totalSize) {

if (totalSize <= 0) {

return;

}

int needToExpand = totalSize - columns.size();

while (needToExpand-- > 0) {

this.columns.add(null);

}

}

}

中间会用到计算对象头大小的工具类,在datax源码中,此处希望有大神可以讲解一下对象头大小计算等知识!!!

4-接下来定义一个抽象类Channel,统计和限速都在这里

"capacity": 512, 队列大小

"byteCapacity": 67108864  每条数据大小的现在public abstract class Channel {

private static final Logger LOG = LoggerFactory.getLogger(Channel.class);

protected int taskGroupId;

protected int capacity;

protected int byteCapacity;

protected long byteSpeed; // bps: bytes/s

protected long recordSpeed; // tps: records/s

protected long flowControlInterval;

protected volatile boolean isClosed = false;

protected Configuration configuration = null;

protected volatile long waitReaderTime = 0;

protected volatile long waitWriterTime = 0;

private static Boolean isFirstPrint = true;

public Channel(final Configuration configuration) {

//channel的queue里默认record为1万条。原来为512条

int capacity = configuration.getInt(

CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY, 2048);

long byteSpeed = configuration.getLong(

CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024);

long recordSpeed = configuration.getLong(

CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000);

if (capacity <= 0) {

throw new IllegalArgumentException(String.format(

"通道容量[%d]必须大于0.", capacity));

}

synchronized (isFirstPrint) {

if (isFirstPrint) {

Channel.LOG.info("Channel set byte_speed_limit to " + byteSpeed

+ (byteSpeed <= 0 ? ", No bps activated." : "."));

Channel.LOG.info("Channel set record_speed_limit to " + recordSpeed

+ (recordSpeed <= 0 ? ", No tps activated." : "."));

isFirstPrint = false;

}

}

this.capacity = capacity;

this.byteSpeed = byteSpeed;

this.recordSpeed = recordSpeed;

this.flowControlInterval = configuration.getLong(

CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_FLOWCONTROLINTERVAL, 1000);

//channel的queue默认大小为8M,原来为64M

this.byteCapacity = configuration.getInt(

CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);

this.configuration = configuration;

}

public void pushTerminate(final TerminateRecord r) {

Validate.notNull(r, "record不能为空.");

this.doPush(r);

}

public void close() {

this.isClosed = true;

}

public void open() {

this.isClosed = false;

}

public boolean isClosed() {

return isClosed;

}

public int getTaskGroupId() {

return this.taskGroupId;

}

public int getCapacity() {

return capacity;

}

public long getByteSpeed() {

return byteSpeed;

}

public Configuration getConfiguration() {

return this.configuration;

}

public void push(final Record r) {

Validate.notNull(r, "record不能为空.");

this.doPush(r);

}

public void pushAll(final Collection rs) {

Validate.notNull(rs);

Validate.noNullElements(rs);

this.doPushAll(rs);

}

public Record pull() {

Record record = this.doPull();

return record;

}

public void pullAll(final Collection rs) {

Validate.notNull(rs);

this.doPullAll(rs);

}

protected abstract void doPush(Record r);

protected abstract void doPushAll(Collection rs);

protected abstract Record doPull();

protected abstract void doPullAll(Collection rs);

public abstract int size();

public abstract boolean isEmpty();

public abstract void clear();

private long getByteSize(final Collection rs) {

long size = 0;

for (final Record each : rs) {

size += each.getByteSize();

}

return size;

}

}

重要的是它的实现类,内存Channel的具体实现,底层其实是一个ArrayBlockingQueue

生产者消费者模型基于ReentrantLock,Condition!!!public class MemoryChannel extends Channel {

private int bufferSize = 0;

private AtomicInteger memoryBytes = new AtomicInteger(0);

private ArrayBlockingQueue queue = null;

private ReentrantLock lock;

private Condition notInsufficient, notEmpty;//不充足的,不为空

public MemoryChannel(final Configuration configuration) {

super(configuration);

this.queue = new ArrayBlockingQueue(this.getCapacity());//初始化队列的大小512

this.bufferSize = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);

//初始化锁和线程同步工具

lock = new ReentrantLock();

notInsufficient = lock.newCondition();

notEmpty = lock.newCondition();

}

@Override

public void clear(){

this.queue.clear();

}

@Override

protected void doPush(Record r) {

try {

long startTime = System.nanoTime();

this.queue.put(r);

waitWriterTime += System.nanoTime() - startTime;

memoryBytes.addAndGet(r.getMemorySize());

} catch (InterruptedException ex) {

Thread.currentThread().interrupt();

}

}

@Override

protected void doPushAll(Collection rs) {

try {

long startTime = System.nanoTime();

lock.lockInterruptibly();

int bytes = getRecordBytes(rs);

while (memoryBytes.get() + bytes > this.byteCapacity || rs.size() > this.queue.remainingCapacity()) {//检查剩余可以插入的数量

notInsufficient.await(200L, TimeUnit.MILLISECONDS);

}

this.queue.addAll(rs);

waitWriterTime += System.nanoTime() - startTime;

memoryBytes.addAndGet(bytes);

notEmpty.signalAll();

} catch (InterruptedException e) {

throw new RuntimeException();

} finally {

lock.unlock();

}

}

@Override

protected Record doPull() {

try {

long startTime = System.nanoTime();

Record r = this.queue.take();

waitReaderTime += System.nanoTime() - startTime;

memoryBytes.addAndGet(-r.getMemorySize());

return r;

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

throw new IllegalStateException(e);

}

}

@Override

protected void doPullAll(Collection rs) {

assert rs != null;

rs.clear();

try {

long startTime = System.nanoTime();

lock.lockInterruptibly();

while (this.queue.drainTo(rs, bufferSize) <= 0) {

notEmpty.await(200L, TimeUnit.MILLISECONDS);

}

waitReaderTime += System.nanoTime() - startTime;

int bytes = getRecordBytes(rs);

memoryBytes.addAndGet(-bytes);

notInsufficient.signalAll();

} catch (InterruptedException e) {

throw new RuntimeException();

} finally {

lock.unlock();

}

}

private int getRecordBytes(Collection rs){

int bytes = 0;

for(Record r : rs){

bytes += r.getMemorySize();

}

return bytes;

}

@Override

public int size() {

return this.queue.size();

}

@Override

public boolean isEmpty() {

return this.queue.isEmpty();

}

}

5-最后会定义一个RecordExchanger用来具体的传输数据或者获取数据public class BufferedRecordExchanger  {

private final Channel channel;

private final Configuration configuration;

private final List buffer;

private int bufferSize ;

protected final int byteCapacity;

private final AtomicInteger memoryBytes = new AtomicInteger(0);

private int bufferIndex = 0;

private static Class extends Record> RECORD_CLASS;

private volatile boolean shutdown = false;

public BufferedRecordExchanger(final Channel channel) {

assert null != channel;

assert null != channel.getConfiguration();

this.channel = channel;

this.configuration = channel.getConfiguration();

this.bufferSize = configuration

.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);

this.buffer = new ArrayList(bufferSize);

//channel的queue默认大小为8M,原来为64M

this.byteCapacity = configuration.getInt(

CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);

try {

BufferedRecordExchanger.RECORD_CLASS = ((Class extends Record>) Class

.forName("com.fayayo.core.transport.record.DefaultRecord"));

} catch (Exception e) {

throw new RuntimeException();

}

}

public Record createRecord() {

try {

return BufferedRecordExchanger.RECORD_CLASS.newInstance();

} catch (Exception e) {

throw new RuntimeException();

}

}

public void sendToWriter(Record record) {

if(shutdown){

throw new RuntimeException();

}

Validate.notNull(record, "record不能为空.");

if (record.getMemorySize() > this.byteCapacity) {

throw new RuntimeException("单条记录超过大小限制,当前限制为:"+this.byteCapacity);

}

//bufferSize默认是32   如果大于32或者超过最大限制  则提交一个批次

boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);

if (isFull) {

flush();

}

this.buffer.add(record);

this.bufferIndex++;

memoryBytes.addAndGet(record.getMemorySize());

}

//真正加入队列的方法

public void flush() {

if(shutdown){

throw new RuntimeException();

}

this.channel.pushAll(this.buffer);//加入到arrayBlockQueue

this.buffer.clear();

this.bufferIndex = 0;

this.memoryBytes.set(0);

}

//从队列中获取一行数据

public Record getFromReader() {

if(shutdown){

throw new RuntimeException();

}

boolean isEmpty = (this.bufferIndex >= this.buffer.size());

if (isEmpty) {//为空的话就去拉去数据,陷入等待

log.info("没有数据等待.....");

receive();

}

Record record = this.buffer.get(this.bufferIndex++);

if (record instanceof TerminateRecord) {//生产者的结束标志

record = null;

}

return record;

}

public void shutdown(){

shutdown = true;

try{

buffer.clear();

channel.clear();

}catch(Throwable t){

t.printStackTrace();

}

}

private void receive() {

this.channel.pullAll(this.buffer);

this.bufferIndex = 0;

this.bufferSize = this.buffer.size();

}

public void terminate() {

if(shutdown){

throw new RuntimeException();

}

flush();

this.channel.pushTerminate(TerminateRecord.get());

}

}

6- 生产者传输的前提要有个结束标志

/**

* 作为标示 生产者已经完成生产的标志

*/

public class TerminateRecord implements Record {

private final static TerminateRecord SINGLE = new TerminateRecord();

private TerminateRecord() {

}

public static TerminateRecord get() {

return SINGLE;

}

@Override

public void addColumn(Column column) {

}

@Override

public Column getColumn(int i) {

return null;

}

@Override

public int getColumnNumber() {

return 0;

}

@Override

public int getByteSize() {

return 0;

}

@Override

public int getMemorySize() {

return 0;

}

@Override

public void setColumn(int i, Column column) {

return;

}

}

7- 测试

public class MainTest {

public static void main(String[] args) {

System.setProperty("elastic.home","D:\\app\\workspace\\idea\\elastic-job\\target\\elastic\\elastic");

Configuration configuration=Configuration.newDefault();

Configuration allConfig=configuration.merge(ConfigParser.parseCoreConfig(CoreConstant.ELASTIC_CONF_PATH),false);

Channel channel=new MemoryChannel(allConfig);

BufferedRecordExchanger bufferedRecordExchanger=new BufferedRecordExchanger(channel);

Record record=bufferedRecordExchanger.createRecord();//创建一行数据

String str="abcdefghigklmnopqrstuvwxyz";

//生产数据

new Thread(new Runnable() {

@Override

public void run() {

//发送数据

record.setColumn(0,new StringColumn(str));//添加完数据

bufferedRecordExchanger.sendToWriter(record);//发送数据

bufferedRecordExchanger.sendToWriter(record);//发送数据

bufferedRecordExchanger.terminate();//发送结束标志

}

}).start();

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

//消费数据

//生产数据

new Thread(new Runnable() {

@Override

public void run() {

//消费数据

Record record1=null;//接收数据

while ((record1=bufferedRecordExchanger.getFromReader())!=null){

System.out.println(record1.toString());

}

System.out.println("读取完毕");

}

}).start();

}

}

测试的时候用到了datax中的json工具以及core.json,会用到里面的限制的参数:

"transport": {

"channel": {

"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",

"speed": {

"byte": -1,

"record": -1

},

"flowControlInterval": 20,

"capacity": 512,

"byteCapacity": 67108864

},

"exchanger": {

"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",

"bufferSize": 32

}

}

datax底层原理_手把手实现Datax3.0中的传输通道相关推荐

  1. synchronized底层原理_你用过synchronized吗?它的底层原理是什么?Java经典面试题来了...

    并发编程已经成为程序员必备技能 作为Java程序员,不懂得并发编程显然已经不能满足市场需求了,尤其是在面试过程中将处于被动地位,也有可能面试将就此终结. 那么作为Java开发者的你,日常虽然可以基于J ...

  2. 底层原理_自动装箱与拆箱底层原理

    1.自动装箱与拆箱 Java中的数据类型分为两大类,基本数据类型与引用数据类型.Java中共提供了八种基本数据类型,同时提供了这八种基本数据类型对应的引用数据类型. 自动装箱:基本数据类型的数据自动转 ...

  3. datax底层原理_Datax 插件加载原理

    Datax 插件加载原理 插件类型 Datax有好几种类型的插件,每个插件都有不同的作用. reader, 读插件.Reader就是属于这种类型的 writer, 写插件.Writer就是属于这种类型 ...

  4. 不要给我说什么底层原理_连集合底层实现原理都不知道,你敢说 Redis 用的很溜?...

    目录 SDS 的设计到底有多牛逼. List.Set.Sorted Set.Hash 底层实现原理 SDS 的设计到底有多牛逼 Redis 使用 C 语言编写,但是并没有直接使用 C 语言自带的字符串 ...

  5. Struts2_day04--课程介绍_Struts2拦截器概述底层原理_重要的概念

    Struts2_day04 上节内容 今天内容 Struts2拦截器概述 拦截器底层原理 重要的概念 自定义拦截器 自定义登录拦截器 Struts2的标签库 Struts2表单标签(会用) Strut ...

  6. hashmap底层原理_周末自己动手撸一个 HashMap,美滋滋

    对HashMap的思考 通过写一个迷你版的HashMap来深刻理解 定义接口 接口实现 看MyHashMap的构造 Entry 看put如何实现 hash函数 resize和rehash get实现 ...

  7. java web底层原理_详解Java开发Web应用程序的底层原理

    前言 前面一篇文章,我从整个应用程序的整体以及跟运行环境的关系简单聊了一下我们现在常用的Spring框架的设计基础和准则,其中主要是控制反转和依赖注入,以及容器化编程等概念. 这里我不想去复述这些概念 ...

  8. SpringCloud工作笔记065---lombok的使用和原理_在开发工具STS_eclipse_Idea中集成lombok

    JAVA技术交流QQ群:170933152 简单说就是在类中不用再写:get set方法,hashcode toStirng equals方法了 eclipse中集成: update时,勾选上Forc ...

  9. 三坐标最小二乘法原理_全最小二乘法在三坐标测量中的应用

    第 3O卷第 4期 2008年 O7月 武 汉 工 程 大 学 学 报 J. Wuhan Inst. Tech. Vo1.30 NO.4 Ju1. 2008 文章编号 :1674-2869(2008) ...

最新文章

  1. Android/Java 单例使用总结
  2. mysql 时区与时间函数
  3. mysql mysql_row 整行数据_PHP使用mysql_fetch_row查询获得数据行列表的方法,phpmysql_fetch_row_PHP教程...
  4. 如何设置TextView textStyle,例如粗体,斜体
  5. o_rdonly_O_RDWR, O_CREAT等open函数标志位在哪里定义? | 学步园
  6. 17.3.10--关于C元的变量类型所占字节问题和类型转化
  7. 详解Guitar Pro的自动化编辑器之节拍自动化
  8. JavaScript MD5加密实现
  9. 西门子STEP7 MICROWIN V4 SP5 下载
  10. 人脸识别源码运行指南
  11. coolfire的八篇入门文章(.txt)
  12. greensock下载_使用GreenSock完成我们的可拖动的画布外菜单
  13. java 数据字典使用_java中数据字典怎么用?图文详解
  14. IDM安装使用 解决下载限速
  15. 如何使用PS进行P图
  16. unity 制作的app发布到andriod手机
  17. Idea Eclips快捷键
  18. python处理pdf实例_Python程序图片和pdf上文字识别实例
  19. 「CTS2019 | CTSC2019」氪金手游 解题报告
  20. 一个团队需要什么样的人

热门文章

  1. 技术贴 | MetaboAnalyst 4.0,代谢组学研究利器的升级
  2. R语言ggplot2可视化:ggplot2可视化半小提琴图(Half Violin Plots)
  3. R语言dplyr包的mutate函数将列添加到dataframe中或者修改现有的数据列:使用na_if()函数将0值替换为NA值、负收入替换为NA值
  4. R语言sample函数数据对象采样实战
  5. 简要介绍一下贝叶斯定理( Bayes‘ theorem)
  6. GPU信息查看以及确认Pytorch使用了GPU计算模块进行深度学习的训练
  7. 词移距离 Word Mover‘s Distance
  8. 服务器安装使用rstudio-server
  9. c语言一维数组逆序输出_剑指信奥 | C 语言之兵人来袭!
  10. Fast and accurate short read alignment with Burrows-Wheeler transform