Kafka SASL Authentication: Usage & Best Practices
Securing Apache Kafka clusters is critical for production deployments. SASL (Simple Authentication and Security Layer) provides a robust authentication framework that supports multiple mechanisms. This article explores Kafka SASL authentication, implementation patterns in Go, and production best practices.
Understanding SASL in Kafka
SASL is a framework for adding authentication support to connection-based protocols. Kafka supports multiple SASL mechanisms, each designed for different security requirements and operational contexts.
Supported SASL Mechanisms
Kafka supports four primary SASL mechanisms:
- SASL/PLAIN: Simple username/password authentication
- SASL/SCRAM: Salted Challenge Response Authentication Mechanism
- SASL/GSSAPI: Kerberos authentication
- SASL/OAUTHBEARER: OAuth 2.0 bearer token authentication

SASL/PLAIN Authentication
Overview
SASL/PLAIN is the simplest mechanism, using username and password credentials transmitted over the connection.
Broker Configuration
# server.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://kafka1.example.com:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# JAAS configuration
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_producer="producer-secret" \
user_consumer="consumer-secret";
Client Implementation in Go
package main
import (
"log"
"github.com/IBM/sarama"
)
func createSASLPlainConfig(username, password string) *sarama.Config {
config := sarama.NewConfig()
config.Version = sarama.V3_5_0_0
// SASL configuration
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = username
config.Net.SASL.Password = password
return config
}
func main() {
brokers := []string{"kafka1.example.com:9092"}
config := createSASLPlainConfig("producer", "producer-secret")
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("Hello, Kafka with SASL/PLAIN!"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Fatalf("Failed to send message: %v", err)
}
log.Printf("Message sent to partition %d at offset %d", partition, offset)
}
SASL/PLAIN Authentication Flow

SASL/SCRAM Authentication
Overview
SCRAM (Salted Challenge Response Authentication Mechanism) is more secure than PLAIN, using cryptographic hashing to avoid transmitting passwords. Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512.
Advantages Over SASL/PLAIN
- No Password Transmission: Only hashed values are sent
- Salt Protection: Prevents rainbow table attacks
- Mutual Authentication: Both client and server prove identity
- Replay Attack Protection: Uses nonces and challenge-response
Creating SCRAM Credentials
# Create SCRAM-SHA-256 credentials
kafka-configs --bootstrap-server kafka1.example.com:9092 \
--alter \
--add-config 'SCRAM-SHA-256=[password=producer-secret]' \
--entity-type users \
--entity-name producer
# Create SCRAM-SHA-512 credentials
kafka-configs --bootstrap-server kafka1.example.com:9092 \
--alter \
--add-config 'SCRAM-SHA-512=[password=consumer-secret]' \
--entity-type users \
--entity-name consumer
# List SCRAM credentials
kafka-configs --bootstrap-server kafka1.example.com:9092 \
--describe \
--entity-type users \
--entity-name producer
Broker Configuration
# server.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://kafka1.example.com:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
# JAAS configuration for inter-broker
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";
Client Implementation with SCRAM
package main
import (
"log"
"github.com/IBM/sarama"
)
func createSASLSCRAMConfig(username, password string, mechanism sarama.SASLMechanism) *sarama.Config {
config := sarama.NewConfig()
config.Version = sarama.V3_5_0_0
// SASL configuration
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = mechanism
config.Net.SASL.User = username
config.Net.SASL.Password = password
// SCRAM client generator
switch mechanism {
case sarama.SASLTypeSCRAMSHA256:
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case sarama.SASLTypeSCRAMSHA512:
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
}
return config
}
// SCRAM client implementation
import (
"crypto/sha256"
"crypto/sha512"
"hash"
)
type HashGeneratorFcn func() hash.Hash
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}
var SHA256 scram.HashGeneratorFcn = sha256.New
var SHA512 scram.HashGeneratorFcn = sha512.New
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}
func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
func main() {
brokers := []string{"kafka1.example.com:9092"}
// Use SCRAM-SHA-512
config := createSASLSCRAMConfig("producer", "producer-secret", sarama.SASLTypeSCRAMSHA512)
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("Hello, Kafka with SASL/SCRAM!"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Fatalf("Failed to send message: %v", err)
}
log.Printf("Message sent to partition %d at offset %d", partition, offset)
}
SASL/SCRAM Authentication Flow

