您好,登錄后才能下訂單哦!
模擬編寫了一個Flume 1.7中TAILDIR的功能實現,通過手動控制文件的讀取位置來達到對文件的讀寫,防止flume掛了之后重復消費的情況。
以下是代碼實現,僅做參考,生產上直接用TAILDIR讀取文件內容即可,若要讀取一個目錄下的子目錄,可使用github上以實現的這個項目包:https://github.com/qwurey/flume-source-taildir-recursive
package com.fwmagic.flume.source;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Description:自定義Source 1、讀取指定目錄下的文件,如nginx的access.log
* 2、讀取文件前先判斷offset文件是否存在,不存在則創建它
* 3、每次讀取完都寫一個offset文件記錄讀取到文件的什么位置,防止重啟flume時發生重復消費的情況
* 4、如何自定義?參考ExecSource
* <p>
* (1):獲取自定義配置文件屬性
* (2):創建線程池,用channelProcessor發送數據給channel
* (3):線程池提交(啟動任務)
* 任務內容:
* (1):讀取偏移量文件,沒有則創建,有則獲取偏移量,將讀取的指針重置到指定偏移量
* (2):讀取指定的日志文件,將讀取的一行內容打包成Event,用Channel發送Event
* (3):獲取讀取內容后的偏移量,重置偏移量
* (4):stop方法調用,關閉線程池,調用super.stop方法。
* @Date:Create in 2018/8/19
*/
public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable {
/*監聽的文件*/
private String filePath;
/*記錄讀取偏移量的文件*/
private String posiFile;
/*若讀取文件暫無內容,則等待數秒*/
private Long interval;
/*讀寫文件的字符集*/
private String charset;
/*讀取文件內容的線程*/
private FileRunner fileRunner;
/*線程池*/
private ExecutorService executor;
private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);
/**
* 初始化配置文件內容
*
* @param context
*/
@Override
public void configure(Context context) {
filePath = context.getString("filePath");
posiFile = context.getString("posiFile");
interval = context.getLong("interval", 2000L);
charset = context.getString("charset", "UTF-8");
}
@Override
public synchronized void start() {
//啟動一個線程,用于監聽對應的日志文件
//創建一個線程池
executor = Executors.newSingleThreadExecutor();
//用channelProcessor發送數據給channel
ChannelProcessor channelProcessor = super.getChannelProcessor();
fileRunner = new FileRunner(filePath, posiFile, interval, charset, channelProcessor);
executor.submit(fileRunner);
super.start();
}
@Override
public synchronized void stop() {
fileRunner.setFlag(Boolean.FALSE);
while (!executor.isTerminated()) {
logger.debug("waiting for exec executor service to stop");
try {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
logger.debug("Interrupted while waiting for executor service to stop,Just exiting.");
Thread.currentThread().interrupt();
}
}
super.stop();
}
public static class FileRunner implements Runnable {
private Long interval;
private String charset;
private Long offset = 0L;
private File pFile;
private RandomAccessFile raf;
private ChannelProcessor channelProcessor;
private Boolean flag = Boolean.TRUE;
public void setFlag(Boolean flag) {
this.flag = flag;
}
public FileRunner(String filePath, String posiFile, Long interval, String charset, ChannelProcessor channelProcessor) {
this.interval = interval;
this.charset = charset;
this.channelProcessor = channelProcessor;
//1、判斷是否有偏移量文件,有則讀取偏移量,沒有則創建
pFile = new File(posiFile);
if (!pFile.exists()) {
try {
pFile.createNewFile();
} catch (IOException e) {
e.printStackTrace();
logger.error("create position file error!", e);
}
}
//2、判斷偏移量中的文件內容是否大于0
try {
String offsetStr = FileUtils.readFileToString(pFile, this.charset);
// 3、如果偏移量文件中有記錄,則將內容轉換為Long
if (StringUtils.isNotBlank(offsetStr)) {
offset = Long.parseLong(offsetStr);
}
// 4、如果有偏移量,則直接跳到文件的偏移量位置
raf = new RandomAccessFile(filePath, "r");
// 跳到指定的位置
raf.seek(offset);
} catch (IOException e) {
e.printStackTrace();
logger.error("read position file error!", e);
}
}
@Override
public void run() {
//監聽文件
while (flag) {
// 讀取文件中的內容
String line = null;
try {
line = raf.readLine();
if (StringUtils.isNotBlank(line)) {
// 把數據打包成Event,發送到Channel
line = new String(line.getBytes("ISO-8859-1"), "UTF-8");
Event event = EventBuilder.withBody(line.getBytes());
channelProcessor.processEvent(event);
//更新偏移量文件,把偏移量寫入文件
offset = raf.getFilePointer();
FileUtils.writeStringToFile(pFile, offset.toString());
} else {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
logger.error("thread sleep error", e);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。