在云计算的推动下,软件系统发展趋于平台化。云平台系统一般都是分布式的集群系统,采用大数据技术。在这方面akka提供了比较完整的开发技术支持。我在上一个系列有关CQRS的博客中按照实际应用的要求对akka的一些开发技术进行了介绍。CQRS模式着重操作流程控制,主要涉及交易数据的管理。那么,作为交易数据产生过程中发挥验证作用的一系列基础数据如用户信息、商品信息、支付类型信息等又应该怎样维护呢?首先基础数据也应该是在平台水平上的,但数据的采集、维护是在系统前端的,比如一些web界面。所以平台基础数据维护系统是一套前后台结合的系统。对于一个开放的平台系统来说,应该能够适应各式各样的前端系统。一般来讲,平台通过定义一套api与前端系统集成是通用的方法。这套api必须遵循行业标准,技术要普及通用,这样才能支持各种异类前端系统功能开发。在这些要求背景下,相对gRPC, GraphQL来说,REST风格的http集成模式能得到更多开发人员的接受。

在有关CQRS系列博客里,我以akka-http作为系统集成工具的一种,零星地针对实际需要对http通信进行了介绍。在restapi这个系列里我想系统化的用akka-http构建一套完整的,REST风格数据维护和数据交换api,除CRUD之外还包括网络安全,文件交换等功能。我的计划是用akka-http搭建一个平台数据维护api的REST-CRUD框架,包含所有标配功能如用户验证、异常处理等。CRUD部分要尽量做成通用的generic,框架型的,能用一套标准的方法对任何数据表进行操作。

akka-http是一套http程序开发工具。它的Routing-DSL及数据序列化marshalling等都功能强大。特别是HttpResponse处理,一句complete解决了一大堆问题,magnet-pattern结合marshalling让它的使用更加方便。

在这篇讨论里先搭一个restapi的基本框架,包括客户端身份验证和使用权限。主要是示范如何达到通用框架的目的。这个在akka-http编程里主要体现在Routing-DSL的结构上,要求Route能够简洁易懂,如下:

  val route =path("auth") {authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>post { complete(authenticator.issueJwt(userinfo))}}} ~pathPrefix("api") {authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>(path("hello") & get) {complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")} ~(path("how are you") & get) {complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")}// ~ ...
          }}

我觉着这应该是框架型正确的方向:把所有功能都放在api下,统统经过权限验证。可以直接在后面不断加功能Route。

身份验证和使用权限也应该是一套标准的东西,但身份验证方法可能有所不同,特别是用户身份验证可能是通过独立的身份验证服务器实现的,对不同的验证机制应该有针对性的定制函数。构建身份管理的对象应该很方便或者很通用,如下:

  val authenticator = new AuthBase().withAlgorithm(JwtAlgorithm.HS256).withSecretKey("OpenSesame").withUserFunc(getValidUser)

AuthBase源码如下:

package com.datatech.restapiimport akka.http.scaladsl.server.directives.Credentials
import pdi.jwt._
import org.json4s.native.Json
import org.json4s._
import org.json4s.jackson.JsonMethods._
import pdi.jwt.algorithms._
import scala.util._object AuthBase {type UserInfo = Map[String, Any]case class AuthBase(algorithm: JwtAlgorithm = JwtAlgorithm.HMD5,secret: String = "OpenSesame",getUserInfo: Credentials => Option[UserInfo] = null) {ctx =>def withAlgorithm(algo: JwtAlgorithm): AuthBase = ctx.copy(algorithm=algo)def withSecretKey(key: String): AuthBase = ctx.copy(secret = key)def withUserFunc(f: Credentials => Option[UserInfo]): AuthBase = ctx.copy(getUserInfo = f)def authenticateToken(credentials: Credentials): Option[String] =credentials match {case Credentials.Provided(token) =>algorithm match {case algo: JwtAsymmetricAlgorithm =>Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtAsymmetricAlgorithm]))) match {case true => Some(token)case _ => None}case _ =>Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtHmacAlgorithm]))) match {case true => Some(token)case _ => None}}case _ => None}def getUserInfo(token: String): Option[UserInfo] = {algorithm match {case algo: JwtAsymmetricAlgorithm =>Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtAsymmetricAlgorithm])) match {case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])case Failure(err) => None}case _ =>Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtHmacAlgorithm])) match {case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])case Failure(err) => None}}}def issueJwt(userinfo: UserInfo): String = {val claims = JwtClaim() + Json(DefaultFormats).write(("userinfo", userinfo))Jwt.encode(claims, secret, algorithm)}}}

