Deduplication of CluedIn data sets in Python

The cluedin library is a Python SDK to work with CluedIn - a data management platform. It provides a convenient way to work with CluedIn's REST API and GraphQL API.

CluedIn can import data from various sources, transform it, and export it to other systems. Its GraphQL API lets us query the data and analyze it with Python and Jupyter Notebooks, as with any other data source.

Let us first install the required libraries:

%pip install cluedin
%pip install levenshtein
%pip install Metaphone
%pip install pandas

We create a JSON file with credentials to our instance (assuming the instance is at foobar.244.117.198.49.sslip.io):

{
  "domain": "244.117.198.49.sslip.io",
  "org_name": "foobar",
  "user_email": "admin@foobar.com",
  "user_password": "yourStrong(!)Password"
}

We store the file in a convenient hidden location and save its path in an environment variable:

export CLUEDIN_CONTEXT=~/.cluedin/cluedin.json

Then we load data from CluedIn and store it in a JSON file for further processing:

import json
import os

import pandas as pd

import cluedin


def flatten_properties(d):
    for k, v in d['properties'].items():
        if k.startswith('property-'):
            d[k[9:]] = v
    del d['properties']
    return d


def get_entities_from_cluedin(entity_type):
    ctx = cluedin.Context.from_json_file(os.environ['CLUEDIN_CONTEXT'])
    ctx.get_token()

    query = """
        query searchEntities($cursor: PagingCursor, $query: String, $pageSize: Int) {
          search(
            query: $query
            sort: FIELDS
            cursor: $cursor
            pageSize: $pageSize
            sortFields: {field: "id", direction: ASCENDING}
          ) {
            totalResults
            cursor
            entries {
              id
              properties(propertyNames:["imdb.name.primaryName"])
            }
          }
        }
    """

    variables = {
        'query': f'entityType:{entity_type}',
        'pageSize': 10_000
    }

    entities = list(
        map(flatten_properties, cluedin.gql.entries(ctx, query, variables)))

    with open('data.json', 'w', encoding='utf-8') as file:
        json.dump(entities, file, ensure_ascii=False,
                  indent=2, sort_keys=False)


if not os.path.exists('data.json'):
    get_entities_from_cluedin('/IMDb/Person')

data = pd.read_json('data.json')
data.set_index('id', inplace=True)
data

The result is a Pandas DataFrame, indexed by id, with all the entities of type /IMDb/Person.
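To make the flattening step concrete, here is a minimal sketch that applies the same logic to a single hand-written entry. The entry is a made-up sample (its id and name are hypothetical), and it assumes that the API returns property keys prefixed with property-, as the code above implies:

```python
def flatten_properties(d):
    # Same logic as above: lift every 'property-*' key to the top level.
    for k, v in d['properties'].items():
        if k.startswith('property-'):
            d[k[len('property-'):]] = v
    del d['properties']
    return d


# Hypothetical entry, shaped like one item of `entries` in the query above.
entry = {
    'id': 'sample-id',
    'properties': {'property-imdb.name.primaryName': 'Fred Astaire'},
}

flat = flatten_properties(entry)
print(flat)
```

After flattening, each entry is a flat dictionary, which is why pd.read_json can turn the saved file into one column per property.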

The number of iterations in the worst case

Our goal is to calculate possible duplicates of the imdb.name.primaryName property.

Naïvely, we can iterate over the dataset and, for each element, iterate over the same dataset in a nested loop. That would be a Cartesian product (or rather a Cartesian power) of our set. So for a million elements, it would be 1,000,000,000,000 (one trillion, or a million millions) pairs.

Luckily, the order of elements does not matter: for any elements A and B, if A is a possible duplicate of B, then B is a possible duplicate of A. We also never need to compare an element with itself. That saves us slightly more than 50% of iterations because instead of n^2 iterations, we only need n * (n - 1) / 2.

Let's check if this calculation is accurate:

dataset_size = 1_000
temp = range(dataset_size)

# n * (n - 1) is always even, because either n or n - 1 is even,
# so integer division is exact (and avoids float rounding for large n)
expected_iterations = dataset_size * (dataset_size - 1) // 2
actual_iterations = 0

for i, a in enumerate(temp):
    for b in temp[i + 1:]:
        actual_iterations += 1

difference = expected_iterations - actual_iterations

print(
    f'Expected iterations: {expected_iterations}, actual iterations: {actual_iterations}, difference: {difference}')

The result is:

Expected iterations: 499500, actual iterations: 499500, difference: 0
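The same formula can be applied to the million-element case mentioned earlier. This is a small arithmetic sketch, not a measurement; the variable names are only illustrative:

```python
n = 1_000_000

ordered_pairs = n * n                # full nested loop, including (a, a)
unordered_pairs = n * (n - 1) // 2   # each pair once, no self-pairs

# Share of iterations we avoid by visiting each pair only once.
saved = 1 - unordered_pairs / ordered_pairs

print(f'{ordered_pairs:_} pairs vs {unordered_pairs:_} pairs')
print(f'saved: {saved:.2%}')
```

Even after halving, roughly half a trillion comparisons remain, which is why the following sections look for ways to avoid comparing every pair.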

Or we can use the itertools.combinations function:

from itertools import combinations

dataset_size = 1_000

df1 = pd.DataFrame({"name": [f"Person {i}" for i in range(dataset_size)]})

# combinations yields each unordered pair once and never pairs an element with itself
new_df = pd.DataFrame(combinations(df1.index, 2), columns=['a', 'b'])

new_df

The result is 499500 rows, exactly as expected.

Exact matching

However, there are ways to avoid such a vast number of iterations. For example, we can use Pandas duplicated function to mark entities with exact duplicates:

data['exact_duplicate'] = data.duplicated(
    subset=['imdb.name.primaryName'], keep=False)

data

This code runs in less than 0.1 seconds and marks all exact duplicates in our dataset of 1,000,000 entities.
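A toy example shows what keep=False does. With the default keep='first', the first occurrence of each name would stay unmarked; with keep=False, every member of a duplicate group is flagged. The names and ids below are invented for illustration:

```python
import pandas as pd

# Invented sample; 'Ann Lee' appears twice.
sample = pd.DataFrame(
    {'imdb.name.primaryName': ['Ann Lee', 'Bob Ray', 'Ann Lee', 'Cy Young']},
    index=['id-1', 'id-2', 'id-3', 'id-4'])

sample['exact_duplicate'] = sample.duplicated(
    subset=['imdb.name.primaryName'], keep=False)

print(sample)
```

Both 'Ann Lee' rows are marked True, and the other two rows are marked False.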

Double Metaphone

For the Double Metaphone, we need to calculate the Double Metaphone hash of each name and then compare them in the same way we did for the exact matches:

import metaphone

data['metaphone'] = data['imdb.name.primaryName'].apply(
    lambda x: metaphone.doublemetaphone(x)[0])
data = data[data['metaphone'] != ''].copy()

# count how many names share each Double Metaphone key
data['metaphone_duplicates_count'] = \
    data.groupby('metaphone')['metaphone'].transform('count')

data.sort_values(['metaphone_duplicates_count', 'metaphone'],
                 ascending=[False, True], inplace=True)

data

This calculation takes a bit less than ten seconds on my three-year-old laptop. The number of distinct Double Metaphone keys shared by more than one name is:

data[data['metaphone_duplicates_count'] > 1].nunique()['metaphone']
# 126320
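The per-key counting step can be checked on a toy frame. This is a minimal sketch: the keys below are placeholders rather than real Double Metaphone output, and a single column is selected before transform so that the result is a Series that fits into one new column:

```python
import pandas as pd

# Placeholder keys, NOT real Double Metaphone output.
sample = pd.DataFrame({
    'name': ['Name A', 'Name B', 'Name C', 'Name D'],
    'metaphone': ['KEY1', 'KEY1', 'KEY2', 'KEY1'],
})

# For every row, count the rows that share its key.
sample['metaphone_duplicates_count'] = \
    sample.groupby('metaphone')['metaphone'].transform('count')

print(sample)
```

Rows with a count greater than one are the candidates for phonetic duplicates.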

Levenshtein distance

For the Levenshtein distance, we need to calculate the distance between each pair of names. This can be a very long-running process (a naive implementation would take about 285 years on my laptop). But we can optimize it by reducing the number of comparisons.

First, we can skip exact duplicates and operate only on the unique names.

Second, we can skip pairs of names whose lengths differ by more than one character.

Third, we will use multiprocessing to speed up the calculation.

We normalize the Levenshtein distance by dividing it by the length of the longer string and compare the result with a threshold, so the longer the strings, the more characters they can differ by and still count as possible duplicates.
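The normalization can be sketched as follows. To keep the example free of dependencies, it uses a pure-Python edit distance as a stand-in for Levenshtein.distance; the helper names and the sample names are illustrative, and the 0.2 threshold is the one used in the script below:

```python
def levenshtein(a, b):
    # Classic dynamic-programming edit distance (stand-in for
    # Levenshtein.distance from the levenshtein package).
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            current.append(min(
                previous[j] + 1,                        # deletion
                current[j - 1] + 1,                     # insertion
                previous[j - 1] + (char_a != char_b)))  # substitution
        previous = current
    return previous[-1]


def normalized_distance(a, b):
    # Assumes at least one of the strings is non-empty.
    return levenshtein(a, b) / max(len(a), len(b))


threshold = 0.2

for a, b in [('Jon Smith', 'John Smith'), ('Jon', 'Jan')]:
    d = normalized_distance(a, b)
    print(f'{a!r} vs {b!r}: {d:.2f}, possible duplicate: {d < threshold}')
```

One edit in a ten-character name gives 0.1 and passes the threshold, while one edit in a three-character name gives about 0.33 and does not.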

We will also use the map-reduce approach: calculate the Levenshtein distance for each pair of strings and store the result for each entity in a separate file, and then we will reduce the results to a single file.

# pylint: disable=missing-module-docstring, missing-function-docstring, missing-class-docstring, line-too-long, invalid-name

# deduplication.py

import json
import os
import time
from datetime import timedelta
from hashlib import md5
from multiprocessing import Pool, cpu_count

from Levenshtein import distance

os.makedirs('levenshtein', exist_ok=True)


def find_duplicates(strings):
    a = strings[0]

    md5_hash = md5(a.encode('utf-8')).hexdigest()
    file_path = f'levenshtein/{md5_hash}.json'

    if os.path.exists(file_path):
        return

    length_distance_threshold = 1
    levenshtein_distance_threshold = 0.2

    result = {
        'name': a,
        'duplicates': []
    }

    for b in strings[1:]:
        if abs(len(a) - len(b)) > length_distance_threshold:
            continue

        levenshtein_distance = distance(a, b) / max(len(a), len(b))

        if levenshtein_distance < levenshtein_distance_threshold:
            result['duplicates'].append({
                'name': b,
                'distance': levenshtein_distance
            })

    if len(result['duplicates']) > 0:
        with open(file_path, 'w', encoding='utf-8') as file:
            json.dump(result, file, indent=2)


def main():
    process_start = int(time.time())

    with open('data.json', 'r', encoding='utf-8') as file: # load the dataset
        data = list({x['imdb.name.primaryName'] for x in json.load(file)})

    print(
        f'{timedelta(seconds=time.time() - process_start)} Loaded {len(data)} unique strings from data.json.')

    with Pool(processes=cpu_count()) as pool:
        print(f'{timedelta(seconds=time.time() - process_start)} Starting {cpu_count()} processes to find duplicates...')

        total = len(data)
        thousand_start = time.time()

        # a generator avoids building every slice of the list up front
        for i, _ in enumerate(pool.imap(find_duplicates, (data[start:] for start in range(len(data))))):
            if i % 1_000 == 0:
                print(
                    f'{timedelta(seconds=time.time() - process_start)} {timedelta(seconds=time.time() - thousand_start)} strings: {i:_} of {total:_} ({i / total * 100:.0f}%)')
                thousand_start = time.time()

        print(f'{timedelta(seconds=time.time() - process_start)} Done!')

    # Reduce

    files = os.listdir('levenshtein')

    duplicates = {}

    for i, file in enumerate(files):
        if i % 1_000 == 0:
            print(
                f'{timedelta(seconds=time.time() - process_start)} {i:_} of {len(files):_} ({i / len(files) * 100:.0f}%)')
        with open(f'levenshtein/{file}', 'r', encoding='utf-8') as f:
            data = json.load(f)

            if data['name'] not in duplicates:
                duplicates[data['name']] = {
                    'count': 0,
                    'duplicates': []
                }

            duplicates[data['name']]['count'] += len(data['duplicates'])
            duplicates[data['name']]['duplicates'].extend(data['duplicates'])

    duplicates = {k: v for k, v in sorted(
        duplicates.items(), key=lambda item: item[1]['count'], reverse=True)}

    for _, v in duplicates.items():
        v['duplicates'] = sorted(
            v['duplicates'], key=lambda item: item['distance'], reverse=False)

    with open('duplicates.json', 'w', encoding='utf-8') as f: # save the result
        json.dump(duplicates, f, indent=2)

    print(len(files))


if __name__ == '__main__':
    main()

Well, it takes several hours for a million entities, but it is still better than 285 years.

We can dig further and use BK-trees to reduce the number of comparisons even more, but it is a topic for another article.
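For the curious, here is a minimal sketch of the idea, not a tuned implementation. It assumes the raw integer edit distance, because a BK-tree relies on the triangle inequality; a relative threshold like the one above would have to be applied to the candidates afterwards. The class and helper names are illustrative, and the pure-Python distance stands in for Levenshtein.distance:

```python
def levenshtein(a, b):
    # Pure-Python edit distance, a stand-in for Levenshtein.distance.
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            current.append(min(previous[j] + 1, current[j - 1] + 1,
                               previous[j - 1] + (char_a != char_b)))
        previous = current
    return previous[-1]


class BKTree:
    def __init__(self, distance):
        self.distance = distance
        self.root = None  # node = (word, {edge_distance: child_node})

    def add(self, word):
        if self.root is None:
            self.root = (word, {})
            return
        node = self.root
        while True:
            d = self.distance(word, node[0])
            if d == 0:
                return  # already in the tree
            child = node[1].get(d)
            if child is None:
                node[1][d] = (word, {})
                return
            node = child

    def search(self, word, max_distance):
        results = []
        stack = [self.root] if self.root else []
        while stack:
            candidate, children = stack.pop()
            d = self.distance(word, candidate)
            if d <= max_distance:
                results.append((d, candidate))
            # Triangle inequality: only subtrees whose edge label lies
            # within max_distance of d can contain matches.
            for edge, child in children.items():
                if d - max_distance <= edge <= d + max_distance:
                    stack.append(child)
        return sorted(results)


tree = BKTree(levenshtein)
for name in ['John Smith', 'Jon Smith', 'Jane Smith', 'Bob Ray']:
    tree.add(name)

matches = tree.search('John Smith', 1)
print(matches)
```

The search skips whole subtrees instead of comparing the query with every name, which is where the savings on a large dataset would come from.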

In real life, it takes a few iterations to explore and narrow down the conditions on a small sample of data before running the final calculation on the whole dataset.