1、安装
pip install websocket-client
2、运行环境要求
此代码需要运行在 Python 3.x 环境中,确保你的 Python 版本符合要求。
3、示例代码
import gzip
import zlib
import websocketOPCODE_DATA = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY)
url = "ws://echo.websocket.events/"
proxies = {"http_proxy_host": "168.38.24.125","http_proxy_port": 16898,"http_proxy_auth": ("username", "password"),
}
ws = websocket.create_connection(url, **proxies)def recv():try:frame = ws.recv_frame()except websocket.WebSocketException:return websocket.ABNF.OPCODE_CLOSE, Noneif not frame:raise websocket.WebSocketException("Not a valid frame {}".format(frame))elif frame.opcode in OPCODE_DATA:return frame.opcode, frame.dataelif frame.opcode == websocket.ABNF.OPCODE_CLOSE:ws.send_close()return frame.opcode, Noneelif frame.opcode == websocket.ABNF.OPCODE_PING:ws.pong(frame.data)return frame.opcode, frame.datareturn frame.opcode, frame.datadef recv_ws():opcode, data = recv()if opcode == websocket.ABNF.OPCODE_CLOSE:returnif opcode == websocket.ABNF.OPCODE_TEXT and isinstance(data, bytes):data = str(data, "utf-8")if isinstance(data, bytes) and len(data) > 2 and data[:2] == b'\037\213': # gzip magictry:data = "[gzip] {}".format(str(gzip.decompress(data), "utf-8"))except Exception:passelif isinstance(data, bytes):try:data = "[zlib] {}".format(str(zlib.decompress(data, -zlib.MAX_WBITS), "utf-8"))except Exception:passif isinstance(data, bytes):data = repr(data)print("< {}".format(data))def main():print("Press Ctrl+C to quit")while True:message = input("> ")ws.send(message)recv_ws()if __name__ == "__main__":try:main()except KeyboardInterrupt:print('\nbye')except Exception as e:print(e)