Issue
I'm trying to display an interactive mesh visualizer based on Three.js inside a Jupyter cell. The workflow is the following:
- The user launches a Jupyter notebook and opens the viewer in a cell
- Using Python commands, the user can manually add meshes and animate them interactively
In practice, the main thread sends requests to a server through ZMQ sockets (every request expects a single reply), then the server sends the desired data back to the main thread through other socket pairs (many "requests", very few replies expected), and the main thread finally forwards the data to the Javascript frontend through the ipython kernel comms. So far so good, and it works properly because the messages all flow in the same direction:
Main thread (Python command) [ZMQ REQ] -> [ZMQ REP] Server (Data) [ZMQ XREQ] -> [ZMQ XREQ] Main thread (Data) [IPykernel Comm] -> [IPykernel Comm] Javascript (Display)
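To make this concrete, here is a minimal sketch of what the main-thread side of this forward path looks like (the server address, the comm target name and the payload format are made up for the sake of the example):
import zmq
from ipykernel.comm import Comm

context = zmq.Context.instance()
zmq_socket = context.socket(zmq.REQ)
zmq_socket.connect("tcp://127.0.0.1:6000")   # hypothetical server address

# Comm opened towards the Javascript frontend, which is assumed to have
# registered a comm target with the same name beforehand
frontend_comm = Comm(target_name="meshcat_viewer")

def send_command(command):
    # [ZMQ REQ] -> [ZMQ REP]: one request, exactly one reply from the server
    zmq_socket.send_string(command)
    data = zmq_socket.recv()
    # [IPykernel Comm]: forward the returned data to the frontend for display
    frontend_comm.send(data={"payload": data.decode("utf-8")})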
However, the pattern is different when I want to fetch the status of the frontend, to wait for the meshes to finish loading:
Main thread (Status request) --> Server (Status request) --> Main thread (Waiting for reply)
                                                                          |
                                                                          v
Main thread (Done) <-- Server (Reply) <-- Main thread (Forward reply) <-- Javascript (Processing)
This time, the server sends a request to the frontend, which in return does not send its reply directly back to the server but to the main thread, which must forward the reply to the server, which finally replies to the main thread.
There is a clear issue: the main thread is supposed to simultaneously forward the reply of the frontend and receive the reply from the server, which is impossible. The ideal solution would be to enable the server to communicate directly with the frontend, but I don't know how to do that, since I cannot call get_ipython().kernel.comm_manager.register_target on the server side. I tried to instantiate an ipython kernel client on the server side using jupyter_client.BlockingKernelClient, but I didn't manage to use it to communicate nor to register comm targets.
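For reference, registering a comm target is straightforward from within the kernel process itself, along the lines of the sketch below (the target name and handler are only illustrative); the problem is precisely that this API is out of reach of the server process:
from IPython import get_ipython

def handle_comm_open(comm, open_msg):
    # Called when the Javascript frontend opens a comm with this target name
    def handle_msg(msg):
        data = msg['content']['data']
        comm.send(data={"ack": True})   # the reply goes back to the frontend
    comm.on_msg(handle_msg)

get_ipython().kernel.comm_manager.register_target('viewer_status', handle_comm_open)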
Solution
OK so I found a solution for now, but it is not great. Instead of just waiting for the reply and keeping the main loop busy, I poll the socket without blocking and interleave it with do_one_iteration
of the kernel, to force it to handle the pending messages:
while True:
    try:
        # Non-blocking read: raises ZMQError if no reply is available yet
        rep = zmq_socket.recv(flags=zmq.NOBLOCK).decode("utf-8")
        break
    except zmq.error.ZMQError:
        # No reply yet: let the kernel handle one pending message meanwhile
        kernel.do_one_iteration()
It works, but unfortunately it is not really portable and it messes with the Jupyter evaluation stack (all queued evaluations will be processed here instead of in order)...
Alternatively, there is another way that is more appealing:
import zmq
import asyncio

import nest_asyncio
nest_asyncio.apply()

zmq_socket.send(b"ready")

async def enforce_receive():
    # Let the kernel process exactly one pending message (the one expected
    # from the frontend), then read the reply of the server on the socket
    await kernel.process_one(True)
    return zmq_socket.recv().decode("utf-8")

loop = asyncio.get_event_loop()
rep = loop.run_until_complete(enforce_receive())
but in this case you need to know in advance that you expect the kernel to receive exactly one message, and relying on nest_asyncio is not ideal either.
Here is a link to an issue on this topic on GitHub, along with an example notebook.
UPDATE
I finally managed to completely solve my issue, without shortcomings. The trick is to analyze every incoming message. The irrelevant messages are put back in the queue in order, while the comm-related ones are processed on the spot:
import zmq
import tornado.gen
from IPython import get_ipython
# Note: 'SHELL_PRIORITY' and the kernel attributes used below ('msg_queue',
# 'shell_streams') are specific to ipykernel 5.x
from ipykernel.kernelbase import SHELL_PRIORITY

class CommProcessor:
"""
@brief Re-implementation of ipykernel.kernelbase.do_one_iteration
to only handle comm messages on the spot, and put back in
the stack the other ones.
@details Calling 'do_one_iteration' messes up with kernel
'msg_queue'. Some messages will be processed too soon,
which is likely to corrupt the kernel state. This method
only processes comm messages to avoid such side effects.
"""
def __init__(self):
self.__kernel = get_ipython().kernel
self.qsize_old = 0
def __call__(self, unsafe=False):
"""
@brief Check once if there is pending comm related event in
the shell stream message priority queue.
@param[in] unsafe Whether or not to assume check if the number
of pending message has changed is enough. It
makes the evaluation much faster but flawed.
"""
        # Flush every incoming message on shell_stream only.
        # Note that it is a faster implementation of ZMQStream.flush
        # that only handles incoming messages. It reduces the computation
        # time from about 10us to 20ns.
        # https://github.com/zeromq/pyzmq/blob/e424f83ceb0856204c96b1abac93a1cfe205df4a/zmq/eventloop/zmqstream.py#L313
shell_stream = self.__kernel.shell_streams[0]
shell_stream.poller.register(shell_stream.socket, zmq.POLLIN)
events = shell_stream.poller.poll(0)
while events:
_, event = events[0]
if event:
shell_stream._handle_recv()
shell_stream.poller.register(
shell_stream.socket, zmq.POLLIN)
events = shell_stream.poller.poll(0)
qsize = self.__kernel.msg_queue.qsize()
if unsafe and qsize == self.qsize_old:
            # The number of messages in the queue has not changed since
            # the last time it was checked. Assuming those messages are
            # the same as before and returning early.
return
# One must go through all the messages to keep them in order
for _ in range(qsize):
priority, t, dispatch, args = \
self.__kernel.msg_queue.get_nowait()
if priority <= SHELL_PRIORITY:
_, msg = self.__kernel.session.feed_identities(
args[1], copy=False)
msg = self.__kernel.session.deserialize(
msg, content=False, copy=False)
else:
                # Do not spend time analyzing already-rejected messages
msg = None
            if msg is None or 'comm_' not in msg['header']['msg_type']:
                # The message is not related to comm, so put it back in
                # the queue after lowering its priority so that it ends up
                # at the "end of the queue", i.e. just at the right place:
                # after the next unchecked messages, after the other
                # messages already put back in the queue, but before the
                # next one to go the same way. Note that every shell
                # message has SHELL_PRIORITY by default.
                self.__kernel.msg_queue.put_nowait(
                    (SHELL_PRIORITY + 1, t, dispatch, args))
else:
# Comm message. Processing it right now.
tornado.gen.maybe_future(dispatch(*args))
self.qsize_old = self.__kernel.msg_queue.qsize()
process_kernel_comm = CommProcessor()
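With this helper, waiting for the reply of the server boils down to polling the socket without blocking and processing the pending comm messages in the meantime, along these lines (the timeout handling is just an illustration):
import time

def recv_with_comm(zmq_socket, timeout=5.0):
    # Wait for a reply on the socket while still processing the comm
    # messages coming from the Javascript frontend
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            return zmq_socket.recv(flags=zmq.NOBLOCK).decode("utf-8")
        except zmq.error.ZMQError:
            process_kernel_comm()
    raise TimeoutError("No reply received from the server")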
Answered By - milembar