本篇内容主要讲解“Flask请求处理流程是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Flask请求处理流程是什么”吧!
handler的处理流程.(路由)
ThreadedWSGIServer.(多线程非阻塞服务器)
多线程如何保证请求安全.(ctx)
# 1.wsgi阶段
WSGI -> Flask().__call__
# 2.Flask阶段
wsgi_app() -> full_dispatch_request() -> dispatch_request() -> view_handler()

full_dispatch_request: 框架实现.
  1.请求封装, 请求ctx入栈.(这一步实际在wsgi_app()中完成, 之后才进入full_dispatch_request)
  2.触发钩子函数.(try_trigger_before_first_request_functions)
  3.发送请求开始信号.(request_started.send(self))
  4.触发dispatch_request. 调用到用户的handler
view_handler: 用户编写, 实现业务
__call__(environ, start_response)
flask.app.py 实现了wsgi接口.(上线可以托管在uwsgi/gunicorn后面)
class Flask(_PackageBoundObject):
    """Excerpt of the Flask application object.

    Shows the WSGI entry point and the dispatch chain
    ``__call__`` -> ``wsgi_app`` -> ``full_dispatch_request`` ->
    ``dispatch_request`` -> user view function.
    """

    def dispatch_request(self):
        """Call the view function matched for the current request.

        Returns whatever the view function returns.
        """
        # 1. Take the request from the top of _request_ctx_stack.
        req = _request_ctx_stack.top.request
        if req.routing_exception is not None:
            self.raise_routing_exception(req)
        # 2. The Rule object carries the URL -> view handler mapping.
        rule = req.url_rule
        # ...
        # 3. Look the handler up by endpoint and call it with the URL arguments.
        return self.view_functions[rule.endpoint](**req.view_args)

    def full_dispatch_request(self):
        """Run pre-processing, dispatch the request and finalize the result.

        Any ``Exception`` raised while sending the signal, pre-processing or
        dispatching is routed to ``handle_user_exception``; its return value is
        finalized in place of the view's.
        """
        # 1. Trigger the before-first-request functions.
        self.try_trigger_before_first_request_functions()
        try:
            # 2. Send the request_started signal to every subscribed receiver.
            request_started.send(self)
            # 3. Pre-process the request.
            rv = self.preprocess_request()
            if rv is None:
                # 4. Only when pre-processing returned None: dispatch to the
                #    handler the user registered.
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        # 5. Finish the request.
        return self.finalize_request(rv)

    def wsgi_app(self, environ, start_response):
        """The actual WSGI application.

        :param environ: the WSGI environment.
        :param start_response: the WSGI ``start_response`` callable.
        :return: the result of calling the response object with
            ``(environ, start_response)``.
        """
        # 1. Wrap the environ in a request context.
        ctx = self.request_context(environ)
        error = None
        try:
            try:
                # 2. Push the ctx onto the stack.
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                # Ordinary exceptions are converted into a response.
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                # Anything that is not an ``Exception`` is recorded and
                # re-raised; the ``finally`` below still runs.
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            # 3. Pop the ctx automatically, passing along the recorded error.
            ctx.auto_pop(error)

    def __call__(self, environ, start_response):
        """WSGI entry point; delegates to :meth:`wsgi_app`."""
        return self.wsgi_app(environ, start_response)
View: 基视图类.
实现as_view
dispatch_request: 子类实现
MethodViewType:方法视图元类.
cls.methods = methods 绑定方法集到类属性
MethodView(with_metaclass(MethodViewType, View)): CBV
flask.views.py
# Lower-case names of the methods MethodViewType looks for on a view class.
http_method_funcs = frozenset(
    ["get", "post", "head", "options", "delete", "put", "trace", "patch"]
)


