Functions with Multiprocessing for text processing
2020 Apr 01
Anyone who followed the post A small journey in the valley of Natural Language Processing and Text Pre-Processing for German language saw some of the challenges of modeling a text classifier for German.
However, one thing that saved me in the pre-processing stage was that I used multiprocessing to parallelize the pre-processing over the text column, and that saved me an incredible amount of time (as a reminder: I had 1+ million text records, averaging 250 words per record with a standard deviation of 700, all processed with an internal library).
import time

import numpy as np
import pandas as pd

# Multiprocessing library that uses a Pool
# to distribute the task across all processors
from multiprocessing import Pool

# An internal NLP lib to process text
import nlp_pre_processing
nlp = nlp_pre_processing.NLPPreprocessor()

# Tracking the time
start_time = time.time()
print('Start processing...')

# Number of partitions that the Pandas DataFrame
# will be split into for parallel processing
num_partitions = 20

# Number of cores that will be used
# (the more, the better)
num_cores = 16

print(f'Partition Number: {num_partitions} - Number of Cores: {num_cores}...')


def main_process_pipeline(df, func):
    """
    Splits the dataframe and processes all the
    parts across n processors.

    Args:
        df (pandas.DataFrame): Dataframe that will be split
        func (function): Python function that will be executed in parallel

    Returns:
        df: Dataframe with all parts concatenated after the function is applied
    """
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df


def pre_process_wrapper(df):
    """Takes a dataframe chunk and applies the pre-processing pipeline to the text column."""
    df['text'] = df['text'].apply(lambda text: nlp.pre_processing_pipeline(text))
    return df


# Unite the dataframe and the wrapper
# (df holds the text records loaded earlier)
processed_df = main_process_pipeline(df, pre_process_wrapper)

print(f'Processing finished in seconds: {(time.time() - start_time)}')
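Since the snippet above depends on an internal library (nlp_pre_processing) and on a dataframe df loaded elsewhere, here is a minimal self-contained sketch of the same split-map-concat pattern that you can run end to end. The pre-processing function here (fake_pre_processing_pipeline) is a hypothetical stand-in for the internal pipeline, and the toy dataframe stands in for the 1M+ records. Note the if __name__ == '__main__': guard, which multiprocessing requires on platforms that spawn worker processes (Windows, and macOS since Python 3.8):

import numpy as np
import pandas as pd
from multiprocessing import Pool, cpu_count

NUM_PARTITIONS = 20


def fake_pre_processing_pipeline(text):
    # Hypothetical stand-in for the internal NLP pipeline:
    # just lowercases and strips whitespace
    return text.lower().strip()


def pre_process_wrapper(df):
    # Applied to each chunk inside a separate worker process
    df['text'] = df['text'].apply(fake_pre_processing_pipeline)
    return df


def parallelize_dataframe(df, func, num_partitions, num_cores):
    # Split the dataframe, map the function over the chunks
    # in a process pool, and concatenate the results back
    chunks = np.array_split(df, num_partitions)
    with Pool(num_cores) as pool:
        result = pd.concat(pool.map(func, chunks))
    return result


if __name__ == '__main__':
    # Toy dataframe standing in for the real text records
    df = pd.DataFrame({'text': ['  Hello World  ', 'GUTEN Tag '] * 10})
    processed = parallelize_dataframe(df, pre_process_wrapper,
                                      NUM_PARTITIONS, cpu_count())
    print(processed.head())

Using Pool as a context manager replaces the explicit close()/join() calls from the original snippet, and cpu_count() adapts the number of workers to the machine instead of hardcoding 16.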
That's it: simple and straightforward.