大数据挑战

在公司需要处理不断增长的数据量的各个领域中,对大数据的概念有不同的理解。 在大多数这些情况下,需要以某种方式设计所考虑的系统,以便能够处理该数据,而不会随着数据大小的增加而牺牲吞吐量。 从本质上讲,这导致需要构建高度可伸缩的系统,以便可以根据在给定时间点需要处理的数据量来分配更多资源。

构建这样的系统是一项耗时且复杂的活动,因此,可以使用第三方框架和库来提供现成的可伸缩性要求。 在Java应用程序中已经有很多不错的选择,本文将简要讨论一些最受欢迎的选择:


行动框架

我们将通过实现一个简单的管道来处理每个设备的数据,以测量给定区域的空气质量指数,从而演示每个框架。 为简单起见,我们假定来自设备的数字数据是分批接收或以流方式接收的。 在整个示例中,我们将使用THRESHOLD常量表示该值,在该值之上,我们认为一个区域被污染。

阿帕奇火花

在Spark中,我们需要先将数据转换为正确的格式。 我们将使用数据集,但我们也可以选择数据帧或RDD(弹性分布式数据集)作为数据表示的替代方法。 然后,我们可以应用许多Spark转换和操作,以便以分布式方式处理数据。

 public long countPollutedRegions(String[] numbers) { // runs a Spark master that takes up 4 cores SparkSession session = SparkSession.builder(). appName( "AirQuality" ). master( "local[4]" ). getOrCreate(); // converts the array of numbers to a Spark dataset Dataset numbersSet = session.createDataset(Arrays.asList(numbers), Encoders.STRING());         // runs the data pipeline on the local spark long pollutedRegions = numbersSet.map(number -> Integer.valueOf(number), Encoders. INT ()) .filter(number -> number > THRESHOLD).count();                 return pollutedRegions; } 

如果要更改上述应用程序以从外部源读取数据,写入外部数据源并在Spark集群而不是本地Spark实例上运行,我们将具有以下执行流程:


Spark驱动程序可以是单独的实例,也可以是Spark群集的一部分。

Apache Flink

与Spark相似,我们需要在Flink DataSet中表示数据,然后对其应用必要的转换和操作:

 public long countPollutedRegions(String[] numbers) throws Exception { // creates a Flink execution environment with proper configuration StreamExecutionEnvironment env = StreamExecutionEnvironment. createLocalEnvironment(); // converts the array of numbers to a Flink dataset and creates // the data pipiline DataStream stream = env.fromCollection(Arrays.asList(numbers)). map(number -> Integer.valueOf(number)) .filter(number -> number > THRESHOLD).returns(Integer. class ); long pollutedRegions = 0; Iterator numbersIterator = DataStreamUtils.collect(stream); while (numbersIterator.hasNext()) { pollutedRegions++; numbersIterator.next(); } return pollutedRegions; } 

如果要更改上述应用程序以从外部源读取数据,写入外部数据源并在Flink群集上运行,我们将具有以下执行流程:


将应用程序提交到Flink群集的Flink客户端是Flink CLI实用程序或JobManager的UI。

阿帕奇风暴

在Storm中,数据管道被创建为Spouts(数据源)和Bolts(数据处理单元)的拓扑。 由于Storm通常会处理无限制的数据流,因此我们会将空气质量指数编号数组的处理模拟为有限制的流:

 public void countPollutedRegions(String[] numbers) throws Exception { // builds the topology as a combination of spouts and bolts TopologyBuilder builder = new TopologyBuilder(); builder.setSpout( "numbers-spout" StormAirQualitySpout(numbers)); "numbers-spout" , new StormAirQualitySpout(numbers)); builder.setBolt( "number-bolt" , new StormAirQualityBolt()). shuffleGrouping( "numbers-spout" shuffleGrouping( "numbers-spout" );         // prepares Storm conf and along with the topology submits it for // execution to a local Storm cluster Config conf = new Config(); conf.setDebug( true ); LocalCluster localCluster = null; try { localCluster = new LocalCluster(); localCluster.submitTopology( "airquality-topology" , conf, builder.createTopology()); Thread.sleep(10000); localCluster.shutdown(); } catch (InterruptedException ex) { localCluster.shutdown(); } } 

我们有一个喷嘴可以为空气质量指数编号的数组提供数据源,还有一个仅过滤指示污染区域的螺栓:

 public class StormAirQualitySpout extends BaseRichSpout { private boolean emitted = false ; private SpoutOutputCollector collector; private String[] numbers; public StormAirQualitySpout(String[] numbers) { this .numbers = numbers; }     @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "number" )); } @Override public void open(Map paramas, TopologyContext context, SpoutOutputCollector collector) { this .collector = collector; } @Override public void nextTuple() { // we make sure that the numbers array is processed just once by // the spout if (!emitted) { for (String number : numbers) { collector.emit( new Values(number)); } emitted = true ; } }  } 
 public class StormAirQualityBolt extends BaseRichBolt { private static final int THRESHOLD = 10; private int pollutedRegions = 0; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "number" )); } @Override public void prepare(Map params,  TopologyContext context,  OutputCollector collector) { } @Override public void execute(Tuple tuple) { String number = tuple.getStringByField( "number" ); Integer numberInt = Integer.valueOf(number); if (numberInt > THRESHOLD) { pollutedRegions++; } }  } 

