restapi(4)- rest-mongo : MongoDB数据库前端的httpserver

本文涉及的产品
云数据库 MongoDB,通用型 2核4GB
简介: restapi(4)- rest-mongo : MongoDB数据库前端的httpserver 完成了一套标准的rest风格数据库CRUD操作httpserver后发现有许多不足。主要是为了追求“通用”两个字,想把所有服务接口做的更“范generic”些,结果反而限制了目标数据库的特点,最终产生了一套功能弱小的玩具。

restapi(4)- rest-mongo : MongoDB数据库前端的httpserver
完成了一套标准的rest风格数据库CRUD操作httpserver后发现有许多不足。主要是为了追求“通用”两个字,想把所有服务接口做的更“范generic”些,结果反而限制了目标数据库的特点,最终产生了一套功能弱小的玩具。比如说吧:标准rest风格getbyId需要所有的数据表都具备id这个字段,有点傻。然后get返回的结果集又没有什么灵活的控制方法如返回数量、字段、排序等。特别对MongoDB这样的在查询操作方面接近关系式数据库的分布式数据库:上篇提到过,它的query能力强大,条件组合灵活,如果不能在网络服务api中体现出来就太可惜了。所以,这篇博文会讨论一套专门针对MongoDB的rest-server。我想达到的目的是:后台数据库是MongoDB,通过httpserver提供对MongoDB的CRUD操作,客户端通过http调用CRUD服务。后台开发对每一个数据库表单使用统一的标准增添一套新的CRUD服务。希望如此能够提高开发效率,减少代码出错机会。

MongoDB是一种文件类型数据库,数据格式更加多样化。在这次示范里希望能把MongoDB有特点的数据类型以及它们的处理方法都介绍了,包括:日期类型,二进制类型blob(图片)等。顺便提一下:普通大型文本文件也可以用二进制blob方式存入MongoDB,因为文件在http传输过程中必须以byte方式进行,所以后台httpserver接收的文件格式是一串byte,不用任何格式转换就可以直接存入MongoDB blob字段。客户端从后台下载时就需要把bytes转换成UTF8字符就可以恢复文件内容了。

首先,我们先从Model开始,在scala里用case class来表示。Model是MongoDB Document的对应。在scala编程里我们是用case class 当作Document来操作的。我们设计的Model都会继承一个ModelBase trait:

复制代码
trait ModelBase[E] {
def to: E
}

case class Person(

               userid: String = "",
               name: String = "",
               age: Option[Int] = None,
               dob: Option[MGODate] = None,   //生日
               address: Option[String] = None
               ) extends ModelBase[Document] {
import org.mongodb.scala.bson._

override def to: Document = {
  var doc = Document(
  "userid" -> this.userid,
  "name" -> this.name)
  if (this.age != None)
    doc = doc + ("age" -> this.age.get)

  if (this.dob != None)
    doc = doc + ("dob" -> this.dob.get)

  if (this.address != None)
    doc = doc + ("address" -> this.address.getOrElse(""))

  doc
}

}
object Person {

val fromDocument: Document => Person = doc => {
  val keyset = doc.keySet
  Person(
    userid = doc.getString("userid"),
    name = doc.getString("name"),
    age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],

    dob =  {if (keyset.contains("dob"))
      Some(doc.getDate("dob"))
    else None },

    address =  mgoGetStringOrNone(doc,"address")
  )
}

}
复制代码
在上面例子里Person对应MongoDB里一个Document。除了注意对应类型属性与表字段类型外,还提供了to,fromDecument两个转换函数。其中to函数是继承ModelBase的,代表所有MongoDB Model都必须具备to这个函数。这点很重要,因为在从json构建成Model时,如果属于ModelBase则肯定可以调用一个to函数:

