Golang socket websocket - Go语言中文社区

Golang socket websocket


理论知识可以参考
网络信息怎么在网线中传播的 (转载自知乎)
Android 网络(一) 概念 TCP/IP Socket Http Restful
脑残式网络编程入门(一):跟着动画来学TCP三次握手和四次挥手
脑残式网络编程入门(二):我们在读写Socket时,究竟在读写什么?
TCP 粘包问题浅析及其解决方案,这个帖子里大家一顿喷粘包这个叫法
我工作五年的时候也不知道 “TCP 粘包”,继续吐槽

一、API
1.服务端通过Listen加Accept
package main

import (
    "fmt"
    "net"
    "os"
    "time"
)

func main() {
    //通过 ResolveTCPAddr 获取一个 TCPAddr
    //ResolveTCPAddr(net, addr string) (*TCPAddr, os.Error)
    
    //net参数是"tcp4"、"tcp6"、"tcp"中的任意一个,
    //分别表示 TCP(IPv4-only),TCP(IPv6-only)
    //或者 TCP(IPv4,IPv6 的任意一个)
    
    //addr 表示域名或者IP地址,
    //例如"www.google.com:80" 或者"127.0.0.1:22".
    service := ":7777"
    tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
    checkError(err)
    
    //ListenTCP(net string, laddr *TCPAddr) (l *TCPListener, err os.Error)
    listener, err := net.ListenTCP("tcp", tcpAddr)
    checkError(err)
    
    //func (l *TCPListener) Accept() (c Conn, err os.Error)
    for {
        conn, err := listener.Accept()
            if err != nil {
            continue
        }
        
        daytime := time.Now().String()
        // don't care about return value
        conn.Write([]byte(daytime)) 
        
        // we're finished with this client
        conn.Close() 
    }
}

func checkError(err error) {
    if err != nil {
        fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
        os.Exit(1)
    }
}

上面的服务跑起来之后,它将会一直在那里等待,直到有新的客户端请求到达。当有新的客户端请求到达并同意接受 Accept 该请求的时候他会反馈当前的时间信息。值得注意的是,在代码中 for 循环里,当有错误发生时,直接 continue而不是退出,是因为在服务器端跑代码的时候,当有错误发生的情况下最好是由服务端记录错误,然后当前连接的客户端直接报错而退出,从而不会影响到当前服务端运行的整个服务。

上面的代码有个缺点,执行的时候是单任务的,不能同时接收多个请求,那么该如何改造以使它支持多并发呢?

...
for {
    conn, err := listener.Accept()
    if err != nil {
        continue
    }
    go handlerClient(conn)
}
...

func handleClient(conn net.Conn) {
    defer conn.Close()
    daytime := time.Now().String()
    // don't care about return value
    conn.Write([]byte(daytime)) 
    
    // we're finished with this client
}
...
2.客户端直接调用 Dial
package main
    import (
        "fmt"
        "io/ioutil"
        "net"
        "os"
    )
    
    func main() {
        if len(os.Args) != 2 {
            fmt.Fprintf(os.Stderr, "Usage: %s host:port ", os.Args[0])
            os.Exit(1)
        }
    
        service := os.Args[1]
        tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
        checkError(err)
        
        conn, err := net.DialTCP("tcp", nil, tcpAddr)
        checkError(err)
        
        _, err = conn.Write([]byte("HEAD / HTTP/1.0rnrn"))
        checkError(err)
        
        result, err := ioutil.ReadAll(conn)
        checkError(err)
        
        fmt.Println(string(result))
        os.Exit(0)
    }
    
    func checkError(err error) {
        if err != nil {
        fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
        os.Exit(1)
    }
}

首先程序将用户的输入作为参数 service 传入net.ResolveTCPAddr 获取一个 tcpAddr,然后把 tcpAddr 传入 DialTCP 后创建了一个 TCP连接 conn ,通过 conn 来发送请求信息,最后通过 ioutil.ReadAll 从 conn 中读取全部的文本,也就是服务端响应反馈的信息。

