亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

簡潔高效的Python流處理庫Faust怎么用

發布時間:2021-10-09 17:47:39 來源:億速云 閱讀:167 作者:柒染 欄目:編程語言

簡潔高效的Python流處理庫Faust怎么用,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

在分布式系統和實時數據處理中,流處理是十分重要的技術。在數據密集型應用中,數據快速到達,轉瞬即逝,需要及時進行處理,流式處理強調數據和事件的處理速度,對性能和可靠性有較高的要求。

流處理框架包括:Storm,Spark Streaming 和 Flink 等,而 Kafka 也不甘示弱,推出了分布式流處理平臺 Kafka  Streams。 Faust 把 Kafka Streams 帶到了 Python,并實現了抽象和優化,為數據和事件的流處理提供了一個高效便利的框架。

簡介

Faust,是 robinhood 在 Github 上開源的 Python 流處理庫,目前版本為 1.10.4。

Faust 把 Kafka Streams 的概念帶到了 Python,提供了包括流處理和事件處理的模式。Faust 使用純 Python  實現,使得開發者可以使用包括 NumPy, PyTorch, Pandas 等的庫進行數據處理。

Faust 實現簡潔優雅,使用簡單,性能優秀,且具有高可用、分布式、靈活性高的特點。目前 Faust  已被用于構建高性能分布式系統和實時數據管道中。

使用

Faust 需求 Python 3.6 或以上,且需要可用的 Kafka >= 0.10 服務。使用 pip 安裝:

$ pip install -U faust

此外,一些額外的特性需要額外的依賴,如 rocksdb,可以用來作為 Faust 在生產環境中的存儲,以及 Redis,可以在開啟緩存時使用。

安裝完成以后,就可以在項目中使用了。我們來看一個簡單的例子:

import faust  app = faust.App(     'hello-world',     broker='kafka://localhost:9092',     value_serializer='raw', )  greetings_topic = app.topic('greetings')  @app.agent(greetings_topic) async def greet(greetings):     async for greeting in greetings:         print(greeting)

首先,我們使用 faust.App 創建一個 Faust 應用,并配置應用的名字、Kafka broker 和序列化方式。

然后,我們創建一個主題,這跟 Kafka 中的主題是對應的。

Faust 利用 Python 3.6+ 的異步語法 async,定義異步函數 greet,并注冊為 Faust 應用的一個  agent。函數接收實時的數據集合 greetings,并異步地對每項數據進行輸出。

把上述代碼保存為 hello_world.py,并在命令行啟動工作者:

$ faust -A hello_world worker -l info

該 Faust 工作者就會從 Kafka 中實時讀取數據并處理。

我們可以發送一些數據來觀察效果:

$ faust -A hello_world send @greet "Hello Faust"

上述命令發送了一條消息,執行后,我們就能在工作者的命令行中看到這條消息。

Faust 還充分利用了 Python 的類型提示,能夠方便地定義數據模型:

import faust  class Greeting(faust.Record):     from_name: str     to_name: str  app = faust.App('hello-app', broker='kafka://localhost') topic = app.topic('hello-topic', value_type=Greeting)  @app.agent(topic) async def hello(greetings):     async for greeting in greetings:         print(f'Hello from {greeting.from_name} to {greeting.to_name}')  @app.timer(interval=1.0) async def example_sender(app):     await hello.send(         value=Greeting(from_name='Faust', to_name='you'),     )  if __name__ == '__main__':     app.main()

Faust 把 Kafka Streams 帶到了 Python  中,實現了簡潔高效的數據流處理。其使用簡單的裝飾器和基于類型提示機的據模型,就能定義實現數據的處理邏輯;充分利用了 Python 的 async  異步機制,和其他高性能的異步庫,實現了高效性能;其使用 Python 實現,使用開發者可以無縫對接其他數據處理和大數據相關功能。

關于 簡潔高效的Python流處理庫Faust怎么用問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

靖宇县| 泸定县| 建瓯市| 女性| 班玛县| 莎车县| 昭觉县| 调兵山市| 昭通市| 铜川市| 清徐县| 阳西县| 舟曲县| 额尔古纳市| 定日县| 饶阳县| 马鞍山市| 客服| 秀山| 彩票| 浪卡子县| 衡阳县| 南澳县| 巢湖市| 怀宁县| 安阳市| 巩义市| 惠安县| 稷山县| 桐梓县| 从化市| 孟村| 浮梁县| 肥乡县| 吉首市| 佳木斯市| 闻喜县| 蓬莱市| 长宁县| 山西省| 吉林省|