Akka 中的有类型 Actor 是 Active Objects 模式的一种实现. Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。 有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。
Akka 中的有类型 Actor 是 Active Objects 模式的一种实现. Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。
有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。 对普通actor来说,你拥有一个外部API (public接口的实例) 来将方法调用异步地委托给其实现类的私有实例。
有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约, 你不需要定义你自己的消息, 它的劣势在于对你能做什么和不能做什么进行了一些限制,比如 你不能使用 become/unbecome.
有类型actor是使用 jdk proxies 实现的,jdk proxies提供了非常简单的api来拦截方法调用。
注意
和普通Akka actor一样,有类型actor也一次处理一个消息。
什么时候使用有类型的Actor
有类型的Actor很适合用在连接actor系统和非actor的代码,因为它可以使你能在外部编写正常的OO模式的代码。但切记不可滥用。
工具箱
返回有类型actor扩展 Returns the Typed Actor Extension
TypedActorExtension extension =
TypedActor.get(system); //system is an instance of ActorSystem
判断一个引用是否是有类型actor代理 Returns whether the reference is a Typed Actor Proxy or not
TypedActor.get(system).isTypedActor(someReference);
返回一个外部有类型actor代理所代表的Akka actor Returns the backing Akka Actor behind an external Typed Actor Proxy
TypedActor.get(system).getActorRefFor(someReference);
返回当前的ActorContext//Returns the current ActorContext,
此方法仅在一个TypedActor 实现的方法中有效 // method only valid within methods of a TypedActor implementation
ActorContext context = TypedActor.context();
返回当前有类型actor的外部代理//Returns the external proxy of the current Typed Actor,
此方法仅在一个TypedActor 实现的方法中有效// method only valid within methods of a TypedActor implementation
Squarer sq = TypedActor.
返回一个有类型Actor扩展的上下文实例//Returns a contextual instance of the Typed Actor Extension
这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor//this means that if you create other Typed Actors with this,
//they will become children to the current Typed Actor.
TypedActor.get(TypedActor.context());
具体例子及说明
<span>package</span> practise.akka.typedactors
<span>import</span> akka.dispatch.Future
<span>import</span> akka.japi.Option
<span>/**
* 这个就是对外的接口,各函数就是Typed Actor的接口方法
*/</span>
<span>public</span> <span>interface</span> Squarer {
<span>void</span> squareDontCare(<span>int</span> i); <span>//fire-forget</span>
Future<Integer> square(<span>int</span> i); <span>//non-blocking send-request-reply</span>
Option<Integer> squareNowPlease(<span>int</span> i);<span>//blocking send-request-reply</span>
<span>int</span> squareNow(<span>int</span> i); <span>//blocking send-request-reply</span>
}
<span>package</span> practise.akka.typedactors
<span>import</span> akka.dispatch.Future
<span>import</span> akka.dispatch.Futures
<span>import</span> akka.actor.TypedActor
<span>import</span> akka.japi.Option
<span>import</span> akka.actor.ActorContext
<span>import</span> groovy.util.logging.Log4j
<span>import</span> akka.actor.ActorRef
<span>/**
* 这个是接口实现。(实现akka.actor.TypedActor.Receiver接口就能接收actor发来的普通消息(非函数调用消息)。)
*/</span>
@Log4j
<span>class</span> SquarerImpl <span>implements</span> Squarer, akka.actor.TypedActor.Receiver {
<span>private</span> String name;
<span>public</span> SquarerImpl() {
<span>this</span>.name = "<span>default</span>";
}
<span>public</span> SquarerImpl(String name) {
<span>this</span>.name = name;
}
<span>public</span> <span>void</span> squareDontCare(<span>int</span> i) {
log.debug("<span>squareDontCare,fire-and-forget只接收不返回结果,与ActorRef.tell完全一致----</span>" + i) <span>//可以从线程号看出是异步处理的</span>
<span>int</span> sq = i * i; <span>//Nobody cares :(</span>
<span>//返回当前的ActorContext,</span>
<span>// 此方法仅在一个TypedActor 实现的方法中有效</span>
ActorContext context = TypedActor.context();
println "<span>context ---- </span>" + context
<span>//返回当前有类型actor的外部代理,</span>
<span>// 此方法仅在一个TypedActor 实现的方法中有效</span>
Squarer mysq = TypedActor.<Squarer> self();
println "<span>--self --</span>" + mysq
}
<span>public</span> Future<Integer> square(<span>int</span> i) {
log.debug("<span>square send-request-reply Future----</span>" + i) <span>//可以从线程号看出是异步处理的</span>
<span>return</span> Futures.successful(i * i, TypedActor.dispatcher());
}
<span>public</span> Option<Integer> squareNowPlease(<span>int</span> i) {
log.debug("<span>squareNowPlease send-request-reply Option----</span>" + i) <span>//可以从线程号看出是异步处理的</span>
<span>return</span> Option.some(i * i);
}
<span>public</span> <span>int</span> squareNow(<span>int</span> i) {
log.debug("<span>squareNow send-request-reply result----</span>" + i) <span>//可以从线程号看出是异步处理的</span>
<span>return</span> i * i;
}
@Override
<span>void</span> onReceive(Object o, ActorRef actorRef) {
log.debug("<span>TypedActor收到消息----${o}---from:${actorRef}</span>")
}
}
<span>package</span> practise.akka.typedactors
<span>import</span> akka.actor.ActorSystem
<span>import</span> akka.actor.TypedActor
<span>import</span> akka.actor.TypedProps
<span>import</span> com.typesafe.config.ConfigFactory
<span>import</span> akka.japi.Creator
<span>import</span> groovy.util.logging.Log4j
<span>import</span> akka.actor.ActorContext
<span>/**
* 这里创建Typed Actor.
*/</span>
@Log4j
<span>class</span> TypedActorsFactory {
ActorSystem system
<span>private</span> <span>final</span> String config = """akka {
loglevel = "<span>${log?.debugEnabled ? </span>"DEBUG"<span> : </span>"INFO"<span>}</span>"
actor.provider = "<span>akka.remote.RemoteActorRefProvider</span>"
remote.netty.hostname = "<span>127.0.0.1</span>"
remote.netty.port = 2552
remote.log-received-messages = on
remote.log-sent-messages = on
}"""
TypedActorsFactory(String sysName) {
<span>this</span>.system = ActorSystem.create(sysName, ConfigFactory.parseString(config))
}
Squarer getTypedActorDefault() {
Squarer mySquarer =
TypedActor.get(system).typedActorOf(<span>new</span> TypedProps<SquarerImpl>(Squarer.<span>class</span>, SquarerImpl.<span>class</span>));
<span>//这里创建的是代理类型</span>
<span>return</span> mySquarer
}
Squarer getTypedActor(String name) {
Squarer otherSquarer =
TypedActor.get(system).typedActorOf(<span>new</span> TypedProps<SquarerImpl>(Squarer.<span>class</span>,
<span>new</span> Creator<SquarerImpl>() {
<span>public</span> SquarerImpl create() { <span>return</span> <span>new</span> SquarerImpl(name); } <span>//这里创建的是具体的实现类型</span>
}),
name); <span>//这个name是actor的name:akka//sys@host:port/user/name</span>
<span>return</span> otherSquarer
}
}下面用几个测试用例实验一下
<span>package</span> practise.akka.typedactors
<span>import</span> akka.actor.ActorRef
<span>import</span> akka.actor.TypedActor
<span>import</span> akka.actor.UntypedActorContext
<span>import</span> akka.dispatch.Future
<span>import</span> com.baoxian.akka.AkkaClientNoReply
<span>import</span> com.baoxian.akka.AkkaServerApp
<span>class</span> TestTypedActors <span>extends</span> GroovyTestCase {
def testTypeActor() {
println("<span>----</span>")
TypedActorsFactory factory = <span>new</span> TypedActorsFactory("<span>typedServer</span>")
<span>// Squarer squarer = factory?.getTypedActorDefault() //创建代理</span>
Squarer squarer = factory?.getTypedActor("<span>serv</span>") <span>//具体实现</span>
squarer?.squareDontCare(10)
Future future = squarer?.square(10)
AkkaServerApp app = <span>new</span> AkkaServerApp("<span>tmp</span>", "<span>127.0.0.1</span>", 6666, "<span>result</span>") <span>//这是我自己构建的接收器</span>
app.messageProcessor = {msg, UntypedActorContext context ->
log.info("<span>结果为</span>" + msg)
}
app.startup()
akka.pattern.Patterns.pipe(future).to(app.serverActor) <span>//Future的返回结果pipe到接收器中了,在log中能看到结果</span>
println "<span>----</span>" + squarer?.squareNowPlease(10)?.get()
println "<span>----</span>" + squarer?.squareNow(10)
<span>//返回有类型actor扩展</span>
TypedActor.get(factory.system)
<span>//返回一个外部有类型actor代理所代表的Akka actor</span>
ActorRef actor = TypedActor.get(factory.system).getActorRefFor(squarer);
actor.tell("<span>消息</span>") <span>//这个消息将会在SquarerImpl的onReceive方法中接收到</span>
sleep(1000 * 60 * 10)
<span>// TypedActor.get(factory.system).stop(squarer); //这将会尽快地异步终止与指定的代理关联的有类型Actor</span>
TypedActor.get(factory.system).poisonPill(squarer);<span>//这将会在有类型actor完成所有在当前调用之前对它的调用后异步地终止它</span>
}
def testRemoteTypedActor() {
AkkaClientNoReply client = <span>new</span> AkkaClientNoReply("<span>akka://typedServer@127.0.0.1:2552/user/serv</span>")
client.send("<span>远程消息</span>") <span>//这将会在SquarerImpl的onReceive方法中接收到</span>
sleep(1000)
client.shutdown()
}
}
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号