您好,登錄后才能下訂單哦!
本節介紹 net 包,它提供構建客戶端和服務器程序的組件,這些程序通過 TCP、UDP 或者 UNIX 套接字進行通信。網絡服務 net/http 包是在 net 包的基礎上構建的。
這個示例是一個時鐘服務器,它以每秒一次的頻率向客戶端發送當前時間:
package main
import (
"io"
"log"
"net"
"time"
)
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,連接終止
continue
}
handleConn(conn) // 一次處理一個連接
}
}
func handleConn(c net.Conn) {
defer c.Close()
for {
_, err := io.WriteString(c, time.Now().Format("2006/01/02 15:04:05\r\n"))
if err != nil {
return // 例如,連接斷開
}
time.Sleep(1 * time.Second)
}
}
Listen 函數創建一個 net.Listener 對象,它在一個網絡端口上監聽進來的連接,這里是 TCP 端口 localhost:8000。監聽器的 Accept 方法被阻塞,知道有連接請求進來,然后返回 net.Conn 對象來代表一個連接。
handleConn 函數處理一個完整的客戶端連接。在循環里,它將 time.Now() 獲取的當前時間發送給客戶端。因為 net.Conn 滿足 io.Writer 接口,所以可以直接向它進行寫入。當寫入失敗時循環結束,很多時候是客戶端斷開連接,這是 handleConn 函數使用延遲(defer)的 Close 調用關閉自己這邊的連接,然后繼續等待下一個連接請求。
為了連接到服務器,還需要一個 socket 客戶端,這里可以先使用系統的 telnet 來進行驗證:
$ telnet localhost 8000
這里可以開兩個 telnet 嘗試進行連接,只有第一個可以連接上,而其他的連接會阻塞。當把第一個客戶端的連接斷開后,服務端會重新返回到 main 函數的 for 循環中等待新的連接。此時之前阻塞的一個連接就能連接進來,繼續顯示時間。服務端程序暫時先這樣,先來實現一個 socket 客戶端程序。
下面的客戶端使用 net.Dial 實現了 Go 版本的 netcat 程序,用來連接 TCP服務器:
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
mustCopy(os.Stdout, conn)
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
這個程序從網絡連接中讀取,然后寫到標準輸出,直到到達 EOF 或者出錯。
如果打開多個客戶端,同時只有一個客戶端能正常工作。第二個客戶端必須等到第一個結束才能正常工作,這是因為服務器是順序的,一次只能處理一個客戶請求。讓服務器支持并發只需要一個很小的改變:在調用 handleConn 的地方添加一個 go 關鍵字,使它在自己的 goroutine 內執行:
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,連接終止
continue
}
go handleConn(conn) // 并發處理連接
}
現在的版本,多個客戶端可以同時接入并正常工作了。
上面的時鐘服務器每個連接使用一個 goroutine。下面要實現的這個回聲服務器,每個連接使用多個 goroutine 來處理。大多數的回聲服務器僅僅將讀到的內容寫回去,所以可以使用下面簡單的 handleConn 版本:
func handleConn(c net.Conn) {
io.Copy(c, c)
c.Close()
}
下面的這個版本可以重復3次,第一個全大寫,第二次正常,第三次全消息:
// reverb1
package main
import (
"bufio"
"fmt"
"io"
"log"
"net"
"strings"
"time"
)
func echo(c net.Conn, shout string, delay time.Duration) {
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
echo(c, input.Text(), 1*time.Second)
}
// 注意:忽略 input.Err() 中可能的錯誤
c.Close()
}
func handleConn0(c net.Conn) {
io.Copy(c, c)
c.Close()
}
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,連接終止
continue
}
go handleConn(conn) // 并發處理連接
}
}
在上一個示例中,已經知道需要使用 go 關鍵字調用 handleConn 函數。不過在這個例子中,重點不是處理多個客戶端的連接,所以這里不是重點。
現在來升級一下客戶端,使它可以在終端上向服務器輸入,還可以將服務器的回復復制到輸出,這里提供了另一個使用并發的機會:
package main
import (
"io"
"log"
"net"
"os"
)
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
func main() {
conn, err := net.Dial("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
}
使用上面的服務端版本,如果有多個連續的輸入,新輸入的內容不會馬上返回,而是要等待之前輸入的內容全部返回后才會處理之后的內容。要想做的更好,需要更多的 goroutine。再一次,在調用 echo 時需要加入 go 關鍵字:
// reverb2
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
go echo(c, input.Text(), 1*time.Second)
}
// 注意:忽略 input.Err() 中可能的錯誤
c.Close()
}
這個改進的版本,回聲也是并發的,在時間上互相重合。
這就是使服務器變成并發所要做的,不僅處理來自多個客戶端的鏈接,還包括在一個連接處理中,使用多個 go 關鍵字。在這個例子里,單個客戶端連接也可以同時發起多個請求。在最初的版本里,沒有使用 go 調用 echo,所以處理單個客戶端的請求不是并發的,只有前一個處理完才會繼續處理下一個。之后改進的版本,使用 go 調用 echo,這里對每一個請求的處理都是并發的了。
然而,在添加這些 go 關鍵字的同時,必須要仔細考慮方法 net.Conn 的并發調用是不是安全的,對大多數類型來講,這都是不安全的。
之前的客戶端在主 goroutine 中將輸入復制到服務器中,這樣的客戶端在輸入接收后立即退出,即使后臺的 goroutine 還在繼續。為了讓程序等待后臺的 goroutine 在完成后再退出,使用一個通道來同步兩個 goroutine:
func main() {
conn, err := net.Dial("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // 注意:忽略錯誤
log.Println("done")
done <- struct{}{} // 通知主 goroutine 的信號
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // 等待后臺 goroutine 完成
}
當用戶關閉標準輸入流(Windows系統使用Ctrl+Z)時,mustCopy 返回,主 goroutine 調用 conn.Close() 來關閉兩端網絡連接。關閉寫半邊的連接會導致服務器看到 EOF。關閉讀半邊的連接導致后臺 goroutine 調用 io.Copy 返回 “read from closed connection” 錯誤,所以這個版本里去掉了打印錯誤日志。
上面這個版本使用起來的效果和之前的版本并沒有太大的差別,幾乎看不到差別。雖然多了等待連接關閉,但是依然不會等待接收完畢所有服務器的返回。不過這步解決了等待 goroutine 運行完畢后,主 goroutine 才會結束。使用下面的 TCP 鏈接,就可以實現接收完畢所有信息后,goroutine 才會結束。在 net 包中,conn 接口有一個具體的類型 *net.TCPConn,它代表一個 TCP 連接:
tcpAddr, err := net.ResolveTCPAddr("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatal(err)
}
TCP 鏈接由兩半邊組成,可以通過 CloseRead 和 CloseWrite 方法分別關閉。修改主 goroutine,僅僅關閉連接的寫半邊,這樣程序可以繼續執行輸出來自 reverb1 服務器的回聲,即使標準輸入已經關閉:
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
tcpAddr, err := net.ResolveTCPAddr("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // 注意:忽略錯誤
log.Println("done")
done <- struct{}{} // 通知主 goroutine 的信號
}()
mustCopy(conn, os.Stdin)
conn.CloseWrite()
<-done // 等待后臺 goroutine 完成
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
現在只對第一個回聲服務器版本 reverb1 有效,對于之后改進的可以并發處理同一個客戶端多個請求的 reverb2 服務器,服務端還需要做一些修改。
在 reverb2 服務器的版本中,因為對于每一個連接,每一次回聲的請求都會生成一個新的 goroutine 進行處理。為了知道什么時候最后一個 goroutine 結束(有時候不一定是最后啟動的那個),需要在每一個 goroutine 啟動千遞增計數,在每一個 goroutine 結束時遞減計數。這需要一個特殊設計的計數器,它可以被多個 goroutine 安全地操作,然后又一個方法一直等到他變為 0。這個計數器類型是 sync.WaitGroup。下面是完整的服務器代碼:
package main
import (
"bufio"
"fmt"
"log"
"net"
"strings"
"sync"
"time"
)
var wg sync.WaitGroup // 工作 goroutine 的個數
func echo(c net.Conn, shout string, delay time.Duration) {
defer wg.Done()
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
wg.Add(1)
go echo(c, input.Text(), 2*time.Second)
}
// 注意:忽略 input.Err() 中可能的錯誤
wg.Wait()
c.Close()
}
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,連接終止
continue
}
go handleConn(conn) // 并發處理連接
}
}
注意 Add 和 Done 方法的不對稱性。Add 遞增計數器,它必須工作在 goroutine 開始之前執行,而不是在中間。另外,Add 有一個參數,但 Done 沒有,它等價于 Add(-1)。使用 defer 來確保計數器在任何情況下都可以遞減。在不知道迭代次數的情況下,上面的代碼結構是通用的,符合習慣的并行循環模式。
下面的版本增加了超時斷開的功能。這樣服務端和客戶端就各有兩個斷開連接的情況了,原本只有一種。
這里需要用到 select 多路復用。
服務端
原本只要被動等待客戶端斷開就可以了,這個邏輯原本原本放在主 goroutine 中。現在服務端超時需要主動斷開,客戶端斷開了,需要被動斷開,這2個邏輯都需要一個單獨的 goroutine,而主 goroutine 則阻塞接收這兩個情況的通道,任意一個通道有數據,就斷開并退出:
package main
import (
"bufio"
"fmt"
"log"
"net"
"strings"
"sync"
"time"
)
var wg sync.WaitGroup // 工作 goroutine 的個數
func echo(c net.Conn, shout string, delay time.Duration) {
defer wg.Done()
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
stop1 := make(chan struct{})
stop2 := make(chan struct{})
inputSignal := make(chan struct{}) // 有任何輸入,就發送一個信號
go func() { // 接收客戶端發送回聲的goroutine
input := bufio.NewScanner(c)
for input.Scan() { // 注意:忽略 input.Err() 中可能的錯誤
inputSignal <- struct{}{}
wg.Add(1)
go echo(c, input.Text(), 1*time.Second)
}
// 退出上面的for循環,表示客戶端斷開
stop1 <- struct{}{}
}()
delay := 5 * time.Second
timer := time.NewTimer(delay)
go func() { // 計算超時的goroutine
for {
select {
case <-inputSignal:
timer.Reset(delay)
case <-timer.C:
// 超時,斷開連接
stop2 <- struct{}{}
return
}
}
}()
select {
case <-stop1:
case <-stop2:
}
wg.Wait()
c.Close()
}
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,連接終止
continue
}
go handleConn(conn) // 并發處理連接
}
}
客戶端
原本只需要響應接收標準輸入的 Ctrl+Z 然后斷開寫半邊的連接,這個邏輯也需要從主 goroutine 放到一個新的 goroutine 中。另外一種斷開的連接是被動響應服務端的斷開連接然后客戶端也退出。這里還要稍微在復雜一點,如果是服務端的超時斷開,則直接斷開。如果是客戶端的主動斷開,則還需要繼續等待服務端的斷開,然后再退出:
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
tcpAddr, err := net.ResolveTCPAddr("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatal(err)
}
done1 := make(chan struct{})
go func() { // 打印回聲的goroutine
io.Copy(os.Stdout, conn) // 注意:忽略錯誤
log.Println("done")
done1 <- struct{}{} // 通知主 goroutine 的信號
}()
done2 := make(chan struct{})
go func() { // 發送請求的goroutine
mustCopy(conn, os.Stdin)
conn.CloseWrite()
done2 <- struct{}{}
}()
select { // 等待后臺 goroutine 完成
case <-done1:
case <-done2: // 客戶端主動斷開后,只關閉寫半邊連接
<-done1 // 繼續等待服務端斷開,就是等待全是打印完畢
}
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
實現一個聊天服務器,它可以在幾個用戶之間相互廣播文本消息。
這個程序中有四種 goroutine:
主函數的工作是監聽端口,接受連接請求。對每一個連接,它創建一個新的 handleConn。就像之前的并發回聲服務器中那樣:
func main() {
listener, err := net.Listen("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}
廣播器,它的變量 clients 會記錄當前連接的客戶集合。其記錄的內容是每一個客戶端對外發送消息的通道:
// 廣播器
type client chan<- string // 對外發送消息的通道
var (
entering = make(chan client)
leaving = make(chan client)
messages = make(chan string) // 所有接受的客戶消息
)
func broadcaster() {
clients := make(map[client]bool) // 所有連接的客戶端集合
for {
select {
case msg := <-messages:
// 把所有接收的消息廣播給所有的客戶
// 發送消息通道
for cli := range clients {
cli <- msg
}
case cli := <-entering:
clients[cli] = true
case cli := <-leaving:
delete(clients, cli)
close(cli)
}
}
}
廣播器監聽兩個全局的通道 entering 和 leaving。通過它們通知有客戶進入和離開,如果從一個通道中接收到事件,它將更新 clients 集合。如果是客戶離開,還會關閉對應客戶對外發送消息的通道。
廣播器還監聽 messages 通道,所有的客戶都會將要廣播的消息發送到這個通道。當收到一個消息后,就會把消息廣播給所有客戶。
handleConn 函數創建一個對外發送消息的新通道,然后通過 entering 通道通知廣播器新客戶進入。接著,要讀取客戶發來的每一條消息,通過 messages 通道將每一條消息發送給廣播器,發送時再每條消息前面加上發送者的ID作為前綴。一旦客戶端將消息讀取完畢,handleConn 通過 leaving 通道通知客戶離開,然后關閉連接:
// 客戶端處理函數: handleConn
func handleConn(conn net.Conn) {
ch := make(chan string) // 對外發送客戶消息的通道
go clientWriter(conn, ch)
who := conn.RemoteAddr().String()
ch <- "You are " + who // 這條單發給自己
messages <- who + " has arrived" // 這條進行進行廣播,但是自己還沒加到廣播列表中
entering <- ch // 然后把自己加到廣播列表中
input := bufio.NewScanner(conn)
for input.Scan() {
messages <- who + ": " + input.Text()
}
// 注意,忽略input.Err()中可能的錯誤
leaving <- ch
messages <- who + " has left"
conn.Close()
}
另外,handleConn 函數還為每一個客戶創建了寫入(clientWriter)goroutine,每個客戶都從自己的通道中接收消息發送給客戶端的網絡連接。在廣播器收到 leaving 通知并關閉這個接收消息的通道后,clientWriter 會結束通道的遍歷后運行結束:
// 客戶端處理函數: clientWriter
func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
// 在消息結尾使用 \r\n ,提升平臺兼容
fmt.Fprintf(conn, "%s\r\n", msg) // 注意,忽略網絡層面的錯誤
}
}
給客戶端發送的消息字符串需要用"\n"結尾。如果換成"\r\n"結尾,平臺的兼容性應該會更好。至少windows上的telnet客戶端可以直接使用了。
完整的源碼就是上面的四段代碼,拼在一起就能運行了。
和之前使用回聲服務器一樣,可以用 telnet 或者也可以用之前寫的 netcat 作為客戶端來聊天。
當有 n 個客戶 session 在連接的時候,程序并發運行著 2n+2 個相互通信的 goroutine,它不需要隱式的加鎖操作也能做到并發安全。clients map 被限制在廣播器這一個 goroutine 中,所以不會被并發的訪問。唯一被多個 goroutine 共享的變量是通道以及 net.Conn 的實例,它們也都是并發安全的。
上面的聊天服務器提供了一個很好的架構,現在再在其之上擴展功能就很方便了。
在新用戶到來之后,告知該新用戶當前在聊天室的所有的用戶列表。每個用戶加入后,系統都會自動生成一個用戶名(基于用戶的網絡連接,之后會添加設置用戶名的功能),就是要把這些存在的用戶名打印出來。
所有的用戶列表只在廣播器的 clients map 中,但是這個 map 又不包括用戶名。所以先要修改數據類型,把每個連接的數據結構加上一個新的用戶名字段:
type client chan<- string // 對外發送消息的通道
type clientInfo struct {
name string
ch client
}
原本使用 client 作為元素的通道和 map,現在全部也都要換成 clientInfo 作為元素。像新用戶發送當前用戶列表的任務也在廣播器中完成:
// 廣播器
type client chan<- string // 對外發送消息的通道
type clientInfo struct {
name string
ch client
}
var (
entering = make(chan clientInfo)
leaving = make(chan clientInfo)
messages = make(chan string) // 所有接受的客戶消息
)
func broadcaster() {
clients := make(map[clientInfo]bool) // 所有連接的客戶端集合
for {
select {
case msg := <-messages:
// 把所有接收的消息廣播給所有的客戶
// 發送消息通道
for cli := range clients {
cli.ch <- msg
}
case cli := <-entering:
// 在每一個新用戶到來的時候,通知當前存在的用戶
var users []string
for cli := range clients {
users = append(users, cli.name)
}
if len(users) > 0 {
cli.ch <- fmt.Sprintf("Other users in room: %s", strings.Join(users, "; "))
} else {
cli.ch <- "You are the only user in this room."
}
clients[cli] = true
case cli := <-leaving:
delete(clients, cli)
close(cli.ch)
}
}
}
客戶端處理函數還需要做少量的修改,主要是因為數據結構變了。原本給 entering 和 leaving 通道發送的是 ch。現在要發送封裝好 who 的結構體。客戶端處理函數的代碼略,之后的擴展中會貼出來:
cli := clientInfo{who, ch}
entering <- cli
如果在一段時間里,客戶端沒有任何輸入,服務器就將客戶端斷開。之前的邏輯是,客戶端處理函數會一直在阻塞在 input.Scan() 這里等待客戶端輸入。只要在另外一個 goroutine 中調用 conn.Close(),就可以讓當前阻塞的讀操作變成非阻塞,就像 input.Scan() 輸入完成的讀操作一樣。不過這么做的話會有一點小問題,原本在主 goroutine 的結尾有一個conn.Close()
操作,現在在定時的 goroutine 中還需要有一個關閉的操作。如果因為定時而結束的,就會有兩次關閉操作。
這里關閉的是 socket 連接,本質上就是文件句柄。嘗試多次關閉貌似不會有什么問題,不過要解決這個問題也不難。一種是把響應用戶輸入的操作也放到 goroutine 中。現有有兩個 goroutine 在運行,主 goroutine 則只要一直阻塞,通過一個通道等待其中任何一個 goroutine 完成后發送的信號即可。這樣關閉的操作只在主 goroutine 中操作。下面的是客戶端處理函數,包括上一個功能里修改的部分:
// 客戶端處理函數: handleConn
func handleConn(conn net.Conn) {
ch := make(chan string) // 對外發送客戶消息的通道
go clientWriter(conn, ch)
who := conn.RemoteAddr().String()
cli := clientInfo{who, ch} // 打包好用戶名和通道
ch <- "You are " + who // 這條單發給自己
messages <- who + " has arrived" // 這條進行進行廣播,但是自己還沒加到廣播列表中
entering <- cli // 然后把自己加到廣播列表中
done := make(chan struct{}, 2) // 等待下面兩個 goroutine 其中一個執行完成。使用緩沖通道防止 goroutine 泄漏
// 計算超時的goroutine
inputSignal := make(chan struct{}) // 有任何輸入,就發送一個信號
timeout := 15 * time.Second // 客戶端空閑的超時時間
go func() {
timer := time.NewTimer(timeout)
for {
select {
case <-inputSignal:
timer.Reset(timeout)
case <-timer.C:
// 超時,斷開連接
done <- struct{}{}
return
}
}
}()
go func() {
input := bufio.NewScanner(conn)
for input.Scan() {
inputSignal <- struct{}{}
if len(strings.TrimSpace(input.Text())) == 0 { // 禁止發送純空白字符
continue
}
messages <- who + ": " + input.Text()
}
// 注意,忽略input.Err()中可能的錯誤
done <- struct{}{}
}()
<-done
leaving <- cli
messages <- who + " has left"
conn.Close()
}
這里還簡單加了一個限制客戶端發送空消息的功能,在 input.Scan() 循環中。空消息不會發送廣播,但是可以重置定時器的時間。
在客戶端連接后,不立刻進入聊天室,而是先輸入一個名字。考慮到名字不能和已有的名字重復,而現有的名字都保存在廣播器里的 clients 這個 map 中。所以客戶端輸入的名字需要在 clients 中查找一下是否已經有人用了。現在有了按名字進行查找的需求,clients 類型更適合使用一個以名字為 key 的 map 而不是原本的集合。這個 map 的 value 就是向該客戶發送消息的通道,也就是最初這個集合的 key 的值:
clients := make(map[string]client) // 所有連接的客戶端集合
客戶端處理函數
在客戶端處理函數的開頭,需要增加注冊用戶名的過程。用戶名注冊的處理過程比較復雜,所以單獨封裝到了一個函數 clientRegiste 中:
// 客戶端處理函數
func handleConn(conn net.Conn) {
who := clientRegiste(conn) // 新增這一行,注冊獲取用戶名
ch := make(chan string) // 對外發送客戶消息的通道
go clientWriter(conn, ch)
// who := conn.RemoteAddr().String() // 去掉這一行
// 之后的代碼不變
}
這里使用一個交互的方式來獲取用戶名,代替原本通過連接的信息自動生成。這個函數是串行的,只有在返回用戶名后,才會繼續執行下去。之后的代碼和之前是一樣的。
在 clientRegiste 函數中,不停的和終端進行交互,處理收到的消息,如果用戶名可用,繼續執行之后的流程。如果用戶名不可用,則提示用戶繼續處理:
// 客戶端處理函數 clientRegiste
// 注冊用戶名
func clientRegiste(conn net.Conn) (who string) {
ch := make(chan bool)
fmt.Fprint(conn, "input nickname: ") // 注意,忽略網絡層面的錯誤
input := bufio.NewScanner(conn)
for input.Scan() {
if len(strings.TrimSpace(input.Text())) == 0 { // 禁止發送純空白字符
continue
}
who = input.Text()
register <- registeInfo{who, ch}
if <-ch {
break
}
fmt.Fprintf(conn, "name %q is existed\r\ntry other name: ", who)
}
// 注意,忽略input.Err()中可能的錯誤
return who
}
這里只有最簡單的功能,還可以增加輸入超時,以及嘗試次數的限制。所以把這個函數獨立出來完成功能,更方便之后對注冊函數進行擴展。
函數的主要邏輯就是 input.Scan() 的循環,這和 handleConn 中的循環十分相似。如果之后再加上輸入超時,這兩段的處理邏輯只有極小部分的差別,所以這部分代碼也可以單獨寫一個函數。這里避免過早的優化,暫時就先這樣,看著也比較清晰。之后要添加超時功能的時候,再把這部分重復的代碼獨立出來。這部分優化最后完整的代碼里會有。
廣播器
在廣播器的 select 里要加一個分支,用來處理用戶名的請求。收到請求后,判斷是否已經存在,把結果返回給 clientRegiste。因為 clients 是只有廣播器可見的,所以這里要使用通道傳遞過來,判斷后再用通道把結果傳回去。這樣可以保證 clients 變量只在這一個 goroutine 里被使用(包括修改)。另外,每個客戶端的注冊都使用一個通道將注冊信息發送給廣播器,但是廣播器返回的內容,需要對每個客戶端使用不同的通道。所以這里,廣播器新創建了專門用于注冊交互的數據結構:
type registeInfo struct {
name string
ch chan<- bool
}
var register = make(chan registeInfo) // 注冊用戶名的通道
客戶注冊的函數創建一個布爾型的通道,加上用戶的名字封裝到 registeInfo 結構體中。然后廣播器判斷后,把結果通道 registeInfo 里的 ch 字段這個通道,把結果返回給對應的客戶注冊函數。
下面是廣播器 broadcaster 的代碼,主要是 select 新增了一個分支,處理注冊用戶名:
// 廣播器
type client chan<- string // 對外發送消息的通道
type clientInfo struct {
name string
ch client
}
var (
entering = make(chan clientInfo)
leaving = make(chan clientInfo)
messages = make(chan string) // 所有接受的客戶消息
)
type registeInfo struct {
name string
ch chan<- bool
}
var register = make(chan registeInfo) // 注冊用戶名的通道
func broadcaster() {
clients := make(map[string]client) // 所有連接的客戶端集合
for {
select {
case msg := <-messages:
// 把所有接收的消息廣播給所有的客戶
// 發送消息通道
for _, cli := range clients {
cli <- msg
}
case user := <-register:
// 先判斷新用戶名是否有重復
_, ok := clients[user.name]
user.ch <- !ok
case cliSt := <-entering:
// 在每一個新用戶到來的時候,通知當前存在的用戶
var users []string
for user := range clients {
users = append(users, user)
}
if len(users) > 0 {
cliSt.ch <- fmt.Sprintf("Other users in room: %s", strings.Join(users, "; "))
} else {
cliSt.ch <- "You are the only user in this room."
}
clients[cliSt.name] = cliSt.ch
case cliSt := <-leaving:
delete(clients, cliSt.name)
close(cliSt.ch)
}
}
}
最后還有一個問題,就是客戶端可能會卡或者延遲,但是客戶端的問題不能影響到服務器的正常運行。不過我沒法實現一個這樣的有延遲的客戶端,默認操作系統應該就已經非常友好的幫我們處理掉了,把從網絡上接收到的數據暫存在緩沖區里(對于TCP連接還有亂序重組和超時重傳,這些我們都不需要關心了),等待程序去讀取。代碼里接收的操作應該是直接從緩沖區讀取,這時服務的已經發送完畢了。所以現在只能照著下面的思路寫了:
任何客戶程序讀取數據的時間很長最終會造成所有的客戶卡住。修改廣播器,使它滿足如果一個向客戶寫入的通道沒有準備好接受它,那么跳過這條消息。還可以給每一個向客戶發送消息的通道增加緩沖,這樣大多數的消息不會丟棄;廣播器在這個通道上應該使用非阻塞的發送方式。
客戶端處理函數中創建的發送消息的通道改用有緩沖區的通道:
// 客戶端處理函數
func handleConn(conn net.Conn) {
defer conn.Close() // 退出時關閉客戶端連接,現在有分支了,并且可能會提前退出
who, ok := clientRegiste(conn) // 注冊獲取用戶名
if !ok { // 用戶名未注冊成功
fmt.Fprintln(conn, "\r\nName registe failed...")
return
}
ch := make(chan string, 10) // 有緩沖區,對外發送客戶消息的通道
go clientWriter(conn, ch)
// 省略后面的代碼
}
然后廣播器的 select 對應的 messages 通道的分支,改成非阻塞的方式:
select {
case msg := <-messages:
// 把所有接收的消息廣播給所有的客戶
// 發送消息通道
for name, cli := range clients {
select {
case cli <- msg:
default:
fmt.Fprintf(os.Stderr, "send message failed: %s: %s\n", name, msg)
}
}
// 其他分支略過
}
下面是聊天服務器最后完整的代碼。這里的改變還包括了上一節最后提到的注冊用戶名時的輸入的超時。已經兩次用到了輸入超時,分別在 handleConn 和 clientRegiste 中,這里也就把這部分代碼單獨寫了一個函數 inputWithTimeout。完整代碼如下:
package main
import (
"bufio"
"fmt"
"log"
"net"
"os"
"strings"
"time"
)
func main() {
listener, err := net.Listen("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}
// 廣播器
type client chan<- string // 對外發送消息的通道
type clientInfo struct {
name string
ch client
}
var (
entering = make(chan clientInfo)
leaving = make(chan clientInfo)
messages = make(chan string) // 所有接受的客戶消息
)
type registeInfo struct {
name string
ch chan<- bool
}
var register = make(chan registeInfo) // 注冊用戶名的通道
func broadcaster() {
clients := make(map[string]client) // 所有連接的客戶端集合
for {
select {
case msg := <-messages:
// 把所有接收的消息廣播給所有的客戶
// 發送消息通道
for name, cli := range clients {
select {
case cli <- msg:
default:
fmt.Fprintf(os.Stderr, "send message failed: %s: %s\n", name, msg)
}
}
case user := <-register:
// 先判斷新用戶名是否有重復
_, ok := clients[user.name]
user.ch <- !ok
case cliSt := <-entering:
// 在每一個新用戶到來的時候,通知當前存在的用戶
var users []string
for user := range clients {
users = append(users, user)
}
if len(users) > 0 {
cliSt.ch <- fmt.Sprintf("Other users in room: %s", strings.Join(users, "; "))
} else {
cliSt.ch <- "You are the only user in this room."
}
clients[cliSt.name] = cliSt.ch
case cliSt := <-leaving:
delete(clients, cliSt.name)
close(cliSt.ch)
}
}
}
// 客戶端處理函數
func handleConn(conn net.Conn) {
defer conn.Close() // 退出時關閉客戶端連接,現在有分支了,并且可能會提前退出
who, ok := clientRegiste(conn) // 注冊獲取用戶名
if !ok { // 用戶名未注冊成功
fmt.Fprintln(conn, "\r\nName registe failed...")
return
}
ch := make(chan string, 10) // 有緩沖區,對外發送客戶消息的通道
go clientWriter(conn, ch)
cli := clientInfo{who, ch} // 打包好用戶名和通道
ch <- "You are " + who // 這條單發給自己
messages <- who + " has arrived" // 現在這條廣播自己也能收到
entering <- cli
inputFunc := func(sig chan<- struct{}) {
input := bufio.NewScanner(conn)
for input.Scan() {
sig <- struct{}{} // 向 sig 發送信號,會重新開始計時
if len(strings.TrimSpace(input.Text())) == 0 { // 禁止發送純空白字符
continue
}
messages <- who + ": " + input.Text()
}
// 注意,忽略input.Err()中可能的錯誤
}
inputWithTimeout(conn, 300*time.Second, inputFunc)
leaving <- cli
messages <- who + " has left"
}
func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
// windows 需要 \r 了正常顯示
fmt.Fprintln(conn, msg+"\r") // 注意,忽略網絡層面的錯誤
}
}
// 注冊用戶名
func clientRegiste(conn net.Conn) (who string, ok bool) {
inputFunc := func(sig chan<- struct{}) {
input := bufio.NewScanner(conn)
ch := make(chan bool)
fmt.Fprint(conn, "input nickname: ") // 注意,忽略網絡層面的錯誤
for input.Scan() {
if len(strings.TrimSpace(input.Text())) == 0 { // 禁止發送純空白字符
continue
}
who = input.Text()
register <- registeInfo{who, ch}
if <-ch {
ok = true
break
}
fmt.Fprintf(conn, "name %q is existed\r\ntry other name: ", who)
}
// 注意,忽略input.Err()中可能的錯誤
}
inputWithTimeout(conn, 15*time.Second, inputFunc)
return who, ok
}
// 為 input.Scan 封裝超時退出的功能
func inputWithTimeout(conn net.Conn, timeout time.Duration, input func(sig chan<- struct{})) {
done := make(chan struct{}, 2)
inputSignal := make(chan struct{})
go func() {
timer := time.NewTimer(timeout)
for {
select {
case <-inputSignal:
timer.Reset(timeout)
case <-timer.C:
// 超時,斷開連接
done <- struct{}{}
return
}
}
}()
go func() {
input(inputSignal)
done <- struct{}{}
}()
<-done
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。