
本文旨在帮助开发者解决在使用 Jupyter Notebook API 通过 WebSocket 连接执行代码时遇到的 "socket is already closed" 错误。我们将分析错误原因,并提供通过重新连接 WebSocket 并确保消息格式正确来解决此问题的方案,确保代码能够顺利执行并接收到服务器的响应。
在使用 Jupyter Notebook API 通过 WebSocket 连接执行代码时,遇到 "socket is already closed" 错误通常表明 WebSocket 连接在接收到服务器响应之前意外关闭。这可能是由于多种原因造成的,例如网络问题、服务器端错误或客户端代码中的逻辑错误。以下提供一种解决方案,通过重新建立连接以及确保消息格式正确来解决此问题。
以下步骤描述了如何重新连接 WebSocket,并确保发送到 Jupyter Notebook 服务器的消息格式正确,从而解决 "socket is already closed" 错误。
错误分析:
从原始问题描述可以看出,错误发生在 ws.recv() 尝试接收服务器响应时。这暗示着 WebSocket 连接可能在发送执行请求后,但在收到响应前关闭了。Jupyter Notebook 服务器的日志也显示了一些警告信息,例如 "No session ID specified" 和 "No channel specified",这表明客户端发送的请求可能缺少必要的参数。
重新连接 WebSocket:
在循环接收消息之前,如果检测到连接关闭,应该重新建立 WebSocket 连接。这可以通过将 create_connection 放在一个 try...except 块中,并在捕获到 WebSocketConnectionClosedException 异常时重新调用它来实现。
import json
from websocket import create_connection, WebSocketConnectionClosedException
import time
def execute_code(kernel_id, session_id, code, headers):
    ws_url = f"ws://127.0.0.1:8888/api/kernels/{kernel_id}/channels?session_id={session_id}"
    ws = create_connection(ws_url, header=headers)
    ws.send(json.dumps(send_execute_request(code)))
    try:
        while True:
            rsp = json.loads(ws.recv())
            msg_type = rsp["msg_type"]
            # 处理不同类型的消息,例如 'execute_result', 'stream', 'error' 等
            if msg_type == 'execute_result':
                # 处理执行结果
                print("Execute Result:", rsp["content"]["data"])
                break  # 结束循环,因为我们已经得到了执行结果
            elif msg_type == 'stream':
                # 处理输出流(stdout/stderr)
                print("Stream Output:", rsp["content"]["text"])
            elif msg_type == 'error':
                # 处理错误信息
                print("Error:", rsp["content"]["ename"], rsp["content"]["evalue"])
                break  # 结束循环,因为发生了错误
    except WebSocketConnectionClosedException as e:
        print(f"WebSocket connection closed: {e}")
        # 在这里可以选择重新连接,或者抛出异常,取决于你的应用逻辑
        # 例如:
        # ws = create_connection(ws_url, header=headers) # 尝试重新连接
        raise # 抛出异常,向上层处理
    finally:
        ws.close()修正消息格式:
Jupyter Notebook 服务器的日志表明,请求可能缺少 session ID 和 channel 信息。确保在创建 WebSocket 连接时,URL 中包含了正确的 session_id 参数。此外,检查发送到 WebSocket 的消息格式是否符合 Jupyter Notebook API 的要求。 特别注意时间戳格式,需要包含时区信息。
import datetime
import uuid
def send_execute_request(code):
    msg_id = str(uuid.uuid1())
    session_id = str(uuid.uuid1()) # You can generate a new session ID for each request
    now = datetime.datetime.now(datetime.timezone.utc).isoformat() # Include timezone information
    msg = {
        "header": {
            "msg_id": msg_id,
            "username": "test",
            "session": session_id,
            "data": now,
            "msg_type": "execute_request",
            "version": "5.0"
        },
        "parent_header": {
            "msg_id": msg_id,
            "username": "test",
            "session": session_id,
            "data": now,
            "msg_type": "execute_request",
            "version": "5.0"
        },
        "metadata": {},
        "content": {
            "code": code,
            "silent": False,
            "store_history": True,
            "user_expressions": {},
            "allow_stdin": False
        },
        "buffers": [],
        "channel": "shell" # Explicitly specify the channel
    }
    return msg处理服务器响应:
