En este segundo post de la serie de Azure Stream Analytics (ASA de ahora en adelante) vamos a hablar sobre escalado y particionamiento, creando steps en la Query del job de ASA de forma manual y particiones para mejorar el rendimiento de los jobs de ASA.
En este caso práctico de ejemplo, tenemos diferentes aplicaciones que envían una gran cantidad de transacciones a un event hub, y nuestro job de ASA tiene que recoger todos esos datos, unirlos y pasarlos a un segundo event hub.
Antes de empezar con este caso práctico vamos hablar sobre los conceptos básicos de escalado y particionamiento de ASA.
Escalado
Los Streaming Units (SUs) representan los recursos de computación que los jobs tienen asignados para realizar sus operaciones. Los jobs de ASA realizan todo el procesado en memoria y cuando la memoria asignada se sobrepasa, fallan o dejan de procesar eventos. Por esto, es muy importante su monitorización para asegurarnos de que los recursos que les hemos asignado son los suficientes para su buen funcionamiento.
Para más información sobre Streaming Units podéis consultar en siguiente enlace.
Monitorización
Antes de explicar más detalles sobre el escalado y los SUs, vamos a explicar brevemente como realizar una monitorización de nuestro job y que métricas son las más importantes.
Para ello en el panel de la izquierda, en la parte inferior, en la sección de “Monitoring” seleccionamos “Metrics” como se muestra en la imagen 1:
Las métricas más importantes para saber si el job está funcionando correctamente son:
- Output Events: Índica el número de eventos que están saliendo del job.
- Input Events: Índica el número de eventos que le están llegando al job.
- SU % Utilizacion: Índica el porcentaje de utilización de los Streaming Units asignados al job.
- Watermark Delay: Índica la diferencia entre la marca de tiempo de entrada y de salida de un evento.
Con estas métricas nos podemos hacer una idea de si el número de eventos de entrada y salida son los adecuados y cuadran con lo que esperamos, si el job necesita más capacidad de procesamiento, o si el job va muy retrasado procesando eventos.
Para añadir una métrica a nuestra gráfica, pulsamos sobre “Add metric” y luego seleccionamos la métrica deseada como se muestra en la imagen 2:
Ajuste de Streaming Units (SUs)
El elegir el número de Streaming Units que necesita nuestro job depende de la configuración de nuestros inputs/outputs (si están particionados o no), y de cómo esté definida la consulta de nuestro job. Otra cosa a tener en cuenta, es que el ajuste del número de SUs requiere de un proceso de prueba y error para ir viendo qué número se ajusta más a nuestra configuración.
Como buena práctica, es recomendable empezar con 6 SUs para consultas que no tienen PARTITION BY e ir ajustando hasta encontrar el número que más se ajuste a nuestro entorno, para ello lo recomendable es ir monitorizando las métricas anteriormente comentadas.
El máximo número de SUs que un job puede utilizar depende de:
- Los steps que estén definidos en la Query del job.
- El número de particiones que tenga dicho step.
Cada step como máximo puede usar 6 SUs, es decir, si tenemos una Query sencilla con un solo step, da igual que le asignes más de 6 SUs al job porque no los va a utilizar. Tenemos dos formas de añadir steps a nuestro job:
- Hacer CTEs (cada CTE se considera como un step).
- Añadir particiones.
Aquí se muestran algunos ejemplos de los SUs que le podemos asignar a nuestro job:
Query | Número máximo de SUs |
|
6 |
|
96 (6 * 16 particiones) |
|
24 (6 * 3 particiones en el step particionado + 6 * 1 step no particionado) |
Uso del Job diagram
Para hacernos una idea de los steps y las particiones que tiene cada step de nuestro job, podemos hacer uso del “Job diagram”. Esto se encuentra en el panel de la parte izquierda bajo “Support + troubleshooting” como se muestra en la imagen 3:
Una vez seleccionado nos aparecerá una pantalla como la que se encuentra en la imagen 4, donde podemos ver “cajitas” que representan los steps, inputs y outputs y el número de particiones asignadas a cada step.
En el momento de la escritura de este post, Microsoft se encuentra en proceso de mejora y rediseño de este diagrama y las métricas que podemos explorar, por lo que se pueden experimentar fallos y alguna inconsistencia.
En la imagen 4 podemos ver un ejemplo en el que en la izquierda tenemos nuestros inputs, en la parte central los steps que tiene nuestra Query y en la parte derecha nuestros outputs:
En la imagen 5, tenemos un ejemplo de como aparecería un input o step si estuvieran particionados:
En la parte inferior del Job diagram, tenemos una gráfica (Imagen 6) muy útil que nos permite ver información de diferentes métricas del job o de algún step en concreto:
Una vez que ya sabemos los steps con los que cuenta nuestro job, entonces ya podemos hacer un primer ajuste de los SUs e ir probando a partir de nuestra estimación para ver que configuración se ajusta mejor.
Cambiar SUs asignados al job
Para cambiar el número de SUs que nuestro job tiene asignados tenemos que seleccionar en el panel de la izquierda bajo “Configure” la opción “Scale” como se muestra en la imagen 7:
En la pantalla que nos aparece solo tendríamos que escribir el número de SUs que queramos y darle a “Save” para guardar nuestra configuración, este escalado no se puede hacer mientras el job está en funcionamiento, tendremos que parar nuestro job cada vez que queramos modificarlo.
Particionado
ASA puede sacar bastante provecho para mejora del rendimiento usando particiones tanto en los inputs como en los outputs del job. El particionado te permite dividir en subconjuntos de datos basándose en una clave de particionado, un job de ASA puede consumir y escribir de diferentes particiones de forma paralela, con lo cual se mejora el rendimiento.
Todos los inputs disponibles para ASA soportan el particionado, los inputs disponibles son:
- Event Hubs.
- IoT Hubs.
- Blob storage.
En lo que se refiere a los outputs, no todos soportan particionado. Esta es la lista de outputs que si soportan particionado:
- Azure Data Lake Storage.
- Azure Functions.
- Azure Table.
- Blob storage.
- Cosmos DB.
- Event Hubs.
- IoT Hub.
- Service Bus.
- SQL and SQL Data Warehouse.
Uno de los outputs que hay que tener en cuenta porque se suele usar con mucha frecuencia y que por desgracia no soporta particionado es Power BI.
Para más información como paralelizar nuestro job podéis consultar el siguiente enlace.
Ejemplo práctico de escalado y particionado de un job de ASA
Como ya comentaba al principio del post, para este ejemplo, contamos con transacciones que llegan a un event hub de 4 diferentes aplicaciones, los datos que nos llegan a cerca de estas transacciones son: el id, país y ciudad, fecha y cantidad.
Primero vamos a empezar con una consulta sencilla que consta de un solo step, en la que leemos de los 4 inputs que representan nuestras aplicaciones, lo hacemos haciendo un UNION ALL de todos ellos dentro de una subconsulta:
SELECT * INTO [OutputEventHub] FROM ( SELECT 'app1' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp1] UNION ALL SELECT 'app2' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp2] UNION ALL SELECT 'app3' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp3] UNION ALL SELECT 'app4' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp4] )C
En la imagen 8 se puede ver como quedaría el job diagram:
Al ser una consulta con un solo step y sin particionado, el máximo número de SUs que puede aprovechar nuestro job de ASA son 6. De hecho, si asignamos al job más SUs del máximo que puede utilizar, el job falla y nos dice el número máximo de SUs que le podemos asignar. Un ejemplo de ese mensaje de error sería el que se muestra a continuación:
"JobFailedMessage": "The streaming job failed: Stream Analytics job 'escalado_test' is only able to use up to 6 Streaming Units based on the provided query. Please adjust the number of Streaming Units or check the number of input partitions and update your query and restart the job."
Le asignamos los 6 SUs y monitorizamos su funcionamiento y observamos lo siguiente:
En la imagen 9, tenemos datos sobre los eventos de entrada (en azul), eventos de salida (en lila), SU% y watermark delay. Aunque estas dos últimas métricas no se aprecian bien en la imagen 9 por la escala, se puede ver su valor en la parte inferior.
En la imagen 9, podemos ver que al principio el job funciona bien, pero conforme avanza el tiempo y tras un pico de eventos de entrada hay una caída de funcionamiento y luego van apareciendo picos en los que el job va procesando poco a poco los eventos que le van llegando.
Esto quiere decir que el job se ha saturado con la gran cantidad de eventos de entrada y no es capaz de procesarlo en tiempo real, esto se ve mejor reflejado en la métrica de watermark delay, donde se puede observar que lleva unos 15 minutos de retraso desde que entra un evento y sale.
En la imagen 10 se puede observar cómo va aumentando linealmente el watermark delay:
El primer paso para mejorar el rendimiento, y que el job sea capaz de procesar sin retraso todos los eventos que le van llegando, es crear más steps en nuestra Query para poder aumentar el número de SUs.
Podemos aumentar el número de steps de forma manual añadiendo CTEs a nuestra consulta. En nuestro ejemplo vamos a crear una CTE por cada input (a cada input le llegan datos de una aplicación), luego otro step donde uniremos todas las transacciones de las aplicaciones, y un último step donde enviaremos la información el event hub de destino.
Aunque en este ejemplo se haya hecho una CTE por cada input, no tiene por qué ser siempre el mismo caso. Si por ejemplo, tenemos una de la aplicaciones a la que le llegan la mayoría de los eventos y luego a las otras tres les llega menos, la mejor opción, sería hacer una CTE para el input al que le llegan más eventos y luego otra CTE con los otros tres input juntos. Es decir, lo recomendable es hacer que los steps estén los más balanceados posible.
Esta sería la consulta después de aplicar las CTEs:
with app1 as ( SELECT 'app1' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp1] ), app2 as ( SELECT 'app2' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp2] ), app3 as ( SELECT 'app3' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp3] ), app4 as ( SELECT 'app4' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp4] ), all_apps as ( select * from app1 union all select * from app2 union all select * from app3 union all select * from app4 ) SELECT * INTO [OutputEventHub] FROM all_apps
En la imagen 11 podemos ver como quedaría el job diagram:
Ahora, contamos con 6 steps, por lo tanto, podemos ampliar el número de SUs a 36 (6 SUs x 6 steps). Después de realizar el cambio, volvemos a ejecutar el job y obtenemos los siguientes resultados:
En la imagen 12 podemos ver las mismas métricas que el caso anterior y vemos que el resultado es muy parecido al anterior, al principio parece que el job funciona bien, pero cuando empiezan a llegarle muchos eventos el job se satura y el watermark delay va aumentando linealmente como se puede ver en la imagen 13.
Después de crear más steps y aumentar los SUs de nuestro job, hemos visto que todavía no es suficiente y que necesitamos poder crear más steps para poder aumentar el número de SUs. Este es nuestro caso, pero dependiendo del caso que sea, este cambio ya podría hacer que nuestro job funcione correctamente y no tengamos que particionar nuestros inputs u outputs.
Como todavía no hemos solucionado nuestro problema, vamos a pasar el siguiente paso, que sería particionar nuestro output, en nuestro caso vamos a crear el output event hub con 4 particiones, porque sabemos que tenemos información de 4 países en nuestros datos y que llegan más o menos el mismo número de transacciones por cada país.
Al igual que el caso anterior, para la elección del número de particiones y para la elección de la columna que queremos que sea nuestro punto de particionado, hay que elegir la que mejor se adapte a nuestras necesidades y que reparta mejor el dato de una forma equitativa entre las particiones.
Otro punto a tener en cuenta, es aumentar el número de Throughput Units en nuestro event hub namespace. Como buena práctica es recomendable asignar una throughput unit por cada partición que tengamos en nuestro event hub namespace.
En nuestro caso, en el event hub namespace tenemos 4 event hubs que son los inputs donde llegan los datos de las aplicaciones y un event hub como output, al cual, le hemos asignado 4 particiones. Por lo tanto vamos a ampliar el número de throughput units a 8.
A la hora de añadir nuestro output al job, hay que tener en cuenta que hay que indicarle la columna de particionamiento (Partition key column), esto se muestra en la imagen 14:
Una vez hecho esto, el siguiente paso será añadir en nuestra query la clausula PARTITION BY. Nuestra query quedaría de la siguiente manera:
with app1 as ( SELECT 'app1' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp1] ), app2 as ( SELECT 'app2' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp2] ), app3 as ( SELECT 'app3' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp3] ), app4 as ( SELECT 'app4' as app, event_id, location.country as country, location.city as city, eventTime, amount FROM [InputApp4] ), all_apps as ( select * from app1 PARTITION BY country union all select * from app2 PARTITION BY country union all select * from app3 PARTITION BY country union all select * from app4 PARTITION BY country ) SELECT * INTO [OutputEventHubPart] FROM all_apps PARTITION BY country
Nuestro job diagram quedaría como se muestra en la imagen 15:
Como se observa, ahora la CTE “all_apps” aprovecha las 2 particiones de los inputs event hubs y la CTE que escribe los datos en el output event hub, ahora aprovecha las 4 particiones que le hemos creado.
Ahora, podemos aumentar de nuevo nuestro número de SUs, tenemos 4 steps sin particiones, 1 step con 2 particiones, y 1 step con 4 particiones, es decir, podemos aumentar nuestro número de SUs a 60 (6 SUs * 4 steps sin partición + 6 SUs * 1 step * 2 particiones + 6 SUs * 1 step * 4 particiones).
Realizamos los cambios y observamos los siguiente:
En la imagen 16, se puede observar, que ahora si, el job es capaz de procesar los eventos y que le van llegando y llevarlos a la salida en tiempo prácticamente real y sin apenas retraso. En la imagen 17 podemos ver que el watermark delay es de 1 segundo y a veces llega a 2 segundos.
Conclusión
Como resumen, hemos visto los conceptos básicos de escalado y particionamiento, y como aplicarlos a un caso práctico real para mejorar el rendimiento de nuestro job de ASA.
Hemos visto como aumentar los SUs de nuestro job, añadiendo pasos de forma manual con CTEs en nuestra Query y añadiendo particiones a nuestro event hub.
Espero que os haya servido de ayuda y nos vemos en nuestro siguiente post.