本篇内容主要讲解“Flask请求处理流程是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Flask请求处理流程是什么”吧!
handler的处理流程.(路由)
ThreadedWSGIServer.(多线程非阻塞服务器)
多线程如何保证请求安全.(ctx)
# 1.wsgi阶段
WSGI -> Flask().__call__
# 2.Flask阶段
wsgi_app() -> full_dispatch_request() -> dispatch_request() -> view_handler()
wsgi_app / full_dispatch_request: 框架实现.
1.请求封装, 请求ctx入栈
2.触发钩子函数.(try_trigger_before_first_request_functions)
3.发送请求开始信号.(request_started.send(self))
4.触发dispatch_request. 调用到用户的handler
view_handler: 用户编写, 实现业务
__call__(environ, start_response)
flask.app.py 实现了wsgi接口.(上线可以托管在uwsgi/gunicorn后面)
class Flask(_PackageBoundObject):
    """Excerpt of the Flask application object: WSGI entry point and dispatch.

    Call chain shown here:
    ``__call__`` -> ``wsgi_app`` -> ``full_dispatch_request``
    -> ``dispatch_request`` -> the user's view function.
    """

    def dispatch_request(self):
        """Find the view function for the current request and call it.

        Returns:
            Whatever the matched view function returns.
        """
        # 1. Fetch the current request from the top of _request_ctx_stack.
        req = _request_ctx_stack.top.request
        if req.routing_exception is not None:
            self.raise_routing_exception(req)
        # 2. The Rule object carries the URL -> view handler mapping.
        rule = req.url_rule
        # ...
        # 3. Look the handler up by endpoint and call it with the URL args.
        return self.view_functions[rule.endpoint](**req.view_args)

    def full_dispatch_request(self):
        """Run the hooks and signals around ``dispatch_request``.

        Returns:
            The result of ``finalize_request`` applied to the view's (or the
            exception handler's) return value.
        """
        # 1. Trigger the before-first-request functions.
        self.try_trigger_before_first_request_functions()
        try:
            # 2. Send the request_started signal to every subscribed function.
            request_started.send(self)
            # 3. Pre-process the request; a non-None result short-circuits
            #    the view call below.
            rv = self.preprocess_request()
            if rv is None:
                # 4. Handle the request: reaches the user-registered view.
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        # 5. Finish the request.
        return self.finalize_request(rv)

    def wsgi_app(self, environ, start_response):
        """Handle one WSGI request from context creation to context pop.

        Args:
            environ: The WSGI environment.
            start_response: The WSGI ``start_response`` callable.

        Returns:
            The result of calling the response object as a WSGI application.
        """
        # 1. Wrap the request in a request context.
        ctx = self.request_context(environ)
        error = None
        try:
            try:
                # 2. Push the context onto the stack.
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                # Remember the in-flight exception for auto_pop, then let it
                # propagate.
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            # 3. The context is popped automatically.
            ctx.auto_pop(error)

    def __call__(self, environ, start_response):
        """WSGI entry point; delegates to ``wsgi_app``."""
        return self.wsgi_app(environ, start_response)
View: 基视图类.
实现as_view
dispatch_request: 子类实现
MethodViewType:方法视图元类.
cls.methods = methods 绑定方法集到类属性
MethodView(with_metaclass(MethodViewType, View)): CBV
flask.views.py
# Lower-case HTTP verb names, stored as an immutable set.
http_method_funcs = frozenset(
    {"get", "post", "head", "options", "delete", "put", "trace", "patch"}
)
class View(object):
    """Base class for class-based views.

    Subclasses implement ``dispatch_request``; ``as_view`` turns the class
    into a plain function that can be registered on a URL rule. Example::

        class MyView(View):
            methods = ['GET']

            def dispatch_request(self, name):
                return 'Hello %s!' % name

        app.add_url_rule('/hello/<name>', view_func=MyView.as_view('myview'))
    """

    # Copied onto the generated view function by as_view().
    methods = None
    provide_automatic_options = None
    # Decorators applied, in order, to the generated view function.
    decorators = ()

    def dispatch_request(self):
        """Handle the request; must be overridden by subclasses."""
        raise NotImplementedError()

    @classmethod
    def as_view(cls, name, *class_args, **class_kwargs):
        """Build a view function bound to this class.

        Args:
            name: Value assigned to the generated function's ``__name__``.
            *class_args: Positional arguments used to instantiate the class
                on every call.
            **class_kwargs: Keyword arguments used to instantiate the class
                on every call.

        Returns:
            A function that creates a fresh instance per call and forwards
            its arguments to ``dispatch_request``.
        """

        def view(*args, **kwargs):
            # 1. Instantiate the view class. ``view`` is looked up at call
            #    time, so this also works after decorators rebind the name.
            self = view.view_class(*class_args, **class_kwargs)
            # 2. Delegate to dispatch_request.
            return self.dispatch_request(*args, **kwargs)

        # Apply the class-level decorators. Name and module are set first so
        # the decorators see the final identity of the function.
        if cls.decorators:
            view.__name__ = name
            view.__module__ = cls.__module__
            for decorator in cls.decorators:
                view = decorator(view)

        view.view_class = cls
        view.__name__ = name
        view.__doc__ = cls.__doc__
        view.__module__ = cls.__module__
        view.methods = cls.methods
        view.provide_automatic_options = cls.provide_automatic_options
        # This is the function that gets registered on the route and mapped
        # to the URL; it ends up in Flask().view_functions.
        return view
