Stories

Detail Return Return

Flask 0.1 源碼解讀 - Stories Detail

一、app.run() 在做什麼?

執行 app.run() 便啓動了 Flask 服務,這個服務為什麼能夠監聽 http 請求並做出響應?讓我們進入 run 函數內部一探究竟。

def run(self, host='localhost', port=5000, **options):
    from werkzeug import run_simple
    if 'debug' in options:
        self.debug = options.pop('debug')
    options.setdefault('use_reloader', self.debug)
    options.setdefault('use_debugger', self.debug)
    return run_simple(host, port, self, **options)

可以看到,run 函數3-6行做了些參數默認值設置,最後將參數傳入 run_simple 並調用返回,注意,第3個參數是 Flask 對象(留意 Flask 對象的傳遞)。run_simple 是從 werkzeug 導入的。

def run_simple(hostname, port, application, use_reloader=False,
               use_debugger=False, use_evalex=True,
               extra_files=None, reloader_interval=1, threaded=False,
               processes=1, request_handler=None, static_files=None,
               passthrough_errors=False, ssl_context=None):

    ...

    def inner():
        make_server(hostname, port, application, threaded,
                    processes, request_handler,
                    passthrough_errors, ssl_context).serve_forever()

    ...

    if use_reloader:
        test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        test_socket.bind((hostname, port))
        test_socket.close()
        run_with_reloader(inner, extra_files, reloader_interval)
    else:
        inner()

run_simple() 最終調用 inner(),這個函數做了兩件事:

  1. 構造一個服務:make_server()
  2. 啓動服務:.serve_forever()
def make_server(host, port, app=None, threaded=False, processes=1,
                request_handler=None, passthrough_errors=False,
                ssl_context=None):
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and "
                         "multi process server.")
    elif threaded:
        return ThreadedWSGIServer(host, port, app, request_handler,
                                  passthrough_errors, ssl_context)
    elif processes > 1:
        return ForkingWSGIServer(host, port, app, processes, request_handler,
                                 passthrough_errors, ssl_context)
    else:
        return BaseWSGIServer(host, port, app, request_handler,
                              passthrough_errors, ssl_context)

make_server() 返回一個 BaseWSGIServer 對象。

class BaseWSGIServer(HTTPServer, object):
    ...

    def __init__(self, host, port, app, handler=None,
                passthrough_errors=False, ssl_context=None):
    ...

    HTTPServer.__init__(self, (host, int(port)), handler)
    self.app = app
    
    ...

BaseWSGIServer 繼承 HTTPServerHTTPServer 繼承來自 Python 標準庫 SocketServer.TCPServer 類,其最終繼承自 SocketServer.BaseServer。注意 Flask 對象被賦值給 self.app

BaseServer 類有一個 serve_forever 方法:

def serve_forever(self, poll_interval=0.5):
    self.__is_shut_down.clear()
    try:
        while not self.__shutdown_request:
            r, w, e = _eintr_retry(select.select, [self], [], [],
                                    poll_interval)
            
            if self.__shutdown_request:
                break
            if self in r:
                self._handle_request_noblock()
    finally:
        self.__shutdown_request = False
        self.__is_shut_down.set()

其中有一個 while 循環,在不斷執行:

if ready:
    self._handle_request_noblock()
def _handle_request_noblock(self):
    ...
    self.process_request(request, client_address)
    ...

_handle_request_noblock() 調用 process_request(),這是處理請求的函數,這個函數實例化 self.RequestHandlerClass 類來處理請求,這個類是 BaseServer 初始化時傳入的參數。

到此我們知道,app.run() 實際上實例化了一個 socketserver.BaseServer 類,並調用該類的 server_forever 方法,這個方法主體是一個 while 循環,最終在不斷實例化 self.RequestHandlerClass 來處理請求,在不中斷 while 的情況下,程序會一直運行,不斷接收請求,處理請求。

二、Flask 如何收到請求?

