Trabajar con ficheros CSV ha sido y seguirá siendo una tarea muy habitual en los procesos de extracción y carga de datos. Para la información semiestructurada el formato JSON se ha convertido en el nuevo “formato standard” de intercambio de información, bajando del pedestal al formato XML que durante muchos años ha tenido esta posición. En este artículo vamos a ver cómo Synapse trabaja con este tipo de ficheros.
Este dataset está compuesto de un fichero de hechos y cinco ficheros de dimensiones:
Nos vamos a centrar únicamente en el fichero de hechos que es el que tiene una volumetría más significativa. Para realizar una primera prueba, vamos a convertir el fichero a JSON usando para ello PowerShell:
import-csv “Data8277.csv” | ConvertTo-Json | Add-Content -Path “Data8277.json”
Claramente esto resultó ser una mala idea para resolver este problema. Por una parte durante el proceso de conversión, PowerShell se “comió” más de 30 GBs de memoria física, lo cual parece una cantidad muy elevada para hacer este tipo de conversión, y casi otras 20 GB de memoria paginada a disco:
Además resultó extremadamente lento, llevándole más de 25 minutos (mucha paciencia tuve) llegar a este punto de saturación de memoria y tenerlo que abortar.
Para el siguiente intento me planteé utilizar Python ya que siempre está en boca de todos los “big data lovers” por lo que debería ser capaz de hacer esta conversión sin problemas. Por tanto, armado con la última versión de Visual Studio Code con la extensión ms-Python, lint-Python y la última versión de Python 3.9.5 y Pandas me dispongo a realizar la conversión.
Comenzaremos probando que podemos leer el fichero y sacar un par de líneas:
import pandas as pddata = pd.read_csv(“Data8277.csv”, sep=”,”)print(data.head(2
Conviértete tus datos en información clave para tu negocio con Azure Synapse
Escalabilidad limitada, conclusiones eficaces y seguridad inigualable son algunas de las características de Azure Synapse. Explora este servicio y da respuesta a cómo podría ayudarte a tomar decisiones sólidas para tu negocio, realizando una prueba concepto de 10 jornadas sin coste.
Quiero una prueba gratuita de Azure Synapse
Una vez comprobado que podemos obtener un par de líneas vamos a proceder a la conversión del fichero completo:
import pandas as pddata = pd.read_csv(“Data8277.csv”, sep=”,”)with open(‘Data8277.json’, ‘w’) as f: f.write(data.to_json(orient=’records’, lines=True))
Podemos ver que durante la ejecución, nuestro Python llega a consumir hasta 12 GB de memoria:
Pero al menos el tiempo de ejecución es relativamente reducido, y consigue realizar la conversión de CSV a JSON en algo menos de 1 minuto:
Si intentamos abrir el fichero con Visual Studio Code nos encontramos con un error de falta de memoria:
Intentamos reiniciar con 4 GB de límite y obtenemos el mismo error. Lo subimos a 10000 MB y con ello podemos abrir finalmente el fichero para verificar que efectivamente tenemos un JSON válido:
Para ser más exactos lo que tendremos es un fichero new-line delimited JSON (JSONL) que es la alternativa más compacta y utilizada cuando hablamos de cargas de ficheros JSON en este tipo de entornos.
Si intentamos hacer una lectura “tonta” para ver los dos primeros registros, como hicimos con el CSV, nos encontramos con otra desagradable sorpresa. Podemos ver como al lanzarlo tenemos un consumo absurdo de memoria y tenemos que acabar abortando el proceso:
import pandas as pddata = pd.read_json(“Data8277.json”, lines=True)print(data.head(2))
Como no tenemos intención de manejar este fichero localmente dejaremos esta problemática como algo que tenga que resolver Synapse. Adicionalmente dividiremos los mismos ficheros CSV y JSONL pero divididos en ficheros de 100 filas, 10000 filas y 1 millón de filas para poder probar varias combinaciones de tamaños de ficheros:
En bastantes ocasiones nos encontramos que se intentan procesar muchos ficheros pequeños con este tipo de arquitecturas lake+MPP y por ello hemos preparado estas carpetas para simular el uso de una mayor cantidad de ficheros pequeños (no nos hemos querido ir al caso extremo de 1 línea por fichero ya que sería casi inmanejable al tener el fichero original unos 35 millones de filas).
El siguiente paso será crear nuestro Synapse Workspace si no lo tenemos ya creado en nuestra subscripción de Azure. Debemos tener en cuenta que algunos datacenters pueden no aceptar nuestra petición y debamos elegir otra región:
Una vez tengamos nuestro Synapse Workspace creado procederemos a subir a la cuenta Azure Data Lake Gen2 asociada algunos de los ficheros para comprobar que podemos acceder a ellos sin problemas:
SELECT TOP 100 *FROMOPENROWSET( BULK ‘https://synapseverne4.dfs.core.windows.net/synapseverne4/SplitJSON_10000/Data8277_000001.jsonl’, FORMAT = ‘CSV’, FIELDQUOTE = ‘0x0b’, FIELDTERMINATOR =’0x0b’)WITH ( line varchar(max)) AS [result]
Normalmente lo que necesitaremos es trabajar sobre los datos del JSON en formato tabular, por lo que extraeremos sus propiedades con JSON_VALUE:
SELECT top 100JSON_VALUE (line, ‘$.Year’) AS Year , JSON_VALUE (line, ‘$.Age’)AS Age , JSON_VALUE (line, ‘$.Ethnic’) AS Ethnic , JSON_VALUE (line, ‘$.Sex’) AS Sex , JSON_VALUE (line, ‘$.Area’) AS Area , JSON_VALUE (line, ‘$.count’) AS CountFROMOPENROWSET( BULK ‘https://synapseverne4.dfs.core.windows.net/synapseverne4/SplitJSON_10000/Data8277_000001.jsonl’, FORMAT = ‘CSV’, FIELDQUOTE = ‘0x0b’, FIELDTERMINATOR =’0x0b’)WITH ( line varchar(max)) AS [result]
Los mismos datos los podemos extraer de los ficheros originales en formato CSV:
SELECT top 100 C1 AS Year , C2 AS Age , C3 AS Ethnic , C4 AS Sex , C5 AS Area , C6 AS CountFROM OPENROWSET( BULK ‘https://synapseverne4.dfs.core.windows.net/synapseverne4/SplitCSV_10000/Data8277_000001.csv’, FORMAT = ‘CSV’, PARSER_VERSION=’2.0′, FIRSTROW =2) AS[result]
Una vez tenemos todos los ficheros cargados, vamos a realizar una sencilla prueba para comprobar cuanto tiempo necesitamos para contar simplemente las filas que tenemos en cada caso. La cantidad de registros debe ser en todos los casos exactamente la misma pero tenemos una gran diferencia en el número de ficheros así como en su tamaño.
Para el caso de los ficheros JSONL utilizaremos esta consulta, cambiando únicamente la ruta de los ficheros a la carpeta con los ficheros con 100, 10000 o un millón de filas:
SELECTcount(*)FROMOPENROWSET( BULK ‘https://synapseverne4.dfs.core.windows.net/synapseverne4/SplitJSON_1000000/*.jsonl’, FORMAT = ‘CSV’, FIELDQUOTE = ‘0x0b’, FIELDTERMINATOR =’0x0b’)WITH ( line varchar(max)) AS [result]– ~19 segundos
Y para los CSV utilizaremos la siguiente consulta, modificando igualmente la ruta a los ficheros correspondientes:
SELECTcount(*)FROM OPENROWSET( BULK?’https://synapseverne4.dfs.core.windows.net/synapseverne4/SplitCSV_1000000/*.csv’, FORMAT?=?’CSV’, PARSER_VERSION=’2.0′, FIRSTROW =2) AS [result]– ~11 segundos
Es interesante también que en las operaciones que realicemos sobre el Serverless pool de Synapse consultemos la pestaña de mensajes donde se nos indicará la cantidad de datos leídos, movidos, escritos ya que de ellos dependerá el coste de ejecutar dicha consulta:
La siguiente tabla muestra los tiempos en segundos para cada uno de los casos. Se han ejecutado 3 veces y sacado una media, ya que puede haber variaciones muy significativas entre ejecuciones:
Durante las pruebas también tuvimos varios errores transitorios (que desaparecían en una segunda ejecución:
La realidad es que obtuvimos duraciones bastante desiguales para una misma consulta, especialmente en cuanto teníamos 2-3 consultas concurrentes. Era frecuente que las consultas aparecieran como “suspended”:
Claramente el backend no proporciona un comportamiento estable y predecible cuando hay concurrencia lo que hace que en general la experiencia con el SQL Serverless pool pueda resultar bastante frustrante. Realmente un usuario esperaría que al tratarse de un modelo de pago por consulta, el “backend” a nivel de servicio fuese enormemente potente y por tanto las diferencias de tiempo fuesen muy pequeñas entre ejecuciones.
Como veis no hemos entrado en definir jerarquías para los ficheros, algo que sería muy recomendable especialmente si pensamos tener algún tipo de filtrado/particionado por fechas apoyándonos en las funciones filepath() y filename(). Podéis obtener más información sobre estas funciones aquí: https://docs.microsoft.com/en-us/azure/synapse-analytics/sql/query-specific-files
Respecto a las métricas, en el momento en el que se realizaron las pruebas vemos que no se muestran correctamente las consultas completadas, fallidas y canceladas, mostrándose el mismo valor en todos los casos (el total):
También destacar que otra de las métricas que tenemos disponible muestra valores absurdamente altos, lo cual apunta o bien a un overhead importante por la configuración que estamos forzando o bien que estamos visualizando volúmenes de datos del backend compartido que no deberíamos estar viendo.
En total solamente tenemos poco más de 12 GB en el Data Lake y claramente las operaciones que estamos realizando en 15 minutos no mueven 243 GBs en vista de los tiempos de ejecución que estamos teniendo:
Es posible también que este contador venga distorsionado debido a que ficheros de menos de 4 MB se consideran que tienen el coste de 1 transacción de storage, por lo que si tenemos muchos ficheros pequeños puede que se acaben contabilizando valores muy elevados por dicha razón:
De todas formas durante todas las pruebas realizadas el consumo acumulado por la parte del storage no fue más allá de unos pocos euros:
Una vez vistos los tiempos obtenidos y la inestabilidad de los tiempos debemos plantearnos pasar a formato PARQUET la información y evitar utilizarla en crudo. Por ejemplo podemos cargar con una CETAS la información que tenemos:
CREATE EXTERNAL FILE FORMAT formatoParquet WITH (FORMAT_TYPE=PARQUET)CREATE EXTERNAL DATA SOURCE lake WITH (location=’https://synapseverne4.dfs.core.windows.net/synapseverne4) CREATE EXTERNAL TABLE Data8277 WITH ( LOCATION = ‘/PARQUET’, DATA_SOURCE = lake, FILE_FORMAT = formatoParquet )ASSELECT *FROM OPENROWSET( BULK ‘https://synapseverne4.dfs.core.windows.net/synapseverne4/Data8277.csv’, FORMAT = ‘CSV’, PARSER_VERSION=’2.0′, FIRSTROW =2) WITH ( — Recomendable usar los tipos correctos… Year NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Age NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Ethnic NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Sex NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Area NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Count NVARCHAR(10) COLLATE Latin1_General_BIN2 ) AS [result]
Una vez en formato PARQUET ya parece que hablemos de un sistema totalmente distinto. Un conteo es prácticamente instantáneo tanto a través del fichero como de la tabla externa:
SELECTcount(*)FROMOPENROWSET( BULK ‘https://synapseverne4.dfs.core.windows.net/synapseverne4/PARQUET/Data8277.parquet’, FORMAT=’PARQUET’) AS [result]WHERE YEAR=’2018′ select count(*) from Data8277WHERE YEAR=’2018′
Y consultas más complejas como un agregado por año y sacar el valor total y el medio, con conversiones de tipo texto de por medio incluso únicamente llevan unos pocos segundos:
SELECTMAX(TRY_CAST(Count as int)) max_count, AVG(try_cast(Count as int)) avg_count, YearFROMOPENROWSET( BULK ‘https://synapseverne4.dfs.core.windows.net/synapseverne4/PARQUET/Data8277.parquet’, FORMAT=’PARQUET’) WITH ( — Recomendable usar los tipos correctos… Year NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Age NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Ethnic NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Sex NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Area NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Count NVARCHAR(10) COLLATE Latin1_General_BIN2 ) AS [result]GROUP BY YEAR SELECTMAX(TRY_CAST(Count as int)) max_count, AVG(try_cast(Count as int)) avg_count, YearFROM Data8277GROUP BY YEAR
Una razón que salta a la vista al comparar los tamaños de los ficheros es que el tamaño de éstos condiciona el rendimiento. Esto es cierto pero hasta cierto punto, ya que el procesamiento del propio tipo de fichero y su naturaleza (orientado a filas o columnas) afecta también en gran manera. Por ejemplo podemos comprimir con GZIP el fichero CSV y tener incluso menos tamaño que el formato PARQUET:
Sin embargo aunque ocupe menos, los tiempos necesarios para realizar las operaciones de conteo y la consulta anterior son 5-6 veces más lentas contra el fichero CSV comprimido que contra el fichero PARQUET (4s vs 24s en el caso de la consulta agregada y 2s vs 18s en el conteo):
SELECTcount(*)FROM OPENROWSET( BULK ‘Data8277.csv.gz’, FORMAT = ‘CSV’ ,DATA_SOURCE=’lake’ ,DATA_COMPRESSION = ‘GZIP’) WITH ( — Recomendable usar los tipos correctos… Year NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Age NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Ethnic NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Sex NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Area NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Count NVARCHAR(10) COLLATE Latin1_General_BIN2 ) aWHERE YEAR=’2018′ SELECTMAX(TRY_CAST(Count as int)) max_count, AVG(try_cast(Count as int)) avg_count, YearFROM OPENROWSET( BULK?’Data8277.csv.gz’, FORMAT = ‘CSV’ ,DATA_SOURCE=’lake’ ,DATA_COMPRESSION = ‘GZIP’) WITH ( — Recomendable usar los tipos correctos… Year NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Age NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Ethnic NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Sex NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Area NVARCHAR(10) COLLATE Latin1_General_BIN2 ,Count NVARCHAR(10) COLLATE Latin1_General_BIN2 ) aGROUP BY YEAR
Sin embargo consideramos que como la conversión de un fichero CSV, JSON, etc. a PARQUET no está exenta de ciertos riesgos como por ejemplo errores en los tipos de datos, sus longitudes o encodings utilizados es interesante preservar los ficheros originales.
También puede ocurrirnos que los ficheros CSV,JSON que procesamos puedan cambiar a lo largo del tiempo (especialmente habitual en los JSON) y llegue a “perderse” información de forma silenciosa en nuestros procesos de conversión si directamente eliminamos los CSV/JSON de forma automatizada tras convertirlos a PARQUET.
Teniendo en cuenta el precio del tier archive si comprimimos dichos ficheros CSV/JSON en GZIP podremos mantenerlos historificados a un muy bajo coste por GB:
Conclusión
Lo que podemos concluir por tanto es que claramente el manejo de una gran cantidad de ficheros CSV/JSON pequeños supone un problema enorme para el rendimiento ya que no olvidemos que en todos los casos la cantidad de datos, el total de filas, era exactamente el mismo. En base a las pruebas un tamaño de un millón de filas parece ser un valor de referencia adecuado. En nuestro caso ese millón de filas equivale a unos 23 MB lo cual es relativamente bajo pero esto es debido a que manejamos una tabla muy estrecha. Si la tabla fuese más ancha (lo normal) nos encontraríamos que con 1 millón de filas tendríamos tamaños de entre 100 MB y 1 GB por fichero, tamaños óptimos para este tipo de explotación masiva.
Por otra parte tanto los CSVs como los JSON en general consideramos que deberían quedar relegados a ser “ciudadanos” de segunda en Synapse. Prácticamente cualquier volumen de datos que sea mínimamente significativo necesita ser convertido a PARQUET para que el rendimiento sea aceptable por lo que se hace muy importante seguir las buenas prácticas para la transformación y conversión de información RAW en información usable con fines analíticos. Si queremos preservar el dato original sin que el espacio ocupado sea excesivo recomendamos que se compriman y archiven usando la archive tier para minimizar el coste.
¡Has llegado al final! Parece que te ha gustado nuestro post sobre Azure Synapse
Escalabilidad limitada, conclusiones eficaces y seguridad inigualable son algunas de las características de Azure Synapse. Explora este servicio y da respuesta a cómo podría ayudarte a tomar decisiones sólidas para tu negocio, realizando una prueba concepto de 10 jornadas sin coste.