使用 Azure Databricks 执行 ETL(提取、转换和加载数据)操作。 将数据从 Azure Data Lake Storage Gen2 提取到 Azure Databricks 中,在 Azure Databricks 中对数据运行转换操作,然后将转换的数据加载到 Azure Synapse Analytics 中。

本教程中的步骤使用 Azure Databricks 的 Azure Synapse 连接器将数据传输到 Azure Databricks。 而此连接器又使用 Azure Blob 存储来临时存储在 Azure Databricks 群集和 Azure Synapse 之间传输的数据。

下图演示了应用程序流:

本教程涵盖以下任务:

  • 创建 Azure Databricks 服务。

  • 在 Azure Databricks 中创建 Spark 群集。

  • 在 Data Lake Storage Gen2 帐户中创建文件系统。

  • 上传示例数据到 Azure Data Lake Storage Gen2 帐户。

  • 创建服务主体。

  • 从 Azure Data Lake Storage Gen2 帐户中提取数据。

  • 在 Azure Databricks 中转换数据。

  • 将数据加载到 Azure Synapse 中

创建DataLake测试账户

Azure Data Lake 存储 Gen2](Azure Data Lake Storage Gen2 Introduction | Microsoft Docs) (也称为 ADLS Gen2) 是一种用于大数据分析的下一代 Data Lake解决方案。 Azure Data Lake 存储 Gen2 使用低成本分层存储、高可用性和灾难恢复功能,生成 Azure Data Lake 存储 Gen1 功能文件系统语义、文件级安全性和缩放到 — Azure Blob 存储。 — Azure Blob File System (ABFS) 驱动程序提供 ADLS Gen2 存储的接口。 Databricks Runtime 中包含的 ABFS 驱动程序支持 Azure Blob 存储上的标准文件系统语义。

portal上选择存储账户,创建过程如下

 

ADLS Gen2 访问

  • 对于生产和多用户方案,有多种身份验证方法可用于从 Azure Databricks 访问 ADLS Gen2 存储。 根据要求选择方法:

    • 若要根据每个用户的权限ADLS Gen2工作区用户访问 Azure Data Lake 存储

    • 若要为多个工作区用户提供对一组常用文件夹或文件的访问权限,请参阅配合使用 OAuth 2.0 和 Azure 服务主体来访问 Azure Data Lake Storage Gen2。

    • 若要精细控制对存储资源的访问,请参阅使用 SAS 令牌提供程序直接访问 Azure Data Lake Storage Gen2,定义自定义访问策略。

从 Azure 存储帐户获取访问密钥。

创建 Azure Key Vault

可以使用Key Vault保存密钥

将 Azure 访问密钥添加到 Azure 密钥保管库

使用 Azure 服务主体运行作业

作业提供了一种非交互式的方式来运行 Azure Databricks 群集中的应用程序,例如,ETL 作业或应按计划运行的数据分析任务。 通常,这些作业以创建它们的用户身份运行,但这可能有一些限制:

  • 创建并运行作业取决于具有适当权限的用户。

  • 只有创建作业的用户才能访问作业。

  • 用户可能已从 Azure Databricks 工作区中删除。

使用服务帐户(与应用程序而不是特定用户关联的帐户)是解决这些限制的常用方法。 在 Azure 中,可使用 Azure Active Directory (Azure AD) 应用程序和服务主体来创建服务帐户。

这一点非常重要,尤其是服务主体控制对存储在 Azure Data Lake Storage Gen2 帐户中的数据的访问时。 使用这些服务主体运行作业允许作业访问存储帐户中的数据,并提供对数据访问范围的控制。

下面进行创建 Azure AD 应用程序和服务主体,并使该服务主体成为作业的所有者。向不拥有作业的其他组授予作业运行权限:

在 Azure Active Directory 中创建服务主体

在门户中搜索应用注册

完成

创建客户端密码

选择证书和密码-新建客户端密码

此密钥之后不能查看,一定要保存好,作为之后使用,

