How to implement server push in Flask framework?

70,157

Solution 1

Have a look at Server-Sent Events. Server-Sent Events is a browser API that lets you keep open a socket to your server, subscribing to a stream of updates. For more Information read Alex MacCaw (Author of Juggernaut) post on why he kills juggernaut and why the simpler Server-Sent Events are in manny cases the better tool for the job than Websockets.

The protocol is really easy. Just add the mimetype text/event-stream to your response. The browser will keep the connection open and listen for updates. An Event sent from the server is a line of text starting with data: and a following newline.

data: this is a simple message
<blank line>

If you want to exchange structured data, just dump your data as json and send the json over the wire.

An advantage is that you can use SSE in Flask without the need for an extra Server. There is a simple chat application example on github which uses redis as a pub/sub backend.

def event_stream():
    pubsub = red.pubsub()
    pubsub.subscribe('chat')
    for message in pubsub.listen():
        print message
        yield 'data: %s\n\n' % message['data']


@app.route('/post', methods=['POST'])
def post():
    message = flask.request.form['message']
    user = flask.session.get('user', 'anonymous')
    now = datetime.datetime.now().replace(microsecond=0).time()
    red.publish('chat', u'[%s] %s: %s' % (now.isoformat(), user, message))


@app.route('/stream')
def stream():
    return flask.Response(event_stream(),
                          mimetype="text/event-stream")

You do not need to use gunicron to run the example app. Just make sure to use threading when running the app, because otherwise the SSE connection will block your development server:

if __name__ == '__main__':
    app.debug = True
    app.run(threaded=True)

On the client side you just need a Javascript handler function which will be called when a new message is pushed from the server.

var source = new EventSource('/stream');
source.onmessage = function (event) {
     alert(event.data);
};

Server-Sent Events are supported by recent Firefox, Chrome and Safari browsers. Internet Explorer does not yet support Server-Sent Events, but is expected to support them in Version 10. There are two recommended Polyfills to support older browsers

Solution 2

Redis is overkill: use Server-Sent Events (SSE)

Late to the party (as usual), but IMHO using Redis may be overkill.

As long as you're working in Python+Flask, consider using generator functions as described in this excellent article by Panisuan Joe Chasinga. The gist of it is:

In your client index.html

var targetContainer = document.getElementById("target_div");
var eventSource = new EventSource("/stream")
  eventSource.onmessage = function(e) {
  targetContainer.innerHTML = e.data;
};
...
<div id="target_div">Watch this space...</div>

In your Flask server:

def get_message():
    '''this could be any function that blocks until data is ready'''
    time.sleep(1.0)
    s = time.ctime(time.time())
    return s

@app.route('/')
def root():
    return render_template('index.html')

@app.route('/stream')
def stream():
    def eventStream():
        while True:
            # wait for source data to be available, then push it
            yield 'data: {}\n\n'.format(get_message())
    return Response(eventStream(), mimetype="text/event-stream")

Solution 3

As a follow-up to @peter-hoffmann's answer, I've written a Flask extension specifically to handle server-sent events. It's called Flask-SSE, and it's available on PyPI. To install it, run:

$ pip install flask-sse

You can use it like this:

from flask import Flask
from flask_sse import sse

app = Flask(__name__)
app.config["REDIS_URL"] = "redis://localhost"
app.register_blueprint(sse, url_prefix='/stream')

@app.route('/send')
def send_message():
    sse.publish({"message": "Hello!"}, type='greeting')
    return "Message sent!"

And to connect to the event stream from Javascript, it works like this:

var source = new EventSource("{{ url_for('sse.stream') }}");
source.addEventListener('greeting', function(event) {
    var data = JSON.parse(event.data);
    // do what you want with this data
}, false);

Documentation is available on ReadTheDocs. Note that you'll need a running Redis server to handle pub/sub.

Solution 4

