Platform Engineering Team/Event Platform Value Stream/Evaluate a pyflink version of Mediawiki Stream Enrichment


This page summarizes the outcome and learnings of https://phabricator.wikimedia.org/T323217.

Evaluate a pyflink version of Mediawiki Stream Enrichment

A pyflink implementation of Mediawiki Stream Enrichment has been developed and deployed on YARN. While this implementation did not write to a Kafka topic directly, all enriched messages (48 hours' worth of data) passed JSON Schema validation. The python implementation has feature parity with the Scala one. In particular:

- It is built atop the DataStream API and operates on DataStream[Row] (note that Row here is a pure python object, not a JVM one).

- Errors are reported to a side output of String type.

- The HTTP client implements retry logic with backoff.

- Latency, resource consumption and GC footprint are similar between the two implementations.

Goals

The spike had the following goals.

[x] Run a read-only python implementation of Mediawiki Stream Enrichment on YARN.

Business logic for enrichment can be found at https://gitlab.wikimedia.org/-/snippets/42. Pyflink Rows are pure python objects that map nicely to dicts and can be (de)serialized to and from JSON. The experience so far has been pretty smooth, and I did not encounter any type mismatch issues (e.g. Map vs Row types). The situation may differ should we need to support other formats, but right now it does not seem to be a concern.
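As a minimal illustration of that round trip (the field names here are hypothetical):

    import json

    from pyflink.common import Row

    # A Row with named fields behaves like a lightweight record.
    row = Row(page_id=42, page_title="Example")

    # Row.as_dict() yields a plain dict, which serializes straight to JSON ...
    payload = json.dumps(row.as_dict())

    # ... and a parsed JSON object maps back onto a Row via keyword arguments.
    restored = Row(**json.loads(payload))

    print(restored)  # the restored Row compares equal to the original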

A context manager and some helpers that encapsulate flink and eventutilities boilerplate can be found at https://gitlab.wikimedia.org/-/snippets/44. This is just a lightweight wrapper that exposes Flink capabilities (map, process) following the delegation pattern; a sketch of the idea follows the links below. It's built atop work from Andrew Otto and Thomas Chin. Namely:

- https://gitlab.wikimedia.org/tchin/stateless-pyflink-examples/

- https://gist.github.com/ottomata/2778938f1bc97932d49d45bbb64a78e0
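As a rough sketch of the delegation pattern (not the actual snippet; the names below are made up), the wrapper holds a DataStream and forwards map/process to it, while a context manager owns environment setup and job submission:

    from contextlib import contextmanager

    from pyflink.datastream import StreamExecutionEnvironment


    class EnrichmentStream:
        """Thin wrapper delegating map/process to the wrapped DataStream."""

        def __init__(self, stream):
            self._stream = stream

        def map(self, fn, output_type=None):
            return EnrichmentStream(self._stream.map(fn, output_type=output_type))

        def process(self, fn, output_type=None):
            return EnrichmentStream(self._stream.process(fn, output_type=output_type))


    @contextmanager
    def flink_job(name, parallelism=1):
        """Yield a configured environment, then submit the job on clean exit."""
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(parallelism)
        yield env
        env.execute(name)

This keeps the boilerplate (environment setup, job submission) in one place, while the pipeline body only sees map/process.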

The pipeline has been tested locally using mock sources and sinks, as well as on YARN using a Kafka source (the "rc0.mediawiki.page_change" stream) and a stdout sink. While this implementation did not write to a Kafka topic directly, all enriched messages (48 hours' worth of data) passed JSON Schema validation.
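A minimal sketch of the local variant, assuming a bounded in-memory collection as the mock source and stdout as the sink (the enrichment step is a placeholder); on YARN the collection is swapped for a KafkaSource from pyflink.datastream.connectors.kafka:

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Mock source: a bounded in-memory collection stands in for the Kafka topic.
    events = env.from_collection(
        ['{"page_id": 1}', '{"page_id": 2}'],
        type_info=Types.STRING(),
    )

    # Placeholder for the real enrichment step.
    enriched = events.map(lambda msg: msg, output_type=Types.STRING())

    # Mock sink: print to stdout instead of producing to Kafka.
    enriched.print()

    env.execute("local-smoke-test")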

[x] Collect resource allocation and latency metrics for a long-running pyflink job

Latency, resource consumption and GC footprint are similar between the two implementations. The pyflink job ran in application mode with default memory and parallelism settings:

taskmanager.memory.process.size: 1728m

taskmanager.numberOfTaskSlots: 1

parallelism.default: 1

Memory allocation (taskmanager) after a 12-hour run:

JVM (Heap/Non-Heap) Memory
Type	Committed	Used	Maximum
Heap	500 MB	83.7 MB	500 MB
Non-Heap	151 MB	146 MB	744 MB

Outside JVM Memory
Type	Count	Used	Capacity
Direct	4,195	132 MB	132 MB
Mapped	0	0 B	0 B

[x] Help inform integration paths with the upcoming Flink catalog (https://phabricator.wikimedia.org/T322022).

Integration with the Catalog should be straightforward and should support both the DataStream and the Table/SQL use cases (both involve operations on Row objects).

Some capability is required in the catalog to allow for time-based operations (window aggregation and temporal joins).
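To make that requirement concrete, here is a hedged sketch of a tumbling-window aggregation in the Table API; the catalog, table, and column names are hypothetical, and the point is that the catalog must expose an event-time attribute and watermark for this to work:

    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col, lit
    from pyflink.table.window import Tumble

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # Hypothetical catalog-backed table; meta_dt would need to be declared as
    # an event-time attribute (with a watermark) in the catalog metadata.
    events = t_env.from_path("wmf_catalog.event.page_change")

    per_wiki_counts = (
        events
        .window(Tumble.over(lit(1).hours).on(col("meta_dt")).alias("w"))
        .group_by(col("w"), col("wiki_id"))
        .select(
            col("wiki_id"),
            col("w").end.alias("window_end"),
            col("page_id").count.alias("changes"),
        )
    )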

[x] Help requirements collection for https://phabricator.wikimedia.org/T322125.

Side output is available for Python in Flink 1.16, and has been implemented as a PoC in the sample pipeline. Retry logic with exponential backoff comes for free with the requests python module; a combined sketch follows below.
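A hedged sketch combining the two: a ProcessFunction that calls an enrichment API with retry/backoff (requests delegates retries to urllib3's Retry) and routes failures to a String-typed side output. The endpoint URL and field names are hypothetical:

    import json

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    from pyflink.common import Types
    from pyflink.datastream import OutputTag, ProcessFunction

    # Side output channel for error reporting, typed as String.
    ERROR_TAG = OutputTag("errors", Types.STRING())


    def make_session():
        """HTTP session retrying transient failures with exponential backoff."""
        retry = Retry(total=3, backoff_factor=1.0,
                      status_forcelist=[429, 500, 502, 503, 504])
        session = requests.Session()
        session.mount("https://", HTTPAdapter(max_retries=retry))
        return session


    class EnrichFunction(ProcessFunction):

        def open(self, runtime_context):
            self._session = make_session()

        def process_element(self, value, ctx):
            try:
                event = json.loads(value)
                # Hypothetical enrichment endpoint.
                resp = self._session.get("https://api.example.org/enrich",
                                         params={"id": event["page_id"]},
                                         timeout=5)
                resp.raise_for_status()
                event["content"] = resp.json()
                yield json.dumps(event)             # main output
            except Exception as exc:
                yield ERROR_TAG, f"{value}: {exc}"  # side output

    # Given a DataStream `stream` of JSON strings:
    # enriched = stream.process(EnrichFunction(), output_type=Types.STRING())
    # errors = enriched.get_side_output(ERROR_TAG)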

Learnings

Most complications came at deployment time. Pyflink applications need to bundle jars and a virtual environment, so there is a setup cost that we can largely automate (e.g. in the future by using docker images). Stale processes on stat nodes and Kerberos tuning also required some troubleshooting.

Next Steps / Phab tasks proposal

I would like to propose the following tasks to be groomed and added to our backlog. Right now this application is in a "works for me" state. My goal is to have the capability in place to build, deploy and support the copy edit service.

  1. Task: flink boilerplate should be moved into a dedicated Git repo with packaging and CI. The goal is not to prescribe an API but to facilitate experimentation and collaboration (e.g. the code is alpha).
  2. Task: flink boilerplate interface should integrate the Table API. We should allow injection of UDFs (ideally cross-language).
  3. Task: we should provide utilities for local experimentation and unit testing. For instance, I would like to be able to inject mocked Sources/Sinks and operate on local json files before rolling out to YARN.
  4. Task: we should streamline packaging of pyflink applications, and ideally integrate with the shared flink docker images.
  5. Task: side output error reporting should be made more robust.
  6. https://phabricator.wikimedia.org/T324144