Consumer Group with SASL
package main
import (
"context"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/IBM/sarama"
)
type ConsumerGroupHandler struct{}
func (h ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
return nil
}
func (h ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func (h ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message received: topic=%s partition=%d offset=%d value=%s",
message.Topic, message.Partition, message.Offset, string(message.Value))
session.MarkMessage(message, "")
}
return nil
}
func main() {
brokers := []string{"kafka1.example.com:9092"}
config := sarama.NewConfig()
config.Version = sarama.V3_5_0_0
// SASL/SCRAM-SHA-512 configuration
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
config.Net.SASL.User = "consumer"
config.Net.SASL.Password = "consumer-secret"
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
// Consumer configuration
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
config.Consumer.Offsets.Initial = sarama.OffsetNewest
group, err := sarama.NewConsumerGroup(brokers, "my-consumer-group", config)
if err != nil {
log.Fatalf("Failed to create consumer group: %v", err)
}
defer group.Close()
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
topics := []string{"test-topic"}
handler := ConsumerGroupHandler{}
err := group.Consume(ctx, topics, handler)
if err != nil {
log.Printf("Error from consumer: %v", err)
}
if ctx.Err() != nil {
return
}
}
}()
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
<-sigterm
cancel()
wg.Wait()
log.Println("Consumer group closed")
}
SASL with KRaft Mode
KRaft (Kafka Raft) eliminates ZooKeeper dependency. As of Kafka 4.0, ZooKeeper mode is deprecated. SASL authentication in KRaft mode requires a dedicated CONTROLLER listener for inter-broker communication and stores SCRAM credentials in the metadata log.
Key Configuration Differences
# server.properties for KRaft mode
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1.internal:9094,2@kafka2.internal:9094,3@kafka3.internal:9094
# Two listeners: client (SASL_PLAINTEXT) and controller (CONTROLLER)
listeners=SASL_PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094
advertised.listeners=SASL_PLAINTEXT://kafka1.example.com:9092
listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
# Client authentication: SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";
# Controller authentication: PLAIN (internal only)
listener.name.controller.sasl.enabled.mechanisms=PLAIN
listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="controller-user-1" \
password="controller-secret" \
user_controller-user-1="controller-secret" \
user_controller-user-2="controller-secret" \
user_controller-user-3="controller-secret";
Bootstrap Process
Create SCRAM credentials after cluster startup:
# Format and start cluster
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/server.properties
bin/kafka-server-start.sh config/server.properties
# Create SCRAM credentials
bin/kafka-configs.sh --bootstrap-server kafka1.example.com:9092 \
--command-config admin.properties \
--alter \
--add-config 'SCRAM-SHA-512=[password=producer-secret]' \
--entity-type users \
--entity-name producer
Key Points:
- SCRAM credentials stored in metadata log, not ZooKeeper
- Controllers use SASL/PLAIN on internal listener
- Clients use SASL/SCRAM on public listener
- Client configuration unchanged from ZooKeeper mode
Security Comparison
| Feature | SASL/PLAIN | SASL/SCRAM | SASL/GSSAPI | SASL/OAUTHBEARER |
|---|---|---|---|---|
| Complexity | Low | Medium | High | High |
| Password Storage | Plaintext (broker) | Hashed (broker) | KDC | Token service |
| Password Transmission | Plaintext | Hashed | Encrypted | Token |
| Setup Difficulty | Easy | Easy | Complex | Medium |
| Production Ready | Yes | Yes | Yes | Yes |
| Best For | Development | Production | Enterprise | Microservices |
| Credential Rotation | Manual | Manual | Automatic | Automatic |
| Mutual Auth | No | Yes | Yes | Yes |
Production Best Practices
1. Implement Credential Management
package credentials
import (
"fmt"
"os"
)
type KafkaCredentials struct {
Username string
Password string
}
func LoadFromEnvironment() (*KafkaCredentials, error) {
username := os.Getenv("KAFKA_SASL_USERNAME")
password := os.Getenv("KAFKA_SASL_PASSWORD")
if username == "" || password == "" {
return nil, fmt.Errorf("credentials not found in environment")
}
return &KafkaCredentials{
Username: username,
Password: password,
}, nil
}
// Integration with secret management systems
func LoadFromVault(vaultAddr, secretPath string) (*KafkaCredentials, error) {
// Implementation for HashiCorp Vault
// or other secret management systems
return nil, nil
}
2. Connection Error Handling
func createProducerWithRetry(brokers []string, config *sarama.Config, maxRetries int) (sarama.SyncProducer, error) {
var producer sarama.SyncProducer
var err error
for i := 0; i < maxRetries; i++ {
producer, err = sarama.NewSyncProducer(brokers, config)
if err == nil {
return producer, nil
}
log.Printf("Failed to create producer (attempt %d/%d): %v", i+1, maxRetries, err)
if isAuthenticationError(err) {
return nil, fmt.Errorf("authentication failed: %w", err)
}
time.Sleep(time.Second * time.Duration(i+1))
}
return nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
}
func isAuthenticationError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
return strings.Contains(errStr, "authentication") ||
strings.Contains(errStr, "SASL") ||
strings.Contains(errStr, "credentials")
}
3. Credential Rotation Strategy
type CredentialRotator struct {
currentCreds *KafkaCredentials
producer sarama.SyncProducer
config *sarama.Config
brokers []string
mu sync.RWMutex
}
func (r *CredentialRotator) RotateCredentials(newCreds *KafkaCredentials) error {
r.mu.Lock()
defer r.mu.Unlock()
// Create new configuration with new credentials
newConfig := r.config
newConfig.Net.SASL.User = newCreds.Username
newConfig.Net.SASL.Password = newCreds.Password
// Create new producer
newProducer, err := sarama.NewSyncProducer(r.brokers, newConfig)
if err != nil {
return fmt.Errorf("failed to create producer with new credentials: %w", err)
}
// Close old producer
oldProducer := r.producer
r.producer = newProducer
r.currentCreds = newCreds
if oldProducer != nil {
oldProducer.Close()
}
log.Println("Credentials rotated successfully")
return nil
}
Troubleshooting SASL Issues
Common Errors and Solutions
1. Authentication Failed
Error: kafka: client has run out of available brokers
Cause: Invalid credentials or SASL mechanism mismatch
Solution:
# Verify credentials exist
kafka-configs --bootstrap-server kafka1.example.com:9092 \
--describe \
--entity-type users \
--entity-name producer
# Check broker logs
tail -f /var/log/kafka/server.log | grep -i sasl
2. SCRAM Client Not Configured
Error: kafka: unable to authenticate: invalid response
Cause: Missing SCRAM client generator function
Solution:
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
Debugging Checklist