二、实现一个可以接受不同命令的服务端

参考使用 Go 进行 Socket 编程
我们实现一个服务端, 它可以接受下面这些命令:

  • ping 探活的命令, 服务端会返回 “pong”
  • echo 服务端会返回收到的字符串
  • quit 服务端收到这个命令后就会关闭连接

具体的服务端代码如下所示:

package main

import (
    "fmt"
    "net"
    "strings"
)

func connHandler(c net.Conn) {
    if c == nil {
        return
    }

    buf := make([]byte, 4096)

    for {
        cnt, err := c.Read(buf)
        if err != nil || cnt == 0 {
            c.Close()
            break
        }

        inStr := strings.TrimSpace(string(buf[0:cnt]))

        inputs := strings.Split(inStr, " ")

        switch inputs[0] {
        case "ping":
            c.Write([]byte("pongn"))
        case "echo":
            echoStr := strings.Join(inputs[1:], " ") + "n"
            c.Write([]byte(echoStr))
        case "quit":
            c.Close()
            break
        default:
            fmt.Printf("Unsupported command: %sn", inputs[0])
        }
    }

    fmt.Printf("Connection from %v closed. n", c.RemoteAddr())
}

func main() {
    server, err := net.Listen("tcp", ":1208")
    if err != nil {
        fmt.Printf("Fail to start server, %sn", err)
    }

    fmt.Println("Server Started ...")

    for {
        conn, err := server.Accept()
        if err != nil {
            fmt.Printf("Fail to connect, %sn", err)
            break
        }

        go connHandler(conn)
    }
}

客户端的实现

package main

import (
    "bufio"
    "fmt"
    "net"
    "os"
    "strings"
)

func connHandler(c net.Conn) {
    defer c.Close()

    reader := bufio.NewReader(os.Stdin)
    buf := make([]byte, 1024)

    for {
        input, _ := reader.ReadString('n')
        input = strings.TrimSpace(input)

        if input == "quit" {
            return
        }

        c.Write([]byte(input))

        cnt, err := c.Read(buf)
        if err != nil {
            fmt.Printf("Fail to read data, %sn", err)
            continue
        }

        fmt.Print(string(buf[0:cnt]))
    }
}

func main() {
    conn, err := net.Dial("tcp", "localhost:1208")
    if err != nil {
        fmt.Printf("Fail to connect, %sn", err)
        return
    }

    connHandler(conn)
}
三、解决golang开发socket服务时粘包半包bug

基础知识可以参考tcp是流的一些思考--拆包和粘包
tcp中有一个negal算法,用途是这样的:通信两端有很多小的数据包要发送,虽然传送的数据很少,但是流程一点没少,也需要tcp的各种确认,校验。这样小的数据包如果很多,会造成网络资源很大的浪费,negal算法做了这样一件事,当来了一个很小的数据包,我不急于发送这个包,而是等来了更多的包,将这些小包组合成大包之后一并发送,不就提高了网络传输的效率的嘛。这个想法收到了很好的效果,但是我们想一下,如果是分属于两个不同页面的包,被合并在了一起,那客户那边如何区分它们呢?
这就是粘包问题。从粘包问题我们更可以看出为什么tcp被称为流协议,因为它就跟水流一样,是没有边界的,没有消息的边界保护机制,所以tcp只有流的概念,没有包的概念。

解决tcp粘包的方法:
客户端会定义一个标示,比如数据的前4位是数据的长度,后面才是数据。那么客户端只需发送 ( 数据长度+数据 ) 的格式数据就可以了,接收方根据包头信息里的数据长度读取buffer.
客户端:

//客户端发送封包
package main

import (
    "fmt"
    "math/rand"
    "net"
    "os"
    "strconv"
    "strings"
    "time"
)

