Manual:Domain events/Outlook/Cross-service

From mediawiki.org

Cross Service Integration Events

Andrew Otto, Daniel Kinzler, Halley Coplin (FY 24/25)

T379935 DomainEvents - Broadcasting and receiving cross-service Integration events.

Once MediaWiki has a Domain Event Dispatcher implementation that supports in-process event emitters and listeners, the system can be extended to support use of events cross-service.

This will create a foundation for offering first-class event-driven architectures with MediaWiki. Cross-service events provide data integration capabilities between disparate services, including disparate MediaWiki sites and processes.

If done well, DomainEvents could be translated into ‘integration events’ and broadcast for other processes or services to consume, whether those be other MediaWiki processes or unrelated services. Doing this could improve data reusability across the whole of WMF, and beyond. It could decouple data emitters from data receivers, increasing our capabilities to use data to build new products, both in and outside of the MediaWiki platform.

How to engage with this document

This is a long discovery and design doc. It is expected that this doc will become out of date over time.

This doc makes heavy use of terms defined in Manual:Domain events/Glossary. You might want to keep that open in a tab as you read :)

These sections are likely some of the most pertinent for readers.

If you are very short on time, review Domain Events - Cross Service Design Doc Overview slide deck instead.

If you'd like to discuss these ideas, please comment on T379935 DomainEvents - Broadcasting and receiving cross-service Integration events.

Outline

  • Goals
  • Different scenarios and use cases we would like to support
  • Common needs and requirements of the scenarios
  • Answers to questions we started with
  • Possible implementation options
  • Long and short term recommendations

Goals

  • Discover how broadcasting, serializing, and receiving DomainEvents cross-service might be accomplished.  
  • Explore commonalities between cross-service scenarios.
  • Propose implementation options that ideally consolidate systems and support as many cross-service scenarios as possible.
  • Propose short term implementation options that provide the most value for WMF’s current and upcoming objectives.

Short Term Recommendations Summary

Details of these recommendations can be found in the Recommendations section at the end of this document.

Improve usability of off-wiki generated data in MediaWiki by enabling MediaWiki to receive events.

1. MediaWiki PHP Kafka Consumer

Implementing event bus (Kafka) receiving in MediaWiki would allow non-MediaWiki emitters to emit data in a way that is decoupled from MediaWiki's usage of that data.

The emitted event data would be reusable for many use cases, including those inside of MediaWiki.

  • E.g. PageRevertRiskScore events could be emitted by LiftWing.  
  • Those events could be consumed into the Data Lake, by Enterprise, or by MediaWiki for building product features like moderator tools.
2. Non-MediaWiki job submission

Enable non MediaWiki processes to submit MediaWiki job events.

This solution would not improve data reusability, but it is a fast path to enabling usage of generated data in MediaWiki.

  • E.g. LiftWing would have to emit multiple versions of PageRevertRiskScore event data: for Data Lake, Search and Enterprise, and also as a job into each wiki where the data should be pushed.

cross-service scenarios

| scenario | emitter | receiver |
|---|---|---|
| same-wiki | MediaWiki | MediaWiki |
| inter-wiki | MediaWiki | MediaWiki |
| off-wiki | not MediaWiki | MediaWiki |
| beyond-wiki | MediaWiki | not MediaWiki |
| beyond-wiki-public | MediaWiki | not MediaWiki |

In the Domain Event System Glossary, cross-service is defined as “scenarios where an event is received in a different application or service than the emitter”. We’ve identified 5 cross-service scenarios.

MediaWiki receiving scenarios

These scenarios are MediaWiki receiving events emitted from another process.  

These events are often about MediaWiki entities, like Pages or Revisions, and their properties can be marshaled into core MediaWiki value classes, like PageRecord or RevisionRecord.

same-wiki

emitter: MediaWiki
receiver: MediaWiki

A MediaWiki PHP listener is invoked within the same wiki site as the emitter.

Examples

  • enwiki receives a PageUpdatedEvent that was emitted by enwiki.

The receiving MediaWiki process has the same configuration and database access as the emitting MediaWiki process.

MediaWiki Requirements

  • DomainEvents must be serializable and marshallable (possibly via MediaWiki’s JsonCodec interfaces)
  • DomainEvents must be broadcast to an event bus
  • MediaWiki must be able to receive events from an event bus
  • MediaWiki must be able to map received events to registered listeners, and then invoke registered listeners
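The requirements above can be sketched end to end. This is a minimal in-memory illustration, not MediaWiki's actual API: `SketchPageUpdatedEvent`, `SketchListenerRegistry`, and the `event_type` envelope field are hypothetical names, and plain `json_encode`/`json_decode` stand in for both JsonCodec and the event bus.

```php
<?php
// Hypothetical stand-in for a serializable DomainEvent; not a MediaWiki core class.
final class SketchPageUpdatedEvent {
    public function __construct(
        public readonly int $pageId,
        public readonly int $revisionId
    ) {
    }

    /** Serialize to a JSON envelope that names the event type. */
    public function toJson(): string {
        return json_encode( [
            'event_type' => 'PageUpdated',
            'page_id' => $this->pageId,
            'revision_id' => $this->revisionId,
        ], JSON_THROW_ON_ERROR );
    }

    /** Marshal a decoded envelope back into an event instance. */
    public static function fromArray( array $data ): self {
        return new self( (int)$data['page_id'], (int)$data['revision_id'] );
    }
}

// Hypothetical receiver-side registry: maps event types to listeners.
final class SketchListenerRegistry {
    /** @var array<string, callable[]> */
    private array $listeners = [];

    public function registerListener( string $eventType, callable $listener ): void {
        $this->listeners[$eventType][] = $listener;
    }

    /** Decode a received message, marshal it, and invoke matching listeners. */
    public function receive( string $json ): int {
        $data = json_decode( $json, true, 512, JSON_THROW_ON_ERROR );
        $invoked = 0;
        foreach ( $this->listeners[$data['event_type']] ?? [] as $listener ) {
            $listener( SketchPageUpdatedEvent::fromArray( $data ) );
            $invoked++;
        }
        return $invoked;
    }
}

// Emitter side: serialize the event (the "broadcast" is just a string here).
$message = ( new SketchPageUpdatedEvent( 42, 1001 ) )->toJson();

// Receiver side: register a listener, then feed it the received message.
$seen = [];
$registry = new SketchListenerRegistry();
$registry->registerListener(
    'PageUpdated',
    static function ( SketchPageUpdatedEvent $event ) use ( &$seen ) {
        $seen[] = $event->revisionId;
    }
);
$invokedCount = $registry->receive( $message );
```

In a real deployment the string would travel through an event bus between the two halves; the point of the sketch is only that the receiver needs nothing from the emitter except the serialized message.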

inter-wiki

emitter: MediaWiki
receiver: MediaWiki

A MediaWiki PHP listener is invoked with an event that was emitted by a different wiki site context.

Examples

  • Global user blocks and renames are currently handled by either global state and/or manual multicast to every wiki via the JobQueue. These are existing inter-wiki use cases that do not need fine-grained dependency tracking.

MediaWiki Requirements

  • DomainEvents must be serializable and marshallable (possibly via MediaWiki’s JsonCodec interfaces)
  • DomainEvents must be broadcast to an event bus
  • MediaWiki must be able to receive events from an event bus
  • MediaWiki must be able to map received events to registered listeners, and then invoke registered listeners
  • inter-wiki DomainEvents about MediaWiki entities (e.g. Page) must implement WikiAwareEntity
  • Serialized inter-wiki DomainEvent data must be marshallable into WikiAwareEntities via configuration or convention

off-wiki

emitter: not-MediaWiki
receiver: MediaWiki

A MediaWiki PHP listener is invoked with an event that was not emitted by MediaWiki.

Making MediaWiki able to receive off-wiki emitted event data will allow MediaWiki to consume and serve ‘generated’ or ‘derived’ data use cases.

Examples

  • enwiki receives a PageRevertRiskPredictionUpdated event from LiftWing
  • eswiki receives an ImageSuggestionRejected event from a JavaScript client
  • enwiki receives an IpReputationChange event about IP address trustworthiness that can be used to help patrol or automate editor IP blocking
  • metawiki receives a WikipediaLibraryUserElligibilityChange event from the Wikipedia Library app; plwiki then sends a notification message to the plwiki user.

off-wiki events can be about MediaWiki entities, so they may be ‘wiki aware’, like PageRevertRiskPredictionUpdated. They can be associated with a specific wiki site.  

Other events might not be about MediaWiki things at all, like an IpReputationChange event.

MediaWiki Requirements

  • events must be broadcast to an event bus
  • MediaWiki must be able to receive events from an event bus
  • MediaWiki must be able to map received events to registered listeners, and then invoke registered listeners
  • events about MediaWiki entities (e.g. Page) must have a DomainEvent implementation that implements WikiAwareEntity
  • Serialized events about MediaWiki entities (e.g. Page) must be marshallable into WikiAwareEntities via configuration or convention

beyond-wiki receiving scenarios

These scenarios are about MediaWiki emitting events for reception by non-MediaWiki processes.  This capability is currently implemented for WMF in the EventBus extension.  

WMF’s Event Platform has services, tooling and conventions for emitting, receiving and using events outside of MediaWiki.  While not a strict requirement for cross-service MediaWiki DomainEvent work, it would be advantageous to utilize Event Platform if possible.

There are 2 beyond-wiki scenarios.

beyond-wiki

emitter: MediaWiki
receiver: not-MediaWiki

MediaWiki broadcasts events to an event bus for any non MediaWiki process to receive.

Examples

  • PageUpdated events from all wikis
    • ingested into Hive tables in the WMF Data Lake.
    • consumed by Search Update Pipeline to trigger updates to ElasticSearch indexes
    • consumed by mediawiki-event-enrichment to create events that contain revision content, HTML content, wikidata entity content, etc. to generate ‘dumps’ of this data.
    • consumed by LiftWing to emit PageRevertRiskPredictionUpdated, or other page prediction events.
  • WikiBaseEntityUpdated
    • Consumed by WQDS Streaming Updater to update graph database
  • wiki events used to update and maintain a feature store for use by LiftWing

MediaWiki Requirements

  • DomainEvents must be serializable
  • DomainEvents must be broadcast to an event bus
  • DomainEvents must be consumable by arbitrary subscribers via an event bus (e.g. a Kafka Consumer)

beyond-wiki – public

emitter: MediaWiki
receiver: not-MediaWiki (external to WMF production)

The beyond-wiki and beyond-wiki – public scenarios are mostly the same.  

The difference is that events received by beyond-wiki – public receivers must not contain any PII.

Examples

Requirements

  • DomainEvents must be serializable
  • DomainEvents must be broadcast to an event bus
  • DomainEvents must be consumable by arbitrary subscribers via an event bus (e.g. a Kafka Consumer)
  • received events must not have PII

cross-service scenario requirements

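| requirement | same-wiki | inter-wiki | off-wiki | beyond-wiki | beyond-wiki – public |
|---|---|---|---|---|---|
| broadcastable | ✅ | ✅ | ✅ | ✅ | ✅ |
| serializable | ✅ | ✅ | ✅ | ✅ | ✅ |
| marshallable | ✅ | ✅ | ✅* | n/a | n/a |
| WikiAware | ❌† | ✅ | ✅(often) | n/a | n/a |
| PII redacted | ❌ | ❌ | ❌ | ❌ | ✅ |
| Advantage if Event Platform compatible? | ✅‡ | ✅‡ | ✅ | ✅ | ✅ |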
  • Events must be broadcast and serializable for all scenarios.
  • For all MediaWiki receiving scenarios, events should be marshallable into a DomainEvent class.
    • * For off-wiki and non-wiki, there may not be a DomainEvent class implementation to marshal to. We could create a GenericDomainEvent class that exposes data as an associative (untyped) PHP array.
  • † same-wiki receiving does not require that DomainEvents be WikiAware, but having them be WikiAware does not hinder same-wiki receiving.
  • Only the beyond-wiki – public scenario requires PII redaction.
  • ‡ While not required for MediaWiki to MediaWiki cross-service receiving, utilizing Event Platform serialization conventions would improve interoperability and data reusability between MediaWiki and other services.
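The GenericDomainEvent idea from the footnote above could look roughly like the following. This is only a sketch: the class shape, the `getEventType()` and `getData()` accessors, the `newFromJson()` factory, and the sample field names are assumptions, not an existing MediaWiki interface.

```php
<?php
// Hypothetical fallback event for streams that have no dedicated DomainEvent class.
final class GenericDomainEvent {
    /** @param array<string, mixed> $data Decoded event payload, exposed untyped. */
    public function __construct(
        private string $eventType,
        private array $data
    ) {
    }

    public function getEventType(): string {
        return $this->eventType;
    }

    /** @return array<string, mixed> */
    public function getData(): array {
        return $this->data;
    }

    /** Build an event from a serialized JSON object. */
    public static function newFromJson( string $eventType, string $json ): self {
        $data = json_decode( $json, true, 512, JSON_THROW_ON_ERROR );
        if ( !is_array( $data ) ) {
            throw new InvalidArgumentException( 'Event payload must be a JSON object' );
        }
        return new self( $eventType, $data );
    }
}

// Sample payload; the field names are invented for illustration.
$genericEvent = GenericDomainEvent::newFromJson(
    'IpReputationChange',
    '{"ip":"192.0.2.1","risk":"high"}'
);
$risk = $genericEvent->getData()['risk'] ?? null;
```

The trade-off is that listeners lose type safety: they must know the payload's field names by convention rather than through a typed class.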

Questions

Can we use the same serialized and broadcast event for all cross-service scenarios?

✅ Short answer: yes, it is possible.

For MediaWiki receivers, yes. Serialization and broadcasting are not difficult. We are doing this now with the EventBus extension.

The tricky part will be figuring out how to use a single broadcast event in multiple receivers in multiple foreign wikis. See the event bus implementation option section below.

Serialization (and MediaWiki marshalling) can be implemented using JsonCodec.

beyond-wiki receivers require event schemas.  See Event Platform questions below.

This is not a requirement, but if we can do it, we should, as it will reduce the number of event streams that may need to be ‘reconciled’, and improve data reusability.

Should we make DomainEvents WikiAwareEntities?

✅ yes.

For MediaWiki receiving scenarios, this will be useful. This will enable us to use the same event in multiple wiki site contexts.

To do this, broadcastable DomainEvents should implement WikiAwareEntity where appropriate.

There will be some complexity when deserializing and marshalling in the receiver. The receiver will have to know if the event came from the same wiki, and if so, set the wiki_id correctly to ‘false’ (false means local wiki).
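As a rough illustration of that normalization step, the receiver could compare the wiki ID carried in the serialized event with its own ID. The `wiki_id` field name and the `sketchResolveWikiId()` function are assumptions made for this sketch; the convention that `false` means the local wiki is the one described above.

```php
<?php
/**
 * Return the wiki ID to use when marshalling a received event:
 * false for the local wiki, the emitting wiki's ID string otherwise.
 *
 * @param array<string, mixed> $eventData Decoded event; 'wiki_id' is a hypothetical field name.
 * @param string $localWikiId ID of the wiki that is receiving the event.
 * @return string|false
 */
function sketchResolveWikiId( array $eventData, string $localWikiId ) {
    $emitterWikiId = $eventData['wiki_id'] ?? null;
    if ( !is_string( $emitterWikiId ) || $emitterWikiId === '' ) {
        throw new InvalidArgumentException( 'Event does not say which wiki it came from' );
    }
    return $emitterWikiId === $localWikiId ? false : $emitterWikiId;
}

$event = [ 'wiki_id' => 'enwiki', 'page_id' => 42 ];
$asSeenByEnwiki = sketchResolveWikiId( $event, 'enwiki' ); // received on the emitting wiki
$asSeenByDewiki = sketchResolveWikiId( $event, 'dewiki' ); // received on a foreign wiki
```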

Why use WMF’s Event Platform?

Event Platform is widely used at WMF as a standard for reusable data transfer. From MediaWiki’s perspective, integration with Event Platform has benefits for both broadcasting and receiving events.

As MediaWiki developers create and emit DomainEvents for product features, having a standardized way to broadcast this data for reuse ‘on the outside’ will be quite valuable. Broadcasting DomainEvents will help decouple usage of MediaWiki data outside of MediaWiki from MediaWiki’s internal application database (MediaWiki_replicas).  Because emitters are decoupled from receivers, event data can be reused for more than it was originally intended.  

Event Platform is used outside of MediaWiki to transport event data between disparate services.  Allowing MediaWiki to receive, store, and serve this data in various ways will make it simpler and faster to build products in MediaWiki that use ‘generated data’.

Event Platform is WMF’s standard for event data.  We should use it if we can.

How might we use WMF’s Event Platform?

The requirements of Event Platform events are:

  • Events conform to an event schema following conventions
  • Every schema version is immutable and retrievable
  • Events are serialized to a specific schema version
  • Event streams are declared globally (via metawiki) in EventStreamConfig

There may be extra information that needs to be included in serialized events for use by Wikimedia JsonCodec (e.g. how to marshal into a DomainEvent class instance).

How to manage Event Platform schemas?

Event Platform uses centrally managed schema git repositories. To use these, DomainEvent implementers will have to commit an event schema to an event schema repo before it can be broadcast.  This could be quite cumbersome.

How can we reduce the burden for developers so that broadcasting an event is easy enough that they actually do it?

NOTE: These ideas might have implications for MediaWiki REST API.  REST API needs a way to store and serve HTTP response JSONSchemas.  Ideally the solution here would converge with the REST API schema serving solution.

A. DomainEvent schema generation helper
  • Tooling to automate Event Platform JSONSchema generation from DomainEvent or perhaps via JsonCodec?
    • Committing the schema would still have to be manual and managed separately from MediaWiki codebases though, which is cumbersome.
B. Make MediaWiki a ‘schema registry’
  • Make MediaWiki able to serve registered event schemas in an Event Platform compatible way.  I.e. <schema_registry_base_url>/<schema_title>/<schema_version>
  • Questions:
    • There are many wiki sites, but we need a canonical schema.  Should we keep using metawiki for this?
    • How could MediaWiki store each schema version?
    • Should MediaWiki keep static JSON schemas in its repo, or should this be generated dynamically from a code interface?  
  • MW REST API has some support for generating JSONSchemas from PHP.
  • Idea: add an optional getJsonSchema method to JsonCodec library.  This would allow serialization implementers to provide a JsonSchema for the JSON the serializer produces.
    • JsonCodec has a "hint" mechanism to provide the types of serialized/deserialized fields.  A JSONSchema could be used to indicate PHP types for marshalling, similar to how Event Platform Java tooling decodes into Spark or Flink Type systems.  
  • Idea: could json-ld be used to standardize some of what Event Platform schema conventions are doing?
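To make the URL convention in option B concrete, a registry client might assemble schema URLs as sketched below. The function name, the example base URL and schema title, and the check for a three-part version number are all assumptions of this sketch rather than documented behavior.

```php
<?php
/**
 * Build a schema URL of the form
 * <schema_registry_base_url>/<schema_title>/<schema_version>.
 */
function sketchSchemaUrl( string $baseUrl, string $schemaTitle, string $schemaVersion ): string {
    // Assumption: versions look like 1.0.0; reject anything else early.
    if ( !preg_match( '/^\d+\.\d+\.\d+$/', $schemaVersion ) ) {
        throw new InvalidArgumentException( "Expected a version like 1.0.0, got '$schemaVersion'" );
    }
    return rtrim( $baseUrl, '/' ) . '/' . trim( $schemaTitle, '/' ) . '/' . $schemaVersion;
}

// Base URL and schema title are placeholders.
$schemaUrl = sketchSchemaUrl(
    'https://registry.example/schemas/',
    'mediawiki/page/updated',
    '1.0.0'
);
```

Because every schema version is immutable, a URL built this way can be cached indefinitely by consumers.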
C. Standalone schema registry service

Recommendation: B. MediaWiki schema registry

If done, this should be done in conjunction with MediaWiki REST API plans to serve REST response schemas in OpenAPI specs.

How to manage Event Platform streams?

Event Platform streams are declared using the EventStreamConfig MediaWiki extension.  These streams are declared in mediawiki-config in $wgEventStreams.

The DomainEvents system supports hierarchical event types to allow listeners to register for only specialized events. A listener can choose to listen for all PageState related events, or for something more specific, like PageMoved.

Broadcasting events for each leaf DomainEvent event type is likely too fine grained, but the hierarchy itself does not help us determine the level at which an event should be broadcast.

We’ll need configuration to map between DomainEvent types and serialized integration events, stream names, serializers, schemas, etc.
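One possible shape for such configuration is a map keyed by DomainEvent type, resolved by walking up the event type hierarchy until a configured ancestor is found. Everything here is hypothetical: the config keys, the stream and schema names, and the parent-type map are invented for illustration.

```php
<?php
// Hypothetical parent links for a small event type hierarchy.
$sketchEventParents = [
    'PageMoved' => 'PageState',
    'PageDeleted' => 'PageState',
];

// Hypothetical broadcast config: only the coarse 'PageState' type is mapped to a stream.
$sketchBroadcastConfig = [
    'PageState' => [
        'stream' => 'mediawiki.page_state',
        'schema' => '/mediawiki/page/state',
    ],
];

/**
 * Find the broadcast config for an event type, falling back to its ancestors.
 * Returns null when neither the type nor any ancestor is configured.
 */
function sketchFindBroadcastConfig( string $eventType, array $config, array $parents ): ?array {
    $visited = [];
    for ( $type = $eventType; $type !== null; $type = $parents[$type] ?? null ) {
        if ( isset( $visited[$type] ) ) {
            return null; // guard against cycles in a misconfigured hierarchy
        }
        $visited[$type] = true;
        if ( isset( $config[$type] ) ) {
            return $config[$type];
        }
    }
    return null;
}

$forPageMoved = sketchFindBroadcastConfig( 'PageMoved', $sketchBroadcastConfig, $sketchEventParents );
$forUserBlocked = sketchFindBroadcastConfig( 'UserBlocked', $sketchBroadcastConfig, $sketchEventParents );
```

With this shape, the broadcast granularity is an explicit configuration decision: a leaf type such as PageMoved is published on its ancestor's stream unless it is given its own entry.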

A. Dynamic EventStreamConfig

mediawiki.job.* streams are ‘dynamic’ via regex pattern matching. This is good for JobQueue, as it means that job streams do not need to be explicitly declared in config. It is bad for everything else.

  • There is no way to deterministically get a list of all streams.  
  • mediawiki/job schema is non-deterministic.  We can’t use it to automate integration with other systems like we can with deterministic schemas.

In 2024, WMF’s Experiment Platform team implemented (but did not use) a dynamic stream config feature in EventStreamConfig via a GetStreamConfigsHook. This hook allows for injection of stream config that is not statically declared in $wgEventStreams.

Broadcast-able DomainEvents could use this mechanism to declare event platform streams dynamically.

Additionally, we could use this hook to explicitly register JobQueue streams, rather than use regexes to match them.  This would greatly enhance discoverability and maintainability of the system, as all streams would be explicitly registered, rather than via a regex.

Note: metawiki is used as the global stream registry.  All streams must minimally be declared for metawiki.

Note: A reason Experiment Platform was advised not to use dynamic stream config is that they were intending to declare streams using a UI and database, rather than in code or config. EventStreamConfig automatically injected via classes declared in code does not have the same downsides as declaring stream datasets in a database.

B. Static EventStreamConfig

This is the status quo.  Streams must be explicitly declared in config before they can be used.  

…other ideas?

If anyone has other suggestions, please make them!

Recommendation: A. Dynamic EventStreamConfig

If done, we should refactor how Job streams are declared, and deprecate and remove EventStreamConfig’s regex stream name matching features.

Data Interfaces: WMF wide or per wiki site?

Both Event Platform and WMF wide APIs share a similar problem: we want to standardize APIs across WMF, but we are more than just one website.  We run ~1000 different wiki sites, and each one technically has its own codebase deployment and configuration.

If we implement platform features like dataset discovery and config (like EventStreamConfig) and schema hosting (like OpenAPI specs and event schemas) in MediaWiki, which wiki site is the canonical one for the platform?  How do non-wiki site specific services discover and use platform features?

Thus far, Event Platform in production relies on metawiki as its canonical configuration wiki. This is okay, because mediawiki-config is a global configuration repository, and it is deployed independently of metawiki.  

If we implement automatic stream declaration and schema registry hosting in MediaWiki as recommended above, these features will now be dependent on metawiki itself being deployed.  If a new DomainEvent (or REST API) is deployed in testwiki, but not yet in metawiki, it may not be functional in testwiki.

Recommendation: Solve together with REST API

We need a canonical place for WMF platform features. If those features are to be implemented in MediaWiki, we’ll have to pick a wiki to be canonical, and handle its deployment in a special way.  However, it is not clear that using a wiki site is the best way to solve this problem.

This is an unsolved problem.  Ideally MediaWiki REST API and cross-service DomainEvents (and Event Platform) could use the same solution. We will likely revisit and explore this problem for MW APIs as part of the API module definition work planned in FY25-26 WE5.2.

How to receive cross-service events in development environments?

We should not require developers to run Kafka just to develop DomainEvents.

Event Platform supports producing events without Kafka in dev envs.

We’d have to implement receiving cross-service events without Kafka, likely via a local file or a database table.

In Q4 of FY2024-2025, a spike explored the feasibility of receiving cross-service events in MediaWiki. A demo patch showed how integration events read from stdin might be routed to DomainEvent listeners.

Recommendation: pluggable means of receiving

A cross-service receiving implementation must support a pluggable means of receiving.
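A pluggable receiver could be modelled as an interface with interchangeable sources, so that development environments read events from a file or stdin while production reads from Kafka. The sketch below is an assumption-laden illustration: `SketchEventSource`, `SketchJsonLinesSource`, and the `event_type` field are hypothetical names, and no Kafka-backed implementation is shown.

```php
<?php
// Hypothetical abstraction over "where integration events come from".
interface SketchEventSource {
    /** @return iterable<array<string, mixed>> Decoded events, in arrival order. */
    public function readEvents(): iterable;
}

// Development-friendly source: one JSON event per line of a stream (file or stdin).
final class SketchJsonLinesSource implements SketchEventSource {
    /** @param resource $handle An open, readable stream. */
    public function __construct( private $handle ) {
    }

    public function readEvents(): iterable {
        while ( ( $line = fgets( $this->handle ) ) !== false ) {
            $line = trim( $line );
            if ( $line !== '' ) {
                yield json_decode( $line, true, 512, JSON_THROW_ON_ERROR );
            }
        }
    }
}

/** Receiving logic depends only on the interface, so another source could be swapped in. */
function sketchCollectEventTypes( SketchEventSource $source ): array {
    $types = [];
    foreach ( $source->readEvents() as $event ) {
        $types[] = $event['event_type'] ?? 'unknown';
    }
    return $types;
}

// Simulate a local file or stdin with an in-memory stream.
$handle = fopen( 'php://memory', 'w+' );
fwrite( $handle, "{\"event_type\":\"PageUpdated\"}\n\n{\"event_type\":\"PageMoved\"}\n" );
rewind( $handle );
$receivedTypes = sketchCollectEventTypes( new SketchJsonLinesSource( $handle ) );
fclose( $handle );
```

Passing `STDIN` instead of the in-memory handle would give the stdin-based behavior explored in the spike, without any change to the receiving logic.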

Should we broadcast events without WMF Event Platform integration?

❌ For all MediaWiki receiving scenarios (both wiki and non-wiki): we technically could, but we would lose data reusability outside of MediaWiki.

❌ For beyond-wiki scenarios: no.

In order to use DomainEvents beyond-wiki:

  • events must have a deterministic schema
  • An event’s schema must be retrievable

WMF Event Platform provides tooling and conventions for doing this.

It should be possible to support the MediaWiki receiving wiki cross-service scenario without WMF Event Platform.  Receiving of these events would be confined to usage by MediaWiki.  We don’t strictly need a schema in this case, as the ‘schema’ is managed entirely in MediaWiki with JsonCodec.

It may be possible for MediaWiki to receive off-wiki events without being Event Platform aware, as long as there are strong conventions for events meant to be received by MediaWiki.  E.g.

  • How an event is ‘wiki aware’
  • How an event can be marshalled into a MediaWiki DomainEvent instance.
  • etc.

However, WMF Event Platform makes event data reusable by many consumers. If an off-wiki emitter is emitting an event only for the purpose of being received by MediaWiki, it will not be reusable in other places.

Recommendation: No

We should use Event Platform if we can. Event data will be more re-usable, and using an existent platform will ultimately be less work in the long run.

Does MediaWiki need to support cross-service integration events for 3rd party installations?

Perhaps RPC JobQueue execution is enough for 3rd parties? This would mean only same-wiki (and limited RPC inter-wiki) receiving will be supported.

Implementing cross-service integration events will be complicated enough for WMF. If we can avoid 3rd party support, we should.

Recommendation: No

How might we broadcast events outside of WMF production land?

We do this now.  If events are in an event bus (e.g. Kafka, see event bus implementation idea below), then this is just another beyond-wiki scenario.

This can be done with:

It may be worth considering routing event streams in API gateway, or perhaps in wiki site endpoint(s).  This would help unify the HTTP APIs about MediaWiki in one place.

Recommendation: Out of scope

This is out of scope for the MediaWiki Domain Events System.

How might we account for PII and external usage of events that are broadcast externally?

❓ Short answer: this is complicated.

How to know what is PII?

To account for PII in broadcast events, there needs to be a way to identify what fields have PII in them.  

In T263672 Figure out where stream/schema annotations belong (for sanitization and other use cases), it was decided that this information does not belong in event schemas.  Schemas are reusable for multiple purposes.  Whether or not a field contains PII is dependent on the usage of the schema.  The dataset (stream or table) that conforms to the schema has the PII, not the schema itself.

Options
  • Use stream configuration to indicate that certain schema fields contain PII.
  • …?

How to redact PII for external or long term usage?

This problem is similar to the Hive Event Table Sanitization process used to sanitize event data for long term storage so that it does not violate WMF’s privacy policy and Data Retention Guidelines.

Event streams are different.  WMF publishes event data externally via the EventStreams API.  

  • No PII should ever be published externally, whereas Hive tables are for WMF internal usage only.
  • Tables can be updated (theoretically), whereas events in streams are immutable.

There are two different problems here.

  1. How to broadcast redacted data
  2. How to redact already broadcast data

How to broadcast redacted data?

A. Stream processing redaction

Broadcast ‘private’ events once, and then use configured redaction information to redact streams in real time using a stream processor.
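The core of such a stream processor is a per-event redaction step driven by configuration. The sketch below shows only that step, under stated assumptions: the stream name, the field names, and the idea of listing top-level PII fields per stream are invented for illustration, and no real stream processing framework is involved.

```php
<?php
// Hypothetical per-stream redaction config: which top-level fields hold PII.
$sketchPiiFields = [
    'mediawiki.page_change.private' => [ 'user_ip', 'user_agent' ],
];

/**
 * Return a copy of the event with the stream's configured PII fields removed.
 * Streams without any configuration are refused rather than passed through.
 */
function sketchRedactEvent( string $stream, array $event, array $piiFieldsByStream ): array {
    if ( !array_key_exists( $stream, $piiFieldsByStream ) ) {
        throw new RuntimeException( "No redaction config for stream '$stream'" );
    }
    return array_diff_key( $event, array_flip( $piiFieldsByStream[$stream] ) );
}

$privateEvent = [ 'page_id' => 42, 'user_ip' => '192.0.2.1', 'user_agent' => 'ExampleBrowser' ];
$publicEvent = sketchRedactEvent( 'mediawiki.page_change.private', $privateEvent, $sketchPiiFields );
```

Refusing unconfigured streams is a deliberate choice in this sketch: failing closed avoids publishing a stream whose PII fields were never reviewed.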

B. Broadcast internal and public versions of event streams

Broadcast each event twice to different streams, once for internal consumption, and again for external consumption.

How to redact already broadcast data?

When emitted, event data may be technically PII free.  However, some data can be retroactively redacted on wiki, e.g. when the author of a revision is hidden (AKA revision deletion) and/or the action by the wiki administrator performing the revision delete is hidden (AKA suppression) from other admins.

Even though this data was temporarily available on wiki (and thus public), it is no longer.  Ideally, this redacted data would no longer be consumable via EventStreams.  Data in Kafka is generally immutable, so it cannot be selectively redacted.  See T241178.

A. EventStreams service redacts as it serves data.
  • EventStreams would dynamically look up redaction configs and strip PII as it serves to users.  
  • Something (MediaWiki) would need to be able to instruct EventStreams about what to strip.  This could be quite expensive, as each event served to a client would result in a request to MediaWiki.
  • Redaction config, e.g. revision visibility, could be cached or maintained as a materialized view, to reduce requests to MediaWiki.  
  • It will be difficult to implement this feature generically.  Redaction semantics are dataset specific.
B. Compacted redacted Kafka topics.
  • This only works for streams that are only meant to represent the current state of an entity.  Each key might only have one (the latest) event in the topic.  

Recommendation: out of scope

None of the options are appealing.  

Domain Event System should not build support for beyond-wiki – public.  

If we solve this, it should be at a layer above the Domain Event System.

What about eventual consistency?

Even in-process events do not have delivery guarantees. Some missed cross-service events will happen.

This problem and potential solutions are described in detail in T120242 Eventually Consistent MediaWiki State Change Events.  The potential general solutions rely on a database transaction to persist an event before broadcasting it.

MediaWiki is currently relying on transactions to persist semi-structured ‘events’ in a log in the logging table. This table is incredibly useful for users via the Special:Log feature.  It is not so useful for data re-use in our various cross-service scenarios.  The log table doesn’t cover everything, only what MediaWiki happens to log there.

Wikidata uses a limited Transactional Outbox to ensure eventual consistency of foreign wikis that use Wikidata entities.

It may be possible to temporarily write events to a MediaWiki database table. This table could be used for MediaWiki features, like Special:Log and Wikibase change tracking, in a way that is standard and more structured. This table could then be used to implement a transactional outbox from which events could be broadcast.

Other solutions and variants are proposed in T120242.

Before solving the eventual consistency problem, we should

  • Have a clear understanding of the use cases and value eventually consistent events would provide.
  • Have a clear understanding of database performance and maintenance implications.

Recommendation: Out of scope

For now, eventually consistent events are out of scope of the cross-service Domain Events System.

Dependency Tracking and Domain Events?

Wikis need to know when specific entities that they are ‘transcluding’ or referencing have changed. E.g.

These examples more or less track dependencies between wiki site entities (typically pages) on a fine-grained level. Each of these ‘dependency tracking systems’ is implemented in its own bespoke way (typically as shared/central database tables).

It seems likely that cross-service event support would aid the creation of a generalized dependency tracking system. However, events on their own do not provide a mechanism for identifying the individual entities (pages) on one wiki affected by a change on another wiki.

See also:

Possible cross-service Implementations

A complexity for cross-service receiving is the need for global listener registration.  For in-process wiki receiving, emitter processes have the same wiki site configuration context as receivers.  Events can be directly dispatched to registered receivers.

In cross-service scenarios, this is usually not the case.  An event must be broadcast, and some intermediary must be responsible for managing listener registration and invoking listeners.

Possible implementations are detailed below.

cross-service scenario coverage – Does the implementation support the scenario?
wiki – same-wiki | wiki – inter-wiki | wiki – off-wiki | non-wiki – beyond-wiki | non-wiki – beyond-wiki – public
JobQueue ❌* ❌*
event bus

* See asterisk * in JobQueue & DomainEvents – cross-process wiki receivers section below.

JobQueue

MediaWiki JobQueue’s purpose is to perform “long-running tasks asynchronously”. As of 2025, WMF uses an Event Platform + Kafka JobQueue implementation.

JobQueue summary

  • The EventBusJobQueue implementation serializes a MediaWiki IJobSpecification into an Event Platform mediawiki/job JSON event, and produces it to Kafka via the EventBus extension.
  • The database field in the serialized job event is populated according to the JobQueueGroup instance that was used to submit a job to a specific wiki ID.  
  • changeprop-jobqueue, a NodeJS service, consumes this event from Kafka, and then POSTs it to a WMF custom HTTP RunSingleJob.php entrypoint.  
  • This entrypoint determines which wiki site to launch based on the value of the job event’s database field. (E.g. if database == "enwiki", the Job will be executed in enwiki context.)
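The routing decision in the last step can be illustrated with a small sketch. It is not the code of RunSingleJob.php: the function name and the list of known wikis are hypothetical, and only the `database` field is taken from the description above.

```php
<?php
/**
 * Decide which wiki context should run a serialized job event,
 * based solely on the event's 'database' field.
 */
function sketchTargetWikiForJob( string $jobEventJson, array $knownWikis ): string {
    $job = json_decode( $jobEventJson, true, 512, JSON_THROW_ON_ERROR );
    $database = $job['database'] ?? null;
    if ( !is_string( $database ) || !in_array( $database, $knownWikis, true ) ) {
        throw new UnexpectedValueException( 'Job event does not name a known wiki' );
    }
    return $database;
}

// Sample job event; every field except 'database' is invented for illustration.
$jobEvent = '{"database":"enwiki","type":"exampleJob","params":{"page_id":42}}';
$targetWiki = sketchTargetWikiForJob( $jobEvent, [ 'enwiki', 'dewiki' ] );
```

The sketch makes the limitation visible: each job event names exactly one destination wiki, so reaching several wikis means submitting several job events.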

JobQueue has nice features (retries, deduplication, etc.) that would be arduous to build into a brand new cross-service event receiving mechanism. We may consider utilizing the JobQueue for cases that need these features.  See the event bus retry implementation section for details.

JobQueue & DomainEvents – cross-service wiki receivers

* While JobQueue can support some MediaWiki to MediaWiki use cases, it cannot do so in an event driven and reusable data way. That doesn’t mean that supporting this will not be useful!

JobQueue can easily be used to support the same-wiki receiving variant.

JobQueue can support some same-wiki and inter-wiki use cases, but jobs must be explicitly submitted to the destination wiki. The job submitter must be aware of the desire of the destination wiki to receive the event.

No other scenarios can be easily supported. Job events are not meant for ‘broadcast’ receivers; they are MediaWiki specific RPCs that are meant to be executed once.  They must be ‘multicast’. (It may be possible to multicast to a specific wiki, and then have that wiki fan-out events to listeners, reducing the number of times a specific event must be sent to a single wiki.)

Pros and Cons

Pros:

  • Already exists
  • Supported by WMF SRE.
  • Supports 3rd party usage
  • Supports explicit ‘multicast’ MediaWiki to MediaWiki use cases

Cons:

  • Jobs are commands, not events
  • Does not improve data re-use; job events are not reusable.  
  • JobQueue cannot be used for beyond-wiki receiving scenarios.  

off-wiki and non-wiki RPC ‘receiving’ may be possible if JobQueue can accept submissions from non MediaWiki processes.

Two JobQueue-based implementation ideas are described below.

Implementation: JobQueue invocation mode

NOTE: This implementation was considered and declined in T384609 Domain Events: support listener invocation via the job queue.

See also:  Event Dispatcher - Do not implement a job-based asynchronous invocation mode for listeners

This implementation would allow listeners to register to be invoked in a job.

// In enwiki context
$domainEventSource->registerListener(
    "PageUpdated",                  // <- DomainEvent type
    $listener,                      // <- listener callback
    [ "invocation_mode" => "job" ]  // <- listener options
);

Supporting the same-wiki scenario is straightforward.

  • The emitter’s process will have the same local wiki context and configuration as the cross process wiki receiver.  
  • The emitter’s DomainEventDispatcher instance can know about cross-process local wiki subscribers.  
  • The emitter’s DomainEventDispatcher can serialize the DomainEvent into a custom DomainEventJob and submit it to the local wiki’s JobQueue.
  • The DomainEventJob will be executed by a JobExecutor – this is the receiver’s process. DomainEventJob can have an instance of DomainEventDispatcher.  This DomainEventDispatcher can be used to dispatch the DomainEvent deserialized from the job to subscribed listeners.

Supporting inter-wiki scenarios via the JobQueue is more difficult:

  • JobQueue does support submitting inter wiki jobs, but the target wiki must be explicitly specified.
    • $jobQueueGroupFactory->makeJobQueueGroup(  $targetWikiId )->push( $job )
  • Unless the emitting wiki somehow has access to subscribership, its DomainEventDispatcher cannot call makeJobQueueGroup for the correct wikis.
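
The difficulty can be made concrete with a minimal sketch. The classes below are in-memory stand-ins, not the real MediaWiki JobQueueGroupFactory, and the `domainEvent` job type and `multicastEventJob` helper are hypothetical. The point is that the list of subscribed wikis has to be handed to the emitter from somewhere.

```php
<?php
// Stand-in for a per-wiki job queue; records pushed jobs in memory.
final class FakeJobQueueGroup {
    /** @var array<int,array> */
    public array $jobs = [];

    public function push( array $job ): void {
        $this->jobs[] = $job;
    }
}

// Stand-in for the factory that hands out one queue group per wiki ID.
final class FakeJobQueueGroupFactory {
    /** @var array<string,FakeJobQueueGroup> */
    private array $groups = [];

    public function makeJobQueueGroup( string $wikiId ): FakeJobQueueGroup {
        return $this->groups[$wikiId] ??= new FakeJobQueueGroup();
    }
}

/**
 * Submit one copy of the event job to every subscribed wiki.
 * The emitter must be given $subscribedWikiIds; this is exactly
 * the knowledge it normally lacks.
 */
function multicastEventJob(
    FakeJobQueueGroupFactory $factory,
    array $event,
    array $subscribedWikiIds
): int {
    foreach ( $subscribedWikiIds as $wikiId ) {
        $factory->makeJobQueueGroup( $wikiId )->push( [
            'type' => 'domainEvent', // hypothetical job type
            'params' => [ 'event' => $event ],
        ] );
    }
    return count( $subscribedWikiIds );
}

$factory = new FakeJobQueueGroupFactory();
$sent = multicastEventJob(
    $factory,
    [ 'type' => 'PageUpdated', 'wiki_id' => 'commonswiki', 'page_id' => 7 ],
    [ 'enwiki', 'wikidatawiki' ]
);
```

Each additional subscribing wiki costs one more job submission, which is why this approach is a multicast rather than a broadcast.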

Implementation: JobQueue Ingress Object Helpers

T387010 Domain Events: allow listeners to execute code via the job queue

See also: Event Dispatcher - Do not implement a job-based asynchronous invocation mode for listeners

Instead of building in direct support for job based invocation, ingress objects could explicitly submit a job for later execution.  A convenience function could be provided by EventIngressBase to allow Ingress objects to automatically schedule a function to handle the event later in an async Job.

This is a much simpler and more straightforward use of the JobQueue to defer listener execution.

inter-wiki use cases can be supported, but the same-wiki listener that submits the Job must explicitly submit it to the foreign wiki’s JobQueue.  This means that the Job and/or async listener function must be implemented in the foreign wiki, which is not guaranteed.

WIP: DomainEvents: add convenience for scheduling jobs (1118804)

Idea: non MediaWiki Job submission

off-wiki is likely one of the highest value scenarios to support.  It may be possible to utilize the JobQueue to enable use of off-wiki generated or derived data in MediaWiki.  

Jobs are RPCs, so do not improve data reusability. However, events can help integrate data between systems by providing a standard data transfer mechanism. The Search Team recently implemented support for RPC based updates to page documents in ElasticSearch via WeightedTags update events.

If it were possible for non MediaWiki processes to submit Jobs to the WMF’s EventBus JobQueue, off-wiki generated data could be explicitly received by MediaWiki in a Job.

This idea is a shortcut around implementing custom MediaWiki API endpoints that can be used to submit Jobs.

But why not just submit a MediaWiki API request?

  • API requests are runtime coupling between ‘emitters’ and ‘receivers’.
  • A MediaWiki API endpoint and Job must be implemented for each use case.
  • HTTP APIs perform poorly for backfilling. The queue nature of WMF’s Kafka based JobQueue can throttle data backfills without losing data.

Examples:

This idea needs more exploration, but it could be a shortcut to quickly achieving some value to support off-wiki use cases.  

Note that this idea is a shortcut.  It would be preferable if MediaWiki could receive events directly, as described below in the event bus implementation.

The MediaWiki JobQueue exists now and the complexities of this idea are relatively well known.  This idea may provide a low effort way to improve usability of off-wiki generated data.

Implementation: Event bus

Note: ‘event bus’ (lower case) is a generic term, and does not refer to the MediaWiki EventBus extension.

How might an event bus (e.g. Kafka) be used to implement cross-service receiving?

This section will explore using an event bus to support all cross-service scenarios.

Goal:

  • Each event is only broadcast once
  • A wiki can register to listen for wiki-specific events, at minimum at per-wiki granularity.
    • enwiki can receive PageUpdated from enwiki, or PageUpdated events from commonswiki.
    • wikidatawiki may possibly receive PageUpdated events from all wikis
  • A non-wiki specific event can be received by any wiki site
    • enwiki receiving IpReputationChange events
  • A broadcast event can be received beyond-wiki
    • Data Lake ingestion, stream processing, etc.

MediaWiki Broadcasting

The actual broadcasting mechanism is relatively straightforward; the EventBus extension has been doing this for years.

Complexities arise when we consider the details of MediaWiki receiving and how they relate to event serialization and stream segregation.

A. DomainEvent listener broadcasts
  • An in-process DomainEventSubscriber instance subscribes to configured DomainEvent types.  When the listener function is invoked for an event, it:
    • serializes the event
    • determines the stream (AKA queue, topic, channel) name for the event.
    • Produces the serialized event to the event bus.
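
The three steps above can be sketched as follows. This is an illustration only: events are modeled as plain arrays rather than DomainEvent objects, `InMemoryBus` stands in for a real producer such as Kafka, and the event type to stream name map is a hypothetical configuration shape.

```php
<?php
// In-memory stand-in for an event bus producer (e.g. Kafka).
final class InMemoryBus {
    /** @var array<string,string[]> stream name => serialized events */
    public array $streams = [];

    public function produce( string $stream, string $payload ): void {
        $this->streams[$stream][] = $payload;
    }
}

/**
 * Build a listener callback that broadcasts every event it receives.
 * $streamForType is a hypothetical event type => stream name map.
 */
function makeBroadcastListener( InMemoryBus $bus, array $streamForType ): callable {
    return static function ( array $event ) use ( $bus, $streamForType ): void {
        // 1. Determine the stream for this event type.
        $stream = $streamForType[$event['type']] ?? null;
        if ( $stream === null ) {
            return; // type not configured for broadcast
        }
        // 2. Serialize the event.
        $payload = json_encode( $event, JSON_THROW_ON_ERROR );
        // 3. Produce it to the bus.
        $bus->produce( $stream, $payload );
    };
}

$bus = new InMemoryBus();
$listener = makeBroadcastListener( $bus, [
    'PageUpdated' => 'mediawiki.page_change.v1',
] );
$listener( [ 'type' => 'PageUpdated', 'wiki_id' => 'enwiki', 'page_id' => 1 ] );
$listener( [ 'type' => 'UserRenamed', 'wiki_id' => 'enwiki' ] ); // not configured, ignored
```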

This is generally how EventBus works now, but there will be changes needed in order to support MediaWiki Receiving.

Pros:

  • EventDispatchEngine does not have to be aware of cross-service complexity.
  • Less complexity: we do not have to support cross-service for 3rd party MediaWiki installations.

Cons:

  • MediaWiki 3rd party cross-service usage is not supported.
B. DomainEventDispatcher broadcasts
  • EventDispatchEngine broadcasts configured event types to an event bus
  • The emitter’s process doesn’t know about registered listeners, so it can’t use that info to determine which events to broadcast. It would just broadcast all configured event types.

Pros and Cons

  • the inverse of Option A.

Recommendation: Option A

Option A. DomainEvent listener broadcasts events

Stream Segregation

Depending on the chosen MediaWiki receiving implementation (see below), we may need to broadcast events to wiki-specific streams, rather than streams with events from all wikis.

Note: the term ‘integration event’ refers to the level at which events are broadcast to a stream.  ‘Event type’ refers specifically to the $eventType of the DomainEvent class implementation (e.g. ‘PageRevisionUpdated’). Multiple DomainEvent types will likely be serialized and broadcast to the same integration event stream. E.g. all PageState change events (regardless of their specific ‘type’) would be broadcast to the same mediawiki.page_change stream.

A. Stream per integration event

The existing (2025) EventBus extension broadcasts events from any wiki to a single stream per integration event.  E.g. all wikis produce page state change events to the same mediawiki.page_change.v1 stream.

All receivers must filter for the wiki events they are interested in. Note that EventStreams API users have to do this now.

We could consider adding rudimentary (wiki_id only?) filtering capabilities to the DomainEventSource implementation to hide the filtering from the receiver, but the filtering would still be done in the receiver’s process.

Pros:

  • simple
  • status quo

Cons:

  • Receiver processes have to do a lot of filtering.
  • Does not account for private wikis.  EventStreams API exposes event streams for public consumption; private wiki events must not be included. This can be supported with a special case separate stream for private wikis, but understanding this special case can be awkward for consumers.

An optimization of Option A would be to set Kafka message headers with simple information, e.g.

{ "wiki_id": "commonswiki" }

This would still require receiver side filtering, but would reduce the overhead.  Full events would not need to be deserialized or marshaled.  
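
A minimal sketch of such header-based filtering is shown below. The message shape (`headers` plus a serialized `value`) and the function name are assumptions for illustration and do not correspond to any particular PHP Kafka client API.

```php
<?php
/**
 * Yield decoded events for the wanted wikis only.
 * Each message is modeled as [ 'headers' => array, 'value' => string ];
 * this shape is an assumption, not a real client API.
 */
function filterByWikiHeader( iterable $messages, array $wantedWikiIds ): Generator {
    $wanted = array_flip( $wantedWikiIds );
    foreach ( $messages as $message ) {
        $wikiId = $message['headers']['wiki_id'] ?? null;
        if ( $wikiId === null || !isset( $wanted[$wikiId] ) ) {
            continue; // skipped without decoding the payload
        }
        yield json_decode( $message['value'], true );
    }
}

$messages = [
    [ 'headers' => [ 'wiki_id' => 'commonswiki' ], 'value' => '{"page_id":1}' ],
    [ 'headers' => [ 'wiki_id' => 'enwiki' ], 'value' => '{"page_id":2}' ],
    [ 'headers' => [ 'wiki_id' => 'commonswiki' ], 'value' => '{"page_id":3}' ],
];
$received = iterator_to_array(
    filterByWikiHeader( $messages, [ 'commonswiki' ] ),
    false
);
```

Every message is still fetched by the receiver; only the JSON decoding of unwanted events is avoided.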

Pros:

  • Slight performance improvement over plain Option A.

Cons:

  • Low flexibility in header values to set. Kafka headers must be set when broadcasting.
  • Receiver processes have to do a lot of filtering.

This can be done if we choose Option A, so it is not listed as its own standalone option.

B. Stream per wiki per integration event

Declare individual streams for each wiki and integration event.  E.g. mediawiki.commonswiki.page_change.v1.  Receivers then could indicate which streams to consume from based on how they register listeners.

EventStreamConfig allows for specifying the topics that make up a stream.  It is possible to declare streams that are made up of the topics from many streams.  E.g. we could have a mediawiki.all.page_change.v1 stream that is composed of all of the topics for all wiki page_change.v1 streams.
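
As a rough illustration of the naming scheme and of a composite ‘all’ stream, consider the sketch below. The helper names and the `composed_of` key are hypothetical; real EventStreamConfig settings are structured differently.

```php
<?php
/** Name of the per-wiki stream, following the pattern used above. */
function perWikiStream( string $wikiId, string $event, string $version = 'v1' ): string {
    return "mediawiki.$wikiId.$event.$version";
}

/**
 * Hypothetical composite declaration: one 'all' stream listing the
 * per-wiki streams it is made of.
 */
function declareAllStream( array $wikiIds, string $event ): array {
    return [
        'stream' => "mediawiki.all.$event.v1",
        'composed_of' => array_map(
            static fn ( string $wikiId ): string => perWikiStream( $wikiId, $event ),
            $wikiIds
        ),
    ];
}

$all = declareAllStream( [ 'commonswiki', 'enwiki' ], 'page_change' );
```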

Pros:

  • Supports wiki-aware scenarios: inter-wiki and off-wiki where events are about a specific wiki site
  • Flexibility in how streams are declared and grouped
  • WMF Event Platform and EventStreamConfig exist
  • May integrate better with WMF MediaWiki APIs as they are usually per wiki site (e.g. do we want to expose EventStreams for a wiki within that wiki site? Or via API Gateway?)
  • Enables a requested EventStreams feature.

Cons:

  • Adds complexity when determining the stream a receiver wants.
  • Adds complexity for Data Lake where all streams should be ingested.  
    • Data Lake shouldn’t ingest each stream as its own table.  Data Lake ingestion would join the streams into one table and partition on wiki_id.
    • (Note: support for custom Data Lake table partitioning will be added in T377600.)
  • Unclear plans for long term support of EventStreamConfig MediaWiki extension.
  • More topics.  With ~1000 wikis, that means ~1000 times as many topics for the ‘all’ stream case.
C. Hybrid: broadcast multiple streams for the same integration event

Instead of only Option A. or Option B., do both.

That is: broadcast both mediawiki.page_change.v1 as well as the per wiki streams, e.g. mediawiki.commonswiki.page_change.v1, mediawiki.enwiki.page_change.v1 etc.

Pros:

  • Receivers choose which streams are best for their use case, per wiki, or global, and don’t have to filter.

Cons:

  • Broadcasters must be aware of the desire and configured to support different ways of consuming events.
  • Events are no longer broadcast only once.  More opportunities for inconsistencies and need for bespoke reconciliation.
D. Hybrid: stream processing to split or join streams

Achieve both Option A. and Option B. by using stream processing to produce one from the other.

Something like:

  • Choose Option A. Stream per integration event, e.g. mediawiki.page_change.v1
  • Stream processing consumes mediawiki.page_change.v1 and produces the split downstream per wiki streams e.g. mediawiki.commonswiki.page_change.v1, mediawiki.enwiki.page_change.v1 etc.

Or

  • Choose Option B. Stream per wiki per integration event, e.g. mediawiki.commonswiki.page_change.v1, mediawiki.enwiki.page_change.v1 etc.
  • Stream processing consumes all page_change streams and joins them into one (or few?) combined mediawiki.page_change.v1 streams.
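
The first variant (splitting a combined stream into per-wiki streams) is essentially a routing step keyed on the wiki ID. A minimal in-memory sketch follows, assuming each event carries a `wiki_id` field. A real deployment would use a stream processing framework rather than a PHP function.

```php
<?php
/**
 * Route each event from a combined stream into a per-wiki stream.
 * Returns stream name => list of events.
 */
function splitByWiki( iterable $events, string $event = 'page_change' ): array {
    $out = [];
    foreach ( $events as $e ) {
        $out["mediawiki.{$e['wiki_id']}.$event.v1"][] = $e;
    }
    return $out;
}

$split = splitByWiki( [
    [ 'wiki_id' => 'enwiki', 'page_id' => 1 ],
    [ 'wiki_id' => 'commonswiki', 'page_id' => 2 ],
    [ 'wiki_id' => 'enwiki', 'page_id' => 3 ],
] );
```

The join variant is the inverse: events from all per-wiki streams are appended to one combined stream, keeping the `wiki_id` field so receivers can still tell them apart.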

Pros:

  • Receivers choose which streams are best for their use case, per wiki, or global, and don’t have to filter.

Cons:

  • Must maintain a separate stream processing application.
  • More opportunities for inconsistencies and need for bespoke reconciliation.

Recommendation: Option A. Stream per integration event

Option A. is easier to implement, and is the status quo.  It may be that extra listener filtering is not that bad.

Option A. also leaves room for optimization. We could still do one of the hybrid solutions later.

However, Option A. must take into account private wikis.  These are currently (and will need to be) in separate streams. If the PII problem is addressed, it may be possible to circumvent this complexity. And/or if we choose Option C. or D., this problem would be addressed naturally.

MediaWiki Receiving

Once events are in a bus, in order to be received by MediaWiki, something will need to:

  • Map from registered listeners to relevant streams
  • Consume events from relevant streams in the event bus
  • Deserialize events (and marshal them into a DomainEvent instance)
  • Invoke registered listeners for that event

At WMF, Apache Kafka is our event bus. For the purposes of this design doc, we will explore targeting support for Kafka.

Note: the event bus implementation will need to be pluggable. We’ll want to support cross-service receiving in dev environments without Kafka.

Consuming Events

Receiving events != executing jobs.

The WMF Event Platform JobQueue uses an external service (change-propagation) to consume all job events from Kafka and execute the appropriate job via an HTTP call.

Unlike job RPCs, information about what code to execute is not included in a broadcast event. Listener registration is known only to the DomainEventSource in MediaWiki. This information is needed to determine which streams to consume from.

A: External service

E.g. change-propagation.

To support this, global MediaWiki listener registration would need to be exposed to the service, perhaps via an HTTP API.

Exposing MediaWiki listener registration so that an external service can consume and then route events to MediaWiki sounds quite fiddly.  If we can avoid this, we should.

It could be avoided by routing all events to every wiki, but this would be quite wasteful.

B: PHP Kafka Consumer

In 2025, there are some decent PHP Kafka clients. If we could use MediaWiki constructs to configure event bus (Kafka) consumer clients, we could then use consumed events to invoke registered listeners.

Long lived PHP

Usually, event bus consumers are long lived processes.  PHP processes are meant to serve one HTTP request at a time; PHP may not function well as a very long lived process.  Long lived MediaWiki maintenance jobs can be launched from the CLI, but they aren’t meant to live forever.

The SRE ServiceOps team has recently (late 2024) added limited support for scheduling long running MediaWiki jobs in k8s.

MediaWiki receiving could be implemented in a similar way: a ‘domainEventConsumer.php’ maintenance script. This script could consume one event at a time (like RunSingleJob.php does now), or consume batches of events as a ‘long lived’ maintenance script.

Each wiki site would have to run at least one ‘domainEventConsumer.php’ script.

See: php-kafka-consumer demo

Recommendation: PHP Kafka Consumer

The remainder of this design will assume we can use a PHP Kafka consumer in MediaWiki.

If this assumption is invalid, we will need to think more about how to achieve event consumption via Option A.

Kafka consumer group granularity

When a Kafka client consumes from topics, it does so as a member of a ‘consumer group’.  A consumer group can commit topic-partition offsets back to Kafka, and Kafka keeps track of committed offsets per consumer group.
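
To show why the choice of consumer group granularity matters, here is a toy model of a single topic-partition with per-group committed offsets. It is not a Kafka client; `ToyPartition` and its methods are invented for this illustration.

```php
<?php
// Toy model of a single topic-partition log plus per-group offsets.
final class ToyPartition {
    /** @var string[] */
    private array $log = [];
    /** @var array<string,int> consumer group id => next offset to read */
    private array $committed = [];

    public function append( string $message ): void {
        $this->log[] = $message;
    }

    /** Read up to $max messages for a group, starting at its committed offset. */
    public function poll( string $groupId, int $max ): array {
        return array_slice( $this->log, $this->committed[$groupId] ?? 0, $max );
    }

    /** Record that the group has processed $count more messages. */
    public function commit( string $groupId, int $count ): void {
        $this->committed[$groupId] = ( $this->committed[$groupId] ?? 0 ) + $count;
    }
}

$p = new ToyPartition();
foreach ( [ 'e0', 'e1', 'e2' ] as $m ) {
    $p->append( $m );
}
// The 'enwiki' group processes two messages and commits.
$p->commit( 'enwiki', count( $p->poll( 'enwiki', 2 ) ) );
$nextForEnwiki = $p->poll( 'enwiki', 10 );       // only the unread tail
$nextForCommons = $p->poll( 'commonswiki', 10 ); // a new group starts from the beginning
```

Each group progresses independently, and everything that shares a group shares one position in the log.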

Within a single wiki site, an individual broadcasted event may be received by many listeners.

At what level should Kafka consumer groups be used?  

Note: It is not clear how well Kafka consumer groups scale.  If a bottleneck is too many groups committing offsets to Kafka, it may be possible to overcome this by delegating consumer group offset management to MediaWiki. This is a common practice in some frameworks’ Kafka consumers (e.g. Flink, Gobblin, etc.).

Note: These options are about Kafka consumer groups.  There can still be multiple consumer processes within one consumer group, for scaling purposes.

A. Per wiki site and registered listener

Absent practical constraints, this is the ideal and most correct option.

Pros:

  • Listeners remain isolated.  A failure in one listener does not cause the others to stall.
  • Offsets are committed to Kafka for each listener. Listeners can progress independent of other listeners.

Cons:

  • Too many Kafka consumer groups.
    • ~1000 wiki sites with ~100 registered listeners per wiki site == ~100,000 distinct Kafka consumer groups
    • How many consumer groups can Kafka handle?  Kafka Brokers can scale and parallelize many client connections, but not necessarily consumer groups. A rudimentary search suggests that 10,000 might be too many.
    • A lot of network traffic: Each event is consumed multiple times – once for each registered listener.
    • Too many consumer groups to monitor and troubleshoot when things go wrong.
B. Per wiki site and Subscriber instance

Each DomainEventSubscriber instance gets its own Kafka consumer group.

Pros:

  • Subscriber instances remain isolated.  
    • A Subscriber might register multiple event listeners, but a Subscriber is generally implemented to receive events for a similar purpose.
  • Offsets are committed to Kafka for each Subscriber instance.
    • Subscribers can progress independent of other Subscribers.
  • Subscribers can be monitored and maintained individually.

Cons:

  • A lot of Kafka consumer groups.
    • ~1000 wiki sites with ~20 Subscriber instances per wiki site: ~20,000 Kafka consumer groups.
    • Still a lot of network traffic: Each event is consumed multiple times – once for each Subscriber instance.
    • A lot of consumer groups to monitor and troubleshoot when things go wrong.
C. Per wiki site

Pros:

  • ~1000 Kafka consumer groups.  Manageable.

Cons:

  • Subscribers and listeners are no longer isolated. A failure of one might fail the others
  • Offset commits are per wiki.  Any registered listener may halt consumer progress.
D. Global

Note: this option will not be possible with a MediaWiki PHP Kafka consumer.  This option mainly exists to illustrate the difference in Kafka consumer group granularity if we used an external process to consume from Kafka.

A single global Kafka consumer group (outside of a wiki site context):

  • consumes all relevant event streams
  • Launches each wiki site and hands off consumed events (individually or batched)
  • Each wiki site invokes their registered listeners with received events
    • Events without registered listeners are dropped

This is similar to how WMF JobQueue works now.  Change-propagation is the global consumer that consumes all mediawiki.job.* events, and each wiki site is run via the RunSingleJob.php custom endpoint.

However, each job event is destined to a specific wiki site & job callback.  

For event receiving, a global consumer does not know what listeners a wiki site has. For every event (or batch of events), an HTTP call to a hypothetical ConsumeEvent.php custom endpoint on each wiki must be made. If a wiki site doesn’t have any registered listeners for an event, the wiki site is instantiated unnecessarily.

Fan out to all wikis is handled by HTTP calls, rather than by Kafka consumers.

Pros:

  • A single Kafka consumer group to manage
  • Each event is only consumed once (for all MediaWiki sites)

Cons:

  • Unnecessary wiki site instantiation
  • Global consumer does not run in wiki context
    • No way to know about a wiki site’s registered listeners
  • Still a lot of network traffic; the traffic is in HTTP calls rather than via many event consumers.

Recommendation: Option C

Option C. Per wiki site.  
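
For comparison, the consumer group counts quoted in the options above follow from simple multiplication of the document's rough estimates (~1000 wikis, ~100 listeners and ~20 Subscribers per wiki):

```php
<?php
// Rough figures quoted in the options above.
$wikis = 1000;
$listenersPerWiki = 100;
$subscribersPerWiki = 20;

$groups = [
    'A: per wiki and listener' => $wikis * $listenersPerWiki,
    'B: per wiki and Subscriber' => $wikis * $subscribersPerWiki,
    'C: per wiki' => $wikis,
    'D: global' => 1,
];

foreach ( $groups as $option => $count ) {
    printf( "%-28s %7d\n", $option, $count );
}
```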

TODO: vet and add more detail on recommendation, especially with SRE and ServiceOps.

Retries & deduplication

Assuming we will not isolate cross-service listener invocation with Kafka consumer groups, we may need to support retries.

in-process listeners are also not isolated, and may fail each other, and could benefit from a supported retry feature.

A. JobQueue for retries

The WMF Event Platform based JobQueue has a lot of nice features that would be significant work to build from scratch.  Most relevant are:

Guaranteed retries. The job executor endpoint in MediaWiki responds with an HTTP error code in case the execution has failed. In that case ChangePropagation posts a retry event into a special topic and retries executing the job with exponentially growing delay up to a configurable number of times.

Commit management. ChangePropagation service makes sure that every event is somehow acted upon - either successfully executed or deduplicated, retried or acknowledged as completely failed. None of the events should be lost.
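
The retry behaviour described under ‘Guaranteed retries’ can be sketched as below. The base delay, the doubling factor and the retry limit are illustrative values, not change-propagation’s actual settings, and sleeping is replaced by a callback so the example runs instantly.

```php
<?php
/**
 * Delay in seconds before each retry attempt, growing exponentially.
 * All parameters are illustrative, not production settings.
 */
function retryDelays( int $baseDelay, int $maxRetries, int $factor = 2 ): array {
    $delays = [];
    for ( $attempt = 0; $attempt < $maxRetries; $attempt++ ) {
        $delays[] = $baseDelay * $factor ** $attempt;
    }
    return $delays;
}

/** Run $job, retrying on failure; returns true once it succeeds. */
function runWithRetries( callable $job, array $delays, callable $sleep ): bool {
    if ( $job() ) {
        return true;
    }
    foreach ( $delays as $delay ) {
        $sleep( $delay );
        if ( $job() ) {
            return true;
        }
    }
    return false; // acknowledged as completely failed
}

$calls = 0;
$slept = [];
$ok = runWithRetries(
    static function () use ( &$calls ): bool {
        return ++$calls >= 3; // fails twice, then succeeds
    },
    retryDelays( 60, 3 ),
    static function ( int $seconds ) use ( &$slept ): void {
        $slept[] = $seconds; // record instead of actually sleeping
    }
);
```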

Rather than reimplement these features for cross-service receiving, JobQueue could be used by any listener that needed more than ‘fire and forget’ capabilities.  This mechanism could be used for both in-process and cross-service receivers.

Using the feature described in T387010 Domain Events: allow listeners to execute code via the job queue and here in the Implementation: JobQueue Ingress Object Helpers section, Subscribers would simply submit a job to have one of their methods invoked later with the event.

Pros:

  • Already exists
  • Works relatively well
  • Will “work” (with fewer features) outside of WMF production with any MediaWiki JobQueue implementation
  • JobQueue at WMF is not going away, even if WMF’s implementation is replaced or improved

Cons:

  • Relies on change-propagation service, which is not well maintained at WMF (as of 2025-02).
    • Introducing this dependency might imply extending change-propagation support window (e.g. postpone its deprecation), or taking over maintainership of the service.
  • Retries for event receiving are just RPCs (perhaps this is a pro?) instead of event driven.
    • I.e. Instead of each receiver managing its own success/failure states and progression (like stream processing frameworks do), retries are handled as separate async work tasks specific to each listener.
B. Kafka Share Groups (AKA Queues)

Kafka Share Groups are a Kafka 4.0 feature that enables queue-like semantics for Kafka consumers.

Rather than relying on change-propagation’s implementation of retry topics, use of Kafka Share Groups could enable both log and queue style consumption of event data from Kafka.

Individual Kafka messages can be acknowledged (or rejected) by a consumer.  Rejected messages can be retried later.
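
The acknowledge/reject semantics can be illustrated with a toy in-memory queue. This is not the Kafka Share Group client API; `ToyShareQueue` is invented purely to show how a rejected message comes back for another delivery while acknowledged ones do not.

```php
<?php
// Toy queue with per-message acknowledgement; not the Kafka client API.
final class ToyShareQueue {
    private SplQueue $pending;
    /** @var string[] messages that were acknowledged, in order */
    public array $done = [];

    public function __construct( array $messages ) {
        $this->pending = new SplQueue();
        foreach ( $messages as $m ) {
            $this->pending->enqueue( $m );
        }
    }

    /** Deliver each message to $handler; requeue it when the handler rejects. */
    public function drain( callable $handler, int $maxDeliveries ): void {
        $deliveries = 0;
        while ( !$this->pending->isEmpty() && $deliveries < $maxDeliveries ) {
            $m = $this->pending->dequeue();
            $deliveries++;
            if ( $handler( $m ) ) {
                $this->done[] = $m;            // acknowledged
            } else {
                $this->pending->enqueue( $m ); // rejected, retried later
            }
        }
    }
}

$seen = [];
$queue = new ToyShareQueue( [ 'a', 'b', 'c' ] );
$queue->drain( static function ( string $m ) use ( &$seen ): bool {
    $seen[$m] = ( $seen[$m] ?? 0 ) + 1;
    return $m !== 'b' || $seen[$m] > 1; // 'b' fails on its first delivery
}, 10 );
```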

Perhaps, JobQueue and cross-service DomainEvent listeners could utilize the same MediaWiki PHP based system for consuming events, handling retries, and executing callbacks.  This could be a way to move away from change-propagation.

Pros:

  • Opportunity to decommission Change-Propagation and consolidate and improve MediaWiki ability to execute async functions.

Cons:

  • A lot of work.
  • A lot of unknowns.

Long Term Recommendation: Option B. Kafka Share Groups

There is a lot of potential for consolidation of unmaintained systems. If this works, both MediaWiki Jobs and events throughout WMF could use the same system for consuming events.

Short Term Recommendation: Option A. JobQueue for Retries

JobQueue exists now, and change-propagation isn’t going away without a concerted effort. Until WMF prioritizes this, cross-service Domain Events receiving should rely on JobQueue for retries.

Listener registration and invocation

Note for readers: This section contains a lot of hypothetical code detail. It is kept here for posterity as an exercise in how DomainEvent interfaces might be used to receive cross-service integration events.

In MediaWiki, how might a listener register to receive events from an event bus, and then be invoked when that event is consumed?

Overview of in-process listener registration

Here is a summary of how in-process listener registration works.  More details are in Manual:Domain_events/Architecture.

There are 2 relevant interfaces:

  • DomainEventSource – listener callback registration
  • DomainEventSubscriber – Sugar to reduce boilerplate and automate configuration for listener callback registration.

See the Domain Event System Glossary for official definitions.

The DomainEventSource registerListener interface is defined as:

public function registerListener(
    string $eventType,
    $listener,
    array $options = self::DEFAULT_LISTENER_OPTIONS
): void;

While it is possible for users to explicitly call registerListener, it will be more common for extensions and other usages to use the DomainEventSubscriber interface. The EventIngressBase implementation reduces boilerplate code, as well as provides a means for lazy instantiation for Subscribers, delaying instantiation until a Subscriber’s listener needs to be invoked. The extension.json ExtensionRegistry config interface for in-process Subscribers looks like:

"DomainEventIngresses": [
    {
        "class": "MyExtension\\MySubscriber",
        "events": [ "PageUpdated", "ImageUpdated" ]
    }
]

MySubscriber class then implements listeners that get automatically registered based on naming convention:

class MySubscriber {
    public function handlePageUpdatedEvent( PageUpdatedEvent $event ): void {
        // code here
    }
    public function handleImageUpdatedEvent( ImageUpdatedEvent $event ): void {
        // code here
    }
}

MediaWiki core ServiceWiring and ExtensionRegistry will automatically call registerListeners on the in-process DomainEventSource implementation in EventDispatchEngine.
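
How registration by naming convention could work is sketched below. `ToyEventSource`, `ToySubscriber` and `registerByConvention` are hypothetical stand-ins; only the handle{EventType}Event naming pattern is taken from the example above, and the lookup logic is a guess rather than the actual EventIngressBase implementation. Events are modeled as plain arrays.

```php
<?php
// Minimal in-memory registry standing in for a DomainEventSource.
final class ToyEventSource {
    /** @var array<string,callable[]> */
    public array $listeners = [];

    public function registerListener( string $eventType, callable $listener ): void {
        $this->listeners[$eventType][] = $listener;
    }
}

/**
 * Register handle{EventType}Event methods of $subscriber for each
 * configured event type.
 */
function registerByConvention(
    ToyEventSource $source,
    object $subscriber,
    array $eventTypes
): void {
    foreach ( $eventTypes as $eventType ) {
        $method = "handle{$eventType}Event";
        if ( !method_exists( $subscriber, $method ) ) {
            throw new LogicException( "Missing listener method $method" );
        }
        $source->registerListener( $eventType, [ $subscriber, $method ] );
    }
}

final class ToySubscriber {
    /** @var string[] */
    public array $log = [];

    public function handlePageUpdatedEvent( array $event ): void {
        $this->log[] = 'page:' . $event['page_id'];
    }

    public function handleImageUpdatedEvent( array $event ): void {
        $this->log[] = 'image:' . $event['name'];
    }
}

$source = new ToyEventSource();
$subscriber = new ToySubscriber();
registerByConvention( $source, $subscriber, [ 'PageUpdated', 'ImageUpdated' ] );
foreach ( $source->listeners['PageUpdated'] as $listener ) {
    $listener( [ 'page_id' => 5 ] );
}
```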

Event bus cross-service listener registration

in-process listeners generally only need to provide $eventType to register a listener callback.

cross-service event bus listeners need to provide more information.  E.g.

  • Stream name(s) – which streams have the events for the desired $eventType?
  • Event bus config – how to connect and consume event messages: broker addresses, TLS settings, etc.?
  • What JsonCodec serializer class should be used to deserialize and marshal the events?

Much of this can be injected into the DomainEventSource implementation via configuration.

However, stream names need to be specified when the listener is registered. The registerListener $options parameter can be used for this purpose.

Given that cross-service listeners require more registration config, it should be cleaner to use different cross-service DomainEventSource and DomainEventSubscriber implementations than the in-process base classes, i.e. EventDispatchEngine and DomainEventIngresses.

In order to convince ourselves that this is possible, let’s explore how this might be implemented.

See also

Assumptions:

  • At registration time, a subscriber only needs to provide $eventType and stream names
  • Other configuration can be retrieved using $eventType and/or stream names as config keys

Note: The code samples below are incomplete PoC ideas for illustration purposes only.  Details (subscriber resolution, event type chain hierarchy, constants, JsonCodec implementation and selection etc.) are omitted.

/**
 * Extensions subclass this to implement listeners.
 * They must implement the getStreamNames() method.
 */
abstract class EventBusSubscriberBase
    extends EventSubscriberBase {

    /**
     * Returns a list of remote event streams that this
     * subscriber should receive events for this $eventType.
     * 
     * Subscriber implementers must return stream names that
     * can be used by EventBusSource to look up relevant
     * config for those streams.
     *
     * NOTE: we may be able to provide some defaults or
     *       an abstraction on this to aid in selecting
     *       the correct streams.
     *
     * @return string[]
     */
    abstract public function getStreamNames( 
        string $eventType 
    ): array;

    /**
     * getListenerOptions will be called by EventBusSource 
     * when registering a listener for an event type.
     *
     * NOTE: getListenerOptions is defined by
     *       EventSubscriberBase and is not part of
     *       the DomainEventSubscriber interface.
     */
    protected function getListenerOptions(
        string $eventType,
        string $suffix
    ): array {
        $options = parent::getListenerOptions(
            $eventType,
            $suffix 
        );
        // Look up the stream names the Subscriber wants
        // to receive for this $eventType.
        // E.g. for the ImageUpdated $eventType,
        // this might return:
        //   [
        //     "mediawiki.commonswiki.image_change.v1",
        //     "mediawiki.enwiki.image_change.v1"
        //   ]
        $streamNames = $this->getStreamNames( $eventType );

        // Only set the option when streams were returned.
        if ( $streamNames ) {
            $options["stream_names"] = $streamNames;
        }

        return $options;
    }

}

A different extension.json ExtensionRegistry ‘EventBusSubscribers’ config key could be used to register cross-service EventBusSubscriberBase implementations.

"EventBusSubscribers": [
    {
        "class": "MyExtension\\MyEventBusSubscriber",
        "events": [
            "PageUpdated",
            "ImageUpdated",
            "PageRevertRiskUpdated"
        ]
    }
]

An EventBusSource DomainEventSource could implement the required logic to use EventBusSubscriber listener options to call registerListener, get the listener options, and use that to invoke listeners with incoming events.

/**
 * DomainEventSource for events received from an event bus.
 * 
 * Note that unlike the in-process EventDispatchEngine
 * DomainEventSource, this is not also a 
 * DomainEventDispatcher. DomainEventDispatchers are used
 * by emitters to emit events.
 * 
 * This class does provide a way to invoke registered
 * listeners with incoming events, similar to a
 * Dispatcher, but the difference is that the events
 * are passed in as they are consumed from an event bus.
 */
class EventBusSource implements DomainEventSource {
    /**
     * An associative array mapping event names and streams
     * to lists of listeners.
     *
     * @var array<string,array<callable>>
     */
    private array $listeners = [];

    /**
     * Set of streams that listeners have explicitly
     * registered for.
     * NOTE: this is a string: true map
     *       to manage this as a Set, rather than a List.
     * @var array<string,true>
     */
    private array $streamNames = [];

    /**
     * @param string $eventType
     * @param callable $listener
     * @param array $options
     *     The "stream_names" key will list
     *     streams to consume from.
     */
    public function registerListener(
        string $eventType,
        $listener,
        array $options = self::DEFAULT_LISTENER_OPTIONS
    ): void {

        // Without stream names there is nothing to register.
        $streamNames = $options["stream_names"] ?? [];

        foreach ( $streamNames as $streamName ) {
            $listenerKey = "$eventType@$streamName";
            // save this $listener to be invoked with
            // events consumed from streamName that are
            // the desired $eventType.
            $this->listeners[$listenerKey][] = $listener;
            // Save the fact that there are listeners
            // registered for events from this stream.
            $this->streamNames[$streamName] = true;
        }
    }

    /**
     * Returns list of all stream names that have registered
     * listeners.
     *
     * This will be used by the event bus consumer to
     * determine which streams to consume from.
     * @return array<string>
     */
    public function getRegisteredStreamNames(): array {
        return array_keys( $this->streamNames );
    }

    /**
     * Invoke all listeners that have been registered for
     * the event's type and stream name.
     *
     * This function will be called by the event bus
     * consumer with consumed events.
     *
     * NOTE: We could implement DomainEventDispatcher and 
     *       dispatch() method, but should we?  
     */
    public function invokeListeners(
        DomainEvent $event, 
        string $streamName
    ): void {
        $eventType = $event->getType();
        $listenerKey = "$eventType@$streamName";
        $listeners = $this->listeners[$listenerKey] ?? [];

        foreach ( $listeners as $listener ) {
            // invoke $listener with $event
            $listener( $event );
        }
    }
}

ServiceWiring and ExtensionRegistry can then instantiate EventBusSource, register all EventBusSubscribers declared in extension.json, and provide it as an ‘EventBusSource’ service object.
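
The registration step could look roughly like the sketch below. Everything in it is an assumption made for illustration: the shape of the subscriber declarations (the `events`, `stream_names` and `handler` keys), the `registerSubscribers()` helper, the `ExampleEvent` and `example.stream` names, and the cut-down `SketchEventBusSource`, which only reproduces the bookkeeping of the EventBusSource class above.

```php
<?php
// Hypothetical stand-in for the EventBusSource sketched earlier;
// only the registration bookkeeping is reproduced here.
class SketchEventBusSource {
    /** @var array<string,array<callable>> */
    public array $listeners = [];
    /** @var array<string,true> */
    private array $streamNames = [];

    public function registerListener( string $eventType, $listener, array $options = [] ): void {
        foreach ( $options['stream_names'] ?? [] as $streamName ) {
            $this->listeners["$eventType@$streamName"][] = $listener;
            $this->streamNames[$streamName] = true;
        }
    }

    public function getRegisteredStreamNames(): array {
        return array_keys( $this->streamNames );
    }
}

/**
 * Register every declared subscriber with the source.
 * The declaration shape (events, stream_names, handler) is an assumption.
 */
function registerSubscribers( SketchEventBusSource $source, array $declarations ): void {
    foreach ( $declarations as $declaration ) {
        foreach ( $declaration['events'] as $eventType ) {
            $source->registerListener(
                $eventType,
                $declaration['handler'],
                [ 'stream_names' => $declaration['stream_names'] ]
            );
        }
    }
}

// Hypothetical declarations, as they might look once extension.json is loaded.
$declarations = [
    [
        'events' => [ 'ExampleEvent' ],
        'stream_names' => [ 'example.stream', 'example.other_stream' ],
        'handler' => static function ( $event ): void {
            // A real handler would act on the event here.
        },
    ],
];

$source = new SketchEventBusSource();
registerSubscribers( $source, $declarations );
$streams = $source->getRegisteredStreamNames();
```

After registration, `$streams` holds the two declared stream names, which is the list the consumer needs in order to know what to read from.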

Finally a domainEventConsumer.php maintenance script can use the EventBusSource service object to:

  • Use the stream names to look up other required configuration, like broker addresses, JsonCodec deserializer classes, etc.
  • Consume events from the streams that have registered listeners
  • Decode those events into DomainEvent instances using JsonCodec
  • Invoke the registered listeners with the consumed events

domainEventConsumer.php would be executed periodically and frequently to consume ‘batches’ of events.

class DomainEventConsumer extends Maintenance {

    // ...

    private function consumeEvents(
        $maxEvents, 
        $maxTime
    ): void {
        // This EventBusSource should have access
        // to all registered listener and listener options.
        $eventBusSource = $this->getServiceContainer()->getEventBusSource();

        // EventConsumerFactory is a hypothetical interface.
        // 
        // Given stream names, it creates an EventConsumer 
        // instance that can consume events one at a time from 
        // those streams.
        //   
        // This assumes that an EventConsumerFactory service object
        // instance is injected with relevant configuration 
        // needed to do this.
        // 
        // E.g. for Kafka:
        // - JsonCodec class to use to deserialize a
        //   consumed JSON message into a DomainEvent instance
        // - For a KafkaEventConsumerFactory:
        //   kafka client configuration, including broker
        //   addresses, consumer group id, etc.
        // - For a developer mode file or database table
        //   implementation: file path from which to consume 
        //   events, or database table from which to read events.
        $eventConsumerFactory = $this->getServiceContainer()->getEventConsumerFactory();

        // Create an EventConsumer for all streams with
        // registered listeners in the current wiki site.
        $eventConsumer = $eventConsumerFactory->newConsumer(
            $eventBusSource->getRegisteredStreamNames()
        );

        $startTime = MWTimestamp::time();
        for ( $count = 0; $count < $maxEvents; $count++ ) {

            // consume() might return a DomainEvent and a
            // $streamName it was consumed from. (TBD)
            list( $event, $streamName ) = $eventConsumer->consume();

            $eventBusSource->invokeListeners(
                $event,
                $streamName
            );

            $timeElapsed = MWTimestamp::time() - $startTime;
            if ( $maxTime > 0 && $timeElapsed > $maxTime ) {
                break;
            }
        }
    }
}
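
To make the batch loop concrete without a Kafka client, here is a self-contained sketch that uses an in-memory consumer. `ArrayEventConsumer` and `consumeBatch()` are hypothetical names, plain associative arrays stand in for DomainEvent instances, and `consume()` returning null once the stream is drained is an assumption (the return shape is still marked TBD above).

```php
<?php
// Hypothetical developer-mode consumer: serves pre-loaded JSON messages.
class ArrayEventConsumer {
    /** @var array<array{0:string,1:string}> Pairs of [ streamName, jsonMessage ] */
    private array $messages;

    public function __construct( array $messages ) {
        $this->messages = $messages;
    }

    /**
     * @return array|null [ decoded event, stream name ], or null when drained.
     */
    public function consume(): ?array {
        $next = array_shift( $this->messages );
        if ( $next === null ) {
            return null;
        }
        [ $streamName, $json ] = $next;
        // A real implementation would use JsonCodec to build a DomainEvent;
        // a plain associative array stands in for it here.
        $event = json_decode( $json, true, 512, JSON_THROW_ON_ERROR );
        return [ $event, $streamName ];
    }
}

/**
 * Consume up to $maxEvents events, or until $maxTime seconds have passed
 * (a $maxTime of 0 disables the time limit), passing each one to $invoke.
 * @return int Number of events handled.
 */
function consumeBatch(
    ArrayEventConsumer $consumer,
    callable $invoke,
    int $maxEvents,
    float $maxTime = 0
): int {
    $startTime = microtime( true );
    $handled = 0;
    while ( $handled < $maxEvents ) {
        $result = $consumer->consume();
        if ( $result === null ) {
            break; // nothing left to consume in this batch
        }
        [ $event, $streamName ] = $result;
        $invoke( $event, $streamName );
        $handled++;
        if ( $maxTime > 0 && microtime( true ) - $startTime > $maxTime ) {
            break;
        }
    }
    return $handled;
}

$consumer = new ArrayEventConsumer( [
    [ 'example.stream', '{"type":"ExampleEvent","page_id":1}' ],
    [ 'example.stream', '{"type":"ExampleEvent","page_id":2}' ],
    [ 'example.stream', '{"type":"ExampleEvent","page_id":3}' ],
] );

$seen = [];
$handled = consumeBatch(
    $consumer,
    static function ( array $event, string $streamName ) use ( &$seen ): void {
        $seen[] = $event['page_id'];
    },
    2
);
```

With `$maxEvents` set to 2, the first run handles two of the three messages and leaves the third for the next run, which is the 'batch' behaviour the maintenance script relies on.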

Okay okay, enough detail.  We could go further into how a hypothetical EventConsumer and the relevant interfaces and configs might look, and how service objects are wired together, but I think we have enough.

Assuming we can use a Kafka consumer in MediaWiki, we can use Domain Events System interfaces to support receiving cross-service events.


Recommendations

Long term

Short term

Those long term recommendations are a lot of work!  And there are still many unknowns.  Are there smaller portions that we could do soon?  What will provide the most value with the least amount of effort?

Which cross-service scenario would WMF benefit from most? Which capabilities are we currently (2025-04) lacking?

  • beyond-wiki is manually achievable with EventBus and Event Platform.
  • same-wiki async event handling is achievable with JobQueue RPCs via JobQueue Subscriber sugar (this does not improve data reusability)
  • inter-wiki async event handling is achievable with JobQueue RPCs, and possibly with JobQueue Subscriber sugar (this does not improve data reusability)

That leaves the off-wiki scenario.  MediaWiki currently has no standard mechanism to receive events (or RPC jobs) from non MediaWiki emitters. Supporting the off-wiki scenario has a potentially high impact at WMF, especially as more product features that need ‘generated data’ (outside of MediaWiki) are prioritized.

Assuming that non MediaWiki emitters will use Event Platform, we can skip a lot of the complexities around automatically managing schemas and streams and broadcasting with MediaWiki.  Here are 2 possible options to achieve short term results.

Event bus MediaWiki receiving

Doing this would open up a standard way for non MediaWiki emitters to transfer data into MediaWiki for serving product features.

  • Determine feasibility of PHP MediaWiki Kafka Consumer
  • Continue exploring the event bus MediaWiki receiving implementation
  • Implement DomainEvent event bus interfaces and support, integrated with (static) EventStreamConfig.

Implementing event bus MediaWiki receiving would allow non MediaWiki emitters to emit data in a way that is decoupled from the MediaWiki usage of that data.  

E.g. PageRevertRiskScore events could be emitted by LiftWing.  Those events could be consumed into the Data Lake, by Enterprise, or by MediaWiki for building product features like moderator tools.

The emitted event data would be reusable for many use cases, including those inside of MediaWiki.
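
The decoupling can be illustrated with a small in-memory sketch. It is not a model of Kafka or Event Platform: `SketchStream`, the consumer names, and the `rev_id` and `score` fields are all hypothetical, chosen only to show one emitted event being read independently by several consumers.

```php
<?php
// Hypothetical in-memory stand-in for an event stream. Each consumer
// tracks its own offset, loosely like consumer groups on an event bus.
class SketchStream {
    /** @var array<array> Events in emission order */
    private array $events = [];
    /** @var array<string,int> Next unread position per consumer */
    private array $offsets = [];

    public function emit( array $event ): void {
        $this->events[] = $event;
    }

    /** Return the events this consumer has not seen yet and advance its offset. */
    public function poll( string $consumerName ): array {
        $offset = $this->offsets[$consumerName] ?? 0;
        $this->offsets[$consumerName] = count( $this->events );
        return array_slice( $this->events, $offset );
    }
}

$stream = new SketchStream();

// The emitter emits once, with no knowledge of who will consume the event.
$stream->emit( [ 'type' => 'PageRevertRiskScore', 'rev_id' => 123, 'score' => 0.87 ] );

// Each consumer reads the same event on its own schedule.
$forDataLake = $stream->poll( 'data-lake' );
$forMediaWiki = $stream->poll( 'mediawiki' );

// Polling again returns nothing new for that consumer.
$nothingNew = $stream->poll( 'mediawiki' );
```

Adding another consumer requires no change on the emitter side, which is the property that makes the emitted data reusable.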

Non MediaWiki job submission

This would allow non MediaWiki emitters to push event data (or just jobs) into MediaWiki wiki sites directly.  This solution would support usage of generated data in MediaWiki, but it would not improve data reusability.

E.g. LiftWing would have to emit multiple versions of PageRevertRiskScore event data: one for the Data Lake and Enterprise, and another as a job into each wiki where the data should be pushed.

Recommendation: explore both

Both RPCs and events have their use cases, and their pros and cons.  

It may be possible to implement non MediaWiki job submission quickly, since its unknowns and complexities are limited.

Event bus MediaWiki receiving will be powerful, but there are still complexities and unknowns, and it will likely be a lot of work.