# 实现第一个 RESTful API

用 Flask 先写一个小 demo 来了解 RESTful API。

  1. URL 以以下规范显示
http://localhost:5000/students/1

其中, students 为表名,表面 API 是要对 students 进行请求。

1 表示表示,要对 id 号为 1 的 students 进行相关请求。

  1. 发送 GET 请求

结合 URL,说明要请求 id 号为 1 的学生信息。

  1. Response

送回 json 格式的数据,例如:

{
    "id": 1,
    "name": "Jack",
    "gender": "male"
}

首先我们新建一个 student_api.py ,在里面填写如下代码:

from app import app
from flask import jsonify
@app.route('/students/<student_id>')
def get_student(student_id):
    student = {'id': student_id, 'name': 'Jack', 'gender': 'male'}
    return jsonify(student)

上面的代码中我们创建了一个路由,这个就是进入的 URL 地址,符合我们的第一个要求。默认是 GET 请求。

因为信息应该是从数据库里取,但为了方便我们就先自己定义一个 json 格式的数据,然后返回。

在 postman 中我们发送一个请求,能够成功接收到刚才定义的 json 数据。

发送GET请求

当然,GET 还可以发送请求,这个时候在 postman 的 body 里选 raw 格式,发送我们的 json 数据进行调试。

带数据请求

我们在之前的 python 文件中更新代码,并在第八行加入断点调试,可以接收到我们发送的请求。

from app import app
from flask import jsonify, request
@app.route('/students/<student_id>')
def get_student(student_id):
    if request.is_json:
        args = request.get_json()
    student = {'id': student_id, 'name': 'Jack', 'gender': 'male'}
    return jsonify(student)

# 使用 flask-restful 开发 API

像初始化 Flask 的 app 一样来初始化 flask-restful 的 API,这个 api 来自于 falsk-restful。

from flask import Flask
from flask_restful import Api
app = Flask(__name__)
api = Api(app)
from app.api import student_resource

在 flask-restful 里面,把对外提供 api 访问的信息称之为资源 (Resource)。

# 定义资源类

定义一个类,继承自 flask_restful 的 Resource。

在类里定义一个 get () 方法,这个 get () 方法将会成为 GET 请求 API 的执行者。

flask_restful 把这种概念都进行了一种转换封装,不需要再去定义很多的信息。

把之前的实现的那个 API,get_student () 重新实现。

from flask_restful import Resource
from app import api
class StudentResource(Resource):
    def get(self, student_id: int):
        return {'id': student_id, 'name': 'Jack', 'gender': 'male'}
    
    def put(self, student_id: int):
        return {'id': student_id, 'name': 'MarryJ', 'gender': 'female'}
api.add_resource(StudentResource, '/students/<int:student_id>')

因为 StudentResource 继承自 Resource,这个类将 json 封装好了,所以可以直接返回 json 格式的对象。

api.add_resource(StudentResource, '/students/<int:student_id>') 这行代码的意思是,对外增加一个资源,不需要说明什么请求,因为请求 GET 会自动调用 get () 方法,请求 POST 会自动调用 post () 方法。调用的资源是 StudentResource 这个类, /students/<int:student_id> 是 URL 地址, <> 内的内容和 get () 方法里的一致,声明的类型也要一致。

这样用 postman 重新发送请求,就可以得到我们 return 的 json 对象。

当然我们也可以试试用 postman 发送 PUT 请求,最后能得到 put () 方法里返回的对象。

这样我们一个对象就能自动的根据前端调用的方式来处理,相比不用 flask-restful 的方法要方便。

注意,post () 方法如果这么写理论上也能得到请求,但不符合规范。

POST 请求是新建一个资源,这个资源 id 通常都是服务器来完成的,请求的时候还不知道 id。并且 POST 请求在 URL 不显示参数。

此外,通常情况下请求的资源不在数据库里,API 会报错信息。简单的展示一下:

理论上错误可以返回一个 json,客户端每次都解析错误的 json 这也是一种方法。

