Platform Engineering Team/Event Platform Value Stream/PoC Mediawiki Stream Enrichment


This page describes an implementation of T307959.

1. Code: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment

2. Package: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment/-/packages/234

3. Status: PoC

4. Deployment environment: YARN https://phabricator.wikimedia.org/T323914

Mediawiki Stream Enrichment

A proof-of-concept Flink service that consumes page_change events and produces wikitext-enriched events into page_content_change.

Consume enriched events

Enriched events are produced to the eqiad.rc0.mediawiki.page_content_change topic, brokered on Kafka Jumbo.

Example:

kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.rc0.mediawiki.page_content_change
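
To tail only new events and pretty-print the JSON, a variant of the command above (assuming jq is available on the host):

kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.rc0.mediawiki.page_content_change -o end -u | jq .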

Flink on YARN

The job has been tested on Flink 1.15.

tl;dr: the steps below are more or less automated by this unsupported script: https://gitlab.wikimedia.org/-/snippets/41.

A standalone cluster can be set up locally (on a stat machine, atop YARN). First download and unpack a Flink distribution:

wget https://dlcdn.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz

tar xvzf flink-1.15.0-bin-scala_2.12.tgz

cd flink-1.15.0

The packaged jar can be manually copied to a stat machine with:

scp target/enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar stat1005.eqiad.wmnet:~/flink-1.15.0

Then start a Flink session cluster on YARN with:

export HADOOP_CLASSPATH=`hadoop classpath`

./bin/yarn-session.sh --detached
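
To confirm the session cluster is up and to find its application id, a quick sketch (the grep pattern may need adjusting):

yarn application -list -appStates RUNNING | grep -i flink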

The Flink dashboard can be accessed from yarn.wikimedia.org. It allows monitoring job execution (Task Manager panel) and, if needed, stopping the job.

Job lifecycle management

Launch the job

Finally, launch the job with:

./bin/flink run -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar
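
The job can also be submitted in detached mode with an explicit parallelism; the parallelism value below is illustrative:

./bin/flink run -d -p 2 -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar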

Restart the job

The job uses Kafka offsets to determine start (resume) points. A stopped job can be restarted, and streaming will resume from the latest committed Kafka offset.
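
A sketch of a stop/restart cycle via the Flink CLI (the <jobId> placeholder comes from the output of flink list):

./bin/flink list                # find the id of the running job

./bin/flink cancel <jobId>      # stop the job

./bin/flink run -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar   # resumes from the latest committed offset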

YARN deployment (long-lived Kerberos ticket)

Currently Mediawiki Stream Enrichment runs as the `analytics` user in the YARN production queue. This deployment consists of a session cluster and the job itself.

Startup scripts can be found at:

By default a Flink distribution is set up in `/tmp` on `an-launcher1002`. This installation is intentionally ephemeral: upon server restart, the Flink cluster and the enrichment job will need to be re-deployed.
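
A minimal re-deployment sketch, assuming the distribution lives at /tmp/flink-1.15.0 and the job jar sits in that directory (both paths are illustrative):

cd /tmp/flink-1.15.0

export HADOOP_CLASSPATH=`hadoop classpath`

./bin/yarn-session.sh --detached

./bin/flink run -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar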

View the output of a Flink job at the command line

On YARN, stdout is redirected to the job's container and won't be visible from the CLI. We can display container output by accessing its logs with:

yarn logs -applicationId <applicationId> -containerId <containerId>

Where

  • <applicationId> is the Flink cluster id returned by yarn-session.sh, also visible at https://yarn.wikimedia.org.
  • <containerId> is the container running a specific task, which you can find in Flink's Task Manager at https://yarn.wikimedia.org/proxy/<applicationId>/#/task-manager.
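
For example (the application and container ids below are placeholders; substitute real ones from YARN):

yarn logs -applicationId application_1667912345678_01234 -containerId container_e05_1667912345678_01234_01_000002 | less   # placeholder ids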

For more details see the project docs. The Flink Web Interface is available under https://yarn.wikimedia.org/proxy/<applicationId>.

Config

There are a couple of gotchas.

JVM

We need to rewrite the Host HTTP header to properly route HTTP requests from the internal YARN cluster to https://api-ro.discovery.wmnet.

To do so, we need to configure the JVM HTTP client to allow restricted headers.

Add the following to conf/flink-conf.yaml:

env.java.opts: -Dsun.net.http.allowRestrictedHeaders=true
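
The effect of the Host rewrite can be illustrated with curl; the wiki hostname and API parameters below are placeholders:

curl -H 'Host: en.wikipedia.org' 'https://api-ro.discovery.wmnet/w/api.php?action=query&meta=siteinfo&format=json'   # placeholder Host and query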

Kerberos

Kerberos authentication is required to access WMF Analytics resources. The relevant config settings are found in conf/flink-conf.yaml:

security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab:
security.kerberos.login.principal: krbtgt/WIKIMEDIA@WIKIMEDIA
# The configuration below defines which JAAS login contexts
security.kerberos.login.contexts: Client,KafkaClient
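
With the ticket cache enabled, a valid Kerberos ticket must exist on the host submitting the job. A minimal sketch (the principal and keytab path are illustrative):

kinit user@WIKIMEDIA      # or: kinit -kt /path/to/user.keytab user@WIKIMEDIA  (illustrative keytab path)

klist                     # verify the ticket cache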

Scala-free Flink

The flink-scala dependencies must be removed from the Flink distribution; as of 1.15 we run Flink Scala-free. See https://flink.apache.org/2022/02/22/scala-free.html.

rm flink-1.15.0/lib/flink-scala*
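
A quick check that the distribution is indeed Scala-free (a sketch; run from the directory containing the distribution):

ls flink-1.15.0/lib | grep -i scala || echo "no Scala jars found"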