subsequent use, with a common use case being keyword querying.
For example, when we crawl a large amount of e-commerce product data, there is often a field for product details that describes the product’s features.
Take the following Amazon product as an example:

Apart from structured fields like product name and price, there is a text field for product details.

The text under “About this item” is the product’s detailed description.
If this data is stored in a SQL database and we query for products matching the keyword “usb-c led light” (all lowercase), the product above is clearly relevant, but its description does not contain that exact phrase, so the query returns an empty result.
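To make this concrete, here is a minimal sketch using an in-memory SQLite table with made-up product text (a hypothetical example, not the actual crawled schema):

import sqlite3

# Hypothetical example: an exact-substring LIKE query misses a relevant row
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT, details TEXT)")
conn.execute(
    "INSERT INTO products VALUES (?, ?)",
    ("Desk Lamp", "LED desk light with USB-C charging port and 3 color modes"),
)

# The description is clearly relevant, but the exact phrase never appears,
# so the substring match returns nothing
rows = conn.execute(
    "SELECT name FROM products WHERE lower(details) LIKE ?",
    ("%usb-c led light%",),
).fetchall()
print(rows)  # [] -- no hits, even though the product matches the intent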
This is where a text-based search engine database is needed.
The most widely used in the industry is Elasticsearch.

Elasticsearch is a search engine built on Lucene that supports many kinds of queries, including full-text, numeric, and range searches. It is distributed, handles large volumes of data, and returns results quickly. In the era of big data, Elasticsearch is widely used in enterprises and organizations for fast data retrieval and analysis.
Data cleaning refers to the process of denoising, correcting, organizing, and filtering data to improve its quality. It is an important part of data processing because it improves the effectiveness of later analysis and mining. For Elasticsearch, cleaning and preparing the data before indexing is a crucial step that improves both search speed and accuracy.
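As a rough illustration, a cleanup step applied before indexing might look like the sketch below (clean_text is a hypothetical helper, not part of the crawler code later in this article):

import re

def clean_text(raw):
    """Minimal text cleanup before indexing: drop tags, normalize whitespace."""
    if not raw:
        return None
    text = re.sub(r"<[^>]+>", " ", raw)        # strip leftover HTML tags
    text = re.sub(r"\s+", " ", text).strip()   # collapse runs of whitespace
    return text or None

print(clean_text("  <b>USB-C</b>   LED  desk light \n"))  # "USB-C LED desk light"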
Elasticsearch achieves fast retrieval through Lucene’s inverted index. The inverted index consists of an ordered term dictionary (recording each term and its document frequency) and corresponding postings lists (recording which documents contain each term). At search time, the query is decomposed into terms, each term is looked up in the dictionary, and the postings lists are used to locate the relevant documents.
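A toy sketch of the idea (nothing like Lucene’s actual data structures, just the principle):

from collections import defaultdict

# Toy inverted index: term -> postings (the set of documents containing it)
docs = {
    1: "led desk light with usb-c port",
    2: "wireless mouse with usb receiver",
}
inverted = defaultdict(set)
for doc_id, text in docs.items():
    for term in text.split():
        inverted[term].add(doc_id)

# A query is decomposed into terms, each term is looked up in the dictionary,
# and the postings are intersected to find the matching documents
query_terms = "usb-c led light".split()
hits = set.intersection(*(inverted.get(t, set()) for t in query_terms))
print(hits)  # {1}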
Query Principle: A user or application sends a search request to any node in the cluster, which acts as a coordinating node to broadcast the query to relevant shards and replicas. Each shard executes the query locally and returns lightweight results to the coordinating node. The coordinating node merges the results (query phase), then sends a fetch request to the shards that have the complete documents to obtain the full documents and returns them to the client.
Next, we will use an example of crawling e-commerce website data to walk through common methods of cleaning, ingesting, and querying data in Elasticsearch.
Installation
We will use a Linux + Docker environment for installation, which is convenient and time-saving.
# Pull the Elasticsearch image (this example uses 8.10.4)
docker pull elasticsearch:8.10.4
# Create host directories for data and plugins (the default config stays inside the container)
mkdir -p /usr/local/elasticsearch/data /usr/local/elasticsearch/plugins
chmod 777 /usr/local/elasticsearch/data # make the data directory writable
# Start the Elasticsearch container (security is disabled so this demo can use plain HTTP)
docker run -d \
  --name elasticsearch \
  -p 9200:9200 \
  -p 9300:9300 \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
  -v /usr/local/elasticsearch/data:/usr/share/elasticsearch/data \
  -v /usr/local/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
  elasticsearch:8.10.4
Verify the installation.
Run in the shell:
curl http://localhost:9200
If the following JSON data is returned, your Elasticsearch installation is successful.
# successful response:
{
  "name" : "xxxx",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "xxxx",
  "version" : {
    "number" : "8.10.4",
    "build_flavor" : "default",
    "build_type" : "docker",
    ...
  },
  "tagline" : "You Know, for Search"
}
If the text data to be stored later is non-English, you need to install the corresponding tokenizer.
In Elasticsearch, a tokenizer is a core component in the text analysis process. Its main function is to split the original text into searchable terms according to specific rules, perform standardization processing on these terms, and finally build an inverted index for fast retrieval.
Its core functions can be summarized as follows:
Text Splitting
Split continuous text (such as sentences, paragraphs) into the smallest search units (words or roots). For example, the English sentence “Hello world, this is a test” will be split into “hello”, “world”, “this”, “is”, “a”, “test”.
Standardization Processing
Standardize the split terms to eliminate “surface differences” in the text, ensuring that terms with the same meaning are treated as the same retrieval unit. Common operations include:
- Case conversion (e.g., converting “Hello” to “hello”)
- Removing punctuation (e.g., converting “world,” to “world”)
- Stem extraction (e.g., unifying “running” and “ran” to the stem “run”)
Common Tokenizer Types
- Standard Tokenizer: The default tokenizer in Elasticsearch, suitable for English, splits text into words and converts them to lowercase.
- IK Tokenizer: A tokenizer optimized for Chinese, supporting fine-grained (ik_max_word) and coarse-grained (ik_smart) splitting.
- Pinyin Tokenizer: Converts Chinese to pinyin, supporting retrieval by pinyin initials or full pinyin.
- Keyword Tokenizer: Does not split text, treating the entire text as a single term (suitable for exact matching scenarios such as ID cards and phone numbers).
Choosing and configuring tokenizers appropriately is key to achieving efficient Chinese retrieval. For example, in e-commerce scenarios, using the IK tokenizer + Pinyin tokenizer for product titles and details can significantly improve the accuracy and flexibility of user searches.
Let’s try installing a tokenizer suitable for Chinese.
# Enter the container
docker exec -it elasticsearch /bin/bash
# Install the IK tokenizer (the plugin version must match the Elasticsearch version)
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.10.4/elasticsearch-analysis-ik-8.10.4.zip
# Exit and restart the container
exit
docker restart elasticsearch
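To check that the plugin loaded, you can compare the standard analyzer with ik_smart through the _analyze API. A quick sketch using requests (the sample phrase is arbitrary):

import requests

# Compare the standard analyzer with IK on a short Chinese phrase
for analyzer in ["standard", "ik_smart"]:
    resp = requests.post(
        "http://localhost:9200/_analyze",
        json={"analyzer": analyzer, "text": "无线蓝牙耳机"},
    )
    tokens = [t["token"] for t in resp.json()["tokens"]]
    print(analyzer, tokens)
# The standard analyzer splits the text into single characters,
# while ik_smart keeps meaningful words such as 无线 / 蓝牙 / 耳机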
Now that the tokenizer is installed, we can proceed with the previous task.
First, obtain basic information about products from the Amazon e-commerce website.
Start with a small batch of product URLs.
# List of Amazon product URLs to scrape
product_urls = [
    # Example product URLs - replace with any Amazon product pages
    "https://www.amazon.com/dp/B07VGRJDFY",
    "https://www.amazon.com/dp/B08N5WRWNW",
    "https://www.amazon.com/dp/B09V3KXJPB"
]
Then iterate through each URL to obtain basic product data.
# Scrape and import each product
for url in product_urls:
    print(f"\nScraping product from: {url}")
    product_data = scrape_amazon_product(url)
    if product_data:
        print(f"Successfully scraped product: {product_data.get('name')}")
        import_to_es(es, product_data, index_name)
    else:
        print(f"Failed to scrape product from: {url}")
Next, write the crawler function: scrape_amazon_product.
# Extract data from an Amazon product page
import random
import re
import time
from urllib.parse import urlparse, parse_qs

import requests
from bs4 import BeautifulSoup


def scrape_amazon_product(product_url):
    """Scrape product data from an Amazon product page"""
    # Set request headers to simulate browser behavior
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Connection": "keep-alive"
    }
    try:
        # Add a random delay to avoid anti-scraping measures
        time.sleep(random.uniform(2, 5))
        # Send the request
        response = requests.get(product_url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise HTTP errors
        # Parse the HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        # Extract the ASIN (Amazon product ID) from the /dp/<ASIN> part of the URL
        parsed_url = urlparse(product_url)
        asin_match = re.search(r'/dp/([A-Z0-9]{10})', parsed_url.path)
        asin = asin_match.group(1) if asin_match else None
        # Fall back to a query-string parameter, then to the page metadata
        if not asin:
            asin = parse_qs(parsed_url.query).get('asin', [None])[0]
        if not asin:
            asin_meta = soup.find('meta', {'name': 'twitter:data1'})
            if asin_meta:
                asin = asin_meta.get('content', '').split(':')[-1].strip()
        # Extract the product name
        product_name = soup.find('span', {'id': 'productTitle'})
        product_name = product_name.get_text(strip=True) if product_name else None
        # Extract the price
        price = None
        original_price = None
        currency = '$'
        price_elem = soup.find('span', {'class': 'a-price-whole'})
        if price_elem:
            price_str = price_elem.get_text(strip=True).replace(',', '').replace('.', '')
            decimal_elem = soup.find('span', {'class': 'a-price-fraction'})
            if decimal_elem:
                price_str += '.' + decimal_elem.get_text(strip=True)
            price = float(price_str) if price_str else None
        # Extract the original price (if discounted)
        original_price_elem = soup.find('span', {'class': 'a-price a-text-price'})
        if original_price_elem:
            original_price_str = original_price_elem.get_text(strip=True).replace(currency, '').replace(',', '')
            original_price = float(original_price_str) if original_price_str else None
        # Extract the rating and review count
        rating = None
        review_count = None
        rating_elem = soup.find('span', {'class': 'a-icon-alt'})
        if rating_elem:
            rating_str = rating_elem.get_text(strip=True).split()[0]
            rating = float(rating_str) if rating_str else None
        review_count_elem = soup.find('span', {'id': 'acrCustomerReviewText'})
        if review_count_elem:
            review_count_str = review_count_elem.get_text(strip=True).split()[0].replace(',', '')
            review_count = int(review_count_str) if review_count_str else None
        # Extract the brand
        brand = None
        brand_elem = soup.find('a', {'id': 'bylineInfo'})
        if brand_elem:
            brand = brand_elem.get_text(strip=True).replace('Visit the ', '').replace(' Store', '')
        # Extract the product description
        description = None
        description_elem = soup.find('div', {'id': 'productDescription'})
        if description_elem:
            description = description_elem.get_text(strip=True)
        # Extract product features ("About this item" bullet points)
        features = []
        features_elems = soup.find_all('li', {'class': 'a-spacing-mini'})
        if features_elems:
            features = [f.get_text(strip=True) for f in features_elems[:5]]  # Keep the first 5 features
        features_text = ', '.join(features)
        # Extract category information from the breadcrumbs
        category = None
        sub_category = None
        breadcrumbs = soup.find_all('li', {'class': 'a-spacing-none a-list-item'})
        if len(breadcrumbs) >= 2:
            category = breadcrumbs[-2].get_text(strip=True)
            sub_category = breadcrumbs[-1].get_text(strip=True)
        # Extract the image URL
        image_url = None
        image_elem = soup.find('img', {'id': 'landingImage'})
        if image_elem:
            image_url = image_elem.get('src')
        # Extract the availability status
        availability = None
        availability_elem = soup.find('div', {'id': 'availability'})
        if availability_elem:
            availability = availability_elem.get_text(strip=True)
        # Build the product data dictionary
        product_data = {
            "product_id": asin,
            "name": product_name,
            "price": price,
            "original_price": original_price,
            "currency": currency,
            "category": category,
            "sub_category": sub_category,
            "brand": brand,
            "rating": rating,
            "review_count": review_count,
            "description": description,
            "features": features_text,
            "url": product_url,
            "image_url": image_url,
            "availability": availability,
            "scraped_at": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        }
        return product_data
    except Exception as e:
        print(f"Error scraping product: {e}")
        return None
The code for writing to Elasticsearch is relatively simple.
# Import data to Elasticsearch
def import_to_es(es, product_data, index_name='amazon_products'):
    """Import a single product document into Elasticsearch"""
    if not product_data or not product_data.get('product_id'):
        print("Invalid product data, skipping import")
        return False
    try:
        # Use the product ID as the document ID
        response = es.index(
            index=index_name,
            id=product_data['product_id'],
            body=product_data
        )
        if response['result'] in ['created', 'updated']:
            print(f"Successfully imported product: {product_data.get('name')}")
            return True
        else:
            print(f"Failed to import product: {product_data.get('name')}")
            return False
    except Exception as e:
        print(f"Error importing to Elasticsearch: {e}")
        return False
The core statement is es.index.
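For larger crawls, indexing documents one at a time becomes slow; the Python client also ships a bulk helper. A rough sketch (bulk_import_to_es and the products list are assumptions for illustration, not code used elsewhere in this article):

from elasticsearch.helpers import bulk

def bulk_import_to_es(es, products, index_name='amazon_products'):
    """Index many product documents in a single bulk request."""
    actions = [
        {
            "_index": index_name,
            "_id": p["product_id"],
            "_source": p,
        }
        for p in products
        if p and p.get("product_id")
    ]
    success, errors = bulk(es, actions, raise_on_error=False)
    print(f"Indexed {success} documents, {len(errors)} errors")
    return success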
Before writing data, we also need to define the index mapping, which describes each Elasticsearch field in detail.
# Create index and mapping
def create_ecommerce_index(es, index_name='amazon_products'):
    """Create the Amazon product index and mapping"""
    if es.indices.exists(index=index_name):
        print(f"Index {index_name} already exists")
        return True
    # Define the index mapping
    mapping = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0,
            "analysis": {
                "analyzer": {
                    "english_analyzer": {
                        "type": "standard",
                        "stopwords": "_english_"
                    }
                }
            }
        },
        "mappings": {
            "properties": {
                "product_id": {"type": "keyword"},
                "name": {
                    "type": "text",
                    "analyzer": "english_analyzer",
                    "fields": {"keyword": {"type": "keyword"}}
                },
                "price": {"type": "float"},
                "original_price": {"type": "float"},
                "currency": {"type": "keyword"},
                "category": {"type": "keyword"},
                "sub_category": {"type": "keyword"},
                "brand": {"type": "keyword"},
                "rating": {"type": "float"},
                "review_count": {"type": "integer"},
                "description": {"type": "text", "analyzer": "english_analyzer"},
                "features": {"type": "text", "analyzer": "english_analyzer"},
                "specifications": {"type": "text", "analyzer": "english_analyzer"},
                "url": {"type": "keyword"},
                "image_url": {"type": "keyword"},
                "availability": {"type": "keyword"},
                "scraped_at": {"type": "date"}
            }
        }
    }
    try:
        es.indices.create(index=index_name, body=mapping)
        print(f"Successfully created index {index_name}")
        return True
    except Exception as e:
        print(f"Failed to create index: {e}")
        return False
Note the mappings section above, which controls whether a field is analyzed (split into terms).
If a field is defined with type “keyword”, it is matched exactly, without any analysis.
If it is defined with type “text”, the analyzer tokenizes the field at index time, and search queries against it are tokenized in the same way, instead of requiring a rigid full-string match.
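The difference shows up directly in queries. A short sketch, using the es client created below and field names from the mapping above (the brand value "Anker" is just an example): a term query on a keyword field only matches the exact stored value, while a match query on a text field is tokenized and lowercased first.

# Exact match on a keyword field: only the exact stored value matches
exact = es.search(
    index='amazon_products',
    query={"term": {"brand": "Anker"}},
)

# Analyzed match on a text field: the query is tokenized, so word order
# and case do not have to match the original description exactly
analyzed = es.search(
    index='amazon_products',
    query={"match": {"description": "usb-c led light"}},
)
print(exact["hits"]["total"]["value"], analyzed["hits"]["total"]["value"])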
Finally, here is how the es instance used in the code above is created.
# Initialize the Elasticsearch connection
from elasticsearch import Elasticsearch

def init_elasticsearch(host='localhost', port=9200):
    """Initialize the Elasticsearch connection"""
    es = Elasticsearch([f'http://{host}:{port}'])
    if es.ping():
        print("Successfully connected to Elasticsearch")
        return es
    else:
        print("Failed to connect to Elasticsearch, please check configuration")
        return None
Once the crawled data is stored in Elasticsearch, you can query individual fields just as you would with a search engine, and fuzzy search is supported as well.
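For instance, a match query with fuzziness tolerates small typos in the search terms. A brief sketch (index and field names follow the earlier mapping; the misspelled query is deliberate):

# Full-text search with typo tolerance: "ligt" can still match "light"
response = es.search(
    index='amazon_products',
    query={
        "match": {
            "description": {
                "query": "usb-c led ligt",
                "fuzziness": "AUTO",
            }
        }
    },
)
for hit in response["hits"]["hits"]:
    print(hit["_score"], hit["_source"]["name"])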
In the next tutorial, I will show you how to write an API service for searching product details based on the data stored in this example.