hadoop是基于java的数据计算平台,引入第三方库,例如C语言实现的开发包将会大大增强数据分析的效率和能力。 通常在是用一些工具的时候都要用到一些配置文件、资源文件等。接下来,借一个例子来说明hadoop上面如何使用JNI、以及调用资源文件。

首先介绍一下ICTClass,ICTClass是中国科学院开发的一个分词软件(ICTClass官网)。该套软件采用C/C++编写。ICTClass虽然支持java,但是必须使用到的JNI技术。因此,在使用ICTClass之前需要配置好JNI资源以及ICTClass运行所需的一些资源文件。这造成了将ICTClass直接移植到Hadoop之上有点困难。不管怎样,我们先来探讨下怎个使用ICTClass的流程吧。

1. 封装Wrapper类

首先,要使用JNI技术创建ICTClass的Wrapper类ICTCLAS50.java:

package ICTCLAS.I3S.AC;
import java.io.*;
import java.net.URI;
import java.net.URL;
public class ICTCLAS50
{public native boolean ICTCLAS_Init(byte[] sPath);public native boolean ICTCLAS_Exit();public native int ICTCLAS_ImportUserDictFile(byte[] sPath,int eCodeType);public native int ICTCLAS_SaveTheUsrDic();public native int ICTCLAS_SetPOSmap(int nPOSmap);public native boolean ICTCLAS_FileProcess(byte[] sSrcFilename, int eCodeType, int bPOSTagged,byte[] sDestFilename);public native byte[] ICTCLAS_ParagraphProcess(byte[] sSrc, int eCodeType, int bPOSTagged);public native byte[] nativeProcAPara(byte[] sSrc, int eCodeType, int bPOStagged);/* Use static intializer */static{

      String absolutePath = new File("").getAbsolutePath();
      System.load(absolutePath+"/ICTClassLib/libICTCLAS50.so");

    }
}

这里要注意,ICTCLAS50这个类必须在pacakage ICTCLAS.I3S.AC下否则将会出错。

2. 打包ICTCLAS以及资源文件

这里要打包两个jar文件。ICTClass.jar中打包了ICTCLAS50类作为即将被调用的API。ICTClassLib.jar打包了ICTCLAS所需的所有资源。

看一下两个jar的清单:

**********************************
ICTClass.jar
ICTClass|_I3S|_AC|_ICTCLAS50.class___________________________________
ICTClassLib.jar
ICTClassLib|_libICTCLAS50.so|_userdict.txt |_user.lic  |_Configure.xml|_Data|_BiWord.big|_character.idx|_character.type|_CoreDict.pdat|_CoreDict.pos|_CoreDict.unig|_FieldDict.pdat|_FieldDict.pos|_GranDict.pdat|_GranDict.pos|_ICTCLAS30.ctx|_ICTCLAS_First.map|_ICTPOS.map|_nr.ctx|_nr.fsa|_nr.role|_PKU_First.map|_PKU.map|_temp|_UserDict.map|_UserDict.pdat|_UserDict.pos

3. 在hadoop上使用JNI和资源

hadoop有很特殊的文件系统,这里笔者针对性介绍一下DistributeCache的机制。Hadoop可以将HDFS的一些文件分发到运行的某台机器的工作目录下,并按照一定的逻辑解压。通过以下API实现:

URI uri = null;
try {   //这里filepath指的是hadoop文件系统目录uri = new URI("hdfs://filepath/ICTClassLib.jar#ICTClassLib");
} catch (URISyntaxException e) {logger.error(e.getMessage(),e);
}
if( uri != null){DistributedCache.addCacheArchive(uri, jobConf);
}

上面的API将ICTClassLib.jar(当然该文件必须是在hadoop集群上的hdfs文件系统上)分发到tasknode上,并解压到工作目录的link目录下。ICTClassLib.jar包含ICTClass相关的资源文件。在tasknode上,每个task工作目录下的文件是:

从上面可以看出,通过该机制hadoop能够把需要的文件下发到指定的task的工作目录下。

4. Hadoop程序源码(笔者是hadoop菜鸟,限于能力这里涉及到的资源访问使用的都是绝对路径。笔者试过相对路径但是总会抛出错误,无奈选择了这个折衷的办法)。