class MethodViewType(type):
    """Metaclass that derives a ``methods`` attribute for view classes."""

    def __init__(cls, name, bases, d):
        """Set ``cls.methods`` when the class body does not define it.

        Args:
            name: Name of the class being created.
            bases: Base classes of the class being created.
            d: Namespace dict of the class body.
        """
        super(MethodViewType, cls).__init__(name, bases, d)

        # 1. Only act when the class body itself has no "methods" entry.
        if "methods" not in d:
            methods = set()

            # Inherit every method declared by the base classes.
            for base in bases:
                if getattr(base, "methods", None):
                    methods.update(base.methods)

            # Add the upper-cased verb for each handler the class provides.
            for key in http_method_funcs:
                if hasattr(cls, key):
                    methods.add(key.upper())

            if methods:
                # 2. Bind the collected set to the class being created.
                cls.methods = methods
class MethodView(with_metaclass(MethodViewType, View)):
    """A class-based view that dispatches request methods to the corresponding
    class methods. For example, if you implement a ``get`` method, it will be
    used to handle ``GET`` requests. ::

        class CounterAPI(MethodView):
            def get(self):
                return session.get('counter', 0)

            def post(self):
                session['counter'] = session.get('counter', 0) + 1
                return 'OK'

        app.add_url_rule('/counter', view_func=CounterAPI.as_view('counter'))
    """

    def dispatch_request(self, *args, **kwargs):
        """Call the method named after the current request's HTTP verb.

        Returns:
            Whatever the selected method returns.

        Raises:
            AssertionError: If no matching method exists on the instance.
        """
        # 1. Look up the instance method named after the lower-cased verb.
        meth = getattr(self, request.method.lower(), None)

        # A HEAD request falls back to the ``get`` handler when there is no
        # dedicated ``head`` method.
        if meth is None and request.method == "HEAD":
            meth = getattr(self, "get", None)

        assert meth is not None, "Unimplemented method %r" % request.method
        # 2. Run the method.
        return meth(*args, **kwargs)
实现路由.
auth_view.py
auth = Blueprint('auth', __name__)


# Function-based view: binds <prefix>/login to login() for GET and POST
# (/api/auth/login once the blueprint is registered under /api/auth).
@auth.route('/login', methods=['GET', 'POST'])
def login():
    pass


# Class-based view (CBV): one method per HTTP verb.
class Files(views.MethodView):
    methods = ['POST', 'GET', 'PUT']
    decorators = [auth_decorator]

    def get(self, *args, **kwargs):
        pass

    def post(self, *args, **kwargs):
        pass

    def put(self, *args, **kwargs):
        pass


# Register the function produced by as_view() on the blueprint.
auth.add_url_rule('/files', view_func=Files.as_view(name='files'))
main.py
# Import the blueprint object from the auth_view module.
from auth_view import auth
# Create the application and register the blueprint under /api/auth.
app = Flask(__name__)
app.register_blueprint(auth, url_prefix='/api/auth')
开发环境的服务器.(默认: ThreadedWSGIServer)
多线程服务器启动.(每建立一个连接, 创建一个线程处理请求).
WSGIRequestHandler: 处理请求,封装(environ, start_response)
Flask().__call__(environ, start_response): 被执行
app.run -> run_simple -> inner() -> ThreadedWSGIServer().serve_forever()
-> BaseWSGIServer().serve_forever()
-> HTTPServer().serve_forever()
-> TCPServer().serve_forever()
-> BaseServer().serve_forever()
flask.app.py
# app.run()
class Flask(_PackageBoundObject):
    def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
        """Run the application through ``run_simple``.

        Args:
            host: Passed to ``run_simple`` as the host.
            port: Passed to ``run_simple`` as the port.
            debug: Not used in this excerpt.
            load_dotenv: Not used in this excerpt.
            **options: Extra options forwarded to ``run_simple``. The keys
                ``use_reloader``, ``use_debugger`` and ``threaded`` get
                defaults when absent.
        """
        options.setdefault("use_reloader", self.debug)
        options.setdefault("use_debugger", self.debug)
        # Threaded by default: this is what selects the threaded server.
        options.setdefault("threaded", True)
        try:
            run_simple(host, port, self, **options)
        finally:
            # Reset the first-request flag once run_simple returns or raises.
            self._got_first_request = False
