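A program case simulating MapReduce programming (counting word frequencies in a text)
Goal of this case:
1. Simulate modifying a configuration and, by sending commands, count how many times each word appears in a file.
The code structure of the case is as follows:
The whole case needs the following kinds of files:
A: the worker server, which, similar to MapReduce, receives the jar, receives the configuration file, and executes the business logic
B: the program client, which assembles the configuration file and sends the commands that run the job (the jarfile, jobconf, and job2run commands are sent over a socket)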
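The client/worker exchange described above can be pictured as a simple framing rule: every message starts with a 7-byte command name (jarfile, jobconf, or job2run) and the rest of the stream is the payload. The following is only a minimal sketch of that idea under stated assumptions; the class CommandFrames and its methods are hypothetical names used for illustration and are not part of the case's code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: a fixed 7-byte ASCII command followed by the raw payload. */
final class CommandFrames {
    static final int COMMAND_LENGTH = 7;

    /** Builds one message: the command bytes first, then the payload bytes. */
    static byte[] frame(String command, byte[] payload) {
        byte[] head = command.getBytes(StandardCharsets.US_ASCII);
        if (head.length != COMMAND_LENGTH) {
            throw new IllegalArgumentException("command must be exactly 7 bytes: " + command);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(head, 0, head.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    /** Extracts the command name from the first 7 bytes of a message. */
    static String commandOf(byte[] message) {
        if (message.length < COMMAND_LENGTH) {
            throw new IllegalArgumentException("message shorter than the command prefix");
        }
        return new String(message, 0, COMMAND_LENGTH, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // "hello" is placeholder payload, not a real job command
        byte[] message = frame("job2run", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(commandOf(message) + " + "
                + (message.length - COMMAND_LENGTH) + " payload bytes");
    }
}
```

Because all three command names happen to be exactly 7 characters long, the receiver can read a fixed-size prefix and does not need a length field or delimiter to find the command.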
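Code structure: what each package and class does

Package | Class | Purpose
cn.toto.bigdata.mymr.task | TaskProcessor | The core execution program
cn.toto.bigdata.mymr.task | ProcessLogic | Defines the methods that client code must implement, comparable to an interface contract in a WebService
cn.toto.bigdata.mymr.io | InputFormat | Component that wraps file reading (acts as the interface)
cn.toto.bigdata.mymr.io | DefaultInputFormat | Implementation of the file-reading component
cn.toto.bigdata.mymr.io | OutPutFormat | Component that wraps file writing (acts as the interface)
cn.toto.bigdata.mymr.io | DefaultOutPutFormat | Implementation of the file-writing component
cn.toto.bigdata.mymr.common | Constants | Constant definitions
cn.toto.bigdata.mymr.common | Context | Application context; stores the intermediate word counts
cn.toto.bigdata.mymr.userapp | UserLogic | The client's implementation of the ProcessLogic contract
cn.toto.bigdata.mymr.userapp | UserApp | Main entry point of the client
cn.toto.bigdata.mymr.scheduler | Runner | The class UserApp relies on to execute commands; the commands are sent through a Socket from here
cn.toto.bigdata.mymr.scheduler | WorkerClient | Client-side code used when the client runs
cn.toto.bigdata.mymr.scheduler | WorkerServer | The server, which must be started before UserApp runs
cn.toto.bigdata.mymr.scheduler | WorkerRunnable | The logic executed on the server side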
Prerequisites for running:
1. Export mapreduce-my-demo as test.jar and place it at E:/test.jar.
2. A text file a.txt to be counted is required at E:\a.txt. Assume a.txt contains:
The true nobility is in being superior to your previous self guess No great discovery was ever made without a bold Knowledge will give you power but character respect The sun is just rising in the morning of another day I I figure life is a gift and I don’t intend on wasting
3. First run WorkerServer, which starts the server side.
4. Then run UserApp, which acts as the client.
5. The final counts are written to E:/out.txt. The result is as follows (the order is the HashMap iteration order, not sorted):
nobility 1
but 1
gift 1
wasting 1
rising 1
don't 1
another 1
I 3
your 1
Knowledge 1
sun 1
without 1
life 1
The 2
character 1
and 1
of 1
power 1
just 1
day 1
you 1
on 1
No 1
a 2
give 1
figure 1
previous 1
in 2
will 1
made 1
was 1
is 3
being 1
bold 1
great 1
respect 1
morning 1
the 1
ever 1
superior 1
guess 1
discovery 1
true 1
self 1
to 1
intend 1
6. The final log is stored in E:/task/task.log. The configuration (job.conf) and the working jar (job.jar) are also generated in this directory. (Screenshots of the directory, the job.conf content, and task.log are omitted.)
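Next, the concrete code is as follows:

Package | Class | Purpose
cn.toto.bigdata.mymr.task | TaskProcessor | The core execution program
cn.toto.bigdata.mymr.task | ProcessLogic | Defines the methods that client code must implement, comparable to an interface contract in a WebService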
The TaskProcessor code is as follows:

package cn.toto.bigdata.mymr.task;

import java.util.HashMap;
import java.util.logging.FileHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

import cn.toto.bigdata.mymr.common.Constants;
import cn.toto.bigdata.mymr.common.Context;
import cn.toto.bigdata.mymr.io.InputFormat;
import cn.toto.bigdata.mymr.io.OutPutFormat;

/**
 * The core execution program.
 * This is the task executor.
 */
public class TaskProcessor {

    public static void main(String[] args) throws Exception {
        // Load all user-specified parameters into the context; this also reads the configuration file
        Context context = new Context();
        // Get the configuration from the context
        HashMap<String, String> conf = context.getConfiguration();

        // Log to a file so that the result of the run can be inspected
        Logger logger = Logger.getLogger("TaskProcessor");
        // Set the log level to INFO
        logger.setLevel(Level.INFO);
        FileHandler fileHandler = new FileHandler("E:/task/task.log");
        fileHandler.setLevel(Level.INFO);
        logger.addHandler(fileHandler);
        logger.info("context:" + context);
        logger.info("conf:" + conf);

        // Initialize the file-reading component;
        // its class name is taken from the configuration
        Class<?> forName = Class.forName(conf.get(Constants.INPUT_FORMAT));
        InputFormat inputFormat = (InputFormat) forName.getDeclaredConstructor().newInstance();
        inputFormat.init(context);

        // Read the data with inputFormat and call the user logic
        Class<?> forName2 = Class.forName(conf.get(Constants.USER_LOGIC));
        ProcessLogic userLogic = (ProcessLogic) forName2.getDeclaredConstructor().newInstance();
        // Call the user logic for every line; results reach the internal buffer through the context
        while (inputFormat.hasNext()) {
            Integer key = inputFormat.nextKey();
            String value = inputFormat.nextValue();
            userLogic.process(key, value, context);
        }
        userLogic.cleanUp(context);

        // Write the results on behalf of the user
        Class<?> forName3 = Class.forName(conf.get(Constants.OUTPUT_FORMAT));
        OutPutFormat outputFormat = (OutPutFormat) forName3.getDeclaredConstructor().newInstance();
        outputFormat.write(context);
        // Close the output stream once everything has been written
        outputFormat.cleanUp();
    }
}

The ProcessLogic code is as follows:

package cn.toto.bigdata.mymr.task;

import cn.toto.bigdata.mymr.common.Context;

/**
 * The contract for writing business logic.
 * process() and cleanUp() have no real implementation here; the client provides it.
 */
public abstract class ProcessLogic {

    /**
     * The context stores the processed results.
     * @param key     the line number
     * @param value   the content of that line
     * @param context the application context
     */
    public abstract void process(Integer key, String value, Context context);

    /**
     * Emits the processed results through cleanUp.
     */
    public void cleanUp(Context context) {}
}
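TaskProcessor picks its input format, user logic, and output format by class name from the configuration. The following is a minimal, self-contained sketch of that reflective loading pattern. LineLogic, UpperCaseLogic, and PluginLoader are hypothetical stand-ins invented for this illustration; they are not classes from the case.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical stand-in for the ProcessLogic contract. */
abstract class LineLogic {
    abstract String process(int lineNumber, String line);
}

/** Hypothetical user implementation that is only known by name at run time. */
class UpperCaseLogic extends LineLogic {
    @Override
    String process(int lineNumber, String line) {
        return lineNumber + ":" + line.toUpperCase(Locale.ROOT);
    }
}

final class PluginLoader {
    /** Looks up a class name under the given key and instantiates it as the expected type. */
    static <T> T load(Map<String, String> conf, String key, Class<T> type) {
        String className = conf.get(key);
        if (className == null) {
            throw new IllegalArgumentException("missing configuration key: " + key);
        }
        try {
            // asSubclass fails fast if the configured class does not extend the expected type
            Class<? extends T> cls = Class.forName(className).asSubclass(type);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("user.logic.class", UpperCaseLogic.class.getName());
        LineLogic logic = load(conf, "user.logic.class", LineLogic.class);
        System.out.println(logic.process(0, "hello"));
    }
}
```

Checking for a missing key before calling Class.forName gives a clearer error than the NullPointerException that a null class name would otherwise cause.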
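Package | Class | Purpose
cn.toto.bigdata.mymr.io | InputFormat | Component that wraps file reading (acts as the interface)
cn.toto.bigdata.mymr.io | DefaultInputFormat | Implementation of the file-reading component
cn.toto.bigdata.mymr.io | OutPutFormat | Component that wraps file writing (acts as the interface)
cn.toto.bigdata.mymr.io | DefaultOutPutFormat | Implementation of the file-writing component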
The InputFormat code is as follows:

package cn.toto.bigdata.mymr.io;

import cn.toto.bigdata.mymr.common.Context;

public abstract class InputFormat {

    /**
     * Returns the number of the line that was just read.
     */
    public abstract int nextKey();

    /**
     * Returns the content of the line that was just read.
     */
    public abstract String nextValue();

    /**
     * Reads one line from the file.
     */
    public abstract String readLine() throws Exception;

    /**
     * Reads ahead and reports whether another line is available.
     */
    public abstract boolean hasNext() throws Exception;

    /**
     * Initializes the path of the file to read and the file stream.
     */
    public abstract void init(Context context) throws Exception;
}
The DefaultInputFormat code is as follows:

package cn.toto.bigdata.mymr.io;

import java.io.BufferedReader;
import java.io.FileReader;

import cn.toto.bigdata.mymr.common.Constants;
import cn.toto.bigdata.mymr.common.Context;

/**
 * The default reader implementation.
 */
public class DefaultInputFormat extends InputFormat {
    // Path of the file to read
    private String inputPath;
    private BufferedReader br = null;
    // The key is the line number of the current line (a line offset, not a byte offset)
    private int key;
    // The value is the content of the current line
    private String value;
    // Reading starts at line 0
    private int lineNumber = 0;

    @Override
    public void init(Context context) throws Exception {
        // Get the path of the file to read
        this.inputPath = context.getConfiguration().get(Constants.INPUT_PATH);
        // Open the input stream on that file
        this.br = new BufferedReader(new FileReader(inputPath));
    }

    @Override
    public int nextKey() {
        return this.key;
    }

    @Override
    public String nextValue() {
        return this.value;
    }

    @Override
    public boolean hasNext() throws Exception {
        // Note: every call consumes one line, so call hasNext() exactly once per iteration
        String line = readLine();

        // Record the line and advance the line number
        this.key = lineNumber++;
        this.value = line;

        return null != line;
    }

    /**
     * Reads one line of data.
     */
    @Override
    public String readLine() throws Exception {
        String line = br.readLine();
        // Close the BufferedReader once the end of the file is reached
        if (line == null) {
            br.close();
        }
        return line;
    }
}
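The reading contract used by DefaultInputFormat (advance first, then ask for the key and value of the current line) can be tried out without touching the file system. The sketch below is an illustration only; NumberedLines is a hypothetical class and it reads from any Reader instead of a configured file path.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

/** Hypothetical line cursor: advance() moves to the next line, key()/value() describe it. */
final class NumberedLines implements AutoCloseable {
    private final BufferedReader reader;
    private int key = -1;      // line number of the current line, starting at 0
    private String value;      // content of the current line

    NumberedLines(Reader source) {
        this.reader = new BufferedReader(source);
    }

    /** Reads the next line; returns false once the input is exhausted. */
    boolean advance() {
        try {
            value = reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (value == null) {
            return false;
        }
        key++;
        return true;
    }

    int key() { return key; }

    String value() { return value; }

    @Override
    public void close() {
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        try (NumberedLines lines = new NumberedLines(new StringReader("The sun\nis rising"))) {
            while (lines.advance()) {
                System.out.println(lines.key() + " -> " + lines.value());
            }
        }
    }
}
```

Implementing AutoCloseable lets the caller use try-with-resources, so the reader is closed even when processing stops early.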
The OutPutFormat code is as follows:

package cn.toto.bigdata.mymr.io;

import cn.toto.bigdata.mymr.common.Context;

/**
 * The class used to output the results.
 */
public abstract class OutPutFormat {

    /**
     * Writes the results to a file.
     */
    public abstract void write(Context context) throws Exception;

    /**
     * Closes the stream.
     */
    public abstract void cleanUp() throws Exception;
}
The DefaultOutPutFormat code is as follows:

package cn.toto.bigdata.mymr.io;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Set;

import cn.toto.bigdata.mymr.common.Constants;
import cn.toto.bigdata.mymr.common.Context;

public class DefaultOutPutFormat extends OutPutFormat {
    BufferedWriter bw = null;

    @Override
    public void write(Context context) throws Exception {
        String outputPath = context.getConfiguration().get(Constants.OUTPUT_PATH);
        HashMap<String, Integer> KVBuffer = context.getKVBuffer();
        this.bw = new BufferedWriter(new FileWriter(outputPath));
        Set<Entry<String, Integer>> entrySet = KVBuffer.entrySet();
        for (Entry<String, Integer> entry : entrySet) {
            // One "word<TAB>count" pair per line
            bw.write(entry.getKey() + "\t" + entry.getValue());
            // Use the platform line separator instead of a bare "\r"
            bw.newLine();
        }
        bw.flush();
    }

    @Override
    public void cleanUp() throws Exception {
        bw.close();
    }
}
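Because the buffer is a HashMap, the order of the lines in the output file is not defined. If a stable order is wanted, one option is to copy the entries into a TreeMap before writing. The sketch below shows this variation under stated assumptions: SortedCountWriter is a hypothetical class, and it writes to any Writer so that it can be tried without a file.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical writer that emits "word<TAB>count" lines sorted by word. */
final class SortedCountWriter {

    static void write(Map<String, Integer> counts, Writer target) {
        // try-with-resources flushes and closes the writer even if a write fails
        try (BufferedWriter bw = new BufferedWriter(target)) {
            for (Map.Entry<String, Integer> entry : new TreeMap<>(counts).entrySet()) {
                bw.write(entry.getKey() + "\t" + entry.getValue());
                bw.write('\n');
            }
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("is", 3);
        counts.put("I", 3);
        counts.put("The", 2);
        StringWriter out = new StringWriter();
        write(counts, out);
        System.out.print(out);
    }
}
```

TreeMap uses the natural String order, so uppercase words sort before lowercase ones; a case-insensitive order would need an explicit comparator.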
Package | Class | Purpose
cn.toto.bigdata.mymr.common | Constants | Constant definitions
cn.toto.bigdata.mymr.common | Context | Application context; stores the intermediate word counts
The Constants code is as follows:

package cn.toto.bigdata.mymr.common;

public class Constants {

    public static final String JAR_PATH = "jar.path";
    public static final String JAR_FILE = "job.jar";
    public static final String WORKER_HOST = "worker.host";
    public static final String WORKER_PORT = "worker.port";
    public static final String CONF_FILE = "job.conf";
    public static final String INPUT_FORMAT = "input.format.class";
    public static final String OUTPUT_FORMAT = "output.format.class";
    public static final String INPUT_PATH = "input.path";
    public static final String OUTPUT_PATH = "output.path";
    public static final String TASK_PROCESSOR = "cn.toto.bigdata.mymr.task.TaskProcessor";
    public static final String USER_LOGIC = "user.logic.class";
    public static final String TASK_WORK_DIR = "E:/task";
}
The Context code is as follows:

package cn.toto.bigdata.mymr.common;

import java.io.File;
import java.io.FileInputStream;
import java.io.ObjectInputStream;
import java.util.HashMap;

/**
 * The application context. The configuration is obtained through it,
 * and the final results are emitted through it.
 */
public class Context {
    private HashMap<String, Integer> KVBuffer = new HashMap<String, Integer>();
    private HashMap<String, String> conf;

    @SuppressWarnings("unchecked")
    public Context() throws Exception {
        // Load the configuration parameters
        File file = new File(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE);
        if (file.exists()) {
            // try-with-resources closes the stream after the configuration has been read
            try (ObjectInputStream oi = new ObjectInputStream(new FileInputStream(file))) {
                this.conf = (HashMap<String, String>) oi.readObject();
            }
        } else {
            // conf stays null in this case
            // throw new RuntimeException("read conf failed ....");
        }
    }

    /**
     * Results are emitted through this buffer.
     */
    public void write(String k, Integer v) {
        KVBuffer.put(k, v);
    }

    public HashMap<String, Integer> getKVBuffer() {
        return KVBuffer;
    }

    public void setKVBuffer(HashMap<String, Integer> tmpKV) {
        this.KVBuffer = tmpKV;
    }

    /**
     * Returns the information from the configuration file.
     */
    public HashMap<String, String> getConfiguration() {
        return conf;
    }

    /**
     * The constructor already loads conf; this setter lets the user supply a configuration manually.
     */
    public void setConfiguration(HashMap<String, String> configuration) {
        this.conf = configuration;
    }
}
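The job.conf file that Context reads is simply a HashMap written with Java object serialization. The round trip can be sketched in memory as follows. This is an illustration under stated assumptions: ConfCodec is a hypothetical helper, and byte arrays stand in for the file on disk.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

/** Hypothetical helper that serializes a configuration map and reads it back. */
final class ConfCodec {

    static byte[] toBytes(HashMap<String, String> conf) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes any buffered data into the byte buffer
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(conf);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static HashMap<String, String> fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (HashMap<String, String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> conf = new HashMap<>();
        conf.put("input.path", "E:/a.txt");
        conf.put("output.path", "E:/out.txt");
        HashMap<String, String> copy = fromBytes(toBytes(conf));
        System.out.println(copy.equals(conf));
    }
}
```

Java serialization is convenient here because both sides are Java programs built from the same classes; the resulting file is binary and not meant to be edited by hand.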
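Package | Class | Purpose
cn.toto.bigdata.mymr.userapp | UserLogic | The client's implementation of the ProcessLogic contract
cn.toto.bigdata.mymr.userapp | UserApp | Main entry point of the client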
The UserLogic code is as follows:

package cn.toto.bigdata.mymr.userapp;

import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Set;

import cn.toto.bigdata.mymr.common.Context;
import cn.toto.bigdata.mymr.task.ProcessLogic;

public class UserLogic extends ProcessLogic {

    // Running count per word, accumulated across all lines
    private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();

    @Override
    public void process(Integer key, String value, Context context) {
        // Split the line on single spaces and count every word
        String[] words = value.split(" ");
        for (String word : words) {
            Integer count = wordCount.get(word);
            if (count == null) {
                wordCount.put(word, 1);
            } else {
                wordCount.put(word, count + 1);
            }
        }
    }

    @Override
    public void cleanUp(Context context) {
        // Hand the final counts over to the context once all lines are processed
        Set<Entry<String, Integer>> entrySet = wordCount.entrySet();
        for (Entry<String, Integer> entry : entrySet) {
            context.write(entry.getKey(), entry.getValue());
        }
    }
}
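The counting step in UserLogic can also be written with Map.merge. The sketch below is a variation and not the case's own code: WordCounter is a hypothetical class, and it splits on runs of whitespace rather than on a single space so that repeated spaces do not produce empty words.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical word counter using Map.merge. */
final class WordCounter {

    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                // A blank line yields one empty token, which is skipped
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(Arrays.asList(
                "The sun is just rising in the morning of another day",
                "I I figure life is a gift"));
        System.out.println(counts);
    }
}
```

Like the original, this treats "The" and "the" as different words, which matches the sample result where both appear.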
The UserApp code is as follows:

package cn.toto.bigdata.mymr.userapp;

import java.util.HashMap;

import cn.toto.bigdata.mymr.common.Constants;
import cn.toto.bigdata.mymr.scheduler.Runner;

public class UserApp {

    public static void main(String[] args) throws Exception {
        HashMap<String, String> conf = new HashMap<String, String>();
        conf.put(Constants.INPUT_PATH, "E:/a.txt");
        conf.put(Constants.OUTPUT_PATH, "E:/out.txt");
        conf.put(Constants.INPUT_FORMAT, "cn.toto.bigdata.mymr.io.DefaultInputFormat");
        conf.put(Constants.OUTPUT_FORMAT, "cn.toto.bigdata.mymr.io.DefaultOutPutFormat");
        conf.put(Constants.JAR_PATH, "E:/test.jar");
        conf.put(Constants.WORKER_HOST, "localhost");
        conf.put(Constants.WORKER_PORT, "9889");
        conf.put(Constants.USER_LOGIC, "cn.toto.bigdata.mymr.userapp.UserLogic");

        Runner runner = new Runner(conf);
        runner.submit("localhost", 9889);
    }
}
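Package | Class | Purpose
cn.toto.bigdata.mymr.scheduler | Runner | The class UserApp relies on to execute commands; the commands are sent through a Socket from here
cn.toto.bigdata.mymr.scheduler | WorkerClient | Client-side code used when the client runs
cn.toto.bigdata.mymr.scheduler | WorkerServer | The server, which must be started before UserApp runs
cn.toto.bigdata.mymr.scheduler | WorkerRunnable | The logic executed on the server side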
The Runner code is as follows:

package cn.toto.bigdata.mymr.scheduler;

import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

import cn.toto.bigdata.mymr.common.Constants;

public class Runner {
    private HashMap<String, String> conf;

    public Runner(HashMap<String, String> conf) {
        this.conf = conf;
    }

    // Note: host and port are not used here; WorkerClient reads both from conf
    public void submit(String host, int port) throws Exception {
        // Serialize the configuration to job.conf in the current working directory;
        // try-with-resources closes the file before it is sent
        try (ObjectOutputStream jobConfStream =
                new ObjectOutputStream(new FileOutputStream(Constants.CONF_FILE))) {
            jobConfStream.writeObject(conf);
        }

        WorkerClient workerClient = new WorkerClient(conf);
        workerClient.submit();
    }
}
The WorkerClient code is as follows:

package cn.toto.bigdata.mymr.scheduler;

import java.io.FileInputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.HashMap;

import cn.toto.bigdata.mymr.common.Constants;

public class WorkerClient {
    private HashMap<String, String> conf;
    Socket socket = null;
    OutputStream so = null;

    public WorkerClient(HashMap<String, String> conf) {
        this.conf = conf;
    }

    public void submit() throws Exception {
        socket = new Socket(conf.get(Constants.WORKER_HOST),
                Integer.parseInt(conf.get(Constants.WORKER_PORT)));
        so = socket.getOutputStream();

        String jarPath = conf.get(Constants.JAR_PATH);

        // Send the jar: the 7-byte command first, then the file content
        byte[] buff = new byte[4096];
        FileInputStream jarIns = new FileInputStream(jarPath);
        so.write("jarfile".getBytes());
        int read = 0;
        while ((read = jarIns.read(buff)) != -1) {
            so.write(buff, 0, read);
        }
        jarIns.close();
        so.close();
        socket.close();

        // Send the job.conf file over a new connection
        socket = new Socket(conf.get(Constants.WORKER_HOST),
                Integer.parseInt(conf.get(Constants.WORKER_PORT)));
        so = socket.getOutputStream();

        FileInputStream confIns = new FileInputStream(Constants.CONF_FILE);
        so.write("jobconf".getBytes());
        while ((read = confIns.read(buff)) != -1) {
            so.write(buff, 0, read);
        }
        confIns.close();
        so.close();
        socket.close();

        // Send the start command over a third connection
        socket = new Socket(conf.get(Constants.WORKER_HOST),
                Integer.parseInt(conf.get(Constants.WORKER_PORT)));
        so = socket.getOutputStream();
        so.write("job2run".getBytes());
        // The command is hard-coded and points at E:/test.jar, not at the uploaded E:/task/job.jar
        String shell = "java -cp E:/test.jar cn.toto.bigdata.mymr.task.TaskProcessor";
        so.write(shell.getBytes());
        so.close();
        socket.close();
    }
}
The WorkerServer code is as follows:

package cn.toto.bigdata.mymr.scheduler;

import java.net.ServerSocket;
import java.net.Socket;

public class WorkerServer {

    public static void main(String[] args) throws Exception {
        ServerSocket ssc = new ServerSocket(9889);
        System.out.println("Worker server started-->9889");
        // Handle every incoming connection on its own thread
        while (true) {
            Socket accept = ssc.accept();
            new Thread(new WorkerRunnable(accept)).start();
        }
    }
}
The WorkerRunnable code is as follows:

package cn.toto.bigdata.mymr.scheduler;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;

import cn.toto.bigdata.mymr.common.Constants;

public class WorkerRunnable implements Runnable {
    Socket socket;
    InputStream in = null;
    volatile long confSize = 0;
    volatile long jarSize = 0;

    public WorkerRunnable(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            this.in = socket.getInputStream();
            // The first 7 bytes of every request name the command
            byte[] protocal = new byte[7];
            int read = in.read(protocal, 0, 7);
            if (read < 7) {
                System.out.println("Client request does not follow the protocol......");
                return;
            }
            String command = new String(protocal);
            switch (command) {
            case "jarfile":
                receiveJarFile();
                break;
            case "jobconf":
                receiveConfFile();
                break;
            case "job2run":
                runJob();
                break;
            default:
                System.out.println("Client request does not follow the protocol.....");
                socket.close();
                break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void receiveConfFile() throws Exception {
        System.out.println("Start receiving the conf file");
        FileOutputStream fo = new FileOutputStream(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE);
        byte[] buff = new byte[4096];
        int read = 0;
        while ((read = in.read(buff)) != -1) {
            confSize += read;
            fo.write(buff, 0, read);
        }
        fo.flush();
        fo.close();
        in.close();
        socket.close();
    }

    private void receiveJarFile() throws Exception {
        System.out.println("Start receiving the jar file");
        FileOutputStream fo = new FileOutputStream(Constants.TASK_WORK_DIR + "/" + Constants.JAR_FILE);
        byte[] buff = new byte[4096];
        int read = 0;
        while ((read = in.read(buff)) != -1) {
            jarSize += read;
            fo.write(buff, 0, read);
        }
        fo.flush();
        fo.close();
        in.close();
        socket.close();
    }

    private void runJob() throws Exception {
        byte[] buff = new byte[4096];
        int read = in.read(buff);
        String shell = new String(buff, 0, read);
        System.out.println("Received the start command......." + shell);
        in.close();
        socket.close();
        // Give the jar and conf connections, handled on other threads, time to finish writing
        Thread.sleep(500);

        File jarFile = new File(Constants.TASK_WORK_DIR + "/" + Constants.JAR_FILE);
        File confFile = new File(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE);
        System.out.println("jarfile exists? " + jarFile.exists());
        System.out.println("confFile exists? " + confFile.exists());
        System.out.println("jarfile readable? " + jarFile.canRead());
        System.out.println("jarfile writable? " + jarFile.canWrite());
        System.out.println("confFile readable? " + confFile.canRead());
        System.out.println("confFile writable? " + confFile.canWrite());

        System.out.println("jarFile.length():" + jarFile.length());
        System.out.println("confFile.length():" + confFile.length());

        /*if (jarFile.length() == jarSize && confFile.length() == confSize) {
            System.out.println("jar and conf files are ready......");
        }*/
        System.out.println("Starting the data-processing TaskProcessor......");

        Process exec = Runtime.getRuntime().exec(shell);
        // Drain stderr before waiting, so a full pipe buffer cannot block the child process
        InputStream errStream = exec.getErrorStream();
        BufferedReader errReader = new BufferedReader(new InputStreamReader(errStream));
        String inLine = null;
        /*
         * InputStream stdStream = exec.getInputStream();
         * BufferedReader stdReader = new BufferedReader(new InputStreamReader(stdStream));
         * while ((inLine = stdReader.readLine()) != null) {
         *     System.out.println(inLine);
         * }
         */
        while ((inLine = errReader.readLine()) != null) {
            System.out.println(inLine);
        }
        int waitFor = exec.waitFor();
        if (waitFor == 0) {
            System.out.println("task finished successfully.....");
        } else {
            System.out.println("task exited abnormally......");
        }
    }
}
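One detail of reading the 7-byte command is worth illustrating: a single InputStream.read call may return fewer bytes than requested even when more are still on the way, which matters on a socket. A possible hardening, shown here only as a sketch, is to use DataInputStream.readFully. CommandReader and OneByteAtATime are hypothetical classes written for this illustration; the second one only simulates a stream that delivers data in small pieces.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical test stream that never returns more than one byte per read call. */
final class OneByteAtATime extends FilterInputStream {
    OneByteAtATime(InputStream in) {
        super(in);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return super.read(b, off, Math.min(len, 1));
    }
}

final class CommandReader {
    /** Reads exactly 7 bytes as the command, or returns null if the stream ends early. */
    static String readCommand(InputStream in) {
        byte[] head = new byte[7];
        try {
            // readFully keeps reading until the array is full or the stream ends
            new DataInputStream(in).readFully(head);
        } catch (EOFException e) {
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(head, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        byte[] request = "job2runhello".getBytes(StandardCharsets.US_ASCII);
        InputStream slow = new OneByteAtATime(new ByteArrayInputStream(request));
        System.out.println(readCommand(slow));
    }
}
```

DataInputStream does not buffer, so the payload that follows the command is still available on the underlying stream after readCommand returns.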