# Real-Time SQL on Kafka with PySpark

PySpark is a Python interface for Apache Spark. Spark crunches big datasets in a distributed way, and PySpark lets you control it from a Python shell.

Combining PySpark with Kafka creates a system that processes real-time data in seconds. You can run ad-hoc SQL analysis locally without deploying anything.

This guide covers:

- Setting up PySpark with Kafka
- Filtering and transforming JSON using SQL
- Using Conduktor Console as a faster alternative for simple filters
- Creating virtual SQL topics with Conduktor Gateway
- Generating test data with ChatGPT

## Installing PySpark

On macOS, install via [Homebrew](https://brew.sh/). See [Apache Spark docs](https://spark.apache.org/docs/latest/api/python/getting_started/install.html) for other systems.

```bash
$ brew install apache-spark
```

This installs the full Spark stack. Spark runs on the JVM with Scala at its core. We only need the `pyspark` CLI.

Open a new terminal and run:

```bash
$ pyspark
Python 3.11.5 (main, Aug 24 2023, 15:09:45) [Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/14 00:13:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.1
      /_/

Using Python version 3.11.5 (main, Aug 24 2023 15:09:45)
Spark context Web UI available at http://stephane.lan:4040
Spark context available as 'sc' (master = local[*], app id = local-1694643195574).
SparkSession available as 'spark'.
>>>
```

A local Spark instance is now running with a web UI at [http://localhost:4040](http://localhost:4040/).

![Screenshot of the local PySpark web UI running on localhost:4040 showing the Spark master and worker](https://www.conduktor.io/assets/images/blog/pyspark-local-web-ui-running.png)

Test the API with a basic dataset:

```python
>>> data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
>>> df = spark.createDataFrame(data)
>>> df.show()
+------+------+
|    _1|    _2|
+------+------+
|  Java| 20000|
|Python|100000|
| Scala|  3000|
+------+------+
>>> df.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
```

> A DataFrame (`df`) is a table with rows and columns. Each column has a name, each row is a record. Think spreadsheet, but designed for millions or billions of rows, including data that won't fit in memory.

DataFrames support sorting, filtering, and calculations like spreadsheets. The difference: they scale to massive datasets across distributed clusters.

Kafka produces massive streams of data. Spark processes massive datasets. They fit together naturally.

## Connecting PySpark to Kafka

First, you need a Kafka cluster. [Upstash](https://upstash.com/) offers a free tier. We'll use Conduktor to create topics and produce data, and PySpark for transformations.

> Follow [Getting Started with Conduktor and Upstash](https://www.conduktor.io/alternatives-for-conduktor-playground) for setup.

If you don't have Conduktor installed:

```bash
$ curl -L https://releases.conduktor.io/console -o docker-compose.yml && docker compose up
```

![Terminal screenshot of the PySpark + Kafka + Conduktor docker-compose setup completing](https://www.conduktor.io/assets/images/blog/pyspark-kafka-conduktor-cli-setup.png)

Create a topic called "hello" and produce some data:

![Conduktor Console showing the 'hello' Kafka topic with a few produced records](https://www.conduktor.io/assets/images/blog/conduktor-topic-hello-produce.png)

Start PySpark with the Kafka dependency:

```bash
# Without spark-sql-kafka, you'll get:
# pyspark.errors.exceptions.captured.AnalysisException: Failed to find data source: kafka.

$ pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1
```

Replace `XXX` and `YYY` with your Upstash credentials, and `kafka.bootstrap.servers` with your own cluster's address, in the code below:

> The `"startingOffsets": "earliest"` option tells Spark to read from the beginning of the topic, not the end (the default).

```python
import time

# `spark` is the SparkSession that the pyspark shell already created

##
## TODO: UPDATE kafka.sasl.jaas.config
##
kafka_options = {
    "kafka.bootstrap.servers": "romantic-drake-10214-eu1-kafka.upstash.io:9092",
    "kafka.sasl.mechanism": "SCRAM-SHA-256",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": """org.apache.kafka.common.security.scram.ScramLoginModule required username="XXX" password="YYY";""",
    "startingOffsets": "earliest",
    "subscribe": "hello"
}

# Subscribe to Kafka topic "hello"
df = spark.readStream.format("kafka").options(**kafka_options).load()

# Deserialize the value from Kafka as a String
deserialized_df = df.selectExpr("CAST(value AS STRING)")

# Query Kafka for 10 seconds
query = deserialized_df.writeStream.outputMode("append").format("console").start()
time.sleep(10)
query.stop()
```
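Hardcoding credentials in a shell session makes them easy to leak into history or screenshots. Here is a minimal sketch of one alternative, assuming you export the hypothetical variables `KAFKA_BOOTSTRAP`, `KAFKA_USERNAME`, and `KAFKA_PASSWORD` before starting `pyspark`. The helper name `build_kafka_options` is illustrative and not part of any library:

```python
import os


def build_kafka_options(topic, env=None):
    """Build the options dict passed to spark.readStream.format("kafka").

    Connection settings come from environment variables. The variable
    names are assumptions for this sketch, not a Spark or Upstash convention.
    """
    env = os.environ if env is None else env
    username = env["KAFKA_USERNAME"]
    password = env["KAFKA_PASSWORD"]
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": env["KAFKA_BOOTSTRAP"],
        "kafka.sasl.mechanism": "SCRAM-SHA-256",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": jaas,
        "startingOffsets": "earliest",
        "subscribe": topic,
    }
```

In the shell, `kafka_options = build_kafka_options("hello")` would then stand in for the literal dictionary. A missing variable raises `KeyError` immediately instead of failing later during authentication.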

Output while producing data via Conduktor:

```text
+---------------+
|          value|
+---------------+
|             my|
|          first|
|        message|
...
+---------------+
only showing top 20 rows
>>>
```

The connection works. Now for JSON transformations.

## Monitoring Active Queries in the Spark UI

Shell work gets messy: it is easy to start streaming queries and forget to `.stop()` them. The Spark UI lists active queries and their status at [http://localhost:4040/StreamingQuery/](http://localhost:4040/StreamingQuery/):

![Spark UI Streaming Query tab listing the active Kafka structured streaming query](https://www.conduktor.io/assets/images/blog/pyspark-streaming-query-ui.png)
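The same information is available from the shell. The sketch below stops whatever is still running and reports what it stopped. It relies on `spark.streams.active`, the list of running `StreamingQuery` objects. The helper name `stop_active_queries` is illustrative:

```python
def stop_active_queries(stream_manager):
    """Stop every running streaming query and return what was stopped.

    `stream_manager` is expected to be `spark.streams`. Only its `active`
    attribute and each query's `id`, `name` and `stop()` are used.
    """
    stopped = []
    for query in list(stream_manager.active):
        stopped.append((str(query.id), query.name))
        query.stop()
    return stopped
```

Calling `stop_active_queries(spark.streams)` in the PySpark shell returns a list of `(id, name)` pairs. The name is `None` unless the query was started with `.queryName(...)`.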

## Running SQL on JSON from Kafka

We'll work with Netflix-style "view events":

```json
{
  "user_id": 18,
  "content": "Stranger Things",
  "watched_at": "2023-08-10 10:00:01",
  "rating": 5
}
```

Produce this JSON in the `hello` topic, then run:

```python
from pyspark.sql.functions import *
from pyspark.sql.types import *

json_schema = StructType([
    StructField("user_id", StringType()),
    StructField("content", StringType()),
    StructField("watched_at", TimestampType()),
    StructField("rating", IntegerType()),
])

# df is the DataFrame reading from Kafka above
json_df = df.select(from_json(col("value").cast("string"), json_schema).alias("value"))
json_df.printSchema()
# root
#  |-- value: struct (nullable = true)
#  |    |-- user_id: string (nullable = true)
#  |    |-- content: string (nullable = true)
#  |    |-- watched_at: timestamp (nullable = true)
#  |    |-- rating: integer (nullable = true)

query = json_df.writeStream.outputMode("append").format("console").start()
time.sleep(10)
query.stop()
```

Output:

```text
+--------------------+
|               value|
+--------------------+
|{18, Stranger Thi...|
+--------------------+
```
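`from_json` does not raise on a payload it cannot parse. It returns a null `value` instead, which is easy to miss in the console output. A small pre-flight check before producing test messages can catch this early. The sketch below is plain Python; the function name and field table are illustrative and mirror `json_schema` above:

```python
import json
from datetime import datetime

# Expected fields, mirroring json_schema. user_id is read as a string there,
# so both numbers and strings are accepted here.
EXPECTED_FIELDS = {
    "user_id": (int, str),
    "content": (str,),
    "watched_at": (str,),
    "rating": (int,),
}


def check_view_event(raw):
    """Return a list of problems found in one JSON payload (empty if none)."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(event, dict):
        return ["payload is not a JSON object"]
    problems = []
    for field, types in EXPECTED_FIELDS.items():
        if field not in event:
            problems.append(f"missing field: {field}")
        elif isinstance(event[field], bool) or not isinstance(event[field], types):
            problems.append(f"unexpected type for {field}")
    if isinstance(event.get("watched_at"), str):
        try:
            datetime.strptime(event["watched_at"], "%Y-%m-%d %H:%M:%S")
        except ValueError:
            problems.append("watched_at is not 'YYYY-MM-DD HH:MM:SS'")
    return problems
```

This check is deliberately stricter than Spark, which accepts more timestamp formats. Its purpose is to keep the test data uniform, not to replicate Spark's parser.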

Now for SQL. Create a temporary view (required because SQL context doesn't know Python variable names):

```python
# value.* avoids prefixing every field with "value."
json_df.select("value.*").createOrReplaceTempView("netflix_view")

averageRatings = spark.sql("SELECT content, AVG(rating) FROM netflix_view GROUP BY content")
query = averageRatings.writeStream.outputMode("complete").format("console").start()
# "complete" mode is required for GROUP BY aggregations
# "append" mode fails with: Append output mode not supported when there are streaming aggregations

time.sleep(10)
query.stop()
```

Produce more data:

```json
{
  "user_id": 112,
  "content": "The Crown",
  "watched_at": "2023-08-11 10:00:01",
  "rating": 4
}
```

Spark displays the update:

```text
+---------------+-----------+
|        content|avg(rating)|
+---------------+-----------+
|Stranger Things|       3.75|
|      The Crown|        4.0|
+---------------+-----------+
```
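To make the `complete` output mode concrete: after every micro-batch, Spark re-emits the whole aggregated table, not only the rows that changed. The plain-Python sketch below mimics that behaviour for `AVG(rating) ... GROUP BY content`. It illustrates the semantics only, not how Spark implements them. The class name and sample ratings are made up for this example:

```python
from collections import defaultdict


class RunningAverages:
    """Keeps per-title sums and counts, like the state of a streaming GROUP BY."""

    def __init__(self):
        self._sum = defaultdict(int)
        self._count = defaultdict(int)

    def add_batch(self, events):
        """Fold one micro-batch into the state and return the full result table."""
        for event in events:
            self._sum[event["content"]] += event["rating"]
            self._count[event["content"]] += 1
        return {title: self._sum[title] / self._count[title] for title in self._sum}


state = RunningAverages()
first = state.add_batch([
    {"content": "Stranger Things", "rating": 5},
    {"content": "Stranger Things", "rating": 3},
])
second = state.add_batch([{"content": "The Crown", "rating": 4}])
print(first)   # only "Stranger Things" so far
print(second)  # both titles: the earlier row is emitted again
```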

Real-time SQL on Kafka data, running in your terminal.

## Filtering Without Spark Using Conduktor Console

For simple field filtering (no GROUP BY), you don't need PySpark. Conduktor Console filters Kafka data directly.

Below: filtering messages where `rating > 4`. Add as many filters as needed.

![Conduktor Console filter dialog: keeping only messages where rating greater than 4](https://www.conduktor.io/assets/images/blog/conduktor-message-filter-rating.png)

## Creating Virtual SQL Topics with Conduktor Gateway

[Conduktor Gateway](https://marketplace.conduktor.io/interceptors/virtual-sql-topic/), a Kafka proxy, supports Virtual SQL Topics: persistent SQL views over Kafka topics. No code, no stream processing framework. The query is evaluated in memory at runtime, so nothing extra is stored.

With Gateway deployed, add the SQL Topic interceptor:

```json
{
  "name": "my-virtual-sql-topic-interceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.VirtualSqlTopicPlugin",
  "priority": 100,
  "config": {
    "virtualTopic": "good_ratings",
    "statement": "SELECT * FROM my_netflix_topic WHERE rating >= 4"
  }
}
```

Users now access a topic named `good_ratings` built from your SQL query. It's not materialized (no storage, no partitions) but behaves like a normal topic, so it works with `kafka-console-consumer`, Spring, or any other Kafka client.

> Common use case: hide the original `my_netflix_topic` and expose only filtered data via `good_ratings`. Hide sensitive data, expose only what's needed, and modify the SQL query without consumers knowing.
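Because the virtual topic behaves like any other topic, a PySpark reader only needs to point at Gateway instead of the broker. Here is a sketch, assuming Gateway listens on `localhost:6969` (a placeholder, substitute your own address). Your Gateway's security settings may well differ from the broker's, so treat the copied options as a starting point:

```python
def options_for_gateway(base_options, gateway_bootstrap, virtual_topic):
    """Copy the reader options, swapping in Gateway's address and the virtual topic."""
    options = dict(base_options)  # leave the original dict untouched
    options["kafka.bootstrap.servers"] = gateway_bootstrap
    options["subscribe"] = virtual_topic
    return options


# Placeholder values for illustration only.
base = {
    "kafka.bootstrap.servers": "broker:9092",
    "subscribe": "hello",
    "startingOffsets": "earliest",
}
gateway_options = options_for_gateway(base, "localhost:6969", "good_ratings")
print(gateway_options["subscribe"])
```

In the shell, `spark.readStream.format("kafka").options(**gateway_options).load()` would then read the filtered records.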

## Generating Test Data with ChatGPT

This section is optional. It requires an [OpenAI account](https://platform.openai.com/signup?launch) and a [Google Cloud account](https://console.cloud.google.com/).

Setup:

- [Create an API key](https://platform.openai.com/account/api-keys) on OpenAI
- [Create a Search Engine](https://programmablesearchengine.google.com/u/1/controlpanel/create) on Google Cloud for a Search Engine ID
- [Enable the Custom Search API](https://console.cloud.google.com/projectselector2/apis/dashboard?supportedpurview=project) on your Google Cloud project

Export these variables (quit PySpark with Ctrl+D or `exit()`, add the exports, then restart):

```bash
export OPENAI_API_KEY=sk-your-openai-api-key-here
export GOOGLE_API_KEY=your-google-api-key-here
export GOOGLE_CSE_ID=your-google-cse-id-here
```
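A missing variable otherwise surfaces only later, as an authentication error from a library several layers down. Here is a quick check to run right after restarting the shell, as a sketch (the helper name is illustrative):

```python
import os

REQUIRED_VARS = ("OPENAI_API_KEY", "GOOGLE_API_KEY", "GOOGLE_CSE_ID")


def missing_env_vars(names=REQUIRED_VARS, env=None):
    """Return the names that are unset or empty in the environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


print(missing_env_vars())  # an empty list means all three are set
```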

We'll use [pyspark-ai](https://github.com/databrickslabs/pyspark-ai) for OpenAI integration.

With GPT-4 access:

```python
from pyspark_ai import SparkAI
spark_ai = SparkAI()
spark_ai.activate()
```

Without GPT-4 (use GPT-3.5):

```python
from pyspark_ai import SparkAI
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(model_name='gpt-3.5-turbo')
spark_ai = SparkAI(llm=llm, verbose=True)
spark_ai.activate()
```

Available models: [https://platform.openai.com/account/rate-limits](https://platform.openai.com/account/rate-limits)

After activation, create a DataFrame from Wikipedia:

```python
df = spark_ai.create_df("https://en.wikipedia.org/wiki/List_of_most-subscribed_YouTube_channels", [ "name", "subscribers", "language", "category", "country" ])
df.show()
```

Output:

```text
[..]
+--------------------+-----------+--------+-------------+--------------------+
|                name|subscribers|language|     category|             country|
+--------------------+-----------+--------+-------------+--------------------+
|            T-Series|      249.0|   Hindi|        Music|               India|
|             MrBeast|      183.0| English|Entertainment|       United States|
|           Cocomelon|      165.0| English|    Education|       United States|
|Sony Entertainmen...|      162.0|   Hindi|Entertainment|               India|
|     Kids Diana Show|      113.0| English|Entertainment|Ukraine- United S...|
|           PewDiePie|      111.0| English|Entertainment|              Sweden|
|         Like Nastya|      107.0| English|Entertainment|Russia- United St...|
|       Vlad and Niki|      101.0| English|Entertainment|Russia- United St...|
|   Zee Music Company|       99.5|   Hindi|        Music|               India|
|                 WWE|       97.1| English|       Sports|       United States|
|           Blackpink|       91.2|  Korean|        Music|         South Korea|
|           Goldmines|       89.5|   Hindi|         Film|               India|
|            Sony SAB|       85.2|   Hindi|Entertainment|               India|
|     5-Minute Crafts|       80.2| English|       How-to|              Cyprus|
|           BangtanTV|       76.4|  Korean|        Music|         South Korea|
|         Hybe Labels|       72.6|  Korean|        Music|         South Korea|
|              Zee TV|       72.4|   Hindi|Entertainment|               India|
|       Justin Bieber|       71.9| English|        Music|              Canada|
|            Pinkfong|       69.5| English|    Education|         South Korea|
|ChuChu TV Nursery...|       67.5|   Hindi|    Education|               India|
+--------------------+-----------+--------+-------------+--------------------+
[..]
```

Natural language queries work:

```python
>>> df.ai.verify("expect France not to be in the countries")
Result: True
```

```python
>>> df.ai.transform("per country").show()
+--------------------+-----------------+
|             country|total_subscribers|
+--------------------+-----------------+
|               India|           1312.2|
|       United States|            678.5|
|Ukraine- United S...|            113.0|
|              Sweden|            111.0|
|Russia- United St...|            208.0|
|         South Korea|            309.7|
|              Cyprus|             80.2|
|              Canada|             71.9|
|              Brazil|             66.6|
|           Argentina|             59.7|
+--------------------+-----------------+
```

It builds PySpark programs (typically SQL) behind the scenes.

> GPT-3.5 may fail on large datasets: `openai.error.InvalidRequestError: This model's maximum context length is 4097 tokens. However, your messages resulted in 7337 tokens.`

Plotting works too:

```python
>>> df.ai.plot("by country")
```

![Terminal output: final PySpark structured streaming results aggregated from the Kafka topic](https://www.conduktor.io/assets/images/blog/pyspark-kafka-final-streaming-output.png)

## Producing AI-Generated JSON to Kafka

Use `ChatOpenAI` directly to generate test payloads:

```python
from langchain.chat_models import ChatOpenAI
from langchain.schema import *
llm = ChatOpenAI(model_name='gpt-4') # or 'gpt-3.5-turbo'
random_json = llm([HumanMessage(content="""generate random minified JSON payloads based on this: {
 "user_id": 112,
 "content": "The Crown",
 "watched_at": "2023-08-11 10:00:01",
 "rating": 4
}""")]).content

random_json
```

ChatGPT generates:

```json
{"user_id":345,"content":"Breaking Bad","watched_at":"2022-12-14 18:30:12","rating":5}
{"user_id":876,"content":"Friends","watched_at":"2023-01-03 21:00:00","rating":4}
{"user_id":290,"content":"Stranger Things","watched_at":"2023-04-16 22:45:01","rating":5}
{"user_id":789,"content":"The Witcher","watched_at":"2023-08-14 20:55:00","rating":4}
{"user_id":654,"content":"Mandalorian","watched_at":"2023-07-20 19:10:10","rating":5}
{"user_id":321,"content":"Peaky Blinders","watched_at":"2023-09-10 22:00:45","rating":4}
{"user_id":154,"content":"Game of Thrones","watched_at":"2023-05-15 21:30:30","rating":3}
{"user_id":903,"content":"Money Heist","watched_at":"2023-10-01 23:00:01","rating":4}
{"user_id":567,"content":"Westworld","watched_at":"2023-06-12 20:10:10","rating":4}
{"user_id":238,"content":"Better Call Saul","watched_at":"2023-07-11 19:00:00","rating":5}
```

Send it to Kafka:

```python
from pyspark.sql import Row

# One Kafka record per generated line
chatgpt_records = random_json.strip().split("\n")
chatgpt_df = spark.createDataFrame([Row(value=x) for x in chatgpt_records])
chatgpt_df.selectExpr("CAST(value AS STRING)").write.format("kafka").options(**kafka_options).option("topic", "hello").save()
```
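Model output is not guaranteed to be one clean JSON object per line. It can include blank lines, Markdown fences, or a sentence of commentary. Here is a defensive sketch that keeps only the lines that parse as JSON objects before they are written to Kafka (the function name is illustrative):

```python
import json


def extract_json_records(text):
    """Return the lines of `text` that parse as JSON objects, re-minified."""
    records = []
    for line in text.splitlines():
        line = line.strip()
        if not line.startswith("{"):
            continue  # skips blank lines, Markdown fences and prose
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue  # skips truncated or malformed objects
        records.append(json.dumps(obj, separators=(",", ":")))
    return records
```

Using `chatgpt_records = extract_json_records(random_json)` in place of the `split("\n")` line keeps malformed lines out of the topic, where they would otherwise show up as null rows in the `from_json` output.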

Each generation updates the running `averageRatings` query:

```text
# averageRatings = spark.sql("SELECT content, AVG(rating) FROM netflix_view GROUP BY content")

+---------------+-----------+
|        content|avg(rating)|
+---------------+-----------+
|      Westworld|        4.0|
|    Money Heist|        4.0|
|Stranger Things|        4.0|
|      The Crown|        4.0|
|         Narcos|        5.0|
|     The Office|        4.0|
| Peaky Blinders|        5.0|
|   Breaking Bad|        5.0|
|The Mandalorian|        5.0|
|        Friends|        3.0|
|Game of Thrones|        4.0|
+---------------+-----------+
```

The loop closes: AI generates JSON, PySpark writes it to Kafka, and PySpark runs SQL on it in real time.

## Summary

This guide covered:

- PySpark installation and Kafka integration
- SQL filtering and transformations on streaming JSON
- Conduktor Console for simple filtering without code
- Conduktor Gateway for virtual SQL topics
- ChatGPT for generating test data

PySpark handles ad-hoc analysis locally. Conduktor's UI and Gateway add SQL capabilities without deploying a stream processing framework. Together, they make real-time data exploration fast and cheap.