Azure Databricks 中创建个人访问令牌 (PAT)

  1. 在 Azure Databricks 中创建个人访问令牌 (PAT) ,可使用 PAT 对 Databricks REST API 进行身份验证。

    你将使用 Azure Databricks 个人访问令牌 (PAT) 对 Databricks REST API 进行身份验证。 创建可用于发出 API 请求的 PAT:

    1. 转到 Azure Databricks 工作区。

    2. 单击屏幕右上角的用户图标,然后单击“用户设置”。

    3. 单击 "访问令牌" "生成新令牌"

    4. 复制并保存令牌值。

  2. 使用 Databricks SCIM API 将服务主体作为非管理用户添加到 Azure Databricks。

    curl -X POST "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/preview/scim/v2/ServicePrincipals" \--header "Content-Type: application/scim+json" \--header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c" \--data-raw "{"schemas":["urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal"],"applicationId":"eb603b71-6def-44bb-b660-d4ea67fe33c4","displayName": "test-sp","entitlements":[{"value":"allow-cluster-create"}]}"

    以上代码在windows 下执行会报错,需要把单引号改为双引号,把"" 改为 “^”,data-raw 参属下也不行,可以使用@data 把参数写成json读取。如下图

  3. 在 Azure Databricks 中创建 Azure Key Vault 支持的机密范围。

    访问:类似这样地址::https://<databrick url>/?o=7535626322818590#secrets/createScope

  4. 授予服务主体对机密范围的读取权限。

    curl -X POST 'https://<per-workspace-url/api/2.0/secrets/acls/put' \--header 'Authorization: Bearer <personal-access-token>' \--header 'Content-Type: application/json' \--data-raw '{"scope": "<scope-name>","principal": "<application-id>","permission": "READ"}'curl -X POST "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/secrets/acls/put"  --header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c" --header "Content-Type: application/json"  --data @create1.json

    测试作业

    在 Azure Databricks 中创建一个作业,并将该作业群集配置为从机密范围读取机密。

    1. 转到 Azure Databricks 登录页并选择“创建空白笔记本”。 为笔记本命名,然后选择“SQL”作为默认语言。

    2. 在笔记本的第一个单元格中输入 SELECT 1。 这是一个简单的命令,如果成功,只显示 1。 如果你已授予服务主体对 Azure Data Lake Storage Gen 2 中特定文件或路径的访问权限,则可以改为从这些路径读取。

    3. 转到“作业”并单击“+ 创建作业”按钮 。 为作业命名,单击“选择笔记本”,然后选择刚刚创建的笔记本。

    4. 单击群集信息旁边的“编辑”。

    5. 在“配置群集”页面上,单击“高级选项” 。

    6. 在“Spark”选项卡上,输入以下 Spark 配置:

    1. fs.azure.account.auth.type.acmeadls.dfs.core.windows.net OAuth
      fs.azure.account.oauth.provider.type.acmeadls.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
      fs.azure.account.oauth2.client.id.acmeadls.dfs.core.windows.net <application-id>
      fs.azure.account.oauth2.client.secret.acmeadls.dfs.core.windows.net {{secrets/<secret-scope-name>/<secret-name>}}
      fs.azure.account.oauth2.client.endpoint.acmeadls.dfs.core.windows.net https://login.microsoftonline.com/<directory-id>/oauth2/token
      ​
      -----
      ​
      ​
      fs.azure.account.auth.type.acmeadls.dfs.core.windows.net OAuth
      fs.azure.account.oauth.provider.type.acmeadls.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
      fs.azure.account.oauth2.client.id.acmeadls.dfs.core.windows.net  eb603b71-6def-44bb-b660-d4ea67fe33c4
      fs.azure.account.oauth2.client.secret.acmeadls.dfs.core.windows.net {{secrets/maxscope/acme-sp-secret}}
      fs.azure.account.oauth2.client.endpoint.acmeadls.dfs.core.windows.net https://login.microsoftonline.com/75d1575e-746b-46a1-9faf-e571b4af6eda/oauth2/token
      ​
      ​
      将 <secret-scope-name> 替换为包含客户端机密的 Azure Databricks 机密范围的名称。 maxscope
      将 <application-id> 替换为 Azure AD 应用程序注册的 Application (client) ID。  eb603b71-6def-44bb-b660-d4ea67fe33c4
      将 <secret-name> 替换为与机密范围中的客户端机密值关联的名称。acme-sp-secret
      将 <directory-id> 替换为 Azure AD 应用程序注册的 Directory (tenant) ID。 75d1575e-746b-46a1-9faf-e571b4af6eda
      ​

  5. 将作业的所有权转移给服务主体。

    curl -X PUT 'https://<per-workspace-url>/api/2.0/permissions/jobs/<job-id>' \--header 'Authorization: Bearer <personal-access-token>' \--header 'Content-Type: application/json' \--data-raw '{"access_control_list": [{"service_principal_name": "<application-id>","permission_level": "IS_OWNER"},{"group_name": "admins","permission_level": "CAN_MANAGE"}]}'替换 <per-workspace-url> 为 Azure Databricks 工作区的唯一 <per-workspace-url> 。
    使用 Azure Databricks 个人访问令牌替换 <personal-access-token>。
    将 <application-id> 替换为 Azure AD 应用程序注册的 Application (client) ID。
    ​
    ​
    ​
    该作业还需要对笔记本的读取权限。 运行以下命令以授予所需的权限:
    ​
    ​
    curl -X PUT 'https://<per-workspace-url>/api/2.0/permissions/notebooks/<notebook-id>' \--header 'Authorization: Bearer <personal-access-token>' \--header 'Content-Type: application/json' \--data-raw '{"access_control_list": [{"service_principal_name": "<application-id>","permission_level": "CAN_READ"}]}'替换 <per-workspace-url> 为 Azure Databricks 工作区的唯一 <per-workspace-url> 。
    将 <notebook-id> 替换为与作业关联的笔记本的 ID。 若要查找 ID,请转到 Azure Databricks 工作区中的笔记本,并在笔记本的 URL 中查找 notebook/ 后面的数字 ID。
    使用 Azure Databricks 个人访问令牌替换 <personal-access-token>。
    将 <application-id> 替换为 Azure AD 应用程序注册的 Application (client) ID。
    ​
    curl -X PUT "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/permissions/jobs/731458451210876" ^--header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c" ^--header "Content-Type: application/json" ^--data @create2.jsoncurl -X PUT "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/permissions/notebooks/861292323730779" ^--header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c"  ^--header "Content-Type: application/json" ^--data @create3.json
    ​
  6. 通过将作业作为服务主体运行来测试该作业。

