亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink的rpc組件有哪些

發布時間:2021-12-31 14:30:27 來源:億速云 閱讀:165 作者:iii 欄目:大數據

本篇內容介紹了“Flink的rpc組件有哪些”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

Flink采用akka來實現rpc服務。其中有這幾個重要組件:RpcServer、RpcService、AkkaRpcActor、RpcEndpoint。

Flink的rpc組件有哪些

這幾個組件作用如下:

(1)RpcEndpoint

提供具體rpc服務。主要實現有 ResourceManager 和 TaskExecutor,

①YarnResourceManager為AM容器中啟動的服務,持有ResourceManager和NodeManager的客戶端

②TaskExecutor為NM容器中啟動taskmanager的類

(2)AkkaRpcService

提供rpc的服務類。該類內部持有ActorSystem實例和Supervisor實例。Supervisor中含有SupervisorActor實例,SupervisorActor用于創建其他Actor,可以理解為根Actor。RpcEndpoint在構造時,通過AkkaRpcService的startServer()方法,獲取RpcServer實例。

	public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
		checkNotNull(rpcEndpoint, "rpc endpoint");

		final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint);
		final ActorRef actorRef = actorRegistration.getActorRef();
		final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture();

		LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

		final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
		final String hostname;
		Option<String> host = actorRef.path().address().host();
		if (host.isEmpty()) {
			hostname = "localhost";
		} else {
			hostname = host.get();
		}

		Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

		implementedRpcGateways.add(RpcServer.class);
		implementedRpcGateways.add(AkkaBasedEndpoint.class);

		final InvocationHandler akkaInvocationHandler;

		if (rpcEndpoint instanceof FencedRpcEndpoint) {
			// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
			akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
				akkaAddress,
				hostname,
				actorRef,
				configuration.getTimeout(),
				configuration.getMaximumFramesize(),
				actorTerminationFuture,
				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
				captureAskCallstacks);

			implementedRpcGateways.add(FencedMainThreadExecutable.class);
		} else {
			akkaInvocationHandler = new AkkaInvocationHandler(
				akkaAddress,
				hostname,
				actorRef,
				configuration.getTimeout(),
				configuration.getMaximumFramesize(),
				actorTerminationFuture,
				captureAskCallstacks);
		}

		// Rather than using the System ClassLoader directly, we derive the ClassLoader
		// from this class . That works better in cases where Flink runs embedded and all Flink
		// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
		ClassLoader classLoader = getClass().getClassLoader();

		@SuppressWarnings("unchecked")
		RpcServer server = (RpcServer) Proxy.newProxyInstance(
			classLoader,
			implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
			akkaInvocationHandler);

		return server;
	}

先創建RpcEndpoint對應的ActorRef,然后創建RpcServer的代理類AkkaInvocationHandler或FencedAkkaInvocationHandler,并將ActorRef實例賦給其成員屬性 rpcEndpoint:ActorRef。這里的ActorRef即為AkkaRpcActor或FencedAkkaRpcActor實例

(3)RpcServer

用來啟動rpc服務,通常不直接調用,而是調用其動態代理類AkkaInvocationHandler或FencedAkkaInvocationHandler的start()方法

(4)AkkaInvocationHandler或FencedAkkaInvocationHandler

RpcServer的動態代理類。start()方法用來啟動服務:

	public void start() {
		rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
	}

這里向rpcEndpoint,即AkkaRpcActor或FencedAkkaRpcActor實例發送一條ControlMessages.START消息

(5)AkkaRpcActor

響應rpc消息的actor。其createReceive():

	public Receive createReceive() {
		return ReceiveBuilder.create()
			.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
			.match(ControlMessages.class, this::handleControlMessage)
			.matchAny(this::handleMessage)
			.build();
	}

當消息為ControlMessages.START,調用StoppedState 的start()方法

		public State start(AkkaRpcActor<?> akkaRpcActor) {
			akkaRpcActor.mainThreadValidator.enterMainThread();

			try {
				akkaRpcActor.rpcEndpoint.internalCallOnStart();
			} catch (Throwable throwable) {
				akkaRpcActor.stop(
					RpcEndpointTerminationResult.failure(
						new AkkaRpcException(
							String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
							throwable)));
			} finally {
				akkaRpcActor.mainThreadValidator.exitMainThread();
			}

			return StartedState.STARTED;
		}

在start()方法中調用具體提供服務的RpcEndpoint實現類internalCallOnStart()方法來啟動服務。internalCallOnStart()方法中會調用onStart()方法。

“Flink的rpc組件有哪些”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

吉木萨尔县| 台安县| 吉木乃县| 临颍县| 文山县| 五原县| 恩平市| 弥勒县| 南丹县| 祁连县| 北碚区| 二连浩特市| 淮安市| 巴林左旗| 唐山市| 上蔡县| 汉沽区| 高安市| 吉木萨尔县| 廉江市| 丽江市| 云和县| 延津县| 富蕴县| 四平市| 雷州市| 三门峡市| 平塘县| 洛隆县| 河池市| 浦北县| 丰顺县| 岳西县| 石台县| 阿鲁科尔沁旗| 万宁市| 五河县| 离岛区| 志丹县| 宜黄县| 银川市|