复制代码
class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(
...

post {

    entity(as[String]) { json =>
      val extractedEntity: M = fromJson[M](json)
      val doc: Document = extractedEntity.to
      val futmsg = repository.insert(doc).value.value.runToFuture.map {
        eoc =>
          eoc match {
            case Right(oc) => oc match {
              case Some(c) => c.toString()
              case None => "insert may not complete!"
            }
            case Left(err) => err.getMessage
          }
      }

复制代码
注意这个extractedEntity:我们现在还不能确定它的具体类型,是Person,Animal,Machine? 但我们确定它是M类型,而M<:ModalBase[Document],所以M是MongoDB Model。可以调用extractedEntity.to获取一个Document。

仔细看,Person里并不包括blob类型字段。因为到现在我还没有想到办法在一个httprequest里把多个字段和图片一次性发出来,必须分两个request才能完成一个Document的上传。httpserver收到两个requests后还要进行requests的匹配对应管理,十分的复杂。所以含blob类型的Document只能把blob分拆到另一个Document里,然后用这个Document唯一一个id字段来链接:

复制代码
case class Photo (

                 id: String,
                 photo: Option[MGOBlob]
               ) extends ModelBase[Document] {
override def to: Document = {
  var doc = Document("id" -> this.id)
  if (photo != None)
    doc = doc + ("photo" -> this.photo)
  doc
}

}

object Photo {

def fromDocument: Document => Photo = doc => {
  val keyset = doc.keySet
  Photo(
    id = doc.getString("id"),
    photo = mgoGetBlobOrNone(doc, "photo")
  )
}

}
复制代码
从另一个角度来讲,把blob和正常字段分开来存储也有一定的优势,最多也就是需要两次query罢了。

第二部分是repository:数据库操作函数:

复制代码
class MongoRepoR(implicit client: MongoClient) {

def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
  var res = Seq[ResultOptions]()
  next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
  sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
  fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
  top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}

  val ctxFind = MGOContext(dbName = db,collName=coll)
    .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
    .setCommand(Find(andThen = res))
  mgoQuery[Seq[R]](ctxFind,converter)
}

 def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
   var res = Seq[ResultOptions]()
   next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
   sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
   fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
   top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
   val ctxFind = MGOContext(dbName = db,collName=coll)
     .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
     .setCommand(Find(filter = Some(filtr),andThen = res))
   mgoQuery[Seq[R]](ctxFind,converter)
}
def getOneDocument(filtr: Bson): DBOResult[Document] = {
      val ctxFind = MGOContext(dbName = db,collName=coll)
     .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
     .setCommand(Find(filter = Some(filtr),firstOnly = true))
   mgoQuery[Document](ctxFind,converter)
}

def insert(doc: Document): DBOResult[Completed] = {
  val ctxInsert = MGOContext(dbName = db,collName=coll)
    .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
    .setCommand(Insert(Seq(doc)))
  mgoUpdate[Completed](ctxInsert)
}

def delete(filter: Bson): DBOResult[DeleteResult] = {
  val ctxDelete = MGOContext(dbName = db,collName=coll)
    .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
    .setCommand(Delete(filter))
  mgoUpdate[DeleteResult](ctxDelete)
}

def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {
  val ctxUpdate = MGOContext(dbName = db,collName=coll)
    .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
    .setCommand(Update(filter,update,None,!many))
  mgoUpdate[UpdateResult](ctxUpdate)
}

def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {
   val ctxUpdate = MGOContext(dbName = db,collName=coll)
     .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
     .setCommand(Replace(filter,row))
   mgoUpdate[UpdateResult](ctxUpdate)
}

}
复制代码
这部分上篇博文讨论过。最后是akka-http的核心部分:Route。MongoDB CRUD服务对外的api:

复制代码

  (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {
    (filter,fields,sort,top,next) => {
    dbor = {
      filter match {
        case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)
        case None => repository.getAll(next,sort,fields,top)
      }
    }
    val futRows = dbor.value.value.runToFuture.map {
      eolr =>
        eolr match {
          case Right(olr) => olr match {
            case Some(lr) => lr
            case None => Seq[M]()
          }
          case Left(_) => Seq[M]()
        }
    }
    complete(futureToJson(futRows))
   }
  } ~ post {
    entity(as[String]) { json =>
      val extractedEntity: M = fromJson[M](json)
      val doc: Document = extractedEntity.to
      val futmsg = repository.insert(doc).value.value.runToFuture.map {
        eoc =>
          eoc match {
            case Right(oc) => oc match {
              case Some(c) => c.toString()
              case None => "insert may not complete!"
            }
            case Left(err) => err.getMessage
          }
      }

      complete(futmsg)
    }
  } ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>
    val bson = Document(filter)
    if (set == None) {
      entity(as[String]) { json =>
        val extractedEntity: M = fromJson[M](json)
        val doc: Document = extractedEntity.to
        val futmsg = repository.replace(bson, doc).value.value.runToFuture.map {
          eoc =>
            eoc match {
              case Right(oc) => oc match {
                case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                case None => "update may not complete!"
              }
              case Left(err) => err.getMessage
            }
        }
        complete(futureToJson(futmsg))
      }
    } else {
      set match {
        case Some(u) =>
          val ubson = Document(u)
          dbou = repository.update(bson, ubson, many.getOrElse(true))
        case None =>
          dbou = Left(new IllegalArgumentException("missing set statement for update!"))
      }
      val futmsg = dbou.value.value.runToFuture.map {
        eoc =>
          eoc match {
            case Right(oc) => oc match {
              case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
              case None => "update may not complete!"
            }
            case Left(err) => err.getMessage
          }
      }
      complete(futureToJson(futmsg))
    }
  } ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) =>
    val bson = Document(filter)
    val futmsg = repository.delete(bson).value.value.runToFuture.map {
      eoc =>
        eoc match {
          case Right(oc) => oc match {
            case Some(d) => s"${d.getDeletedCount} rows deleted."
            case None => "delete may not complete!"
          }
          case Left(err) => err.getMessage
        }
    }
    complete(futureToJson(futmsg))
  }
}

