cascading--wordcount
Running WordCount under Eclipse, built on the Cascading API.
Prerequisites: a CentOS system, the JDK, Hadoop, Eclipse, and the Cascading lib package. The official Cascading site provides the lib download, the bundled Cascading WordCount source, and the crawl data in the data directory; all of these can be downloaded from the official site.
I downloaded the materials from the Cascading site and ran the example in Eclipse, which produced the test results.
The tricky part: your Cascading version may not match the WordCount example bundled on the site, in which case you have to adapt the code yourself; my Cascading jars were not downloaded from the official site.
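A common cause of that mismatch is the Cascading 1.x-to-2.x API reorganization: the Hadoop-specific schemes and taps moved into hadoop subpackages, the concrete connector became HadoopFlowConnector, and setApplicationJarClass moved from FlowConnector to AppProps. Below is a minimal sketch of the equivalent connector/tap setup under the 2.x APIs; the class name ConnectorSetup2x is illustrative, and this assumes 2.x jars, so verify against the version you actually have rather than treating it as a drop-in fix.

// Sketch only: the 1.x connector/tap setup from the example below, rewritten
// against the assumed Cascading 2.x package layout (verify with your jars).
import java.util.Properties;

import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector; // replaces new FlowConnector(properties)
import cascading.property.AppProps;               // replaces FlowConnector.setApplicationJarClass
import cascading.scheme.hadoop.TextLine;          // was cascading.scheme.TextLine
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;                  // was cascading.tap.Hfs
import cascading.tap.hadoop.Lfs;                  // was cascading.tap.Lfs
import cascading.tuple.Fields;

public class ConnectorSetup2x {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // in 2.x the application-jar setting lives on AppProps
        AppProps.setApplicationJarClass(properties, ConnectorSetup2x.class);
        FlowConnector flowConnector = new HadoopFlowConnector(properties);

        // taps keep the same (scheme, path) shape; only the packages changed
        Tap source = new Lfs(new TextLine(new Fields("offset", "line")), args[0]);
        Tap sink = new Hfs(new TextLine(), args[1], SinkMode.REPLACE);
        // flowConnector.connect(...) calls and the Pipe assemblies are
        // largely unchanged between 1.x and 2.x
    }
}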
Here is a screenshot of my run results:
The complete code follows:
package com.zjf.cascading.example;

/*
 * WordCount example
 * zjf-pc
 * Copyright (c) 2007-2012 Concurrent, Inc. All Rights Reserved.
 * Project and contact information: http://www.concurrentinc.com/
 */

import java.util.Map;
import java.util.Properties;

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexGenerator;
import cascading.operation.regex.RegexReplace;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.xml.TagSoupParser;
import cascading.operation.xml.XPathGenerator;
import cascading.operation.xml.XPathOperation;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.scheme.SequenceFile;
import cascading.scheme.TextLine;
import cascading.tap.Tap;
import cascading.tap.Hfs;
import cascading.tap.Lfs;
import cascading.tuple.Fields;

public class WordCount {

    @SuppressWarnings("serial")
    private static class ImportCrawlDataAssembly extends SubAssembly {
        public ImportCrawlDataAssembly(String name) {
            // split each tab-delimited text line into the "url" and "raw" fields
            RegexSplitter regexSplitter = new RegexSplitter(new Fields("url", "raw"));
            Pipe importPipe = new Each(name, new Fields("line"), regexSplitter);
            // drop all PDF documents
            importPipe = new Each(importPipe, new Fields("url"), new RegexFilter(".*\\.pdf$", true));
            // replace ":nl:" with "\n" and discard the unused fields
            RegexReplace regexReplace = new RegexReplace(new Fields("page"), ":nl:", "\n");
            importPipe = new Each(importPipe, new Fields("raw"), regexReplace, new Fields("url", "page"));
            // this call is mandatory: it registers the tail of the assembly
            setTails(importPipe);
        }
    }

    @SuppressWarnings("serial")
    private static class WordCountSplitAssembly extends SubAssembly {
        public WordCountSplitAssembly(String sourceName, String sinkUrlName, String sinkWordName) {
            // create a new assembly that counts words across all pages and within each page
            Pipe pipe = new Pipe(sourceName);
            // convert HTML to XHTML with TagSoup, keeping only "url" and "xml" and dropping the rest
            pipe = new Each(pipe, new Fields("page"), new TagSoupParser(new Fields("xml")), new Fields("url", "xml"));
            // apply an XPath (XML Path Language) expression to the "xml" field to extract the "body" element
            XPathGenerator bodyExtractor = new XPathGenerator(new Fields("body"), XPathOperation.NAMESPACE_XHTML, "//xhtml:body");
            pipe = new Each(pipe, new Fields("xml"), bodyExtractor, new Fields("url", "body"));
            // apply another XPath expression that removes all elements, keeping only text nodes
            // and discarding text nodes inside "script" elements
            String elementXPath = "//text()[ name(parent::node()) != 'script']";
            XPathGenerator elementRemover = new XPathGenerator(new Fields("words"), XPathOperation.NAMESPACE_XHTML, elementXPath);
            pipe = new Each(pipe, new Fields("body"), elementRemover, new Fields("url", "words"));
            // break the document into individual words with a regex and emit each word
            // as a new tuple into the current stream, keeping the "url" and "word" fields
            RegexGenerator wordGenerator = new RegexGenerator(new Fields("word"), "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)");
            pipe = new Each(pipe, new Fields("words"), wordGenerator, new Fields("url", "word"));
            // group and count by "url"
            Pipe urlCountPipe = new GroupBy(sinkUrlName, pipe, new Fields("url", "word"));
            urlCountPipe = new Every(urlCountPipe, new Fields("url", "word"), new Count(), new Fields("url", "word", "count"));
            // group and count by "word"
            Pipe wordCountPipe = new GroupBy(sinkWordName, pipe, new Fields("word"));
            wordCountPipe = new Every(wordCountPipe, new Fields("word"), new Count(), new Fields("word", "count"));
            // this call is mandatory: it registers the tails of the assembly
            setTails(urlCountPipe, wordCountPipe);
        }
    }

    public static void main(String[] args) {
        // set the current application jar
        Properties properties = new Properties();
        FlowConnector.setApplicationJarClass(properties, WordCount.class);
        FlowConnector flowConnector = new FlowConnector(properties);

        /*
         * Set the run arguments as follows: right-click the class containing main,
         * choose Run As > Run Configurations > Java Application > Main > Arguments,
         * and enter this in the "Program arguments" box:
         *     data/url+page.200.txt output local
         *
         * args[0] is data/url+page.200.txt. It lives under the application's own
         * directory and must be a path on the local filesystem. On my machine it is
         * /home/hadoop/app/workspace/HadoopApplication001/data/url+page.200.txt;
         * the directory has to be created by hand, and url+page.200.txt must exist
         * (it can be downloaded from the official site).
         *
         * args[1] is the output directory, the second argument; it lives in HDFS.
         * My path is hdfs://s104:9000/user/hadoop/output, which has to be created
         * by hand. After a successful run, output will contain three generated
         * directories, pages, urls, and words, holding all pages, all urls, and
         * all words respectively.
         *
         * args[2] is local, the third argument; it lives on the local filesystem.
         * On my machine it is /home/hadoop/app/workspace/HadoopApplication001/local.
         * This directory does not have to be created by hand; it is generated
         * automatically after a successful run, and under it two directories,
         * urls and words, are created, holding the url counts and word counts.
         */
        String inputPath = args[0];
        String pagesPath = args[1] + "/pages/";
        String urlsPath = args[1] + "/urls/";
        String wordsPath = args[1] + "/words/";
        String localUrlsPath = args[2] + "/urls/";
        String localWordsPath = args[2] + "/words/";

        // import a text file with crawled pages from the local filesystem into the Hadoop
        // distributed filesystem; the imported file will be a native Hadoop sequence file
        // with the fields "page" and "url". note this example stores crawled pages as a
        // tabbed file, with the first field being the "url" and the second being the "raw"
        // document that had all newline chars ("\n") converted to the text ":nl:".

        // initialize the Pipe assembly that imports the crawl data, yielding the fields "url" and "page"
        Pipe importPipe = new ImportCrawlDataAssembly("import pipe");
        // create the tap instances
        Tap localPagesSource = new Lfs(new TextLine(), inputPath);
        Tap importedPages = new Hfs(new SequenceFile(new Fields("url", "page")), pagesPath);
        // connect the pipe assembly to the tap instances
        Flow importPagesFlow = flowConnector.connect("import pages", localPagesSource, importedPages, importPipe);

        // split the wordcount pipe defined earlier into two new pipes, url and word;
        // these pipes could be retrieved via the getTails() method and added to new pipe instances
        SubAssembly wordCountPipe = new WordCountSplitAssembly("wordcount pipe", "url pipe", "word pipe");
        // create Hadoop SequenceFile taps to store the counted results
        Tap sinkUrl = new Hfs(new SequenceFile(new Fields("url", "word", "count")), urlsPath);
        Tap sinkWord = new Hfs(new SequenceFile(new Fields("word", "count")), wordsPath);
        // bind multiple pipes and taps together, keyed here by pipe name
        Map<String, Tap> sinks = Cascades.tapsMap(new String[]{"url pipe", "word pipe"}, Tap.taps(sinkUrl, sinkWord));
        // wordCountPipe refers to a whole assembly
        Flow count = flowConnector.connect(importedPages, sinks, wordCountPipe);

        // create an assembly that exports the Hadoop SequenceFiles to local text files
        Pipe exportPipe = new Each("export pipe", new Identity());
        Tap localSinkUrl = new Lfs(new TextLine(), localUrlsPath);
        Tap localSinkWord = new Lfs(new TextLine(), localWordsPath);
        // use the assembly above to connect the two sinks
        Flow exportFromUrl = flowConnector.connect("export url", sinkUrl, localSinkUrl, exportPipe);
        Flow exportFromWord = flowConnector.connect("export word", sinkWord, localSinkWord, exportPipe);

        // load the flows (order does not matter) and execute
        Cascade cascade = new CascadeConnector().connect(importPagesFlow, count, exportFromUrl, exportFromWord);
        cascade.complete();
    }
}
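To see the counting pattern in isolation, without the crawl-data import and export flows, here is a minimal self-contained sketch against the same Cascading 1.x APIs used above. The class name SimpleWordCount and the simple "\w+" word regex are my own illustrative choices, not part of the original example.

// Minimal sketch: read a local text file, split each line into words,
// then group by word and count -- the same Each/GroupBy/Every pattern
// used by WordCountSplitAssembly above.
package com.zjf.cascading.example;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Lfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

public class SimpleWordCount {
    public static void main(String[] args) {
        Properties properties = new Properties();
        FlowConnector.setApplicationJarClass(properties, SimpleWordCount.class);
        FlowConnector flowConnector = new FlowConnector(properties);

        // both source and sink live on the local filesystem here,
        // e.g. args[0] = some input text file, args[1] = an output directory
        Tap source = new Lfs(new TextLine(new Fields("line")), args[0]);
        Tap sink = new Lfs(new TextLine(), args[1]);

        // emit one tuple per word, then group by "word" and count occurrences
        Pipe pipe = new Each("simple wordcount", new Fields("line"),
                             new RegexGenerator(new Fields("word"), "\\w+"));
        pipe = new GroupBy(pipe, new Fields("word"));
        pipe = new Every(pipe, new Fields("word"), new Count(), new Fields("word", "count"));

        Flow flow = flowConnector.connect("simple wordcount", source, sink, pipe);
        flow.complete();
    }
}

Once this shape is clear, the full example above is just the same pattern with a crawl-data import assembly in front and two export flows behind, stitched together by a Cascade.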
Reposted from: https://www.cnblogs.com/zjf-293916/p/6809015.html