
Deduplication Process

Overview

This documentation outlines the steps taken to deduplicate a large dataset using Polars, TF-IDF, DBSCAN, and Splink. The process is designed to be fast and memory-efficient while providing accurate clustering and matching of records.

Tools and Technologies

  • Polars: A DataFrame library for Python, optimized for speed and memory efficiency.
  • TF-IDF: Term Frequency-Inverse Document Frequency, a statistical measure used to evaluate the importance of a word in a document relative to a collection of documents.
  • DBSCAN: Density-Based Spatial Clustering of Applications with Noise, a clustering algorithm.
  • Splink: A library for probabilistic data linkage and deduplication using SQL.

1. Goal

The primary objective is to deduplicate a large dataset to improve data quality and integrity. This involves identifying and flagging duplicate entries to ensure a single, accurate representation of each entity in the dataset.

2. Data Preparation

The existing pipeline is used to obtain the necessary data. The following steps are then performed:

2.1 Data Cleaning

  • Remove any irrelevant or redundant information.
  • Handle missing values appropriately.
  • Standardize formats (e.g., dates, text casing); see the sketch after this list.
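
A minimal Polars sketch of these cleaning steps. The column names used here ("date", "name") and the exact expressions are illustrative assumptions, not the pipeline's real schema:

    import polars as pl

    # Illustrative only: "date" and "name" are assumed column names.
    df = df.with_columns(
        pl.col("date").str.to_date("%Y-%m-%d", strict=False),  # standardize date format
        pl.col("name").str.to_lowercase(),                      # standardize text casing
    ).drop_nulls(subset=["name"])                                # drop records missing a name

The nested identifier, address, contactPoint, and details struct columns are then flattened so that their fields become top-level columns: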
    # add_prefix_to_all_columns is a pipeline helper that prepends the given prefix
    # to every column name of the frame it receives.

    # The nested "identifier" struct contains an "id" field which, once unnested,
    # would collide with the top-level "id" column. We therefore unnest it into a
    # separate frame and prefix all of its columns, so "id" becomes "identifier_id".
    identifier_column = df.select("identifier")
    identifier_column = identifier_column.unnest("identifier")
    identifier_column = add_prefix_to_all_columns(identifier_column, "identifier_")
    # Append the prefixed columns to the frame and drop the original struct column.
    df = df.drop("identifier")
    df = pl.concat([df, identifier_column], how="horizontal")

    # The same is done for the address, contactPoint, and details columns.
    address_column = df.select("address")
    address_column = address_column.unnest("address")
    address_column = add_prefix_to_all_columns(address_column, "address_")
    df = df.drop("address")
    df = pl.concat([df, address_column], how="horizontal")
    # ------
    contact_point_column = df.select("contactPoint")
    contact_point_column = contact_point_column.unnest("contactPoint")
    contact_point_column = add_prefix_to_all_columns(
        contact_point_column, "contactPoint_"
    )
    df = df.drop("contactPoint")
    df = pl.concat([df, contact_point_column], how="horizontal")
    # ------
    details_column = df.select("details")
    details_column = details_column.unnest("details")
    details_column = add_prefix_to_all_columns(details_column, "details_")
    df = df.drop("details")
    df = pl.concat([df, details_column], how="horizontal")

The code above is the reason Polars was chosen over Pandas: although the Polars clean-up code is longer and harder to read, it ran roughly 10x faster than the equivalent Pandas implementation on the test set used during development.
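
For comparison, a rough sketch of how the same flattening could look in Pandas (illustrative only; it assumes the struct columns arrive as dictionaries after conversion):

    import pandas as pd

    # Illustrative Pandas equivalent of the struct flattening above.
    pdf = df.to_pandas()
    for col in ["identifier", "address", "contactPoint", "details"]:
        expanded = pd.json_normalize(pdf[col]).add_prefix(f"{col}_")
        pdf = pd.concat([pdf.drop(columns=[col]), expanded], axis=1)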

2.2 Blocking and Clustering

  • Blocking: Clustering data to reduce the number of comparisons during matching.
    • TF-IDF: Convert text data into numerical feature vectors.
      • Calculate Term Frequency (TF).
      • Calculate Inverse Document Frequency (IDF).
      • Combine TF and IDF to create a TF-IDF score for each word in each record (see the formula after this list).
    • Clustering: Group records into clusters to limit comparisons to likely duplicates.
      • DBSCAN: Density-based clustering to handle arbitrary shapes and sizes, without requiring a predefined number of clusters.
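
In the standard formulation, the score of a term t in record d over a corpus of N records is

    tf-idf(t, d) = tf(t, d) * log(N / df(t))

where df(t) is the number of records containing t, so tokens that appear in many records are down-weighted relative to rarer, more distinctive ones. (scikit-learn's TfidfVectorizer, used below, additionally applies smoothing and L2 normalisation.)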
    # VECTORIZATION AND CLUSTERING ---------------------------------------------------------
    from sklearn.cluster import DBSCAN
    from sklearn.feature_extraction.text import TfidfVectorizer

    # Turn each name into a TF-IDF vector and group similar names with DBSCAN.
    tfidf = TfidfVectorizer(
        analyzer="word", token_pattern=r"\w{1,}", max_features=10000
    )
    tfidf_matrix = tfidf.fit_transform(df["name"].to_pandas())
    clustering = DBSCAN(eps=0.5, min_samples=2, metric="cosine").fit(tfidf_matrix)
    df = df.with_columns(pl.Series("canopy_cluster", clustering.labels_))
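
Every record now carries a canopy_cluster label, and the Splink comparisons in the next step are only generated within a cluster. A quick way to sanity-check cluster sizes (note that DBSCAN assigns the label -1 to records it treats as noise):

    # Inspect how many records fall into each canopy cluster.
    # group_by / pl.len assume a recent Polars release (older versions use groupby / pl.count).
    cluster_sizes = (
        df.group_by("canopy_cluster")
        .agg(pl.len().alias("n_records"))
        .sort("n_records", descending=True)
    )
    print(cluster_sizes.head(10))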

3. Deduplication with Splink

  • Use Splink to perform fuzzy matching within each cluster.
  • Apply Levenshtein distance for string similarity measurement.
    # Imports assume Splink 3.x with the DuckDB backend.
    from splink.duckdb.blocking_rule_library import block_on
    from splink.duckdb.comparison_library import exact_match
    import splink.duckdb.comparison_template_library as ctl

    settings = {
        "link_type": "dedupe_only",
        "unique_id_column_name": "id",
        # Only compare records that fall into the same canopy cluster.
        "blocking_rules_to_generate_predictions": [
            block_on("canopy_cluster"),
        ],
        "comparisons": [
            ctl.name_comparison("name", term_frequency_adjustments=True),
            ctl.name_comparison("contactPoint_name", term_frequency_adjustments=True),
            ctl.email_comparison("contactPoint_email", include_domain_match_level=True),
            ctl.postcode_comparison("address_postalCode"),
            exact_match("address_streetAddress"),
            exact_match("identifier_id"),
        ],
    }
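
The settings above only configure the model. A minimal sketch of how they might be applied with Splink's DuckDB backend (assuming Splink 3.x; the sampling size and thresholds below are illustrative, not tuned values):

    from splink.duckdb.linker import DuckDBLinker

    # df is the Polars frame prepared above; Splink's DuckDB linker accepts a Pandas frame.
    linker = DuckDBLinker(df.to_pandas(), settings)

    # Estimate model parameters (illustrative settings).
    linker.estimate_u_using_random_sampling(max_pairs=1e6)
    linker.estimate_parameters_using_expectation_maximisation(block_on("canopy_cluster"))

    # Score candidate pairs within each canopy cluster and group matches into entities.
    predictions = linker.predict(threshold_match_probability=0.9)
    clusters = linker.cluster_pairwise_predictions_at_threshold(
        predictions, threshold_match_probability=0.95
    )

predictions holds scored record pairs; clusters assigns a cluster_id to every record so that records judged to be the same entity share an id.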

4. Output

Generate a Parquet file with the deduplicated records.
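
Continuing from the Splink sketch above, the clustered records can be exported with Polars (the output path is illustrative):

    import polars as pl

    # Convert Splink's clustered result back to Polars and write the deduplicated records.
    deduplicated = pl.from_pandas(clusters.as_pandas_dataframe())
    deduplicated.write_parquet("deduplicated_records.parquet")  # illustrative path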