我有这段代码可以使用 async
和 websokets
python 库与 websocket api 交互:
#!/usr/bin/env python3
import sys, json
import asyncio
from websockets import connect
class AsyncWebsocket():
async def __aenter__(self):
self._conn = connect('wss://ws.myws.com/v2')
self.websocket = await self._conn.__aenter__()
return self
async def __aexit__(self, *args, **kwargs):
await self._conn.__aexit__(*args, **kwargs)
async def send(self, message):
await self.websocket.send(message)
async def receive(self):
return await self.websocket.recv()
class mtest():
def __init__(self, api_token):
self.aws = AsyncWebsocket()
self.loop = asyncio.get_event_loop()
self.api_token = api_token
self.authorize()
def authorize(self):
jdata = self.__async_exec({
'authorize': self.api_token
})
try:
print (jdata['email'])
ret = True
except:
ret = False
return ret
def sendtest(self):
jdata = self.__async_exec({
"hello": 1
})
print (jdata)
def __async_exec(self, jdata):
try:
ret = json.loads(self.loop.run_until_complete(self.__async_send_recieve(jdata)))
except:
ret = None
return ret
async def __async_send_recieve(self, jdata):
async with self.aws:
await self.aws.send(json.dumps(jdata))
return await self.aws.receive()
所以我在 main.py 中有以下内容:
from webclass import *
a = mtest('12341234')
print (a.sendtest())
问题是它不保留授权 session ,所以这是输出:
root@ubupc1:/home/dinocob# python3 test.py
asd@gmail.com
{'error': {'message': 'Please log in.', 'code': 'AuthorizationRequired'}}
如您所见,登录调用工作正常,但是在 sendtest
函数中调用和发送 hello
时, session 不一样。
- session 销毁在哪里?
- 我怎样才能保存它(不彻底 修改我的类(class)结构)?
请您参考如下方法:
我认为问题可能出在 context_manager 或 with
statement. 中
async def __async_send_recieve(self, jdata):
async with self.aws:
await self.aws.send(json.dumps(jdata))
return await self.aws.receive()
当您调用“with”时,上下文应该如下所示(具有更好的异常处理和上下文管理器的所有好处,因此您可以将 __async_send_recieve
的流程描绘为:
self.aws.__aenter__()
self.aws.send(data)
self.aws.receive()
self.aws.__aexit__()
为了证明这个理论,在 __aenter__
和 __aexit__
函数中添加打印语句,您应该能够更好地可视化上下文管理器流程。
修复方法是在每个请求中重新授权。但我想你想要做的是让你的测试类来管理用于与远程服务器通信的上下文。 (我的异步语法在这里可能有点错误,但在概念上与上下文管理器有关):
class Mtest():
def __init__(self, ...):
...
def __enter__(self,):
self.authorize()
def __exit__(self):
self.deauthorize()
async def make_async_request(self, data):
await self.aws.send(json.dumps(data))
return await self.aws.receive()
with Mtest(api_key) as m:
m.make_async_request({'test_data': 'dummy_test_data'})
m.make_async_request({'more_data': 'more_mock_data'})