bean对象

因为在划分子轨迹中,主要使用的字段是mmsi号、位置、速度、时间,以及划分的特征点、子轨迹段,所以只需要这几个属性即可,重写toString方法,重写序列化和反序列化方法

// bean类
class SubTrajectorBean implements Writable{private String MMSI;private Double Lat_d;private Double Lon_d;private Long unixTime;private Integer label = -1;private String subTrajector = null;public String toString(){return MMSI + "," + Lat_d + "," + Lon_d + "," + unixTime + "," + label + "," + subTrajector;    }

重写序列化和反序列化方法

// 序列化方法@overridepublic void write(DataOutput dataOutput) throw IOException{dataOutput.writeUTF(MMSI);dataOutput.writeDouble(Lat_d);dataOutput.writeDouble(Lon_d);dataOutput.writeLong(unixTime);dataOutput.writeInt(label);dataOutput.writeUTF(subTrajector);}// 反序列化方法@overridepublic void readFields(DataInput dataInput) throw IOException{this.MMSI = dataInput.readUTF();this.Lat_d = dataInput.readDouble();this.Lon_d = dataInput.readDouble();this.unixTime = dataInput.readLong();this.label = dataInput.readInt();this.subTrajector = dataInput.readUTF();}

Map阶段

map阶段主要是过滤速度阈值,将速度小于3kn的数据点看作抛锚点过滤

// Mapper
public class SubTrajectorMapper extend Mapper<LongWritable, Text, Text, SubTrajectorBean>{// 输出key、valueprivate Text outK = new Text();private SubTrajectorBean outV = new SubTrajectorBean();public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{// 转为字符串String[] comments = value.toString();// 判断速度是否大于3Kn,大于输出,否则过滤if(comments[5] > 3){String MMSI = comments[1];Double Lat_d = Double.parseDouble(comment[7]);Double Lon_d = Double.parseDouble(comment[8]);Double unixTime = Long.parseLong(comments[9]);// 封装bean对象outV.setMMMSI(MMSI);outV.setLat_d(Lat_d);outV.setLon_d(Lon_d);outV.setUnixTime(unixTime);// 封装Text对象outK.set(MMSI);// 写出contextcontext.write(outK, outV);}}
}

Reduce阶段

reduce阶段需要将过滤的数据按照mmsi排序,标记特征点,按照特征点划分子轨迹段

// Reducer
public class SubTrajectorReducer extends Reducer<Text, SubTrajectorBean, NullWritable, SubTrajectorBean>{// 重写reduce方法public void reduce(Text key, Iterable<SubTrajectorBean> values, Context context){// 将同一MMSI数据放到一个新列表中List trajectorList = new ArrayList<>(1000);for(SubTrajectorBean value: values){trajectorList.add(Utils.getNewBean(value));}// 进行处理// 二维数组List[] result = new List()[];// 二维数组索引和子轨迹索引和特征点Integer index = 0, sub = 1, trait = 0;for(int i = 0; i < trajectorList.length()-3; i++){// 如果等于一,则是第一个,将其label设置为1if(trait == 0) {trajectorList.get(i).setLabel(1);                trajectorList.get(i).setLabel(1);}// 判断是否时间超限// 如果超限,将该点的label改为1Double = time = Math.abs(trajectorList.get(i).getUnixTime() - trajectorList.get(i+1).getUnixTime());if(Double > 360){trajectorList.get(i).setLabel(1);// 判断子轨迹的个数是否大于10if (trait > 10){index += 1;sub += 1;trait = 0;}else {result[index].clear();}}// 如果不超限,判断TF特征点else{SubTrajectorBean stb1 = trajectorList.get(i);SubTrajectorBean stb2 = trajectorList.get(i+1);SubTrajectorBean stb3 = trajectorList.get(i+2);SubTrajectorBean stb4 = trajectorList.get(i+3);Double T12 = Utils.getT(stb1, stb2, stb3);Double T23 = Utils.getT(stb2, stb3, stb4);if((T12 * T23) < 0){// 为i + 2赋值labeltrajectorList.get(i+2).setLabel(1);// 将i+1,i+2加入result// 拼接子轨迹编号String st = value.getMMSI() + sub;trajectorList.get(i+1).setSubTrajector(st);trajectorList.get(i+2).setSubTrajector(st);result[index].append(trajectorList.get(i+1));result[index].append(trajectorList.get(i+2));// 判断子轨迹的个数是否大于10个数if (trait > 10){index += 1;sub += 1;trait = 0;}else {result[index].clear();}}// 拼接子轨迹编号String st = value.getMMSI() + sub;trajectorList.get(i).setSubTrajector(st);// 添加到二维列表中result[index].append(trajectorList.get(i));// 写出数据for(List values: result){for(SubTrajectorBean value: values){context.write(NullWritable, value);}}  }}
}

Utils工具类

在处理的过程中,为了解耦,所以将个别方法单独拿出来设置成了工具类

// 工具类Utils
public class Utils{// 得到一个新的bean对象public static SubTrajectorBean getNewBean(SubTrajectorBean stb){SubTrajectorBean bean = new SubTrajectorBean();bean.setMMSI(stb.getMMSI());bean.setLat_d(stb.getLat_d());bean.setLon_d(stb.getLon_d());bean.setUnixTime(stb.getUnixTime());bean.setLabel(stb.getLabel());bean.setSubTrajector(stb.getSubTrajector());}// 曲线边缘法public static Double getT(SubTrajectorBean stb1,SubTrajectorBean stb2, SubTrajectorBean stb3){// 计算TmnDouble T = (stb2.getLat_d - stb1.getLat_d)(stb3.getLon_d - stb1.getLon_d) + (stb3.getLat_d - stb1.getLat_d)(stb2.getLon_d - stb1.getLon_d);return T;}
}

Driver类

Driver类就是典型的八股文形式,关联map和redece,设置key、value,设置路径,提交作业

// Driver类
public class SubTrajectorDriver{public static void main(String[] args){// 1.获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2.关联Driver类job.setJarByClass(SubTrajectorDriver.class);// 3.关联Mapper和Reducer类job.setMapperClass(SubTrajectorMapper.class);job.setReducerClass(SubTrajectorReducer.class);// 4.设置Map的输出key/valuejob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(SubTrajectorBean.class);// 5.设置最终的输出key/valuejob.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(SubTrajectorBean.class);// 6.设置输入输出路径FileInputFormat.setInputPath(job, new Path("inputPath"));FileOutputFormat.setOutputPath(job, new Path("outputPath"));// 7.提交jobboolean result = job.waitForCompletion(true);System.exit(result? 0: 1);}
}

上述代码就是大概的子轨迹提取过程的MapReduce实现,因为疫情原因,本人封闭不让去实验室,所以机器的限制并不能真是运行该代码,代码编写也是在xp系统的文本中靠感觉编写,但是具体思路完全符合研究逻辑,代码虽不能保证完全正确统一,但是也在编写中也十分注意格式和语法,如有代码错误之处,还请指出。

MapReduce并行处理csv文件,将船舶数据划分子轨迹相关推荐

