亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

最簡消息隊列的實現方法

發布時間:2021-06-28 17:54:27 來源:億速云 閱讀:171 作者:chen 欄目:編程語言

這篇文章主要介紹“最簡消息隊列的實現方法”,在日常操作中,相信很多人在最簡消息隊列的實現方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”最簡消息隊列的實現方法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

使用

結合其他 mq 的使用經歷,基本的使用流程:

  1. 創建 producerconsumer

  2. 啟動 mq

  3. 生產消息/消費消息

對應到 queue 中,大致也是這個:

創建 queue

// 生產者創建工廠
producer := newMockedProducer()
// 消費者創建工廠
consumer := newMockedConsumer()
// 將生產者以及消費者的創建工廠函數傳遞給 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})

我們看看 NewQueue 需要什么構建條件:

  1. producer constructor

  2. consumer constructor

將雙方的工廠函數傳遞給 queue ,由它去執行以及重試。

這兩個需要的目的是將生產者/消費者的構建和消息生產/消費都封裝在 mq 中,而且將生產者/消費者的整套邏輯交給開發者處理:

type (
	// 開發者需要實現此接口
	Producer interface {
		AddListener(listener ProduceListener)
		Produce() (string, bool)
	}
	...
	// ProducerFactory定義了生成Producer的方法
	ProducerFactory func() (Producer, error)
)
  1. 其實也就是將生產者的邏輯交個開發者自己完成,mq 只負責生產者/消費者的消息傳遞和之間的調度。

  2. 工廠方法的設計,是將生產者本身和生產消息,這兩個任務都交給 queue 自己來做調度或者重試。

生產msg

生產消息當然要回到生產者本身:

type mockedProducer struct {
	total int32
	count int32
  // 使用waitgroup來模擬任務的完成
	wait  sync.WaitGroup
}
// 實現 Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
	if atomic.AddInt32(&p.count, 1) <= p.total {
		p.wait.Done()
		return "item", true
	}
	time.Sleep(time.Second)
	return "", false
}

queue 中的生產者編寫都必須實現:

  • Produce():由開發者編寫生產消息的邏輯

  • AddListener():生產者

消費msg

和生產者類似:

type mockedConsumer struct {
	count  int32
}

func (c *mockedConsumer) Consume(string) error {
	atomic.AddInt32(&c.count, 1)
	return nil
}

啟動 queue

啟動,然后驗證我們上述的生產者和消費者之間的數據是否傳輸成功:

func TestQueue(t *testing.T) {
	producer := newMockedProducer(rounds)
	consumer := newMockedConsumer()
	// 創建 queue
	q := NewQueue(func() (Producer, error) {
		return producer, nil
	}, func() (Consumer, error) {
		return consumer, nil
	})
	// 當生產者生產完畢,執行 Stop() 關閉生產端生產
	go func() {
		producer.wait.Wait()
    // mq生產端停止生產,不是mq本身 Stop 運行
		q.Stop()
	}()
	// 啟動
	q.Start()
	// 驗證生產消費端是否消息消費完成
	assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
}

以上就是 queue 最簡易的入門使用代碼。開發者可以根據自己的業務實際情況:自由定義生產者/消費者已經生產/消費邏輯。

整體設計

![image-20210506224102836](/Users/dyhxl/Library/Application Support/typora-user-images/image-20210506224102836.png)

整體流程如上圖:

  1. 全體的通信都由 channel 進行

  2. 通過加入監聽器 listener ,以及事件觸發 event ,相當于將觸發器邏輯分離出來

  3. 生產者有 produceone ,這個是生產消息的邏輯,但是其中的 Produce() 是由開發者編寫【上面的 interface 中正是這個函數】

  4. 同理消費者,Consume()

基本的消息流動就入上圖以及上述描寫的,具體的代碼分析我們就留到下一篇,我們????分析里面,尤其是如何控制 channel 是整個設計的核心。

到此,關于“最簡消息隊列的實現方法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

杭锦旗| 钟祥市| 巨鹿县| 西宁市| 虹口区| 阜平县| 鲁甸县| 柘城县| 康乐县| 安岳县| 永吉县| 渭源县| 庆元县| 石嘴山市| 垫江县| 延寿县| 华蓥市| 陕西省| 搜索| 磐安县| 沙洋县| 华容县| 平利县| 达日县| 通渭县| 华坪县| 博爱县| 成武县| 肇州县| 勃利县| 交口县| 志丹县| 东安县| 铁力市| 金川县| 沭阳县| 苏尼特左旗| 鹿泉市| 天门市| 和田市| 德化县|