As a committer of https://github.com/WolfgangFahl/pyFlaskBootstrap4 i ran into the same need and created a flask blueprint for Server Sent Events that has no dependency to redis.

This solutions builds on the other answers that have been given here in the past.

https://github.com/WolfgangFahl/pyFlaskBootstrap4/blob/main/fb4/sse_bp.py has the source code (see also sse_bp.py below).

There are unit tests at https://github.com/WolfgangFahl/pyFlaskBootstrap4/blob/main/tests/test_sse.py

The idea is that you can use different modes to create your SSE stream:

  • by providing a function
  • by providing a generator
  • by using a PubSub helper class
  • by using the PubSub helper class and use pydispatch at the same time.

As of 2021-02-12 this is alpha code which i want to share nevertheless. Please comment here or as issues in the project.

There is a demo at http://fb4demo.bitplan.com/events and a description of the example use e.g. for a progress bar or time display at: http://wiki.bitplan.com/index.php/PyFlaskBootstrap4#Server_Sent_Events

example client javascript/html code

<div id="event_div">Watch this space...</div>
<script>
    function fillContainerFromSSE(id,url) {
        var targetContainer = document.getElementById(id);
        var eventSource = new EventSource(url)
        eventSource.onmessage = function(e) {
            targetContainer.innerHTML = e.data;
        };
    };
    fillContainerFromSSE("event_div","/eventfeed");
</script>

example server side code


def getTimeEvent(self):
        '''
        get the next time stamp
        '''
        time.sleep(1.0)
        s=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
        return s   

def eventFeed(self):
        '''
        create a Server Sent Event Feed
        '''
        sse=self.sseBluePrint
        # stream from the given function
        return sse.streamFunc(self.getTimeEvent)

sse_bp.py

'''
Created on 2021-02-06
@author: wf
'''
from flask import Blueprint, Response, request, abort,stream_with_context
from queue import Queue
from pydispatch import dispatcher
import logging

class SSE_BluePrint(object):
    '''
    a blueprint for server side events 
    '''
    def __init__(self,app,name:str,template_folder:str=None,debug=False,withContext=False):
        '''
        Constructor
        '''
        self.name=name
        self.debug=debug
        self.withContext=False
        if template_folder is not None:
            self.template_folder=template_folder
        else:
            self.template_folder='templates'    
        self.blueprint=Blueprint(name,__name__,template_folder=self.template_folder)
        self.app=app
        app.register_blueprint(self.blueprint)
        
        @self.app.route('/sse/<channel>')
        def subscribe(channel):
            def events():
                PubSub.subscribe(channel)
            self.stream(events)
                
    def streamSSE(self,ssegenerator): 
        '''
        stream the Server Sent Events for the given SSE generator
        '''  
        response=None
        if self.withContext:
            if request.headers.get('accept') == 'text/event-stream':
                response=Response(stream_with_context(ssegenerator), content_type='text/event-stream')
            else:
                response=abort(404)    
        else:
            response= Response(ssegenerator, content_type='text/event-stream')
        return response
        
    def streamGen(self,gen):
        '''
        stream the results of the given generator
        '''
        ssegen=self.generateSSE(gen)
        return self.streamSSE(ssegen)   
            
    def streamFunc(self,func,limit=-1):
        '''
        stream a generator based on the given function
        Args:
            func: the function to convert to a generator
            limit (int): optional limit of how often the generator should be applied - 1 for endless
        Returns:
            an SSE Response stream
        '''
        gen=self.generate(func,limit)
        return self.streamGen(gen)
                
    def generate(self,func,limit=-1):
        '''
        create a SSE generator from a given function
        Args:
            func: the function to convert to a generator
            limit (int): optional limit of how often the generator should be applied - 1 for endless
        Returns:
            a generator for the function
        '''   
        count=0
        while limit==-1 or count<limit:
            # wait for source data to be available, then push it
            count+=1
            result=func()
            yield result
        
    def generateSSE(self,gen):
        for result in gen:
            yield 'data: {}\n\n'.format(result)
            
    def enableDebug(self,debug:bool):
        '''
        set my debugging
        
        Args:
            debug(bool): True if debugging should be switched on
        '''
        self.debug=debug
        if self.debug:
            logging.basicConfig(level=logging.DEBUG, format='%(asctime)s.%(msecs)03d %(levelname)s:\t%(message)s', datefmt='%Y-%m-%d %H:%M:%S')
            
    def publish(self, message:str, channel:str='sse', debug=False):
        """
        Publish data as a server-sent event.
        
        Args:
            message(str): the message to send
            channel(str): If you want to direct different events to different
                clients, you may specify a channel for this event to go to.
                Only clients listening to the same channel will receive this event.
                Defaults to "sse".
            debug(bool): if True  enable debugging
        """
        return PubSub.publish(channel=channel, message=message,debug=debug)

    def subscribe(self,channel,limit=-1,debug=False):
        def stream():
            for message in PubSub.subscribe(channel,limit,debug=debug):
                yield str(message)
                
        return self.streamGen(stream)
    
