Funções com Multiprocessing para processamento de textos

Quem acompanhou o post A small journey in the valley of Natural Language Processing and Text Pre-Processing for German language acompanhou um pouco dos desafios de modelar um classificador de textos em alemão.

No entanto uma coisa que me salvou na parte de pre-processing foi que eu praticamente usei o multiprocessing para paralelizar o pré-processamento na coluna de texto e isso me salvou um tempo incrível (relembrando: eu tinha 1+ milhão de registros de texto, com 250 palavras média por registro (com um desvio padrão de 700, tudo isso usando biblioteca interna).

import time
import numpy as np
import pandas as pd
import nlp_pre_processing
# An internal NLP lib to process text
nlp = nlp_pre_processing_library.NLPPreprocessor()
# Multiprocessing library that uses pool
# to distribute the task for all processors
from multiprocessing import Pool
print(f'Start processing...: {(time.time() - start_time)}')
# Tracking the time
start_time = time.time()
# Number of partitions that
# the Pandas Dataframe will be
# splited to parallel processing
num_partitions = 20
# Number of cores that will be used
# more it's better
num_cores = 16
print(f'Partition Number: {num_partitions} - Number of Cores: {num_cores}...')
def main_process_pipeline(df, func):
"""
Function that will split the dataframe
and process all those parts in a n number
of processors
Args:
df (Pandas dataframe): Dataframe that will be splited
func (function): Python function that will be executed in parallel
Returns:
df: Dataframe with all parts concatenated after the function be applied
"""
df_split = np.array_split(df, num_partitions)
pool = Pool(num_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
def pre_process_wrapper(df):
""" Will take the Dataframe and apply a function using lambda"""
df['text'] = df['text'].apply(lambda text: nlp.pre_processing_pipeline(text))
return df
# Unite the Dataframe and the Wrapper
processed_df = main_process_pipeline(df, pre_process_wrapper)
print(f'Processing finished in seconds: {(time.time() - start_time)}')

É isso: Simples e tranquilo.