Platform Engineering Team/Data Value Stream/Data Pipeline Onboarding

This page describes how to onboard a new data pipeline to the Generated Data Platform.

A five-slide overview of our capabilities can be found at https://docs.google.com/presentation/d/1gNDKy_XNgbXtZqNHCd2m63JSzURqL_s-Fzc6SBBKtxA/edit#slide=id.g1083d4ca5ed_0_0

Onboarding

You have developed a notebook or a set of pyspark scripts and would like to orchestrate and schedule them on the Generated Data Platform.

Onboarding flow

The onboarding process requires the following steps:

  1. Create a new Phabricator task on the Generated Data Platform board and describe your use case. It's important to include (TODO: turn this into a Phabricator template):
    1. What technologies does your project use? (e.g. Jupyter, Hive, etc.)
    2. What are the data sources?
    3. Where do you plan to store data?
    4. What is the pipeline schedule?
    5. Has the project had a privacy review?
    6. What are the expected data volumes and cluster resource requirements?
  2. Your code will need to be refactored according to our conventions. See the Create a new data pipeline section below.
  3. We'll ask that your code pass a number of code checks (see the Code checks section below). Once it does, open a Draft merge request.
  4. During code review, we'll assist you with deploying your pipeline to our systems.
  5. Once the request is merged, your data pipeline will be operated according to the agreed-upon schedule and SLOs.


Create a new data pipeline

This section describes:

  1. How to use our scaffolding tools to get started
  2. How to organize project code
  3. How to orchestrate tasks with Airflow

Scaffolding

Clone our data pipelines monorepo (TODO: this will move to the Generated Data Platform group once that's in place).

git clone git@gitlab.wikimedia.org:repos/generated-data-platform/datapipelines.git

Create a branch for your new data pipeline. Each new branch should reference its Phabricator task.

git checkout -b <TASK_ID>-your-data-pipeline

Scaffold a new data pipeline project with:

make datapipeline

This will create a project template under your_data_pipeline and a new Airflow DAG template under dags/your_data_pipeline_dag.py.

Organize project code

your_data_pipeline/README.md contains getting-started information.

The your_data_pipeline directory contains the transformations (tasks) that you want to orchestrate and schedule. Project code is organized as follows:

  • conf contains Spark job-specific config files. spark.properties lets you define your cluster topology and desired resources. We default to a yarn-regular sized cluster.
  • pyspark contains Spark based data processing tasks.
  • sql contains SQL/HQL queries.

Python tasks must be located under pyspark and follow these conventions:

  • runtime dependencies must be declared in requirements.txt
  • test dependencies must be declared in requirements-test.txt
  • all code belongs to the src module.
  • src/transform.py contains boilerplate to help get you started with a pyspark job (a minimal sketch follows this list).
  • conftest.py and tests contain pytest boilerplate to test Spark code.
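
To make the layout concrete, here is a minimal sketch of what a src/transform.py entry point might look like. This is not the exact scaffolded boilerplate: the argument names (--input-path, --output-path), the Parquet input/output, and the placeholder transformation are assumptions for illustration only.

import argparse

from pyspark.sql import DataFrame, SparkSession


def transform(df: DataFrame) -> DataFrame:
    """Project-specific transformation. Placeholder logic for illustration."""
    return df.dropDuplicates()


def main(input_path: str, output_path: str) -> None:
    spark = SparkSession.builder.appName("your_data_pipeline").getOrCreate()
    df = spark.read.parquet(input_path)
    transform(df).write.mode("overwrite").parquet(output_path)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Example pyspark transformation.")
    parser.add_argument("--input-path", required=True)
    parser.add_argument("--output-path", required=True)
    args = parser.parse_args()
    main(args.input_path, args.output_path)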

Task orchestration with Airflow

dags/your_data_pipeline_dag.py contains a template for orchestrating sequential pyspark (and SQL) tasks according to our conventions. A Spark task is configured as an instance of factory.sequence.PySparkTask. PySparkTasks to be executed sequentially are appended to a task list, which is then passed to a generate_dag() method that stitches them together into an Airflow DAG. The example below is taken from our sample-project pipeline:

# Configure a Spark environment to run sample-project
# in yarn-cluster mode.
# SparkConfig will take care of configuring PYSPARK_SUBMIT_ARGS,
# as well as Python dependencies.
spark_config = SparkConfig(
    pipeline="sample-project",
    pipeline_home=config["pipeline_home"],
)

# A spark job is a script that takes some input
# and produces some output.
# The script should be provided in your project src module.
pyspark_script = os.path.join(
    config["pipeline_home"], "sample-project", "pyspark", "src", "transform.py"
)

# You should specify the HDFS directory
# where a task's input data resides.
input_path = "/path/to/hdfs/input"

# You should specify the HDFS directory
# where a task's output data should be saved.
output_path = "/path/to/hdfs/output"

# PySparkTask is a helper class that
# helps you submit a pyspark_script to the cluster.
t1 = PySparkTask(
    main=pyspark_script,
    input_path=input_path,
    output_path=output_path,
    config=spark_config,
)

tasks = [t1]
# generate_dag() will chain and execute tasks in sequence (t1 >> t2 >> ... >> tn).
# The generated dag is appended to the global dags namespace.
globals()["sample-project"] = generate_dag(
    pipeline="sample-project", tasks=tasks, dag_args=dag_args
)

From the top-level directory, you can now run make test-dags. This command checks that dags/your_data_pipeline_dag.py is a valid Airflow DAG. The output should look like this:

---------- coverage: platform linux, python 3.7.11-final-0 -----------
Name                                    Stmts   Miss  Cover
-----------------------------------------------------------
dags/factory/sequence.py                   70      3    96%
dags/ima.py                                49      5    90%
dags/similarusers-train-and-ingest.py      20      0   100%
dags/your_data_pipeline_dag.py             19      0   100%
-----------------------------------------------------------
TOTAL                                     158      8    95%

=========================== 8 passed, 8 warnings in 12.75s ===========================
______________________________________ summary ____________

What is a data pipeline?

A Generated Data Platform pipeline is made up of two components:

  1. Project-specific tasks and data transformations that operate on inputs (sources) and produce outputs (sinks). We depend on Apache Spark for elastic compute.
  2. An Airflow DAG: a thin orchestration layer that composes and executes tasks.

Data pipelines are executed on Hadoop. Elastic compute is provided by Spark (jobs are deployed in cluster mode). Scheduling and orchestration are delegated to Apache Airflow. Currently we support Python-based projects; Scala support is planned.

Some caveats apply.

Task Orchestration

  • Airflow DAGs are a thin layer of (declarative) execution steps.
  • Airflow DAGs must not contain any logic.
  • Airflow tasks must not perform local compute.
  • Airflow tasks must not persist data locally.

Compute

  • pyspark jobs must be deployed in cluster mode.
  • pyspark jobs can declare their dependencies and virtual environments, but they must not vendor libraries or third-party modules (e.g. research algorithms).
  • pyspark code should be idiomatic.
  • Avoid computation on the driver; use UDFs instead (see the sketch below).
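
To illustrate the last two points, prefer Spark's distributed primitives (built-in functions or UDFs) over collecting data onto the driver. The snippet below is a hedged sketch, not project code; the column name title and the paths are made up for the example.

from typing import Optional

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/path/to/hdfs/input")

# Anti-pattern: df.collect() pulls the whole dataset onto the driver.
# Preferred: express the transformation as a UDF (or a built-in function)
# so that Spark executes it on the executors.
@F.udf(returnType=StringType())
def normalize_title(title: Optional[str]) -> Optional[str]:
    return title.strip().lower() if title is not None else None

df = df.withColumn("normalized_title", normalize_title(F.col("title")))
df.write.mode("overwrite").parquet("/path/to/hdfs/output")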

Loading data to Cassandra

This section is a draft.

Data can be loaded into Cassandra, our canonical data store, using the HiveToCassandra Spark job from analytics-refinery. This job requires the dataset to be persisted in Hive (Parquet) first; it then syncs it to a Cassandra keyspace using an HQL query.

It can be configured in our DAG template with a SparkTask:

config = SparkConfig()
cassandra_loader = SparkTask(
    main="org.wikimedia.analytics.refinery.job.HiveToCassandra",
    application_jar="hdfs:///wmf/refinery/current/artifacts/refinery-job.jar",
    # Paths passed to os.path.join must be relative, otherwise pipeline_home is discarded.
    main_args=f"""--config_file {os.path.join(config.pipeline_home, "my/config/file.properties")}
-f {os.path.join(config.pipeline_home, "my/config/cassandra_loader_query.hql")}
--cassandra_keyspace local_group_default_T_example""",
)

The job requires:

  1. Parquet data stored in Hive.
  2. A file.properties HiveToCassandra config file (TODO: provide an example).
  3. A query file (cassandra_loader_query.hql in the example) that selects which columns of the Hive table are loaded into the Cassandra keyspace (TODO: provide an example).

Conventions, code style

We favour test-driven development with pytest, lint with flake8, and type check with mypy. We encourage, but do not yet enforce, the use of isort and black for code formatting. We log errors and informational messages with the Python logging library.
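
As a hedged illustration of these conventions (not code from the repository), a typed helper with standard-library logging and a matching pytest test might look like this:

import logging
from typing import List

logger = logging.getLogger(__name__)


def deduplicate(titles: List[str]) -> List[str]:
    """Return the input titles with duplicates removed, preserving order."""
    seen = set()
    result = []
    for title in titles:
        if title not in seen:
            seen.add(title)
            result.append(title)
    logger.info("Deduplicated %d titles down to %d", len(titles), len(result))
    return result


def test_deduplicate() -> None:
    assert deduplicate(["a", "b", "a"]) == ["a", "b"]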

Pipeline best practices

See Airflow Coding Convention for more details.

Tasks (transformations, functions, scripts) must be reproducible and follow functional programming paradigms (a short sketch of what this means in practice follows the list):

  • Tasks should be idempotent.
  • Tasks should be deterministic.
  • Tasks should not generate side effects.
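
The sketch below (with made-up names and paths) illustrates these properties: the task writes to an output location derived deterministically from its parameters and overwrites it, so re-running the task for the same snapshot is safe and produces the same result.

from pyspark.sql import DataFrame, SparkSession


def run(spark: SparkSession, snapshot: str) -> None:
    """An idempotent, deterministic task: same input snapshot, same output."""
    df: DataFrame = spark.read.parquet(f"/path/to/hdfs/input/snapshot={snapshot}")
    result = df.dropDuplicates()
    # Overwrite a partition derived from the task parameters rather than
    # appending to a shared location, so re-runs leave no side effects behind.
    result.write.mode("overwrite").parquet(f"/path/to/hdfs/output/snapshot={snapshot}")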

Implementation details

From an implementation perspective, data pipelines should follow these principles:

  • DAGs should be a thin layer and should not contain business logic, queries, or computation.
  • Specify config details consistently by moving parameters to a config file (see the sketch after this list).
  • Group tasks in the Airflow UI, to make the status of the data processing / enrichment / export steps of a pipeline explicit to users.
  • Avoid local computation (our Airflow instance shares scheduler and task executor processes). Currently we target the YARN resource manager for elastic compute. Data pipelines are implemented atop Apache Spark, and Spark jobs must be submitted from the Airflow instance and run in cluster mode.
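
For the config point above, here is a hedged sketch of reading parameters at DAG parse time; the file name, section, and keys are assumptions, and the repository may load configuration differently.

import configparser

# Read pipeline parameters from an INI-style config file instead of
# hard-coding them in the DAG or in task code.
parser = configparser.ConfigParser()
parser.read("/path/to/your_data_pipeline.ini")

config = {
    "pipeline_home": parser.get("pipeline", "pipeline_home"),
    "input_path": parser.get("pipeline", "input_path"),
    "output_path": parser.get("pipeline", "output_path"),
}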

Code checks

A valid project is expected to pass the following code checks:

  • Compile time type-checking
  • Unit tests
  • Linting
  • DAG validation tests

Code checks are triggered automatically after a git push.

DAG validation tests live under the top-level tests directory. They can be triggered manually with make test-dags; a minimal sketch of such a test is shown below.
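
The sketch uses Airflow's standard DagBag API and is illustrative only; the repository's actual tests may differ.

from airflow.models import DagBag


def test_dags_import_without_errors() -> None:
    # DagBag imports every module under dag_folder and records failures.
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    assert dag_bag.import_errors == {}, f"DAG import failures: {dag_bag.import_errors}"
    assert len(dag_bag.dags) > 0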

Continuous Integration

Code checks are executed on push and merge request (MR) events.

Python Code Style

The most up-to-date spec of our code style can be found at https://gitlab.wikimedia.org/gmodena/platform-airflow-dags/-/blob/multi-project-dags-repo/datapipeline-scaffold/%7B%7Bcookiecutter.pipeline_directory%7D%7D/pyspark/tox.ini

We lint with flake8 and the following (conservative) settings:

  • McCabe complexity threshold: 10
  • maximum allowed line length: 127 (Default PEP8: 79)
  • check for syntax errors or undefined names

We perform compile time type checks with mypy and the following rule set:

[mypy]
python_version = 3.7
disallow_untyped_defs = True # methods signature should be typed
disallow_any_unimported = True # disallows usage of types that come from unfollowed imports
no_implicit_optional = True # <- Explicit is better than implicit. Open to debate :)
check_untyped_defs = True # Type-checks the interior of functions without type annotations.
warn_return_any = True # Shows a warning when returning a value with type Any from a function declared with a non-Any return type.
show_error_codes = True # Show error codes in output
warn_unused_ignores = True # Warns about unneeded # type: ignore comments.

For more information about why and how we use mypy, see this PET lightning talk: https://drive.google.com/file/d/1Oszl3ziSrSf4v1olZhRu331DP3AlQoDx/view?usp=sharing

Deployment

This section describes how to add a new pipeline to our deployment targets. This is a step that currently requires a member of the Generated Data Platform team in the loop.

Deployment pipelines are declared in the TARGET variable of the Makefile. To deploy a new pipeline, append its project directory name to TARGET. For example, if a new pipeline has been created as your_data_pipeline, the new TARGET list would look like the following:

TARGET := "image-matching your_data_pipeline"

Example

https://gitlab.wikimedia.org/repos/generated-data-platform/datapipelines/-/tree/main/sample-project

This pipeline makes a simple query request to the analytics data lake and returns the results. It is intended to serve as a simple data pipeline use case.

The pipeline is composed of:

  • Project code implemented with pyspark: https://gitlab.wikimedia.org/repos/generated-data-platform/datapipelines/-/tree/main/sample-project
  • An Airflow DAG module that orchestrates the project code: https://gitlab.wikimedia.org/repos/generated-data-platform/datapipelines/-/blob/main/dags/sample-project_dag.py