MapReduce并行处理csv文件,将船舶数据划分子轨迹
bean对象
因为在划分子轨迹中,主要使用的字段是mmsi号、位置、速度、时间,以及划分的特征点、子轨迹段,所以只需要这几个属性即可,重写toString方法,重写序列化和反序列化方法
// bean类
class SubTrajectorBean implements Writable{private String MMSI;private Double Lat_d;private Double Lon_d;private Long unixTime;private Integer label = -1;private String subTrajector = null;public String toString(){return MMSI + "," + Lat_d + "," + Lon_d + "," + unixTime + "," + label + "," + subTrajector; }
重写序列化和反序列化方法
// 序列化方法@overridepublic void write(DataOutput dataOutput) throw IOException{dataOutput.writeUTF(MMSI);dataOutput.writeDouble(Lat_d);dataOutput.writeDouble(Lon_d);dataOutput.writeLong(unixTime);dataOutput.writeInt(label);dataOutput.writeUTF(subTrajector);}// 反序列化方法@overridepublic void readFields(DataInput dataInput) throw IOException{this.MMSI = dataInput.readUTF();this.Lat_d = dataInput.readDouble();this.Lon_d = dataInput.readDouble();this.unixTime = dataInput.readLong();this.label = dataInput.readInt();this.subTrajector = dataInput.readUTF();}
Map阶段
map阶段主要是过滤速度阈值,将速度小于3kn的数据点看作抛锚点过滤
// Mapper
public class SubTrajectorMapper extend Mapper<LongWritable, Text, Text, SubTrajectorBean>{// 输出key、valueprivate Text outK = new Text();private SubTrajectorBean outV = new SubTrajectorBean();public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{// 转为字符串String[] comments = value.toString();// 判断速度是否大于3Kn,大于输出,否则过滤if(comments[5] > 3){String MMSI = comments[1];Double Lat_d = Double.parseDouble(comment[7]);Double Lon_d = Double.parseDouble(comment[8]);Double unixTime = Long.parseLong(comments[9]);// 封装bean对象outV.setMMMSI(MMSI);outV.setLat_d(Lat_d);outV.setLon_d(Lon_d);outV.setUnixTime(unixTime);// 封装Text对象outK.set(MMSI);// 写出contextcontext.write(outK, outV);}}
}
Reduce阶段
reduce阶段需要将过滤的数据按照mmsi排序,标记特征点,按照特征点划分子轨迹段
// Reducer
public class SubTrajectorReducer extends Reducer<Text, SubTrajectorBean, NullWritable, SubTrajectorBean>{// 重写reduce方法public void reduce(Text key, Iterable<SubTrajectorBean> values, Context context){// 将同一MMSI数据放到一个新列表中List trajectorList = new ArrayList<>(1000);for(SubTrajectorBean value: values){trajectorList.add(Utils.getNewBean(value));}// 进行处理// 二维数组List[] result = new List()[];// 二维数组索引和子轨迹索引和特征点Integer index = 0, sub = 1, trait = 0;for(int i = 0; i < trajectorList.length()-3; i++){// 如果等于一,则是第一个,将其label设置为1if(trait == 0) {trajectorList.get(i).setLabel(1); trajectorList.get(i).setLabel(1);}// 判断是否时间超限// 如果超限,将该点的label改为1Double = time = Math.abs(trajectorList.get(i).getUnixTime() - trajectorList.get(i+1).getUnixTime());if(Double > 360){trajectorList.get(i).setLabel(1);// 判断子轨迹的个数是否大于10if (trait > 10){index += 1;sub += 1;trait = 0;}else {result[index].clear();}}// 如果不超限,判断TF特征点else{SubTrajectorBean stb1 = trajectorList.get(i);SubTrajectorBean stb2 = trajectorList.get(i+1);SubTrajectorBean stb3 = trajectorList.get(i+2);SubTrajectorBean stb4 = trajectorList.get(i+3);Double T12 = Utils.getT(stb1, stb2, stb3);Double T23 = Utils.getT(stb2, stb3, stb4);if((T12 * T23) < 0){// 为i + 2赋值labeltrajectorList.get(i+2).setLabel(1);// 将i+1,i+2加入result// 拼接子轨迹编号String st = value.getMMSI() + sub;trajectorList.get(i+1).setSubTrajector(st);trajectorList.get(i+2).setSubTrajector(st);result[index].append(trajectorList.get(i+1));result[index].append(trajectorList.get(i+2));// 判断子轨迹的个数是否大于10个数if (trait > 10){index += 1;sub += 1;trait = 0;}else {result[index].clear();}}// 拼接子轨迹编号String st = value.getMMSI() + sub;trajectorList.get(i).setSubTrajector(st);// 添加到二维列表中result[index].append(trajectorList.get(i));// 写出数据for(List values: result){for(SubTrajectorBean value: values){context.write(NullWritable, value);}} }}
}
Utils工具类
在处理的过程中,为了解耦,所以将个别方法单独拿出来设置成了工具类
// 工具类Utils
public class Utils{// 得到一个新的bean对象public static SubTrajectorBean getNewBean(SubTrajectorBean stb){SubTrajectorBean bean = new SubTrajectorBean();bean.setMMSI(stb.getMMSI());bean.setLat_d(stb.getLat_d());bean.setLon_d(stb.getLon_d());bean.setUnixTime(stb.getUnixTime());bean.setLabel(stb.getLabel());bean.setSubTrajector(stb.getSubTrajector());}// 曲线边缘法public static Double getT(SubTrajectorBean stb1,SubTrajectorBean stb2, SubTrajectorBean stb3){// 计算TmnDouble T = (stb2.getLat_d - stb1.getLat_d)(stb3.getLon_d - stb1.getLon_d) + (stb3.getLat_d - stb1.getLat_d)(stb2.getLon_d - stb1.getLon_d);return T;}
}
Driver类
Driver类就是典型的八股文形式,关联map和redece,设置key、value,设置路径,提交作业
// Driver类
public class SubTrajectorDriver{public static void main(String[] args){// 1.获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2.关联Driver类job.setJarByClass(SubTrajectorDriver.class);// 3.关联Mapper和Reducer类job.setMapperClass(SubTrajectorMapper.class);job.setReducerClass(SubTrajectorReducer.class);// 4.设置Map的输出key/valuejob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(SubTrajectorBean.class);// 5.设置最终的输出key/valuejob.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(SubTrajectorBean.class);// 6.设置输入输出路径FileInputFormat.setInputPath(job, new Path("inputPath"));FileOutputFormat.setOutputPath(job, new Path("outputPath"));// 7.提交jobboolean result = job.waitForCompletion(true);System.exit(result? 0: 1);}
}
上述代码就是大概的子轨迹提取过程的MapReduce实现,因为疫情原因,本人封闭不让去实验室,所以机器的限制并不能真是运行该代码,代码编写也是在xp系统的文本中靠感觉编写,但是具体思路完全符合研究逻辑,代码虽不能保证完全正确统一,但是也在编写中也十分注意格式和语法,如有代码错误之处,还请指出。
MapReduce并行处理csv文件,将船舶数据划分子轨迹相关推荐
- 用Python并行处理大文件,看这篇就够了!
通过使用多处理.joblib和tqdm concurrent来减少数据处理时间. 扫码关注<Python学研大本营>,加入读者群,分享更多精彩 为了进行并行处理,我们将任务划分为多个子单元 ...
- phoenix导出csv文件
原文地址:https://www.cnblogs.com/alexgl2008/articles/12852013.html?share_token=E21CB83E-5BBF-4D90-AF9B-5 ...
- 使用spark来处理CSV文件数据
1.使用spark来处理CSV文件,写入mysql表当中 spark介绍 Spark是一个快速(基于内存),通用.可扩展的计算引擎,采用Scala语言编写.2009年诞生于UC Berkeley(加州 ...
- hbase导入csv文件_CDH5.4.5运行Phoenix导入CSV文件
标签: 1.安装phoenix 在界面上设置Phoenix的parcel包: http://52.11.56.155:7180/cmf/settings?groupKey=config.scm.par ...
- hbase调用ImportTsv导入csv文件时报错File does not exist
问题背景 在大数据存储课设中,任务要求是要把生成的原始数据存储到Hbase中.首先将csv文件传至了HDFS,而下一步将传至Hbase却出现了一个问题,耗费了数小时寻找问题解决方法,最终将数据成功导入 ...
- Spark 读取csv文件quote配置无效
在进行数据清洗时,使用spark 读取csv文件时,遭遇到数据列中存在 \n的字符 原始数据: names "小红\n小明" 解析后数据: index names 1 小红 2 小 ...
- 使用 JavaCSV api 读取和写入 csv 文件
使用JavaCSV api 导包 我现在基本上都是Maven构建项目,相信大家也是,就不提供jar包了. <dependency><groupId>net.sourceforg ...
- python读取csv文件第一行_尝试读取CSV文件的第一行返回['/']
我通过Django上传了一个CSV文件,我试着读它的第一行.文件存储在服务器上的/tmp/csv_file/test.csv 文件如下所示: ^{pr2}$ 我正在尝试获取文件的标题,例如:absol ...
- python统计csv行数_对Python 多线程统计所有csv文件的行数方法详解
如下所示: #统计某文件夹下的所有csv文件的行数(多线程) import threading import csv import os class MyThreadLine(threading.Th ...
最新文章
- oracle开发数据库试题,Oracle_开发数据库试题.doc
- 古老的SSM企业级应用
- ajax empty,jQuery empty仅在AJAX调用后的第二次单击时起作用
- QT的QCalendarWidget类的使用
- Shiro与Spring集成时,Shiro权限注解@RequiresRoles等不生效的解决方案
- java生成随机数的两种方式
- qt 程序异常结束_【心电国际指南2009专家解读】浦介麟 冉玉琴老师:QT 间期的规范化测量及其意义...
- 网络安全基础——破解系统密码
- Python实现everything文件检索
- 阿斯克码表ACSII对照表
- J2EE框架(四)核心设计模式
- uc浏览器linux系统下载文件夹,UC浏览器开发者工具Linux版
- 自娱自乐 中秋快乐 代码
- Linux内核源码——通知链(notifier chain)
- WPS如何生成指定区间随机数
- Long源码与常见问题
- 边缘化你必须知道的一件事!(FEJ知识点总结)
- ASO如何设置高价值的英文关键词,英文aso优化
- 电商api—拼多多搜索
- 哪些设备将用于部署酒店WiFi网络解决方案?
热门文章
- PHP 下载B站视频
- 任务车间调度问题的混合整数规划模型
- 销售额266亿元!这家企业解锁新能源车电驱动“新”未来
- 趋肤效应实验报告_大学计算机基础实验报告的答案
- 虽然凉了,也要坚强,一名渣硕的阿里菜鸟网络Java三面面经分享
- 无源和有源滤波器电路简图
- 前端面试知识点-DOCTYPE、盒模型、css选择符、position的值、网络协议【小咚 “面筋” 记】
- QQ游戏的PKG格式文件解压工具
- cf不能全屏win7的解决方法_win7电脑win+a快捷键不能用解决方法「系统天地」
- VMware运用Intel I350网卡异常处理