场景

有时候需要处理并存储大量流式数据,以Gps数据为例,面对一天千万级别的数据,时常要分库分表,数据偶尔会存在丢失的情况,很难做到既要保证大量数据处理的同时又要保证数据查询的实时性。这时借助云平台就是一个很好的做法。

数据流向

大致数据流向数据源->缓冲区->数据库处理中心->云数据库

所用Azure云平台介绍

1.Azure EventHub (云Kakfa)
Azure 事件中心是大数据流式处理平台和事件引入服务。 它可以每秒接收和处理数百万个事件。 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到事件中心的数据。
2.Azure Databricks (云Spark)
是一个已针对 Microsoft Azure 云服务平台进行优化的数据分析平台。 Azure Databricks开发数据密集型应用程序的三个环境:Databricks SQL、Databricks Data Science Engineering 和 & Databricks 机器学习。Databricks SQL 为想要针对数据湖运行 SQL 查询、创建多种可视化类型以从不同角度探索查询结果,以及生成和共享仪表板的分析员提供了一个易于使用的平台。
3.Azure Cosmos DB Cassandra(云数据库)
Azure Cosmos DB Cassandra API 可以充当为 Apache Cassandra 编写的应用的数据存储。 这意味着通过使用现有的符合 CQLv4 的 Apache 驱动程序,现有 Cassandra 应用程序现在可以与 Azure Cosmos DB Cassandra API 通信。 在许多情况下,只需更改连接字符串,就可以从使用 Apache Cassandra 切换为使用 Azure Cosmos DB Cassandra API。通过 Cassandra API 可以使用 Cassandra 查询语言 (CQL)、基于 Cassandra 的工具(如 cqlsh)和熟悉的 Cassandra 客户端驱动程序与 Azure Cosmos DB 中存储的数据进行交互。

技术方案优势

1.高效实时,高吞吐(理论上只要预算够无论多大的数据都能处理)
2.只要部署好,根本不用担心自己搭的服务器会爆炸
3.减少各种环境搭建过程,减少人力机器维护成本
4.后续查询扩展快速方便,可以根据业务需求做出各种定制

具体讲解

  1. 数据发送部分
    数据发送部分主要将数据源发送至EventHub,那么这里以.net为例附上一部分发送代码,因为是在老项目上改的,用的是.net fk,所以没有太多的操作空间(不然不兼容),EventHub使用环境在.net fk4.6以上,大家记得更新框架,推荐使用.net core或者.net 5的BackGroundService。
    大概说一下思路,一个并发队列,4线程发送,而且做了分区,可以根据自身语言环境或者需要更改,最好自己写
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.AppCenter.Crashes;namespace EHiGPSSocket.FormsApp
{public static class EventHub{private const string ConnectionString ="Your ConnectionString ";private const string EventHubName = "EventHubName";private const int Capacity = 500;private const int MacCapacity = 200000;private static EventHubProducerClient _producerClient = new EventHubProducerClient(ConnectionString, EventHubName);private static ConcurrentDictionary<string, ConcurrentQueue<string>> DicQueues = new ConcurrentDictionary<string, ConcurrentQueue<string>>();static EventHub(){for (int i = 0; i < 4; i++){Task.Factory.StartNew(SendAsync, CancellationToken.None, TaskCreationOptions.LongRunning);}}public static void Enqueue(string message, string topic){if (!DicQueues.Keys.Contains(topic)){var queue = new ConcurrentQueue<string>();queue.Enqueue(message);DicQueues.TryAdd(topic, queue);}else{DicQueues.TryGetValue(topic, out var queue);if (queue?.Count < MacCapacity){queue?.Enqueue(message);}}}static async Task SendAsync(object state){while (true){try{foreach (var dicQueue in DicQueues){var queue = dicQueue.Value;if (queue.Count > Capacity){var count = 0;var list = new List<EventData>();while (count < Capacity && count < dicQueue.Value.Count){queue.TryDequeue(out var data);if (!string.IsNullOrEmpty(data)){list.Add(new EventData(data));count++;}}if (_producerClient.IsClosed){_producerClient = new EventHubProducerClient(ConnectionString, EventHubName);}await _producerClient.SendAsync(list, new SendEventOptions() { PartitionKey = dicQueue.Key }).ConfigureAwait(false);}}}catch (Exception e){Crashes.TrackError(e);}}}}
}

