亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

MQ怎么將多消息合并為一條消息發送

發布時間:2021-12-22 11:24:33 來源:億速云 閱讀:158 作者:iii 欄目:大數據

這篇文章主要介紹“MQ怎么將多消息合并為一條消息發送”,在日常操作中,相信很多人在MQ怎么將多消息合并為一條消息發送問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”MQ怎么將多消息合并為一條消息發送”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

為什么要將多消息合并為一個消息發送?

前面也說了,為了節約成本。以每分鐘50w的廣告點擊數來算,一個月將產生50*60*24*31w的點擊消息,再乘以3就是每個月的sqs請求數,3代表的是發送消息、拉取消息、刪除消息,按每100w請求0.4美刀的價格計算大概一個月要26784美刀。

由于sqs限制單條消息的大小最大為256k,根據業務場景估算每點擊消息也不可能達到1k,,所以我將256個請求合并為一個消息發送,或者1s內未達到256個消息也合并為一個消息發送,這樣每月的費用可以直接除以256,這不是一個小數目。

什么樣的業務場景下才適合這么干?

將大量消息合并為一個消息后會導致消息消費失去原子性。你無法保證原本是256個消息的合并為一個消息后,這256個消息能全部消費成功或者全部消費失敗,因此要求業務必須允許消息消費失敗直接丟棄的情況。無論多少個成功多少個失敗,都需要將整條消息從mq中刪除。筆者考慮過這個問題才決定是否要這樣做的,也考慮過失敗重試的問題,但我覺得沒必要為這種概率買單,因為一個點擊在非異步的情況下,失敗就是失敗了。

如何將大量消息合并為一條消息發送而不影響服務的高并發性能呢?

其實不影響是不存在的,只是讓影響變得微弱。經過長時間的觀察,我了解該高并發服務對內存的消耗并不高,最大qps下也就消耗1.5g左右的堆內存,而netty使用的直接內存大概在2g這樣,對于2核8g的機器,有足夠多的內存來實現隊列緩存消息。

我借簽Dubbo的客戶端與服務端配置多個連接時使用輪詢方式使用連接,同時也借簽了netty的EventLoop的設計,實現消息合并發送。我定義一個MesaageLoopGroup,一個MesaageLoopGroup可以配置有多少個MesaageLooper,而每個MesaageLooper就是一個線程,且維護一個阻塞隊列,默認隊列大小是102400,這個數字是我配置單個進程所能打開的最大文件句柄數。

當往MesaageLoopGroup push一個點擊消息時,先用原子類自增1與MesaageLooper數組的長度取余,選出一個MesaageLooper。然后再將消息push到這個MesaageLooper的阻塞隊列。

MQ怎么將多消息合并為一條消息發送

每個MesaageLooper的run方法實現的就是一個死循環,從阻塞隊列中拿消息,當消息等于256時,或者阻塞超過1s就將拿到的消息合并成一個消息發送到mq。如果阻塞隊列滿,那么push會直接將消息發送到mq。因此,服務重啟時如果使用kill 9強行結束進程,至多只會有1s的數據丟失。設置1s還有一個原因就是控制消息的實時性。

灰度上線測試一天后也證明此方案對服務的影響并不大,無論是gc還是內存占用,都看不出加了這么一層邏輯。1s的平均請求按50w計算,四臺機器分擔,每個服務的每秒請求數平均是2000。

為何用golang實現消費者?

然而消息的消費并不順利。一個是因為消息消費我用了golang實現,我也是剛入門,寫起代碼來還感覺別扭,二是一個消息是由原本256個消息組合而成的問題。

使用golang其實是有原因的。原本計劃是讓消費者占用較小的內存,以實現將消費者寄生在其它服務所在的機器上,充分利用其它耗內存而cpu利用率低的服務所在的機器。同時利用docker實現快速部署,讓docker 的鏡像更小,不需要安裝jdk什么的。還有就是利用go的協程并發處理能力吧,讓消費者消費消息的速度能趕上消息的產生速度。

為入門golang買單

為了便于理解,我還是以java的線程池來說明。假設我配置的線程池線程數量是512。寄生在其它服務的機器上需要給主人點面子,不能把人家的cpu全部吃完,導致主服務不可用,所以線程的數量結合消息的消費情況綜合考慮,不能超過一半的cpu使用率,而選擇512這個數量。

Sqs支持一次拉取多條消息,并且有一個可見性超時的特性,當消息被消費者拉取到之后,在多長時間內未刪除,下次可能還會被拉取到,或者其它消費者還能拉取到。最初我設置的可見性超時是60s。

MQ怎么將多消息合并為一條消息發送

一開始我開啟5個線程拉取消息,每次最多拉取10條消息。那么很可能同一時間內會拉取到50條消息。由于一條消息是由原本256條消息合并而成的,所以512個線程同一時間段至多只能消費2條消息,而一條消息(合并后的)的消費平均耗時是10s,也就是說一分鐘內最多消費12條消息,其它38條消息在一分鐘后會被其它消費者拉取到,所以就會出現大量消息重復消費的情況,久而久之,消息越積累越多。

我用golang的channel實現生產者與消費者,channel的大小可設置,當channel滿時,拉取到的消息是放不進channel的,因此會將拉取線程阻塞住,只有消費者從 channel取數據才能繼續放入。但阻塞的那段時間要小于消息的可見性超時,因為消息只有在開始消費時我才會將其從mq中刪除。

后面的改進就是根據消費能力去調整消息的拉取線程數,以及每次拉取的消息數。還有一點要注意,為保證時刻有消息準備就緒開始消費,最好不要讓消息消費完再從mq中拉取。但這也會導致另一個問題,一些消息拉取到本地后,由于channel已滿,放不進,而其它空閑消費節點又拉不到,導致消息被消費到的時間延長。這就需要作出取舍。

到此,關于“MQ怎么將多消息合并為一條消息發送”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

mq
AI

威信县| 盱眙县| 尉氏县| 汤阴县| 白朗县| 罗源县| 通州区| 柯坪县| 辉县市| 健康| 武宣县| 克东县| 平塘县| 金华市| 丹江口市| 达日县| 五原县| 屯留县| 连平县| 钟祥市| 咸宁市| 都匀市| 静宁县| 榕江县| 天全县| 宝丰县| 鄂托克前旗| 芮城县| 玉树县| 会东县| 汾阳市| 六枝特区| 孝感市| 伊通| 老河口市| 临西县| 江津市| 乐亭县| 安庆市| 同仁县| 延边|