Issue
I've created a basic flask application in which I want to implement Login through JWT. It generates tokens for every USERNAME and PASSWORD, whether the entered value is right or wrong.
Plus, I'm unsure how it is to 'validate the data' at the time of login and 'check the password for a hash'.
I've seen many libraries for JWT in Flask I'm confused about which one to use as I'm using SQLAlchemy core(writing hardcoded SQL queries for API request).
app.py
from dataclasses import fields
import datetime
import json
from typing_extensions import Required
from sqlalchemy import DateTime
from email.policy import default, strict
from flask import Flask, jsonify, request, make_response
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from sqlalchemy.sql import text, func
from flask_marshmallow import Marshmallow
from marshmallow import Schema, fields, validates, ValidationError, validate
from sqlalchemy.sql import select, insert, bindparam
from sqlalchemy.dialects.postgresql import UUID
import uuid
from werkzeug.security import generate_password_hash,check_password_hash
from sqlalchemy import create_engine
import logging
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
from functools import wraps
import jwt
engine = create_engine('mysql://root:[email protected]:3306/flask')
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:[email protected]:3306/flask'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = 'thisissecret'
#app.config['JWT_SECRET_KEY'] = 'super-secret'
db = SQLAlchemy(app)
ma= Marshmallow(app)
migrate = Migrate(app, db)
# jwt = JWTManager(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_name = db.Column(db.String(100), nullable=False)
password = db.Column(db.String(100), nullable=False)
email_address = db.Column(db.String(80), unique=True, nullable=False)
dob = db.Column(db.DateTime, nullable=True)
address = db.Column(db.String(200))
uid = db.Column(db.Text(length=16), nullable = False)
def __init__(self, user_name, password, email_address, dob, address, uid):
self.user_name = user_name
self.password = password
self.email_address = email_address
self.dob = dob
self.address = address
self.uid = uid
# User Schema
class UserSchema(Schema):
print("Scehma")
user_name = fields.String(required=True, validate=validate.Length(min=5))
password = fields.String()
email_address = fields.String(required=True, error_messages={"required": " email_address is required. "})
dob = fields.DateTime(required= True, error_messages={"required": " dob is required. "})
address = fields.String()
uid = fields.UUID( error_messages={'null': 'Sorry, Id field cannot be null', 'invalid_uuid': 'Sorry, Id field must be a valid UUID'})
print("access")
@validates('dob')
def is_not_in_future(self,value):
"""'value' is the datetime parsed from time_created by marshmallow"""
now = datetime.datetime.now()
if value > now:
raise ValidationError("Can't create notes in the future!")
# if the function doesn't raise an error, the check is considered passed
class Meta:
# Fields to show when sending data
fields = ('id','user_name', 'password', 'email_address', 'dob', 'address')
#init Schema
user_schema = UserSchema()
users_scehma = UserSchema(many=True)
@app.route('/users', methods=['GET'])
# def users():
# if request.method == 'GET':
# all_users = User.query.all()
# return users_scehma.dump(all_users)
def get_all_users():
conn = engine.connect()
str_sql = text("SELECT * FROM flask.`user`;")
results = conn.execute(str_sql).fetchall()
return users_scehma.dump(results)
@app.route('/register', methods=['POST'])
def user_register():
print("entry")
# request_data = request.get_json()
# print("data :",request_data)
errors = user_schema.validate(request.get_json())
print("errors", errors)
data_dict= user_schema.load(request.get_json())
print("dict:",data_dict)
conn = engine.connect()
user_name = data_dict['user_name']
print(user_name)
password = generate_password_hash(data_dict['password'], method = 'sha256')
email_address = data_dict['email_address']
dob = data_dict['dob']
address = data_dict['address']
uid = uuid.uuid4().hex
print(uid)
# print (user_name,password,email_address,dob,address)
sql= text("""INSERT INTO user (user_name, password, email_address, dob, address, uid ) VALUES(:user_name , :password, :email_address, :dob, :address, :uid)""")
data = ({"user_name": user_name, "password": password, "email_address": email_address, "dob": dob, "address": address, "uid": uid})
print(sql)
conn.execute(sql, data)
return jsonify({'Message':'New user Created'})
@app.route('/login', methods=['POST'])
def login():
print("login")
# request_data= user_schema.load(request.get_json())
request_data = request.get_json()
user_name=request_data['user_name']
password=request_data['password']
conn = engine.connect()
data=conn.execute("""SELECT user_name, password FROM user WHERE user_name = user_name and password = password""",{"user_name": user_name, "password": password}).fetchone()
# userdata = json.dumps(data.data)
print(data)
if data:
access_token = create_access_token(identity=user_name)
return jsonify(message='Login Successful', access_token=access_token)
else:
return jsonify('Bad email or Password'), 401
if __name__ == '__main__':
app.run(debug=True)
Solution
To handle the authentication process correctly you should define 2 methods before declaring the app = Flask(__name__)
which are called authenticate and identity in JWT(app, authenticate, identity)
`
from flask_jwt import JWT
def authenticate(username, password):
user = username_table.get(username, None)
if user and safe_str_cmp(user.password.encode('utf-8'), password.encode('utf-8')):
return user
def identity(payload):
user_id = payload['identity']
return userid_table.get(user_id, None)
engine = create_engine('mysql://root:[email protected]:3306/flask')
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'super-secret'
jwt = JWT(app, authenticate, identity)
now you just have to enter the user data and make a request to your program to return the user data and secret token, additionally also specify the JWT
payload handler that recieve data from identity
method.
@jwt.payload_handler
def make_payload(identity):
return {'user_id': identity.id}
Also check authentication_handler
@jwt.authentication_handler
def authenticate(username, password):
user = User.query.filter(User.username == username).scalar()
if bcrypt.check_password_hash(user.password, password):
return user
Answered By - ellhe-blaster
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.