但 HTPP 有状态码,200 表示成功,通常 404 表示没有请求的资源。

所以对客户端来讲,先判断状态码是什么,如果有问题再分析结果里的信息。这也是一种方法。

from flask_restful import Resource
from app import api
class StudentResource(Resource):
    def get(self, student_id: int):
        if student_id == 1:
            return {'id': student_id, 'name': 'Jack', 'gender': 'male'}
        else:
            return {'error': f'Student not found for {id}'}, 404
    def put(self, student_id: int):
        return {'id': student_id, 'name': 'MarryJ', 'gender': 'female'}
api.add_resource(StudentResource, '/students/<int:student_id>')

# 访问数据库

将数据库的数据通过 API 的方式暴露给外部。

连接 mysql 数据库使用 flask-sqlalchemy 这个库。

修改 __init__.py 这个文件为如下,相较于上一节增加了数据库连接的部分

from flask import Flask
from flask_restful import Api
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
api = Api(app)
# mysql://username:password@hostname/database
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+mysqldb://root:root@10.211.55.3/restful_db'
db = SQLAlchemy(app)
# 不调用 flask-restful
# from app.api import student_api
# 调用 flask-restful
from app.api import student_resource

然后新建一个 models 文件夹,在这里保存数据库模型文件。

新建一个 book_model.py ,里面首先引用刚才定义好的 db ,定义一个类的时候要继承自 db.Model

from datetime import datetime
from sqlalchemy import Integer, String, TIMESTAMP
from sqlalchemy.orm import Mapped, mapped_column
from app import db
class BookModel(db.Model):
    __tablename__ = 'books'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
    author: Mapped[str] = mapped_column(String(255), nullable=False)
    publish_time: Mapped[datetime] = mapped_column(TIMESTAMP, nullable=False)

上面的代码实现了一个数据库的映射类。

为了访问数据库,通常会定义一个 services 文件夹,在里面访问数据库。

新建一个 book_serivce.py 代码如下:

from app.models.book_model import BookModel
from app import db
class BookService:
    def get_book_by_id(self, book_id: int):
        return db.session.get(BookModel, book_id)

这个文件里只有一个操作,就是根据 book_id 将这本书的信息返回出来。

那么现在已经有了,现在需要写一个 API 将数据送出去。

回到 API 文件夹下,新建一个 book_resource.py 文件,代码如下:

from flask_restful import Resource
from app import api
from app.services.book_service import BookService
class BookResource(Resource):
    def get(self, book_id: int):
        book_model = BookService().get_book_by_id(book_id)
        if book_model:
            return book_model
        else:
            return {'error': f'Book not found for {book_id}'}, 404
api.add_resource(BookResource, '/books/<int:book_id>')

和上节的 student_resource.py 类似,但现在是接收的数据库返回的信息。

但通过 postman 发送 GET 请求发现会虽然通信成功但是有报错,仔细查看会发现返回的 book_model 不是 json 可序列化的格式。

我们回到 book_model.py 增加一个函数来格式化,并在 book_resourcereturn book_model.serialize()

def serialize(self):
        return {
            'id': self.id,
            'name': self.name,
            'author': self.author,
            'publish_time': self.publish_time.isoformat()   # 变成字符串发送
        }

这里的 publish_time 要转换为字符串格式或者其他的格式,因为 datetime 格式不能转换成 json 格式。

此外,这里也可以使用一些插件来进行格式化。

至此,已经完成了数据库的访问。

总结一下就是首先配置好库文件,用 models 文件夹下的文件进行数据库表的映射, services 文件夹下的文件直接访问数据库, api 文件夹下的文件来发送数据库返回的信息,也就是处理外部的请求。

# 增加和修改数据 API

之前写过简单的 GET 查询 API 和通过 API 访问数据库。那么现在来实现增加和修改服务端已存在的数据。

# 增加数据 API

