# Event streams public API
The Event streams public API endpoints allow you to publish messages to event topics and retrieve messages from them. Each message in a topic has an ID that you can use as an offset. To continue reading from a known position, store either the ID or the timestamp of the last message you retrieved.
Refer to the OpenAPI specification for Workato Event streams public API for more information.
API DOMAIN AND NAMESPACE DIFFERENCES FOR EVENT STREAMS
The Event streams APIs are divided into two categories. Each category uses a different domain and namespace.
# Public API
The following endpoints were migrated to the https://event-streams.workato.com domain:
- Consume messages
POST /api/v1/topics/:topic_id/consume
- Publish a message
POST /api/v1/topics/:topic_id/publish
- Publish a batch of messages
POST /api/v1/batch/topics/:topic_id/publish
Note: Legacy endpoints published on https://www.workato.com/api (under the /pubsub/topics/ namespace) still function but are rate-limited to 1,000 requests per minute.
# Developer API
The following endpoints use the https://www.workato.com domain:
- List topics
GET /api/event_streams/topics
- Create a topic
POST /api/event_streams/topics
- Get a topic by ID
GET /api/event_streams/topics/:topic_id
- Update a topic
PUT /api/event_streams/topics/:topic_id
- Purge a topic
PUT /api/event_streams/topics/:topic_id/purge
- Delete a topic
DELETE /api/event_streams/topics/:topic_id
Refer to the Event streams Developer API documentation for more information.
# Quick reference
Type | Resource | Description |
---|---|---|
POST | /api/v1/topics/:topic_id/consume | Consume messages from a topic. |
POST | /api/v1/topics/:topic_id/publish | Publish a message to a topic. |
POST | /api/v1/batch/topics/:topic_id/publish | Publish a batch of messages to a topic. |
# Consume messages
Retrieve messages from the topic. This resource provides the option to retrieve all messages within the topic or, by including parameters in the request body, to fetch messages after a specified ID or timestamp. The response is limited to a maximum of 50 messages in each batch.
You can enable long polling mode by setting the timeout_secs parameter. In this mode, the API returns available messages immediately. If there are no messages, the API waits for up to timeout_secs seconds. If a new message appears within this timeframe, it is returned immediately; otherwise, the API returns an empty list after the call times out.
POST /api/v1/topics/:topic_id/consume
# URL parameters
Name | Type | Description |
---|---|---|
topic_id | integer required | Event topic ID. |
# Payload
Name | Type | Description |
---|---|---|
after_message_id | string optional | Message ID. The service returns all messages after the message with the specified ID. The ID must correspond to a message that exists in the topic. |
since_time | string optional | Timestamp in RFC 3339 format. The service returns all messages after the timestamp specified. The current time is used if the since_time value is a time in the future. |
batch_size | integer optional | Maximum number of messages to return in a batch. The maximum value is 50. If not specified, the service returns up to 50 messages. |
timeout_secs | integer optional | Maximum timeout for long polling. The maximum value is 60. The default value is 0. When 0, long polling is disabled. |
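The limits in the table above can be checked on the client before a request is sent. The following is a minimal sketch, not part of the Workato API: `build_consume_payload` is a hypothetical helper name, and the lower bounds (1 for batch_size, 0 for timeout_secs) are assumptions, since the table documents only the maximums.

```python
from typing import Any, Dict, Optional

MAX_BATCH_SIZE = 50    # documented maximum for batch_size
MAX_TIMEOUT_SECS = 60  # documented maximum for timeout_secs


def build_consume_payload(
    after_message_id: Optional[str] = None,
    since_time: Optional[str] = None,
    batch_size: Optional[int] = None,
    timeout_secs: Optional[int] = None,
) -> Dict[str, Any]:
    """Return a consume request body containing only the parameters that were set."""
    # Lower bounds are assumptions; only the upper bounds come from the table.
    if batch_size is not None and not 1 <= batch_size <= MAX_BATCH_SIZE:
        raise ValueError(f"batch_size must be between 1 and {MAX_BATCH_SIZE}")
    if timeout_secs is not None and not 0 <= timeout_secs <= MAX_TIMEOUT_SECS:
        raise ValueError(f"timeout_secs must be between 0 and {MAX_TIMEOUT_SECS}")

    candidates = {
        "after_message_id": after_message_id,
        "since_time": since_time,
        "batch_size": batch_size,
        "timeout_secs": timeout_secs,
    }
    # Omit unset parameters so the service applies its own defaults.
    return {name: value for name, value in candidates.items() if value is not None}
```

Calling `build_consume_payload()` with no arguments yields an empty body, which corresponds to retrieving messages from the start of the topic with the default batch size and long polling disabled.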
USING SINCE_TIME PARAMETER
We recommend using after_message_id to control the topic message cursor. Use since_time only for the first request (to poll messages from a specific timestamp without polling the whole topic), or if you need to re-retrieve messages from the topic. Using since_time, especially in combination with batch publish and long polling, doesn't guarantee message order and can lead to skipped messages.
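One way to follow this recommendation is to let the stored message ID take precedence and fall back to a timestamp only when no ID is available yet. The sketch below assumes a hypothetical helper named `cursor_params`; it relies on Python's `datetime.isoformat()`, which produces an RFC 3339 compatible string for timezone-aware values.

```python
from datetime import datetime, timezone
from typing import Dict, Optional


def cursor_params(
    last_message_id: Optional[str],
    start_time: Optional[datetime],
) -> Dict[str, str]:
    """Choose the cursor parameter for the next consume request."""
    # Prefer the message ID whenever one has been stored.
    if last_message_id is not None:
        return {"after_message_id": last_message_id}
    # First request only: start from a timestamp instead of the whole topic.
    if start_time is not None:
        if start_time.utcoffset() is None:
            raise ValueError("start_time must be timezone-aware")
        return {"since_time": start_time.isoformat()}
    # No cursor at all: the request reads from the start of the topic.
    return {}
```

After the first batch arrives, pass the ID of the last message received as `last_message_id` so that later requests no longer depend on since_time.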
# Sample request
curl -X POST "https://event-streams.workato.com/api/v1/topics/<id>/consume" \
-H 'Authorization: Bearer <api_token>' \
-H 'Content-Type: application/json' \
--data '{"after_message_id": "A12x"}'
# Response
{
"messages": [
{
"message_id": "A12y",
"payload": {
"Name": "Jane",
"Surname": "Doe"
},
"time": "2023-04-14T15:07:14.437+00:00"
},
{
"message_id": "A12z",
"payload": {
"Name": "John",
"Surname": "Doe"
},
"time": "2023-04-14T15:43:40.227+00:00"
}
]
}
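The request and response above can be combined into a simple read loop that advances the cursor with each batch. This is a sketch under stated assumptions rather than an official client: `post_consume` and `drain` are hypothetical names, the loop assumes messages arrive oldest first (as in the sample response), and the extra 10 seconds on the client timeout is an arbitrary margin over the long polling wait.

```python
import json
import urllib.request
from typing import Any, Callable, Dict, Iterator, Optional

BASE_URL = "https://event-streams.workato.com"


def post_consume(topic_id: int, api_token: str, body: Dict[str, Any]) -> Dict[str, Any]:
    """Send one consume request and return the decoded JSON response."""
    request = urllib.request.Request(
        f"{BASE_URL}/api/v1/topics/{topic_id}/consume",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    # Give the client more time than the server-side long polling wait.
    client_timeout = body.get("timeout_secs", 0) + 10
    with urllib.request.urlopen(request, timeout=client_timeout) as response:
        return json.load(response)


def drain(
    fetch: Callable[[Dict[str, Any]], Dict[str, Any]],
    after_message_id: Optional[str] = None,
    timeout_secs: int = 0,
) -> Iterator[Dict[str, Any]]:
    """Yield messages batch by batch until the service returns an empty list."""
    cursor = after_message_id
    while True:
        body: Dict[str, Any] = {"timeout_secs": timeout_secs}
        if cursor is not None:
            body["after_message_id"] = cursor
        messages = fetch(body).get("messages", [])
        if not messages:
            return
        for message in messages:
            # Advance the cursor before handing the message to the caller.
            cursor = message["message_id"]
            yield message
```

A caller could wire the two together with `drain(lambda body: post_consume(topic_id, api_token, body), after_message_id="A12x", timeout_secs=30)`. Passing the fetch function as an argument keeps the loop testable without network access.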
# Publish message
Publish a message to a topic. The message must comply with the topic schema.
POST /api/v1/topics/:topic_id/publish
# URL parameters
Name | Type | Description |
---|---|---|
topic_id | integer required | Event topic ID. |
# Payload
Name | Type | Description |
---|---|---|
(request body) | JSON required | Message to publish to the topic. The message must comply with the topic schema. |
# Sample request
curl -X POST "https://event-streams.workato.com/api/v1/topics/<id>/publish" \
-H 'Authorization: Bearer <api_token>' \
--data '{"Name": "John", "Surname": "Doe"}'
# Response
{
"message_id": "A1BRi"
}
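The same publish call can be prepared from Python with the standard library. The sketch below uses a hypothetical helper named `build_publish_request`; it only constructs the request, so nothing is sent until the result is passed to `urllib.request.urlopen`. The Content-Type header is an assumption carried over from the consume sample, since the publish sample above does not set it.

```python
import json
import urllib.request
from typing import Any, Dict

BASE_URL = "https://event-streams.workato.com"


def build_publish_request(
    topic_id: int,
    api_token: str,
    message: Dict[str, Any],
) -> urllib.request.Request:
    """Build (but do not send) a publish request for a single message."""
    return urllib.request.Request(
        f"{BASE_URL}/api/v1/topics/{topic_id}/publish",
        # The message itself is the request body; it is not wrapped in a field.
        data=json.dumps(message).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_token}",
            # Assumption: JSON content type, mirroring the consume sample.
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

To send it, open the request with `urllib.request.urlopen(request)` and read `message_id` from the decoded JSON response.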
# Publish a batch of messages
Publish a batch of messages to a topic. The messages must comply with the topic schema.
POST /api/v1/batch/topics/:topic_id/publish
# URL parameters
Name | Type | Description |
---|---|---|
topic_id | integer required | Event topic ID. |
# Payload
Name | Type | Description |
---|---|---|
payloads | Array of JSON required | Array of messages to publish to the topic. The messages must comply with the topic schema. The maximum array size is 100. |
# Sample request
curl -X POST "https://event-streams.workato.com/api/v1/batch/topics/<id>/publish" \
-H 'Authorization: Bearer <api_token>' \
--data '{"payloads":[{"Name": "John", "Surname": "Doe"},
{"Name": "Jane", "Surname": "Doe"}]}'
# Response
{
"is_partial_error": false,
"message_ids": {
"0": {
"message_id": "A1DFi",
"result": "success"
},
"1": {
"message_id": "A1DL8",
"result": "success"
}
}
}
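Because a batch is limited to 100 messages and the response reports a result per array index, a client typically needs to split larger lists and inspect each result. The helpers below are a sketch with hypothetical names (`chunk_payloads`, `build_batch_body`, `failed_indexes`). The value that the result field takes on failure is not documented here, so the sketch assumes that any value other than "success" indicates a failed message.

```python
from typing import Any, Dict, List

MAX_BATCH_PAYLOADS = 100  # documented maximum array size for payloads


def chunk_payloads(
    messages: List[Dict[str, Any]],
    size: int = MAX_BATCH_PAYLOADS,
) -> List[List[Dict[str, Any]]]:
    """Split messages into consecutive lists of at most `size` items."""
    return [messages[start:start + size] for start in range(0, len(messages), size)]


def build_batch_body(chunk: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Wrap one chunk in the request body expected by the batch endpoint."""
    return {"payloads": chunk}


def failed_indexes(response: Dict[str, Any]) -> List[int]:
    """Return the array positions whose result is not "success"."""
    # Assumption: anything other than "success" is treated as a failure.
    results = response.get("message_ids", {})
    return sorted(
        int(index)
        for index, entry in results.items()
        if entry.get("result") != "success"
    )
```

Checking `is_partial_error` first avoids scanning the results when every message in the batch was accepted; `failed_indexes` then identifies which positions in the submitted chunk to retry or log.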
Last updated: 10/16/2024, 3:02:32 PM