class View(object):
    """Base class for class-based views; subclasses implement
    :meth:`dispatch_request`.

    Example::

        class MyView(View):
            methods = ['GET']

            def dispatch_request(self, name):
                return 'Hello %s!' % name

        app.add_url_rule('/hello/<name>', view_func=MyView.as_view('myview'))
    """

    # Copied onto the generated view function by as_view().
    methods = None

    # Copied onto the generated view function by as_view().
    provide_automatic_options = None

    # Decorators applied, in order, to the generated view function.
    decorators = ()

    def dispatch_request(self):
        """Handle the request. Subclasses must override this."""
        raise NotImplementedError()

    @classmethod
    def as_view(cls, name, *class_args, **class_kwargs):
        """Build a plain view function from this class.

        :param name: assigned to the generated function's ``__name__``.
        :param class_args: positional arguments for the class constructor.
        :param class_kwargs: keyword arguments for the class constructor.
        :return: a function that instantiates the class on every call and
            forwards its arguments to ``dispatch_request``.
        """

        def view(*args, **kwargs):
            # 1. Instantiate the view class. ``view`` is resolved at call time,
            #    so after the decorator loop below it refers to the final
            #    (decorated) function, which is where view_class is attached.
            self = view.view_class(*class_args, **class_kwargs)
            # 2. Trigger dispatch_request.
            return self.dispatch_request(*args, **kwargs)

        # Apply the class-level decorators.
        if cls.decorators:
            view.__name__ = name
            view.__module__ = cls.__module__
            for decorator in cls.decorators:
                view = decorator(view)

        view.view_class = cls
        view.__name__ = name
        view.__doc__ = cls.__doc__
        view.__module__ = cls.__module__
        view.methods = cls.methods
        view.provide_automatic_options = cls.provide_automatic_options
        # This is the function that gets registered with the router and mapped
        # to the URL; it is what ends up in Flask().view_functions.
        return view


class MethodViewType(type):
    """Metaclass that fills in ``methods`` from the handler methods a class
    defines."""

    def __init__(cls, name, bases, d):
        super(MethodViewType, cls).__init__(name, bases, d)

        # 1. Only when the class body does not define ``methods`` itself.
        if "methods" not in d:
            methods = set()

            # Inherit whatever the base classes already declare.
            for base in bases:
                if getattr(base, "methods", None):
                    methods.update(base.methods)

            # Add the upper-cased name of every handler present on the class.
            for key in http_method_funcs:
                if hasattr(cls, key):
                    methods.add(key.upper())

            if methods:
                # 2. Bind the collected set to the class as ``methods``.
                cls.methods = methods


class MethodView(with_metaclass(MethodViewType, View)):
    """A class-based view that dispatches request methods to the corresponding
    class methods. For example, if you implement a ``get`` method, it will be
    used to handle ``GET`` requests.

    ::

        class CounterAPI(MethodView):
            def get(self):
                return session.get('counter', 0)

            def post(self):
                session['counter'] = session.get('counter', 0) + 1
                return 'OK'

        app.add_url_rule('/counter', view_func=CounterAPI.as_view('counter'))
    """

    def dispatch_request(self, *args, **kwargs):
        """Call the instance method named after the lower-cased request method.

        ``HEAD`` falls back to ``get`` when no ``head`` method exists. An
        assertion fails when no matching method is found.
        """
        # 1. Look up the handler method on the view instance.
        meth = getattr(self, request.method.lower(), None)

        if meth is None and request.method == "HEAD":
            meth = getattr(self, "get", None)

        assert meth is not None, "Unimplemented method %r" % request.method
        # 2. Execute the method.
        return meth(*args, **kwargs)
实现路由.
auth_view.py
auth = Blueprint('auth', __name__) # 绑定 /api/auth/login 到login函数上, 支持GET,POST @auth.route('/login', methods=['GET', 'POST']) def login(): pass # CBV class Files(views.MethodView): methods = ['POST', 'GET', 'PUT'] decorators = [auth_decorator] def get(self, *args, **kwargs): pass def post(self, *args, **kwargs): pass def put(self, *args, **kwargs): pass auth.add_url_rule('/files', view_func=Files.as_view(name='files'))
main.py
from auth_view import auth

