编写python Web框架


异步IO

异步IO是指当程序需要执行一个耗时的IO操作时,它只发出IO操作指令,并不等待IO结果,然后就去执行其他代码了,当IO返回结果时,再通知CPU进行处理。

异步IO模型需要一个消息循环,在消息循环中,主线程不断地重复”读取消息-处理消息”这一过程:

1
2
3
4
loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)

如果是使用同步IO模型,在”发出IO请求”到收到”IO完成”的这段时间里,主线程只能挂起,但在异步IO模型下,主线程并没有挂起,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。

协程

协程,又称微线程,纤程。英文名Coroutine。

协程看上去就像子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

1
2
3
4
5
6
7
8
9
def A():
print('1')
print('2')
print('3')

def B():
print('x')
print('y')
print('z')

假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,但是在A中是没有调用B的,结果可能是:

1
2
x
y
3
z

在Python中协程是通过generator来实现的。

下面的代码,使用协程实现生产者-消费者模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'


def producer(c):
c.send(None)

n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consuming return: %s...' % r)
c.close()


if __name__ == '__main__':
c = consumer()
producer(c)

输出结果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consuming return: 200 OK...
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consuming return: 200 OK...
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consuming return: 200 OK...
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consuming return: 200 OK...
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consuming return: 200 OK...

consumer函数是一个generator,把一个consumer传入producer后:

  1. producer首先调用c.send(None)启动生成器

  2. 一旦生成了东西,通过c.send(n) 切换到consumer执行

  3. consumer通过yield拿到消息,处理,又通过yield把结果传回

  4. producer拿到consumer处理的结果,继续生产下一条消息

  5. producer决定不生产了,通过c.close()关闭consumer,整个过程结束。

asyncio

asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。

asyncio的编程模型就是一个消息循环。从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())


if __name__ == '__main__':
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutinehello生成器标记为coroutine类型,用tasks封装两个hello协程,把这两个协程扔到EventLoop中执行。asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待。程序最终输出以下结果:

Hello world! (<_MainThread(MainThread, started 5476)>)
Hello world! (<_MainThread(MainThread, started 5476)>)
(暂停约1秒)
Hello again! (<_MainThread(MainThread, started 5476)>)
Hello again! (<_MainThread(MainThread, started 5476)>)

由打印的当前线程名称可以看出,两个coroutine是由同一个线程并发执行的。
如果把asyncio.sleep(1)换成真正的IO操作,则多个coroutine就可以由一个线程并发执行。

async/await

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法asyncawait,可以让coroutine的代码更简洁易读。

请注意,asyncawait是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:

  1. @asyncio.coroutine替换为async
  2. yield from替换为await

使用async/await替换上面hello协程:

1
2
3
4
async def hello():
print('Hello world! (%s)' % threading.currentThread())
await asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())

aiohttp

asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。

安装aiohttp:

pip install aiohttp

编写一个HTTP服务器,分别处理以下URL:

  • /- 首页返回b'<h1>Index</h1>'
  • /hello/{name} - 根据URL参数返回文本hello, %s!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
from aiohttp import web


async def index(request):
resp = web.Response(body=b'<h1>Index</h1>')
resp.content_type = 'text/html;charset=utf-8'
return resp


async def hello(request):
text = '<h1>hello, %s</h1>' % request.match_info['name']
resp = web.Response(body=text.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp


async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
  1. index协程,当访问首页/时被调用,返回一个<h1>Index</h1>html文档。

  2. hello协程,当访问/hello/{name}时被调用,从request.match_info['name']取出对应的参数,然后再返回对应的html文档。

  3. init协程,添加访问的url,创建一个http服务。

运行程序,在浏览器中输入http://127.0.0.1:8000/,可以看到如下结果:

index.png

在浏览器中输入http://127.0.0.1:8000/hello/web,可以看到如下结果:**

web.png

aiohttp已经是一个Web框架了,但是对于使用者来说,还是相对比较底层,编写一个URL的处理函数需要这么几步:

  1. 编写一个协程的处理函数,如上面的indexhello

  2. request中获取传入的参数,如上面的hello函数的request.match_info['name']

  3. 返回一个Response对象。

对于这些重复的工作可以由框架完成。

Web框架

get和post装饰器

coroweb.py模块中,定义getpost装饰器,把一个函数映射为一个URL处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

def get(path):
"""
定义一个装饰器 @get('/path')
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kw):
return func(*args, **kw)
wrapper.__method__ = 'GET'
wrapper.__route__ = path
return wrapper
return decorator


def post(path):
"""
定义一个装饰器 @post('/path')
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kw):
return func(args, kw)
wrapper.__method__ = 'POST'
wrapper.__route__ = path
return wrapper
return decorator

@get、或@post装饰器添加了__method__属性,表示http的GETPOST请求,__route__属性,表示http请求的URI资源地址,一个函数通过@get、或@post的装饰就附带了URL信息,例如上面的indexhello可以写成:

1
2
3
4
5
6
7
@get('/')
def index(request):
pass

@get('/hello/{name}')
def hello(request, name):
pass

