Deduplication of CluedIn data sets in Python
The cluedin library is a Python SDK for working with CluedIn, a data management platform. It provides a convenient way to use CluedIn's REST and GraphQL APIs.
CluedIn can import data from various sources, transform it, and export it to other systems. Its GraphQL API lets us query the data and run data analysis with Python and Jupyter Notebooks, just as we would with any other data source.
Let us first install the required libraries:
%pip install cluedin
%pip install levenshtein
%pip install Metaphone
%pip install pandas
We create a JSON file with the credentials for our instance (assuming the instance is at foobar.244.117.198.49.sslip.io):
{
  "domain": "244.117.198.49.sslip.io",
  "org_name": "foobar",
  "user_email": "admin@foobar.com",
  "user_password": "yourStrong(!)Password"
}
We can store it in a convenient hidden location and save its path in an environment variable:
export CLUEDIN_CONTEXT=~/.cluedin/cluedin.json
Then we load data from CluedIn and store it in a JSON file for further processing:
import json
import os

import pandas as pd

import cluedin


def flatten_properties(d):
    # lift the 'property-*' keys from the nested 'properties' dict to the top level
    for k, v in d['properties'].items():
        if k.startswith('property-'):
            d[k[9:]] = v
    del d['properties']
    return d


def get_entities_from_cluedin(entity_type):
    ctx = cluedin.Context.from_json_file(os.environ['CLUEDIN_CONTEXT'])
    ctx.get_token()

    query = """
      query searchEntities($cursor: PagingCursor, $query: String, $pageSize: Int) {
        search(
          query: $query
          sort: FIELDS
          cursor: $cursor
          pageSize: $pageSize
          sortFields: {field: "id", direction: ASCENDING}
        ) {
          totalResults
          cursor
          entries {
            id
            properties(propertyNames: ["imdb.name.primaryName"])
          }
        }
      }
    """

    variables = {
        'query': f'entityType:{entity_type}',
        'pageSize': 10_000
    }

    entities = list(
        map(flatten_properties, cluedin.gql.entries(ctx, query, variables)))

    with open('data.json', 'w', encoding='utf-8') as file:
        json.dump(entities, file, ensure_ascii=False,
                  indent=2, sort_keys=False)


if not os.path.exists('data.json'):
    get_entities_from_cluedin('/IMDb/Person')

data = pd.read_json('data.json')
data.set_index('id', inplace=True)
data
The result is a Pandas DataFrame with all the entities of type /IMDb/Person:
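To make the flatten_properties helper above easier to follow, here is what it does to a single search entry (a made-up example; the entry shape is an assumption based on the query above):

entry = {
    'id': '42',  # hypothetical entity id
    'properties': {
        'property-imdb.name.primaryName': 'Fred Astaire',
        'attribute-origin': 'imdb',  # keys without the 'property-' prefix are dropped
    },
}

print(flatten_properties(entry))
# {'id': '42', 'imdb.name.primaryName': 'Fred Astaire'}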
The number of iterations in the worst case
Our goal is to find possible duplicates of the imdb.name.primaryName property.
Naïvely, we could iterate over the dataset and, for each element, iterate over the same dataset in a nested loop. That is a Cartesian product (or rather a Cartesian power) of our set, so for a million elements we get 1,000,000,000,000 (one trillion, or a million millions) pairs.
Luckily, the order of elements does not matter: for any two elements A and B, if A is a possible duplicate of B, then B is a possible duplicate of A. That saves us slightly more than half of the iterations, because instead of n^2 iterations we only need n * (n - 1) / 2.
Let's check if this calculation is accurate:
dataset_size = 1_000
temp = range(dataset_size)

# the result of n * (n - 1) is always even, because either n or n - 1 is even
expected_iterations = dataset_size * (dataset_size - 1) // 2
actual_iterations = 0

for i, a in enumerate(temp):
    for b in temp[i + 1:]:
        actual_iterations += 1

difference = expected_iterations - actual_iterations

print(
    f'Expected iterations: {expected_iterations}, actual iterations: {actual_iterations}, difference: {difference}')
The result is:
Expected iterations: 499500, actual iterations: 499500, difference: 0
Or we can use the itertools.combinations_with_replacement function and filter out the self-pairs:
from itertools import combinations_with_replacement

dataset_size = 1_000

df1 = pd.DataFrame({"name": [f"Person {i}" for i in range(dataset_size)]})

new_df = pd.DataFrame(combinations_with_replacement(
    df1.index, 2), columns=['a', 'b'])

new_df[new_df['a'] != new_df['b']]
The result is 499500 rows, exactly as expected.
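The same arithmetic for the full million-row dataset shows why we still cannot brute-force it (a quick sanity check rather than anything we would actually iterate over):

n = 1_000_000
print(f'{n ** 2:_}')            # 1_000_000_000_000 pairs in the naive nested loop
print(f'{n * (n - 1) // 2:_}')  # 499_999_500_000 unordered pairs without self-pairs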
Exact matching
However, there are ways to avoid such a vast number of iterations. For example, we can use the Pandas duplicated function to mark entities that have exact duplicates:
data['exact_duplicate'] = data.duplicated(
    subset=['imdb.name.primaryName'], keep=False)
data
This code runs in less than 0.1 seconds and marks all exact duplicates in our 1,000,000-entity dataset:
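To see which names are repeated most often, we can group the flagged rows by name (a quick sketch against the same DataFrame; the output depends on your dataset):

(data[data['exact_duplicate']]
 .groupby('imdb.name.primaryName')
 .size()
 .sort_values(ascending=False)
 .head(10))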
Double Metaphone
For Double Metaphone matching, we calculate the Double Metaphone code of each name and then compare the codes the same way we compared the exact values:
import metaphone

data['metaphone'] = data['imdb.name.primaryName'].apply(
    lambda x: metaphone.doublemetaphone(x)[0])

# drop names for which Double Metaphone produces an empty code
data = data[data['metaphone'] != ''].copy()

data['metaphone_duplicates_count'] = \
    data.groupby('metaphone')['metaphone'].transform('count')

data.sort_values(['metaphone_duplicates_count', 'metaphone'],
                 ascending=[False, True], inplace=True)
data
This calculation takes a bit less than ten seconds on my three-year-old laptop and gives us the following result:
data[data['metaphone_duplicates_count'] > 1].nunique()['metaphone']
# 126320
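Since the DataFrame is sorted by the duplicate count in descending order, the first row belongs to the largest Metaphone bucket, and we can eyeball its members like this (a sketch; the actual names depend on your dataset):

largest_bucket = data['metaphone'].iloc[0]
data[data['metaphone'] == largest_bucket]['imdb.name.primaryName'].head(10)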
Levenshtein distance
For the Levenshtein distance, we need to calculate the distance between each pair of names. This can be a very long-running process (a naive implementation would take about 285 years on my laptop), but we can optimize it by reducing the number of comparisons.
First, we can skip exact duplicates and operate only on the unique names.
Second, we can skip pairs of names whose lengths differ by more than one character.
Third, we will use multiprocessing to speed up the calculation.
We normalize the Levenshtein distance by dividing it by the length of the longer string, so the longer the strings are, the more characters they may differ by and still stay under the threshold.
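For example, this is how the check works for a single pair (a minimal sketch; the names are made up, and the thresholds match the ones used in the script below):

from Levenshtein import distance

a, b = 'Jon Smith', 'John Smith'  # hypothetical pair of names

# skip the pair early if the lengths differ by more than one character
if abs(len(a) - len(b)) <= 1:
    normalized = distance(a, b) / max(len(a), len(b))  # 1 / 10 = 0.1
    print(normalized < 0.2)                            # True -> possible duplicate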
We will also use a map-reduce approach: calculate the Levenshtein distances for each name, store the results for each name in a separate file, and then reduce those files into a single result file.
# pylint: disable=missing-module-docstring, missing-function-docstring, missing-class-docstring, line-too-long, invalid-name
# deduplication.py
import json
import os
import time
from datetime import timedelta
from hashlib import md5
from multiprocessing import Pool, cpu_count

from Levenshtein import distance

os.makedirs('levenshtein', exist_ok=True)


def find_duplicates(strings):
    # the first string is compared against all the strings that follow it
    a = strings[0]

    md5_hash = md5(a.encode('utf-8')).hexdigest()
    file_path = f'levenshtein/{md5_hash}.json'

    # skip names that were already processed in a previous run
    if os.path.exists(file_path):
        return

    length_distance_threshold = 1
    levenshtein_distance_threshold = 0.2

    result = {
        'name': a,
        'duplicates': []
    }

    for b in strings[1:]:
        # skip pairs whose lengths differ by more than one character
        if abs(len(a) - len(b)) > length_distance_threshold:
            continue
        # normalize the distance by the length of the longer string
        levenshtein_distance = distance(a, b) / max(len(a), len(b))
        if levenshtein_distance < levenshtein_distance_threshold:
            result['duplicates'].append({
                'name': b,
                'distance': levenshtein_distance
            })

    if len(result['duplicates']) > 0:
        with open(file_path, 'w', encoding='utf-8') as file:
            json.dump(result, file, indent=2)


def main():
    process_start = int(time.time())

    with open('data.json', 'r', encoding='utf-8') as file:  # load the dataset
        data = list({x['imdb.name.primaryName'] for x in json.load(file)})

    print(
        f'{timedelta(seconds=time.time() - process_start)} Loaded {len(data)} unique strings from data.json.')

    # Map: fan the comparisons out to worker processes
    with Pool(processes=cpu_count()) as pool:
        print(f'{timedelta(seconds=time.time() - process_start)} Starting {cpu_count()} processes to find duplicates...')
        total = len(data)
        thousand_start = time.time()
        for i, _ in enumerate(pool.imap(find_duplicates, [data[i:] for i, _ in enumerate(data)])):
            if i % 1_000 == 0:
                print(
                    f'{timedelta(seconds=time.time() - process_start)} {timedelta(seconds=time.time() - thousand_start)} strings: {i:_} of {total:_} ({i / total * 100:.0f}%)')
                thousand_start = time.time()

    print(f'{timedelta(seconds=time.time() - process_start)} Done!')

    # Reduce: merge the per-name files into a single dictionary
    files = os.listdir('levenshtein')
    duplicates = {}

    for i, file in enumerate(files):
        if i % 1_000 == 0:
            print(
                f'{timedelta(seconds=time.time() - process_start)} {i:_} of {len(files):_} ({i / len(files) * 100:.0f}%)')
        with open(f'levenshtein/{file}', 'r', encoding='utf-8') as f:
            data = json.load(f)
            if data['name'] not in duplicates:
                duplicates[data['name']] = {
                    'count': 0,
                    'duplicates': []
                }
            duplicates[data['name']]['count'] += len(data['duplicates'])
            duplicates[data['name']]['duplicates'].extend(data['duplicates'])

    # sort names by the number of duplicates, and each duplicate list by distance
    duplicates = {k: v for k, v in sorted(
        duplicates.items(), key=lambda item: item[1]['count'], reverse=True)}

    for _, v in duplicates.items():
        v['duplicates'] = sorted(
            v['duplicates'], key=lambda item: item['distance'], reverse=False)

    with open('duplicates.json', 'w', encoding='utf-8') as f:  # save the result
        json.dump(duplicates, f, indent=2)

    print(len(files))


if __name__ == '__main__':
    main()
Well, it takes several hours for a million entities, but it is still better than 285 years.
We can dig further and use BK-trees to reduce the number of comparisons even more, but it is a topic for another article.
In real life, it takes a few iterations to explore the data and narrow down the conditions on a small sample before running the final calculation on the whole dataset.