func main() {

    server := "127.0.0.1:5000"
    tcpAddr, err := net.ResolveTCPAddr("tcp4", server)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
        os.Exit(1)
    }

    conn, err := net.DialTCP("tcp", nil, tcpAddr)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
        os.Exit(1)
    }

    defer conn.Close()

    for i := 0; i < 50; i++ {
        //msg := strconv.Itoa(i)
        msg := RandString(i)
        msgLen := fmt.Sprintf("%03s", strconv.Itoa(len(msg)))
        //fmt.Println(msg, msgLen)
        words := "aaaa" + msgLen + msg
        //words := append([]byte("aaaa"), []byte(msgLen), []byte(msg))
        fmt.Println(len(words), words)
        conn.Write([]byte(words))
    }
}

/**
*生成随机字符
**/
func RandString(length int) string {
    rand.Seed(time.Now().UnixNano())
    rs := make([]string, length)
    for start := 0; start < length; start++ {
        t := rand.Intn(3)
        if t == 0 {
            rs = append(rs, strconv.Itoa(rand.Intn(10)))
        } else if t == 1 {
            rs = append(rs, string(rand.Intn(26)+65))
        } else {
            rs = append(rs, string(rand.Intn(26)+97))
        }
    }
    return strings.Join(rs, "")
}

服务端实例代码:

package main

import (
    "fmt"
    "io"
    "net"
    "os"
    "strconv"
)

func main() {
    netListen, err := net.Listen("tcp", ":5000")
    CheckError(err)

    defer netListen.Close()

    for {
        conn, err := netListen.Accept()
        if err != nil {
            continue
        }

        go handleConnection(conn)
    }
}

func handleConnection(conn net.Conn) {
    allbuf := make([]byte, 0)
    buffer := make([]byte, 1024)
    for {
        readLen, err := conn.Read(buffer)
        //fmt.Println("readLen: ", readLen, len(allbuf))
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Println("read error")
            return
        }

        if len(allbuf) != 0 {
            allbuf = append(allbuf, buffer...)
        } else {
            allbuf = buffer[:]
        }
        var readP int = 0
        for {
            //fmt.Println("allbuf content:", string(allbuf))

            //buffer长度小于7
            if readLen-readP < 7 {
                allbuf = buffer[readP:]
                break
            }

            msgLen, _ := strconv.Atoi(string(allbuf[readP+4 : readP+7]))
            logLen := 7 + msgLen
            //fmt.Println(readP, readP+logLen)
            //buffer剩余长度>将处理的数据长度
            if len(allbuf[readP:]) >= logLen {
                //fmt.Println(string(allbuf[4:7]))
                fmt.Println(string(allbuf[readP : readP+logLen]))
                readP += logLen
                //fmt.Println(readP, readLen)
                if readP == readLen {
                    allbuf = nil
                    break
                }
            } else {
                allbuf = buffer[readP:]
                break
            }
        }
    }
}

func CheckError(err error) {
    if err != nil {
        fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
        os.Exit(1)
    }
}
四、io包的ReadFull

对于第三部分的解决golang开发socket服务时粘包半包bug,有作者认为太复杂了,参见golang tcp拆包的正确姿势,他提出可以用ReadFull来简化。

关于io包基础知识,参考Golang io reader writer
关于ReadFull,可以参考达达的博客系列:
Go语言小贴士1 - io包
Go语言小贴士2 - 协议解析
Go语言小贴士3 - bufio包

原文不再转述,现在引用一下重点:

io.Reader的定义如下:

type Reader interface {
        Read(p []byte) (n int, err error)
}

其中文档的说明非常重要,文档中详细描述了Read方法的各种返回可能性。

文档描述中有一个要点,就是n可能小于等于len(p),也就是说Go在读IO的时候,是不会保证一次读取预期的所有数据的。如果我们要确保一次读取我们所需的所有数据,就需要在一个循环里调用Read,累加每次返回的n并小心设置下次Readp的偏移量,直到n的累加值达到我们的预期。

因为上述需求实在太常见了,所以Go在io包中提供了一个ReadFull函数来做到一次读取要求的所有数据,通过阅读ReadFull函数的代码,也可以反过来帮助大家理解io.Reader是怎么运作的。

//io.go源码
func ReadFull(r Reader, buf []byte) (n int, err error) {
    return ReadAtLeast(r, buf, len(buf))
}