add_routes

例如在handlers.py模块定义了使用@get@post装饰器的URL处理函数,通过调用add_routes函数进行注册。add_routes函数的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def add_route(app, fn):
method = getattr(fn, '__method__', None)
path = getattr(fn, '__route__', None)
if path is None or method is None:
raise ValueError('@get or @post not defined in %s.' % str(fn))

if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
fn = asyncio.coroutine(fn)
logging.info('add route %s %s => %s(%s)' %(method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys())))
app.router.add_route(method, path, fn)


def add_routes(app, module_name):
"""
注册模块中含有__method__、__route__属性的函数
"""
n = module_name.rfind('.')
if n == (-1):
mod = __import__(module_name, globals(), locals())
else:
name = module_name[n+1:]
mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name)

for attr in dir(mod):
if attr.startswith('_'):
continue

fn = getattr(mod, attr)
if callable(fn):
method = getattr(fn, '__method__', None)
path = getattr(fn, '__route__', None)

if method and path:
add_route(app, fn)

handlers.py模块中添加如下函数,这样当调用add_routes(app, 'handlers')就会自动注册handlers.py模块含有@get@post装饰器的URL处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from coroweb import get
from aiohttp import web


@get('/')
def index(request):
resp = web.Response(body=b'<h1>Index</h1>')
resp.content_type = 'text/html;charset=utf-8'
return resp


@get('/hello/{name}')
def hello(request):
text = '<h1>hello, %s</h1>' % request.match_info['name']
resp = web.Response(body=text.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp

apps.py模块中定义如下的测试代码,运行程序,在浏览器中输入http://127.0.0.1:8000/http://127.0.0.1:8000/hello/web可以看到和上面一样的效果。

1
2
3
4
5
6
7
8
9
10
11
12
async def init(loop):
app = web.Application(loop=loop)
add_routes(app, 'handlers')
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

RequestHandler

通过上面的@get@post装饰器,我们就可以把一个URL请求和对应的处理函数关联起来了。但是我们在处理函数中还是要从request获取参数,进行分析,对于这个步骤,可以使用下面的RequestHandler类来实现。

RequestHandler 定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def get_required_kw_args(fn):
"""
获取fn函数中的没有默认值的命名关键字参数
如:foo(a, b, *, c, d=10, **kw) 则返回 ('c',)
"""
args = []
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty:
args.append(name)
return tuple(args)


def get_named_kw_args(fn):
"""
获取fn函数中的命名关键字参数
如:foo(a, b, *, c, d=10, **kw) 则返回 ('c', 'd')
"""
args = []
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
args.append(name)
return tuple(args)


def has_named_kw_args(fn):
"""
判断fn函数中的是否含有命名关键字参数
"""
params = inspect.signature(fn).parameters
for param in params.values():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
return True


def has_var_kw_args(fn):
"""
判断fn函数中的是否含有关键字参数
如:foo(a, b, *, c, d=10, **kw) 的**kw参数
"""
params = inspect.signature(fn).parameters
for param in params.values():
if param.kind == inspect.Parameter.VAR_KEYWORD:
return True


def has_request_arg(fn):
"""
request参数后面的参数必须是可变位置参数、命名关键字参数、关键字参数、
"""
sig = inspect.signature(fn)
params = sig.parameters
found = False
for name, param in params.items():
if name == 'request':
found = True
continue
if found and (param.kind != inspect.Parameter.VAR_POSITIONAL
and param.kind != inspect.Parameter.KEYWORD_ONLY
and param.kind != inspect.Parameter.VAR_KEYWORD):
raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig)))
return found


class RequestHandler(object):
"""
从URL函数中分析其需要接收的参数,从request中获取必要的参数,调用URL函数
"""
def __init__(self, app, fn):
self._app = app
self._func = fn
self._has_request_arg = has_request_arg(fn)
self._has_var_kw_args = has_var_kw_args(fn)
self._has_named_kw_args = has_named_kw_args(fn)
self._named_kw_args = get_named_kw_args(fn)
self._required_kw_args = get_required_kw_args(fn)

async def __call__(self, request):
kw = None
if self._has_var_kw_args or self._has_named_kw_args or self._required_kw_args:
# POST请求
if request.method == 'POST':
if not request.content_type:
return web.HTTPBadRequest('Missing Content-Type.')
ct = request.content_type.lower()

# 数据类型是json
if ct.startswith('application/json'):
params = await request.json()
if not isinstance(params, dict):
return web.HTTPBadRequest('JSON body must be object.')
kw = params

# 数据类型是url编码
elif ct.startwith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):
params = await request.post()
kw = dict(**params)
else:
return web.HTTPBadRequest('Unsupported Content-Type: %s' % request.content_type)
# GET 请求
if request.method == 'GET':
# 解析URL参数
qs = request.query_string
if qs:
kw = dict()
for k, v in parse.parse_qs(qs, True).items():
kw[k] = v[0]
if kw is None:
kw = dict(**request.match_info)
else:
if not self._has_var_kw_args and self._named_kw_args:
copy = dict()
for name in self._named_kw_args:
if name in kw:
copy[name] = kw[name]
kw = copy
for k, v in request.match_info.items():
if k in kw:
logging.warning('Duplicate arg name in named arg and kw args: %s' % k)
kw[k] = v

