
10 Efficient Data Cleaning Methods You Must Know for Python Web Scraping (Part 6)

Master efficient data cleaning for web scraping with these pro techniques: PostgreSQL/MySQL for structured data, MongoDB for JSON/nested documents, and Elasticsearch for fuzzy text searches. Optimize with database partitioning, handle billions of records seamlessly, and boost your scraping workflow with these essential storage solutions!

2025-10-15

This is the last tutorial in this series on efficient data cleaning methods. Let’s explore storage options for data with different uses.

10. Data storage selection schemes for different scenarios

  1. Structured data with high-frequency queries. For example, suppose a crawler fetches e-commerce product data: product names, prices, categories, sales volumes, positive and negative reviews, and manufacturer details. If this data will mainly serve relational queries behind a Web API, the preferred option is an SQL database such as PostgreSQL or MySQL. Once the data volume reaches hundreds of millions of rows, PostgreSQL generally outperforms MySQL and scales better. Install the third-party driver with pip:
   pip install psycopg2

Example code for creating tables and inserting data using Python.

   import psycopg2
   from psycopg2 import OperationalError

   def create_connection(db_name, db_user, db_password, db_host, db_port):
       connection = None
       try:
           connection = psycopg2.connect(
               database=db_name,
               user=db_user,
               password=db_password,
               host=db_host,
               port=db_port,
           )
           print("Connection to PostgreSQL DB successful")
       except OperationalError as e:
           print(f"The error '{e}' occurred")
       return connection

   def execute_query(connection, query):
       connection.autocommit = True
       cursor = connection.cursor()
       try:
           cursor.execute(query)
           print("Query executed successfully")
       except psycopg2.Error as e:  # catches ProgrammingError, IntegrityError, etc., not just OperationalError
           print(f"The error '{e}' occurred")

   # Demo
   if __name__ == "__main__":
       # replace with your db host
       connection = create_connection(
           "your_database", "your_user", "your_password", "localhost", "5432"
       )

       # create table demo
       create_users_table = """
       CREATE TABLE IF NOT EXISTS users (
           id SERIAL PRIMARY KEY,
           name TEXT NOT NULL,
           age INTEGER,
           gender TEXT,
           nationality TEXT
       )
       """
       execute_query(connection, create_users_table)

       # insert data demo
       create_users = """
       INSERT INTO
           users (name, age, gender, nationality)
       VALUES
           ('James', 25, 'male', 'USA'),
           ('Leila', 32, 'female', 'France'),
           ('Brigitte', 35, 'female', 'England'),
           ('Mike', 40, 'male', 'Denmark'),
           ('Elizabeth', 21, 'female', 'Canada');
       """
       execute_query(connection, create_users)    

In fact, performing CRUD operations on PostgreSQL from Python is quite similar to doing so on MySQL. Once you master one of them, you can generally apply the same patterns to other SQL databases.
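The sample above covers only table creation and inserts. For completeness, here is a minimal read sketch built on the same connection helper; the execute_read_query name is ours for illustration, not part of the original tutorial:

def execute_read_query(connection, query):
    """Run a SELECT and return all rows."""
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        return cursor.fetchall()
    except psycopg2.Error as e:
        print(f"The error '{e}' occurred")
        return []

# Usage: fetch the users inserted above
users = execute_read_query(connection, "SELECT id, name, age FROM users;")
for row in users:
    print(row)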

When the data volume reaches a certain level and adding appropriate indexes no longer improves performance, you need to partition the database and its tables.

PostgreSQL’s database and table partitioning is mainly achieved through two methods: vertical partitioning (database partitioning) and horizontal partitioning (table partitioning).

I. Vertical partitioning (database partitioning)

Split the database into multiple physical instances according to business domains. For example:

Original single-database structure:
[User database]
- User table
- Order table
- Product table
- Log table

After vertical partitioning:
[User database]     [Order database]     [Product database]     [Log database]
- User table        - Order table        - Product table        - Log table

Vertical partitioning can also be applied within a single database: splitting the columns of one wide table into several narrower tables.

Example code segment (get_citus_connection() below is assumed to be a project helper that returns a psycopg2 connection to the cluster):

Writing:

with get_citus_connection() as conn:
    with conn.cursor() as cur:
        try:
            # Insert user data
            cur.execute("""
                INSERT INTO users (user_id, name, email)
                VALUES (1, 'Alice', '[email protected]'),
                       (2, 'Bob', '[email protected]'),
                       (3, 'Charlie', '[email protected]')
                ON CONFLICT (user_id) DO NOTHING;
            """)

            # Insert order data
            cur.execute("""
                INSERT INTO orders (order_id, user_id, product_name, amount)
                VALUES (101, 1, 'Laptop', 1200.00),
                       (102, 1, 'Mouse', 29.99),
                       (103, 2, 'Keyboard', 79.99),
                       (104, 3, 'Monitor', 329.99)
                ON CONFLICT (order_id) DO NOTHING;
            """)

            # Insert category data
            cur.execute("""
                INSERT INTO product_categories (category_id, category_name)
                VALUES (1, 'Electronics'),
                       (2, 'Accessories')
                ON CONFLICT (category_id) DO NOTHING;
            """)

            conn.commit()

        except Exception as e:
            print(f"insert data error: {e}")
            conn.rollback()

Write data into the three tables, and use a JOIN statement when querying:

with get_citus_connection() as conn:
    with conn.cursor() as cur:
        try:
            # Query users joined with their orders
            cur.execute("""
                SELECT u.name, o.product_name, o.amount
                FROM users u
                JOIN orders o ON u.user_id = o.user_id
                WHERE u.user_id = 1;
            """)
            print("Orders of user 1:")
            for row in cur.fetchall():
                print(row)

            # Aggregate query
            cur.execute("""
                SELECT user_id, COUNT(*), SUM(amount)
                FROM orders
                GROUP BY user_id;
            """)
            print("\nOrder Statistics by User:")
            for row in cur.fetchall():
                print(f"User ID: {row[0]}, Order Count: {row[1]}, Total amount: {row[2]}")
        except Exception as e:
            print(f"Query error: {e}")

II. Horizontal partitioning (table partitioning)

Split a table's rows across multiple tables or nodes. Common sharding strategies:

Hash sharding:

# Sharding Function
def get_shard(user_id, total_shards=4):
    return user_id % total_shards

Range sharding: Divide by time/ID range. For example:

Table 1: user_id from 1 to 1 million
Table 2: user_id from 1 million to 2 million
...

For example, horizontal partitioning can split a table with 1 billion records into 5 tables of 200 million records each. If the primary key is an auto-incrementing ID, records whose ID modulo 5 equals 0 are stored in table 0, those with a remainder of 1 in table 1, and so on, spreading the data across 5 tables.
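A minimal routing sketch for this modulo scheme, assuming five shard tables named users_0 through users_4 (the table names and the insert_user helper are ours for illustration):

def shard_table(user_id, total_shards=5):
    """Route a record to its shard table by user_id modulo."""
    return f"users_{user_id % total_shards}"

def insert_user(cur, user_id, name):
    # Table names cannot be passed as SQL parameters, so the name is
    # built from the trusted shard_table() output, never from user input.
    table = shard_table(user_id)
    cur.execute(
        f"INSERT INTO {table} (user_id, name) VALUES (%s, %s)",
        (user_id, name),
    )

# Usage: user_id 42 lands in users_2, since 42 % 5 == 2
# insert_user(cur, 42, 'Dana')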

  2. Unstructured data: the MongoDB document database

For data with multiple nesting levels, such as the following document:

// Log document (including dynamic field request_params)
{
  "timestamp": "2025-06-15T10:30:00",
  "user_id": 123,
  "action": "upload",
  "request_params": {  // Different request parameters vary
    "file_type": "jpg",
    "resolution": "1920x1080",
    "metadata": {"camera": "iPhone 15"}
  }
}

Note that the request_params field holds JSON whose inner fields are not fixed. You could store it in a MySQL JSON column, but that is inconvenient for querying and for table partitioning, and some records may contain only one of the fields while others lack it entirely. Such data is best stored in a document database.

Commonly used document databases in the market include MongoDB and Couchbase. The most popular one is MongoDB, which is easy to install and supports Windows and Linux platforms.

Unlike MySQL, MongoDB does not require you to define tables and field types before inserting data, which greatly improves productivity.

Here is the sample code for connecting to, inserting data into, and querying data from MongoDB:

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, DuplicateKeyError

def connect_to_mongodb():
    """Connect to MongoDB and return a database object."""
    try:
        # connect to the local MongoDB instance
        client = MongoClient('mongodb://localhost:27017/')
        # check whether the connection succeeded
        client.admin.command('ping')
        print("connected to MongoDB successfully!")
        # return the database object
        return client['mydatabase']
    except ConnectionFailure as e:
        print(f"connection failed: {e}")
        raise

def insert_documents(db):
    """Insert documents into a collection."""
    # get the collection object
    collection = db['users']

    # insert a single document
    user_data = {
        "name": "Alice",
        "age": 30,
        "email": "[email protected]",
        "interests": ["reading", "swimming"]
    }
    try:
        result = collection.insert_one(user_data)
        print(f"insert single document successful, ID: {result.inserted_id}")
    except DuplicateKeyError as e:
        print(f"insert single document failed: {e}")

    # insert multiple documents
    users = [
        {
            "name": "Bob",
            "age": 25,
            "email": "[email protected]",
            "interests": ["hiking"]
        },
        {
            "name": "Charlie",
            "age": 35,
            "email": "[email protected]",
            "interests": ["reading", "photography"]
        }
    ]
    result = collection.insert_many(users)
    print(f"insert multiple document successful,ID list: {result.inserted_ids}")

def query_documents(db):
    """Query documents and print the results."""
    collection = db['users']

    # query all documents
    print("\nAll documents:")
    for doc in collection.find():
        print(doc)

    # query documents by condition
    print("\nUsers older than 30:")
    for doc in collection.find({"age": {"$gt": 30}}):
        print(doc)

    # query users with a specific interest
    print("\nUsers who like reading:")
    for doc in collection.find({"interests": "reading"}):
        print(doc)

    # fetch a single document
    print("\nUser with email [email protected]:")
    user = collection.find_one({"email": "[email protected]"})
    print(user)

if __name__ == "__main__":
    try:
        # connect to the database
        db = connect_to_mongodb()

        # insert data
        insert_documents(db)

        # query data
        query_documents(db)
    except Exception as e:
        print(f"an error occurred: {e}")

For documents with nested JSON fields, you can also query the results easily:

# Sample documents with nested JSON structure
sample_docs = [
    {
        "product_id": 1001,
        "name": "Smartphone",
        "specs": {
            "screen": "6.5 inches",
            "resolution": "2400x1080",
            "ram": "8GB",
            "storage": "256GB"
        },
        "price": 5999,
        "tags": ["Electronics", "Phone"]
    },
    {
        "product_id": 1002,
        "name": "Laptop",
        "specs": {
            "screen": "14 inches",
            "resolution": "1920x1080",
            "ram": "16GB",
            "storage": "512GB"
        },
        "price": 8999,
        "tags": ["Electronics", "Computer"]
    }
]

# Insert sample data into a products collection in the same database
collection = db['products']
collection.insert_many(sample_docs)

# 1. Simple nested query: Find products with 8GB RAM
query1 = {"specs.ram": "8GB"}
results1 = collection.find(query1)
print("Products with 8GB RAM:")
for doc in results1:
    print(doc)

# 2. Complex condition query: Price > 5000 and 14-inch screen
query2 = {
    "price": {"$gt": 5000},
    "specs.screen": "14 inches"
}
results2 = collection.find(query2)
print("\nProducts with price > 5000 and 14-inch screen:")
for doc in results2:
    print(doc)

# 3. Array element query: Find products tagged with "Computer"
query3 = {"tags": "Computer"}
results3 = collection.find(query3)
print("\nProducts tagged with 'Computer':")
for doc in results3:
    print(doc)

# 4. Using $elemMatch to query nested documents in arrays
# Assuming document structure:
# {
#   "product_id": 1003,
#   "reviews": [
#       {"user": "alice", "rating": 5, "comment": "Great product"},
#       {"user": "bob", "rating": 4, "comment": "Good performance"}
#   ]
# }
# Query products with at least one review rating ≥ 5
query4 = {"reviews": {"$elemMatch": {"rating": {"$gte": 5}}}}  

A common scenario is searching for matching content inside a long string, for example, checking whether the phrase “How to use a proxy IP” appears in a text.

In MongoDB, the query looks like this:

{"content": {"$regex": "How to use a proxy IP"}}

However, the above performs an exact, case-sensitive substring match on “How to use a proxy IP”. If the text contains “How to use a proxy ip” with a lowercase “ip”, it will not be found.
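MongoDB’s standard $options regex flag at least fixes the case problem, though it still cannot tolerate typos or rank results by relevance:

# Case-insensitive substring match; still no typo tolerance or relevance ranking
for doc in collection.find({"content": {"$regex": "how to use a proxy ip", "$options": "i"}}):
    print(doc)

For genuinely fuzzy, ranked text retrieval, you need a dedicated search engine: Elasticsearch.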

  3. A powerful tool for text retrieval: the Elasticsearch search engine

Elasticsearch is an open-source distributed search and analytics engine based on Apache Lucene, designed for handling massive amounts of data. It provides real-time full-text search, structured search, analysis, and visualization functions, and is widely used in log analysis, information retrieval, business intelligence, and other fields.

Core features

  1. Distributed architecture
  2. Full-text search capabilities
  3. Real-time data analysis
  4. Multi-language support
  5. Integration with the ecosystem

With Elasticsearch, you can efficiently retrieve all articles close in wording to “How to use a proxy IP”, for example, articles containing phrases like “How to use a proxy ip” or “Proxy IP tutorial” that are not 100% identical matches.

Elasticsearch runs on multiple platforms, including Windows, Linux, and macOS. The installation process is not covered here for space reasons; readers can refer to the official documentation. This article focuses on concrete usage.
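As with psycopg2 earlier, install the official Python client first:

   pip install elasticsearch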

First, insert data:

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError

def main():
    # Connect to Elasticsearch
    es = Elasticsearch(hosts=["http://localhost:9200"])

    try:
        # Check connection
        if not es.ping():
            raise ConnectionError("Could not connect to Elasticsearch")
        print("Connected to Elasticsearch successfully!")

        # Define index name
        index_name = "products"

        # Create index with mapping (if not exists)
        if not es.indices.exists(index=index_name):
            es.indices.create(
                index=index_name,
                body={
                    "mappings": {
                        "properties": {
                            "name": {"type": "text"},
                            "description": {"type": "text"},
                            "category": {"type": "keyword"},
                            "brand": {"type": "keyword"}
                        }
                    }
                }
            )
            print(f"Index '{index_name}' created successfully")

        # Insert sample documents
        documents = [
            {
                "name": "Wireless Earbuds",
                "description": "High-quality wireless earbuds with noise cancellation",
                "category": "Audio",
                "brand": "SoundWave"
            },
            {
                "name": "Bluetooth Headphones",
                "description": "Over-ear bluetooth headphones with long battery life",
                "category": "Audio",
                "brand": "AudioMax"
            },
            {
                "name": "Smartwatch Pro",
                "description": "Advanced smartwatch with fitness tracking",
                "category": "Wearables",
                "brand": "TechGiant"
            },
            {
                "name": "Laptop Charger",
                "description": "Fast charging adapter for laptops",
                "category": "Accessories",
                "brand": "PowerPlus"
            }
        ]

        # Index documents one by one (see the bulk helper sketch below)
        for doc in documents:
            es.index(index=index_name, document=doc)
        print("Sample documents indexed")

        # Refresh index to make documents searchable
        es.indices.refresh(index=index_name)
        print("Index refreshed")

    except ConnectionError as e:
        print(f"Connection error: {e}")

if __name__ == "__main__":
    main()

The core call is this method:

response = es.index(index=index_name, document=doc)
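Indexing documents one by one is slow for large crawls. The client ships a bulk helper, elasticsearch.helpers.bulk, that sends documents in batches; a minimal sketch reusing the documents list from above:

from elasticsearch.helpers import bulk

# Each action names its target index and carries the document as _source
actions = [{"_index": index_name, "_source": doc} for doc in documents]
success, errors = bulk(es, actions)
print(f"Indexed {success} documents, errors: {errors}")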

Before inserting data, you define a mapping that sets each field's type and determines whether it is analyzed (tokenized) for full-text search.

After inserting data, let’s move on to querying. There are exact queries and fuzzy queries: fields meant for exact matching are mapped with type keyword, while fields meant for tokenized full-text queries use type text.

body={
    "mappings": {
        "properties": {
            "name": {"type": "text"},
            "description": {"type": "text"},
            "category": {"type": "keyword"},
            "brand": {"type": "keyword"}
        }
    }
}

# Example 1: Fuzzy search with typo tolerance
print("\n=== Example 1: Fuzzy Search ===")
fuzzy_query = {
    "query": {
        "fuzzy": {
            "name": {
                "value": "wirless",  # Intentionally misspelled
                "fuzziness": 1       # Allow 1 character difference
            }
        }
    }
}

results = es.search(index=index_name, body=fuzzy_query)
print(f"Found {results['hits']['total']['value']} documents")
for hit in results["hits"]["hits"]:
    print(f"Score: {hit['_score']}, Name: {hit['_source']['name']}")

The query result is:

Wireless Earbuds

Despite the misspelling in the query, the document we want is still retrieved and ranked by its relevance score.

# Example 2: Prefix search
print("\n=== Example 2: Prefix Search ===")
prefix_query = {
    "query": {
        "prefix": {
            "brand": {
                "value": "aud",           # Match brands starting with "aud"
                "case_insensitive": True  # keyword fields are case-sensitive by default (ES 7.10+)
            }
        }
    }
}

results = es.search(index=index_name, body=prefix_query)
print(f"Found {results['hits']['total']['value']} documents")
for hit in results["hits"]["hits"]:
    print(f"Score: {hit['_score']}, Brand: {hit['_source']['brand']}")

Output:

AudioMax

# Example 3: Wildcard search
print("\n=== Example 3: Wildcard Search ===")
wildcard_query = {
    "query": {
        "wildcard": {
            "name": {
                "value": "smart*"  # Match names starting with "smart"
            }
        }
    }
}

results = es.search(index=index_name, body=wildcard_query)
print(f"Found {results['hits']['total']['value']} documents")
for hit in results["hits"]["hits"]:
    print(f"Score: {hit['_score']}, Name: {hit['_source']['name']}")

Output:

Smartwatch Pro

The above query retrieves names starting with “smart”.
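Finally, returning to the proxy-IP scenario from the MongoDB section, the everyday workhorse is the match query, which tokenizes and lowercases the search phrase so “proxy IP” and “proxy ip” hit the same documents. A sketch assuming a hypothetical articles index with a text field named content:

match_query = {
    "query": {
        "match": {
            "content": "how to use a proxy IP"  # analyzed: lowercased and tokenized
        }
    }
}
results = es.search(index="articles", body=match_query)
for hit in results["hits"]["hits"]:
    print(f"Score: {hit['_score']}")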

As a powerful document search engine, Elasticsearch meets the basic requirements of big-data retrieval. It also supports distributed deployment, fault tolerance, and recovery, making it a very efficient tool for big-data cleaning and analysis.