Issue
I am trying to cache Django REST Framework HTTP streaming responses.
My thinking is that a Response subclass can write each chunk to a temporary file as it streams, and after streaming the final chunk, run a callable that copies the file into the cache.
from tempfile import TemporaryFile

from django.http import StreamingHttpResponse


class CachedStreamingHttpResponse(StreamingHttpResponse):

    def __init__(self, streaming_content=(), *args, **kwargs):
        self._post_render_callbacks = []
        self._buffer = None
        self.buffered = False
        super().__init__(streaming_content, *args, **kwargs)

    def _set_streaming_content(self, value):
        self._buffer = TemporaryFile()
        super()._set_streaming_content(value)

    def post_render(self):
        self._buffer.seek(0)
        self.buffered = self._buffer
        retval = self
        for post_callback in self._post_render_callbacks:
            newretval = post_callback(retval)
            if newretval is not None:
                retval = newretval

    def buffer(self, b):
        self._buffer.write(b)
        return b

    @staticmethod
    def closing_iterator_wrapper(iterable, close):
        try:
            yield from iterable
        finally:
            close()

    @property
    def streaming_content(self):
        buffered = map(self.buffer, super().streaming_content)
        return self.closing_iterator_wrapper(buffered, self.post_render)

    @streaming_content.setter
    def streaming_content(self, value):
        self._set_streaming_content(value)

    def add_post_render_callback(self, callback):
        """A list of callables to be run after the final chunk is returned. Used to copy the response to cache."""
        if self.buffered:
            callback(self)
        else:
            self._post_render_callbacks.append(callback)
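For context, this is roughly how such a response would be returned from a view. The model and helper names here (Report, render_row) are illustrative stand-ins, not the project's DRF viewsets:

def report_view(request):
    def chunks():
        # Hypothetical: iterate a queryset with a server-side cursor and yield serialized rows
        for row in Report.objects.iterator():
            yield render_row(row)

    return CachedStreamingHttpResponse(chunks(), content_type="application/json")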
I plan to have my cache framework pass a callable into the response; the response then calls it from the stream's finally block to copy the temporary file into S3.
However, with the CachedStreamingHttpResponse above I see two streams - one compressed, one not - and the response cannot be returned from cache.
I have edited this question to spare the reader the syntax errors, but one was interesting: because I overrode the streaming_content getter, I had to re-declare the setter (identically to how it is declared in the superclass), as the short sketch below illustrates.
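That is standard Python property behaviour rather than anything Django-specific. A minimal sketch, unrelated to the project code:

class Base:
    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, v):
        self._value = v


class Child(Base):
    # Overriding only the getter creates a brand-new property object on Child,
    # shadowing Base.value entirely, so the inherited setter is lost...
    @property
    def value(self):
        return self._value

    # ...and assignment raises AttributeError unless the setter is re-declared.
    @value.setter
    def value(self, v):
        self._value = v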
Footnote: Caching streams is almost always wrong. But these responses are generated by complex queries, DRF serializers and viewsets, and we stream so that our many users on very poor connections see data arriving more quickly. Given that a stream locks resources on the server and the client for its duration, this might use more resources than not streaming; it may also push some memory consumption to the database rather than the web server, as records are cursored. The responses are up to a few megabytes, usually less, and will be cached on our S3 cache tier. Redis would be too expensive.
Solution
It is worth repeating Willem's warning above: in almost all cases, you should not cache streams. nginx, by default, even consumes any stream and returns it to the client non-streamed. Streams lock up resources on the client and server for their duration. You typically stream because there is a lot of data and you don't want to wait until it is all generated in memory before sending it to the client - to save memory, and/or so the client starts receiving data earlier. A cache often loads data into memory as part of storing it, removing both advantages and flooding your RAM and cache storage. And large responses often come from storage anyway, such as video, so there is no benefit from caching them. However, I was working on an aged, unwisely built project with far bigger issues, and I was unwilling to overturn a decade-old decision to support streaming. I decided to keep my gunpowder dry to fight bigger fires.
The reason for getting two streams turned out to be GZipMiddleware.

StreamingHttpResponse applies make_bytes in the streaming_content getter, by returning map(self.make_bytes, self._iterator). I used the same approach to apply buffer and the save_to_cache callback. GZipMiddleware, however, just replaces streaming_content:

response.streaming_content = compress_sequence(response.streaming_content)
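For reference, the relevant parts of StreamingHttpResponse look roughly like this (simplified from the Django source; details vary by version and async handling is omitted):

class StreamingHttpResponse(HttpResponseBase):

    @property
    def streaming_content(self):
        # make_bytes is applied lazily, every time the stream is fetched
        return map(self.make_bytes, self._iterator)

    @streaming_content.setter
    def streaming_content(self, value):
        self._set_streaming_content(value)

    def _set_streaming_content(self, value):
        # Ensure we can never iterate on "value" more than once
        self._iterator = iter(value)
        if hasattr(value, "close"):
            self._resource_closers.append(value.close)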
The streaming_content setter calls _set_streaming_content, which replaces self._iterator. So CachedStreamingHttpResponse._iterator now contains the original content, but with the calls to self.make_bytes, self.buffer, self.closing_iterator_wrapper, and now GZip's compress_sequence, baked in.

When streaming, the StreamingHttpResponse.streaming_content getter still calls make_bytes on each gzipped chunk from self._iterator, again. make_bytes is harmless, but CachedStreamingHttpResponse also applies buffer and close a second time - once on the raw chunks baked into _iterator and once on the gzipped chunks coming out of it - hence seeing two streams.
The code below resolves this by replacing _iterator, like GZipMiddleware does, instead of applying the generators at fetch time.

Should GZipMiddleware be changed to better align with StreamingHttpResponse? Perhaps StreamingHttpResponse should maintain a list of callables to be called on each chunk, as sketched below? IMO that would be a clearer API, and might encourage more middleware to support streaming responses. But smooth deprecation would be tricky.
from tempfile import TemporaryFile

from django.http import StreamingHttpResponse


class CachedStreamingHttpResponse(StreamingHttpResponse):

    def __init__(self, streaming_content=(), *args, **kwargs):
        self._post_render_callbacks = []
        self.buffered = False
        self._buffer = TemporaryFile()
        super().__init__(streaming_content, *args, **kwargs)
        # Replace self._iterator here, rather than wrap super().streaming_content, so methods are applied once only:
        buffered_stream = map(self.buffer, super().streaming_content)
        self.streaming_content = self.closing_iterator_wrapper(buffered_stream, self.post_render)

    def post_render(self):
        self._buffer.seek(0)
        self.buffered = self._buffer
        retval = self
        for post_callback in self._post_render_callbacks:
            newretval = post_callback(retval)
            if newretval is not None:
                retval = newretval

    def buffer(self, b):
        self._buffer.write(b)
        return b

    @staticmethod
    def closing_iterator_wrapper(iterable, close):
        try:
            yield from iterable
        finally:
            close()

    def add_post_render_callback(self, callback):
        """A list of callables to be run after the final chunk is returned. Used to copy the response to cache."""
        if self.buffered:
            callback(self)
        else:
            self._post_render_callbacks.append(callback)
A couple more steps: our caching framework caches whole responses, rather than just the content, so we need to construct a response that won't be streamed when it comes from the cache.
from tempfile import TemporaryFile

from django.http import HttpResponse, StreamingHttpResponse


class CachedStreamingHttpResponse(StreamingHttpResponse):
    """
    A streaming HTTP response class that also spools the stream to disk for caching.
    """

    def __init__(self, streaming_content=(), *args, **kwargs):
        super().__init__(streaming_content, *args, **kwargs)

        # Initialize working variables
        # Callables to run after completion - used to run the cache_set callback
        self._post_render_callbacks = []
        # True once the full stream has been streamed
        self._complete = False

        # The stream is spooled to this file as it is streamed to the client, so that large responses do not consume RAM.
        # It is critical these temporary files are tidied up promptly. Under Unix, the directory entry for the file
        # is either not created at all or is removed immediately after the file is created.
        # Additionally, file.close() is called:
        # - when self._resource_closers are run in HttpResponseBase.close(), which is called by the WSGI server,
        # - when the TemporaryFile object is garbage collected, or
        # - by CachedStreamingHttpResponse.construct_cacheable_response, when constructing a response to pickle and cache.
        self._buffer = TemporaryFile()

        # Write each chunk to the temporary file as it is streamed:
        buffered_stream = map(self.buffer, super().streaming_content)
        # Call post_render after the stream finishes, to call the cache_set callback:
        self.streaming_content = self.closing_iterator_wrapper(buffered_stream, self.post_render)
        # Note: We set streaming_content, rather than have streaming_content wrap on return like StreamingHttpResponse,
        # because that's how GZipMiddleware works. If we don't, GZipMiddleware applies the generators twice.
        # Details: https://stackoverflow.com/questions/76181779/

    def buffer(self, b: bytes):
        self._buffer.write(b)
        return b

    @property
    def buffered(self):
        if self._complete:
            return self._buffer

    def post_render(self):
        self._buffer.seek(0)
        self._complete = True
        retval = self
        for post_callback in self._post_render_callbacks:
            newretval = post_callback(retval)
            if newretval is not None:
                retval = newretval

    def add_post_render_callback(self, callback: callable):
        """A list of callables to be run after the final chunk is returned. Used to copy the response to cache."""
        if self.buffered:
            callback(self)
        else:
            self._post_render_callbacks.append(callback)

    @staticmethod
    def closing_iterator_wrapper(iterable, close: callable):
        try:
            yield from iterable
        finally:
            close()

    def construct_cacheable_response(self):
        if self.buffered is None:
            raise ValueError("Cannot construct a cacheable response until the stream has completed.")

        response = HttpResponse(content=self.buffered.read())
        self.buffered.close()
        # content is cached before the encoding (eg, gzip) and session cookie are applied by middleware
        response.headers = {k: v for k, v in self.headers.items() if k != "Content-Encoding"}
        return response
We then need to connect it to our cache decorator:
if isinstance(result, CachedStreamingHttpResponse):

    def cache_stream(streaming_response: CachedStreamingHttpResponse) -> None:
        # Next time this URL is called, return an HttpResponse from the S3 cache tier without streaming
        cache.set(cache_key, streaming_response.construct_cacheable_response(), _timeout)

    result.add_post_render_callback(cache_stream)
It is important to note that this does briefly load the full response into RAM when writing it to cache and when reading it back (because the Django cache API unfortunately doesn't allow for streaming), so this approach is only suitable when responses are a few megabytes at most.
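The snippet above sits inside the project's cache decorator, which is not shown here. Purely as a hedged sketch of how such wiring could look, using django.core.cache and a hypothetical make_cache_key helper (these names are illustrative, not the project's):

from functools import wraps

from django.core.cache import cache


def cache_streaming_view(timeout: int):
    """Illustrative decorator: serve a cached HttpResponse if present, otherwise stream and cache on completion."""

    def decorator(view_func):
        @wraps(view_func)
        def wrapper(request, *args, **kwargs):
            cache_key = make_cache_key(request)  # hypothetical helper
            cached = cache.get(cache_key)
            if cached is not None:
                return cached  # a plain HttpResponse, served without streaming

            result = view_func(request, *args, **kwargs)
            if isinstance(result, CachedStreamingHttpResponse):

                def cache_stream(streaming_response: CachedStreamingHttpResponse) -> None:
                    cache.set(cache_key, streaming_response.construct_cacheable_response(), timeout)

                result.add_post_render_callback(cache_stream)
            return result

        return wrapper

    return decorator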
Answered By - Chris