Creating N1QL Labelled Image Database using Couchbase, FastAPI, and SerpApi

This post is part of a series of blog posts on Artificial Intelligence implementation. If you are interested in the background of the story or how the series has progressed, see the earlier posts.

In previous weeks we explored how to create your own image dataset automatically using SerpApi's Google Images Scraper API, and used those images to automatically train a network with a simple command object passed to FastAPI. This week we will improve the database creation method by using Couchbase as a storage server, and show how to fetch a random element from a given subset.

Couchbase Configuration

We will need the Couchbase Community Server and the Couchbase Python SDK for this project. You can find the relevant information in the Couchbase docs.

For this tutorial we will be using Couchbase version 7.1.0 for Debian 11: couchbase You can download it from the Couchbase downloads page.

Once you install it and define a username and password at the server address (http://kagermanov:8091 in my case), you will be greeted with a dashboard like this: server This means you have successfully deployed the server. If you want to stop the background process on Linux, you can run sudo systemctl stop couchbase-server at any time.

Head to Buckets in the left-hand menu and add a new bucket called images using the ADD BUCKET button: add_bucket Make sure to choose a RAM amount that won't force your local system into a frenzy.

Now you need to add a scope and a collection within this bucket via the Scopes & Collections button: scopes_and_collections

Add a scope named image, and within it, a collection named labelled_image: add_scope add_collection

Next, head to the query playground where you can run manual queries, and execute the following:

CREATE PRIMARY INDEX ON `images`;

manual_query

Lastly, make sure you install the Couchbase Python SDK via pip, and everything is set for our server.

Automatic Image Collector

Let's create a separate file within our project called add_couchbase.py. This will be the refactored version of add.py, which automatically gathered images for a given query. Here are the requirements for it:

from couchbase.options import (ClusterOptions, ClusterTimeoutOptions, QueryOptions)
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from serpapi import GoogleSearch
from datetime import timedelta
from pydantic import BaseModel
import mimetypes
import requests
import random
import uuid
import base64

To break them down by their specific use case:

| Requirement | Explanation |
|---|---|
| `couchbase.options.ClusterOptions` | Defines timeout and authentication options of the storage cluster connection |
| `couchbase.options.ClusterTimeoutOptions` | Sets the maximum time allowed to perform an operation in the cluster |
| `couchbase.options.QueryOptions` | Passes positional parameters within a N1QL query to the Couchbase Server |
| `couchbase.auth.PasswordAuthenticator` | Authenticates the connection with the storage server |
| `couchbase.cluster.Cluster` | Represents the connection to the storage server |
| `serpapi.GoogleSearch` | Scrapes links of images from Google Images |
| `datetime.timedelta` | Defines a time interval |
| `pydantic.BaseModel` | Defines objects to be passed as parameters |
| `mimetypes` | Guesses the extension of the image |
| `requests` | Makes GET requests to image links |
| `random` | Creates a random number within an interval to fetch a random image from the storage database |
| `uuid` | Creates a unique identifier for each image |
| `base64` | Converts image byte data to base64 for storage |

Let's define a pydantic base model for an image to be uploaded to the Couchbase Storage Server:

class Document(BaseModel):
    type: str = "image"
    id: str
    classification: str
    base64: str
    uri: str

id will be a unique uuid for the image, so the image can be retrieved manually in the future. classification is the query given to SerpApi's engine. base64 will be the string representation of the image, to be recreated within a training session. uri will represent the URL we fetched the image from.
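As a quick stdlib sketch of how two of these fields are produced (the byte string here is a made-up placeholder, not real image data):

```python
import base64
import uuid

fake_image_bytes = b"\x89PNG\r\n\x1a\n\x00\x01"  # placeholder bytes, not a real image
b64_str = base64.b64encode(fake_image_bytes).decode("ascii")
# base64 round-trips losslessly, so the image can be rebuilt for training later
assert base64.b64decode(b64_str.encode("ascii")) == fake_image_bytes

doc_id = uuid.uuid1().hex  # 32-character hex id, as used for the Document's id field
key = "image_" + doc_id    # same key format as insert_document: type + "_" + id
print(key)
```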

Let's initialize our Storage Database in a class:

class ImagesDataBase:
    def __init__(self):
        username = "<Your Couchbase Username>"
        password = "<Your Couchbase Password>"
        bucket_name = "images"
        auth = PasswordAuthenticator(
                username,
                password
        )
        timeout_opts = ClusterTimeoutOptions(kv_timeout=timedelta(seconds=10))
        self.cluster = Cluster('couchbase://localhost', ClusterOptions(auth, timeout_options=timeout_opts))
        self.cluster.wait_until_ready(timedelta(seconds=5))
        cb = self.cluster.bucket(bucket_name)
        self.cb_coll = cb.scope("image").collection("labelled_image")

Here's the function for inserting an image to the Storage server with a unique id:

    def insert_document(self, doc: Document):
        doc = doc.dict()
        print("\nInsert CAS: ")
        try:
            key = doc["type"] + "_" + str(doc["id"])
            result = self.cb_coll.insert(key, doc)
            print(result.cas)
        except Exception as e:
            print(e)

doc in this context represents the Document object we store the image inside to be uploaded to the Couchbase Server.

Let's have another function to call an image by its unique key we will generate. This function will not be used in the context of this blog post.

    def get_image_by_key(self, key):
        print("\nGet Result: ")
        try:
            result = self.cb_coll.get(key)
            print(result.content_as[str])
        except Exception as e:
            print(e)

Next, we need to build a function which helps us upload only unique images. The differentiator will be a unique link within the scope of a classification:

    def check_if_it_exists(self, link, cs):
        try:
            sql_query = 'SELECT uri FROM `images`.image.labelled_image WHERE classification = $1 AND uri = $2'
            row_iter = self.cluster.query(
                sql_query,
                QueryOptions(positional_parameters=[cs, link]))
            for row in row_iter:
                return row
        except Exception as e:
            print(e)

This function takes link, which is the link to the image, and cs, which is the classification of the image. If an image with the same link does not already exist within our storage, it returns None. The reason we don't query the entire database for uniqueness is simple. First, it wouldn't be efficient in the long run. Second, the same image could have different classifications. Think of the logo of Apple, the company: it is also an apple, the fruit. If we are classifying between Apple Logo and Blackberry Logo, and the image exists only under the Apple classification, the model could fail to interpret it. This approach might create unnoticeable duplicate images with different classifications, but in the long run it will prove useful. Here's an example of a manual query for a link we know already exists in the Couchbase server:

SELECT uri FROM `images`.image.labelled_image WHERE classification = 'Pomegrenate' AND uri = 'https://i0.wp.com/post.healthline.com/wp-content/uploads/2022/02/pomegranate-seeds-fruit-1296x728-header.jpg?w=1155&h=1528'

uri_found

Now that we have uniqueness out of the way, let us focus on picking a random image for a given query. This part will also not be used in this blog post, but it will be in future ones. The function below gives the number of images in a subset of classifications. It is useful for determining a random number within the range of the maximum size:

    def get_max_image_size(self, cs):
        try:
            sql_query = 'SELECT COUNT(*) as max_items FROM `images`.image.labelled_image WHERE classification = $1'
            row_iter = self.cluster.query(
                sql_query,
                QueryOptions(positional_parameters=[cs]))
            for row in row_iter:
                return row
        except Exception as e:
            print(e)

Here's another example query, for the number of Orange images in the storage server:

SELECT COUNT(*) as max_items FROM `images`.image.labelled_image WHERE classification = 'Orange'

orange_count

Now, let's define a function that randomly picks an image for us. We will generate a random integer outside the query itself, bounded by the count the previous function gives us:

    def random_lookup_by_classification(self, cs):
        max_size = self.get_max_image_size(cs)['max_items']  # COUNT(*) is aliased as max_items in the query
        random_number = random.randint(0,max_size - 1)
        print("\nLookup Result: ")
        try:
            sql_query = 'SELECT (SELECT im.base64 FROM `images`.image.labelled_image AS im WHERE im.classification = $1)[$2]'
            row_iter = self.cluster.query(
                sql_query,
                QueryOptions(positional_parameters=[cs, random_number]))
            for row in row_iter:
                return row
        except Exception as e:
            print(e)

Here is an example query with the random number 37, which is between 0 and 103 (out of 104 images of Orange):

SELECT (SELECT im.base64 FROM `images`.image.labelled_image AS im WHERE im.classification = 'Orange')[37]

random_found
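One detail worth spelling out: `random.randint` is inclusive on both ends, which is why the code subtracts one from the count before using it as the upper bound. A short stdlib sketch, using the 104 Orange images from the example above:

```python
import random

max_items = 104  # COUNT(*) result for the 'Orange' classification in the example
random_number = random.randint(0, max_items - 1)  # inclusive bounds, so 0..103
assert 0 <= random_number <= 103
print(random_number)
```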

We have everything we need for this week's and the coming week's blog posts now. Let's redefine what we have already defined: a pydantic model for the Query object we pass to the endpoint:

class Query(BaseModel):
    google_domain: str = "google.com"
    num: str = "100"
    ijn: str = "0"
    q: str
    api_key: str ## You may replace this with `api_key: str = "Your API Key"`

Again, the API key mentioned here is your unique API key for SerpApi. It can be accessed via the Api Key page.

Here's the redefinition of the Download class. We omit some parts we no longer need, and add new ones, like the database object:

class Download:
    def __init__(self, query: Query, db: ImagesDataBase):
        self.db = db
        self.query = query
        self.results = []

There is no change in the function that calls SerpApi's Google Images API. However, let me restate one amazing fact: if the query you are searching for is cached, you can get it free of charge.

    def serpapi_search(self):
        params = {
            "engine": "google",
            "ijn": self.query.ijn,
            "q": self.query.q,
            "google_domain": self.query.google_domain,
            "tbm": "isch",
            "num": self.query.num,
            "api_key": self.query.api_key
        }

        search = GoogleSearch(params)
        results = search.get_dict()
        results = results['images_results']
        self.results = results = [x['original'] for x in results]
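The last line above stores (and returns) only the original-resolution links. On a mock of the `images_results` shape (this sample is illustrative, not real API output), the extraction behaves like this:

```python
# A mock resembling the `images_results` portion of a SerpApi response
mock_results = [
    {"position": 1, "original": "https://example.com/a.jpg", "thumbnail": "https://example.com/a_thumb.jpg"},
    {"position": 2, "original": "https://example.com/b.png", "thumbnail": "https://example.com/b_thumb.png"},
]
# Keep only the full-resolution image links, as serpapi_search does
links = [x["original"] for x in mock_results]
assert links == ["https://example.com/a.jpg", "https://example.com/b.png"]
print(links)
```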

Let's define another function for downloading an image and returning it as a Document object:

    def get_document(self, link):
        print("Downloading {}".format(link))
        classification = self.query.q
        r = requests.get(link)
        base64_str = base64.b64encode(r.content).decode('ascii')
        extension = mimetypes.guess_extension(r.headers.get('content-type', '').split(';')[0])
        id = uuid.uuid1().hex
        if extension == ".jpg" or extension == ".jpeg" or extension == ".png":
            doc = Document(id = id, classification = classification, base64 = base64_str, uri = link )
            return doc
        else:
            return None
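The extension guard relies on `mimetypes.guess_extension`, which maps a Content-Type value to a file extension; note that any `; charset=...` suffix must be stripped first, which is what the `split(';')[0]` above does. A quick stdlib check:

```python
import mimetypes

# Strip the parameter part of the header before guessing, as get_document does
content_type = "image/png; charset=binary"
ext = mimetypes.guess_extension(content_type.split(";")[0])
assert ext == ".png"
# image/jpeg may map to .jpg, .jpeg, or .jpe depending on the platform's type map
assert mimetypes.guess_extension("image/jpeg") in (".jpg", ".jpeg", ".jpe")
print(ext)
```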

Next, we define the function that inserts the Document objects into the database. We also check the uniqueness of the link here to reduce duplicates:

    def move_to_db(self, link):
        # Note the argument order matches the signature: (link, cs)
        sameness = self.db.check_if_it_exists(link, self.query.q)
        if sameness is None:
            doc = self.get_document(link)
            if doc is not None:
                self.db.insert_document(doc=doc)

Here, we can iterate through all the links gathered from SerpApi's Google Images Scraper API, and upload them to our Couchbase Storage Server:

    def move_all_images_to_db(self):
        self.serpapi_search()
        for result in self.results:
            try:
                self.move_to_db(result)
            except Exception as e:
                print("\nPassed image: {}".format(e))

Now that we have everything in place, let us wire add_couchbase.py into our main.py:

from fastapi import FastAPI
from add_couchbase import Download, Query, ImagesDataBase
from create import CSVCreator, ClassificationsArray
from dataset import CustomImageDataLoader, CustomImageDataset
from train import CNN, Train
from commands import TrainCommands

app = FastAPI()

@app.get("/")
def read_root():
  return {"Hello": "World"}

@app.post("/add_to_db/")
def create_query(query: Query):
  db = ImagesDataBase()
  serpapi = Download(query, db)
  serpapi.move_all_images_to_db()  # serpapi_search() is called inside, no need to call it twice
  return {"status": "Complete"}

...

Collecting Images and Storing Them with Classifications

Let's put everything we made into practice. Run the server with the following command:

uvicorn main:app --host 0.0.0.0 --port 8000

and then head to localhost:8000/docs to try out the /add_to_db/ endpoint with the following request body:

{
  "google_domain": "google.com",
  "num": "100",
  "ijn": "0",
  "q": "string",
  "api_key": "<Your API Key>"
}

post_add
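If you prefer scripting the call over the docs UI, the same body can be posted from Python. A hedged sketch: the query "Mango" is just the example used later in this post, and the actual POST is commented out since it needs the running server:

```python
import json

payload = {
    "google_domain": "google.com",
    "num": "100",
    "ijn": "0",
    "q": "Mango",  # example query; replace with your own classification
    "api_key": "<Your API Key>",
}
# import requests
# requests.post("http://localhost:8000/add_to_db/", json=payload)  # requires the running server
body = json.dumps(payload)
assert json.loads(body)["q"] == "Mango"
```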

If you observe the terminal, you will see that the process of updating the database is happening in real time:

...
Insert CAS: 
1655331991397793792
Downloading https://target.scene7.com/is/image/Target/GUEST_c3800365-97f3-4fe9-8061-8894a378cc85?wid=488&hei=488&fmt=pjpeg

Insert CAS: 
1655331991890821120
Downloading https://solidstarts.com/wp-content/uploads/Mango_edited-scaled.jpg

Insert CAS: 
1655331993304432640
Downloading https://www.netmeds.com/images/cms/wysiwyg/blog/2019/04/Raw_Mango_898.jpg
...

If we query the database even before the process finishes, we can see that entries with the classification label Mango are being added. Here's the query for it:

SELECT COUNT(*) as max_items FROM `images`.image.labelled_image WHERE classification = 'Mango'

query_count_2 It has already added 67 unique images to the database, which we can use to train our network in the coming weeks.

Conclusion

N1QL databases such as Couchbase have fast response times compared to other storage databases. In this regard, I thought refactoring this part was an essential step before taking any further actions. This implementation will provide the speed and scalability we will need in the coming weeks' challenges comparing different approaches to image classification. It is also important for async handling of some functions, such as inserting images instead of naming them using the OS.

I am grateful to the reader for their attention, and to the Brilliant People of SerpApi for all their support. In the coming weeks, we will explore using async handling and coroutines to lower the response time of these actions. If you are interested in tutorials like these, feel free to sign up for the SerpApi blog, or follow us on the medium where you found us. Your support is the primary factor in making such tutorials possible.