To start: this section is light on theory; it is mostly examples.
The Avro source listens on a port and receives events from external Avro client streams. Paired with the built-in Avro sink on another Flume agent, it can form tiered collection topologies. The official wording is convoluted (and my translation is weak), but the point is simply that Flume agents can be chained across multiple tiers, with Avro connecting agent to agent, as sketched below.
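To make the tiering concrete, here is a minimal sketch of a two-tier layout (the agent names, host, and port below are invented for illustration): the first agent's Avro sink points at the second agent's Avro source.

#Tier 1 agent "collector1": Avro sink forwarding to the aggregation tier
collector1.sinks.k1.type = avro
collector1.sinks.k1.channel = c1
collector1.sinks.k1.hostname = 192.168.233.129
collector1.sinks.k1.port = 55555

#Tier 2 agent "aggregator": Avro source receiving from tier 1
aggregator.sources.r1.type = avro
aggregator.sources.r1.channels = c1
aggregator.sources.r1.bind = 0.0.0.0
aggregator.sources.r1.port = 55555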
Below is the source configuration from the official site; the bolded parameters are required, and I won't go through the descriptions.
| Property Name | Default | Description |
| --- | --- | --- |
| **channels** | – | |
| **type** | – | The component type name, needs to be avro |
| **bind** | – | Hostname or IP address to listen on |
| **port** | – | Port # to bind to |
| threads | – | Maximum number of worker threads to spawn |
| selector.type | replicating | replicating or multiplexing |
| selector.* | | Depends on the selector.type value |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | |
| compression-type | none | This can be "none" or "deflate". The compression-type must match the compression-type of the matching AvroSink |
| ssl | false | Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password". |
| keystore | – | This is the path to a Java keystore file. Required for SSL. |
| keystore-password | – | The password for the Java keystore. Required for SSL. |
| keystore-type | JKS | The type of the Java keystore. This can be "JKS" or "PKCS12". |
| ipFilter | false | Set this to true to enable ipFiltering for netty |
| ipFilter.rules | – | Define N netty ipFilter pattern rules with this config. |
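A quick note on the compression and SSL rows above: enabling them is just a matter of extra properties. A minimal sketch (the keystore path and password are placeholders, and the deflate setting must be mirrored on the Avro sink that feeds this source):

#Optional compression and SSL for the Avro source (placeholder values)
a1.sources.r1.compression-type = deflate
a1.sources.r1.ssl = true
a1.sources.r1.keystore = /path/to/flume.keystore.jks
a1.sources.r1.keystore-password = changeit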
I'll skip the official example and show a real one instead.
#Configuration file avro_case2.conf, actually identical to pull.conf from Part 2
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.port = 55555
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel= c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
#Run the command
flume-ng agent -c conf -f conf/avro_case2.conf -n a1 -Dflume.root.logger=INFO,console
Whether it started successfully isn't shown here; it behaves the same as pull.conf from Part 2.
#Then test from another terminal
flume-ng avro-client -c conf -H 192.168.233.128 -p 55555 -F /tmp/logs/test.log
This simulates the Part 2 scenario of the push agent sending data to the pull agent, except here we skip the config file and test directly from the command line.
The events are sent successfully. Unlike the push agent setup, no spooling directory is involved here, so the log file is not renamed.
Look at the receiving terminal's output:
OK, the data was sent successfully.
Thrift Source is essentially identical to Avro Source; just change the source type to thrift, e.g. a1.sources.r1.type = thrift.
It's straightforward, so I won't belabor it; a minimal sketch follows for reference.
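A minimal sketch, assuming the same host and port as the Avro example above; only the type line changes:

a1.sources.r1.type = thrift
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.port = 55555
a1.sources.r1.channels = c1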
Exec Source is configured with a Unix (Linux) command and continuously ingests that command's output. If the process exits, Exec Source exits with it and produces no further data.
Below is the source configuration from the official site; the bolded parameters are required, and I won't go through the descriptions.
| Property Name | Default | Description |
| --- | --- | --- |
| **channels** | – | |
| **type** | – | The component type name, needs to be exec |
| **command** | – | The command to execute |
| shell | – | A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc. |
| restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
| restart | false | Whether the executed cmd should be restarted if it dies |
| logStdErr | false | Whether the command's stderr should be logged |
| batchSize | 20 | The max number of lines to read and send to the channel at a time |
| selector.type | replicating | replicating or multiplexing |
| selector.* | | Depends on the selector.type value |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | |
Here is a working example.
#Configuration file exec_case3.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/logs/test.log
a1.sources.r1.channels = c1
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel= c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
Here we use the tail -F command to keep reading the tail of the log file.
#Run the command
flume-ng agent -c conf -f conf/exec_case3.conf -n a1 -Dflume.root.logger=INFO,console
This will display all the data read from the log.
The output above is the log contents; now let's keep appending data to the log:
echo"looklook5" >> test.log ,會發現終端也在輸出數據。
The official docs say: "JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application it should work with any JMS provider but has only been tested with ActiveMQ."
In short, the JMS source has only been tested against ActiveMQ so far, nothing else. Here is the official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=jms
a1.sources.r1.channels=c1
a1.sources.r1.initialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory=GenericConnectionFactory
a1.sources.r1.providerURL=tcp://mqserver:61616
a1.sources.r1.destinationName=BUSINESS_DATA
a1.sources.r1.destinationType=QUEUE
Below is the source configuration from the official site; the bolded parameters are required, and I won't go through the descriptions.
| Property Name | Default | Description |
| --- | --- | --- |
| **channels** | – | |
| **type** | – | The component type name, needs to be jms |
| **initialContextFactory** | – | Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory |
| **connectionFactory** | – | The JNDI name the connection factory should appear as |
| **providerURL** | – | The JMS provider URL |
| **destinationName** | – | Destination name |
| **destinationType** | – | Destination type (queue or topic) |
| messageSelector | – | Message selector to use when creating the consumer |
| userName | – | Username for the destination/provider |
| passwordFile | – | File containing the password for the destination/provider |
| batchSize | 100 | Number of messages to consume in one batch |
| converter.type | DEFAULT | Class to use to convert messages to flume events. See below. |
| converter.* | – | Converter properties. |
| converter.charset | UTF-8 | Default converter only. Charset to use when converting JMS TextMessages to byte arrays. |
Given that this source is still immature, let's wait until it matures before studying it; I'll take a small shortcut here.
Spooling Directory Source was covered in Part 2; to recap: it watches a configured directory for newly added files and reads the data out of them. Two caveats with the spool source: first, files copied into the spool directory must not be opened and edited afterwards; second, the spool directory must not contain subdirectories. Its main use is near-real-time monitoring of logs.
Below is the source configuration from the official site; the bolded parameters are required. There are too many optional ones, so I'll add just one, fileSuffix: the suffix appended to a file once it has been fully ingested, which can be changed.
| Property Name | Default | Description |
| --- | --- | --- |
| **channels** | – | |
| **type** | – | The component type name, needs to be spooldir. |
| **spoolDir** | – | The directory from which to read files from. |
| fileSuffix | .COMPLETED | Suffix to append to completely ingested files |
Here is an example, similar to push.conf from Part 2.
#Configuration file: spool_case4.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type =spooldir
a1.sources.r1.spoolDir =/tmp/logs
a1.sources.r1.fileHeader= true
a1.sources.r1.channels =c1
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Here we monitor the log directory /tmp/logs.
#Run the command
flume-ng agent -c conf -f conf/spool_case4.conf -n a1 -Dflume.root.logger=INFO,console
The terminal displays all the data. Now check the monitored log directory /tmp/logs:
The files that have been read now carry the suffix, indicating ingestion is complete.
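If you would rather not use the default .COMPLETED marker, fileSuffix can be overridden; a minimal sketch (the .DONE value is just an example):

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /tmp/logs
a1.sources.r1.fileSuffix = .DONE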
The Netcat source listens on a given port and turns each line of text into an event; in other words, data is newline-delimited. It works like the command nc -k -l [host] [port]: it opens the specified port, listens for data, turns each line of text into a Flume event, and sends it through the connected channel.
Below is the source configuration from the official site; the bolded parameters are required.
| Property Name | Default | Description |
| --- | --- | --- |
| **channels** | – | |
| **type** | – | The component type name, needs to be netcat |
| **bind** | – | Host name or IP address to bind to |
| **port** | – | Port # to bind to |
| max-line-length | 512 | Max line length per event body (in bytes) |
| ack-every-event | true | Respond with an "OK" for every event received |
| selector.type | replicating | replicating or multiplexing |
| selector.* | | Depends on the selector.type value |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | |
As for a working example: the very first example in Part 2 was a Netcat source, so I won't walk through it again; a minimal sketch follows for reference.
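A minimal sketch (host and port are arbitrary), plus a matching test command:

a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

#Test from another terminal
echo "hello netcat" | nc 192.168.233.128 44444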
A simple sequence generator that continuously produces events from a counter starting at 0 and incrementing by 1. It is mainly used for testing (per the official docs), so I won't dwell on it; a throwaway config is sketched below.
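For completeness, a throwaway test config might look like this (a sketch; seq is the documented type name and it needs no further required settings):

a1.sources.r1.type = seq
a1.sources.r1.channels = c1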
This source reads syslog data and generates Flume events. It comes in three flavors: Syslog TCP Source, Multiport Syslog TCP Source (multi-port), and Syslog UDP Source. The TCP sources create a new event for each newline (\n) delimited string, while the UDP source treats an entire message as a single event.
This is the original syslog source.
Below is the source configuration from the official site; the bolded parameters are required, and I've omitted the optional ones here.
| Property Name | Default | Description |
| --- | --- | --- |
| **channels** | – | |
| **type** | – | The component type name, needs to be syslogtcp |
| **host** | – | Host name or IP address to bind to |
| **port** | – | Port # to bind to |
Official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=syslogtcp
a1.sources.r1.port=5140
a1.sources.r1.host=localhost
a1.sources.r1.channels=c1
Here is a working example.
#Configuration file: syslog_case5.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type =syslogtcp
a1.sources.r1.port =50000
a1.sources.r1.host =192.168.233.128
a1.sources.r1.channels =c1
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Here we set the listening address to 192.168.233.128, port 50000.
#Run the command
flume-ng agent -c conf -f conf/syslog_case5.conf -n a1 -Dflume.root.logger=INFO,console
After it starts successfully, open another terminal and send data to the listening port:
echo "hellolooklook5" | nc 192.168.233.128 50000
Then check the earlier terminal; it will show the following:
The data has come through.
This is a newer, faster Syslog TCP source that supports multiple ports: instead of monitoring a single port, it can monitor several at once. The official configuration is largely the same, just with more optional settings.
| Property Name | Default | Description |
| --- | --- | --- |
| **channels** | – | |
| **type** | – | The component type name, needs to be multiport_syslogtcp |
| **host** | – | Host name or IP address to bind to. |
| **ports** | – | Space-separated list (one or more) of ports to bind to. |
| portHeader | – | If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port. |
One thing to watch out for: the ports setting here replaces the TCP source's port, so don't miss that. Also, portHeader can be combined with the interceptors and channel selectors covered later to build custom routing logic.
Here is the official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=multiport_syslogtcp
a1.sources.r1.channels=c1
a1.sources.r1.host=0.0.0.0
a1.sources.r1.ports=10001 10002 10003
a1.sources.r1.portHeader=port
Here is a working example.
#Configuration file: syslog_case6.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.ports = 50000 60000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
Here we listen on two ports of 192.168.233.128: 50000 and 60000.
#Run the command
flume-ng agent -c conf -f conf/syslog_case6.conf -n a1 -Dflume.root.logger=INFO,console
After it starts successfully, open another terminal and send data to the listening ports:
echo "hellolooklook5" | nc 192.168.233.128 50000
echo "hello looklook6"| nc 192.168.233.128 60000
Then check the earlier terminal; it will show the following:
Data from both ports has come through. A routing sketch using portHeader follows.
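To see what the portHeader option buys you: since each event carries its ingress port in a header, a multiplexing channel selector can route on it. A hedged sketch (the second channel c2 and the mappings are invented for illustration, and it assumes portHeader is set to "port" as in the official example above):

a1.sources.r1.portHeader = port
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = port
#Route events arriving on port 50000 to c1 and on port 60000 to c2
a1.sources.r1.selector.mapping.50000 = c1
a1.sources.r1.selector.mapping.60000 = c2
a1.sources.r1.selector.default = c1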
The official docs barely bother describing this one; it is simply the same source over a different protocol than TCP.
The configuration matches the TCP version, so I'll skip it. Here is the official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=syslogudp
a1.sources.r1.port=5140
a1.sources.r1.host=localhost
a1.sources.r1.channels=c1
Here is a working example.
#Configuration file: syslog_case7.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
#Run the command
flume-ng agent -c conf -f conf/syslog_case7.conf -n a1 -Dflume.root.logger=INFO,console
After it starts successfully, open another terminal and send data to the listening port:
echo "hellolooklook5" | nc –u 192.168.233.128 50000
#Check the console output in the terminal where the agent was started
OK, the data has come through.
8. HTTP Source
HTTP Source receives event data via HTTP POST and GET; the official docs say GET should only be used for experimentation. Flume events are produced by a pluggable "handler", which must implement the HTTPSourceHandler interface. The handler takes an HttpServletRequest and returns a list of Flume events.
All events sent in a single POST request are treated as one transaction and inserted into the Flume channel as a single batch.
Below is the source configuration from the official site; the bolded parameters are required.
| Property Name | Default | Description |
| --- | --- | --- |
| **type** | – | The component type name, needs to be http |
| **port** | – | The port the source should bind to. |
| bind | 0.0.0.0 | The hostname or IP address to listen on |
| handler | org.apache.flume.source.http.JSONHandler | The FQCN of the handler class. |
Official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.port=5140
a1.sources.r1.channels=c1
a1.sources.r1.handler=org.example.rest.RestHandler
a1.sources.r1.handler.nickname=random props
Here is a working example:
#Configuration file: http_case8.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= http
a1.sources.r1.port= 50000
a1.sources.r1.channels= c1
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
#Run the command
flume-ng agent -c conf -f conf/http_case8.conf -n a1 -Dflume.root.logger=INFO,console
After it starts successfully,
#Send data with a JSON-formatted POST request
curl -X POST -d '[{"headers" : {"looklook1" : "looklook1 is header","looklook2" : "looklook2 is header"},"body" : "hello looklook5"}]' http://192.168.233.128:50000
#Check the console output in the terminal where the agent was started
Both the headers and the body are printed correctly; a batched variant is sketched below.
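Since all events in one POST form a single transaction (as noted above), several events can be batched per request. A sketch with two invented events:

curl -X POST -d '[{"headers" : {"batch" : "1"},"body" : "first event"},{"headers" : {"batch" : "1"},"body" : "second event"}]' http://192.168.233.128:50000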
9. Twitter 1% firehose Source (experimental)
The official docs carry a warning: use with caution, as it may well disappear in the next release.
This experimental source connects to Twitter's streaming service and ingests event data from the 1% sample of tweets. It requires a Twitter developer account. Well, since the site is unreachable for us anyway, we can't use it, so I won't explain it further.
10. Custom Source
A custom source is simply an implementation of the Source interface. When starting the Flume agent, the custom source class and its dependency jars must be on the agent's classpath. The custom source's type is the fully qualified class name of the class implementing the Source interface, as sketched below.
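For illustration only, with org.example.MySource as a hypothetical class name, the wiring in the agent config would look like this (with the jar made visible to the agent, e.g. by dropping it into Flume's lib directory):

a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1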
More detail will come later in this series, so I won't elaborate here.