1. sanicdb简介
SanicDB是为python的异步web框架sanic方便操作mysql(mariadb)而开发的工具,是对aiomysql.Pool的轻量级封装。Sanic是异步IO的web框架,用异步IO的方式读取mysql也才能更加发挥它的效率。
备注:sanicdb也可以用于其他任何需要异步IO操作mysql的地方,不仅仅限于sanic中
github:https://github.com/veelion/sanicdb
安装
pip install sanicdb
2. 初始化
1 | class SanicDB: |
前4个参数是对应mysql数据库的。
loop是应用程序中的事件循环(event_loop),默认为None,则程序会自动调用asyncio.get_event_loop()创建。此外,sanic和loop两者只需要提供一个即可,当同时存在时,了解数据库其实用的是sanic中的loop。可以在初始化中看到
1 | async def init_pool(self): |
- minsize: 最小连接数量
- maxsize:最大连接数量
- return_dict : 是返回的数据一条记录为一个dict,key是MySQL表的字段名,value是字段的值
- pool_recycle :连接池重连的时间间隔,MySQL默认的连接闲置时间是8小时(我们服务器目前已改为24小时)
- autocommit:是否自动提交
3. 函数定义
sanicdb提供了6个功能函数。
query
功能如名,查询数据库。
1
2async def query(self, query, *parameters, **kwparameters):
"""Returns a row list for the given query and parameters."""get
查询数据库,只返回一条数据。
1
2
3async def get(self, query, *parameters, **kwparameters):
"""Returns the (singular) row returned by the given query.
"""execute
执行操作,返回受影响的行ID
1
2async def execute(self, query, *parameters, **kwparameters):
"""Executes the given query, returning the lastrowid from the query."""table_has
检查一个table中是否含有某字段为某值的记录
1
async def table_has(self, table_name, field, value):
table_insert
插入数据库。
1
2async def table_insert(self, table_name, item, ignore_duplicated=True):
'''item is a dict : key is mysql table field'''table_update
更新数据库
1
2
3async def table_update(self, table_name, updates,
field_where, value_where):
'''updates is a dict of {field_update:value_update}'''
4. 示例
普通的异步mysql例子,参照官网给出的test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import asyncio
from sanicdb import SanicDB
async def test(loop):
db = SanicDB('localhost', 'testdb', 'root', 'the_password',
minsize=3, maxsize=5,
connect_timeout=5,
loop=loop)
sql = 'Drop table test'
await db.execute(sql)
sql = """CREATE TABLE `test` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`name` varchar(16) NOT NULL,
`content` varchar(255) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `name` (`name`)
) ENGINE=MyISAM ;"""
await db.execute(sql)
sql = 'select * from test where name = %s'
data = await db.query(sql, 'abc')
print('query():', data)
sql += ' limit 1'
d = await db.get(sql, 'abc')
print('get():', d)
sql = 'delete from test where name=%s'
lastrowid = await db.execute(sql, 'xyz')
print('execute(delete...):', lastrowid)
sql = 'insert into test set name=%s, content=%s'
lastrowid = await db.execute(sql, 'xyz', '456')
print('execute(insert...):', lastrowid)
ret = await db.table_has('test', 'name', 'abc')
print('has(): ', ret)
ret = await db.table_update('test', {'content': 'updated'},
'name', 'abc')
print('update():', ret)
sql = 'select * from test where name = %s'
data = await db.query(sql, 'abc')
print('query():', data)
item = {
'name': 'abc',
'content': '123'
}
i = 0
while 1:
i += 1
if i % 2 == 0:
lastid = await db.table_insert('test', item, ignore_duplicated=False)
else:
lastid = await db.table_insert('test', item)
print('insert():', lastid)
await asyncio.sleep(1)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test(loop))sanic web中异步插叙mysql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35# author: sunshine
# datetime: 2020/10/29 10:12
from sanic import Sanic
from sanic_cors import CORS
from sanic.response import json
from sanicdb import SanicDB
app = Sanic(__name__)
# 支持跨域
CORS(app)
db = SanicDB(
host='172.27.0.6',
database='privacy',
user='root',
password='PdiCond$402875432',
sanic=app,
maxsize=18
)
async def simple_query(request):
"""
对数据库的简单查询测试
:param request:
:return:
"""
sql_str = 'select * from employee'
num = 0
data = await app.db.query(sql_str)
num += len(data)
return json({"查询条数": num})
if __name__ == '__main__':
app.run('0.0.0.0', port=5067)