Testing SASL Configuration
package main
import (
"fmt"
"log"
"time"
"github.com/IBM/sarama"
)
func testSASLConnection(brokers []string, config *sarama.Config) error {
start := time.Now()
// Test client connection
client, err := sarama.NewClient(brokers, config)
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
defer client.Close()
authDuration := time.Since(start)
log.Printf("Authentication successful in %v", authDuration)
// Test broker connectivity
brokerList := client.Brokers()
log.Printf("Discovered %d brokers", len(brokerList))
for _, broker := range brokerList {
err := broker.Open(config)
if err != nil {
return fmt.Errorf("failed to connect to broker %d: %w", broker.ID(), err)
}
log.Printf("Connected to broker %d at %s", broker.ID(), broker.Addr())
broker.Close()
}
// Test metadata request
topics, err := client.Topics()
if err != nil {
return fmt.Errorf("failed to fetch topics: %w", err)
}
log.Printf("Discovered %d topics", len(topics))
return nil
}
func main() {
brokers := []string{"kafka1.example.com:9092"}
config := createSASLSCRAMConfig("test-user", "test-password", sarama.SASLTypeSCRAMSHA512)
if err := testSASLConnection(brokers, config); err != nil {
log.Fatalf("Connection test failed: %v", err)
}
log.Println("All SASL connection tests passed")
}
Conclusion
SASL authentication is essential for securing Kafka clusters in production environments. Key takeaways:
Choosing a Mechanism:
- Development: SASL/PLAIN for simplicity
- Production: SASL/SCRAM-SHA-512 for security without external dependencies
- Enterprise: SASL/GSSAPI for Kerberos integration
- Microservices: SASL/OAUTHBEARER for OAuth 2.0 integration
Security Requirements:
- Use SCRAM over PLAIN for production deployments
- Implement proper credential management and rotation
- Secure network communication between clients and brokers
Best Practices:
- Store credentials securely (environment variables, secret management systems)
- Implement retry logic with proper error handling
- Test SASL configuration thoroughly before production deployment
- Maintain separate credentials for different services and roles
- Regularly rotate credentials following security policies
Common Pitfalls to Avoid:
- Hardcoding credentials in application code
- Missing SCRAM client configuration in Go applications
- Not implementing proper error handling for auth failures
- Sharing credentials across multiple services
- Using weak passwords for SASL/PLAIN
With proper implementation and adherence to best practices, SASL authentication provides robust security for Kafka deployments while maintaining operational flexibility and performance.