复制代码
与上篇最大的区别就是这次的Route支持MongoDB特性的query string,bson类型的参数。如:

http://192.168.0.189:50081/private/crud/person
http://192.168.0.189:50081/private/crud/person?filter={"userid":"c001"}
http://192.168.0.189:50081/private/crud/person?sort={"userid":-1}
http://192.168.0.189:50081/private/crud/person?filter={"userid":{$gt:"c000"}}&sort={"userid":-1}&top=3
可惜的是bson表达式中有些字符是url禁止的,所以必须预先处理一下。可以用公网的UrlEncoder在线转换:

https://www.url-encoder.com {"userid":"c001"} -> %7B%22userid%22%3A%22c001%22%7D

在程序里可以用软件工具:"com.github.tasubo" % "jurl-tools" % "0.6" URLEncode.encode(xyz)

复制代码
val sort =

  """
    |{userid:-1}
  """.stripMargin

val getAllRequest = HttpRequest(
  HttpMethods.GET,
  uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort),
).addHeader(authentication)

复制代码
blob服务的api Route:

复制代码

  pathPrefix("blob") {
    (get & path(Remaining)) { id =>
      val filtr = equal("id", id)
      val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {
        eodoc =>
          eodoc match {
            case Right(odoc) => odoc match {
              case Some(doc) =>
                if (doc == null) None
                else mgoGetBlobOrNone(doc, "photo")
              case None => None
            }
            case Left(_) => None
          }
      }
      onComplete(futOptPic) {
        case Success(optBlob) => optBlob match {
          case Some(blob) =>
            withoutSizeLimit {
              encodeResponseWith(Gzip) {
                complete(
                  HttpEntity(
                    ContentTypes.`application/octet-stream`,
                    ByteArrayToSource(blob.getData))
                )
              }
            }
          case None => complete(StatusCodes.NotFound)
        }
        case Failure(err) => complete(err)
      }
    } ~
    (post &  parameter('id)) { id =>
      withoutSizeLimit {
        decodeRequest {
          extractDataBytes { bytes =>
            val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
              hd ++ bs
            }
            onComplete(fut) {
              case Success(b) =>
                val doc = Document("id" -> id, "photo" -> b.toArray)
                val futmsg = repository.insert(doc).value.value.runToFuture.map {
                  eoc =>
                    eoc match {
                      case Right(oc) => oc match {
                        case Some(c) => c.toString()
                        case None => "insert may not complete!"
                      }
                      case Left(err) => err.getMessage
                    }
                }
                complete(futmsg)
              case Failure(err) => complete(err)
            }
          }
        }
      }
    }
  } 

复制代码
注意:MongoRoute[M]是个范类型。我希望对任何Model的Route只需要指定M即可,如:

复制代码
implicit val personDao = new MongoRepoPerson)
implicit val picDao = new MongoRepoPhoto

...

pathPrefix("public") {
    (pathPrefix("crud")) {
      new MongoRoute[Person]("person")(personDao)
        .route ~
        new MongoRoute[Photo]("photo")(picDao)
          .route
    }
  }

复制代码
是否省力多了?但是,回到原来问题:blob类型在整个移动过程中都不需要进行格式转换。所以id字段名称是指定的,这点在设计表结构时要注意。