# 含有request参数
if self._has_request_arg:
kw['request'] = request

if self._required_kw_args:
# 请求中传入的参数是否含有函数的命令关键字参数
for name in self._required_kw_args:
if not name in kw:
return web.HTTPBadRequest('Missing argument: %s' % name)
logging.info('call with args: %s' % str(kw))
try:
r = await self._func(**kw)
return r
except APIError as e:
return dict(error=e.error, data=e.data, message=e.message)

上面的RequestHandler实现了__call__函数,所以可以想使用函数那样调用,无需创建实例后调用。

RequestHandler从URL函数中分析其需要接收的参数,从request中获取必要的参数,调用URL函数,有了RequestHandler定义,我们就可以在handlers.py的模块中使用各种参数的URL,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@get('/blog/{id}')
def get_blog(id):
pass

@get('/api/users')
def api_get_users(*, page='1'):
pass

@post('/api/authenticate')
def authenticate(*, email, passwd):
pass

@post('/api/blogs/{id}/delete')
def api_delete_blog(request, *, id):
pass

将上面的add_route函数中下面的语句:

app.router.add_route(method, path, fn)

修改为:

app.router.add_route(method, path, RequestHandler(app, fn))

handlers.py定义的indexhello函数修改为下面的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
@get('/')
def index():
resp = web.Response(body=b'<h1>Index</h1>')
resp.content_type = 'text/html;charset=utf-8'
return resp


@get('/hello/{name}')
def hello(name):
text = '<h1>hello, %s</h1>' % name
resp = web.Response(body=text.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp

运行apps.py模块中的测试程序,在浏览器中输入http://127.0.0.1:8000/http://127.0.0.1:8000/hello/web可以看到和上面一样的效果。

middleware

middleware是一种拦截器,一个URL在被某个函数处理前,可以改变URL的输入、输出。middleware的用处就在于把通用的功能从每个URL处理函数中拿出来,集中放到一个地方。例如,一个记录URL日志的logger可以简单定义如下:

app.py中定义如下的logger_factory函数:

1
2
3
4
5
async def logger_factory(app, handler):
async def logger(request):
logging.info('Request: %s %s' % (request.method, request.path))
return (await handler(request))
return logger

对于上面的indexhello的URL处理函数返回的web.Response对象,我们可以定义一个response_factory拦截器进行处理:

app.py中定义如下的response_factory函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

async def response_factory(app, handler):
""""
根据URL函数返回值的类型,构造相应的web.Response对象
"""
async def response(request):
# 调用URL处理函数
r = await handler(request)

if isinstance(r, web.StreamResponse):
return r

if isinstance(r, bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octet-stream'

if isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp

if isinstance(r, dict):
resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda obj: obj.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp

if isinstance(r, int) and 100 <= r < 600:
return web.Response(r)

if isinstance(r, tuple) and len(r) == 2:
t, m = r
if isinstance(t, int) and 100 <= r < 600:
return web.Response(t, str(m))

resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response

response_factory函数,在调用URL处理函数后,对其返回值的类型,进行判断,并返回相应的web.Response的对象,如返回的类型是str,对应的是html文档,返回的类型是dict,对应的是json对象。

有了上面的response_factory拦截器,handlers.pyindexhello的URL处理函数,可以修改为如下:

1
2
3
4
5
6
7
@get('/')
def index():
return '<h1>Index</h1>'

@get('/hello/{name}')
def hello(name):
return '<h1>hello, %s</h1>' % name

app.pyinit函数中添加logger_factoryresponse_factory拦截器,代码如下:

1
2
3
4
5
6
7
async def init(loop):
app = web.Application(loop=loop, middlewares=[
logger_factory, response_factory])
add_routes(app, 'handlers')
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv

运行apps.py模块中的测试程序,在浏览器中输入http://127.0.0.1:8000/http://127.0.0.1:8000/hello/web可以看到和上面一样的效果。

使用Web框架和ORM框架

有了上面的Web框架,对于使用者来说,就非常方便了。如果要处理一个URL,只需要在handlers.py模块中添加相应的处理函数即可。

现在我们可以上文中的ORM框架结合起来使用,例如,下面的get_users函数用于访问数据库中user表的数据。

handlers.py模块中定义get_users函数:

1
2
3
4
5
6
@get('/users')
async def get_users():
users = await User.find_all()
for user in users:
user.passwd = '*******'
return users

app.pyinit函数中,创建数据库的连接池:

1
2
3
4
5
6
7
8
async def init(loop):
await orm.create_pool(user='heqingliang', password='heqingliang', db='test', loop=loop)
app = web.Application(loop=loop, middlewares=[
logger_factory, response_factory])
add_routes(app, 'handlers')
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv

运行app.py测试程序,在浏览器中输入http://127.0.0.1:8000/users,可以看到数据库中user表的数据,以json形式返回:

users.png