# Stream Real-Time WebSocket Data to Apache Kafka with Our New Source Connector

Kafka Connect has connectors for databases, files, cloud services, and message queues. But if you wanted to pipe a WebSocket stream into Kafka, you had two options: write custom code or pay for a commercial connector. Most teams wrote the custom code. Then they maintained the reconnection logic, the auth handling, and the 3 AM debugging when messages stopped flowing.

We got tired of watching teams solve the same problem. So we built an open-source WebSocket source connector for Kafka Connect and are releasing it today.

![WebSocket connector architecture: A WebSocket API (like Coinbase) streams ticker, trades, and order book data over wss:// to the WebSocket Source Connector in Kafka Connect, which writes messages to a Kafka topic. The connector auto-reconnects on disconnect.](https://www.conduktor.io/assets/images/blog/websocket-connector-architecture.png)

## What it does

The connector is released under the Apache License 2.0. Point it at any WebSocket endpoint, give it a Kafka topic, and it streams messages in: `wss://ws-feed.exchange.coinbase.com` for Bitcoin trades, your IoT platform's WebSocket API for sensor data, or Binance and Discord feeds. Configuration is standard Kafka Connect, with nothing custom.

What you get:

- Automatic reconnection with configurable intervals
- Bearer token and custom header authentication
- Subscription messages for exchanges like Binance and Coinbase
- Configurable in-memory buffering for bursty traffic
- JMX metrics for connection health and throughput

## Quick start: running in 5 minutes

You need Docker and Docker Compose. We'll spin up Kafka, Kafka Connect, and Conduktor Console (a web UI for viewing topics), then deploy the connector to stream live Coinbase trades.

**Step 1: Clone and build**

```bash
git clone https://github.com/conduktor/kafka-connect-websocket.git
cd kafka-connect-websocket
mvn clean package -DskipTests
```

No local Java? Use Docker instead:

```bash
docker run --rm -v "$(pwd)":/app -w /app maven:3.9-eclipse-temurin-17 mvn clean package -DskipTests
```

**Step 2: Start the stack**

```bash
cd examples
docker compose up -d
```

This starts:
- Kafka (KRaft mode) on port 9092
- Kafka Connect on port 8083
- Conduktor Console on port 8080 for viewing topics

Wait about 30 seconds for everything to initialize. Check that Kafka Connect is up:

```bash
curl http://localhost:8083/
```

You should get back something like:
```json
{"version":"3.9.0","commit":"a60e31147e6b01ee","kafka_cluster_id":"5L6g3nShT-eMCtK--X86sw"}
```
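If you script the quickstart, polling is more reliable than a fixed 30-second wait. The sketch below assumes the default port 8083 from the compose file. It treats any JSON response containing a `version` field as "ready", and the function names are our own rather than part of the project:

```python
import json
import time
import urllib.request
from typing import Callable, Optional

# Assumes the quickstart's default Connect port; adjust for your setup.
CONNECT_URL = "http://localhost:8083/"


def fetch_root(url: str = CONNECT_URL, timeout: float = 5.0) -> Optional[dict]:
    """Return the JSON from Connect's root endpoint, or None while it is not ready."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except (OSError, ValueError):
        # OSError covers refused connections, timeouts, and HTTP errors;
        # ValueError covers a half-started server returning non-JSON.
        return None


def wait_for_connect(fetch: Callable[[], Optional[dict]] = fetch_root,
                     attempts: int = 30,
                     delay_s: float = 2.0,
                     sleep: Callable[[float], None] = time.sleep) -> dict:
    """Poll until Connect reports a version, instead of sleeping a fixed time."""
    for attempt in range(attempts):
        info = fetch()
        if isinstance(info, dict) and "version" in info:
            return info
        if attempt < attempts - 1:
            sleep(delay_s)
    raise TimeoutError(f"Kafka Connect not ready after {attempts} attempts")
```

Calling `wait_for_connect()` returns the same JSON as the `curl` call above once the worker is up. The `fetch` and `sleep` parameters exist only so the loop can be exercised without a running stack.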

**Step 3: Deploy the WebSocket connector**

Deploy a connector that subscribes to Coinbase BTC-USD ticker updates:

```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "coinbase-btcusd",
    "config": {
      "connector.class": "io.conduktor.connect.websocket.WebSocketSourceConnector",
      "tasks.max": "1",
      "websocket.url": "wss://ws-feed.exchange.coinbase.com",
      "kafka.topic": "coinbase-btcusd-trades",
      "websocket.subscription.message": "{\"type\":\"subscribe\",\"channels\":[{\"name\":\"ticker\",\"product_ids\":[\"BTC-USD\"]}]}"
    }
  }'
```
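The trickiest part of that request is `websocket.subscription.message`. It is a JSON document embedded as a string inside another JSON document, so every inner quote has to be escaped. A small sketch that builds the same payload with Python's `json` module avoids escaping by hand. The values mirror the `curl` example; only the approach is our suggestion:

```python
import json

# The subscription Coinbase expects, written as a normal data structure.
subscription = {
    "type": "subscribe",
    "channels": [{"name": "ticker", "product_ids": ["BTC-USD"]}],
}

connector = {
    "name": "coinbase-btcusd",
    "config": {
        "connector.class": "io.conduktor.connect.websocket.WebSocketSourceConnector",
        "tasks.max": "1",
        "websocket.url": "wss://ws-feed.exchange.coinbase.com",
        "kafka.topic": "coinbase-btcusd-trades",
        # The connector takes the subscription as a JSON *string*, so serialize
        # it once here; json.dumps handles the quote escaping.
        "websocket.subscription.message": json.dumps(subscription, separators=(",", ":")),
    },
}

body = json.dumps(connector, indent=2)
print(body)
```

Save the output as `connector.json` and post it with `curl -X POST -H "Content-Type: application/json" --data-binary @connector.json http://localhost:8083/connectors`.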

The connector opens a WebSocket to Coinbase, sends the subscription message, and starts forwarding ticker updates.

**Step 4: See the data**

Open Conduktor Console at http://localhost:8080 and navigate to the `coinbase-btcusd-trades` topic. You should see BTC-USD ticker updates arriving in real time.

Or consume via the command line:

```bash
docker exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic coinbase-btcusd-trades \
  --from-beginning \
  --max-messages 3
```

The first message is a subscription confirmation from Coinbase. After that, each message is a JSON ticker event:

```json
{"type":"ticker","sequence":124104291706,"product_id":"BTC-USD","price":"70672.78","open_24h":"72422.56","volume_24h":"6951.60","low_24h":"70236.02","high_24h":"72869.07","volume_30d":"302213.79","best_bid":"70672.77","best_bid_size":"0.00002426","best_ask":"70672.78","best_ask_size":"0.79504402","side":"buy","time":"2026-03-14T14:49:14.639873Z","trade_id":980607675,"last_size":"0.00034376"}
```
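Coinbase sends prices as strings, and the topic also contains non-ticker messages such as the initial subscription confirmation. A downstream consumer therefore has to filter by `type` and parse numbers carefully. The sketch below is one way to do that. It uses `Decimal` to avoid floating-point rounding, and the helper name `parse_ticker` is hypothetical:

```python
import json
from decimal import Decimal
from typing import Optional, Tuple

# One record value from the topic, trimmed to the fields used here.
raw = ('{"type":"ticker","product_id":"BTC-USD","price":"70672.78",'
       '"best_bid":"70672.77","best_ask":"70672.78","time":"2026-03-14T14:49:14.639873Z"}')


def parse_ticker(value: str) -> Optional[Tuple[str, Decimal, Decimal]]:
    """Return (product_id, price, spread) for ticker events, None for anything else."""
    event = json.loads(value)
    if event.get("type") != "ticker":
        return None  # e.g. the subscription confirmation Coinbase sends first
    price = Decimal(event["price"])
    spread = Decimal(event["best_ask"]) - Decimal(event["best_bid"])
    return event["product_id"], price, spread
```

For the sample above, the spread is one cent (`70672.78 - 70672.77`). Parsing with `float` could turn that into a value like `0.00999...`.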

Live market data, in Kafka, with zero custom code.

> To connect your own WebSocket API, copy the connector config and change the URL and topic. For authenticated endpoints, see the configuration section below.

## Configuration

Beyond `connector.class`, the minimum configuration is two fields:

| Parameter | Description | Example |
|-----------|-------------|---------|
| `websocket.url` | WebSocket endpoint (ws:// or wss://) | `wss://stream.example.com/feed` |
| `kafka.topic` | Destination Kafka topic | `my-websocket-data` |

That's enough for a public WebSocket. Most production scenarios need more.

**Authentication**

For protected APIs, use bearer token authentication:

```json
{
  "name": "authenticated-feed",
  "config": {
    "connector.class": "io.conduktor.connect.websocket.WebSocketSourceConnector",
    "tasks.max": "1",
    "websocket.url": "wss://api.example.com/stream",
    "kafka.topic": "example-stream",
    "websocket.auth.token": "your-bearer-token-here"
  }
}
```

The connector sends `Authorization: Bearer <token>` with the connection request.

For custom headers, use `websocket.headers`:

```json
{
  "websocket.headers": "X-API-Key:abc123,X-Client-ID:myapp"
}
```

Format is `key1:value1,key2:value2` for multiple headers. You can combine both:

```json
{
  "websocket.auth.token": "bearer-token",
  "websocket.headers": "X-Client-ID:myapp,X-Version:2"
}
```
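To see how these two settings could combine into the headers of the WebSocket handshake, here is a hypothetical model. It is not the connector's source. Splitting each entry on its first colon, trimming whitespace, and letting the bearer token supply `Authorization` are our assumptions. In this model, a header value containing a comma would be split into two entries, so check the configuration reference if your values contain commas:

```python
from typing import Dict, Optional


def build_handshake_headers(auth_token: Optional[str], headers_spec: str) -> Dict[str, str]:
    """Hypothetical model of websocket.auth.token + websocket.headers."""
    headers: Dict[str, str] = {}
    if headers_spec:
        for entry in headers_spec.split(","):
            # Split on the first colon only, so a value may itself contain ':'.
            name, sep, value = entry.partition(":")
            if not sep or not name.strip():
                raise ValueError(f"malformed header entry: {entry!r}")
            headers[name.strip()] = value.strip()
    if auth_token:
        headers["Authorization"] = f"Bearer {auth_token}"
    return headers
```

With the combined example above, this model produces `X-Client-ID: myapp`, `X-Version: 2`, and `Authorization: Bearer bearer-token`.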

**Subscription messages**

Many WebSocket APIs, especially crypto exchanges, require a subscription message after connecting:

```json
{
  "websocket.subscription.message": "{\"type\":\"subscribe\",\"channels\":[\"ticker\"]}"
}
```

This JSON is sent immediately after the WebSocket connection opens. Binance, Coinbase, and Kraken all use this pattern to specify which streams you want.

**Message format**

The connector preserves WebSocket messages as-is in Kafka record values:

- Text messages become strings (UTF-8)
- Binary messages become byte arrays (preserved exactly)
- Record key is always null (WebSocket has no key concept)
- Headers are empty (WebSocket messages don't carry headers)

You can apply Kafka Connect single message transforms (SMTs), or downstream stream processing, to parse JSON, extract fields, or route messages based on content.
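The mapping rules above are simple enough to model in a few lines. The `RecordSketch` type below is a hypothetical stand-in for the Kafka Connect record, used only to make the rules concrete:

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Union

Frame = Union[str, bytes]


@dataclass
class RecordSketch:
    """Hypothetical stand-in for the record the connector produces."""
    topic: str
    value: Frame
    key: Optional[bytes] = None  # always null: WebSocket has no key concept
    headers: List[Tuple[str, bytes]] = field(default_factory=list)  # always empty


def to_record(topic: str, frame: Frame) -> RecordSketch:
    if not isinstance(frame, (str, bytes)):
        raise TypeError("WebSocket frames are either text (str) or binary (bytes)")
    # Text frames pass through as strings, binary frames as bytes, unchanged.
    return RecordSketch(topic=topic, value=frame)
```

One practical consequence of the null key is that, on a topic with more than one partition, the producer does not assign records to partitions by key. If you need per-instrument ordering, use a single-partition topic or re-key the data downstream.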

**Reconnection**

| Parameter | Default | Description |
|-----------|---------|-------------|
| `websocket.reconnect.enabled` | `true` | Automatically reconnect on disconnect |
| `websocket.reconnect.interval.ms` | `5000` | Delay between reconnection attempts (5 seconds by default) |

When the WebSocket disconnects, whether from a server restart, network issue, or idle timeout, the connector waits the configured interval and reconnects. It resends the subscription message if one was configured.
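The reconnect behavior described above can be sketched as a loop: connect, resend the subscription, forward messages until the socket closes, wait, and repeat. This is an illustrative model, not the connector's code. `connect`, `on_message`, and the `max_sessions` cap are hypothetical, and the cap only exists so the loop terminates in a test (the connector keeps reconnecting until it is stopped):

```python
import time
from typing import Callable, Iterator, Optional, Protocol


class Connection(Protocol):
    def send(self, text: str) -> None: ...
    def __iter__(self) -> Iterator[str]: ...


def run_with_reconnect(connect: Callable[[], Connection],
                       subscription: Optional[str],
                       on_message: Callable[[str], None],
                       interval_ms: int = 5000,
                       max_sessions: int = 3,
                       sleep: Callable[[float], None] = time.sleep) -> int:
    """Open up to max_sessions connections; return how many were opened."""
    sessions = 0
    while sessions < max_sessions:
        sessions += 1
        try:
            conn = connect()
            if subscription:
                conn.send(subscription)  # resent on every new connection
            for message in conn:
                on_message(message)
        except ConnectionError:
            pass  # an abrupt drop is handled like a clean close
        # Anything the server sends before the next connect is never seen.
        if sessions < max_sessions:
            sleep(interval_ms / 1000.0)
    return sessions
```

The comment before the sleep is the important part. The gap between one connection closing and the next opening is exactly the reconnection window discussed under delivery guarantees.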

**Performance tuning**

| Parameter | Default | Description |
|-----------|---------|-------------|
| `websocket.message.queue.size` | `10000` | In-memory buffer for messages |
| `websocket.connection.timeout.ms` | `30000` | Connection timeout (30 seconds) |

The queue buffers messages between the WebSocket and Kafka Connect's polling loop. Increase it for bursty traffic.

When the queue fills, the connector drops incoming messages until space opens. Monitor the `MessagesDropped` JMX metric. If you see drops, increase `websocket.message.queue.size` or tune your Kafka producer settings for higher throughput.
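The drop-when-full policy is a bounded buffer whose producer side never blocks. A minimal Python model follows. The class and the `max_batch` parameter are hypothetical, and `messages_dropped` plays the role of the `MessagesDropped` metric:

```python
import queue
from typing import List


class DroppingBuffer:
    """Bounded buffer that rejects new messages when full and counts the drops."""

    def __init__(self, max_size: int = 10000):
        self._q: "queue.Queue[str]" = queue.Queue(maxsize=max_size)
        self.messages_dropped = 0  # updated by the single receive thread only

    def offer(self, message: str) -> bool:
        # Receive side: never blocks, so a slow poll loop cannot stall the socket.
        try:
            self._q.put_nowait(message)
            return True
        except queue.Full:
            self.messages_dropped += 1
            return False

    def drain(self, max_batch: int = 500) -> List[str]:
        # Poll side: take whatever is buffered, up to max_batch, without waiting.
        batch: List[str] = []
        while len(batch) < max_batch:
            try:
                batch.append(self._q.get_nowait())
            except queue.Empty:
                break
        return batch
```

Dropping new arrivals, rather than blocking the receive thread, keeps the WebSocket responsive. The cost is that any backlog beyond the queue size is lost, which is why the drop counter is worth alerting on.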

**Task configuration**

Set `tasks.max` to `1`. A WebSocket connection is a single, ordered message stream over one persistent socket, so you cannot split one stream across multiple tasks.

For multiple WebSocket streams, deploy multiple connector instances, one per stream.
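Generating one config per stream keeps those instances consistent. The sketch below produces one Coinbase ticker connector per product ID, following the naming used in the quickstart. The `coinbase-<product>` naming scheme is our own convention, not something the connector requires:

```python
import json
from typing import Dict, List

BASE = {
    "connector.class": "io.conduktor.connect.websocket.WebSocketSourceConnector",
    "tasks.max": "1",  # one socket per connector, so one task
    "websocket.url": "wss://ws-feed.exchange.coinbase.com",
}


def connector_per_product(product_ids: List[str]) -> List[Dict]:
    """Build one connector definition (name + config) per product."""
    configs = []
    for product in product_ids:
        slug = product.lower().replace("-", "")  # BTC-USD -> btcusd
        sub = {"type": "subscribe",
               "channels": [{"name": "ticker", "product_ids": [product]}]}
        configs.append({
            "name": f"coinbase-{slug}",
            "config": {
                **BASE,
                "kafka.topic": f"coinbase-{slug}-trades",
                "websocket.subscription.message": json.dumps(sub, separators=(",", ":")),
            },
        })
    return configs
```

Each element is a complete body for `POST /connectors`. Submit them one at a time, as in Step 3.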

**Delivery guarantees**

The connector provides at-most-once delivery. WebSocket has no acknowledgements or replay at the application level: once the server sends a message, there is no way to request it again. Messages are lost in two cases: when the internal queue fills (monitor `MessagesDropped`), and during reconnection windows when the server keeps sending while the connector is disconnected. For use cases requiring completeness, pair with periodic REST API snapshots to detect and fill gaps.
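One simple way to decide *when* to fetch a REST snapshot is to scan consumed events for suspicious silences. The sketch below flags consecutive timestamps that are further apart than a threshold. It uses the `time` field format from the Coinbase ticker example and does not assume that the `sequence` field is gap-free on the ticker channel. The threshold is a heuristic: a quiet market also produces long silences, so treat flagged windows as candidates to check, not confirmed losses:

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple


def parse_ts(ts: str) -> datetime:
    # Coinbase-style timestamps, e.g. 2026-03-14T14:49:14.639873Z
    for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
        try:
            return datetime.strptime(ts, fmt)
        except ValueError:
            continue
    raise ValueError(f"unrecognized timestamp: {ts!r}")


def find_gaps(timestamps: Iterable[str], threshold: timedelta) -> List[Tuple[str, str]]:
    """Return (last_before, first_after) pairs spaced further apart than threshold."""
    gaps: List[Tuple[str, str]] = []
    prev_raw: Optional[str] = None
    prev_dt: Optional[datetime] = None
    for raw in timestamps:
        dt = parse_ts(raw)
        if prev_dt is not None and dt - prev_dt > threshold:
            gaps.append((prev_raw, raw))
        prev_raw, prev_dt = raw, dt
    return gaps
```

Each returned pair bounds a time window you could backfill from the exchange's REST API.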

Full configuration reference: [configuration documentation](https://conduktor.github.io/kafka-connect-websocket/configuration/).

## Use cases

The most obvious fit is crypto and financial markets. Stream price feeds, order books, and trade executions from Coinbase, Binance, Kraken, or Gemini. Coinbase's WebSocket feed covers every trading pair with ticker, order book, and match data. One connector instance per stream, each writing to its own Kafka topic.

IoT platforms with WebSocket feeds work the same way: temperature readings, GPS coordinates, machine status, all into Kafka for stream processing and alerting.

Social and communication platforms push updates over WebSockets: Discord's Gateway API, Bluesky's firehose, Reddit's real-time feeds. Pipe them into Kafka for sentiment analysis, content moderation, or engagement analytics. The same approach applies to game backends, observability platforms, and anything else that speaks WebSocket.

## Getting started

Three paths depending on what you need.

**Docker Compose quickstart** (try it out). Run the Docker Compose example from earlier in this article. Full Kafka stack and the connector running in minutes. See the [examples directory](https://github.com/conduktor/kafka-connect-websocket/tree/main/examples).

**Install from release** (production). Download the pre-built JAR from GitHub releases:

```bash
wget https://github.com/conduktor/kafka-connect-websocket/releases/download/v1.0.0/kafka-connect-websocket-1.0.0-jar-with-dependencies.jar
```

Copy it into a directory on your Kafka Connect worker's `plugin.path` (here, `$KAFKA_HOME/plugins`):

```bash
mkdir -p "$KAFKA_HOME/plugins/kafka-connect-websocket"
cp kafka-connect-websocket-1.0.0-jar-with-dependencies.jar "$KAFKA_HOME/plugins/kafka-connect-websocket/"
```

Restart Kafka Connect and the plugin becomes available. Deploy connectors via the REST API as shown in the quickstart.

> If the connector doesn't appear in the plugin list (`curl http://localhost:8083/connector-plugins`), verify that the JAR is in a directory on the worker's `plugin.path` and that the Kafka Connect process has read permission on it.
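For automated deployments, the same check can be scripted. `GET /connector-plugins` returns a JSON array of objects with a `class` field, so the sketch below simply looks for the connector's class name. The helper names and the localhost URL are assumptions:

```python
import json
import urllib.request
from typing import Dict, List

CLASS_NAME = "io.conduktor.connect.websocket.WebSocketSourceConnector"


def fetch_plugins(url: str = "http://localhost:8083/connector-plugins") -> str:
    """Return the raw JSON body of Connect's plugin listing."""
    with urllib.request.urlopen(url, timeout=5.0) as resp:
        return resp.read().decode("utf-8")


def plugin_installed(plugins_json: str, class_name: str = CLASS_NAME) -> bool:
    """True if the plugin listing contains the given connector class."""
    plugins: List[Dict] = json.loads(plugins_json)
    return any(p.get("class") == class_name for p in plugins)
```

Run `plugin_installed(fetch_plugins())` after restarting the worker. A `False` result points back to the `plugin.path` and permissions checks above.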

**Build from source** (contributors). Clone and build:

```bash
git clone https://github.com/conduktor/kafka-connect-websocket.git
cd kafka-connect-websocket
mvn clean package
```

The output JAR with dependencies will be in `target/`. Requires Java 11+ and Maven 3.6+.

**Links**

- Documentation: [conduktor.github.io/kafka-connect-websocket](https://conduktor.github.io/kafka-connect-websocket)
- Configuration reference, operations runbook, and FAQ are all on the docs site
- GitHub: [github.com/conduktor/kafka-connect-websocket](https://github.com/conduktor/kafka-connect-websocket)

The project is licensed under the Apache License 2.0. Bug reports, feature requests, and PRs are welcome. See [CONTRIBUTING.md](https://github.com/conduktor/kafka-connect-websocket/blob/main/CONTRIBUTING.md) for guidelines.

