您好,登錄后才能下訂單哦!
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
from info import constants
from . import db
class BaseModel(object):
"""模型基類,為每個模型補充創建時間與更新時間"""
create_time = db.Column(db.DateTime, default=datetime.now) # 記錄的創建時間
update_time = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now) # 記錄的更新時間
# 用戶收藏表,建立用戶與其收藏新聞多對多的關系
tb_user_collection = db.Table(
"info_user_collection",
db.Column("user_id", db.Integer, db.ForeignKey("info_user.id"), primary_key=True), # 新聞編號
db.Column("news_id", db.Integer, db.ForeignKey("info_news.id"), primary_key=True), # 分類編號
db.Column("create_time", db.DateTime, default=datetime.now) # 收藏創建時間
)
tb_user_follows = db.Table(
"info_user_fans",
db.Column('follower_id', db.Integer, db.ForeignKey('info_user.id'), primary_key=True), # 粉絲id
db.Column('followed_id', db.Integer, db.ForeignKey('info_user.id'), primary_key=True) # 被關注人的id
)
class User(BaseModel, db.Model):
"""用戶"""
__tablename__ = "info_user"
id = db.Column(db.Integer, primary_key=True) # 用戶編號
nick_name = db.Column(db.String(32), unique=True, nullable=False) # 用戶昵稱
password_hash = db.Column(db.String(128), nullable=False) # 加密的密碼
mobile = db.Column(db.String(11), unique=True, nullable=False) # 手機號
avatar_url = db.Column(db.String(256)) # 用戶頭像路徑
last_login = db.Column(db.DateTime, default=datetime.now) # 最后一次登錄時間
is_admin = db.Column(db.Boolean, default=False)
signature = db.Column(db.String(512)) # 用戶簽名
gender = db.Column( # 訂單的狀態
db.Enum(
"MAN", # 男
"WOMAN" # 女
),
default="MAN")
# 當前用戶收藏的所有新聞
collection_news = db.relationship("News", secondary=tb_user_collection, lazy="dynamic") # 用戶收藏的新聞
# 用戶所有的粉絲,添加了反向引用followed,代表用戶都關注了哪些人
followers = db.relationship('User',
secondary=tb_user_follows,
primaryjoin=id == tb_user_follows.c.followed_id,
secondaryjoin=id == tb_user_follows.c.follower_id,
backref=db.backref('followed', lazy='dynamic'),
lazy='dynamic')
# 當前用戶所發布的新聞
news_list = db.relationship('News', backref='user', lazy='dynamic')
@property
def password(self):
raise AttributeError("當前屬性不可讀")
@password.setter
def password(self, value):
self.password_hash = generate_password_hash(value)
def check_passowrd(self, password):
return check_password_hash(self.password_hash, password)
def to_dict(self):
resp_dict = {
"id": self.id,
"nick_name": self.nick_name,
"avatar_url": constants.QINIU_DOMIN_PREFIX + self.avatar_url if self.avatar_url else "",
"mobile": self.mobile,
"gender": self.gender if self.gender else "MAN",
"signature": self.signature if self.signature else "",
"followers_count": self.followers.count(),
"news_count": self.news_list.count()
}
return resp_dict
def to_admin_dict(self):
resp_dict = {
"id": self.id,
"nick_name": self.nick_name,
"mobile": self.mobile,
"register": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"last_login": self.last_login.strftime("%Y-%m-%d %H:%M:%S"),
}
return resp_dict
class News(BaseModel, db.Model):
"""新聞"""
__tablename__ = "info_news"
id = db.Column(db.Integer, primary_key=True) # 新聞編號
title = db.Column(db.String(256), nullable=False) # 新聞標題
source = db.Column(db.String(64), nullable=False) # 新聞來源
digest = db.Column(db.String(512), nullable=False) # 新聞摘要
content = db.Column(db.Text, nullable=False) # 新聞內容
clicks = db.Column(db.Integer, default=0) # 瀏覽量
index_image_url = db.Column(db.String(256)) # 新聞列表圖片路徑
category_id = db.Column(db.Integer, db.ForeignKey("info_category.id"))
user_id = db.Column(db.Integer, db.ForeignKey("info_user.id")) # 當前新聞的作者id
status = db.Column(db.Integer, default=0) # 當前新聞狀態 如果為0代表審核通過,1代表審核中,-1代表審核不通過
reason = db.Column(db.String(256)) # 未通過原因,status = -1 的時候使用
# 當前新聞的所有評論
comments = db.relationship("Comment", lazy="dynamic")
def to_review_dict(self):
resp_dict = {
"id": self.id,
"title": self.title,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"status": self.status,
"reason": self.reason if self.reason else ""
}
return resp_dict
def to_basic_dict(self):
resp_dict = {
"id": self.id,
"title": self.title,
"source": self.source,
"digest": self.digest,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"index_image_url": self.index_image_url,
"clicks": self.clicks,
}
return resp_dict
def to_dict(self):
resp_dict = {
"id": self.id,
"title": self.title,
"source": self.source,
"digest": self.digest,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"content": self.content,
"comments_count": self.comments.count(),
"clicks": self.clicks,
"category": self.category.to_dict(),
"index_image_url": self.index_image_url,
"author": self.user.to_dict() if self.user else None
}
return resp_dict
class Comment(BaseModel, db.Model):
"""評論"""
__tablename__ = "info_comment"
id = db.Column(db.Integer, primary_key=True) # 評論編號
user_id = db.Column(db.Integer, db.ForeignKey("info_user.id"), nullable=False) # 用戶id
news_id = db.Column(db.Integer, db.ForeignKey("info_news.id"), nullable=False) # 新聞id
content = db.Column(db.Text, nullable=False) # 評論內容
parent_id = db.Column(db.Integer, db.ForeignKey("info_comment.id")) # 父評論id
parent = db.relationship("Comment", remote_side=[id]) # 自關聯
like_count = db.Column(db.Integer, default=0) # 點贊條數
def to_dict(self):
resp_dict = {
"id": self.id,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"content": self.content,
"parent": self.parent.to_dict() if self.parent else None,
"user": User.query.get(self.user_id).to_dict(),
"news_id": self.news_id,
"like_count": self.like_count
}
return resp_dict
class CommentLike(BaseModel, db.Model):
"""評論點贊"""
__tablename__ = "info_comment_like"
comment_id = db.Column("comment_id", db.Integer, db.ForeignKey("info_comment.id"), primary_key=True) # 評論編號
user_id = db.Column("user_id", db.Integer, db.ForeignKey("info_user.id"), primary_key=True) # 用戶編號
class Category(BaseModel, db.Model):
"""新聞分類"""
__tablename__ = "info_category"
id = db.Column(db.Integer, primary_key=True) # 分類編號
name = db.Column(db.String(64), nullable=False) # 分類名
news_list = db.relationship('News', backref='category', lazy='dynamic')
def to_dict(self):
resp_dict = {
"id": self.id,
"name": self.name
}
return resp_dict
####################################
在Flask-SQLAlchemy中,插入、修改、刪除操作,均由數據庫會話管理。
會話用 db.session 表示。在準備把數據寫入數據庫前,要先將數據添加到會話中然后調用 commit() 方法提交會話。
在 Flask-SQLAlchemy 中,查詢操作是通過 query 對象操作數據。
最基本的查詢是返回表中所有數據,可以通過過濾器進行更精確的數據庫查詢。
from flask import Flaskfrom flask_sqlalchemy import SQLAlchemy app = Flask(__name__)#設置連接數據庫的URLapp.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True#查詢時會顯示原始SQL語句app.config['SQLALCHEMY_ECHO'] = Truedb = SQLAlchemy(app)class Role(db.Model): # 定義表名 __tablename__ = 'roles' # 定義列對象 id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) us = db.relationship('User', backref='role') #repr()方法顯示一個可讀字符串 def __repr__(self): return 'Role:%s'% self.nameclass User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True, index=True) email = db.Column(db.String(64),unique=True) password = db.Column(db.String(64)) role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) def __repr__(self): return 'User:%s'%self.nameif __name__ == '__main__': app.run(debug=True)
class Role(db.Model): ... #關鍵代碼 us = db.relationship('User', backref='role', lazy='dynamic') ...class User(db.Model): ... role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
其中realtionship描述了Role和User的關系。在此文中,第一個參數為對應參照的類"User"
第二個參數backref為類User申明新屬性的方法
第三個參數lazy決定了什么時候SQLALchemy從數據庫中加載數據
設置為 dynamic 的話,role.users 返回查詢對象,并沒有做真正的查詢,可以利用查詢對象做其他邏輯,比如:先排序再返回結果
設置為 subquery 的話,role.users 返回所有數據列表
如果設置為子查詢方式(subquery),則會在加載完Role對象后,就立即加載與其關聯的對象,這樣會讓總查詢數量減少,但如果返回的條目數量很多,就會比較慢
另外,也可以設置為動態方式(dynamic),這樣關聯對象會在被使用的時候再進行加載,并且在返回前進行過濾,如果返回的對象數很多,或者未來會變得很多,那最好采用這種方式
過濾器 | 說明 |
---|---|
filter() | 把過濾器添加到原查詢上,返回一個新查詢 |
filter_by() | 把等值過濾器添加到原查詢上,返回一個新查詢 |
limit | 使用指定的值限定原查詢返回的結果 |
offset() | 偏移原查詢返回的結果,返回一個新查詢 |
order_by() | 根據指定條件對原查詢結果進行排序,返回一個新查詢 |
group_by() | 根據指定條件對原查詢結果進行分組,返回一個新查詢 |
方法 | 說明 |
---|---|
all() | 以列表形式返回查詢的所有結果 |
first() | 返回查詢的第一個結果,如果未查到,返回None |
first_or_404() | 返回查詢的第一個結果,如果未查到,返回404 |
get() | 返回指定主鍵對應的行,如不存在,返回None |
get_or_404() | 返回指定主鍵對應的行,如不存在,返回404 |
count() | 返回查詢結果的數量 |
paginate() | 返回一個Paginate對象,它包含指定范圍內的結果 |
db.create_all()
db.drop_all()
ro1 = Role(name='admin') db.session.add(ro1) db.session.commit() #再次插入一條數據 ro2 = Role(name='user') db.session.add(ro2) db.session.commit()
us1 = User(name='wang',email='wang@163.com',password='123456',role_id=ro1.id) us2 = User(name='zhang',email='zhang@189.com',password='201512',role_id=ro2.id) us3 = User(name='chen',email='chen@126.com',password='987654',role_id=ro2.id) us4 = User(name='zhou',email='zhou@163.com',password='456789',role_id=ro1.id) us5 = User(name='tang',email='tang@itheima.com',password='158104',role_id=ro2.id) us6 = User(name='wu',email='wu@gmail.com',password='5623514',role_id=ro2.id) us7 = User(name='qian',email='qian@gmail.com',password='1543567',role_id=ro1.id) us8 = User(name='liu',email='liu@itheima.com',password='867322',role_id=ro1.id) us9 = User(name='li',email='li@163.com',password='4526342',role_id=ro2.id) us10 = User(name='sun',email='sun@163.com',password='235523',role_id=ro2.id) db.session.add_all([us1,us2,us3,us4,us5,us6,us7,us8,us9,us10]) db.session.commit()""" 查詢所有用戶數據 查詢有多少個用戶 查詢第1個用戶 查詢id為4的用戶[3種方式] 查詢名字結尾字符為g的所有數據[開始/包含] 查詢名字不等于wang的所有數據[2種方式] 查詢名字和郵箱都以 li 開頭的所有數據[2種方式] 查詢password是 `123456` 或者 `email` 以 `itheima.com` 結尾的所有數據 查詢id為 [1, 3, 5, 7, 9] 的用戶列表 查詢name為liu的角色數據 查詢所有用戶數據,并以郵箱排序 每頁3個,查詢第2頁的數據 """
返回名字等于wang的所有人
User.query.filter_by(name='wang').all()
User.query.first()
User.query.all()
User.query.filter(User.name.endswith('g')).all()
User.query.get()
User.query.filter(User.name!='wang').all()
from sqlalchemy import not_ User.query.filter(not_(User.name=='chen')).all()
from sqlalchemy import and_ User.query.filter(and_(User.name!='wang',User.email.endswith('163.com'))).all()
from sqlalchemy import or_ User.query.filter(or_(User.name!='wang',User.email.endswith('163.com'))).all()
user = User.query.first() db.session.delete(user) db.session.commit() User.query.all()
user = User.query.first() user.name = 'dong'db.session.commit() User.query.first()
角色和用戶的關系是一對多的關系,一個角色可以有多個用戶,一個用戶只能屬于一個角色。
查詢角色的所有用戶
#查詢roles表id為1的角色ro1 = Role.query.get(1)#查詢該角色的所有用戶ro1.us.all()
查詢用戶所屬角色
#查詢users表id為3的用戶us1 = User.query.get(3)#查詢用戶屬于什么角色us1.role
################################################
在開發過程中,需要修改數據庫模型,而且還要在修改之后更新數據庫。最直接的方式就是刪除舊表,但這樣會丟失數據。
更好的解決辦法是使用數據庫遷移框架,它可以追蹤數據庫模式的變化,然后把變動應用到數據庫中。
在Flask中可以使用Flask-Migrate擴展,來實現數據遷移。并且集成到Flask-Script中,所有操作通過命令就能完成。
為了導出數據庫遷移命令,Flask-Migrate提供了一個MigrateCommand類,可以附加到flask-script的manager對象上。
首先要在虛擬環境中安裝Flask-Migrate。
pip install flask-migrate
代碼文件內容:
#coding=utf-8from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyfrom flask_migrate import Migrate,MigrateCommandfrom flask_script import Shell,Manager app = Flask(__name__) manager = Manager(app) app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/Flask_test'app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = Trueapp.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = Truedb = SQLAlchemy(app)#第一個參數是Flask的實例,第二個參數是Sqlalchemy數據庫實例migrate = Migrate(app,db) #manager是Flask-Script的實例,這條語句在flask-Script中添加一個db命令manager.add_command('db',MigrateCommand)#定義模型Roleclass Role(db.Model): # 定義表名 __tablename__ = 'roles' # 定義列對象 id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) user = db.relationship('User', backref='role') #repr()方法顯示一個可讀字符串, def __repr__(self): return 'Role:'.format(self.name)#定義用戶class User(db.Model): __talbe__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, index=True) #設置外鍵 role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) def __repr__(self): return 'User:'.format(self.username)if __name__ == '__main__': manager.run()
#這個命令會創建migrations文件夾,所有遷移文件都放在里面。 python database.py db init
自動創建遷移腳本有兩個函數
upgrade():函數把遷移中的改動應用到數據庫中。
downgrade():函數則將改動刪除。
自動創建的遷移腳本會根據模型定義和數據庫當前狀態的差異,生成upgrade()和downgrade()函數的內容。
對比不一定完全正確,有可能會遺漏一些細節,需要進行檢查
python database.py db migrate -m 'initial migration'
python database.py db upgrade
可以根據history命令找到版本號,然后傳給downgrade命令:
python app.py db history 輸出格式:<base> -> 版本號 (head), initial migration
回滾到指定版本
python app.py db downgrade 版本號
1.python 文件 db init
2.python 文件 db migrate -m"版本名(注釋)"
3.python 文件 db upgrade 然后觀察表結構
4.根據需求修改模型
5.python 文件 db migrate -m"新版本名(注釋)"
6.python 文件 db upgrade 然后觀察表結構
7.若返回版本,則利用 python 文件 db history查看版本號
8.python 文件 db downgrade(upgrade) 版本號
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。