Async Requests with SerpApi and Python

Intro

Async requests are useful when you need to collect a lot of data quickly, or when you don't need the results in real time.

This blog post covers:

Test subject: the YouTube Search Engine Results API.

Test scope: 500 YouTube search requests and extraction of the data.

What is the async parameter

SerpApi has an async parameter that tells it not to wait for the search to complete before responding, which lets you send more requests in less time. Each search is still submitted and processed on the SerpApi backend.

After all requests have been sent, the results are retrieved from the Search Archive API, and each search is checked to see whether it succeeded.
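
To make the difference concrete, here is a minimal sketch of how the parameters for a regular request and an async request differ. The build_params helper is hypothetical (it is not part of the SerpApi library), and 'demo-key' is a placeholder; the dictionary keys mirror the ones used in the code later in this post.

```python
import os


def build_params(query, use_async=False, api_key=None):
    """Return a parameter dict for a YouTube search (hypothetical helper)."""
    params = {
        'api_key': api_key or os.getenv('API_KEY'),  # serpapi api key
        'engine': 'youtube',                         # search engine
        'search_query': query,                       # search query
    }
    if use_async:
        # Ask SerpApi to respond without waiting for the search to finish
        params['async'] = True
    return params


sync_params = build_params('silk', api_key='demo-key')
async_params = build_params('silk', use_async=True, api_key='demo-key')

# The only difference between the two dicts is the 'async' key
print(sorted(set(async_params) - set(sync_params)))  # ['async']
```

Everything else about the request stays the same; only the way the results are collected afterwards changes.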

📌Note: This blog post does not cover multithreading.

Time Comparison

Time was recorded using $ time python <file.py>:

Type    Sync requests    Async requests (no pagination)    Change (async vs. sync)
real    13m 43.311s      5m 3.663s                         -63.12% (less wall-clock time)
user    0m 5.942s        0m 11.222s                        +88.86% (more CPU time)
sys     0m 0.813s        0m 1.191s                         +46.49% (more CPU time)
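
As a sanity check on the timings above, the relative change from the sync run to the async run can be computed directly from the recorded values. This is a small sketch; to_seconds and percent_change are helper names made up for this example. A negative value means the async run took less time.

```python
def to_seconds(minutes, seconds):
    """Convert a `time` reading such as 13m 43.311s into seconds."""
    return minutes * 60 + seconds


def percent_change(old, new):
    """Relative change from old to new, in percent."""
    return (new - old) / old * 100


# (sync, async) pairs taken from the table above
timings = {
    'real': (to_seconds(13, 43.311), to_seconds(5, 3.663)),
    'user': (to_seconds(0, 5.942), to_seconds(0, 11.222)),
    'sys': (to_seconds(0, 0.813), to_seconds(0, 1.191)),
}

for name, (sync_s, async_s) in timings.items():
    print(f'{name}: {percent_change(sync_s, async_s):+.2f}%')
```

The real (wall-clock) time is the number that matters here: the extra user and sys time is small in absolute terms compared with the minutes saved waiting on the network.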

SerpApi Sync Requests

from serpapi import YoutubeSearch
import os, json

# shortened for the sake of not making code very long
queries = [
    'burly',
    'silk',
    'monkey',
    'abortive',
    'hot'
]

data = []

for query in queries:
    params = {
        'api_key': os.getenv('API_KEY'),  # serpapi api key
        'engine': 'youtube',              # search engine
        'device': 'desktop',              # device type
        'search_query': query,            # search query
    }

    search = YoutubeSearch(params)       # where data extraction happens
    results = search.get_dict()          # JSON -> Python dict

    if 'error' in results:
        print(results['error'])
        break

    for result in results.get('video_results', []):
        data.append({
            'title': result.get('title'),
            'link': result.get('link'),
            'channel': (result.get('channel') or {}).get('name'),  # channel may be missing
        })

print(json.dumps(data, indent=2, ensure_ascii=False))

Sync Code Explanation

Import libraries:

from serpapi import YoutubeSearch
import os, json

Create a list of search queries you want to search:

queries = [
    'burly',
    'silk',
    'monkey',
    'abortive',
    'hot'
]

Create a temporary list that will store extracted data:

data = []

Add a for loop to iterate over all queries, create the SerpApi YouTube search parameters, and pass them to YoutubeSearch, which will make a request to SerpApi:

for query in queries:
    params = {
        'api_key': os.getenv('API_KEY'),  # serpapi api key
        'engine': 'youtube',              # search engine
        'device': 'desktop',              # device type
        'search_query': query,            # search query
    }

    search = YoutubeSearch(params)       # where data extraction happens
    results = search.get_dict()          # JSON -> Python dict

Still inside the loop, check for an 'error' key, then iterate over the video results and append the needed data to the temporary list. Once the loop has finished, print the data:

    if 'error' in results:
        print(results['error'])
        break

    for result in results.get('video_results', []):
        data.append({
            'title': result.get('title'),
            'link': result.get('link'),
            'channel': (result.get('channel') or {}).get('name'),  # channel may be missing
        })

print(json.dumps(data, indent=2, ensure_ascii=False))
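
The extraction step can also be pulled out into a small function, which makes it easy to try on sample data without spending any API credits. This is a sketch, not part of the SerpApi library: extract_videos is a hypothetical helper, and the sample dictionary below is made up to mimic the shape of the video_results used above.

```python
def extract_videos(results):
    """Pull title, link and channel name out of a results dict."""
    videos = []
    for result in results.get('video_results', []):
        # Fall back to an empty dict so a missing channel does not raise
        channel = result.get('channel') or {}
        videos.append({
            'title': result.get('title'),
            'link': result.get('link'),
            'channel': channel.get('name'),
        })
    return videos


# Made-up sample data: the second video has no channel information
sample = {
    'video_results': [
        {'title': 'A', 'link': 'https://example.com/a', 'channel': {'name': 'Chan'}},
        {'title': 'B', 'link': 'https://example.com/b'},
    ]
}

print(extract_videos(sample))
```

A result without a channel ends up with 'channel': None instead of stopping the whole run with an AttributeError.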

SerpApi Async Batch Requests without Pagination

from serpapi import YoutubeSearch
from queue import Queue
import os, re, json

queries = [
    'burly',
    'silk',
    'monkey',
    'abortive',
    'hot'
]

search_queue = Queue()

for query in queries:
    params = {
        'api_key': os.getenv('API_KEY'),  # serpapi api key
        'engine': 'youtube',              # search engine
        'device': 'desktop',              # device type
        'search_query': query,            # search query
        'async': True,                    # async batch requests
    }

    search = YoutubeSearch(params)       # where data extraction happens
    results = search.get_dict()          # JSON -> Python dict

    if 'error' in results:
        print(results['error'])
        break

    print(f"add search to the queue with ID: {results['search_metadata']['id']}")
    search_queue.put(results)


data = []

while not search_queue.empty():
    result = search_queue.get()
    search_id = result['search_metadata']['id']

    print(f'Get search from archive: {search_id}')
    search_archived = search.get_search_archive(search_id)

    print(f"Search ID: {search_id}, Status: {search_archived['search_metadata']['status']}")

    if re.search(r'Cached|Success', search_archived['search_metadata']['status']):
        for video in search_archived.get('video_results', []):
            data.append({
                'title': video.get('title'),
                'link': video.get('link'),
                'channel': (video.get('channel') or {}).get('name'),  # channel may be missing
            })
    else:
        print(f'Requeue search: {search_id}')
        search_queue.put(result)

print(json.dumps(data, indent=2))
print('all searches completed')

Async Batch Requests without Pagination Explanation

Same as before, import libraries (a few more):

from serpapi import YoutubeSearch
from queue import Queue
import os, re, json

Create a list of search queries you want to search:

queries = [
    'burly',
    'silk',
    'monkey',
    'abortive',
    'hot'
]

Create a Queue to store all the requests that have been sent to SerpApi:

search_queue = Queue()

Iterate over all queries and create the SerpApi YouTube search parameters, this time including 'async': True. Check for errors and put() each search in the queue:

for query in queries:
    params = {
        'api_key': os.getenv('API_KEY'),  # serpapi api key
        'engine': 'youtube',              # search engine
        'device': 'desktop',              # device type
        'search_query': query,            # search query
        'async': True,                    # async batch requests
    }

    search = YoutubeSearch(params)       # where data extraction happens
    results = search.get_dict()          # JSON -> Python dict

    if 'error' in results:
        print(results['error'])
        break

    print(f"add search to the queue with ID: {results['search_metadata']['id']}")
    search_queue.put(results)

Create a temporary list that will be used to store extracted data from the search archive API:

data = []

Loop over the queue until it's empty() and fetch each search from the search archive by its search ID:

while not search_queue.empty():
    result = search_queue.get()
    search_id = result['search_metadata']['id']

    print(f'Get search from archive: {search_id}')
    search_archived = search.get_search_archive(search_id)

    print(f"Search ID: {search_id}, Status: {search_archived['search_metadata']['status']}")

Still inside the while loop, check whether the search is either cached or successful. If so, extract the needed data; otherwise, requeue the result:

    if re.search(r'Cached|Success', search_archived['search_metadata']['status']):
        for video in search_archived.get('video_results', []):
            data.append({
                'title': video.get('title'),
                'link': video.get('link'),
                'channel': (video.get('channel') or {}).get('name'),  # channel may be missing
            })
    else:
        print(f'Requeue search: {search_id}')
        search_queue.put(result)

print(json.dumps(data, indent=2))
print('all searches completed')
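
One thing to keep in mind: the loop above requeues every search whose status is not Cached or Success, so a search that has failed for good would be requeued forever. A minimal sketch of a three-way check is shown below. The classify_status helper is hypothetical, and the 'Processing' and 'Error' status strings are assumptions, so check them against the SerpApi documentation before relying on them.

```python
import re

# Same pattern as in the loop above
DONE_PATTERN = re.compile(r'Cached|Success')


def classify_status(status):
    """Map a search_metadata status string to 'done', 'failed' or 'pending'."""
    if DONE_PATTERN.search(status):
        return 'done'      # safe to extract the data
    if 'Error' in status:  # assumed name of the failure status
        return 'failed'    # do not requeue, it will not recover
    return 'pending'       # still being processed, requeue it


for status in ('Success', 'Cached', 'Processing', 'Error'):
    print(status, '->', classify_status(status))
```

With a check like this, only 'pending' searches go back into the queue, and 'failed' ones can be logged and skipped.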

That's basically it 🙂

To sum up:

  1. Send all the requests and store the responses in the queue.
  2. When all requests have been sent, take them from the queue one by one until the queue is empty.
  3. Extract the search ID from each queued response. The ID is not random: it is the one SerpApi returned when that search was submitted and stored in the queue.
  4. Check whether the search has succeeded and, if so, extract the data. If not, requeue it.
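
The steps above can be sketched without touching the network by swapping the archive call for a stand-in function. Everything here is illustrative: drain_queue and fake_fetch are hypothetical names, the fake responses only mimic the search_metadata shape used in this post, and the retry cap and delay are additions that the original loop does not have.

```python
import time
from queue import Queue


def drain_queue(search_queue, fetch_archive, max_attempts=5, delay=0.0):
    """Poll queued search IDs until each one is done or runs out of attempts."""
    finished, gave_up = {}, []
    while not search_queue.empty():
        search_id, attempts = search_queue.get()
        archived = fetch_archive(search_id)
        status = archived['search_metadata']['status']
        if status in ('Cached', 'Success'):
            finished[search_id] = archived
        elif attempts + 1 < max_attempts:
            time.sleep(delay)  # avoid hammering the archive while waiting
            search_queue.put((search_id, attempts + 1))
        else:
            gave_up.append(search_id)
    return finished, gave_up


# Stand-in for the archive: search 'b' needs two polls before it succeeds
calls = {}

def fake_fetch(search_id):
    calls[search_id] = calls.get(search_id, 0) + 1
    ready = search_id == 'a' or calls[search_id] >= 2
    status = 'Success' if ready else 'Processing'
    return {'search_metadata': {'id': search_id, 'status': status}}


queue = Queue()
for search_id in ('a', 'b'):
    queue.put((search_id, 0))  # (search ID, attempts so far)

finished, gave_up = drain_queue(queue, fake_fetch)
print(sorted(finished), gave_up)  # ['a', 'b'] []
```

In real code, fetch_archive would be something like the get_search_archive call shown earlier, and a non-zero delay gives SerpApi time to finish processing between polls.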

What comes next

The next blog post on async requests with SerpApi will cover the following:

  • how to speed up async requests, i.e. using a Queue with multithreading.
  • how to add pagination with the async parameter.
