Copyright notice: this is the blogger's original article and may not be reproduced without the blogger's permission.
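The Transaction interface is the basis of Flume's reliability. All of the major components (sources, sinks and channels) must use a Flume Transaction. You can think of the Transaction interface as Flume's transaction mechanism: a source putting data into a channel and a sink taking data out of it each happen inside a Transaction.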
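As the figure above shows, a Transaction is implemented inside the Channel implementation. Every source and sink connected to a channel has to obtain a Transaction object. Sources do not handle it directly: they go through a ChannelProcessor, which wraps the Transaction for them (and uses a ChannelSelector to choose the target channels). Putting events into a channel and taking events out of it are both done inside an active Transaction.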
Here is the example from the official Flume documentation:
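import java.nio.charset.Charset;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;

Channel ch = new MemoryChannel();
// Configure before use; an empty Context applies MemoryChannel's default capacities
Configurables.configure(ch, new Context());
Transaction txn = ch.getTransaction();
txn.begin();
try {
    // This try clause includes whatever Channel operations you want to do
    Event eventToStage = EventBuilder.withBody("Hello Flume!",
            Charset.forName("UTF-8"));
    ch.put(eventToStage);
    // Event takenEvent = ch.take();
    // ...
    txn.commit();
} catch (Throwable t) {
    txn.rollback();
    // Log exception, handle individual exceptions as needed
    // re-throw all Errors
    if (t instanceof Error) {
        throw (Error) t;
    }
} finally {
    txn.close();
}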
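The code above is a very simple Transaction example. This begin/commit/rollback/close pattern is what both custom Sources and custom Sinks build on: a Sink runs it explicitly, while a Source normally lets its ChannelProcessor run it.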
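To make the begin/commit/rollback semantics concrete, here is a toy model in plain Java. It is not Flume's API: ToyChannel and ToyTxn are hypothetical classes that only mimic the behavior described above, where puts become visible only on commit and a rollback returns taken events to the channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a channel transaction (NOT Flume's API): puts are staged
// until commit, and takes are undone on rollback.
public class ToyTransactionDemo {

    static class ToyChannel {
        final Deque<String> queue = new ArrayDeque<>();

        ToyTxn getTransaction() {
            return new ToyTxn(this);
        }
    }

    static class ToyTxn {
        private final ToyChannel ch;
        private final List<String> staged = new ArrayList<>(); // puts not yet visible
        private final List<String> taken = new ArrayList<>();  // takes that may be undone

        ToyTxn(ToyChannel ch) {
            this.ch = ch;
        }

        void put(String e) {
            staged.add(e);
        }

        String take() {
            String e = ch.queue.pollFirst();
            if (e != null) {
                taken.add(e);
            }
            return e;
        }

        void commit() {
            ch.queue.addAll(staged); // staged puts become visible
            staged.clear();
            taken.clear();           // takes become permanent
        }

        void rollback() {
            staged.clear();          // discard staged puts
            // return taken events to the head of the queue in their original order
            for (int i = taken.size() - 1; i >= 0; i--) {
                ch.queue.addFirst(taken.get(i));
            }
            taken.clear();
        }
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel();

        ToyTxn t1 = ch.getTransaction();
        t1.put("a");
        t1.put("b");
        t1.commit();                 // queue: [a, b]

        ToyTxn t2 = ch.getTransaction();
        String got = t2.take();      // "a"
        t2.rollback();               // delivery failed: "a" goes back

        System.out.println(got + " " + ch.queue); // prints: a [a, b]
    }
}
```

Because the failed take is rolled back, event a is still in the channel and can be delivered by the next transaction instead of being lost.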
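A Sink takes events from a channel and then either forwards them directly to the next Flume agent or stores them in an external repository.
Which channel a Sink reads from is set in the configuration file. Each configured Sink has one SinkRunner instance associated with it; when the Flume framework calls SinkRunner.start(), a new thread is created to drive the Sink.
This thread manages the Sink's lifecycle. The Sink must implement the start() and stop() methods of the LifecycleAware interface: start() initializes the Sink, stop() releases its resources, and process() is the core method that takes events from the channel and forwards them.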
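The loop below is a simplified, hypothetical model of what such a runner thread does: call start(), call process() repeatedly, sleep longer after each consecutive BACKOFF, and call stop() at the end. The status names mirror Flume's READY and BACKOFF values, but ToySink, drive() and the increment and cap values are illustrative assumptions, and the model records sleep times instead of actually sleeping.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified model of a runner thread driving a sink (NOT Flume's SinkRunner).
public class ToyRunnerDemo {

    enum Status { READY, BACKOFF }

    interface ToySink {
        void start();
        Status process();
        void stop();
    }

    // Returns the sleep (ms) the runner would take after each process() call.
    static List<Long> drive(ToySink sink, int iterations, long increment, long maxSleep) {
        List<Long> sleeps = new ArrayList<>();
        sink.start();                        // lifecycle: initialize
        long consecutiveBackoffs = 0;
        for (int i = 0; i < iterations; i++) {
            if (sink.process() == Status.BACKOFF) {
                consecutiveBackoffs++;       // back off a bit longer each time, up to a cap
                sleeps.add(Math.min(consecutiveBackoffs * increment, maxSleep));
            } else {
                consecutiveBackoffs = 0;     // data flowed: poll again immediately
                sleeps.add(0L);
            }
        }
        sink.stop();                         // lifecycle: release resources
        return sleeps;
    }

    public static void main(String[] args) {
        // Hypothetical sink: two events are available, then the channel stays empty.
        Iterator<String> events = List.of("e1", "e2").iterator();
        ToySink sink = new ToySink() {
            public void start() { System.out.println("start"); }
            public Status process() {
                if (!events.hasNext()) {
                    return Status.BACKOFF;
                }
                System.out.println("deliver " + events.next());
                return Status.READY;
            }
            public void stop() { System.out.println("stop"); }
        };
        System.out.println(drive(sink, 6, 1000, 3000)); // [0, 0, 1000, 2000, 3000, 3000]
    }
}
```

Returning BACKOFF when nothing was delivered is what lets the runner sleep instead of spinning on an empty channel.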
The Sink should also implement the Configurable interface so that it can read its own settings from the configuration file.
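As a rough, simplified model (an assumption about the mechanics, not Flume's configuration code), the keys under a sink's prefix in the properties file, such as a hypothetical a1.sinks.k1.myProp, reach configure() with the prefix removed, and getString() falls back to the default when the key is absent. contextFor() and getString() below are illustrative helpers, not Flume methods.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of how a sink's properties could reach configure():
// the "a1.sinks.k1." prefix is stripped and the remaining keys form a context.
public class ToyConfigureDemo {

    static Map<String, String> contextFor(String prefix, String[] lines) {
        Map<String, String> ctx = new HashMap<>();
        for (String line : lines) {
            String l = line.trim();
            if (l.isEmpty() || l.startsWith("#")) {
                continue;                                // skip blank lines and comments
            }
            int eq = l.indexOf('=');
            if (eq < 0) {
                continue;
            }
            String key = l.substring(0, eq).trim();
            if (key.startsWith(prefix)) {
                ctx.put(key.substring(prefix.length()), l.substring(eq + 1).trim());
            }
        }
        return ctx;
    }

    // Same shape as context.getString("myProp", "defaultValue").
    static String getString(Map<String, String> ctx, String key, String dflt) {
        return ctx.getOrDefault(key, dflt);
    }

    public static void main(String[] args) {
        String[] conf = {
            "a1.sinks.k1.type = flumedev.Custom_Sink",
            "a1.sinks.k1.channel = c1",
            "a1.sinks.k1.myProp = hello",               // hypothetical custom property
            "a1.channels.c1.type = memory"
        };
        Map<String, String> ctx = contextFor("a1.sinks.k1.", conf);
        System.out.println(getString(ctx, "myProp", "defaultValue")); // hello
        System.out.println(getString(ctx, "other", "defaultValue"));  // defaultValue
    }
}
```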
Here is the example from the official documentation:
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {
    private String myProp;

    @Override
    public void configure(Context context) {
        String myProp = context.getString("myProp", "defaultValue");
        // Process the myProp value (e.g. validation)
        // Store myProp for later retrieval by process() method
        this.myProp = myProp;
    }

    @Override
    public void start() {
        // Initialize the connection to the external repository (e.g. HDFS) that
        // this Sink will forward Events to ..
        super.start(); // let AbstractSink record the lifecycle state
    }

    @Override
    public void stop() {
        // Disconnect from the external repository and do any
        // additional cleanup (e.g. releasing resources or nulling-out
        // field values) ..
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // This try clause includes whatever Channel operations you want to do
            Event event = ch.take();
            if (event == null) {
                // Channel is empty: commit the empty transaction and ask the runner to back off
                txn.commit();
                return Status.BACKOFF;
            }
            // Send the Event to the external repository.
            // storeSomeData(event);
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            // Log exception, handle individual exceptions as needed
            status = Status.BACKOFF;
            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }
}
Here is my test version:
package flumedev;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class Custom_Sink extends AbstractSink implements Configurable {
    private String myProp;

    @Override
    public void configure(Context context) {
        String myProp = context.getString("myProp", "defaultValue");
        // Process the myProp value (e.g. validation)
        // Store myProp for later retrieval by process() method
        this.myProp = myProp;
    }

    @Override
    public void start() {
        // Initialize the connection to the external repository (e.g. HDFS) that
        // this Sink will forward Events to ..
        super.start(); // let AbstractSink record the lifecycle state
    }

    @Override
    public void stop() {
        // Disconnect from the external repository and do any
        // additional cleanup (e.g. releasing resources or nulling-out
        // field values) ..
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // This try clause includes whatever Channel operations you want to do
            Event event = ch.take();
            if (event == null) {
                // Nothing to read: commit the empty transaction and back off
                txn.commit();
                return Status.BACKOFF;
            }
            // Decode the body bytes explicitly (UTF-8 is assumed here)
            String out = new String(event.getBody(), StandardCharsets.UTF_8);
            // "Send" the Event to the external repository: this test sink just prints it
            System.out.println(out);
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            // Log exception, handle individual exceptions as needed
            status = Status.BACKOFF;
            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }
}
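The test sink above only prints each event's body. Note that printing event.getBody().toString() directly gives garbage: getBody() returns a byte[], and calling toString() on an array prints its type and hash code (something like [B@1b6d3586) rather than its contents, so the bytes have to be decoded with new String(...). And because every sink operation happens inside a Transaction, a custom sink has to include the transaction handling shown above.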
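A quick way to see the difference in plain Java, with no Flume classes involved. decodeBody() is a hypothetical helper, and UTF-8 is an assumption about how the upstream source encoded the text.

```java
import java.nio.charset.StandardCharsets;

// Why event.getBody().toString() prints garbage: getBody() returns byte[],
// and arrays inherit Object.toString(), which prints the type and hash code.
public class BodyToStringDemo {

    // Decode an event body explicitly; UTF-8 is an assumed encoding.
    static String decodeBody(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = "testcustom_sink".getBytes(StandardCharsets.UTF_8);
        System.out.println(body.toString()); // something like [B@1b6d3586
        System.out.println(decodeBody(body)); // testcustom_sink
    }
}
```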
Next is the test configuration. The custom sink class is flumedev.Custom_Sink. Note: after packaging it into a jar, put the jar in $FLUME_HOME/lib.
# Config file: custom_sink_case23.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = flumedev.Custom_Sink
#a1.sinks.k1.type =logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
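# Run the command:
flume-ng agent -c conf -f conf/custom_sink_case23.conf -n a1 -Dflume.root.logger=INFO,console
After the agent starts successfully,
open another terminal and send data to the listening port:
echo "testcustom_sink" | nc 192.168.233.128 50000
# Check the console output in the terminal where the agent was started
The data is printed as expected.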
A Source receives data from outside and stores it in a Channel. Custom Sources are rarely written in practice.
Here is the example from the official documentation:
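import java.nio.charset.StandardCharsets;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
    private String myProp;

    @Override
    public void configure(Context context) {
        String myProp = context.getString("myProp", "defaultValue");
        // Process the myProp value (e.g. validation, convert to another type, ...)
        // Store myProp for later retrieval by process() method
        this.myProp = myProp;
    }

    @Override
    public void start() {
        // Initialize the connection to the external client
        super.start();
    }

    @Override
    public void stop() {
        // Disconnect from external client and do any additional cleanup
        // (e.g. releasing resources or nulling-out field values) ..
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        try {
            // Receive new data
            Event e = getSomeData();
            // Store the Event into this Source's associated Channel(s).
            // processEvent() begins, commits (or rolls back) and closes a
            // transaction on each target channel itself, and AbstractSource
            // has no getChannel(), so no explicit Transaction is used here.
            getChannelProcessor().processEvent(e);
            status = Status.READY;
        } catch (Throwable t) {
            // Log exception, handle individual exceptions as needed
            status = Status.BACKOFF;
            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        }
        return status;
    }

    // PollableSource in Flume 1.7+ declares these two methods (values are illustrative)
    public long getBackOffSleepIncrement() {
        return 1000;
    }

    public long getMaxBackOffSleepInterval() {
        return 5000;
    }

    // Hypothetical placeholder: replace with a real read from the external client
    private Event getSomeData() {
        return EventBuilder.withBody("some data", StandardCharsets.UTF_8);
    }
}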
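To test it, the part to fill in is where Event e is obtained (getSomeData()), since that is what feeds data into the channel. I won't test it here.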
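If you do want to exercise the polling logic without a running agent, a toy model like the one below can help. It is not Flume's API: getSomeData() and processEvent() are hypothetical stand-ins for reading from the external client and for getChannelProcessor().processEvent(e), and returning BACKOFF when there is nothing to read is a design choice assumed here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a pollable source (NOT Flume's API).
public class ToySourceDemo {

    enum Status { READY, BACKOFF }

    final Deque<String> external = new ArrayDeque<>(); // pretend external client
    final List<String> channel = new ArrayList<>();    // pretend channel

    // Hypothetical: read one item, or null when nothing is available.
    String getSomeData() {
        return external.pollFirst();
    }

    // Hypothetical stand-in for getChannelProcessor().processEvent(e).
    void processEvent(String e) {
        channel.add(e);
    }

    Status process() {
        String e = getSomeData();
        if (e == null) {
            return Status.BACKOFF;          // nothing to do: let the runner sleep
        }
        try {
            processEvent(e);
            return Status.READY;
        } catch (RuntimeException ex) {
            external.addFirst(e);           // a failed put: keep the data for a retry
            return Status.BACKOFF;
        }
    }

    public static void main(String[] args) {
        ToySourceDemo src = new ToySourceDemo();
        src.external.add("line1");
        System.out.println(src.process() + " " + src.channel); // READY [line1]
        System.out.println(src.process() + " " + src.channel); // BACKOFF [line1]
    }
}
```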
For custom Channels, the official developer guide only says "TBD".
The following custom Channel was developed at Meituan; the original article is here:
http://tech.meituan.com/mt-log-system-optimization.html
……
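Flume itself provides MemoryChannel and FileChannel. MemoryChannel is fast, but its buffer size is limited and it has no persistence; FileChannel is the opposite. We wanted the advantages of both: when the Sink is fast enough and the Channel is not buffering too many logs, use MemoryChannel; when the Sink cannot keep up and the Channel needs to buffer the logs sent by the applications, use FileChannel. So we developed DualChannel, which switches intelligently between the two channels.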
The concrete logic is as follows:
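import java.util.concurrent.atomic.AtomicBoolean;

/**
 * putToMemChannel: whether doPut() writes to memChannel (true) or fileChannel (false)
 * takeFromMemChannel: whether doTake() reads from memChannel (true) or fileChannel (false)
 *
 * Excerpt only: switchon, memChannel, fileChannel, memTransaction and
 * fileTransaction are members of the surrounding DualChannel class,
 * which the article does not show.
 */
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

void doPut(Event event) {
    if (switchon && putToMemChannel.get()) {
        // Write to memChannel
        memTransaction.put(event);
        // Switch writes to fileChannel once memory is full or fileChannel still has a backlog
        if (memChannel.isFull() || fileChannel.getQueueSize() > 100) {
            putToMemChannel.set(false);
        }
    } else {
        // Write to fileChannel
        fileTransaction.put(event);
    }
}

Event doTake() {
    Event event = null;
    if (takeFromMemChannel.get()) {
        // Read from memChannel
        event = memTransaction.take();
        if (event == null) {
            // memChannel drained: read from fileChannel next
            takeFromMemChannel.set(false);
        }
    } else {
        // Read from fileChannel
        event = fileTransaction.take();
        if (event == null) {
            // fileChannel drained too: switch both reads and writes back to memChannel
            takeFromMemChannel.set(true);
            putToMemChannel.set(true);
        }
    }
    return event;
}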
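Below is a self-contained simulation of just the switching rules above, assuming two in-memory queues in place of MemoryChannel and FileChannel (no transactions, no persistence). ToyDualChannel is hypothetical, and memCapacity and fileBacklogLimit are assumed parameters standing in for memChannel.isFull() and the hard-coded 100.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Model of the DualChannel switching rules; ArrayDeques stand in for the two channels.
public class ToyDualChannel {
    private final Deque<String> mem = new ArrayDeque<>();
    private final Deque<String> file = new ArrayDeque<>();
    private final int memCapacity;
    private final int fileBacklogLimit;
    private boolean putToMem = true;
    private boolean takeFromMem = true;

    ToyDualChannel(int memCapacity, int fileBacklogLimit) {
        this.memCapacity = memCapacity;
        this.fileBacklogLimit = fileBacklogLimit;
    }

    void put(String e) {
        if (putToMem) {
            mem.addLast(e);
            // divert writes once "memory" is full or "file" still has a backlog
            if (mem.size() >= memCapacity || file.size() > fileBacklogLimit) {
                putToMem = false;
            }
        } else {
            file.addLast(e);
        }
    }

    String take() {
        String e;
        if (takeFromMem) {
            e = mem.pollFirst();
            if (e == null) {
                takeFromMem = false;   // memory drained: read from file next time
            }
        } else {
            e = file.pollFirst();
            if (e == null) {           // file drained too: back to memory for both
                takeFromMem = true;
                putToMem = true;
            }
        }
        return e;
    }

    public static void main(String[] args) {
        ToyDualChannel ch = new ToyDualChannel(2, 100);
        for (String e : new String[] {"a", "b", "c", "d"}) {
            ch.put(e);                 // a, b go to mem; c, d overflow to file
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            sb.append(ch.take()).append(' ');
        }
        System.out.println(sb.toString().trim()); // a b null c d null
    }
}
```

In this model, events diverted to the file queue are read only after the memory queue drains, so they come out in the order they were put; the null returned at each switch mirrors doTake() above, which also returns null on the call that flips a flag.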
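One more point: the official documentation recommends the file channel. It is slower, but it guarantees data integrity; the memory channel is fast, but it should only be used for workloads that are not sensitive to data loss or duplication.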