修改后的代码示例中,execute_code 函数现在会处理不同类型的服务器响应(execute_result,stream,error)。请根据你的应用需求修改这些处理逻辑。
完整示例
    import requests
    import json
    from websocket import create_connection, WebSocketConnectionClosedException
    import datetime
    import uuid
    base = "http://127.0.0.1:8888"  # 替换为你的 Jupyter Notebook 地址
    headers = {"Authorization": "Token your_token"}  # 替换为你的 token
    def create_session(file_name):
        url = base + '/api/sessions'
        params = '{"path":"%s","type":"notebook","name":"","kernel":{"id":null,"name":"env37"}}' % file_name
        response = requests.post(url, headers=headers, data=params)
        session = json.loads(response.text)
        return session
    def get_notebook_content(notebook_path):
        url = base + '/api/contents' + notebook_path
        response = requests.get(url, headers=headers)
        file = json.loads(response.text)
        code = [c['source'] for c in file['content']['cells'] if len(c['source']) > 0]
        return code
    def send_execute_request(code):
        msg_id = str(uuid.uuid1())
        session_id = str(uuid.uuid1()) # You can generate a new session ID for each request
        now = datetime.datetime.now(datetime.timezone.utc).isoformat() # Include timezone information
        msg = {
            "header": {
                "msg_id": msg_id,
                "username": "test",
                "session": session_id,
                "data": now,
                "msg_type": "execute_request",
                "version": "5.0"
            },
            "parent_header": {
                "msg_id": msg_id,
                "username": "test",
                "session": session_id,
                "data": now,
                "msg_type": "execute_request",
                "version": "5.0"
            },
            "metadata": {},
            "content": {
                "code": code,
                "silent": False,
                "store_history": True,
                "user_expressions": {},
                "allow_stdin": False
            },
            "buffers": [],
            "channel": "shell" # Explicitly specify the channel
        }
        return msg
    def execute_code(kernel_id, session_id, code, headers):
        ws_url = f"ws://127.0.0.1:8888/api/kernels/{kernel_id}/channels?session_id={session_id}"
        ws = create_connection(ws_url, header=headers)
        ws.send(json.dumps(send_execute_request(code)))
        try:
            while True:
                rsp = json.loads(ws.recv())
                msg_type = rsp["msg_type"]
                # 处理不同类型的消息,例如 'execute_result', 'stream', 'error' 等
                if msg_type == 'execute_result':
                    # 处理执行结果
                    print("Execute Result:", rsp["content"]["data"])
                    break  # 结束循环,因为我们已经得到了执行结果
                elif msg_type == 'stream':
                    # 处理输出流(stdout/stderr)
                    print("Stream Output:", rsp["content"]["text"])
                elif msg_type == 'error':
                    # 处理错误信息
                    print("Error:", rsp["content"]["ename"], rsp["content"]["evalue"])
                    break  # 结束循环,因为发生了错误
        except WebSocketConnectionClosedException as e:
            print(f"WebSocket connection closed: {e}")
            # 在这里可以选择重新连接,或者抛出异常,取决于你的应用逻辑
            # 例如:
            # ws = create_connection(ws_url, header=headers) # 尝试重新连接
            raise # 抛出异常,向上层处理
        finally:
            ws.close()
    # Example usage:
    file_name = "example2.ipynb"  # 替换为你的 notebook 文件名
    notebook_path = "/" + file_name
    session = create_session(file_name)
    kernel = session["kernel"]
    kernel_id = kernel["id"]
    session_id = session["id"]
    code = get_notebook_content(notebook_path)
    for c in code:
        try:
            execute_code(kernel_id, session_id, c, headers)
        except WebSocketConnectionClosedException:
            print(f"Failed to execute code: {c}")
            # Handle reconnection or error as needed通过重新连接 WebSocket 并确保消息格式正确,可以有效地解决 Jupyter Notebook API 中的 "socket is already closed" 错误。 此外,需要完善错误处理机制,以便在出现问题时能够及时发现并解决。
以上就是解决 Jupyter Notebook WebSocket 连接关闭错误的详细内容,更多请关注php中文网其它相关文章!
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号