Cuối tuần vừa rồi, mình có chút thời gian rảnh nên ngồi đọc các tài liệu về HTTP/1.1, HTTP/2 và ASGI, với mục tiêu cài đặt một web server bằng Python. Về cơ bản, mọi thứ hoạt động ổn ở mức nền tảng. Dưới đây là một vài chia sẻ từ những gì mình học được trong quá trình xây dựng một web server như vậy bằng Python.
HTTP/2 là một phương thức đóng gói, truyền và nhận các gói tin HTTP trên nền giao thức TCP nhằm cải thiện hiệu suất so với phiên bản HTTP/1.1. Trong HTTP/1.1, mỗi transaction (gồm request và response) sử dụng một kết nối riêng. Sau này, giao thức được cải tiến để cho phép nhiều transaction sử dụng chung một kết nối, gọi là HTTP Pipelining. Tuy nhiên, cải tiến này nhìn chung không hiệu quả về mặt truyền tải, do các transaction phải tuân theo thứ tự — nếu một transaction trước đó bị chậm, các transaction sau sẽ phải chờ. Dù vậy, HTTP Pipelining cũng giúp giảm số lần handshake giữa client và server.
HTTP/2 ra đời nhằm khắc phục những điểm yếu trong truyền tải dữ liệu của các phiên bản trước. Trong HTTP/2, mỗi transaction được chia nhỏ thành các frame; mỗi transaction được gọi là một stream, và một stream bao gồm nhiều frame. Nhờ đó, trên một kết nối duy nhất, ta có thể truyền đồng thời nhiều frame thuộc nhiều stream khác nhau. Dữ liệu trong các frame này được biểu diễn dưới dạng nhị phân và nén bằng một biến thể của thuật toán Huffman, giúp giảm đáng kể dung lượng truyền tải.
Dưới đây là một số nhận xét của mình về HTTP/1.1 và HTTP/2:
ASGI là viết tắt của Asynchronous Server Gateway Interface. Nó là một giao diện cho phép web server giao tiếp với các ứng dụng web bất đồng bộ trong Python. Nếu ai từng viết các web backend bằng Python cách đây 6–8 năm (như Flask hay Django), thì sẽ quen với một giao diện khác gọi là WSGI – Web Server Gateway Interface. Một trong những web server phổ biến tương thích với giao diện này là HTTP Apache thông qua module uWSGI.
Sự khác biệt giữa hai giao diện này nằm ở mô hình xử lý request: ASGI hỗ trợ xử lý bất đồng bộ thông qua mô hình event loop, trong khi WSGI xử lý các request song song bằng mô hình đa luồng.
Tuy nhiên, việc dùng đa luồng để giải quyết các tác vụ liên quan đến I/O không thực sự hiệu quả — đây chính là vấn đề được nhắc đến trong bài toán C10K. Một ví dụ điển hình cho hướng tiếp cận hiệu quả hơn là việc Nginx sử dụng mô hình bất đồng bộ, so với HTTP Apache vốn sử dụng mô hình đa luồng truyền thống.
Trước khi ASGI xuất hiện, một số framework đã tự phát triển theo hướng bất đồng bộ, chẳng hạn như Tornado. Tuy nhiên, do thiếu một giao diện tiêu chuẩn, việc phát triển và sử dụng các framework bất đồng bộ trở nên khó khăn và thiếu tính tương thích. ASGI ra đời để giải quyết vấn đề đó.
Về cơ bản, ASGI sử dụng coroutine trong Python để xử lý từng request, và điều phối thông qua một event loop. Các sự kiện gửi và nhận dữ liệu qua socket được xử lý theo kiểu non-blocking.
Khi server nhận được một gói tin đến, nó sẽ emit một sự kiện tới ứng dụng. Ngược lại, khi ứng dụng cần gửi response (bao gồm cả trường hợp streaming response), nó sẽ emit một sự kiện để server xử lý và truyền đi qua socket.
Cho bạn nào chưa biết thì event loop là một cơ chế dùng để quản lý và điều phối các đơn vị xử lý dựa trên các sự kiện bất đồng bộ, chẳng hạn như các tác vụ liên quan đến mạng (networking) hoặc hệ thống tập tin (filesystem).
Trong demo này, mình đã cài đặt cả hai giao thức: HTTP/1.1 và HTTP/2. Web server được thiết kế sử dụng nhiều process để tận dụng hiệu quả hơn tài nguyên trên các hệ thống đa nhân (multi-core CPU), nhằm cải thiện hiệu suất xử lý.
Bây giờ, chúng ta sẽ đi vào phần cài đặt chi tiết. Lưu ý: trong bài viết này, mình chỉ trích dẫn những đoạn mã (snippet) quan trọng để giải thích các khía cạnh kỹ thuật chính. Nếu bạn muốn xem đầy đủ mã nguồn, hãy tham khảo tại repo sau: python-webserver-tutorial
Bước đầu tiên: Tạo socket lắng nghe
self.sock = socket.create_server(
address=(host, port),
family=socket.AF_INET,
backlog=self.backlog or 4096,
reuse_port=True,
)
os.set_inheritable(self.sock.fileno(), True)
Hàm create_server là một utility function dùng để tạo một socket lắng nghe cho server. Tham số reuse_port là một flag cho phép nhiều socket cùng bind vào một cổng.
Để hiểu rõ hơn về cách hoạt động của cơ chế này, hãy cùng xem câu lệnh ngay sau đó.
os.set_inheritable(self.sock.fileno(), True)
cho phép socket này được kế thừa bởi các process con. Nhờ cờ reuse_port, các bản sao của socket này có thể bind vào cùng một cổng.
Đây là một cơ chế quan trọng trong mô hình master-worker của web server, nơi process master tạo socket lắng nghe và các process worker kế thừa socket đó để cùng xử lý các kết nối đến.
Process master sẽ tạo socket lắng nghe và sau đó khởi tạo các process con. Các process con này sẽ kế thừa socket từ process cha, và tất nhiên, tất cả sẽ cùng lắng nghe trên một cổng duy nhất.
Vậy điều gì sẽ xảy ra khi có một gói tin được gửi đến cổng đó?
Hệ điều hành Unix-like như Linux có cơ chế cân bằng tải nội bộ để xử lý tình huống này.
Các bạn còn nhớ syscall quen thuộc này chứ?
while 1 {
connect_socket = accept(listen_socket)
}
accept sẽ block cho đến khi có một kết nối đến socket — tức là khi có request gửi tới server.
Vậy socket của process con nào sẽ được chọn để xử lý kết nối này?
Kernel sẽ tạo một giá trị hash dựa trên IP và port của client, sau đó chọn process dựa trên giá trị hash đó. Nhờ cơ chế này, các request đến từ cùng một client sẽ thường được xử lý bởi cùng một process, tạo ra hiệu ứng session sticky một cách tự nhiên.
Vậy là chúng ta đã hoàn tất phần thiết lập server và các worker. Tiếp theo, chúng ta sẽ chuyển sang phần xử lý từng socket kết nối khi có request từ client.
Ở bước này, thay vì xử lý song song bằng đa luồng, chúng ta sẽ xử lý theo kiểu bất đồng bộ với event loop.
Module asyncio của Python đã chuẩn hoá cách định nghĩa các giao thức mạng, và cá nhân mình thấy nó cực kỳ hiệu quả cũng như giúp code trở nên rất clean.
Trong asyncio, việc định nghĩa giao thức mạng được chia thành hai phần, tương ứng với hai class: BaseTransport và BaseProtocol.
BaseTransport đảm nhận vai trò truyền các gói tin trên mạng — nó trả lời cho câu hỏi: Làm sao các byte dữ liệu được truyền đi? Nói cách khác, BaseTransport hoạt động ở tầng 4 (Transport layer) trong mô hình OSI. Trong asyncio, có nhiều loại transport như TCP, UDP, WriteTransport, ReadTransport,...
BaseProtocol chịu trách nhiệm định nghĩa các quy tắc giao tiếp giữa hai bên trong một kết nối — tức là hoạt động ở tầng 7 (Application layer). Một số giao thức tiêu biểu ở tầng này bao gồm: HTTP, JSON-RPC, gRPC, IMAP, POP3,...
Trong trường hợp này, chúng ta sẽ sử dụng Transport — lớp mặc định của asyncio dành cho kết nối TCP — và tự viết hai protocol riêng biệt cho HTTP/1.1 và HTTP/2, dựa trên class Protocol của asyncio.
Ngoài ra, để có thể xây dựng một web server tương thích với các framework hiện đại như FastAPI, các bạn cũng nên tìm hiểu thêm về ASGI. Tài liệu chính thức có thể tham khảo tại đây: https://asgi.readthedocs.io/en/latest/index.html
Đầu tiên, chúng ta sẽ implement H1Protocol để xử lý giao thức HTTP/1.1.
class H1Protocol(asyncio.Protocol):
def __init__(self, app):
# app là một async callable theo chuẩn ASGI dùng để xử lí từng request
self.app = app
# Với HTTP/1.1 mình sử dụng llhttp, một HTTP parser hiệu suât cao của
# NodeJS để parse request thông qua httptools.
# parser này nhận đầu vào là một object với các method như on_url, on_body, on_header, on_header_complete.
# Những method này sẽ được gọi bên trong parser trong quá trình parse
# message.Trong ví dụ này, để đơn giản, mình kết hợp nó với H1Protocol
self.parser = httptools.HttpRequestParser(self) #type: ignore
# task được sử dụng để xử lí request từ khi nhận được
# đến khi app gửi lại response
self.task: asyncio.Task = None
# event được sử dụng để notify cho ứng dụng khi có dữ liệu gửi đến socket,
# từ đó ứng dụng có thể đọc
self.message_event = asyncio.Event()
def connection_lost(self, exc: Exception | None) -> None:
if self.task and not self.task.done():
self.task.cancel('Lost connection')
def data_received(self, data: bytes) -> None:
self.parser.feed_data(data)
def on_headers_complete(self):
self.method = self.parser.get_method()
self.http_version = self.parser.get_http_version()
scope = self.make_scope()
cycle = RequestLifeCycle(scope, self.app, self.transport.write, self.message_event)
self.cycle = cycle
# tạo task để xử lí request theo RequestLifeCycle, việc này
# tương đương với việc tạo thread để xử lí từng request trong
# kiến trúc đa luồng
task = asyncio.create_task(cycle.run())
task.add_done_callback(self.on_done)
self.task = task
def on_body(self, body: bytes):
# khi socket có dữ liệu và sẵn sàng để đọc
self.cycle.body = body #type: ignore
self.message_event.set()
# ---------------- Extra ---------------------
def on_done(self, _: asyncio.Task):
# giữ lại connection để tận dụng cho các transaction về sau
if not self.parser.should_keep_alive():
self.transport.close()
Tiếp theo, chúng ta sẽ viết tiếp H2Protocol cho HTTP/2
class H2Protocol(asyncio.Protocol):
# do trên cùng 1 connection xử lí nhiều stream HTTP2 (transaction)
# nên chúng ta cần 1 table để lưu thông tin các stream (RequestContext)
# với key là stream_id
streams: t.Dict[int, "RequestContext"]
...
def connection_made(self, transport: asyncio.BaseTransport) -> None:
...
self.conn.initiate_connection()
# bất kì một action gì trên giao thức HTTP/2 cũng sẽ cần 1 ACK,
# bao gồm các việc tạo kết nối thành công, điều này là
# không cần thiết trên HTTP/1.1
self.transport.write(self.conn.data_to_send())
...
def data_received(self, data: bytes) -> None:
# các frame được đóng gói và gửi đi bởi client sẽ được nhận
# bởi server và xử lí ở đây. Chúng ta sẽ feed từng đoạn dữ liệu
# vào parser của thư viện h2 và nhận ra các readable events.
# mỗi event đều có type như:
# - RequestReceived: nhận đầy đủ header, tương đương với on_header_complete
# - DataReceived: nhận dữ liệu về HTTP body, tương đương on_body
# - StreamEnded: kết thúc 1 stream (transaction), lưu ý: không tương đương với close connection
# - ...
try:
events = self.conn.receive_data(data)
except h2.exceptions.ProtocolError:
self.transport.write(self.conn.data_to_send())
self.transport.close()
else:
self.transport.write(self.conn.data_to_send())
for event in events:
if isinstance(event, h2.events.RequestReceived):
self.request_received(event.headers, t.cast(int, event.stream_id))
elif isinstance(event, h2.events.DataReceived):
self.receive_data(event.data, t.cast(int, event.stream_id))
elif isinstance(event, h2.events.StreamEnded):
self.stream_complete(t.cast(int, event.stream_id))
elif isinstance(event, h2.events.ConnectionTerminated):
self.transport.close()
elif isinstance(event, h2.events.StreamReset):
self.stream_reset(t.cast(int, event.stream_id))
elif isinstance(event, h2.events.WindowUpdated):
self.window_updated(event.stream_id, event.delta)
elif isinstance(event, h2.events.RemoteSettingsChanged):
if h2.settings.SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
self.window_updated(None, 0)
self.transport.write(self.conn.data_to_send())
def request_received(self, headers: t.List[hpack.HeaderTuple] | None, stream_id: int):
...
# Giống với H1Protocol, chúng ta sẽ bắt đầu tạo và chạy ứng dụng để xử
# lí request ở đây.
message_event = asyncio.Event()
scope = self.make_scope()
cycle = RequestLifeCycle(
scope=scope,
app=self.app,
message_event=message_event,
)
request_context = RequestContext(stream_id, cycle, message_event, self)
self.streams[stream_id] = request_context
ctx.set(request_context)
task = asyncio.create_task(cycle.run())
request_context.task = task
def receive_data(self, data: bytes | None, stream_id: int):
if not data:
return
# khi có dữ liệu của request body, notify event để ứng dụng có thể xử lí
request_context = self.streams[stream_id]
request_context.cycle.body = data
request_context.message_event.set()
def stream_complete(self, stream_id: int):
# clean up stream
self.streams.pop(stream_id)
def stream_reset(self, stream_id: int):
request_context = self.streams[stream_id]
if request_context.flow_control:
request_context.flow_control.cancel()
request_context.flow_control = None
def window_updated(self, stream_id: int | None, delta: int | None):
# đây là một phần quan trọng, trong HTTP/2. HTTP/2 sẽ duy trì một cửa
# sổ để thông báo cho ứng dụng khi nào có thể ghi dữ liệu response vào
# socket. Chúng ta sẽ sử dụng một asyncio.Future để notify cho ứng dụng
# khi nào được ghi dữ liệu vào socket và được ghi bao nhiêu bytes
# (trong hàm send_data bên dưới)
if stream_id and stream_id in self.streams:
request_context = self.streams[stream_id]
f = request_context.flow_control
f.set_result(delta) #type: ignore
elif not stream_id:
for context in self.streams.values():
if context.flow_control:
context.flow_control.set_result(delta)
context.flow_control = None
async def send_data(self, data: bytes, stream_id: int):
while data:
while self.conn.local_flow_control_window(stream_id) < 1:
try:
# chờ cho đến. khi có thể ghi được dữ liệu vào socket
await self.wait_for_flow_control(stream_id)
except asyncio.CancelledError:
return data
chunk_size = min(
self.conn.local_flow_control_window(stream_id),
len(data),
self.conn.max_outbound_frame_size,
)
try:
self.conn.send_data(
stream_id,
data[:chunk_size],
end_stream=(chunk_size == len(data)),
)
except (h2.exceptions.StreamClosedError, h2.exceptions.ProtocolError):
break
self.transport.write(self.conn.data_to_send())
data = data[chunk_size:]
async def wait_for_flow_control(self, stream_id):
f = asyncio.Future()
self.streams[stream_id].flow_control = f
await f
Mọi thứ đã xong, giờ chúng ta sẽ kết hợp hai protocol này để tạo thành một web server hoàn chỉnh và chạy thử benchmark xem sự khác nhau giữa 2 protocol.
class Server:
def run(self):
"""Important!!! Only run in the main process"""
self.sock = self.config.sock
host, port = self.config.socket.getsockname() #type: ignore
self.logger.info(f"Server is starting at {host}:{port}")
if self.config.workers is None:
# Mặc định, event loop của Python hoạt động kém hiệu quả,
# do đó mình sử dụng libuv (1 event loop được sử dụng trong NodeJS)
# để có được hiệu suất cao hơn.
import uvloop
uvloop.install()
asyncio.run(self.serve(self.config.socket))
return
from .worker import Worker
for _ in range(self.config.workers):
worker = Worker(self.app_factory, self.config)
worker.run()
self.workers.append(worker)
for worker in self.workers:
worker.join()
async def serve(self, sock: socket.socket):
self.logger.info(f"Worker {self.pid} is running...")
loop = asyncio.get_running_loop()
_server = await loop.create_server(
# mỗi khi một client tạo kết nối tới server, server sẽ tạo 1 protocol
# bằng hàm protocol_factory
protocol_factory=self.create_protocol,
sock=sock,
# với HTTP/2, chúng ta bắt buộc phải sử dụng SSL, điều này đang dần
# trở nên phổ biến ở các trình duyệt web và các webserver.
ssl=self.config.get_ssl(),
)
await _server.serve_forever()
def create_protocol(
self,
_: asyncio.AbstractEventLoop | None = None,
) -> asyncio.Protocol:
if self.config.enable_h2:
return H2Protocol(self.app)
return H1Protocol(self.app)
Để thực hiện benchmark, mình đã sử dụng hai công cụ khác nhau:
Ứng dụng được dùng để kiểm tra là một app FastAPI cơ bản — nhìn chung là đủ để đánh giá hiệu suất truyền tải giữa hai giao thức.
app = FastAPI()
@app.get('/')
async def index():
return {
"name": "nkthanh.dev",
"age": 18,
"address": "Hanoi, Vietnam"
}
Tham số mình benchmark như sau
- Thời gian: 10 seconds
- Số lượng connection: 8
- Số lượng client: 8
Đây là command các bạn có thể dùng để benchmark với tham số trên
wrk -t 8 -c 8 -d 10s https://127.0.0.1:5001
h2load --threads 8 --clients 8 --duration 10 https://127.0.0.1:5001
Đây là kết quả benchmark trên máy tính của mình: Macbook Pro M2, 16Gi RAM, 8 cores
Kết quả cho thấy hiệu suất truyền tải của HTTP/2 cao hơn khoảng 50% so với HTTP/1.1 — tất nhiên là trong điều kiện cài đặt cơ bản như trong thử nghiệm này.