Deduplikering af CluedIn-datasæt i Python
Biblioteket cluedin er et Python SDK til at arbejde med CluedIn — en datamanagement-platform. Det giver en bekvem måde at arbejde med CluedIns REST API og GraphQL API.
CluedIn-platformen er en fremragende datamanagement-platform. Den kan bruges til at importere data fra forskellige kilder, transformere det og eksportere det til andre systemer. Den tilbyder et praktisk GraphQL API til at arbejde med dataen. Vi kan bruge det til at forespørge data og udføre dataanalyse med Python og Jupyter Notebooks ligesom med enhver anden datakilde.
Lad os først installere de nødvendige biblioteker:
%pip install cluedin
%pip install levenshtein
%pip install Metaphone
%pip install pandas
Vi opretter en JSON-fil med legitimationsoplysninger til vores instans (vi antager, at instansen er på foobar.244.117.198.49.sslip.io):
{
"domain": "244.117.198.49.sslip.io",
"org_name": "foobar",
"user_email": "admin@foobar.com",
"user_password": "yourStrong(!)Password"
}
Og vi kan gemme den et praktisk skjult sted og derefter gemme stien i en miljøvariabel:
export CLUEDIN_CONTEXT=~/.cluedin/cluedin.json
Derefter indlæser vi data fra CluedIn og gemmer det i en JSON-fil til videre behandling:
import json
import os
import pandas as pd
import cluedin
def flatten_properties(d):
for k, v in d['properties'].items():
if k.startswith('property-'):
d[k[9:]] = v
del d['properties']
return d
def get_entities_from_cluedin(entity_type):
ctx = cluedin.Context.from_json_file(os.environ['CLUEDIN_CONTEXT'])
ctx.get_token()
query = """
query searchEntities($cursor: PagingCursor, $query: String, $pageSize: Int) {
search(
query: $query
sort: FIELDS
cursor: $cursor
pageSize: $pageSize
sortFields: {field: "id", direction: ASCENDING}
) {
totalResults
cursor
entries {
id
properties(propertyNames:["imdb.name.primaryName"])
}
}
}
"""
variables = {
'query': f'entityType:{entity_type}',
'pageSize': 10_000
}
entities = list(
map(flatten_properties, cluedin.gql.entries(ctx, query, variables)))
with open('data.json', 'w', encoding='utf-8') as file:
json.dump(entities, file, ensure_ascii=False,
indent=2, sort_keys=False)
if not os.path.exists('data.json'):
get_entities_from_cluedin('/IMDb/Person')
data = pd.read_json('data.json')
data.set_index('id', inplace=True)
data
Resultatet er en Pandas DataFrame med alle entities af typen /IMDb/Person:
Antallet af iterationer i værste tilfælde
Vores mål er at beregne mulige duplikater af egenskaben imdb.name.primaryName.
Naivt kan vi iterere over datasættet, og for hvert element iterere over det samme datasæt i en indlejret løkke. Det ville være et kartesisk produkt (eller rettere en kartesisk potens) af vores mængde. Så for en million elementer vil det være 1.000.000.000.000 (en billion, eller en million af millioner) par.
Heldigvis er rækkefølgen af elementerne ligegyldig: for alle elementer som A og B gælder det, at hvis A er en mulig duplikat af B, så er B en mulig duplikat af A. Det sparer os for cirka 51% af iterationerne, fordi vi i stedet for n^2 iterationer kun behøver n * (n - 1) / 2.
Lad os tjekke, om denne beregning er korrekt:
dataset_size = 1_000
temp = range(dataset_size)
# the result of n(n-1) is always even, because either n or n-1 is even
expected_iterations = int(dataset_size * (dataset_size - 1) / 2)
actual_iterations = 0
for i, a in enumerate(temp):
for b in temp[i + 1:]:
actual_iterations += 1
difference = expected_iterations - actual_iterations
print(
f'Expected iterations: {expected_iterations}, actual iterations: {actual_iterations}, difference: {difference}')
Resultatet er:
Expected iterations: 499500, actual iterations: 499500, difference: 0
Eller vi kan bruge funktionen itertools.combinations:
from itertools import combinations_with_replacement
dataset_size = 1_000
df1 = pd.DataFrame({"name": [f"Person {i}" for i in range(dataset_size)]})
new_df = pd.DataFrame(combinations_with_replacement(
df1.index, 2), columns=['a', 'b'])
new_df[new_df['a'] != new_df['b']]
Resultatet er 499.500 rækker, præcis som forventet.
Eksakt matching
Der er dog måder at undgå et så stort antal iterationer. For eksempel kan vi bruge Pandas' duplicated-funktion til at markere entities med eksakte duplikater:
data['exact_duplicate'] = data.duplicated(
subset=['imdb.name.primaryName'], keep=False)
data
Denne kode kører på under 0,1 sekunder og markerer alle eksakte duplikater i vores datasæt med 1.000.000 entities:
Double Metaphone
Til Double Metaphone skal vi beregne Double Metaphone-hashen for hvert navn og derefter sammenligne dem på samme måde, som vi gjorde for de eksakte matches:
import metaphone
data['metaphone'] = data['imdb.name.primaryName'].apply(
lambda x: metaphone.doublemetaphone(x)[0])
data = data[data['metaphone'] != '']
data['metaphone_duplicates_count'] = \
data[data['metaphone'] != ''] \
.groupby('metaphone')
.transform('count')
data.sort_values(['metaphone_duplicates_count', 'metaphone'],
ascending=[False, True], inplace=True)
data
Denne beregning tager lidt under ti sekunder på min tre år gamle bærbare computer og giver os følgende resultat:
data[data['metaphone_duplicates_count'] > 1].nunique()['metaphone']
# 126320
Levenshtein-afstand
Til Levenshtein-afstanden skal vi beregne afstanden mellem hvert par af navne. Dette kan være en meget langvarig proces (en naiv implementering ville tage omkring 285 år på min bærbare computer). Men vi kan optimere det ved at reducere antallet af sammenligninger.
For det første kan vi springe eksakte duplikater over og kun arbejde med de unikke navne.
For det andet kan vi springe navne over, der har forskellige længder og adskiller sig med mere end ét tegn.
For det tredje bruger vi multiprocessing til at fremskynde beregningen.
Vi beregner Levenshtein-afstandstærsklen som Levenshtein-afstanden divideret med længden af den længste streng, så jo længere en af strengene er, desto flere tegn kan den adskille sig fra den anden streng.
Vi bruger også map-reduce-tilgangen: beregn Levenshtein-afstanden for hvert par af strenge og gem resultatet for hver entity i en separat fil, og derefter reducerer vi resultaterne til en enkelt fil.
# pylint: disable=missing-module-docstring, missing-function-docstring, missing-class-docstring, line-too-long, disable=invalid-name
# deduplication.py
import json
import os
import time
from datetime import timedelta
from hashlib import md5
from multiprocessing import Pool, cpu_count
from Levenshtein import distance
os.makedirs('levenshtein', exist_ok=True)
def find_duplicates(strings):
a = strings[0]
md5_hash = md5(a.encode('utf-8')).hexdigest()
file_path = f'levenshtein/{md5_hash}.json'
if os.path.exists(file_path):
return
length_distance_threshold = 1
levenshtein_distance_threshold = 0.2
result = {
'name': a,
'duplicates': []
}
for b in strings[1:]:
if abs(len(a) - len(b)) > length_distance_threshold:
continue
levenshtein_distance = distance(a, b) / max(len(a), len(b))
if levenshtein_distance < levenshtein_distance_threshold:
result['duplicates'].append({
'name': b,
'distance': levenshtein_distance
})
if len(result['duplicates']) > 0:
with open(file_path, 'w', encoding='utf-8') as file:
json.dump(result, file, indent=2)
def main():
process_start = int(time.time())
with open('data.json', 'r', encoding='utf-8') as file: # load the dataset
data = list({x['imdb.name.primaryName'] for x in json.load(file)})
print(
f'{timedelta(seconds=time.time() - process_start)} Loaded {len(data)} unique strings from data.json.')
with Pool(processes=cpu_count()) as pool:
print(f'{timedelta(seconds=time.time() - process_start)} Starting {cpu_count()} processes to find duplicates...')
total = len(data)
thousand_start = time.time()
for i, _ in enumerate(pool.imap(find_duplicates, [data[i:] for i, _ in enumerate(data)])):
if i % 1_000 == 0:
print(
f'{timedelta(seconds=time.time() - process_start)} {timedelta(seconds=time.time() - thousand_start)} strings: {i:_} of {total:_} ({i / total * 100:.0f}%)')
thousand_start = time.time()
print(f'{timedelta(seconds=time.time() - process_start)} Done!')
# Reduce
files = os.listdir('levenshtein')
duplicates = {}
for i, file in enumerate(files):
if i % 1_000 == 0:
print(
f'{timedelta(seconds=time.time() - process_start)} {i:_} of {len(files):_} ({i / len(files) * 100:.0f}%)')
with open(f'levenshtein/{file}', 'r', encoding='utf-8') as f:
data = json.load(f)
if data['name'] not in duplicates:
duplicates[data['name']] = {
'count': 0,
'duplicates': []
}
duplicates[data['name']]['count'] += len(data['duplicates'])
duplicates[data['name']]['duplicates'].extend(data['duplicates'])
duplicates = {k: v for k, v in sorted(
duplicates.items(), key=lambda item: item[1]['count'], reverse=True)}
for _, v in duplicates.items():
v['duplicates'] = sorted(
v['duplicates'], key=lambda item: item['distance'], reverse=False)
with open('duplicates.json', 'w', encoding='utf-8') as f: # save the result
json.dump(duplicates, f, indent=2)
print(len(files))
if __name__ == '__main__':
main()
Ja, det tager flere timer for en million entities, men det er stadig bedre end 285 år.
Vi kan grave dybere og bruge BK-træer til at reducere antallet af sammenligninger endnu mere, men det er et emne for en anden artikel.
I praksis tager det et par iterationer at udforske og indsnævre betingelserne på et lille udsnit af data, inden man kører den endelige beregning på hele datasættet.