实现添加数据的 API 有几个要点:

  1. API 请求数据提取
    调用 API 的人要添加的数据发给服务端,对于服务端要提取这些数据。
  2. 存入数据库
  3. 返回数据添加结果
    添加结果是否成功,错误要返回问题。

之前实现了 get() 查询一本书的信息,接下里以添加一本书为例子。

添加应该用 post() ,另外添加一本书应该是不知道 book_id 的,因此不应该继续用 /books/<int:book_id> 定义资源类,应该再定义一个资源类。

此外,还有一些操作是不需要 <book_id> 的:查询一堆书等等。

那么接下来再定义一个资源 BookListResource 来管理这些不需要 book_id 的操作。

from datetime import datetime
from flask import request
from flask_restful import Resource
from app import api
from app.models.book_model import BookModel
from app.services.book_service import BookService
class BookResource(Resource):
    def get(self, book_id: int):
        """
        根据book_id查询一本书的信息
        :param book_id: 书的id
        :return: id为<book_id>书的信息
        """
        book_model = BookService().get_book_by_id(book_id)
        if book_model:
            return book_model.serialize()
        else:
            return {'error': f'Book not found for {book_id}'}, 404
class BookListResource(Resource):
    def get(self):
        """
        查询所有书
        :return: 返回所有的书籍信息
        """
        book_list = BookService().get_all_books()
        # 转换成 json 格式
        return [book_model.serialize() for book_model in book_list]
    def post(self):
        """
        添加一本书
        :return: 添加书的信息
        """
        try:
            request_json = request.json
            if request_json:
                name = request_json.get('name', None)
                author = request_json.get('author', None)
                publish_time = datetime.fromisoformat(request_json.get('publish_time', None))
                book_model = BookModel(name=name, author=author, publish_time=publish_time)
                BookService().create_book(book_model)
                return book_model.serialize()
            else:
                return {'error': 'Please provide book info as a json'}, 400
        except Exception as error:
            return {'error': f'{error}'}, 400
api.add_resource(BookResource, '/books/<int:book_id>')
api.add_resource(BookListResource, '/books/')

新增的这个对象可以查询所有书和添加一本新的书,同时对 API 新增一个资源并分配 URL 为 /books/

Bug 未修复:如果添加相同名称的书籍,不会按照代码逻辑返回 400

同时对 book_service.py 进行修改,添加相关函数。

from sqlalchemy import Select, asc
from app.models.book_model import BookModel
from app import db
class BookService:
    def get_book_by_id(self, book_id: int):
        return db.session.get(BookModel, book_id)
    def get_all_books(self):
        query = Select(BookModel).order_by(asc(BookModel.name))
        return db.session.scalars(query).all()
    def create_book(self, book_model: BookModel):
        db.session.add(book_model)
        db.session.commit()
        return book_model

这之后在 postman 里新建一个 POST 请求,进行 API 测试,结果如下:

POST测试

# 修改数据 API

修改数据比添加要简单,因为修改是知道 id 的,因此会用 URL 为 books/<book_id> 这个资源,即 BookResource

在里面添加一个 put() 方法,这个函数的逻辑和刚才写的 post() 类似。

def put(self, book_id: int):
        """
        根据书的id修改书的信息
        :param book_id: 书的id
        :return: 修改后的信息
        """
        try:
            request_json = request.json
            if request_json:
                name = request_json.get('name', None)
                author = request_json.get('author', None)
                publish_time_str = request_json.get('publish_time', None)
                publish_time = datetime.fromisoformat(publish_time_str) if publish_time_str else None
                book_model = BookModel(id = book_id, name=name, author=author, publish_time=publish_time)
                book_model = BookService().update_book(book_model)
                return book_model.serialize()
            else:
                return {'error': 'Please provide book info as a json'}, 400
        except Exception as error:
            return {'error': f'{error}'}, 400

book_service.py 里添加 update_book() 方法:

