不多说,直接上代码。

Hadoop MapReduce编程 API入门系列之小文件合并(二十九)

生成的结果,作为输入源。

代码

package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter;

import java.net.URI;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
*
* @function 统计无效数据和对输出结果进行压缩
* @author 小讲
*
*/
public class CompressAndCounter extends Configured implements Tool
{
// 定义枚举对象
public static enum LOG_PROCESSOR_COUNTER
{
BAD_RECORDS
};
/**
*
* @function Mapper 解析数据,统计无效数据,并输出有效数据
*
*/
public static class CompressAndCounterMap extends Mapper<LongWritable, Text, Text, Text>
{
protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException
{
// 解析每条机顶盒记录,返回list集合
List<String> list = ParseTVData.transData(value.toString()); //调用ParseTVData.java下的transData方法
int length = list.size();
// 无效记录
if (length == 0)
{
// 动态自定义计数器
context.getCounter("ErrorRecordCounter", "ERROR_Record_TVData").increment(1);
// 枚举声明计数器
context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS).increment(1);
} else
{
for (String validateRecord : list)
{
//输出解析数据
context.write(new Text(validateRecord), new Text(""));
}
}

}
}
/**
* @function 任务驱动方法
*
*/
@Override
public int run(String[] args) throws Exception
{
// TODO Auto-generated method stub
//读取配置文件
Configuration conf = new Configuration();
//文件系统接口
URI uri = new URI("hdfs://HadoopMaster:9000");
//输出路径
Path mypath = new Path(args[1]);
// 创建FileSystem对象
FileSystem hdfs = FileSystem.get(uri, conf);
if (hdfs.isDirectory(mypath))
{
//删除已经存在的文件路径
hdfs.delete(mypath, true);
}
Job job = new Job(conf, "CompressAndCounter");//新建一个任务
job.setJarByClass(CompressAndCounter.class);//设置主类

job.setMapperClass(CompressAndCounterMap.class);//只有 Mapper
job.setOutputKeyClass(Text.class);//输出 key 类型
job.setOutputValueClass(Text.class);//输出 value 类型

FileInputFormat.addInputPath(job, new Path(args[0]));//输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));//输出路径

FileOutputFormat.setCompressOutput(job, true);//对输出结果设置压缩
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);//设置压缩类型

job.waitForCompletion(true);//提交任务
return 0;
}
/**
* @function main 方法
* @param args 输入 输出路径
* @throws Exception
*/
public static void main(String[] args) throws Exception
{
String[] date = {"20120917","20120918","20120919","20120920","20120921","20120922","20120923"};
int ec = 1;
for(String dt:date)
{
String[] args0 = { "hdfs://HadoopMaster:9000/middle/tv/"+dt+".txt",
"hdfs://HadoopMaster:9000/junior/tvCompressResult/"+dt };

// String[] args0 = { "./data/compressAndCounter/"+dt+".txt",
// "hdfs://HadoopMaster:9000/junior/tvCompressResult/"+dt };

ec = ToolRunner.run(new Configuration(), new CompressAndCounter(), args0);
}
System.exit(ec);
}
}

package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter;

import java.util.ArrayList;

import java.util.List;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

/**
*
* @function 解析数据
*
*
*/
public class ParseTVData
{
/**
* @function 使用 Jsoup 工具,解析输入数据,
* @param text
* @return list
*/
public static List<String> transData(String text)
{
List<String> list = new ArrayList<String>();
Document doc;
String rec = "";
try
{
doc = Jsoup.parse(text);// jsoup解析数据
Elements content = doc.getElementsByTag("WIC");
String num = content.get(0).attr("cardNum");// 记录编号
if (num == null || num.equals(""))
{
num = " ";
}

String stbNum = content.get(0).attr("stbNum");// 机顶盒号
if (stbNum.equals(""))
{
return list;
}

String date = content.get(0).attr("date");// 日期

Elements els = doc.getElementsByTag("A");
if (els.isEmpty())
{
return list;
}

for (Element el : els)
{
String e = el.attr("e");// 结束时间

String s = el.attr("s");// 开始时间

String sn = el.attr("sn");// 频道名称

rec = stbNum + "@" + date + "@" + sn + "@" + s + "@" + e;
list.add(rec);
}
} catch (Exception e)
{
System.out.println(e.getMessage());
return list;
}
return list;
}
}

