Event Platform/EventStreams HTTP Service
Example client at
codepen.io/ottomata/pen/LYpPpxj
RecentChange stats tool, built with EventStreams – at
EventStreams is a web service that exposes continuous streams of structured event data. It does so over HTTP using chunked transfer encoding following the Server-Sent Events (SSE) protocol. EventStreams can be consumed directly via HTTP, but is more commonly used via a client library.

The service supersedes RCStream, and might in the future replace irc.wikimedia.org. EventStreams is internally backed by Apache Kafka.

Note: SSE and EventSource are often used interchangeably as the names of this web technology. This document refers to SSE as the server-side protocol, and EventSource as the client-side interface.
Streams
EventStreams provides access to several different data streams, most notably the recentchange stream, which emits MediaWiki Recent changes events.

For a complete list of available streams, refer to the documentation at

The data format of each stream follows a schema. The schemas can be obtained via , for example jsonschema/mediawiki/recentchange/latest.yaml. For the recentchange stream there is additional documentation at Manual:RCFeed on mediawiki.org.

Wikidata RDF change stream
See the schema, the codepen where the stream can be selected and viewed in the browser, and example stream content.
When not to use EventStreams
The public EventStreams service is intended for use by small-scale external tool developers. It should not be used to build production services within the Wikimedia Foundation; WMF production services that react to events should consume the underlying Kafka topic(s) directly.
Examples
Web browser
Use the built-in EventSource API in modern browsers:

```javascript
const url = 'https://stream.wikimedia.org/v2/stream/recentchange';
const eventSource = new EventSource(url);

eventSource.onopen = () => {
    console.info('Opened connection.');
};

eventSource.onerror = (event) => {
    console.error('Encountered error', event);
};

eventSource.onmessage = (event) => {
    // event.data will be a JSON message
    const data = JSON.parse(event.data);
    // discard all canary events
    if (data.meta.domain === 'canary') {
        return;
    }
    // Edits from English Wikipedia
    if (data.server_name === 'en.wikipedia.org') {
        // Output the title of the edited page
        console.log(data.title);
    }
};
```
JavaScript
Node.js ESM (with wikimedia-streams):

```javascript
import WikimediaStream from 'wikimedia-streams';

// 'recentchange' can be replaced with another stream topic
const stream = new WikimediaStream('recentchange');

stream.on('open', () => {
    console.info('Opened connection.');
});

stream.on('error', (event) => {
    console.error('Encountered error', event);
});

stream.filter('mediawiki.recentchange')
    .all({ wiki: 'enwiki' }) // Edits from English Wikipedia
    .on('recentchange', (data, event) => {
        // Output page title
        console.log(data.title);
    });
```
Node.js (with eventsource):

```javascript
import EventSource from 'eventsource';

const url = 'https://stream.wikimedia.org/v2/stream/recentchange';
const eventSource = new EventSource(url);

eventSource.onopen = () => {
    console.info('Opened connection.');
};

eventSource.onerror = (event) => {
    console.error('Encountered error', event);
};

eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    // discard canary events
    if (data.meta.domain === 'canary') {
        return;
    }
    if (data.server_name === 'en.wikipedia.org') {
        // Output the page title
        console.log(data.title);
    }
};
```
Server-side filtering is not supported. You can filter client-side instead, for example to listen for changes to a specific wiki only:

```javascript
var wiki = 'commonswiki';

eventSource.onmessage = function (event) {
    // event.data will be a JSON string containing the message event.
    var change = JSON.parse(event.data);
    // discard canary events
    if (change.meta.domain === 'canary') {
        return;
    }
    if (change.wiki === wiki) {
        console.log(`Got commons wiki change on page ${change.title}`);
    }
};
```
TypeScript
Node.js (with wikimedia-streams):

```typescript
import WikimediaStream from 'wikimedia-streams';
import MediaWikiRecentChangeEvent from 'wikimedia-streams/build/streams/MediaWikiRecentChangeEvent';

// "recentchange" can be replaced with any valid stream
const stream = new WikimediaStream('recentchange');

stream.filter('mediawiki.recentchange')
    .all({ wiki: 'enwiki' }) // Edits from English Wikipedia
    .on('recentchange', (data /* MediaWikiRecentChangeEvent & { wiki: 'enwiki' } */, event) => {
        // Output page title
        console.log(data.title);
    });
```
Python
Using requests-sse. Other clients can be found at T309380#10304093. For the User-Agent header, see mw:API:Etiquette.

```python
import json

from requests_sse import EventSource

url = 'https://stream.wikimedia.org/v2/stream/recentchange'
headers = {"User-Agent": ...}

with EventSource(url, headers=headers) as stream:
    for event in stream:
        if event.type == 'message':
            try:
                change = json.loads(event.data)
            except ValueError:
                pass
            else:
                # discard canary events
                if change['meta']['domain'] == 'canary':
                    continue
                print('{user} edited {title}'.format(**change))
```
The standard SSE protocol defines ways to continue where you left off after a failure or other disconnect. EventStreams supports this as well. For example:

```python
import json

from requests_sse import EventSource

url = 'https://stream.wikimedia.org/v2/stream/recentchange'
headers = {"User-Agent": ...}
last_id = None

with EventSource(url, headers=headers) as stream:
    for event in stream:
        if event.type == 'message':
            try:
                change = json.loads(event.data)
            except ValueError:
                pass
            else:
                # discard canary events
                if change['meta']['domain'] == 'canary':
                    continue
                if change['user'] == 'Yourname':
                    print(change)
                last_id = event.last_event_id
                print(last_id)

# To try out resumption:
# - Run this Python script.
# - Publish an edit to [[Sandbox]] on test.wikipedia.org, and observe it getting printed.
# - Quit the Python process.
# - Pass last_id to the last_event_id parameter when creating the stream, like
#   with EventSource(url, last_event_id=last_id) as stream: ...
# - Publish another edit, while the Python process remains off.
# - Run this Python script again, and notice it finding and printing the missed edit.
```
Server-side filtering is not supported. To filter for something like a wiki domain, you'll need to do this on the consumer side. For example:

```python
wiki = 'commonswiki'
headers = {"User-Agent": ...}

with EventSource(url, headers=headers) as stream:
    for event in stream:
        if event.type == 'message':
            try:
                change = json.loads(event.data)
            except ValueError:
                pass
            else:
                # discard canary events
                if change['meta']['domain'] == 'canary':
                    continue
                if change['wiki'] == wiki:
                    print('{user} edited {title}'.format(**change))
```
Pywikibot is another way to consume EventStreams in Python. It provides an abstraction that takes care of automatic reconnection, easy filtering, and combining multiple topics into one stream. For example:

```python
>>> from pywikibot.comms.eventstreams import EventStreams
>>> stream = EventStreams(streams=['recentchange', 'revision-create'], since='20250107')
>>> stream.register_filter(server_name='fr.wikipedia.org', type='edit')
>>> change = next(stream)
>>> print('{type} on page "{title}" by "{user}" at {meta[dt]}.'.format(**change))
edit on page "Véronique Le Guen" by "Speculos" at 2019-01-12T21:19:43+00:00.
```
Command-line
With curl and jq, set the Accept header and prettify the events with jq:

```shell
curl -s -H 'Accept: application/json' https://stream.wikimedia.org/v2/stream/recentchange | jq .
```

Setting the Accept: application/json header will cause EventStreams to send you newline-delimited JSON objects, rather than data in the SSE format.
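The newline-delimited JSON mode can also be consumed without an SSE library. A minimal sketch using only the standard library (the helper names are my own; the User-Agent value is a placeholder you should replace per mw:API:Etiquette):

```python
import json
import urllib.request

STREAM_URL = 'https://stream.wikimedia.org/v2/stream/recentchange'

def parse_ndjson_lines(lines):
    """Yield events from newline-delimited JSON lines,
    skipping blank lines and canary events."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event.get('meta', {}).get('domain') == 'canary':
            continue
        yield event

def consume():
    # Setting Accept: application/json yields NDJSON instead of SSE framing.
    req = urllib.request.Request(STREAM_URL, headers={
        'Accept': 'application/json',
        'User-Agent': 'example-tool/0.1 (you@example.org)',  # placeholder UA
    })
    with urllib.request.urlopen(req) as resp:
        for event in parse_ndjson_lines(line.decode('utf-8') for line in resp):
            print(event.get('title'))

# To stream live events, call consume() (network access required):
# consume()
```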
API
The list of streams that are available will change over time, so they will not be documented here. To see the active list of available streams, visit the swagger-ui documentation, or request the swagger spec directly from the service. The available stream URI paths all begin with /v2/stream, e.g.

```json
"/v2/stream/recentchange": {
    "get": {
        "produces": [
            "text/event-stream; charset=utf-8"
        ],
        "description": "Mediawiki RecentChanges feed. Schema: https://schema.wikimedia.org/#!//primary/jsonschema/mediawiki/recentchange"
    }
},
"/v2/stream/revision-create": {
    "get": {
        "produces": [
            "text/event-stream; charset=utf-8"
        ],
        "description": "Mediawiki Revision Create feed. Schema: https://schema.wikimedia.org/#!//primary/jsonschema/mediawiki/revision/create"
    }
}
```
Stream selection
Streams are addressable either individually, e.g. /v2/stream/revision-create, or as a comma-separated list of streams to compose, e.g. /v2/stream/page-create,page-delete,page-undelete.
See available streams:
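Composed endpoints can be built mechanically from stream names; a small sketch (the helper name is illustrative, not part of any client library):

```python
BASE = 'https://stream.wikimedia.org/v2/stream/'

def compose_stream_url(*streams):
    """Join one or more stream names into a single composed endpoint path."""
    if not streams:
        raise ValueError('at least one stream name is required')
    return BASE + ','.join(streams)

# A single stream:
single = compose_stream_url('revision-create')
# A composed stream of page lifecycle events:
composed = compose_stream_url('page-create', 'page-delete', 'page-undelete')
```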
Historical Consumption
Since 2018-06, EventStreams supports timestamp-based historical consumption. A starting position can be provided as individual assignment objects in the Last-Event-ID header by setting a timestamp field instead of an offset field. Or, more simply, a since query parameter can be provided in the stream URL, e.g. since=2018-06-14T00:00:00Z. since can be given as anything parseable by JavaScript Date.parse(), e.g. a UTC ISO-8601 datetime string.
When given a timestamp, EventStreams will ask Kafka for the message offset in the stream(s) that most closely matches the timestamp. Kafka guarantees that all events after the returned offset will be after the given timestamp. Note: stream history is not kept indefinitely. Depending on the stream configuration, there will likely be between 7 and 31 days of history available. Please be kind when providing timestamps: there may be a lot of historical data available, and reading it and sending it all out is compute-intensive, so consume only the minimum of data you need.
Example URL:
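Building such a URL from a datetime can be sketched as follows (the helper name is my own; the since format matches the example above):

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode

def stream_url_since(stream, dt):
    """Build a stream URL whose since parameter is a UTC ISO-8601 timestamp."""
    since = dt.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    return 'https://stream.wikimedia.org/v2/stream/{}?{}'.format(
        stream, urlencode({'since': since}))

# History retention is limited (roughly 7 to 31 days), so request only as
# much history as you actually need, e.g. one hour back:
url = stream_url_since('recentchange',
                       datetime.now(timezone.utc) - timedelta(hours=1))
```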
If you want to manually set which topics, partitions, and timestamps or offsets your client starts consuming from, you can set the Last-Event-ID HTTP request header to an array of objects that specify this, e.g.

```json
[{"topic": "eqiad.mediawiki.recentchange", "partition": 0, "offset": 1234567}, {"topic": "codfw.mediawiki.recentchange", "partition": 0, "timestamp": 1575906290000}]
```
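In a client, the header value is simply that array serialized as JSON; a sketch (the assignment values mirror the example above):

```python
import json

assignments = [
    {'topic': 'eqiad.mediawiki.recentchange', 'partition': 0, 'offset': 1234567},
    {'topic': 'codfw.mediawiki.recentchange', 'partition': 0, 'timestamp': 1575906290000},
]

# Pass this dict as the request headers when opening the connection.
headers = {'Last-Event-ID': json.dumps(assignments)}
```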
Response Format
All examples here will consume recent changes from

This section describes the format of a response body from an EventStreams stream endpoint. Requesting /v2/stream/recentchange will start a stream of data in the SSE format. This format is best interpreted using an EventSource client. If you choose not to use one of these, the raw stream is still human readable and looks as follows:
```
event: message
id: [{"topic": "eqiad.mediawiki.recentchange", "partition": 0, "timestamp": 1532031066001}, {"topic": "codfw.mediawiki.recentchange", "partition": 0, "offset": -1}]
data: {"event": "data", "is": "here"}
```
Each event will be separated by two line breaks (\n\n), and have event, id, and data fields.

The event field will be message for data events, and error for error events. The id field is a JSON-formatted array of Kafka topic, partition, and offset|timestamp metadata. It can be used to tell EventStreams to start consuming from an earlier position in the stream. This enables clients to automatically resume from where they left off if they are disconnected. EventSource implementations handle this transparently. Note that the topic, partition, and offset|timestamp for all topics and partitions that make up this stream are included in every message's id field. This allows EventSource to be specific about where it left off, even if the consumed stream is composed of multiple Kafka topic-partitions.

Note that offsets and timestamps may be used interchangeably in the SSE id field. WMF runs stream.wikimedia.org in a multi-DC active/active setup, backed by multiple Kafka clusters. Since Kafka offsets are unique per cluster, using them in a multi-DC setup is not reliable. Instead, id fields will always use timestamps instead of offsets. This is not as precise as using offsets, but allows for a reliable multi-DC service.

You may request that EventStreams begin streaming to you from different offsets by setting an array of topic, partition, offset|timestamp objects in the Last-Event-ID HTTP header.
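For illustration, the framing described above can be parsed by hand; this is a minimal sketch (a real EventSource client also handles comments, retries, and partial reads):

```python
import json

def parse_sse_frame(frame):
    """Parse one SSE frame ('field: value' lines) into a dict of fields.
    Repeated data lines are joined with newlines, per the SSE format."""
    fields = {}
    for line in frame.split('\n'):
        if not line or line.startswith(':'):  # blank line or comment
            continue
        name, _, value = line.partition(':')
        value = value.lstrip(' ')
        if name == 'data' and 'data' in fields:
            fields['data'] += '\n' + value
        else:
            fields[name] = value
    return fields

frame = ('event: message\n'
         'id: [{"topic": "eqiad.mediawiki.recentchange", "partition": 0, "timestamp": 1532031066001}]\n'
         'data: {"wiki": "enwiki"}')
parsed = parse_sse_frame(frame)
offsets = json.loads(parsed['id'])    # resume position for Last-Event-ID
payload = json.loads(parsed['data'])  # the event itself
```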
Canary Events
WMF's Data Engineering team produces artificial 'canary' events into each stream multiple times an hour. The presence of these canary events in a stream allows us to differentiate between a broken event stream and an empty one. If a stream has canary_events_enabled=true, then we should expect at least one event in the stream's Kafka topics every hour. If we get no events in an hour, then we can trigger an alert that the stream is broken.

These events are not filtered out of the streams available at stream.wikimedia.org. As a user of these streams, you should discard all canary events, i.e. all events where meta.domain === 'canary'. If you are not using canary events for alerting, discard them!

The content of most canary event fields is copied directly from the first example event in the event's schema, e.g. the mediawiki/recentchange example or the mediawiki/revision/create example. These examples can also be seen in the OpenAPI docs for the streams, e.g. the mediawiki.page-move example value. See the code that creates canary events (as of 2023-11).
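A canary check in consumer code can be as simple as this sketch (the helper name is my own):

```python
def is_canary(event):
    """True for WMF's synthetic canary events; consumers should discard these."""
    return event.get('meta', {}).get('domain') == 'canary'

events = [
    {'meta': {'domain': 'canary'}, 'title': 'Synthetic'},
    {'meta': {'domain': 'en.wikipedia.org'}, 'title': 'Real edit'},
]
real_events = [e for e in events if not is_canary(e)]
```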
Filtering
EventStreams does not have server-side filtering capabilities (by $wgServerName or anything else). You'll need to do your filtering client side, e.g.
```javascript
/**
 * Calls cb(event) for every event where recentchange event.server_name == server_name.
 */
function filterWiki(event, server_name, cb) {
    if (event.server_name == server_name) {
        cb(event);
    }
}

eventSource.onmessage = function (event) {
    // Print only events that come from Wikimedia Commons.
    filterWiki(JSON.parse(event.data), 'commons.wikimedia.org', console.log);
};
```
Architecture
SSE vs. WebSockets/Socket.IO
The previous "RCStream" service was written for consumption via Socket.IO, so why did we change the protocol for its replacement?
The WebSocket protocol, beyond its initial handshake, doesn't use HTTP, which makes it different from most other services we run at the Wikimedia Foundation. WebSockets are powerful and can, for example, let clients and servers communicate asynchronously over a bi-directional pipe. EventStreams, on the other hand, is read-only and only needs to send events from the server to a client. By using only 100% standard HTTP, EventStreams can be consumed from any HTTP client out there, without the need for programming several RPC-like initialization steps.
We originally prototyped a Kafka -> Socket.IO library (Kasocki). After doing so, we decided that HTTP SSE was a better fit, and developed KafkaSSE instead.
KafkaSSE
KafkaSSE is a library that glues a Kafka consumer to a connected HTTP SSE client. A Kafka consumer is assigned topics, partitions, and offsets, and events are then streamed from the consumer to the HTTP client in chunked transfer encoding. EventStreams maps stream routes (e.g. /v2/stream/recentchange) to specific topics in Kafka.
Kafka
WMF maintains several internal Kafka clusters, producing hundreds of thousands of messages per second. Kafka has proved to be highly scalable and feature-rich; it supports multiple producers and multiple consumers. Our internal events are already produced through Kafka, so using it as the EventStreams backend was a natural choice.

Kafka allows us to begin consuming from any message offset (that is still present on the backend Kafka cluster). This feature is what allows connected EventStreams clients to auto-resume (via EventSource) when they disconnect.
Notes
Server-side enforced timeout
WMF's HTTP connection termination layer enforces a connection timeout of 15 minutes. A good SSE / EventSource client should be able to automatically reconnect and begin consuming at the right location using the Last-Event-ID header. See this Phabricator discussion for more info.
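A reconnect loop that survives this timeout only needs to remember the last seen id and replay it on reconnect; a sketch (the 5-second backoff and the make_stream factory are my own assumptions, not part of the service):

```python
import time

def consume_forever(make_stream, retry_delay=5):
    """Yield message events forever, reconnecting after any disconnect and
    resuming from the last seen SSE id so events are not missed."""
    last_id = None
    while True:
        try:
            with make_stream(last_id) as stream:
                for event in stream:
                    if event.type == 'message':
                        yield event
                        last_id = event.last_event_id
        except Exception:
            time.sleep(retry_delay)  # assumed backoff before reconnecting

# Usage with requests-sse (see the Python examples above):
# from requests_sse import EventSource
# url = 'https://stream.wikimedia.org/v2/stream/recentchange'
# for event in consume_forever(
#         lambda last_id: EventSource(url, last_event_id=last_id)):
#     ...
```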
See also
EventStreams/Administration, for WMF administration.
EventStreams/Powered By, for a list of example tools that are built on EventStreams.
Event*, disambiguation for various event-related services at Wikimedia.
RCStream, predecessor to EventStreams.
Manual:RCFeed on mediawiki.org, about the underlying format of the recent changes messages.
Manual:$wgRCFeeds on mediawiki.org, about setting up RCFeed (EventStreams uses the EventBusRCEngine from EventBus).
API:Recent changes stream on mediawiki.org.
Further reading
"Get live updates to Wikimedia projects with EventStreams", by Andrew Otto (2017).
"EventStreams updates: New events, composite streams, historical subscription", by Andrew Otto (2018).
"Server-Sent Events: the alternative to WebSockets you should be using", by Germano Gabbianelli (2022).
External links
Source code of eventstreams service
GitHub mirror
Source code of the node-kafka-sse library
Issue tracker (Phabricator workboard)
How to track the main Wikidata RDF stream using QLever to have a local up-to-date instance