post thumbnail

Python Web Scraping: Using Elastic Search for Data Cleaning

Learn Python web scraping with Elasticsearch for data cleaning. Index e-commerce product pages, analyze text with tokenization, stopwords, and stemming, define mappings and analyzers, and run keyword and fuzzy searches. Normalize, deduplicate, and structure product details, expose results via an API to power scalable search. Improve indexing, relevance, and ranking.

2025-11-11

subsequent use, with a common use case being keyword querying.

For example, when we crawl a large amount of e-commerce product data, there is often a field for product details that describes the product’s features.

Take the following Amazon product as an example:

Apart from structured fields like product name and price, there is a text field for product details.

The text under “About this item” is all the detailed description of the product.

If this data is stored in an SQL database and we query for products matching the keyword “usb-c led light” (all lowercase), the above product’s description is relevant, but the text does not contain the exact phrase “usb-c led light”, so the query would return an empty result.

This is where a text-based search engine database is needed.

The most widely used in the industry is Elastic Search.

ElasticSearch is a search engine based on Lucene, which can implement various types of searches such as text search, numeric search, and range search. ElasticSearch supports distributed search, can handle large amounts of data, and provides fast search speeds. In the era of modern big data, ElasticSearch has been widely used in enterprises and organizations for fast data retrieval and analysis.

Data cleaning refers to the process of denoising, correcting, organizing, and filtering data to improve data quality. It is an important part of data processing, as it helps enhance the effectiveness of data analysis and mining. In ElasticSearch, data cleaning and processing are crucial steps that can improve search speed and accuracy.

Elasticsearch achieves fast retrieval based on Lucene’s inverted index. The inverted index includes an ordered dictionary (recording words and their occurrence frequencies) and corresponding Postings (recording documents containing the words). When searching, the query content is decomposed, the corresponding words are found in the dictionary, and then the relevant documents are located.

Query Principle: A user or application sends a search request to any node in the cluster, which acts as a coordinating node to broadcast the query to relevant shards and replicas. Each shard executes the query locally and returns lightweight results to the coordinating node. The coordinating node merges the results (query phase), then sends a fetch request to the shards that have the complete documents to obtain the full documents and returns them to the client.

Next, we will use an example of crawling e-commerce website data to teach you common data cleaning methods for data ingestion and querying in Elasticsearch.

Installation

We will use a Linux + Docker environment for installation, which is convenient and time-saving.

# Fetch Elasticsearch Image(For example 8.10.4)
docker pull elasticsearch:8.10.4

# create data volume
mkdir -p /usr/local/elasticsearch/data /usr/local/elasticsearch/config /usr/local/elasticsearch/plugins
chmod 777 /usr/local/elasticsearch/data  # set wirtable permission

# Start Elasticsearch container 
docker run -d \
  --name elasticsearch \
  -p 9200:9200 \
  -p 9300:9300 \
  -e "discovery.type=single-node" \
  -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
  -v /usr/local/elasticsearch/data:/usr/share/elasticsearch/data \
  -v /usr/local/elasticsearch/config:/usr/share/elasticsearch/config \
  -v /usr/local/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
  elasticsearch:8.10.4

Verify the installation.

Run in the shell:

curl http://localhost:9200

If the following JSON data is returned, your Elasticsearch installation is successful.

# successful response:
{
  "name" : "xxxx",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "xxxx",
  "version" : {
    "number" : "8.10.4",
    "build_flavor" : "default",
    "build_type" : "docker",
    ...
  },
  "tagline" : "You Know, for Search"
}

If the text data to be stored later is non-English, you need to install the corresponding tokenizer.

In Elasticsearch, a tokenizer is a core component in the text analysis process. Its main function is to split the original text into searchable terms according to specific rules, perform standardization processing on these terms, and finally build an inverted index for fast retrieval.

Its core functions can be summarized as follows:

Text Splitting
Split continuous text (such as sentences, paragraphs) into the smallest search units (words or roots). For example, the English sentence “Hello world, this is a test” will be split into “hello”, “world”, “this”, “is”, “a”, “test”.

Standardization Processing
Standardize the split terms to eliminate “surface differences” in the text, ensuring that terms with the same meaning are treated as the same retrieval unit. Common operations include:

Choosing and configuring tokenizers appropriately is key to achieving efficient Chinese retrieval. For example, in e-commerce scenarios, using the IK tokenizer + Pinyin tokenizer for product titles and details can significantly improve the accuracy and flexibility of user searches.