如何测试一个httpserver还是比较头痛的。用浏览器只能测试GET,其它POST,PUT,DELETE应该怎么测试?其实可以用curl:

curl -i -X GET http://rest-api.io/items
curl -i -X GET http://rest-api.io/items/5069b47aa892630aae059584
curl -i -X DELETE http://rest-api.io/items/5069b47aa892630aae059584
curl -i -X POST -H 'Content-Type: application/json' -d '{"name": "New item", "year": "2009"}' http://rest-api.io/items
curl -i -X PUT -H 'Content-Type: application/json' -d '{"name": "Updated item", "year": "2010"}' http://rest-api.io/items/5069b47aa892630aae059584

下面写两个客户端分别测试crud和blob:

TestCrudClient.scala

复制代码
import akka.actor._
import akka.http.scaladsl.model.headers._

import scala.concurrent._
import scala.concurrent.duration._
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import com.github.tasubo.jurl.URLEncode
import com.datatech.rest.mongo.MongoModels.Person
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson
import com.datatech.sdp.mongo.engine.MGOClasses._

trait JsonCodec extends Json4sSupport {
import org.json4s.DefaultFormats
import org.json4s.ext.JodaTimeSerializers
implicit val serilizer = jackson.Serialization
implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object TestCrudClient {

type UserInfo = Map[String,Any]
def main(args: Array[String]): Unit = {

import JsConverters._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
val authRequest = HttpRequest(
  HttpMethods.POST,
  uri = "http://192.168.0.189:50081/auth",
  headers = List(authorization)
)

val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

val respToken = for {
  resp <- futToken
  jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
} yield jstr

val jstr =  Await.result[String](respToken,2 seconds)
println(jstr)
scala.io.StdIn.readLine()

val authentication = headers.Authorization(OAuth2BearerToken(jstr))

val sort =
  """
    |{userid:-1}
  """.stripMargin

val getAllRequest = HttpRequest(
  HttpMethods.GET,
  uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort),
).addHeader(authentication)
val futGetAll: Future[HttpResponse] = Http().singleRequest(getAllRequest)
println(Await.result(futGetAll,2 seconds))
scala.io.StdIn.readLine()

var bf =
  """
    |{"userid":"c888"}
  """.stripMargin

println(URLEncode.encode(bf))

val delRequest = HttpRequest(
  HttpMethods.DELETE,
  uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)
).addHeader(authentication)
val futDel: Future[HttpResponse] = Http().singleRequest(delRequest)
println(Await.result(futDel,2 seconds))
scala.io.StdIn.readLine()

 bf =
  """
    |{"userid":"c001"}
  """.stripMargin

val getRequest = HttpRequest(
  HttpMethods.GET,
  uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf),
).addHeader(authentication)
val futGet: Future[HttpResponse] = Http().singleRequest(getRequest)
println(Await.result(futGet,2 seconds))
scala.io.StdIn.readLine()

val tiger = Person("c001","tiger chan",Some(56))
val john = Person("c002", "johnny dep", Some(60))
val peter = Person("c003", "pete brad", Some(58))
val susan = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) )
val ns = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) )

val saveRequest = HttpRequest(
  HttpMethods.POST,
  uri = "http://192.168.0.189:50081/public/crud/person"
).addHeader(authentication)
val futPost: Future[HttpResponse] =
  for {
    reqEntity <- Marshal(peter).to[RequestEntity]
    response <- Http().singleRequest(saveRequest.copy(entity=reqEntity))
  } yield response

println(Await.result(futPost,2 seconds))
scala.io.StdIn.readLine()

var set =
  """
    | {$set:
    |   {
    |    name:"tiger the king",
    |    age:18
    |   }
    | }
  """.stripMargin

val updateRequest = HttpRequest(
  HttpMethods.PUT,
  uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(
       bf)+"&set="+URLEncode.encode(set)+"&many=true"
).addHeader(authentication)

val futUpdate: Future[HttpResponse] = Http().singleRequest(updateRequest)
println(Await.result(futUpdate,2 seconds))
scala.io.StdIn.readLine()

val repRequest = HttpRequest(
  HttpMethods.PUT,
  uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)
).addHeader(authentication)
val futReplace: Future[HttpResponse] =
  for {
    reqEntity <- Marshal(susan).to[RequestEntity]
    response <- Http().singleRequest(updateRequest.copy(entity=reqEntity))
  } yield response

