亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flume的介紹和簡單操作

發布時間:2020-07-26 23:26:05 來源:網絡 閱讀:210 作者:KeepInUp 欄目:大數據

Flume是什么

Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用于收集數據;同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方(可定制)的能力。

Flume的功能

  • 支持在日志系統中定制各類數據發送方,用于收集數據
  • 提供對數據簡單處理,并寫到各類數據接收方(可定制)的能力

    Flume的組成

  • Agent:核心組件
    • source 負責數據的產生或搜集
    • channel 是一種短暫的存儲容器,負責數據的存儲持久化
    • sink 負責數據的轉發

      Flume的工作流示意圖

  • 數據流模型
    Flume的介紹和簡單操作
  • 多Agent模型
    Flume的介紹和簡單操作
  • 合并模型
    Flume的介紹和簡單操作
  • 混合模型
    Flume的介紹和簡單操作

    Flume的安裝

    下載安裝包并解壓

    wget http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
    tar -zxvf apache-flume-1.8.0-bin.tar.gz

    配置環境變量

vim ~/.bashrc

export FLUME_HOME=/usr/local/src/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin

source ~/.bashrc

Flume簡單操作

  • netcat模式
    進入conf目錄下編寫netcat.conf文件,內容如下:
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = netcat
agent.sources.netcatSource.bind = localhost
agent.sources.netcatSource.port = 11111
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10

啟動一個實例

(py27) [root@master conf]# pwd
/usr/local/src/apache-flume-1.8.0-bin/conf
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./netcat.conf --name agent -Dflume.root.logger=INFO,console

啟動成功

18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume_netcat.conf
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Added sinks: loggerSink Agent: agent
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Creating channels
18/10/24 11:26:35 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Created channel memoryChannel
18/10/24 11:26:35 INFO source.DefaultSourceFactory: Creating instance of source netcatSource, type netcat
18/10/24 11:26:35 INFO sink.DefaultSinkFactory: Creating instance of sink: loggerSink, type: logger
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [netcatSource, loggerSink]
18/10/24 11:26:35 INFO node.Application: Starting new configuration:{ sourceRunners:{netcatSource=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcatSource,state:IDLE} }} sinkRunners:{loggerSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@262b92ac counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
18/10/24 11:26:35 INFO node.Application: Starting Channel memoryChannel
18/10/24 11:26:35 INFO node.Application: Waiting for channel: memoryChannel to start. Sleeping for 500 ms
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
18/10/24 11:26:36 INFO node.Application: Starting Sink loggerSink
18/10/24 11:26:36 INFO node.Application: Starting Source netcatSource
18/10/24 11:26:36 INFO source.NetcatSource: Source starting
18/10/24 11:26:36 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.155.120:11111]

然后新開一個終端,發送數據

(py27) [root@master apache-flume-1.8.0-bin]# telnet localhost 11111
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
OK

查看接收數據

18/10/24 11:30:15 INFO sink.LoggerSink: Event: { headers:{} body: 31 0D                                           1. }

注:如果沒有telnet工具,請先安裝:yum install telnet

  • Exec模式
    編寫配置文件exec.conf
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec 
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10

啟動實例

(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec.conf --name agent -Dflume.root.logger=INFO,console

啟動成功后,創建配置文件中的exec.log文件

(py27) [root@master test_data]# ls
exec.log
(py27) [root@master test_data]# pwd
/home/master/FlumeTest/test_data
(py27) [root@master test_data]#

然后通過echo命令模擬日志的產生

(py27) [root@master test_data]# echo 'Hello World!!!' >> exec.log

查看接收的日志

18/10/25 09:19:52 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 21 21       Hello World!!! }

如何將日志保存到HDFS上
修改配置文件

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec 
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = hdfs 
agent.sinks.loggerSink.hdfs.path = /flume/%y-%m-%d/%H%M/
agent.sinks.loggerSink.hdfs.filePrefix = exec_hdfs_
agent.sinks.loggerSink.hdfs.round = true
agent.sinks.loggerSink.hdfs.roundValue = 1
agent.sinks.loggerSink.hdfs.roundUnit = minute
agent.sinks.loggerSink.hdfs.rollInterval = 3
agent.sinks.loggerSink.hdfs.rollSize = 20
agent.sinks.loggerSink.hdfs.rollCount = 5
agent.sinks.loggerSink.hdfs.useLocalTimeStamp = true
agent.sinks.loggerSink.hdfs.fileType = DataStream
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10

