Implement a chat service with Go + WebSocket

Kevin Wan
CodeX
Published in
4 min readOct 1, 2021

--

Background

After go-zero was open sourced, many users asked if and when websocket would be supported, finally in v1.1.6 we got websocket supported in the framework level, below we will use chat as an example to explain how to use go-zero to implement a chat service with websocket.

Overall design

Let’s take the chat chat room in zero-example as an example to explain the implementation of websocket step by step, divided into the following parts.

  1. multi-client access
  2. message broadcasting
  3. timely online and offline clients
  4. full-duplex communication [the client itself is the sender and the receiver].

First of all, let’s put a diagram of the general data transmission.

There is a select loop in the middle which is the engine of the whole chat. First of all, to support communication between two parties.

  • There has to be a pipe to exchange data. Client only reads/transfers data from pipe.
  • The client is online. Can’t say you’re offline and still transfer data to you.

Data flow

Data flow is the main function of engine, let's not rush through the code first, let's think how client can access and be sensed by engine.

  1. first send a websocket request from the front-end.
  2. establish a connection; prepare the receive/send channel.
  3. register to the engine.

Code looks like below:

 // HTML manipulation {js}
if (window["WebSocket"]) {
conn = new WebSocket("ws://" + document.location.host + "/ws");
conn.onclose = function (evt) {
var item = document.createElement("div");
item.innerHTML = "<b>Connection closed.</b>";
appendLog(item);
};
...
}

// Routing
engine.AddRoute(rest.Route{
Method: http.MethodGet,
Path: "/ws",
Handler: func(w http.ResponseWriter, r *http.Request) {
internal.ServeWs(hub, w, r)
},
})

// Access logic
func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
// escalate http requests to websockets
conn, err := upgrader.Upgrade(w, r, nil)
...
// build client: hub{engine}, con{websocker conn}, send{channel buff}
client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, bufSize),
}
client.hub.register <- client
// Start client-side duplex communication, receiving and writing data
go client.writePump()
go client.readPump()
}

This way, the newly accessed client is added to the registration channel.

hub engine

What does the engine do when it issues a register action?

 type Hub struct {
clients map[*Client]bool // uplink clients
broadcast chan []byte // message sent by client -> broadcast to other clients
register chan *Client // register chan, receive registration msg
unregister chan *Client // go offline chan
}

func (h *Hub) Run() {
for {
select {
// register chan: stored in the registry, and the data flow happens in these clients
case client := <-h.register:
h.clients[client] = true
// Offline channels: remove from the registry
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
// Broadcast message: sent to client in the registry, received by send and displayed to client
case message := <-h.broadcast:
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
  1. receive registration messages -> add to the global registry
  2. If engine.broadcast receives it, it will pass msg to client.sendChan in the registry.

This way the whole data flow from HTML -> client -> hub -> other client is clear.

Broadcast data

It says engine.broadcast receives the data, but how is the data sent here, starting from the page?

 func (c *Client) readPump() {
...
for {
// 1
_, message, err := c.conn.ReadMessage()
if err ! = nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
// 2.
c.hub.broadcast <- message
}
}
  1. keep reading msg from conn [passed after page click]
  2. pass msg into engine.broadcast to broadcast to other clients. 3.
  3. when there is a send exception or a timeout, an exception exit will be triggered offline client

At the same time, you should know that there is more than one client sending the message, there may be many. Then send it to other clients, who will read it from their own send channel as follows

 func (c *Client) writePump() {
// write timeout control
ticker := time.NewTicker(pingPeriod)
...
for {
select {
case message, ok := <-c.send:
// Extend the write timeout when receiving a message to write.
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
...
w, err := c.conn.NextWriter(websocket.TextMessage)
...
w.Write(message)

// Read the messages in send in sequence and write
n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
w.Write(<-c.send)
}
...
case <-ticker:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
...
}
}
}

As mentioned above, send has the msg sent from the respective client: so when send is detected, it keeps receiving messages and writing to the current client; at the same time, when the write timeout is reached, it checks if the websocket long connection is still alive, and if it is, it continues to read send chan, and if it is disconnected, it returns directly.

Complete sample code

https://github.com/zeromicro/zero-examples/tree/main/chat

Summary

This article describes how to start your websocket project with go-zero in use, so you can adapt it to your needs.

For more design and implementation articles on go-zero, keep following us.

https://github.com/zeromicro/go-zero

Feel free to use go-zero and give a star to support us!

--

--