您好,登錄后才能下訂單哦!
這篇文章主要介紹“如何快速搭建一個高可用的IM系統”,在日常操作中,相信很多人在如何快速搭建一個高可用的IM系統問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何快速搭建一個高可用的IM系統”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
深入理解 WebSocket 協議
Web Sockets 的目標是在一個單獨的持久連接上提供全雙工、雙向通信。在 Javascript 創建了 WebSocket 之后,會有一個 HTTP 請求發送到瀏覽器以發起連接。
在取得服務器響應后,建立的連接會將 HTTP 升級從 HTTP 協議交換為 WebSocket 協議。
由于 WebSocket 使用自定義的協議,所以 URL 模式也略有不同。未加密的連接不再是 http://,而是 ws://;加密的連接也不是 https://,而是 wss://。
在使用 WebSocket URL 時,必須帶著這個模式,因為將來還有可能支持其他的模式。
使用自定義協議而非 HTTP 協議的好處是,能夠在客戶端和服務器之間發送非常少量的數據,而不必擔心 HTTP 那樣字節級的開銷。由于傳遞的數據包很小,所以 WebSocket 非常適合移動應用。
上文中只是對 Web Sockets 進行了籠統的描述,接下來的篇幅會對 Web Sockets 的細節實現進行深入的探索。
本文接下來的幾個小節不會涉及到大量的代碼片段,但是會對相關的 API 和技術原理進行分析,相信大家讀完下文之后再來看這段描述,會有一種豁然開朗的感覺。
①WebSocket 復用了 HTTP 的握手通道
“握手通道”是 HTTP 協議中客戶端和服務端通過"TCP 三次握手"建立的通信通道。
客戶端和服務端使用 HTTP 協議進行的每次交互都需要先建立這樣一條“通道”,然后通過這條通道進行通信。
我們熟悉的 Ajax 交互就是在這樣一個通道上完成數據傳輸的,只不過 Ajax 交互是短連接,在一次 Request→Response 之后,“通道”連接就斷開了。
下面是 HTTP 協議中建立“握手通道”的過程示意圖:
上文中我們提到:在 Javascript 創建了 WebSocket 之后,會有一個 HTTP 請求發送到瀏覽器以發起連接,然后服務端響應,這就是“握手“的過程。
在這個握手的過程當中,客戶端和服務端主要做了兩件事情:
建立了一條連接“握手通道”用于通信:這點和 HTTP 協議相同,不同的是 HTTP 協議完成數據交互后就釋放了這條握手通道,這就是所謂的“短連接”,它的生命周期是一次數據交互的時間,通常是毫秒級別的。
將 HTTP 協議升級到 WebSocket 協議,并復用 HTTP 協議的握手通道,從而建立一條持久連接。
說到這里可能有人會問:HTTP 協議為什么不復用自己的“握手通道”,而非要在每次進行數據交互的時候都通過 TCP 三次握手重新建立“握手通道”呢?
答案是這樣的:雖然“長連接”在客戶端和服務端交互的過程中省去了每次都建立“握手通道”的麻煩步驟。
但是維持這樣一條“長連接”是需要消耗服務器資源的,而在大多數情況下,這種資源的消耗又是不必要的,可以說 HTTP 標準的制定經過了深思熟慮的考量。
到我們后邊說到 WebSocket 協議數據幀時,大家可能就會明白,維持一條“長連接”服務端和客戶端需要做的事情太多了。
說完了握手通道,我們再來看 HTTP 協議如何升級到 WebSocket 協議的。
②HTTP 協議升級為 WebSocket 協議
升級協議需要客戶端和服務端交流,服務端怎么知道要將 HTTP 協議升級到 WebSocket 協議呢?它一定是接收到了客戶端發送過來的某種信號。
下面是我從谷歌瀏覽器中截取的“客戶端發起協議升級請求的報文”,通過分析這段報文,我們能夠得到有關 WebSocket 中協議升級的更多細節。
首先,客戶端發起協議升級請求。采用的是標準的 HTTP 報文格式,且只支持 GET 方法。
下面是重點請求的首部的意義:
Connection:Upgrade:表示要升級的協議。
Upgrade: websocket:表示要升級到 WebSocket 協議。
Sec-WebSocket-Version: 13:表示 WebSocket 的版本。
Sec-WebSocket-Key:UdTUf90CC561cQXn4n5XRg==:與 Response Header 中的響應首部 Sec-WebSocket-Accept: GZk41FJZSYY0CmsrZPGpUGRQzkY= 是配套的,提供基本的防護,比如惡意的連接或者無意的連接。
其中 Connection 就是我們前邊提到的,客戶端發送給服務端的信號,服務端接受到信號之后,才會對 HTTP 協議進行升級。
那么服務端怎樣確認客戶端發送過來的請求是否是合法的呢?在客戶端每次發起協議升級請求的時候都會產生一個唯一碼:Sec-WebSocket-Key。
服務端拿到這個碼后,通過一個算法進行校驗,然后通過 Sec-WebSocket-Accept 響應給客戶端,客戶端再對 Sec-WebSocket-Accept 進行校驗來完成驗證。
這個算法很簡單:
將 Sec-WebSocket-Key 跟全局唯一的(GUID,[RFC4122])標識:258EAFA5-E914-47DA-95CA-C5AB0DC85B11 拼接。
通過 SHA1 計算出摘要,并轉成 base64 字符串。
258EAFA5-E914-47DA-95CA-C5AB0DC85B11 這個字符串又叫“魔串",至于為什么要使用它作為 WebSocket 握手計算中使用的字符串,這點我們無需關心,只需要知道它是 RFC 標準規定就可以了。
官方的解析也只是簡單的說此值不大可能被不明白 WebSocket 協議的網絡終端使用。
我們還是用世界上最好的語言來描述一下這個算法吧:
public function dohandshake($sock, $data, $key) { if (preg_match("/Sec-WebSocket-Key: (.*)\r\n/", $data, $match)) { $response = base64_encode(sha1($match[1] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true)); $upgrade = "HTTP/1.1 101 Switching Protocol\r\n" . "Upgrade: websocket\r\n" . "Connection: Upgrade\r\n" . "Sec-WebSocket-Accept: " . $response . "\r\n\r\n"; socket_write($sock, $upgrade, strlen($upgrade)); $this->isHand[$key] = true; } }
服務端響應客戶端的頭部信息和 HTTP 協議的格式是相同的,HTTP1.1 協議是以換行符(\r\n)分割的,我們可以通過正則匹配解析出 Sec-WebSocket-Accept 的值,這和我們使用 curl 工具模擬 get 請求是一個道理。
這樣展示結果似乎不太直觀,我們使用命令行 CLI 來根據上圖中的 Sec-WebSocket-Key 和握手算法來計算一下服務端返回的 Sec-WebSocket-Accept 是否正確:
從圖中可以看到,通過算法算出來的 base64 字符串和 Sec-WebSocket-Accept 是一樣的。
那么假如服務端在握手的過程中返回一個錯誤的 Sec-WebSocket-Accept 字符串會怎么樣呢?
當然是客戶端會報錯,連接會建立失敗,大家可以嘗試一下,例如將全局唯一標識符 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 改為 258EAFA5-E914-47DA-95CA-C5AB0DC85B12。
③WebSocket 的幀和數據分片傳輸
下圖是我做的一個測試:將小說《飄》的第一章內容復制成文本數據,通過客戶端發送到服務端,然后服務端響應相同的信息完成了一次通信。
可以看到一篇足足有將近 15000 字節的數據在客戶端和服務端完成通信只用了 150ms 的時間。
我們還可以看到瀏覽器控制臺中 Frame 欄中顯示的客戶端發送和服務端響應的文本數據,你一定驚訝 WebSocket 通信強大的數據傳輸能力。
數據是否真的像 Frame 中展示的那樣客戶端直接將一大篇文本數據發送到服務端,服務端接收到數據之后,再將一大篇文本數據返回給客戶端呢?
這當然是不可能的,我們都知道 HTTP 協議是基于 TCP 實現的,HTTP 發送數據也是分包轉發的,就是將大數據根據報文形式分割成一小塊一小塊發送到服務端,服務端接收到客戶端發送的報文后,再將小塊的數據拼接組裝。
關于 HTTP 的分包策略,大家可以查看相關資料進行研究,WebSocket 協議也是通過分片打包數據進行轉發的,不過策略上和 HTTP 的分包不一樣。
Frame(幀)是 WebSocket 發送數據的基本單位,下邊是它的報文格式:
報文內容中規定了數據標示,操作代碼、掩碼、數據、數據長度等格式。不太理解沒關系,下面我通過講解大家只要理解報文中重要標志的作用就可以了。
首先我們明白了客戶端和服務端進行 WebSocket 消息傳遞是這樣的:
客戶端:將消息切割成多個幀,并發送給服務端。
服務端:接收消息幀,并將關聯的幀重新組裝成完整的消息。
服務端在接收到客戶端發送的幀消息的時候,將這些幀進行組裝,它怎么知道何時數據組裝完成的呢?
這就是報文中左上角 FIN(占一個比特)存儲的信息,1 表示這是消息的最后一個分片(fragment)如果是 0,表示不是消息的最后一個分片。
WebSocket 通信中,客戶端發送數據分片是有序的,這一點和 HTTP 不一樣。
HTTP 將消息分包之后,是并發無序的發送給服務端的,包信息在數據中的位置則在 HTTP 報文中存儲,而 WebSocket 僅僅需要一個 FIN 比特位就能保證將數據完整的發送到服務端。
接下來的 RSV1,RSV2,RSV3 三個比特位的作用又是什么呢?這三個標志位是留給客戶端開發者和服務端開發者開發過程中協商進行拓展的,默認是 0。
拓展如何使用必須在握手的階段就協商好,其實握手本身也是客戶端和服務端的協商。
④WebSocket 連接保持和心跳檢測
WebSocket 是長連接,為了保持客戶端和服務端的實時雙向通信,需要確保客戶端和服務端之間的 TCP 通道保持連接沒有斷開。
但是對于長時間沒有數據往來的連接,如果依舊保持著,可能會浪費服務端資源。
不排除有些場景,客戶端和服務端雖然長時間沒有數據往來,仍然需要保持連接,就比如說你幾個月沒有和一個 QQ 好友聊天了,突然有一天他發 QQ 消息告訴你他要結婚了,你還是能在第一時間收到。
那是因為,客戶端和服務端一直再采用心跳來檢查連接。客戶端和服務端的心跳連接檢測就像打乒乓球一樣:
發送方→接收方:ping
接收方→發送方:pong
等什么時候沒有 ping、pong 了,那么連接一定是存在問題了。
說了這么多,接下來我使用 Go 語言來實現一個心跳檢測,WebSocket 通信實現細節是一件繁瑣的事情,直接使用開源的類庫是比較不錯的選擇,我使用的是:gorilla/websocket。
這個類庫已經將 WebSocket 的實現細節(握手,數據解碼)封裝的很好啦。下面我就直接貼代碼了:
package main import ( "net/http" "time" "github.com/gorilla/websocket" ) var ( //完成握手操作 upgrade = websocket.Upgrader{ //允許跨域(一般來講,websocket都是獨立部署的) CheckOrigin:func(r *http.Request) bool { return true }, } ) func wsHandler(w http.ResponseWriter, r *http.Request) { var ( conn *websocket.Conn err error data []byte ) //服務端對客戶端的http請求(升級為websocket協議)進行應答,應答之后,協議升級為websocket,http建立連接時的tcp三次握手將保持。 if conn, err = upgrade.Upgrade(w, r, nil); err != nil { return } //啟動一個協程,每隔1s向客戶端發送一次心跳消息 go func() { var ( err error ) for { if err = conn.WriteMessage(websocket.TextMessage, []byte("heartbeat")); err != nil { return } time.Sleep(1 * time.Second) } }() //得到websocket的長鏈接之后,就可以對客戶端傳遞的數據進行操作了 for { //通過websocket長鏈接讀到的數據可以是text文本數據,也可以是二進制Binary if _, data, err = conn.ReadMessage(); err != nil { goto ERR } if err = conn.WriteMessage(websocket.TextMessage, data); err != nil { goto ERR } } ERR: //出錯之后,關閉socket連接 conn.Close() } func main() { http.HandleFunc("/ws", wsHandler) http.ListenAndServe("0.0.0.0:7777", nil) }
借助 Go 語言很容易搭建協程的特點,我專門開啟了一個協程每秒向客戶端發送一條消息。
打開客戶端瀏覽器可以看到,Frame 中每秒的心跳數據一直在跳動,當長鏈接斷開之后,心跳就沒有了,就像人沒有了心跳一樣:
大家對 WebSocket 協議已經有了了解,接下來就讓我們一起快速搭建一個高性能、可拓展的 IM 系統吧。
快速搭建高性能、可拓展的 IM 系統
①系統架構和代碼文件目錄結構
下圖是一個比較完備的 IM 系統架構:包含了 C 端、接入層(通過協議接入)、S 端處理邏輯和分發消息、存儲層用來持久化數據。
我們本節 C 端使用的是 Webapp, 通過 Go 語言渲染 Vue 模版快速實現功能,接入層使用的是 WebSocket 協議,前邊已經進行了深入的介紹。
S 端是我們實現的重點,其中鑒權、登錄、關系管理、單聊和群聊的功能都已經實現,讀者可以在這部分功能的基礎上再拓展其他的功能,比如:視頻語音聊天、發紅包、朋友圈等業務模塊。
存儲層我們做的比較簡單,只是使用 MySQL 簡單持久化存儲了用戶關系,然后聊天中的圖片資源我們存儲到了本地文件中。
雖然我們的 IM 系統實現的比較簡化,但是讀者可以在次基礎上進行改進、完善、拓展,依然能夠作出高可用的企業級產品。
我們的系統服務使用 Go 語言構建,代碼結構比較簡潔,但是性能比較優秀(這是 Java 和其他語言所無法比擬的),單機支持幾萬人的在線聊天。
下邊是代碼文件的目錄結構:
app │ ├── args │ │ ├── contact.go │ │ └── pagearg.go │ ├── controller //控制器層,api入口 │ │ ├── chat.go │ │ ├── contract.go │ │ ├── upload.go │ │ └── user.go │ ├── main.go //程序入口 │ ├── model //數據定義與存儲 │ │ ├── community.go │ │ ├── contract.go │ │ ├── init.go │ │ └── user.go │ ├── service //邏輯實現 │ │ ├── contract.go │ │ └── user.go │ ├── util //幫助函數 │ │ ├── md5.go │ │ ├── parse.go │ │ ├── resp.go │ │ └── string.go │ └── view //模版資源 │ │ ├── ... asset //js、css文件 resource //上傳資源,上傳圖片會放到這里
從入口函數 main.go 開始,我們定義了 Controller 層,是客戶端 API 的入口。Service 用來處理主要的用戶邏輯,消息分發、用戶管理都在這里實現。
Model 層定義了一些數據表,主要是用戶注冊和用戶好友關系、群組等信息,存儲到 MySQL。
Util 包下是一些幫助函數,比如加密、請求響應等。View 下邊存儲了模版資源信息,上邊所說的這些都在 App 文件夾下存儲,外層還有 asset 用來存儲 css、js 文件和聊天中會用到的表情圖片等。
Resource 下存儲用戶聊天中的圖片或者視頻等文件。總體來講,我們的代碼目錄機構還是比較簡潔清晰的。
了解了我們要搭建的 IM 系統架構,我們再來看一下架構重點實現的功能吧。
②10 行代碼萬能模版渲染
Go 語言提供了強大的 HTML 渲染能力,非常簡單的構建 Web 應用,下邊是實現模版渲染的代碼,它太簡單了,以至于可以直接在 main.go 函數中實現:
func registerView() { tpl, err := template.ParseGlob("./app/view/**/*") if err != nil { log.Fatal(err.Error()) } for _, v := range tpl.Templates() { tplName := v.Name() http.HandleFunc(tplName, func(writer http.ResponseWriter, request *http.Request) { tpl.ExecuteTemplate(writer, tplName, nil) }) } } ... func main() { ...... http.Handle("/asset/", http.FileServer(http.Dir("."))) http.Handle("/resource/", http.FileServer(http.Dir("."))) registerView() log.Fatal(http.ListenAndServe(":8081", nil)) }
Go 實現靜態資源服務器也很簡單,只需要調用 http.FileServer 就可以了,這樣 HTML 文件就可以很輕松的訪問依賴的 js、css 和圖標文件了。
使用 http/template 包下的 ParseGlob、ExecuteTemplate 又可以很輕松的解析 Web 頁面,這些工作完全不依賴與 Nginx。
現在我們就完成了登錄、注冊、聊天 C 端界面的構建工作:
③注冊、登錄和鑒權
之前我們提到過,對于注冊、登錄和好友關系管理,我們需要有一張 user 表來存儲用戶信息。
我們使用 github.com/go-xorm/xorm 來操作 MySQL,首先看一下 MySQL 表的設計:
app/model/user.go:
package model import "time" const ( SexWomen = "W" SexMan = "M" SexUnknown = "U" ) type User struct { Id int64 `xorm:"pk autoincr bigint(64)" form:"id" json:"id"` Mobile string `xorm:"varchar(20)" form:"mobile" json:"mobile"` Passwd string `xorm:"varchar(40)" form:"passwd" json:"-"` // 用戶密碼 md5(passwd + salt) Avatar string `xorm:"varchar(150)" form:"avatar" json:"avatar"` Sex string `xorm:"varchar(2)" form:"sex" json:"sex"` Nickname string `xorm:"varchar(20)" form:"nickname" json:"nickname"` Salt string `xorm:"varchar(10)" form:"salt" json:"-"` Online int `xorm:"int(10)" form:"online" json:"online"` //是否在線 Token string `xorm:"varchar(40)" form:"token" json:"token"` //用戶鑒權 Memo string `xorm:"varchar(140)" form:"memo" json:"memo"` Createat time.Time `xorm:"datetime" form:"createat" json:"createat"` //創建時間, 統計用戶增量時使用 }
我們 user 表中存儲了用戶名、密碼、頭像、用戶性別、手機號等一些重要的信息,比較重要的是我們也存儲了 Token 標示用戶在用戶登錄之后,HTTP 協議升級為 WebSocket 協議進行鑒權,這個細節點我們前邊提到過,下邊會有代碼演示。
接下來我們看一下 model 初始化要做的一些事情吧:
app/model/init.go:
package model import ( "errors" "fmt" _ "github.com/go-sql-driver/mysql" "github.com/go-xorm/xorm" "log" ) var DbEngine *xorm.Engine func init() { driverName := "mysql" dsnName := "root:root@(127.0.0.1:3306)/chat?charset=utf8" err := errors.New("") DbEngine, err = xorm.NewEngine(driverName, dsnName) if err != nil && err.Error() != ""{ log.Fatal(err) } DbEngine.ShowSQL(true) //設置數據庫連接數 DbEngine.SetMaxOpenConns(10) //自動創建數據庫 DbEngine.Sync(new(User), new(Community), new(Contact)) fmt.Println("init database ok!") }
我們創建一個 DbEngine 全局 MySQL 連接對象,設置了一個大小為 10 的連接池。
Model 包里的 init 函數在程序加載的時候會先執行,對 Go 語言熟悉的同學應該知道這一點。
我們還設置了一些額外的參數用于調試程序,比如:設置打印運行中的 SQL,自動的同步數據表等,這些功能在生產環境中可以關閉。
我們的 Model 初始化工作就做完了,非常簡陋,在實際的項目中,像數據庫的用戶名、密碼、連接數和其他的配置信息,建議設置到配置文件中,然后讀取,而不像本文硬編碼的程序中。
注冊是一個普通的 API 程序,對于 Go 語言來說,完成這件工作太簡單了,我們來看一下代碼:
############################ //app/controller/user.go ############################ ...... //用戶注冊 func UserRegister(writer http.ResponseWriter, request *http.Request) { var user model.User util.Bind(request, &user) user, err := UserService.UserRegister(user.Mobile, user.Passwd, user.Nickname, user.Avatar, user.Sex) if err != nil { util.RespFail(writer, err.Error()) } else { util.RespOk(writer, user, "") } } ...... ############################ //app/service/user.go ############################ ...... type UserService struct{} //用戶注冊 func (s *UserService) UserRegister(mobile, plainPwd, nickname, avatar, sex string) (user model.User, err error) { registerUser := model.User{} _, err = model.DbEngine.Where("mobile=? ", mobile).Get(®isterUser) if err != nil { return registerUser, err } //如果用戶已經注冊,返回錯誤信息 if registerUser.Id > 0 { return registerUser, errors.New("該手機號已注冊") } registerUser.Mobile = mobile registerUser.Avatar = avatar registerUser.Nickname = nickname registerUser.Sex = sex registerUser.Salt = fmt.Sprintf("%06d", rand.Int31n(10000)) registerUser.Passwd = util.MakePasswd(plainPwd, registerUser.Salt) registerUser.Createat = time.Now() //插入用戶信息 _, err = model.DbEngine.InsertOne(®isterUser) return registerUser, err } ...... ############################ //main.go ############################ ...... func main() { http.HandleFunc("/user/register", controller.UserRegister) }
首先我們使用 util.Bind(request, &user) 將用戶參數綁定到 user 對象上,使用的是 util 包中的 Bind 函數,具體實現細節讀者可以自行研究,主要模仿了 Gin 框架的參數綁定,可以拿來即用,非常方便。
然后我們根據用戶手機號搜索數據庫中是否已經存在,如果不存在就插入到數據庫中,返回注冊成功信息,邏輯非常簡單。
登錄邏輯更簡單:
############################ //app/controller/user.go ############################ ... //用戶登錄 func UserLogin(writer http.ResponseWriter, request *http.Request) { request.ParseForm() mobile := request.PostForm.Get("mobile") plainpwd := request.PostForm.Get("passwd") //校驗參數 if len(mobile) == 0 || len(plainpwd) == 0 { util.RespFail(writer, "用戶名或密碼不正確") } loginUser, err := UserService.Login(mobile, plainpwd) if err != nil { util.RespFail(writer, err.Error()) } else { util.RespOk(writer, loginUser, "") } } ... ############################ //app/service/user.go ############################ ... func (s *UserService) Login(mobile, plainpwd string) (user model.User, err error) { //數據庫操作 loginUser := model.User{} model.DbEngine.Where("mobile = ?", mobile).Get(&loginUser) if loginUser.Id == 0 { return loginUser, errors.New("用戶不存在") } //判斷密碼是否正確 if !util.ValidatePasswd(plainpwd, loginUser.Salt, loginUser.Passwd) { return loginUser, errors.New("密碼不正確") } //刷新用戶登錄的token值 token := util.GenRandomStr(32) loginUser.Token = token model.DbEngine.ID(loginUser.Id).Cols("token").Update(&loginUser) //返回新用戶信息 return loginUser, nil } ... ############################ //main.go ############################ ...... func main() { http.HandleFunc("/user/login", controller.UserLogin) }
實現了登錄邏輯,接下來我們就到了用戶首頁,這里列出了用戶列表,點擊即可進入聊天頁面。
用戶也可以點擊下邊的 Tab 欄查看自己所在的群組,可以由此進入群組聊天頁面。
具體這些工作還需要讀者自己開發用戶列表、添加好友、創建群組、添加群組等功能,這些都是一些普通的 API 開發工作,我們的代碼程序中也實現了,讀者可以拿去修改使用,這里就不再演示了。
我們再重點看一下用戶鑒權這一塊吧,用戶鑒權是指用戶點擊聊天進入聊天界面時,客戶端會發送一個 GET 請求給服務端。
請求建立一條 WebSocket 長連接,服務端收到建立連接的請求之后,會對客戶端請求進行校驗,以確實是否建立長連接,然后將這條長連接的句柄添加到 Map 當中(因為服務端不僅僅對一個客戶端服務,可能存在千千萬萬個長連接)維護起來。
我們下邊來看具體代碼實現:
############################ //app/controller/chat.go ############################ ...... //本核心在于形成userid和Node的映射關系 type Node struct { Conn *websocket.Conn //并行轉串行, DataQueue chan []byte GroupSets set.Interface } ...... //userid和Node映射關系表 var clientMap map[int64]*Node = make(map[int64]*Node, 0) //讀寫鎖 var rwlocker sync.RWMutex //實現聊天的功能 func Chat(writer http.ResponseWriter, request *http.Request) { query := request.URL.Query() id := query.Get("id") token := query.Get("token") userId, _ := strconv.ParseInt(id, 10, 64) //校驗token是否合法 islegal := checkToken(userId, token) conn, err := (&websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return islegal }, }).Upgrade(writer, request, nil) if err != nil { log.Println(err.Error()) return } //獲得websocket鏈接conn node := &Node{ Conn: conn, DataQueue: make(chan []byte, 50), GroupSets: set.New(set.ThreadSafe), } //獲取用戶全部群Id comIds := concatService.SearchComunityIds(userId) for _, v := range comIds { node.GroupSets.Add(v) } rwlocker.Lock() clientMap[userId] = node rwlocker.Unlock() //開啟協程處理發送邏輯 go sendproc(node) //開啟協程完成接收邏輯 go recvproc(node) sendMsg(userId, []byte("welcome!")) } ...... //校驗token是否合法 func checkToken(userId int64, token string) bool { user := UserService.Find(userId) return user.Token == token } ...... ############################ //main.go ############################ ...... func main() { http.HandleFunc("/chat", controller.Chat) } ......
進入聊天室,客戶端發起 /chat 的 GET 請求,服務端首先創建了一個 Node 結構體,用來存儲和客戶端建立起來的 WebSocket 長連接句柄。
每一個句柄都有一個管道 DataQueue,用來收發信息,GroupSets 是客戶端對應的群組信息,后邊我們會提到。
type Node struct { Conn *websocket.Conn //并行轉串行, DataQueue chan []byte GroupSets set.Interface }
服務端創建了一個 Map,將客戶端用戶 ID 和其 Node 關聯起來:
//userid和Node映射關系表 var clientMap map[int64]*Node = make(map[int64]*Node, 0)
接下來是主要的用戶邏輯了,服務端接收到客戶端的參數之后,首先校驗 Token 是否合法,由此確定是否要升級 HTTP 協議到 WebSocket 協議,建立長連接,這一步稱為鑒權。
//校驗token是否合法 islegal := checkToken(userId, token) conn, err := (&websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return islegal }, }).Upgrade(writer, request, nil)
鑒權成功以后,服務端初始化一個 Node,搜索該客戶端用戶所在的群組 ID,填充到群組的 GroupSets 屬性中。
然后將 Node 節點添加到 ClientMap 中維護起來,我們對 ClientMap 的操作一定要加鎖,因為 Go 語言在并發情況下,對 Map 的操作并不保證原子安全:
//獲得websocket鏈接conn node := &Node{ Conn: conn, DataQueue: make(chan []byte, 50), GroupSets: set.New(set.ThreadSafe), } //獲取用戶全部群Id comIds := concatService.SearchComunityIds(userId) for _, v := range comIds { node.GroupSets.Add(v) } rwlocker.Lock() clientMap[userId] = node rwlocker.Unlock()
服務端和客戶端建立了長鏈接之后,會開啟兩個協程專門來處理客戶端消息的收發工作,對于 Go 語言來說,維護協程的代價是很低的。
所以說我們的單機程序可以很輕松的支持成千上完的用戶聊天,這還是在沒有優化的情況下。
...... //開啟協程處理發送邏輯 go sendproc(node) //開啟協程完成接收邏輯 go recvproc(node) sendMsg(userId, []byte("welcome!")) ......
至此,我們的鑒權工作也已經完成了,客戶端和服務端的連接已經建立好了,接下來我們就來實現具體的聊天功能吧。
④實現單聊和群聊
實現聊天的過程中,消息體的設計至關重要,消息體設計的合理,功能拓展起來就非常的方便,后期維護、優化起來也比較簡單。
我們先來看一下,我們消息體的設計:
############################ //app/controller/chat.go ############################ type Message struct { Id int64 `json:"id,omitempty" form:"id"` //消息ID Userid int64 `json:"userid,omitempty" form:"userid"` //誰發的 Cmd int `json:"cmd,omitempty" form:"cmd"` //群聊還是私聊 Dstid int64 `json:"dstid,omitempty" form:"dstid"` //對端用戶ID/群ID Media int `json:"media,omitempty" form:"media"` //消息按照什么樣式展示 Content string `json:"content,omitempty" form:"content"` //消息的內容 Pic string `json:"pic,omitempty" form:"pic"` //預覽圖片 Url string `json:"url,omitempty" form:"url"` //服務的URL Memo string `json:"memo,omitempty" form:"memo"` //簡單描述 Amount int `json:"amount,omitempty" form:"amount"` //其他和數字相關的 }
每一條消息都有一個唯一的 ID,將來我們可以對消息持久化存儲,但是我們系統中并沒有做這件工作,讀者可根據需要自行完成。
然后是 userid,發起消息的用戶,對應的是 dstid,要將消息發送給誰。還有一個參數非常重要,就是 cmd,它表示是群聊還是私聊。
群聊和私聊的代碼處理邏輯有所區別,我們為此專門定義了一些 cmd 常量:
//定義命令行格式 const ( CmdSingleMsg = 10 CmdRoomMsg = 11 CmdHeart = 0 )
Media 是媒體類型,我們都知道微信支持語音、視頻和各種其他的文件傳輸,我們設置了該參數之后,讀者也可以自行拓展這些功能。
Content 是消息文本,是聊天中最常用的一種形式。Pic 和 URL 是為圖片和其他鏈接資源所設置的。
Memo 是簡介,Amount 是和數字相關的信息,比如說發紅包業務有可能使用到該字段。
消息體的設計就是這樣,基于此消息體,我們來看一下,服務端如何收發消息,實現單聊和群聊吧。
還是從上一節說起,我們為每一個客戶端長鏈接開啟了兩個協程,用于收發消息,聊天的邏輯就在這兩個協程當中實現。
############################ //app/controller/chat.go ############################ ...... //發送邏輯 func sendproc(node *Node) { for { select { case data := <-node.DataQueue: err := node.Conn.WriteMessage(websocket.TextMessage, data) if err != nil { log.Println(err.Error()) return } } } } //接收邏輯 func recvproc(node *Node) { for { _, data, err := node.Conn.ReadMessage() if err != nil { log.Println(err.Error()) return } dispatch(data) //todo對data進一步處理 fmt.Printf("recv<=%s", data) } } ...... //后端調度邏輯處理 func dispatch(data []byte) { msg := Message{} err := json.Unmarshal(data, &msg) if err != nil { log.Println(err.Error()) return } switch msg.Cmd { case CmdSingleMsg: sendMsg(msg.Dstid, data) case CmdRoomMsg: for _, v := range clientMap { if v.GroupSets.Has(msg.Dstid) { v.DataQueue <- data } } case CmdHeart: //檢測客戶端的心跳 } } //發送消息,發送到消息的管道 func sendMsg(userId int64, msg []byte) { rwlocker.RLock() node, ok := clientMap[userId] rwlocker.RUnlock() if ok { node.DataQueue <- msg } } ......
服務端向客戶端發送消息邏輯比較簡單,就是將客戶端發送過來的消息,直接添加到目標用戶 Node 的 Channel 中去就好了。
通過 WebSocket 的 WriteMessage 就可以實現此功能:
func sendproc(node *Node) { for { select { case data := <-node.DataQueue: err := node.Conn.WriteMessage(websocket.TextMessage, data) if err != nil { log.Println(err.Error()) return } } } }
收發邏輯是這樣的,服務端通過 WebSocket 的 ReadMessage 方法接收到用戶信息,然后通過 dispatch 方法進行調度:
func recvproc(node *Node) { for { _, data, err := node.Conn.ReadMessage() if err != nil { log.Println(err.Error()) return } dispatch(data) //todo對data進一步處理 fmt.Printf("recv<=%s", data) } }
dispatch 方法所做的工作有兩件:
解析消息體到 Message 中。
根據消息類型,將消息體添加到不同用戶或者用戶組的 Channel 當中。
Go 語言中的 Channel 是協程間通信的強大工具,dispatch 只要將消息添加到 Channel 當中,發送協程就會獲取到信息發送給客戶端,這樣就實現了聊天功能。
單聊和群聊的區別只是服務端將消息發送給群組還是個人,如果發送給群組,程序會遍歷整個 clientMap,看看哪個用戶在這個群組當中,然后將消息發送。
其實更好的實踐是我們再維護一個群組和用戶關系的 Map,這樣在發送群組消息的時候,取得用戶信息就比遍歷整個 clientMap 代價要小很多了。
func dispatch(data []byte) { msg := Message{} err := json.Unmarshal(data, &msg) if err != nil { log.Println(err.Error()) return } switch msg.Cmd { case CmdSingleMsg: sendMsg(msg.Dstid, data) case CmdRoomMsg: for _, v := range clientMap { if v.GroupSets.Has(msg.Dstid) { v.DataQueue <- data } } case CmdHeart: //檢測客戶端的心跳 } } ...... func sendMsg(userId int64, msg []byte) { rwlocker.RLock() node, ok := clientMap[userId] rwlocker.RUnlock() if ok { node.DataQueue <- msg } }
可以看到,通過 Channel,我們實現用戶聊天功能還是非常方便的,代碼可讀性很強,構建的程序也很健壯。
下邊是筆者本地聊天的示意圖:
⑤發送表情和圖片
下邊我們再來看一下聊天中經常使用到的發送表情和圖片功能是如何實現的吧。
其實表情也是小圖片,只是和聊天中圖片不同的是,表情圖片比較小,可以緩存在客戶端,或者直接存放到客戶端代碼的代碼文件中(不過現在微信聊天中有的表情包都是通過網絡傳輸的)。
下邊是一個聊天中返回的圖標文本數據:
{ "dstid":1, "cmd":10, "userid":2, "media":4, "url":"/asset/plugins/doutu//emoj/2.gif" }
客戶端拿到 URL 后,就加載本地的小圖標。聊天中用戶發送圖片也是一樣的原理,不過聊天中用戶的圖片需要先上傳到服務器,然后服務端返回 URL,客戶端再進行加載,我們的 IM 系統也支持此功能。
我們看一下圖片上傳的程序:
############################ //app/controller/upload.go ############################ func init() { os.MkdirAll("./resource", os.ModePerm) } func FileUpload(writer http.ResponseWriter, request *http.Request) { UploadLocal(writer, request) } //將文件存儲在本地/im_resource目錄下 func UploadLocal(writer http.ResponseWriter, request *http.Request) { //獲得上傳源文件 srcFile, head, err := request.FormFile("file") if err != nil { util.RespFail(writer, err.Error()) } //創建一個新的文件 suffix := ".png" srcFilename := head.Filename splitMsg := strings.Split(srcFilename, ".") if len(splitMsg) > 1 { suffix = "." + splitMsg[len(splitMsg)-1] } filetype := request.FormValue("filetype") if len(filetype) > 0 { suffix = filetype } filename := fmt.Sprintf("%d%s%s", time.Now().Unix(), util.GenRandomStr(32), suffix) //創建文件 filepath := "./resource/" + filename dstfile, err := os.Create(filepath) if err != nil { util.RespFail(writer, err.Error()) return } //將源文件拷貝到新文件 _, err = io.Copy(dstfile, srcFile) if err != nil { util.RespFail(writer, err.Error()) return } util.RespOk(writer, filepath, "") } ...... ############################ //main.go ############################ func main() { http.HandleFunc("/attach/upload", controller.FileUpload) }
我們將文件存放到本地的一個磁盤文件夾下,然后發送給客戶端路徑,客戶端通過路徑加載相關的圖片信息。
關于發送圖片,我們雖然實現功能,但是做的太簡單了,我們在接下來的章節詳細的和大家探討一下系統優化相關的方案。怎樣讓我們的系統在生產環境中用的更好。
程序優化和系統架構升級方案
我們上邊實現了一個功能健全的 IM 系統,要將該系統應用在企業的生產環境中,需要對代碼和系統架構做優化,才能實現真正的高可用。
本節主要從代碼優化和架構升級上談一些個人觀點,能力有限不可能面面俱到,希望讀者也在評論區給出更多好的建議。
代碼優化
我們的代碼沒有使用框架,函數和 API 都寫的比較簡陋,雖然進行了簡單的結構化,但是很多邏輯并沒有解耦,所以建議大家業界比較成熟的框架對代碼進行重構,Gin 就是一個不錯的選擇。
系統程序中使用 clientMap 來存儲客戶端長鏈接信息,Go 語言中對于大 Map 的讀寫要加鎖,有一定的性能限制。
在用戶量特別大的情況下,讀者可以對 clientMap 做拆分,根據用戶 ID 做 Hash 或者采用其他的策略,也可以將這些長鏈接句柄存放到 Redis 中。
上邊提到圖片上傳的過程,有很多可以優化的地方,首先是圖片壓縮(微信也是這樣做的),圖片資源的壓縮不僅可以加快傳輸速度,還可以減少服務端存儲的空間。
另外對于圖片資源來說,實際上服務端只需要存儲一份數據就夠了,讀者可以在圖片上傳的時候做 Hash 校驗。
如果資源文件已經存在了,就不需要再次上傳了,而是直接將 URL 返回給客戶端(各大網盤廠商的妙傳功能就是這樣實現的)。
代碼還有很多優化的地方,比如我們可以將鑒權做的更好,使用 wss:// 代替 ws://。
在一些安全領域,可以對消息體進行加密,在高并發領域,可以對消息體進行壓縮。
對 MySQL 連接池再做優化,將消息持久化存儲到 Mongo,避免對數據庫頻繁的寫入,將單條寫入改為多條一塊寫入;為了使程序耗費更少的 CPU,降低對消息體進行 Json 編碼的次數,一次編碼,多次使用......
系統架構升級
我們的系統太過于簡單,所在在架構升級上,有太多的工作可以做,筆者在這里只提幾點比較重要的:
①應用/資源服務分離
我們所說的資源指的是圖片、視頻等文件,可以選擇成熟廠商的 Cos,或者自己搭建文件服務器也是可以的,如果資源量比較大,用戶比較廣,CDN 是不錯的選擇。
②突破系統連接數,搭建分布式環境
對于服務器的選擇,一般會選擇 Linux,Linux 下一切皆文件,長鏈接也是一樣。
單機的系統連接數是有限制的,一般來說能達到 10 萬就很不錯了,所以在用戶量增長到一定程序,需要搭建分布式。
分布式的搭建就要優化程序,因為長鏈接句柄分散到不同的機器,實現消息廣播和分發是首先要解決的問題,筆者這里不深入闡述了,一來是沒有足夠的經驗,二來是解決方案有太多的細節需要探討。
搭建分布式環境所面臨的問題還有:怎樣更好的彈性擴容、應對突發事件等。
③業務功能分離
我們上邊將用戶注冊、添加好友等功能和聊天功能放到了一起,真實的業務場景中可以將它們做分離,將用戶注冊、添加好友、創建群組放到一臺服務器上,將聊天功能放到另外的服務器上。
業務的分離不僅使功能邏輯更加清晰,還能更有效的利用服務器資源。
④減少數據庫I/O,合理利用緩存
我們的系統沒有將消息持久化,用戶信息持久化到 MySQL 中去。
在業務當中,如果要對消息做持久化儲存,就要考慮數據庫 I/O 的優化,簡單講:合并數據庫的寫次數、優化數據庫的讀操作、合理的利用緩存。
到此,關于“如何快速搭建一個高可用的IM系統”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。