我有这段代码可以使用 asyncwebsokets python 库与 websocket api 交互:

#!/usr/bin/env python3 
 
import sys, json 
import asyncio 
from websockets import connect 
 
class AsyncWebsocket(): 
    async def __aenter__(self): 
        self._conn = connect('wss://ws.myws.com/v2') 
        self.websocket = await self._conn.__aenter__()         
        return self 
 
    async def __aexit__(self, *args, **kwargs): 
        await self._conn.__aexit__(*args, **kwargs) 
 
    async def send(self, message): 
        await self.websocket.send(message) 
 
    async def receive(self): 
        return await self.websocket.recv() 
 
class mtest(): 
    def __init__(self, api_token): 
        self.aws        = AsyncWebsocket() 
        self.loop       = asyncio.get_event_loop() 
        self.api_token  = api_token 
 
        self.authorize() 
 
    def authorize(self): 
        jdata = self.__async_exec({ 
                                    'authorize': self.api_token 
                                  }) 
 
        try: 
            print (jdata['email']) 
            ret = True 
        except: 
            ret = False 
 
        return ret 
 
    def sendtest(self): 
 
        jdata = self.__async_exec({ 
                                    "hello": 1 
                                  }) 
 
        print (jdata) 
 
 
    def __async_exec(self, jdata): 
        try: 
            ret = json.loads(self.loop.run_until_complete(self.__async_send_recieve(jdata))) 
        except: 
            ret = None 
 
        return ret 
 
    async def __async_send_recieve(self, jdata): 
        async with self.aws: 
            await self.aws.send(json.dumps(jdata)) 
            return await self.aws.receive() 

所以我在 main.py 中有以下内容:

from webclass import * 
 
a = mtest('12341234') 
print (a.sendtest()) 

问题是它不保留授权 session ,所以这是输出:

root@ubupc1:/home/dinocob# python3 test.py  
asd@gmail.com 
{'error': {'message': 'Please log in.', 'code': 'AuthorizationRequired'}} 

如您所见,登录调用工作正常,但是在 sendtest 函数中调用和发送 hello 时, session 不一样。

  • session 销毁在哪里?
  • 我怎样才能保存它(不彻底 修改我的类(class)结构)?

请您参考如下方法:

我认为问题可能出在 context_manager 或 with statement.

async def __async_send_recieve(self, jdata): 
    async with self.aws: 
        await self.aws.send(json.dumps(jdata)) 
        return await self.aws.receive() 

当您调用“with”时,上下文应该如下所示(具有更好的异常处理和上下文管理器的所有好处,因此您可以将 __async_send_recieve 的流程描绘为:

       self.aws.__aenter__() 
       self.aws.send(data) 
       self.aws.receive() 
       self.aws.__aexit__() 

为了证明这个理论,在 __aenter____aexit__ 函数中添加打印语句,您应该能够更好地可视化上下文管理器流程。

修复方法是在每个请求中重新授权。但我想你想要做的是让你的测试类来管理用于与远程服务器通信的上下文。 (我的异步语法在这里可能有点错误,但在概念上与上下文管理器有关):

class Mtest(): 
    def __init__(self, ...): 
           ... 
 
    def __enter__(self,): 
        self.authorize() 
 
    def __exit__(self): 
        self.deauthorize() 
 
    async def make_async_request(self, data): 
        await self.aws.send(json.dumps(data)) 
        return await self.aws.receive() 
 
with Mtest(api_key) as m: 
    m.make_async_request({'test_data': 'dummy_test_data'}) 
    m.make_async_request({'more_data': 'more_mock_data'}) 


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

python - 在 python 中以相反的顺序柯里化(Currying)