我已经把多个通用的函数封装在里面了。再模拟一个用户身份管理对象:

package com.datatech.restapi
import akka.http.scaladsl.server.directives.Credentials
import AuthBase._
object MockUserAuthService {case class User(username: String, password: String, userInfo: UserInfo)val validUsers = Seq(User("johnny", "p4ssw0rd",Map("shopid" -> "1101", "userid" -> "101")),User("tiger", "secret", Map("shopid" -> "1101" , "userid" -> "102")))def getValidUser(credentials: Credentials): Option[UserInfo] =credentials match {case p @ Credentials.Provided(_) =>validUsers.find(user => user.username == p.identifier && p.verify(user.password)) match {case Some(user) => Some(user.userInfo)case _ => None}case _ => None}}

好了,服务端示范代码中可以直接构建或者调用这些标准的类型了:

package com.datatech.restapiimport akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._object RestApiServer extends App {implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherval authenticator = new AuthBase().withAlgorithm(JwtAlgorithm.HS256).withSecretKey("OpenSesame").withUserFunc(getValidUser)val route =path("auth") {authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>post { complete(authenticator.issueJwt(userinfo))}}} ~pathPrefix("api") {authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>(path("hello") & get) {complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")} ~(path("how are you") & get) {complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")}// ~ ...
          }}val (port, host) = (50081,"192.168.11.189")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

就是说后面的http功能可以直接插进这个框架,精力可以完全聚焦于具体每项功能的开发上了。

