关于 WebSocket 的定义无需多说,这里我们讲一下 WebSocket 的协议细节。

接受请求

WebSocket 是一个应用层的协议,它通过 HTTP 协议进行握手和协商,然后通过底层的 TCP / TLS 等进行传输。当客户端发起请求时,服务端收到一个特殊的 HTTP 请求,代表 WebSocket 连接请求:

GET /ws HTTP/1.1
Host: xxxxx
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: xxxx
Sec-WebSocket-Extensions: ....

服务端收到连接请求后,判断请求是否合法。如果接受连接,则返回响应:

HTTP/1.1 101 Switch Protocol
Server: xxxx
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: xxxxxx
Sec-WebSocket-Extensions: ...

我们以 Node.JS 为示例来看如何处理连接请求:

// websocket 连接验证的盐
const unique_key = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
// message 类型
const OPCODE_TEXT = 1,
      OPCODE_BINARY = 2,
      OPCODE_CLOSE = 8;
// close 类型
const CLOSE_NORMAL = 1000,
      CLOSE_GOING_AWAY = 1001,
      CLOSE_PROTOCOL_ERROR = 1002,
      CLOSE_UNSUPPORTED = 1003,
      CLOSE_ABNORMAL = 1006,
      CLOSE_INTERNAL_ERROR = 1011;
// flags
const FLAG_FIN = 8, FLAG_RSV1 = 4;
// 创建 HTTP server。HTTP server 可以处理普通的 http 请求,只需为其设置 upgrade 事件处理函数,即可处理 websocket 请求。
let server = require('http').createServer(...);
// 处理普通的 http 请求
server.on('request', function (req, res) {
    // ...
});

// 处理 upgrade 请求
server.on('upgrade', function (req, socket) {
    let upgrade = req.headers.upgrade;
    if(upgrade && upgrade.toLowerCase() === 'websocket') { // 是 websocket 请求!
        on_websocket(req, socket);
    }
});

function on_websocket(req, socket) {
    // 可以根据 req.url 等判断是否接受请求
    if(!is_valid_request(req)) {
        socket.end("HTTP/1.1 403 Forbidden\r\n"
                 + "Connection: close\r\n"
                 + "Content-Length: 3\r\n"
                 + "\r\n"
                 + "bye");
        return;
    }
    // 接受请求!
    let key = req.headers['sec-websocket-key'];
    let accept = sha1(key + unique_key).toString('base64');
    // 以固定 key 作为盐进行 sha1
    socket.write("HTTP/1.1 101 Switch Protocol\r\n"
               + "Connection: Upgrade\r\n"
               + "Upgrade: websocket\r\n"
               + `Sec-WebSocket-Accept: ${accept}\r\n`
               + "\r\n");
    // 读写 socket...
}

以 Node.JS 发起 websocket 请求方式如下:

// 连接 ws://127.0.0.1:8888/foo
let key = 'xxxxxx'; // 随机生成字符串
let options = {
    method: 'GET', // this is default
    host: '127.0.0.1',
    port: 8888,
    path: '/foo',
    headers: {
        Connection: 'Upgrade',
        Upgrade: 'websocket',
        'Sec-WebSocket-Key': key,
        'Sec-WebSocket-Version': '13',
    },
};

let req = require('http').request(options);

req.on('upgrade', function (res, socket) {
    // 收到 upgrade 响应
    let resp_accept = res.headers['sec-websocket-accept'];
    let accept = sha1(key + unique_key).toString('base64');
    if(resp_accept !== accept) {
        on_reject({ message: 'bad response' });
        return;
    }
    // 读写 socket...
});
req.on('response', function (res) {
    // 收到普通响应
    on_reject({ message: 'bad response' })
});
req.end(); // GET 没有请求 body,这里不发送任何内容

sending / receiving messages

WebSocket 的 client / server 端的读写过程基本相同,除了一个差别:client 需要对每个消息的 payload 进行异或掩码。

不同平台的 socket 读取过程不尽相同。为了方便我们以流的模式来演示读取过程。由于 Node.JS 的 socket 读取过程是异步的,实际读取时可以通过 generator / fiber / async 等方法转换为流式读取,这里不再赘述。

function read_messages(socket, is_server) {
    for(;;) {
        let header = read(socket, 2); // 读取 2 字节的头部
        // 头部包含 以下信息:
        // * 4 个 flag
        // * 4 比特的 opcode, 标识 message 类型
        // * masked 位,标识消息是否进行了掩码
        // * 7 比特的 payload_len,标识 payload 长度
        const flag = header[0] >> 4, opcode = header[0] & 0xf, masked = header[1] & 0x80;
        let payload_len = header[1] & 0x7f;
        if(!(flag & FLAG_FIN)) { // STOP
            on_close({code: CLOSE_GOING_AWAY});
            socket.destroy();
            return;
        }
        if(is_server && !masked) {
            // 客户端发来了未掩码的消息!
            send_close(CLOSE_PROTOCOL_ERROR);
            throw ...
        }
        if(!is_server && masked) {
            // 服务端发来了掩码过的消息!
            send_close(CLOSE_PROTOCOL_ERROR);
            throw ...
        }
        // 对 payload_len 进行处理
        if(payload_len === 126) {
            // 实际 payload_len 是 2 字节的 big-endian uint16
            let buf = read(socket, 2);
            payload_len = buf.readUInt16BE(0, true);
            // payload_len = buf[0] << 8 | buf[1];
        } else if(payload_len === 127) {
            // 实际 payload_len 是 8 字节的 big-endian uint64
            let buf = read(socket, 8);
            payload_len = buf.readUInt32BE(0, true) * 0x100000000 + buf.readUInt32BE(4, true);
        }
        // 处理 mask
        let mask = null;
        if(masked) {
            // 读取 4 字节的 mask
            mask = read(socket, 4);
        }
        // 读取 payload
        const payload = read(socket, payload_len);
        if(masked) { // 反掩码
            for(let i = 0; i < payload_len; i++) {
                payload[i] ^= mask[i & 3];
            }
        }
        // TODO: 对压缩的内容进行解压缩
        if(opcode === OPCODE_TEXT) {
            on_message({
                type: 'text',
                data: payload.toString(),
            });
        } else if(opcode === OPCODE_BINARY) {
            on_message({
                type: 'binary',
                data: payload
            });
        } else if(opcode === OPCODE_CLOSE) {
            socket.destroy();
            if(payload.length >= 2) {
                on_close({
                    code: payload.readUInt16BE(0, true),
                    reason: payload.toString('utf8', 2),
                });
            } else {
                on_close({
                    code: CLOSE_NORMAL
                });
            }
        }
    }
}

