亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何實現Flune Client 開發

發布時間:2022-01-10 11:47:26 來源:億速云 閱讀:129 作者:柒染 欄目:開發技術

如何實現Flune Client 開發,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

由于在實際工作中,數據的生產方式極具多樣性,Flume 雖然包含了一些內置的機制來采集數據,但是更多的時候用戶更希望能將應用程序和flume直接相通。所以這邊運行用戶開發應用程序,通過IPC或者RPC連接flume并往flume發送數據。

一、RPC client interface

Flume的RpcClient實現了Flume的RPC機制。用戶的應用程序可以很簡單的調用Flume Client SDK的append(Event) 或者appendBatch(List<Event>) 方法發送數據,不用擔心底層信息交換的細節。用戶可以提供所需的event通過直接實現Event接口,例如可以使用簡單的方便的實現SimpleEvent類或者使用EventBuilder的writeBody()靜態輔助方法。

自Flume 1.4.0起,Avro是默認的RPC協議。NettyAvroRpcClient和ThriftRpcClient實現了RpcClient接口。實現中我們需要知道我們將要連接的目標flume agent的host和port用于創建client實例,然后使用RpcClient發送數據到flume agent。

官網給了一個Avro RPCclients的例子,這邊直接拿來做實際測試例子。

這里我們把client.init("host.example.org",41414);

改成 client.init("192.168.233.128",50000);  與我們的主機對接