# NOTE(review): Flask is not imported in this excerpt — presumably
# ``from flask import Flask`` was omitted; confirm.
app = Flask(__name__)
# Mount the auth blueprint's routes under the /api/auth prefix.
app.register_blueprint(auth, url_prefix='/api/auth')
开发环境的服务器.(默认: ThreadedWSGIServer)
多线程服务器启动.(每建立一个连接, 就创建一个线程来处理请求).
WSGIRequestHandler: 处理请求,封装(environ, start_response)
Flask().__call__(environ, start_response): 被执行
app.run -> run_simple -> inner() -> ThreadedWSGIServer().serve_forever() -> BaseWSGIServer().serve_forever() -> HTTPServer().serve_forever() -> TCPServer().serve_forever() -> BaseServer().serve_forever()
flask.app.py (Flask.run, 开发服务器的启动入口)
# app.run()
class Flask(_PackageBoundObject):
    def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
        """Start the development server for this application.

        Fills in defaults for ``use_reloader``, ``use_debugger`` (both taken
        from ``self.debug``) and ``threaded`` (``True``) without overriding
        values the caller passed, then hands over to ``run_simple``.

        :param host: forwarded to ``run_simple``.
        :param port: forwarded to ``run_simple``.
        :param debug: not used in this excerpt.
        :param load_dotenv: not used in this excerpt.
        :param options: extra keyword arguments forwarded to ``run_simple``.
        """
        fallback_options = (
            ("use_reloader", self.debug),
            ("use_debugger", self.debug),
            ("threaded", True),
        )
        for option_name, fallback in fallback_options:
            options.setdefault(option_name, fallback)

        try:
            run_simple(host, port, self, **options)
        finally:
            # Always reset the flag, whether the server returned or raised.
            self._got_first_request = False
SocketServer.py: 标准库实现
serve_forever: 服务器启动,等待请求
process_request: 处理请求
class BaseServer:
    """Excerpt of the standard-library base server: the polling loop and the
    per-request entry point."""

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                # For ThreadedWSGIServer (which lists ThreadingMixIn first
                # among its bases) this resolves to
                # ThreadingMixIn.process_request.
                self.process_request(request, client_address)
            except:
                # NOTE(review): the nesting of shutdown_request under this
                # handler is reconstructed from a whitespace-collapsed
                # listing — confirm against the stdlib source.
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # Wait up to poll_interval seconds for the server to appear in
                # the readable set.
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            # Reset the request flag and signal that the loop has exited.
            self.__shutdown_request = False
            self.__is_shut_down.set()


class TCPServer(BaseServer):
    pass


class HTTPServer(SocketServer.TCPServer):
    pass