我们正在使用LocalCluster实例提交到本地Storm集群,这对于开发很方便,但是我们想将Storm拓扑提交到生产集群。 在这种情况下,我们将具有以下执行流程:


阿帕奇点燃

在Ignite中,我们需要先将数据放入分布式缓存中,然后再运行数据处理管道,该管道是在Ignite群集上以分布式方式执行的SQL查询的前者:

 public long countPollutedRegions(String[] numbers) { IgniteConfiguration igniteConfig = new IgniteConfiguration(); CacheConfiguration cacheConfig = new CacheConfiguration(); // cache key is number index in the array and value is the number cacheConfig.setIndexedTypes(Integer. class , String. class ); cacheConfig.setName(NUMBERS_CACHE); igniteConfig.setCacheConfiguration(cacheConfig);         try (Ignite ignite = Ignition.start(igniteConfig)) { IgniteCache cache = ignite.getOrCreateCache(NUMBERS_CACHE); // adds the numbers to the Ignite cache try (IgniteDataStreamer streamer = ignite.dataStreamer(cache.getName())) { int key = 0; for (String number : numbers) { streamer.addData(key++, number); } } // performs an SQL query over the cached numbers SqlFieldsQuery query = new SqlFieldsQuery( "select * from String where _val > " + THRESHOLD);             FieldsQueryCursor<List> cursor = cache.query(query); int pollutedRegions = cursor.getAll().size(); return pollutedRegions; }  } 

如果我们要在Ignite群集中运行应用程序,它将具有以下执行流程:


榛树喷射机

Hazelcast Jet在Hazelcast IMDG之上运行,并且与Ignite相似,如果我们要处理数据,我们需要首先将其放入Hazelcast IMDG群集中:

 public long countPollutedRegions(String[] numbers) { // prepares the Jet data processing pipeline Pipeline p = Pipeline.create(); p.drawFrom(Sources.list( "numbers" )). map(number -> Integer.valueOf((String) number)) .filter(number -> number > THRESHOLD).drainTo(Sinks.list( "filteredNumbers" )); JetInstance jet = Jet.newJetInstance(); IList numbersList = jet.getList( "numbers" ); numbersList.addAll(Arrays.asList(numbers)); try { // submits the pipeline in the Jet cluster jet.newJob(p).join(); // gets the filtered data from Hazelcast IMDG List filteredRecordsList = jet.getList( "filteredNumbers" ); int pollutedRegions = filteredRecordsList.size(); return pollutedRegions; } finally { Jet.shutdownAll(); } } 

但是请注意,Jet还提供集成而无需外部数据源,并且不需要将数据存储在IMDG群集中。 您也可以在不首先将数据存储到列表中的情况下进行聚合(查看Github中包含改进版本的完整示例)。 感谢Hazelcast工程团队的Jaromir和Can的宝贵意见。

如果我们要在Hazelcast Jet集群中运行该应用程序,它将具有以下执行流程:


卡夫卡流

Kafka Streams是一个客户端库,使用Kafka主题作为数据处理管道的源和接收器。 为了在我们的方案中使用Kafka Streams库,我们将把空气质量指数数字放入数字 Kafka主题中:

 public long countPollutedRegions() { List result = new LinkedList(); // key/value pairs contain string items final Serde stringSerde = Serdes.String(); // prepares and runs the data processing pipeline final StreamsBuilder builder = new StreamsBuilder(); builder.stream( "numbers" , Consumed.with(stringSerde, stringSerde)) .map((key, value) -> new KeyValue(key, Integer.valueOf(value))). filter((key, value) -> value > THRESHOLD) .foreach((key, value) -> { result.add(value.toString()); });     final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, createKafkaStreamsConfiguration()); streams.start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } int pollutedRegions = result.size(); System.out.println( "Number of severely polluted regions: " + pollutedRegions); streams.close(); return pollutedRegions; } private Properties createKafkaStreamsConfiguration() { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "text-search-config" ); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return props; } 

我们的Kafka Stream应用程序实例将具有以下执行流程:


脉冲星函数

Apache Pulsar函数是轻量级的计算过程,可与Apache Pulsar集群一起以无服务器的方式工作。 假设我们在Pulsar集群中传输空气质量指数,我们可以编写一个函数来计算超出给定阈值的指数数量,并将结果写回到Pulsar,如下所示:

 public class PulsarFunctionsAirQualityApplication implements Function { private static final int HIGH_THRESHOLD = 10; @Override public Void process(String input, Context context) throws Exception {         int number = Integer.valueOf(input);         if (number > HIGH_THRESHOLD) { context.incrCounter( "pollutedRegions" , 1); } return null; }  } 

该函数以及Pulsar集群的执行流程如下:


Pulsar函数可以在Pulsar群集中运行,也可以作为单独的应用程序运行。

摘要

在本文中,我们简要回顾了一些可用于在Java中实现大数据处理系统的最受欢迎的框架。 所提供的每个框架都相当大,值得单独发表一篇文章。 尽管非常简单,但我们的空气质量指数数据管道却展示了这些框架的运行方式,您可以以此为基础来扩展您可能会进一步感兴趣的每个框架中的知识。 您可以在此处查看完整的代码示例。

翻译自: https://www.javacodegeeks.com/2019/12/popular-frameworks-for-big-data-processing-in-java.html

Java大数据处理的流行框架相关推荐

  1. Py之PyODPS:PyODPS(MaxCompute平台上的大数据处理和分析框架)的简介、安装、使用方法之详细攻略

    Py之PyODPS:PyODPS(MaxCompute平台上的大数据处理和分析框架)的简介.安装.使用方法之详细攻略 目录 PyODPS的简介 1.PyODPS的特点 2.MaxCompute下SQL ...

  2. 23个java大数据处理框架

    本文转自:https://www.cnblogs.com/stm32stm32/p/6413557.html 目前,编程人员面对的最大挑战就是复杂性,硬件越来越复杂,OS越来越复杂,编程语言和API越 ...

  3. 想学大数据?大数据处理的开源框架推荐

    欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文讨论大数据处理生态系统和相关的架构栈,包括对适应于不同任务的多种框架特性的调研.除此之外,文章还从多个层次对框架进行深入研究,如存储,资 ...

  4. java 大数据处理一

    2019独角兽企业重金招聘Python工程师标准>>> 众所周知,java在处理数据量比较大的时候,加载到内存必然会导致内存溢出,而在一些数据处理中我们不得不去处理海量数据,在做数据 ...

  5. java 大数据处理之内存溢出解决办法(一)

    http://www.cnblogs.com/a757956132/p/4644910.html http://my.oschina.net/songhongxu/blog/209951 一.内存溢出 ...

  6. java 大数据处理之内存溢出解决办法

    因项目中需要查询大量数据,所以导致jvm内存溢出,发现这篇博文不错,转发来共同学习 原文地址:https://www.cnblogs.com/a757956132/p/4644910.html 一.内 ...

  7. java 大数据处理类 BigDecimal 解析

    这两天,由于我的必修课概率论里经常要用到排列组合的计算,感觉很麻烦,加上现代智能手机的计算器是没有这方面功能的. 所以,就自己动手写了个安卓的 排列组合 计算器,用了一天,发现有很大的问题,阶乘达百亿 ...

  8. 大数据处理——Java

    Java在大数据处理方面具有以下优点: 可扩展性:Java提供了良好的可扩展性,可以处理大规模数据的存储.处理和分析.Java支持多线程编程,可以充分利用多核处理器的计算能力,提高数据处理速度. 大数 ...

  9. 最适合Java开发者的大数据工具和框架

    http://www.xsh-gz.com/javatrain/20170104/2222.html 转自 最适合Java开发者的大数据工具和框架 发布时间: 2017-01-04 17:12:58 ...

最新文章

  1. tf.contrib.layers.fully_connected详解
  2. java nio doug_Java NIO简介
  3. 想成为别人眼里的Python大牛,就必不可少的书单
  4. BZOJ 1968 [Ahoi2005]COMMON 约数研究
  5. 将一个简单远程调用的方式例子改为异步调用 -- 2
  6. EPPlus导出Excel感觉很不错~~~
  7. java城市级联一次查询_我的城市没有任何设计活动,所以我自己组织了一次。...
  8. tsd3dmapper软件使用方法_TOYO模组选型软件使用方法
  9. H5 js 处理localstorage方法封装
  10. oracle是delete可以加并行吗,提高Oracle DELETE性能的策略
  11. SpringCloud面试题及答案
  12. 这才是牛逼程序员的标配!
  13. Java里进制转换(二进制、八进制、十进制、十六进制)
  14. python 菜鸟-python菜鸟教程
  15. c#类的多态和文件流复习
  16. 知识图谱嵌入的应用场景
  17. 华为USG防火墙配置命令
  18. el-scrollbar 优化滚动条样式
  19. XP系统无法访问\\192.168.1.104无法访问。你可能没有权限使用网络资源。与这台服务器的管理员联系以查明你是否有访问权限
  20. 计算机开机按f1,图文讲解开机按f1的解决方法,简单明了!

热门文章

  1. 【2018.4.14】模拟赛之二-ssl2392 蚂蚁【图论】
  2. 2021“MINIEYE杯”中国大学生算法设计超级联赛(5)Random Walk 2(推式子+矩阵逆+矩阵乘)
  3. 初一级练习题(2019.3.8)
  4. LVS三种模式的区别及负载均衡算法
  5. DevOps通用及版本控制面试题
  6. art-template入门(九)之API
  7. 我们在进行着一场拔河比赛……
  8. LocalDateTime与LocalDate之间的相互转换
  9. JAVA常用的环境变量配置
  10. android解决工具类中没有context中的问题