Background
After go-zero
was open sourced, many users asked if and when websocket
would be supported, finally in v1.1.6
we got websocket
supported in the framework level, below we will use chat
as an example to explain how to use go-zero
to implement a chat
service with websocket
.
Overall design
Let’s take the chat
chat room in zero-example
as an example to explain the implementation of websocket
step by step, divided into the following parts.
- multi-client access
- message broadcasting
- timely online and offline clients
- full-duplex communication [the client itself is the sender and the receiver].
First of all, let’s put a diagram of the general data transmission.
There is a select loop
in the middle which is the engine
of the whole chat
. First of all, to support communication between two parties.
- There has to be a pipe to exchange data. Client only reads/transfers data from pipe.
- The client is online. Can’t say you’re offline and still transfer data to you.
Data flow
Data flow is the main function of engine
, let's not rush through the code first, let's think how client
can access and be sensed by engine
.
- first send a
websocket
request from the front-end. - establish a connection; prepare the receive/send channel.
- register to the
engine
.
Code looks like below:
// HTML manipulation {js}
if (window["WebSocket"]) {
conn = new WebSocket("ws://" + document.location.host + "/ws");
conn.onclose = function (evt) {
var item = document.createElement("div");
item.innerHTML = "<b>Connection closed.</b>";
appendLog(item);
};
...
}
// Routing
engine.AddRoute(rest.Route{
Method: http.MethodGet,
Path: "/ws",
Handler: func(w http.ResponseWriter, r *http.Request) {
internal.ServeWs(hub, w, r)
},
})
// Access logic
func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
// escalate http requests to websockets
conn, err := upgrader.Upgrade(w, r, nil)
...
// build client: hub{engine}, con{websocker conn}, send{channel buff}
client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, bufSize),
}
client.hub.register <- client
// Start client-side duplex communication, receiving and writing data
go client.writePump()
go client.readPump()
}
This way, the newly accessed client
is added to the registration channel.
hub engine
What does the engine
do when it issues a register action?
type Hub struct {
clients map[*Client]bool // uplink clients
broadcast chan []byte // message sent by client -> broadcast to other clients
register chan *Client // register chan, receive registration msg
unregister chan *Client // go offline chan
}
func (h *Hub) Run() {
for {
select {
// register chan: stored in the registry, and the data flow happens in these clients
case client := <-h.register:
h.clients[client] = true
// Offline channels: remove from the registry
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
// Broadcast message: sent to client in the registry, received by send and displayed to client
case message := <-h.broadcast:
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
- receive registration messages -> add to the global registry
- If
engine.broadcast
receives it, it will passmsg
toclient.sendChan
in the registry.
This way the whole data flow from HTML -> client -> hub -> other client is clear.
Broadcast data
It says engine.broadcast
receives the data, but how is the data sent here, starting from the page?
func (c *Client) readPump() {
...
for {
// 1
_, message, err := c.conn.ReadMessage()
if err ! = nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
// 2.
c.hub.broadcast <- message
}
}
- keep reading
msg
fromconn
[passed after page click] - pass
msg
intoengine.broadcast
to broadcast to otherclients
. 3. - when there is a send exception or a timeout, an exception exit will be triggered offline
client
At the same time, you should know that there is more than one client
sending the message, there may be many. Then send it to other clients, who will read it from their own send channel
as follows
func (c *Client) writePump() {
// write timeout control
ticker := time.NewTicker(pingPeriod)
...
for {
select {
case message, ok := <-c.send:
// Extend the write timeout when receiving a message to write.
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
...
w, err := c.conn.NextWriter(websocket.TextMessage)
...
w.Write(message)
// Read the messages in send in sequence and write
n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
w.Write(<-c.send)
}
...
case <-ticker:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
...
}
}
}
As mentioned above, send
has the msg sent from the respective client: so when send
is detected, it keeps receiving messages and writing to the current client; at the same time, when the write timeout is reached, it checks if the websocket long connection is still alive, and if it is, it continues to read send chan
, and if it is disconnected, it returns directly.
Complete sample code
https://github.com/zeromicro/zero-examples/tree/main/chat
Summary
This article describes how to start your websocket
project with go-zero
in use, so you can adapt it to your needs.
For more design and implementation articles on go-zero
, keep following us.
https://github.com/zeromicro/go-zero
Feel free to use go-zero and give a star to support us!