while 程序在不斷監聽請求,當接收到請求時,實例化 self.RequestHandlerClass 來處理請求。這個變量在 werkzeug.BaseWSGIServer__init__ 方法中被賦值(以下第10行):

class BaseWSGIServer(HTTPServer, object):
    multithread = False
    multiprocess = False

    def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None):
        if handler is None:
            handler = WSGIRequestHandler
        self.address_family = select_ip_version(host, port)
        HTTPServer.__init__(self, (host, int(port)), handler)

默認值為 werkzeug.WSGIRequestHandler,這個類最終繼承自 SocketServer.BaseRequestHandler,也就是説isinstance(self.RequestHandlerClass, SocketServer.BaseRequestHandler)

class BaseRequestHandler:

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

SocketServer.BaseRequestHandler 實例化時調用 self.handle 方法。注意,此時 Flask 對象存在於 self.server.app 中。werkzeug.WSGIRequestHandler 將這個方法覆寫:

class WSGIRequestHandler(BaseHTTPRequestHandler, object):

    def handle(self):
        try:
            return BaseHTTPRequestHandler.handle(self)
        except (socket.error, socket.timeout), e:
            self.connection_dropped(e)
        except:
            if self.server.ssl_context is None or not is_ssl_error():
                raise

    def handle_one_request(self):
        self.raw_requestline = self.rfile.readline()
        if not self.raw_requestline:
            self.close_connection = 1
        elif self.parse_request():
            return self.run_wsgi()


class BaseHTTPRequestHandler(object):

    def handle(self):
        self.close_connection = 1

        self.handle_one_request()
        while not self.close_connection:
            self.handle_one_request()

覆寫的方法調用 BaseHTTPRequestHandler.handle(self),內部調用了 self.handle_one_request 方法,最終調用了 WSGIRequestHandler.run_wsgi 方法:

class WSGIRequestHandler(BaseHTTPRequestHandler, object):

    def run_wsgi(self):
        app = self.server.app
        environ = self.make_environ()

        ...

        def execute(app):
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                # make sure the headers are sent
                if not headers_sent:
                    write('')
            finally:
                if hasattr(application_iter, 'close'):
                    application_iter.close()
                application_iter = None

        ...

        try:
            execute(app)
        except (socket.error, socket.timeout), e:
            ...

run_wsgi 方法第一行即取出 Flask 對象,然後將其傳入 execute 函數並調用,execute 第一行 application_iter = app(environ, start_response),這是在調用 Flask.__call__ 方法。

class Flask(object):

    def wsgi_app(self, environ, start_response):
        with self.request_context(environ):
            rv = self.preprocess_request()
            if rv is None:
                rv = self.dispatch_request()
            response = self.make_response(rv)
            response = self.process_response(response)
            return response(environ, start_response)

    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)

至此,請求被傳到了 Flask 對象中處理,或者説 Flask 收到了請求。

三、Flask 如何處理請求?

Web 服務器把 environ, start_response 兩個參數傳入 Flask.__call__ 處理,正常處理完後將 Flask.__call__ 返回的數據寫入響應體中。Flask 處理請求其實是接收這兩個參數並返回數據。

class Flask(object):

    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)

    def wsgi_app(self, environ, start_response):
        with self.request_context(environ):
            rv = self.preprocess_request()
            if rv is None:
                rv = self.dispatch_request()
            response = self.make_response(rv)
            response = self.process_response(response)
            return response(environ, start_response)

Flask.__call__ 調用 Flask.wsgi_app 並返回,不將處理邏輯直接實現在 Flask.__call__ 裏而封裝在 Flask.wsgi_app裏,是為了能應用中間件,例如:app.wsgi_app = MyMiddleware(app.wsgi_app)

Flask.wsgi_app 詳解:

  • 第8行(self.preprocess_request()):執行處理請求前的某些處理。即執行所有被 before_request 裝飾的函數,如果返回值非空,則不進行真正的請求處理。
  • 第10行(self.dispatch_request()):分發處理請求。將 URL 與對應的處理函數匹配,執行處理。
  • 第11行(self.make_response()):將返回值轉換為響應對象。
  • 第12行(self.process_response()):執行處理請求後的某些處理。即執行所有被 after_request 裝飾的函數。
  • 第13行(response(environ, start_response)):返回一個可迭代對象。