SocketServer.py: 标准库实现
serve_forever: 服务器启动,等待请求
process_request: 处理请求
class BaseServer:
    """Excerpt of the base server: the accept loop and per-request hand-off."""

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                # With the threading mix-in this dispatches to
                # ThreadingMixIn().process_request.
                self.process_request(request, client_address)
            except:
                # Bare except kept as in the quoted source: any failure is
                # reported and the request is shut down.
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # Wait up to poll_interval for the listening socket to
                # become readable.
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
class TCPServer(BaseServer):
    """Body elided in this excerpt; subclasses BaseServer."""
    pass
class HTTPServer(SocketServer.TCPServer):
    """Body elided in this excerpt; subclasses SocketServer.TCPServer."""
    pass
werkzeug.serving.py: 实现处理wsgi协议的请求
{
"wsgi.multiprocess": False,
"HTTP_COOKIE": "csrftoken=; gfsessionid=",
"SERVER_SOFTWARE": "Werkzeug/0.16.0",
"SCRIPT_NAME": "",
"REQUEST_METHOD": "GET",
"PATH_INFO": "/api/auth/login",
"SERVER_PROTOCOL": "HTTP/1.1",
"werkzeug.server.shutdown": <function shutdown_server at 0x10d050050>,
"HTTP_CONNECTION": "keep-alive",
"werkzeug.request": <BaseRequest "http://localhost:8065/api/auth/login?username=test&password=test" [GET
]>,
"wsgi.input": <open file "<socket>", mode "rb" at 0x10d04e030>,
"wsgi.multithread": True,
"REQUEST_URI": "/api/auth/login?username=test&password=test",
"wsgi.version": "(1, 0)",
"REMOTE_ADDR": "127.0.0.1",
"HTTP_ACCEPT_ENCODING": "gzip, deflate"
...
}
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
    """Request handler that runs the server's WSGI application (excerpt)."""

    def run_wsgi(self):
        """Build the WSGI environ for this request and run the application."""
        # 1. Read the data from the socket and wrap it as the WSGI environ.
        self.environ = environ = self.make_environ()

        # NOTE(review): ``start_response``, ``write`` and ``headers_sent``
        # are used below but not defined in this excerpt -- presumably
        # elided from the quoted source; confirm against werkzeug.serving.

        def execute(app):
            # 2. Call the application (Flask().__call__()) to start the
            #    request flow, then write out every chunk it yields.
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
            finally:
                if hasattr(application_iter, "close"):
                    application_iter.close()
                application_iter = None

        try:
            execute(self.server.app)
        except (_ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception:
            if self.server.passthrough_errors:
                raise
            # Error reporting is not shown in this excerpt.
            pass
class BaseWSGIServer(HTTPServer, object):
    """Single-threaded, single-process WSGI server."""

    def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None, fd=None):
        """Bind the HTTP server to the WSGI request handler.

        Args:
            host: Host to listen on.
            port: Port to listen on.
            app: WSGI application; the request handler reads it back as
                ``self.server.app``.
            handler: Request handler class; defaults to WSGIRequestHandler.
            passthrough_errors: The request handler reads it back as
                ``self.server.passthrough_errors``.
            ssl_context: Not used in this simplified excerpt.
            fd: Not used in this simplified excerpt.
        """
        if handler is None:
            handler = WSGIRequestHandler
        # NOTE(review): simplified -- the real implementation presumably
        # resolves the address family and handles ssl_context/fd here;
        # confirm against werkzeug.serving.
        server_address = (host, port)
        # Attach the WSGI request handler to the HTTP server.
        HTTPServer.__init__(self, server_address, handler)
        self.app = app
        self.passthrough_errors = passthrough_errors
