扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这期内容当中小编将会给大家带来有关基于akka怎样实现RPC,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
10年积累的成都网站制作、成都做网站、外贸营销网站建设经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站设计后付款的网站建设流程,更有灵寿免费网站建设让你可以放心的选择与我们合作。
目前的工作在基于akka实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用),这篇文章将会介绍一种实现方式。 akka rpc java 目录[-] akka-rpc(基于akka的rpc的实现) RPC 实现原理 Server端核心代码 Client端核心代码 Demo akka-rpc(基于akka的rpc的实现) 代码:http://git.oschina.net/for-1988/Simples 目前的工作在基于akka(java)实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用)。 RPC 远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI。 实现原理 整个RPC的调用过程完全基于akka来传递对象,因为需要进行网络通信,所以我们的接口实现类、调用参数以及返回值都需要实现java序列化接口。客户端跟服务端其实都是在一个Akka 集群关系中,Client跟Server都是集群中的一个节点。首先Client需要初始化RpcClient对象,在初始化的过程中,我们启动了AkkaSystem,加入到整个集群中,并创建了负责与Server进行通信的Actor。然后通过RpcClient中的getBean(Classclz)方法获取Server端的接口实现类的实例对象,然后通过动态代理拦截这个对象的所有方法。最后,在执行方法的时候,在RpcBeanProxy中向Server发送CallMethod事件,执行远程实现类的方法,获取返回值给Client。 Server端核心代码 public class RpcServer extends UntypedActor { private Map proxyBeans; public RpcServer(Map , Object> beans) { proxyBeans = new HashMap (); for (Iterator > iterator = beans.keySet().iterator(); iterator .hasNext();) { Class> inface = iterator.next(); proxyBeans.put(inface.getName(), beans.get(inface)); } } @Override public void onReceive(Object message) throws Exception { if (message instanceof RpcEvent.CallBean) { //返回Server端的接口实现类的实例 CallBean event = (CallBean) message; ReturnBean bean = new ReturnBean( proxyBeans.get(event.getBeanName()), getSelf()); getSender().tell(bean, getSelf()); } else if (message instanceof RpcEvent.CallMethod) { CallMethod event = (CallMethod) message; Object bean = proxyBeans.get(event.getBeanName()); Object[] params = event.getParams(); List > paraTypes = new ArrayList >(); Class>[] paramerTypes = new Class>[] {}; if (params != null) { for (Object param : params) { paraTypes.add(param.getClass()); } } Method method = bean.getClass().getMethod(event.getMethodName(), paraTypes.toArray(paramerTypes)); Object o = method.invoke(bean, params); getSender().tell(o, getSelf()); } } } 启动Server public static void main(String[] args) { final Config config = ConfigFactory .parseString("akka.remote.netty.tcp.port=" + 2551) .withFallback( ConfigFactory .parseString("akka.cluster.roles = [RpcServer]")) .withFallback(ConfigFactory.load()); ActorSystem system = ActorSystem.create("EsbSystem", config); // Server 加入发布的服务 Map , Object> beans = new HashMap , Object>(); beans.put(ExampleInterface.class, new ExampleInterfaceImpl()); system.actorOf(Props.create(RpcServer.class, beans), "rpcServer"); } Client端核心代码 RpcClient类型集成了Thread,为了解决一个问题:因为AkkaSystem在加入集群中的时候是异步的,所以我们在第一次new RpcClient对象的时候需要等待加入集群成功以后,才可以执行下面的方法,不然获取的 /user/rpcServer Route中没有Server的Actor,请求会失败。 public class RpcClient extends Thread { private ActorSystem system; private ActorRef rpc; private ActorRef clientServer; private static RpcClient instance = null; public RpcClient() { this.start(); final Config config = ConfigFactory .parseString("akka.remote.netty.tcp.port=" + 2552) .withFallback( ConfigFactory .parseString("akka.cluster.roles = [RpcClient]")) .withFallback(ConfigFactory.load()); system = ActorSystem.create("EsbSystem", config); int totalInstances = 100; Iterable routeesPaths = Arrays.asList("/user/rpcServer"); boolean allowLocalRoutees = false; ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup( new AdaptiveLoadBalancingGroup( HeapMetricsSelector.getInstance(), Collections. emptyList()), new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, "RpcServer")); rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall"); clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc), "client"); Cluster.get(system).registerOnMemberUp(new Runnable() { //加入集群成功后的回调事件,恢复当前线程的中断 @Override public void run() { synchronized (instance) { System.out.println("notify"); instance.notify(); } } }); } public static RpcClient getInstance() { if (instance == null) { instance = new RpcClient(); synchronized (instance) { try { //中断当前线程,等待加入集群成功后,恢复 System.out.println("wait"); instance.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } return instance; } public T getBean(Class clz) { Future
上述就是小编为大家分享的基于akka怎样实现RPC了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流