前面学习了一些源码和datax的执行,其中有一个重要的流程任务切分。今天梳理下;


一、概述

Datax根首先据配置文件,确定好channel的并发数目。然后将整个job分成一个个小的task,然后划分成组。从JobContainer的start()方法开始,进入split()方法,split方法里执行后续所有的切分;


二、总体流程

  1. 切分任务
  2. channel数目的确定
  3. reader的切分
  4. Writer的切分
  5. 合并配置 分
  6. 配任务

三、切分任务

JobContainer 的split负责将整个job切分成多个task,生成task配置的列表。

  /*** 执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果, 达到切分后数目相等,* 才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起。然后,为避免顺序给读写端带来长尾影响,* 将整合的结果shuffler掉 <br/>* 1 动态调整并获取channel数量 <br/>* 2 根据1的channel数量 切割reader 得到reader的cfg列表 <br/>* 3 根据2的 cfg数量切割writer,得到writer cfg 列表 <br/>* 4 获取transform的cfg 列表 <br/>* 5 合并234的cfg <br/>*/private int split() {this.adjustChannelNumber();needChannelNumber = needChannelNumber <= 0 ? 1 : needChannelNumber;List<Configuration> readerTaskCfgs = this.doReaderSplit(this.needChannelNumber);int taskNumber = readerTaskCfgs.size();List<Configuration> writerTaskCfs = this.doWriterSplit(taskNumber);List<Configuration> trsfms = configuration.getListConfiguration(DATAX_JOB_CONTENT_TRANSFORMER);LOG.debug("transformer configuration: " + JSON.toJSONString(trsfms));//输入是reader和writer的parameter list,输出是content下面元素的listList<Configuration> contentCfgs = mergeReaderAndWriterTaskConfigs(readerTaskCfgs, writerTaskCfs,trsfms);LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentCfgs));this.configuration.set(DATAX_JOB_CONTENT, contentCfgs);return contentCfgs.size();}

执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型.


四、channel数目的确定

datax先从core.json 和 job.json 里获取用户指定的channel,然后再内部根据实际情况进行调整channel数量;

1、用户指定channel数:

{"core": {"transport" : {"channel": {"speed": {"record": 100,"byte": 100}}}},"job": {"setting": {"speed": {"record": 500,"byte": 1000,"channel" : 1}}}
}

job里的是全局配置, core里的channel是单个channel的限制。首先计算按照字节数限速,channel的数目应该为 500 / 100 = 5,然后按照记录数限速, channel的数目应该为 1000 / 100 = 10, 最后返回两者的最小值 5。虽然指定了channel为1, 但只有在没有限速的条件下,才会使用。

2、程序根据实际情况调整channel数

所有调整channel的代码在JobContainer的adjustChannelNumber方法

  /*** 根据byteNum和RecordNum调整channel数量 <br>* 1 是否有全局(job) byte限制,如果有,则必须要有channel的byte设置,最后计算出 需要的channelByByte数量  <br>* 2 是否有全局(job) record限制,如果有,则必须要有channel的record设置,最后计算出 需要的channelByRecord数量 <br>* 3 取1和2的最小值设置到job的channelNumber,如果可以设置,则该方法任务完成,退出 <br>* 4 如果3 未能设置,则从cfg中判断用户是否自己设置了channelNum,如果用户设置了,将用户设置的给本job channel <br>*/private void adjustChannelNumber() {int needChannelNumByByte = Integer.MAX_VALUE;boolean hasByteLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);if (hasByteLimit) {long jobByteSpeed = configuration.getInt(DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);// 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!Long channelByteSpeed = configuration.getLong(DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);if (channelByteSpeed == null || channelByteSpeed <= 0) {throw DataXException.asDataXException(CONFIG_ERROR, "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");}needChannelNumByByte = (int) (jobByteSpeed / channelByteSpeed);needChannelNumByByte = needChannelNumByByte > 0 ? needChannelNumByByte : 1;LOG.info("Job set Max-Byte-Speed to " + jobByteSpeed + " bytes.");}int needChannelNumByRecord = Integer.MAX_VALUE;boolean hasRecordLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;if (hasRecordLimit) {long jobRecordSpeed = configuration.getInt(DATAX_JOB_SETTING_SPEED_RECORD, 100000);Long channelRecordSpeed = configuration.getLong(DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);if (channelRecordSpeed == null || channelRecordSpeed <= 0) {throw DataXException.asDataXException(CONFIG_ERROR,"在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");}needChannelNumByRecord = (int) (jobRecordSpeed / channelRecordSpeed);needChannelNumByRecord = needChannelNumByRecord > 0 ? needChannelNumByRecord : 1;LOG.info("Job set Max-Record-Speed to " + jobRecordSpeed + " records.");}// 全局的 needChannelNumber 按照needChannelNumByByte 和needChannelNumByRecord  取较小值needChannelNumber = Math.min(needChannelNumByByte, needChannelNumByRecord);// 如果从byte或record上设置了needChannelNumber则退出if (this.needChannelNumber < Integer.MAX_VALUE) {return;}boolean hasChannelLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);if (hasChannelLimit) {needChannelNumber = this.configuration.getInt(DATAX_JOB_SETTING_SPEED_CHANNEL);LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels.");return;}throw DataXException.asDataXException(CONFIG_ERROR, "Job运行速度必须设置");}

五、reader的切分

doReaderSplit方法, 调用Reader.Job的split方法,返回Reader.Task的Configuration列表

/**
* adviceNumber, 建议的数目
*/
private List<Configuration> doReaderSplit(int adviceNumber) {// 切换ClassLoaderclassLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));// 调用Job.Reader的split切分List<Configuration> readerSlicesConfigs =this.jobReader.split(adviceNumber);if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) {throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR,"reader切分的task数目不能小于等于0");}LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.",this.readerPluginName, readerSlicesConfigs.size());classLoaderSwapper.restoreCurrentThreadClassLoader();return readerSlicesConfigs;
}public List<Configuration> split(int adviceNumber) {LOG.info("split() begin...");List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();// warn:每个slice拖且仅拖一个文件,// int splitNumber = adviceNumber;int splitNumber = this.sourceFiles.size();if (0 == splitNumber) {//                throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
//                        String.format("未能找到待读取的文件,请确认您的配置项path: %s", this.readerOriginConfig.getString(Key.PATH)));String message = String.format("未能找到待读取的文件,请确认您的配置项path: %s", this.readerOriginConfig.getString(Key.PATH));LOG.info(message);return new ArrayList<Configuration>();}List<List<String>> splitedSourceFiles = this.splitSourceFiles(new ArrayList<String>(this.sourceFiles), splitNumber);for (List<String> files : splitedSourceFiles) {Configuration splitedConfig = this.readerOriginConfig.clone();splitedConfig.set(Constant.SOURCE_FILES, files);readerSplitConfigs.add(splitedConfig);}return readerSplitConfigs;}private <T> List<List<T>> splitSourceFiles(final List<T> sourceList, int adviceNumber) {List<List<T>> splitedList = new ArrayList<List<T>>();int averageLength = sourceList.size() / adviceNumber;averageLength = averageLength == 0 ? 1 : averageLength;for (int begin = 0, end = 0; begin < sourceList.size(); begin = end) {end = begin + averageLength;if (end > sourceList.size()) {end = sourceList.size();}splitedList.add(sourceList.subList(begin, end));}return splitedList;}

这里的reader是hdfs reader 原文件数,reader task数等于文件数。


六、Writer的切分数

doWriterSplit方法, 调用Writer.JOb的split方法,返回Writer.Task的Configuration列表

private List<Configuration> doWriterSplit(int readerTaskNumber) {// 切换ClassLoaderclassLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));// 调用Job.Reader的split切分List<Configuration> writerSlicesConfigs = this.jobWriter.split(readerTaskNumber);if (writerSlicesConfigs == null || writerSlicesConfigs.size() <= 0) {throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR,"writer切分的task不能小于等于0");}LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.",this.writerPluginName, writerSlicesConfigs.size());classLoaderSwapper.restoreCurrentThreadClassLoader();return writerSlicesConfigs;
}

为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!
比如rediswriter

 public List<Configuration> split(int mandatoryNumber) {List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);for (int i = 0; i < mandatoryNumber; i++) {configurations.add(getPluginJobConf());}return configurations;}

七、合并配置

合并reader,writer,transformer配置列表。并将任务列表,保存在配置job.content的值里。

private List<Configuration> mergeReaderAndWriterTaskConfigs(List<Configuration> readerTasksConfigs,List<Configuration> writerTasksConfigs,List<Configuration> transformerConfigs) {// reader切分的任务数目必须等于writer切分的任务数目if (readerTasksConfigs.size() != writerTasksConfigs.size()) {throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR,String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",readerTasksConfigs.size(), writerTasksConfigs.size()));}List<Configuration> contentConfigs = new ArrayList<Configuration>();for (int i = 0; i < readerTasksConfigs.size(); i++) {Configuration taskConfig = Configuration.newDefault();// 保存reader相关配置taskConfig.set(CoreConstant.JOB_READER_NAME,this.readerPluginName);taskConfig.set(CoreConstant.JOB_READER_PARAMETER,readerTasksConfigs.get(i));// 保存writer相关配置taskConfig.set(CoreConstant.JOB_WRITER_NAME,this.writerPluginName);taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,writerTasksConfigs.get(i));// 保存transformer相关配置if(transformerConfigs!=null && transformerConfigs.size()>0){taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);}taskConfig.set(CoreConstant.TASK_ID, i);contentConfigs.add(taskConfig);}return contentConfigs;
}

分配任务
分配算法

首先根据指定的channel数目和每个Taskgroup的拥有channel数目,计算出Taskgroup的数目
根据每个任务的reader.parameter.loadBalanceResourceMark将任务分组
根据每个任务writer.parameter.loadBalanceResourceMark来讲任务分组
根据上面两个任务分组的组数,挑选出大的那个组
轮询上面步骤的任务组,依次轮询的向各个TaskGroup添加一个,直到所有任务都被分配完
这里举个实例:
目前有7个task,channel有20个,每个Taskgroup拥有5个channel。
首先计算出Taskgroup的数目, 20 / 5 = 4 。(实际不会有这种情况,channel数不会超过task数)

根据reader.parameter.loadBalanceResourceMark,将任务分组如下:

{"database_a" : [task_id_1, task_id_2],"database_b" : [task_id_3, task_id_4, task_id_5],"database_c" : [task_id_6, task_id_7]
}

根据writer.parameter.loadBalanceResourceMark,将任务分组如下:

{"database_dst_d" : [task_id_1, task_id_2],"database_dst_e" : [task_id_3, task_id_4, task_id_5, task_id_6, task_id_7]
}

因为readerResourceMarkAndTaskIdMap有三个组,而writerResourceMarkAndTaskIdMap只有两个组。从中选出组数最多的,所以这里按照readerResourceMarkAndTaskIdMap将任务分配。

执行过程是,轮询database_a, database_b, database_c,取出第一个。循环上一步

1. 取出task_id_1 放入 taskGroup_1
2. 取出task_id_3 放入 taskGroup_2
3. 取出task_id_6 放入 taskGroup_3
4. 取出task_id_2 放入 taskGroup_4
5. 取出task_id_4 放入 taskGroup_1
6. ………

最后返回的结果为

{"taskGroup_1": [task_id_1, task_id_4],"taskGroup_2": [task_id_3, task_id_7],"taskGroup_3": [task_id_6, task_id_5],"taskGroup_4": [task_id_2]
}

代码解释
任务的分配是由JobAssignUtil类负责。使用者调用assignFairly方法,传入参数,返回TaskGroup配置列表

public final class JobAssignUtil {/*** configuration 配置* channelNumber, channel总数* channelsPerTaskGroup, 每个TaskGroup拥有的channel数目*/public static List<Configuration> assignFairly(Configuration configuration, int channelNumber, int channelsPerTaskGroup) {List<Configuration> contentConfig = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);// 计算TaskGroup的数目int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);......// 任务分组LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap = parseAndGetResourceMarkAndTaskIdMap(contentConfig);// 调用doAssign方法,分配任务List<Configuration> taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);// 调整 每个 taskGroup 对应的 Channel 个数(属于优化范畴)adjustChannelNumPerTaskGroup(taskGroupConfig, channelNumber);return taskGroupConfig;}
}

任务分组
按照task配置的reader.parameter.loadBalanceResourceMark和writer.parameter.loadBalanceResourceMark,分别对任务进行分组,选择分组数最高的那组,作为任务分组的源。

/**
* contentConfig参数,task的配置列表
*/
private static LinkedHashMap<String, List<Integer>> parseAndGetResourceMarkAndTaskIdMap(List<Configuration> contentConfig) {// reader的任务分组,key为分组的名称,value是taskId的列表LinkedHashMap<String, List<Integer>> readerResourceMarkAndTaskIdMap = new LinkedHashMap<String, List<Integer>>();// writer的任务分组,key为分组的名称,value是taskId的列表LinkedHashMap<String, List<Integer>> writerResourceMarkAndTaskIdMap = new LinkedHashMap<String, List<Integer>>();for (Configuration aTaskConfig : contentConfig) {int taskId = aTaskConfig.getInt(CoreConstant.TASK_ID);// 取出reader.parameter.loadBalanceResourceMark的值,作为分组名String readerResourceMark = aTaskConfig.getString(CoreConstant.JOB_READER_PARAMETER + "." + CommonConstant.LOAD_BALANCE_RESOURCE_MARK);if (readerResourceMarkAndTaskIdMap.get(readerResourceMark) == null) {readerResourceMarkAndTaskIdMap.put(readerResourceMark, new LinkedList<Integer>());}// 把 readerResourceMark 加到 readerResourceMarkAndTaskIdMap 中readerResourceMarkAndTaskIdMap.get(readerResourceMark).add(taskId);// 取出writer.parameter.loadBalanceResourceMark的值,作为分组名String writerResourceMark = aTaskConfig.getString(CoreConstant.JOB_WRITER_PARAMETER + "." + CommonConstant.LOAD_BALANCE_RESOURCE_MARK);if (writerResourceMarkAndTaskIdMap.get(writerResourceMark) == null) {writerResourceMarkAndTaskIdMap.put(writerResourceMark, new LinkedList<Integer>());}// 把 writerResourceMark 加到 writerResourceMarkAndTaskIdMap 中writerResourceMarkAndTaskIdMap.get(writerResourceMark).add(taskId);}// 选出reader和writer其中最大的if (readerResourceMarkAndTaskIdMap.size() >= writerResourceMarkAndTaskIdMap.size()) {// 采用 reader 对资源做的标记进行 shufflereturn readerResourceMarkAndTaskIdMap;} else {// 采用 writer 对资源做的标记进行 shufflereturn writerResourceMarkAndTaskIdMap;}
}

分配任务
将上一部任务的分组,划分到每个taskGroup里

private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) {List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);Configuration taskGroupTemplate = jobConfiguration.clone();taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT);List<Configuration> result = new LinkedList<Configuration>();// 初始化taskGroupConfigListList<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber);for (int i = 0; i < taskGroupNumber; i++) {taskGroupConfigList.add(new LinkedList<Configuration>());}// 取得resourceMarkAndTaskIdMap的值的最大个数int mapValueMaxLength = -1;List<String> resourceMarks = new ArrayList<String>();for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) {resourceMarks.add(entry.getKey());if (entry.getValue().size() > mapValueMaxLength) {mapValueMaxLength = entry.getValue().size();}}int taskGroupIndex = 0;// 执行mapValueMaxLength次数,每一次轮询一遍resourceMarkAndTaskIdMapfor (int i = 0; i < mapValueMaxLength; i++) {// 轮询resourceMarkAndTaskIdMapfor (String resourceMark : resourceMarks) {if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {// 取出第一个int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);// 轮询的向taskGroupConfigList插入值taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));// taskGroupIndex自增taskGroupIndex++;// 删除第一个resourceMarkAndTaskIdMap.get(resourceMark).remove(0);}}}Configuration tempTaskGroupConfig;for (int i = 0; i < taskGroupNumber; i++) {tempTaskGroupConfig = taskGroupTemplate.clone();// 设置TaskGroup的配置tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);result.add(tempTaskGroupConfig);}// 返回结果return result;
}