println(Await.result(futReplace,2 seconds))
scala.io.StdIn.readLine()

system.terminate()

}

}
复制代码
TestFileClient.scala

复制代码
import akka.stream._
import java.nio.file._
import java.io._
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import com.datatech.rest.mongo.FileStreaming._
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import scala.util._

case class FileUtil(implicit sys: ActorSystem) {
import sys.dispatcher
implicit val mat = ActorMaterializer()
def createEntity(file: File): RequestEntity = {

require(file.exists())
val formData =
  Multipart.FormData(
    Source.single(
      Multipart.FormData.BodyPart(
        "test",
        HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
        Map("filename" -> file.getName))))
Await.result(Marshal(formData).to[RequestEntity], 3 seconds)

}

def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {

implicit val mat = ActorMaterializer()
import sys.dispatcher
val futResp = Http(sys).singleRequest(
  //   Gzip.encodeMessage(
  request.copy(entity = dataEntity)   //.addHeader(`Content-Encoding`(HttpEncodings.gzip))
  //   )
)
futResp
  .andThen {
    case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
      entity.dataBytes.map(_.utf8String).runForeach(println)
    case Success(r@HttpResponse(code, _, _, _)) =>
      println(s"Upload request failed, response code: $code")
      r.discardEntityBytes()
    case Success(_) => println("Unable to Upload file!")
    case Failure(err) => println(s"Upload failed: ${err.getMessage}")
  }

}

def downloadFileTo(request: HttpRequest, destPath: String) = {

//  val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip))
val futResp = Http(sys).singleRequest(request)  //.map(Gzip.decodeMessage(_))
futResp
  .andThen {
    case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
      entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
        .onComplete { case _ => println(s"Download file saved to: $destPath") }
    case Success(r@HttpResponse(code, _, _, _)) =>
      println(s"Download request failed, response code: $code")
      r.discardEntityBytes()
    case Success(_) => println("Unable to download file!")
    case Failure(err) => println(s"Download failed: ${err.getMessage}")
  }

}

}

object TestFileClient {
type UserInfo = Map[String,Any]
def main(args: Array[String]): Unit = {

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

val helloRequest = HttpRequest(uri = "http://192.168.0.189:50081/")

val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
val authRequest = HttpRequest(
  HttpMethods.POST,
  uri = "http://192.168.0.189:50081/auth",
  headers = List(authorization)
)

val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

val respToken = for {
  resp <- futToken
  jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
} yield jstr

val jstr =  Await.result[String](respToken,2 seconds)
println(jstr)

scala.io.StdIn.readLine()

val authentication = headers.Authorization(OAuth2BearerToken(jstr))

val entity = HttpEntity(
  ContentTypes.`application/octet-stream`,
  fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024)
)
//
val chunked = HttpEntity.Chunked.fromData(
  ContentTypes.`application/octet-stream`,
  fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024)
)

val uploadRequest = HttpRequest(
  HttpMethods.POST,

// uri = "http://192.168.0.189:50081/private/file?filename=tiger.jpg",

  uri = "http://192.168.0.189:50081/public/crud/photo/blob?id=tiger.jpg",
).addHeader(authentication)

//upload file
Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds)
//Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)
val dlRequest = HttpRequest(
  HttpMethods.GET,

// uri = "http://192.168.0.189:50081/api/file/mypic.jpg",

  uri = "http://192.168.0.189:50081/public/crud/photo/blob/tiger.jpg",
).addHeader(authentication)

FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")

scala.io.StdIn.readLine()
system.terminate()

}

}
复制代码
下面是本次示范中的源代码:

build.sbt

复制代码
name := "rest-mongo"

version := "0.1"

scalaVersion := "2.12.8"

scalacOptions += "-Ypartial-unification"
val akkaVersion = "2.5.23"
val akkaHttpVersion = "10.1.8"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http" % "10.1.8",
"com.typesafe.akka" %% "akka-stream" % "2.5.23",
"com.pauldijou" %% "jwt-core" % "3.0.1",
"de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
"org.json4s" %% "json4s-native" % "3.6.1",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
"org.slf4j" % "slf4j-simple" % "1.7.25",
"org.json4s" %% "json4s-jackson" % "3.6.7",
"org.json4s" %% "json4s-ext" % "3.6.7",

