Sending 45,000 HTTP Requests with Python/Django in 2021


In developing soundfiles.fm, I’m currently facing an interesting challenge. The site needs to send requests to servers to obtain their (potentially) updated RSS feeds. A lot of RSS feeds. The number comes out to about 45,000 requests that need to be sent. Naturally, doing such a thing with synchronous requests on a single thread will take a very long time… So how can we speed things up?

Now, if you’re reading this in search of your own solution: if you don’t mind using the httplib library (http.client in Python 3) and you only need to send HEAD requests (not download anything), I would point you towards this solution on Stack Overflow. However, I need to download and parse XML!
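For reference, a bare-bones HEAD request with the standard library looks roughly like the sketch below. The host and path are placeholders, not a real feed.

import http.client

# Minimal HEAD request via the standard library (http.client).
conn = http.client.HTTPSConnection("example.com", timeout=10)
conn.request("HEAD", "/feed.xml")
resp = conn.getresponse()
print(resp.status, resp.getheader("Last-Modified"))
conn.close()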

There is another potential solution to this problem: gevent, which makes it possible to launch HTTP requests asynchronously. Sounds nice – I haven’t tried it. In general I want to keep my dependencies light and use the standard requests module, which seems to have a larger community and offers more documentation and support.
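For completeness, the gevent approach usually looks something like the sketch below (I haven’t run this myself; the URL list is a placeholder, and the monkey-patching has to happen before anything else touches the socket module):

from gevent import monkey
monkey.patch_all()  # patch sockets so requests becomes cooperative

import gevent
import requests

def fetch(url):
    try:
        return requests.get(url, timeout=10)
    except requests.RequestException:
        return None

urls = ["https://example.com/feed.xml"]  # placeholder list of feed URLs
jobs = [gevent.spawn(fetch, url) for url in urls]
gevent.joinall(jobs)
responses = [job.value for job in jobs if job.value is not None]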

Ultimately I went with Python’s concurrent.futures and its ThreadPoolExecutor!

Let’s take a look at how I’ve used this. Note: I’m creating a Django management command, with a handle function like:

# at the top of the management command module:
import concurrent.futures
import time

import requests  # used by the work() method below
# (plus whatever import your Podcast model needs)

def handle(self, *args, **options):
    podcasts = Podcast.objects.all()
    count = 0
    num_concurrent = 20  # worker threads; tune for your machine and network
    start_time = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent) as executor:
        res = [executor.submit(self.work, podcast) for podcast in podcasts]
        for future in concurrent.futures.as_completed(res):
            count += 1
            if count % 100 == 0:
                print(f"{count} requests completed in {time.time() - start_time}s")


max_workers here is the maximum number of worker threads that run at any one time. If you omit it, the default depends on your Python version: number of cores × 5 on Python 3.5–3.7, and min(32, number of cores + 4) on 3.8+.
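Rather than hard-coding the worker count (the 20 in the snippet above is just a guess), you could derive it from the CPU count, mirroring the old default:

import os

# Roughly the old "cores x 5" heuristic, capped so a big machine
# doesn't spawn hundreds of threads. A starting point only; tune it.
num_concurrent = min(32, (os.cpu_count() or 1) * 5)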

My work method makes the request, parses the XML, and submits the appropriate items to the database.

def work(self, podcast):
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36'
    }
    url = podcast.feed_url  # assuming the Podcast model stores its feed URL here
    r = None

    try:
        r = requests.get(url, timeout=10.1, headers=headers)
    except Exception as e:
        # do stuff (log the failure, mark the feed as unreachable, ...)
        return

    if r.ok:
        parser = MyParserParser(r.text)
        self.update_podcast_channel(podcast, parser)
        self.update_episodes(podcast, parser)

	

self.update_podcast_channel(…) and self.update_episodes(…) parse the XML feed and deliver the results to the database.
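I won’t reproduce my parser here, but for flavour: MyParserParser is essentially a thin wrapper around an XML parser. A rough sketch using the standard library’s ElementTree might look like the following (the method names and fields are illustrative, not my actual implementation, and real-world feeds need much more defensive handling):

import xml.etree.ElementTree as ET

class MyParserParser:
    """Very rough sketch of an RSS feed wrapper (not the production code)."""

    def __init__(self, xml_text):
        self.root = ET.fromstring(xml_text)
        self.channel = self.root.find('channel')

    def channel_title(self):
        el = self.channel.find('title')
        return el.text if el is not None else ''

    def episodes(self):
        # Each <item> in an RSS feed is one episode.
        for item in self.channel.findall('item'):
            yield {
                'title': item.findtext('title', default=''),
                'guid': item.findtext('guid', default=''),
                'pub_date': item.findtext('pubDate', default=''),
            }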


Now here’s another issue. If we have multiple threads hitting the database… what happens? As it turns out, the ORM is thread-safe! See this Stack Overflow question. The database itself? Well, if we’re using sqlite3, there are steps we can take to make it tolerate concurrent writes. However, it’s not clear to me that such attempts would actually outperform a single writer thread, especially considering that this is Python (hence, we have the infamous GIL). So to keep things simple, I propose a queue and a producer/consumer model. Let’s make a thread that looks like:


import queue
import threading
import time

DB_BUF_SIZE = 1000  # max number of DB objects allowed to sit in the queue
object_update_create_que = queue.Queue(DB_BUF_SIZE)

class ObjectUpdateCreateThread(threading.Thread):
    """
        Handle saving items to the db
    """
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None):
        super().__init__(name=name)
        self.target = target
        self._stopevent = threading.Event()

    def run(self):
        # Keep consuming until we've been told to stop AND the queue is drained.
        while (not self._stopevent.is_set()) or (not object_update_create_que.empty()):
            if not object_update_create_que.empty():
                ob = object_update_create_que.get()
                try:
                    ob.save()
                except Exception:
                    pass  # don't let one bad row kill the consumer; log here in real code
            else:
                time.sleep(1)

    def join(self, timeout=None):
        """ Signal the consumer to stop, let it drain the queue, then wait for it. """
        self._stopevent.set()
        threading.Thread.join(self, timeout)

Note: I’m doing individual DB inserts with object.save() because I like being able to feed different types of objects into this thread and just call save(). If this polymorphism advantage is irrelevant for you, I suggest using bulk_create() instead (a rough sketch of that appears after the queue example below). Now we can start this thread in our handle function like so:

def handle(self, *args, **options):
    podcasts = Podcast.objects.all()
    count = 0
    start_time = time.time()

    t3 = ObjectUpdateCreateThread(name='object_update_create_consumer')
    t3.start()

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent) as executor:
        res = [executor.submit(self.work, podcast) for podcast in podcasts]
        for future in concurrent.futures.as_completed(res):
            count += 1
            if count % 100 == 0:
                print(f"{count} requests completed in {time.time() - start_time}s")

    t3.join()  # signal the consumer to stop and drain the queue

Now, wherever we previously saved to the database, we add the object to the queue instead:

# 1) Parse a Podcast object out of the RSS feed
# 2) Add it to the queue (blocks if the queue is full)
object_update_create_que.put(podcast, block=True)
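And here is the bulk_create() variant I mentioned earlier: instead of saving one object per get(), the consumer’s run() could collect a batch and flush it in a single query. This is a rough, untested sketch that assumes all queued objects belong to one model (a hypothetical Episode) and requires Django 2.2+ for ignore_conflicts:

def run(self):
    batch = []
    while (not self._stopevent.is_set()) or (not object_update_create_que.empty()):
        try:
            batch.append(object_update_create_que.get(timeout=1))
        except queue.Empty:
            continue
        if len(batch) >= 100:
            # One INSERT for the whole batch instead of 100 round trips.
            Episode.objects.bulk_create(batch, ignore_conflicts=True)
            batch = []
    if batch:  # flush whatever is left once we've been told to stop
        Episode.objects.bulk_create(batch, ignore_conflicts=True)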

Now, there was an issue with concurrent.futures whereby .as_completed would not release memory after a future completed. That issue has been fixed, but submitting all of the jobs up front still leaves every future (and the response it holds) sitting in memory until the loop finishes, which translates into massive RAM use for a dataset this large.

One solution to this problem is to use bounded_pool_executor instead. I haven’t tried that approach, as I prefer to stick with mainstream Python libraries. Instead I simply performed the above operation in chunks, like so:

def handle(self, *args, **options):
    podcasts = Podcast.objects.all()
    count = 0
    cur_index = 0
    chunk_amt = 100  # feeds per chunk; keeps the number of live futures small
    start_time = time.time()

    t3 = ObjectUpdateCreateThread(name='object_update_create_consumer')
    t3.start()

    while cur_index < len(podcasts):
        cur_list = podcasts[cur_index:cur_index + chunk_amt]
        cur_index = cur_index + chunk_amt
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent) as executor:
            res = [executor.submit(self.work, podcast) for podcast in cur_list]
            for future in concurrent.futures.as_completed(res):
                count += 1
                if count % 100 == 0:
                    print(f"{count} requests completed in {time.time() - start_time}s")

    t3.join()

Using this code, I’m able to retrieve and parse 1,000 RSS feeds in about 300 seconds on my small virtual machine, which has two 3.6 GHz cores and 4 GB of RAM. Doing the same with a single thread took almost an hour. Admittedly, I haven’t tested the full 45,000 yet, and I hope to do that as soon as this code gets to production.

That’s all I have for now! If you’ve found this information helpful, or have a more efficient solution, please let me know! And hopefully I can find the time to write more posts about my Django adventures.
