Como cualquiera que haya trabajado con SSIS sabrá, esta herramienta no admite cambios en los esquemas de las tablas sin que haya que actualizar los paquetes. Esta limitación empezaba a ser un problema en un cliente que hacía cambios en las tablas de origen sin avisarnos (quitar columnas, añadirlas, etc). Esto provocaba que el paquete fallase o no se sincronizaban las nuevas columnas en el destino y además nadie fuera consciente de ello (es un proyecto de sincronización de tablas entre distintas tecnologías). Por darle solución al problema y adelantarme a fallos, desarrollé un pequeño paquete que me avisase si había cambios en las tablas de origen a través de un email donde se indican la instancia, la tabla y si se han borrado o añadido columnas.
Ejemplo de email recibido con la alerta:
Obtener la información del número de columnas de cada tabla
Lo primero que necesitamos es una query contra el origen y el destino que nos liste las tablas y el número de columnas que tiene cada tabla.
Query para SQL Server:
SELECT concat(s.[name],'_',o.[name]) as TableNameHLP
,COUNT(*) as [numColumns]
FROM sys.columns c
INNER JOIN sys.objects o ON c.object_id = o.object_id
INNER JOIN sys.schemas s ON o.schema_id = s.schema_id
WHERE type IN ( 'U' , 'V' )
GROUP BY o.name, s.name, o.type_desc
ORDER BY concat(s.[name],'_',o.[name])
El resultado de esta query es el siguiente:
Con esto ya sabemos el número de columnas de cada tabla de origen. Podemos filtrar el resultado para que sólo liste las tablas necesarias. En mi caso no sincronizo todas las tablas, así que cruzo el resultado de esta query con mi lista de tablas a sincronizar (tenemos una tabla con este listado) y me quedo solo con las tablas que me interesan.
Ahora necesitamos también una query equivalente para el destino. En mi caso es MySQL así que la query es:
select tab.table_name as TableName
,count(col.column_name) as numColumns
from information_schema.tables as tab
inner join information_schema.columns as col
on col.table_schema = tab.table_schema
and col.table_name = tab.table_name
where tab.table_type = 'BASE TABLE'
and tab.table_schema = 'raw'
and tab.table_name like '%<filtro de tablas>%' /*opcional*/
group by tab.table_name
order by tab.table_name;
En destino no tengo el listado de tablas, pero las puedo filtrar fácilmente por nombre. Sino, sacaría la lista completa y la filtraría dentro del paquete al unir con el origen.
Con estas 2 queries ya podemos empezar a montar nuestro paquete de SSIS.
Creación de un paquete SSIS para automatizar la comparación
Creamos un nuevo paquete de SSIS y añadimos un data flow. Dentro de este pondremos 2 orígenes, uno para cada una de la consultas que vimos anteriormente y unimos las salidas con una tarea de Merge Join. Para poder usar este componente necesitamos que los orígenes estén ordenados, así que además de ordenar las queries, deberemos indicarles en sus propiedades que el resultado ya está ordenado y por qué campo. Para ello, hacemos click derecho – “Show Advanced Editor” y en la pestaña “Input and Output properties” seleccionamos el Output. A la derecha nos aparecerán unas propiedades, siendo una de ellas “IsSorted” que pondremos a “True”.
Después, ampliamos el output hasta ver el listado de “Output columns” y seleccionando la columna tableName le indicamos que en la propiedad “SortKeyPosition” que está ordenado por la columna 1:
Hacemos esto para los 2 orígenes del data flow antes de unirlos con el Merge Join que comentábamos anteriormente.
El cruce del join se hará por el nombre de la tabla y devolveremos los campos “numColumn” de ambos orígenes:
Detectar Cambios
Ahora ya solo nos queda diferenciar entre columnas añadidas o columnas borradas (aunque si no os interesa tanto detalle podemos simplificarlo a simplemente que los valores sean diferentes).
A la salida del Merge Join añadimos un “Conditional Split” con 2 salidas:
Después sacamos estas salidas a dos componentes de “row count” para contar cuantas tablas tenemos con nuevas columnas o con columnas borradas. Creamos 2 variables tipo int para usar en los row count. Yo las he llamado: [User::varCountAddedColumnsHLPS] y [User::varCountDeletedColumnsHLPS]+»
Además, creamos otras 2 variables tipos string para almacenar la lista de tablas (que usaremos después para enviar el email).
El siguiente paso es añadir un script component (C#) del tipo transformación e indicando las variables tipo string creadas como “ReadWriteVariables”. Por ejemplo, así quedaría configurado el script component para las tablas que tienen nuevas columnas con [User::varTablesWithAddedColumns] y las que tienen eliminadas con [User::varTablesWithDeletedColumns]
En el apartado de “input columns” marcamos la columna “TableName” como entrada:
Este es el script para concatenar la lista de tablas que luego enviaremos por email:
#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
#endregion
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
private string TableList = "";
public override void PostExecute()
{
base.PostExecute();
this.Variables.varTablesWithNewColumns = TableList.ToString();
}
public override void Input0_ProcessInputRow(Input0Buffer Row)
{
TableList = TableList + Row.TableName.ToString() + ", ";
}
}
El script concatena todos los nombres de tabla que recibe separándolos por comas.
Con esto ya tenemos el data flow listo. Quedaría así:
Aviso de cambios en esquema de origen
Para finalizar solo nos faltaría añadir un componente de Send Email task en el control flow. Crearemos una variable para montar nuestro mensaje dinámico poniendo esta expresión:
"There are schema changes in the source <
your source
>". Please update destination to reflect these changes.\n"+ (DT_WSTR,10)@[User::varCountAddedColumnsHLPS]+" tables with new columns in source: "+ @[User::varTablesWithNewColumns] +"\n"+(DT_WSTR,10)@[User::varCountDeletedColumnsHLP]+" tables with deleted columns in source: "+ @[User::varTablesWithDeletedColumns]
El send email task quedaría así:
Para que solo nos envié un email si hay cambios, unimos el dataflow y el send email task con una flecha con evaluación de “expression and constraint” y ponemos esta expresión:
(@[User::varCountAddedColumnsHLPS] != 0) || (@[User::varCountDeletedColumnsHLP] != 0)
Resultado final del paquete:
Una vez terminado este desarrollo lo desplegaremos en el catálogo y lo programaremos en un job. Ya nos podemos despreocupar de si el cliente nos avisa o no de los cambios.
Si tenemos varios orígenes, se podría sacar a una variable el listado de connection strings a los que conectar. Luego usarla como origen de un bucle foreach y meter lo que vimos antes dentro del bucle parametrizando las conexiones. De esta forma, en cada vuelta de bucle se conectará a un origen diferente.