然后用下面的客户端测试代码:

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import akka.http.scaladsl.model._
import pdi.jwt._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util._
import scala.concurrent.duration._object RestApiClient  {type UserInfo = Map[String,Any]def main(args: Array[String]): Unit = {implicit val system = ActorSystem()implicit val materializer = ActorMaterializer()// needed for the future flatMap/onComplete in the endimplicit val executionContext = system.dispatcherval helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/")val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))val authRequest = HttpRequest(HttpMethods.POST,uri = "http://192.168.11.189:50081/auth",headers = List(authorization))val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)val respToken = for {resp <- futTokenjstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}} yield jstrval jstr =  Await.result[String](respToken,2 seconds)println(jstr)scala.io.StdIn.readLine()val parts = Jwt.decodeRawAll(jstr, "OpenSesame", Seq(JwtAlgorithm.HS256)) match {case Failure(exception) => println(s"Error: ${exception.getMessage}")case Success(value) =>println(((parse(value._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])}scala.io.StdIn.readLine()val authentication = headers.Authorization(OAuth2BearerToken(jstr))val apiRequest = HttpRequest(HttpMethods.GET,uri = "http://192.168.11.189:50081/api/hello",).addHeader(authentication)val futAuth: Future[HttpResponse] = Http().singleRequest(apiRequest)println(Await.result(futAuth,2 seconds))scala.io.StdIn.readLine()system.terminate()}}

build.sbt

name := "restapi"version := "0.1"scalaVersion := "2.12.8"libraryDependencies ++= Seq("com.typesafe.akka" %% "akka-http"   % "10.1.8","com.typesafe.akka" %% "akka-stream" % "2.5.23","com.pauldijou" %% "jwt-core" % "3.0.1","de.heikoseeberger" %% "akka-http-json4s" % "1.22.0","org.json4s" %% "json4s-native" % "3.6.1","com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8","com.typesafe.scala-logging" %% "scala-logging" % "3.9.0","org.slf4j" % "slf4j-simple" % "1.7.25","org.json4s" %% "json4s-jackson" % "3.6.7","org.json4s" %% "json4s-ext" % "3.6.7"
)

转载于:https://www.cnblogs.com/tiger-xc/p/11169197.html

restapi(0)- 平台数据维护,写在前面相关推荐

  1. 大数据开发平台-数据同步服务

    什么是数据同步服务?顾名思义,就是在不同的系统之间同步数据.根据具体业务目的和应用场景的不同,各种数据同步服务框架的功能侧重点往往不尽相同,因而大家也会用各种大同小异的名称来称呼这类服务,比如数据传输 ...

  2. Android 4.0 平台特性

    Android 4.0 平台特性 API等级:14  Android4.0 是一次重要的平台发布版,为用户和应用程序开发者增加了大量的新特性.在下面我们将讨论的所有新特性和API中,因为它将 Andr ...

  3. Android 学习 之 Android 4.0 平台

    [size=large]转转转,感谢饿哦额Android的各位大侠,在这里谢过,我这里做个备份,嘿嘿!! http://www.eoeandroid.com/thread-103300-1-1.htm ...

  4. 为什么学习大数据,大数据专家写给大数据分析学习者的10个理由

    因为大数据爆发,因此出现了大数据开发.大数据分析这两大主流的工作方向,目前这两个方向是很热门,不少人已经在开始转型往这两个方向发展,相较而言,转向大数据分析的人才更多一点,而同时也有不少人在观望中,这 ...

  5. 从0开始,手写MySQL事务

    说在前面:从0开始,手写MySQL的学习价值 尼恩曾经指导过的一个7年经验小伙,凭借精通Mysql, 搞定月薪40K. 从0开始,手写一个MySQL的学习价值在于: 可以深入地理解MySQL的内部机制 ...

  6. flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

    可能会有一些截图中会有错误提示,是因为本地的包一直包下载有问题,截完图已经下载好了. 创建包结构 创建一个基础信息类 所有输出到mysql数据库中的自定义MR任务的自定义key均需要实现该抽象类 代码 ...

  7. json 插入数据_Power BI数据回写SQL Server(2)——存储过程一步到位

    在上一讲: Power BI数据回写SQL Server(1)没有中间商赚差价 中, 我们讲过,利用循环的方式将PQ中得到的table表逐行导入SQL Server中,有的朋友怀疑这种方式会不会造成数 ...

  8. mysql c测试程序_Linux平台下从零开始写一个C语言访问MySQL的测试程序

    Linux 平台下从零开始写一个 C 语言访问 MySQL 的测试程序 2010-8-20 Hu Dennis Chengdu 前置条件: (1) Linux 已经安装好 mysql 数据库: (2) ...

  9. 阿里云物联网平台数据解析(python)

    阿里云物联网平台数据解析(python) DTU上传设备返回报文消息,通过数据解析后显示各功能数值,这里以智能电表DLT645规约为例进行解析 因为是做光伏的,所以对电表的需求比较多,之前查找了好多文 ...

最新文章

  1. opencv 安装_如何在 CentOS 8 上安装 OpenCV
  2. git命令行完全解读
  3. java+调用jacoco_java操作jacoco
  4. 前端学习(1367):什么是中间件
  5. 没有人能阻止程序员将电脑上的一切搬到网页上
  6. oracle dbms_crypto,Oracle的dbms_obfuscation_toolkit加密解密数据
  7. 卷积神经网络中的全连接层
  8. python的基本语法while true_Python正课15 —— 流程控制之while循环
  9. 【转】Javascript 的词法作用域、调用对象和闭包
  10. SPSS Modeler18.0数据挖掘软件教程(六):聚类分析-K-means
  11. 【Android驱动】屏和TP谁先休眠的问题
  12. SDH,OTN,IP,MPLS,ATM网络介绍
  13. java邮件中添加excel_基于javaMail的邮件发送--excel作为附件
  14. 福建农林大学计算机分数线,福建农林大学录取分数线2021是多少分(附历年录取分数线)...
  15. 大唐:我家阁楼通公主府(三)
  16. 机器学习模型 知乎_算法有没有价值观?知乎内容推荐算法解析
  17. 用c语言编写小狗图形,小狗的图片简笔画
  18. 安卓获取屏幕最大(绝对)分辨率
  19. 为什么说深度学习和机器学习截然不同?
  20. Java项目中利用Freemarker模板引擎导出--生成Word文档

热门文章

  1. 入驻商户卖隐形眼镜护理液 饿了么遭监管警告
  2. 功能测试人员技能提升路线图,试从第一个脚步到年薪50W...
  3. python日志模块----logging
  4. oracle恢复关系,SCN与Oracle数据库恢复的关系–补充
  5. 我的YUV播放器MFC小笔记:解析文件名称
  6. mysql群集配置_MySQL主主集群配置
  7. integer比较_Java中的整型包装类值的比较为什么不能用==比较?原因是因为缓存
  8. 【Flink】介绍Flink中状态一致性的保证
  9. 【Elasticsearch】es 7.8.0 java 实现 BulkRequest 批量写入数据
  10. 【SpringCloud】 failed to req API:/nacos/v1/ns/instance after all servers code:500 msg Read timed out