# Getting started
URL: https://developers.cloudflare.com/pipelines/getting-started/
Cloudflare Pipelines allows you to ingest high volumes of real-time streaming data and load it into [R2 Object Storage](/r2/), without managing any infrastructure.
By following this guide, you will:
1. Set up an R2 bucket.
2. Create a pipeline, with HTTP as a source, and an R2 bucket as a sink.
3. Send data to your pipeline's HTTP ingestion endpoint.
4. Verify the output delivered to R2.
:::note
Pipelines is in **public beta**, and any developer with a [paid Workers plan](/workers/platform/pricing/#workers) can start using Pipelines immediately.
:::
***
## Prerequisites
To use Pipelines, you will need:
## 1. Set up an R2 bucket
Create a bucket by following the [get started guide for R2](/r2/get-started/), or by running the command below:
```sh
npx wrangler r2 bucket create my-bucket
```
Save the bucket name for the next step.
## 2. Create a Pipeline
To create a pipeline using Wrangler, run the following command in a terminal, and specify:
- The name of your pipeline
- The name of the R2 bucket you created in step 1
```sh
npx wrangler pipelines create my-clickstream-pipeline --r2-bucket my-bucket --batch-max-seconds 5 --compression none
```
After running this command, you will be prompted to authorize Cloudflare Workers Pipelines to create an R2 API token on your behalf. Your pipeline uses this token when loading data into your bucket. You can approve the request through the browser link, which opens automatically.
Choosing a pipeline name
When choosing a name for your pipeline:
- Ensure it is descriptive and relevant to the type of events you intend to ingest. You cannot change the name of the pipeline after creating it.
- The pipeline name must be between 1 and 63 characters long.
- The name cannot contain special characters outside dashes (`-`).
- The name must start and end with a letter or a number.
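The naming rules above can be checked locally before you run the create command. The following is a minimal sketch, not part of Wrangler: `isValidPipelineName` is a hypothetical helper, and it assumes that "letter" means an ASCII letter, since the rules do not say whether other characters are accepted.

```ts
// Hypothetical helper: checks a candidate name against the rules listed above.
// Assumption: letters are ASCII a-z or A-Z; anything else except digits and
// dashes is treated as a special character.
const PIPELINE_NAME_PATTERN = /^[A-Za-z0-9](?:[A-Za-z0-9-]*[A-Za-z0-9])?$/;

function isValidPipelineName(candidate: string): boolean {
  // Length rule: between 1 and 63 characters.
  if (candidate.length < 1 || candidate.length > 63) {
    return false;
  }
  // Character rules: letters, numbers, and dashes only, with no dash at either end.
  return PIPELINE_NAME_PATTERN.test(candidate);
}

console.log(isValidPipelineName("my-clickstream-pipeline")); // true
console.log(isValidPipelineName("-bad-name")); // false
```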
You will notice two optional flags are set while creating the pipeline: `--batch-max-seconds` and `--compression`. These flags are added to make it faster for you to see the output of your first pipeline. For production use cases, we recommend keeping the default settings.
Once you create your pipeline, you will receive a summary of your pipeline's configuration, as well as an HTTP endpoint which you can post data to:
```sh
🌀 Authorizing R2 bucket "my-bucket"
🌀 Creating pipeline named "my-clickstream-pipeline"
✅ Successfully created pipeline my-clickstream-pipeline
Id: [PIPELINE-ID]
Name: my-clickstream-pipeline
Sources:
HTTP:
Endpoint: https://[PIPELINE-ID].pipelines.cloudflare.com/
Authentication: off
Format: JSON
Worker:
Format: JSON
Destination:
Type: R2
Bucket: my-bucket
Format: newline-delimited JSON
Compression: GZIP
Batch hints:
Max bytes: 100 MB
Max duration: 300 seconds
Max records: 100,000
🎉 You can now send data to your Pipeline!
Send data to your Pipeline's HTTP endpoint:
curl "https://[PIPELINE-ID].pipelines.cloudflare.com/" -d '[{ ...JSON_DATA... }]'
To send data to your Pipeline from a Worker, add the following configuration to your config file:
{
"pipelines": [
{
"pipeline": "my-clickstream-pipeline",
"binding": "PIPELINE"
}
]
}
```
## 3. Post data to your pipeline
Use a curl command in your terminal to post an array of JSON objects to the endpoint you received in step 2.
```sh
curl -H "Content-Type:application/json" \
  -d '[{"event":"viewedCart", "timestamp": "2025-04-03T15:42:30Z"},{"event":"cartAbandoned", "timestamp": "2025-04-03T15:42:37Z"}]' \
  "https://[PIPELINE-ID].pipelines.cloudflare.com/"
```
Once the pipeline successfully accepts the data, you will receive a success message.
You can continue posting data to the pipeline. The pipeline will automatically buffer ingested data. Based on the batch settings (`--batch-max-seconds`) specified in step 2, a batch will be generated every 5 seconds, turned into a file, and written out to your R2 bucket.
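If you would rather post from application code than from curl, the request can be assembled as follows. This is a sketch under assumptions: `CartEvent`, `IngestRequestInit`, and `buildIngestRequest` are illustrative names, not part of any Cloudflare SDK. The only facts taken from this guide are that the endpoint accepts an HTTP POST whose body is a JSON array.

```ts
// Minimal shape of the options object that fetch() accepts, declared locally
// so the sketch does not depend on runtime-specific types.
interface IngestRequestInit {
  method: string;
  headers: { [header: string]: string };
  body: string;
}

// Hypothetical event shape, mirroring the curl example above.
interface CartEvent {
  event: string;
  timestamp: string;
}

// Builds the POST options for a batch of events. The body is a JSON array.
function buildIngestRequest(events: CartEvent[]): IngestRequestInit {
  return {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(events),
  };
}

const sampleRequest = buildIngestRequest([
  { event: "viewedCart", timestamp: "2025-04-03T15:42:30Z" },
  { event: "cartAbandoned", timestamp: "2025-04-03T15:42:37Z" },
]);
console.log(sampleRequest.body);
```

To send it, pass the result to `fetch("https://[PIPELINE-ID].pipelines.cloudflare.com/", sampleRequest)` and check `response.ok` before treating the events as ingested.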
## 4. Verify in R2
Open the [R2 dashboard](https://dash.cloudflare.com/?to=/:account/r2/overview), and navigate to the R2 bucket you created in step 1. You will see a directory labeled with today's date (such as `event_date=2025-04-05`). Select the directory, and you will see a sub-directory for the current hour (such as `hr=04`). Inside, you should see a newline-delimited JSON file containing the data you posted in step 3. Download the file and open it in a text editor of your choice to verify that the data posted in step 3 is present.
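Instead of inspecting the file by eye, you can parse it. The sketch below assumes the file is uncompressed (as configured with `--compression none` in step 2) and that its contents have already been read into a string. `parseNdjson` and `sampleFile` are illustrative names.

```ts
// Parses newline-delimited JSON: one JSON value per non-empty line.
function parseNdjson(text: string): unknown[] {
  const records: unknown[] = [];
  const lines = text.split("\n");
  for (let i = 0; i < lines.length; i++) {
    const line = lines[i].trim();
    if (line.length === 0) continue; // tolerate blank lines and a trailing newline
    records.push(JSON.parse(line));
  }
  return records;
}

// Sample contents, assuming the two events from step 3 landed in one file.
const sampleFile =
  '{"event":"viewedCart","timestamp":"2025-04-03T15:42:30Z"}\n' +
  '{"event":"cartAbandoned","timestamp":"2025-04-03T15:42:37Z"}\n';

console.log(parseNdjson(sampleFile).length); // 2
```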
***
## Next steps
* Learn how to [set up authentication, or CORS settings](/pipelines/build-with-pipelines/sources/http), on your HTTP endpoint.
* Send data to your Pipeline from a Cloudflare Worker using the [Workers API documentation](/pipelines/build-with-pipelines/sources/workers-apis).
If you have any feature requests or notice any bugs, share your feedback directly with the Cloudflare team by joining the [Cloudflare Developers community on Discord](https://discord.cloudflare.com).
---
# Overview
URL: https://developers.cloudflare.com/pipelines/
Ingest real time data streams and load into R2, using Cloudflare Pipelines.
Cloudflare Pipelines lets you ingest high volumes of real time data, without managing any infrastructure. A single pipeline can ingest up to 100 MB of data per second. Ingested data is automatically batched, written to output files, and delivered to an [R2 bucket](/r2/) in your account. You can use Pipelines to build a data lake of clickstream data, or to store events from a Worker.
## Create your first pipeline
You can set up a pipeline to ingest data via HTTP, and deliver output to R2, with a single command:
```sh
$ npx wrangler@latest pipelines create my-clickstream-pipeline --r2-bucket my-bucket
🌀 Authorizing R2 bucket "my-bucket"
🌀 Creating pipeline named "my-clickstream-pipeline"
✅ Successfully created pipeline my-clickstream-pipeline
Id: 0e00c5ff09b34d018152af98d06f5a1xvc
Name: my-clickstream-pipeline
Sources:
HTTP:
Endpoint: https://0e00c5ff09b34d018152af98d06f5a1xvc.pipelines.cloudflare.com/
Authentication: off
Format: JSON
Worker:
Format: JSON
Destination:
Type: R2
Bucket: my-bucket
Format: newline-delimited JSON
Compression: GZIP
Batch hints:
Max bytes: 100 MB
Max duration: 300 seconds
Max records: 100,000
🎉 You can now send data to your pipeline!
Send data to your pipeline's HTTP endpoint:
curl "https://0e00c5ff09b34d018152af98d06f5a1xvc.pipelines.cloudflare.com/" -d '[{ ...JSON_DATA... }]'
To send data to your pipeline from a Worker, add the following configuration to your config file:
{
"pipelines": [
{
"pipeline": "my-clickstream-pipeline",
"binding": "PIPELINE"
}
]
}
```
Refer to the [getting started guide](/pipelines/getting-started) to start building with pipelines.
:::note
While in beta, you will not be billed for Pipelines usage. You will be billed only for [R2 usage](/r2/pricing/).
:::
***
## Features
Each pipeline generates a globally scalable HTTP endpoint, which supports authentication and CORS settings.
Send data to a pipeline directly from a Cloudflare Worker.
Define batch sizes and enable compression to generate output files that are efficient to query.
***
## Related products
Cloudflare R2 Object Storage allows developers to store large amounts of unstructured data without the costly egress bandwidth fees associated with typical cloud storage services.
Cloudflare Workers allows developers to build serverless applications and deploy instantly across the globe for exceptional performance, reliability, and scale.
***
## More resources
Learn about pipelines limits.
Follow @CloudflareDev on Twitter to learn about product announcements, and what is new in Cloudflare Workers.
Connect with the Workers community on Discord to ask questions, show what you are building, and discuss the platform with other developers.
---
# How pipelines work
URL: https://developers.cloudflare.com/pipelines/concepts/how-pipelines-work/
Cloudflare Pipelines lets you ingest data from a source and deliver it to a sink. It is built for high-volume, real-time data streams. Each pipeline can ingest up to 100 MB/s of data, via HTTP or a Worker, and load the data as files in an R2 bucket.

## Supported sources, data formats, and sinks
### Sources
Pipelines supports the following sources:
* [HTTP Clients](/pipelines/build-with-pipelines/sources/http), with optional authentication and CORS settings
* [Cloudflare Workers](/workers/), using the [Pipelines Workers API](/pipelines/build-with-pipelines/sources/workers-apis)
Multiple sources can be active on a single pipeline simultaneously. For example, you can create a pipeline which accepts data from Workers and via HTTP. Multiple Workers can be configured to send data to the same pipeline. There is no limit to the number of source clients.
### Data format
Pipelines can ingest JSON serializable records.
### Sinks
Pipelines supports delivering data into [R2 Object Storage](/r2/). Ingested data is delivered as newline delimited JSON files (`ndjson`) with optional compression. Multiple pipelines can be configured to deliver data to the same R2 bucket.
## Data durability
Pipelines are designed to be reliable. Any data which is successfully ingested will be delivered, at least once, to the configured R2 bucket, provided that the [R2 API credentials associated with a pipeline](/r2/api/tokens/) remain valid. Ordering of records is best effort.
Each pipeline maintains a storage buffer. Requests to send data to a pipeline receive a successful response only after the data is committed to this storage buffer.
Ingested data accumulates, until a sufficiently [large batch of data](/pipelines/build-with-pipelines/output-settings/#customize-batch-behavior) has been filled. Once the batch reaches its target size, the entire batch of data is converted to a file and delivered to R2.
Transient failures, such as network connectivity issues, are automatically retried.
However, if the [R2 API credentials associated with a pipeline](/r2/api/tokens/) expire or are revoked, data delivery will fail. In this scenario, some data might continue to accumulate in the buffers, but the pipeline will eventually start rejecting requests once the buffers are full.
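Because delivery is at least once, a record can occasionally appear more than once in your bucket. One way to handle this at read time is sketched below. It assumes each record carries a producer-assigned unique `id` field. Pipelines does not add such a field, so `IdentifiedRecord` and `dedupeById` are purely illustrative.

```ts
// Assumption: the producer attaches a unique `id` to every record it sends.
interface IdentifiedRecord {
  id: string;
  [field: string]: unknown;
}

// Keeps the first occurrence of each id and preserves input order.
function dedupeById(records: IdentifiedRecord[]): IdentifiedRecord[] {
  // A prototype-free object, so ids such as "constructor" behave like any other key.
  const seen: { [id: string]: boolean } = Object.create(null);
  const unique: IdentifiedRecord[] = [];
  for (let i = 0; i < records.length; i++) {
    const record = records[i];
    if (seen[record.id] === true) continue; // duplicate delivery, skip it
    seen[record.id] = true;
    unique.push(record);
  }
  return unique;
}

const deliveredRecords: IdentifiedRecord[] = [
  { id: "a", event: "viewedCart" },
  { id: "b", event: "cartAbandoned" },
  { id: "a", event: "viewedCart" }, // delivered twice
];
console.log(dedupeById(deliveredRecords).length); // 2
```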
## Updating a pipeline
Pipelines update without dropping records. Updating an existing pipeline creates a new instance of the pipeline. Requests are gracefully re-routed to the new instance. The old instance continues to write data into the configured sink. Once the old instance is fully drained, it is spun down.
This means that updates might take a few minutes to go into effect. For example, if you update a pipeline's sink, previously ingested data might continue to be delivered into the old sink.
## Backpressure behavior
If you send too much data, the pipeline will communicate backpressure by returning a 429 response to HTTP requests, or throwing an error if using the Workers API. Refer to the [limits](/pipelines/platform/limits) to learn how much volume a single pipeline can support. You might see 429 responses if you are sending too many requests or sending too much data.
If you are consistently seeing backpressure from your pipeline, consider the following strategies:
* Increase the [shard count](/pipelines/build-with-pipelines/shards) to increase the maximum throughput of your pipeline.
* Send data to a second pipeline if you receive an error. You can set up multiple pipelines to write to the same R2 bucket.
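For occasional rather than consistent backpressure, a client can also wait and retry. The sketch below shows only the decision logic, using exponential backoff. The attempt limit and delay values are assumptions chosen for illustration, not recommendations from Cloudflare, and `decideRetry` is a hypothetical helper.

```ts
interface RetryDecision {
  retry: boolean;
  delayMs: number;
}

// Decides what to do after a response from the ingestion endpoint.
// `attempt` is zero-based: 0 for the first try.
function decideRetry(
  statusCode: number,
  attempt: number,
  maxAttempts: number = 5, // assumed value
  baseDelayMs: number = 200, // assumed value
  maxDelayMs: number = 5000 // assumed value
): RetryDecision {
  const isBackpressure = statusCode === 429;
  const attemptsLeft = attempt + 1 < maxAttempts;
  if (!isBackpressure || !attemptsLeft) {
    return { retry: false, delayMs: 0 };
  }
  // Exponential backoff: base * 2^attempt, capped at maxDelayMs.
  const delayMs = Math.min(maxDelayMs, baseDelayMs * Math.pow(2, attempt));
  return { retry: true, delayMs: delayMs };
}

console.log(decideRetry(429, 0)); // { retry: true, delayMs: 200 }
console.log(decideRetry(429, 3)); // { retry: true, delayMs: 1600 }
```

A caller would send the request, pass the response status and attempt number to `decideRetry`, and sleep for `delayMs` before resending when `retry` is true. Production clients often add random jitter to the delay so that many senders do not retry in lockstep.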
---
# Observability
URL: https://developers.cloudflare.com/pipelines/observability/
---
# Metrics and analytics
URL: https://developers.cloudflare.com/pipelines/observability/metrics/
Pipelines expose metrics which allow you to measure data ingested, requests made, and data delivered.
The metrics displayed in the [Cloudflare dashboard](https://dash.cloudflare.com/) are queried from Cloudflare's [GraphQL Analytics API](/analytics/graphql-api/). You can access the metrics [programmatically](#query-via-the-graphql-api) via GraphQL or HTTP client.
## Metrics
### Ingestion
Pipelines export the below metrics within the `pipelinesIngestionAdaptiveGroups` dataset.
| Metric | GraphQL Field Name | Description |
| ---------------------- | ------------------------- | ---------------------------------------------------------------|
| Ingestion Events | `count` | Number of ingestion events, or requests made, to a pipeline. |
| Ingested Bytes | `ingestedBytes` | Total number of bytes ingested |
| Ingested Records | `ingestedRecords` | Total number of records ingested |
The `pipelinesIngestionAdaptiveGroups` dataset provides the following dimensions for filtering and grouping queries:
* `pipelineId` - ID of the pipeline
* `datetime` - Timestamp of the ingestion event
* `date` - Timestamp of the ingestion event, truncated to the start of a day
* `datetimeHour` - Timestamp of the ingestion event, truncated to the start of an hour
* `datetimeMinute` - Timestamp of the ingestion event, truncated to the start of a minute
### Delivery
Pipelines export the below metrics within the `pipelinesDeliveryAdaptiveGroups` dataset.
| Metric | GraphQL Field Name | Description |
| ---------------------- | ------------------------- | ---------------------------------------------------------------|
| Delivery Events | `count` | Number of delivery events to an R2 bucket |
| Delivered Bytes | `deliveredBytes` | Total number of bytes delivered |
The `pipelinesDeliveryAdaptiveGroups` dataset provides the following dimensions for filtering and grouping queries:
* `pipelineId` - ID of the pipeline
* `datetime` - Timestamp of the delivery event
* `date` - Timestamp of the delivery event, truncated to the start of a day
* `datetimeHour` - Timestamp of the delivery event, truncated to the start of an hour
* `datetimeMinute` - Timestamp of the delivery event, truncated to the start of a minute
## Query via the GraphQL API
You can programmatically query analytics for your pipelines via the [GraphQL Analytics API](/analytics/graphql-api/). This API queries the same datasets as the Cloudflare dashboard and supports GraphQL [introspection](/analytics/graphql-api/features/discovery/introspection/).
Pipelines GraphQL datasets require an `accountTag` filter with your Cloudflare account ID.
### Measure total bytes & records ingested over time period
```graphql
query PipelineIngestion($accountTag: string!, $pipelineId: string!, $datetimeStart: Time!, $datetimeEnd: Time!) {
  viewer {
    accounts(filter: {accountTag: $accountTag}) {
      pipelinesIngestionAdaptiveGroups(
        limit: 10000
        filter: {
          pipelineId: $pipelineId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        sum {
          ingestedBytes,
          ingestedRecords,
        }
      }
    }
  }
}
```
### Measure volume of data delivered
```graphql
query PipelineDelivery($accountTag: string!, $pipelineId: string!, $datetimeStart: Time!, $datetimeEnd: Time!) {
  viewer {
    accounts(filter: {accountTag: $accountTag}) {
      pipelinesDeliveryAdaptiveGroups(
        limit: 10000
        filter: {
          pipelineId: $pipelineId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        sum {
          deliveredBytes,
        }
      }
    }
  }
}
```
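To run either query from code, send it as the `query` field of a JSON body, with the variables alongside. The sketch below builds that body and a time window. `buildGraphqlBody`, `lastHours`, and the placeholder IDs are illustrative assumptions, and the query text is abbreviated here.

```ts
interface PipelineQueryVariables {
  accountTag: string;
  pipelineId: string;
  datetimeStart: string; // ISO 8601 timestamp
  datetimeEnd: string; // ISO 8601 timestamp
}

// Builds the JSON body for a GraphQL POST request: { query, variables }.
function buildGraphqlBody(queryText: string, variables: PipelineQueryVariables): string {
  return JSON.stringify({ query: queryText, variables: variables });
}

// Returns a window that ends at `end` and starts `hours` earlier.
function lastHours(end: Date, hours: number): { datetimeStart: string; datetimeEnd: string } {
  const start = new Date(end.getTime() - hours * 60 * 60 * 1000);
  return { datetimeStart: start.toISOString(), datetimeEnd: end.toISOString() };
}

const queryWindow = lastHours(new Date("2025-04-03T12:00:00Z"), 24);
const graphqlBody = buildGraphqlBody("query PipelineDelivery { ... }", {
  accountTag: "[ACCOUNT-ID]", // placeholder, replace with your account ID
  pipelineId: "[PIPELINE-ID]", // placeholder, replace with your pipeline ID
  datetimeStart: queryWindow.datetimeStart,
  datetimeEnd: queryWindow.datetimeEnd,
});
console.log(graphqlBody);
```

POST the resulting body to `https://api.cloudflare.com/client/v4/graphql` with an `Authorization: Bearer` header carrying a Cloudflare API token.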
---
# Tutorials
URL: https://developers.cloudflare.com/pipelines/tutorials/
View tutorials to help you get started with Pipelines.
---
# Configure output settings
URL: https://developers.cloudflare.com/pipelines/build-with-pipelines/output-settings/
Pipelines convert a stream of records into output files and deliver the files to an R2 bucket in your account. This guide details how you can change the output destination and customize batch settings to generate query ready files.
## Configure an R2 bucket as a destination
To create or update a pipeline using Wrangler, run the following command in a terminal:
```sh
npx wrangler pipelines create [PIPELINE-NAME] --r2-bucket [R2-BUCKET-NAME]
```
After running this command, you will be prompted to authorize Cloudflare Workers Pipelines to create an R2 API token on your behalf. Your pipeline uses the R2 API token to load data into your bucket. You can approve the request through the browser link which will open automatically.
If you prefer not to authenticate this way, you can pass your [R2 API Token](/r2/api/tokens/) to Wrangler:
```sh
npx wrangler pipelines create [PIPELINE-NAME] --r2-bucket [R2-BUCKET-NAME] --r2-access-key-id [ACCESS-KEY-ID] --r2-secret-access-key [SECRET-ACCESS-KEY]
```
## File format and compression
Output files are generated as Newline Delimited JSON files (`ndjson`). Each line in an output file maps to a single record.
By default, output files are compressed in the `gzip` format. Compression can be turned off using the `--compression` flag:
```sh
npx wrangler pipelines update [PIPELINE-NAME] --compression none
```
Output files are named using a [ULID](https://github.com/ulid/spec) slug, followed by an extension.
## Customize batch behavior
When configuring your pipeline, you can define how records are batched before they are delivered to R2. Batches of records are written out to a single output file.
Batching can:
- Reduce the number of output files written to R2 and thus reduce the [cost of writing data to R2](/r2/pricing/#class-a-operations).
- Increase the size of output files making them more efficient to query.
There are three ways to define how ingested data is batched:
1. `batch-max-mb`: The maximum amount of data that will be batched in megabytes. Default, and maximum, is `100 MB`.
2. `batch-max-rows`: The maximum number of rows or events in a batch before data is written. Default, and maximum, is `10,000,000` rows.
3. `batch-max-seconds`: The maximum duration of a batch before data is written in seconds. Default, and maximum, is `300 seconds`.
Batch definitions are hints. A pipeline will follow these hints closely, but batches might not be exact.
All three batch definitions work together and whichever limit is reached first triggers the delivery of a batch.
For example, a `batch-max-mb` = 100 MB and a `batch-max-seconds` = 100 means that if 100 MB of events are posted to the pipeline, the batch will be delivered. However, if it takes longer than 100 seconds for 100 MB of events to be posted, a batch of all the messages that were posted during those 100 seconds will be created.
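The interaction between the three limits can be sketched as a small calculation. This is an illustration only: it assumes a perfectly steady ingest rate, and because batch definitions are hints, real batches will not match these numbers exactly. `firstLimitReached` and its types are hypothetical.

```ts
interface BatchHints {
  maxMb: number;
  maxRows: number;
  maxSeconds: number;
}

interface IngestRate {
  mbPerSecond: number;
  rowsPerSecond: number;
}

interface BatchTrigger {
  limit: "batch-max-mb" | "batch-max-rows" | "batch-max-seconds";
  afterSeconds: number;
}

// Returns the limit that a steady stream would reach first, and when.
function firstLimitReached(hints: BatchHints, rate: IngestRate): BatchTrigger {
  // Time needed to fill each size limit; Infinity if nothing arrives.
  const secondsToMb = rate.mbPerSecond > 0 ? hints.maxMb / rate.mbPerSecond : Infinity;
  const secondsToRows = rate.rowsPerSecond > 0 ? hints.maxRows / rate.rowsPerSecond : Infinity;

  let trigger: BatchTrigger = { limit: "batch-max-seconds", afterSeconds: hints.maxSeconds };
  if (secondsToMb < trigger.afterSeconds) {
    trigger = { limit: "batch-max-mb", afterSeconds: secondsToMb };
  }
  if (secondsToRows < trigger.afterSeconds) {
    trigger = { limit: "batch-max-rows", afterSeconds: secondsToRows };
  }
  return trigger;
}

const exampleHints: BatchHints = { maxMb: 100, maxRows: 10000000, maxSeconds: 100 };
// At 2 MB/s the size limit fills in 50 seconds, before the 100 second timer.
console.log(firstLimitReached(exampleHints, { mbPerSecond: 2, rowsPerSecond: 1000 }));
// At 0.5 MB/s the 100 second timer fires first.
console.log(firstLimitReached(exampleHints, { mbPerSecond: 0.5, rowsPerSecond: 1000 }));
```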
### Defining batch settings using Wrangler
You can use the following batch settings flags while creating or updating a pipeline:
* `--batch-max-mb`
* `--batch-max-rows`
* `--batch-max-seconds`
For example:
```sh
npx wrangler pipelines update [PIPELINE-NAME] --batch-max-mb 100 --batch-max-rows 10000 --batch-max-seconds 300
```
### Batch size limits
| Setting | Default | Minimum | Maximum |
| ----------------------------------------- | ----------------| --------- | ----------- |
| Maximum Batch Size `batch-max-mb` | 100 MB | 1 MB | 100 MB |
| Maximum Batch Timeout `batch-max-seconds` | 300 seconds | 1 second | 300 seconds |
| Maximum Batch Rows `batch-max-rows` | 10,000,000 rows | 1 row | 10,000,000 rows |
## Deliver partitioned data
Partitioning organizes data into directories based on specific fields to improve query performance. Partitions reduce the amount of data scanned for queries, enabling faster reads.
:::note
By default, Pipelines partition data by event date and time. This will be customizable in the future.
:::
Output files are prefixed with event date and hour. For example, the output from a Pipeline in your R2 bucket might look like this:
```sh
- event_date=2025-04-01/hr=15/01JQWBZCZBAQZ7RJNZHN38JQ7V.json.gz
- event_date=2025-04-01/hr=15/01JQWC16FXGP845EFHMG1C0XNW.json.gz
```
## Deliver data to a prefix
You can specify an optional prefix for all the output files stored in your specified R2 bucket, using the flag `--r2-prefix`.
For example:
```sh
npx wrangler pipelines update [PIPELINE-NAME] --r2-prefix test
```
After running the above command, the output files generated by your pipeline will be stored under the prefix `test`. Files will remain partitioned. Your output will look like this:
```sh
- test/event_date=2025-04-01/hr=15/01JQWBZCZBAQZ7RJNZHN38JQ7V.json.gz
- test/event_date=2025-04-01/hr=15/01JQWC16FXGP845EFHMG1C0XNW.json.gz
```
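When listing objects in your bucket, you may want to recover the partition values from each key. The sketch below parses keys shaped like the examples above, with or without a prefix. `parseOutputKey` is a hypothetical helper, and the pattern is inferred from those examples rather than from a published specification.

```ts
interface OutputKeyParts {
  prefix: string; // empty string when no --r2-prefix is set
  eventDate: string; // for example "2025-04-01"
  hour: string; // for example "15"
  fileId: string; // the 26-character ULID slug
  extension: string; // for example "json.gz"
}

// Optional prefix, then event_date=YYYY-MM-DD/hr=HH/<ULID>.<extension>
const OUTPUT_KEY_PATTERN =
  /^(?:(.+)\/)?event_date=(\d{4}-\d{2}-\d{2})\/hr=(\d{2})\/([0-9A-Z]{26})\.(.+)$/;

function parseOutputKey(key: string): OutputKeyParts | null {
  const match = OUTPUT_KEY_PATTERN.exec(key);
  if (match === null) {
    return null; // not a pipeline output file
  }
  return {
    prefix: match[1] || "",
    eventDate: match[2],
    hour: match[3],
    fileId: match[4],
    extension: match[5],
  };
}

console.log(parseOutputKey("test/event_date=2025-04-01/hr=15/01JQWBZCZBAQZ7RJNZHN38JQ7V.json.gz"));
```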
---
# Increase pipeline throughput
URL: https://developers.cloudflare.com/pipelines/build-with-pipelines/shards/
A pipeline's maximum throughput can be increased by increasing the shard count. A single shard can handle approximately 7,000 requests per second, or can ingest 7 MB/s of data.
By default, each pipeline is configured with two shards. To set the shard count, use the `--shard-count` flag while creating or updating a pipeline:
```sh
$ npx wrangler pipelines update [PIPELINE-NAME] --shard-count 10
```
:::note
The default shard count will be set to `auto` in the future, with support for automatic horizontal scaling.
:::
## How shards work

Each pipeline is composed of stateless, independent shards. These shards are spun up when a pipeline is created. Each shard is composed of layers of [Durable Objects](/durable-objects). The Durable Objects buffer data, replicate it for durability, handle compression, and deliver output to R2.
When a record is sent to a pipeline:
1. The Pipelines [Worker](/workers) receives the record.
2. The record is routed to one of the shards.
3. The record is handled by a set of Durable Objects, which commit the record to storage and replicate for durability.
4. Records accumulate until the [batch definitions](/pipelines/build-with-pipelines/output-settings/#customize-batch-behavior) are met.
5. The batch is written to an output file and optionally compressed.
6. The output file is delivered to the configured R2 bucket.
Increasing the number of shards will increase the maximum throughput of a pipeline, as well as the number of output files created.
### Example
Your workload might require making 5,000 requests per second to a pipeline. If you create a pipeline with a single shard, all 5,000 requests will be routed to the same shard. If your pipeline has been configured with a maximum batch duration of 1 second, every second, all 5,000 requests will be batched, and a single file will be delivered.
Increasing the shard count to 2 will double the number of output files. The 5,000 requests will be split into 2,500 requests to each shard. Every second, each shard will create a batch of data, and deliver to R2.
## Considerations while increasing the shard count
Increasing the shard count also increases the number of output files that your pipeline generates. This in turn increases the [cost of writing data to R2](/r2/pricing/#class-a-operations), as each file written to R2 counts as a single class A operation. Additionally, smaller files are slower, and more expensive, to query. Rather than setting the maximum, choose a shard count based on your workload needs.
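To get a feel for how shard count affects file count, the estimate below multiplies shards by batch windows per day. It is a rough sketch under stated assumptions: every shard receives data in every window, and batches close on the time limit rather than on size or row limits. `estimateFilesPerDay` is a hypothetical helper.

```ts
// Estimated output files per day when each shard writes one file per batch window.
// Each file written to R2 counts as one Class A operation.
function estimateFilesPerDay(shardCount: number, batchMaxSeconds: number): number {
  const secondsPerDay = 24 * 60 * 60;
  const windowsPerDay = Math.ceil(secondsPerDay / batchMaxSeconds);
  return shardCount * windowsPerDay;
}

console.log(estimateFilesPerDay(2, 300)); // 576
console.log(estimateFilesPerDay(15, 300)); // 4320
```

If batches fill up on size or row count before the timer fires, the real file count will be higher than this estimate.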
## Determine the right number of shards
Choose a shard count based on these factors:
* The number of requests per second you will make to your pipeline
* The amount of data per second you will send to your pipeline
Each shard is capable of handling approximately 7,000 requests per second, or ingesting 7 MB/s of data. Either factor might act as the bottleneck, so choose the shard count based on the higher number.
For example, if you estimate that you will ingest 70 MB/s, making 70,000 requests per second, set up a pipeline with 10 shards. However, if you estimate that you will ingest 70 MB/s while making 100,000 requests per second, set up a pipeline with 15 shards.
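The sizing rule above can be written as a short calculation. This is a sketch that uses the approximate per-shard figures from this page. `requiredShards` is a hypothetical helper, and the result is a starting point rather than a guarantee.

```ts
const REQUESTS_PER_SHARD = 7000; // approximate requests per second per shard
const MB_PER_SHARD = 7; // approximate MB/s per shard
const MIN_SHARDS = 1;
const MAX_SHARDS = 15;

interface ShardEstimate {
  shardCount: number;
  exceedsSinglePipeline: boolean; // true when the workload needs more than MAX_SHARDS
}

function requiredShards(requestsPerSecond: number, mbPerSecond: number): ShardEstimate {
  const byRequests = Math.ceil(requestsPerSecond / REQUESTS_PER_SHARD);
  const byThroughput = Math.ceil(mbPerSecond / MB_PER_SHARD);
  // Either factor can be the bottleneck, so take the larger of the two.
  const needed = Math.max(byRequests, byThroughput, MIN_SHARDS);
  return {
    shardCount: Math.min(needed, MAX_SHARDS),
    exceedsSinglePipeline: needed > MAX_SHARDS,
  };
}

console.log(requiredShards(70000, 70).shardCount); // 10
console.log(requiredShards(100000, 70).shardCount); // 15
```

When `exceedsSinglePipeline` is true, a single pipeline cannot absorb the workload, and splitting traffic across multiple pipelines is one option.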
## Limits
| Setting | Default | Minimum | Maximum |
| ----------------------------------------- | ----------- | --------- | ----------- |
| Shards per pipeline `shard-count` | 2 | 1 | 15 |
---
# Platform
URL: https://developers.cloudflare.com/pipelines/platform/
---
# Limits
URL: https://developers.cloudflare.com/pipelines/platform/limits/
| Feature | Limit |
| --------------------------------------------- | ------------------------------------------------------------- |
| Maximum requests per second, per pipeline | 14,000 default (configurable up to 100,000) |
| Maximum payload per request | 1 MB |
| Maximum data throughput per pipeline | 14 MB/s default (configurable up to 100 MB/s) |
| Shards per pipeline | 2 default (configurable up to 15) |
| Maximum batch size | 100 MB |
| Maximum batch records | 10,000,000 |
| Maximum batch duration | 300s |
## Exceeding requests per second or throughput limits
If you consistently exceed the requests per second or throughput limits, your pipeline might not be able to keep up with the load. The pipeline will communicate backpressure by returning a 429 response to HTTP requests or throwing an error if using the Workers API.
If you are consistently seeing backpressure from your pipeline, consider the following strategies:
* Increase the [shard count](/pipelines/build-with-pipelines/shards) to increase the maximum throughput of your pipeline.
* Send data to a second pipeline if you receive an error. You can set up multiple pipelines to write to the same R2 bucket.
---
# Pricing
URL: https://developers.cloudflare.com/pipelines/platform/pricing/
:::note
Pipelines requires a [Workers paid](/workers/platform/pricing/#workers) plan to use.
:::
During the first phase of the Pipelines open beta, you will not be billed for Pipelines usage. You will be billed only for [R2 usage](/r2/pricing).
We plan to price based on the volume of data ingested into and delivered from Pipelines. We expect to begin charging by September 15, 2025, and will provide at least 30 days' notice beforehand.
| | Workers Paid Users |
| ---------------------------------- | ------------------------ |
| Ingestion | 50 GB / month included + $0.02 / additional GB |
| Delivery to R2 | 50 GB / month included + $0.02 / additional GB |
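As a worked example of the planned pricing in the table above, the sketch below estimates a monthly Pipelines charge. It assumes ingestion and delivery each have their own 50 GB allowance, as the two table rows suggest. It excludes R2 charges, and `estimateMonthlyCostUsd` is a hypothetical helper. Actual billing may differ once pricing takes effect.

```ts
const INCLUDED_GB_PER_MONTH = 50;
const PRICE_PER_EXTRA_GB_USD = 0.02;

// Charge for one metered dimension: only usage above the included amount is billed.
function billableCharge(usedGb: number): number {
  const extraGb = Math.max(0, usedGb - INCLUDED_GB_PER_MONTH);
  return extraGb * PRICE_PER_EXTRA_GB_USD;
}

// Ingestion and delivery are metered separately and then added together.
function estimateMonthlyCostUsd(ingestedGb: number, deliveredGb: number): number {
  return billableCharge(ingestedGb) + billableCharge(deliveredGb);
}

// 150 GB ingested and 30 GB delivered: 100 extra GB of ingestion, no extra delivery.
console.log(estimateMonthlyCostUsd(150, 30).toFixed(2)); // "2.00"
```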
---
# Wrangler commands
URL: https://developers.cloudflare.com/pipelines/platform/wrangler-commands/
## Global commands
---
# Configure HTTP endpoint
URL: https://developers.cloudflare.com/pipelines/build-with-pipelines/sources/http/
Pipelines support data ingestion over HTTP. When you create a new pipeline using the default settings you will receive a globally scalable ingestion endpoint. To ingest data, make HTTP POST requests to the endpoint.
```sh
$ npx wrangler@latest pipelines create my-clickstream-pipeline --r2-bucket my-bucket
🌀 Authorizing R2 bucket "my-bucket"
🌀 Creating pipeline named "my-clickstream-pipeline"
✅ Successfully created pipeline my-clickstream-pipeline
Id: 0e00c5ff09b34d018152af98d06f5a1xvc
Name: my-clickstream-pipeline
Sources:
HTTP:
Endpoint: https://0e00c5ff09b34d018152af98d06f5a1xvc.pipelines.cloudflare.com/
Authentication: off
Format: JSON
Worker:
Format: JSON
Destination:
Type: R2
Bucket: my-bucket
Format: newline-delimited JSON
Compression: GZIP
Batch hints:
Max bytes: 100 MB
Max duration: 300 seconds
Max records: 100,000
🎉 You can now send data to your pipeline!
Send data to your pipeline's HTTP endpoint:
curl "https://0e00c5ff09b34d018152af98d06f5a1xvc.pipelines.cloudflare.com/" -d '[{ ...JSON_DATA... }]'
```
## Authentication
You can secure your HTTP ingestion endpoint using Cloudflare API tokens. By default, authentication is turned off. To configure authentication, use the `--require-http-auth` flag while creating or updating a pipeline.
```sh
$ npx wrangler pipelines create [PIPELINE-NAME] --r2-bucket [R2-BUCKET-NAME] --require-http-auth true
```
Once authentication is turned on, you will need to include a Cloudflare API token in your request headers.
### Get API token
1. Log in to the [Cloudflare dashboard](https://dash.cloudflare.com) and select your account.
2. Navigate to your [API Keys](https://dash.cloudflare.com/profile/api-tokens).
3. Select **Create Token**.
4. Choose the template for Workers Pipelines. Select **Continue to summary** > **Create token**. Make sure to copy the API token and save it securely.
### Making authenticated requests
Include the API token you created in the previous step in the headers for your request:
```sh
curl "https://[PIPELINE-ID].pipelines.cloudflare.com" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${API_TOKEN}" \
-d '[{"foo":"bar"}, {"foo":"bar"}, {"foo":"bar"}]'
```
## Specifying CORS Settings
If you want to use your pipeline to ingest client side data, such as website clicks, you will need to configure your [Cross-Origin Resource Sharing (CORS) settings](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS).
Without CORS settings, browsers will restrict requests made to your pipeline endpoint. For example, if your website domain is `https://my-website.com`, and you want to post client-side data to your pipeline at `https://[PIPELINE-ID].pipelines.cloudflare.com`, the request will fail unless CORS is configured.
To fix this, you need to configure your pipeline to accept requests from `https://my-website.com`. You can do so while creating or updating a pipeline, using the flag `--cors-origins`. You can specify multiple domains separated by a space.
```sh
$ npx wrangler pipelines update [PIPELINE-NAME] --cors-origins https://mydomain.com http://localhost:8787
```
You can specify that all cross origin requests are accepted. We recommend only using this option in development, and not for production use cases.
```sh
$ npx wrangler pipelines update [PIPELINE-NAME] --cors-origins "*"
```
After the `--cors-origins` have been set on your pipeline, your pipeline will respond to preflight requests and `POST` requests with the appropriate `Access-Control-Allow-Origin` headers set.
---
# Sources
URL: https://developers.cloudflare.com/pipelines/build-with-pipelines/sources/
Pipelines let you ingest data from the following sources:
* [HTTP Clients](/pipelines/build-with-pipelines/sources/http), with optional authentication and CORS settings
* [Cloudflare Workers](/workers/), using the [Pipelines Workers API](/pipelines/build-with-pipelines/sources/workers-apis)
Multiple sources can be active on a single pipeline simultaneously. For example, you can create a pipeline which accepts data from Workers and via HTTP. There is no limit to the number of source clients. Multiple Workers can be configured to send data to the same pipeline.
Each pipeline can ingest up to 100 MB/s of data or accept up to 100,000 requests per second, aggregated across all sources.
## Configuring allowed sources
By default, ingestion via HTTP and from Workers is turned on. You can configure the allowed sources by using the `--source` flag while creating or updating a pipeline.
For example, to create a pipeline which only accepts data via a Worker, you can run this command:
```sh
$ npx wrangler pipelines create [PIPELINE-NAME] --r2-bucket [R2-BUCKET-NAME] --source worker
```
## Accepted data formats
Pipelines accept arrays of valid JSON objects. You can send multiple objects in a single request, provided the total data volume is within the [documented limits](/pipelines/platform/limits). Sending data in a different format will result in an error.
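To stay within the per-request payload limit, a large set of records can be split into several arrays before sending. The sketch below is one way to do that. It assumes the 1 MB limit means 1,000,000 bytes of serialized JSON, which the limits page does not specify, so leave some headroom. `chunkRecords` and `utf8Length` are hypothetical helpers.

```ts
// Assumption: 1 MB is treated as 1,000,000 bytes of serialized JSON.
const DEFAULT_MAX_PAYLOAD_BYTES = 1000000;

// UTF-8 byte length of a string.
function utf8Length(text: string): number {
  return new TextEncoder().encode(text).length;
}

// Splits records into arrays whose JSON serialization fits within maxBytes.
function chunkRecords(records: object[], maxBytes: number = DEFAULT_MAX_PAYLOAD_BYTES): object[][] {
  const chunks: object[][] = [];
  let current: object[] = [];
  let currentBytes = 2; // the enclosing "[" and "]"
  for (let i = 0; i < records.length; i++) {
    const recordBytes = utf8Length(JSON.stringify(records[i]));
    if (2 + recordBytes > maxBytes) {
      throw new RangeError("Record " + i + " alone exceeds the payload limit");
    }
    const separatorBytes = current.length === 0 ? 0 : 1; // comma between records
    if (currentBytes + separatorBytes + recordBytes > maxBytes) {
      // The current chunk is full: close it and start a new one with this record.
      chunks.push(current);
      current = [];
      currentBytes = 2 + recordBytes;
    } else {
      currentBytes += separatorBytes + recordBytes;
    }
    current.push(records[i]);
  }
  if (current.length > 0) {
    chunks.push(current);
  }
  return chunks;
}

// With a tiny 20-byte limit, three 7-byte records split into chunks of two and one.
const smallChunks = chunkRecords([{ a: 1 }, { a: 1 }, { a: 1 }], 20);
console.log(smallChunks.length); // 2
```

Each resulting array can then be sent as its own request.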
---
# Workers API
URL: https://developers.cloudflare.com/pipelines/build-with-pipelines/sources/workers-apis/
Pipelines exposes an API directly to your [Workers](/workers) scripts via the [bindings](/workers/runtime-apis/bindings/#what-is-a-binding) concept. Bindings allow you to securely send data to a pipeline without having to manage API keys or clients. Sending data via a Worker is enabled by default.
## Send data from a Worker
### Set up a binding
Bind to a pipeline by defining a `pipelines` binding within your Wrangler configuration. For example:
```toml title="wrangler.toml"
#:schema node_modules/wrangler/config-schema.json
name = "pipeline-starter"
main = "src/index.ts"
compatibility_date = "2025-04-01"
[[pipelines]]
pipeline = "[PIPELINE-NAME]" # The name of your pipeline
binding = "PIPELINE" # The binding name, accessed using env.PIPELINE
```
You can bind multiple pipelines to a Worker.
### Send data
The Pipelines binding exposes a `send()` method. For example, to log inbound HTTP requests to your Worker:
```ts
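// Env is assumed to declare the PIPELINE binding (for example, via generated types).
export default {
  async fetch(request, env, ctx): Promise<Response> {
    const log = {
      url: request.url,
      method: request.method,
      headers: Object.fromEntries(request.headers),
    };
    // Resolves once the pipeline confirms that the record is ingested.
    await env.PIPELINE.send([log]);
    return new Response('Hello World!');
  },
} satisfies ExportedHandler<Env>;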
```
## Workers API
### `Pipeline`
A binding which allows a Worker to send messages to a pipeline.
```ts
interface Pipeline {
  send(records: PipelineRecord[]): Promise<void>;
}
```
* `send(records)`: `Promise<void>`
* Sends records to the pipeline. The body must be an array of objects which are JSON serializable.
* When the promise resolves, the records are confirmed to be ingested.
:::note
When running your Worker locally, pipelines are partially simulated. Worker code which sends data to a pipeline will execute successfully. However, the full pipeline, including batching & writing to R2, will not be executed locally.
:::
---
# Ingest data from a Worker, and analyze using MotherDuck
URL: https://developers.cloudflare.com/pipelines/tutorials/query-data-with-motherduck/
In this tutorial, you will learn how to ingest clickstream data into an [R2 bucket](/r2) using Pipelines. You will use the Pipeline binding to send the clickstream data to the R2 bucket from your Worker. You will also learn how to connect the bucket to MotherDuck, and then query the data using MotherDuck.
For this tutorial, you will build a landing page of an e-commerce website. A user can click on the view button to view the product details or click on the add to cart button to add the product to their cart.
## Prerequisites
1. A [MotherDuck](https://motherduck.com/) account.
2. Install [`Node.js`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm).
Use a Node version manager like [Volta](https://volta.sh/) or
[nvm](https://github.com/nvm-sh/nvm) to avoid permission issues and change
Node.js versions. [Wrangler](/workers/wrangler/install-and-update/), discussed
later in this guide, requires a Node version of `16.17.0` or later.
## 1. Create a new project
You will create a new Worker project that will use [Static Assets](/workers/static-assets/) to serve the HTML file.
Create a new Worker project by running the following commands:
Navigate to the `e-commerce-pipelines` directory:
```sh frame="none"
cd e-commerce-pipelines
```
## 2. Update the frontend
Using Static Assets, you can serve the frontend of your application from your Worker. The above step creates a new Worker project with a default `public/index.html` file. Update the `public/index.html` file with the following HTML code:
Select to view the HTML code
```html
E-commerce Store
Our Products
```
The above code does the following:
- Uses Tailwind CSS to style the page.
- Renders a list of products.
- Adds a button to view the details of a product.
- Adds a button to add a product to the cart.
- Contains a `handleClick` function to handle the click events. This function logs the action and the product ID. In the next steps, you will add the logic to send the click events to your pipeline.
## 3. Generate clickstream data
You need to send clickstream data like the `timestamp`, `user_id`, `session_id`, and `device_info` to your pipeline. You can generate this data on the client side. Add the following function in the `