Head First Data

Lab 5 – Ingest Data with a Pipeline

A simple way to ingest data is to use a Copy Data activity in a pipeline to extract the data from a source and copy it to a file in the lakehouse.

  1. On the Home page of your LH_Fabric_Bootcamp lakehouse, select Get data and then select New data pipeline.
  2. Create a new data pipeline named Ingest Sales Data.
  3. If the Copy Data wizard doesn’t open automatically, select Copy Data > Use copy assistant on the pipeline editor page.
  4. In the Copy Data wizard, on the Choose data source page, type HTTP in the search bar and then select HTTP in the New sources section.
  5. In the Connect to data source pane, enter the following settings for the connection to your data source:
    • URL: https://raw.githubusercontent.com/MicrosoftLearning/dp-data/main/sales.csv
    • Connection: Create new connection
    • Connection name: Specify a unique name
    • Data gateway: (none)
    • Authentication kind: Anonymous
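
Optionally, before you build the connection, you can sanity-check that the source file is reachable. This is a minimal sketch (assuming a Python environment with the requests library, which Fabric notebooks include) that fetches the file and prints its first two lines:

import requests

# Quick pre-flight check of the source file (not part of the pipeline itself)
url = "https://raw.githubusercontent.com/MicrosoftLearning/dp-data/main/sales.csv"
response = requests.get(url, timeout=30)
response.raise_for_status()

# Show the header row and the first data row
lines = response.text.splitlines()
print(lines[0])
print(lines[1])
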
  6. Select Next. Then ensure the following settings are selected:
    • Relative URL: Leave blank
    • Request method: GET
    • Additional headers: Leave blank
    • Binary copy: Unselected
    • Request timeout: Leave blank
    • Max concurrent connections: Leave blank
  7. Select Next, wait for the data to be sampled, and then ensure that the following settings are selected:
    • File format: DelimitedText
    • Column delimiter: Comma (,)
    • Row delimiter: Line feed (\n)
    • First row as header: Selected
    • Compression type: No compression
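
These wizard settings map closely onto the options of Spark’s CSV reader. As an illustrative sketch only (the wizard performs this parsing for you, and the file reaches the lakehouse only after the copy runs), the equivalent interpretation in a notebook cell would look roughly like this:

# Illustrative only: a rough notebook equivalent of the wizard's format settings
df_sample = (spark.read.format("csv")
    .option("header", "true")   # First row as header: Selected
    .option("sep", ",")         # Column delimiter: Comma (,)
    .option("lineSep", "\n")    # Row delimiter: Line feed (\n)
    .load("Files/new_data/sales.csv"))  # hypothetical path, valid after the copy completes
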
  8. Select Preview data to see a sample of the data that will be ingested. Then close the data preview and select Next.
  9. On the Connect to data destination page, set the following data destination options, and then select Next:
    • Root folder: Files
    • Folder path name: new_data
    • File name: sales.csv
    • Copy behavior: None
  10. Set the following file format options and then select Next:
    • File format: DelimitedText
    • Column delimiter: Comma (,)
    • Row delimiter: Line feed (\n)
    • Add header to file: Selected
    • Compression type: No compression
  11. On the Copy summary page, review the details of your copy operation and then select Save + Run. A new pipeline containing a Copy Data activity is created.
  12. When the pipeline starts to run, you can monitor its status in the Output pane under the pipeline designer. Use the ↻ (Refresh) icon to refresh the status, and wait until it has succeeded.
  13. In the menu bar on the left, select your lakehouse.
  14. On the Home page, in the Explorer pane, expand Files and select the new_data folder to verify that the sales.csv file has been copied.
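
If you prefer to verify from code instead of the Explorer pane, a short notebook cell can list the folder contents. This is a sketch assuming Fabric’s built-in notebook utilities (mssparkutils) in a notebook attached to this lakehouse:

from notebookutils import mssparkutils

# List the files that the Copy Data activity wrote to the lakehouse
for f in mssparkutils.fs.ls("Files/new_data"):
    print(f.name, f.size)
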

Create a notebook

  1. On the Home page for your lakehouse, in the Open notebook menu, select New notebook.

After a few seconds, a new notebook containing a single cell will open.

  2. Select the existing cell in the notebook that contains some sample code, and then replace the default code with the following variable declaration.
table_name = "salesPipeline"
  3. In the … menu for the cell (at its top-right), select Toggle parameter cell. This configures the cell so that the variables declared in it are treated as parameters when running the notebook from a pipeline.
  4. Under the parameters cell, use the + Code button to add a new code cell. Then add the following code to it:
from pyspark.sql.functions import *

# Read the new sales data
df = spark.read.format("csv").option("header","true").load("Files/new_data/*.csv")

# Add month and year columns
df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))

# Derive FirstName and LastName columns
df = df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# Filter and reorder columns
df = df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "EmailAddress", "Item", "Quantity", "UnitPrice", "TaxAmount"]

# Load the data into a table
df.write.format("delta").mode("append").saveAsTable(table_name)

This code loads the data from the sales.csv file that was ingested by the Copy Data activity, applies some transformation logic, and saves the transformed data as a table, appending the data if the table already exists.
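
If you want to confirm the write from within the same notebook, you could optionally add one more cell such as this sketch (it assumes the cells above have already run, so table_name is defined):

# Optional sanity check on the newly written Delta table
spark.sql(f"SELECT COUNT(*) AS row_count FROM {table_name}").show()
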
  5. Verify that your notebook contains the code cells shown above, and then use the ▷ Run all button on the toolbar to run all of the cells it contains.
  6. When the notebook run has completed, in the … menu for Tables in the Explorer pane on the left, select Refresh and verify that a salespipeline table has been created.
  7. In the notebook menu bar, use the ⚙️ Settings icon to view the notebook settings. Then set the Name of the notebook to Load Sales Pipeline and close the settings pane.
  8. In the menu bar on the left, select the LH_Fabric_Bootcamp lakehouse.
  9. In the Explorer pane, refresh the view. Then expand Tables, and select the salespipeline table to see a preview of the data it contains.

Modify the pipeline

Now that you’ve implemented a notebook to transform data and load it into a table, you can incorporate the notebook into a pipeline to create a reusable ETL process.

  1. In the menu bar on the left, or directly from the workspace home page, select the Ingest Sales Data pipeline you created previously.
  2. On the Activities tab at the top, in the All activities list, select Delete data.
  3. Then position the new Delete data activity to the left of the Copy data activity and connect its On completion output to the Copy data activity.
  4. Select the Delete data activity, and in the pane below the design canvas, set the following properties:
    • General:
      • Name: Delete old files
    • Source:
      • Connection: Your lakehouse
      • File path type: Wildcard file path
      • Folder path: Files / new_data
      • Wildcard file name: *.csv
      • Recursively: Selected
    • Logging Settings:
      • Enable logging: Unselected

These settings will ensure that any existing .csv files are deleted before copying the sales.csv file.
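
For context, the Delete data activity does the same job you could perform manually from a notebook. The following is a rough sketch assuming Fabric’s mssparkutils utilities (the pipeline activity remains the recommended approach here):

from notebookutils import mssparkutils

# Remove any existing .csv files before a fresh copy; inside the pipeline,
# the Delete data activity performs this step
for f in mssparkutils.fs.ls("Files/new_data"):
    if f.name.endswith(".csv"):
        mssparkutils.fs.rm(f.path)
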

  5. In the pipeline designer, on the Activities tab, select Notebook to add a Notebook activity to the pipeline.
  6. Select the Copy data activity and then connect its On completion output to the Notebook activity.
  7. Select the Notebook activity, and then in the pane below the design canvas, set the following properties:
    • General:
      • Name: Load Sales notebook
    • Settings:
      • Notebook: Load Sales Pipeline
      • Base parameters: Add a new parameter with the following properties:
        • Name: table_name
        • Type: String
        • Value: new_sales

The table_name parameter will be passed to the notebook and override the default value assigned to the table_name variable in the parameters cell.
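
Conceptually, this works like parameter injection in notebook tools such as Papermill: the run adds a cell after the parameters cell that reassigns the variable. A sketch of the effective execution order (the injected cell is generated by the pipeline run, not authored by you):

# Parameters cell: default used for interactive runs
table_name = "salesPipeline"

# Cell injected by the pipeline run (conceptual illustration)
table_name = "new_sales"

# All later cells see the overridden value, so the notebook writes to new_sales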

  8. On the Home tab, use the 🖫 (Save) icon to save the pipeline. Then use the ▷ Run button to run the pipeline, and wait for all of the activities to complete.
  9. In the menu bar on the left, select your lakehouse.
  10. In the Explorer pane, expand Tables and select the new_sales table to see a preview of the data it contains. This table was created by the notebook when it was run by the pipeline.
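
As an optional final check, you could compare the two tables from a notebook. A minimal sketch (assuming both tables now exist in the lakehouse):

# Both tables should contain the same sales rows, loaded by different runs
spark.sql("SELECT COUNT(*) AS rows_salespipeline FROM salespipeline").show()
spark.sql("SELECT COUNT(*) AS rows_new_sales FROM new_sales").show()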

Well, well, well… Your Fabric skills are really shaping up quickly! In this lab, you’ve learned how to ingest data by using a pipeline. Not only that, you’ve also implemented a data orchestration workflow by combining multiple activities and executing them from within the pipeline. Keep up the great work!