使用服务主体运行作业的方式与使用用户身份运行作业的方式相同,无论是通过 UI、API 还是 CLI。 使用 Azure Databricks UI 测试作业:

  1. 转到 Azure Databricks UI 中的“作业”并选择作业。

  2. 单击 “立即运行”

如果一切正常运行,你将看到作业的状态为“成功”。 可在 UI 中选择作业以验证输出:

ETL过程测试

1、打开DataBricks工作区,创建notebook,输入下面的代码进行设置

val appID = "<appID>"
val secret = "<secret>"
val tenantID = "<tenant-id>"
​
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
---
​
val storageAccountName = "<storage-account-name>"
val appID = "<app-id>"
val secret = "<secret>"
val fileSystemName = "<file-system-name>"
val tenantID = "<tenant-id>"
​
spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
--
%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

下载数据

如图运行后:

将示例数据引入 Azure Data Lake Storage Gen2 帐户

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json
​
​
​
​
dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

从 Azure Data Lake Storage Gen2 帐户中提取数据

---获取数据
val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json")
df.show()
​
---获取数据列
val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
specificColumnsDf.show()
​
---更改列名
val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
renamedColumnsDF.show()
​
​

加载数据到Azure Synapse

由于需要临时存储,输入下面配置。

val blobStorage = "maxtestdatalake.blob.core.windows.net"
val blobContainer = "temp"
val blobAccessKey =  "I3db2DV2qMGj2IDgLwbp/d7a9esxAjTF3GxM0T2Tbv/ssirP9NntHTdzFCohSlmIxhEaiRcq5BXJC1lO5/aQiQ=="
​
​
---设置
val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
​
val acntInfo = "fs.azure.account.key."+ blobStorage
sc.hadoopConfiguration.set(acntInfo, blobAccessKey)
​
---数据库配置
//Azure Synapse related settings
val dwDatabase = "maxsqlpools"
val dwServer = "maxsqlpools.database.windows.net"
val dwUser = "max"
val dwPass = ""
val dwJdbcPort =  "1433"
val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
​
----
数据存放
spark.conf.set("spark.sql.parquet.writeLegacyFormat","true")
​
renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
​
​
​

连接数据库

下面是数据库数据信息,数据进入了数据库。

整个过程完成。

