Introduction to MLOps: A Guide to Getting Started (Parts I-III)

May 18, 2021

Part I: What is MLOps?

MLOps (Machine Learning Operations) is the practice of applying the lessons learned from DevOps to the productionization of machine learning. Its role is to bridge the gap between the data scientist and the consumers of machine learning models.

Machine Learning vs. Data Science

Machine learning can be understood as the process of applying a set of techniques to a group of data to create a limited “picture of how the world works,” called a model. This process of creating a model is called training. Once you have a trained model, you can use it with new data to better understand the past (data mining) or the future (machine learning).

A data scientist is typically a data analyst with knowledge of machine learning and data modeling. The role varies but often focuses on analysis and research. Sometimes this research produces a model paired with the data preprocessing that model requires. Once the research is complete, the data scientist hands it off to an engineer to bring it into “production.” This last step is where MLOps comes in to bring the work to life.

Why MLOps?

In simple terms:

  • Reproducibility of ML/DL
  • Automation of ML/DL pipelines
  • Versioning of ML/DL models and data
  • ML/DL lifecycle management
  • Monitoring of ML/DL

I will go through examples to paint a general picture of each area.

Our Scenario

In our scenario, we have been hired to set up MLOps for an investment company, Tesseract Inc. Tesseract has a Snowflake data warehouse that we will be using to pull stock data.

The brilliant researchers at Tesseract have developed state-of-the-art A.I.: they are using scikit-learn (sklearn) and a random forest to predict stock closing prices.

Our researched A.I.

The A.I. used (patent pending) is as follows:

Preprocessing of data

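import pandas as pd

# pdf: raw stock DataFrame with Open, High, Low, Close, AdjClose and Volume columns
df_2 = pdf.loc[:, ["AdjClose", "Volume"]].copy()
# daily high-low spread and open-to-close change, both as percentages
df_2["High_Low_Pert"] = (pdf["High"] - pdf["Low"]) / pdf["Close"] * 100.0
df_2["Pert_change"] = (pdf["Close"] - pdf["Open"]) / pdf["Open"] * 100.0
df_2.fillna(value=-99999, inplace=True)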
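To see what the preprocessing produces, here is a minimal sketch that wraps the same steps in a hypothetical `preprocess` function and runs it on two made-up rows of price data. The numbers are illustrative only, not real quotes.

```python
import numpy as np
import pandas as pd


def preprocess(pdf: pd.DataFrame) -> pd.DataFrame:
    """Same feature engineering as above, wrapped in a hypothetical helper."""
    df_2 = pdf.loc[:, ["AdjClose", "Volume"]].copy()
    # intraday range relative to the close, in percent
    df_2["High_Low_Pert"] = (pdf["High"] - pdf["Low"]) / pdf["Close"] * 100.0
    # open-to-close move relative to the open, in percent
    df_2["Pert_change"] = (pdf["Close"] - pdf["Open"]) / pdf["Open"] * 100.0
    # replace missing values with the -99999 sentinel
    df_2.fillna(value=-99999, inplace=True)
    return df_2


# two made-up trading days; the second one is missing its volume
sample = pd.DataFrame({
    "Open": [100.0, 50.0],
    "High": [110.0, 55.0],
    "Low": [100.0, 45.0],
    "Close": [105.0, 50.0],
    "AdjClose": [105.0, 50.0],
    "Volume": [1000.0, np.nan],
})
features = preprocess(sample)
print(features)
```

The `.copy()` keeps the new columns from being written into a view of the raw frame, so `sample` itself is left untouched.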

A.I. Modeling

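from sklearn.ensemble import RandomForestRegressor

# X: feature matrix (four columns, e.g. from df_2); y: closing prices to predict
# max_depth and n_estimators are later passed in as pipeline hyperparameters
regr = RandomForestRegressor()
regr.fit(X, y)
print(regr.predict([[0, 0, 0, 0]]))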
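As a self-contained sketch of the fit/predict cycle, the snippet below trains the regressor on synthetic data with four features (standing in for the four `df_2` columns) and sets `max_depth` and `n_estimators`, the two hyperparameters our pipeline will later pass in. The data, coefficients, and hyperparameter values are assumptions for illustration only.

```python
import numpy as np
from sklearn.ensemble import RandomForestRegressor

rng = np.random.default_rng(42)
# synthetic stand-in for the four engineered features
X = rng.normal(size=(200, 4))
# synthetic target: a noisy linear combination of the features
y = X @ np.array([3.0, -1.0, 0.5, 2.0]) + rng.normal(scale=0.1, size=200)

# the two hyperparameters our Airflow pipeline will supply at run time
regr = RandomForestRegressor(max_depth=8, n_estimators=100, random_state=0)
regr.fit(X, y)

prediction = regr.predict([[0, 0, 0, 0]])
print(prediction)
```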

Our MLOps Tech Stack:

  • Airflow: a batch orchestration framework we will use to automate our ML pipelines.
  • Databricks Spark: a general-purpose parallel processing framework we will use to process the data in Snowflake and train our ML model.
  • mlflow: a flexible platform for ML operations.
  • AWS Lambda: a serverless platform we will use to offer access to our model.
  • GitLab: a DevOps and version control platform, which we will use for CI-related tasks.

Our tech stack is built from flexible, general-purpose tools that don’t lock us into a particular platform; any of them can be replaced with another tool as needed.

Part II: Airflow

We’ve introduced the basics of MLOps; now let’s talk about the core application in our tech stack, Airflow. Airflow is the central orchestrator for all batch-related tasks.

Swapping technologies

This tech stack is designed for flexibility and scalability. There should be no issues using alternative tooling. For example, if you wanted to replace Airflow you should have no issues swapping it out for another pipelining tool like Dagster or a cloud-specific pipeline tool.

Why Airflow?

Airflow works very well as a batch processing orchestrator because it is code-driven rather than GUI-driven, simple to use, and yet very flexible. Airflow uses Python configuration files to represent pipelines. Although your pipeline is written in Python, only minimal Python knowledge is needed: you are describing tasks and their order, not writing Python applications. Airflow is also not tied to a platform. It doesn’t do the processing itself, so you are free to use whatever platform or processing engine you feel is ideal for your project.

What about streaming pipelines?

In cases where you have a streaming task, I suggest a tool like Jenkins or GitLab CI/CD to deploy it and treat it like you would any normal backend application. In a future article, we will automate the tasks not covered by Airflow, but for now, we will focus on batch-related tasks.

DAG

A Directed Acyclic Graph (DAG) can be understood as a group of steps connected by one-way dependencies with no cycles, so no step ever loops back to a step that ran before it. A DAG is very useful when creating complex data pipelines with many parent-child relationships. You may think, “Well, my projects are not so complex, why do I need a DAG?” Even if you have 3–5 steps with varying relationships, a DAG will often make expressing those relationships easier.
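Airflow builds and schedules this graph for you, but the underlying idea is easy to see with Python’s standard-library `graphlib` (Python 3.9+), which is not part of Airflow. In this sketch each task lists the tasks it depends on, a topological sort yields a valid run order, and a graph containing a cycle is rejected. The task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# each key depends on the tasks in its set (parent -> child relationships)
pipeline = {
    "preprocess": {"extract"},
    "train": {"preprocess"},
    "evaluate": {"train"},
    "report": {"train", "evaluate"},
}

# every task appears after all of its parents
order = list(TopologicalSorter(pipeline).static_order())
print(order)  # ['extract', 'preprocess', 'train', 'evaluate', 'report']

# a graph with a cycle is not a DAG, so no valid order exists
cyclic = {"a": {"b"}, "b": {"a"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)  # True
```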

In Airflow, your pipeline will be represented as a DAG.

Here is a simple Pipeline as an example:

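from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="dag_example",
         schedule_interval="@hourly",
         start_date=datetime(2021, 1, 1)) as dag:
    # each task appends the current date to a file in /tmp
    bash_1 = BashOperator(
        task_id="bash_task1",
        bash_command="date >> /tmp/airflow1"
    )
    bash_2 = BashOperator(
        task_id="bash_task2",
        bash_command="date >> /tmp/airflow2"
    )
    # our relationships: bash_1 runs before bash_2
    bash_1 >> bash_2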

In the above pipeline, we have two bash tasks that append the current date to files in the /tmp folder. The relationship bash_1 >> bash_2 defines that task bash_1 runs before bash_2.

Operators & Tasks

A single task is represented in Airflow as an Operator. An instantiated operator is called a “task.”

It’s best practice to use Operators that are tied to the specific task you want to accomplish, like working with Snowflake. There is a general rule of thumb with Airflow: Airflow doesn’t do the work. Airflow should not handle data, artifacts, deployables, or really any of the work itself. Airflow is just the manager; it doesn’t get its hands dirty.

In cases where the specific operator for a resource isn’t available or doesn’t support a feature, hooks give you lower-level access to the resource. If you are unable to use a hook either, you can write your own operator or some simple Python code to interact with the resource.

Astronomer.io

Quickstart Guide

For this series of articles, we will be using Astronomer.io. Astronomer is a company that contributes heavily to the Airflow project and offers a SaaS service as well as a customized Docker image. Astronomer’s Airflow is 100% compatible with the open-source project. If you prefer another Airflow 2.0 distribution, the core content should work fine. Astronomer.io doesn’t bill for local development, but cloud deployments are billed.

Before we begin, follow the quickstart guide to deploy a sample DAG and get familiar with the process.

Astronomer’s development flow works as follows:

  1. Create an account.
  2. Create a workspace.
  3. Create your local dev environment:
astro dev init
  4. Create or modify your DAG.
  5. Verify locally:
astro dev start
  6. Create a Deployment (see pt. III).
  7. Deploy your DAG to the cloud (see pt. III):
astro deploy

Note: The Quickstart Guide explains how to enable CLI access to the cloud.

Dockerfile

FROM quay.io/astronomer/ap-airflow:2.0.0-buster-onbuild

We pin our Docker image to the 2.0.0 release to avoid instabilities from unplanned upgrades. I strongly recommend pinning your Docker image, even for local development and testing.

Databricks

Databricks Spark will be used as the universal heavy-lifting tool in our pipeline. Why Databricks Spark? It’s extremely flexible, it’s a managed service, and the ramp-up time to become productive with it is short. We will cover Databricks in more depth in the next article, but first we need to set up our Databricks Operator in Airflow.

Since Airflow 2.0, most operators are not included in the core Airflow package. A very large number of operators are available in provider packages, which provide additional hooks, operators, and connections. We will be adding the provider for Databricks.

Docs: PyPI Documentation, Astronomer Documentation, Databricks Documentation, Jobs REST API

Setting up access

You can sign up for Databricks using this free trial.

I will be using AWS with this Databricks deployment, but in theory GCP or Azure should work the same.

Additionally, you will need to set up your AWS deployment and Instance profile. These steps can take some time and can’t be skipped.

The previous steps are specific to AWS; there are similar steps for GCP and Azure. I also suggest using cross-account access when setting up the deployment.

Connection information

In order for Airflow to connect to our Spark cluster(s), we must store an access token and host in an Airflow connection. We are going to take a simplistic approach initially and integrate with a secrets manager in a later article.

Here is a guide to setting up an access token.

Here is a guide to managing connections. The configuration is as follows:

conn id: databricks_default (the connection the Databricks operators use by default)
username: token
host: https://XXXXXX.cloud.databricks.com/ (refer to your Databricks deployment)
extra: {"token": "XXXXXXXXXXXXXXXXX"}

DO NOT COMMIT TOKENS TO SOURCE CONTROL (GITHUB)

Install the Databricks provider package

Provider packages add features and functionalities to Airflow. In this case, we will add a pip package to be pulled from PyPI.

Add the following to the requirements.txt file in our local workspace to add the provider:

apache-airflow-providers-databricks

Our Dags

We are going to deploy three pipelines. The first one builds our model and expects an input with two hyperparameter values:

{"max_depth": <>, "n_estimators": <>}

from datetime import datetime
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG(
        dag_id="build_model_pipeline",
        schedule_interval=None,
        start_date=datetime(1981, 1, 1)
) as dag:
    # evaluated when Airflow parses this file, e.g. "May182021"
    timestamp = datetime.now()
    uid = timestamp.strftime("%b%d%Y")
    print("UID: " + uid)
    cluster_id = "..."
    preprocess_data = DatabricksSubmitRunOperator(
        task_id="preprocess_data",
        # the Jobs API expects "parameters" as a list of strings
        spark_python_task={"python_file": "dbfs:/datalake/code/preprocessing/__main__.py",
                           "parameters": [uid]},
        existing_cluster_id=cluster_id
    )

    build_model = DatabricksSubmitRunOperator(
        task_id="build_model",
        spark_python_task={"python_file": "dbfs:/datalake/code/model/__main__.py",
                           "parameters": [f"{uid}",
                                          '{{ (dag_run.conf["max_depth"] if dag_run else "") | tojson }}',
                                          '{{ (dag_run.conf["n_estimators"] if dag_run else "") | tojson }}']},
        existing_cluster_id=cluster_id
    )

    preprocess_data >> build_model
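Databricks passes each entry of `parameters` to the Python file as a command-line argument, so the model app receives the UID followed by the two JSON-encoded hyperparameters. The model app’s code is not shown in this article; the sketch below is a hypothetical way its `__main__.py` could read them (called as `parse_args(sys.argv[1:])`), falling back to scikit-learn’s defaults (`max_depth=None`, `n_estimators=100`) when the templates render an empty string.

```python
import json


def parse_args(argv):
    """Parse [uid, max_depth_json, n_estimators_json] from build_model_pipeline.

    Hypothetical helper: the real model app is not shown in the article.
    """
    uid = argv[0]
    # `| tojson` renders 80 as '80' and the empty fallback as '""'
    max_depth = json.loads(argv[1]) if len(argv) > 1 else ""
    n_estimators = json.loads(argv[2]) if len(argv) > 2 else ""
    return {
        "uid": uid,
        # fall back to scikit-learn's defaults when no value was supplied
        "max_depth": int(max_depth) if max_depth != "" else None,
        "n_estimators": int(n_estimators) if n_estimators != "" else 100,
    }


print(parse_args(["May182021", "80", "100"]))
```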

The second pipeline performs a hyperparameter grid search on our data and records the results in mlflow. This search does not distribute its work across the Spark cluster.

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG(
        dag_id='search_model_pipeline',
        schedule_interval=None,
        start_date=datetime(1981, 1, 1)
) as dag:
    preprocess_data = DatabricksSubmitRunOperator(
        task_id="preprocess_data",
        spark_python_task={"python_file": "dbfs:/datalake/code/preprocessing/__main__.py",
                           "parameters": []},
        existing_cluster_id=".."
    )

    search_model = DatabricksSubmitRunOperator(
        task_id="search_model",
        spark_python_task={"python_file": "dbfs:/datalake/code/search/__main__.py",
                           "parameters": []},
        existing_cluster_id=".."
    )

    preprocess_data >> search_model
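The search app’s code is not included here. As a hedged, single-machine sketch of what a grid search over our two knobs looks like, scikit-learn’s `GridSearchCV` trains every `max_depth`/`n_estimators` combination with cross-validation and keeps the one with the lowest RMSE. The grid values and synthetic data are assumptions for illustration.

```python
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV

rng = np.random.default_rng(0)
# synthetic stand-in for the preprocessed stock features and target
X = rng.normal(size=(150, 4))
y = X[:, 0] * 2.0 - X[:, 1] + rng.normal(scale=0.1, size=150)

# every combination is trained and scored: 3 x 2 = 6 candidates
param_grid = {"max_depth": [2, 4, 8], "n_estimators": [10, 50]}

search = GridSearchCV(
    RandomForestRegressor(random_state=0),
    param_grid,
    scoring="neg_root_mean_squared_error",  # higher is better, so RMSE is negated
    cv=3,
)
search.fit(X, y)

print(search.best_params_)
print(-search.best_score_)  # best cross-validated RMSE
```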

Lastly, we will create a pipeline that performs the same hyperparameter grid search using our Spark cluster and records the results in mlflow.

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG(
        dag_id='cluster_search_model_pipeline',
        schedule_interval=None,
        start_date=datetime(1981, 1, 1)
) as dag:
    cluster_id = "...."
    timestamp = datetime.now()
    uid = timestamp.strftime("%b%d%Y")
    print("UID: " + uid)
    preprocess_data = DatabricksSubmitRunOperator(
        task_id="preprocess_data",
        spark_python_task={"python_file": "dbfs:/datalake/code/preprocessing/__main__.py",
                           "parameters": [uid]},
        existing_cluster_id=cluster_id
    )

    search_model = DatabricksSubmitRunOperator(
        task_id="search_model",
        spark_python_task={"python_file": "dbfs:/datalake/code/cluster_search/__main__.py",
                           "parameters": []},
        existing_cluster_id=cluster_id
    )

    preprocess_data >> search_model
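The cluster search app is not shown either. What it exploits is that every combination in the grid can be trained and scored independently, so the work can be spread across workers. As a local stand-in for the Spark cluster (a swapped-in technique, not Spark itself), this sketch fans the same grid out over a `concurrent.futures` thread pool; on Databricks the workers would be Spark executors instead. The `evaluate` helper and the data are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import ParameterGrid, train_test_split

rng = np.random.default_rng(1)
X = rng.normal(size=(200, 4))
y = X[:, 0] * 2.0 - X[:, 1] + rng.normal(scale=0.1, size=200)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)


def evaluate(params):
    """Train one candidate model and return (params, held-out RMSE)."""
    model = RandomForestRegressor(random_state=0, **params).fit(X_train, y_train)
    rmse = mean_squared_error(y_test, model.predict(X_test)) ** 0.5
    return params, rmse


grid = list(ParameterGrid({"max_depth": [2, 4, 8], "n_estimators": [10, 50]}))

# candidates don't depend on each other, so they can be evaluated in parallel
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(evaluate, grid))

best_params, best_rmse = min(results, key=lambda r: r[1])
print(best_params, best_rmse)
```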

Test

Once we have created our local development environment and made all the above additions, we need to test our new Airflow image locally:

astro dev start

If you find an issue with your DAG, stop the environment with the following command, make the needed changes, and run astro dev start again.

astro dev stop

Triggering our pipeline

Airflow offers several options for triggering your pipeline. You can create a schedule (not covered in this article), use the UI or CLI to trigger it manually, or use the REST API.

Here is a Guide to the UI.

REST API

The following command should trigger your DAG. You can run curl against your local Docker container or the SaaS service.

We will not cover the hosted option, but for reference, here is Astronomer’s Guide to using the REST API with its SaaS service.

Connect to your local Docker container

docker ps
docker exec -it <webserver's id> bash

Once you have a shell in the container, you can run curl:

curl --header "Content-Type: application/json" \
--request POST \
--data '{
"dag_run_id": "build_model_pipeline",
"execution_date": "2019-08-24T14:15:22Z",
"conf": {"max_depth": 80, "n_estimators": 100}
}' \
http://localhost:8080/api/v1/dags/build_model_pipeline/dagRuns
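The same call can be made from Python. This is a minimal standard-library sketch that builds the request the curl example sends; the base URL assumes the local webserver on port 8080, and the optional basic-auth credentials are an assumption that only applies if your Airflow `[api] auth_backend` accepts basic auth. `build_trigger_request` is a hypothetical helper.

```python
import base64
import json
import urllib.request


def build_trigger_request(dag_id, conf, dag_run_id=None, execution_date=None,
                          base_url="http://localhost:8080",
                          username=None, password=None):
    """Build a POST to Airflow's stable REST API that creates a DAG run."""
    payload = {"conf": conf}
    # optional fields, mirroring the curl example
    if dag_run_id is not None:
        payload["dag_run_id"] = dag_run_id
    if execution_date is not None:
        payload["execution_date"] = execution_date
    request = urllib.request.Request(
        f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    if username is not None:
        # only useful if basic auth is enabled for the API (assumption)
        credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
        request.add_header("Authorization", f"Basic {credentials}")
    return request


req = build_trigger_request(
    "build_model_pipeline",
    {"max_depth": 80, "n_estimators": 100},
    dag_run_id="build_model_pipeline",
    execution_date="2019-08-24T14:15:22Z",
)
# Sending it (from inside the webserver container) would look like:
# with urllib.request.urlopen(req) as response:
#     print(json.load(response))
```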

Airflow is very flexible; you can expand on this tutorial with new resources and use cases. Next, we will build out our Spark applications, add CI/CD, and deploy our model.

Part III: Python and Databricks Spark

Why Databricks Spark?

We will be using Databricks Spark as a general platform to run our code. In some cases, we will use Spark to run code across the “cluster.” The benefit is an easy way to run Python code without the complexity of managing servers, containers, Kubernetes, etc. This approach isn’t the best choice in all cases, but for teams that want a simple path to automate SQL/Python/R/JVM code, it is a powerful option.

To recap: Databricks will spin up AWS EC2 instances as needed, run our code, and then terminate them. For SQL-only workflows, Databricks also offers a serverless option, which I plan to cover in a future article.

Python Code

The code provided focuses on the core logic of the applications, but I highly suggest you follow basic software engineering practices such as error handling and logging.

Here are the applications we will be using:

Why not a notebook?

In this example, we are choosing to use python applications over notebooks. If your workflow uses notebooks, you are more than welcome to use notebooks. Databricks has a powerful cloud-based notebook platform that integrates with Airflow. Airflow’s Databricks Operators can interact with notebooks, so the same framework can work for notebooks.

Databricks CLI

Databricks has a very easy-to-use CLI that lets you control just about every component in the Databricks ecosystem. In this article, we will use it to manage DBFS (the Databricks File System) and our cluster, and to deploy our applications.

Here are useful guides for setting up your Databricks CLI:

Deploy Cluster

This is the cluster configuration I used for my testing; you can use it as a template. You can also use a Databricks local cluster to save even more money. The configuration sets autotermination_minutes to 10, so the cluster shuts down after 10 minutes without work and no AWS servers are left running idle. Keep in mind that Databricks offers a free trial period, but you will still pay for any AWS costs.

cluster.json:

{
    "autoscale": {
        "min_workers": 1,
        "max_workers": 2
    },
    "cluster_name": "mlops_tiny_ml",
    "spark_version": "8.2.x-cpu-ml-scala2.12",
    "spark_conf": {},
    "aws_attributes": {
        "first_on_demand": 1,
        "availability": "SPOT_WITH_FALLBACK",
        "zone_id": "us-west-2b",
        "instance_profile_arn": "arn:aws:iam::112437402463:instance-profile/databricks_instance_role_s3",
        "spot_bid_price_percent": 100,
        "ebs_volume_type": "GENERAL_PURPOSE_SSD",
        "ebs_volume_count": 3,
        "ebs_volume_size": 100
    },
    "node_type_id": "m4.large",
    "driver_node_type_id": "m4.large",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {},
    "autotermination_minutes": 10,
    "enable_elastic_disk": false,
    "cluster_source": "UI",
    "init_scripts": []
}

To deploy your cluster, you can use the CLI:

databricks clusters create --json-file cluster.json

Library Deployment in Cluster

Now that we have our cluster deployed, we will need to install any needed libraries. We used a machine learning runtime for the cluster, so most of the libraries are provided by Databricks.

Here is the CLI for installing the needed library on the cluster:

databricks libraries install --pypi-package yfinance --cluster-id <cluster-id>

To see your cluster IDs, you can run:

databricks clusters list

Deploy Spark Code

For simplicity’s sake, we will upload the code to DBFS. Spark can also pull the code from cloud storage like S3. You can also incorporate an artifact repository, but Spark still needs a basic entry-point application that it loads from DBFS or blob storage.

Here are our steps:

1. Create the “data lake”

We will create the needed folders in dbfs.

databricks fs rm dbfs:/datalake -r
databricks fs mkdirs dbfs:/datalake
databricks fs mkdirs dbfs:/datalake/code
databricks fs ls dbfs:/datalake

2. Deploy the Preprocess code

Now we will deploy the preprocessing code to DBFS. The code pulls data from Yahoo Finance, does some basic feature engineering, and saves that data.

cd preprocessing_folder
databricks fs rm -r dbfs:/datalake/code/preprocessing
databricks fs mkdirs dbfs:/datalake/code/preprocessing
databricks fs cp app dbfs:/datalake/code/preprocessing -r

3. Deploy the Model code

Our model code pulls the data saved by the preprocessing task, takes hyperparameters as input from Airflow, and then builds our model. The app then records useful information in mlflow and saves the built model for future use.

cd model_folder 
databricks fs rm -r dbfs:/datalake/code/model
databricks fs mkdirs dbfs:/datalake/code/model
databricks fs cp app dbfs:/datalake/code/model -r

4. Deploy the Search

I have added two apps that search for the best hyperparameter combination. This one does not distribute any of its work across our Spark cluster.

cd search_folder 
databricks fs rm -r dbfs:/datalake/code/search
databricks fs mkdirs dbfs:/datalake/code/search
databricks fs cp app dbfs:/datalake/code/search -r

5. Deploy the Cluster Search

Using a combination of useful functions, this code will perform the same hyperparameter search using Spark. Using our cluster allows us to distribute the load of building the large number of models needed for our search process.

cd cluster_search_folder 
databricks fs rm -r dbfs:/datalake/code/cluster_search
databricks fs mkdirs dbfs:/datalake/code/cluster_search
databricks fs cp app dbfs:/datalake/code/cluster_search -r
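Since the four deployments differ only in folder and target name, they can be scripted. A minimal sketch, assuming the `databricks` CLI is configured as above and each local folder contains an `app` directory as in these steps; `deploy_commands` and `deploy` are hypothetical helpers that build the same `rm`, `mkdirs`, and `cp` calls, and nothing is executed unless `dry_run` is False.

```python
import subprocess

# local folder -> DBFS code directory, as in the steps above
APPS = {
    "preprocessing_folder": "preprocessing",
    "model_folder": "model",
    "search_folder": "search",
    "cluster_search_folder": "cluster_search",
}
CODE_ROOT = "dbfs:/datalake/code"


def deploy_commands(apps=APPS, code_root=CODE_ROOT):
    """Return the databricks CLI calls that redeploy every app."""
    commands = []
    for local_folder, name in apps.items():
        target = f"{code_root}/{name}"
        commands.append(["databricks", "fs", "rm", "-r", target])
        commands.append(["databricks", "fs", "mkdirs", target])
        commands.append(["databricks", "fs", "cp", f"{local_folder}/app", target, "-r"])
    return commands


def deploy(dry_run=True):
    """Print each command; run it only when dry_run is False."""
    for command in deploy_commands():
        print(" ".join(command))
        if not dry_run:
            # the rm step may fail on a first deploy if the folder doesn't exist,
            # so only mkdirs and cp are required to succeed
            subprocess.run(command, check=command[2] != "rm")


deploy()  # prints the 12 commands without running them
```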

Triggering our Model

Now that our code is deployed, let’s trigger the model pipeline to create a machine learning model. Take note of where the preprocessed data and the pickled model are stored.

Using the Model in Spark SQL

We are going to test our model out using SQL and a little python.

Being able to take a standard sklearn model and use PURE SQL on just about any data source (Kafka, REST API, MongoDB, RDBMS, data lake, data warehouse, Neo4j, etc.) is a distinctive strength of Spark.

Here we will register our sklearn pickled model in a Spark SQL UDF.

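from mlflow.pyfunc import spark_udf

# spark_udf loads an MLflow model, so this must resolve to the logged model
# (for example a runs:/<run_id>/<artifact_path> URI)
model_path = "/Users/<username>/datalake/stocks/experiments/<experiment_name>"
predict = spark_udf(spark, model_path, result_type="double")
spark.udf.register("predictionUDF", predict)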

Now we are going to use the path of the data saved by the “preprocess” task in our model pipeline to create a temporary view over the Delta table.

CREATE OR REPLACE TEMPORARY VIEW stocksParquet
USING DELTA
OPTIONS(
path = "...")

Now we will use our UDF to predict and the output will be in a new table called predictions.

USE <database>;
DROP TABLE IF EXISTS predictions;
CREATE TABLE predictions AS (
SELECT *, cast(predictionUDF(cols..) AS double) AS prediction
FROM stocksParquet
LIMIT 10000
);

Finally, you will be able to see your predictions using the following SQL

SELECT * FROM predictions;

Saving your model

In our next article, we will use our pickled model. First, we will manually download the model and save it locally.

Your ML model should be accessible in the Experiment:

workspaces/users/<username>/datalake/stocks/experiments/<name of experiment>

In the experiments screen, you will see several experiments, among other things. Click the model that corresponds to the run you want to use.

You will then see a screen that lets you download the ML model. Click on the pickle5 model, model.pkl, then the download artifact icon.

This is a basic way to access your models. Save this model, because in our next article we will package it in a Docker image and deploy it to AWS Lambda.
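Before packaging the downloaded model.pkl, it’s worth checking that it loads and predicts outside Databricks. A minimal sketch, assuming the artifact is a plain pickled scikit-learn model written with pickle protocol 5 (native in Python 3.8+); here a small stand-in model is pickled and reloaded in place of the real download. Only unpickle files from sources you trust, and use the same scikit-learn version that trained the model.

```python
import os
import pickle
import tempfile

import numpy as np
from sklearn.ensemble import RandomForestRegressor

# stand-in for the model trained by build_model_pipeline
rng = np.random.default_rng(7)
X = rng.normal(size=(100, 4))
y = X.sum(axis=1)
model = RandomForestRegressor(n_estimators=20, random_state=0).fit(X, y)

path = os.path.join(tempfile.mkdtemp(), "model.pkl")
with open(path, "wb") as f:
    pickle.dump(model, f, protocol=5)

# this is the part you would run against the downloaded file
with open(path, "rb") as f:
    loaded = pickle.load(f)

print(loaded.predict([[0, 0, 0, 0]]))
```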

mlflow

In our Python code, we used mlflow to manage our machine learning lifecycle. In this section, we will go through some of the basics of mlflow.

mlflow is a powerful, minimalistic ML lifecycle platform. Its benefit over similar tools is that it is agnostic to the platform, language, and method you use to create your pipeline. mlflow lets you easily record versions, metrics, parameters, and models. There are two ways you can interact with mlflow:

  • REST API
  • Native libraries

mlflow uses the following concepts:

Experiments:

An experiment is a way of grouping the runs of an ML project. To set your experiment, use the following command:

mlflow.set_experiment("path to folder where stored..")

Run:

Within each Experiment, you will have several iterations that you would like to track.

with mlflow.start_run(run_name='arima_param'):
    ...

Parameter:

Parameters can be considered knobs for the tuning of your model. Oftentimes these can be hyperparameters. mlflow allows you to record any parameter you use in your experiment.

mlflow.log_param('n_estimators', 100)

Metric:

A metric is a way of measuring your run. We will be looking at metrics such as RMSE, MAE, and R² (logged as r2 below).

mlflow.log_metric("r2", r2)
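Putting the pieces together, here is a hedged sketch of how a run could compute RMSE, MAE, and R² and record them alongside its parameters. The metric helper uses only the standard library so the arithmetic is visible; the mlflow calls (`set_experiment`, `start_run`, `log_param`, `log_metric`) are the ones shown above, and the experiment path, run name, and helper names are placeholders of your choosing.

```python
import math


def regression_metrics(y_true, y_pred):
    """Return RMSE, MAE and R^2 for two equal-length sequences."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(e) for e in errors) / n
    rmse = math.sqrt(sum(e * e for e in errors) / n)
    mean = sum(y_true) / n
    ss_res = sum(e * e for e in errors)
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return {"rmse": rmse, "mae": mae, "r2": 1 - ss_res / ss_tot}


def log_run(experiment_path, run_name, params, y_true, y_pred):
    """Record params and metrics for one run (requires mlflow installed)."""
    import mlflow

    mlflow.set_experiment(experiment_path)
    with mlflow.start_run(run_name=run_name):
        for key, value in params.items():
            mlflow.log_param(key, value)
        for key, value in regression_metrics(y_true, y_pred).items():
            mlflow.log_metric(key, value)


metrics = regression_metrics([1, 2, 3, 4], [1, 2, 3, 5])
print(metrics)  # {'rmse': 0.5, 'mae': 0.25, 'r2': 0.8}
```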

At this point, we have an orchestrator and all of our Spark applications deployed. In the upcoming articles, we will pick up where we left off, deploy our model in production, and then add some best practices to our workflow.


Brian Lipp

Sr. Data Engineer/Backend Dev
Brian has worked in the Data field for many years in many hybrid roles combining Data Engineering, Backend Software Engineering, and Machine Learning.