By Nicole Ghalwash, Zach Peirce and Anish Singh Walia

Imagine your online store is processing orders smoothly. Then your order processing system starts rejecting customer data. Someone on your team changed a field name in the order data, and now your producer and consumer apps no longer “speak the same language.” What was once a working pipeline is now throwing error messages and blocking customer purchases.
This scenario is surprisingly common in distributed systems. Applications that rely on shared data pipelines are fragile when it comes to format changes. Even a small mismatch, like renaming customer_email to email, can cause services to fail.
That’s where Kafka Schema Registry comes in. Schema Registry acts as a central authority for data formats, ensuring that producers (apps sending data) and consumers (apps reading data) always agree on message structure. It makes evolving your data models safer, so you can add new fields without breaking existing services.
In this tutorial, you’ll learn how to set up Kafka Schema Registry on DigitalOcean, step by step. We’ll walk you through building a simple order processing system in which a producer publishes Avro-encoded order events to an orders topic and a consumer (a fulfillment service) reads and processes them.
By the end, you’ll know how to produce and consume Avro-encoded messages with Schema Registry, evolve your schemas safely, and monitor your setup on a managed Kafka cluster in DigitalOcean.
Before diving into the implementation, let’s look at what Schema Registry is and why it matters.
Apache Kafka moves data between applications efficiently. It lets applications publish and subscribe to streams of records, making it a backbone for real-time systems. But while Kafka handles message transport, it doesn’t enforce rules about message structure. Producers can send anything and must ensure that consumers can decode it.
That’s where Schema Registry fits in as the central authority for your Kafka topics. It stores schemas (formal descriptions of your data’s structure, like the order record in our example) in a centralized service. These schemas are typically defined in formats like Avro, Protobuf, or JSON Schema.
Here’s why this matters:
- Consistency: it prevents field mismatches like order_id vs id from causing deserialization errors.
- Safe evolution: maybe your business later needs a shipping_address field or support for discount codes. Schema Registry allows you to evolve schemas safely, ensuring new versions remain compatible with old ones.

When a producer sends a message, it registers or references a schema stored in Schema Registry. Consumers automatically fetch that schema (if they don’t already have it cached) and use it to decode messages.
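To make that flow concrete, here's a minimal sketch (assuming the confluent-kafka Python client and placeholder registry URL and credentials) of how an application can ask Schema Registry for the latest schema registered under a subject such as orders-value, the subject we'll create later in this tutorial:

from confluent_kafka.schema_registry import SchemaRegistryClient

# Placeholder URL and credentials -- substitute your cluster's actual values
client = SchemaRegistryClient({
    'url': 'https://your-kafka-host:25065',
    'basic.auth.user.info': 'your-username:your-password'
})

# Fetch the newest schema version registered under the "orders-value" subject
latest = client.get_latest_version('orders-value')
print(f"Subject: {latest.subject}, version: {latest.version}")
print(latest.schema.schema_str)  # the Avro definition as a JSON string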
DigitalOcean’s Managed Kafka service (available on dedicated CPU plans) includes Schema Registry out of the box. That means you can focus on building your applications instead of running and maintaining extra infrastructure. For more information on setting up Kafka clusters, see the DigitalOcean Managed Kafka documentation.
To see Schema Registry in action, we’ll build a simple order processing system.
Here’s the scenario: an online store publishes each order (an order ID, the customer’s email, the total amount, and the order date) to an orders topic, and a fulfillment service consumes those messages to process them.
Later, we’ll evolve our schema to add optional fields like shipping_address and discount_code. Without Schema Registry, these changes could cause errors. With Schema Registry, they’ll work reliably.
We’ll start by creating a Kafka cluster on DigitalOcean with Schema Registry enabled. You can do this with Terraform (recommended) or via the DigitalOcean Control Panel.
First, create a file called kafka-cluster.tf:
# Configure the DigitalOcean provider
terraform {
  required_providers {
    digitalocean = {
      source  = "digitalocean/digitalocean"
      version = "~> 2.0"
    }
  }
}

provider "digitalocean" {
  token = var.do_token
}

# Variables
variable "do_token" {
  type        = string
  description = "Your DigitalOcean API token"
  sensitive   = true
}

# Create Kafka cluster with Schema Registry
resource "digitalocean_database_cluster" "kafka_cluster" {
  engine     = "kafka"
  version    = "3.8"
  name       = "orders-kafka-cluster"
  node_count = 3
  region     = "nyc3"
  size       = "gd-2vcpu-8gb"
}

# Create topics for our example
resource "digitalocean_database_kafka_topic" "orders" {
  cluster_id         = digitalocean_database_cluster.kafka_cluster.id
  name               = "orders"
  partition_count    = 3
  replication_factor = 3
}

# Create our order schema
resource "digitalocean_database_kafka_schema_registry" "order_schema" {
  cluster_id   = digitalocean_database_cluster.kafka_cluster.id
  subject_name = "orders-value"
  schema_type  = "avro"
  schema = jsonencode({
    "type": "record",
    "name": "Order",
    "fields": [
      {"name": "order_id", "type": "string"},
      {"name": "customer_email", "type": "string"},
      {"name": "total_amount", "type": "double"},
      {"name": "order_date", "type": "string"}
    ]
  })
}

# Output connection details
output "kafka_host" {
  value = digitalocean_database_cluster.kafka_cluster.host
}

output "kafka_port" {
  value = digitalocean_database_cluster.kafka_cluster.port
}

output "schema_registry_url" {
  value = "https://${digitalocean_database_cluster.kafka_cluster.host}:25065"
}

output "kafka_user" {
  value = digitalocean_database_cluster.kafka_cluster.user
}

output "kafka_password" {
  value     = digitalocean_database_cluster.kafka_cluster.password
  sensitive = true
}
Note: You’ll need to create a Personal Access Token in your DigitalOcean account to use as the do_token value. Follow the instructions in the DigitalOcean API documentation to generate your token. For more on using Terraform with DigitalOcean, see our Terraform with DigitalOcean tutorial.
Deploy the cluster:
# Set your DigitalOcean token
export TF_VAR_do_token="your_digitalocean_token_here"
# Initialize and apply
terraform init
terraform apply
After a few minutes, Terraform will output the connection details for your Kafka cluster and Schema Registry.
If you prefer a click-based setup, you can create the cluster from the DigitalOcean Control Panel instead: create a new Kafka cluster on a General Purpose (dedicated CPU) plan, confirm Schema Registry is enabled under the cluster’s Settings, then add an orders topic.
Either way, you now have a Kafka cluster with Schema Registry enabled that is ready to store schemas and enforce data rules.
Once your cluster is deployed, let’s confirm that Schema Registry is working.
Run:
# Get your cluster details
terraform output
Test your Schema Registry connection by running the following command, replacing “your-kafka-host” and “username:password” with the actual values from terraform output (you can view the password with terraform output -raw kafka_password):
curl -u "username:password" "https://your-kafka-host:25065/subjects"
You should see something like:
["orders-value"]
This tells you that:

- Schema Registry is reachable and your credentials work.
- You have one subject (orders-value) registered, which corresponds to the schema we defined for order messages.

A subject is essentially a namespace under which schemas are versioned. If you update your schema later, you’ll see multiple versions under the same subject.
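If you'd rather script this check than run curl by hand, here's a small sketch using Python's requests library (assuming the same host, port, and credentials as the curl command above) that lists all subjects and the versions registered under orders-value:

import requests

# Placeholder values -- use the host and credentials from `terraform output`
BASE_URL = "https://your-kafka-host:25065"
AUTH = ("your-username", "your-password")

# List every registered subject
subjects = requests.get(f"{BASE_URL}/subjects", auth=AUTH).json()
print("Subjects:", subjects)

# List the version numbers registered under the orders-value subject
versions = requests.get(f"{BASE_URL}/subjects/orders-value/versions", auth=AUTH).json()
print("orders-value versions:", versions)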
Before running your producer and consumer applications, you’ll need to download the SSL certificates from your DigitalOcean Kafka cluster:
- ca-certificate.crt (Certificate Authority)

Note: For detailed instructions on downloading and configuring SSL certificates for your Kafka cluster, see the DigitalOcean Managed Kafka documentation.
Make sure these certificate files are in the same directory as your Python scripts, or update the file paths in your configuration accordingly.
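To catch a bad path early, here's an optional pre-flight sketch (assuming the ca-certificate.crt filename used throughout this tutorial) that you can run before starting the producer or consumer:

from pathlib import Path

# CA certificate downloaded from your DigitalOcean Kafka cluster
ca_path = Path("ca-certificate.crt")

if not ca_path.is_file():
    raise SystemExit(
        f"Missing {ca_path} -- download it from your cluster's connection details "
        "or update 'ssl.ca.location' to point at the correct file."
    )
print(f"Found CA certificate at {ca_path.resolve()}")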
Now that the infrastructure is ready, let’s create a Python producer that sends orders into Kafka. This example uses the confluent-kafka library, which provides robust support for Schema Registry integration. For more Python examples with DigitalOcean services, see our Python tutorials.
Create a file called order_producer.py using the example below, replacing the placeholder values your-username, your-password, and your-kafka-host:your-kafka-port with the values from your Terraform output:
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json
from datetime import datetime

# Configuration (replace with your actual values)
kafka_config = {
    'bootstrap.servers': 'your-kafka-host:your-kafka-port',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'your-username',
    'sasl.password': 'your-password',
    'ssl.ca.location': 'ca-certificate.crt',
}

schema_registry_config = {
    'url': 'https://your-kafka-host:25065',
    'basic.auth.user.info': 'your-username:your-password'
}

# Our order schema (matches what we created in Terraform)
order_schema = """
{
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "customer_email", "type": "string"},
        {"name": "total_amount", "type": "double"},
        {"name": "order_date", "type": "string"}
    ]
}
"""

def create_producer():
    """Create producer with Avro serialization"""
    # Create Schema Registry client
    schema_registry_client = SchemaRegistryClient(schema_registry_config)

    # Create Avro serializer
    avro_serializer = AvroSerializer(
        schema_registry_client,
        order_schema
    )

    # Create producer
    producer = Producer(kafka_config)
    return producer, avro_serializer

def send_order(producer, avro_serializer, order_data):
    """Send order to Kafka topic"""
    try:
        serialized_value = avro_serializer(
            order_data,
            SerializationContext('orders', MessageField.VALUE)
        )
        producer.produce(
            topic='orders',
            value=serialized_value
        )
        producer.flush()
        print(f"✅ Sent order: {order_data['order_id']}")
    except Exception as e:
        print(f"❌ Error sending order: {e}")

# Example usage
if __name__ == "__main__":
    producer, avro_serializer = create_producer()

    # Send sample orders
    orders = [
        {
            "order_id": "ORDER-001",
            "customer_email": "john@example.com",
            "total_amount": 99.99,
            "order_date": "2025-01-15"
        },
        {
            "order_id": "ORDER-002",
            "customer_email": "jane@example.com",
            "total_amount": 149.50,
            "order_date": "2025-01-15"
        }
    ]

    for order in orders:
        send_order(producer, avro_serializer, order)

    print("All orders sent!")
Now, install the required packages:
pip install 'confluent-kafka[avro]'
Run the producer:
python3 order_producer.py
You should see output like:
✅ Sent order: ORDER-001
✅ Sent order: ORDER-002
All orders sent!
So, what’s happening behind the scenes?

- The producer registers the order schema with Schema Registry (or looks up its existing ID if it’s already registered).
- Each order is serialized into compact Avro binary, tagged with that schema ID, and published to the orders topic.
- If an order doesn’t match the schema (for example, it’s missing order_id), serialization fails and the message won’t be sent.

Next, let’s build the fulfillment service, a consumer that reads orders from Kafka.
Create a file called order_consumer.py:
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
import json

# Configuration (same as the producer, plus consumer group settings)
kafka_config = {
    'bootstrap.servers': 'your-kafka-host:your-kafka-port',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'your-username',
    'sasl.password': 'your-password',
    'ssl.ca.location': 'ca-certificate.crt',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
}

schema_registry_config = {
    'url': 'https://your-kafka-host:25065',
    'basic.auth.user.info': 'your-username:your-password'
}

# Our order schema (matches what we created in Terraform)
order_schema = """
{
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "customer_email", "type": "string"},
        {"name": "total_amount", "type": "double"},
        {"name": "order_date", "type": "string"}
    ]
}
"""

def create_consumer():
    """Create consumer with Avro deserialization"""
    # Create Schema Registry client
    schema_registry_client = SchemaRegistryClient(schema_registry_config)

    # Create Avro deserializer
    avro_deserializer = AvroDeserializer(
        schema_registry_client,
        order_schema
    )

    # Create consumer
    consumer = Consumer(kafka_config)
    return consumer, avro_deserializer

def process_order(order_data):
    """Process received order (your business logic here)"""
    print(f"📦 Processing order {order_data['order_id']}")
    print(f"   Customer: {order_data['customer_email']}")
    print(f"   Amount: ${order_data['total_amount']}")
    print(f"   Date: {order_data['order_date']}")
    print("   Order processed successfully! ✅\n")

# Example usage
if __name__ == "__main__":
    consumer, avro_deserializer = create_consumer()
    consumer.subscribe(['orders'])

    print("🚀 Order consumer started. Waiting for orders...")
    print("Press Ctrl+C to stop\n")

    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                print(f"❌ Consumer error: {msg.error()}")
                continue

            # Deserialize the message
            try:
                order_data = avro_deserializer(
                    msg.value(),
                    SerializationContext('orders', MessageField.VALUE)
                )
                process_order(order_data)
            except Exception as e:
                print(f"❌ Error deserializing message: {e}")
    except KeyboardInterrupt:
        print("👋 Shutting down consumer...")
    finally:
        consumer.close()
Now, let’s run the consumer:
python3 order_consumer.py
Expected output:
🚀 Order consumer started. Waiting for orders...
📦 Processing order ORDER-001
Customer: john@example.com
Amount: $99.99
Date: 2025-01-15
Order processed successfully! ✅
The consumer automatically retrieves the schema from Schema Registry, deserializes the message, and makes sure the data structure is exactly what it expects.
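How does it know which schema to retrieve? Messages produced with the Confluent Avro serializer carry a small prefix: a magic byte (0) followed by the 4-byte schema ID, with the Avro-encoded payload after that. The deserializer reads the ID, fetches the matching schema from the registry, and caches it. Here's an optional sketch, based on that framing, that you could drop into the consumer loop to print the schema ID before deserializing:

import struct

def peek_schema_id(raw_value: bytes) -> int:
    """Return the Schema Registry ID embedded in a Confluent-framed message."""
    if len(raw_value) < 5 or raw_value[0] != 0:
        raise ValueError("Not a Confluent wire-format message")
    # Bytes 1-4 hold the schema ID as a big-endian 32-bit integer
    return struct.unpack(">I", raw_value[1:5])[0]

# Inside the consumer loop, before calling avro_deserializer:
# print("Schema ID:", peek_schema_id(msg.value()))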
Now for the magic: evolving schemas safely.
Let’s add two new optional fields: shipping_address and discount_code. Update your Terraform schema:
resource "digitalocean_database_kafka_schema_registry" "order_schema" {
cluster_id = digitalocean_database_cluster.kafka_cluster.id
subject_name = "orders-value"
schema_type = "avro"
schema = jsonencode({
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_email", "type": "string"},
{"name": "total_amount", "type": "double"},
{"name": "order_date", "type": "string"},
{"name": "shipping_address", "type": ["null", "string"], "default": null},
{"name": "discount_code", "type": ["null", "string"], "default": null}
]
})
}
Apply the update:
terraform apply
Now try sending a new order by updating order_producer.py:
orders = [
    {
        "order_id": "ORDER-003",
        "customer_email": "bob@example.com",
        "total_amount": 199.99,
        "order_date": "2025-01-15",
        "shipping_address": "123 Main St, City, State",
        "discount_code": "SAVE10"
    }
]
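One detail worth calling out: also extend the producer's local order_schema string with the same two optional fields; otherwise the serializer keeps encoding with the original four-field definition and the new values never make it into the message. The consumer can keep its old schema string, since Avro schema resolution lets it read newer messages and ignore fields it doesn't recognize. The updated string in order_producer.py looks like this:

order_schema = """
{
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "customer_email", "type": "string"},
        {"name": "total_amount", "type": "double"},
        {"name": "order_date", "type": "string"},
        {"name": "shipping_address", "type": ["null", "string"], "default": null},
        {"name": "discount_code", "type": ["null", "string"], "default": null}
    ]
}
"""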
Run:
python3 order_producer.py
What happens in the background?

- Schema Registry checks the new schema against the existing one and registers it as version 2 of the orders-value subject.
- Because the new fields are optional union types with null defaults, the change is backward compatible, so the registration succeeds.
- Existing consumers that still use the original schema keep working; they simply ignore the fields they don’t know about.

No downtime. No broken services. Just smooth schema evolution.
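If you'd like to vet a schema change before running terraform apply, the Schema Registry client can check a candidate schema against the registered versions. Here's a sketch with placeholder credentials; it assumes your installed confluent-kafka release exposes test_compatibility:

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

client = SchemaRegistryClient({
    'url': 'https://your-kafka-host:25065',
    'basic.auth.user.info': 'your-username:your-password'
})

# Candidate schema: the original fields plus the two new optional ones
candidate = Schema("""
{
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "customer_email", "type": "string"},
        {"name": "total_amount", "type": "double"},
        {"name": "order_date", "type": "string"},
        {"name": "shipping_address", "type": ["null", "string"], "default": null},
        {"name": "discount_code", "type": ["null", "string"], "default": null}
    ]
}
""", schema_type="AVRO")

# True means the candidate is compatible with the latest orders-value version
print(client.test_compatibility('orders-value', candidate))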
You can monitor Schema Registry directly via its API:
# List all schemas
curl -u "username:password" \
"https://your-kafka-host:25065/subjects"
# Get latest schema version
curl -u "username:password" \
"https://your-kafka-host:25065/subjects/orders-value/versions/latest"
You’ll see the schema JSON returned, including version numbers.
In addition, DigitalOcean’s control panel gives you:

- Cluster health metrics
- Message throughput per topic
- Consumer lag monitoring
Together, these tools help you keep your pipeline reliable as you scale. For more on monitoring and observability, see the DigitalOcean Monitoring documentation.
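For a lightweight programmatic check you can run on a schedule, here's a sketch (same placeholder credentials as earlier) that prints every registered subject alongside its newest version:

from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({
    'url': 'https://your-kafka-host:25065',
    'basic.auth.user.info': 'your-username:your-password'
})

# Walk every subject and report its latest registered version
for subject in client.get_subjects():
    latest = client.get_latest_version(subject)
    print(f"{subject}: version {latest.version} (schema id {latest.schema_id})")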
Even with managed infrastructure, you may run into common issues. Here’s a quick reference table for resolving them:
| Issue | Cause / Explanation | Solution |
|---|---|---|
| Schema Registry not found | Likely you’re on a Basic Kafka plan. Schema Registry requires a General Purpose (dedicated CPU) plan. | In the Control Panel, go to your cluster’s Settings and confirm Schema Registry is enabled. |
| SSL connection failed | Kafka certificates (CA, client cert, client key) may not be downloaded or are missing. | Ensure the file paths in your Python config point to the correct certificates downloaded from DigitalOcean. |
| Schema compatibility error | Happens when you add new fields without defaults. Schema Registry enforces compatibility rules. | Make new fields optional with ["null", "string"] or provide a default value. |
Pro tip: When evolving schemas, always test with a consumer running before deploying to production.
Kafka Schema Registry is a centralized service that stores and manages data schemas for Kafka topics. It ensures that producers and consumers agree on message structure, preventing errors from schema mismatches. Without Schema Registry, a simple field rename like changing customer_email to email can break your entire pipeline. Schema Registry validates messages before they enter topics and handles schema versioning automatically.
No, Schema Registry is not available on Basic Kafka plans; it requires a General Purpose (dedicated CPU) plan. When creating your Kafka cluster in the DigitalOcean Control Panel, select a General Purpose plan to enable Schema Registry. You can find more details in the DigitalOcean Managed Kafka documentation.
Schema Registry enforces compatibility rules when you update schemas. By default, it uses backward compatibility, meaning new schema versions must be readable by consumers using older versions. To add new fields safely, make them optional with ["null", "string"] or provide default values. Breaking changes (like removing required fields) are rejected unless you change the compatibility mode. This prevents production outages from schema updates.
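If you do need a different compatibility mode for a subject (for example, during a planned migration), the client exposes getters and setters for it. A sketch, assuming placeholder credentials and a confluent-kafka release that includes get_compatibility and set_compatibility:

from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({
    'url': 'https://your-kafka-host:25065',
    'basic.auth.user.info': 'your-username:your-password'
})

# Inspect the current compatibility level for the orders-value subject
print(client.get_compatibility('orders-value'))

# Relax it to allow breaking changes -- use with care, and restore it afterwards
client.set_compatibility('orders-value', level='NONE')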
Yes, you can use formats other than Avro. DigitalOcean’s Schema Registry supports Avro, Protobuf, and JSON Schema formats. This tutorial uses Avro because it’s widely adopted and provides efficient binary serialization. Protobuf offers similar benefits with a different schema definition language, while JSON Schema is more human-readable but less efficient. Choose based on your team’s preferences and existing tooling.
Schema Registry validates messages before they’re accepted. If a message doesn’t match the registered schema, the producer will receive an error and the message won’t be written to the topic. This keeps your data stream clean and prevents downstream consumers from encountering deserialization errors. Always validate your data structure matches the schema before sending.
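For example, here's a small sketch (saved next to order_producer.py so it can import the create_producer helper) showing the serializer rejecting a record that's missing the required order_id field, before anything is produced:

from confluent_kafka.serialization import SerializationContext, MessageField
from order_producer import create_producer  # reuses the config from the earlier script

producer, avro_serializer = create_producer()

# A record missing the required order_id field
bad_order = {
    "customer_email": "test@example.com",
    "total_amount": 10.0,
    "order_date": "2025-01-15"
}

try:
    avro_serializer(bad_order, SerializationContext('orders', MessageField.VALUE))
except Exception as e:
    # The serializer rejects the record client-side, so nothing reaches the topic
    print(f"Rejected before produce: {e}")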
You can monitor Schema Registry through its REST API using curl commands to list subjects and view schema versions. Additionally, DigitalOcean’s control panel provides cluster health metrics, message throughput per topic, and consumer lag monitoring. For production deployments, consider setting up alerts for schema compatibility errors and consumer lag thresholds.
Congratulations! You now have a working Kafka pipeline with Schema Registry on DigitalOcean.
Now that your foundation is in place, here are some ideas to take it further:

- Evolve your schema again with fields like payment_status or delivery_date.
- Set up alerts for schema compatibility errors and consumer lag, as covered in the monitoring section above.

From here, you can expand your pipeline into a complete event-driven architecture. With Schema Registry on DigitalOcean, you can build confidently, knowing your data integrity is protected.
Ready to go deeper? Continue learning and evolving your architecture with DigitalOcean Managed Kafka and the documentation and tutorials linked throughout this guide.