class ThreadedWSGIServer(ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that does threading."""

    multithread = True
    # Copied onto each request thread's ``daemon`` flag by
    # ThreadingMixIn.process_request.
    daemon_threads = True
def make_server(host=None, port=None, app=None, request_handler=None,
                passthrough_errors=False, ssl_context=None, fd=None):
    """Create the threaded WSGI server.

    Every value forwarded to the constructor is an explicit parameter, so
    the function no longer relies on undefined names. All parameters have
    defaults, so a bare ``make_server()`` call still works.

    Returns:
        A new ThreadedWSGIServer instance.
    """
    return ThreadedWSGIServer(
        host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
    )
def inner():
    """Create the server and block in its accept loop.

    NOTE(review): the original excerpt wrote the signature as ``inner(...)``,
    which is not valid syntax; confirm whether any parameters were elided.
    """
    srv = make_server()
    # Start the server.
    srv.serve_forever()
app.run流程:
-> srv.serve_forever()
-> ThreadedWSGIServer().serve_forever()
-> BaseServer().serve_forever()
ThreadingMixIn: 实现了process_request接口. 启动子线程处理当前请求
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Value copied onto each request thread's ``daemon`` attribute.
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Finish one request and shut it down; runs in the worker thread.

        On any exception the error is handed to ``handle_error`` and the
        request is still shut down.
        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            # Bare except kept as in the quoted source.
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread that runs ``process_request_thread``."""
        t = threading.Thread(target=self.process_request_thread,
                             args=(request, client_address))
        t.daemon = self.daemon_threads
        t.start()
serve_forever 中使用非阻塞处理请求,一旦请求可处理,执行self.process_request, 调用ThreadingMixIn().process_request()
启动新的线程处理请求.
主线程继续执行serve_forever()
新线程的启动执行流程.(触发Flask().__call__())
1.process_request_thread()
2.self.finish_request(request, client_address)
-> BaseServer().finish_request(self, request, client_address):
-> self.RequestHandlerClass(request, client_address, self)
-> WSGIRequestHandler().handle()
-> WSGIRequestHandler().run_wsgi()
-> app(environ, start_response) # Flask().__call__()
3.self.shutdown_request(request)
参考
werkzeug.local.py
Local
LocalProxy
LocalStack
LocalManager
当使用线程模型时, get_ident获取线程标识; 当使用greenlet协程时, get_ident获取到协程标识.
# Pick the identity function for the current execution unit: the greenlet
# one when greenlet is installed, otherwise the thread one (``thread`` is
# tried first, then ``_thread``).
try:
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
        from _thread import get_ident
class Local(object):
    """Attribute store whose values are kept separately per identity.

    Every attribute read, write and delete is routed to a dict keyed by
    ``self.__ident_func__()`` (``get_ident`` by default), so each thread or
    greenlet sees only its own values.
    """

    __slots__ = ("__storage__", "__ident_func__")

    def __init__(self):
        # object.__setattr__ bypasses the overridden __setattr__ below,
        # which would otherwise store these inside the per-identity dict.
        object.__setattr__(self, "__storage__", {})
        object.__setattr__(self, "__ident_func__", get_ident)

    def __iter__(self):
        """Iterate over ``(ident, attribute_dict)`` pairs."""
        return iter(self.__storage__.items())

    def __call__(self, proxy):
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    def __release_local__(self):
        """Drop everything stored for the current identity."""
        self.__storage__.pop(self.__ident_func__(), None)

    def __getattr__(self, name):
        """Return ``name`` for the current identity, else AttributeError."""
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store ``name`` for the current identity."""
        ident = self.__ident_func__()
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            # First write from this identity: create its dict.
            storage[ident] = {name: value}

    def __delattr__(self, name):
        """Delete ``name`` for the current identity, else AttributeError."""
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)
flask.globals.py
_request_ctx_stack: 请求上下文栈
_app_ctx_stack: app上下文栈
current_app: 当前请求所属的应用(同一进程内可以同时存在多个Flask()实例)
request
session
g
# context locals
# Two stacks: one for request contexts, one for application contexts.
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
# Module-level proxies. Each wraps a lookup callable; the request and session
# proxies share _lookup_req_object, bound to a different name via partial.
# NOTE(review): presumably the lookup runs on each access and resolves against
# the stacks above -- confirm against the helper definitions.
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, "request"))
session = LocalProxy(partial(_lookup_req_object, "session"))
g = LocalProxy(partial(_lookup_app_object, "g"))
思考: Python 3 的 contextvars 是否可以替换 werkzeug.local 的实现? 参考: contextvars
到此,相信大家对“Flask请求处理流程是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/tplinuxhyh/blog/5020744