I've been load testing quite a bit recently. So far I've been impressed by the `wrk` project's use of sockets to pour on a ridiculously high amount of load. When I had stretched the system under load past its limits, finding the `fortio` project was a breath of fresh air, in particular because of the inclusion of a `-qps` flag to control the rate (and duration) of a load test.
So what does this have to do with `requests`? These load testing tools (and many others) are great if you know what to expect of your failures, but I was load testing to debug a problem. I wanted a way to keep all failed responses and inspect them after the fact; I wanted an escape hatch into the event loop firing off requests.
Motivated by `wrk`, I wanted to utilize a large-ish thread pool all sharing a connection pool. A shared connection pool enables testing of re-used TCP sockets and can generate more load since TLS negotiation can be re-used. Using the `select` module and low-level socket objects to accomplish this task likely would've given maximal throughput, but the sheer amount of work required was not worth it, especially because a modest few hundred requests per second was all I was looking for (i.e. a controlled but steady flow of traffic). So I sought to put the über-popular `requests`¹ package to the task.
## Zero Modification Approach
Based on a very popular StackOverflow question, I knew `requests.Session()` wasn't "really" threadsafe, but I figured I'd give it a try anyhow. By using a `requests` transport adapter, a connection pool size can be specified:
```python
import requests


def connection_pool(size):
    session = requests.Session()
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=size, pool_maxsize=size
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session
```
To take it for a test drive, I spun up 4 threads to share a connection pool of size 2 and at first things looked just fine. Running
```python
import threading

URL = "https://www.google.com/"


def make_request(i, pool):
    response = pool.get(URL)
    print(f"{i} Status Code: {response.status_code}")


def spawn_threads(pool_size, thread_count):
    pool = connection_pool(pool_size)
    threads = []
    for i in range(thread_count):
        thread = threading.Thread(target=make_request, args=(i, pool))
        thread.start()
        threads.append(thread)
    return threads


def main():
    threads = spawn_threads(2, 4)
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()
```
all 4 threads seem to do their work without interfering with one another:

```
1 Status Code: 200
2 Status Code: 200
0 Status Code: 200
3 Status Code: 200
```
## Zero Correctness
To be sure, I added a response hook to actually verify the socket being used on each request:

```python
import socket


def make_response_hook(i):
    def response_hook(response, **unused_kwargs):
        sock = socket.fromfd(
            response.raw.fileno(), socket.AF_INET, socket.SOCK_STREAM
        )
        client_ip, port = sock.getsockname()
        print(f"{i} Socket client address: {client_ip}:{port}")

    return response_hook


def make_request(i, pool):
    response = pool.get(URL, hooks={"response": make_response_hook(i)})
    print(f"{i} Status Code: {response.status_code}")
```
After running this I confirmed that the threads were not sharing a pool of (at most) 2 open TCP sockets:
```
0 Socket client address: 192.168.7.31:50784
0 Status Code: 200
3 Socket client address: 192.168.7.31:50786
3 Status Code: 200
2 Socket client address: 192.168.7.31:50787
1 Socket client address: 192.168.7.31:50785
1 Status Code: 200
2 Status Code: 200
```
To me, this indicates either `requests.Session` directly or some component (e.g. the underlying `urllib3` package) is using thread-local storage for parts of the connection pool².
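To see how thread-local storage turns a would-be shared pool into one pool per thread, here is a toy illustration of the mechanism (this is only a demonstration of `threading.local()`, not `urllib3`'s actual internals):

```python
import threading

# Toy illustration of thread-local storage: each thread that touches
# `store.sockets` sees its own list, so a "shared" pool built this way
# silently becomes one pool per thread.
store = threading.local()


def checkout(results):
    if not hasattr(store, "sockets"):
        store.sockets = []  # first touch in this thread: a fresh list
    store.sockets.append(object())  # pretend to open a socket
    results.append(len(store.sockets))


results = []
threads = [threading.Thread(target=checkout, args=(results,)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Every thread saw an empty `store.sockets`, so each reports a "pool" of 1.
print(results)  # -> [1, 1, 1, 1]
```

If the four threads were actually sharing one list, the results would count up to 4; instead each thread starts from scratch — exactly the per-thread socket behavior observed above.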
However, using a global lock on usage of the connection pool

```python
LOCK = threading.Lock()


def make_request(i, pool):
    with LOCK:
        response = pool.get(URL, hooks={"response": make_response_hook(i)})
        print(f"{i} Status Code: {response.status_code}")
```
it is clear that the same socket is used for all requests
```
0 Socket client address: 192.168.7.31:51143
0 Status Code: 200
1 Socket client address: 192.168.7.31:51143
1 Status Code: 200
2 Socket client address: 192.168.7.31:51143
2 Status Code: 200
3 Socket client address: 192.168.7.31:51143
3 Status Code: 200
```
so there is some level of sharing across threads.
## Queue and Per-thread Pool
Since I couldn't directly rely on `requests.Session()` as a multithreaded pool, I sought to create one. I briefly looked into the `requests_toolbelt.threaded.pool` module, but it also lacks the escape hatch I was looking for.
Using a global lock as above completely defeats the point of concurrent workers, so it is not an option. Requiring each thread to maintain its own pool may be unnecessarily restrictive, but due to the state sharing issue each of the N distinct connections will need an exclusive lock. In order to simulate N locks while maintaining some modicum of throughput, a `queue.Queue()` can be used:
```python
import queue


def threadsafe_pool(size):
    id_queue = queue.Queue(maxsize=size)
    connections = {}
    for i in range(size):
        id_queue.put(i)
        connections[i] = connection_pool(1)
    return connections, id_queue
```
Rather than putting the `requests.Session()` objects directly into the queue, a read-only dictionary can be shared across all threads. Putting this to use, the connections and the queue can be passed to the `make_request()` thread target. The worker can make a (blocking) `get()` for a connection ID from the queue, keep it until the request has completed, and place the connection ID back on the queue for re-use:
```python
def make_request(i, connections, id_queue):
    connection_id = id_queue.get()
    connection = connections[connection_id]
    response = connection.get(URL, hooks={"response": make_response_hook(i)})
    id_queue.task_done()
    id_queue.put(connection_id)
    print(f"{i} Status Code: {response.status_code}")
```
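One subtlety worth flagging: if `connection.get()` raises, the connection ID is never returned to the queue and the pool slowly starves. A `contextlib.contextmanager`-based checkout (a sketch of my own, assuming the `connections`/`id_queue` pair produced by `threadsafe_pool()`) guarantees the ID goes back even on failure:

```python
import contextlib
import queue


@contextlib.contextmanager
def checked_out(connections, id_queue):
    """Borrow a connection from the pool, returning its ID even on error."""
    connection_id = id_queue.get()
    try:
        yield connections[connection_id]
    finally:
        # Runs on success AND on exception, so the ID is never leaked.
        id_queue.task_done()
        id_queue.put(connection_id)


# Usage inside a worker:
#   with checked_out(connections, id_queue) as connection:
#       response = connection.get(URL)
```

This keeps the checkout/return bookkeeping in one place instead of scattering it through every worker body.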
## Verification
To see that this works as expected, the `spawn_threads()` helper can be updated to pass along the connections and queue:
```python
def spawn_threads(pool_size, thread_count):
    connections, id_queue = threadsafe_pool(pool_size)
    threads = []
    for i in range(thread_count):
        thread = threading.Thread(
            target=make_request, args=(i, connections, id_queue)
        )
        thread.start()
        threads.append(thread)
    return threads
```
and running the code shows exactly 2 sockets were used (and re-used)
```
1 Socket client address: 192.168.7.31:51426
1 Status Code: 200
0 Socket client address: 192.168.7.31:51425
0 Status Code: 200
2 Socket client address: 192.168.7.31:51426
2 Status Code: 200
3 Socket client address: 192.168.7.31:51425
3 Status Code: 200
```
Turning up the difficulty level a bit, the sockets used can be tracked (utilizing the atomicity of `list.append()` in Python):

```python
SOCKET_PAIRS = []


def make_response_hook(i):
    def response_hook(response, **unused_kwargs):
        sock = socket.fromfd(
            response.raw.fileno(), socket.AF_INET, socket.SOCK_STREAM
        )
        SOCKET_PAIRS.append(sock.getsockname())

    return response_hook
```
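That atomicity claim is easy to sanity-check in isolation: hammer a single list from several threads and confirm no appends are lost (in CPython, `list.append()` is atomic under the GIL, unlike read-modify-write operations such as `count += 1`):

```python
import threading

# Sanity check: concurrent list.append() calls never lose entries in
# CPython, so a plain list works as a lock-free append-only log.
items = []


def append_many(n):
    for _ in range(n):
        items.append(1)


threads = [threading.Thread(target=append_many, args=(10_000,)) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(len(items))  # -> 80000, all 8 * 10,000 appends survived
```

Note this is a CPython implementation detail rather than a language guarantee, but it holds for the interpreter this load test runs on.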
Using this, a histogram of the sockets

```python
import collections


def main():
    threads = spawn_threads(5, 256)
    for thread in threads:
        thread.join()

    histogram = collections.Counter(SOCKET_PAIRS)
    for (ip_, port), count in histogram.most_common(len(histogram)):
        print(f"{ip_}:{port} -> {count}")
```
shows that only 5 sockets were used to service all 256 requests
```
4 Status Code: 200
0 Status Code: 200
...
255 Status Code: 200
192.168.7.31:51625 -> 52
192.168.7.31:51624 -> 52
192.168.7.31:51621 -> 51
192.168.7.31:51623 -> 51
192.168.7.31:51622 -> 50
```