Kafka SASL Authentication: Usage & Best Practices

Securing Apache Kafka clusters is critical for production deployments. SASL (Simple Authentication and Security Layer) provides a robust authentication framework that supports multiple mechanisms. This article explores Kafka SASL authentication, implementation patterns in Go, and production best practices.

Understanding SASL in Kafka

SASL is a framework for adding authentication support to connection-based protocols. Kafka supports multiple SASL mechanisms, each designed for different security requirements and operational contexts.

Supported SASL Mechanisms

Kafka supports four primary SASL mechanisms:

  1. SASL/PLAIN: Simple username/password authentication
  2. SASL/SCRAM: Salted Challenge Response Authentication Mechanism
  3. SASL/GSSAPI: Kerberos authentication
  4. SASL/OAUTHBEARER: OAuth 2.0 bearer token authentication

[Figure: SASL authentication mechanisms overview]

SASL/PLAIN Authentication

Overview

SASL/PLAIN is the simplest mechanism: the client sends its username and password to the broker in cleartext, so outside of development it should always be paired with TLS (a SASL_SSL listener).

Broker Configuration

# server.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://kafka1.example.com:9092

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# JAAS configuration
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret" \
    user_producer="producer-secret" \
    user_consumer="consumer-secret";
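
Here username/password are the credentials the broker itself presents on inter-broker connections, while each user_<name>="<password>" entry defines a client account that this listener will accept.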

Client Implementation in Go

package main

import (
    "log"

    "github.com/IBM/sarama"
)

func createSASLPlainConfig(username, password string) *sarama.Config {
    config := sarama.NewConfig()
    config.Version = sarama.V3_5_0_0

    // SASL configuration
    config.Net.SASL.Enable = true
    config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
    config.Net.SASL.User = username
    config.Net.SASL.Password = password

    return config
}

func main() {
    brokers := []string{"kafka1.example.com:9092"}

    config := createSASLPlainConfig("producer", "producer-secret")
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        log.Fatalf("Failed to create producer: %v", err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "test-topic",
        Value: sarama.StringEncoder("Hello, Kafka with SASL/PLAIN!"),
    }

    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Fatalf("Failed to send message: %v", err)
    }

    log.Printf("Message sent to partition %d at offset %d", partition, offset)
}
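
Because PLAIN transmits the password as-is, production listeners should expose SASL_SSL rather than SASL_PLAINTEXT so credentials are encrypted in transit. Below is a minimal client-side sketch, reusing createSASLPlainConfig from above; the CA file path is an assumption for illustration:

package main

import (
    "crypto/tls"
    "crypto/x509"
    "os"

    "github.com/IBM/sarama"
)

// createSASLPlainTLSConfig layers TLS over SASL/PLAIN to match a SASL_SSL
// listener on the broker; the broker's CA certificate is loaded from caFile.
func createSASLPlainTLSConfig(username, password, caFile string) (*sarama.Config, error) {
    config := createSASLPlainConfig(username, password)

    caCert, err := os.ReadFile(caFile)
    if err != nil {
        return nil, err
    }
    pool := x509.NewCertPool()
    pool.AppendCertsFromPEM(caCert)

    config.Net.TLS.Enable = true
    config.Net.TLS.Config = &tls.Config{RootCAs: pool}
    return config, nil
}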

SASL/PLAIN Authentication Flow

[Figure: SASL/PLAIN authentication sequence]

SASL/SCRAM Authentication

Overview

SCRAM (Salted Challenge Response Authentication Mechanism) is more secure than PLAIN: authentication is a challenge-response exchange built on salted password hashes, so the password itself is never transmitted. Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512.

Advantages Over SASL/PLAIN

  1. No Password Transmission: only salted hashes and proofs derived from them cross the wire, never the password itself
  2. Salt Protection: Prevents rainbow table attacks
  3. Mutual Authentication: Both client and server prove identity
  4. Replay Attack Protection: Uses nonces and challenge-response
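
Under the hood (RFC 5802), the client opens with its username and a random nonce; the server answers with the stored salt, iteration count, and a combined nonce; the client responds with a proof derived from the salted password hash; and the server finishes with its own signature over the exchange, proving that it too knows the credential.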

Creating SCRAM Credentials

If the cluster already enforces SASL on this listener, also pass admin credentials to kafka-configs via --command-config (as in the KRaft section later in this article).

# Create SCRAM-SHA-256 credentials
kafka-configs --bootstrap-server kafka1.example.com:9092 \
  --alter \
  --add-config 'SCRAM-SHA-256=[password=producer-secret]' \
  --entity-type users \
  --entity-name producer

# Create SCRAM-SHA-512 credentials
kafka-configs --bootstrap-server kafka1.example.com:9092 \
  --alter \
  --add-config 'SCRAM-SHA-512=[password=consumer-secret]' \
  --entity-type users \
  --entity-name consumer

# List SCRAM credentials
kafka-configs --bootstrap-server kafka1.example.com:9092 \
  --describe \
  --entity-type users \
  --entity-name producer
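
Credentials can be revoked the same way with --delete-config:

# Remove SCRAM-SHA-256 credentials
kafka-configs --bootstrap-server kafka1.example.com:9092 \
  --alter \
  --delete-config 'SCRAM-SHA-256' \
  --entity-type users \
  --entity-name producer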

Broker Configuration

# server.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://kafka1.example.com:9092

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512

# JAAS configuration for inter-broker
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="admin" \
    password="admin-secret";

Client Implementation with SCRAM

package main

import (
    "crypto/sha256"
    "crypto/sha512"
    "log"

    "github.com/IBM/sarama"
    "github.com/xdg-go/scram"
)

func createSASLSCRAMConfig(username, password string, mechanism sarama.SASLMechanism) *sarama.Config {
    config := sarama.NewConfig()
    config.Version = sarama.V3_5_0_0

    // SASL configuration
    config.Net.SASL.Enable = true
    config.Net.SASL.Mechanism = mechanism
    config.Net.SASL.User = username
    config.Net.SASL.Password = password

    // SCRAM client generator
    switch mechanism {
    case sarama.SASLTypeSCRAMSHA256:
        config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
            return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
        }
    case sarama.SASLTypeSCRAMSHA512:
        config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
            return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
        }
    }

    return config
}

// SCRAM client implementation backed by github.com/xdg-go/scram.
// XDGSCRAMClient adapts that library to sarama's SCRAMClient interface.
type XDGSCRAMClient struct {
    *scram.Client
    *scram.ClientConversation
    scram.HashGeneratorFcn
}

var SHA256 scram.HashGeneratorFcn = sha256.New
var SHA512 scram.HashGeneratorFcn = sha512.New

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
    x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
    if err != nil {
        return err
    }
    x.ClientConversation = x.Client.NewConversation()
    return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
    response, err = x.ClientConversation.Step(challenge)
    return
}

func (x *XDGSCRAMClient) Done() bool {
    return x.ClientConversation.Done()
}

func main() {
    brokers := []string{"kafka1.example.com:9092"}

    // Use SCRAM-SHA-512
    config := createSASLSCRAMConfig("producer", "producer-secret", sarama.SASLTypeSCRAMSHA512)
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        log.Fatalf("Failed to create producer: %v", err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "test-topic",
        Value: sarama.StringEncoder("Hello, Kafka with SASL/SCRAM!"),
    }

    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Fatalf("Failed to send message: %v", err)
    }

    log.Printf("Message sent to partition %d at offset %d", partition, offset)
}

SASL/SCRAM Authentication Flow

[Figure: SASL/SCRAM authentication sequence]

Consumer Group with SASL

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"

    "github.com/IBM/sarama"
)

type ConsumerGroupHandler struct{}

func (h ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (h ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (h ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        log.Printf("Message received: topic=%s partition=%d offset=%d value=%s",
            message.Topic, message.Partition, message.Offset, string(message.Value))
        session.MarkMessage(message, "")
    }
    return nil
}

func main() {
    brokers := []string{"kafka1.example.com:9092"}

    config := sarama.NewConfig()
    config.Version = sarama.V3_5_0_0

    // SASL/SCRAM-SHA-512 configuration
    config.Net.SASL.Enable = true
    config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
    config.Net.SASL.User = "consumer"
    config.Net.SASL.Password = "consumer-secret"
    config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
        // Reuses XDGSCRAMClient and SHA512 from the SCRAM listing above.
        return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
    }

    // Consumer configuration
    // Rebalance.Strategy is deprecated in IBM/sarama; GroupStrategies replaces it.
    config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
    config.Consumer.Offsets.Initial = sarama.OffsetNewest

    group, err := sarama.NewConsumerGroup(brokers, "my-consumer-group", config)
    if err != nil {
        log.Fatalf("Failed to create consumer group: %v", err)
    }
    defer group.Close()

    ctx, cancel := context.WithCancel(context.Background())

    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            topics := []string{"test-topic"}
            handler := ConsumerGroupHandler{}

            err := group.Consume(ctx, topics, handler)
            if err != nil {
                log.Printf("Error from consumer: %v", err)
            }

            if ctx.Err() != nil {
                return
            }
        }
    }()

    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    <-sigterm

    cancel()
    wg.Wait()

    log.Println("Consumer group closed")
}

SASL with KRaft Mode

KRaft (Kafka Raft) eliminates the ZooKeeper dependency; ZooKeeper mode was deprecated in Kafka 3.5 and removed in Kafka 4.0. SASL authentication in KRaft mode requires a dedicated CONTROLLER listener for the controller quorum and stores SCRAM credentials in the cluster metadata log instead of ZooKeeper.

Key Configuration Differences

# server.properties for KRaft mode
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1.internal:9094,2@kafka2.internal:9094,3@kafka3.internal:9094

# Two listeners: client (SASL_PLAINTEXT) and controller quorum (CONTROLLER)
listeners=SASL_PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094
advertised.listeners=SASL_PLAINTEXT://kafka1.example.com:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT

# Client authentication: SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="admin" \
    password="admin-secret";

# Controller authentication: PLAIN (internal only)
listener.name.controller.sasl.enabled.mechanisms=PLAIN
listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="controller-user-1" \
    password="controller-secret" \
    user_controller-user-1="controller-secret" \
    user_controller-user-2="controller-secret" \
    user_controller-user-3="controller-secret";

Bootstrap Process

Create SCRAM credentials after cluster startup:

# Format and start cluster
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/server.properties
bin/kafka-server-start.sh config/server.properties

# Create SCRAM credentials
bin/kafka-configs.sh --bootstrap-server kafka1.example.com:9092 \
  --command-config admin.properties \
  --alter \
  --add-config 'SCRAM-SHA-512=[password=producer-secret]' \
  --entity-type users \
  --entity-name producer
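
If the brokers themselves authenticate to each other with SCRAM, those credentials must exist before the first start. Kafka 3.5+ can seed them at format time via kafka-storage's --add-scram flag (per KIP-900; verify the syntax against your release):

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/server.properties \
  --add-scram 'SCRAM-SHA-512=[name=admin,password=admin-secret]'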

Key Points:

  • SCRAM credentials stored in metadata log, not ZooKeeper
  • Controllers use SASL/PLAIN on internal listener
  • Clients use SASL/SCRAM on public listener
  • Client configuration unchanged from ZooKeeper mode

Security Comparison

| Feature               | SASL/PLAIN         | SASL/SCRAM      | SASL/GSSAPI | SASL/OAUTHBEARER |
|-----------------------|--------------------|-----------------|-------------|------------------|
| Complexity            | Low                | Medium          | High        | High             |
| Password Storage      | Plaintext (broker) | Hashed (broker) | KDC         | Token service    |
| Password Transmission | Plaintext          | Hashed          | Encrypted   | Token            |
| Setup Difficulty      | Easy               | Easy            | Complex     | Medium           |
| Production Ready      | Yes                | Yes             | Yes         | Yes              |
| Best For              | Development        | Production      | Enterprise  | Microservices    |
| Credential Rotation   | Manual             | Manual          | Automatic   | Automatic        |
| Mutual Auth           | No                 | Yes             | Yes         | Yes              |

Production Best Practices

1. Implement Credential Management

package credentials

import (
    "fmt"
    "os"
)

type KafkaCredentials struct {
    Username string
    Password string
}

func LoadFromEnvironment() (*KafkaCredentials, error) {
    username := os.Getenv("KAFKA_SASL_USERNAME")
    password := os.Getenv("KAFKA_SASL_PASSWORD")

    if username == "" || password == "" {
        return nil, fmt.Errorf("credentials not found in environment")
    }

    return &KafkaCredentials{
        Username: username,
        Password: password,
    }, nil
}

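For secret managers, here is a minimal sketch of the same idea against HashiCorp Vault, assuming github.com/hashicorp/vault/api, a VAULT_TOKEN in the environment, and a KV v1 secret holding username and password fields (KV v2 nests fields under a "data" key):

package credentials

import (
    "fmt"
    "os"

    vault "github.com/hashicorp/vault/api"
)

// LoadFromVault reads Kafka SASL credentials from a Vault KV secret.
func LoadFromVault(vaultAddr, secretPath string) (*KafkaCredentials, error) {
    client, err := vault.NewClient(&vault.Config{Address: vaultAddr})
    if err != nil {
        return nil, fmt.Errorf("vault client: %w", err)
    }
    client.SetToken(os.Getenv("VAULT_TOKEN"))

    secret, err := client.Logical().Read(secretPath)
    if err != nil {
        return nil, fmt.Errorf("vault read: %w", err)
    }
    if secret == nil || secret.Data == nil {
        return nil, fmt.Errorf("no secret found at %s", secretPath)
    }

    username, _ := secret.Data["username"].(string)
    password, _ := secret.Data["password"].(string)
    if username == "" || password == "" {
        return nil, fmt.Errorf("secret at %s is missing username/password", secretPath)
    }

    return &KafkaCredentials{Username: username, Password: password}, nil
}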

2. Connection Error Handling

// Requires "fmt", "log", "strings", "time", and github.com/IBM/sarama in scope.
func createProducerWithRetry(brokers []string, config *sarama.Config, maxRetries int) (sarama.SyncProducer, error) {
    var producer sarama.SyncProducer
    var err error

    for i := 0; i < maxRetries; i++ {
        producer, err = sarama.NewSyncProducer(brokers, config)
        if err == nil {
            return producer, nil
        }

        log.Printf("Failed to create producer (attempt %d/%d): %v", i+1, maxRetries, err)

        if isAuthenticationError(err) {
            return nil, fmt.Errorf("authentication failed: %w", err)
        }

        // Linear backoff between attempts.
        time.Sleep(time.Second * time.Duration(i+1))
    }

    return nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
}

// isAuthenticationError is a string-matching heuristic; where the error chain
// allows it, prefer typed checks such as sarama.ErrSASLAuthenticationFailed.
func isAuthenticationError(err error) bool {
    if err == nil {
        return false
    }

    errStr := err.Error()
    return strings.Contains(errStr, "authentication") ||
           strings.Contains(errStr, "SASL") ||
           strings.Contains(errStr, "credentials")
}

3. Credential Rotation Strategy

type CredentialRotator struct {
    currentCreds  *KafkaCredentials
    producer      sarama.SyncProducer
    config        *sarama.Config
    brokers       []string
    mu            sync.RWMutex
}

func (r *CredentialRotator) RotateCredentials(newCreds *KafkaCredentials) error {
    r.mu.Lock()
    defer r.mu.Unlock()

    // Copy the config so the shared template is not mutated in place,
    // then swap in the new credentials.
    newConfig := *r.config
    newConfig.Net.SASL.User = newCreds.Username
    newConfig.Net.SASL.Password = newCreds.Password

    // Create a replacement producer before tearing down the old one.
    newProducer, err := sarama.NewSyncProducer(r.brokers, &newConfig)
    if err != nil {
        return fmt.Errorf("failed to create producer with new credentials: %w", err)
    }

    // Close old producer
    oldProducer := r.producer
    r.producer = newProducer
    r.currentCreds = newCreds

    if oldProducer != nil {
        oldProducer.Close()
    }

    log.Println("Credentials rotated successfully")
    return nil
}
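
A typical wiring, sketched here with the helpers defined earlier, polls the credential source and rotates only when the values change:

// startRotationLoop periodically reloads credentials and rotates on change.
// LoadFromEnvironment could equally be LoadFromVault from the earlier sketch.
func startRotationLoop(r *CredentialRotator, interval time.Duration) {
    go func() {
        for range time.Tick(interval) {
            creds, err := LoadFromEnvironment()
            if err != nil {
                log.Printf("Credential refresh failed: %v", err)
                continue
            }

            r.mu.RLock()
            changed := creds.Username != r.currentCreds.Username ||
                creds.Password != r.currentCreds.Password
            r.mu.RUnlock()

            if changed {
                if err := r.RotateCredentials(creds); err != nil {
                    log.Printf("Rotation failed: %v", err)
                }
            }
        }
    }()
}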

Troubleshooting SASL Issues

Common Errors and Solutions

1. Authentication Failed

Error: kafka: client has run out of available brokers
Cause: Invalid credentials or SASL mechanism mismatch

Solution:

# Verify credentials exist
kafka-configs --bootstrap-server kafka1.example.com:9092 \
  --describe \
  --entity-type users \
  --entity-name producer

# Check broker logs
tail -f /var/log/kafka/server.log | grep -i sasl

2. SCRAM Client Not Configured

Error: kafka: unable to authenticate: invalid response
Cause: Missing SCRAM client generator function

Solution:

config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
    return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}

Debugging Checklist

[Figure: SASL troubleshooting decision tree]
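
Distilled from the errors above, work through the following in order:

  • Do the credentials exist on the broker? (kafka-configs --describe --entity-type users)
  • Does the client's mechanism match the broker's sasl.enabled.mechanisms?
  • For SCRAM in Go: is SCRAMClientGeneratorFunc set?
  • What do the broker logs (server.log) say about the SASL handshake?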

Testing SASL Configuration

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/IBM/sarama"
)

func testSASLConnection(brokers []string, config *sarama.Config) error {
    start := time.Now()

    // Test client connection
    client, err := sarama.NewClient(brokers, config)
    if err != nil {
        return fmt.Errorf("failed to create client: %w", err)
    }
    defer client.Close()

    authDuration := time.Since(start)
    log.Printf("Authentication successful in %v", authDuration)

    // Test broker connectivity
    brokerList := client.Brokers()
    log.Printf("Discovered %d brokers", len(brokerList))

    for _, broker := range brokerList {
        // Open dials asynchronously; ErrAlreadyConnected just means the
        // client already holds a connection to this broker.
        if err := broker.Open(config); err != nil && err != sarama.ErrAlreadyConnected {
            return fmt.Errorf("failed to open broker %d: %w", broker.ID(), err)
        }
        connected, err := broker.Connected()
        if err != nil || !connected {
            return fmt.Errorf("failed to connect to broker %d: %v", broker.ID(), err)
        }
        // Leave the connection open: these brokers are managed by the client.
        log.Printf("Connected to broker %d at %s", broker.ID(), broker.Addr())
    }

    // Test metadata request
    topics, err := client.Topics()
    if err != nil {
        return fmt.Errorf("failed to fetch topics: %w", err)
    }
    log.Printf("Discovered %d topics", len(topics))

    return nil
}

func main() {
    brokers := []string{"kafka1.example.com:9092"}

    // Reuses createSASLSCRAMConfig and the SCRAM client from the SCRAM section.
    config := createSASLSCRAMConfig("test-user", "test-password", sarama.SASLTypeSCRAMSHA512)

    if err := testSASLConnection(brokers, config); err != nil {
        log.Fatalf("Connection test failed: %v", err)
    }

    log.Println("All SASL connection tests passed")
}

Conclusion

SASL authentication is essential for securing Kafka clusters in production environments. Key takeaways:

Choosing a Mechanism:

  • Development: SASL/PLAIN for simplicity
  • Production: SASL/SCRAM-SHA-512 for security without external dependencies
  • Enterprise: SASL/GSSAPI for Kerberos integration
  • Microservices: SASL/OAUTHBEARER for OAuth 2.0 integration

Security Requirements:

  • Use SCRAM over PLAIN for production deployments
  • Implement proper credential management and rotation
  • Secure network communication between clients and brokers

Best Practices:

  • Store credentials securely (environment variables, secret management systems)
  • Implement retry logic with proper error handling
  • Test SASL configuration thoroughly before production deployment
  • Maintain separate credentials for different services and roles
  • Regularly rotate credentials following security policies

Common Pitfalls to Avoid:

  • Hardcoding credentials in application code
  • Missing SCRAM client configuration in Go applications
  • Not implementing proper error handling for auth failures
  • Sharing credentials across multiple services
  • Using weak passwords for SASL/PLAIN

With proper implementation and adherence to best practices, SASL authentication provides robust security for Kafka deployments while maintaining operational flexibility and performance.