Reference (official documentation): http://gora.apache.org/current/tutorial.html

Project code: https://code.csdn.net/jediael_lu/mygorademo

For environment setup, see: http://blog.csdn.net/jediael_lu/article/details/43272521

The data has already been stored in HBase by the previous example, and looks like this:

\x00\x00\x00\x00\x00\x00\x00D  column=common:ip, timestamp=1422529645469, value=85.100.75.104
\x00\x00\x00\x00\x00\x00\x00D  column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\x88\xA0
\x00\x00\x00\x00\x00\x00\x00D  column=common:url, timestamp=1422529645469, value=/index.php?i=2&a=1__z_nccylulyu&k=238241
\x00\x00\x00\x00\x00\x00\x00D  column=http:httpMethod, timestamp=1422529645469, value=GET
\x00\x00\x00\x00\x00\x00\x00D  column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00D  column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00D  column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=2&a=1__Z_nccYlULyU&k=238241
\x00\x00\x00\x00\x00\x00\x00D  column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7
\x00\x00\x00\x00\x00\x00\x00E  column=common:ip, timestamp=1422529645469, value=85.100.75.104
\x00\x00\x00\x00\x00\x00\x00E  column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\xBFP
\x00\x00\x00\x00\x00\x00\x00E  column=common:url, timestamp=1422529645469, value=/index.php?i=7&a=1__yxs0vome9p8&k=4924961
\x00\x00\x00\x00\x00\x00\x00E  column=http:httpMethod, timestamp=1422529645469, value=GET
\x00\x00\x00\x00\x00\x00\x00E  column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00E  column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00E  column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=7&a=1__YxS0VoME9P8&k=4924961
\x00\x00\x00\x00\x00\x00\x00E  column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7

This example uses MapReduce to read the data from HBase and analyze it: for every URL, it counts how many visits occurred within each day. The result is written back to HBase, where the row key is a String in the form "url + time" and the value has three columns: the URL, the time, and the visit count.
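To make that output key concrete: the job rolls each log timestamp down to the start of its day and concatenates it with the URL. A minimal sketch of that logic (the class name KeySketch and the sample values are illustrative; the real code lives in LogAnalytics below):

// Illustrative sketch of the row-key construction used by the analytics job.
public class KeySketch {

  private static final long DAY_MILLIS = 1000L * 60 * 60 * 24;

  /** Rolls a timestamp down to the start of its day, so counts aggregate per day. */
  static long getDay(long timestamp) {
    return (timestamp / DAY_MILLIS) * DAY_MILLIS;
  }

  public static void main(String[] args) {
    long ts = 1236710649200L;                // an example epoch-millis timestamp
    String url = "/index.php?i=2&a=1__z_nccylulyu&k=238241";
    String rowKey = url + "_" + getDay(ts);  // "url + time" row key written to the Metrics table
    System.out.println(rowKey);
  }
}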

0. Create the Java project and gora.properties, with the following content:

##gora.datastore.default is the default datastore implementation to use
##if it is not passed to the DataStoreFactory#createDataStore() method.
gora.datastore.default=org.apache.gora.hbase.store.HBaseStore

##whether to create schema automatically if not exists.
gora.datastore.autocreateschema=true
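With gora.datastore.default set, DataStoreFactory can resolve the store class by itself. A minimal sketch (the class name DefaultStoreSketch is just for illustration; this mirrors the no-argument branch of run() further below):

import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;

public class DefaultStoreSketch {
  public static void main(String[] args) throws Exception {
    // No store class is passed: Gora falls back to gora.datastore.default
    // (HBaseStore) from the gora.properties file on the classpath.
    DataStore<Long, Pageview> store =
        DataStoreFactory.getDataStore(Long.class, Pageview.class, new Configuration());
    store.close();
  }
}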

1. Create the JSON (Avro schema) file describing the input data, and generate the corresponding class.
This was already done in the previous example; see pageview.json and Pageview.java.

{"type": "record","name": "Pageview", "default":null,"namespace": "org.apache.gora.tutorial.log.generated","fields" : [{"name": "url", "type": ["null","string"], "default":null},{"name": "timestamp", "type": "long", "default":0},{"name": "ip", "type": ["null","string"], "default":null},{"name": "httpMethod", "type": ["null","string"], "default":null},{"name": "httpStatusCode", "type": "int", "default":0},{"name": "responseSize", "type": "int", "default":0},{"name": "referrer", "type": ["null","string"], "default":null},{"name": "userAgent", "type": ["null","string"], "default":null}]
}
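As a quick sanity check on the generated class, a single record can be fetched back through the store and read with the generated getters. A minimal, illustrative sketch (PageviewGetSketch is not part of the project; the key 68L corresponds to row \x00...\x00D in the scan above):

import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;

public class PageviewGetSketch {
  public static void main(String[] args) throws Exception {
    DataStore<Long, Pageview> store =
        DataStoreFactory.getDataStore(Long.class, Pageview.class, new Configuration());
    Pageview page = store.get(68L);   // row \x00...\x00D in the AccessLog scan
    if (page != null) {
      System.out.println(page.getUrl() + " @ " + page.getTimestamp());
    }
    store.close();
  }
}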

2. Create the mapping file between the input data class and the HBase table.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Gora Mapping file for HBase Backend -->
<gora-otd>
  <table name="Pageview"> <!-- optional descriptors for tables -->
    <family name="common"/> <!-- This can also have params like compression, bloom filters -->
    <family name="http"/>
    <family name="misc"/>
  </table>
  <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog">
    <field name="url" family="common" qualifier="url"/>
    <field name="timestamp" family="common" qualifier="timestamp"/>
    <field name="ip" family="common" qualifier="ip"/>
    <field name="httpMethod" family="http" qualifier="httpMethod"/>
    <field name="httpStatusCode" family="http" qualifier="httpStatusCode"/>
    <field name="responseSize" family="http" qualifier="responseSize"/>
    <field name="referrer" family="misc" qualifier="referrer"/>
    <field name="userAgent" family="misc" qualifier="userAgent"/>
  </class>
</gora-otd>
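Because keyClass is java.lang.Long, the binary row keys in the scan above (\x00...\x00D and \x00...\x00E) are simply 8-byte big-endian longs, i.e. line ids 68 and 69. A small sketch to decode one, assuming the HBase client's Bytes utility is on the classpath (the class name is illustrative):

import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyDecodeSketch {
  public static void main(String[] args) {
    // "\x00\x00\x00\x00\x00\x00\x00D" from the scan output; 'D' is 0x44
    byte[] rowKey = {0, 0, 0, 0, 0, 0, 0, 0x44};
    System.out.println(Bytes.toLong(rowKey));  // prints 68
  }
}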

3. Create the JSON (Avro schema) file describing the output data, and generate the corresponding class.

{"type": "record","name": "MetricDatum","namespace": "org.apache.gora.tutorial.log.generated","fields" : [{"name": "metricDimension", "type": "string"},{"name": "timestamp", "type": "long"},{"name": "metric", "type" : "long"}]
}

liaoliuqingdeMacBook-Air:MyGoraDemo liaoliuqing$ gora goracompiler avro/metricdatum.json src/
Compiling: /Users/liaoliuqing/99_Project/git/MyGoraDemo/avro/metricdatum.json
Compiled into: /Users/liaoliuqing/99_Project/git/MyGoraDemo/src
Compiler executed SUCCESSFULL.

4. Create the mapping between the output data class and its table, and add it to the mapping file created in step 2.

<class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics">
  <field name="metricDimension" family="common" qualifier="metricDimension"/>
  <field name="timestamp" family="common" qualifier="ts"/>
  <field name="metric" family="common" qualifier="metric"/>
</class>
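With this mapping, each reducer output ends up in the Metrics table under a "url_day" row key, with the three Avro fields stored in the common family. A purely illustrative sketch of writing one such record by hand, just to show which column each field lands in (in the job itself GoraReducer does the writing; MetricDatumPutSketch and the sample values are assumptions):

import org.apache.avro.util.Utf8;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.hadoop.conf.Configuration;

public class MetricDatumPutSketch {
  public static void main(String[] args) throws Exception {
    DataStore<String, MetricDatum> store =
        DataStoreFactory.getDataStore(String.class, MetricDatum.class, new Configuration());
    MetricDatum datum = new MetricDatum();
    datum.setMetricDimension(new Utf8("/index.php?i=2"));  // -> common:metricDimension
    datum.setTimestamp(1236643200000L);                    // -> common:ts
    datum.setMetric(2L);                                   // -> common:metric
    store.put("/index.php?i=2_1236643200000", datum);      // row key: "url_day"
    store.flush();
    store.close();
  }
}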

5. Write the main (driver) class.

The key steps of the program:

(1) Obtain the input and output DataStores.

    if(args.length > 0) {
      String dataStoreClass = args[0];
      inStore = DataStoreFactory.getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
      if(args.length > 1) {
        dataStoreClass = args[1];
      }
      outStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
    } else {
      inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
      outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
    }

(2) Set some basic job properties.

    Job job = new Job(getConf());
    job.setJobName("Log Analytics");
    log.info("Creating Hadoop Job: " + job.getJobName());
    job.setNumReduceTasks(numReducer);
    job.setJarByClass(getClass());
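A side note: the Job(Configuration) constructor is deprecated on Hadoop 2.x. If the project is built against a newer Hadoop, the equivalent would look roughly like this (a sketch, not taken from the original code):

    // Hadoop 2.x style replacement for the deprecated constructor above
    Job job = Job.getInstance(getConf(), "Log Analytics");
    job.setNumReduceTasks(numReducer);
    job.setJarByClass(getClass());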

(3) Register the job's Mapper class and the map input/output types.

    GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,
        LogAnalyticsMapper.class, true);

(4) Register the job's Reducer class and the reduce input/output types.

    GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

(5) Define the Mapper class.

public static class LogAnalyticsMapper extends GoraMapper<Long, Pageview, TextLong, LongWritable> {
  private LongWritable one = new LongWritable(1L);
  private TextLong tuple;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    tuple = new TextLong();
    tuple.setKey(new Text());
    tuple.setValue(new LongWritable());
  }

  @Override
  protected void map(Long key, Pageview pageview, Context context)
      throws IOException, InterruptedException {
    CharSequence url = pageview.getUrl();
    long day = getDay(pageview.getTimestamp());
    tuple.getKey().set(url.toString());
    tuple.getValue().set(day);
    context.write(tuple, one);
  }

  /** Rolls up the given timestamp to the day cardinality, so that
   *  data can be aggregated daily */
  private long getDay(long timeStamp) {
    return (timeStamp / DAY_MILIS) * DAY_MILIS;
  }
}

(6) Define the Reducer class.

public static class LogAnalyticsReducer extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {
  private MetricDatum metricDatum = new MetricDatum();

  @Override
  protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0L; // sum up the values
    for (LongWritable value : values) {
      sum += value.get();
    }
    String dimension = tuple.getKey().toString();
    long timestamp = tuple.getValue().get();
    metricDatum.setMetricDimension(new Utf8(dimension));
    metricDatum.setTimestamp(timestamp);
    String key = metricDatum.getMetricDimension().toString();
    key += "_" + Long.toString(timestamp);
    metricDatum.setMetric(sum);
    context.write(key, metricDatum);
  }
}

(7) Use the input and output DataStores to create the job, and run it.

    Job job = createJob(inStore, outStore, 3);
    boolean success = job.waitForCompletion(true);

In essence, the main differences between using Gora and writing an ordinary MapReduce program are:

(1) The mapper and reducer extend GoraMapper/GoraReducer instead of Mapper/Reducer.

(2) GoraMapper.initMapperJob() and GoraReducer.initReducerJob() are used to set the input/output types, and a DataStore object can stand in for the input/output key-value specification.

For example, the mapper here uses inStore to imply that the input key/value types are Long and Pageview, and the reducer uses outStore to imply that the output types are String and MetricDatum.

Compare this with the basic properties required to run a job, as described in http://blog.csdn.net/jediael_lu/article/details/43416751:

GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,  LogAnalyticsMapper.class, true);
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

These two statements cover steps 2 through 5 of that list at once, namely:
Step 2 - the Map/Reduce classes: LogAnalyticsMapper.class and LogAnalyticsReducer.class.
Steps 3 and 4 - the input format and input content, plus step 5's reduce output types: both input and output are DataStores, whose contents come from inStore and outStore.
Step 5 - the map output types, which are also the reduce input types.
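For comparison only (this sketch is not from the referenced post), a vanilla MapReduce driver would have to spell these settings out one by one, roughly like:

// Roughly what the two Gora init calls bundle for you in a plain MapReduce driver;
// the input/output formats that read from and write to the stores are configured
// by Gora itself and are omitted here.
job.setMapperClass(LogAnalyticsMapper.class);
job.setMapOutputKeyClass(TextLong.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(LogAnalyticsReducer.class);
job.setOutputKeyClass(String.class);
job.setOutputValueClass(MetricDatum.class);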

Appendix: the full source code:

(1) KeyValueWritable.java

package org.apache.gora.tutorial.log;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A WritableComparable containing a key-value WritableComparable pair.
 * @param <K> the class of key
 * @param <V> the class of value
 */
public class KeyValueWritable<K extends WritableComparable, V extends WritableComparable>
    implements WritableComparable<KeyValueWritable<K, V>> {

  protected K key = null;
  protected V value = null;

  public KeyValueWritable() {
  }

  public KeyValueWritable(K key, V value) {
    this.key = key;
    this.value = value;
  }

  public K getKey() {
    return key;
  }

  public void setKey(K key) {
    this.key = key;
  }

  public V getValue() {
    return value;
  }

  public void setValue(V value) {
    this.value = value;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // key and value are expected to be initialized (e.g. by the TextLong constructor)
    // before deserialization.
    key.readFields(in);
    value.readFields(in);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    key.write(out);
    value.write(out);
  }

  @Override
  public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((key == null) ? 0 : key.hashCode());
    result = prime * result + ((value == null) ? 0 : value.hashCode());
    return result;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj)
      return true;
    if (obj == null)
      return false;
    if (getClass() != obj.getClass())
      return false;
    KeyValueWritable other = (KeyValueWritable) obj;
    if (key == null) {
      if (other.key != null)
        return false;
    } else if (!key.equals(other.key))
      return false;
    if (value == null) {
      if (other.value != null)
        return false;
    } else if (!value.equals(other.value))
      return false;
    return true;
  }

  @Override
  public int compareTo(KeyValueWritable<K, V> o) {
    int cmp = key.compareTo(o.key);
    if (cmp != 0)
      return cmp;
    return value.compareTo(o.value);
  }
}

(2) TextLong.java

package org.apache.gora.tutorial.log;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

/**
 * A {@link KeyValueWritable} of {@link Text} keys and
 * {@link LongWritable} values.
 */
public class TextLong extends KeyValueWritable<Text, LongWritable> {

  public TextLong() {
    key = new Text();
    value = new LongWritable();
  }
}
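A quick illustrative check of how the mapper packs a <url, day> pair into this type (TextLongSketch is not part of the project; in the job this happens in LogAnalyticsMapper's setup() and map()):

package org.apache.gora.tutorial.log;

// Illustrative only: shows how a <url, day> pair is packed into TextLong.
public class TextLongSketch {
  public static void main(String[] args) {
    TextLong tuple = new TextLong();
    tuple.getKey().set("/index.php?i=2");   // url part of the composite map output key
    tuple.getValue().set(1236643200000L);   // day-rolled-up timestamp
    // TextLong is WritableComparable, so Hadoop can sort and group map output by <url, day>
    System.out.println(tuple.getKey() + " / " + tuple.getValue().get());
  }
}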

(3) LogAnalytics.java

package org.apache.gora.tutorial.log;

import java.io.IOException;

import org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * LogAnalytics is the tutorial class to illustrate Gora MapReduce API.
 * The analytics mapreduce job reads the web access data stored earlier by the
 * {@link LogManager}, and calculates the aggregate daily pageviews. The
 * output of the job is stored in a Gora compatible data store.
 *
 * <p>See the tutorial.html file in docs or go to the
 * <a href="http://incubator.apache.org/gora/docs/current/tutorial.html">
 * web site</a> for more information.</p>
 */
public class LogAnalytics extends Configured implements Tool {

  private static final Logger log = LoggerFactory.getLogger(LogAnalytics.class);

  /** The number of milliseconds in a day */
  private static final long DAY_MILIS = 1000 * 60 * 60 * 24;

  /**
   * The Mapper takes Long keys and Pageview objects, and emits
   * tuples of <url, day> as keys and 1 as values. Input values are
   * read from the input data store.
   * Note that all Hadoop serializable classes can be used as map output key and value.
   */
  // (5) Define the Mapper class.
  public static class LogAnalyticsMapper extends GoraMapper<Long, Pageview, TextLong, LongWritable> {
    private LongWritable one = new LongWritable(1L);
    private TextLong tuple;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      tuple = new TextLong();
      tuple.setKey(new Text());
      tuple.setValue(new LongWritable());
    }

    @Override
    protected void map(Long key, Pageview pageview, Context context)
        throws IOException, InterruptedException {
      CharSequence url = pageview.getUrl();
      long day = getDay(pageview.getTimestamp());
      tuple.getKey().set(url.toString());
      tuple.getValue().set(day);
      context.write(tuple, one);
    }

    /** Rolls up the given timestamp to the day cardinality, so that
     *  data can be aggregated daily */
    private long getDay(long timeStamp) {
      return (timeStamp / DAY_MILIS) * DAY_MILIS;
    }
  }

  /**
   * The Reducer receives tuples of <url, day> as keys and a list of
   * values corresponding to the keys, and emits a combined keys and
   * {@link MetricDatum} objects. The metric datum objects are stored
   * as job outputs in the output data store.
   */
  // (6) Define the Reducer class.
  public static class LogAnalyticsReducer extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {
    private MetricDatum metricDatum = new MetricDatum();

    @Override
    protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      long sum = 0L; // sum up the values
      for (LongWritable value : values) {
        sum += value.get();
      }
      String dimension = tuple.getKey().toString();
      long timestamp = tuple.getValue().get();
      metricDatum.setMetricDimension(new Utf8(dimension));
      metricDatum.setTimestamp(timestamp);
      String key = metricDatum.getMetricDimension().toString();
      key += "_" + Long.toString(timestamp);
      metricDatum.setMetric(sum);
      context.write(key, metricDatum);
    }
  }

  /**
   * Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
   * @param inStore
   * @param outStore
   * @param numReducer
   * @return
   * @throws IOException
   */
  public Job createJob(DataStore<Long, Pageview> inStore,
      DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {
    // (2) Set some basic job properties.
    Job job = new Job(getConf());
    job.setJobName("Log Analytics");
    log.info("Creating Hadoop Job: " + job.getJobName());
    job.setNumReduceTasks(numReducer);
    job.setJarByClass(getClass());

    /* Mappers are initialized with GoraMapper.initMapper() or
     * GoraInputFormat.setInput() */
    // (3) Register the Mapper class and the map input/output types.
    GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,
        LogAnalyticsMapper.class, true);

    /* Reducers are initialized with GoraReducer#initReducer().
     * If the output is not to be persisted via Gora, any reducer
     * can be used instead. */
    // (4) Register the Reducer class and the reduce input/output types.
    GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

    return job;
  }

  @Override
  public int run(String[] args) throws Exception {
    DataStore<Long, Pageview> inStore;
    DataStore<String, MetricDatum> outStore;
    Configuration conf = new Configuration();

    // (1) Obtain the input and output DataStores.
    if (args.length > 0) {
      String dataStoreClass = args[0];
      inStore = DataStoreFactory.getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
      if (args.length > 1) {
        dataStoreClass = args[1];
      }
      outStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
    } else {
      inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
      outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
    }

    // (7) Create the job from the input/output DataStores and run it.
    Job job = createJob(inStore, outStore, 3);
    boolean success = job.waitForCompletion(true);

    inStore.close();
    outStore.close();

    log.info("Log completed with " + (success ? "success" : "failure"));

    return success ? 0 : 1;
  }

  private static final String USAGE = "LogAnalytics <input_data_store> <output_data_store>";

  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println(USAGE);
      System.exit(1);
    }
    // run as any other MR job
    int ret = ToolRunner.run(new LogAnalytics(), args);
    System.exit(ret);
  }
}

6. Run the program
(1) Export the project as a runnable jar file and upload it to the server.

(2) Run the program:
$ java -jar MyGoraDemo.jar org.apache.gora.hbase.store.HBaseStore org.apache.gora.hbase.store.HBaseStore

(3) Check the results in HBase:

hbase(main):001:0> list
TABLE                                                                                                                                                                   
AccessLog                                                                                                                                                               
Jan2814_webpage                                                                                                                                                         
Jan2819_webpage                                                                                                                                                         
Jan2910_webpage                                                                                                                                                         
Jan2920_webpage                                                                                                                                                         
Metrics                                                                                                                                                                 
Passwd                                                                                                                                                                  
member                                                                                                                                                                  
8 row(s) in 2.6450 seconds

hbase(main):002:0> scan 'Metrics'