def update_book(self, book_model: BookModel):
        exit_book = self.get_book_by_id(book_model.id)
        if not exit_book:
            raise Exception(f'Book not found with id: {book_model.id}')
        if book_model.name:
            exit_book.name = book_model.name
        if book_model.author:
            exit_book.author = book_model.author
        if book_model.publish_time:
            exit_book.publish_time = book_model.publish_time
        db.session.commit()
        return exit_book

# 删除数据 API

修改 book_resource.py

def delete(self, book_id: int):
        """
        根据id删除书
        :param book_id: 书的id
        :return: 提示是否删除成功
        """
        delete_count = BookService().delete_book_by_id(book_id)
        if delete_count > 0:
            return {'message': f'Book with id: {book_id} deleted successfully'}
        else:
            return {'error': f'Book not found for {book_id}'}, 404

修改 book_service.py :

def delete_book_by_id(self, book_id: int):
        delete_count = db.session.query(BookModel).filter(BookModel.id == book_id).delete()
        db.session.commit()
        return delete_count

# API 的身份认证

为什么要做身份认证?

通常情况下 API 都是给特定客户端使用的,只有符合身份的客户端才能对 API 进行调用。

普通的 web 应用一般用 session 来保存身份认证信息,但 RESTful API 是无状态的,收到的每一次请求都可能是新的请求,里面可能不包含 session_id。

身份认证有多种手段,这里只介绍一种常用的手段。

身份验证的实现

  • 实现一个登录 Login API
  • 当客户端执行 Login API 时,传入一个用户名和密码
  • 服务端收到验证后是争取的,生成一个加密的字符串发送给客户端。客户端没有私钥,无法解密
  • 客户端向其他 API 发送请求时要携带这个加密的字符串
  • 其他 API 验证是否合理合法

首先来实现 Login API。

增加一个 user_model.py 来实现数据库映射:

from sqlalchemy import Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from app import db
class UserModel(db.Model):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    username: Mapped[str] = mapped_column(String(128), nullable=False, unique=True)
    password: Mapped[str] = mapped_column(String(128), nullable=False)
    def serialize(self):
        return {
            'id': self.id,
            'username': self.username
        }

然后数据库操作在 user_service.py 中实现:

from sqlalchemy import Select
from app.models.user_model import UserModel
from app import db
class UserService:
    def login(self, username: str, password: str):
        query = Select(UserModel).where(UserModel.username == username)
        user_model = db.session.scalars(query).first()
        if user_model and user_model.password == password:
            return user_model
        else:
            return None

最后在 user_resource.py 中实现登录逻辑:

import jwt
from flask import request
from flask_restful import Resource
from app import api
from app.common.constants import LOGIN_SECRET
from app.services.user_service import UserService
class LoginResource(Resource):
    def post(self):
        """
        用户登录
        :return: 返回token
        """
        try:
            request_json = request.json
            if request_json:
                username = request_json.get('username', None)
                password = request_json.get('password', None)
                user_model = UserService().login(username, password)
                if user_model:
                    user_json = user_model.serialize()
                    # HS256 加密
                    # LOGIN_SECRET: 乱打的,只要不泄露即可
                    jwt_token = jwt.encode(user_json, LOGIN_SECRET, algorithm='HS256')
                    user_json['token'] = jwt_token
                    return user_json
                else:
                    return {'error': 'Username or password error'}, 401
            else:
                return {'error': 'Please provide username and password info as a json'}, 400
        except Exception as error:
            return {'error': f'{error}'}, 400
api.add_resource(LoginResource, '/login')

这个时候用 postman 发 POST 请求,如果成功则会将 token 返回,这个 token 就是要发给客户端,客户端要给服务端进行验证的东西。

然后我们以新增一本书为例,实现客户端发送 token 让服务端验证。

首先在新增一本书的 POST 请求头里添加 token 键值对,然后修改 book_resource.py :

