Data Ingestion for Dummies

Alex Moses
Posted under Data Engineering

Azure Data Factory Ingestion Fundamentals

Unleash Your Analytical Prowess: What is a Top-Tier Analyst/Engineer

Ever dreamt of making a real impact as a sought-after analyst or engineer? Are you passionate about delving into complex technical challenges?

An analyst engineer is a problem-solving bridge between engineering and data analysis. They use their technical knowledge to analyze projects, identify areas for improvement, and ensure efficiency. Imagine them as detectives in the data engineering world, wielding data to uncover solutions and optimize data pipeline processes and usage.

Beyond the Buzzwords: Dive Deep into the World of Data

Forget late-night YouTube binges filled with jargon. I’ll equip you with a solid understanding of the intricacies of data pipelines. Learn how to leverage tools like Azure Data Factory (ADF) functions and Databricks notebooks strategically.

What will this guide include?

This guide will not go through the initial setup and implementation of Azure Table Storage, or generating the service principal and access permissions via ADX. Nor will it go through the setup of Microsoft Entra security. It is a relatively deep dive into just using ADF for a simple blob storage or Azure SQL ingestion into a data warehouse of your choice.

Creating a pipeline in Azure Data Factory

Azure Data Factory is a seriously awesome tool. It’s a lot like Fivetran, except with a steeper learning curve and a much harder-to-navigate UI. That aside, the level of customisability and tooling right at your fingertips makes ADF a much more versatile tool in your ETL (Extract, Transform, Load) pipeline arsenal. We will be focusing on one specific use case for this guide: the ingestion of a CX table storage container into a data warehouse environment. This guide will include a lot of visual guidance to help keep things easy to follow.

First we create the pipeline. It sounds simple, yes, but do not fall for the trap of hitting the new pipeline button straight away. If your ADF environment is connected to Git (which it should be), make sure to create a brand new branch off of main to do your setup work in.

Now that you are in the ADF edit screen, click the branch icon in the top left corner of the screen. From here there is the option to create a new branch off of the main ADF branch. When naming the branch, make sure to include any descriptors and ticket IDs for Jira.

Once you have made the branch, you can go into Factory Resources, click the ‘+’ icon and create your very own new pipeline. This is the first step to correctly ingesting the data you need into Databricks. Unfortunately, from this point onwards it doesn’t get any easier, but with the power of data on your side you can ingest anything! In the blank canvas of the Azure Data Factory GUI you must now state your intent. Go to the Activities column, find the copy data activity (use the search bar for max efficiency) and drag it over to the blank canvas.

Now that we have the copy data element on the canvas, we can give it a name. This doesn’t really affect the ingestion process, but it’s best to keep this title clear and concise. In this case we have called the copy data activity copy_####_data.

Source vs. Sink

Now when I first saw the tabs for source and sink I thought, what do kitchen homeware and appliances have to do with data? But it’s actually not a sink at all… it’s a ‘sink’!

Your source is the location a dataset is copied from, and this can range from your own Azure resources to APIs and other external data sources. For a full list of sources that can be accessed from the inbuilt copy activity, see the Microsoft documentation page: Copy activity – Azure Data Factory & Azure Synapse. We will discuss how to connect to the source in a later section.

Now let’s discuss the sink. The sink is the location for the copied data, and is very important for the ingestion process. You define the landing dataset type of which you can

Reference an Azure Table Storage account In Source

Connecting to the source

When connecting to the source, whether Azure Table Storage or Blob Storage, you are generating a linked service within ADF. As soon as it is created, the ADF front end commits it to the repo. With linked service connections, we can dynamically ensure that the container is being sourced from the correct environment.

With the Source tab open, click on the ‘+’ button to create a new dataset (this in turn will generate a new linked service in the ADF instance).

This will allow you to connect to a linked service. If a linked service has not been created for your table storage container, you must create one using the new linked service tool.

Once you create the linked service it’s important to ensure that the

Dynamic Variables and setting your environment

All the customisability and flexibility of Azure Data Factory is wrapped in dynamic expressions and functions. This is a really cool feature, but unfortunately it is another language and syntax to learn. I will not go into too much detail here; if you would like to learn more about this expression language, see the Microsoft documentation page: Expression and functions – Azure Data Factory & Azure Synapse.

When setting the global environment variable, you can reference pipeline() settings and pull that information from its run:

The same can be done for the attached linked service:

This means you can ensure that each environment always points to its relevant dataset in the Azure storage tables using dynamic functions and variables. In the Prospa Engineering Team there is a guideline to always have a demo, staging and live environment for each application we deploy. The issue with the Upwire dataset is that the staging dataset is empty, as the developers at Upwire only allow for a demo and staging, which means we can expect the DAG run for this pipeline to always fail in staging. At first I had all pipeline environments pulling from the live dataset, but after modelling we could see that PII did exist in this dataset, so we ensured the linked service would only reference the corresponding environment to ingest the data from.
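The environment rule described above can be sketched in plain Python. This only illustrates the idea and is not the ADF expression itself: the account names and the resolve_source_account helper are hypothetical, and in ADF the same logic would live in a dynamic expression on the linked service.

```python
# Hypothetical mapping from deployment environment to its own storage account.
# The account names are placeholders, not real resources.
SOURCE_ACCOUNTS = {
    "demo": "demostorageaccount",
    "staging": "stagingstorageaccount",
    "live": "livestorageaccount",
}


def resolve_source_account(environment: str) -> str:
    """Return the storage account for exactly this environment.

    There is deliberately no fallback to "live": a non-live pipeline
    should never read the live dataset, because it contains PII.
    """
    key = environment.strip().lower()
    if key not in SOURCE_ACCOUNTS:
        raise ValueError(f"Unknown environment: {environment!r}")
    return SOURCE_ACCOUNTS[key]


if __name__ == "__main__":
    print(resolve_source_account("Staging"))
```

Raising an error for an unknown environment, rather than defaulting to live, is the design choice that keeps PII out of the demo and staging warehouses.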

Create a Sink file and define its type and location

To Parquet or CSV, that is the question.

Once you have completed the source dataset configuration, you can begin working on the sink. The sink, as said earlier, is the landing zone for the dataset you are copying. At this stage it is also important to ensure that the sink file lands in the relevant landing zone for each Databricks environment.

When you click to create a new sink dataset, you have many options listed.

We will be landing CSV files, though Parquet files are also commonly used. Both can be easily read in our Apache Spark-enabled data warehouse environments and turned into materialised views or tables. CSV is easier for us to handle as analysts, and for a dataset that is mostly structured this is the method I recommend. This is found under the “File System” sink option; when picking the format, select delimited text.

Once you have created this sink object and named it, make sure it’s connected to the linked service that handles the landing zone for Parquet and CSV data, so that it saves the file in a place the data warehouse can easily access. An example set of file location parameters is as follows:

You can set the variables that are used to define the dataset file location in the sink configuration window:

Since we weren’t copying multiple tables in the above example, we did not need dynamic variables for the folder path and file name. With dynamic parameters, however, you can use this one ADF pipeline to handle multiple source files or tables and handle ingestion for a larger domain.
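As a rough sketch of that idea, the snippet below builds one set of pipeline parameters per table, so a single parameterised pipeline could be triggered once for each. The folder layout, the table names and the build_run_parameters helper are assumptions made for illustration; only the parameter names folder_path, file_name and run_date mirror the ones used by the Airflow DAG later in this guide.

```python
from typing import Dict, List


def build_run_parameters(
    environment: str, tables: List[str], run_date: str
) -> List[Dict[str, str]]:
    """Return one parameter set per table for the same ADF pipeline."""
    runs = []
    for table in tables:
        runs.append(
            {
                # Hypothetical layout: <environment>/<table>/<run_date>/<table>.csv
                "folder_path": f"{environment}/{table}/{run_date}",
                "file_name": f"{table}.csv",
                "run_date": run_date,
            }
        )
    return runs


if __name__ == "__main__":
    # "customers" and "orders" are made-up table names.
    for params in build_run_parameters("staging", ["customers", "orders"], "2024-01-31"):
        print(params)
```

Each dictionary could then be passed as the parameters of one pipeline run, which keeps the pipeline definition itself free of hard-coded paths.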

Making sure the schema is defined (CSV headers)

Create a Notebook to ingest the datasource

It’s as simple as CREATE, USING CSV

Yes, it really is that simple. Once the sink dataset has been saved in a location that’s accessible to the data warehouse, you can use a simple Python or SQL script to read the CSV and turn it into a usable database table!

Make sure to deploy this code to the GitHub repo before doing this. Otherwise you will be referencing a file in a shared location that can easily be deleted or altered, affecting the ingestion process.

-- Create the schema if needed, then rebuild the table on top of the landed CSV.
CREATE DATABASE IF NOT EXISTS data_schema;
DROP TABLE IF EXISTS data_schema.table_name;
CREATE TABLE data_schema.table_name
USING CSV OPTIONS (path "abfss://filelocatioforsinkfilew", header "True", mode "PERMISSIVE", inferSchema "True");

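For anyone who prefers Python in the notebook, a roughly equivalent PySpark sketch is shown here. It assumes a SparkSession is already available (as it is in a Databricks notebook), and load_landed_csv is a hypothetical helper name. Note one difference from the SQL above: saveAsTable copies the data into a managed table, whereas CREATE TABLE ... USING CSV with a path leaves the table pointing at the landed file.

```python
def load_landed_csv(spark, path: str, table: str) -> None:
    """Read the landed CSV with the same options as the SQL and overwrite `table`.

    `spark` is an existing SparkSession; `path` and `table` are supplied by
    the caller and are not validated here.
    """
    df = (
        spark.read.format("csv")
        .option("header", "true")       # first row holds the column names
        .option("mode", "PERMISSIVE")   # keep malformed rows instead of failing
        .option("inferSchema", "true")  # let Spark guess the column types
        .load(path)
    )
    # Replace the table contents on every run, like DROP TABLE + CREATE TABLE.
    df.write.mode("overwrite").saveAsTable(table)


# Example call inside a notebook (path left as the placeholder used above):
# load_landed_csv(spark, "abfss://filelocatioforsinkfilew", "data_schema.table_name")
```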
Run the pipeline and verify the data is successfully copied

Validating shows no errors, great! Now it’s time to press RUN!

Reflection on data engineering principles, and a discussion of what this guide is missing (specifics on secure authorisation and effective use of parameterization), to go here

It’s Time to Dag and Bag

Now all that’s left is to set up the DAG and have it run automatically on a daily basis. I won’t go into too much detail on this, except to say that in the future we will rely less on time-based triggers and move to a more dependency-focused setup, where a DAG finishing will set off its dependent DAGs. Right now, however, it is scheduled to run in parallel with the other ingestion DAGs.

When orchestrating, whether in Airflow or directly within ADF, it is important to add in the environment parameters as well as retry attempts, so that if an ingestion fails it will try again.


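from datetime import timedelta
from os import environ
from typing import cast

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.models.taskmixin import DependencyMixin
from airflow.operators.empty import EmptyOperator
from airflow.providers.microsoft.azure.operators.data_factory import (
    AzureDataFactoryRunPipelineOperator,
)
from airflow.providers.microsoft.azure.sensors.data_factory import (
    AzureDataFactoryPipelineRunStatusSensor,
)
from pendulum import today

# Deployment environment, used to pick the matching factory and resource group.
ENV = environ.get("ENVIRONMENT")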

with DAG(
    "ingest_data",
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
    },
    start_date=today("Australia/Sydney") - timedelta(days=1),
    schedule=None,
    catchup=False,
    params={
        "folder_path": Param(None, type=["null", "string"]),
        "file_name": Param(None, type=["null", "string"]),
        "sheet_name": Param(None, type=["null", "string"]),
        "run_date": Param(None, type=["null", "string"]),
    },
    max_active_runs=16,
):
    start = EmptyOperator(task_id="start")

    @task
    def get_adf_parameters(**context):
        airflow_parameters = context["dag_run"].conf
        return {k: v for k, v in airflow_parameters.items() if v}

    adf_parameters = get_adf_parameters()

    ingest_data = AzureDataFactoryRunPipelineOperator(
        task_id="ingest_data",
        factory_name=f"{ENV}-adf",
        resource_group_name=f"{ENV}-rg-warehouse",  # same resource group the sensor polls
        pipeline_name="ingest_data",
        parameters=adf_parameters,
        wait_for_termination=False,
    )

    wait_for_ingest_data = AzureDataFactoryPipelineRunStatusSensor(
        task_id="wait_for_ingest_data",
        factory_name=f"{ENV}-adf",
        resource_group_name=f"{ENV}-rg-warehouse",
        run_id="{{ task_instance.xcom_pull(task_ids='ingest_data', key='run_id') }}",
        poke_interval=60,
        retries=3,
        mode="reschedule",
    )

    end = EmptyOperator(task_id="end")

    chain(
        start,
        cast(DependencyMixin, adf_parameters),
        ingest_data,
        wait_for_ingest_data,
        end,
    )
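
In the DAG above, retries are set only on the sensor task. As a minimal sketch, the same behaviour could be applied to every task through default_args. The keys retries and retry_delay are standard Airflow task arguments; the build_default_args helper and the chosen values are assumptions for illustration.

```python
from datetime import timedelta


def build_default_args(retries: int = 3, retry_delay_minutes: int = 5) -> dict:
    """Default task arguments that make every task in the DAG retry on failure."""
    return {
        "owner": "airflow",
        "depends_on_past": False,
        # How many times a failed task is attempted again before it is marked failed.
        "retries": retries,
        # How long to wait between attempts.
        "retry_delay": timedelta(minutes=retry_delay_minutes),
    }


if __name__ == "__main__":
    print(build_default_args())
```

The returned dictionary would be passed as default_args when constructing the DAG, so the trigger task and the sensor share one retry policy.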
