Running WordCount in Eclipse with the Cascading wrapper

Preparation: a CentOS system, the JDK, Hadoop, Eclipse, the Cascading lib package (downloadable from the official site), the Cascading-wrapped WordCount sample source that ships with it, and the crawler data in the data directory. All of these can be downloaded from the official site.

After downloading the materials from the Cascading website, I ran the example in Eclipse and got the test results.

The tricky part: your Cascading version may not match the WordCount example bundled on the official site, so you may have to adapt the code yourself; my Cascading jars were not downloaded from the official site. A hedged sketch of the typical API differences follows below.
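For reference, if your jars are from the Cascading 2.x line rather than the 1.2-era API this example uses, the code will not compile unchanged. Below is a minimal, hedged sketch of the setup changes involved, assuming Cascading 2.x with the Hadoop planner; the package and class names are my own recollection of the 2.x API rather than anything taken from this example, so verify them against the jars you actually have.

// Hedged sketch: how the connector setup differs on Cascading 2.x (assumed API, verify against your jars).
import java.util.Properties;

import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector;  // FlowConnector is abstract in 2.x; use the Hadoop implementation
import cascading.property.AppProps;                // replaces FlowConnector.setApplicationJarClass(...)

public class WordCountSetup2x
{
  public static FlowConnector buildConnector()
  {
    Properties properties = new Properties();
    // 2.x equivalent of FlowConnector.setApplicationJarClass( properties, WordCount.class )
    AppProps.setApplicationJarClass( properties, WordCountSetup2x.class );
    return new HadoopFlowConnector( properties );
  }
}
// Taps and schemes also move to Hadoop-specific packages in 2.x, e.g.
// cascading.tap.hadoop.Hfs / Lfs and cascading.scheme.hadoop.TextLine / SequenceFile.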

Here is a screenshot of my run results:

The complete code is as follows:

package com.zjf.cascading.example;

/**
 * WordCount example
 * zjf-pc
 * Copyright (c) 2007-2012 Concurrent, Inc. All Rights Reserved.
 * Project and contact information: http://www.concurrentinc.com/
 */
import java.util.Map;
import java.util.Properties;

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexGenerator;
import cascading.operation.regex.RegexReplace;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.xml.TagSoupParser;
import cascading.operation.xml.XPathGenerator;
import cascading.operation.xml.XPathOperation;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.scheme.SequenceFile;
import cascading.scheme.TextLine;
import cascading.tap.Tap;
import cascading.tap.Hfs;
import cascading.tap.Lfs;
import cascading.tuple.Fields;

public class WordCount
{
  @SuppressWarnings("serial")
  private static class ImportCrawlDataAssembly extends SubAssembly
  {
    public ImportCrawlDataAssembly( String name )
    {
      // split each text line into the "url" and "raw" fields
      RegexSplitter regexSplitter = new RegexSplitter( new Fields( "url", "raw" ) );
      Pipe importPipe = new Each( name, new Fields( "line" ), regexSplitter );

      // drop all pdf documents
      importPipe = new Each( importPipe, new Fields( "url" ), new RegexFilter( ".*\\.pdf$", true ) );

      // replace ":nl:" with "\n" and discard the unused field
      RegexReplace regexReplace = new RegexReplace( new Fields( "page" ), ":nl:", "\n" );
      importPipe = new Each( importPipe, new Fields( "raw" ), regexReplace, new Fields( "url", "page" ) );

      // this call is mandatory
      setTails( importPipe );
    }
  }

  @SuppressWarnings("serial")
  private static class WordCountSplitAssembly extends SubAssembly
  {
    public WordCountSplitAssembly( String sourceName, String sinkUrlName, String sinkWordName )
    {
      // create a new assembly that counts the words across all pages and the words within each page
      Pipe pipe = new Pipe( sourceName );

      // use TagSoup to convert the HTML to XHTML, keeping only the "url" and "xml" fields
      pipe = new Each( pipe, new Fields( "page" ), new TagSoupParser( new Fields( "xml" ) ), new Fields( "url", "xml" ) );

      // apply an XPath (XML Path Language) expression to the "xml" field to extract the "body" element
      XPathGenerator bodyExtractor = new XPathGenerator( new Fields( "body" ), XPathOperation.NAMESPACE_XHTML, "//xhtml:body" );
      pipe = new Each( pipe, new Fields( "xml" ), bodyExtractor, new Fields( "url", "body" ) );

      // apply another XPath expression that strips all elements, keeping only the text nodes,
      // and drops the text nodes inside "script" elements
      String elementXPath = "//text()[ name(parent::node()) != 'script']";
      XPathGenerator elementRemover = new XPathGenerator( new Fields( "words" ), XPathOperation.NAMESPACE_XHTML, elementXPath );
      pipe = new Each( pipe, new Fields( "body" ), elementRemover, new Fields( "url", "words" ) );

      // use a regular expression to break the document into individual words, emitting each word
      // (a new tuple) into the current stream with the "url" and "word" fields
      RegexGenerator wordGenerator = new RegexGenerator( new Fields( "word" ), "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)" );
      pipe = new Each( pipe, new Fields( "words" ), wordGenerator, new Fields( "url", "word" ) );

      // group by "url"
      Pipe urlCountPipe = new GroupBy( sinkUrlName, pipe, new Fields( "url", "word" ) );
      urlCountPipe = new Every( urlCountPipe, new Fields( "url", "word" ), new Count(), new Fields( "url", "word", "count" ) );

      // group by "word"
      Pipe wordCountPipe = new GroupBy( sinkWordName, pipe, new Fields( "word" ) );
      wordCountPipe = new Every( wordCountPipe, new Fields( "word" ), new Count(), new Fields( "word", "count" ) );

      // this call is mandatory
      setTails( urlCountPipe, wordCountPipe );
    }
  }

  public static void main( String[] args )
  {
    // set the application jar for the job
    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass( properties, WordCount.class );
    FlowConnector flowConnector = new FlowConnector( properties );

    /*
     * Set the program arguments in the run configuration:
     * right-click Main.java and choose Run As > Run Configurations > Java Application > Main > Arguments,
     * then enter the following in the "Program arguments" box:
     *   data/url+page.200.txt output local
     *
     * args[0] is data/url+page.200.txt. It sits under the application's working directory and must be a
     * path on the local file system. Mine is /home/hadoop/app/workspace/HadoopApplication001/data/url+page.200.txt.
     * You have to create this path yourself, and the url+page.200.txt file must exist; it can be downloaded from the official site.
     *
     * args[1] is the output directory, the second argument. It lives in the HDFS distributed file system.
     * Mine is hdfs://s104:9000/user/hadoop/output, and this path has to be created manually.
     * After the program runs successfully, three directories (pages, urls, words) are generated under output,
     * containing all the pages, all the urls, and all the words respectively.
     *
     * args[2] is local, the third argument. It lives on the local file system.
     * Mine is /home/hadoop/app/workspace/HadoopApplication001/local.
     * This directory does not need to be created by hand; it is generated automatically after a successful run,
     * and two directories, urls and words, are created under it with the url counts and the word counts.
     */
    String inputPath = args[ 0 ];
    String pagesPath = args[ 1 ] + "/pages/";
    String urlsPath = args[ 1 ] + "/urls/";
    String wordsPath = args[ 1 ] + "/words/";
    String localUrlsPath = args[ 2 ] + "/urls/";
    String localWordsPath = args[ 2 ] + "/words/";

    // import a text file with crawled pages from the local filesystem into the Hadoop distributed filesystem;
    // the imported file will be a native Hadoop sequence file with the fields "page" and "url".
    // note this example stores crawled pages as a tabbed file, with the first field being the "url"
    // and the second being the "raw" document that had all newline chars ("\n") converted to the text ":nl:".

    // initialize the pipe assembly that imports the crawl data, yielding the "url" and "page" fields
    Pipe importPipe = new ImportCrawlDataAssembly( "import pipe" );

    // create the tap instances
    Tap localPagesSource = new Lfs( new TextLine(), inputPath );
    Tap importedPages = new Hfs( new SequenceFile( new Fields( "url", "page" ) ), pagesPath );

    // connect the pipe assembly to the tap instances
    Flow importPagesFlow = flowConnector.connect( "import pages", localPagesSource, importedPages, importPipe );

    // the wordcount assembly defined above splits the stream into two new pipes, one per url and one per word;
    // these pipes could be retrieved via the getTails() method and added to new pipe instances
    SubAssembly wordCountPipe = new WordCountSplitAssembly( "wordcount pipe", "url pipe", "word pipe" );

    // create Hadoop SequenceFile taps to store the counted results
    Tap sinkUrl = new Hfs( new SequenceFile( new Fields( "url", "word", "count" ) ), urlsPath );
    Tap sinkWord = new Hfs( new SequenceFile( new Fields( "word", "count" ) ), wordsPath );

    // bind multiple pipes and taps; the keys are the pipe names
    Map<String, Tap> sinks = Cascades.tapsMap( new String[]{ "url pipe", "word pipe" }, Tap.taps( sinkUrl, sinkWord ) );

    // wordCountPipe refers to a sub-assembly
    Flow count = flowConnector.connect( importedPages, sinks, wordCountPipe );

    // create an assembly that exports the Hadoop SequenceFiles to local text files
    Pipe exportPipe = new Each( "export pipe", new Identity() );
    Tap localSinkUrl = new Lfs( new TextLine(), localUrlsPath );
    Tap localSinkWord = new Lfs( new TextLine(), localWordsPath );

    // use the assembly above to connect the two sinks
    Flow exportFromUrl = flowConnector.connect( "export url", sinkUrl, localSinkUrl, exportPipe );
    Flow exportFromWord = flowConnector.connect( "export word", sinkWord, localSinkWord, exportPipe );

    // load the flows (in any order) and execute them
    Cascade cascade = new CascadeConnector().connect( importPagesFlow, count, exportFromUrl, exportFromWord );
    cascade.complete();
  }
}
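
If the full example feels dense, the heart of it is just the Each/GroupBy/Every counting pattern buried inside WordCountSplitAssembly. Below is a stripped-down sketch of that core step against the same Cascading 1.2-era classes used above; the pipe name, the "line" field, and the simple \S+ tokenizer are only illustrative and not part of the original example.

import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

public class MinimalWordCountPipe
{
  // Build the tail of a bare-bones word-count assembly: tokenize the incoming
  // "line" field into "word" tuples, group by "word", and count each group.
  public static Pipe build()
  {
    Pipe assembly = new Pipe( "minimal wordcount" );

    // emit one tuple per whitespace-separated token found on the "line" field
    assembly = new Each( assembly, new Fields( "line" ),
        new RegexGenerator( new Fields( "word" ), "\\S+" ) );

    // group the stream by "word", then count the tuples in each group into a "count" field
    assembly = new GroupBy( assembly, new Fields( "word" ) );
    assembly = new Every( assembly, new Fields( "word" ), new Count(),
        new Fields( "word", "count" ) );

    return assembly;
  }
}

Connecting this pipe between a TextLine source tap and any sink tap via flowConnector.connect(...) gives the classic word count, which is exactly what the larger example does once the crawl pages have been cleaned up.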

Reposted from: https://www.cnblogs.com/zjf-293916/p/6809015.html
