亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

基于akka怎樣實現RPC

發布時間:2021-11-15 23:45:21 來源:億速云 閱讀:230 作者:柒染 欄目:云計算

這期內容當中小編將會給大家帶來有關基于akka怎樣實現RPC,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

目前的工作在基于akka實現數據服務總線,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很簡單的寫出一個大型的分布式集群的架構。里面的一塊功能就是RPC(遠程過程調用),這篇文章將會介紹一種實現方式。
akka rpc java
目錄[-]
akka-rpc(基于akka的rpc的實現)
RPC
實現原理
Server端核心代碼
Client端核心代碼 
Demo
akka-rpc(基于akka的rpc的實現)

代碼:http://git.oschina.net/for-1988/Simples

目前的工作在基于akka(java)實現數據服務總線,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很簡單的寫出一個大型的分布式集群的架構。里面的一塊功能就是RPC(遠程過程調用)。

RPC

遠程過程調用(Remote Procedure Call,RPC)是一個計算機通信協議。該協議允許運行于一臺計算機的程序調用另一臺計算機的子程序,而程序員無需額外地為這個交互作用編程。如果涉及的軟件采用面向對象編程,那么遠程過程調用亦可稱作遠程調用或遠程方法調用,例:Java RMI。

實現原理

整個RPC的調用過程完全基于akka來傳遞對象,因為需要進行網絡通信,所以我們的接口實現類、調用參數以及返回值都需要實現java序列化接口。客戶端跟服務端其實都是在一個Akka 集群關系中,Client跟Server都是集群中的一個節點。首先Client需要初始化RpcClient對象,在初始化的過程中,我們啟動了AkkaSystem,加入到整個集群中,并創建了負責與Server進行通信的Actor。然后通過RpcClient中的getBean(Class<T> clz)方法獲取Server端的接口實現類的實例對象,然后通過動態代理攔截這個對象的所有方法。最后,在執行方法的時候,在RpcBeanProxy中向Server發送CallMethod事件,執行遠程實現類的方法,獲取返回值給Client。

Server端核心代碼

public class RpcServer extends UntypedActor {
         private Map<String, Object> proxyBeans;

    public RpcServer(Map<Class<?>, Object> beans) {
        proxyBeans = new HashMap<String, Object>();
        for (Iterator<Class<?>> iterator = beans.keySet().iterator(); iterator
                .hasNext();) {
            Class<?> inface = iterator.next();
            proxyBeans.put(inface.getName(), beans.get(inface));
        }
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof RpcEvent.CallBean) {   //返回Server端的接口實現類的實例
            CallBean event = (CallBean) message;
            ReturnBean bean = new ReturnBean(
                    proxyBeans.get(event.getBeanName()), getSelf());
            getSender().tell(bean, getSelf());
        } else if (message instanceof RpcEvent.CallMethod) {
            CallMethod event = (CallMethod) message;
            Object bean = proxyBeans.get(event.getBeanName());
            Object[] params = event.getParams();
            List<Class<?>> paraTypes = new ArrayList<Class<?>>();
            Class<?>[] paramerTypes = new Class<?>[] {};
            if (params != null) {
                for (Object param : params) {
                    paraTypes.add(param.getClass());
                }
            }
            Method method = bean.getClass().getMethod(event.getMethodName(),
                    paraTypes.toArray(paramerTypes));
            Object o = method.invoke(bean, params);
            getSender().tell(o, getSelf());
        }
    }

}
啟動Server

public static void main(String[] args) {
        final Config config = ConfigFactory
                .parseString("akka.remote.netty.tcp.port=" + 2551)
                .withFallback(
                        ConfigFactory
                                .parseString("akka.cluster.roles = [RpcServer]"))
                .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("EsbSystem", config);
        
        // Server 加入發布的服務
        Map<Class<?>, Object> beans = new HashMap<Class<?>, Object>();
        beans.put(ExampleInterface.class, new ExampleInterfaceImpl());
        system.actorOf(Props.create(RpcServer.class, beans), "rpcServer");
    }
Client端核心代碼 

RpcClient類型集成了Thread,為了解決一個問題:因為AkkaSystem在加入集群中的時候是異步的,所以我們在第一次new RpcClient對象的時候需要等待加入集群成功以后,才可以執行下面的方法,不然獲取的 /user/rpcServer Route中沒有Server的Actor,請求會失敗。

public class RpcClient extends Thread {

    private ActorSystem system;

    private ActorRef rpc;

    private ActorRef clientServer;

    private static RpcClient instance = null;

