
This page summarizes the learnings of https://phabricator.wikimedia.org/T318856

[SPIKE] Build simple stateless service using Flink SQL

Author: Gabriele Modena <gmodena@wikimedia.org>

Bug: https://phabricator.wikimedia.org/T318856

To simplify the process of creating streaming stateless applications on Event Platform, this SPIKE investigated using Flink SQL to implement a near real-time enrichment data pipeline. I implemented a SQL service that:

  • Listens to mediawiki.revision-create or another existing Kafka topic
  • Makes a call to the MW Action API and extracts the wikitext associated with a revision id
  • Produces some output that combines the data

The whole logic is contained in a single SQL script (https://gitlab.wikimedia.org/-/snippets/37) that can be executed via Flink's SQL client with:
sql-client.sh -f flink-http-action-connector.sql

A demo can be viewed at flink-http-action-connector.

This work required implementing an ad-hoc Flink connector to issue HTTP requests and retrieve content (more below).

Evaluation

+ A platform engineer can implement simple enrichment applications as SQL transformations.

+ No knowledge of Flink is required.

+ Rich SQL semantics, e.g. windowing (see https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/overview/ and the sketch after this list).

+ Flink SQL is interoperable with Python and JVM UDFs. We can extend the simple use case by embedding logic in Python and Java/Scala.

- SQL can be hard to maintain (e.g. to test).

- I could not find an off-the-shelf piece of software to perform lookups on the Action API. A custom connector is provided for demo purposes; this code must be made production-ready and maintained.

- The custom connector cannot be used in plain SELECT statements, only in lookup JOINs; that is how the lookup join semantics work.

- While SQL is simple for a user to write, our team would still need to operate a Flink cluster.

- Without a Catalog, SQL applications have a lot of redundancy (we need to declare schemas for all topics/endpoints we want to work with). In order for SQL applications to be viable, we must implement such a Catalog atop eventstreams and eventutils. Initial work suggests this is feasible, but more grooming and scoping is required.
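
To make the redundancy concrete, the sketch below shows the kind of per-topic boilerplate a Catalog could generate for us: a trimmed-down declaration of the revision-create stream over the stock Kafka connector, followed by the kind of windowed aggregation mentioned above. The broker address, group id and the field subset are assumptions for illustration, not part of the demo.

-- Per-topic boilerplate a Catalog could generate for us
-- (schema trimmed down, broker address and group id are placeholders).
CREATE TABLE RevisionCreate (
    rev_id BIGINT,
    page_id BIGINT,
    `database` STRING,
    proc_time AS PROCTIME()  -- processing-time attribute, needed for lookup joins and windows
) WITH (
    'connector' = 'kafka',
    'topic' = 'mediawiki.revision-create',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink-sql-spike',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Example of the richer SQL semantics mentioned above: revisions per wiki
-- over one-minute tumbling (processing-time) windows.
SELECT
    TUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,
    `database`,
    COUNT(*) AS revisions
FROM RevisionCreate
GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE), `database`;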

Considerations and follow-up work

I think there is merit in exploring SQL further, especially in combination with Python UDFs and a stable k8s deployment (a minimal UDF sketch follows the list below). Follow-up work should address:

  1. How hard will it be to operate a Flink cluster? Onboarding on DSE k8s should help us better scope this concern.
  2. Implement a Catalog to automatically expose known Kafka topics to SQL applications.
  3. Can we have an OpenAPI spec to automatically generate JSON schemas for our endpoints? Can we decorate such endpoints to facilitate lookup join semantics?
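
As a minimal sketch of the Python UDF interoperability mentioned above: a Python scalar function can be registered and called directly from SQL. The module name udfs and the function wikitext_length below are hypothetical; the Python file would define the function with PyFlink's udf decorator and would need to be shipped to the SQL client, e.g. with -pyfs udfs.py.

-- Register a hypothetical Python scalar function defined in udfs.py
-- (shipped with: sql-client.sh -pyfs udfs.py ...).
CREATE TEMPORARY FUNCTION wikitext_length AS 'udfs.wikitext_length' LANGUAGE PYTHON;

-- Call it like any built-in function.
SELECT wikitext_length(wikitext) AS wikitext_bytes
FROM (VALUES ('{{stub}} Some article text.')) AS t(wikitext);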

mediawiki-http Flink connector

Flink has an interface that implements lookup join semantics. Join semantics require a set of keys that are present in all relations. Some response content (e.g. from the Action API) might violate this: it might not contain a required key. Existing HTTP connectors assume that a table schema matches the content of a REST response, and were not suitable for our use case. For demo purposes I implemented a connector to asynchronously query HTTP endpoints, and used it in an enrichment pipeline that queries the Action API to retrieve the wikitext for a given revision.

A table using this connector must follow these semantics:

1. If present, a domain field is used to set the Host header field in a request.

2. A response content field should be present (we need it to store the API response as a string).

All other schema fields will be used as parameters to the query string.


Example

CREATE TABLE MwAction(
    revids INTEGER,  -- this field will be part of the query string (revids=<val>). Its name should match the Action API param
    domain STRING,   -- special field (not part of the Action API) that we need to set in the request header. Optional?
    response STRING  -- special field (not part of the Action API) that will store the Action API response content.
) WITH (
    'connector' = 'mediawiki-http',
    'format' = 'json', -- TODO: we don't really need this, since internally we don't need to serialize data as JSON.
    'url' = 'https://en.wikipedia.org/w/api.php?action=query&format=json&prop=revisions&formatversion=2&rvprop=content&rvslots=main', -- the base URL to query. It can contain default query string params. Query parameters could also be declared as table columns and used in WHERE clauses (... WHERE rvslots = 'main').
    -- 'fields.domain' = 'domain', -- Optional field that stores the domain (default: `domain`). When present, it will
    -- be used to set the Host request header.
    'fields.response' = 'response', -- Field that stores the response payload (default: `response`).
    'http.client.header.user-agent' = 'wmf-mediawiki-stream-enrichment/1.0-SNAPSHOT bot'
);

When used in a JOIN ON revids statement, the table will:

1. send a query to https://en.wikipedia.org/w/api.php?action=query&format=json&prop=revisions&formatversion=2&rvprop=content&rvslots=main&revids=<value>

2. store the response content in the response field.

A simple stateless SQL application

An enrichment pipeline can be expressed as:

SELECT
    mw_action.revids,
    mw_action.domain,
    JSON_VALUE(mw_action.response, '$.query.pages[0].revisions[0].slots.main.content') as wikitext -- parse the response payload as JSON and extract the wikitext field.
FROM MockRevisionCreate AS revision_create
JOIN MwAction FOR SYSTEM_TIME AS OF revision_create.proc_time AS mw_action
    ON revision_create.rev_id = mw_action.revids

Here we use Flink's JSON functions (JSON_VALUE) to parse the response content and extract the wikitext.
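
This statement returns results interactively in the SQL client. To turn it into a standing enrichment service, the same query can feed an INSERT INTO against a sink table; below is a minimal sketch, assuming a Kafka sink (the output topic name and broker address are placeholders, not part of the demo).

-- Hypothetical sink for the enriched events (topic name and brokers are placeholders).
CREATE TABLE EnrichedRevisionCreate (
    revids INTEGER,
    domain STRING,
    wikitext STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'mediawiki.revision-create.enriched',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

INSERT INTO EnrichedRevisionCreate
SELECT
    mw_action.revids,
    mw_action.domain,
    JSON_VALUE(mw_action.response, '$.query.pages[0].revisions[0].slots.main.content') AS wikitext
FROM MockRevisionCreate AS revision_create
JOIN MwAction FOR SYSTEM_TIME AS OF revision_create.proc_time AS mw_action
    ON revision_create.rev_id = mw_action.revids;

Submitted via sql-client.sh, the INSERT INTO statement runs as a continuous Flink job rather than an interactive query.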