大数据教程分享Actor学习笔记

简介:

  好程序员大数据培训分享Actor学习笔记,在scala中她能实现很强大的功能,他是基于并发机制的一个事件模型
  我们现在学的scala2.10.x版本就是之前的Actor
  同步:在主程序上排队执行的任务,只有前一个任务执行完毕后,才能执行下一个任务
  异步:指不进入主程序,而进入"任务对列"的任务,只有等主程序任务执行完毕,"任务对列"开始请求主程序,请求任务执行,该任务会进入主程序
java
共享变量 -- 加锁
会出现锁死问题
scala
Actor不共享数据
没有锁的概念
Actor通信之间需要message(通信)
Aactor执行顺序
1.首先调用start()方法启动Actor
2.调用start()方法后act()方法会被执行
3.Actor之间进行发送消息
Actor发送消息的三种方式
! -> 发送异步消息,没有返回值
!? -> 发送同步消息,有返回值,会有线程等待
!! -> 发送异步消息,有返回值,返回值类型FutureAny
Actor并行执行
//注意,这两个actor会并行执行,当其中一个for循环结束后,actor结束
object ActorDemo01 {
def main(args: Array[String]): Unit = {
MyActor1.start()
MyActor2.start()
}
}
object MyActor1 extends Actor{
override def act(): Unit = {
for (i <- 1 to 10){
println(s"actor => $i")
Thread.sleep(2000)
}
}
object MyActor2 extends Actor{
override def act(): Unit = {
for (i <- 1 to 5){
println(s"actor2 => $i")
Thread.sleep(2000)
}
}
}
}
用Actor不断接受消息
执行第一种方式,异步
object ActorDemo02 {
def main(args: Array[String]): Unit = {
val actor: MyActor = new MyActor
actor.start()
//并行执行
actor ! "start" // !->异步
actor ! "stop"
println("发送完成")
}
}
class MyActor extends Actor{
override def act(): Unit = {
while (true){ //死循环
receive { //接收
case "start" => {
println("starting")
Thread.sleep(1000)
println("started")
}
case "stop" => {
println("stopping")
Thread.sleep(1000)
println("stopped")
}
}
}
}
}
第二种方式:利用react来代替receive,也就是说react线程可复用,比receive更高效
object ActorDemo03 {
def main(args: Array[String]): Unit = {
val actor: MyActor3 = new MyActor3
actor.start()
actor ! "start"
actor ! "stop"
println("成功了")
}
}
class MyActor3 extends Actor{
override def act(): Unit = {
loop {
react{
case "start" =>{
println("starting")
Thread.sleep(1000)
println("sarted")
}
case "stop" =>{
println("stoppting")
Thread.sleep(1000)
println("stopped")
}
}
}
}
}
结合样例类练习Actor发送消息
//创建样例类
case class AsyncMsg(id: Int, msg: String)
case class SyncMsg(id: Int, msg: String)
case class ReplyMsg(id: Int, msg: String)
object ActorDemo01 extends Actor {
override def act(): Unit = {
while (true) {
receive {
case "start" => println("starting...")
case AsyncMsg(id, msg) =>
{
println(s"id:$id,msg:$msg")
sender ! ReplyMsg(1,"sucess") //接收到消息后返回响应消息
}
case SyncMsg(id,msg) => {
println(s"id:$id,msg:$msg")
sender ! ReplyMsg(2,"sucess")
}
}
}
}
}
object ActorTest{
def main(args: Array[String]): Unit = {
val actor: Actor = ActorDemo01.start()
// //异步发送消息,没有返回值
// actor ! AsyncMsg(3,"heihei")
// println("异步消息发送完成,没有返回值")
// //同步发送消息,有返回值
// val text: Any = actor !? SyncMsg(4,"OK")
// println(text)
// println("同步消息发送成功")
//异步发送消息,有返回值,返回类型为Future[Any]
val reply: Future[Any] = actor !! SyncMsg(5,"OK is 不存在的")
Thread.sleep(2000)
if (reply.isSet){
val applyMsg: Any = reply.apply()
println(applyMsg)
}else{
println("Nothing")
}
}
}
Actor并行化的wordcount
class Task extends Actor {
override def act(): Unit = {
loop {
react {
case SubmitTask(fileName) => {
val contents = Source.fromFile(new File(fileName)).mkString
val arr = contents.split("rn")
val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)
//val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
sender ! ResultTask(result)
}
case StopTask => {
exit()
}
}
}
}
}
object WorkCount {
def main(args: Array[String]) {
val files = Array("c://words.txt", "c://words.log")
val replaySet = new mutable.HashSet[Future[Any]]
val resultList = new mutable.ListBuffer[ResultTask]
for(f <- files) {
val t = new Task
val replay = t.start() !! SubmitTask(f)
replaySet += replay
}
while(replaySet.size > 0){
val toCumpute = replaySet.filter(_.isSet)
for(r <- toCumpute){
val result = r.apply()
resultList += result.asInstanceOf[ResultTask]
replaySet.remove(r)
}
Thread.sleep(100)
}
val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))
println(finalResult)
}
}
case class SubmitTask(fileName: String)
case object StopTask
case class ResultTask(result: Map[String, Int])

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
4月前
|
SQL 分布式计算 大数据
Python+大数据学习笔记(一)
Python+大数据学习笔记(一)
43 0
|
SQL 分布式计算 运维
开源大数据 OLAP 引擎最佳实践 | 学习笔记(二)
快速学习开源大数据 OLAP 引擎最佳实践
1410 0
开源大数据 OLAP 引擎最佳实践 | 学习笔记(二)
|
4月前
|
关系型数据库 MySQL 大数据
大数据开发工程师基本功修炼之Linux学习笔记(四)
大数据开发工程师基本功修炼之Linux学习笔记(四)
110 1
|
5月前
|
大数据
数据治理专业认证CDMP学习笔记(思维导图与知识点)- 第14章大数据与数据科学篇
数据治理专业认证CDMP学习笔记(思维导图与知识点)- 第14章大数据与数据科学篇
|
8月前
|
机器学习/深度学习 人工智能 算法
实用!50个大厂、987页大数据、算法项目落地经验教程合集
大数据、算法项目在任何大厂无论是面试还是工作运用都是非常广泛的,我们精选了50个百度、腾讯、阿里等大厂的大数据、算法落地经验甩给大家,千万不要做收藏党哦,空闲时间记得随时看看! 如果你没有大厂项目经验,对大厂算法、大数据的项目运用不了解建议你看看!
|
10月前
|
分布式计算 Hadoop 大数据
大数据 | Hadoop HA高可用搭建保姆级教程(大二学长的万字笔记)(下)
大数据 | Hadoop HA高可用搭建保姆级教程(大二学长的万字笔记)(下)
123 0
|
10月前
|
分布式计算 运维 Hadoop
大数据 | Hadoop HA高可用搭建保姆级教程(大二学长的万字笔记)(上)
大数据 | Hadoop HA高可用搭建保姆级教程(大二学长的万字笔记)(上)
268 0
|
存储 分布式计算 监控
大数据 SRE 体系能力建设(二)| 学习笔记
快速学习大数据 SRE 体系能力建设。
248 0
大数据 SRE 体系能力建设(二)| 学习笔记
|
传感器 大数据
大数据的学习笔记第一次学习
大数据的学习笔记 (一)
|
SQL canal 弹性计算
实践教程之如何将PolarDB-X与大数据等系统互通
PolarDB-X 为了方便用户体验,提供了免费的实验环境,您可以在实验环境里体验 PolarDB-X 的安装部署和各种内核特性。除了免费的实验,PolarDB-X 也提供免费的视频课程,手把手教你玩转 PolarDB-X 分布式数据库。
实践教程之如何将PolarDB-X与大数据等系统互通

热门文章

最新文章