前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用 Postman、Python 测试 WebSocket(wss)

使用 Postman、Python 测试 WebSocket(wss)

原创
作者头像
Lorin 洛林
修改2024-04-27 10:40:55
514012
代码可运行
修改2024-04-27 10:40:55
举报
文章被收录于专栏:Python 技术小屋Python 技术小屋
运行总次数:12
代码可运行

前言

  • WebSocket(wss) 已成为现代Web开发中不可或缺的一部分,它实现了客户端和服务器之间的实时双向通信。测试 WebSocket 连接对确保其可靠性、安全性和性能至关重要。在本篇指南中,我们将探讨使用 Postman 和 Python 分别如何测试 WebSocket(wss)

工具

  • Python 3.x
  • Postman

Postman

  • Postman 是一款功能强大的 API 开发工具,旨在简化和加速 API 的创建、测试和调试过程。除了传统的 HTTP 请求测试外,Postman 还支持 WebSocket 协议,使得开发者可以轻松地测试和调试 WebSocket 连接。
  • Postman 8.0 版本开始,用户可以利用 Postman 测试和调试 WebSocket 连接。

新建 wss 测试

连接 topic

  • 这里我以网上的一个案例作为示范:wss://ws.dyhjw.com/?token=
  • 连接成功后部分 wss 需要发送消息建立通信,比如建立需要通信的消息源。
  • 发送消息建立通信
代码语言:txt
复制
{
    "cmd": "sub",
    "codes": [
        "AUTD",
        "XAU"
    ]
}
  • 实时通信:
  • 可以看到上面正常和 wss 服务端进行通信。

