This page summarizes the outcome and learnings of https://phabricator.wikimedia.org/T323217.
A PyFlink implementation of Mediawiki Stream Enrichment has been developed and deployed on YARN. While this implementation did not write to a Kafka topic directly, all enriched messages (48 hours' worth of data) passed jsonschema validation. The Python implementation has feature parity with the Scala one. In particular:
- It is built atop the DataStream API and operates on DataStream[Row] (note that Row here is a pure Python object, not a JVM one).
- Errors are reported to a side output of String type.
- The HTTP client implements retry logic with backoff.
- Latency, resource consumption and GC footprint are similar between the two implementations.
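The side-output convention can be illustrated without a running cluster. In PyFlink 1.16 a `ProcessFunction` emits to a side output by yielding `(output_tag, value)`, where the tag is an `OutputTag("errors", Types.STRING())`, and the errors are read back with `get_side_output(output_tag)` on the resulting stream. The sketch below mimics that convention with a stand-in tag class and a toy driver so it runs as plain Python; the record fields, the placeholder enrichment step, and the error payload are assumptions.

```python
import json
from typing import Any, Iterator, List, Tuple


class OutputTagStandIn:
    """Stand-in for pyflink.datastream.OutputTag, so the sketch runs without Flink."""

    def __init__(self, tag_id: str) -> None:
        self.tag_id = tag_id


# In PyFlink this would be OutputTag("errors", Types.STRING()).
ERRORS = OutputTagStandIn("errors")


def process_element(value: str) -> Iterator[Any]:
    """Body of a hypothetical ProcessFunction.process_element.

    Successful records go to the main output; failures go to the
    error side output as a JSON-encoded string.
    """
    try:
        event = json.loads(value)
        event["enriched"] = True  # placeholder for the real enrichment step
        yield event
    except (ValueError, TypeError) as exc:
        # PyFlink convention: yielding (output_tag, value) emits to that side output.
        yield ERRORS, json.dumps({"error": repr(exc), "raw_event": value})


def split_outputs(values: List[str]) -> Tuple[List[Any], List[str]]:
    """Toy driver that separates main and side outputs, as Flink would."""
    main: List[Any] = []
    errors: List[str] = []
    for value in values:
        for out in process_element(value):
            if isinstance(out, tuple) and len(out) == 2 and isinstance(out[0], OutputTagStandIn):
                errors.append(out[1])
            else:
                main.append(out)
    return main, errors
```

Emitting errors as strings keeps the side output schema-free, which matches the String-typed side output described above.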
The spike had the following goals.
[x] Run a read-only Python implementation of Mediawiki Stream Enrichment on YARN.
Business logic for enrichment can be found at https://gitlab.wikimedia.org/-/snippets/42. PyFlink Rows are pure Python objects that map nicely to dicts and can be (de)serialized to JSON. The experience so far has been pretty smooth, and I did not encounter any type mismatch issues (e.g. Map vs Row types). This may change should we need to support other formats, but right now it does not seem to be a concern.
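Because PyFlink Rows map onto plain dicts (for instance via `Row.as_dict()`), enrichment logic can be written and unit-tested as an ordinary function over JSON-compatible dicts. This is a minimal sketch, not the code in snippet 42: the field names are hypothetical placeholders rather than the page_change schema, and `fetch_content` is an assumed callable standing in for the real HTTP lookup.

```python
import copy
import json
from typing import Any, Callable, Dict


def enrich(event: Dict[str, Any], fetch_content: Callable[[int], str]) -> Dict[str, Any]:
    """Return a copy of `event` with revision content attached.

    The field names ("revision", "rev_id", "content_body") are hypothetical
    placeholders, not the actual page_change schema.
    """
    enriched = copy.deepcopy(event)  # never mutate the input record
    rev_id = enriched["revision"]["rev_id"]
    enriched["revision"]["content_body"] = fetch_content(rev_id)
    return enriched


def enrich_json(raw: str, fetch_content: Callable[[int], str]) -> str:
    """JSON in, JSON out: the same round trip the Row <-> dict mapping allows."""
    return json.dumps(enrich(json.loads(raw), fetch_content))
```

Injecting `fetch_content` keeps the function free of network calls, so it can be tested without a MediaWiki API.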
A context manager and some helpers that encapsulate Flink and eventutilities boilerplate can be found at https://gitlab.wikimedia.org/-/snippets/44. This is just a lightweight wrapper that exposes Flink capabilities (map, process) following the delegation pattern. It's built atop work from Andrew Otto and Thomas Chin. Namely:
The pipeline has been tested locally using mock sources and sinks, as well as on YARN using a Kafka source (the "rc0.mediawiki.page_change" stream) and a stdout sink. While this implementation did not write to a Kafka topic directly, all enriched messages (48 hours' worth of data) passed jsonschema validation.
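The lightweight context-manager wrapper described above, and the way it lets the same pipeline run against mock sources and sinks, can be sketched as follows. Everything here is hypothetical and Flink-free: `ListStream` stands in for a PyFlink `DataStream`, and `Stream`, `Pipeline` and `pipeline()` are illustrative names, not the API of snippet 44.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterable, Iterator, List, Tuple


class ListStream:
    """Stand-in for a Flink DataStream, backed by a Python list (illustration only)."""

    def __init__(self, items: Iterable[Any]) -> None:
        self.items = list(items)

    def map(self, fn: Callable[[Any], Any]) -> "ListStream":
        return ListStream(fn(x) for x in self.items)

    def process(self, fn: Callable[[Any], Iterable[Any]]) -> "ListStream":
        # Like a ProcessFunction, fn may emit zero or more records per input.
        return ListStream(y for x in self.items for y in fn(x))


class Stream:
    """Hypothetical wrapper: exposes map/process and delegates to the wrapped stream."""

    def __init__(self, delegate: Any) -> None:
        self._delegate = delegate

    def map(self, fn: Callable[[Any], Any]) -> "Stream":
        return Stream(self._delegate.map(fn))

    def process(self, fn: Callable[[Any], Iterable[Any]]) -> "Stream":
        return Stream(self._delegate.process(fn))

    def __getattr__(self, name: str) -> Any:
        # Anything not wrapped explicitly is forwarded to the delegate.
        return getattr(self._delegate, name)


class Pipeline:
    """Hypothetical boilerplate holder: one mock source, any number of sinks."""

    def __init__(self, source_items: Iterable[Any]) -> None:
        self._source_items = list(source_items)
        self._outputs: List[Tuple[Stream, List[Any]]] = []

    def source(self) -> Stream:
        return Stream(ListStream(self._source_items))

    def sink_to(self, stream: Stream, sink: List[Any]) -> None:
        self._outputs.append((stream, sink))

    def execute(self) -> None:
        for stream, sink in self._outputs:
            sink.extend(stream.items)  # `items` is resolved on the delegate


@contextmanager
def pipeline(source_items: Iterable[Any]) -> Iterator[Pipeline]:
    p = Pipeline(source_items)
    yield p
    # Runs only if the with-block finished without raising, loosely like env.execute().
    p.execute()
```

Swapping `ListStream` for a real source is the only change needed to move from a local test to a cluster, which is the main appeal of the delegation approach.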
Latency, resource consumption and GC footprint are similar between the two implementations. The PyFlink job ran in application mode with default memory and parallelism settings:
Memory allocation (taskmanager) after a 12-hour run.

JVM (Heap/Non-Heap):

| Memory Type | Committed | Used | Maximum |
|---|---|---|---|
| Heap | 500 MB | 83.7 MB | 500 MB |
| Non-Heap | 151 MB | 146 MB | 744 MB |

Outside JVM:

| Memory Type | Count | Used | Capacity |
|---|---|---|---|
| Direct | 4,195 | 132 MB | 132 MB |
| Mapped | 0 | 0 B | 0 B |
Integration with the Catalog should be straightforward and support both DataStream and Table/SQL use cases (that is, operations on Row objects).
Some capabilities are required in the catalog to allow for time-based operations (window aggregation and temporal joins). See
[x] Help with requirements collection for https://phabricator.wikimedia.org/T322125.
Side output is available for Python in Flink 1.16 and has been implemented (PoC) in the sample pipeline. Retry logic with exponential backoff comes essentially for free with the requests Python module (through urllib3's Retry, mounted on an HTTPAdapter).
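As a sketch of that retry configuration: `requests` delegates retries to urllib3's `Retry` class, attached to a session through an `HTTPAdapter`. The retry count, backoff factor, status codes and the GET-only restriction below are assumptions, not the values used in the PoC.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def make_session(total: int = 5, backoff_factor: float = 0.5) -> requests.Session:
    """Build a requests Session that retries transient HTTP failures.

    Sleeps between retries grow exponentially with the number of
    consecutive failures, scaled by `backoff_factor`.
    """
    retry = Retry(
        total=total,
        backoff_factor=backoff_factor,
        # Retry on throttling and typical transient server errors.
        status_forcelist=(429, 500, 502, 503, 504),
        # Only idempotent GETs are retried in this sketch.
        allowed_methods=frozenset({"GET"}),
    )
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session
```

Once status-based retries are exhausted, requests raises `requests.exceptions.RetryError`, which the enrichment step could catch and route to the error side output.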
Most complications came at deployment. PyFlink applications need to bundle JARs and a virtual environment, so there is a setup cost, which we can largely automate (e.g. in the future by using Docker images). Stale processes on stat nodes and Kerberos tuning also required some troubleshooting.
Next Steps / Phab tasks proposal
I would like to propose that the following tasks be groomed and added to our backlog. Right now this application is in a "works for me" state. My goal is to have the capability in place to build, deploy and support the copy edit service.
- Task: Flink boilerplate should be moved into a dedicated Git repo with packaging and CI. The goal is not to prescribe an API but to facilitate experimentation and collaboration (i.e. the code is alpha).
- Task: the Flink boilerplate interface should integrate the Table API. We should allow injection of UDFs (ideally cross-language).
- Task: we should provide utilities for local experimentation and unit testing. For instance, I would like to be able to inject mocked Sources/Sinks and operate on local JSON files before rolling out to YARN.
- Task: we should streamline packaging of PyFlink applications, and ideally integrate with the shared Flink Docker images.
- Task: side output error reporting should be made more robust.
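The local-testing utilities proposed above could start as small as this. It is a minimal sketch under assumed names (`json_lines_source`, `CollectSink` and `run_locally` are hypothetical); in a real setup these would be wrapped as Flink sources and sinks rather than plain Python callables.

```python
import json
from typing import Any, Callable, Dict, Iterable, Iterator, List


def json_lines_source(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Hypothetical mocked source: one JSON event per non-blank line."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


class CollectSink:
    """Hypothetical mocked sink that keeps records in memory for assertions."""

    def __init__(self) -> None:
        self.records: List[Dict[str, Any]] = []

    def __call__(self, record: Dict[str, Any]) -> None:
        self.records.append(record)


def run_locally(
    source: Iterable[Dict[str, Any]],
    transform: Callable[[Dict[str, Any]], Dict[str, Any]],
    sink: Callable[[Dict[str, Any]], None],
) -> None:
    """Drive events from a mocked source through `transform` into a mocked sink."""
    for event in source:
        sink(transform(event))
```

A file handle opened on a local newline-delimited JSON file can be passed directly as `lines`, so sample events captured from the stream can be replayed in unit tests before rolling out to YARN.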