Implementing a Flask RESTful API with basic permission control using Flask-HTTPAuth

This series of tutorials is divided into four parts:

1. Flask RESTful web service

2. Flask RESTful API

3. Access control with Flask-HTTPAuth

4. Managing Flask applications with uWSGI


The first two articles covered developing a basic web service and a RESTful API. Once an interface has been developed, we generally need to protect it with authentication, and store credentials only in hashed form, so that data is not exposed or lost if the interface address leaks.

The original text splits the code into several fragments, which is not intuitive and interrupts the flow of learning, so here the author presents the code as a whole and explains the key points in its comments.

Here is the code that implements HTTP basic authentication and token authentication:

import time

from flask import Flask, jsonify, url_for, request, g
from flask_restful import Api, abort
# Flask-SQLAlchemy provides the ORM layer
from flask_sqlalchemy import SQLAlchemy
# Flask-HTTPAuth is a separate extension that provides HTTPBasicAuth and HTTPTokenAuth
from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth
# passlib handles password hashing
from passlib.apps import custom_app_context as pwd_context
# PyJWT generates and verifies the tokens
import jwt
# werkzeug also offers password hashing helpers, if you are interested
# from werkzeug.security import generate_password_hash, check_password_hash

app = Flask(__name__)
api = Api(app)

# Secret key used to sign the tokens; in real deployments load a random value
# from the environment instead of hard-coding it
app.config['SECRET_KEY'] = 'dqwecf29vbneuirjnf2i3n0f2i302n'
# Database connection URI (a local SQLite file)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite'
# Commit the session automatically at the end of each request
# (deprecated in later Flask-SQLAlchemy releases)
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
# Track object modifications and emit signals; this costs extra memory,
# so set it to False if the signals are not used
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

# SQLAlchemy instantiation
db = SQLAlchemy(app)
# Create the basic-auth and token-auth handlers
auth = HTTPBasicAuth()
token_m = HTTPTokenAuth()

class User(db.Model):
    # Table name
    __tablename__ = 'users'
    # Columns, mapped to attributes of the same name
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(32), index=True)
    # passlib's default hashes are longer than 64 characters
    password_hash = db.Column(db.String(128))

    # Hash the password and store only the hash
    def hash_password(self, password):
        # hash() replaces the deprecated encrypt() in passlib 1.7+
        self.password_hash = pwd_context.hash(password)

    # Check a plaintext password against the stored hash
    def verify_password(self, password):
        return pwd_context.verify(password, self.password_hash)

    # Generate a JWT that carries the user id and expires after 600 seconds
    def generate_auth_token(self):
        return jwt.encode(
            {'id': self.id, 'exp': time.time() + 600},
            app.config['SECRET_KEY'], algorithm='HS256')

    # Verify a token and return the matching user, or None if the token is
    # invalid or expired
    @staticmethod
    def verify_auth_token(token):
        try:
            data = jwt.decode(token, app.config['SECRET_KEY'],
                              algorithms=['HS256'])
        except jwt.InvalidTokenError as e:
            # Covers bad signatures, malformed tokens and expired tokens
            print(e)
            return None
        return User.query.get(data['id'])

# Recreate the tables when the application starts. drop_all() wipes any
# existing data, which is only acceptable for a demo. In real projects the
# schema is usually managed in the database itself, and developers only need
# to know the table structure and operate on the corresponding fields.
with app.app_context():
    db.drop_all()
    db.create_all()

# Flask-HTTPAuth calls this for every route protected by @auth.login_required,
# passing the username and password taken from the request
@auth.verify_password
def verify_password(username, password):
    user = User.query.filter_by(username=username).first()
    if not user or not user.verify_password(password):
        return False
    g.user = user
    return True

# Flask-HTTPAuth calls this for every route protected by @token_m.login_required,
# passing the token taken from the Authorization header
@token_m.verify_token
def verify_token(token):
    user = User.verify_auth_token(token)
    if not user:
        return False
    g.user = user
    return True

# Register a new user from the username and password in the JSON request body
@app.route('/api/users', methods=['POST'])
def new_user():
    username = request.json.get('username')
    password = request.json.get('password')
    if username is None or password is None:
        abort(400)  # missing arguments
    if User.query.filter_by(username=username).first() is not None:
        print('existing user')
        abort(400)  # existing user
    user = User(username=username)
    user.hash_password(password)
    db.session.add(user)
    db.session.commit()
    return (jsonify({'username': user.username}), 201,
            {'Location': url_for('get_user', id=user.id, _external=True)})

# Look up a single user by id
@app.route('/api/users/<int:id>')
def get_user(id):
    user = User.query.get(id)
    if not user:
        abort(400)
    return jsonify({'username': user.username})

# List all users
@app.route('/api/users', methods=['GET'])
def get_all_user():
    users = User.query.all()
    if not users:
        abort(400)
    # Return user names only; password hashes should never leave the server
    return jsonify([user.username for user in users])

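# Issue a token to a caller who authenticates with username and password
@app.route('/api/token')
# Require basic (username and password) authentication
@auth.login_required
def get_auth_token():
    token = g.user.generate_auth_token()
    # PyJWT 1.x returns bytes, PyJWT 2.x returns str
    if isinstance(token, bytes):
        token = token.decode('ascii')
    return jsonify({'token': token, 'duration': 600})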

# Resource that can only be accessed with a valid token
@app.route('/api/resource')
# Require token authentication
@token_m.login_required
def get_resource():
    return jsonify({'data': 'Hello, %s!' % g.user.username})

@app.errorhandler(400)
def error(e):
    print(e)
    return 'Bad request', 400

if __name__ == '__main__':
    app.run('0.0.0.0', 8080)


The code above can be copied directly into an IDE and run to see the effect.
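
Once the service is running, the two protected routes expect different Authorization headers. The following is a minimal sketch using only the standard library, assuming the server listens on localhost:8080 as in app.run() and that HTTPTokenAuth is left at its default Bearer scheme. The helper names basic_auth_header, bearer_auth_header and build_request, as well as the sample credentials, are made up for illustration.

```python
import base64
import urllib.request

BASE_URL = "http://localhost:8080"  # assumption: host and port from app.run()


def basic_auth_header(username, password):
    """Return the value of an HTTP Basic Authorization header."""
    raw = "{0}:{1}".format(username, password).encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def bearer_auth_header(token):
    """Return the Authorization header value for the Bearer scheme."""
    return "Bearer " + token


def build_request(path, authorization):
    """Prepare (but do not send) a GET request carrying the given header."""
    request = urllib.request.Request(BASE_URL + path)
    request.add_header("Authorization", authorization)
    return request


# Step 1: /api/token is protected by username and password (hypothetical user).
token_request = build_request("/api/token", basic_auth_header("alice", "secret"))
# Step 2: /api/resource is protected by the token returned in step 1.
resource_request = build_request("/api/resource", bearer_auth_header("<token>"))

print(token_request.get_header("Authorization"))
print(resource_request.get_header("Authorization"))
```

Nothing is sent over the network here; passing either request object to urllib.request.urlopen would perform the actual call against a running server.
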
With the ORM, method calls take the place of hand-written SQL statements, for example:

Query:
User.query.filter_by(username=username).first()

Insert:
user = User(username=username)
user.hash_password(password)
db.session.add(user)
db.session.commit()
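
For comparison, the same two operations written as plain SQL might look roughly like the sketch below. It uses the standard sqlite3 module with an in-memory database, and the statements only approximate what SQLAlchemy generates; the function names add_user and find_user, the simplified column types and the sample values are assumptions for illustration.

```python
import sqlite3

# In-memory database so the sketch leaves no file behind.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users ("
    "id INTEGER PRIMARY KEY, "
    "username TEXT, "
    "password_hash TEXT)"
)


def add_user(username, password_hash):
    """Roughly what db.session.add(user) plus db.session.commit() do."""
    cursor = conn.execute(
        "INSERT INTO users (username, password_hash) VALUES (?, ?)",
        (username, password_hash),
    )
    conn.commit()
    return cursor.lastrowid


def find_user(username):
    """Roughly what User.query.filter_by(username=...).first() does."""
    return conn.execute(
        "SELECT id, username, password_hash FROM users "
        "WHERE username = ? LIMIT 1",
        (username,),
    ).fetchone()


new_id = add_user("alice", "<hash>")
print(find_user("alice"))  # a tuple with the stored row
print(find_user("bob"))    # None, just as first() returns None for no match
```

The ORM version hides the SQL text and the parameter binding, and returns User objects instead of bare tuples.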

For details of how the SQLAlchemy-based ORM works, please consult the SQLAlchemy and Flask-SQLAlchemy documentation.
The modules used are described in the comments of the code block, and the names in the code are largely self-explanatory. If anything is unclear, look it up in the relevant documentation.
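
To make the roles of the two security-related modules more concrete, the sketch below imitates them with the standard library only. It is not how passlib or PyJWT work internally: PBKDF2 stands in for passlib's default scheme, the token is a simplified HMAC-SHA256 signed payload rather than a complete JWT, and every function name and the sample key are hypothetical.

```python
import base64
import hashlib
import hmac
import json
import os
import time

SECRET_KEY = b"demo-secret"  # assumption: stands in for app.config['SECRET_KEY']


def hash_password(password, salt=None):
    """Salted, slow hash: the stored value never contains the password."""
    salt = salt or os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 100000)
    return salt.hex() + "$" + digest.hex()


def verify_password(password, stored):
    """Re-hash the candidate with the stored salt and compare in constant time."""
    salt_hex, _ = stored.split("$")
    candidate = hash_password(password, bytes.fromhex(salt_hex))
    return hmac.compare_digest(candidate.encode("ascii"), stored.encode("ascii"))


def make_token(user_id, lifetime=600, now=None):
    """Sign a payload with id and exp so the server can detect tampering."""
    now = time.time() if now is None else now
    payload = json.dumps({"id": user_id, "exp": now + lifetime}).encode("utf-8")
    body = base64.urlsafe_b64encode(payload)
    signature = hmac.new(SECRET_KEY, body, hashlib.sha256).hexdigest()
    return body.decode("ascii") + "." + signature


def check_token(token, now=None):
    """Return the user id, or None if the token is forged or expired."""
    now = time.time() if now is None else now
    try:
        body, signature = token.split(".")
        expected = hmac.new(SECRET_KEY, body.encode("ascii"), hashlib.sha256).hexdigest()
        if not hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8")):
            return None
        data = json.loads(base64.urlsafe_b64decode(body))
    except ValueError:
        return None
    return data["id"] if data["exp"] > now else None


stored = hash_password("secret")
print(verify_password("secret", stored))
print(verify_password("wrong", stored))

token = make_token(1)
print(check_token(token))
print(check_token(token, now=time.time() + 601))
```

The last call simulates a request arriving after the 600-second lifetime, which is the case the exp field in the real token is there to reject.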

This article only demonstrates authentication in Flask and does not elaborate further; adult learners should not wait to be spoon-fed but should try things out for themselves.

Thanks for reading.

Tags: Python Session Database SQLite SQL

Posted on Thu, 14 May 2020 02:22:01 -0700 by Javizy