发送消息的过程和其相对应。如果要发送文本,需要先将其编码为 buffer,然后指定 opcode 为 OPCODE_TEXT 即可。

function send(opcode, flags, payload) {
    // 写入头部
    let head = Buffer.alloc(2);
    head[0] = flags << 4 | opcode;
    const masked = is_server ? 0 : 0x80;
    let payload_len = payload.length;
    if(payload_len > 65535) {
        head[1] = masked | 127;
    } else if(payload_len > 125) {
        head[1] = masked | 126;
    } else {
        head[1] = masked | payload_len;
    }
    write(head);

    if(payload_len > 65535) {
        let buf = Buffer.alloc(8);
        buf.writeUInt32BE(payload_len / 0x100000000, 0 ,true);
        buf.writeUInt32BE(payload_len, 4 ,true);
        write(buf);
    } else if(payload_len > 125) {
        let buf = Buffer.alloc(2);
        buf.writeUInt16BE(payload_len, 0 ,true);
        write(buf);
    }
    if(masked) {
        let mask = Buffer.alloc(4);
        // 随机填充 mask
        mask.writeUInt32BE(Math.random() * 0x100000000, 0, true);
        write(mask);
        for(let i = 0; i < paylaod_len; i++) {
            payload[i] ^= mask[i & 3];
        }
    }
    write(payload);
}

如: 发送一段文本:

function send_text(text) {
    let payload = Buffer.from(text);
    send(OPCODE_TEXT, FLAG_FIN, payload);
}

要关闭连接,可以给对方发送一个 close 消息

function send_close(code, reason) {
    if(!reason) reason = '';
    let payload = Buffer.alloc(2 + Buffer.byteLength(reason));
    payload.writeUInt16BE(code, 0, true);
    payload.write(reason, 2);
    send(OPCODE_CLOSE, 0, payload);
}

压缩

为了更高效地发送较长文本,可以对消息进行 deflate 压缩。deflate 压缩发生在 payload 长度计算之前,因此消息中的 payload 长度是压缩后的长度。deflate 压缩的协商过程如下:

首先,客户端发起请求时,在请求头中指定支持 deflate:

GET ...
Sec-WebSocket-Extension: permessage-deflate

如果服务端支持 deflate,将在响应头中也指定 Sec-WebSocket-Extension: permessage-deflate

某些场景下,服务端的内存是有限的。客户端可以在请求头中附加选项:

Sec-WebSocket-Extension: permessage-deflate; client_max_window_bits

client_max_window_bits 选项表示客户端支持服务端指定其压缩的 deflate 滑动窗口大小,一般取值 8-15。

此时,服务端在响应时可以为客户端指定 deflate 滑动窗口大小:

Sec-WebSocket-Extension: permessage-deflate; client_max_window_bits=15

这里指定了滑动窗口大小为 15 位,即最大 32767 字节。服务端在解压缩时只需不超过 32767 字节的滑动窗口即可完成解压缩。

在发送消息时,需要对消息进行压缩。需要注意到压缩操作的上下文是整个连接的,因此必须在整个连接开始即初始化压缩、解压上下文:

let inflater = zlib.createInflateRaw();
let deflater = zlib.createDeflateRaw({ flush: zlib.Z_SYNC_FLUSH });

但是在压缩时,每次消息内容必须独立发送,因此 websocket 会在每个消息结尾进行一次强制 flush。在接收时,也需要在每个消息结尾进行 flush。

function send_deflated(opcode, flags, payload) {
    // 对 payload 进行 deflate 压缩
    payload = deflate(payload);
    send(opcode, flags | FLAG_RSV1, payload); // FLAG_RSV1 是 deflate 标识
}

deflate 在 Node.JS 中是一个异步操作,我们以 callback 方式表述实现如下:

function deflate(payload, cb) {
    deflater.write(payload);
    deflater.flush(zlib.Z_SYNC_FLUSH, function () {
        // 读取流
        let received = [], recv_len = 0;
        let buf;
        while(buf = deflater.read()) {
            received.push(buf);
            recv_len += buf.length;
        }
        const deflated = Buffer.concat(received, recv_len);
        // deflated 末尾会有 4 字节的 flush tail: 00 00 FF FF,我们需要将其去除
        cb(null, deflated.slice(0, -4));
    });
}

在读取操作中,得到 payload 后需要对齐进行解压缩:

let payload = ...;
if(masked) {...}
payload = inflate(payload);

inflate 也是一个异步操作,其过程如下:

function inflate(payload, cb) {
    inflater.write(payload);
    inflater.write(FLUSH_TAIL); // 00 00 FF FF
    inflater.flush(function () {
        // 读取流
        const inflated = Buffer.concat...
        cb(null, inflated);
    });
}