Databricks文档04----使用 Azure Databricks 提取、转换和加载数据相关推荐

  1. Databricks文档05----使用 Azure Databricks 连接SQL Server查询数据

    创建新的notebook 输入相关参数 jdbcHostname = "-----" jdbcDatabase = "iotdb" userName = 'sa ...

  2. Databricks文档01----Azure Databricks初探

    Azure Databricks 是一个已针对 Microsoft Azure 云服务平台进行优化的数据分析平台. Azure Databricks 提供了三种用于开发数据密集型应用程序的环境: Da ...

  3. docx文档怎么排列图片_PDF文档中的图片怎么提取出来?不得不说这两个方法太好用了...

    原标题:PDF文档中的图片怎么提取出来?不得不说这两个方法太好用了 目前PDF文档被大家广泛应用,主要是因为PDF文档在传输和转换的过程中比较稳定,所以PDF格式几乎是办公文件格式的首选.大家都知道P ...

  4. ChatGPT | Word文档如何更好地提取表格内容给ChatGPT

    本文来自http://blog.csdn.net/hellogv/ ,引用必须注明出处! Word文档如何更好地提取表格内容给ChatGPT做知识库,这属于文本预处理工作. 本文只讲思路.测试结果,技 ...

  5. 手机版PDF编辑器支持PDF转Word、文档内容编辑合并与提取

    PDF文档作为使用十分普遍的办公文档,其内容展示精美.设备兼容性好的特性十分受到用户们的喜爱.不过也有令人头疼的部分,修改PDF内容不易,需要专用的PDF编辑器,当然你也可以选择将PDF转成Word后 ...

  6. 结合GSM协议文档与Tems工具捕捉到的GSM手机数据,分析L3消息。以0418-8r0为例.

    结合GSM协议文档与Tems工具捕捉到的GSM手机数据,分析L3消息.以0418-8r0为例. Tems显示的一个L3的Paging request type 2 message detail: 在消 ...

  7. ​Excel如何转换成Word文档?教你如何实现转换

    怎么把excel表格转换成word文档呢?对文件的格式转换相信大家都经历过很多次了,很多时候都是我们在工作中的办公需求.最近有看到有很多小伙伴想了解怎么把excel表格转换成word文档,对于这个比较 ...

  8. 以5个数据库为例,用Python实现数据的提取、转换和加载(ETL)

    导读:每个数据科学专业人员都必须从不同的数据源中提取.转换和加载(Extract-Transform-Load,ETL)数据. 本文将讨论如何使用Python为选定的流行数据库实现数据的ETL.对于关 ...

  9. 数据提取、转换和加载 - ETL工具

    ETL,Extraction-Transformation-Loading的缩写,中文名称为数据提取.转换和加载. ETL工具有:OWB(Oracle Warehouse Builder).ODI(O ...

最新文章

  1. findwindowex子窗口类型有哪几种_光学玻璃有哪几种类别?一文告诉你
  2. 机器学习7—AdaBoost学习笔记
  3. Python进阶: Decorator 装饰器你太美
  4. RabbitMQ原理讲解
  5. JPA –我应该成为懒惰的极端主义者吗?
  6. w ndows10电脑配置看哪里,Windows10怎么自动登录?Windows10自动登录的设置方法
  7. 华为2018春招笔试题目 字节流解析与长整数相乘
  8. 美女,真的有标准吗?
  9. 计算机组装与维护报告论文,计算机组装与维护实习报告范文
  10. [转][.NET 基于角色安全性验证] 之一:基础知识
  11. SecureCRT连接阿里云ECS服务器,经常掉线的解决方案
  12. 浮点数学运算是否被破坏?
  13. Unity内嵌浏览器插件(Android、iOS、Windows)
  14. 开启并定制 Apache 显示目录索引样式
  15. 明天全国哀悼日,小程序只需三行代码秒变黑白
  16. 上门洗车APP --- Android客户端开发 之 项目结构介绍
  17. SuperMap iDesktop 8C 空间分析
  18. XMind2020安装教程
  19. linux gnuplot 教程,图形绘制利器:Gnuplot
  20. 什么是 SRS 呢?在我们大部分的音频播放器里都内欠有这种音效。

热门文章

  1. android 类对象的存储,android - 以共享首选项存储和检索类对象
  2. Day03-JavaScript01
  3. 【BUG解决】 RuntimeError: Input type (torch.cuda.FloatTensor) and weight type (torch.FloatTensor)
  4. objective-c常见类型%z
  5. 魔兽世界怀旧服官网-衣米魔兽网站展示
  6. 数据结构课程设计---实现一元稀疏多项式计算器
  7. idea evaluation license has expired 解决办法
  8. 读东野圭吾《白夜行》有感
  9. saber与matlab联合仿真
  10. 非真,亦非假——20世纪数学悖论入侵机器学习