您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關基于akka怎樣實現RPC,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
目前的工作在基于akka實現數據服務總線,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很簡單的寫出一個大型的分布式集群的架構。里面的一塊功能就是RPC(遠程過程調用),這篇文章將會介紹一種實現方式。 akka rpc java 目錄[-] akka-rpc(基于akka的rpc的實現) RPC 實現原理 Server端核心代碼 Client端核心代碼 Demo akka-rpc(基于akka的rpc的實現) 代碼:http://git.oschina.net/for-1988/Simples 目前的工作在基于akka(java)實現數據服務總線,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很簡單的寫出一個大型的分布式集群的架構。里面的一塊功能就是RPC(遠程過程調用)。 RPC 遠程過程調用(Remote Procedure Call,RPC)是一個計算機通信協議。該協議允許運行于一臺計算機的程序調用另一臺計算機的子程序,而程序員無需額外地為這個交互作用編程。如果涉及的軟件采用面向對象編程,那么遠程過程調用亦可稱作遠程調用或遠程方法調用,例:Java RMI。 實現原理 整個RPC的調用過程完全基于akka來傳遞對象,因為需要進行網絡通信,所以我們的接口實現類、調用參數以及返回值都需要實現java序列化接口。客戶端跟服務端其實都是在一個Akka 集群關系中,Client跟Server都是集群中的一個節點。首先Client需要初始化RpcClient對象,在初始化的過程中,我們啟動了AkkaSystem,加入到整個集群中,并創建了負責與Server進行通信的Actor。然后通過RpcClient中的getBean(Class<T> clz)方法獲取Server端的接口實現類的實例對象,然后通過動態代理攔截這個對象的所有方法。最后,在執行方法的時候,在RpcBeanProxy中向Server發送CallMethod事件,執行遠程實現類的方法,獲取返回值給Client。 Server端核心代碼 public class RpcServer extends UntypedActor { private Map<String, Object> proxyBeans; public RpcServer(Map<Class<?>, Object> beans) { proxyBeans = new HashMap<String, Object>(); for (Iterator<Class<?>> iterator = beans.keySet().iterator(); iterator .hasNext();) { Class<?> inface = iterator.next(); proxyBeans.put(inface.getName(), beans.get(inface)); } } @Override public void onReceive(Object message) throws Exception { if (message instanceof RpcEvent.CallBean) { //返回Server端的接口實現類的實例 CallBean event = (CallBean) message; ReturnBean bean = new ReturnBean( proxyBeans.get(event.getBeanName()), getSelf()); getSender().tell(bean, getSelf()); } else if (message instanceof RpcEvent.CallMethod) { CallMethod event = (CallMethod) message; Object bean = proxyBeans.get(event.getBeanName()); Object[] params = event.getParams(); List<Class<?>> paraTypes = new ArrayList<Class<?>>(); Class<?>[] paramerTypes = new Class<?>[] {}; if (params != null) { for (Object param : params) { paraTypes.add(param.getClass()); } } Method method = bean.getClass().getMethod(event.getMethodName(), paraTypes.toArray(paramerTypes)); Object o = method.invoke(bean, params); getSender().tell(o, getSelf()); } } } 啟動Server public static void main(String[] args) { final Config config = ConfigFactory .parseString("akka.remote.netty.tcp.port=" + 2551) .withFallback( ConfigFactory .parseString("akka.cluster.roles = [RpcServer]")) .withFallback(ConfigFactory.load()); ActorSystem system = ActorSystem.create("EsbSystem", config); // Server 加入發布的服務 Map<Class<?>, Object> beans = new HashMap<Class<?>, Object>(); beans.put(ExampleInterface.class, new ExampleInterfaceImpl()); system.actorOf(Props.create(RpcServer.class, beans), "rpcServer"); } Client端核心代碼 RpcClient類型集成了Thread,為了解決一個問題:因為AkkaSystem在加入集群中的時候是異步的,所以我們在第一次new RpcClient對象的時候需要等待加入集群成功以后,才可以執行下面的方法,不然獲取的 /user/rpcServer Route中沒有Server的Actor,請求會失敗。 public class RpcClient extends Thread { private ActorSystem system; private ActorRef rpc; private ActorRef clientServer; private static RpcClient instance = null; public RpcClient() { this.start(); final Config config = ConfigFactory .parseString("akka.remote.netty.tcp.port=" + 2552) .withFallback( ConfigFactory .parseString("akka.cluster.roles = [RpcClient]")) .withFallback(ConfigFactory.load()); system = ActorSystem.create("EsbSystem", config); int totalInstances = 100; Iterable<String> routeesPaths = Arrays.asList("/user/rpcServer"); boolean allowLocalRoutees = false; ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup( new AdaptiveLoadBalancingGroup( HeapMetricsSelector.getInstance(), Collections.<String> emptyList()), new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, "RpcServer")); rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall"); clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc), "client"); Cluster.get(system).registerOnMemberUp(new Runnable() { //加入集群成功后的回調事件,恢復當前線程的中斷 @Override public void run() { synchronized (instance) { System.out.println("notify"); instance.notify(); } } }); } public static RpcClient getInstance() { if (instance == null) { instance = new RpcClient(); synchronized (instance) { try { //中斷當前線程,等待加入集群成功后,恢復 System.out.println("wait"); instance.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } return instance; } public <T> T getBean(Class<T> clz) { Future<Object> future = Patterns.ask(clientServer, new RpcEvent.CallBean(clz.getName(), clientServer), new Timeout(Duration.create(5, TimeUnit.SECONDS))); try { Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); if (o != null) { ReturnBean returnBean = (ReturnBean) o; return (T) new RpcBeanProxy().proxy(returnBean.getObj(), clientServer, clz); } } catch (Exception e) { e.printStackTrace(); } return null; } } RpcClientServer public class RpcClientServer extends UntypedActor { private ActorRef rpc; public RpcClientServer(ActorRef rpc) { this.rpc = rpc; } @Override public void onReceive(Object message) throws Exception { if (message instanceof RpcEvent.CallBean) { //向Server發送CallBean請求 CallBean event = (CallBean) message; Future<Object> future = Patterns.ask(rpc, event, new Timeout( Duration.create(5, TimeUnit.SECONDS))); Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); getSender().tell(o, getSelf()); } else if (message instanceof RpcEvent.CallMethod) { //向Server發送方法調用請求 Future<Object> future = Patterns.ask(rpc, message, new Timeout( Duration.create(5, TimeUnit.SECONDS))); Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); getSender().tell(o, getSelf()); } } } RpcBeanProxy,客戶端的動態代理類 public class RpcBeanProxy implements InvocationHandler { private ActorRef rpcClientServer; private Class<?> clz; public Object proxy(Object target, ActorRef rpcClientServer, Class<?> clz) { this.rpcClientServer = rpcClientServer; this.clz = clz; return Proxy.newProxyInstance(target.getClass().getClassLoader(), target.getClass().getInterfaces(), this); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object result = null; RpcEvent.CallMethod callMethod = new RpcEvent.CallMethod( method.getName(), args, clz.getName()); Future<Object> future = Patterns.ask(rpcClientServer, callMethod, new Timeout(Duration.create(5, TimeUnit.SECONDS))); Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); result = o; return result; } } Demo Interface,Client和Server都需要這個類,必須實現序列化 public interface ExampleInterface extends Serializable{ public String sayHello(String name); } 實現類,只需要Server端存在這個類。 public class ExampleInterfaceImpl implements ExampleInterface { @Override public String sayHello(String name) { System.out.println("Be Called !"); return "Hello " + name; } } Client調用 public static void main(String[] args) { RpcClient client = RpcClient.getInstance(); long start = System.currentTimeMillis(); ExampleInterface example = client.getBean(ExampleInterface.class); System.out.println(example.sayHello("rpc")); long time = System.currentTimeMillis() - start; System.out.println("time :" + time); } 這里第一次調用耗時比較長需要46毫秒,akka會對消息進行優化,調用多次以后時間為 1~2毫秒。
上述就是小編為大家分享的基于akka怎樣實現RPC了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。