Schema Registry
Overview
The schema-service module provides a Schema Registry implementation compatible with the Confluent Schema Registry API. It runs as an embedded HTTP server (default port 8081) and provides REST endpoints for managing schemas, subjects, versions, configurations, and compatibility checking.
The service uses Kafka as its storage backend (via a _schemas topic) and supports schema validation and compatibility checking, primarily for Apache Avro schemas.
Configuration
lc.schema.registry.enable
- Type: Boolean
- Default:
false - Description: Controls whether the embedded Schema Registry is enabled
lc.schema.registry.port
- Type: Int
- Default: 8081
- Description: Port that the Schema Registry's HTTP server listens to
Implicit Configurations
Currently, the following are hard-coded:
- Default Kafka Bootstrap: Advertised listeners in Kafka config
- Storage Topic:
_schemas - Content-Type:
application/vnd.schemaregistry.v1+jsonorapplication/json
API Endpoints
Root Endpoint
GET /
Returns an empty JSON object to verify the service is running.
Response: 200 OK
{}
Schema Endpoints
GET /schemas/ids/
Retrieves a schema by its global ID.
Path Parameters:
id(integer): The schema ID
Response: 200 OK
{
"schema": "<schema_string>"
}
Error Responses:
404 Not Found(40403): Schema not found
GET /schemas/ids/{id}/schema
Retrieves the raw schema string by its global ID.
Path Parameters:
id(integer): The schema ID
Response: 200 OK
- Returns the raw schema string (not wrapped in JSON)
Error Responses:
404 Not Found(40403): Schema not found
GET /schemas/types
Lists all supported schema types.
Response: 200 OK
["AVRO", "JSON", "PROTOBUF"]
Subject Endpoints
GET /subjects
Lists all registered subjects.
Query Parameters:
deleted(boolean, optional): Include soft-deleted subjects (default: false)
Response: 200 OK
["subject1", "subject2", "subject3"]
GET /subjects/{subject}/versions
Lists all version numbers for a subject.
Path Parameters:
subject(string): The subject name (URL-encoded)
Response: 200 OK
[1, 2, 3]
Error Responses:
404 Not Found(40401): Subject not found
GET /subjects/{subject}/versions/
Retrieves a specific schema version for a subject.
Path Parameters:
subject(string): The subject name (URL-encoded)version(string): Version number, "latest", or "-1" for latest
Response: 200 OK
{
"subject": "example-subject",
"id": 123,
"version": 1,
"schema": "<schema_string>",
"schemaType": "AVRO"
}
Note: schemaType field is only included for non-AVRO schemas.
Error Responses:
404 Not Found(40401): Subject not found404 Not Found(40402): Version not found422 Unprocessable Entity(42202): Invalid version
GET /subjects/{subject}/versions/{version}/schema
Retrieves the raw schema string for a specific version.
Path Parameters:
subject(string): The subject name (URL-encoded)version(string): Version number, "latest", or "-1" for latest
Response: 200 OK
- Returns the raw schema string (not wrapped in JSON)
Error Responses:
404 Not Found(40401): Subject not found404 Not Found(40402): Version not found
POST /subjects/{subject}/versions
Registers a new schema version for a subject.
Path Parameters:
subject(string): The subject name (URL-encoded)
Request Body:
{
"schema": "<schema_string>",
"schemaType": "AVRO",
"references": []
}
Fields:
schema(string, required): The schema definitionschemaType(string, optional): Schema type (default: "AVRO")references(array, optional): Schema references
Response: 200 OK
{
"id": 123
}
Returns the existing ID if the schema is identical to the latest version (idempotent operation).
Error Responses:
409 Conflict: Schema is incompatible with the latest schema422 Unprocessable Entity(42201): Invalid or empty schema
POST /subjects/
Checks if a schema exists under the subject and returns its version information.
Path Parameters:
subject(string): The subject name (URL-encoded)
Request Body:
{
"schema": "<schema_string>",
"schemaType": "AVRO"
}
Response: 200 OK
{
"subject": "example-subject",
"id": 123,
"version": 1,
"schema": "<schema_string>",
"schemaType": null
}
Error Responses:
404 Not Found(40401): Subject not found404 Not Found(40403): Schema not found422 Unprocessable Entity(42201): Invalid schema
DELETE /subjects/
Deletes a subject and all its versions.
Path Parameters:
subject(string): The subject name (URL-encoded)
Query Parameters:
permanent(boolean, optional): Perform hard delete with tombstones (default: false for soft delete)
Response: 200 OK
[1, 2, 3]
Returns the list of deleted version numbers.
Error Responses:
404 Not Found(40401): Subject not found
Notes:
- Soft delete marks versions as deleted but they remain accessible by ID
- Hard delete writes tombstones to permanently remove the subject
DELETE /subjects/{subject}/versions/
Deletes a specific version of a subject.
Path Parameters:
subject(string): The subject name (URL-encoded)version(string): Version number, "latest", or "-1" for latest
Query Parameters:
permanent(boolean, optional): Perform hard delete with tombstone (default: false for soft delete)
Response: 200 OK
1
Returns the deleted version number.
Error Responses:
404 Not Found(40401): Subject not found404 Not Found(40402): Version not found422 Unprocessable Entity(42202): Invalid version
Configuration Endpoints
GET /config
Retrieves the global compatibility level configuration.
Response: 200 OK
{
"compatibilityLevel": "BACKWARD"
}
PUT /config
Updates the global compatibility level configuration.
Request Body:
{
"compatibility": "BACKWARD"
}
Valid compatibility levels:
BACKWARDBACKWARD_TRANSITIVEFORWARDFORWARD_TRANSITIVEFULLFULL_TRANSITIVENONE
Response: 200 OK
{
"compatibility": "BACKWARD"
}
Error Responses:
422 Unprocessable Entity(42203): Invalid compatibility level
GET /config/
Retrieves the compatibility level for a specific subject.
Path Parameters:
subject(string): The subject name (URL-encoded)
Query Parameters:
defaultToGlobal(boolean, optional): Return global config if subject has no specific config (default: false)
Response: 200 OK
{
"compatibilityLevel": "BACKWARD"
}
Error Responses:
404 Not Found(40401): Subject not found or no specific config exists (when defaultToGlobal=false)
PUT /config/
Updates the compatibility level for a specific subject.
Path Parameters:
subject(string): The subject name (URL-encoded)
Request Body:
{
"compatibility": "FORWARD"
}
Response: 200 OK
{
"compatibility": "FORWARD"
}
Error Responses:
422 Unprocessable Entity(42203): Invalid compatibility level
DELETE /config/
Deletes the subject-specific compatibility configuration, reverting to global default.
Path Parameters:
subject(string): The subject name (URL-encoded)
Response: 200 OK
{
"compatibilityLevel": "BACKWARD"
}
Returns the compatibility level that was deleted (before falling back to global).
Error Responses:
404 Not Found(40401): Subject config not found
Compatibility Check Endpoints
POST /compatibility/subjects/{subject}/versions/
Tests if a schema is compatible with a specific version.
Path Parameters:
subject(string): The subject name (URL-encoded)version(string): Version number, "latest", or "-1" for latest
Request Body:
{
"schema": "<schema_string>",
"schemaType": "AVRO"
}
Response: 200 OK
{
"is_compatible": true
}
Error Responses:
404 Not Found(40401): Subject not found404 Not Found(40402): Version not found422 Unprocessable Entity(42201): Invalid schema
POST /compatibility/subjects/{subject}/versions
Tests if a schema is compatible with the subject based on its compatibility level.
Path Parameters:
subject(string): The subject name (URL-encoded)
Request Body:
{
"schema": "<schema_string>",
"schemaType": "AVRO"
}
Response: 200 OK
{
"is_compatible": true
}
Returns true if no existing schemas exist (compatible by default).
Error Responses:
422 Unprocessable Entity(42201): Invalid schema
Notes:
- This endpoint checks compatibility against relevant versions based on the configured compatibility level
- For BACKWARD, checks against the latest version
- For TRANSITIVE modes, checks against all relevant historical versions
Internal/Unstable Endpoints
POST /restart
Restarts the schema registry service (clears state and reinitializes).
Response: 200 OK
{
"success": true
}
Error Responses:
404 Not Found(40401): Endpoint disabled (requiresunstable.api.versions.enablein config)500 Internal Server Error(50001): Error during restart
Notes:
- This is an internal API endpoint
- Must be explicitly enabled via configuration (
allowRestartsflag) - Shuts down the service, clears all in-memory state, and reinitializes
Error Response Format
All error responses follow this format:
{
"error_code": 40401,
"message": "Subject not found"
}
Common Error Codes
400: Bad Request - Invalid JSON or request format404: Not Found - Resource not found40401: Subject not found40402: Version not found40403: Schema not found409: Conflict - Schema incompatible415: Unsupported Media Type - Invalid Content-Type422: Unprocessable Entity - Invalid schema or parameters42201: Invalid schema42202: Invalid version42203: Invalid compatibility level500: Internal Server Error50001: Internal server error with details
Data Models
SchemaRequest
{
"schema": "string (required)",
"schemaType": "string (optional, default: AVRO)",
"references": "array (optional)"
}
SchemaResponse
{
"schema": "string"
}
SchemaVersionResponse
{
"subject": "string",
"id": "integer",
"version": "integer",
"schema": "string",
"schemaType": "string (optional, omitted for AVRO)"
}
IdResponse
{
"id": "integer"
}
CompatibilityResponse
{
"is_compatible": "boolean"
}
ErrorResponse
{
"error_code": "integer",
"message": "string"
}
Implementation Details
Storage Backend
- Uses Kafka topic
_schemasfor persistent storage - All state changes are written to Kafka and replayed on startup
- In-memory cache is maintained for fast reads
- Consumer group processes updates asynchronously
Schema Validation
- Primary support for Apache Avro schemas
- Extensible architecture for JSON and Protobuf (declared but may not be fully implemented)
- Uses
AvroSchemaValidatorfor parsing and validation
Compatibility Checking
- Uses
AvroCompatibilityCheckerfor compatibility validation - Supports all standard compatibility modes (BACKWARD, FORWARD, FULL, NONE, and TRANSITIVE variants)
- Compatibility checks are performed before schema registration
URL Encoding
- Subject names in URLs must be URL-encoded
- The service automatically decodes URL-encoded path parameters