class PubSub:
    '''
    redis pubsub duck replacement
    '''
    pubSubByChannel={}
    
    def __init__(self,channel:str='sse',maxsize:int=15, debug=False,dispatch=False):
        '''
        Args:
            channel(string): the channel name
            maxsize(int): the maximum size of the queue
            debug(bool): whether debugging should be switched on
            dispatch(bool): if true use the pydispatch library - otherwise only a queue
        '''
        self.channel=channel
        self.queue=Queue(maxsize=maxsize)
        self.debug=debug
        self.receiveCount=0
        self.dispatch=False
        if dispatch:
            dispatcher.connect(self.receive,signal=channel,sender=dispatcher.Any)
        
    @staticmethod
    def reinit():
        '''
        reinitialize the pubSubByChannel dict
        '''
        PubSub.pubSubByChannel={}
        
    @staticmethod
    def forChannel(channel):    
        '''
        return a PubSub for the given channel
        
        Args:
            channel(str): the id of the channel
        Returns:
            PubSub: the PubSub for the given channel
        '''
        if channel in PubSub.pubSubByChannel:
            pubsub=PubSub.pubSubByChannel[channel]
        else:
            pubsub=PubSub(channel)
            PubSub.pubSubByChannel[channel]=pubsub
        return pubsub
    
    @staticmethod    
    def publish(channel:str,message:str,debug=False):
        '''
        publish a message via the given channel
        
        Args:
            channel(str): the id of the channel to use
            message(str): the message to publish/send
        Returns:
            PubSub: the pub sub for the channel
            
        '''
        pubsub=PubSub.forChannel(channel)
        pubsub.debug=debug
        pubsub.send(message)
        return pubsub
        
    @staticmethod    
    def subscribe(channel,limit=-1,debug=False): 
        '''
        subscribe to the given channel
        
        Args:
            channel(str): the id of the channel to use
            limit(int): limit the maximum amount of messages to be received        
            debug(bool): if True debugging info is printed
        '''  
        pubsub=PubSub.forChannel(channel)
        pubsub.debug=debug
        return pubsub.listen(limit)
    
    def send(self,message):
        '''
        send the given message
        '''
        sender=object();
        if self.dispatch:
            dispatcher.send(signal=self.channel,sender=sender,msg=message)
        else:
            self.receive(sender,message)
        
    def receive(self,sender,message):
        '''
        receive a message
        '''
        if sender is not None:
            self.receiveCount+=1;
            if self.debug:
                logging.debug("received %d:%s" % (self.receiveCount,message))
            self.queue.put(message)
        
    def listen(self,limit=-1):
        '''
        listen to my channel
        
        this is a generator for the queue content of received messages
        
        Args:
            limit(int): limit the maximum amount of messages to be received
        
        Return:
            generator: received messages to be yielded
        '''
        if limit>0 and self.receiveCount>limit:
            return
        yield self.queue.get()
    
    def unsubscribe(self):
        '''
        unsubscribe me
        '''
        if self.dispatch:
            dispatcher.disconnect(self.receive, signal=self.channel)
        pass