// for scalikejdbc
"org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
"org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
"com.h2database" % "h2" % "1.4.199",
"com.zaxxer" % "HikariCP" % "2.7.4",
"com.jolbox" % "bonecp" % "0.8.0.RELEASE",
"com.typesafe.slick" %% "slick" % "3.3.2",
//for cassandra 3.6.0
"com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
"com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
"com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0",
//for mongodb 4.0
"org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
"com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"io.monix" %% "monix" % "3.0.0-RC3",
"org.typelevel" %% "cats-core" % "2.0.0-M4",
"com.github.tasubo" % "jurl-tools" % "0.6"
)
复制代码
MongoHttpServer.scala

复制代码
package com.datatech.rest.mongo

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._
import org.mongodb.scala._

import scala.collection.JavaConverters._
import MongoModels._
import MongoRepo._
import MongoRoute._

object MongoHttpServer extends App {

implicit val httpSys = ActorSystem("httpSystem")
implicit val httpMat = ActorMaterializer()
implicit val httpEC = httpSys.dispatcher

val settings: MongoClientSettings = MongoClientSettings.builder()

.applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
.build()

implicit val client: MongoClient = MongoClient(settings)
implicit val personDao = new MongoRepoPerson)
implicit val picDao = new MongoRepoPhoto

implicit val authenticator = new AuthBase()

.withAlgorithm(JwtAlgorithm.HS256)
.withSecretKey("OpenSesame")
.withUserFunc(getValidUser)

val route =

path("auth") {
  authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
    post { complete(authenticator.issueJwt(userinfo))}
  }
} ~
  pathPrefix("private") {
    authenticateOAuth2(realm = "private", authenticator.authenticateToken) { validToken =>
      FileRoute(validToken)
        .route
      // ~ ...
    }
  } ~
  pathPrefix("public") {
    (pathPrefix("crud")) {
      new MongoRoute[Person]("person")(personDao)
        .route ~
        new MongoRoute[Photo]("photo")(picDao)
          .route
    }
  }

val (port, host) = (50081,"192.168.0.189")

val bindingFuture = Http().bindAndHandle(route,host,port)

println(s"Server running at $host $port. Press any key to exit ...")

scala.io.StdIn.readLine()

bindingFuture.flatMap(_.unbind())

.onComplete(_ => httpSys.terminate())

}
复制代码
ModalBase.scala

package com.datatech.rest.mongo

trait ModelBase[E] {
def to: E
}
MongoModel.scala

复制代码
package com.datatech.rest.mongo
import org.mongodb.scala._
import com.datatech.sdp.mongo.engine._
import MGOClasses._

object MongoModels {

case class Person(

               userid: String = "",
               name: String = "",
               age: Option[Int] = None,
               dob: Option[MGODate] = None,
               address: Option[String] = None
               ) extends ModelBase[Document] {
import org.mongodb.scala.bson._

override def to: Document = {
  var doc = Document(
  "userid" -> this.userid,
  "name" -> this.name)
  if (this.age != None)
    doc = doc + ("age" -> this.age.get)

  if (this.dob != None)
    doc = doc + ("dob" -> this.dob.get)

  if (this.address != None)
    doc = doc + ("address" -> this.address.getOrElse(""))

  doc
}

}
object Person {

val fromDocument: Document => Person = doc => {
  val keyset = doc.keySet
  Person(
    userid = doc.getString("userid"),
    name = doc.getString("name"),
    age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],

    dob =  {if (keyset.contains("dob"))
      Some(doc.getDate("dob"))
    else None },

    address =  mgoGetStringOrNone(doc,"address")
  )
}

}

case class Photo (

                 id: String,
                 photo: Option[MGOBlob]
               ) extends ModelBase[Document] {
override def to: Document = {
  var doc = Document("id" -> this.id)
  if (photo != None)
    doc = doc + ("photo" -> this.photo)
  doc
}

}

object Photo {

def fromDocument: Document => Photo = doc => {
  val keyset = doc.keySet
  Photo(
    id = doc.getString("id"),
    photo = mgoGetBlobOrNone(doc, "photo")
  )
}

}

}
复制代码
MongoRepo.scala

复制代码
package com.datatech.rest.mongo
import org.mongodb.scala._
import org.bson.conversions.Bson
import org.mongodb.scala.result._
import com.datatech.sdp.mongo.engine._
import MGOClasses._
import MGOEngine._
import MGOCommands._
import com.datatech.sdp.result.DBOResult.DBOResult
import MongoModels._