def post(self):
        """
        添加一本书
        :return: 添加书的信息
        """
        jwt_token = request.headers.get('token', None)
        if not jwt_token:
            return {'error': 'User unauthorized'}, 401
        try:
            user_info = jwt.decode(jwt_token, LOGIN_SECRET, algorithms='HS256')
            if not user_info or not user_info.get('username', None):
                return {'error': 'User unauthorized'}, 401
        except Exception as error:
            return {'error': 'User unauthorized'}, 401
        
        try:
            request_json = request.json
            if request_json:
                name = request_json.get('name', None)
                author = request_json.get('author', None)
                publish_time = datetime.fromisoformat(request_json.get('publish_time', None))
                book_model = BookModel(name=name, author=author, publish_time=publish_time)
                BookService().create_book(book_model)
                return book_model.serialize()
            else:
                return {'error': 'Please provide book info as a json'}, 400
        except Exception as error:
            return {'error': f'{error}'}, 400

这样想要新增一本书都需要 token。

但在实际应用当中这个 jwt 验证应该抽离出来,我们新建 common/api_tool.py ,写一个装饰器,在里面进行身份验证。

from functools import wraps
import jwt
from flask import request
from app.common.constants import LOGIN_SECRET
def token_required():
    def check_token(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            jwt_token = request.headers.get('token', None)
            if not jwt_token:
                return {'error': 'User unauthorized'}, 401
            try:
                user_info = jwt.decode(jwt_token, LOGIN_SECRET, algorithms='HS256')
                if not user_info or not user_info.get('username', None):
                    return {'error': 'User unauthorized'}, 401
            except Exception as error:
                return {'error': 'User unauthorized'}, 401
            result = f(*args, **kwargs)
            return result
        return wrapper
    return check_token

哪个 API 如果要进行身份验证,就在前面加上装饰器 @token_required()

# 文件上传与下载 API

虽然用了 RESTful API,但还是有可能用到文件上传与下载的功能。

文件上传与下载依旧使用普通的 web 请求,但改用了 flask-restful 插件来完成。

文件上传使用 requestparaser 处理请求参数的输入。

在实现服务端 API 之前,做一下 postman 里客户端的请求。

因为 RESTful API 是基于 HTTP 协议的,所以发送请求还是用表单数据来完成。

客户端请求

那么服务端实现也很简单,首先先把获取附件路径的功能抽离出来,新建一个 common/utils.py

from pathlib import Path
def get_attachment_path():
    """
    获取附件路径
    :return:
    """
    home_path = Path(__file__).parent.parent
    attachment_path = home_path.joinpath("attachment")
    if not attachment_path.exists():
        attachment_path.mkdir(parents=True)
    return attachment_path

然后新建一个 attachment_resource.py 来实现上传文件的功能

from flask_restful import Resource, reqparse
from werkzeug.datastructures import FileStorage
from app.common import utils
from app import api
class AttachmentListResource(Resource):
    def __init__(self):
        # 初始化请求参数解析器
        self.parser = reqparse.RequestParser()
        # 添加请求参数
        self.parser.add_argument('attachment',
                                 type=FileStorage,
                                 location="files",
                                 help='Please provide attachment file',
                                 required=True)
    def post(self):
        """
        上传附件
        :return: 上传成功的附件信息
        """
        attachment_file = self.parser.parse_args().get("attachment")
        file_path = utils.get_attachment_path().joinpath(attachment_file.filename)
        attachment_file.save(file_path)
        return {'message': f'Attachment {attachment_file.filename} uploaded successfully'}
api.add_resource(AttachmentListResource, '/attachments')

重点在于 reqparse.RequestParser() 这个函数来分析请求里面的参数,表单上传的数据很适合用这个函数。如果是 json 的话直接 requst 解析即可。

下载文件比上传要更简单了,一行代码就能解决。

from flask import send_file
class AttachmentResource(Resource):
    def get(self, filename):
        """
        发送附件
        :return: 请求的附件
        """
        file_path = utils.get_attachment_path().joinpath(filename)
        return send_file()

# swagger 相关

Flask Swagger生成文档以及其使用