Real-Time Crypto Price Streaming from Kafka to Delta Live Tables in Databricks

Real-time data streaming is no longer exclusive to financial giants or large tech companies. With open APIs and modern data platforms like Databricks, anyone can build a streaming pipeline in under an hour. Thanks to Vibe coding, I set mine up even faster.

Vibe coding, a cutting-edge approach to software development, leverages large language models (LLMs) to generate code from natural language descriptions, streamlining the process and boosting efficiency.

I demonstrate how I built a real-time cryptocurrency price tracker using:

  • CoinGecko REST API for the price feed
  • Kafka on AWS EC2 (free-tier) for data transport
  • Delta Live Tables (DLT) in Databricks for stream ingestion and transformation

Whether you’re a data engineer exploring real-time pipelines or a manager evaluating ELT capabilities, this is a practical example of turning live data into insights.

Step 1: Collecting Prices from CoinGecko Every 30 Seconds

The CoinGecko REST API offers simple access to real-time prices for popular cryptocurrencies. I wrote a lightweight Python script to:

  • Fetch BTC and ETH prices in USD every 30 seconds
  • Format each message as JSON
  • Send the data to Kafka

# coingecko_kafka_producer.py
import requests
import json
import time
from kafka import KafkaProducer

KAFKA_BROKER = 'localhost:9092'
TOPIC = 'crypto-prices'

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def fetch_prices():
    url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        payload = {
            "btc_price": data["bitcoin"]["usd"],
            "eth_price": data["ethereum"]["usd"],
            "timestamp": int(time.time())
        }
        print("Sending:", payload)
        producer.send(TOPIC, payload)
    else:
        print("Failed to fetch from CoinGecko:", response.status_code)

if __name__ == "__main__":
    while True:
        fetch_prices()
        time.sleep(30)  # 30-second interval
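
In practice, the free CoinGecko endpoint can throttle or drop requests, so it's worth guarding the fetch loop. Here's a minimal sketch of a hypothetical retry wrapper around fetch_prices from the script above (the helper name and backoff values are my own, not part of the original script):

# Hypothetical retry wrapper around fetch_prices() from the script above
def fetch_with_retry(max_retries=3, backoff_seconds=5):
    for attempt in range(max_retries):
        try:
            fetch_prices()
            return
        except requests.exceptions.RequestException as err:
            # Connection errors or throttling: back off a little longer each attempt
            print(f"Attempt {attempt + 1} failed: {err}")
            time.sleep(backoff_seconds * (attempt + 1))
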
Step 2: Hosting Kafka on AWS EC2

I used a free-tier EC2 instance (Ubuntu) to host Kafka and ZooKeeper.

  • Download Kafka 3.7
  • Open ports 9092 (Kafka) and 2181 (ZooKeeper)
  • Configure advertised listeners to use your EC2 public IP (see the config excerpt after this list)
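
The advertised listener is the step that most often trips people up: external clients connect to whatever address the broker advertises, not the address they first dialed. A minimal sketch of the relevant lines in config/server.properties, with the public IP left as a placeholder:

# config/server.properties (excerpt)
# Listen on all interfaces inside the VM
listeners=PLAINTEXT://0.0.0.0:9092
# Advertise the address external clients (e.g., Databricks) should connect to
advertised.listeners=PLAINTEXT://<EC2_PUBLIC_IP>:9092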

Start services with:

# ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
# Kafka
bin/kafka-server-start.sh config/server.properties &
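
Depending on the broker's auto-topic-creation setting, the crypto-prices topic may need to be created explicitly. A quick sketch using kafka-python's admin client (single partition and replica, matching the single-broker setup; run it once from the EC2 box):

# create_topic.py -- one-off topic creation (assumes kafka-python is installed)
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([NewTopic(name='crypto-prices', num_partitions=1, replication_factor=1)])
admin.close()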

I automated this with a pair of scripts, ./start_kafka_stack.sh and ./stop_kafka_stack.sh, that bring up and tear down ZooKeeper, Kafka, and the Python producer together.
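
With the stack running and the producer publishing, a quick way to confirm that messages are actually flowing is a throwaway consumer, sketched here with the same kafka-python library the producer uses:

# verify_topic.py -- quick sanity check on the crypto-prices topic
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'crypto-prices',
    bootstrap_servers='localhost:9092',  # or <EC2_PUBLIC_IP>:9092 from outside the VM
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

for message in consumer:
    print(message.value)  # e.g. {'btc_price': ..., 'eth_price': ..., 'timestamp': ...}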

Step 3: Ingesting Kafka Stream Using Delta Live Tables

Delta Live Tables makes streaming ingestion incredibly simple. Here’s a DLT notebook that reads from Kafka, writes to a bronze table, and further cleanses the data into a silver table:

import dlt
from pyspark.sql.types import StructType, DoubleType, LongType
from pyspark.sql.functions import from_json, col, from_unixtime

# Define schema for incoming Kafka JSON
schema = StructType() \
    .add("btc_price", DoubleType()) \
    .add("eth_price", DoubleType()) \
    .add("timestamp", LongType())

# Load from Kafka - utility function
def load_from_kafka():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "<your Server IP>")
        .option("subscribe", "crypto-prices")
        .option("startingOffsets", "latest")
        .load()
        .selectExpr("CAST(value AS STRING) as json_string")
        .withColumn("data", from_json("json_string", schema))
        .select("data.*")
    )

# Bronze table from Kafka
@dlt.table(
    name="crypto_prices_bronze",
    comment="Raw streaming crypto prices from Kafka",
    table_properties={"quality": "bronze"}
)
def bronze():
    return load_from_kafka()

# Silver table - cleansed data
@dlt.table(
    name="crypto_prices_silver",
    comment="Cleaned crypto prices with casted types",
    table_properties={"quality": "silver"}
)
@dlt.expect_all({"btc_positive": "btc_price > 0", "eth_positive": "eth_price > 0"})
def clean_data():
    # Stream from the bronze table rather than re-reading it as a batch
    df = dlt.read_stream("crypto_prices_bronze")
    # Cast the epoch-seconds field into a proper timestamp column
    return df.withColumn("event_time", from_unixtime(col("timestamp")).cast("timestamp"))
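
Once the pipeline is running, you can sanity-check the silver table from any Databricks notebook. A minimal query sketch; the crypto schema name is an assumption, so substitute whatever target your pipeline publishes to:

# Peek at the latest cleansed prices (the schema name is an assumption)
df = spark.sql("""
    SELECT event_time, btc_price, eth_price
    FROM crypto.crypto_prices_silver
    ORDER BY event_time DESC
    LIMIT 10
""")
display(df)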

This gives you a fully managed streaming pipeline with built-in monitoring and lineage tracking. Hit play to see the quick demo.