然后啟動實例

(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec_hdfs.conf --name agent -Dflume.root.logger=INFO,console

然后可以看到它把exec.log文件里的日志給寫到了HDFS上

18/10/25 09:54:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/10/25 09:54:26 INFO hdfs.BucketWriter: Creating /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Closing /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Renaming /flume/18-10-25/0954/exec_hdfs_.1540475666623.tmp to /flume/18-10-25/0954/exec_hdfs_.1540475666623
18/10/25 09:54:32 INFO hdfs.HDFSEventSink: Writer callback called.

我們進入HDFS查看,可以看到log里的內容

(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0954
Found 1 items
-rw-r--r--   3 root supergroup         15 2018-10-25 09:54 /flume/18-10-25/0954/exec_hdfs_.1540475666623
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0954/exec_hdfs_.1540475666623
Hello World!!!

然后我們再次寫入寫的log,然后再查看

//寫入新的log
(py27) [root@master test_data]# echo 'test001' >> exec.log               
(py27) [root@master test_data]# echo 'test002' >> exec.log
//進入HDFS目錄查看
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25
Found 2 items
drwxr-xr-x   - root supergroup          0 2018-10-25 09:54 /flume/18-10-25/0954
drwxr-xr-x   - root supergroup          0 2018-10-25 09:56 /flume/18-10-25/0956
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0956
Found 1 items
-rw-r--r--   3 root supergroup         16 2018-10-25 09:56 /flume/18-10-25/0956/exec_hdfs_.1540475766338
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0956/exec_hdfs_.1540475766338
test001
test002
  • 故障轉移實例
    首先需要三臺機器,master、slave1、slave2,然后分別配置實例并啟動,master上的agent實例發送日志,slave1和slave2接收日志
    master配置
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink1 loggerSink2

agent.sinkgroups = group

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink1.type = avro
agent.sinks.loggerSink1.hostname = slave1
agent.sinks.loggerSink1.port = 52020
agent.sinks.loggerSink1.channel = memoryChannel

agent.sinks.loggerSink2.type = avro
agent.sinks.loggerSink2.hostname = slave2
agent.sinks.loggerSink2.port = 52020
agent.sinks.loggerSink2.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

agent.sinkgroups.group.sinks = loggerSink1 loggerSink2

agent.sinkgroups.group.processor.type = failover
agent.sinkgroups.group.processor.loggerSink1 = 10
agent.sinkgroups.group.processor.loggerSink2 = 1
agent.sinkgroups.group.processor.maxpenalty = 10000

slave1配置

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave1
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

slave2配置

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave2
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

分別啟動master、slave1、slave2的agent,然后在mater上寫入日志,然后觀察誰收到了

//master
(py27) [root@master test_data]# echo 'hello' >> exec.log  
//slave1
18/10/25 10:53:53 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
//slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726                             

發現是slave1收到數據,然后我們把slave1的agent關掉,再次發送日志

//master
(py27) [root@master test_data]# echo '11111' >> exec.log      
//slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31                                  11111 }

然后再次啟動slave1的agent

//master
(py27) [root@master test_data]# echo '22222' >> exec.log      
//slave1
18/10/25 10:58:21 INFO sink.LoggerSink: Event: { headers:{} body: 32 32 32 32 32                                  22222 }
//slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31                                  11111 }
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

中阳县| 佛学| 浮山县| 砚山县| 尤溪县| 广州市| 阳谷县| 洛南县| 新建县| 曲靖市| 安仁县| 同心县| 新邵县| 四川省| 定州市| 延边| 辽源市| 北宁市| 唐河县| 周口市| 南阳市| 罗城| 老河口市| 若羌县| 大同县| 远安县| 五指山市| 安义县| 连江县| 西畴县| 体育| 黎平县| 醴陵市| 叶城县| 云阳县| 怀仁县| 海南省| 元谋县| 札达县| 鄂温| 阜平县|