2.EventHub
数据发送至EventHub后,可以看到大致的数据量

3.Azure Databricks
Azure Databricks从EventHub读取数据
大致的读取代码如下Scala

def EventHandle(): Unit = {val namespaceName = "namespaceName "val eventHubName = "eventHubName "val sasKeyName = "sasKeyName "val sasKey = "sasKey "val domainName="domainName.chinacloudapi.cn"val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder().setEndpoint(namespaceName,domainName).setEventHubName(eventHubName).setSasKeyName(sasKeyName).setSasKey(sasKey)val customEventhubParameters =EventHubsConf(connStr.toString()).setMaxEventsPerTrigger(100)customEventhubParameters.setConsumerGroup("$Default")println(customEventhubParameters.consumerGroup)println(customEventhubParameters.connectionString)val carDF = spark.sql("select * from xxxx")val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()val messages =incomingStream.withColumn("Body", $"body".cast(StringType)).withColumn("PartitionKey", $"PartitionKey".cast(StringType)).select("Body","PartitionKey")val query= messages.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long)=>batchDF.filter("PartitionKey='DBQ'")//这里处理DF.write.cassandraFormat("gpsdata", "gpsprofile").mode("append").save()}query.start().awaitTermination()
}

4.Azure Cosmos DB Cassandra
最后丢入Azure Cosmos DB Cassandra

总结

EventHub Azure Databricks Azure Cosmos DB Cassandra
这三个云平台中需要有很多可以配置的地方,大家参阅官方文档应该都可以解决,这里只 、提供一些思路和解决方案,具体操作与选型还是要看个人业务需求与技术选型

