This page summarizes the learnings of https://phabricator.wikimedia.org/T318859:
[SPIKE] Build simple stateless service using PyFlink
User Story
As a platform engineer, I need to build a simple stateless service with PyFlink that takes an input stream, transforms and enriches it, and produces an output.
The service should:
- Listen to mediawiki.revision-create or another existing Kafka topic
- Make a call to the MW Action API
- Produce some output that combines the data
Is this a good abstraction for event-driven data producers to create similar services easily?
TL;DR
- PyFlink by itself is in some areas more burdensome to use than regular Flink
- Because PyFlink is just a thin wrapper around Flink, using our existing codebase through it is essentially writing Java without type hints, plus the added overhead of converting Java types into Python types
- However, if we make wrappers for our existing codebase it becomes much more bearable... for the users. *If* we make wrappers
- The one major advantage is its ability to easily implement UDFs that can be used in both PyFlink and Flink SQL (demonstrated in the examples below)
Evaluation
Pros
- Python is more familiar to developers
- Easier to get started; just pip install apache-flink
- Development is easier. No need to rebuild jars. No need to submit jobs to a cluster just to test it out (see the sketch after this list)
- Can interop with our existing Java codebase
- Supports Pandas
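A minimal sketch of that local workflow (the sample data is illustrative, not from the spike): the whole job runs in an embedded mini-cluster when launched as an ordinary Python script, so the feedback loop is just edit and re-run.
from pyflink.datastream import StreamExecutionEnvironment

# Runs in-process; no jar builds, no cluster, no job submission.
env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection(["mediawiki.revision-create", "mediawiki.page-create"]) \
    .map(lambda topic: topic.upper()) \
    .print()
env.execute("local_smoke_test")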
Cons
- Need to know Flink
- There is not a Python equivalent for every Java function, and it's unclear if the missing items are intentional or if it's still in development
- Immediately gets more complicated the second you want to interop with Java (see the sketch after this list)
- No type hints
- Need to convert between Java/Python types
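To see that interop friction, here is a minimal sketch of registering a Java UDF from PyFlink (the jar path and the org.wikimedia.ExampleScalarFunction class are hypothetical): the Java side is referenced by plain strings, so there are no type hints and mistakes only surface at runtime.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# The jar has to be shipped with the job before its classes can be used.
env.add_jars("file:///path/to/example-udfs.jar")  # hypothetical jar
st_env = StreamTableEnvironment.create(env)

# Java UDFs are registered by fully-qualified class name as a string;
# a typo here fails at runtime, not at edit time.
st_env.create_java_temporary_function(
    "example_fn", "org.wikimedia.ExampleScalarFunction")  # hypothetical class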
The easiest developer experience would probably involve making a library of custom UDFs and Flink SQL connectors so people don't have to touch Flink at all.
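As a rough sketch of what that could look like (the wmf_event_utils module and everything it exports are hypothetical, not an existing library), a user-facing job might shrink to:
# Hypothetical wrapper library: pre-built UDFs plus source helpers,
# so users never touch Flink APIs directly.
from wmf_event_utils import event_source, get_images, stream_table_env

st_env = stream_table_env()
st_env.create_temporary_view("mediawiki_page_create", event_source("mediawiki.page-create"))
st_env.create_temporary_function("get_images", get_images)
st_env.sql_query(
    "SELECT page_title, get_images(page_id) AS images FROM mediawiki_page_create"
).execute().print()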
Comparisons
Datastream API
Developers can define UDFs by extending one of PyFlink's Function classes (e.g. MapFunction). Developers can also use third-party Python libraries within their UDFs, but must specify the dependencies when executing the jobs on a remote cluster.
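Dependencies can be declared on the execution environment before the job is submitted; a minimal sketch (the file paths are illustrative):
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Ship an extra Python module with the job...
env.add_python_file("/path/to/my_udfs.py")
# ...and have the workers install third-party requirements.
env.set_python_requirements("/path/to/requirements.txt")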
Here's an example of a map function in PyFlink that takes a page_id and returns the images on that page:
import requests
from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction


def get_images(page_id: int):
    # Look up the images on a page via the MW Action API.
    response = requests.get(
        f"https://en.wikipedia.org/w/api.php?action=query&format=json&prop=images&pageids={page_id}")
    if response.status_code == 200:
        try:
            return response.json()['query']['pages'][str(page_id)]['images']
        except KeyError:
            pass
    return []


class PagesToImages(MapFunction):
    def map(self, value):
        page_id = value['page_id']
        return Row(page_title=value['page_title'], images=get_images(page_id))


# ...
env = StreamExecutionEnvironment.get_execution_environment()
datastream = env.from_source(
    # ...
)
datastream.map(PagesToImages()).print()
env.execute()
Sample Output:
Row(page_title='File:NLC892-411999027560-49817_劉子威集_第3冊.pdf', images=[])
Row(page_title='Cristobal_de_la_Sierra', images=[{'ns': 6, 'title': 'File:Camera-photo.svg'}, {'ns': 6, 'title': 'File:Rhacodactylus ciliatus.jpg'}])
Row(page_title='Категорија:Шефови_влада', images=[])
Table API
The Table API allows you to create UDFs which can mimic the datastream UDFs. However, the return type has to be one of DataTypes since it integrates with SQL.
Here's an example of a UDF that does the equivalent of the datastream example:
import requests
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.ARRAY(
    DataTypes.ROW([
        DataTypes.FIELD("ns", DataTypes.INT()),
        DataTypes.FIELD("title", DataTypes.STRING())
    ])
))
def get_images(page_id: int):
    # Same MW Action API lookup as the datastream example, but the
    # result is coerced into the declared SQL row type.
    response = requests.get(
        f"https://en.wikipedia.org/w/api.php?action=query&format=json&prop=images&pageids={page_id}")
    if response.status_code == 200:
        try:
            images = response.json()['query']['pages'][str(page_id)]['images']
            return [Row(ns=int(image["ns"]), title=image["title"]) for image in images]
        except KeyError:
            pass
    return []
# ...
env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(env)
st_env.create_temporary_view(
    "mediawiki_page_create",
    # ...
)
st_env.create_temporary_function("get_images", get_images)
result_table = st_env.sql_query(
    """
    SELECT page_title, get_images(page_id) as images FROM mediawiki_page_create
    """
)
streaming_result = result_table.execute()
streaming_result.print()
Sample Output:
+----+--------------------------------+----------------+
| op | page_title | images |
+----+--------------------------------+----------------+
| +I | California_production_of_peach | [] |
| +I | Kaozeal:Charlez_I_a_Vro-Saoz | [(6, File:C... |
| +I | Kategorie:Postaveno_v_Němec... | [] |
Flink SQL + Python UDF
You can pull the UDFs created for the Table API and load them into Flink SQL. You can also continue to use external Python libraries by importing the library within the UDF. However, unlike running a PyFlink job locally, running the UDF requires submitting it to a remote cluster, and therefore you need to submit a virtual environment with all the required dependencies for the UDF to run in.
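The walkthrough below wires this up through sql-client.sh flags, but the same configuration can be done from a PyFlink job; a minimal sketch (the paths are illustrative):
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Ship the zipped virtual environment with the job...
env.add_python_archive("/home/path/to/pyflink-venv.zip")
# ...and run Python UDF workers with the interpreter inside it.
env.set_python_executable("pyflink-venv.zip/bin/python3")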
Packaging a Virtual Env
1. Create the virtual environment
We don't use conda because it's heavy and has a very long cold start time. Also stacked environments can cause some confusion since all dependencies must be packaged together. Even then, there *will* be some lag every time you execute a query with a UDF since Flink has to unzip and initialize the virtual environment.
python3 -m venv pyflink-venv
2. Activate environment
source pyflink-venv/bin/activate
3. Install required dependencies
pip3 install wheel
pip3 install apache-flink==1.15.2
# ... other dependencies (e.g. requests, used by the example UDF)
4. Zip up files
cd pyflink-venv
zip -r pyflink-venv.zip ./*
5. (Optional) Move files to a better location
mv pyflink-venv.zip ../pyflink-venv.zip
6. Export Hadoop config
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export HBASE_CONF_DIR=/etc/hbase/conf
7. (Assuming you already have Flink downloaded and extracted) Start Flink cluster
cd flink-1.15.2
./bin/start-cluster.sh
8. Start Flink SQL client and tell it to use the packaged virtual env for UDFs
./bin/sql-client.sh \
    -pyarch file:///home/path/to/pyflink-venv.zip \
    -pyexec pyflink-venv.zip/bin/python3 \
    -pyclientexec pyflink-venv.zip/bin/python3 \
    -pyfs ../stateless_table.py
9. Remember to stop the cluster when done
./bin/stop-cluster.sh
If you get an error executing the Python UDF, you might need to manually link the Flink Python JAR:
wget https://repo1.maven.org/maven2/org/apache/flink/flink-python_2.12/1.15.2/flink-python_2.12-1.15.2.jar
And then add this to step 8:
-j ../flink-python_2.12-1.15.2.jar
Example
Here's an example that uses the UDF defined above:
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE FUNCTION get_images AS 'stateless_table.get_images' LANGUAGE PYTHON;
SELECT get_images(1221227) AS images;
Sample Output:
+----+--------------------------------+
| op | images |
+----+--------------------------------+
| +I | [(6, File:Commons-logo.svg)... |
+----+--------------------------------+