[java] view plain copy

  1. import org.apache.flume.Event;  

  2. import org.apache.flume.EventDeliveryException;  

  3. import org.apache.flume.api.RpcClient;  

  4. import org.apache.flume.api.RpcClientFactory;  

  5. import org.apache.flume.event.EventBuilder;  

  6. import java.nio.charset.Charset;  

  7.    

  8. public class MyApp {  

  9.   public static voidmain(String[] args) {  

  10.    MyRpcClientFacade client = new MyRpcClientFacade();  

  11.    // Initializeclient with the remote Flume agent's host and port  

  12. //client.init("host.example.org",41414);  

  13. client.init("192.168.233.128",50000);  

  14.    

  15.    // Send 10events to the remote Flume agent. That agent should be  

  16.    // configured tolisten with an AvroSource.  

  17.    String sampleData = "Hello Flume!";  

  18.    for (int i =0; i < 10; i++) {  

  19.      client.sendDataToFlume(sampleData);  

  20.    }  

  21.    

  22.    client.cleanUp();  

  23.   }  

  24. }  

  25.    

  26. class MyRpcClientFacade {  

  27.   private RpcClient client;  

  28.   private String hostname;  

  29.   private int port;  

  30.    

  31.   public void init(String hostname, int port) {  

  32.    // Setup the RPCconnection  

  33.    this.hostname = hostname;  

  34.    this.port = port;  

  35.    this.client = RpcClientFactory.getDefaultInstance(hostname, port);  

  36.    // Use thefollowing method to create a thrift client (instead of the above line):  

  37.     // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  38.   }  

  39.    

  40.   public void sendDataToFlume(String data) {  

  41.    // Create aFlume Event object that encapsulates the sample data  

  42.    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  43.    

  44.    // Send theevent  

  45.    try {  

  46.      client.append(event);  

  47.    } catch (EventDeliveryException e) {  

  48.      // clean up andrecreate the client  

  49.      client.close();  

  50.      client = null;  

  51.      client = RpcClientFactory.getDefaultInstance(hostname, port);  

  52.      // Use thefollowing method to create a thrift client (instead of the above line):  

  53.      // this.client =RpcClientFactory.getThriftInstance(hostname, port);  

  54.    }  

  55.   }  

  56.    

  57.   public void cleanUp() {  

  58.    // Close the RPCconnection  

  59.    client.close();  

  60.   }  

  61.    

  62. }  

這邊代碼不解釋了,主要是將HelloFlume 發送10遍給flume,同時記得將flume 安裝主目錄下的lib 文件都添加進項目,才能正常運行程序。

下面是代理配置:

[html] view plain copy

  1. #配置文件:avro_client_case20.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a1.sources.r1.type = avro  

  9. a1.sources.r1.port = 50000  

  10. a1.sources.r1.host = 192.168.233.128  

  11. a1.sources.r1.channels = c1  

  12.    

  13. # Describe the sink  

  14. a1.sinks.k1.channel = c1  

  15. a1.sinks.k1.type = logger  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a1.channels.c1.type = memory  

  19. a1.channels.c1.capacity = 1000  

  20. a1.channels.c1.transactionCapacity = 100  

這里要注意下,之前說了,在接收端需要AvroSource或者Thrift Source來監聽接口。所以配置代理的時候要把a1.sources.r1.type 寫成avro或者thrift

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

啟動成功后

在eclipse 里運行Java程序,當然也可以打包后在服務器上運行JAVA程序。

#在啟動源發送的代理終端查看console輸出

如何實現Flune Client 開發

可以看到10條數據正常發送。

這里要說明下,開發代碼中client.append(event)不僅僅可以發送一條數據,也可以發送一個List(string) 的數據信息,也就是批量發送。這邊就不做演示了。

二、Failover Client

這個類包封裝了Avro RPCclient的類默認提供故障處理能力。hosts采用空格分開host:port所代表的flume agent,構成一個故障處理組。這Failover RPC Client目前不支持thrift。如果當前選擇的host agent有問題,這個failover client會自動負載到組中下一個host中。

下面是官網開發例子:

[java] view plain copy

  1. // Setup properties for the failover  

  2. Properties props = new Properties();  

  3. props.put("client.type""default_failover");  

  4.   

  5. // List of hosts (space-separated list of user-chosen host aliases)  

  6. props.put("hosts""h2 h3 h4");  

  7.   

  8. // host/port pair for each host alias  

  9. String host1 = "host1.example.org:41414";  

  10. String host2 = "host2.example.org:41414";  

  11. String host3 = "host3.example.org:41414";  

  12. props.put("hosts.h2", host1);  

  13. props.put("hosts.h3", host2);  

  14. props.put("hosts.h4", host3);  

  15.   

  16. // create the client with failover properties  

  17. RpcClient client = RpcClientFactory.getInstance(props);  

下面是測試的開發例子

[java] view plain copy

  1. import org.apache.flume.Event;  

  2. import org.apache.flume.EventDeliveryException;  

  3. import org.apache.flume.api.RpcClient;  

  4. import org.apache.flume.api.RpcClientFactory;  

  5. import org.apache.flume.event.EventBuilder;  

  6.   

  7. import java.nio.charset.Charset;  

  8. import java.util.Properties;  

  9.   

  10. public class Failover_Client {  

  11.     public static void main(String[] args) {  

  12.         MyRpcClientFacade2 client = new MyRpcClientFacade2();  

  13.         // Initialize client with the remote Flume agent's host and port  

  14.         client.init();  

  15.   

  16.         // Send 10 events to the remote Flume agent. That agent should be  

  17.         // configured to listen with an AvroSource.  

  18.         String sampleData = "Hello Flume!";  

  19.         for (int i = 0; i < 10; i++) {  

  20.           client.sendDataToFlume(sampleData);  

  21.         }  

  22.   

  23.         client.cleanUp();  

  24.       }  

  25.     }  

  26.   

  27.     class MyRpcClientFacade2 {  

  28.       private RpcClient client;  

  29.       private String hostname;  

  30.       private int port;  

  31.   

  32.       public void init() {  

  33.         // Setup the RPC connection  

  34.         // Use the following method to create a thrift client (instead of the above line):  

  35.         // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  36.      // Setup properties for the failover  

  37.         Properties props = new Properties();  

  38.         props.put("client.type""default_failover");  

  39.   

  40.         // List of hosts (space-separated list of user-chosen host aliases)  

  41.         props.put("hosts""h2 h3 h4");  

  42.   

  43.         // host/port pair for each host alias  

  44.         String host1 = "192.168.233.128:50000";  

  45.         String host2 = "192.168.233.128:50001";  

  46.         String host3 = "192.168.233.128:50002";  

  47.         props.put("hosts.h2", host1);  

  48.         props.put("hosts.h3", host2);  

  49.         props.put("hosts.h4", host3);  

  50.   

  51.         // create the client with failover properties  

  52.         client = RpcClientFactory.getInstance(props);  

  53.       }  

  54.   

  55.       public void sendDataToFlume(String data) {  

  56.         // Create a Flume Event object that encapsulates the sample data  

  57.         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  58.   

  59.         // Send the event  

  60.         try {  

  61.           client.append(event);  

  62.         } catch (EventDeliveryException e) {  

  63.           // clean up and recreate the client  

  64.           client.close();  

  65.           client = null;  

  66.           client = RpcClientFactory.getDefaultInstance(hostname, port);  

  67.           // Use the following method to create a thrift client (instead of the above line):  

  68.           // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  69.         }  

  70.       }  

  71.   

  72.       public void cleanUp() {  

  73.         // Close the RPC connection  

  74.         client.close();  

  75.       }  

  76. }  

這邊代碼設三個host用于故障轉移,這里偷懶,用同一個主機的3個端口模擬。代碼還是將Hello Flume 發送10遍給第一個flume代理,當第一個代理故障的時候,則發送給第二個代理,以順序進行故障轉移。

下面是代理配置沿用之前的那個,并對配置文件進行拷貝,

cp avro_client_case20.conf avro_client_case21.conf

cp avro_client_case20.conf avro_client_case22.conf

分別修改avro_client_case21.conf與avro_client_case22.conf中的

a1.sources.r1.port= 50001 與a1.sources.r1.port = 50002

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

啟動成功后

在eclipse 里運行JAVA程序Failover_Client.java,當然也可以打包后在服務器上運行JAVA程序。

#在啟動源發送的3個代理終端查看console輸出

我們可以看到第一個代理終端收到了,數據而其他2個終端沒有數據。

如何實現Flune Client 開發

然后我們把第一個終端的進程關掉,再運行一遍client程序,然后會發現這個時候是發生到第二個終端中。當第二個終端也關閉的時候,再發送數據,則是發送到最后一個終端。這里我們可以看到,故障轉移的代理主機轉移是采用順序序列的。

三、LoadBalancing RPC client

Flume Client SDK也支持在多個host之間使用負載均衡的Rpc Client。這種類型的client帶有一個通過空格分隔的host:port主機列表并構成了一個負載均衡組。這個client可以指定一個負載均衡的策略,既可以隨機的選擇一個配置的host,也可以循環選擇一個host。當然你也可以自己編寫一個類實現LoadBalancingRpcClient$HostSelector接口以至于用戶可以使用自己編寫的選擇順序。在這種情況下,用戶自定義的類需要被指定為host-selector屬性的值。LoadBalancing RPC Client當前不支持thrift。

如果開啟了backoff,那么client失敗將被放入黑名單中,只有過了被指定的超時之間之后這個被選擇的失敗的主機才會從黑名單中被排除。當超時到了,如果主機還是沒有反應,那么這被認為是一個連續的失敗并且超時時間會成倍的增長,以避免可能陷入對反應遲鈍主機的長時間等待中。

這backoff的最大超時時間可以通過maxBackoff屬性來配置,單位是毫秒。在默認情況下maxBackoff的值是30秒(在orderSelector類里面指定)。

下面是官網例子

[java] view plain copy

  1. // Setup properties for the load balancing  

  2. Properties props = new Properties();  

  3. props.put("client.type""default_loadbalance");  

  4.   

  5. // List of hosts (space-separated list of user-chosen host aliases)  

  6. props.put("hosts""h2 h3 h4");  

  7.   

  8. // host/port pair for each host alias  

  9. String host1 = "host1.example.org:41414";  

  10. String host2 = "host2.example.org:41414";  

  11. String host3 = "host3.example.org:41414";  

  12. props.put("hosts.h2", host1);  

  13. props.put("hosts.h3", host2);  

  14. props.put("hosts.h4", host3);  

  15.   

  16. props.put("host-selector""random"); // For random host selection  

  17. // props.put("host-selector", "round_robin"); // For round-robin host  

  18. //                                            // selection  

  19. props.put("backoff""true"); // Disabled by default.  

  20.   

  21. props.put("maxBackoff""10000"); // Defaults 0, which effectively  

  22.                                   // becomes 30000 ms  

  23.   

  24. // Create the client with load balancing properties  

  25. RpcClient client = RpcClientFactory.getInstance(props);  

下面是測試的開發例子

[java] view plain copy

  1. import java.nio.charset.Charset;  

  2.   

  3. import org.apache.flume.Event;  

  4. import org.apache.flume.EventDeliveryException;  

  5. import org.apache.flume.api.RpcClient;  

  6. import org.apache.flume.api.RpcClientFactory;  

  7. import org.apache.flume.event.EventBuilder;  

  8. import java.util.Properties;  

  9.   

  10. public class Load_Client {  

  11.     public static void main(String[] args) {  

  12.         MyRpcClientFacade3 client = new MyRpcClientFacade3();  

  13.         // Initialize client with the remote Flume agent's host and port  

  14.         client.init();  

  15.   

  16.         // Send 10 events to the remote Flume agent. That agent should be  

  17.         // configured to listen with an AvroSource.  

  18.         String sampleData = "Flume Load_Client";  

  19.         for (int i = 0; i < 10; i++) {  

  20.           client.sendDataToFlume(sampleData);  

  21.         }  

  22.   

  23.         client.cleanUp();  

  24.       }  

  25.     }  

  26.   

  27.     class MyRpcClientFacade3{  

  28.       private RpcClient client;  

  29.       private String hostname;  

  30.       private int port;  

  31.   

  32.       public void init() {  

  33.           Properties props = new Properties();  

  34.           props.put("client.type""default_loadbalance");  

  35.   

  36.           // List of hosts (space-separated list of user-chosen host aliases)  

  37.           props.put("hosts""h2 h3 h4");  

  38.   

  39.           // host/port pair for each host alias  

  40.           String host1 = "192.168.233.128:50000";  

  41.           String host2 = "192.168.233.128:50001";  

  42.           String host3 = "192.168.233.128:50002";  

  43.           props.put("hosts.h2", host1);  

  44.           props.put("hosts.h3", host2);  

  45.           props.put("hosts.h4", host3);  

  46.   

  47.           props.put("host-selector""random"); // For random host selection  

  48.           // props.put("host-selector", "round_robin"); // For round-robin host  

  49. //                                                    // selection  

  50.           props.put("backoff""true"); // Disabled by default.  

  51.   

  52.           props.put("maxBackoff""10000"); // Defaults 0, which effectively  

  53.                                             // becomes 30000 ms  

  54.   

  55.           // Create the client with load balancing properties  

  56.           client = RpcClientFactory.getInstance(props);  

  57.       }  

  58.   

  59.       public void sendDataToFlume(String data) {  

  60.         // Create a Flume Event object that encapsulates the sample data  

  61.         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  62.   

  63.         // Send the event  

  64.         try {  

  65.           client.append(event);  

  66.         } catch (EventDeliveryException e) {  

  67.           // clean up and recreate the client  

  68.           client.close();  

  69.           client = null;  

  70.           client = RpcClientFactory.getDefaultInstance(hostname, port);  

  71.           // Use the following method to create a thrift client (instead of the above line):  

  72.           // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  73.         }  

  74.       }  

  75.   

  76.       public void cleanUp() {  

  77.         // Close the RPC connection  

  78.         client.close();  

  79.       }  

  80. }  

這里采用隨機的負載均衡props.put("host-selector","random") 。測試的時候沿用之前的3個接受代理配置avro_client_case20.conf、avro_client_case21.conf和avro_client_case22.conf,并將他們起起來。

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

啟動成功后

在eclipse 里運行JAVA程序Failover_Client.java,當然也可以打包后在服務器上運行JAVA程序。

#在啟動源發送的3個代理終端查看console輸出

下面是Host1,收到了2條數據

如何實現Flune Client 開發

下面是Host2,收到了2條數據

如何實現Flune Client 開發

下面是Host3,收到了6條數據。

如何實現Flune Client 開發

可以看到我們開發例子中,host-selector選擇的是隨機,因此程序也是隨機發送數據。下面我們測試輪詢round_robin選項。

程序里我們修改這句

//props.put("host-selector","random"); // For random host selection

props.put("host-selector", "round_robin");// Forround-robin host

再運行Java 程序

下面是Host1,收到了4條數據

如何實現Flune Client 開發

下面是Host2,收到了3條數據

如何實現Flune Client 開發

同樣Host3,收到了3條數據,這邊就不放圖了。輪詢就是按照順序放圖。

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

故城县| 和硕县| 龙游县| 开远市| 泸州市| 西充县| 大安市| 利川市| 图木舒克市| 扎赉特旗| 游戏| 信丰县| 普安县| 西宁市| 黎川县| 宝兴县| 宁蒗| 武强县| 城口县| 砚山县| 循化| 莱州市| 云安县| 商洛市| 敦煌市| 霍林郭勒市| 盱眙县| 兴隆县| 定兴县| 巫溪县| 长沙市| 张北县| 福贡县| 繁昌县| 晋城| 南投县| 广汉市| 文成县| 富川| 潮州市| 嘉义市|