werkzeug.serving.py: 实现处理wsgi协议的请求
{ "wsgi.multiprocess": False, "HTTP_COOKIE": "csrftoken=; gfsessionid=", "SERVER_SOFTWARE": "Werkzeug/0.16.0", "SCRIPT_NAME": "", "REQUEST_METHOD": "GET", "PATH_INFO": "/api/auth/login", "SERVER_PROTOCOL": "HTTP/1.1", "werkzeug.server.shutdown": <function shutdown_server at 0x10d050050>, "HTTP_CONNECTION": "keep-alive", "werkzeug.request": <BaseRequest "http://localhost:8065/api/auth/login?username=test&password=test" [GET ]>, "wsgi.input": <open file "<socket>", mode "rb" at 0x10d04e030>, "wsgi.multithread": True, "REQUEST_URI": "/api/auth/login?username=test&password=test", "wsgi.version": "(1, 0)", "REMOTE_ADDR": "127.0.0.1", "HTTP_ACCEPT_ENCODING": "gzip, deflate" ... }
# NOTE(review): abridged pseudo-code from the article — start_response, write
# and headers_sent are not defined here, and ``def init()`` / ``def inner(...)``
# are shorthand rather than valid signatures.
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
    def run_wsgi(self):
        """Build the WSGI environ and run the server's application with it."""
        # 1. Read the data from the socket and wrap it into the WSGI environ.
        self.environ = environ = self.make_environ()

        def execute(app):
            # 2. Call the application (Flask().__call__()) to start the
            #    request flow.
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
            finally:
                # Close the iterable if it offers close(), then drop it.
                if hasattr(application_iter, "close"):
                    application_iter.close()
                application_iter = None

        try:
            execute(self.server.app)
        except (_ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception:
            # Re-raised only when the server is configured to pass errors
            # through.
            if self.server.passthrough_errors:
                raise
            pass


class BaseWSGIServer(HTTPServer, object):
    """Single-threaded, single-process WSGI server."""

    def init():
        handler = WSGIRequestHandler
        # Bind the WSGI request handler to the HTTP server.
        HTTPServer.__init__(self, server_address, handler)


class ThreadedWSGIServer(ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that does threading."""

    multithread = True
    daemon_threads = True


def make_server():
    return ThreadedWSGIServer(
        host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
    )


def inner(...):
    srv = make_server()
    # Start the server.
    srv.serve_forever()
app.run流程:
-> srv.serve_forever() -> ThreadedWSGIServer().serve_forever() -> BaseServer().serve_forever()
ThreadingMixIn: 实现了process_request接口. 启动子线程处理当前请求
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Copied onto the ``daemon`` attribute of every worker thread.
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Finish and shut down a single request.

        This is the target of the worker thread started by
        :meth:`process_request`. If finishing or shutting down raises, the
        error is passed to ``handle_error`` and the request is shut down
        again.
        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:  # noqa: E722 - deliberately catches everything
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread that processes the request."""
        worker = threading.Thread(
            target=self.process_request_thread,
            args=(request, client_address),
        )
        worker.daemon = self.daemon_threads
        worker.start()
serve_forever 中使用非阻塞处理请求,一旦请求可处理,执行self.process_request, 调用ThreadingMixIn().process_request()
启动新的线程处理请求.
主线程继续执行serve_forever()
新线程的启动执行流程.(触发Flask().__call__())
1.process_request_thread()
2.self.finish_request(request, client_address)
  -> BaseServer().finish_request(self, request, client_address)
  -> self.RequestHandlerClass(request, client_address, self)
  -> WSGIRequestHandler().handle()
  -> WSGIRequestHandler().run_wsgi()
  -> app(environ, start_response)  # Flask().__call__()
3.self.shutdown_request(request)
参考
werkzeug.local.py
Local
LocalProxy
LocalStack
LocalManager 当使用线程模型时,get_ident获取线程标识 当使用greenlet协程时, get_ident获取到协程标识
# Pick the function that identifies the current execution context: the current
# greenlet when greenlet is installed, otherwise the current thread id.
try:
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
        from _thread import get_ident


class Local(object):
    """Attribute namespace whose values are stored per ``get_ident()`` result.

    Every attribute read, write and delete goes to the dict stored under the
    identifier returned by ``__ident_func__`` at that moment, so each
    thread (or greenlet) sees only the values it set itself.
    """

    __slots__ = ("__storage__", "__ident_func__")

    def __init__(self):
        # Bypass our own __setattr__, which would write into the per-ident
        # storage instead of the slots.
        for slot_name, initial in (
            ("__storage__", {}),
            ("__ident_func__", get_ident),
        ):
            object.__setattr__(self, slot_name, initial)

    def __iter__(self):
        """Iterate over ``(ident, values_dict)`` pairs of the whole storage."""
        all_items = self.__storage__.items()
        return iter(all_items)

    def __call__(self, proxy):
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    def __release_local__(self):
        """Drop everything stored for the current identifier, if anything."""
        current = self.__ident_func__()
        self.__storage__.pop(current, None)

    def __getattr__(self, name):
        """Return the value stored for the current identifier.

        Raises ``AttributeError`` when the identifier or the name is unknown.
        """
        try:
            own_values = self.__storage__[self.__ident_func__()]
            return own_values[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store ``value`` under ``name`` for the current identifier."""
        current = self.__ident_func__()
        # Create the per-identifier dict on first write.
        self.__storage__.setdefault(current, {})[name] = value

    def __delattr__(self, name):
        """Delete ``name`` for the current identifier.

        Raises ``AttributeError`` when the identifier or the name is unknown.
        """
        try:
            own_values = self.__storage__[self.__ident_func__()]
            del own_values[name]
        except KeyError:
            raise AttributeError(name)
flask.globals.py
_request_ctx_stack: 请求上下文栈
_app_ctx_stack: app上下文栈
current_app: 当前应用的代理. 同一进程中可以同时存在多个Flask()实例, 通过它取得正在处理当前请求的那个app
request
session
g
# context locals _request_ctx_stack = LocalStack() _app_ctx_stack = LocalStack() current_app = LocalProxy(_find_app) request = LocalProxy(partial(_lookup_req_object, "request")) session = LocalProxy(partial(_lookup_req_object, "session")) g = LocalProxy(partial(_lookup_app_object, "g"))
待确认: Python 3 标准库的 contextvars 是否可以替换 werkzeug 的 Local 实现.
到此,相信大家对“Flask请求处理流程是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。