为组分配channel
上面已经把任务划分成多个组,为了每个组能够均匀的分配channel,还需要调整。算法原理是,当channel总的数目,不能整除TaskGroup的数目时。多的余数个channel,从中挑选出余数个TaskGroup,每个多分配一个。

比如现在有13个channel,然后taskgroup确有5个。那么首先每个组先分 13 / 5 = 2 个。那么还剩下多的3个chanel,分配给前面个taskgroup。

private static void adjustChannelNumPerTaskGroup(List<Configuration> taskGroupConfig, int channelNumber) {int taskGroupNumber = taskGroupConfig.size();int avgChannelsPerTaskGroup = channelNumber / taskGroupNumber;int remainderChannelCount = channelNumber % taskGroupNumber;// 表示有 remainderChannelCount 个 taskGroup,其对应 Channel 个数应该为:avgChannelsPerTaskGroup + 1;// (taskGroupNumber - remainderChannelCount)个 taskGroup,其对应 Channel 个数应该为:avgChannelsPerTaskGroupint i = 0;for (; i < remainderChannelCount; i++) {taskGroupConfig.get(i).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup + 1);}for (int j = 0; j < taskGroupNumber - remainderChannelCount; j++) {taskGroupConfig.get(i + j).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup);}
}

注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

datax(22):任务分配规则相关推荐

  1. camunda 流程执行追踪_Camunda BPM:基于DMN决策表的任务分配规则引擎 - Camunda团队博客...

    在涉及人员工作流程的业务流程中,任务分配逻辑可能变得非常复杂.例如,处理保险索赔或批准流程的其他变体可能需要许多或复杂的任务分配规则.决策模型和符号(DMN)决策表是管理这些规则的很好的工具之一. 示 ...

  2. 机器学习规则 (Rules of Machine Learning): 关于机器学习工程的最佳实践

    马丁·辛克维奇 本文档旨在帮助已掌握机器学习基础知识的人员从 Google 机器学习的最佳实践中受益.它介绍了一种机器学习样式,类似于 Google C++ 样式指南和其他常用的实用编程指南.如果您学 ...

  3. 谷歌机器学习规则:机器学习工程的43条最佳实践经验

    文章选自Google Developers,作者:Martin Zinkevich. 机器学习目前已经有非常多的应用,它相比于传统的软件工程,最大的特点即我们编写的是学习过程,因此系统能根据数据改善性 ...

  4. Linux IPTables:如何添加防火墙规则

    摘要:本文介绍了如何使用"iptables -A"命令添加 iptables 防火墙规则. 本文分享自华为云社区<Linux IPTables:如何添加防火墙规则(使用允许 ...

  5. 谷歌机器学习规则要点简析:43条黄金法则

    http://blog.itpub.net/31542492/viewspace-2156228/ 目录 术语 概览 在进行机器学习之前 机器学习第一阶段:您的第一个管道 关于机器学习工程的最佳实践 ...

  6. 谷歌机器学习规则 (Rules of Machine Learning)

    机器学习规则 (Rules of Machine Learning) 往期文章: 机器学习之特征工程 机器学习之分类(Classification) 精确率.准确率.召回率 ------------- ...

  7. 基于规则引擎实现规则可配置的机场地勤人员排班系统

    规则引擎是一种软件工具,能够在规则库中存储.管理和自动执行规则以快速决策.机场地勤人员排班系统需要根据多种因素进行自动排班. 基于规则引擎实现规则可配置的机场地勤人员排班系统的功能设计如下: 规则库管 ...

  8. ubuntu防火墙的安装,开启,关闭和添加规则等操作

    操作一:查看防火墙状态 输入命令:sudo ufw status 我的防火墙是开启状态,下面显示了防火墙允许通过的协议 如果防火墙没开启会显示,状态不活动 操作二:安装防火墙 输入命令:sudo ap ...

  9. 皮克斯开源_皮克斯的故事讲述规则适合网页设计师

    皮克斯开源 Six months ago Emma Coats – then a Pixar Story Artist, now an independent filmmaker – tweeted ...

最新文章

  1. 《剑指offer》-左旋转字符串
  2. ESI最新计算机学科统计:中国78所跻身高被引100强,中南大学夺冠
  3. 【GNN】一份完全解读:是什么使神经网络变成图神经网络?
  4. Android 编程下 px - dp 的相互转换
  5. 方立勋_30天掌握JavaWeb_EL表达式功能详解
  6. 使用Nginx反向代理来实现简单的负载均衡
  7. 微软服务器延迟,经过六个多月的延迟,微软终于推出Hyper-V Server 2019
  8. mysql 中空值平均_mysql中空值和null值的区别及处理方法总结
  9. day22 随机输出ArrayList
  10. _inflateEnd, referenced from _inflateInit_错误,
  11. Fiddler中文版汉化插件 0.1
  12. 【Tools】OBS Studio录制视频教程
  13. iir数字滤波器设计及matlab实现,终稿毕业论文:IIR数字滤波器设计及其MATLAB实现.docOK版(样例3)...
  14. CodeSmith模板
  15. 日志审计与分析实验4-1(掌握Linux下安装、删除软件的方法)
  16. 贷还是不贷:如何用 Python 和机器学习帮你决策?
  17. windows快速搜索神器everything,让你搜索文件提速百倍!
  18. 【Android】Android入门
  19. 008-break语句与continue语句的使用,循环嵌套
  20. 【项目实战-MATLAB】:基于模板匹配的人民币识别

热门文章

  1. 推荐系列(三):协同过滤
  2. win10系统给文件夹设置备注
  3. matlab 保存多个变量,Matlab将变量导出到文件心得
  4. 在嵌入式设备运行Rust/bluer蓝牙简单应用
  5. 利用局部有界性求函数有界无界_20160331
  6. 【C语言】实现Windows系统关机操作程序
  7. python爬虫爬取安居客房源信息
  8. Python技巧——巧用globals
  9. 用卷积神经网络实现猫狗图片分类
  10. python 生成器和迭代器