Share:
70,157
little-eyes
Author by

little-eyes

Updated on July 05, 2022

Comments

  • little-eyes
    little-eyes almost 2 years

    I am trying to build a small site with the server push functionality on Flask micro-web framework, but I did not know if there is a framework to work with directly.

    I used Juggernaut, but it seems to be not working with redis-py in current version, and Juggernaut has been deprecated recently.

    Does anyone has a suggestion with my case?

  • little-eyes
    little-eyes over 11 years
    Hi @PeterSmith, I tried this approach, however, the alert(event.data) never appears. I run my Flask app in the port 8000 and the push in port 8001. So I put "var source = new EventSource('localhost:8001/push');" and the Flask app has a page that a user can post something. The post is broadcasted and received by all other users. Do you have any ideas?
  • Peter Hoffmann
    Peter Hoffmann over 11 years
    Why do you run the push on a different port? One reason for SSE is that it runs within your app over normal http. how did you run your flask apps? through the development server? Did you add the threaded=True? What browser are you using?
  • little-eyes
    little-eyes over 11 years
    Hi @PeterHoffmann, I use tornado with Flask. Is it possible that I put app instance and pusher instance together in tornado? Say different handler? Should I still set multithread?
  • chmike
    chmike over 11 years
    Is there one thread per connected client to handle its event stream ? This looks like a long polling. If yes, this won't scale. The question of @little-eyes make sense to me.
  • Ilmo Euro
    Ilmo Euro about 11 years
    This will scale when using gevent+monkeypatch
  • flexd
    flexd almost 11 years
    How can you actually be sure Flask closes the connection? If I reload the page a lot I get a bunch of stale connections, and when I ctrl-C the flask app it still serves requests because there are connections open :-/
  • Adam
    Adam over 6 years
    Attention, I had ProfilerMiddleware enabled and it did not work, so be sure to not use it.
  • Deepak Banka
    Deepak Banka almost 5 years
    When I try to use sleep in the get_message function the stream doesn't reach the browser. Without sleep it works fine but I don't want so many messages in 1 second. So, do you have any idea on why it doesn't work when there is a sleep in the function?
  • fearless_fool
    fearless_fool almost 5 years
    @DeepakBanka: The code worked for me. Can you put a print() in get_message() to make sure it's getting called as you expect? And use your browser dev tools to monitor traffic between server and browser? That might give some insights...
  • singingwolfboy
    singingwolfboy over 4 years
    @ManuelGodoy: /stream
  • JZL003
    JZL003 over 4 years
    @fearleaa_fool is there a worry that the synchronous limitations of python will prevent this from scaling. I know in django similar sse/websockets integration has required separate async severs.
  • fearless_fool
    fearless_fool over 4 years
    @JZL003: I've only used Flask for microservices with limited # of clients. I'll yield the floor to someone who has worked with Flask servers in highly concurrent apps...
  • vincent
    vincent almost 4 years
    @ManuelGodoy I abandoned the library last year because I couldn't figure that out. Thanks for asking.
  • Shuzheng
    Shuzheng over 3 years
    @PeterHoffmann - do you know how to handle "client disconnected" as mentioned in the code you link to? It seems that it's handled automatically, but I'm not sure.
  • Vineeth Sai
    Vineeth Sai over 2 years
    I am facing problems with CORS. I want my react server to access this SSE.
  • odinchess
    odinchess over 2 years
    This blocks HTTP requests! Is there any way to run the sse connection from a background worker?
  • odinchess
    odinchess over 2 years
    Can you run this from a background worker, separate from your gunicorn web app?
  • Wolfgang Fahl
    Wolfgang Fahl over 2 years
    sse is independent of the way your server is implemented.