Platform Engineering Team/Event Platform Value Stream/Build simple stateless service using PyFlink

From mediawiki.org

This page summarizes what we learned from https://phabricator.wikimedia.org/T318859

[SPIKE] Build simple stateless service using PyFlink

User Story

As a platform engineer, I need to try to build a simple stateless service using PyFlink that takes an input stream, transforms and enriches it, and produces an output.

The service should:

  • Listen to mediawiki.revision-create or another existing Kafka topic
  • Make a call to MW Action API
  • Produce some output that combines the data

Is this a good abstraction for event-driven data producers to create similar services easily?

TL;DR

  • On its own, PyFlink is in some areas more burdensome to use than regular Flink
  • Because PyFlink is a thin wrapper around Flink, using our existing codebase from it is basically writing Java without type hints, with the added overhead of converting Java types into Python types
  • However, if we make wrappers for our existing codebase, it becomes much more bearable for users, but only *if* we build those wrappers
  • The one major advantage is its ability to easily implement UDFs that can be used in both PyFlink and Flink SQL

Evaluation

Pros

  • Python is more familiar to developers
  • Easier to get started; just install PyFlink (pip3 install apache-flink)
  • Development is easier: no need to rebuild jars, and no need to submit jobs to a cluster just to test them
  • Can interoperate with our existing Java codebase
  • Supports Pandas

Cons

  • Need to know Flink
  • There is not a Python equivalent for every Java function, and it's unclear whether the gaps are intentional or still in development
  • Gets more complicated as soon as you want to interoperate with Java
    • No type hints
    • Need to convert between Java/Python types

If we want the easiest developer experience, it might involve making a library of custom UDFs and Flink SQL connectors so people don't have to touch Flink at all.

Comparisons

DataStream API

Developers can define UDFs by extending one of PyFlink's function classes (for example MapFunction). UDFs can also use third-party Python libraries, but those dependencies must be specified when the job is executed on a remote cluster.

Here's an example of a map function in PyFlink that takes a page_id and returns the images on that page:

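import requests
from pyflink.common import Row
from pyflink.datastream import MapFunction, StreamExecutionEnvironment

def get_images(page_id: int):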
    response = requests.get(
        f"https://en.wikipedia.org/w/api.php?action=query&format=json&prop=images&pageids={page_id}")
    if response.status_code == 200:
        try:
            return response.json()['query']['pages'][str(page_id)]['images']
        except KeyError:
            pass
    return []

class PagesToImages(MapFunction):
    def map(self, value):
        page_id = value['page_id']
        return Row(page_title=value['page_title'], images=get_images(page_id))

# ...
env = StreamExecutionEnvironment.get_execution_environment()
datastream = env.from_source(
    # ...
)
datastream.map(PagesToImages()).print()
env.execute()

Sample Output:

Row(page_title='File:NLC892-411999027560-49817_劉子威集_第3冊.pdf', images=[])
Row(page_title='Cristobal_de_la_Sierra', images=[{'ns': 6, 'title': 'File:Camera-photo.svg'}, {'ns': 6, 'title': 'File:Rhacodactylus ciliatus.jpg'}])
Row(page_title='Категорија:Шефови_влада', images=[])

Full Example
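
One way to make get_images easier to test is to separate URL construction and response parsing from the HTTP call itself. The following is a minimal sketch, not part of the spike's code: build_images_url and extract_images are hypothetical helper names, and the payload shape is inferred from the lookup path used in the example above.

```python
from urllib.parse import urlencode

# Same endpoint as in the example above.
API_ENDPOINT = "https://en.wikipedia.org/w/api.php"


def build_images_url(page_id: int) -> str:
    """Build the Action API URL, letting urlencode handle escaping."""
    params = {
        "action": "query",
        "format": "json",
        "prop": "images",
        "pageids": page_id,
    }
    return f"{API_ENDPOINT}?{urlencode(params)}"


def extract_images(payload: dict, page_id: int) -> list:
    """Return the 'images' list for page_id, or [] if an expected key is missing."""
    try:
        return payload["query"]["pages"][str(page_id)]["images"]
    except (KeyError, TypeError):
        # Pages without images, or unexpected payloads, yield an empty list.
        return []
```

get_images could then fetch build_images_url(page_id) with requests.get and pass response.json() to extract_images, which keeps the parsing logic testable without network access.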

Table API

The Table API allows you to create UDFs that mimic the DataStream UDFs. However, because the Table API integrates with SQL, the return type has to be declared using DataTypes.

Here's an example of a UDF that does the equivalent of the DataStream example:

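import requests
from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment
from pyflink.table.udf import udf

@udf(result_type=DataTypes.ARRAY(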
    DataTypes.ROW([
        DataTypes.FIELD("ns", DataTypes.INT()),
        DataTypes.FIELD("title", DataTypes.STRING())
    ])
))
def get_images(page_id: int):
    response = requests.get(
        f"https://en.wikipedia.org/w/api.php?action=query&format=json&prop=images&pageids={page_id}")
    if response.status_code == 200:
        try:
            images = response.json()['query']['pages'][str(page_id)]['images']
            return [Row(ns=int(image["ns"]), title=image["title"]) for image in images]
        except KeyError:
            pass
    return []

# ...
env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(env)

st_env.create_temporary_view(
    "mediawiki_page_create",
    # ...
)
st_env.create_temporary_function("get_images", get_images)

# Keep the returned Table so it can be executed below.
result_table = st_env.sql_query(
    """
    SELECT page_title, get_images(page_id) as images FROM mediawiki_page_create
    """
)
streaming_result = result_table.execute()
streaming_result.print()

Sample Output:

+----+--------------------------------+----------------+
| op |                     page_title |         images |
+----+--------------------------------+----------------+
| +I | California_production_of_peach |             [] |
| +I |   Kaozeal:Charlez_I_a_Vro-Saoz | [(6, File:C... |
| +I | Kategorie:Postaveno_v_Němec... |             [] |

Full Example

Flink SQL + Python UDF

You can take the UDFs created for the Table API and load them into Flink SQL. You can also continue to use external Python libraries by importing them within the UDF. However, unlike a PyFlink job run locally, a UDF used from Flink SQL is submitted to a remote cluster, so you need to ship a virtual environment containing all the dependencies the UDF needs.

Packaging a Virtual Env

1. Create the virtual environment

We use venv rather than conda because conda is heavy and has a very long cold-start time. Stacked conda environments can also cause confusion, since all dependencies must be packaged together. Even with venv, there *will* be some lag every time you execute a query with a UDF, since Flink has to unzip and initialize the virtual environment.

python3 -m venv pyflink-venv

2. Activate environment

source pyflink-venv/bin/activate

3. Install required dependencies

pip3 install wheel
pip3 install apache-flink==1.15.2
# Other dependencies

4. Zip up files

cd pyflink-venv
zip -r pyflink-venv.zip ./*

5. (Optional) Move files to a better location

mv pyflink-venv.zip ../pyflink-venv.zip

6. Export Hadoop config

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export HBASE_CONF_DIR=/etc/hbase/conf

7. (Assuming you already have Flink downloaded and extracted) Start Flink cluster

cd flink-1.15.2
./bin/start-cluster.sh

8. Start Flink SQL client and tell it to use the packaged virtual env for UDFs

./bin/sql-client.sh -pyarch file:///home/path/to/pyflink-venv.zip -pyexec pyflink-venv.zip/bin/python3 -pyclientexec pyflink-venv.zip/bin/python3 -pyfs ../stateless_table.py

9. Remember to stop the cluster when done

./bin/stop-cluster.sh
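
Step 4 can also be done from Python with the standard library. The following is a rough sketch, assuming the archive should contain the contents of the virtual environment directory at its top level (as when zipping from inside the directory). zip_venv is a hypothetical helper name. Like the zip command's default behavior, it follows symlinks and stores the files they point to.

```python
import zipfile
from pathlib import Path


def zip_venv(venv_dir: str, archive_path: str) -> int:
    """Zip every file under venv_dir, storing paths relative to venv_dir.

    Returns the number of files written to the archive.
    """
    root = Path(venv_dir)
    archive = Path(archive_path).resolve()
    count = 0
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(root.rglob("*")):
            # Skip the archive itself in case it is created inside venv_dir.
            if path.is_file() and path.resolve() != archive:
                zf.write(path, path.relative_to(root).as_posix())
                count += 1
    return count
```

For example, zip_venv("pyflink-venv", "pyflink-venv.zip") run from the parent directory would produce an archive whose entries start with bin/ and lib/ rather than pyflink-venv/.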

If you get an error when executing the Python UDF, you might need to manually link the Flink Python jar:

wget https://repo1.maven.org/maven2/org/apache/flink/flink-python_2.12/1.15.2/flink-python_2.12-1.15.2.jar

And then add this to step 8:

-j ../flink-python_2.12-1.15.2.jar
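
The relationship between the flags in step 8 can be easier to see when the command is assembled programmatically. The following sketch only mirrors the pattern shown above, where the interpreter path begins with the archive's file name. sql_client_args is a hypothetical helper, and it uses only the flags that already appear on this page.

```python
from pathlib import PurePosixPath
from typing import List, Optional


def sql_client_args(venv_zip: str, udf_file: str,
                    python_jar: Optional[str] = None) -> List[str]:
    """Assemble the sql-client.sh arguments from step 8.

    venv_zip is the absolute path to the packaged virtual environment.
    """
    # The interpreter path is relative and starts with the archive's file name.
    archive_name = PurePosixPath(venv_zip).name
    python_bin = f"{archive_name}/bin/python3"
    args = [
        "./bin/sql-client.sh",
        "-pyarch", f"file://{venv_zip}",
        "-pyexec", python_bin,
        "-pyclientexec", python_bin,
        "-pyfs", udf_file,
    ]
    if python_jar is not None:
        # Optional: manually link the Flink Python jar.
        args += ["-j", python_jar]
    return args
```

The resulting list could be passed to subprocess.run from the Flink directory, or joined with spaces to print the command for review.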

Example

Here's an example that uses the get_images UDF from the Table API example above:

SET 'sql-client.execution.result-mode' = 'tableau';
CREATE FUNCTION get_images AS 'stateless_table.get_images' LANGUAGE PYTHON;

SELECT get_images(1221227) AS images;

Sample Output:

+----+--------------------------------+
| op |                         images |
+----+--------------------------------+
| +I | [(6, File:Commons-logo.svg)... |
+----+--------------------------------+

WIP Example