FLask 先調用 Flask.preprocess_request 處理請求,再調用與 URL 對應的函數處理(注意,如果 Flask.preprocess_request 返回值非空,則跳過與 URL 對應的處理函數),然後把返回值轉換為響應對象,最後調用 Flask.process_response 處理。

3.1 請求前置處理

class Flask(object):

    def before_request(self, f):
        self.before_request_funcs.append(f)
        return f

    def preprocess_request(self):
        for func in self.before_request_funcs:
            rv = func()
            if rv is not None:
                return rv

Flask.before_request 是一個裝飾器,會把被裝飾的函數添加到列表 self.before_request_funcs 中, self.preprocess_request 遍歷 self.before_request_funcs,執行所有函數,如果執行的函數返回值非空,則返回。

3.2 請求處理

class Flask(object):

    def match_request(self):
        rv = _request_ctx_stack.top.url_adapter.match()
        request.endpoint, request.view_args = rv
        return rv

    def dispatch_request(self):
        try:
            endpoint, values = self.match_request()
            return self.view_functions[endpoint](**values)
        except HTTPException, e:
            handler = self.error_handlers.get(e.code)
            if handler is None:
                return e
            return handler(e)
        except Exception, e:
            handler = self.error_handlers.get(500)
            if self.debug or handler is None:
                raise
            return handler(e)

match_request 中調用的 _request_ctx_stack.top.url_adapter.match(),是 _RequestContext.url_adapter.match()

class _RequestContext(object):

    def __init__(self, app, environ):
        self.app = app
        self.url_adapter = app.url_map.bind_to_environ(environ)

_RequestContext.url_adapter 是 Flask 對象中 url_map.bind_to_environ(environ) 返回的值,Map.bin_to_environ 返回 MapAdapter 對象。

MapAdapter.match() 會根據 URL 與 請求方法(GET、POST 等)(URL、請求方法等信息會從 environ 中獲取)返回當前請求的 endpoint 與 參數。

# eg
>>> urls.match("/downloads/42")
('downloads/show', {'id': 42})

在執行 URL 與 endpoint 解析前,需要先添加匹配規則。Flask 如何做的呢?

from werkzeug.routing import Map, Rule


class Flask(object):

    def __init__(self, package_name):
        self.url_map = Map()

Map 存儲 URL 規則和配置參數,可以通過 Map.add 添加 URL 匹配規則。

class Flask(object):

    def add_url_rule(self, rule, endpoint, **options):
        options['endpoint'] = endpoint
        options.setdefault('methods', ('GET',))
        self.url_map.add(Rule(rule, **options))

    def route(self, rule, **options):
        def decorator(f):
            self.add_url_rule(rule, f.__name__, **options)
            self.view_functions[f.__name__] = f
            return f
        return decorator

方法 Flask.add_url_rule 與裝飾器 Flask.route 都能添加匹配規則。裝飾器 route 會將被裝飾的函數名作為 endpoint,並將函數名作為 key,函數作為 value,存在 Flask 對象的 self.view_functions 屬性中。

通過 endpoint, values = self.match_request() 解析得到 endpoint 與 請求參數,再從 self.view_functions 中取出對應的函數處理(return self.view_functions[endpoint](**values))。

endpoint 作用與意義可參考:https://stackoverflow.com/que...

如果處理過程中拋出異常,會根據異常碼(e.code)取出對應的函數,執行異常處理函數。

異常處理函數通過裝飾器 Flask.errorhandler 添加,將錯誤碼作為 key,被裝飾的函數作為 value,存入 Flask 的屬性 self.error_handlers 中:

class Flask(object):

    def errorhandler(self, code):
        def decorator(f):
            self.error_handlers[code] = f
            return f
        return decorator

