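In practice, data is produced in many different ways. Flume ships with several built-in mechanisms for collecting data, but users often want their applications to talk to Flume directly. Flume therefore allows users to write applications that connect to it over IPC or RPC and send data to it.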
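Flume's RpcClient implements Flume's RPC mechanism. A user application can send data by calling the Flume Client SDK's append(Event) or appendBatch(List<Event>) methods, without worrying about the details of the underlying message exchange. The required Event objects can be supplied in three ways:

- implement the Event interface directly;
- use a convenience implementation such as the SimpleEvent class;
- use EventBuilder's overloaded withBody() static helper methods.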
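As of Flume 1.4.0, Avro is the default RPC protocol. NettyAvroRpcClient and ThriftRpcClient both implement the RpcClient interface. To create a client instance, we need the host and port of the target Flume agent. The RpcClient is then used to send data to that agent.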
The official documentation provides an example Avro RPC client, which we use directly as our test case.
We change client.init("host.example.org", 41414);
to client.init("192.168.233.128", 50000); so that it connects to our own host.
```java
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class MyApp {
  public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    //client.init("host.example.org", 41414);
    client.init("192.168.233.128", 50000);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade {
  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // Clean up and recreate the client (the failed event is not resent)
      client.close();
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}
```
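I won't explain the code in detail. It simply sends "Hello Flume!" to Flume 10 times. Remember to add all the JAR files from the lib directory of the Flume installation to the project, otherwise the program will not run.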
Below is the agent configuration:

```properties
# Configuration file: avro_client_case20.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source (address and port the Avro Source listens on)
a1.sources.r1.type = avro
a1.sources.r1.port = 50000
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```
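Note that, as mentioned earlier, the receiving side needs an Avro Source or a Thrift Source listening on the port. The value of a1.sources.r1.type must therefore be avro or thrift. Use avro for the default client used here. Use thrift if the client is created with RpcClientFactory.getThriftInstance().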
```shell
# Run the command
flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
```
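Once the agent has started, run the Java program in Eclipse. You can also package it and run it on the server.
Check the console output in the terminal where the agent was started: all 10 events arrived.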
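Note that client.append(event) sends a single event. To send several events at once, the client also provides appendBatch(List<Event>), which sends a whole list of events as a batch. We won't demonstrate it here.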
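The batch call mentioned above takes a whole List<Event>. One common application-side pattern is to buffer events and hand them over in groups. The sketch below is JDK-only and hypothetical:

- BatchBuffer is an illustrative class, not part of the Flume SDK.
- The java.util.function.Consumer sink stands in for something like `events -> client.appendBatch(events)`. That call is left out so the code compiles without the Flume jars.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchBuffer<T> {
    private final int batchSize;
    // Stands in for a call such as client.appendBatch(events) (hypothetical wiring)
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    public BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Buffer one item; hand a full batch to the sink as soon as batchSize items are waiting.
    public void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Send whatever is buffered (possibly a partial batch), e.g. before shutdown.
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // pass a copy so clearing the buffer does not affect the sink
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> sentSizes = new ArrayList<>();
        BatchBuffer<String> buf = new BatchBuffer<>(4, batch -> sentSizes.add(batch.size()));
        for (int i = 0; i < 10; i++) {
            buf.add("Hello Flume! #" + i);
        }
        buf.flush();
        System.out.println(sentSizes); // [4, 4, 2]
    }
}
```

appendBatch declares the checked EventDeliveryException. A real sink would therefore have to catch it inside the lambda, or use a custom functional interface that allows checked exceptions, rather than a plain Consumer.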
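The Failover RPC Client wraps the default Avro RPC client to provide failover handling. Its hosts property is a space-separated list of host aliases. Each alias is mapped to the host:port of a Flume agent, and together these agents form a failover group. The Failover RPC Client currently does not support Thrift. If there is a communication error with the currently selected host (agent), the failover client automatically fails over to the next host in the list.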
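To show how the failover properties fit together, here is a hypothetical helper, FailoverProps.build, that generates them from a list of host:port strings:

- The keys client.type, hosts and hosts.<alias> follow the official example.
- The alias names h1, h2, ... are an arbitrary choice made only for this sketch.
- The resulting Properties object is what the examples pass to RpcClientFactory.getInstance(props). That call is omitted so the sketch runs without the Flume jars.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class FailoverProps {
    // Build failover-client properties; alias names h1..hN are an illustrative choice.
    public static Properties build(List<String> hostPorts) {
        Properties props = new Properties();
        props.setProperty("client.type", "default_failover");
        StringBuilder aliases = new StringBuilder();
        for (int i = 0; i < hostPorts.size(); i++) {
            String alias = "h" + (i + 1);
            if (i > 0) {
                aliases.append(' ');
            }
            aliases.append(alias);
            // Each alias maps to one "host:port" agent address.
            props.setProperty("hosts." + alias, hostPorts.get(i));
        }
        // Space-separated alias list; the list order is the failover order.
        props.setProperty("hosts", aliases.toString());
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(Arrays.asList("192.168.233.128:50000",
                "192.168.233.128:50001", "192.168.233.128:50002"));
        System.out.println(props.getProperty("hosts"));    // h1 h2 h3
        System.out.println(props.getProperty("hosts.h2")); // 192.168.233.128:50001
    }
}
```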
Here is the example from the official documentation:

```java
// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h2 h3 h4");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h2", host1);
props.put("hosts.h3", host2);
props.put("hosts.h4", host3);

// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);
```
Below is the test program:

```java
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
import java.util.Properties;

public class Failover_Client {
  public static void main(String[] args) {
    MyRpcClientFacade2 client = new MyRpcClientFacade2();
    // Initialize the failover client; the agent addresses are configured in init()
    client.init();

    // Send 10 events to the remote Flume agents. Each agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade2 {
  private RpcClient client;
  private Properties props;

  public void init() {
    // Setup properties for the failover (the failover client supports Avro only, not Thrift)
    props = new Properties();
    props.put("client.type", "default_failover");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h2 h3 h4");

    // host/port pair for each host alias: three ports on one machine simulate three agents
    String host1 = "192.168.233.128:50000";
    String host2 = "192.168.233.128:50001";
    String host3 = "192.168.233.128:50002";
    props.put("hosts.h2", host1);
    props.put("hosts.h3", host2);
    props.put("hosts.h4", host3);

    // Create the client with failover properties
    client = RpcClientFactory.getInstance(props);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // Clean up and recreate the client from the same failover properties,
      // so the replacement client still covers the whole failover group
      client.close();
      client = RpcClientFactory.getInstance(props);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}
```
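The code configures three hosts for failover. To keep things simple, we simulate them with three ports on the same machine. The program again sends "Hello Flume!" 10 times to the first Flume agent. If the first agent fails, events go to the second agent, and so on in order.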
For the agents, reuse the previous configuration and make two copies of it (run these in the conf directory):

```shell
cp avro_client_case20.conf avro_client_case21.conf
cp avro_client_case20.conf avro_client_case22.conf
```

Then set a1.sources.r1.port = 50001 in avro_client_case21.conf and a1.sources.r1.port = 50002 in avro_client_case22.conf.
```shell
# Run the commands, each agent in its own terminal
flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console
```
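Once all three agents have started, run Failover_Client.java in Eclipse. You can also package it and run it on the server.
Then check the console output in the three agent terminals. The first agent's terminal received the data, while the other two received nothing.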
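Next, we kill the first agent's process and run the client program again. This time the data goes to the second agent. When the second agent is also shut down and we send data again, it goes to the last agent. This shows that the failover client moves through the hosts in the order in which they are listed.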
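The Flume Client SDK also supports an RPC client that load-balances across multiple hosts. This type of client takes a space-separated list of host:port entries (again given through host aliases) that forms a load-balancing group.

The client can be configured with a load-balancing strategy: it either picks one of the configured hosts at random or selects hosts in round-robin order. You can also write your own class implementing the LoadBalancingRpcClient$HostSelector interface so that a custom selection order is used. In that case, the fully qualified class name of the custom class must be given as the value of the host-selector property. The LoadBalancing RPC Client currently does not support Thrift.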
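To make the round-robin strategy concrete, here is a simplified model of it. It does not implement Flume's LoadBalancingRpcClient$HostSelector interface, and RoundRobinDemo and distribute are hypothetical names. It simply hands out hosts in turn and counts how many of n events each host would get. For 10 events over three hosts this gives a 4/3/3 split, the same counts reported for the round_robin test later in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinDemo {
    private final List<String> hosts;
    private int next = 0; // index of the host to hand out next

    public RoundRobinDemo(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("hosts must not be empty");
        }
        this.hosts = new ArrayList<>(hosts);
    }

    // Return the hosts in turn, wrapping around at the end of the list.
    public String select() {
        String host = hosts.get(next);
        next = (next + 1) % hosts.size();
        return host;
    }

    // Count how many of n events each host would receive under round robin.
    public static Map<String, Integer> distribute(List<String> hosts, int n) {
        RoundRobinDemo selector = new RoundRobinDemo(hosts);
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String host : hosts) {
            counts.put(host, 0);
        }
        for (int i = 0; i < n; i++) {
            counts.merge(selector.select(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(distribute(Arrays.asList("h2", "h3", "h4"), 10)); // {h2=4, h3=3, h4=3}
    }
}
```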
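If backoff is enabled, the client temporarily blacklists hosts that fail. A blacklisted host is excluded from selection until a specified timeout has elapsed. When the timeout expires, if the host is still unresponsive, this is treated as a sequential failure and the timeout is increased exponentially. This avoids getting stuck in long waits on unresponsive hosts.
The maximum backoff time can be configured through the maxBackoff property, in milliseconds. By default, maxBackoff is 30 seconds. This default is specified in the OrderSelector class, the superclass of both load-balancing strategies.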
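The exponential growth and the maxBackoff cap can be illustrated with a small calculation. This is a simplified model resting on three assumptions:

- The initial backoff is 1000 ms.
- The backoff doubles with each sequential failure.
- The result is capped at maxBackoff, with 0 treated as the 30000 ms default mentioned in the official example.

Flume's OrderSelector may compute the exact values differently. BackoffDemo and backoffMillis are hypothetical names.

```java
public class BackoffDemo {
    // Assumption for this sketch: 1000 ms initial backoff, doubling per sequential failure.
    static final long BASE_MS = 1000;
    // Effective default when maxBackoff is 0, per the official example's comment.
    static final long DEFAULT_MAX_BACKOFF_MS = 30000;

    // Blacklist duration after the given number of sequential failures (>= 1),
    // doubling each time but never exceeding the cap.
    public static long backoffMillis(int sequentialFailures, long maxBackoff) {
        if (sequentialFailures < 1) {
            throw new IllegalArgumentException("sequentialFailures must be >= 1");
        }
        long cap = maxBackoff > 0 ? maxBackoff : DEFAULT_MAX_BACKOFF_MS;
        int shift = Math.min(sequentialFailures - 1, 30); // bound the shift to avoid overflow
        long delay = BASE_MS << shift;
        return Math.min(delay, cap);
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int n = 1; n <= 6; n++) {
            if (n > 1) {
                sb.append(' ');
            }
            sb.append(backoffMillis(n, 10000)); // maxBackoff = 10000 as in the examples
        }
        System.out.println(sb); // 1000 2000 4000 8000 10000 10000
    }
}
```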
Here is the example from the official documentation:

```java
// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h2 h3 h4");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h2", host1);
props.put("hosts.h3", host2);
props.put("hosts.h4", host3);

props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host selection
props.put("backoff", "true"); // Disabled by default.

props.put("maxBackoff", "10000"); // Defaults 0, which effectively becomes 30000 ms

// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);
```
Below is the test program:

```java
import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.util.Properties;

public class Load_Client {
  public static void main(String[] args) {
    MyRpcClientFacade3 client = new MyRpcClientFacade3();
    // Initialize the load-balancing client; the agent addresses are configured in init()
    client.init();

    // Send 10 events to the remote Flume agents. Each agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Flume Load_Client";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade3 {
  private RpcClient client;
  private Properties props;

  public void init() {
    props = new Properties();
    props.put("client.type", "default_loadbalance");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h2 h3 h4");

    // host/port pair for each host alias: three ports on one machine simulate three agents
    String host1 = "192.168.233.128:50000";
    String host2 = "192.168.233.128:50001";
    String host3 = "192.168.233.128:50002";
    props.put("hosts.h2", host1);
    props.put("hosts.h3", host2);
    props.put("hosts.h4", host3);

    props.put("host-selector", "random"); // For random host selection
    // props.put("host-selector", "round_robin"); // For round-robin host selection
    props.put("backoff", "true"); // Disabled by default.
    props.put("maxBackoff", "10000"); // Defaults 0, which effectively becomes 30000 ms

    // Create the client with load balancing properties
    client = RpcClientFactory.getInstance(props);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // Clean up and recreate the client from the same load-balancing properties,
      // so the replacement client still covers the whole host group
      client.close();
      client = RpcClientFactory.getInstance(props);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}
```
Here we use random load balancing: props.put("host-selector", "random"). For the test, we reuse the three receiving agent configurations avro_client_case20.conf, avro_client_case21.conf and avro_client_case22.conf, and start all three agents.
```shell
# Run the commands, each agent in its own terminal
flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console
```
Once all three agents have started, run Load_Client.java in Eclipse. You can also package it and run it on the server.
Then check the console output in the three agent terminals.
Host1 received 2 events.
Host2 received 2 events.
Host3 received 6 events.
Because host-selector is set to random in our example, the program sends the events to randomly chosen hosts. Next, we test the round_robin option.
In the program, change these lines as follows:

```java
//props.put("host-selector", "random"); // For random host selection
props.put("host-selector", "round_robin"); // For round-robin host selection
```
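Run the Java program again.
Host1 received 4 events.
Host2 received 3 events.
Host3 likewise received 3 events (screenshot not shown). Round robin simply hands the events to the hosts in turn.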