Azure云平台 GPS大数据解决方案 EventHub+Azure Databricks+Azure Cosmos DB Cassandra相关推荐

  1. 油烟在线监控系统云平台 油烟环保监测云平台 油烟大数据平台

    摘要:餐饮业油烟是大气中挥发性有机物(VOCS)和 PM10 的主要来源之一.近年来随着环保治理的加强,中央.省市区不断强化餐饮经营商全覆盖安装油烟净化器工作,但在监管上仍存在一些问题和漏洞. (1) ...

  2. 中望软件华东区技术部经理尚飞:中望设计云与制造大数据解决方案

    为进一步推广上海市国企应用自主可控信息技术,9月26日,由上海市国有资产管理委员会.上海市经济和信息化委员会.上海市版权局指导,上海市国有资产信息中心主办,畅享网提供媒体支持的"自主可控技术 ...

  3. Greenplum Hadoop分布式平台大数据解决方案实战教程

    基于Greenplum Hadoop分布式平台的大数据解决方案及商业应用案例剖析 [上集]百度网盘下载:链接:http://pan.baidu.com/s/1eQJFXZ0 密码:kdx9 [下集]百 ...

  4. 智领云荣登“中国大数据企业50强” | 2020大数据产业生态大会盛大召开 智领云斩获多项殊荣

    近年来,我国大数据生态环境不断向好,产业发展维持高增长态势,大数据技术在与政府.企业核心业务的融合中,释放出了更多创新活力和应用潜能. 8月27日,2020(第五届)大数据产业生态大会在京隆重召开,中 ...

  5. 【Microsoft Azure 的1024种玩法】五十九.基于Azure云平台快速搭建GitLab应用实现代码托管

    [简介] GitLab是由GitLab Inc.开发,一款基于Git的完全整合的软体开发平台,以 Git 作为代码管理工具并实现自托管的 Git 项目仓库,本篇文章主要介绍如何在Azure Virtu ...

  6. 【Microsoft Azure 的1024种玩法】六十八.基于Azure云平台使用Azure Virtual machines快速搭建Docker容器

    [简介] Docker 是一个开放源代码软件,主要应用于开发应用.交付应用.运行应用,Docker 可以将应用程序及其依赖项打包到可以在任何 Linux.Windows 或 macOS 计算机上运行的 ...

  7. 昨日黄花Hadoop 方兴未艾云原生——传统大数据平台的云原生化改造

    本文6539字,阅读时间约20分钟 以Hadoop为中心的大数据生态系统从2006年开源以来,一直是大部分公司构建大数据平台的选择,但这种传统选择随着人们深入地使用,出现越来越多的问题,比如:数据开发 ...

  8. 华为云打造农业农村大数据解决方案

    数据.算力强支撑,佳格天地联合华为云打造农业农村大数据解决方案 日前,华为云"828 B2B企业节"盛大开启.农业农村大数据应用企业佳格天地联合华为云推出"测亩宝&quo ...

  9. 《一张图看懂华为云BigData Pro鲲鹏大数据解决方案》

    8月27日,华为云重磅发布了业界首个鲲鹏大数据解决方案--BigData Pro.该方案采用基于公有云的存储与计算分离架构,以可无限弹性扩容的鲲鹏算力作为计算资源,以支持原生多协议的OBS对象存储服务 ...

  10. 《企业大数据系统构建实战:技术、架构、实施与应用》——第3章 企业大数据解决方案 3.1 企业大数据解决方案实现方式...

    本节书摘来自华章计算机<企业大数据系统构建实战:技术.架构.实施与应用>一书中的第3章,第3.1节,作者 吕兆星 郑传峰 宋天龙 杨晓鹏,更多章节内容可以访问云栖社区"华章计算机 ...

最新文章

  1. swagger error: Conflicting schemaIds: Duplicate schemaIds detected for types A and B
  2. MySQL模拟oracle的connect by
  3. 增强包_情暖冬至 饺子飘香——临沭县兴华学校冬至“趣味包饺子”比赛圆满结束...
  4. 世界首份博客报纸问世
  5. 淘宝技术架构从1.0到4.0的演变
  6. Python文件练习
  7. 【华为云技术分享】如何整合hive和hbase
  8. 聊聊 归一化和标准化
  9. 【笔记】双线性池化(Bilinear Pooling)详解、改进及应用
  10. EventBus 加强学习深入了解
  11. W3CSchool.chm帮助文档百度网盘分享
  12. 鸿蒙手机型号对照表,华为首款鸿蒙手机入网,机型具体型号让人意外
  13. 五种压缩软件(WinRAR、7Z、好压、快压和360压缩)之比拼
  14. Endnote与知网研学(E-study)题录相互导入
  15. mysql 正则表达式_MySQL的正则表达式
  16. iQOO刷鸿蒙系统,iQOO招募Android12 Beta版即将到来,网友:我想要鸿蒙系统!
  17. ECharts-中国省市地图
  18. Deep Retinex Decomposition for Low-Light Enhancement 论文阅读笔记
  19. getInputStream() has already been called for this request
  20. 工具类commons-io的Tailer用法,用来监控文件内容的变化情况

热门文章

  1. 使用ico图标†制作ico图标(浏览器图标
  2. ubuntu firefox flash 插件安装
  3. 【错误解决】Ubuntu20.04安装输入法遇到的问题
  4. ADC噪声全面分析 -01- ADC噪声的类型以及ADC特性
  5. 95后CEO讲述创业“邮件经”
  6. mysql数据库维护(mysql学习笔记)
  7. 045-Java-036
  8. JAVA——判断多选题的对错
  9. 实训-利用HTML和CSS制作一个网页界面
  10. jquery.seat-chartsMark在线选座插件使用