object MongoRepo {

class MongoRepoR(implicit client: MongoClient) {

def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
  var res = Seq[ResultOptions]()
  next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
  sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
  fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
  top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}

  val ctxFind = MGOContext(dbName = db,collName=coll)
    .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
    .setCommand(Find(andThen = res))
  mgoQuery[Seq[R]](ctxFind,converter)
}

 def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
   var res = Seq[ResultOptions]()
   next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
   sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
   fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
   top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
   val ctxFind = MGOContext(dbName = db,collName=coll)
     .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
     .setCommand(Find(filter = Some(filtr),andThen = res))
   mgoQuery[Seq[R]](ctxFind,converter)
}
def getOneDocument(filtr: Bson): DBOResult[Document] = {
      val ctxFind = MGOContext(dbName = db,collName=coll)
     .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
     .setCommand(Find(filter = Some(filtr),firstOnly = true))
   mgoQuery[Document](ctxFind,converter)
}

def insert(doc: Document): DBOResult[Completed] = {
  val ctxInsert = MGOContext(dbName = db,collName=coll)
    .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
    .setCommand(Insert(Seq(doc)))
  mgoUpdate[Completed](ctxInsert)
}

def delete(filter: Bson): DBOResult[DeleteResult] = {
  val ctxDelete = MGOContext(dbName = db,collName=coll)
    .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
    .setCommand(Delete(filter))
  mgoUpdate[DeleteResult](ctxDelete)
}

def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {
  val ctxUpdate = MGOContext(dbName = db,collName=coll)
    .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
    .setCommand(Update(filter,update,None,!many))
  mgoUpdate[UpdateResult](ctxUpdate)
}

def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {
   val ctxUpdate = MGOContext(dbName = db,collName=coll)
     .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
     .setCommand(Replace(filter,row))
   mgoUpdate[UpdateResult](ctxUpdate)
}

}

}
复制代码
MongoRoute.scala

复制代码
package com.datatech.rest.mongo
import akka.http.scaladsl.server.Directives

import scala.util._
import org.mongodb.scala._
import com.datatech.sdp.file.Streaming._
import org.mongodb.scala.result._
import MongoRepo._
import akka.stream.ActorMaterializer
import com.datatech.sdp.result.DBOResult._
import org.mongodb.scala.model.Filters._
import com.datatech.sdp.mongo.engine.MGOClasses._
import monix.execution.CancelableFuture
import akka.util._
import akka.http.scaladsl.model._
import akka.http.scaladsl.coding.Gzip
object MongoRoute {
class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(

implicit c: MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter {
import monix.execution.Scheduler.Implicits.global
var dbor: DBOResult[Seq[M]] = _
var dbou: DBOResult[UpdateResult] = _
val route = pathPrefix(pathName) {
  pathPrefix("blob") {
    (get & path(Remaining)) { id =>
      val filtr = equal("id", id)
      val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {
        eodoc =>
          eodoc match {
            case Right(odoc) => odoc match {
              case Some(doc) =>
                if (doc == null) None
                else mgoGetBlobOrNone(doc, "photo")
              case None => None
            }
            case Left(_) => None
          }
      }
      onComplete(futOptPic) {
        case Success(optBlob) => optBlob match {
          case Some(blob) =>
            withoutSizeLimit {
              encodeResponseWith(Gzip) {
                complete(
                  HttpEntity(
                    ContentTypes.`application/octet-stream`,
                    ByteArrayToSource(blob.getData))
                )
              }
            }
          case None => complete(StatusCodes.NotFound)
        }
        case Failure(err) => complete(err)
      }
    } ~
    (post &  parameter('id)) { id =>
      withoutSizeLimit {
        decodeRequest {
          extractDataBytes { bytes =>
            val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
              hd ++ bs
            }
            onComplete(fut) {
              case Success(b) =>
                val doc = Document("id" -> id, "photo" -> b.toArray)
                val futmsg = repository.insert(doc).value.value.runToFuture.map {
                  eoc =>
                    eoc match {
                      case Right(oc) => oc match {
                        case Some(c) => c.toString()
                        case None => "insert may not complete!"
                      }
                      case Left(err) => err.getMessage
                    }
                }
                complete(futmsg)
              case Failure(err) => complete(err)
            }
          }
        }
      }
    }
  } ~
  (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {
    (filter,fields,sort,top,next) => {
    dbor = {
      filter match {
        case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)
        case None => repository.getAll(next,sort,fields,top)
      }
    }
    val futRows = dbor.value.value.runToFuture.map {
      eolr =>
        eolr match {
          case Right(olr) => olr match {
            case Some(lr) => lr
            case None => Seq[M]()
          }
          case Left(_) => Seq[M]()
        }
    }
    complete(futureToJson(futRows))
   }
  } ~ post {
    entity(as[String]) { json =>
      val extractedEntity: M = fromJson[M](json)
      val doc: Document = extractedEntity.to
      val futmsg = repository.insert(doc).value.value.runToFuture.map {
        eoc =>
          eoc match {
            case Right(oc) => oc match {
              case Some(c) => c.toString()
              case None => "insert may not complete!"
            }
            case Left(err) => err.getMessage
          }
      }

      complete(futmsg)
    }
  } ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>
    val bson = Document(filter)
    if (set == None) {
      entity(as[String]) { json =>
        val extractedEntity: M = fromJson[M](json)
        val doc: Document = extractedEntity.to
        val futmsg = repository.replace(bson, doc).value.value.runToFuture.map {
          eoc =>
            eoc match {
              case Right(oc) => oc match {
                case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                case None => "update may not complete!"
              }
              case Left(err) => err.getMessage
            }
        }
        complete(futureToJson(futmsg))
      }
    } else {
      set match {
        case Some(u) =>
          val ubson = Document(u)
          dbou = repository.update(bson, ubson, many.getOrElse(true))
        case None =>
          dbou = Left(new IllegalArgumentException("missing set statement for update!"))
      }
      val futmsg = dbou.value.value.runToFuture.map {
        eoc =>
          eoc match {
            case Right(oc) => oc match {
              case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
              case None => "update may not complete!"
            }
            case Left(err) => err.getMessage
          }
      }
      complete(futureToJson(futmsg))
    }
  } ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) =>
    val bson = Document(filter)
    val futmsg = repository.delete(bson).value.value.runToFuture.map {
      eoc =>
        eoc match {
          case Right(oc) => oc match {
            case Some(d) => s"${d.getDeletedCount} rows deleted."
            case None => "delete may not complete!"
          }
          case Left(err) => err.getMessage
        }
    }
    complete(futureToJson(futmsg))
  }
}

}

}
复制代码

