您好,登錄后才能下訂單哦!
這篇文章主要介紹了python執行SQL語句怎么實現,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
pip3 install directsql
directsql 目前只提供三個外部類
__all__=["SqlGenerator","MysqlConnection","MysqlPool"]
導入方式
from directsql.sqlgenerator import SqlGenerator #該類用于生成sql語句
#下面是一個池化連接對象MysqlPool 和一個簡單連接對象 MysqlConnector
from directsql.connector import MysqlConnection,MysqlConnector
# 1. 傳入有名參數
conn = MysqlConnection(host='127.0.0.1', port=3306, password='123456', database='test_base')
print(conn.database)
conn=MysqlPool(host='127.0.0.1', port=3306, password='123456', database='test_base')
# 也可使用 直接 參數字典
conn_args = {
'host': '127.0.0.1',
'port': 3306,
'password': '123456',
'database':'test_base',
}
conn = MysqlConnection(**conn_args)#單個連接
print(conn.database)
conn = MysqlPool(**conn_args) #池化連接對象
print(conn.database)
#2 直接使用 字符串
#以下字符串是常用的終端 連接命令
string_arg="mysql -uroot -h227.0.0.1 -P3306 -p123456 -Dtest_base"
conn = MysqlConnection(string_arg=string_arg)
print(conn.database)
conn = MysqlPool(string_arg=string_arg)
print(conn.database)
事實上directsql 封裝了 很多 語句。可以滿足很大一部分日常使用場景。但是如果有復雜的語句,仍然需要調用原生的sql 執行。而且directsql 中很多封裝好的方法是先拼接sql 再 調用該語句,所以這里還是先簡單介紹下,directsql 如何執行原生sql。
無論是 MysqlConnection 類 還是 MysqlPool 類 都通過 execute_sql 方法 來執行sql。
例如 :
id | name | age |
---|---|---|
1 | 羅輯 | 28 |
2 | 莊顏 | 25 |
3 | 葉文潔 | 54 |
4 | 程心 | 25 |
5 | 云天明 | 27 |
conn = MysqlConnection(string_arg="mysql -uroot -h227.0.0.1 -P3306 -p123456 -Dtest")
result,count=conn.execute_sql("select * from test_table ")
print(result)
print(count)
>>> ((1, '羅輯', '28'), (2, '莊顏', '25'), (3, '葉文潔', '54'), (4, '程心', '25'), (5, '云天明', '27'))
>>> 5
#這里默認是普通游標,你也可以指定使用字典游標:
result, count = conn.execute_sql("select * from test_table ", cursor_type='dict')
>>>[{'ID': 1, 'name': '羅輯', 'age': '28'}, {'ID': 2, 'name': '莊顏', 'age': '25'}, {'ID': 3, 'name': '葉文潔', 'age': '54'}, {'ID': 4, 'name': '程心', 'age': '25'}, {'ID': 5, 'name': '云天明', 'age': '27'}]
>>>5
execute_sql 方法 返回的是一個元組,(結果集,條數)
下文出現的所有方法無特殊說明都是返回元組,且支持dict游標
附帶參數執行語句
這里的參數使用起來和 pymysql 提供的 execute 以及executemany 沒有任何 差別,以下簡單提供幾個示例:
#傳元組
result,count=conn.execute_sql("select * from test_table where age=%s ",param=(25,))
#傳字典
result, count = conn.execute_sql("select * from test_table where age=%(age)s ", param={'age': 25})
#元組列表
result, count = conn.execute_sql("insert into test_table(`age`,`name`)values(%s,%s) ", param=[('宋運輝', 37), ('程開顏', 33)])
#字典列表
result, count = conn.execute_sql("insert into test_table(`age`,`name`)values(%(age)s,%(name)s) ",
param=[ {"name":"宋運輝",'age':37}, {"name":"程開顏",'age':33} ])
select 方法 可以接受多參數,參數列表如下。
def select(self, columns='id', table=None, where=None, group_by: str = None, order_by: str = None, limit: int = None, offset=None,cursor_type=None):
》》》 conn.select('*', 'test_table')
select id from test_table where age=25
》》》 conn.select('*', 'test_table', where={'age': 25})
select name,age from test_table where age=25 and id=2
多字段直接傳入字符串
》》》 conn.select("age,name", 'test_table', where={'age': 25,'id':2})
傳入列表/元組
》》》 conn.select(['age','name'], 'test_table', where={'age': 25,'id':2})
select * from test_table group by id order by age desc limit 1 offset 1
》》》conn.select('*', 'test_table', order_by='age desc',group_by='id',limit=1,offset=1)
select 功能看起來甚至不如直接寫原生sql 快,但是如果查詢條件是在不斷變化的,尤其是where條件,那么使用select 方法 會比自行拼接更方便。
例如,需要不斷地讀取一個字典變量,然后根據這個變量中的條件去查詢數據,而這個字典的鍵個數會變化,但是鍵都恰好是表的字段。這個時候使用select 方法會十分簡便,只需要令where參數等于那個字典即可。
平心而論,這個方法確實用處不大。
def insert_into(self, table, data: dict or list, columns=None, ignroe=False, on_duplicate_key_update: str = None, return_id=False):
該方法可以接受傳入字典或者 字典列表,并且可選 返回 游標影響的條數 或者是 新插入的數據的id。
columns 為空時,將取第一條數據的所有鍵,此時請確保所有數據鍵相同。
#傳入 字典
data_1 = {"age": 44, 'name': "雷東寶"}
count = conn.insert_into('test_table', data_1)#默認返回受影響條數
print(count) #
>>> 1
return_id = conn.insert_into('test_table', data_1,return_id=True)# 可選返回id
print(return_id)
>>>22533
#傳入字典列表
data_2={"age": 22, 'name': "宋運萍"}
all_data=[data_1,data_2]
count = conn.insert_into('test_table', all_data)
#限定 插入的字段。(字典有多字段,但是只需要往表里插入指定的字段時)
data_3= {"age": 44, 'name': "雷東寶","title":"村支書"} #title不需要,只要age和name
count = conn.insert_into('test_table', data_1,columns=["age","name"] )
#ignore 參數
data_1 = {"age": 44, 'name': "雷東寶","id":22539}
count = conn.insert_into('test_table',ignore=True )
print(count)
>>> 0 # 由于表中id 22539 已經存在,該條記錄不會插入,影響 0條數據
#on_duplicate_key_update 參數
data_1 = {"age": 44, 'name': "雷東寶","id":22539} #id=22539 已經存在
count = conn.insert_into('test_table', data_1,on_duplicate_key_update=' name="雷copy" ')
print(count)#返回影響條數
>>>2 #嘗試插入一條,但是發生重復,于是刪除新數據,并更新舊數據。實際上影響了兩條。
在insert_into 方法中提供了 on_duplicate_key_update 參數,但是實際上使用起來比較雞肋,需要自己傳入 on_duplicate_key_update 后的語句進行拼接。
如果你僅僅只是需要在發生重復時將舊數據的特定字段更新為新數據對應字段的值時。merge_into 方法更適合。
在 其他關系型數據庫中,提供有merge into 的語法,但是mysql 中沒有提供。 不過這里我們通過insert 和 on_duplicate_key_update 語法 封裝出了一個 類似merge_into 的方法。 該方法返回的是影響的條數
def* merge_into(self, table, data, columns=None, need_merge_columns: list = None):
columns 為空時,將取第一條數據的所有鍵,此時請確保所有數據鍵相同。
need_merge_columns 為在發生重復時需要替換(覆蓋)的字段。
data_1 = {"age": 44, 'name': "雷東寶","id":22539}
data_2={"age": 22, 'name': "宋運萍","id":22540}
all_data = [data_1, data_2,]
count=conn.merge_into('test_table',all_data,need_merge_columns=['name',])
print(count)
>>>4 #兩條數據正好都是重復的,插入兩條又刪除后修改兩條 ,返回4
該方法簡單,不做過多說明。該方法 返回的是影響的條數
def replace_into(self,table, data: dict or list, columns=None)
data_1 = {"age": 44, 'name': "雷東寶","id":22539}
data_2={"age": 22, 'name': "宋運萍","id":22540}
all_data = [data_1, data_2,]
count=conn.replace_into('test_table',all_data)
def update(self,table, data: dict, where, columns: None or list = None, limit=None):
該方法data 參數只接受傳入字典。該方法 返回的是影響的條數
data_1 = {"age": 44, 'name': "雷copy"}
count=conn.update('test_table',data_1,where={'id':22539}) #更新 id=22539的數據為 新的data_1
print(count)
>>>1
除此之外,還提供了一個衍生的方法
def update_by_primary(self, table, data: dict, pri_value, columns=None, primary: str = 'id'):
用于通過主鍵去更新數據。pri_value 即為主鍵的值。primary 為主鍵,默認為id
data_1 = {"age": 44, 'name': "雷cpy"}
count=conn.update_by_primary('test_table',data_1,pri_value=22539)
def delete_by_primary(self, table, pri_value, primary='id'):
"""
通過主鍵刪除數據
"""
def delete(self,table, where: str or dict, limit: int = 0):
"""
通過where條件刪除數據
"""
count=conn.delete('test_table',where={'name':'雷東寶'}) #刪除name=雷東寶的數據
count=conn.delete_by_primary('test_table',pri_value=22539) #刪除主鍵等于22539 的數據
def do_transaction(self, sql_params: list, cursor_type=None):
sql_params 為 元組列表。 【(sql_1,param_1),(sql_2,param_2】
如果sql 不需要參數也要傳入 None ,如 【(sql_1,None),】
sql_params = [
("update test_table set name=%(name)s where id=%(id)s ", {'name': '洛基', 'id': 22539}),
("update test_table set name=%(name)s where id=%(id)s ", {'name': 'mask', 'id': 22540}),
]
count=conn.do_transaction(sql_params)
>>>((), 1) #返回最后一條執行語句的 結果和影響條數
def read_ss_result(self, sql, param=None, cursor_type='ss'):
cursor_type 可選 ss 和 ssdict
注意,該方法返回的是 生成器對象,拿到結果需要不斷進行遍歷。
result=conn.read_ss_result("select * from test_table")
for data in result:
print(data)
感謝你能夠認真閱讀完這篇文章,希望小編分享的“python執行SQL語句怎么實現”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。