    public RpcClient() {
        this.start();
        final Config config = ConfigFactory
                .parseString("akka.remote.netty.tcp.port=" + 2552)
                .withFallback(
                        ConfigFactory
                                .parseString("akka.cluster.roles = [RpcClient]"))
                .withFallback(ConfigFactory.load());
        system = ActorSystem.create("EsbSystem", config);

        int totalInstances = 100;
        Iterable<String> routeesPaths = Arrays.asList("/user/rpcServer");
        boolean allowLocalRoutees = false;
        ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup(
                new AdaptiveLoadBalancingGroup(
                        HeapMetricsSelector.getInstance(),
                        Collections.<String> emptyList()),
                new ClusterRouterGroupSettings(totalInstances, routeesPaths,
                        allowLocalRoutees, "RpcServer"));
        rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall");
        clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc),
                "client");
        Cluster.get(system).registerOnMemberUp(new Runnable() {  //加入集群成功后的回調事件,恢復當前線程的中斷
            @Override
            public void run() {
                synchronized (instance) {
                    System.out.println("notify");
                    instance.notify();
                }
            }
        });

    }

    public static RpcClient getInstance() {
        if (instance == null) {
            instance = new RpcClient();
            synchronized (instance) {
                try {   //中斷當前線程,等待加入集群成功后,恢復
                    System.out.println("wait");
                    instance.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return instance;
    }

    public <T> T getBean(Class<T> clz) {
        Future<Object> future = Patterns.ask(clientServer,
                new RpcEvent.CallBean(clz.getName(), clientServer),
                new Timeout(Duration.create(5, TimeUnit.SECONDS)));
        try {
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            if (o != null) {
                ReturnBean returnBean = (ReturnBean) o;
                return (T) new RpcBeanProxy().proxy(returnBean.getObj(),
                        clientServer, clz);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
RpcClientServer

public class RpcClientServer extends UntypedActor {

    private ActorRef rpc;

    public RpcClientServer(ActorRef rpc) {
        this.rpc = rpc;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof RpcEvent.CallBean) {  //向Server發送CallBean請求
            CallBean event = (CallBean) message;
            Future<Object> future = Patterns.ask(rpc, event, new Timeout(
                    Duration.create(5, TimeUnit.SECONDS)));
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            getSender().tell(o, getSelf());
        } else if (message instanceof RpcEvent.CallMethod) {  //向Server發送方法調用請求
            Future<Object> future = Patterns.ask(rpc, message, new Timeout(
                    Duration.create(5, TimeUnit.SECONDS)));
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            getSender().tell(o, getSelf());
        }
    }
}
RpcBeanProxy,客戶端的動態代理類

public class RpcBeanProxy implements InvocationHandler {

    private ActorRef rpcClientServer;

    private Class<?> clz;

    public Object proxy(Object target, ActorRef rpcClientServer, Class<?> clz) {
        this.rpcClientServer = rpcClientServer;
        this.clz = clz;
        return Proxy.newProxyInstance(target.getClass().getClassLoader(),
                target.getClass().getInterfaces(), this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        Object result = null;
        RpcEvent.CallMethod callMethod = new RpcEvent.CallMethod(
                method.getName(), args, clz.getName());
        Future<Object> future = Patterns.ask(rpcClientServer, callMethod,
                new Timeout(Duration.create(5, TimeUnit.SECONDS)));
        Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
        result = o;
        return result;
    }

}
Demo

Interface,Client和Server都需要這個類,必須實現序列化

public interface ExampleInterface extends Serializable{
    public String sayHello(String name);
}
實現類,只需要Server端存在這個類。

public class ExampleInterfaceImpl implements ExampleInterface {
    @Override
    public String sayHello(String name) {
        System.out.println("Be Called !");
        return "Hello " + name;
    }
}
Client調用

public static void main(String[] args) {
        RpcClient client = RpcClient.getInstance();
        long start = System.currentTimeMillis();
        
        ExampleInterface example = client.getBean(ExampleInterface.class);
        System.out.println(example.sayHello("rpc"));
        
        long time = System.currentTimeMillis() - start;
        System.out.println("time :" + time);
    }
 


這里第一次調用耗時比較長需要46毫秒,akka會對消息進行優化,調用多次以后時間為 1~2毫秒。

上述就是小編為大家分享的基于akka怎樣實現RPC了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

武宣县| 拉萨市| 耒阳市| 武宁县| 商水县| 江山市| 东安县| 万州区| 金寨县| 南涧| 潼南县| 宝清县| 南宫市| 谢通门县| 蓬溪县| 确山县| 蒲城县| 连山| 赣榆县| 九龙坡区| 汝州市| 广河县| 朔州市| 闻喜县| 五原县| 江源县| 清水河县| 上高县| 稷山县| 宜兰县| 天镇县| 乳源| 泾阳县| 惠来县| 灵川县| 怀远县| 顺平县| 深圳市| 开封县| 南投县| 武义县|