wss 鉴权(Unexpected server response: 200

  • 如果你的 wss 服务端需要鉴权操作,那么你需要根据具体的鉴权方式进行鉴权,比如在 header 中添加 cookie 之类,否则那么有可能返回 200 状态码

使用 Python 连接

代码版本一

代码语言:python
代码运行次数:1
复制
import asyncio
import time

import websockets


class WebSocketClient:
    def __init__(self, uri, auth_cookie):
        self.uri = uri
        self.auth_cookie = auth_cookie
        self.websocket = None

    async def connect(self):
        self.websocket = await websockets.connect(self.uri, extra_headers={"Cookie": self.auth_cookie})

    async def subscribe(self, topic):
        if self.websocket:
            await self.websocket.send(
                build_message("SUBSCRIBE", {"id": "sub-0", "destination": topic}))

    async def send_message(self, message):
        if self.websocket:
            await self.websocket.send(message)

    async def receive_message(self):
        if self.websocket:
            response = await self.websocket.recv()
            return response

    async def send_heartbeat(self):
        while True:
            await asyncio.sleep(5)  # 每5秒发送一次心跳检测消息
            if self.websocket:
                await self.websocket.send(build_message("SEND", ''))
                print(f"心跳发送成功-------")

# 按照消息格式定义消息内容
def build_message(command, headers, msg=None):
    BYTE = {
        'LF': '\x0A',
        'NULL': '\x00',
        'HIDDEN': '\u0001'
    }
    data_arr = [command + BYTE['LF']]

    # add headers
    for key in headers:
        data_arr.append(key + ":" + headers[key] + BYTE['LF'])

    data_arr.append(BYTE['LF'])

    # add message, if any
    if msg is not None:
        data_arr.append(msg)

    # terminate with null octet
    data_arr.append(BYTE['NULL'])

    frame = ''.join(data_arr)

    # transmit over ws
    print("构建后数据:" + frame)

    return frame


async def main():
    uri = "wss://xxxxx"
    # 添加权限认证
    auth_cookie = ("xxxxx")
    topics = ["/topic/xxxxx"]

    client = WebSocketClient(uri, auth_cookie)
    await client.connect()
    # connect message
    await client.send_message(
        build_message("CONNECT", {"passcode": "", "accept-version": "1.0,1.1,1.2", "heart-beat": "5000,0"}))
    # 启动心跳检测任务,否则服务端会自动断开连接
    asyncio.create_task(client.send_heartbeat())
    time.sleep(2)

    for topic in topics:
        await client.subscribe(topic)
        print(f"{topic} 订阅成功")

    while True:
        print("等待接受消息------")
        response = await client.receive_message()
        print(f"收到消息:{response}")


asyncio.run(main())

代码版本二

  • 推荐代码版本二的写法,整体代码的耦合性更低,封装更好。
代码语言:python
代码运行次数:1
复制
import threading
import time

import websocket

# socket访问地址:
socket_add = 'wss://xxxx'


def on_message(ws, message):
    print(f"接收到消息:{message}")


def on_error(ws, error):
    # 程序报错时,就会触发on_error事件
    print(error)


def on_close(ws, param1, param2):
    print("Connection closed------")


def on_open(ws):
    ws.send(build_message("CONNECT", {"passcode": "", "accept-version": "1.0,1.1,1.2", "heart-beat": "5000,0"}))
    time.sleep(2)
    topic = "xxxxx"
    ws.send(build_message("SUBSCRIBE", {"id": "sub-0", "destination": topic}))

    # 启动心跳检测任务
    thread = threading.Thread(target=check_heartbeat, args=[ws])
    thread.start()


def check_heartbeat(ws):
    while True:
        time.sleep(5)
        ws.send(build_message("SEND", ''))
        print(f"心跳发送成功-------")


# 按照消息格式定义消息内容
def build_message(command, headers, msg=None):
    BYTE = {
        'LF': '\x0A',
        'NULL': '\x00',
        'HIDDEN': '\u0001'
    }
    data_arr = [command + BYTE['LF']]

    # add headers
    for key in headers:
        data_arr.append(key + ":" + headers[key] + BYTE['LF'])

    data_arr.append(BYTE['LF'])

    # add message, if any
    if msg is not None:
        data_arr.append(msg)

    # terminate with null octet
    data_arr.append(BYTE['NULL'])

    frame = ''.join(data_arr)

    # transmit over ws
    print("构建后数据:" + frame)

    return frame


def main(address=socket_add):
    websocket.enableTrace(False)
    ws = websocket.WebSocketApp(address,
                                cookie="xxxxx",
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close,
                                on_open=on_open)
    ws.run_forever(ping_interval=5, ping_timeout=3)


if __name__ == "__main__":
    main()

问题

  • 以下是连接过程中的一些常见问题,大家可以作为参考:

连接返回 Unexpected server response: 200

  • 参考上文,有可能是 wss 服务端需要鉴权操作。

wss:// 和 ws:// 的区别

  • wss:// 和 ws:// 的区别在于安全性和传输加密,ws:// 使用普通的 WebSocket 协议进行通信, wss:// 使用加密的 WebSocket 协议进行通信,基于 TLS/SSL 进行加密。

连接成功后一段时间自动断开连接

  • wss 服务端可能需要接收心跳报文检测客户端是否存活,超过一定时间如果没有收到心跳报文则会断开连接。(也可能服务端主动检测客户端)

其它注意点

  • 确认连接的地址和 topic 是否正确,以及是否需要认证,报文格式,通信流程等,同时调试时可以结合后端打印的日志排查问题,方便快速定位问题。

个人简介

? 你好,我是 Lorin 洛林,一位 Java 后端技术开发者!座右铭:Technology has the power to make the world a better place.

? 我对技术的热情是我不断学习和分享的动力。我的博客是一个关于Java生态系统、后端开发和最新技术趋势的地方。

? 作为一个 Java 后端技术爱好者,我不仅热衷于探索语言的新特性和技术的深度,还热衷于分享我的见解和最佳实践。我相信知识的分享和社区合作可以帮助我们共同成长。

? 在我的博客上,你将找到关于Java核心概念、JVM 底层技术、常用框架如Spring和Mybatis 、MySQL等数据库管理、RabbitMQ、Rocketmq等消息中间件、性能优化等内容的深入文章。我也将分享一些编程技巧和解决问题的方法,以帮助你更好地掌握Java编程。

? 我鼓励互动和建立社区,因此请留下你的问题、建议或主题请求,让我知道你感兴趣的内容。此外,我将分享最新的互联网和技术资讯,以确保你与技术世界的最新发展保持联系。我期待与你一起在技术之路上前进,一起探讨技术世界的无限可能性。

? 保持关注我的博客,让我们共同追求技术卓越。

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 工具
  • Postman
    • 新建 wss 测试
      • 连接 topic
        • wss 鉴权(Unexpected server response: 200)
        • 使用 Python 连接
          • 代码版本一
            • 代码版本二
            • 问题
              • 连接返回 Unexpected server response: 200
                • wss:// 和 ws:// 的区别
                  • 连接成功后一段时间自动断开连接
                    • 其它注意点
                    • 个人简介
                    相关产品与服务
                    云数据库 MySQL
                    腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
                    http://www.vxiaotou.com