  1. 用Python并行处理大文件,看这篇就够了!

    通过使用多处理.joblib和tqdm concurrent来减少数据处理时间. 扫码关注<Python学研大本营>,加入读者群,分享更多精彩 为了进行并行处理,我们将任务划分为多个子单元 ...

  2. phoenix导出csv文件

    原文地址:https://www.cnblogs.com/alexgl2008/articles/12852013.html?share_token=E21CB83E-5BBF-4D90-AF9B-5 ...

  3. 使用spark来处理CSV文件数据

    1.使用spark来处理CSV文件,写入mysql表当中 spark介绍 Spark是一个快速(基于内存),通用.可扩展的计算引擎,采用Scala语言编写.2009年诞生于UC Berkeley(加州 ...

  4. hbase导入csv文件_CDH5.4.5运行Phoenix导入CSV文件

    标签: 1.安装phoenix 在界面上设置Phoenix的parcel包: http://52.11.56.155:7180/cmf/settings?groupKey=config.scm.par ...

  5. hbase调用ImportTsv导入csv文件时报错File does not exist

    问题背景 在大数据存储课设中,任务要求是要把生成的原始数据存储到Hbase中.首先将csv文件传至了HDFS,而下一步将传至Hbase却出现了一个问题,耗费了数小时寻找问题解决方法,最终将数据成功导入 ...

  6. Spark 读取csv文件quote配置无效

    在进行数据清洗时,使用spark 读取csv文件时,遭遇到数据列中存在 \n的字符 原始数据: names "小红\n小明" 解析后数据: index names 1 小红 2 小 ...

  7. 使用 JavaCSV api 读取和写入 csv 文件

    使用JavaCSV api 导包 我现在基本上都是Maven构建项目,相信大家也是,就不提供jar包了. <dependency><groupId>net.sourceforg ...

  8. python读取csv文件第一行_尝试读取CSV文件的第一行返回['/']

    我通过Django上传了一个CSV文件,我试着读它的第一行.文件存储在服务器上的/tmp/csv_file/test.csv 文件如下所示: ^{pr2}$ 我正在尝试获取文件的标题,例如:absol ...

  9. python统计csv行数_对Python 多线程统计所有csv文件的行数方法详解

    如下所示: #统计某文件夹下的所有csv文件的行数(多线程) import threading import csv import os class MyThreadLine(threading.Th ...

最新文章

  1. oracle开发数据库试题,Oracle_开发数据库试题.doc
  2. 古老的SSM企业级应用
  3. ajax empty,jQuery empty仅在AJAX调用后的第二次单击时起作用
  4. QT的QCalendarWidget类的使用
  5. Shiro与Spring集成时,Shiro权限注解@RequiresRoles等不生效的解决方案
  6. java生成随机数的两种方式
  7. qt 程序异常结束_【心电国际指南2009专家解读】浦介麟 冉玉琴老师:QT 间期的规范化测量及其意义...
  8. 网络安全基础——破解系统密码
  9. Python实现everything文件检索
  10. 阿斯克码表ACSII对照表
  11. J2EE框架(四)核心设计模式
  12. uc浏览器linux系统下载文件夹,UC浏览器开发者工具Linux版
  13. 自娱自乐 中秋快乐 代码
  14. Linux内核源码——通知链(notifier chain)
  15. WPS如何生成指定区间随机数
  16. Long源码与常见问题
  17. 边缘化你必须知道的一件事!(FEJ知识点总结)
  18. ASO如何设置高价值的英文关键词,英文aso优化
  19. 电商api—拼多多搜索
  20. 哪些设备将用于部署酒店WiFi网络解决方案?

热门文章

  1. PHP 下载B站视频
  2. 任务车间调度问题的混合整数规划模型
  3. 销售额266亿元!这家企业解锁新能源车电驱动“新”未来
  4. 趋肤效应实验报告_大学计算机基础实验报告的答案
  5. 虽然凉了,也要坚强,一名渣硕的阿里菜鸟网络Java三面面经分享
  6. 无源和有源滤波器电路简图
  7. 前端面试知识点-DOCTYPE、盒模型、css选择符、position的值、网络协议【小咚 “面筋” 记】
  8. QQ游戏的PKG格式文件解压工具
  9. cf不能全屏win7的解决方法_win7电脑win+a快捷键不能用解决方法「系统天地」
  10. VMware运用Intel I350网卡异常处理