up 2 down

Me gustaría convertir archivos de un formato a otro utilizando los 32 núcleos que tengo. Tengo este pedazo de código que debe hacer lo que quiera. Es decir. recorrer todos los archivos y convertir los archivos XML similar al parquet. El orden no importa. Por lo tanto, cuando se termina un trabajo, simplemente debe seguir el siguiente archivo y convertirlo sin esperar a otros puestos de trabajo. El número total de puestos de trabajo no debe exceder de un máximo determinado en procs. Comienza muy prometedor, pero después de la primera iteración del proceso se detiene básicamente. También probé Pool.map, Pool.apply_async. Estoy usando un contexto pyspark en un cuaderno jupyter, no está seguro de si esto es importante aquí.

import pandas as pd
from pyspark import SQLContext
from pyteomics import mzxml
from glob import glob

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
from multiprocessing import Process, Queue, Pool
from random import shuffle

def mzxml_to_pandas_df(filename):
    slices = []
    file = mzxml.MzXML(filename)
    # print('Reading:', filename)
    while True:
        try:
            slices.append(pd.DataFrame(file.next()))
        except:
            break
    return pd.concat(slices)

def fix_column_names_for_spark(df):
    df.rename(columns={'m/z array': 'mz_array', 
                       'intensity array': 'intensity_array'}, inplace=True)

def mzxml_to_parquet(filename):
    new_name = filename.replace('.mzXML', '.parquet')
    print(new_name, end=" ")
    if os.path.isdir(new_name):
        print('already exists.')
        return None
    df = mzxml_to_pandas_df(filename)
    fix_column_names_for_spark(df)
    sqlContext.createDataFrame(df).write.save(new_name)
    print('done')

mzxml_files = glob('./data/*mzXML')  # list of filenames
procs = 10
pool = Pool(procs)
shuffle(mzxml_files)  # Does not help. 
pool.map_async(mzxml_to_parquet, mzxml_files)
pool.close()
pool.join()

¿Por qué este código parar en algún momento, no continuar con el resto de archivos? Estoy interesado en el uso correcto de multiprocesamiento, sino también, de manera más elegante de hacer la tarea. ¡Gracias!

Ahora, yo era capaz de reproducir estos errores. Realmente parece ser un problema de chispa:

ERROR: py4j.java_gateway: Se ha producido un error al intentar conectar con   el servidor Java (127.0.0.1:34967) Rastreo (llamada más reciente pasado):
Expediente   "/Home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",   línea 1145, en send_command       self.socket.sendall (command.encode ( "UTF-8")) BrokenPipeError: [Errno 32] tubo Roto

Durante la manipulación de la excepción anteriormente, otro produjo excepción de:

Rastreo (llamadas recientes más última): Archivo   "/Home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",   línea 985, en send_command       respuesta = connection.send_command (comandos) del archivo "/home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py ",   línea 1149, en send_command       "Error al enviar", e, proto.ERROR_ON_SEND) py4j.protocol.Py4JNetworkError: Error al enviar

Durante la manipulación de la excepción anteriormente, otro produjo excepción de:

Rastreo (llamadas recientes más última): Archivo   "/Home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",   línea 929, en _get_connection       conexión = self.deque.pop() IndexError: estallido de un deque vacía

Durante la manipulación de la excepción anteriormente, otro produjo excepción de:

Rastreo (llamadas recientes más última): Archivo   "/Home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",   la línea 1067, en el inicio       self.socket.connect ((self.address, self.port)) ConnectionRefusedError: [Errno 111] Conexión rechazada

¿Eso quiere decir multiprocesamiento no debe utilizarse junto con la chispa?