LeafCuttr Documentation

Written for v0.5.3

Introduction

LeafCuttr introduction [WIP]

Getting Started

Demo

See Demo Repository

Using Docker

Here's a sample Docker Compose file:

version: '3.8'

volumes:
  lc-data:

services:
  kafka:
    image: ghcr.io/leafcuttr/kafkalite:lc-0.5.2
    volumes:
      - lc-data:/tmp/kafka-logs/
    network_mode: "host"
    environment:
      # Set these to true as required
      # KAFKA_LC_MQTT_BROKER_ENABLE: "false"
      # KAFKA_LC_SCHEMA_REGISTRY_ENABLE: "false"
      # KAFKA_LC_HTTP_PROXY_ENABLE: "false"
      # KAFKA_LC_LOG_SYNC_ALWAYS: "true"
      # KAFKA_LC_ISOLATED: "true"
      # KAFKA_LC_TOPIC_FORWARD_CONFIG: filePath  # make sure filePath is mounted into the container

      # Standard configuration for Kafka
      KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT

      # Settings required for KRaft mode
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091

      # Listener to use for broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER

      # Required for a single node cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Quick Reference

See the individual modules for details.

Embedded Server mode

WIP

MQTT

The embedded MQTT Broker provides a standard MQTT broker interface that can be accessed by clients written in any language.

Interactive Producer and Consumer

The easiest way to interact with the MQTT broker is via the MQTTX app.
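
If you prefer the command line, the standard Mosquitto clients also work. A minimal sketch, assuming the broker runs on localhost:1883 with the demo/demo credentials used in the samples below (note that LeafCuttr requires an explicit client ID):

# Subscribe in one terminal; -i sets the mandatory client ID
mosquitto_sub -h localhost -p 1883 -u demo -P demo -i demoSub -t 'test/topic' -v

# Publish from another terminal
mosquitto_pub -h localhost -p 1883 -u demo -P demo -i demoPub -t 'test/topic' -m 'Hello from the CLI' -q 1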

Sample Producer in Python 3

Before running the code, make sure you install the required library:

pip install paho-mqtt

The script below attempts to connect to your broker and, if successful, publishes the message and then immediately disconnects:

import paho.mqtt.client as mqtt
import time
import sys

# --- Configuration ---
BROKER_ADDRESS = "localhost"
BROKER_PORT = 1883
TOPIC = "test/topic"
MESSAGE = "Hello, MQTT from Python!"
# ---------------------

def on_connect(client, userdata, flags, rc):
    """
    The callback for when the client receives a CONNACK response from the server.
    rc (return code) 0 means success.
    """
    if rc == 0:
        print("Connected successfully to MQTT broker.")
        # Once connected, publish the message
        publish_message(client)
    else:
        print(f"Connection failed with code {rc}. Check if the broker is running.")
        sys.exit(1)

def on_publish(client, userdata, mid):
    """
    The callback for when a message has been published successfully.
    mid is the message ID of the published message.
    """
    print(f"Message published (mid: {mid}).")
    # Disconnect after publishing the message
    client.disconnect()
    # A small sleep to let the disconnect complete before the script ends
    time.sleep(0.5)

def publish_message(client):
    """Publishes the configured message to the configured topic."""
    print(f"Attempting to publish '{MESSAGE}' to topic '{TOPIC}'...")
    # The publish call returns a result object. We use QoS 1.
    # QoS 1 means the message is delivered at least once.
    client.publish(TOPIC, MESSAGE, qos=1)

def main():
    """Main function to set up and run the MQTT client."""
    print(f"Starting MQTT client. Target: {BROKER_ADDRESS}:{BROKER_PORT}")

    # Create a new MQTT client instance (using API version 1 for broader compatibility)
    # Specifying a clientId is mandatory for the LeafCuttr MQTT Proxy
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "demoClient")

    # Setup Auth
    client.username_pw_set("demo", "demo")

    # Assign callback functions
    client.on_connect = on_connect
    client.on_publish = on_publish

    try:
        # Connect to the broker. The last argument is the keepalive time in seconds.
        client.connect(BROKER_ADDRESS, BROKER_PORT, 60)

        # Start the network loop in a non-blocking thread.
        # This is necessary for callbacks (like on_connect) to fire.
        client.loop_start()

        # Keep the main thread alive to allow the connection, publication,
        # and disconnection process to complete via the loop_start thread.
        print("Waiting for connection and publish to complete...")
        # Give the background thread a moment to process the CONNACK before
        # polling is_connected(); the loop exits once on_publish disconnects.
        time.sleep(0.5)
        while client.is_connected():
            time.sleep(0.1)

    except ConnectionRefusedError:
        print("FATAL ERROR: Connection refused. Is your MQTT broker running on localhost:1873?")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        # Ensure the loop stops if it was started
        client.loop_stop()
        print("Client loop stopped. Script exit.")

if __name__ == "__main__":
    main()

Sample Listener in Python 3

The following script connects, subscribes, and waits indefinitely for messages.

import paho.mqtt.client as mqtt
import sys
import time

# --- Configuration ---
BROKER_ADDRESS = "localhost"
BROKER_PORT = 1883
TOPIC = "test/topic"
# ---------------------

def on_connect(client, userdata, flags, rc):
    """
    The callback for when the client receives a CONNACK response from the server.
    rc (return code) 0 means success.
    """
    if rc == 0:
        print("Connected successfully to MQTT broker.")
        # Once connected, subscribe to the topic
        client.subscribe(TOPIC)
        print(f"Subscribed to topic: '{TOPIC}'. Waiting for messages...")
    else:
        print(f"Connection failed with code {rc}. Check if the broker is running.")
        sys.exit(1)

def on_message(client, userdata, msg):
    """
    The callback for when a PUBLISH message is received from the server.
    This function processes the incoming message payload.
    """
    # Decode the payload from bytes to a string
    payload_str = msg.payload.decode()
    print("-" * 30)
    print(f"[{time.strftime('%H:%M:%S')}] Message Received!")
    print(f"Topic: {msg.topic}")
    print(f"Payload: {payload_str}")
    print(f"QoS: {msg.qos}")
    print("-" * 30)


def main():
    """Main function to set up and run the MQTT subscriber client."""
    print(f"Starting MQTT subscriber client. Target: {BROKER_ADDRESS}:{BROKER_PORT}")

    # Create a new MQTT client instance (using API version 1)
    # Specifying a clientId is mandatory for the LeafCuttr MQTT Proxy
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "demoClient")

    # Setup Auth
    client.username_pw_set("demo", "demo")

    # Assign callback functions
    client.on_connect = on_connect
    client.on_message = on_message

    try:
        # Connect to the broker
        client.connect(BROKER_ADDRESS, BROKER_PORT, 60)

        # Blocking call that processes network traffic, dispatches callbacks,
        # and handles reconnecting. It blocks the main thread.
        print("Client running. Press Ctrl+C to stop.")
        client.loop_forever()

    except ConnectionRefusedError:
        print("FATAL ERROR: Connection refused. Is your MQTT broker running on localhost:1873?")
    except KeyboardInterrupt:
        print("\nSubscriber stopped by user.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        client.disconnect()
        print("Client disconnected. Script exit.")

if __name__ == "__main__":
    main()

HTTP

API Endpoints

POST /topics/{topicName}

Produces records to the specified topic

Request Format: The request body must contain a records array, where each record has key and value fields (both as JSON).

Response Format: Returns partition offsets for each produced record, along with optional error information.

Example Request and Response

The following will post two records to the jsontest topic.

curl -X POST \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -d '{"records":[{"key":"someKeyText", "value":{"name": "testUser"}}, {"key":"someKeyText", "value":{"name": "testUser2"}}]}' \
  http://localhost:8080/topics/jsontest | jq

Sample response:

{
  "offsets": [
    {
      "partition": 0,
      "offset": 23,
      "error_code": null,
      "error": null
    },
    {
      "partition": 0,
      "offset": 24,
      "error_code": null,
      "error": null
    }
  ],
  "value_schema_id": null,
  "key_schema_id": null
}

Kafka

Example Producer

docker run --net=host --rm -it ghcr.io/leafcuttr/kafkalite:lc-0.5.2 \
 /opt/kafka/bin/kafka-producer-perf-test.sh \
   --topic perfTest --num-records 10000000 \
   --throughput -1 --record-size 1000 \
   --producer-props bootstrap.servers=localhost:9092

Example Consumer

docker run --net=host --rm -it ghcr.io/leafcuttr/kafkalite:lc-0.5.2 \
 /opt/kafka/bin/kafka-consumer-perf-test.sh \
  --topic perfTest --messages 10000000 --timeout 100000 \
  --bootstrap-server localhost:9092 --show-detailed-stats

Example Producer with Schema Evolution

Automatically evolves the schema at the specified rate.

docker run --net=host --rm -it ghcr.io/leafcuttr/kafkalite:lc-0.5.2 \
 /opt/kafka/bin/kafka-schema-producer-perf-test.sh \
   --schema-evolution-interval 500000 \
   --topic perfTest --num-records 10000000 \
   --throughput -1 --record-size 1000 \
   --producer-props bootstrap.servers=localhost:9092

Schema Registry

Overview

The schema-service module provides a Schema Registry implementation compatible with the Confluent Schema Registry API. It runs as an embedded HTTP server (default port 8081) and provides REST endpoints for managing schemas, subjects, versions, configurations, and compatibility checking.

The service uses Kafka as its storage backend (via a _schemas topic) and supports schema validation and compatibility checking, primarily for Apache Avro schemas.
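
A quick way to verify the registry is running (assuming the default port 8081):

curl http://localhost:8081/           # returns {}
curl http://localhost:8081/subjects   # lists registered subjects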

Configuration

lc.schema.registry.enable

  • Type: Boolean
  • Default: false
  • Description: Controls whether the embedded Schema Registry is enabled

lc.schema.registry.port

  • Type: Int
  • Default: 8081
  • Description: Port that the Schema Registry's HTTP server listens on
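
For example, to enable the registry on the default port, add the following to the server properties file (a minimal sketch using only the properties above):

lc.schema.registry.enable=true
lc.schema.registry.port=8081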

Implicit Configurations

Currently, the following are hard-coded:

  • Default Kafka Bootstrap: Advertised listeners in Kafka config
  • Storage Topic: _schemas
  • Content-Type: application/vnd.schemaregistry.v1+json or application/json

API Endpoints

Root Endpoint

GET /

Returns an empty JSON object to verify the service is running.

Response: 200 OK

{}

Schema Endpoints

GET /schemas/ids/{id}

Retrieves a schema by its global ID.

Path Parameters:

  • id (integer): The schema ID

Response: 200 OK

{
  "schema": "<schema_string>"
}

Error Responses:

  • 404 Not Found (40403): Schema not found

GET /schemas/ids/{id}/schema

Retrieves the raw schema string by its global ID.

Path Parameters:

  • id (integer): The schema ID

Response: 200 OK

  • Returns the raw schema string (not wrapped in JSON)

Error Responses:

  • 404 Not Found (40403): Schema not found

GET /schemas/types

Lists all supported schema types.

Response: 200 OK

["AVRO", "JSON", "PROTOBUF"]

Subject Endpoints

GET /subjects

Lists all registered subjects.

Query Parameters:

  • deleted (boolean, optional): Include soft-deleted subjects (default: false)

Response: 200 OK

["subject1", "subject2", "subject3"]

GET /subjects/{subject}/versions

Lists all version numbers for a subject.

Path Parameters:

  • subject (string): The subject name (URL-encoded)

Response: 200 OK

[1, 2, 3]

Error Responses:

  • 404 Not Found (40401): Subject not found

GET /subjects/{subject}/versions/{version}

Retrieves a specific schema version for a subject.

Path Parameters:

  • subject (string): The subject name (URL-encoded)
  • version (string): Version number, "latest", or "-1" for latest

Response: 200 OK

{
  "subject": "example-subject",
  "id": 123,
  "version": 1,
  "schema": "<schema_string>",
  "schemaType": "AVRO"
}

Note: schemaType field is only included for non-AVRO schemas.

Error Responses:

  • 404 Not Found (40401): Subject not found
  • 404 Not Found (40402): Version not found
  • 422 Unprocessable Entity (42202): Invalid version
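
For example, to fetch the latest version of a subject (example-subject is a placeholder; assumes the default port 8081):

curl http://localhost:8081/subjects/example-subject/versions/latest | jq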

GET /subjects/{subject}/versions/{version}/schema

Retrieves the raw schema string for a specific version.

Path Parameters:

  • subject (string): The subject name (URL-encoded)
  • version (string): Version number, "latest", or "-1" for latest

Response: 200 OK

  • Returns the raw schema string (not wrapped in JSON)

Error Responses:

  • 404 Not Found (40401): Subject not found
  • 404 Not Found (40402): Version not found

POST /subjects/{subject}/versions

Registers a new schema version for a subject.

Path Parameters:

  • subject (string): The subject name (URL-encoded)

Request Body:

{
  "schema": "<schema_string>",
  "schemaType": "AVRO",
  "references": []
}

Fields:

  • schema (string, required): The schema definition
  • schemaType (string, optional): Schema type (default: "AVRO")
  • references (array, optional): Schema references

Response: 200 OK

{
  "id": 123
}

Returns the existing ID if the schema is identical to the latest version (idempotent operation).

Error Responses:

  • 409 Conflict: Schema is incompatible with the latest schema
  • 422 Unprocessable Entity (42201): Invalid or empty schema
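
A sample registration request (the subject name and Avro schema are illustrative; assumes the default port 8081):

curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}"}' \
  http://localhost:8081/subjects/example-subject/versions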

POST /subjects/{subject}

Checks if a schema exists under the subject and returns its version information.

Path Parameters:

  • subject (string): The subject name (URL-encoded)

Request Body:

{
  "schema": "<schema_string>",
  "schemaType": "AVRO"
}

Response: 200 OK

{
  "subject": "example-subject",
  "id": 123,
  "version": 1,
  "schema": "<schema_string>",
  "schemaType": null
}

Error Responses:

  • 404 Not Found (40401): Subject not found
  • 404 Not Found (40403): Schema not found
  • 422 Unprocessable Entity (42201): Invalid schema

DELETE /subjects/{subject}

Deletes a subject and all its versions.

Path Parameters:

  • subject (string): The subject name (URL-encoded)

Query Parameters:

  • permanent (boolean, optional): Perform hard delete with tombstones (default: false for soft delete)

Response: 200 OK

[1, 2, 3]

Returns the list of deleted version numbers.

Error Responses:

  • 404 Not Found (40401): Subject not found

Notes:

  • Soft delete marks versions as deleted but they remain accessible by ID
  • Hard delete writes tombstones to permanently remove the subject

DELETE /subjects/{subject}/versions/{version}

Deletes a specific version of a subject.

Path Parameters:

  • subject (string): The subject name (URL-encoded)
  • version (string): Version number, "latest", or "-1" for latest

Query Parameters:

  • permanent (boolean, optional): Perform hard delete with tombstone (default: false for soft delete)

Response: 200 OK

1

Returns the deleted version number.

Error Responses:

  • 404 Not Found (40401): Subject not found
  • 404 Not Found (40402): Version not found
  • 422 Unprocessable Entity (42202): Invalid version

Configuration Endpoints

GET /config

Retrieves the global compatibility level configuration.

Response: 200 OK

{
  "compatibilityLevel": "BACKWARD"
}

PUT /config

Updates the global compatibility level configuration.

Request Body:

{
  "compatibility": "BACKWARD"
}

Valid compatibility levels:

  • BACKWARD
  • BACKWARD_TRANSITIVE
  • FORWARD
  • FORWARD_TRANSITIVE
  • FULL
  • FULL_TRANSITIVE
  • NONE

Response: 200 OK

{
  "compatibility": "BACKWARD"
}

Error Responses:

  • 422 Unprocessable Entity (42203): Invalid compatibility level
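
For example, to disable compatibility checking globally (a sketch; assumes the default port 8081):

curl -X PUT \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "NONE"}' \
  http://localhost:8081/config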

GET /config/{subject}

Retrieves the compatibility level for a specific subject.

Path Parameters:

  • subject (string): The subject name (URL-encoded)

Query Parameters:

  • defaultToGlobal (boolean, optional): Return global config if subject has no specific config (default: false)

Response: 200 OK

{
  "compatibilityLevel": "BACKWARD"
}

Error Responses:

  • 404 Not Found (40401): Subject not found or no specific config exists (when defaultToGlobal=false)

PUT /config/{subject}

Updates the compatibility level for a specific subject.

Path Parameters:

  • subject (string): The subject name (URL-encoded)

Request Body:

{
  "compatibility": "FORWARD"
}

Response: 200 OK

{
  "compatibility": "FORWARD"
}

Error Responses:

  • 422 Unprocessable Entity (42203): Invalid compatibility level

DELETE /config/{subject}

Deletes the subject-specific compatibility configuration, reverting to global default.

Path Parameters:

  • subject (string): The subject name (URL-encoded)

Response: 200 OK

{
  "compatibilityLevel": "BACKWARD"
}

Returns the compatibility level that was deleted (before falling back to global).

Error Responses:

  • 404 Not Found (40401): Subject config not found

Compatibility Check Endpoints

POST /compatibility/subjects/{subject}/versions/{version}

Tests if a schema is compatible with a specific version.

Path Parameters:

  • subject (string): The subject name (URL-encoded)
  • version (string): Version number, "latest", or "-1" for latest

Request Body:

{
  "schema": "<schema_string>",
  "schemaType": "AVRO"
}

Response: 200 OK

{
  "is_compatible": true
}

Error Responses:

  • 404 Not Found (40401): Subject not found
  • 404 Not Found (40402): Version not found
  • 422 Unprocessable Entity (42201): Invalid schema

POST /compatibility/subjects/{subject}/versions

Tests if a schema is compatible with the subject based on its compatibility level.

Path Parameters:

  • subject (string): The subject name (URL-encoded)

Request Body:

{
  "schema": "<schema_string>",
  "schemaType": "AVRO"
}

Response: 200 OK

{
  "is_compatible": true
}

Returns true if the subject has no existing schemas (compatible by default).

Error Responses:

  • 422 Unprocessable Entity (42201): Invalid schema

Notes:

  • This endpoint checks compatibility against relevant versions based on the configured compatibility level
  • For BACKWARD, checks against the latest version
  • For TRANSITIVE modes, checks against all relevant historical versions
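
A sample check (the subject and schema are illustrative; adding an optional field with a default is BACKWARD-compatible; assumes the default port 8081):

curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}]}"}' \
  http://localhost:8081/compatibility/subjects/example-subject/versions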

Internal/Unstable Endpoints

POST /restart

Restarts the schema registry service (clears state and reinitializes).

Response: 200 OK

{
  "success": true
}

Error Responses:

  • 404 Not Found (40401): Endpoint disabled (requires unstable.api.versions.enable in config)
  • 500 Internal Server Error (50001): Error during restart

Notes:

  • This is an internal API endpoint
  • Must be explicitly enabled via configuration (allowRestarts flag)
  • Shuts down the service, clears all in-memory state, and reinitializes

Error Response Format

All error responses follow this format:

{
  "error_code": 40401,
  "message": "Subject not found"
}

Common Error Codes

  • 400: Bad Request - Invalid JSON or request format
  • 404: Not Found - Resource not found
  • 40401: Subject not found
  • 40402: Version not found
  • 40403: Schema not found
  • 409: Conflict - Schema incompatible
  • 415: Unsupported Media Type - Invalid Content-Type
  • 422: Unprocessable Entity - Invalid schema or parameters
  • 42201: Invalid schema
  • 42202: Invalid version
  • 42203: Invalid compatibility level
  • 500: Internal Server Error
  • 50001: Internal server error with details

Data Models

SchemaRequest

{
  "schema": "string (required)",
  "schemaType": "string (optional, default: AVRO)",
  "references": "array (optional)"
}

SchemaResponse

{
  "schema": "string"
}

SchemaVersionResponse

{
  "subject": "string",
  "id": "integer",
  "version": "integer",
  "schema": "string",
  "schemaType": "string (optional, omitted for AVRO)"
}

IdResponse

{
  "id": "integer"
}

CompatibilityResponse

{
  "is_compatible": "boolean"
}

ErrorResponse

{
  "error_code": "integer",
  "message": "string"
}

Implementation Details

Storage Backend

  • Uses Kafka topic _schemas for persistent storage
  • All state changes are written to Kafka and replayed on startup
  • In-memory cache is maintained for fast reads
  • Consumer group processes updates asynchronously

Schema Validation

  • Primary support for Apache Avro schemas
  • Extensible architecture for JSON and Protobuf (declared but may not be fully implemented)
  • Uses AvroSchemaValidator for parsing and validation

Compatibility Checking

  • Uses AvroCompatibilityChecker for compatibility validation
  • Supports all standard compatibility modes (BACKWARD, FORWARD, FULL, NONE, and TRANSITIVE variants)
  • Compatibility checks are performed before schema registration

URL Encoding

  • Subject names in URLs must be URL-encoded
  • The service automatically decodes URL-encoded path parameters

HTTP Proxy

The HTTP proxy is an embedded HTTP server that provides a REST API for producing records to Kafka topics.

Configuration

The HTTP proxy is controlled by the lc.http.proxy.enable configuration option, which is disabled by default. When enabled, it starts automatically as part of the Kafka server startup process.
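
For example, in the server properties file (a minimal sketch using the property above):

lc.http.proxy.enable=true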

Core Functionality

The proxy runs on port 8080 by default and accepts HTTP POST requests to produce records to Kafka topics.

See the HTTP interface for the provided endpoints.

Notes

The proxy creates a Kafka producer with acks=all and 3 retries for reliability.

The proxy can automatically create topics if they don't exist (when configured).

MQTT Broker and Proxy

The MQTT Broker and Proxy is an embedded lightweight MQTT broker that can also persist messages from specified MQTT topics to Kafka topics.

  • All configuration properties follow the standard Kafka configuration system and can be specified in the server properties file.
  • Authentication is optional; if username and password are not set, clients can connect without credentials.
  • The MQTT Broker is a lightweight broker compliant with MQTT 5 and MQTT 3.

Configuration

Enable/Disable Configuration

lc.mqtt.broker.enable

  • Type: Boolean
  • Default: false
  • Description: Controls whether the embedded MQTT broker is enabled. When set to false, the MQTT proxy will not start.

MQTT Broker Settings

mqtt.host

  • Type: String
  • Default: 0.0.0.0
  • Description: The host address the embedded MQTT broker will bind to. The default value 0.0.0.0 means the broker will listen on all available network interfaces.

mqtt.port

  • Type: Integer
  • Default: 1883
  • Range: 1 to 65535
  • Description: The port number for the embedded MQTT broker.

Authentication Settings

mqtt.auth.username

  • Type: String
  • Default: null (no authentication required)
  • Description: A shared username that all MQTT clients must use to connect. If not specified, authentication is disabled.

mqtt.auth.password

  • Type: Password
  • Default: null (no authentication required)
  • Description: A shared password that all MQTT clients must use to connect. If not specified, authentication is disabled.

Kafka Connection Settings

mqtt.kafka.bootstrap.servers

  • Type: String
  • Default: Uses the Kafka server's advertised brokers
  • Description: A comma-separated list of host:port pairs to establish the initial connection to the Kafka cluster. If not specified, the proxy will use the Kafka server's advertised brokers.

mqtt.kafka.client.id

  • Type: String
  • Default: mqtt-kafka-proxy
  • Description: An identifier for the Kafka producer client.

Topic Mapping Configuration

mqtt.topic.mapping.<mqtt-topic-filter>=<kafka-topic>

  • Format: Configuration properties with prefix mqtt.topic.mapping.
  • Description: Defines how MQTT topics are mapped to Kafka topics. The portion after the prefix is the MQTT topic filter, and the value is the target Kafka topic name.
  • Wildcard Support: The single-level wildcard + is supported in MQTT topic filters to match any value at a single topic level.
  • Example: mqtt.topic.mapping.devices/+/temperature=kafka-temperature-topic maps any MQTT topic matching the pattern devices/+/temperature (e.g., devices/sensor1/temperature, devices/sensor2/temperature) to the Kafka topic kafka-temperature-topic.

The + wildcard matches exactly one topic level and can be used at any position in the topic filter.
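
Putting it together, a server properties sketch (the topic names are examples; all property names are described above):

lc.mqtt.broker.enable=true
mqtt.port=1883
mqtt.auth.username=demo
mqtt.auth.password=demo

# Map MQTT topics to Kafka topics; + matches exactly one topic level
mqtt.topic.mapping.devices/+/temperature=kafka-temperature-topic
mqtt.topic.mapping.alerts/fire=kafka-alerts-topic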

Isolated Mode

When Isolated Mode is enabled, the broker operates in an optimised code path with replication disabled. This can give a performance boost in situations where replication is not possible or not desirable.

To enable this mode, set lc.isolated to true.
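
In the server properties file (or via the KAFKA_LC_ISOLATED environment variable in the Docker image, as shown in the Compose example above):

lc.isolated=true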

In isolated mode, you may want to use Topic Sync for critical topics.

Topic sync mode

By default, topic data is written to the operating system's page cache and flushed to the disk asynchronously. That improves throughput, but it also creates a short window where data could be lost if a crash or power failure occurs before the OS forces the pages to stable storage.

The common way to limit that risk is to increase replication and use acks=all, so that a single node failure doesn't cause data loss. Another (more conservative) option is to force a disk flush after every message (for example, flush.interval.messages=1), but that significantly reduces throughput and increases latency.

On resource-constrained or single-node deployments where replication isn't practical, LeafCuttr offers "topic sync mode." In this mode log segment files are opened with the O_DSYNC flag, which ensures data is committed to the physical medium before the producer is acknowledged. This approach provides stronger durability than the default behavior while avoiding the per-message fsync overhead of forcing a separate flush for every message.

Enable this mode globally or per-topic by setting lc.log.sync.always to true.
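
For example, globally in the server properties file, or per-topic using the standard kafka-configs tool (the per-topic form is a sketch and assumes lc.log.sync.always is also accepted as a topic-level config):

# Globally, in server.properties
lc.log.sync.always=true

# Per-topic (sketch)
docker run --net=host --rm -it ghcr.io/leafcuttr/kafkalite:lc-0.5.2 \
 /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name criticalTopic \
  --add-config lc.log.sync.always=true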

Benefits

  • Improved durability on single-node or edge deployments where replication isn't available
  • Lower syscall overhead than forcing a flush after every message, so better throughput than per-message fsync
  • Can be enabled globally or selectively for critical topics

Caveats and limitations

  • This affects only the log segment data; index and offset files are still flushed according to your flush.interval.* settings.
  • Topic sync mode improves durability but is not a replacement for replication when high availability and redundancy are required.

Topic Forwarding (WIP)

This feature lets LeafCuttr replicate or forward local topics to an external Kafka cluster. It's useful for backups, migrations, or bridging edge deployments to central clusters.

This currently uses an in-process MirrorMaker2 standalone instance under the hood.

Specify the path to the MM2 config file with the lc.topic.forward.config property.
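
A minimal MM2 config sketch (the cluster aliases, addresses, and topic list are placeholders; see the MirrorMaker2 documentation for the full option set):

# mm2.properties (referenced via lc.topic.forward.config)
clusters = local, central
local.bootstrap.servers = localhost:9092
central.bootstrap.servers = central-kafka:9092

# Forward these local topics to the central cluster
local->central.enabled = true
local->central.topics = perfTest|jsontest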