Preface ✨
In part 1, we explored the concept of data pipelines and how Python and SQL can be used to build them, with simple code examples to demonstrate each idea.
This time in part 2, we will be venturing into a hands-on project that utilizes real-world data using Python and SQL together. You won't want to miss this one!
This is what the dashboard looks like:
https://youtu.be/eVONxz43XEU
This article will break down the code responsible for creating the ETL pipeline that feeds data into the dashboard rendered in the above video.
Approach 🪖
Set up environment
Set up configuration
Set up logger
Extract data from API
Transform data using pre-processing logic
Load data into Postgres database
Visualize data in a Streamlit app
1. Set up the environment🏖️
a. List the required Python modules in a requirements.txt file
requests
pandas
python-dotenv
psycopg2
streamlit
plotly-express
b. Create the virtual environment
I use Anaconda as my package manager and Windows as my main operating system; feel free to use your preferred tools to create your virtual environment.
In the terminal, type in the following:
conda create --name data_engineering_py_sql python=3.9
conda activate data_engineering_py_sql
pip install -r requirements.txt
You can list all the modules in the virtual environment by typing:
conda list -n data_engineering_py_sql
Then you can open VS Code in the current directory using:
code .
c. Add the environment variables file
It is good practice to store sensitive credentials in an environment variable file to avoid unauthorised users accessing them. This will include API keys, access tokens, and passwords, among others.
Assuming you're in the directory of your choice, create a .env file containing the following variables:
API_KEY=your_api_key
API_HOST=your_api_host
LEAGUE_ID=your_league_id
SEASON=your_season
DB_NAME=your_database_name
DB_USERNAME=your_database_user
DB_PASSWORD=your_database_password
DB_HOST=your_database_host
DB_PORT=your_database_port
The os and python-dotenv modules load these into our scripts without hard-coding them, so the variable names here must match the ones passed to os.getenv() later on.
d. Add the .gitignore file
Create a .gitignore file listing the files that are too sensitive to expose publicly or aren't useful to track in version control:
.env
__pycache__/
*.log
The files we want to be ignored when committing our repository to GitHub include:
.env - to avoid exposing sensitive environment variables
__pycache__ folders - these hold Python's cached byte-compiled (.pyc) files, which are regenerated automatically and don't belong in version control
.log files - to keep the log files generated by the pipeline out of the repository
2. Set up configuration⚙️
First, we'll import all the Python dependencies for this project:
import os
import logging
import requests
import psycopg2
import pandas as pd
from dotenv import load_dotenv
from requests.exceptions import HTTPError, RequestException, Timeout
Now we load the environment variables into the session using the os and python-dotenv modules:
load_dotenv()

API_KEY = os.getenv("API_KEY")
API_HOST = os.getenv("API_HOST")
LEAGUE_ID = os.getenv("LEAGUE_ID")
SEASON = os.getenv("SEASON")
DB_NAME = os.getenv("DB_NAME")
DB_USERNAME = os.getenv("DB_USERNAME")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
We import load_dotenv from dotenv rather than python-dotenv because python-dotenv is the name of the package you install with pip, while dotenv is the name of the module that package provides, which is where the load_dotenv function lives.
You can find it in this GitHub repository here.
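One thing load_dotenv() won't do is warn you when a variable is missing: os.getenv() simply returns None, and the problem only surfaces later, for example when psycopg2 tries to connect. Here is a minimal fail-fast sketch; require_env is a hypothetical helper, not part of python-dotenv:

```python
import os


def require_env(names):
    """Return the requested environment variables as a dict.

    Hypothetical helper: raises one error listing every variable that is
    unset or empty, instead of letting None values leak into the pipeline.
    """
    values = {name: os.getenv(name) for name in names}
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return values
```

Called right after load_dotenv(), for example as require_env(["API_KEY", "API_HOST", "DB_NAME"]), it stops the script with one clear message listing everything that is missing.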
3. Set up logger📝
Python has a built-in module called logging that allows you to track different stages of code execution by streaming custom messages to the console or writing them directly into log files. This is useful when diagnosing unexpected behaviours or defects in the code.
Each log message is tagged with a severity level, and messages below a chosen threshold can be filtered out. Here are the standard levels and their numeric values:
DEBUG (10)
INFO (20)
WARNING (30)
ERROR (40)
CRITICAL (50)
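The numbers matter because filtering is just a comparison: a logger set to a given level ignores any message with a lower value. A small sketch using a throwaway logger name (demo_levels, chosen only for illustration):

```python
import logging

demo_logger = logging.getLogger("demo_levels")
demo_logger.setLevel(logging.WARNING)  # threshold of 30

# isEnabledFor() applies the same threshold check the logger uses internally
for level in (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR):
    print(logging.getLevelName(level), demo_logger.isEnabledFor(level))

# Prints:
# DEBUG False
# INFO False
# WARNING True
# ERROR True
```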
3.1 Initialize the logger and set up the severity level
We start by setting up the logger and setting the minimum severity level to DEBUG:
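# Configure the root logger directly: logging.basicConfig() would also attach its own console handler, duplicating the one added in 3.3
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)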
This means it captures all log messages from DEBUG and upwards (i.e. INFO, WARNING etc.).
3.2 Set up the handlers
The handlers are the objects that help us direct the log messages to our target destinations, like files, consoles or other outputs.
Here's how we set up the handlers:
File handler
This handler writes messages to a log file on the system.
file_handler = logging.FileHandler('football_table_standings.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
Console handler
This handler streams the messages to the console (standard error by default).
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
3.3. Add handlers to the logger
To use these handlers, we need to add them to the logger object:
logger = logging.getLogger()
logger.addHandler(file_handler)
logger.addHandler(console_handler)
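If the script is re-run in the same interpreter, for example from a notebook or an interactive session, calling addHandler() again attaches duplicate handlers and every message appears twice. A minimal sketch that wraps the setup above in a function and only adds handlers once. The function name setup_logger, its parameters, and the use of a named logger (etl_pipeline) instead of the root logger are all assumptions made for illustration:

```python
import logging

LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


def setup_logger(log_path='football_table_standings.log', name='etl_pipeline'):
    """Attach a file handler and a console handler to a named logger, once."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    # getLogger() returns the same object on every call, so if handlers are
    # already attached we return early instead of adding duplicates
    if logger.handlers:
        return logger

    formatter = logging.Formatter(LOG_FORMAT)

    file_handler = logging.FileHandler(log_path)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
```

Calling setup_logger() twice returns the same logger with still only two handlers attached.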
3.4. Why loggers instead of print statements?
Using loggers is better practice than using print statements because:
you get different severity levels, which is useful across environments, e.g. in a production environment you may only want to log WARNING messages and above, while in development you also want DEBUG and INFO
you can send the output to different destinations like files, consoles and third-party platforms, whereas print writes to a single stream (the console by default); with a robust enough logging setup you can even integrate it with monitoring and alerting tools, which print statements don't support
log messages can carry extra context such as timestamps, file names and line numbers, which makes troubleshooting easier than with print statements
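The first point can be put into practice by reading the level from configuration instead of hard-coding it. A sketch assuming a hypothetical LOG_LEVEL environment variable (it is not part of the .env file above), falling back to INFO when it is unset or unrecognised:

```python
import logging
import os


def level_from_env(var_name='LOG_LEVEL', default=logging.INFO):
    """Map an environment variable such as 'DEBUG' or 'warning' to a logging level."""
    raw = os.getenv(var_name, '').strip().upper()
    # getLevelName() maps a registered level name to its number;
    # unknown names come back as the string 'Level <name>' instead
    level = logging.getLevelName(raw) if raw else default
    return level if isinstance(level, int) else default
```

With this in place, logger.setLevel(level_from_env()) logs everything in development with LOG_LEVEL=DEBUG and only warnings and above in production with LOG_LEVEL=WARNING.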
4. Extract data from API
An API (application programming interface) lets two systems exchange information. When one system (the client) sends a request to another system (the server), the API defines how that request must be made and how the server sends back the requested data, usually after checking that the client is authorised to receive it.
In this scenario, our data pipeline is sending a request to the football API endpoint for the league table standings.
4.1 Define the API endpoints and headers
url = "https://api-football-v1.p.rapidapi.com/v3/standings"
headers = {"X-RapidAPI-Key": API_KEY, "X-RapidAPI-Host": API_HOST}
🚫Note: Avoid hard-coding API keys into your script; instead, store them in an environment variable file and load them into your script at runtime.
4.2 Define the query parameters:
This is for customizing the response provided by the API endpoint. Here we want the data to be filtered by the current season and league, represented by the SEASON and LEAGUE_ID variables respectively:
query_string = {'season': SEASON, 'league': LEAGUE_ID}
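Behind the scenes, requests encodes the params dictionary into the URL's query string. The standard library's urllib.parse.urlencode builds the same kind of string, which is handy for checking exactly what the endpoint will receive. The season and league values below are placeholders standing in for whatever is in your .env file:

```python
from urllib.parse import urlencode

url = "https://api-football-v1.p.rapidapi.com/v3/standings"
# Placeholder values: in the pipeline these come from SEASON and LEAGUE_ID
query_string = {"season": "2023", "league": "39"}

full_url = f"{url}?{urlencode(query_string)}"
print(full_url)
# https://api-football-v1.p.rapidapi.com/v3/standings?season=2023&league=39
```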
4.3 Send the API request with exception handling
The third-party requests library (installed from requirements.txt) makes API calls simple. But unpredictable issues can still occur that we need to diagnose and fix, like the API temporarily going down, our API limit being exceeded, or a request timing out. We need mechanisms in place to handle these cases gracefully.
We can use try-except blocks for this:
try:
    api_response = requests.get(url, headers=headers, params=query_string, timeout=15)
    api_response.raise_for_status()
except HTTPError as http_err:
    logger.error(f'HTTP error occurred: {http_err}')
    raise  # stop the pipeline: there is no usable response to parse
except Timeout:
    logger.error('Request timed out after 15 seconds')
    raise
except RequestException as request_err:
    logger.error(f'Request error occurred: {request_err}')
    raise
Let's break down what is happening:
We make the API request in the try block and store the result in api_response; then api_response.raise_for_status() raises an HTTPError if the server responded with an error status code (4xx or 5xx).
Each except block catches a different type of exception:
HTTPError - raised by raise_for_status() when the server returns an error status code, e.g. because of an invalid API key or an exceeded request limit.
Timeout - raised when the API fails to respond within the given time limit. We've set the limit to 15 seconds, so a request that takes longer raises a Timeout exception.
RequestException - the base class for the exceptions raised by the requests library, so this block catches anything not already handled above, such as connection errors.
Having these in place means every request failure is logged with a clear, specific message, which makes disruptions much easier to diagnose.
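Logging a failure is only half the story: for transient problems such as timeouts, it often makes sense to try again before giving up. Below is a minimal sketch of retrying with exponential backoff. It is written against a generic fetch callable so it runs without network access, and the function name, the default retryable exception types and the delay values are all assumptions rather than part of the original pipeline:

```python
import logging
import time

logger = logging.getLogger(__name__)


def call_with_retries(fetch, retryable=(TimeoutError, ConnectionError),
                      attempts=3, base_delay=1.0):
    """Call fetch(), retrying on the given exception types with exponential backoff.

    With requests you would pass e.g. (requests.exceptions.Timeout,
    requests.exceptions.ConnectionError) as `retryable`, since those are not
    subclasses of the built-in TimeoutError and ConnectionError.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fetch()
        except retryable as err:
            if attempt == attempts:
                logger.error(f'Giving up after {attempts} attempts: {err}')
                raise
            delay = base_delay * 2 ** (attempt - 1)  # 1s, 2s, 4s, ... by default
            logger.warning(f'Attempt {attempt} failed ({err}); retrying in {delay:g}s')
            time.sleep(delay)
```

In the pipeline, the fetch callable could be lambda: requests.get(url, headers=headers, params=query_string, timeout=15). Errors that are not in retryable, such as an HTTPError from an invalid API key, are not retried and propagate immediately.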
4.4 Parse the API response
After obtaining a successful response from the API, we can advance to parsing the JSON response it provides:
standings_data = api_response.json()['response']
5. Transform data using pre-processing logic
The next step is to shape the data into a structure that is approachable and usable.
We drill into the nested fields to find the standings data:
standings = standings_data[0]['league']['standings'][0]
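The chain of indexes above assumes the response looks exactly as expected. If the response list comes back empty (which could happen, for example, if SEASON or LEAGUE_ID holds an unexpected value), it fails with a bare IndexError. Here is a sketch of the same drill-down with a clearer error message. It takes the whole JSON body rather than the already-indexed 'response' list, and the payload below is a trimmed, made-up example that only mirrors the keys used in this article, not a full API response:

```python
def extract_standings(payload):
    """Return the list of team standings from a standings API payload.

    Assumes the structure used in this article:
    payload['response'][0]['league']['standings'][0]
    """
    response = payload.get('response') or []
    if not response:
        raise ValueError('API payload contains no standings data; check SEASON and LEAGUE_ID')
    league = response[0]['league']
    # 'standings' is a list of lists; like the article, take the first one
    return league['standings'][0]


# Made-up, trimmed payload mirroring only the keys used above
sample_payload = {
    'response': [
        {'league': {'standings': [[
            {'rank': 1, 'team': {'name': 'Team A'}, 'points': 12},
            {'rank': 2, 'team': {'name': 'Team B'}, 'points': 10},
        ]]}}
    ]
}

for team_info in extract_standings(sample_payload):
    print(team_info['rank'], team_info['team']['name'], team_info['points'])
# 1 Team A 12
# 2 Team B 10
```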
5.2. Flatten the data
Then we need to shape this data into a format that's easy to use.
To do this, we need to
iterate through each team's standings,
extract the relevant fields,
append the fields to a list
To start, create an empty list:
data_list = []
Then we create a for loop to begin extracting and appending the data to the same list:
for team_info in standings:
    rank = team_info['rank']
    team_name = team_info['team']['name']
    played = team_info['all']['played']
    win = team_info['all']['win']
    draw = team_info['all']['draw']
    lose = team_info['all']['lose']
    goals_for = team_info['all']['goals']['for']
    goals_against = team_info['all']['goals']['against']
    goals_diff = team_info['goalsDiff']
    points = team_info['points']
    data_list.append([rank, team_name, played, win, draw, lose,
                      goals_for, goals_against, goals_diff, points])
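The loop above spells out each field by hand. An alternative, shown here as a sketch rather than the approach used in this project, is to describe each column once as a path of keys into the nested record and let a small helper walk it. FIELD_PATHS, get_path and flatten_standings are hypothetical names; the column names match the ones used in the next step:

```python
from functools import reduce

# Column name -> sequence of keys leading to the value in each team record
FIELD_PATHS = {
    'P': ('rank',),
    'Team': ('team', 'name'),
    'GP': ('all', 'played'),
    'W': ('all', 'win'),
    'D': ('all', 'draw'),
    'L': ('all', 'lose'),
    'F': ('all', 'goals', 'for'),
    'A': ('all', 'goals', 'against'),
    'GD': ('goalsDiff',),
    'Pts': ('points',),
}


def get_path(record, path):
    """Follow a sequence of keys into nested dictionaries."""
    return reduce(lambda node, key: node[key], path, record)


def flatten_standings(standings):
    """Turn nested team records into flat rows, in FIELD_PATHS column order."""
    return [[get_path(team_info, path) for path in FIELD_PATHS.values()]
            for team_info in standings]
```

Because dictionaries keep insertion order, list(FIELD_PATHS) can double as the list of column names passed to pd.DataFrame, so a new column only has to be added in one place.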
5.3. Convert the data into a dataframe
We will use the pandas library to turn the list into a dataframe:
columns = ['P', 'Team', 'GP', 'W', 'D', 'L', 'F', 'A', 'GD', 'Pts']
standings_df = pd.DataFrame(data_list, columns=columns)
…and then we'll display the dataframe:
print(standings_df.to_string(index=False))
This should return the following results (as of 12th September 2023):
 P              Team GP W D L  F  A GD Pts
 1   Manchester City  4 4 0 0 11  2  9  12
 2         Tottenham  4 3 1 0 11  4  7  10
 3         Liverpool  4 3 1 0  9  3  6  10
 4          West Ham  4 3 1 0  9  4  5  10
 5           Arsenal  4 3 1 0  8  4  4  10
 6          Brighton  4 3 0 1 12  6  6   9
 7    Crystal Palace  4 2 1 1  5  4  1   7
 8         Brentford  4 1 3 0  8  5  3   6
 9 Nottingham Forest  4 2 0 2  6  6  0   6
10       Aston Villa  4 2 0 2  8  9 -1   6
11 Manchester United  4 2 0 2  5  7 -2   6
12           Chelsea  4 1 1 2  5  5  0   4
13            Fulham  4 1 1 2  4 10 -6   4
14         Newcastle  4 1 0 3  7  7  0   3
15            Wolves  4 1 0 3  4  8 -4   3
16       Bournemouth  4 0 2 2  4  8 -4   2
17     Sheffield Utd  4 0 1 3  4  7 -3   1
18           Everton  4 0 1 3  2  8 -6   1
19             Luton  3 0 0 3  2  9 -7   0
20           Burnley  3 0 0 3  3 11 -8   0
6. Load data into Postgres database📤
Now that we have our data transformed into a suitable format, we can upload this into the Postgres database.
6.1. Establish a connection with the database
We first need to set up the connection:
postgres_connection = psycopg2.connect(
    dbname=DB_NAME,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT
)
cur = postgres_connection.cursor()
6.2. Create a table in the database
The next step is to create a table in the Postgres database that matches the structure of the dataframe.
We write this operation in SQL, then execute and commit it:
create_table_sql_query = """
    CREATE TABLE IF NOT EXISTS premier_league_standings_tbl (
        position INT PRIMARY KEY,
        team VARCHAR(255),
        games_played INTEGER,
        wins INTEGER,
        draws INTEGER,
        losses INTEGER,
        goals_for INTEGER,
        goals_against INTEGER,
        goal_difference INTEGER,
        points INTEGER
    );
"""
cur.execute(create_table_sql_query)
postgres_connection.commit()
6.3. Insert the data into the table
Now that the table is created, we can loop through the dataframe and insert the data into the Postgres table.
Let's start by writing the INSERT query into a Python variable:
insert_data_sql_query = """
    INSERT INTO public.premier_league_standings_tbl (
        position, team, games_played, wins, draws, losses,
        goals_for, goals_against, goal_difference, points
    )
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (position)
    DO UPDATE SET
        team = EXCLUDED.team,
        games_played = EXCLUDED.games_played,
        wins = EXCLUDED.wins,
        draws = EXCLUDED.draws,
        losses = EXCLUDED.losses,
        goals_for = EXCLUDED.goals_for,
        goals_against = EXCLUDED.goals_against,
        goal_difference = EXCLUDED.goal_difference,
        points = EXCLUDED.points
"""
Let me break down what this statement is doing quickly:
INSERT INTO...VALUES - inserts the data into the specified columns of the premier_league_standings_tbl table
ON CONFLICT...DO UPDATE SET - performs an upsert into the premier_league_standings_tbl table, i.e. if a row with the same position already exists it is updated with the new data (EXCLUDED refers to the row that was proposed for insertion); otherwise a new row is inserted
Then we write the for loop to iterate through each row of the standings_df dataframe:
for _, row in standings_df.iterrows():
    cur.execute(
        insert_data_sql_query,
        (row['P'], row['Team'], row['GP'], row['W'], row['D'],
         row['L'], row['F'], row['A'], row['GD'], row['Pts'])
    )

# Commit once, after all rows have been written
postgres_connection.commit()
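Two things are worth seeing in action here: the upsert behaviour, and sending all rows in one call. The sketch below uses Python's built-in sqlite3 module as a stand-in for Postgres so it runs without a database server. SQLite (3.24 and later) supports the same ON CONFLICT ... DO UPDATE SET ... EXCLUDED syntax, but it uses ? placeholders instead of psycopg2's %s. The table is cut down to three columns, and the rows are made up. With psycopg2, cur.executemany() accepts the same kind of list of tuples, and psycopg2.extras.execute_values() is a faster option for larger batches.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("""
    CREATE TABLE premier_league_standings_tbl (
        position INT PRIMARY KEY,
        team TEXT,
        points INTEGER
    )
""")

upsert_sql = """
    INSERT INTO premier_league_standings_tbl (position, team, points)
    VALUES (?, ?, ?)
    ON CONFLICT (position) DO UPDATE SET
        team = EXCLUDED.team,
        points = EXCLUDED.points
"""

# First load: made-up rows, sent in a single call
cur.executemany(upsert_sql, [(1, "Team A", 12), (2, "Team B", 10)])
# Later load: both positions already exist, so the rows are updated, not duplicated
cur.executemany(upsert_sql, [(1, "Team B", 13), (2, "Team A", 12)])
conn.commit()

rows = cur.execute("SELECT * FROM premier_league_standings_tbl ORDER BY position").fetchall()
print(rows)
# [(1, 'Team B', 13), (2, 'Team A', 12)]
conn.close()
```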
6.4. Create a SQL view for ranked standings
After loading the data into the table, we need our football data to reflect the true order based on the performance metrics of each team after each game is played. We will do this by ordering the data by the points, goal difference, and then the goals scored by each team.
To avoid modifying the existing data, we can create a view to encapsulate the transformation logic:
create_ranked_standings_view_sql_query = """
    CREATE OR REPLACE VIEW public.premier_league_standings_vw AS
    SELECT
        RANK() OVER (ORDER BY points DESC, goal_difference DESC, goals_for DESC) as position
        ,team
        ,games_played
        ,wins
        ,draws
        ,losses
        ,goals_for
        ,goals_against
        ,goal_difference
        ,points
    FROM public.premier_league_standings_tbl;
"""
cur.execute(create_ranked_standings_view_sql_query)
postgres_connection.commit()
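The RANK() function is a window function that assigns each row a rank according to the ORDER BY inside its OVER() clause, which lets us create a new position column. Teams that are level on points, goal difference and goals scored share the same rank, and the next rank is skipped.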
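Ties are where RANK() earns its place. Here is a sketch, again using sqlite3 as a stand-in for Postgres (SQLite supports window functions from version 3.25), with made-up rows in which two teams are level on every tie-breaker:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE standings (team TEXT, points INT, goal_difference INT, goals_for INT)")
conn.executemany(
    "INSERT INTO standings VALUES (?, ?, ?, ?)",
    [("Team A", 10, 5, 9), ("Team B", 10, 5, 9), ("Team C", 10, 4, 8), ("Team D", 7, 1, 5)],
)

rows = conn.execute("""
    SELECT RANK() OVER (ORDER BY points DESC, goal_difference DESC, goals_for DESC) AS position,
           team
    FROM standings
    ORDER BY position, team
""").fetchall()
conn.close()

for position, team in rows:
    print(position, team)
# 1 Team A
# 1 Team B
# 3 Team C
# 4 Team D
```

DENSE_RANK() would close the gap instead, giving Team C position 2, and ROW_NUMBER() with an extra tie-breaker column would give every team a distinct position.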
6.5. Close the database connection
cur.close()
postgres_connection.close()
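If an exception is raised anywhere between connecting and these two lines, the connection is never closed. One way to guarantee cleanup is contextlib.closing(). Note that a psycopg2 connection used directly in a with block commits or rolls back the transaction on exit but does not close the connection, which is why closing() is needed. The sketch uses sqlite3 as a stand-in, since its connections behave the same way in a with block:

```python
import sqlite3
from contextlib import closing

# closing() calls conn.close() when the block exits, even after an exception
with closing(sqlite3.connect(":memory:")) as conn:
    # The inner `with conn` commits on success and rolls back on error,
    # but does not close the connection
    with conn:
        conn.execute("CREATE TABLE demo (id INTEGER)")
        conn.execute("INSERT INTO demo VALUES (1)")
    row_count = conn.execute("SELECT COUNT(*) FROM demo").fetchone()[0]

print(row_count)  # 1

# The connection is closed now, so using it raises sqlite3.ProgrammingError
try:
    conn.execute("SELECT 1")
except sqlite3.ProgrammingError as err:
    print("closed:", err)
```

With psycopg2 the same shape applies: wrap psycopg2.connect(...) in closing(), and open the cursor with `with conn.cursor() as cur:` so it is closed too.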
7. Visualize data in a Streamlit app📊
The final step is to visualize the Premier League table standings.
7.1. Set up the Streamlit app
We need to create a Python file named app.py for the Streamlit app, and then we will import the libraries it needs:
import os
import psycopg2
import pandas as pd
from PIL import Image
import streamlit as st
import plotly.express as px
from dotenv import load_dotenv
Then we initialize the environment variables:
load_dotenv()

API_KEY = os.getenv("API_KEY")
API_HOST = os.getenv("API_HOST")
LEAGUE_ID = os.getenv("LEAGUE_ID")
SEASON = os.getenv("SEASON")
DB_NAME = os.getenv("DB_NAME")
DB_USERNAME = os.getenv("DB_USERNAME")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
7.2. Fetch the data from the Postgres database
The read_sql function from pandas allows us to pull data from Postgres with ease:
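# app.py runs separately from the pipeline script, so it opens its own connection
postgres_connection = psycopg2.connect(
    dbname=DB_NAME,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT
)

get_premier_league_standings_sql_query = """
    SELECT
        position
        ,team
        ,games_played
        ,wins
        ,draws
        ,losses
        ,goals_for
        ,goals_against
        ,goal_difference
        ,points
    FROM public.premier_league_standings_vw
    ORDER BY position;
"""
final_standings_df = pd.read_sql(get_premier_league_standings_sql_query, postgres_connection)
final_standings_df.set_index('position', inplace=True)
postgres_connection.close()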
7.3. Display the data in Streamlit
Now let's work on the UI a little.
We'll start by defining the page title and the icon for the app:
st.set_page_config(
    page_title="Premier League Standings 2023/24",
    page_icon="⚽",
    layout="wide"
)
Let's also add the Premier League logo to the top right-hand side of the web app:
# Relative to the folder you run streamlit from; a leading "/" would point at the filesystem root
prem_league_logo_filepath = "assets/premier_league_logo.png"
prem_league_logo_image = Image.open(prem_league_logo_filepath)

col1, col2 = st.columns([4, 1])
col2.image(prem_league_logo_image)
…and then add a main title:
st.title("⚽🏆 Premier League Table Standings 2023/24 ⚽🏆")
Let's also add a sidebar to display some brief useful information:
st.sidebar.title('Instructions 📖')
st.sidebar.write("""
The table showcases the current Premier League standings for the 2023/24 season.
Toggle the visualization options to gain deeper insights!
""")
Users can also decide whether they want to see a visual representation of the football data in the table:
show_visualization = st.sidebar.radio(
    'Would you like to view the standings as a visualization too?',
    ('No', 'Yes')
)

# The table is always shown; the bar chart is only added when the user picks 'Yes'
st.table(final_standings_df)

if show_visualization == 'Yes':
    st.write("")
    fig = px.bar(
        final_standings_df,
        x='team',
        y='points',
        title='Premier League Standings 2023/24',
        labels={'points': 'Points', 'team': 'Team', 'goals_for': 'Goals Scored',
                'goals_against': 'Goals Conceded', 'goal_difference': 'Goal Difference'},
        color='team',
        height=600,
        hover_data=['goals_for', 'goals_against', 'goal_difference']
    )
    st.plotly_chart(fig, use_container_width=True)
7.4. Run the app
Enter this into the terminal:
streamlit run app.py
Now the dashboard should be rendered in a Streamlit app like the video showed earlier!🥳
You can find the full codebase in the GitHub repo here.
Conclusion🏁
From calling the API endpoint, to transforming the response with Python and pandas, to loading and ranking the data in Postgres with SQL, and finally presenting it in a Streamlit app, we've shown how Python and SQL can be combined into an ETL pipeline that feeds data into a tool others can use in the real world.
The key takeaway is that this same approach underpins many real-world data applications.
Stay tuned, there just may be a part 3 coming soon, where we deal with migrating data from the database into the cloud this time!👀
Feel free to reach out via my handles and I'll be happy to point you in the right direction where I can: LinkedIn | Email | Twitter | TikTok