func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) {
    if len(buf) < min {
        return 0, ErrShortBuffer
    }
    for n < min && err == nil {
        var nn int
        nn, err = r.Read(buf[n:])
        n += nn
    }
    if n >= min {
        err = nil
    } else if n > 0 && err == EOF {
        err = ErrUnexpectedEOF
    }
    return
}

在很多应用场景中,消息包的长度是不固定的,就像上面的字符串字段一样。我们一样可以用开头固定的几个字节来存放消息长度,在解析通讯协议的时候就可以从字节流中截出一个个的消息包了,这样的操作通常叫做协议分包或者粘包处理。
贴个从Socket读取消息包的伪代码(没编译):

func ReadPacket(conn net.Conn) ([]byte, error) {
        var head [2]byte

        if _, err := io.ReadFull(conn, head[:]); err != nil {
                return err
        }

        size := binary.BigEndian.Uint16(head)
        packet := make([]byte, size)

        if _, err := io.ReadFull(conn, packet); err != nil {
                return err
        }

        return packet
}

上面的代码就用到了前一个小贴士中说到的io.ReadFull来确保一次读取完整数据。

要注意,这段代码不是线程安全的,如果有两个线程同时对一个net.Conn进行ReadPacket操作,很可能会发生严重错误,具体逻辑请自行分析。

从上面结构体序列化和反序列化的代码中,大家不难看出,实现一个二进制协议是挺繁琐和容易出BUG的,只要稍微有一个数值计算错就解析出错了。所以在工程实践中,不推荐大家手写二进制协议的解析代码,项目中通常会用自动化的工具来辅助生成代码。

Leaf 游戏服务器框架简介的tcp_msg.go中,Read方法也使用了ReadFull这种方式来处理。

五、WebSocket

参考封装golang websocket
websocket是个二进制协议,需要先通过Http协议进行握手,从而协商完成从Http协议向websocket协议的转换。一旦握手结束,当前的TCP连接后续将采用二进制websocket协议进行双向双工交互,自此与Http协议无关。

可以通过这篇知乎了解一下websocket协议的基本原理:《WebSocket 是什么原理?为什么可以实现持久连接?》

1.粘包

我们开发过TCP服务的都知道,需要通过协议decode从TCP字节流中解析出一个一个请求,那么websocket又怎么样呢?

websocket以message为单位进行通讯,本身就是一个在TCP层上的一个分包协议,其实并不需要我们再进行粘包处理。但是因为单个message可能很大很大(比如一个视频文件),那么websocket显然不适合把一个视频作为一个message传输(中途断了前功尽弃),所以websocket协议其实是支持1个message分多个frame帧传输的。

我们的浏览器提供的编程API都是message粒度的,把frame拆帧的细节对开发者隐蔽了,而服务端websocket框架一般也做了同样的隐藏,会自动帮我们收集所有的frame后拼成messasge再回调,所以结论就是:

websocket以message为单位通讯,不需要开发者自己处理粘包问题。

更多参考Websocket需要像TCP Socket那样进行逻辑数据包的分包与合包吗?

2.golang实现

golang官方标准库里有一个websocket的包,但是它提供的就是frame粒度的API,压根不能用。

不过官方其实已经认可了一个准标准库实现,它实现了message粒度的API,让开发者不需要关心websocket协议细节,开发起来非常方便,其文档地址:https://godoc.org/github.com/gorilla/websocket

开发websocket服务时,首先要基于http库对外暴露接口,然后由websocket库接管TCP连接进行协议升级,然后进行websocket协议的数据交换,所以开发时总是要用到http库和websocket库。

上述websocket文档中对开发websocket服务有明确的注意事项要求,主要是指:

  • 读和写API不是并发安全的,需要启动单个goroutine串行处理。
  • 关闭API是线程安全的,一旦调用则阻塞的读和写API会出错返回,从而终止处理。
六、心跳实现