Let’s try installing a tokenizer suitable for Chinese.

# enter container
docker exec -it elasticsearch /bin/bash

# install IK tokenizer ( should keep same as Elasticsearch version )
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.10.4/elasticsearch-analysis-ik-8.10.4.zip

# quit and restart docker container
exit
docker restart elasticsearch

Now that the tokenizer is installed, we can proceed with the previous task.

First, obtain basic information about products from the Amazon e-commerce website.

Randomly set a batch of product URLs.

# List of Amazon product URLs to scrape
product_urls = [
    # Example product URLs - replace with any Amazon product pages
    "https://www.amazon.com/dp/B07VGRJDFY",
    "https://www.amazon.com/dp/B08N5WRWNW",
    "https://www.amazon.com/dp/B09V3KXJPB"
]

Then iterate through each URL to obtain basic product data.

# Scrape and import each product
for url in product_urls:
    print(f"\nScraping product from: {url}")
    product_data = scrape_amazon_product(url)

    if product_data:
        print(f"Successfully scraped product: {product_data.get('name')}")
        import_to_es(es, product_data, index_name)
    else:
        print(f"Failed to scrape product from: {url}")

Next, write the crawler function: scrape_amazon_product.

# Extract data from Amazon product page
def scrape_amazon_product(product_url):
    """Scrape product data from Amazon product page"""
    # Set request headers to simulate browser behavior
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Connection": "keep-alive"
    }

    try:
        # Add random delay to avoid anti-scraping measures
        time.sleep(random.uniform(2, 5))

        # Send request
        response = requests.get(product_url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise HTTP errors

        # Parse HTML
        soup = BeautifulSoup(response.text, 'html.parser')

        # Extract ASIN (Amazon product ID)
        parsed_url = urlparse(product_url)
        query_params = parse_qs(parsed_url.query)
        asin = query_params.get('asin', [None])[0]

        # If no ASIN in URL, try extracting from page
        if not asin:
            asin_meta = soup.find('meta', {'name': 'twitter:data1'})
            if asin_meta:
                asin = asin_meta.get('content', '').split(':')[-1].strip()

        # Extract product name
        product_name = soup.find('span', {'id': 'productTitle'})
        product_name = product_name.get_text(strip=True) if product_name else None

        # Extract price
        price = None
        original_price = None
        currency = '$'

        price_elem = soup.find('span', {'class': 'a-price-whole'})
        if price_elem:
            price_str = price_elem.get_text(strip=True).replace(',', '').replace('.', '')
            decimal_elem = soup.find('span', {'class': 'a-price-fraction'})
            if decimal_elem:
                price_str += '.' + decimal_elem.get_text(strip=True)
            price = float(price_str) if price_str else None

        # Extract original price (if discounted)
        original_price_elem = soup.find('span', {'class': 'a-price a-text-price'})
        if original_price_elem:
            original_price_str = original_price_elem.get_text(strip=True).replace(currency, '').replace(',', '')
            original_price = float(original_price_str) if original_price_str else None

        # Extract rating and review count
        rating = None
        review_count = None

        rating_elem = soup.find('span', {'class': 'a-icon-alt'})
        if rating_elem:
            rating_str = rating_elem.get_text(strip=True).split()[0]
            rating = float(rating_str) if rating_str else None

        review_count_elem = soup.find('span', {'id': 'acrCustomerReviewText'})
        if review_count_elem:
            review_count_str = review_count_elem.get_text(strip=True).split()[0].replace(',', '')
            review_count = int(review_count_str) if review_count_str else None

        # Extract brand
        brand = None
        brand_elem = soup.find('a', {'id': 'bylineInfo'})
        if brand_elem:
            brand = brand_elem.get_text(strip=True).replace('Visit the ', '').replace(' Store', '')

        # Extract product description
        description = None
        description_elem = soup.find('div', {'id': 'productDescription'})
        if description_elem:
            description = description_elem.get_text(strip=True)

        # Extract product features
        features = []
        features_elems = soup.find_all('li', {'class': 'a-spacing-mini'})
        if features_elems:
            features = [f.get_text(strip=True) for f in features_elems[:5]]  # Get first 5 features
        features_text = ', '.join(features)

        # Extract category information
        category = None
        sub_category = None
        breadcrumbs = soup.find_all('li', {'class': 'a-spacing-none a-list-item'})
        if len(breadcrumbs) >= 2:
            category = breadcrumbs[-2].get_text(strip=True) if len(breadcrumbs) > 1 else None
            sub_category = breadcrumbs[-1].get_text(strip=True) if breadcrumbs else None

        # Extract image URL
        image_url = None
        image_elem = soup.find('img', {'id': 'landingImage'})
        if image_elem:
            image_url = image_elem.get('src')

        # Extract availability status
        availability = None
        availability_elem = soup.find('div', {'id': 'availability'})
        if availability_elem:
            availability = availability_elem.get_text(strip=True)

        # Build product data dictionary
        product_data = {
            "product_id": asin,
            "name": product_name,
            "price": price,
            "original_price": original_price,
            "currency": currency,
            "category": category,
            "sub_category": sub_category,
            "brand": brand,
            "rating": rating,
            "review_count": review_count,
            "description": description,
            "features": features_text,
            "url": product_url,
            "image_url": image_url,
            "availability": availability,
            "scraped_at": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        }

        return product_data

    except Exception as e:
        print(f"Error scraping product: {e}")
        return None

The code for writing to Elastic Search is relatively simple.

# Import data to Elasticsearch
def import_to_es(es, product_data, index_name='amazon_products'):
    """Import single product data to Elasticsearch"""
    if not product_data or not product_data.get('product_id'):
        print("Invalid product data, skipping import")
        return False

    try:
        # Use product ID as document ID
        response = es.index(
            index=index_name,
            id=product_data['product_id'],
            body=product_data
        )

        if response['result'] in ['created', 'updated']:
            print(f"Successfully imported product: {product_data.get('name')}")
            return True
        else:
            print(f"Failed to import product: {product_data.get('name')}")
            return False

    except Exception as e:
        print(f"Error importing to Elasticsearch: {e}")
        return False

The core statement is es.index.

Before writing data, it is necessary to define the Elastic search fields in detail.

# Create index and mapping
def create_ecommerce_index(es, index_name='amazon_products'):
    """Create Amazon product index and mapping"""
    if es.indices.exists(index=index_name):
        print(f"Index {index_name} already exists")
        return True

    # Define index mapping
    mapping = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0,
            "analysis": {
                "analyzer": {
                    "english_analyzer": {
                        "type": "standard",
                        "stopwords": "_english_"
                    }
                }
            }
        },
        "mappings": {
            "properties": {
                "product_id": {"type": "keyword"},
                "name": {
                    "type": "text",
                    "analyzer": "english_analyzer",
                    "fields": {"keyword": {"type": "keyword"}}
                },
                "price": {"type": "float"},
                "original_price": {"type": "float"},
                "currency": {"type": "keyword"},
                "category": {"type": "keyword"},
                "sub_category": {"type": "keyword"},
                "brand": {"type": "keyword"},
                "rating": {"type": "float"},
                "review_count": {"type": "integer"},
                "description": {"type": "text", "analyzer": "english_analyzer"},
                "features": {"type": "text", "analyzer": "english_analyzer"},
                "specifications": {"type": "text", "analyzer": "english_analyzer"},
                "url": {"type": "keyword"},
                "image_url": {"type": "keyword"},
                "availability": {"type": "keyword"},
                "scraped_at": {"type": "date"}
            }
        }
    }

    try:
        es.indices.create(index=index_name, body=mapping)
        print(f"Successfully created index {index_name}")
        return True
    except RequestError as e:
        print(f"Failed to create index: {e}")
        return False

Note the mappings field above, which can control whether the field undergoes word segmentation processing.

If a field is defined as type “keyword”, it will be matched exactly without word segmentation.

If set to type “text”, the tokenizer will be used to segment the field, and during subsequent search matching, word segmentation will also be used for matching instead of rigid full-text matching.

Finally, let’s supplement the process of creating the es instance in the above code.

# Initialize Elasticsearch connection
def init_elasticsearch(host='localhost', port=9200):
    """Initialize Elasticsearch connection"""
    es = Elasticsearch([f'http://{host}:{port}'])
    if es.ping():
        print("Successfully connected to Elasticsearch")
        return es
    else:
        print("Failed to connect to Elasticsearch, please check configuration")
        return None

After the crawled data is stored in the Elasticsearch database, you can search certain fields like using a search engine, and it also supports fuzzy search.

In the next tutorial, I will show you how to write an API service for searching product details based on the data stored in this example.