转载于:https://www.cnblogs.com/zlslch/p/6171823.html

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)相关推荐

  1. Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

    一共12列,我们只需提取有用的列:第二列(犯罪类型).第四列(一周的哪一天).第五列(具体时间)和第七列(犯罪场所). 思路分析 基于项目的需求,我们通过以下几步完成: 1.首先根据数据集,分别统计出 ...

  2. Hadoop MapReduce编程 API入门系列之查找相同字母组成的字谜(三)

    找出相同单词的所有单词.现在,是拿取部分数据集(如下)来完成本项目. 项目需求 一本英文书籍包含成千上万个单词或者短语,现在我们需要在大量的单词中,找出相同字母组成的所有anagrams(字谜). 思 ...

  3. Hadoop MapReduce编程 API入门系列之join(二十六)

    天气记录数据库 气象站数据库 气象站和天气记录合并之后的示意图如下所示. 011990-99999 SIHCCAJAVRI 195005150700 0 011990-99999 SIHCCAJAVR ...

  4. Hadoop MapReduce编程 API入门系列之最短路径(十五)

    不多说,直接上代码. ====================================== = Iteration: 1 = Input path: out/shortestpath/inpu ...

  5. Hadoop MapReduce编程 API入门系列之wordcount版本2(六)

    这篇博客,给大家,体会不一样的版本编程. 代码 1 package zhouls.bigdata.myMapReduce.wordcount4; 2 3 import java.io.IOExcept ...

  6. Hadoop MapReduce编程 API入门系列之wordcount版本3(七)

    代码 1 package zhouls.bigdata.myMapReduce.wordcount3; 2 3 import java.io.IOException; 4 5 import org.a ...

  7. HBase编程 API入门系列之HTable pool(6)

    HTable是一个比较重的对此,比如加载配置文件,连接ZK,查询meta表等等,高并发的时候影响系统的性能,因此引入了"池"的概念. 引入"HBase里的连接池" ...

  8. Windows SDK编程 API入门系列(转)

    之一 -那'烦人'的Windows数据类型 原创文章,转载请注明作者及出处. 首发 http://blog.csdn.net/beyondcode http://www.cnblogs.com/bey ...

  9. Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...

    1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...

最新文章

  1. 猫叫老鼠跑的事件例子
  2. Android与Js交互时,屏幕不适配问题
  3. bzoj3252攻略(线段树+dfs序)或者(树链剖分+dfs)
  4. 机器学习(一) 基于sklearn库的数据集划分(交叉验证)
  5. php中时间轴,PHP时间轴函数
  6. 修饰符.lazy .number .trim
  7. hdu 1161 Eddy's mistakes
  8. C# continue,break,return 跳转语句的用法
  9. FPGA双沿发送之ODDR原语实现
  10. 从零开始利用vue-cli搭建简单音乐网站(四)
  11. C++Primer第5版学习笔记(一)
  12. fpga烧写bin文件_S3C2440移植uboot之编译烧写uboot
  13. 记录一次json转换的经历
  14. 如何提高你的工作效率
  15. EXCEL工作表保护密码忘记,撤消工作表保护
  16. css 页面缩放边框产生空隙相关
  17. Linux 之旅 8:初识 BASH
  18. Redis爬坑记(一):incr命令和expire命令的误区
  19. wireshark编译基于openflow1.3协议开发
  20. 1366 mysql_mysql出现ERROR 1366 (HY000):的解决办法

热门文章

  1. 30篇「CVPR2020」最新论文抢先看!看计算机视觉2020在研究什么?
  2. Python编辑统一缩进(Pycharm)
  3. mysql主从不同步不报错_MySQL主从不同步解决
  4. 【图像处理】U-Net中的重叠-切片(Overlap-tile)
  5. 腾讯,创新工场,淘宝等公司最新面试三十题(第171-200题)
  6. 主成分分析降维(MNIST数据集)
  7. 数据结构(十七)最小生成树
  8. 村庄规划中核心技术(村土地利用规划方面)
  9. 豆瓣 为什么不用php,豆瓣网友是不是都疯了?
  10. linux添加svn副本目录,关于linux svn添加忽略目录的梗