Golang 心跳的实现
在多客户端同时访问服务器的工作模式下,首先要保证服务器的运行正常。因此,Server和Client建立通讯后,确保连接的及时断开就非常重要。否则,多个客户端长时间占用着连接不关闭,是非常可怕的服务器资源浪费。会使得服务器可服务的客户端数量大幅度减少。因此,针对短链接和长连接,根据业务的需求,配套不同的处理机制。

  • 短连接:一般建立完连接,就立刻传输数据。传输完数据,连接就关闭。服务端根据需要,设定连接的时长。超过时间长度,就算客户端超时。立刻关闭连接。
  • 长连接:建立连接后,传输数据,然后要保持连接,然后再次传输数据。直到连接关闭。

socket读写可以通过 SetDeadline、SetReadDeadline、SetWriteDeadline设置阻塞的时间。

func (*IPConn) SetDeadline  
func (c *IPConn) SetDeadline(t time.Time) error  

func (*IPConn) SetReadDeadline  
func (c *IPConn) SetReadDeadline(t time.Time) error  

func (*IPConn) SetWriteDeadline 
func (c *IPConn) SetWriteDeadline(t time.Time) error

如果做短连接,直接在Server端的连接上设置SetReadDeadline。当你设置的时限到达,无论客户端是否还在继续传递消息,服务端都不会再接收。并且已经关闭连接。

func main() {
    server := ":7373"
    netListen, err := net.Listen("tcp", server)
    if err != nil{
        Log("connect error: ", err)
        os.Exit(1)
    }
    Log("Waiting for Client ...")
    for{
        conn, err := netListen.Accept()
        if err != nil{
            Log(conn.RemoteAddr().String(), "Fatal error: ", err)
            continue
        }

        //设置短连接(10秒)
        conn.SetReadDeadline(time.Now().Add(time.Duration(10)*time.Second))

        Log(conn.RemoteAddr().String(), "connect success!")
        ...
    }
}

这就可以了。在这段代码中,每当10秒中的时限一道,连接就终止了。

根据业务需要,客户端可能需要长时间保持连接。但是服务端不能无限制的保持。这就需要一个机制,如果超过某个时间长度,服务端没有获得客户端的数据,就判定客户端已经不需要连接了(比如客户端挂掉了)。做到这个,需要一个心跳机制。在限定的时间内,客户端给服务端发送一个指定的消息,以便服务端知道客户端还活着。

func sender(conn *net.TCPConn) {
    for i := 0; i < 10; i++{
        words := strconv.Itoa(i)+" Hello I'm MyHeartbeat Client."
        msg, err := conn.Write([]byte(words))
        if err != nil {
            Log(conn.RemoteAddr().String(), "Fatal error: ", err)
            os.Exit(1)
        }
        Log("服务端接收了", msg)
        time.Sleep(2 * time.Second)
    }
    for i := 0; i < 2 ; i++ {
        time.Sleep(12 * time.Second)
    }
    for i := 0; i < 10; i++{
        words := strconv.Itoa(i)+" Hi I'm MyHeartbeat Client."
        msg, err := conn.Write([]byte(words))
        if err != nil {
            Log(conn.RemoteAddr().String(), "Fatal error: ", err)
            os.Exit(1)
        }
        Log("服务端接收了", msg)
        time.Sleep(2 * time.Second)
    }

}

这段客户端代码,实现了两个相同的信息发送频率给服务端。两个频率中间,我们让运行休息了12秒。然后,我们在服务端的对应机制是这样的。

func HeartBeating(conn net.Conn, bytes chan byte, timeout int) {
    select {
    case fk := <- bytes:
        Log(conn.RemoteAddr().String(), "心跳:第", string(fk), "times")
        conn.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
        break

        case <- time.After(5 * time.Second):
            Log("conn dead now")
            conn.Close()
    }
}

每次接收到心跳数据就 SetDeadline 延长一个时间段 timeout。如果没有接到心跳数据,5秒后连接关闭。

版权声明:本文来源简书,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://www.jianshu.com/p/c0db1567bfd9
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-01-09 22:13:53
  • 阅读 ( 758 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