原文地址https://www.cnblogs.com/tiger-xc/p/11325996.html

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
1月前
|
NoSQL 网络协议 MongoDB
Windows公网远程连接MongoDB数据库【无公网IP】
Windows公网远程连接MongoDB数据库【无公网IP】
|
1月前
|
存储 NoSQL 关系型数据库
一篇文章带你搞懂非关系型数据库MongoDB
一篇文章带你搞懂非关系型数据库MongoDB
55 0
|
1月前
|
人工智能 NoSQL MongoDB
|
2月前
|
SQL NoSQL Java
文档型数据库MongoDB
文档型数据库MongoDB
|
2月前
|
JSON NoSQL MongoDB
MongoDB详解(五)——MongoDB数据库简单使用
MongoDB详解(五)——MongoDB数据库简单使用
105 1
|
2月前
|
存储 NoSQL Linux
MongoDB详解(四)——MongoDB数据库安装
MongoDB详解(四)——MongoDB数据库安装
67 2
|
2月前
|
NoSQL 关系型数据库 MySQL
Windows、Linux、Mac安装数据库(mysql、MongoDB、Redis)#0
不同系统下进行MySQL安装、MongoDB安装、Redis安装【2月更文挑战第5天】
441 5
Windows、Linux、Mac安装数据库(mysql、MongoDB、Redis)#0
|
16天前
|
SQL 数据可视化 关系型数据库
轻松入门MySQL:深入探究MySQL的ER模型,数据库设计的利器与挑战(22)
轻松入门MySQL:深入探究MySQL的ER模型,数据库设计的利器与挑战(22)
|
16天前
|
存储 关系型数据库 MySQL
轻松入门MySQL:数据库设计之范式规范,优化企业管理系统效率(21)
轻松入门MySQL:数据库设计之范式规范,优化企业管理系统效率(21)
|
16天前
|
关系型数据库 MySQL 数据库
轻松入门MySQL:精准查询,巧用WHERE与HAVING,数据库查询如虎添翼(7)
轻松入门MySQL:精准查询,巧用WHERE与HAVING,数据库查询如虎添翼(7)