Hola a todos,
En el post que os traemos hoy vamos a ver como crear (con Visual studio 2017) mediante un script en Python un programa que podremos ejecutar como un servicio de Windows y que extraiga en tiempo real los tweets relacionados con determinadas palabras o hashtags, los almacene en una base de datos sql server, para luego explotarlos con powerbi.
Las tareas que va a realizar el script son las de conectar al API de streaming de twitter mediante unos tokens generados desde la página oficial de twitter para desarrolladores. Al API le haremos una petición y le pasaremos una lista de hashtags o términos y nos irá devolviendo de forma indefinida en “tiempo real” los tweets que se van publicando y que contienen estos términos. Una vez recuperados los tweets se almacenan en un buffer (de tamaño parametrizable) “n” tweets que al alcanzar el tamaño máximo del buffer se volcarán a una tabla en SQL Server.
Para ello lo primero que debemos hacer es crear un nuevo proyecto de Python en el visual studio:
El siguiente paso será añadir las librerías que vamos a usar en nuestro script, y que nos facilitarán el trabajo en gran medida, en este caso nosotros haremos uso de las siguientes:
- La librería para poder manejar archivos json (ya que el API de Twitter devuelve los datos en este formato).
- Usamos también la librería de pandas para poder almacenar datos en dataframes (estructuras de datos en memoria, parecidas a las tablas de SQL).
- Necesitamos usar tweepy que nos facilita el acceso al API de Twitter.
- pyodbc y sqlAlquiemy nos proporciona herramientas para conectar a una base de datos SQL Server.
Para poder hacer uso de estas librerías antes debemos añadirlas a nuestro entorno de Python desde el explorador de soluciones:
Buscamos la librería y la instalamos:
Luego añadiremos el siguiente código al script principal:
import json import pandas as pd import pyodbc from tweepy.streaming import StreamListener from tweepy import OAuthHandler from tweepy import Stream from pandas.io import sql from sqlalchemy import create_engine from pandas.io.json import json_normalize
Para poder acceder al api de twitter necesitamos crear una aplicación en el portal de twitter en la página https://apps.twitter.com/ con cualquier usuario de twitter podemos dar de alta aplicaciones, al dar de alta una aplicación se generan unos tokens que son necesarios para acceder al api, estos tokens los almacenaremos en variables en nuestro script:
#Declare variables that contains the user credentials to access Twitter API #You can get your own keys in https://apps.twitter.com/ #-------------------------------------------------------------------------------- aToken = "ENTER YOUR ACCESS TOKEN" aTokenSecret = "ENTER YOUR ACCESS TOKEN SECRET" cKey = "ENTER YOUR API KEY" cSecret = "ENTER YOUR API SECRET"
En mi caso, obtengo los tokens de la app que previamente ya he creado:
El funcionamiento de nuestro script será el siguiente:
El código es el siguiente:
#Import libraries import json import pandas as pd import pyodbc from tweepy.streaming import StreamListener from tweepy import OAuthHandler from tweepy import Stream from pandas.io import sql from sqlalchemy import create_engine from pandas.io.json import json_normalize #Declare variables that contains the user credentials to access Twitter API #You can get your own keys in https://apps.twitter.com/ #-------------------------------------------------------------------------------- aToken = "ENTER YOUR ACCESS TOKEN" aTokenSecret = "ENTER YOUR ACCESS TOKEN SECRET" cKey = "ENTER YOUR API KEY" cSecret = "ENTER YOUR API SECRET" #-------------------------------------------------------------------------------- #Define after how many twitts we do a insert in the data base. bufferSize = 5 #Defina an array to store the tweets readed from the stream api twittsBuffer = [] #Define a connectiont to read-write to the Sql server Database engine = create_engine("mssql+pyodbc://MyDbUser:MyPassword@MySQLServer/TwitterDB?driver=SQL+Server+Native+Client+11.0") #Define a function that receive a twitt by parameter and store it into the twittBuffer variable #if the twittBuffer reach the buffersize defined lenght then call the function AddTwittsToDB that insert the twitts into #the twittsBuffer array into the SQL Server database and clean the buffer #-------------------------------------------------------------------------------- def AddTwittToBuffer(twitt): global twittsBuffer twittsBuffer.append(twitt) if (len(twittsBuffer) == bufferSize): AddTwittsToDB(twittsBuffer) twittsBuffer = [] print(twitt['coordinates'] if twitt['coordinates']!= None else 'no coordinates') return #This function write the twitts stored in the variable twitBuffer to the SQL Database #-------------------------------------------------------------------------------- def AddTwittsToDB(twitts): tData = {'id': [], 'text': [], 'screen_name': [], 'created_at': [], 'retweet_count': [], 'favorite_count': [], 'friends_count': [], 'followers_count': [], 'lang':[], 'country':[], 'latitude':[], 'lontitude':[]} for t in twitts: tData['id'].append(t['id']) tData['text'].append(t['text']) tData['screen_name'].append(t['user']['screen_name']) tData['created_at'].append(t['created_at']) tData['retweet_count'].append(t['retweet_count']) tData['favorite_count'].append(t['favorite_count']) tData['friends_count'].append(t['user']['friends_count']) tData['followers_count'].append(t['user']['followers_count']) tData['lang'].append(t['lang']) if t['place'] != None : tData['country'].append(t['place']['country']) else : tData['country'].append(None) if t['coordinates'] != None : tData['lontitude'].append(t['coordinates']['coordinates'][0]) tData['latitude'].append(t['coordinates']['coordinates'][1]) else : tData['lontitude'].append(None) tData['latitude'].append(None) tweets = pd.DataFrame(tData) tweets.set_index('id', inplace=True) tweets.to_sql("Tweets",engine,None,if_exists='append') return True #-------------------------------------------------------------------------------- #Create a listener class that process received tweets #On error print status #-------------------------------------------------------------------------------- class StdOutListener(StreamListener): def on_data(self, data): t= json.loads(data) AddTwittToBuffer(t) return True def on_error(self, status): print(status) #-------------------------------------------------------------------------------- #Define a main function, the entry point of the program if __name__ == '__main__': #This object handles Twitter authetification and the connection to Twitter Streaming API myListener = StdOutListener() authenticator = OAuthHandler(cKey, cSecret) authenticator.set_access_token(aToken, aTokenSecret) stream = Stream(authenticator, myListener) #This line filter Twitter Streams to capture data tweets with the included text: 'Microsoft' or 'SolidQ' or 'Visual Studio' stream.filter(track=['PowerBI', 'Tableau', 'Qlikview','Microstrategy','Pyramid Analytics','Business Objects', 'Ibm cognos']) #--------------------------------------------------------------------------------
Durante la ejecución del script, si no existe la tabla se creará e irá añadiendo los twitts a la tabla.
La tabla que se crea tendrá los siguientes campos:
- El id del tweet.
- El país de origen.
- La fecha de creación en formato texto (luego la transformaremos a un formato fecha.
- El numero de favoritos del usuario que twittea (en el momento que twitteó).
- El número de followers del usuario que twittea (en el momento que twitteó).
- El número de amigos del usuario que twittea (en el momento que twitteó).
- El lenguaje detectado por el api de twitter.
- La latitud (si la facilitaron a la hora de twitear).
- La longitud (si la facilitaron a la hora de twitear).
- El numero de retweets que tiene el mensaje.
- El nombre del usuario que twittea.
- El texto del tweet.
Si hacemos una consulta a base de datos veremos lo siguiente:
En nuestro caso estamos capturando los tweets que contengan los siguientes términos ‘PowerBI’, ‘Tableau’, ‘Qlikview’, ‘Microstrategy’, ‘Pyramid Analytics’, ‘Business Objects’, ‘Ibm cognos’ para hacer una comparativa de lo que más se twittea sobre estos términos.
Una vez tenemos el script creado y probado, podemos ejecutarlo como un servicio de windows para que este continuamente descargando los tweets, esto lo podemos hacer de forma sencilla con herramientas como Non Suck Service Manager que podéis descargar de su página https://nssm.cc/
Una vez descargado el nssm, vamos la carpeta donde esta el ejecutable de 64 bits (o el de 32 si es vuestro caso) y ejecutamos desde línea de comando lo siguiente:
Se nos abrirá una ventana de configuración como esta:
En path introducimos la ruta donde está el ejecutable de python junto con el ejecutable python.exe, en startup directory introducimos el directorio (solo la ruta) donde esta el ejecutable de Python y en argument la ruta y nombre de fichero del script de Python que hemos creado con Visual Studio.
Con esto ya tendremos el servicio creado, lo iniciamos y veremos que comienzan a aparecer twitts en nuestra tabla de SQL Server.
Ahora que tenemos nuestra tabla con los tweets almacenados creamos un par de tablas para relacionar los términos con la tabla de tweets y crear un modelo que podamos explotar en power bi, esto lo hacemos en varios pasos:
Primero, creamos una dimension con los términos y un código para cada uno en formato de mascara de bits, son 7 terminos por lo que la tabla tendrá 7 registros y los ids seran los numero del 1 al 64 en potencias de 2 (1,2,4,8,16,32, y 64), de este modo podremos relacionar cada twitt con sus términos mediante una mascara de bits.
Por ejemplo si un twitt contiene las palabras PowerBI y QlickView, el id que lo relacione con sus términos sera la suma de los dos ids de esos términos, en este caso el id de relación seria el 5, la suma del id 1 correspondiente a PowerBi y de 4 que es el id del termino QlickView. Si un twitt contiene todos los 7 términos su id de relacion será el 128 (la suma de todos los ids)
with Dim_Terms as( SELECT 1 as Id, 'PowerBI' AS Term union all SELECT 2 as Id, 'Tableau' AS Term union all SELECT 4 as Id, 'Qlikview' AS Term union all SELECT 8 as Id, 'Microstrategy' AS Term union all SELECT 16 as Id, 'Pyramid Analytics' AS Term union all SELECT 32 as Id, 'Business Objects' AS Term union all SELECT 64 as Id, 'Ibm cognos' AS Term ) select * into DimTerm from Dim_Terms
La tabla se vería del siguiente modo:
La relación que se establece entre esta dimensión de términos y la tabla de “hechos” es una relación “many to many” por lo que vamos a necesitar otras 2 tablas mas que consoliden la relación y poder usarla en powerBI
La primera de estas tablas de relación es la que contiene todas las posibles combinaciones de términos posibles desde un único termino hasta los 7 a la vez, esta tabla tendrá 127 registros (el 0 que correspondería con ningún termino no lo contemplamos de momento)
La query que nos da esta tabla es esta:
with nums as ( select 1 as id union all select id + 1 from nums where id + 1 < 128 ) , datos as( select t.id, REPLACE(REPLACE( ltrim( iif(cast(t.id & 64 as bit) = 1, 'Ibm_cognos','') + iif(cast(t.id & 32 as bit) = 1, ' Business_Objects','') + iif(cast(t.id & 16 as bit) = 1, ' Pyramid_Analytics','') + iif(cast(t.id & 8 as bit) = 1, ' Microstrategy','') + iif(cast(t.id & 4 as bit) = 1, ' Qlikview','') + iif(cast(t.id & 2 as bit) = 1, ' Tableau','') + iif(cast(t.id & 1 as bit) = 1, ' Microsoft','')),' ', ' - '), '_',' ') as Terms from nums t ) select * into Dim_AllTerms from datos option (maxrecursion 0)
y tendrá un aspecto como este:
Ahora relacionamos mediante una tabla de relación “Many to Many” la dimensión con la tabla que mapea cada “combinación de términos” (Dim_AllTerms) con sus términos de la dimensión (DimTerm) esto lo hacemos con la query:
with nums as ( select 1 as id union all select id + 1 from nums where id + 1 < 128 ), datos as( select t.id, cast(t.id & 64 as bit) as bit7 ,cast(t.id & 32 as bit) as bit6 ,cast(t.id & 16 as bit) as bit5 ,cast(t.id & 8 as bit) as bit4 ,cast(t.id & 4 as bit) as bit3 ,cast(t.id & 2 as bit) as bit2 ,cast(t.id & 1 as bit) as bit1 from nums t ) select id, case trend when 'bit1' then 1 when 'bit2' then 2 when 'bit3' then 4 when 'bit4' then 8 when 'bit5' then 16 when 'bit6' then 32 when 'bit7' then 64 else 0 end as IdTrend into F_MtM_Terms From (select * from datos ) p UNPIVOT ( valor for trend in (bit1, bit2, bit3, bit4,bit5,bit6,bit7) ) as unpivo where valor = 1 option (maxrecursion 0)
La tabla tendrá la siguiente apariencia, como veis el Id de “Términos” 5, está repetido 2 veces, uno por cada término individual que contiene, el 1 de Microsoft y el 4 de QlickView:
Por Ultimo creamos una vista que nos devolverá los datos de los twitts en forma de una tabla de hechos:
SELECT country ,favorite_count ,followers_count ,friends_count ,lang ,latitude ,lontitude ,retweet_count ,screen_name as UserName ,[text] , CONVERT(DATETIME, SUBSTRING(created_at,9,2)+'/' + CASE SUBSTRING(created_at,5,3) WHEN 'Jan' then '01/' WHEN 'Feb' then '02/' WHEN 'Mar' then '03/' WHEN 'Apr' then '04/' WHEN 'May' then '05/' WHEN 'Jun' then '06/' WHEN 'Jul' then '07/' WHEN 'Aug' then '08/' WHEN 'Sep' then '09/' WHEN 'Oct' then '10/' WHEN 'Nov' then '11/' WHEN 'Dec' then '12/' else '' end +RIGHT(created_at,4) + ' ' + SUBSTRING(created_at,12,8), 105) AS Created_Date , iif([text] like '%PowerBI%',1,0) + iif([text] like '%Tableau%',2,0) + iif([text] like '%Qlikview%',4,0) + iif([text] like '%Microstrategy%',8,0) + iif([text] like '%Pyramid Analytics%',16,0) + iif([text] like '%Business Objects%',32,0) + iif([text] like '%Ibm cognos%',64,0) IdTrend FROM [TwitterDB].[dbo].[Tweets]
El modelo en PowerBI quedaría del siguiente modo:
* Importante tener en cuenta que las relaciones many to many deben tener el cross filter direction a establecido en “Both” para que funcionen bien.
Si vemos las métricas son muy sencillas, un sum de los favoritos, followers y friends y un count del numero de tweets.
El resultado final de un par de dashboards que se me ocurrieron de ejemplo uno mostrando los tweets por región y lenguaje y otro mostrando el evolutivo de los 2 últimos días en formato “numDia-NumHora”
Espero que os resulte útil o al menos sirva para animaros a experimentar un poco con Python, es una herramienta muy útil y con una curva de aprendizaje muy asequible.
Un Saludo
1 comment
buenos días quisiera saber si hay un tutorial mas detallado puesto que no tengo mucha experiencia en el manejo de base de datos y el entrono de visual le agradecería muchas gracias