在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。这个MongoDB-connector里包含了MongoSource,MongoFlow,MongoSink。我们只使用MongoSource,其它两个我们直接用mapAsyc来创造。下面是MongoSource的定义:

object MongoSource {def apply(query: Observable[Document]): Source[Document, NotUsed] =Source.fromPublisher(ObservableToPublisher(query))}

实际上就是把Mongo-scala的Observable[Document]转成Source[Document, NotUsed]。我们还是通过传入context来构建这个Source:

  case class MGOContext(dbName: String,collName: String,action: MGOCommands = null) {...}case class DocumentStream(filter: Option[Bson] = None,andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,) extends MGOCommands

Source的具体实现:

    def mongoStream(ctx: MGOContext)(implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {val db = client.getDatabase(ctx.dbName)val coll = db.getCollection(ctx.collName)ctx.action match {case DocumentStream(None, None) =>MongoSource(coll.find())case DocumentStream(Some(filter), None) =>MongoSource(coll.find(filter))case DocumentStream(None, Some(next)) =>MongoSource(next(coll.find()))case DocumentStream(Some(filter), Some(next)) =>MongoSource(next(coll.find(filter)))}}

下面是mongoStream的使用示范:

  val clusterSettings = ClusterSettings.builder().hosts(List(new ServerAddress("localhost:27017")).asJava).build()val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()implicit val client = MongoClient(clientSettings)implicit val system = ActorSystem()implicit val mat = ActorMaterializer()implicit val ec = system.dispatchercase class PO (ponum: String,podate: MGODate,vendor: String,remarks: Option[String],podtl: Option[MGOArray])def toPO(doc: Document): PO = {PO(ponum = doc.getString("ponum"),podate = doc.getDate("podate"),vendor = doc.getString("vendor"),remarks = mgoGetStringOrNone(doc,"remarks"),podtl = mgoGetArrayOrNone(doc,"podtl"))}case class PODTL(item: String,price: Double,qty: Int,packing: Option[String],payTerm: Option[String])def toPODTL(podtl: Document): PODTL = {PODTL(item = podtl.getString("item"),price = podtl.getDouble("price"),qty = podtl.getInteger("qty"),packing = mgoGetStringOrNone(podtl,"packing"),payTerm = mgoGetStringOrNone(podtl,"payterm"))}def showPO(po: PO) = {println(s"po number: ${po.ponum}")println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")println(s"vendor: ${po.vendor}")if (po.remarks != None)println(s"remarks: ${po.remarks.get}")po.podtl match {case Some(barr) =>mgoArrayToDocumentList(barr).map { dc => toPODTL(dc)}.foreach { doc: PODTL =>print(s"==>Item: ${doc.item} ")print(s"price: ${doc.price} ")print(s"qty: ${doc.qty} ")doc.packing.foreach(pk => print(s"packing: ${pk} "))doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))println("")}case _ =>}}import org.mongodb.scala.model.Projections._import MongoActionStream._import MGOEngine._import akka.stream.scaladsl.{Sink, Source}val proj: MGOFilterResult = find => find.projection(exclude("handler","_id"))val ctx = MGOContext("testdb","po").setCommand(DocumentStream(filter = None, andThen = Some(proj)))val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))println(getResult(mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))))

我们看到:使用了许多代码去进行类型转换。不过也没有什么太好的办法,已经是一次性的了。我们也可以通过akka的Flow[A,B]来以stream里的A元素为变量对MongoDB数据进行更新操作:

 object MongoActionStream {import MGOContext._case class StreamingInsert[A](dbName: String,collName: String,converter: A => Document,parallelism: Int = 1) extends MGOCommandscase class StreamingDelete[A](dbName: String,collName: String,toFilter: A => Bson,parallelism: Int = 1,justOne: Boolean = false) extends MGOCommandscase class StreamingUpdate[A](dbName: String,collName: String,toFilter: A => Bson,toUpdate: A => Bson,parallelism: Int = 1,justOne: Boolean = false) extends MGOCommandscase class InsertAction[A](ctx: StreamingInsert[A])(implicit mongoClient: MongoClient) {val database = mongoClient.getDatabase(ctx.dbName)val collection = database.getCollection(ctx.collName)def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =Flow[A].map(ctx.converter).mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))}case class UpdateAction[A](ctx: StreamingUpdate[A])(implicit mongoClient: MongoClient) {val database = mongoClient.getDatabase(ctx.dbName)val collection = database.getCollection(ctx.collName)def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =if (ctx.justOne) {Flow[A].mapAsync(ctx.parallelism)(a =>collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))} elseFlow[A].mapAsync(ctx.parallelism)(a =>collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))}case class DeleteAction[A](ctx: StreamingDelete[A])(implicit mongoClient: MongoClient) {val database = mongoClient.getDatabase(ctx.dbName)val collection = database.getCollection(ctx.collName)def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =if (ctx.justOne) {Flow[A].mapAsync(ctx.parallelism)(a =>collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))} elseFlow[A].mapAsync(ctx.parallelism)(a =>collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))}}

下面是insert, update及delete操作的示范。在这个示范里我们同时调用了JDBCEngine,CassandraEngine和MongoDBEngine:

  import jdbcengine._import JDBCEngine._import scalikejdbc._case class DataRow (rowid: Long,measureid: Long,state: String,county: String,year: Int,value: Int)val toRow: WrappedResultSet => DataRow = rs => DataRow(rowid = rs.long("ROWID"),measureid = rs.long("MEASUREID"),state = rs.string("STATENAME"),county = rs.string("COUNTYNAME"),year = rs.int("REPORTYEAR"),value = rs.int("VALUE"))//construct the contextval h2ctx = JDBCQueryContext[DataRow](dbName = 'h2,statement = "select * from AQMRPT",extractor = toRow)//source from h2 databaseval jdbcSource = jdbcAkkaStream(h2ctx)//document converterdef rowToDoc: DataRow => Document = row => Document ("rowid" -> row.rowid,"measureid" ->  row.measureid,"state" ->  row.state,"county" ->  row.county,"year" ->  row.year,"value" ->  row.value)def docToRow: Document => DataRow = doc => DataRow (rowid = doc.getLong("rowid"),measureid = doc.getLong("measureid"),state = doc.getString("state"),county = doc.getString("county"),year = doc.getInteger("year"),value = doc.getInteger("value"))//setup contextval mgoctx = StreamingInsert("testdb","members",rowToDoc)val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx)val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow)val sink = Sink.foreach[DataRow]{ r =>println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")}//config jdbc driversConfigDBsWithEnv("dev").setup('h2)ConfigDBsWithEnv("dev").loadGlobalSettings()val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run()val mgoCtxShow = MGOContext("testdb","members").setCommand(DocumentStream(filter = None))mongoStream(mgoCtxShow).map(docToRow).to(sink).run()import com.datastax.driver.core._import cassandraengine._import CQLEngine._import org.mongodb.scala.model.Filters._//data row converterval cqlToDataRow = (rs: Row) => DataRow(rowid = rs.getLong("ROWID"),measureid = rs.getLong("MEASUREID"),state = rs.getString("STATENAME"),county = rs.getString("COUNTYNAME"),year = rs.getInt("REPORTYEAR"),value = rs.getInt("VALUE"))import org.bson.conversions._import org.mongodb.scala.model.Updates._//#init-sessionimplicit val session = Cluster.builder.addContactPoint("127.0.0.1").withPort(9042).build.connect()//setup contextval cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow)//construct sourceval cqlSource = cassandraStream(cqlCtx)def toFilter: DataRow => Bson = row => {and(equal("rowid",row.rowid), lt("value",10))}def toUpdate: DataRow => Bson = row => {set("value" , row.value * 10)}val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate)val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate)val sts = cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run()import org.bson.conversions._import org.mongodb.scala.model.Filters._def toDelFilter: DataRow => Bson = row => and(equal("rowid",row.rowid),equal("value",10))val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter)val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel)val mgoCtxSrc = MGOContext("testdb","members").setCommand(DocumentStream(filter = None))mongoStream(mgoCtxSrc).map(docToRow).via(mgoDelFlow.performOnRow).to(Sink.ignore).run()import org.mongodb.scala.model.Sorts._val sortDsc: MGOFilterResult = find => find.sort(descending("rowid"))val mgoCtxShow = MGOContext("testdb","members").setCommand(DocumentStream(filter = None, andThen = Some(sortDsc)))mongoStream(mgoCtxShow).map(docToRow).to(sink).run()

下面是本次示范的全部源代码:

build.sbt

name := "learn-mongo"version := "0.1"scalaVersion := "2.12.4"libraryDependencies := Seq("com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0","com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0","org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1","com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17","com.typesafe.akka" %% "akka-actor" % "2.5.4","com.typesafe.akka" %% "akka-stream" % "2.5.4","com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16","org.scalikejdbc" %% "scalikejdbc"       % "3.2.1","org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test","org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1","org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1","org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1","com.h2database"  %  "h2"                % "1.4.196","mysql" % "mysql-connector-java" % "6.0.6","org.postgresql" % "postgresql" % "42.2.0","commons-dbcp" % "commons-dbcp" % "1.4","org.apache.tomcat" % "tomcat-jdbc" % "9.0.2","com.zaxxer" % "HikariCP" % "2.7.4","com.jolbox" % "bonecp" % "0.8.0.RELEASE","com.typesafe.slick" %% "slick" % "3.2.1","ch.qos.logback"  %  "logback-classic"   % "1.2.3"
)

resources/application.conf

# JDBC settings
test {db {h2 {driver = "org.h2.Driver"url = "jdbc:h2:tcp://localhost/~/slickdemo"user = ""password = ""poolInitialSize = 5poolMaxSize = 7poolConnectionTimeoutMillis = 1000poolValidationQuery = "select 1 as one"poolFactoryName = "commons-dbcp2"}}db.mysql.driver = "com.mysql.cj.jdbc.Driver"db.mysql.url = "jdbc:mysql://localhost:3306/testdb"db.mysql.user = "root"db.mysql.password = "123"db.mysql.poolInitialSize = 5db.mysql.poolMaxSize = 7db.mysql.poolConnectionTimeoutMillis = 1000db.mysql.poolValidationQuery = "select 1 as one"db.mysql.poolFactoryName = "bonecp"# scallikejdbc Global settingsscalikejdbc.global.loggingSQLAndTime.enabled = truescalikejdbc.global.loggingSQLAndTime.logLevel = infoscalikejdbc.global.loggingSQLAndTime.warningEnabled = truescalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warnscalikejdbc.global.loggingSQLAndTime.singleLineMode = falsescalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = falsescalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
dev {db {h2 {driver = "org.h2.Driver"url = "jdbc:h2:tcp://localhost/~/slickdemo"user = ""password = ""poolFactoryName = "hikaricp"numThreads = 10maxConnections = 12minConnections = 4keepAliveConnection = true}mysql {driver = "com.mysql.cj.jdbc.Driver"url = "jdbc:mysql://localhost:3306/testdb"user = "root"password = "123"poolInitialSize = 5poolMaxSize = 7poolConnectionTimeoutMillis = 1000poolValidationQuery = "select 1 as one"poolFactoryName = "bonecp"}postgres {driver = "org.postgresql.Driver"url = "jdbc:postgresql://localhost:5432/testdb"user = "root"password = "123"poolFactoryName = "hikaricp"numThreads = 10maxConnections = 12minConnections = 4keepAliveConnection = true}}# scallikejdbc Global settingsscalikejdbc.global.loggingSQLAndTime.enabled = truescalikejdbc.global.loggingSQLAndTime.logLevel = infoscalikejdbc.global.loggingSQLAndTime.warningEnabled = truescalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warnscalikejdbc.global.loggingSQLAndTime.singleLineMode = falsescalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = falsescalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

JDBCEngine.scala

package jdbcengine
import java.sql.PreparedStatementimport scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import akka.stream._
import scala.util._
import java.time._
import scala.concurrent.duration._
import filestreaming.FileStreaming._import scalikejdbc.TxBoundary.Try._import scala.concurrent.ExecutionContextExecutor
import java.io.InputStreamobject JDBCContext {type SQLTYPE = Intval SQL_EXEDDL= 1val SQL_UPDATE = 2val RETURN_GENERATED_KEYVALUE = trueval RETURN_UPDATED_COUNT = false}case class JDBCQueryContext[M](dbName: Symbol,statement: String,parameters: Seq[Any] = Nil,fetchSize: Int = 100,autoCommit: Boolean = false,queryTimeout: Option[Int] = None,extractor: WrappedResultSet => M)case class JDBCContext(dbName: Symbol,statements: Seq[String] = Nil,parameters: Seq[Seq[Any]] = Nil,fetchSize: Int = 100,queryTimeout: Option[Int] = None,queryTags: Seq[String] = Nil,sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,batch: Boolean = false,returnGeneratedKey: Seq[Option[Any]] = Nil,// no return: None, return by index: Some(1), by name: Some("id")preAction: Option[PreparedStatement => Unit] = None,postAction: Option[PreparedStatement => Unit] = None) {ctx =>//helper functions
def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {if (ctx.sqlType == JDBCContext.SQL_UPDATE &&!ctx.batch && ctx.statements.size == 1)ctx.copy(preAction = action)elsethrow new IllegalStateException("JDBCContex setting error: preAction not supported!")}def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {if (ctx.sqlType == JDBCContext.SQL_UPDATE &&!ctx.batch && ctx.statements.size == 1)ctx.copy(postAction = action)elsethrow new IllegalStateException("JDBCContex setting error: preAction not supported!")}def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {ctx.copy(statements = ctx.statements ++ Seq(_statement),parameters = ctx.parameters ++ Seq(Seq(_parameters)))} elsethrow new IllegalStateException("JDBCContex setting error: option not supported!")}def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {ctx.copy(statements = ctx.statements ++ Seq(_statement),parameters = ctx.parameters ++ Seq(_parameters),returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None)))} elsethrow new IllegalStateException("JDBCContex setting error: option not supported!")}def appendBatchParameters(_parameters: Any*): JDBCContext = {if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")var matchParams = trueif (ctx.parameters != Nil)if (ctx.parameters.head.size != _parameters.size)matchParams = falseif (matchParams) {ctx.copy(parameters = ctx.parameters ++ Seq(_parameters))} elsethrow new IllegalStateException("JDBCContex setting error: batch command parameters not match!")}def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")ctx.copy(returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil)}def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {ctx.copy(statements = Seq(_statement),parameters = Seq(_parameters),sqlType = JDBCContext.SQL_EXEDDL,batch = false)}def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {ctx.copy(statements = Seq(_statement),parameters = Seq(_parameters),returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),sqlType = JDBCContext.SQL_UPDATE,batch = false)}def setBatchCommand(_statement: String): JDBCContext = {ctx.copy (statements = Seq(_statement),sqlType = JDBCContext.SQL_UPDATE,batch = true)}type JDBCDate = LocalDatetype JDBCDateTime = LocalDateTimedef jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd)def jdbcSetNow = LocalDateTime.now()type JDBCBlob = InputStreamdef fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(implicit mat: Materializer) = FileToInputStream(fileName,timeOut)def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(implicit mat: Materializer) =  
InputStreamToFile(blob,fileName)}object JDBCEngine {import JDBCContext._private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>throw new IllegalStateException(message)}def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])(implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {val publisher: DatabasePublisher[A] = NamedDB('h2) readOnlyStream {val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))ctx.queryTimeout.foreach(rawSql.queryTimeout(_))val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)sql.iterator.withDBSessionForceAdjuster(session => {session.connection.setAutoCommit(ctx.autoCommit)session.fetchSize(ctx.fetchSize)})}Source.fromPublisher[A](publisher)}def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A])(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))ctx.queryTimeout.foreach(rawSql.queryTimeout(_))rawSql.fetchSize(ctx.fetchSize)implicit val session = NamedAutoSession(ctx.dbName)val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)sql.collection.apply[C]()}def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = {if (ctx.sqlType != SQL_EXEDDL) {Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))}else {NamedDB(ctx.dbName) localTx { implicit session =>Try {ctx.statements.foreach { stm =>val ddl = new SQLExecution(statement = stm, parameters = Nil)(before = WrappedResultSet => {})(after = WrappedResultSet => {})ddl.apply()}"SQL_EXEDDL executed succesfully."}}}}def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {if (ctx.statements == Nil)throw new IllegalStateException("JDBCContex setting error: statements empty!")if (ctx.sqlType != SQL_UPDATE) {Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))}else {if (ctx.batch) {if (noReturnKey(ctx)) {val usql = SQL(ctx.statements.head).tags(ctx.queryTags: _*).batch(ctx.parameters: _*)Try {NamedDB(ctx.dbName) localTx { implicit session =>ctx.queryTimeout.foreach(session.queryTimeout(_))usql.apply[Seq]()Seq.empty[Long].to[C]}}} else {val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)Try {NamedDB(ctx.dbName) localTx { implicit session =>ctx.queryTimeout.foreach(session.queryTimeout(_))usql.apply[C]()}}}} else {Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))}}}private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {val Some(key) :: xs = ctx.returnGeneratedKeyval params: Seq[Any] = ctx.parameters match {case Nil => Nilcase p@_ => p.head}val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)Try {NamedDB(ctx.dbName) localTx { implicit session =>session.fetchSize(ctx.fetchSize)ctx.queryTimeout.foreach(session.queryTimeout(_))val result = usql.apply()Seq(result).to[C]}}}private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {val params: Seq[Any] = ctx.parameters match {case Nil => Nilcase p@_ => p.head}val before = ctx.preAction match {case None => pstm: PreparedStatement => {}case Some(f) => f}val after = ctx.postAction match {case None => pstm: 
PreparedStatement => {}case Some(f) => f}val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)Try {NamedDB(ctx.dbName) localTx {implicit session =>session.fetchSize(ctx.fetchSize)ctx.queryTimeout.foreach(session.queryTimeout(_))val result = usql.apply()Seq(result.toLong).to[C]}}}private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {if (noReturnKey(ctx))singleTxUpdateNoReturnKey(ctx)elsesingleTxUpdateWithReturnKey(ctx)}private def noReturnKey(ctx: JDBCContext): Boolean = {if (ctx.returnGeneratedKey != Nil) {val k :: xs = ctx.returnGeneratedKeyk match {case None => truecase Some(k) => false}} else true}def noActon: PreparedStatement=>Unit = pstm => {}def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {Try {NamedDB(ctx.dbName) localTx { implicit session =>session.fetchSize(ctx.fetchSize)ctx.queryTimeout.foreach(session.queryTimeout(_))val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {case Nil => Seq.fill(ctx.statements.size)(None)case k@_ => k}val sqlcmd = ctx.statements zip ctx.parameters zip keysval results = sqlcmd.map { case ((stm, param), key) =>key match {case None =>new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLongcase Some(k) =>new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong}}results.to[C]}}}def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {if (ctx.statements == Nil)throw new IllegalStateException("JDBCContex setting error: statements empty!")if (ctx.sqlType != SQL_UPDATE) {Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))}else {if (!ctx.batch) {if (ctx.statements.size == 1)singleTxUpdate(ctx)elsemultiTxUpdates(ctx)} elseFailure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))}}case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,statement: String, prepareParams: R => Seq[Any]) {jas =>def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)private def perform(r: R) = {import scala.concurrent._val params = prepareParams(r)NamedDB(dbName) autoCommit { session =>session.execute(statement,params: _*)}Future.successful(r)}def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] =if (processInOrder)Flow[R].mapAsync(parallelism)(perform)elseFlow[R].mapAsyncUnordered(parallelism)(perform)}object JDBCActionStream {def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)}}

CassandraEngine.scala

package cassandraengine
import com.datastax.driver.core._import scala.concurrent._
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration
import akka.NotUsed
import akka.stream.alpakka.cassandra.scaladsl._
import akka.stream.scaladsl._
import filestreaming.FileStreaming._object CQLContext {// Consistency Levelstype CONSISTENCY_LEVEL = Intval ANY: CONSISTENCY_LEVEL          =                                        0x0000val ONE: CONSISTENCY_LEVEL          =                                        0x0001val TWO: CONSISTENCY_LEVEL          =                                        0x0002val THREE: CONSISTENCY_LEVEL        =                                        0x0003val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004val ALL: CONSISTENCY_LEVEL          =                                        0x0005val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007val LOCAL_ONE: CONSISTENCY_LEVEL    =                                      0x000Aval LOCAL_SERIAL: CONSISTENCY_LEVEL =                                     0x000Bval SERIAL: CONSISTENCY_LEVEL       =                                      0x000Cdef apply(): CQLContext = CQLContext(statements = Nil)def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {consistency match {case ALL => ConsistencyLevel.ALLcase ONE => ConsistencyLevel.ONEcase TWO => ConsistencyLevel.TWOcase THREE => ConsistencyLevel.THREEcase ANY => ConsistencyLevel.ANYcase EACH_QUORUM => ConsistencyLevel.EACH_QUORUMcase LOCAL_ONE => ConsistencyLevel.LOCAL_ONEcase QUORUM => ConsistencyLevel.QUORUMcase SERIAL => ConsistencyLevel.SERIALcase LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL}}}
case class CQLQueryContext[M](statement: String,extractor: Row => M,parameter: Seq[Object] = Nil,consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,fetchSize: Int = 100) { ctx =>def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] =ctx.copy(consistency = Some(_consistency))def setFetchSize(pageSize: Int): CQLQueryContext[M] =ctx.copy(fetchSize = pageSize)
}
object CQLQueryContext {def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] =new CQLQueryContext[M](statement = stmt, extractor = converter)
}case class CQLContext(statements: Seq[String],parameters: Seq[Seq[Object]] = Nil,consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None) { ctx =>def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =ctx.copy(consistency = Some(_consistency))def setCommand(_statement: String, _parameters: Object*): CQLContext =ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))def appendCommand(_statement: String, _parameters: Object*): CQLContext =ctx.copy(statements = ctx.statements :+ _statement,parameters = ctx.parameters ++ Seq(_parameters))
}object CQLEngine {import CQLContext._import CQLHelpers._def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {val prepStmt = session.prepare(ctx.statement)var boundStmt =  prepStmt.bind()if (ctx.parameter != Nil) {val params = processParameters(ctx.parameter)boundStmt = prepStmt.bind(params:_*)}ctx.consistency.foreach {consistency =>boundStmt.setConsistencyLevel(consistencyLevel(consistency))}val resultSet = session.execute(boundStmt.setFetchSize(pageSize))(resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C])}def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =if (resultSet.isFullyFetched) {(resultSet, None)} else {try {val result = Await.result(resultSet.fetchMoreResults(), timeOut)(result, Some((result.asScala.view.map(extractor)).to[C]))} catch { case e: Throwable => (resultSet, None) }}def cqlExecute(ctx: CQLContext)(implicit session: Session, ec: ExecutionContext): Future[Boolean] = {if (ctx.statements.size == 1)cqlSingleUpdate(ctx)elsecqlMultiUpdate(ctx)}def cqlSingleUpdate(ctx: CQLContext)(implicit session: Session, ec: ExecutionContext): Future[Boolean] = {val prepStmt = session.prepare(ctx.statements.head)var boundStmt =  prepStmt.bind()if (ctx.parameters != Nil) {val params = processParameters(ctx.parameters.head)boundStmt = prepStmt.bind(params:_*)}ctx.consistency.foreach {consistency =>boundStmt.setConsistencyLevel(consistencyLevel(consistency))}session.executeAsync(boundStmt).map(_.wasApplied())}def cqlMultiUpdate(ctx: CQLContext)(implicit session: Session, ec: ExecutionContext): Future[Boolean] = {val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parametersvar batch = new BatchStatement()commands.foreach { case (stm, params) =>val prepStmt = session.prepare(stm)if (params == Nil)batch.add(prepStmt.bind())else {val p = processParameters(params)batch.add(prepStmt.bind(p: _*))}}ctx.consistency.foreach {consistency =>batch.setConsistencyLevel(consistencyLevel(consistency))}session.executeAsync(batch).map(_.wasApplied())}def cassandraStream[A](ctx: CQLQueryContext[A])(implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {val prepStmt = session.prepare(ctx.statement)var boundStmt =  prepStmt.bind()val params = processParameters(ctx.parameter)boundStmt = prepStmt.bind(params:_*)ctx.consistency.foreach {consistency =>boundStmt.setConsistencyLevel(consistencyLevel(consistency))}CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor)}case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,statement: String, prepareParams: R => Seq[Object],consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel)def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =cas.copy(consistency = Some(_consistency))private def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {val prepStmt = session.prepare(statement)var boundStmt =  prepStmt.bind()val params = processParameters(prepareParams(r))boundStmt = prepStmt.bind(params:_*)consistency.foreach { cons 
=>boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))}session.executeAsync(boundStmt).map(_ => r)}def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =if (processInOrder)Flow[R].mapAsync(parallelism)(perform)elseFlow[R].mapAsyncUnordered(parallelism)(perform)}object CassandraActionStream {def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =new CassandraActionStream[R]( statement=_statement, prepareParams = params)}}
object CQLHelpers {import java.nio.ByteBufferimport com.datastax.driver.core.LocalDateimport java.time.Instantimport akka.stream._import scala.concurrent.duration._implicit def listenableFutureToFuture[T](listenableFuture: ListenableFuture[T]): Future[T] = {val promise = Promise[T]()Futures.addCallback(listenableFuture, new FutureCallback[T] {def onFailure(error: Throwable): Unit = {promise.failure(error)()}def onSuccess(result: T): Unit = {promise.success(result)()}})promise.future}type CQLBlob = ByteBuffercase class CQLDate(year: Int, month: Int, day: Int)case object CQLTodayDatecase class CQLDateTime(year: Int, Month: Int,day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)case object CQLDateTimeNowdef processParameters(params: Seq[Object]): Seq[Object] = {import java.time.{Clock,ZoneId}params.map { obj =>obj match {case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)case CQLTodayDate =>val today = java.time.LocalDate.now()LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>Instant.parse(f"$yy%4d-$mm%2d-$dd%2dT$hr%2d:$ms%2d:$sc%2d$mi%3d")case p@_ => p}}}def fileToCQLBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(implicit mat: Materializer) = FileToByteBuffer(fileName,timeOut)def cqlBlobToFile(blob: CQLBlob, fileName: String)(implicit mat: Materializer) =  ByteBufferToFile(blob,fileName)}

MongoEngine.scala

import java.text.SimpleDateFormatimport akka.NotUsed
import akka.stream.alpakka.mongodb.scaladsl._
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.mongodb.scala.MongoClient
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import org.mongodb.scala._
import org.mongodb.scala.model._
import java.util.Calendarimport scala.collection.JavaConverters._
import filestreaming.FileStreaming._
import akka.stream.Materializer
import org.mongodb.scala.bson.{BsonArray, BsonBinary}import scala.concurrent._
import scala.concurrent.duration._object MGOContext {trait MGOCommandsobject MGOCommands {case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommandscase class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands/*  org.mongodb.scala.FindObservableimport com.mongodb.async.client.FindIterableval resultDocType = FindIterable[Document]val resultOption = FindObservable(resultDocType).maxScan(...).limit(...).sort(...).project(...) */case class Find[M](filter: Option[Bson] = None,andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,converter: Option[Document => M] = None,firstOnly: Boolean = false) extends MGOCommandscase class DocumentStream(filter: Option[Bson] = None,andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,) extends MGOCommandscase class Aggregate(pipeLine: Seq[Bson]) extends MGOCommandscase class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommandscase class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommandscase class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommandscase class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommandscase class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommandscase class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands}object MGOAdmins {case class DropCollection(collName: String) extends MGOCommandscase class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommandscase class ListCollection(dbName: String) extends MGOCommandscase class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommandscase class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommandscase class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommandscase class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommandscase class DropAllIndexes(options: Option[Any] = None) extends MGOCommands}case class MGOContext(dbName: String,collName: String,action: MGOCommands = null) {ctx =>def setDbName(name: String): MGOContext = ctx.copy(dbName = name)def setCollName(name: String): MGOContext = ctx.copy(collName = name)def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = cmd)}object MGOContext {def apply(db: String, coll: String) = new MGOContext(db, coll)def apply(db: String, coll: String, command: MGOCommands) =new MGOContext(db, coll, command)}type MGODate = java.util.Datedef mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {val ca = Calendar.getInstance()ca.set(yyyy,mm,dd)ca.getTime()}def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {val ca = Calendar.getInstance()ca.set(yyyy,mm,dd,hr,min,sec)ca.getTime()}def mgoDateTimeNow: MGODate = {val ca = Calendar.getInstance()ca.getTime}def mgoDateToString(dt: MGODate, formatString: String): String = {val fmt= new SimpleDateFormat(formatString)fmt.format(dt)}type MGOBlob = BsonBinarytype MGOArray = BsonArraydef fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(implicit mat: Materializer) = FileToByteArray(fileName,timeOut)def mgoBlobToFile(blob: MGOBlob, fileName: String)(implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)def mgoGetStringOrNone(doc: Document, fieldName: String) = {if 
(doc.keySet.contains(fieldName))Some(doc.getString(fieldName))else None}def mgoGetIntOrNone(doc: Document, fieldName: String) = {if (doc.keySet.contains(fieldName))Some(doc.getInteger(fieldName))else None}def mgoGetLonggOrNone(doc: Document, fieldName: String) = {if (doc.keySet.contains(fieldName))Some(doc.getLong(fieldName))else None}def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {if (doc.keySet.contains(fieldName))Some(doc.getDouble(fieldName))else None}def mgoGetBoolOrNone(doc: Document, fieldName: String) = {if (doc.keySet.contains(fieldName))Some(doc.getBoolean(fieldName))else None}def mgoGetDateOrNone(doc: Document, fieldName: String) = {if (doc.keySet.contains(fieldName))Some(doc.getDate(fieldName))else None}def mgoGetBlobOrNone(doc: Document, fieldName: String) = {if (doc.keySet.contains(fieldName))doc.get(fieldName).asInstanceOf[Option[MGOBlob]]else None}def mgoGetArrayOrNone(doc: Document, fieldName: String) = {if (doc.keySet.contains(fieldName))doc.get(fieldName).asInstanceOf[Option[MGOArray]]else None}def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {(arr.getValues.asScala.toList).asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]}type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
}
object MGOEngine {import MGOContext._import MGOCommands._import MGOAdmins._def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {val db = client.getDatabase(ctx.dbName)val coll = db.getCollection(ctx.collName)ctx.action match {/* count */case Count(Some(filter), Some(opt)) =>coll.count(filter, opt.asInstanceOf[CountOptions]).toFuture().asInstanceOf[Future[T]]case Count(Some(filter), None) =>coll.count(filter).toFuture().asInstanceOf[Future[T]]case Count(None, None) =>coll.count().toFuture().asInstanceOf[Future[T]]/* distinct */case Distict(field, Some(filter)) =>coll.distinct(field, filter).toFuture().asInstanceOf[Future[T]]case Distict(field, None) =>coll.distinct((field)).toFuture().asInstanceOf[Future[T]]/* find */case Find(None, None, optConv, false) =>if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]case Find(None, None, optConv, true) =>if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]case Find(Some(filter), None, optConv, false) =>if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]case Find(Some(filter), None, optConv, true) =>if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]case Find(None, Some(next), optConv, _) =>if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]case Find(Some(filter), Some(next), optConv, _) =>if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]/* aggregate */case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]/* mapReduce */case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]/* insert */case Insert(docs, Some(opt)) =>if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture().asInstanceOf[Future[T]]else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture().asInstanceOf[Future[T]]case Insert(docs, None) =>if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]/* delete */case Delete(filter, None, onlyOne) =>if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]case Delete(filter, Some(opt), onlyOne) =>if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]/* replace */case Replace(filter, replacement, None) =>coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]case Replace(filter, replacement, Some(opt)) =>coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]/* update */case Update(filter, update, None, onlyOne) =>if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]case Update(filter, update, Some(opt), onlyOne) =>if (onlyOne) coll.updateOne(filter, 
update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]/* bulkWrite */case BulkWrite(commands, None) =>coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]case BulkWrite(commands, Some(opt)) =>coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]/* drop collection */case DropCollection(collName) =>val coll = db.getCollection(collName)coll.drop().toFuture().asInstanceOf[Future[T]]/* create collection */case CreateCollection(collName, None) =>db.createCollection(collName).toFuture().asInstanceOf[Future[T]]case CreateCollection(collName, Some(opt)) =>db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]/* list collection */case ListCollection(dbName) =>client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]/* create view */case CreateView(viewName, viewOn, pline, None) =>db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]case CreateView(viewName, viewOn, pline, Some(opt)) =>db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]/* create index */case CreateIndex(key, None) =>coll.createIndex(key).toFuture().asInstanceOf[Future[T]]case CreateIndex(key, Some(opt)) =>coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]/* drop index */case DropIndexByName(indexName, None) =>coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]case DropIndexByName(indexName, Some(opt)) =>coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]case DropIndexByKey(key, None) =>coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]case DropIndexByKey(key, Some(opt)) =>coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]case DropAllIndexes(None) =>coll.dropIndexes().toFuture().asInstanceOf[Future[T]]case DropAllIndexes(Some(opt)) =>coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]}}def mongoStream(ctx: MGOContext)(implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {val db = client.getDatabase(ctx.dbName)val coll = db.getCollection(ctx.collName)ctx.action match {case DocumentStream(None, None) =>MongoSource(coll.find())case DocumentStream(Some(filter), None) =>MongoSource(coll.find(filter))case DocumentStream(None, Some(next)) =>MongoSource(next(coll.find()))case DocumentStream(Some(filter), Some(next)) =>MongoSource(next(coll.find(filter)))}}}object MongoActionStream {import MGOContext._case class StreamingInsert[A](dbName: String,collName: String,converter: A => Document,parallelism: Int = 1) extends MGOCommandscase class StreamingDelete[A](dbName: String,collName: String,toFilter: A => Bson,parallelism: Int = 1,justOne: Boolean = false) extends MGOCommandscase class StreamingUpdate[A](dbName: String,collName: String,toFilter: A => Bson,toUpdate: A => Bson,parallelism: Int = 1,justOne: Boolean = false) extends MGOCommandscase class InsertAction[A](ctx: StreamingInsert[A])(implicit mongoClient: MongoClient) {val database = mongoClient.getDatabase(ctx.dbName)val collection = database.getCollection(ctx.collName)def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =Flow[A].map(ctx.converter).mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))}case 
class UpdateAction[A](ctx: StreamingUpdate[A])(implicit mongoClient: MongoClient) {val database = mongoClient.getDatabase(ctx.dbName)val collection = database.getCollection(ctx.collName)def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =if (ctx.justOne) {Flow[A].mapAsync(ctx.parallelism)(a =>collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))} elseFlow[A].mapAsync(ctx.parallelism)(a =>collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))}case class DeleteAction[A](ctx: StreamingDelete[A])(implicit mongoClient: MongoClient) {val database = mongoClient.getDatabase(ctx.dbName)val collection = database.getCollection(ctx.collName)def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =if (ctx.justOne) {Flow[A].mapAsync(ctx.parallelism)(a =>collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))} elseFlow[A].mapAsync(ctx.parallelism)(a =>collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))}}

import org.mongodb.scala._
import scala.concurrent._
import scala.concurrent.duration._object MGOHelpers {implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {override val converter: (Document) => String = (doc) => doc.toJson}implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {override val converter: (C) => String = (doc) => doc.toString}trait ImplicitObservable[C] {val observable: Observable[C]val converter: (C) => Stringdef results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)def headResult() = Await.result(observable.head(), 10 seconds)def printResults(initial: String = ""): Unit = {if (initial.length > 0) print(initial)results().foreach(res => println(converter(res)))}def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")}def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {Await.result(fut,timeOut)}def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {Await.result(fut,timeOut)}}

FileStreaming.scala

package filestreaming
import java.io.{InputStream, ByteArrayInputStream}
import java.nio.ByteBuffer
import java.nio.file.Pathsimport akka.stream.{Materializer}
import akka.stream.scaladsl.{FileIO, StreamConverters}import scala.concurrent.{Await}
import akka.util._
import scala.concurrent.duration._object FileStreaming {def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(implicit mat: Materializer):ByteBuffer = {val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>hd ++ bs}(Await.result(fut, timeOut)).toByteBuffer}def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(implicit mat: Materializer): Array[Byte] = {val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>hd ++ bs}(Await.result(fut, timeOut)).toArray}def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(implicit mat: Materializer): InputStream = {val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>hd ++ bs}val buf = (Await.result(fut, timeOut)).toArraynew ByteArrayInputStream(buf)}def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(implicit mat: Materializer) = {val ba = new Array[Byte](byteBuf.remaining())byteBuf.get(ba,0,ba.length)val baInput = new ByteArrayInputStream(ba)val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))source.runWith(FileIO.toPath(Paths.get(fileName)))}def ByteArrayToFile(bytes: Array[Byte], fileName: String)(implicit mat: Materializer) = {val bb = ByteBuffer.wrap(bytes)val baInput = new ByteArrayInputStream(bytes)val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))source.runWith(FileIO.toPath(Paths.get(fileName)))}def InputStreamToFile(is: InputStream, fileName: String)(implicit mat: Materializer) = {val source = StreamConverters.fromInputStream(() => is)source.runWith(FileIO.toPath(Paths.get(fileName)))}}

FileStreaming.scala

package filestreaming
import java.io.{InputStream, ByteArrayInputStream}
import java.nio.ByteBuffer
import java.nio.file.Pathsimport akka.stream.{Materializer}
import akka.stream.scaladsl.{FileIO, StreamConverters}import scala.concurrent.{Await}
import akka.util._
import scala.concurrent.duration._object FileStreaming {def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(implicit mat: Materializer):ByteBuffer = {val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>hd ++ bs}(Await.result(fut, timeOut)).toByteBuffer}def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(implicit mat: Materializer): Array[Byte] = {val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>hd ++ bs}(Await.result(fut, timeOut)).toArray}def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(implicit mat: Materializer): InputStream = {val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>hd ++ bs}val buf = (Await.result(fut, timeOut)).toArraynew ByteArrayInputStream(buf)}def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(implicit mat: Materializer) = {val ba = new Array[Byte](byteBuf.remaining())byteBuf.get(ba,0,ba.length)val baInput = new ByteArrayInputStream(ba)val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))source.runWith(FileIO.toPath(Paths.get(fileName)))}def ByteArrayToFile(bytes: Array[Byte], fileName: String)(implicit mat: Materializer) = {val bb = ByteBuffer.wrap(bytes)val baInput = new ByteArrayInputStream(bytes)val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))source.runWith(FileIO.toPath(Paths.get(fileName)))}def InputStreamToFile(is: InputStream, fileName: String)(implicit mat: Materializer) = {val source = StreamConverters.fromInputStream(() => is)source.runWith(FileIO.toPath(Paths.get(fileName)))}}

HikariCPool.scala

package jdbcengine
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository/** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {import scala.collection.JavaConverters._def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else defaultdef getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else defaultdef getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else defaultdef getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else defaultdef getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else defaultdef getDurationOr(path: String, default: => Duration = Duration.Zero) =if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else defaultdef getPropertiesOr(path: String, default: => Properties = null): Properties =if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else defaultdef toProperties: Properties = {def toProps(m: mutable.Map[String, ConfigValue]): Properties = {val props = new Properties(null)m.foreach { case (k, cv) =>val v =if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)else if(cv.unwrapped eq null) nullelse cv.unwrapped.toStringif(v ne null) props.put(k, v)}props}toProps(c.root.asScala)}def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else Nonedef getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else Nonedef getStringOpt(path: String) = Option(getStringOr(path))def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
}object ConfigExtensionMethods {@inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
}trait HikariConfigReader extends TypesafeConfigReader {self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix =>
import ConfigExtensionMethods.configExtensionMethodsdef getFactoryName(dbName: Symbol): String = {val c: Config = config.getConfig(envPrefix + "db." + dbName.name)c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)}def hikariCPConfig(dbName: Symbol): HikariConfig = {val hconf = new HikariConfig()val c: Config = config.getConfig(envPrefix + "db." + dbName.name)// Connection settingsif (c.hasPath("dataSourceClass")) {hconf.setDataSourceClassName(c.getString("dataSourceClass"))} else {Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)}hconf.setJdbcUrl(c.getStringOr("url", null))c.getStringOpt("user").foreach(hconf.setUsername)c.getStringOpt("password").foreach(hconf.setPassword)c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)// Pool configurationhconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)val numThreads = c.getIntOr("numThreads", 20)hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))hconf.setPoolName(c.getStringOr("poolName", dbName.name))hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))// Equivalent of ConnectionPreparerhconf.setReadOnly(c.getBooleanOr("readOnly", false))c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)hconf.setCatalog(c.getStringOr("catalog", null))hconf}
}import scalikejdbc._
trait ConfigDBs {self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {getFactoryName(dbName) match {case "hikaricp" => {val hconf = hikariCPConfig(dbName)val hikariCPSource = new HikariDataSource(hconf)if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {Class.forName(hconf.getDriverClassName)}ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))}case _ => {val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)val cpSettings = readConnectionPoolSettings(dbName)if (driver != null && driver.trim.nonEmpty) {Class.forName(driver)}ConnectionPool.add(dbName, url, user, password, cpSettings)}}}def setupAll(): Unit = {loadGlobalSettings()dbNames.foreach { dbName => setup(Symbol(dbName)) }}def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {ConnectionPool.close(dbName)}def closeAll(): Unit = {ConnectionPool.closeAll}}object ConfigDBs extends ConfigDBswith TypesafeConfigReaderwith StandardTypesafeConfigwith HikariConfigReadercase class ConfigDBsWithEnv(envValue: String) extends ConfigDBswith TypesafeConfigReaderwith StandardTypesafeConfigwith HikariConfigReaderwith EnvPrefix {override val env = Option(envValue)

MongoStreamDemo.scala

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
  import org.mongodb.scala._
  import org.mongodb.scala.connection._
  import scala.collection.JavaConverters._
  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer

  object MongoStream extends App {
    import MGOContext._
    import MGOEngine._
    import MGOCommands._
    import MGOHelpers._

    val clusterSettings = ClusterSettings.builder()
      .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
    val clientSettings = MongoClientSettings.builder()
      .clusterSettings(clusterSettings).build()
    implicit val client = MongoClient(clientSettings)

    implicit val system = ActorSystem()
    implicit val mat = ActorMaterializer()
    implicit val ec = system.dispatcher

    case class PO(
      ponum: String,
      podate: MGODate,
      vendor: String,
      remarks: Option[String],
      podtl: Option[MGOArray])

    def toPO(doc: Document): PO = {
      PO(
        ponum = doc.getString("ponum"),
        podate = doc.getDate("podate"),
        vendor = doc.getString("vendor"),
        remarks = mgoGetStringOrNone(doc, "remarks"),
        podtl = mgoGetArrayOrNone(doc, "podtl"))
    }

    case class PODTL(
      item: String,
      price: Double,
      qty: Int,
      packing: Option[String],
      payTerm: Option[String])

    def toPODTL(podtl: Document): PODTL = {
      PODTL(
        item = podtl.getString("item"),
        price = podtl.getDouble("price"),
        qty = podtl.getInteger("qty"),
        packing = mgoGetStringOrNone(podtl, "packing"),
        payTerm = mgoGetStringOrNone(podtl, "payterm"))
    }

    def showPO(po: PO) = {
      println(s"po number: ${po.ponum}")
      println(s"po date: ${mgoDateToString(po.podate, "yyyy-MM-dd")}")
      println(s"vendor: ${po.vendor}")
      if (po.remarks != None)
        println(s"remarks: ${po.remarks.get}")
      po.podtl match {
        case Some(barr) =>
          mgoArrayToDocumentList(barr)
            .map { dc => toPODTL(dc) }
            .foreach { doc: PODTL =>
              print(s"==>Item: ${doc.item} ")
              print(s"price: ${doc.price} ")
              print(s"qty: ${doc.qty} ")
              doc.packing.foreach(pk => print(s"packing: ${pk} "))
              doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
              println("")
            }
        case _ =>
      }
    }

    import org.mongodb.scala.model.Projections._
    import MongoActionStream._
    import MGOEngine._
    import akka.stream.scaladsl.{Sink, Source}

    //read the "po" documents, excluding the "handler" and "_id" fields
    val proj: MGOFilterResult = find => find.projection(exclude("handler", "_id"))
    val ctx = MGOContext("testdb", "po").setCommand(
      DocumentStream(filter = None, andThen = Some(proj)))

    val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))
    println(getResult(mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))))

    import jdbcengine._
    import JDBCEngine._
    import scalikejdbc._

    case class DataRow(
      rowid: Long,
      measureid: Long,
      state: String,
      county: String,
      year: Int,
      value: Int)

    val toRow: WrappedResultSet => DataRow = rs => DataRow(
      rowid = rs.long("ROWID"),
      measureid = rs.long("MEASUREID"),
      state = rs.string("STATENAME"),
      county = rs.string("COUNTYNAME"),
      year = rs.int("REPORTYEAR"),
      value = rs.int("VALUE"))

    //construct the context
    val h2ctx = JDBCQueryContext[DataRow](
      dbName = 'h2,
      statement = "select * from AQMRPT",
      extractor = toRow)

    //source from h2 database
    val jdbcSource = jdbcAkkaStream(h2ctx)

    //document converters
    def rowToDoc: DataRow => Document = row => Document(
      "rowid" -> row.rowid,
      "measureid" -> row.measureid,
      "state" -> row.state,
      "county" -> row.county,
      "year" -> row.year,
      "value" -> row.value)

    def docToRow: Document => DataRow = doc => DataRow(
      rowid = doc.getLong("rowid"),
      measureid = doc.getLong("measureid"),
      state = doc.getString("state"),
      county = doc.getString("county"),
      year = doc.getInteger("year"),
      value = doc.getInteger("value"))

    //setup context
    val mgoctx = StreamingInsert("testdb", "members", rowToDoc)
    val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx)
    val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow)
    val sink = Sink.foreach[DataRow] { r =>
      println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
    }

    //config jdbc drivers
    ConfigDBsWithEnv("dev").setup('h2)
    ConfigDBsWithEnv("dev").loadGlobalSettings()

    //stream the first 100 h2 rows into the "members" collection
    val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run()

    //stream the inserted documents back out for display
    val mgoCtxPrint = MGOContext("testdb", "members").setCommand(
      DocumentStream(filter = None))
    mongoStream(mgoCtxPrint).map(docToRow).to(sink).run()

    import com.datastax.driver.core._
    import cassandraengine._
    import CQLEngine._
    import org.mongodb.scala.model.Filters._

    //data row converter
    val cqlToDataRow = (rs: Row) => DataRow(
      rowid = rs.getLong("ROWID"),
      measureid = rs.getLong("MEASUREID"),
      state = rs.getString("STATENAME"),
      county = rs.getString("COUNTYNAME"),
      year = rs.getInt("REPORTYEAR"),
      value = rs.getInt("VALUE"))

    import org.bson.conversions._
    import org.mongodb.scala.model.Updates._

    //#init-session
    implicit val session = Cluster.builder
      .addContactPoint("127.0.0.1").withPort(9042)
      .build.connect()

    //setup context
    val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt", cqlToDataRow)
    //construct source
    val cqlSource = cassandraStream(cqlCtx)

    //update matching "members" documents, driven by the cassandra rows
    def toFilter: DataRow => Bson = row => {
      and(equal("rowid", row.rowid), lt("value", 10))
    }
    def toUpdate: DataRow => Bson = row => {
      set("value", row.value * 10)
    }
    val mgoCtxUpdate = StreamingUpdate("testdb", "members", toFilter, toUpdate)
    val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate)
    cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run()

    import org.bson.conversions._
    import org.mongodb.scala.model.Filters._

    //delete documents with value = 10, driven by the mongo stream itself
    def toDelFilter: DataRow => Bson = row =>
      and(equal("rowid", row.rowid), equal("value", 10))
    val mgoCtxDel = StreamingDelete[DataRow]("testdb", "members", toDelFilter)
    val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel)
    val mgoCtxSrc = MGOContext("testdb", "members").setCommand(
      DocumentStream(filter = None))
    mongoStream(mgoCtxSrc).map(docToRow)
      .via(mgoDelFlow.performOnRow).to(Sink.ignore).run()

    import org.mongodb.scala.model.Sorts._

    //show the remaining documents, sorted by rowid descending
    val sortDsc: MGOFilterResult = find => find.sort(descending("rowid"))
    val mgoCtxShow = MGOContext("testdb", "members").setCommand(
      DocumentStream(filter = None, andThen = Some(sortDsc)))
    mongoStream(mgoCtxShow).map(docToRow).to(sink).run()

    scala.io.StdIn.readLine()
    system.terminate()
  }
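
For readers who want to compile the listing, below is a minimal sketch of an sbt dependency list it could build against. The artifact choices and version numbers are assumptions (the original post does not list them), and the jdbcengine / cassandraengine packages referenced in the imports come from earlier installments of this series rather than from a published library:

  // build.sbt -- hypothetical dependency list for the demo above;
  // versions are assumptions from the Akka 2.5 / Alpakka 0.x era, adjust to your build
  libraryDependencies ++= Seq(
    "com.typesafe.akka"      %% "akka-actor"                  % "2.5.11",
    "com.typesafe.akka"      %% "akka-stream"                 % "2.5.11",
    "com.lightbend.akka"     %% "akka-stream-alpakka-mongodb" % "0.17",    // MongoSource
    "org.mongodb.scala"      %% "mongo-scala-driver"          % "2.2.1",   // Observable API
    "org.scalikejdbc"        %% "scalikejdbc"                 % "3.2.1",   // JDBC streaming source
    "org.scalikejdbc"        %% "scalikejdbc-config"          % "3.2.1",   // ConfigDBsWithEnv
    "com.h2database"          % "h2"                          % "1.4.196", // demo H2 database
    "com.datastax.cassandra"  % "cassandra-driver-core"       % "3.4.0"    // CQL source
  )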

Reposted from: https://www.cnblogs.com/tiger-xc/p/8581280.html
