# 实现第一个 RESTful API
用 Flask 先写一个小 demo 来了解 RESTful API。
- URL 以以下规范显示
http://localhost:5000/students/1
其中, students
为表名,表面 API 是要对 students 进行请求。
1
表示表示,要对 id 号为 1 的 students 进行相关请求。
- 发送 GET 请求
结合 URL,说明要请求 id 号为 1 的学生信息。
- Response
送回 json 格式的数据,例如:
{ | |
"id": 1, | |
"name": "Jack", | |
"gender": "male" | |
} |
首先我们新建一个 student_api.py
,在里面填写如下代码:
from app import app | |
from flask import jsonify | |
@app.route('/students/<student_id>') | |
def get_student(student_id): | |
student = {'id': student_id, 'name': 'Jack', 'gender': 'male'} | |
return jsonify(student) |
上面的代码中我们创建了一个路由,这个就是进入的 URL 地址,符合我们的第一个要求。默认是 GET 请求。
因为信息应该是从数据库里取,但为了方便我们就先自己定义一个 json 格式的数据,然后返回。
在 postman 中我们发送一个请求,能够成功接收到刚才定义的 json 数据。
当然,GET 还可以发送请求,这个时候在 postman 的 body 里选 raw 格式,发送我们的 json 数据进行调试。
我们在之前的 python 文件中更新代码,并在第八行加入断点调试,可以接收到我们发送的请求。
from app import app | |
from flask import jsonify, request | |
@app.route('/students/<student_id>') | |
def get_student(student_id): | |
if request.is_json: | |
args = request.get_json() | |
student = {'id': student_id, 'name': 'Jack', 'gender': 'male'} | |
return jsonify(student) |
# 使用 flask-restful 开发 API
像初始化 Flask 的 app 一样来初始化 flask-restful 的 API,这个 api 来自于 falsk-restful。
from flask import Flask | |
from flask_restful import Api | |
app = Flask(__name__) | |
api = Api(app) | |
from app.api import student_resource |
在 flask-restful 里面,把对外提供 api 访问的信息称之为资源 (Resource)。
# 定义资源类
定义一个类,继承自 flask_restful 的 Resource。
在类里定义一个 get () 方法,这个 get () 方法将会成为 GET 请求 API 的执行者。
flask_restful 把这种概念都进行了一种转换封装,不需要再去定义很多的信息。
把之前的实现的那个 API,get_student () 重新实现。
from flask_restful import Resource | |
from app import api | |
class StudentResource(Resource): | |
def get(self, student_id: int): | |
return {'id': student_id, 'name': 'Jack', 'gender': 'male'} | |
def put(self, student_id: int): | |
return {'id': student_id, 'name': 'MarryJ', 'gender': 'female'} | |
api.add_resource(StudentResource, '/students/<int:student_id>') |
因为 StudentResource 继承自 Resource,这个类将 json 封装好了,所以可以直接返回 json 格式的对象。
api.add_resource(StudentResource, '/students/<int:student_id>')
这行代码的意思是,对外增加一个资源,不需要说明什么请求,因为请求 GET 会自动调用 get () 方法,请求 POST 会自动调用 post () 方法。调用的资源是 StudentResource
这个类, /students/<int:student_id>
是 URL 地址, <>
内的内容和 get () 方法里的一致,声明的类型也要一致。
这样用 postman 重新发送请求,就可以得到我们 return 的 json 对象。
当然我们也可以试试用 postman 发送 PUT 请求,最后能得到 put () 方法里返回的对象。
这样我们一个对象就能自动的根据前端调用的方式来处理,相比不用 flask-restful 的方法要方便。
注意,post () 方法如果这么写理论上也能得到请求,但不符合规范。
POST 请求是新建一个资源,这个资源 id 通常都是服务器来完成的,请求的时候还不知道 id。并且 POST 请求在 URL 不显示参数。
此外,通常情况下请求的资源不在数据库里,API 会报错信息。简单的展示一下:
理论上错误可以返回一个 json,客户端每次都解析错误的 json 这也是一种方法。
但 HTPP 有状态码,200 表示成功,通常 404 表示没有请求的资源。
所以对客户端来讲,先判断状态码是什么,如果有问题再分析结果里的信息。这也是一种方法。
from flask_restful import Resource | |
from app import api | |
class StudentResource(Resource): | |
def get(self, student_id: int): | |
if student_id == 1: | |
return {'id': student_id, 'name': 'Jack', 'gender': 'male'} | |
else: | |
return {'error': f'Student not found for {id}'}, 404 | |
def put(self, student_id: int): | |
return {'id': student_id, 'name': 'MarryJ', 'gender': 'female'} | |
api.add_resource(StudentResource, '/students/<int:student_id>') |
# 访问数据库
将数据库的数据通过 API 的方式暴露给外部。
连接 mysql 数据库使用 flask-sqlalchemy
这个库。
修改 __init__.py
这个文件为如下,相较于上一节增加了数据库连接的部分
from flask import Flask | |
from flask_restful import Api | |
from flask_sqlalchemy import SQLAlchemy | |
app = Flask(__name__) | |
api = Api(app) | |
# mysql://username:password@hostname/database | |
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+mysqldb://root:root@10.211.55.3/restful_db' | |
db = SQLAlchemy(app) | |
# 不调用 flask-restful | |
# from app.api import student_api | |
# 调用 flask-restful | |
from app.api import student_resource |
然后新建一个 models
文件夹,在这里保存数据库模型文件。
新建一个 book_model.py
,里面首先引用刚才定义好的 db
,定义一个类的时候要继承自 db.Model
。
from datetime import datetime | |
from sqlalchemy import Integer, String, TIMESTAMP | |
from sqlalchemy.orm import Mapped, mapped_column | |
from app import db | |
class BookModel(db.Model): | |
__tablename__ = 'books' | |
id: Mapped[int] = mapped_column(Integer, primary_key=True) | |
name: Mapped[str] = mapped_column(String(255), nullable=False, unique=True) | |
author: Mapped[str] = mapped_column(String(255), nullable=False) | |
publish_time: Mapped[datetime] = mapped_column(TIMESTAMP, nullable=False) |
上面的代码实现了一个数据库的映射类。
为了访问数据库,通常会定义一个 services
文件夹,在里面访问数据库。
新建一个 book_serivce.py
代码如下:
from app.models.book_model import BookModel | |
from app import db | |
class BookService: | |
def get_book_by_id(self, book_id: int): | |
return db.session.get(BookModel, book_id) |
这个文件里只有一个操作,就是根据 book_id 将这本书的信息返回出来。
那么现在已经有了,现在需要写一个 API 将数据送出去。
回到 API
文件夹下,新建一个 book_resource.py
文件,代码如下:
from flask_restful import Resource | |
from app import api | |
from app.services.book_service import BookService | |
class BookResource(Resource): | |
def get(self, book_id: int): | |
book_model = BookService().get_book_by_id(book_id) | |
if book_model: | |
return book_model | |
else: | |
return {'error': f'Book not found for {book_id}'}, 404 | |
api.add_resource(BookResource, '/books/<int:book_id>') |
和上节的 student_resource.py
类似,但现在是接收的数据库返回的信息。
但通过 postman 发送 GET 请求发现会虽然通信成功但是有报错,仔细查看会发现返回的 book_model
不是 json 可序列化的格式。
我们回到 book_model.py
增加一个函数来格式化,并在 book_resource
里 return book_model.serialize()
:
def serialize(self): | |
return { | |
'id': self.id, | |
'name': self.name, | |
'author': self.author, | |
'publish_time': self.publish_time.isoformat() # 变成字符串发送 | |
} |
这里的 publish_time
要转换为字符串格式或者其他的格式,因为 datetime 格式不能转换成 json 格式。
此外,这里也可以使用一些插件来进行格式化。
至此,已经完成了数据库的访问。
总结一下就是首先配置好库文件,用 models
文件夹下的文件进行数据库表的映射, services
文件夹下的文件直接访问数据库, api
文件夹下的文件来发送数据库返回的信息,也就是处理外部的请求。
# 增加和修改数据 API
之前写过简单的 GET 查询 API 和通过 API 访问数据库。那么现在来实现增加和修改服务端已存在的数据。
# 增加数据 API
实现添加数据的 API 有几个要点:
- API 请求数据提取
调用 API 的人要添加的数据发给服务端,对于服务端要提取这些数据。 - 存入数据库
- 返回数据添加结果
添加结果是否成功,错误要返回问题。
之前实现了 get()
查询一本书的信息,接下里以添加一本书为例子。
添加应该用 post()
,另外添加一本书应该是不知道 book_id
的,因此不应该继续用 /books/<int:book_id>
定义资源类,应该再定义一个资源类。
此外,还有一些操作是不需要 <book_id>
的:查询一堆书等等。
那么接下来再定义一个资源 BookListResource
来管理这些不需要 book_id
的操作。
from datetime import datetime | |
from flask import request | |
from flask_restful import Resource | |
from app import api | |
from app.models.book_model import BookModel | |
from app.services.book_service import BookService | |
class BookResource(Resource): | |
def get(self, book_id: int): | |
""" | |
根据book_id查询一本书的信息 | |
:param book_id: 书的id | |
:return: id为<book_id>书的信息 | |
""" | |
book_model = BookService().get_book_by_id(book_id) | |
if book_model: | |
return book_model.serialize() | |
else: | |
return {'error': f'Book not found for {book_id}'}, 404 | |
class BookListResource(Resource): | |
def get(self): | |
""" | |
查询所有书 | |
:return: 返回所有的书籍信息 | |
""" | |
book_list = BookService().get_all_books() | |
# 转换成 json 格式 | |
return [book_model.serialize() for book_model in book_list] | |
def post(self): | |
""" | |
添加一本书 | |
:return: 添加书的信息 | |
""" | |
try: | |
request_json = request.json | |
if request_json: | |
name = request_json.get('name', None) | |
author = request_json.get('author', None) | |
publish_time = datetime.fromisoformat(request_json.get('publish_time', None)) | |
book_model = BookModel(name=name, author=author, publish_time=publish_time) | |
BookService().create_book(book_model) | |
return book_model.serialize() | |
else: | |
return {'error': 'Please provide book info as a json'}, 400 | |
except Exception as error: | |
return {'error': f'{error}'}, 400 | |
api.add_resource(BookResource, '/books/<int:book_id>') | |
api.add_resource(BookListResource, '/books/') |
新增的这个对象可以查询所有书和添加一本新的书,同时对 API 新增一个资源并分配 URL 为 /books/
。
Bug 未修复:如果添加相同名称的书籍,不会按照代码逻辑返回 400
同时对 book_service.py
进行修改,添加相关函数。
from sqlalchemy import Select, asc | |
from app.models.book_model import BookModel | |
from app import db | |
class BookService: | |
def get_book_by_id(self, book_id: int): | |
return db.session.get(BookModel, book_id) | |
def get_all_books(self): | |
query = Select(BookModel).order_by(asc(BookModel.name)) | |
return db.session.scalars(query).all() | |
def create_book(self, book_model: BookModel): | |
db.session.add(book_model) | |
db.session.commit() | |
return book_model |
这之后在 postman 里新建一个 POST 请求,进行 API 测试,结果如下:
# 修改数据 API
修改数据比添加要简单,因为修改是知道 id 的,因此会用 URL 为 books/<book_id>
这个资源,即 BookResource
。
在里面添加一个 put()
方法,这个函数的逻辑和刚才写的 post()
类似。
def put(self, book_id: int): | |
""" | |
根据书的id修改书的信息 | |
:param book_id: 书的id | |
:return: 修改后的信息 | |
""" | |
try: | |
request_json = request.json | |
if request_json: | |
name = request_json.get('name', None) | |
author = request_json.get('author', None) | |
publish_time_str = request_json.get('publish_time', None) | |
publish_time = datetime.fromisoformat(publish_time_str) if publish_time_str else None | |
book_model = BookModel(id = book_id, name=name, author=author, publish_time=publish_time) | |
book_model = BookService().update_book(book_model) | |
return book_model.serialize() | |
else: | |
return {'error': 'Please provide book info as a json'}, 400 | |
except Exception as error: | |
return {'error': f'{error}'}, 400 |
在 book_service.py
里添加 update_book()
方法:
def update_book(self, book_model: BookModel): | |
exit_book = self.get_book_by_id(book_model.id) | |
if not exit_book: | |
raise Exception(f'Book not found with id: {book_model.id}') | |
if book_model.name: | |
exit_book.name = book_model.name | |
if book_model.author: | |
exit_book.author = book_model.author | |
if book_model.publish_time: | |
exit_book.publish_time = book_model.publish_time | |
db.session.commit() | |
return exit_book |
# 删除数据 API
修改 book_resource.py
:
def delete(self, book_id: int): | |
""" | |
根据id删除书 | |
:param book_id: 书的id | |
:return: 提示是否删除成功 | |
""" | |
delete_count = BookService().delete_book_by_id(book_id) | |
if delete_count > 0: | |
return {'message': f'Book with id: {book_id} deleted successfully'} | |
else: | |
return {'error': f'Book not found for {book_id}'}, 404 |
修改 book_service.py
:
def delete_book_by_id(self, book_id: int): | |
delete_count = db.session.query(BookModel).filter(BookModel.id == book_id).delete() | |
db.session.commit() | |
return delete_count |
# API 的身份认证
为什么要做身份认证?
通常情况下 API 都是给特定客户端使用的,只有符合身份的客户端才能对 API 进行调用。
普通的 web 应用一般用 session 来保存身份认证信息,但 RESTful API 是无状态的,收到的每一次请求都可能是新的请求,里面可能不包含 session_id。
身份认证有多种手段,这里只介绍一种常用的手段。
- 实现一个登录 Login API
- 当客户端执行 Login API 时,传入一个用户名和密码
- 服务端收到验证后是争取的,生成一个加密的字符串发送给客户端。客户端没有私钥,无法解密
- 客户端向其他 API 发送请求时要携带这个加密的字符串
- 其他 API 验证是否合理合法
首先来实现 Login API。
增加一个 user_model.py
来实现数据库映射:
from sqlalchemy import Integer, String | |
from sqlalchemy.orm import Mapped, mapped_column | |
from app import db | |
class UserModel(db.Model): | |
__tablename__ = 'users' | |
id: Mapped[int] = mapped_column(Integer, primary_key=True) | |
username: Mapped[str] = mapped_column(String(128), nullable=False, unique=True) | |
password: Mapped[str] = mapped_column(String(128), nullable=False) | |
def serialize(self): | |
return { | |
'id': self.id, | |
'username': self.username | |
} |
然后数据库操作在 user_service.py
中实现:
from sqlalchemy import Select | |
from app.models.user_model import UserModel | |
from app import db | |
class UserService: | |
def login(self, username: str, password: str): | |
query = Select(UserModel).where(UserModel.username == username) | |
user_model = db.session.scalars(query).first() | |
if user_model and user_model.password == password: | |
return user_model | |
else: | |
return None |
最后在 user_resource.py
中实现登录逻辑:
import jwt | |
from flask import request | |
from flask_restful import Resource | |
from app import api | |
from app.common.constants import LOGIN_SECRET | |
from app.services.user_service import UserService | |
class LoginResource(Resource): | |
def post(self): | |
""" | |
用户登录 | |
:return: 返回token | |
""" | |
try: | |
request_json = request.json | |
if request_json: | |
username = request_json.get('username', None) | |
password = request_json.get('password', None) | |
user_model = UserService().login(username, password) | |
if user_model: | |
user_json = user_model.serialize() | |
# HS256 加密 | |
# LOGIN_SECRET: 乱打的,只要不泄露即可 | |
jwt_token = jwt.encode(user_json, LOGIN_SECRET, algorithm='HS256') | |
user_json['token'] = jwt_token | |
return user_json | |
else: | |
return {'error': 'Username or password error'}, 401 | |
else: | |
return {'error': 'Please provide username and password info as a json'}, 400 | |
except Exception as error: | |
return {'error': f'{error}'}, 400 | |
api.add_resource(LoginResource, '/login') |
这个时候用 postman 发 POST 请求,如果成功则会将 token 返回,这个 token 就是要发给客户端,客户端要给服务端进行验证的东西。
然后我们以新增一本书为例,实现客户端发送 token 让服务端验证。
首先在新增一本书的 POST 请求头里添加 token 键值对,然后修改 book_resource.py
:
def post(self): | |
""" | |
添加一本书 | |
:return: 添加书的信息 | |
""" | |
jwt_token = request.headers.get('token', None) | |
if not jwt_token: | |
return {'error': 'User unauthorized'}, 401 | |
try: | |
user_info = jwt.decode(jwt_token, LOGIN_SECRET, algorithms='HS256') | |
if not user_info or not user_info.get('username', None): | |
return {'error': 'User unauthorized'}, 401 | |
except Exception as error: | |
return {'error': 'User unauthorized'}, 401 | |
try: | |
request_json = request.json | |
if request_json: | |
name = request_json.get('name', None) | |
author = request_json.get('author', None) | |
publish_time = datetime.fromisoformat(request_json.get('publish_time', None)) | |
book_model = BookModel(name=name, author=author, publish_time=publish_time) | |
BookService().create_book(book_model) | |
return book_model.serialize() | |
else: | |
return {'error': 'Please provide book info as a json'}, 400 | |
except Exception as error: | |
return {'error': f'{error}'}, 400 |
这样想要新增一本书都需要 token。
但在实际应用当中这个 jwt 验证应该抽离出来,我们新建 common/api_tool.py
,写一个装饰器,在里面进行身份验证。
from functools import wraps | |
import jwt | |
from flask import request | |
from app.common.constants import LOGIN_SECRET | |
def token_required(): | |
def check_token(f): | |
@wraps(f) | |
def wrapper(*args, **kwargs): | |
jwt_token = request.headers.get('token', None) | |
if not jwt_token: | |
return {'error': 'User unauthorized'}, 401 | |
try: | |
user_info = jwt.decode(jwt_token, LOGIN_SECRET, algorithms='HS256') | |
if not user_info or not user_info.get('username', None): | |
return {'error': 'User unauthorized'}, 401 | |
except Exception as error: | |
return {'error': 'User unauthorized'}, 401 | |
result = f(*args, **kwargs) | |
return result | |
return wrapper | |
return check_token |
哪个 API 如果要进行身份验证,就在前面加上装饰器 @token_required()
。
# 文件上传与下载 API
虽然用了 RESTful API,但还是有可能用到文件上传与下载的功能。
文件上传与下载依旧使用普通的 web 请求,但改用了 flask-restful 插件来完成。
文件上传使用 requestparaser
处理请求参数的输入。
在实现服务端 API 之前,做一下 postman 里客户端的请求。
因为 RESTful API 是基于 HTTP 协议的,所以发送请求还是用表单数据来完成。
那么服务端实现也很简单,首先先把获取附件路径的功能抽离出来,新建一个 common/utils.py
from pathlib import Path | |
def get_attachment_path(): | |
""" | |
获取附件路径 | |
:return: | |
""" | |
home_path = Path(__file__).parent.parent | |
attachment_path = home_path.joinpath("attachment") | |
if not attachment_path.exists(): | |
attachment_path.mkdir(parents=True) | |
return attachment_path |
然后新建一个 attachment_resource.py
来实现上传文件的功能
from flask_restful import Resource, reqparse | |
from werkzeug.datastructures import FileStorage | |
from app.common import utils | |
from app import api | |
class AttachmentListResource(Resource): | |
def __init__(self): | |
# 初始化请求参数解析器 | |
self.parser = reqparse.RequestParser() | |
# 添加请求参数 | |
self.parser.add_argument('attachment', | |
type=FileStorage, | |
location="files", | |
help='Please provide attachment file', | |
required=True) | |
def post(self): | |
""" | |
上传附件 | |
:return: 上传成功的附件信息 | |
""" | |
attachment_file = self.parser.parse_args().get("attachment") | |
file_path = utils.get_attachment_path().joinpath(attachment_file.filename) | |
attachment_file.save(file_path) | |
return {'message': f'Attachment {attachment_file.filename} uploaded successfully'} | |
api.add_resource(AttachmentListResource, '/attachments') |
重点在于 reqparse.RequestParser()
这个函数来分析请求里面的参数,表单上传的数据很适合用这个函数。如果是 json 的话直接 requst 解析即可。
下载文件比上传要更简单了,一行代码就能解决。
from flask import send_file | |
class AttachmentResource(Resource): | |
def get(self, filename): | |
""" | |
发送附件 | |
:return: 请求的附件 | |
""" | |
file_path = utils.get_attachment_path().joinpath(filename) | |
return send_file() |