Issue
I have the following function:
def send_command(self, cmd):
self.loop.call_soon_threadsafe(
functools.partial(
self._transport.write, str(cmd).encode() + b"\n"
)
)
The system under test (sut) is a class inheriting from asyncio.Protocol
which sends some commands to a piece of hardware on a socket. I have to use threads since this is part of a GUI under wxPython. Finally, if I call self._transport.write
the code works fine on Linux but crashes on Windows™.
When running the test:
@pytest.mark.asyncio
async def test_send_command(self):
self.sut._transport = Mock()
self.sut.send_command("ook eek")
assert self.sut._transport.write.called is True
I get an assert error. The self.sut._transport.write
is never called. If I call self._transport.write
directly in the function, the code crashes on Windows™ but the test passes just fine.
What am I missing here?
Anyone?…
Surely, this is not such an edge case…
Solution
A work around…
After reading and experimenting with even loops, using this:
import asyncio
import selectors
selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)
by passes the problem. Of course, it means using a sub-efficient loop on Windows. ☹ PHA!
Anyone with a better solution is welcome to some fake internet points.
Answered By - Sardathrion - against SE abuse
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.