package com.ict.hadoop;import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;public class WxSegment{private static Logger logger = Logger.getLogger(WxSegment.class);public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text>{public ICTCLAS50 testICTCLAS50;public static Set<String> listAll(String dir) throws IOException {//获取文件夹内所有文件的操作,可以忽略
        }@Overridepublic void configure(JobConf jobConf){testICTCLAS50 = new ICTCLAS50();String absolutePath = new File("").getAbsolutePath()+"/ICTClassLib/";if(testICTCLAS50.ICTCLAS_Init(absolutePath.getBytes("GB2312")) == false){System.out.println("Init Fail!");  System.exit(1);}super.configure(jobConf);}public String getSplitFileName(InputSplit inputSplit){return ((FileSplit)inputSplit).getPath().getName();}@Overridepublic void map(Text key, Text value,OutputCollector<Text,Text> output, Reporter reporter)throws IOException {logger.info("AbsolutePath:" + new File("./ICTClassLib").getAbsolutePath());String lineString = value.toString();byte nativeBytes[] = testICTCLAS50.ICTCLAS_ParagraphProcess(  lineString.getBytes("GB2312"), 0, 0);  String nativeStr = new String(nativeBytes, 0,  nativeBytes.length, "GB2312");  // System.out.println("The result is :" + nativeStr);  String[] rest = nativeStr.split("\\s+");  for (int i = 0; i < rest.length; i++)  sb.append(rest[i] + " ");  lineString = sb.toString();logger.info(lineString);output.collect(key, new Text(lineString));}@Overridepublic void close() throws IOException {//保存用户字典
            testICTCLAS50.ICTCLAS_SaveTheUsrDic();//释放分词组件资源
            testICTCLAS50.ICTCLAS_Exit();}}public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, NullWritable>{private MultipleOutputs mos;private OutputCollector<Text,NullWritable> collector;@Overridepublic void configure(JobConf conf) {mos = new MultipleOutputs(conf);}@Overridepublic void reduce(Text key, Iterator<Text> values,OutputCollector<Text,NullWritable> output, Reporter reporter)throws IOException {System.out.print("This is reduce");collector = mos.getCollector(key.toString(), reporter);output.collect( new Text(values.hasNext()?"has values":"no values"),NullWritable.get());int count = 0;while(values.hasNext()){Text tmp = values.next();collector.collect(tmp,NullWritable.get());output.collect( tmp,NullWritable.get());logger.info(tmp);count ++;}logger.info("The total count is "+ count);}@Overridepublic void close() throws IOException {mos.close();}}public static Set<String> listAll(String dir) throws IOException {Configuration conf = new Configuration();FileSystem fs = FileSystem.get(conf);FileStatus[] stats = fs.listStatus(new Path(dir));Set<String> list = new HashSet<String>();for (int i = 0; i < stats.length; ++i){if ( !stats[i].isDir() ){String filename = stats[i].getPath().getName();filename = filename.replaceAll("[^a-zA-Z0-9]", "");list.add(filename);//System.out.println("File: "+ stats[i].getPath().getName());logger.info("File: "+ filename);}elselogger.warn(stats[i].getPath().getName() +" is a directory.");}fs.close();return list;}public static void main(String [] args) throws IOException, URISyntaxException{Configuration conf = new Configuration();String[] remainingArgs = new GenericOptionsParser(conf,args).getRemainingArgs();//    new GenericOptionsParser(conf, args).getRemainingArgs();if (remainingArgs.length != 2) {System.err.println("Error!");System.exit(1);}JobConf jobConf = new JobConf(conf,WxSegment.class);DistributedCache.createSymlink(jobConf);logger.info("AbsolutePath:" + new File("").getAbsolutePath()); URI uri = null;try {uri = new URI("hdfs://filepath/ICTClassLib.jar#ICTClassLib");} catch (URISyntaxException e) {logger.error(e.getMessage(),e);}if( uri != null)logger.info(uri.getPath());DistributedCache.addCacheArchive(uri, jobConf);Path in = new Path(remainingArgs[0]);Path out = new Path(remainingArgs[1]);FileInputFormat.setInputPaths(jobConf, in);FileOutputFormat.setOutputPath(jobConf, out);jobConf.setJobName("WxSegment");jobConf.setMapperClass(Map.class);jobConf.setReducerClass(Reduce.class);jobConf.setMapOutputKeyClass(Text.class);jobConf.setMapOutputValueClass(Text.class);jobConf.setInputFormat(MsgSplitInputFormat.class);jobConf.setOutputKeyClass(Text.class);jobConf.setOutputValueClass(NullWritable.class);Set<String> set = listAll(remainingArgs[0]);for( String filename : set){MultipleOutputs.addNamedOutput(jobConf,filename,TextOutputFormat.class,Text.class,NullWritable.class);}jobConf.setJarByClass(WxSegment.class);JobClient.runJob(jobConf);}
}

另外一篇值得参考的文章:《如何在Hadoop集群运行JNI程序》

转载于:https://www.cnblogs.com/forzhongyou/p/3926557.html

Hadoop集群上使用JNI,调用资源文件相关推荐

  1. jar包在Hadoop集群上测试(MapReduce)

    本片使用MapReduce--统计输出给定的文本文档每一个单词出现的总次数的案例进行,jar包在集群上测试 1.添加打包插件依赖 <build><plugins><plu ...

  2. 在Hadoop集群上,搭建HBase集群

    (1)下载Hbase包,并解压:这里下载的是0.98.4版本,对应的hadoop-1.2.1集群 (2)覆盖相关的包:在这个版本里,Hbase刚好和Hadoop集群完美配合,不需要进行覆盖. 不过这里 ...

  3. hadoop集群中客户端修改、删除文件失败

    这是因为hadoop集群在启动时自动进入安全模式 查看安全模式状态:hadoop fs –safemode get 进入安全模式状态:hadoop fs –safemode enter 退出安全模式状 ...

  4. Hadoop 集群在WebUI界面不能下载文件

    Hadoop集群在50070端口可以打开,但是点击文件下载时总是没响应, 但是可以在控制台通过命令下载 [root@chh01 home]# hadoop fs -get /XXX/xxx.csv 导 ...

  5. 使用HDFS客户端java api读取hadoop集群上的信息

    本文介绍使用hdfs java api的配置方法. 1.先解决依赖,pom <dependency><groupId>org.apache.hadoop</groupId ...

  6. Centos7系统、Hadoop集群上部署ntp服务器

    集群情况: 三台机器分别: master:180.201.163.46 slave1:180.201.156.76 slave2:180.201.130.17 网关:255.255.192.0 在sl ...

  7. Hadoop集群上的Hive安装时进行初始化元数据信息出现错误HiveSchemaTool:Parsing failed. Reason: Missing required option:

    同样的命令,从文档上复制过来用时出现错误 然后自己重新手打了一般,成功,原因可能是从word文档复制过来时的命令行中-的中文化

  8. CENTOS上的网络安全工具(十二)走向Hadoop(4) Hadoop 集群搭建

    目录 〇.踩坑指南 1.OpenJDK的版本 2.WEB用户 一.克隆虚拟机 二.配置主机名和网络 1.配置网络 2.设置主机名 3.将主机关系对应名写入host文件 三.配置免密SSH访问 1.本机 ...

  9. hadoop集群硬盘损坏_Hadoop集群(万台规模)的磁盘故障自动处理

    之前的系列文章讲述了无状态服务的自动维修. 后来我们尝试将这套方案应用在有状态的Hadoop集群上,经过几个月的线上运行,也取得了不错的效果,本文将分享这个实践过程. 梳理问题 我们先调研了下Hado ...

最新文章

  1. STM8单片机定时器驱动的深度解析
  2. Android开发之大位图二次採样压缩处理(源码分享)
  3. spring MVC之返回JSON数据(Spring3.0 MVC+Jackson+AJAX)
  4. 第三次学JAVA再学不好就吃翔(part78)--List类
  5. 最大似然估计_状态估计的基本概念(2)最大似然估计和最大后验估计
  6. 郫都区计算机老师周俊老师,教师节,带你走进郫都教师背后的故事
  7. eclipse新建maven项目和聚合项目
  8. 拓端tecdat|R语言使用蒙特卡洛模拟进行正态性检验及可视化
  9. 解决Mantis乱码问题
  10. VS2013 ADO.NET 连接 SQLEXPRESS
  11. JAVA毕业设计飞羽羽毛球馆管理系统计算机源码+lw文档+系统+调试部署+数据库
  12. vm虚拟机配置动态ip和静态ip的方法
  13. 《中国历史2000问》读后笔记
  14. 实现一个简单的Java类:长方形与梯形的面积计算
  15. 2022.04.17-高宝琪毕设阶段性汇报
  16. 【CTS2019】氪金手游(动态规划)
  17. 兔年幸运转转盘,看看你今年过年能赚到啥
  18. 我也写点八卦系文章:从李彦宏八卦说起
  19. 【算法】算法之美—Crashing Balloon
  20. 青岛物联网关键技术资源发展路线图发布

热门文章

  1. 转载:Beginning WF 4.0翻译——第四章(传递参数)
  2. 用脑电波玩游戏,这款VR体验逆天了
  3. FineReport搭建物流报表平台的解决方案
  4. Python之迭代器和生成器(Day17)
  5. 2016年IoT和新的逃逸技术将会引领威胁态势
  6. C# 获取几种路径的方式
  7. CPU占用率高的九种可能~
  8. jQuery框架学习第二天:jQuery中万能的选择器
  9. 《代码敲不队》第五次作业:项目需求分析改进与系统设计
  10. Android 面试系列 Dn.1---- Service?