4.1 Actors
Actors模型为并发和分布式系统提供了一个高层次抽象,它减轻了必须处理乐观锁和线程管理的开发,使开发支持并发和并行的系统更加容易,Actors在CarlHewitt 1973年发表的论文里定义,但其普及则是由于Erlang语言的广泛应用,其中一个很好的典型案例是爱立信公司基于此模型非常成功的构建了高并发和高可用系统。
Akka actors的API和Scala actors的API非常像,都借鉴了一些Erlang的一些语法。
4.1.1 创建Actors
自从AKKa强制父级监管,每个actor会被监管并且作为(潜在)子级的监管者;熟悉Actor系统和监管和检测是明智的并且它会可能对阅读actor相关概述也是有帮助的。
定义一个Actor类
Actor实现类需要继承Actor类并且实现receive方法。Receive方法需要定义一系列case语句(具有PartialFunction[Any,Unit])来界定你的Actor可以处理哪些消息,使用标准的Scala模式匹配,以及消息如何被处理的具体实现。
一个例子:
Import akka.actor.Actor
Import akka.actor.Props
Import akka.event.Logging
Class MyActor extends Actor{
val log = Logging(context.system, this)
def receive = {
case "test" => log.info("receivedtest")
case _ => log.info("receivedunknownmessage")
}
}
请注意AKKa Actor接受消息循环是详尽无任何遗漏的, 和Erlang及Scala的 Actor是不同的。意思是你需要为所有可以接受的消息提供模式匹配,并且如果你想要处理未知消息那你需要有一个默认case语句像上面的例子所示。否则一个akka.actor.UnhandledMessage(message,sender,recipient)将会被发布给ActorSystem的EventStream。
通过默认构造创建Actors
object Main extends App{
val system = ActorSystem("MySystem")
val myActor = system.actorOf(Props[MyActor],name="myactor")
actorof的调用返回一个ActorRef。这是一个Actor实例的句柄可以实现和Actor的交互。ActorRef是不可变的并且和它所代表的Actor有一对一关联,而且是可序列化和网络感知的。这意思是你可以把它序列化,通过连接发送它,并且通过网络在一个远程主机上使用它,并且它会仍然代表同一个原型节点上的Actor。
在上面的例子中actor是被系统创建的。它也可能是其它actor通过其actor context创建的actors。不同之处是监管者层级如何安排。当使用context时当前的actor将是创建的子actor的监管者。当使用system时它将是一个顶级actor,它将被系统监管(内部监管的actor)。
class FirstActor extends Actor {
val myActor = context.actorOf(Props[MyActor],name="myactor")
名称参数是可选的,但你必须尽可能命名你的actors,因为要在日志消息中作为acotr的标识。名称不能为空或$。如果名字已经被另外一个同父的子级actor使用会抛出一个InvalidActorNameException异常。
Actors在创建成功后会自动被异步启动。当你创建一个Actor之后它会自动调用Actor特质中的preStart回调方法。这是一个添加Actor初始化代码的极好的位置。
override def preStart() = {
... // initializationcode
}
使用非默认构造创建Actor
如果你的Actor有一个带参数的构造函数那么你不能使用actorOf(Props[TYPE])创建它。作为替代你可以使用一个不同的actorOf,带有一个通过名称调用的代码块,在里面你可以以你喜欢的任何方式创建Actor。
这里有一个例子:
// 允许向 MyActor 的构造函数中提交参数
val myActor = system.actorOf( Props( newMyActor( "..." ) ), name="myactor" )
Props
Props是一个配置参数类用于引用可以创建各种actor的可选操作,这里有一些创建一个Props实例的例子。
import akka.actor.Props
val props1 = Props()
val props2 = Props[MyActor]
val props3 = Props(newMyActor)
val props4 = Props( creator = { () => newMyActor }, dispatcher="my-dispatcher")
val props5 = props1.withCreator(new MyActor)
val props6 = props5.withDispatcher("my-dispatcher")
使用Props创建Actor
Actor在actorOf工厂方法里通过一个Props实例创建。
importakka.actor.Props
val myActor = system.actorOf( Props[MyActor].withDispatcher("my-dispatcher"),
name = "myactor2" )
使用匿名类创建Actor
当在一个actor中为特定的分支任务大量生成actor时,使用一个匿名类在适当的地方包含直接被执行的代码可能比较方便。
def receive = {
case m: DoIt =>
context.actorOf( Props( newActor {
def receive = {
case DoIt(msg) =>
val replyMsg = doSomeDangerousWork(msg)
sender ! replyMsg
context.stop(self)
}
def doSomeDangerousWork(msg: ImmutableMessage): String={"done"}
})) forward m
}
警告:在这个例子中你需要小心避免掩盖所包含的actor的引用,也就是不要调用在匿名Actor类中的封闭actor中的方法。这会打破actor的封装并且引入同步bug和冲突,因为另外一些actor的代码将会被封闭的actor同时调度。不幸的是还没有一种方法可以在编译期检测出这些非法访问。另请参考:Actor和共享的易变的状态。
4.1.2 Actor API
Actor 特质只定义了一个虚拟方法,上面提及的receive,它实现了actor的行为。
如果当前的actor行为与接收到的消息不匹配,unhandled将被调用,它默认发布一个akka.actor.UnhandledMessage(message,sender,recipient)在actor系统的时间流中。
另外,它提供:
actor的ActorRef的self引用
sender引用最后接收到的消息的发送者Actor,
4.2 强类型Actors
Akka强类型Actors是Actors对象模型的实现。
4.3日志
4.4事件总线
4.5调度器
4.6 Futures
4.6.1 介绍
在 Akka中,Futures是用于检索某些并发操作结果的一种数据结构。这些操作经常被Actors或Dispatcher直接执行。这些结果可以被同步(阻塞)或异步(非阻塞)访问。
4.6.2 ExecutionContext
为了执行回调和操作,Futures需要一些称作ExecutionContext,类似java.util.concurrent.Executor的对象。如果你有一个ActorSystem在作用域,它会使用它默认的分配器作为ExecutionContext,或者你可以使用工厂方法由ExecutionContext伴生对象封装Executor和ExecutorServices提供,或者甚至创建你自己的。
import akka.dispatch.{ExecutionContext,Promise}
implicit val ec=ExecutionContext
.fromExecutorService(yourExecutorServiceGoesHere)
//用你崭新光亮的 ExecutionContext 填充
val f=Promise.successful("foo")
//在某时关闭 ExecutionContext
//释放占用资源
ec.shutdown()
4.6.3 Actors应用
通常有两种途径得到一个Actor的响应:第一种是发送一个消息(actor ! msg),这种方式仅在原始发送者是一个Actor时可以工作;第二种方式是通过Future来实现。
使用一个Actor的?方法发送消息会返回一个Future。等待并且检索实际结果简单的方法:
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
implicit val timeout=Timeout(5seconds)
val future=actor ? msg //通过引入“ask”起作用
val result=Await.result(future,timeout.duration).asInstanceOf[String]
这样会促使当前线程阻塞并且等待Actor“完成”Future回复,阻塞是让人担心的,应为它会造成性能问题,阻塞操作产生于Await.result和Await.ready使指出阻塞重现点变得简单,阻塞的替代方案会在当前的文档中深入讨论,同时注意通过Actor返回的Future是一个Future[any]因为一个Acotor是动态的。这就是为什么asInstanceOf会在上面的例子中使用,当使用非阻塞时比使用mapTo方法更适合于试图安全转换Future为预期的类型。
Import akka.dispatch.Future
Import akka.pattern.ask
val future:Future[String]=ask(actor,msg).mapTo[String]
如果转换成功mapTo会返回一个新的包含结果的Future,否则抛出类型转换异常,异常处理会在此文档中深入讨论。
4.6.4 立即使用
一个在AKKa中普通的使用案例是有一些并行计算不需要一个Actor的扩展。如果你发现自己创建了一个Actors对象池仅仅为了执行一个并行计算,这里有一个简单快速的方法:
Import akka.dispatch.Await
Import akka.dispatch.Future
Import akka.util.duration._
val future=Future{
"Hello"+"World"
}
val result=Await.result(future,1second)
在上面代码中阻塞传递给Future将被默认的分配器执行,阻塞的返回值用于完成这个Future(在这个例子中,结果会是:“HelloWord”)。不像一个Actor返回的Futrue,这个Future是合适的类型,并且我们避免了管理一个Actor的资源消耗。
你也可以创建已经完成Futures使用Promise同伴,可以成功如:
val future=Promise.successful("Yay!")
或失败:
val otherFuture=Promise.failed[String](new IllegalArgumentException("Bang!"))
4.6.5 FunctionalFutures
AKKa的Future有若干一元方法和Scala的collections和类似。这些允许你创建返回结果会经过的管道(pipelines)或数据流(streams)。
Future是一个方法
第一个对Future有效的函数化方法是map。这个方法提供一个Function可以在Future的结果上执行一些操作,并且返回一个新的结果。Map的返回结果值是另外一个包含新结果的Future:
Val f1=Future{
"Hello"+"World"
}
val f2=f1 map { x=>
x.length
}
val result=Await.result(f2,1second)
result must be(10)
f1.value must be(Some(Right("HelloWorld")))
在这个例子中我们在一个Future中将两个字符串连接在一起。代替等待完成,我们套用我们的过程使用map方法计算字符串长度。现在我们有第二个Future最终包含一个Int。当我们的原始的Future完成时,他同样会套用我们的过程并且通过其返回结果完成第二个Future。当我们最后获取结果,它会包含数字10。我们的原始Future仍然包含“HelloWorld”字符串并且是不受map影响的。
如果我们正在修改一个单个的Future那map方法是很好的,但是如果2个或者更多Futures涉及map将不允许你把他们结合在一起。
val f1 = Future{
"Hello"+"World"
}
val f2 = Promise.successful(3)
val f3 = f1 map { x =>
f2 map { y =>
x.length * y
}
}
f3是一个Future[ Future [ Int ] ] 代替期望的Future [ Int ]。另外,flatMap方法应该被使用:
val f1 = Future {
"Hello"+"World"
}
val f2 = Promise.successful(3)
val f3 = f1 flatMap{ x => f2 map { y => x.length * y } }
val result = Await.result( f3, 1second)
result must be(30)
使用嵌套组合Composing Futures有时非常复杂难懂,在这些例子中使用Scala的“for 循环”通常的代码更易读。阅读下个章节的例子。
如果你需要做条件传递,你可以使用filter:
val future1 = Promise.successful(4)
val future2 = future1.filter(_%2==0)
val result = Await.result(future2, 1second)
result must be(4)
val failedFilter = future1.filter(_ % 2 == 1 ).recover{
casem:MatchError => 0 //When filter fails,it will have a MatchError
}
Val result2 = Await.result(failedFilter, 1second)
result2 must be(0) //Can only be 0 when there was a MatchError
For 循环
因为Future有一个map,filter和flatmap方法可以很容易在for循环中使用:
Val f = for {
a <- Future(10/2) //10/2=5
b <- Future(a+1) //5+1=6
c <- Future(a-1) //5-1=4
if c >3 //Future.filter
}yieldb*c//6*4=24
// Note that the execution of futures a, b, and c
// are not done in parallel.
val result=Await.result(f, 1second)
result must be(24)
牢记当做这些事时即使看起来是上面例子的一部分可以并行运行,for循环的灭一个步骤都是循环执行的。在分配的线程给每个步骤时有用,但是对于所有计算使用一个Future时没有任何益处。真正有益的是首先创建所有Future,然后把他们结合在一起。
Composing Futures
上面的for循环的例子是一个Composing Futures的例子。相关的一个普通的使用案例是组合若干Actors的应答到一个单独的计算无需依靠调用Await.result或者Await.ready阻挡每一个返回结果。第一个使用Await.result的例子:
val f1 = ask(actor1,msg1)
val f2 = ask(actor2,msg2)
val a = Await.result(f1,1second).asInstanceOf[Int]
val b = Await.result(f2,1second).asInstanceOf[Int]
val f3 = ask(actor3,(a+b))
val result = Await.result(f3,1second).asInstanceOf[Int]
我们在这里等待头2个Actors的返回结果在发送该记过给第三个Actor之前。我们调用Await.result3次,它造成我们的小程序阻塞3次在得到我们的最终结果之前。现在和这个例子比较下:
val f1 = ask(actor1,msg1)
val f2 = ask(actor2,msg2)
val f3 = for{
a => f1.mapTo[Int]
b => f2.mapTo[Int]
c => ask(actor3,(a+b)).mapTo[Int]
} yield c
val result=Await.result(f3,1second).asInstanceOf[Int]
4.7数据流并发
4.8容错
4.9分配器
4.10路由
4.11 远程访问
Akka引入远程访问能力相关参考“位置透明”(2.5Location Transparency)。
4.11.1为远程访问准备好你的ActorSystem
Akka远程访问是一个jar文件。确定在你的工程里有下列依赖项:
“com.typesafe.akka” % “akka-remote“ % ”2.0“
要使你的Akka工程具备远程访问能力你应该在application.conf文件中最少做如下变更:
akka {
actor {
provider = “akka.remote.RemoteActorRefProvider”
}
remote {
transport = “akka.remote.netty.NettyRemoteTransport”
netty {
hostname = “127.0.0.1”
port = 2552
}
}
}
正如你能所看到的上面的例子中为了开始创建有4样东西你需要添加:
●将akka.actor.LocalActorRefProvider提供者变更为akk.remote.RemoteActorRefProvider
●添加主机名称-你想要运行actor系统的计算机;这个主机名称正是为了在远程系统中识别这个系统并且因此用于如果需要连接回到这个系统时,之所以将其设置为可获取的IP地址或者可识别的名称是为了万一你需要进行网络通讯时提供支持。
●添加端口号-actor系统需要监听的端口,设置为0使其自动选择
上面的例子仅仅为了举例说明你允许启动远程最少必须添加的参数。在Akka中还有很多和远程访问有关的参数。我们参考下面的参考文件可以了解更多的信息:
#####################################
# Akka 远程访问参考配置文件 #
#####################################
# 这是包含所有默认设置的配置文件参考
# 在你的application.conf中进行你需要的修改和重写
# akka.actor相关设置的注释不考虑,因为已经存在于akka-actor.jar中,否则会重复配置。
akka {
actor {
serializers {
proto="akka.serialization.ProtobufSerializer"
}
serialization-bindings{
# 虽然com.google.protobuf.Message不是继承自Serializable但是
# GeneratedMessage是,需要指定使用更具体的一个避免歧义
"com.google.protobuf.GeneratedMessage" = proto
}
deployment {
default {
# 如果这是对一个有效的远程地址的设置,已命名的actor将会被部署在那个节
# 点例如:"akka://sys@host:port"remote=""
target {
# 一个主机名和端口列表举例说明一个间接路由子元素
# 格式必须符合 "akka://sys@host:port
# - sys 是远程访问actor系统名称
# -hostname 可以是远程actor需要连接的主机名或IP地址,
# -port 应是其它节点上的远程服务器端口
# 被创建的actor实例的数量仍然取自nr-of-instances关于本地路由设置;
# 这些实例将被分布给出的节点中循环
nodes = []
}
}
}
}
remote {
# 默认使用的akka.remote.RemoteTransport是一个TCP协议的网状的远程传
# 输实现
transport = "akka.remote.netty.NettyRemoteTransport"
# 为完全服务端管理的actor的安全启用不信任模式,允许不信任客户端连接
untrusted-mode = off
# 集群环境应答超时设置,例如actor校验等
remote-daemon-ack-timeout = 30s
# 如果是“on”,Akka将记录所有DEBUG级别的入站消息,否则不记录
log-received-messages = off
# 如果是“on”,Akka将记录所有DEBUG级别的出站消息,否则不记录
log-sent-messages = off
# 每一个属性使用(I)或(O)或(I&O)注释,I是“入站”、O是“出站“连接。网络远程传输
# 经常启动服务端规则来允许入站连接,并且每当发送给一个未连接的目标时它启动客户
# 端连接;如果配置它重用入站连接来应答,称之为被动客户端连接(也就是从服务端到
# 客户端)
netty{
# (O)万一潜在的增长/溢出我们需要等待多长时间(阻塞发送者)直到我们认为发送
# 被终止?0表示“从不返回“;任何正数将代表时间
block at most.backoff-timeout = 0ms
# (I&O) 你自己生成
# 依据’$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh’
# 或者使用’akka.util.Crypt.generateSecureCookie’
secure-cookie = ""
# (I) 远程服务器应该需要公平分享相同的
# secure-cookie (在 ’remote’章节定义)?
require-cookie = off
# (I) 重用入站连接给出站消息
use-passive-connections = on
# (I) 主机名称或IP用于绑定远程访问,
# 如果空则使用InetAddress.getLocalHost.getHostAddress
hostname = ""
# (I) 默认远程服务端口客户端需要连接。
# 默认是2552 (AKKA),使用0来随机生成有效端口
port=2552
# (O) The address of a local network interface (IP Address) to bind to when
# creating outbound connections. Set to "" or "auto" for automatic selection of
# local address.
# (O) 一个本地网络接口(IP地址)地址用于绑定当创建出站连接时。
# 设置为“”或“auto”自动选择本地地址
outbound-local-address = "auto"
# (I&O) 如果你想要能够使用较大的载荷发送消息增加这个设置值
message-frame-size = 1 MiB
# (O) 超时持续时间
connection-timeout = 120s
# (I) 设置连接储备大小
backlog=4096
# (I) akka.timet中的时间长度,空转时主线程将被保持多长时间单元
execution-pool-keepalive = 60s
# (I) 远程执行单元的主池大小
execution-pool-size = 4
# (I) 最大通道大小, 0 是 off
max-channel-memory-size = 0b
# (I) 所有通道总大小, 0 是 off
max-total-memory-size = 0b
# (O) 活动客户端重新连接时间间隔
reconnect-delay=5s
# (O) 在活动客户端连接关闭后读休眠状态周期(最小单位是秒)
# 万一有新的通讯请求将会重建连接
read-timeout=0s
# (O) 在一个心跳通过连线被发送后写休眠周期(最小单位是秒)
# 值为0将关闭此特性
write-timeout=10s
# (O)在活动客户端连接关闭后读和写两者的休眠周期(最小单位是秒)
# 万一有新的通讯请求将会重建连接
# 值为0会关闭此特性
all-timeout=0s
# (O) 最大时间一个客户端应尝试重新连接window
reconnection-time-window=600s
}
# 用于系统actor "network-event-sender" 的分配器
network-event-sender-dispatcher {
executor=thread-pool-executor
type=PinnedDispatcher
}
}
}
4.11.2远程交互类型
Akka有两种应用远程的方式:
●查找:通常在一个远程节点上通过actorFor(path)查找一个actor
●创建:通常在一个远程节点上通过actorFor(Props(…), actorName)创建一个actor
在下一个章节两个替代选择会被详细描述。
4.11.3查找远程Actor
actorFor(path)会得到一个远程节点上的Actor的ActorRef,例如:
val actor =
context.actorFor(" akka://actorSystemName@10.0.0.1:2552/user/actorName ")
如你在上面例子中所见下面的模式用来在一个远程节点上找到一个ActorRef;
akka:// 一旦你获得一个actor的引用你可以与其交互如同一个本地actor一样,例如: actor ! "Prettyawesomefeature" 更多如何构造和使用actor地址和路径相关细节,请参考Actor Reference,Paths and Addresses。 4.11.4远程创建Actor 如果你想要在Akka远程中使用创建功能你必须使用如下所示的方式(只展示了部署章节)深入修改application.conf文件: akka { actor { deployment { /sampleActor { remote = "akka://sampleActorSystem@127.0.0.1:2553" } } } 上述配置指示Akka响应当一个actor通过路径/sampleActor被创建,也就是使用system.actorOf(Props(...), sampleActor)。这个特殊的actor将不会被直接实例化,但是远程系统的远程守护进程会被请求创建这个actor作为替代,这个actor在这个例子中相当于sampleActorSystem@127.0.0.1:2553。 一旦你配置了上面所示的参数就需要写下面的代码: class SampleActor extends Actor { def receive = { case_ => println( "Got something" ) } } valactor = context.actorOf(Props[SampleActor], "sampleActor") actor ! "Prettyslick" SampleActor必须可以被运行时使用,也就是actor系统的类加载器必须有一个JAR包含该类。 4.11.5序列化 4.11.6远程路由 4.11.7远程样例说明 4.12序列化 4.13FSM(有限状态机) 4.14软件事物记忆(STM) 4.15代理 4.16 Transactors 4.17 IO 4.18测试Actor系统 4.19 Akka扩展 4.20 ZeroMQ