This is the last tutorial in our series on efficient data cleaning methods. Let’s explore storage options for data with different uses.
10. Data storage selection schemes for different scenarios
- Structured data with high-frequency queries. Suppose a crawler fetches e-commerce product data, including product names, prices, categories, sales volumes, positive and negative reviews, and manufacturer details, and this data will mainly serve relational queries behind a Web API. The preferred option is an SQL database such as PostgreSQL or MySQL. When the data volume reaches hundreds of millions of rows, PostgreSQL generally outperforms MySQL and scales better. Install the driver with pip:
pip install psycopg2
Example code for creating a table and inserting data with Python:
import psycopg2
from psycopg2 import OperationalError

def create_connection(db_name, db_user, db_password, db_host, db_port):
    connection = None
    try:
        connection = psycopg2.connect(
            database=db_name,
            user=db_user,
            password=db_password,
            host=db_host,
            port=db_port,
        )
        print("Connection to PostgreSQL DB successful")
    except OperationalError as e:
        print(f"The error '{e}' occurred")
    return connection

def execute_query(connection, query):
    connection.autocommit = True
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        print("Query executed successfully")
    except OperationalError as e:
        print(f"The error '{e}' occurred")

# Demo
if __name__ == "__main__":
    # Replace with your own connection details
    connection = create_connection(
        "your_database", "your_user", "your_password", "localhost", "5432"
    )
    # Create table demo
    create_users_table = """
    CREATE TABLE IF NOT EXISTS users (
        id SERIAL PRIMARY KEY,
        name TEXT NOT NULL,
        age INTEGER,
        gender TEXT,
        nationality TEXT
    )
    """
    execute_query(connection, create_users_table)
    # Insert data demo
    create_users = """
    INSERT INTO
        users (name, age, gender, nationality)
    VALUES
        ('James', 25, 'male', 'USA'),
        ('Leila', 32, 'female', 'France'),
        ('Brigitte', 35, 'female', 'England'),
        ('Mike', 40, 'male', 'Denmark'),
        ('Elizabeth', 21, 'female', 'Canada');
    """
    execute_query(connection, create_users)
In fact, using Python to perform CRUD operations on PostgreSQL is quite similar to doing so on MySQL; once you master one of them, the knowledge generally transfers to other SQL databases.
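For completeness, here is a minimal sketch of the read side of CRUD, reusing the connection and error handling above (the query itself is illustrative):
def read_query(connection, query):
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        # Return all matching rows as a list of tuples
        return cursor.fetchall()
    except OperationalError as e:
        print(f"The error '{e}' occurred")
        return []

for row in read_query(connection, "SELECT id, name, age FROM users WHERE age > 30;"):
    print(row)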
When the data volume reaches a certain level and adding appropriate indexes can no longer keep queries fast, you need to split your databases and tables.
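For reference, “adding appropriate indexes” looks like this in PostgreSQL, via the execute_query helper above (the index and column names are illustrative):
create_index = "CREATE INDEX IF NOT EXISTS idx_users_name ON users (name);"
execute_query(connection, create_index)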
PostgreSQL’s database and table partitioning is mainly achieved in two ways: vertical partitioning (splitting databases) and horizontal partitioning (splitting tables).
I. Vertical partitioning (database partitioning)
Split the database into multiple physical instances according to business domains. For example:
Original single-database structure:
[User database]
- User table
- Order table
- Product table
- Log table
After vertical partitioning:
[User database]    [Order database]    [Product database]    [Log database]
- User table       - Order table       - Product table       - Log table
At the table level, vertical splitting likewise moves groups of fields from one wide table into several separate tables.
Example code segments:
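The write and query snippets below rely on a get_citus_connection() helper that is not shown here; a minimal sketch, assuming a Citus (distributed PostgreSQL) coordinator reachable like any ordinary PostgreSQL instance (all connection parameters are placeholders):
import psycopg2

def get_citus_connection():
    # Assumed helper: connect to the Citus coordinator node
    # exactly like a regular PostgreSQL instance (placeholder credentials)
    return psycopg2.connect(
        database="your_database",
        user="your_user",
        password="your_password",
        host="localhost",
        port="5432",
    )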
Writing:
with get_citus_connection() as conn:
    with conn.cursor() as cur:
        try:
            # Insert user data
            cur.execute("""
                INSERT INTO users (user_id, name, email)
                VALUES (1, 'Alice', '[email protected]'),
                       (2, 'Bob', '[email protected]'),
                       (3, 'Charlie', '[email protected]')
                ON CONFLICT (user_id) DO NOTHING;
            """)
            # Insert order data
            cur.execute("""
                INSERT INTO orders (order_id, user_id, product_name, amount)
                VALUES (101, 1, 'Laptop', 1200.00),
                       (102, 1, 'Mouse', 29.99),
                       (103, 2, 'Keyboard', 79.99),
                       (104, 3, 'Monitor', 329.99)
                ON CONFLICT (order_id) DO NOTHING;
            """)
            # Insert category data
            cur.execute("""
                INSERT INTO product_categories (category_id, category_name)
                VALUES (1, 'Electronics'),
                       (2, 'Accessories')
                ON CONFLICT (category_id) DO NOTHING;
            """)
            conn.commit()
        except Exception as e:
            print(f"Insert data error: {e}")
            conn.rollback()
Write data into three tables, and use a JOIN statement when querying:
with get_citus_connection() as conn:
    with conn.cursor() as cur:
        try:
            # Query users joined with their orders
            cur.execute("""
                SELECT u.name, o.product_name, o.amount
                FROM users u
                JOIN orders o ON u.user_id = o.user_id
                WHERE u.user_id = 1;
            """)
            print("Orders for user 1:")
            for row in cur.fetchall():
                print(row)
            # Aggregate query
            cur.execute("""
                SELECT user_id, COUNT(*), SUM(amount)
                FROM orders
                GROUP BY user_id;
            """)
            print("\nOrder statistics by user:")
            for row in cur.fetchall():
                print(f"User ID: {row[0]}, Order count: {row[1]}, Total amount: {row[2]}")
        except Exception as e:
            print(f"Query error: {e}")
II. Horizontal partitioning (table partitioning)
Split a table’s rows across multiple nodes. Common sharding strategies:
Hash sharding:
# Sharding function: map a user_id to one of `total_shards` shards
def get_shard(user_id, total_shards=4):
    return user_id % total_shards
Range sharding: divide by time or ID range. For example:
Table 1: user_id from 1 to 1,000,000
Table 2: user_id from 1,000,001 to 2,000,000
...
For example, horizontal partitioning might split a table with 1 billion records into 5 tables of 200 million records each. If the primary key is an auto-incrementing ID, records whose ID modulo 5 equals 0 go into table 0, those with a remainder of 1 go into table 1, and so on, distributing the data across 5 tables.
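A minimal sketch of routing writes with that modulo rule (the users_0 through users_4 shard tables are illustrative and assumed to exist):
def insert_user(conn, user_id, name):
    shard = user_id % 5  # same rule: ID modulo 5 selects the table
    table = f"users_{shard}"
    with conn.cursor() as cur:
        # The table name comes from our own modulo rule; values stay parameterized
        cur.execute(
            f"INSERT INTO {table} (user_id, name) VALUES (%s, %s)",
            (user_id, name),
        )
    conn.commit()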
- Unstructured data: the MongoDB document database
For data with multiple levels of nesting, such as the following:
// Log document (with a dynamic request_params field)
{
    "timestamp": "2025-06-15T10:30:00",
    "user_id": 123,
    "action": "upload",
    "request_params": {  // request parameters differ from record to record
        "file_type": "jpg",
        "resolution": "1920x1080",
        "metadata": {"camera": "iPhone 15"}
    }
}
Note that the request_params field holds JSON whose inner fields are not fixed. You could store it in a MySQL JSON column, but that makes querying and table partitioning inconvenient, and some records may contain only one of the fields while the others are absent. Such data is best stored in a document database.
Commonly used document databases include MongoDB and Couchbase; the most popular is MongoDB, which is easy to install and runs on both Windows and Linux. (The Python driver is installed with pip install pymongo.)
Unlike MySQL, you don’t need to define tables and field types before inserting data into MongoDB, which greatly improves productivity.
Here is the sample code for connecting to, inserting data into, and querying data from MongoDB:
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, DuplicateKeyError

def connect_to_mongodb():
    """Connect to MongoDB and return a database object."""
    try:
        # Connect to the local MongoDB instance
        client = MongoClient('mongodb://localhost:27017/')
        # Check whether the connection succeeded
        client.admin.command('ping')
        print("Connected to MongoDB!")
        # Return the database object
        return client['mydatabase']
    except ConnectionFailure as e:
        print(f"Connection failed: {e}")
        raise

def insert_documents(db):
    """Insert documents into a collection."""
    # Get the collection object
    collection = db['users']
    # Insert a single document
    user_data = {
        "name": "Alice",
        "age": 30,
        "email": "[email protected]",
        "interests": ["reading", "swimming"]
    }
    try:
        result = collection.insert_one(user_data)
        print(f"Inserted single document, ID: {result.inserted_id}")
    except DuplicateKeyError as e:
        print(f"Inserting single document failed: {e}")
    # Insert multiple documents
    users = [
        {
            "name": "Bob",
            "age": 25,
            "email": "[email protected]",
            "interests": ["hiking"]
        },
        {
            "name": "Charlie",
            "age": 35,
            "email": "[email protected]",
            "interests": ["reading", "photography"]
        }
    ]
    result = collection.insert_many(users)
    print(f"Inserted multiple documents, ID list: {result.inserted_ids}")

def query_documents(db):
    """Query documents and print the results."""
    collection = db['users']
    # Query all documents
    print("\nAll documents:")
    for doc in collection.find():
        print(doc)
    # Query documents by condition
    print("\nUsers with age > 30:")
    for doc in collection.find({"age": {"$gt": 30}}):
        print(doc)
    # Query users with a specific interest
    print("\nUsers who like reading:")
    for doc in collection.find({"interests": "reading"}):
        print(doc)
    # Return a single document
    print("\nUser with email [email protected]:")
    user = collection.find_one({"email": "[email protected]"})
    print(user)

if __name__ == "__main__":
    try:
        # Connect to the database
        db = connect_to_mongodb()
        # Insert data
        insert_documents(db)
        # Query data
        query_documents(db)
    except Exception as e:
        print(f"An error occurred: {e}")
Documents with nested JSON fields are just as easy to query:
# Sample documents with a nested JSON structure
# (reuses a `collection` object such as db['products'] from the connection above)
sample_docs = [
    {
        "product_id": 1001,
        "name": "Smartphone",
        "specs": {
            "screen": "6.5 inches",
            "resolution": "2400x1080",
            "ram": "8GB",
            "storage": "256GB"
        },
        "price": 5999,
        "tags": ["Electronics", "Phone"]
    },
    {
        "product_id": 1002,
        "name": "Laptop",
        "specs": {
            "screen": "14 inches",
            "resolution": "1920x1080",
            "ram": "16GB",
            "storage": "512GB"
        },
        "price": 8999,
        "tags": ["Electronics", "Computer"]
    }
]
# Insert sample data
collection.insert_many(sample_docs)

# 1. Simple nested query: find products with 8GB RAM
query1 = {"specs.ram": "8GB"}
results1 = collection.find(query1)
print("Products with 8GB RAM:")
for doc in results1:
    print(doc)

# 2. Compound condition: price > 5000 and a 14-inch screen
query2 = {
    "price": {"$gt": 5000},
    "specs.screen": "14 inches"
}
results2 = collection.find(query2)
print("\nProducts with price > 5000 and 14-inch screen:")
for doc in results2:
    print(doc)

# 3. Array element query: find products tagged with "Computer"
query3 = {"tags": "Computer"}
results3 = collection.find(query3)
print("\nProducts tagged with 'Computer':")
for doc in results3:
    print(doc)

# 4. Using $elemMatch to query nested documents inside arrays
# Assuming a document structure like:
# {
#     "product_id": 1003,
#     "reviews": [
#         {"user": "alice", "rating": 5, "comment": "Great product"},
#         {"user": "bob", "rating": 4, "comment": "Good performance"}
#     ]
# }
# Query products with at least one review rated >= 5
query4 = {"reviews": {"$elemMatch": {"rating": {"$gte": 5}}}}
results4 = collection.find(query4)
A common scenario is searching for matching content within a long string, for example, checking whether the phrase “How to use a proxy IP” appears in a text.
If you use MongoDB to handle this, the query looks like this:
{"content": {"$regex": "How to use a proxy IP"}}
However, this query matches the string exactly: MongoDB regular expressions are case-sensitive by default, so a text containing “How to use a proxy ip” with a lowercase “ip” will not be found. For this kind of requirement you need a dedicated full-text search engine: Elasticsearch.
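(As an aside, MongoDB can make the regex case-insensitive with the “i” option, though an unanchored regex like this cannot use an ordinary index and stays slow on large collections:)
{"content": {"$regex": "How to use a proxy IP", "$options": "i"}}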
- A powerful tool for text retrieval: the Elasticsearch text query engine
Elasticsearch is an open-source distributed search and analytics engine based on Apache Lucene, designed for handling massive amounts of data. It provides real-time full-text search, structured search, analysis, and visualization functions, and is widely used in log analysis, information retrieval, business intelligence, and other fields.
Core features
- Distributed architecture
- Automatic sharding and replication, supporting PB-level data expansion
- No single point of failure, automatic load balancing between nodes
- Horizontal scalability (can be scaled to hundreds of nodes)
- Full-text search capabilities
- Fast text retrieval based on inverted indexes
- Supports fuzzy search, synonym search, and approximate matching
- Built-in analyzers, plus plugins such as the IK tokenizer for Chinese
- Real-time data analysis
- Powerful aggregation framework (see the sketch after this list)
- Supports multi-dimensional analysis (such as grouping, filtering, and statistics)
- Real-time calculation of metrics (average, sum, percentiles, etc.)
- Multi-language support
- Provides a RESTful API, supporting multiple programming languages (Java, Python, JavaScript, etc.)
- Compatible with SQL queries (through the Elasticsearch SQL plugin)
- Integration with the ecosystem
- Forms the ELK Stack with Logstash and Beats for log processing
- Integrates with Kibana for data visualization
- Supports big data frameworks such as Spark and Hadoop
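As a quick illustration of the aggregation framework mentioned above, here is a minimal sketch that buckets products by their category keyword field and counts each bucket (it assumes the es client and the products index created in the code further below):
agg_query = {
    "size": 0,  # return only aggregation buckets, no document hits
    "aggs": {
        "by_category": {
            "terms": {"field": "category"}  # keyword field: exact bucketing
        }
    }
}
results = es.search(index="products", body=agg_query)
for bucket in results["aggregations"]["by_category"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])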
With Elasticsearch you can efficiently retrieve all articles that match the meaning of “How to use a proxy IP”, including articles containing phrases like “How to use a proxy ip” or “Proxy IP tutorial” that are not 100% identical but mean the same thing.
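A minimal sketch of such a query, assuming an articles index with a text-type content field (the index and field names are illustrative; the client setup is shown below):
match_query = {"query": {"match": {"content": "How to use a proxy IP"}}}
results = es.search(index="articles", body=match_query)
for hit in results["hits"]["hits"]:
    # Hits come back ranked by relevance score; the title field is illustrative
    print(hit["_score"], hit["_source"].get("title"))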
Elasticsearch also runs on multiple platforms, including Windows, Linux, and macOS. The installation process is not covered here for space reasons; readers who need it can refer to the official documentation. (The Python client is installed with pip install elasticsearch.) This article focuses on usage.
First, insert data:
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError

def main():
    # Connect to Elasticsearch
    es = Elasticsearch(hosts=["http://localhost:9200"])
    try:
        # Check connection
        if not es.ping():
            raise ConnectionError("Could not connect to Elasticsearch")
        print("Connected to Elasticsearch successfully!")
        # Define index name
        index_name = "products"
        # Create index with mapping (if it does not exist)
        if not es.indices.exists(index=index_name):
            es.indices.create(
                index=index_name,
                body={
                    "mappings": {
                        "properties": {
                            "name": {"type": "text"},
                            "description": {"type": "text"},
                            "category": {"type": "keyword"},
                            "brand": {"type": "keyword"}
                        }
                    }
                }
            )
            print(f"Index '{index_name}' created successfully")
        # Insert sample documents
        documents = [
            {
                "name": "Wireless Earbuds",
                "description": "High-quality wireless earbuds with noise cancellation",
                "category": "Audio",
                "brand": "SoundWave"
            },
            {
                "name": "Bluetooth Headphones",
                "description": "Over-ear bluetooth headphones with long battery life",
                "category": "Audio",
                "brand": "AudioMax"
            },
            {
                "name": "Smartwatch Pro",
                "description": "Advanced smartwatch with fitness tracking",
                "category": "Wearables",
                "brand": "TechGiant"
            },
            {
                "name": "Laptop Charger",
                "description": "Fast charging adapter for laptops",
                "category": "Accessories",
                "brand": "PowerPlus"
            }
        ]
        # Index documents one by one
        for doc in documents:
            es.index(index=index_name, document=doc)
        print("Sample documents indexed")
        # Refresh the index so the documents become searchable
        es.indices.refresh(index=index_name)
        print("Index refreshed")
    except ConnectionError as e:
        # Close out the error path for the connection check above
        print(f"Connection error: {e}")

if __name__ == "__main__":
    main()
The core is a single method call:
response = es.index(index=index_name, document=doc)
Before inserting data, you need to define each field’s type and whether it participates in tokenization and analysis.
After inserting data, let’s move on to querying. There are exact queries and fuzzy queries here: fields meant for exact matching are mapped as type keyword, while fields meant for tokenized queries are mapped as type text.
body={
    "mappings": {
        "properties": {
            "name": {"type": "text"},
            "description": {"type": "text"},
            "category": {"type": "keyword"},
            "brand": {"type": "keyword"}
        }
    }
}
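To make the distinction concrete, a minimal sketch against the products index above: a term query must match the keyword field verbatim, while a match query analyzes the text field before matching:
# Exact match on a keyword field: the value must match verbatim
term_query = {"query": {"term": {"brand": "AudioMax"}}}
# Full-text match on a text field: analyzed, hence case-insensitive
match_query = {"query": {"match": {"description": "noise cancellation"}}}

for q in (term_query, match_query):
    results = es.search(index=index_name, body=q)
    for hit in results["hits"]["hits"]:
        print(hit["_source"]["name"])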
- Next, let’s walk through the usage of the different query types:
- Example 1: fuzzy search with a misspelling. The misspelled word “wirless” is intended to be “wireless”.
# Example 1: Fuzzy search with typo tolerance
print("\n=== Example 1: Fuzzy Search ===")
fuzzy_query = {
    "query": {
        "fuzzy": {
            "name": {
                "value": "wirless",  # Intentionally misspelled
                "fuzziness": 1       # Allow 1 character difference
            }
        }
    }
}
results = es.search(index=index_name, body=fuzzy_query)
print(f"Found {results['hits']['total']['value']} documents")
for hit in results["hits"]["hits"]:
    print(f"Score: {hit['_score']}, Name: {hit['_source']['name']}")
The query result is:
Wireless Earbuds
Despite the typo, the result we want is still retrieved, ranked by relevance score.
- Prefix retrieval
# Example 2: Prefix search
print("\n=== Example 2: Prefix Search ===")
prefix_query = {
    "query": {
        "prefix": {
            "brand": {
                "value": "aud",           # Match brands starting with "aud"
                "case_insensitive": True  # keyword fields are case-sensitive by default;
                                          # this flag (ES 7.10+) lets "aud" match "AudioMax"
            }
        }
    }
}
results = es.search(index=index_name, body=prefix_query)
print(f"Found {results['hits']['total']['value']} documents")
for hit in results["hits"]["hits"]:
    print(f"Score: {hit['_score']}, Brand: {hit['_source']['brand']}")
Output:
AudioMax
The above code retrieves documents whose brand field starts with “aud”.
- Use the wildcard “*” for fuzzy matching
# Example 3: Wildcard search
print("\n=== Example 3: Wildcard Search ===")
wildcard_query = {
    "query": {
        "wildcard": {
            "name": {
                "value": "smart*"  # Match names starting with "smart"
            }
        }
    }
}
results = es.search(index=index_name, body=wildcard_query)
print(f"Found {results['hits']['total']['value']} documents")
for hit in results["hits"]["hits"]:
    print(f"Score: {hit['_score']}, Name: {hit['_source']['name']}")
Output:
Smartwatch Pro
The above code is used to retrieve strings starting with “smart”.
As a powerful search engine for document data, Elasticsearch meets the core requirements of big-data retrieval. It also supports distributed deployment, fault tolerance, and recovery, making it a highly efficient tool for big-data cleaning and analysis.