3.3 生成響應對象

為什麼需要這一步,直接返回 Python 內置數據類型不行嗎?

不行,因為 WSGI 規定 application 端必須返回一個可迭代對象[1]。

class Flask(object):

    def make_response(self, rv):
        if isinstance(rv, self.response_class):
            return rv
        if isinstance(rv, basestring):
            return self.response_class(rv)
        if isinstance(rv, tuple):
            return self.response_class(*rv)
        return self.response_class.force_type(rv, request.environ)

Flask.make_response(rv) 將參數 rv 轉換為 self.response_class 對象。

3.4 請求後置處理

迭代 self.after_request_funcs 中保存的函數,逐個執行,返回最後執行完的數據。

添加函數的方式為 Flask.after_request 裝飾器。

3.5 返回可迭代對象

Flask 默認的 self.response_class 類繼承自 werkzeug.Response,這個類的 __call__ 方法接收 environ, start_response 兩個參數,並返回一個迭代器。

class BaseResponse(object):

    def __call__(self, environ, start_response):
        app_iter, status, headers = self.get_wsgi_response(environ)
        start_response(status, headers)
        return app_iter

當調用 response(environ, start_response) 時返回一個迭代器。

四、Flask 如何處理併發請求?

Flask.wsgi_app 中的 with self.request_context(environ) 有什麼用?

self.request_context 返回 _RequestContext 的實例化對象。

class _RequestContext(object):

    def __init__(self, app, environ):
        self.app = app
        self.url_adapter = app.url_map.bind_to_environ(environ)
        self.request = app.request_class(environ)
        self.session = app.open_session(self.request)
        self.g = _RequestGlobals()
        self.flashes = None

    def __enter__(self):
        _request_ctx_stack.push(self)

    def __exit__(self, exc_type, exc_value, tb):
        if tb is None or not self.app.debug:
            _request_ctx_stack.pop()

進入 with 語句體時,會執行 __enter__ 方法,離開 with 語句體時會執行 __exit__ 方法。也就是説,在 Flask 開始處理請求前,會執行 _request_ctx_stack.puth(self),處理完請求後會執行 _request_ctx_stack.pop(self)

_request_ctx_stack 是一個全局變量,是 LocalStack 類的對象。

class LocalStack(object):

    def __init__(self):
        self._local = Local()
        self._lock = allocate_lock()

    def __release_local__(self):
        self._local.__release_local__()

    def push(self, obj):
        self._lock.acquire()
        try:
            rv = getattr(self._local, 'stack', None)
            if rv is None:
                self._local.stack = rv = []
            rv.append(obj)
            return rv
        finally:
            self._lock.release()

    def pop(self):
        self._lock.acquire()
        try:
            stack = getattr(self._local, 'stack', None)
            if stack is None:
                return None
            elif len(stack) == 1:
                release_local(self._local)
                return stack[-1]
            else:
                return stack.pop()
        finally:
            self._lock.release()

    @property
    def top(self):
        try:
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None

LocalStack 在初始化時會創建一個線程鎖(self._lock = allocate_lock()),LocalStack.push(obj) 首先請求加鎖,獲取到鎖後將 obj 添加到列表 self._local.stack,如果 self._local 沒有屬性 stack 則將 stack 初始化為空列表,最後釋放鎖(self._lock.release())。self._local.stack 是什麼?

class Local(object):
    __slots__ = ('__storage__', '__lock__')

    def __init__(self):
        object.__setattr__(self, '__storage__', {})
        object.__setattr__(self, '__lock__', allocate_lock())

    def __getattr__(self, name):
        self.__lock__.acquire()
        try:
            try:
                return self.__storage__[get_ident()][name]
            except KeyError:
                raise AttributeError(name)
        finally:
            self.__lock__.release()

    def __setattr__(self, name, value):
        self.__lock__.acquire()
        try:
            ident = get_ident()
            storage = self.__storage__
            if ident in storage:
                storage[ident][name] = value
            else:
                storage[ident] = {name: value}
        finally:
            self.__lock__.release()

