Issue
I am trying to have a Flask Server that allows me to launch a tweepy stream and on every message received in the stream listener, it sends that message to a socketio client. The Flask server is at the same time supposed to allow Twilio to post to it, and route that message to the client—so that the client is receiving messages from both Twilio and twitter.
I have been trying to get the server to send messages over to the client for the data incoming from twitter, the code for Twilio works just fine. It sends data over to the client on message receipt. The main loop in tweepy is also not locking up the program—I can test print statements and see tweets and the incoming sms's being printed in the handle_message(msg)
function asynchronously. I feel like there must be something really simple that I am missing here since the SMS's are emitted to the client, but the incoming tweets are not, even though they are propagating through to the handle_message(msg)
function. What gives?
server.py
from flask import Flask, json, request
from twilio.twiml.messaging_response import Message, MessagingResponse
from flask_socketio import SocketIO
import tweepy
import json
PATH = '/path/to/credentials/'
with open(PATH, "r") as file:
credentials = json.load(file)
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'abc123'
sio = SocketIO(app, cors_allowed_origins="*")
auth = tweepy.OAuthHandler(credentials['CONSUMER_KEY'], credentials['CONSUMER_SECRET'])
auth.set_access_token(credentials['ACCESS_TOKEN'], credentials['ACCESS_SECRET'])
api = tweepy.API(auth)
class MyListener(tweepy.StreamListener):
def on_status(self, status):
print('status')
def on_data(self, data):
handle_message(data)
def on_error(self, status):
print('error')
print(status)
stream_listener = MyListener()
# twilio sms route
@app.route('/sms', methods=['POST'])
def sms():
number = request.form['From']
message_body = request.form['Body']
message_data = {"number": number, "msg": message_body}
resp = MessagingResponse()
resp.message('Hello {}, you said: {}'.format(number, message_body))
handle_message(message_data)
return str(resp)
# flask-socketio stuff
@sio.on('connect')
def connect():
print('connected')
sio.emit('client_connected', "you connected")
search_term = "#mysearchterm"
stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
stream.filter(track=[search_term], is_async=True)
sio.emit('client_connected', "the search term is {}".format(search_term))
@sio.on('disconnect')
def disconnect():
print('Client Diconnected')
@sio.event
def handle_message(message):
print("This is the message received: ", message)
sio.emit('handle_message', message)
if __name__ == '__main__':
sio.run(app)
client.py
import socketio
client = socketio.Client()
@client.on('client_connected')
def on_connect(message):
print(message)
@client.on('handle_message')
def message(data):
print(data)
client.connect('http://localhost:5000/')
Solution
I solved my problem! As I noted in this comment, the issue was with multithreading and passing information between the threads. With tweepy, the parameter is_async=True
, which in 4.1.0 is threading=True
, opens up a new thread once the stream is run.
Instead of trying to deal with passing information around, I exploited the extant flask-socketio functionality by using a local redis server as a message queue (start from the section "Using Multiple Workers" if you are setting this up for the first time, also be sure to install redis).
Here is the updated server.py
code. The client.py
code remained essentially unchanged:
import eventlet
eventlet.monkey_patch()
from flask import Flask, json, request
from twilio.twiml.messaging_response import Message, MessagingResponse
from flask_socketio import SocketIO
import tweepy
import json
PATH = '/PATH/TO/CREDENTIALS'
with open(PATH, "r") as file:
credentials = json.load(file)
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'abc123'
sio = SocketIO(app, message_queue='redis://', cors_allowed_origins="*")
class MyStream(tweepy.Stream):
def __init__(self, consumer_key, consumer_secret, access_token, access_secret):
super(MyStream, self).__init__(consumer_key, consumer_secret, access_token, access_secret)
self.stream_sio = SocketIO(message_queue='redis://')
def on_status(self, status):
print('status')
def on_data(self, data):
json_data = json.loads(data)
self.stream_sio.emit('handle_message', json_data['text'])
# TODO: Send along all necessary information
@app.route('/sms', methods=['POST'])
def sms():
number = request.form['From']
message_body = request.form['Body']
message_data = {"number": number, "msg": message_body}
resp = MessagingResponse()
resp.message('Hello {}, you said: {}'.format(number, message_body))
handle_message(message_data)
return str(resp)
@sio.on('connect')
def connect():
print('connected')
sio.emit('client_connected', "you connected")
search_term = "#testingtesting123"
stream = MyStream(credentials['CONSUMER_KEY'], credentials['CONSUMER_SECRET'],
credentials['ACCESS_TOKEN'], credentials['ACCESS_SECRET'])
stream.filter(track=[search_term], threaded=True)
sio.emit('client_connected', "the search term is {}".format(search_term))
@sio.on('disconnect')
def disconnect():
print('Client disconnected')
def handle_message(message):
sio.emit('handle_message', message)
if __name__ == '__main__':
sio.run(app)
Answered By - ec.lemmon
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.