在初始化 self._local.stack 屬性時,self._local.stack = rv = [] 等同於 Local.__setattr__(self._local, 'stack', []),這裏首先加鎖,然後獲取線程id(ident = get_ident()),將線程id作為字典 self.__storage__ 的鍵,將 {'stack': []} 作為值,即:

self.__storage__ = {
    "thread1": {
        "stack": []
    },
    "thread2": {
        "stack": []
    },
    ...
}

LocalStack.push(obj) 是將 obj 加入了對應線程的 stack 列表。LocalStack.pop() 是將 obj 從對應的線程的 stack 列表移除。另外還有 LocalStack.top,返回對應線程存在 stack 中的對象。

為什麼要如此大費周章的做這件事呢?來看一個例子:

假設 Web 服務器單進程啓動,啓動2個線程,同一時刻有2個請求進來,每個請求都有對應的 environ 數據,如果直接賦值給同一個變量,後一個請求會覆蓋前一個請求的數據,因為線程數據是共享的。要如何保存這兩個請求的數據?並在需要的時候能正確取出?Flask 的做法是設置一個全局變量 _request_ctx_stack,存儲數據最終用一個字典 self.__storage__,將不同線程的請求數據用線程id作為 key,請求數據存在線程id對應的 stack 列表中。

self.__storage__ = {
    "thread1": {
        "stack": [obj1]
    },
    "thread2": {
        "stack": [obj2]
    },
    ...
}

Flask 提供了非常便捷的方式來獲取當前請求中的 Flask 對象、請求、session、全局變量等數據:current_app, request, session, g

_request_ctx_stack = LocalStack()
current_app = LocalProxy(lambda: _request_ctx_stack.top.app)
request = LocalProxy(lambda: _request_ctx_stack.top.request)
session = LocalProxy(lambda: _request_ctx_stack.top.session)
g = LocalProxy(lambda: _request_ctx_stack.top.g)

其本質是返回存在於 _request_ctx_stack.top 中對應的屬性,返回的數據是與當前線程id相對應的,實現了線程數據隔離。

LocalProxy 是封裝 local 的代理對象,能夠保護 local 對象,防止其被意外修改,對應設計模式中的代理模式。

五、總結

回過頭看看 Flask 的常見用法:

from flask import Flask, request, session


app = Flask(__name__)


@app.route('/login', methods=['GET', 'POST'])
def login():
    error = None
    if request.method == 'POST':
        if request.form['username'] != USERNAME:
            error = 'Invalid username'
        elif request.form['password'] != PASSWORD:
            error = 'Invalid password'
        else:
            session['logged_in'] = True
            flash('You were logged in')
            return redirect(url_for('show_entries'))
    return render_template('login.html', error=error)


if __name__ == '__main__':
    app.run()

首先實例化一個 Flask 對象 app,使用裝飾器 route 添加 URL 與處理函數,執行 app.run(),啓動服務。

訪問 localhost:5000/login,服務接收到請求後會將 request, current_app 等數據保存至線程id對應的 stack 中,然後進行前置處理(self.preprocess_request),解析 URL,獲取 endpoint 與參數,通過 endpoint 從 self.view_functions 中獲取對應的處理函數,處理完後將返回值轉換為響應對象,進行後置處理(self.process_response),最後返回可迭代對象(response(environ, start_response)),之後便是服務器處理的部分了。

版本:python 2.7, werkzeug==0.6.1, Flask==0.1

參考文獻:

[1] Eby, P.J. (2010). PEP 3333 – Python Web Server Gateway Interface v1.0.1. Retrieved February 18, 2023, from https://peps.python.org/pep-3...

[2] What is an 'endpoint' in Flask?.(n.d). Retrieved February 18, 2023, from https://stackoverflow.com/que...

本文源碼:https://github.com/yyywang/fl...

Add a new Comments

Some HTML is okay.