Kafka on Kubernetes - The Hard Way

Inspired by Kelsey Hightower’s kubernetes-the-hard-way, this comprehensive tutorial walks you through setting up a complete Kafka environment on Kubernetes from scratch, using the Koperator.

What You’ll Learn

This tutorial will teach you how to:

  • Set up a multi-node Kubernetes cluster using kind
  • Install and configure all required dependencies manually
  • Deploy a production-ready Kafka cluster with monitoring
  • Test and validate your Kafka deployment
  • Handle disaster recovery scenarios
  • Troubleshoot common issues

Why “The Hard Way”?

This tutorial is called “the hard way” because it walks through each step manually rather than using automated scripts or simplified configurations. This approach helps you understand:

  • How each component works and interacts with others
  • The dependencies and relationships between services
  • How to troubleshoot when things go wrong
  • The complete architecture of a Kafka deployment on Kubernetes

Prerequisites

Before starting this tutorial, you should have:

  • Basic knowledge of Kubernetes concepts (pods, services, deployments)
  • Familiarity with Apache Kafka fundamentals
  • A local development machine with Docker installed
  • At least 8GB of RAM and 4 CPU cores available for the kind cluster

Tutorial Structure

This tutorial is organized into the following sections:

  1. Prerequisites and Setup - Install required tools and prepare your environment
  2. Kubernetes Cluster Setup - Create a multi-node kind cluster with proper labeling
  3. Dependencies Installation - Install cert-manager, ZooKeeper operator, and Prometheus operator
  4. Koperator Installation - Install the Kafka operator and its CRDs
  5. Kafka Cluster Deployment - Deploy and configure a Kafka cluster with monitoring
  6. Testing and Validation - Create topics, run producers/consumers, and performance tests
  7. Disaster Recovery Scenarios - Test failure scenarios and recovery procedures
  8. Troubleshooting - Common issues and debugging techniques

Architecture Overview

By the end of this tutorial, you’ll have deployed the following architecture:

┌─────────────────────────────────────────────────────────────────┐
│                    Kubernetes Cluster (kind)                    │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │   Control Plane │  │    Worker AZ1   │  │    Worker AZ2   │  │
│  │                 │  │                 │  │                 │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │   Worker AZ3    │  │   Worker AZ1    │  │   Worker AZ2    │  │
│  │                 │  │                 │  │                 │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
├─────────────────────────────────────────────────────────────────┤
│                        Applications                             │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │   Kafka Cluster │  │   ZooKeeper     │  │   Monitoring    │  │
│  │   (6 brokers)   │  │   (3 nodes)     │  │   Stack         │  │
│  │                 │  │                 │  │                 │  │
│  │  ┌─────────────┐│  │  ┌─────────────┐│  │  ┌─────────────┐│  │
│  │  │ Broker 101  ││  │  │    ZK-0     ││  │  │ Prometheus  ││  │
│  │  │ Broker 102  ││  │  │    ZK-1     ││  │  │ Grafana     ││  │
│  │  │ Broker 201  ││  │  │    ZK-2     ││  │  │ AlertMgr    ││  │
│  │  │ Broker 202  ││  │  └─────────────┘│  │  └─────────────┘│  │
│  │  │ Broker 301  ││  └─────────────────┘  └─────────────────┘  │
│  │  │ Broker 302  ││                                            │
│  │  └─────────────┘│                                            │
│  └─────────────────┘                                            │
├─────────────────────────────────────────────────────────────────┤
│                      Infrastructure                             │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │   cert-manager  │  │   Koperator     │  │   Cruise        │  │
│  │                 │  │                 │  │   Control       │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

Key Features Demonstrated

This tutorial demonstrates:

  • Multi-AZ deployment with rack awareness
  • SSL/TLS encryption for secure communication
  • Monitoring and alerting with Prometheus and Grafana
  • Automatic scaling with Cruise Control
  • Persistent storage with proper volume management
  • External access configuration
  • Disaster recovery and failure handling

Time Commitment

Plan to spend approximately 2-3 hours completing this tutorial, depending on your familiarity with the tools and concepts involved.

Getting Started

Ready to begin? Start with the Prerequisites and Setup section.


Note: This tutorial is designed for learning and development purposes. For production deployments, consider using automated deployment tools and following your organization’s security and operational guidelines.

1 - Prerequisites and Setup

Prerequisites and Setup

Before starting this tutorial, you need to install several tools and prepare your development environment. This section will guide you through setting up everything required for the Kafka on Kubernetes deployment.

System Requirements

Hardware Requirements

  • CPU: Minimum 4 cores, recommended 8+ cores
  • Memory: Minimum 8GB RAM, recommended 16GB+ RAM
  • Storage: At least 20GB free disk space
  • Network: Stable internet connection for downloading container images

Operating System Support

This tutorial has been tested on:

  • macOS: 10.15+ (Catalina and newer)
  • Linux: Ubuntu 18.04+, CentOS 7+, RHEL 7+
  • Windows: Windows 10+ with WSL2

Required Tools Installation

1. Docker

Docker is required to run the kind Kubernetes cluster.

macOS (using Homebrew)

brew install --cask docker

Linux (Ubuntu/Debian)

# Update package index
sudo apt-get update

# Install required packages
sudo apt-get install -y \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg \
    lsb-release

# Add Docker's official GPG key
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg

# Set up the stable repository
echo \
  "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu \
  $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

# Install Docker Engine
sudo apt-get update
sudo apt-get install -y docker-ce docker-ce-cli containerd.io

# Add your user to the docker group
sudo usermod -aG docker $USER

Windows

Download and install Docker Desktop from https://www.docker.com/products/docker-desktop

Verify Docker installation:

docker --version
docker run hello-world

2. kubectl

kubectl is the Kubernetes command-line tool.

macOS (using Homebrew)

brew install kubectl

Linux

# Download the latest release
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"

# Make it executable
chmod +x kubectl

# Move to PATH
sudo mv kubectl /usr/local/bin/

Windows (using Chocolatey)

choco install kubernetes-cli

Verify kubectl installation:

kubectl version --client

3. kind (Kubernetes in Docker)

kind is a tool for running local Kubernetes clusters using Docker containers.

All Platforms

# For Linux
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.30.0/kind-linux-amd64
chmod +x ./kind
sudo mv ./kind /usr/local/bin/kind

# For macOS
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.30.0/kind-darwin-amd64
chmod +x ./kind
sudo mv ./kind /usr/local/bin/kind

# For Windows (in PowerShell)
curl.exe -Lo kind-windows-amd64.exe https://kind.sigs.k8s.io/dl/v0.30.0/kind-windows-amd64
Move-Item .\kind-windows-amd64.exe c:\some-dir-in-your-PATH\kind.exe

macOS (using Homebrew)

brew install kind

Verify kind installation:

kind version

4. Helm

Helm is the package manager for Kubernetes.

macOS (using Homebrew)

brew install helm

Linux

# Download and install
curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash

Windows (using Chocolatey)

choco install kubernetes-helm

Verify Helm installation:

helm version

5. Git

Git is required to clone configuration files and examples.

macOS (using Homebrew)

brew install git

Linux (Ubuntu/Debian)

sudo apt-get install -y git

Windows

Download and install from https://git-scm.com/download/win

Verify Git installation:

git --version

Environment Setup

1. Create Working Directory

Create a dedicated directory for this tutorial:

mkdir -p ~/kafka-k8s-tutorial
cd ~/kafka-k8s-tutorial

2. Set Environment Variables

Set up some useful environment variables:

# Export variables for the session
export TUTORIAL_DIR=~/kafka-k8s-tutorial
export KAFKA_NAMESPACE=kafka
export ZOOKEEPER_NAMESPACE=zookeeper
export MONITORING_NAMESPACE=default

# Make them persistent (add to your shell profile)
echo "export TUTORIAL_DIR=~/kafka-k8s-tutorial" >> ~/.bashrc
echo "export KAFKA_NAMESPACE=kafka" >> ~/.bashrc
echo "export ZOOKEEPER_NAMESPACE=zookeeper" >> ~/.bashrc
echo "export MONITORING_NAMESPACE=default" >> ~/.bashrc

# Reload your shell or source the file
source ~/.bashrc
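
If your default shell is zsh (the default on recent macOS versions), append the same exports to ~/.zshrc instead, for example:

# zsh users: persist the variables in ~/.zshrc rather than ~/.bashrc
echo "export TUTORIAL_DIR=~/kafka-k8s-tutorial" >> ~/.zshrc
source ~/.zshrc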

3. Verify Docker Resources

Ensure Docker has sufficient resources allocated:

# Check Docker system info
docker system info

# Check available resources
docker system df

Recommended Docker Desktop settings:

  • Memory: 8GB minimum, 12GB+ recommended
  • CPUs: 4 minimum, 6+ recommended
  • Disk: 20GB minimum
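
To confirm what Docker currently has available, you can also query the daemon directly (NCPU and MemTotal are standard fields in the docker info output):

# Show the CPUs and memory currently allocated to the Docker daemon
docker info --format 'CPUs: {{.NCPU}}  Memory: {{.MemTotal}} bytes'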

4. Download Tutorial Resources

Clone the reference repository for configuration files:

cd $TUTORIAL_DIR
git clone https://github.com/amuraru/k8s-kafka-the-hard-way.git
cd k8s-kafka-the-hard-way

Verification Checklist

Before proceeding to the next section, verify that all tools are properly installed:

# Check Docker
echo "Docker version:"
docker --version
echo ""

# Check kubectl
echo "kubectl version:"
kubectl version --client
echo ""

# Check kind
echo "kind version:"
kind version
echo ""

# Check Helm
echo "Helm version:"
helm version
echo ""

# Check Git
echo "Git version:"
git --version
echo ""

# Check working directory
echo "Working directory:"
ls -la $TUTORIAL_DIR

Troubleshooting Common Issues

Docker Permission Issues (Linux)

If you get permission denied errors with Docker:

# Add your user to the docker group
sudo usermod -aG docker $USER

# Log out and log back in, or run:
newgrp docker

kubectl Not Found

If kubectl is not found in your PATH:

# Check if kubectl is in your PATH
which kubectl

# If not found, ensure /usr/local/bin is in your PATH
echo $PATH

# Add to PATH if needed
export PATH=$PATH:/usr/local/bin

kind Cluster Creation Issues

If you encounter issues creating kind clusters:

# Check Docker is running
docker ps

# Check available disk space
df -h

# Check Docker resources in Docker Desktop settings

Next Steps

Once you have all prerequisites installed and verified, proceed to the Kubernetes Cluster Setup section to create your kind cluster.


Tip: Keep this terminal session open throughout the tutorial, as the environment variables and working directory will be used in subsequent steps.

2 - Kubernetes Cluster Setup

Kubernetes Cluster Setup

In this section, you’ll create a multi-node Kubernetes cluster using kind (Kubernetes in Docker) that simulates a production-like environment with multiple availability zones.

Cluster Architecture

We’ll create a 7-node cluster with the following configuration:

  • 1 Control Plane node: Manages the Kubernetes API and cluster state
  • 6 Worker nodes: Distributed across 3 simulated availability zones (2 nodes per AZ)

This setup allows us to demonstrate:

  • Multi-AZ deployment patterns
  • Rack awareness for Kafka brokers
  • High availability configurations
  • Realistic failure scenarios

Create Cluster Configuration

First, create the kind cluster configuration file:

cd $TUTORIAL_DIR

# Create kind configuration directory
mkdir -p ~/.kind

# Create the cluster configuration
cat > ~/.kind/kind-config.yaml <<EOF
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
    kubeadmConfigPatches:
    - |
      kind: InitConfiguration
      nodeRegistration:
        kubeletExtraArgs:
          node-labels: "ingress-ready=true"
  - role: worker
  - role: worker
  - role: worker
  - role: worker
  - role: worker
  - role: worker
containerdConfigPatches:
- |-
  [plugins."io.containerd.grpc.v1.cri".containerd]
    snapshotter = "overlayfs"
  [plugins."io.containerd.grpc.v1.cri".registry.mirrors."localhost:5000"]
    endpoint = ["http://localhost:5000"]
EOF
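
The registry mirror entry above points at localhost:5000, but this tutorial never creates that registry and the cluster works fine without it. If you do want a local image cache, a rough starting point (the container name kind-registry is an arbitrary choice, and fully wiring it into the kind network requires the extra steps described in the kind local-registry documentation) is:

# Optional: start a local registry matching the localhost:5000 mirror configured above
docker run -d --restart=always -p 5000:5000 --name kind-registry registry:2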

Create the Kubernetes Cluster

Now create the kind cluster:

# Create the cluster (this may take 5-10 minutes)
kind create cluster \
  --name kafka \
  --config ~/.kind/kind-config.yaml \
  --image kindest/node:v1.33.4

# Wait for cluster to be ready
echo "Waiting for cluster to be ready..."
kubectl wait --for=condition=Ready nodes --all --timeout=300s

Expected output:

Creating cluster "kafka" ...
 ✓ Ensuring node image (kindest/node:v1.33.4) 🖼
 ✓ Preparing nodes 📦 📦 📦 📦 📦 📦 📦
 ✓ Writing configuration 📜
 ✓ Starting control-plane 🕹️
 ✓ Installing CNI 🔌
 ✓ Installing StorageClass 💾
 ✓ Joining worker nodes 🚜
Set kubectl context to "kind-kafka"
You can now use your cluster with:

kubectl cluster-info --context kind-kafka

Verify Cluster Creation

Verify that all nodes are running and ready:

# Check cluster info
kubectl cluster-info --context kind-kafka

# List all nodes
kubectl get nodes -o wide

# Check node status
kubectl get nodes --show-labels

Expected output:

NAME                  STATUS   ROLES           AGE   VERSION
kafka-control-plane   Ready    control-plane   2m    v1.33.4
kafka-worker          Ready    <none>          2m    v1.33.4
kafka-worker2         Ready    <none>          2m    v1.33.4
kafka-worker3         Ready    <none>          2m    v1.33.4
kafka-worker4         Ready    <none>          2m    v1.33.4
kafka-worker5         Ready    <none>          2m    v1.33.4
kafka-worker6         Ready    <none>          2m    v1.33.4

Configure Multi-AZ Simulation

To simulate a multi-availability zone environment, we’ll label the nodes with region and zone information:

1. Label Nodes with Region

First, label all worker nodes with the same region:

# Label all worker nodes with region
kubectl label nodes \
  kafka-worker \
  kafka-worker2 \
  kafka-worker3 \
  kafka-worker4 \
  kafka-worker5 \
  kafka-worker6 \
  topology.kubernetes.io/region=region1

2. Label Nodes with Availability Zones

Now distribute the worker nodes across three availability zones:

# AZ1: kafka-worker and kafka-worker2
kubectl label nodes kafka-worker kafka-worker2 \
  topology.kubernetes.io/zone=az1

# AZ2: kafka-worker3 and kafka-worker4
kubectl label nodes kafka-worker3 kafka-worker4 \
  topology.kubernetes.io/zone=az2

# AZ3: kafka-worker5 and kafka-worker6
kubectl label nodes kafka-worker5 kafka-worker6 \
  topology.kubernetes.io/zone=az3

3. Verify Zone Configuration

Check that the zone labels are correctly applied:

# Display nodes with region and zone labels
kubectl get nodes \
  --label-columns=topology.kubernetes.io/region,topology.kubernetes.io/zone

# Show detailed node information
kubectl describe nodes | grep -E "Name:|topology.kubernetes.io"

Expected output:

NAME                  STATUS   ROLES           AGE   VERSION   REGION    ZONE
kafka-control-plane   Ready    control-plane   5m    v1.33.4   <none>    <none>
kafka-worker          Ready    <none>          5m    v1.33.4   region1   az1
kafka-worker2         Ready    <none>          5m    v1.33.4   region1   az1
kafka-worker3         Ready    <none>          5m    v1.33.4   region1   az2
kafka-worker4         Ready    <none>          5m    v1.33.4   region1   az2
kafka-worker5         Ready    <none>          5m    v1.33.4   region1   az3
kafka-worker6         Ready    <none>          5m    v1.33.4   region1   az3
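
Optionally, count the workers per zone to double-check the distribution; each zone should report exactly two nodes:

# Expect two worker nodes in each simulated availability zone
for zone in az1 az2 az3; do
  echo "$zone: $(kubectl get nodes -l topology.kubernetes.io/zone=$zone --no-headers | wc -l) node(s)"
done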

Configure kubectl Context

Ensure you’re using the correct kubectl context:

# Set the current context to the kind cluster
kubectl config use-context kind-kafka

# Verify current context
kubectl config current-context

# Test cluster access
kubectl get namespaces

Cluster Resource Verification

Check the cluster’s available resources:

# Check node resources
kubectl top nodes 2>/dev/null || echo "Metrics server not yet available"

# Check cluster capacity
kubectl describe nodes | grep -A 5 "Capacity:"

# Check storage classes
kubectl get storageclass

# Check default namespace
kubectl get all

Understanding the Cluster Layout

Your cluster now has the following topology:

┌─────────────────────────────────────────────────────────────────┐
│                         kind-kafka cluster                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐                                            │
│  │ Control Plane   │                                            │
│  │ kafka-control-  │                                            │
│  │ plane           │                                            │
│  └─────────────────┘                                            │
│                                                                 │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │      AZ1        │  │      AZ2        │  │      AZ3        │  │
│  │                 │  │                 │  │                 │  │
│  │ kafka-worker    │  │ kafka-worker3   │  │ kafka-worker5   │  │
│  │ kafka-worker2   │  │ kafka-worker4   │  │ kafka-worker6   │  │
│  │                 │  │                 │  │                 │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Troubleshooting

Cluster Creation Issues

If cluster creation fails:

# Delete the failed cluster
kind delete cluster --name kafka

# Check Docker resources
docker system df
docker system prune -f

# Retry cluster creation
kind create cluster --name kafka --config ~/.kind/kind-config.yaml --image kindest/node:v1.33.4

Node Not Ready

If nodes are not ready:

# Check node status
kubectl describe nodes

# Check system pods
kubectl get pods -n kube-system

# Check kubelet logs (from Docker)
docker logs kafka-worker

Context Issues

If kubectl context is not set correctly:

# List available contexts
kubectl config get-contexts

# Set the correct context
kubectl config use-context kind-kafka

# Verify
kubectl config current-context

Cluster Cleanup (Optional)

If you need to start over:

# Delete the cluster
kind delete cluster --name kafka

# Verify deletion
kind get clusters

# Remove configuration
rm ~/.kind/kind-config.yaml

Next Steps

With your Kubernetes cluster ready and properly configured with multi-AZ simulation, you can now proceed to install the required dependencies. Continue to the Dependencies Installation section.


Note: The cluster will persist until you explicitly delete it with kind delete cluster --name kafka. You can stop and start Docker without losing your cluster state.

3 - Dependencies Installation

Dependencies Installation

Before installing the Koperator, we need to set up several dependencies that are required for a complete Kafka deployment. This section covers the installation of cert-manager, ZooKeeper operator, and Prometheus operator.

Overview

The dependencies we’ll install are:

  1. cert-manager: Manages TLS certificates for secure communication
  2. ZooKeeper Operator: Manages ZooKeeper clusters (required for traditional Kafka deployments)
  3. Prometheus Operator: Provides monitoring and alerting capabilities

1. Install cert-manager

cert-manager is essential for TLS certificate management in Kafka deployments.

Install cert-manager CRDs

First, install the Custom Resource Definitions:

# Install cert-manager CRDs
kubectl create --validate=false -f https://github.com/cert-manager/cert-manager/releases/download/v1.18.2/cert-manager.crds.yaml

Expected output:

customresourcedefinition.apiextensions.k8s.io/certificaterequests.cert-manager.io created
customresourcedefinition.apiextensions.k8s.io/certificates.cert-manager.io created
customresourcedefinition.apiextensions.k8s.io/challenges.acme.cert-manager.io created
customresourcedefinition.apiextensions.k8s.io/clusterissuers.cert-manager.io created
customresourcedefinition.apiextensions.k8s.io/issuers.cert-manager.io created
customresourcedefinition.apiextensions.k8s.io/orders.acme.cert-manager.io created

Create cert-manager Namespace

# Create namespace for cert-manager
kubectl create namespace cert-manager

Install cert-manager using Helm

# Add cert-manager Helm repository
helm repo add cert-manager https://charts.jetstack.io
helm repo update

# Install cert-manager
helm install cert-manager cert-manager/cert-manager \
  --namespace cert-manager \
  --version v1.18.2

# Wait for cert-manager to be ready
kubectl wait --for=condition=Available deployment --all -n cert-manager --timeout=300s

Verify cert-manager Installation

# Check cert-manager pods
kubectl get pods -n cert-manager

# Check cert-manager services
kubectl get svc -n cert-manager

# Verify cert-manager is working
kubectl get certificates -A

Expected output:

NAME                                       READY   STATUS    RESTARTS   AGE
cert-manager-cainjector-7d55bf8f78-xyz123   1/1     Running   0          2m
cert-manager-webhook-97f8b47bc-abc456       1/1     Running   0          2m
cert-manager-7dd5854bb4-def789              1/1     Running   0          2m

2. Install ZooKeeper Operator

The ZooKeeper operator manages ZooKeeper clusters required by Kafka.

Create ZooKeeper Namespace

# Create namespace for ZooKeeper
kubectl create namespace zookeeper

Install ZooKeeper Operator CRDs

# Install ZooKeeper CRDs
kubectl create -f https://raw.githubusercontent.com/adobe/zookeeper-operator/master/config/crd/bases/zookeeper.pravega.io_zookeeperclusters.yaml

Clone ZooKeeper Operator Repository

# Clone the ZooKeeper operator repository
cd $TUTORIAL_DIR
rm -rf /tmp/zookeeper-operator
git clone --single-branch --branch master https://github.com/adobe/zookeeper-operator /tmp/zookeeper-operator
cd /tmp/zookeeper-operator

Install ZooKeeper Operator using Helm

# Install ZooKeeper operator
helm template zookeeper-operator \
  --namespace=zookeeper \
  --set crd.create=false \
  --set image.repository='adobe/zookeeper-operator' \
  --set image.tag='0.2.15-adobe-20250914' \
  ./charts/zookeeper-operator | kubectl create -n zookeeper -f -

# Wait for operator to be ready
kubectl wait --for=condition=Available deployment --all -n zookeeper --timeout=300s

Deploy ZooKeeper Cluster

Create a 3-node ZooKeeper cluster:

# Create ZooKeeper cluster
kubectl create --namespace zookeeper -f - <<EOF
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
  name: zk
  namespace: zookeeper
spec:
  replicas: 3
  image:
    repository: adobe/zookeeper
    tag: 3.8.4-0.2.15-adobe-20250914
    pullPolicy: IfNotPresent
  config:
    initLimit: 10
    tickTime: 2000
    syncLimit: 5
  probes:
    livenessProbe:
      initialDelaySeconds: 41
  persistence:
    reclaimPolicy: Delete
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 20Gi
EOF

Verify ZooKeeper Installation

# Check ZooKeeper cluster status
kubectl get zookeepercluster -n zookeeper -o wide

# Watch ZooKeeper cluster creation
kubectl get pods -n zookeeper -w
# Press Ctrl+C to stop watching when all pods are running

# Check ZooKeeper services
kubectl get svc -n zookeeper

# Verify ZooKeeper cluster is ready
kubectl wait --for=condition=Ready pod --all -n zookeeper --timeout=600s

Expected output:

NAME   REPLICAS   READY REPLICAS   VERSION                           DESIRED VERSION                   INTERNAL ENDPOINT    EXTERNAL ENDPOINT   AGE
zk     3          3               3.8.4-0.2.15-adobe-20250914      3.8.4-0.2.15-adobe-20250914      zk-client:2181                           5m
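
As an extra sanity check, confirm that the zk-client service shown in the internal endpoint above has endpoints for the ZooKeeper pods:

# The client service should list one endpoint per ZooKeeper replica
kubectl get endpoints zk-client -n zookeeper
kubectl get statefulset -n zookeeper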

3. Install Prometheus Operator

The Prometheus operator provides comprehensive monitoring for Kafka and ZooKeeper.

Install Prometheus Operator CRDs

# Install Prometheus Operator CRDs
kubectl create -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/master/example/prometheus-operator-crd/monitoring.coreos.com_alertmanagers.yaml
kubectl create -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/master/example/prometheus-operator-crd/monitoring.coreos.com_alertmanagerconfigs.yaml
kubectl create -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/master/example/prometheus-operator-crd/monitoring.coreos.com_prometheuses.yaml
kubectl create -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/master/example/prometheus-operator-crd/monitoring.coreos.com_prometheusrules.yaml
kubectl create -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/master/example/prometheus-operator-crd/monitoring.coreos.com_servicemonitors.yaml
kubectl create -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/master/example/prometheus-operator-crd/monitoring.coreos.com_podmonitors.yaml
kubectl create -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/master/example/prometheus-operator-crd/monitoring.coreos.com_thanosrulers.yaml
kubectl create -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/master/example/prometheus-operator-crd/monitoring.coreos.com_probes.yaml

Install Prometheus Stack using Helm

# Add Prometheus community Helm repository
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update

# Install kube-prometheus-stack (includes Prometheus, Grafana, and AlertManager)
helm install monitoring \
  --namespace=default \
  prometheus-community/kube-prometheus-stack \
  --set prometheusOperator.createCustomResource=false

# Wait for monitoring stack to be ready
kubectl wait --for=condition=Available deployment --all -n default --timeout=600s

Verify Prometheus Installation

# Check monitoring pods
kubectl get pods -l release=monitoring

# Check monitoring services
kubectl get svc -l release=monitoring

# Check Prometheus targets (optional)
kubectl get prometheus -o wide

Expected output:

NAME                                                   READY   STATUS    RESTARTS   AGE
monitoring-kube-prometheus-prometheus-node-exporter-*  1/1     Running   0          3m
monitoring-kube-state-metrics-*                       1/1     Running   0          3m
monitoring-prometheus-operator-*                      1/1     Running   0          3m
monitoring-grafana-*                                  3/3     Running   0          3m

Access Monitoring Dashboards

Get Grafana Admin Password

# Get Grafana admin password
kubectl get secret --namespace default monitoring-grafana \
  -o jsonpath="{.data.admin-password}" | base64 --decode
echo ""

Set Up Port Forwarding (Optional)

You can access the monitoring dashboards using port forwarding:

# Prometheus (in a separate terminal)
kubectl --namespace default port-forward svc/monitoring-kube-prometheus-prometheus 9090 &

# Grafana (in a separate terminal)
kubectl --namespace default port-forward svc/monitoring-grafana 3000:80 &

# AlertManager (in a separate terminal)
kubectl --namespace default port-forward svc/monitoring-kube-prometheus-alertmanager 9093 &

Access the dashboards at:

  • Prometheus: http://localhost:9090
  • Grafana: http://localhost:3000 (admin/[password from above])
  • AlertManager: http://localhost:9093
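
With the port-forwards in place, you can also hit the standard health endpoints from the command line:

# Prometheus readiness and Grafana health checks
curl -s http://localhost:9090/-/ready
curl -s http://localhost:3000/api/health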

Verification Summary

Verify all dependencies are properly installed:

echo "=== cert-manager ==="
kubectl get pods -n cert-manager

echo -e "\n=== ZooKeeper ==="
kubectl get pods -n zookeeper
kubectl get zookeepercluster -n zookeeper

echo -e "\n=== Monitoring ==="
kubectl get pods -l release=monitoring

echo -e "\n=== All Namespaces ==="
kubectl get namespaces

Troubleshooting

cert-manager Issues

# Check cert-manager logs
kubectl logs -n cert-manager deployment/cert-manager

# Check webhook connectivity
kubectl get validatingwebhookconfigurations

ZooKeeper Issues

# Check ZooKeeper operator logs
kubectl logs -n zookeeper deployment/zookeeper-operator

# Check ZooKeeper cluster events
kubectl describe zookeepercluster zk -n zookeeper

Prometheus Issues

# Check Prometheus operator logs
kubectl logs -l app.kubernetes.io/name=prometheus-operator

# Check Prometheus configuration
kubectl get prometheus -o yaml

Next Steps

With all dependencies successfully installed, you can now proceed to install the Koperator itself. Continue to the Koperator Installation section.


Note: The monitoring stack will start collecting metrics immediately. You can explore the Grafana dashboards to see cluster metrics even before deploying Kafka.

4 - Koperator Installation

Koperator Installation

In this section, you’ll install the Koperator (formerly BanzaiCloud Kafka Operator), which will manage your Kafka clusters on Kubernetes. The installation involves two steps: installing the Custom Resource Definitions (CRDs) and then the operator itself.

Overview

The Koperator installation consists of:

  1. Creating the Kafka namespace
  2. Installing Koperator CRDs (Custom Resource Definitions)
  3. Installing the Koperator using Helm
  4. Verifying the installation

1. Create Kafka Namespace

First, create a dedicated namespace for Kafka resources:

# Create namespace for Kafka
kubectl create namespace kafka

# Verify namespace creation
kubectl get namespaces | grep kafka

Expected output:

kafka         Active   10s

2. Install Koperator CRDs

The Koperator requires several Custom Resource Definitions to manage Kafka clusters, topics, and users.

Install Required CRDs

# Install KafkaCluster CRD
kubectl apply --server-side -f https://raw.githubusercontent.com/adobe/koperator/refs/heads/master/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml

# Install KafkaTopic CRD
kubectl apply --server-side -f https://raw.githubusercontent.com/adobe/koperator/refs/heads/master/config/base/crds/kafka.banzaicloud.io_kafkatopics.yaml

# Install KafkaUser CRD
kubectl apply --server-side -f https://raw.githubusercontent.com/adobe/koperator/refs/heads/master/config/base/crds/kafka.banzaicloud.io_kafkausers.yaml

# Install CruiseControlOperation CRD
kubectl apply --server-side -f https://raw.githubusercontent.com/adobe/koperator/refs/heads/master/config/base/crds/kafka.banzaicloud.io_cruisecontroloperations.yaml

Verify CRD Installation

# Check that all CRDs are installed
kubectl get crd | grep kafka.banzaicloud.io

# Get detailed information about the CRDs
kubectl get crd kafkaclusters.kafka.banzaicloud.io -o yaml | head -20

Expected output:

cruisecontroloperations.kafka.banzaicloud.io   2024-01-15T10:30:00Z
kafkaclusters.kafka.banzaicloud.io             2024-01-15T10:30:00Z
kafkatopics.kafka.banzaicloud.io               2024-01-15T10:30:00Z
kafkausers.kafka.banzaicloud.io                2024-01-15T10:30:00Z

3. Install Koperator using Helm

Now install the Koperator using the OCI Helm chart:

# Install Koperator using OCI Helm chart
helm install kafka-operator oci://ghcr.io/adobe/helm-charts/kafka-operator \
  --namespace=kafka \
  --set webhook.enabled=false \
  --version 0.28.0-adobe-20250923

# Wait for the operator to be ready
kubectl wait --for=condition=Available deployment --all -n kafka --timeout=300s

Expected output:

Pulled: ghcr.io/adobe/helm-charts/kafka-operator:0.28.0-adobe-20250923
Digest: sha256:...
NAME: kafka-operator
LAST DEPLOYED: Mon Jan 15 10:35:00 2024
NAMESPACE: kafka
STATUS: deployed
REVISION: 1

4. Verify Koperator Installation

Check Operator Pods

# Check Koperator pods
kubectl get pods -n kafka

# Check pod details
kubectl describe pods -n kafka

# Check operator logs
kubectl logs -l app.kubernetes.io/instance=kafka-operator -c manager -n kafka

Expected output:

NAME                                    READY   STATUS    RESTARTS   AGE
kafka-operator-operator-xyz123-abc456   2/2     Running   0          2m

Check Operator Services

# Check services in kafka namespace
kubectl get svc -n kafka

# Check operator deployment
kubectl get deployment -n kafka

Verify Operator Functionality

# Check if the operator is watching for KafkaCluster resources
kubectl get kafkaclusters -n kafka

# Check operator configuration
kubectl get deployment kafka-operator-operator -n kafka -o yaml | grep -A 10 -B 10 image:

5. Understanding Koperator Components

The Koperator installation includes several components:

Manager Container

  • Purpose: Main operator logic
  • Responsibilities: Watches Kafka CRDs and manages Kafka clusters
  • Resource Management: Creates and manages Kafka broker pods, services, and configurations

Webhook (Disabled)

  • Purpose: Admission control and validation
  • Status: Disabled in this tutorial for simplicity
  • Production Note: Should be enabled in production environments (see the example command after this list)
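
If you later want the admission webhook back, the same chart flag used during installation can be flipped on an existing release; a sketch (same chart and version as above, assuming cert-manager is available to issue the webhook certificate):

# Example only: re-enable the validating webhook on the existing release
helm upgrade kafka-operator oci://ghcr.io/adobe/helm-charts/kafka-operator \
  --namespace=kafka \
  --set webhook.enabled=true \
  --version 0.28.0-adobe-20250923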

RBAC Resources

The operator creates several RBAC resources:

  • ServiceAccount
  • ClusterRole and ClusterRoleBinding
  • Role and RoleBinding
# Check RBAC resources
kubectl get serviceaccount -n kafka
kubectl get clusterrole | grep kafka-operator
kubectl get rolebinding -n kafka

6. Operator Configuration

View Operator Configuration

# Check operator deployment configuration
kubectl get deployment kafka-operator-operator -n kafka -o yaml

# Check operator environment variables
kubectl get deployment kafka-operator-operator -n kafka -o jsonpath='{.spec.template.spec.containers[0].env}' | jq .

Key Configuration Options

The operator is configured with the following key settings:

  • Webhook disabled: Simplifies the tutorial setup
  • Namespace: Operates in the kafka namespace
  • Image: Uses the official Adobe Koperator image
  • Version: 0.28.0-adobe-20250923

7. Operator Capabilities

The Koperator provides the following capabilities:

Kafka Cluster Management

  • Automated broker deployment and scaling
  • Rolling updates and configuration changes
  • Persistent volume management
  • Network policy configuration

Security Features

  • TLS/SSL certificate management
  • SASL authentication support
  • Network encryption
  • User and ACL management

Monitoring Integration

  • JMX metrics exposure
  • Prometheus integration
  • Grafana dashboard support
  • Custom alerting rules

Advanced Features

  • Cruise Control integration for rebalancing
  • External access configuration
  • Multi-AZ deployment support
  • Rack awareness

8. Troubleshooting

Operator Not Starting

If the operator pod is not starting:

# Check pod events
kubectl describe pod -l app.kubernetes.io/instance=kafka-operator -n kafka

# Check operator logs
kubectl logs -l app.kubernetes.io/instance=kafka-operator -c manager -n kafka --previous

# Check resource constraints
kubectl top pod -n kafka

CRD Issues

If CRDs are not properly installed:

# Reinstall CRDs
kubectl delete crd kafkaclusters.kafka.banzaicloud.io
kubectl apply --server-side -f https://raw.githubusercontent.com/adobe/koperator/refs/heads/master/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml

# Check CRD status
kubectl get crd kafkaclusters.kafka.banzaicloud.io -o yaml

Helm Installation Issues

If Helm installation fails:

# Check Helm release status
helm list -n kafka

# Uninstall and reinstall
helm uninstall kafka-operator -n kafka
helm install kafka-operator oci://ghcr.io/adobe/helm-charts/kafka-operator \
  --namespace=kafka \
  --set webhook.enabled=false \
  --version 0.28.0-adobe-20250923

9. Verification Checklist

Before proceeding to the next section, ensure:

echo "=== Namespace ==="
kubectl get namespace kafka

echo -e "\n=== CRDs ==="
kubectl get crd | grep kafka.banzaicloud.io

echo -e "\n=== Operator Pod ==="
kubectl get pods -n kafka

echo -e "\n=== Operator Logs (last 10 lines) ==="
kubectl logs -l app.kubernetes.io/instance=kafka-operator -c manager -n kafka --tail=10

echo -e "\n=== Ready for Kafka Cluster Deployment ==="
kubectl get kafkaclusters -n kafka

Expected final output:

=== Namespace ===
NAME    STATUS   AGE
kafka   Active   10m

=== CRDs ===
cruisecontroloperations.kafka.banzaicloud.io   2024-01-15T10:30:00Z
kafkaclusters.kafka.banzaicloud.io             2024-01-15T10:30:00Z
kafkatopics.kafka.banzaicloud.io               2024-01-15T10:30:00Z
kafkausers.kafka.banzaicloud.io                2024-01-15T10:30:00Z

=== Operator Pod ===
NAME                                    READY   STATUS    RESTARTS   AGE
kafka-operator-operator-xyz123-abc456   2/2     Running   0          5m

=== Ready for Kafka Cluster Deployment ===
No resources found in kafka namespace.

Next Steps

With the Koperator successfully installed and running, you’re now ready to deploy a Kafka cluster. Continue to the Kafka Cluster Deployment section to create your first Kafka cluster.


Note: The operator will continuously monitor the kafka namespace for KafkaCluster resources. Once you create a KafkaCluster resource, the operator will automatically provision the necessary Kafka infrastructure.

5 - Kafka Cluster Deployment

Kafka Cluster Deployment

In this section, you’ll deploy a production-ready Kafka cluster with monitoring, alerting, and dashboard integration. The deployment will demonstrate multi-AZ distribution, persistent storage, and comprehensive observability.

Overview

We’ll deploy:

  1. A 6-broker Kafka cluster distributed across 3 availability zones
  2. Prometheus monitoring with ServiceMonitor resources
  3. AlertManager rules for auto-scaling and alerting
  4. Grafana dashboard for Kafka metrics visualization
  5. Cruise Control for cluster management and rebalancing

1. Deploy Kafka Cluster

Create Kafka Cluster Configuration

First, let’s create a comprehensive Kafka cluster configuration:

cd $TUTORIAL_DIR

# Create the KafkaCluster resource
kubectl create -n kafka -f - <<EOF
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaCluster
metadata:
  name: kafka
  namespace: kafka
spec:
  headlessServiceEnabled: true
  zkAddresses:
    - "zk-client.zookeeper:2181"
  rackAwareness:
    labels:
      - "topology.kubernetes.io/zone"
  brokerConfigGroups:
    default:
      brokerAnnotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9020"
      storageConfigs:
        - mountPath: "/kafka-logs"
          pvcSpec:
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: 10Gi
      serviceAccountName: "default"
      resourceRequirements:
        limits:
          cpu: "2"
          memory: "4Gi"
        requests:
          cpu: "1"
          memory: "2Gi"
      jvmPerformanceOpts: "-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dsun.awt.fontpath=/usr/share/fonts/TTF"
      config:
        "auto.create.topics.enable": "true"
        "cruise.control.metrics.topic.auto.create": "true"
        "cruise.control.metrics.topic.num.partitions": "1"
        "cruise.control.metrics.topic.replication.factor": "2"
        "default.replication.factor": "2"
        "min.insync.replicas": "1"
        "num.partitions": "3"
        "offsets.topic.replication.factor": "2"
        "replica.lag.time.max.ms": "30000"
        "transaction.state.log.replication.factor": "2"
        "transaction.state.log.min.isr": "1"
  brokers:
    - id: 101
      brokerConfigGroup: "default"
      nodePortExternalIP:
        externalIP: "127.0.0.1"
    - id: 102
      brokerConfigGroup: "default"
      nodePortExternalIP:
        externalIP: "127.0.0.1"
    - id: 201
      brokerConfigGroup: "default"
      nodePortExternalIP:
        externalIP: "127.0.0.1"
    - id: 202
      brokerConfigGroup: "default"
      nodePortExternalIP:
        externalIP: "127.0.0.1"
    - id: 301
      brokerConfigGroup: "default"
      nodePortExternalIP:
        externalIP: "127.0.0.1"
    - id: 302
      brokerConfigGroup: "default"
      nodePortExternalIP:
        externalIP: "127.0.0.1"
  rollingUpgradeConfig:
    failureThreshold: 1
  cruiseControlConfig:
    cruiseControlEndpoint: "kafka-cruisecontrol-svc.kafka:8090"
    config: |
      # Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
      # Sample Cruise Control configuration file.
      
      # Configuration for the metadata client.
      # =======================================
      
      # The maximum interval in milliseconds between two metadata refreshes.
      metadata.max.age.ms=300000
      
      # Client id for the Cruise Control. It is used for the metadata client.
      client.id=kafka-cruise-control
      
      # The size of TCP send buffer for Kafka network client.
      send.buffer.bytes=131072
      
      # The size of TCP receive buffer for Kafka network client.
      receive.buffer.bytes=131072
      
      # The time to wait for response from a server.
      request.timeout.ms=30000
      
      # Configurations for the load monitor
      # ===================================
      
      # The number of metric fetcher thread to fetch metrics for the Kafka cluster
      num.metric.fetchers=1
      
      # The metric sampler class
      metric.sampler.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler
      
      # Configurations for CruiseControlMetricsReporter
      cruise.control.metrics.reporter.interval.ms=10000
      cruise.control.metrics.reporter.kubernetes.mode=true
      
      # The sample store class name
      sample.store.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore
      
      # The config for the Kafka sample store to save the partition metric samples
      partition.metric.sample.store.topic=__CruiseControlMetrics
      
      # The config for the Kafka sample store to save the model training samples
      broker.metric.sample.store.topic=__CruiseControlModelTrainingSamples
      
      # The replication factor of Kafka metric sample store topic
      sample.store.topic.replication.factor=2
      
      # The config for the number of Kafka sample store consumer threads
      num.sample.loading.threads=8
      
      # The partition assignor class for the metric samplers
      metric.sampler.partition.assignor.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor
      
      # The metric sampling interval in milliseconds
      metric.sampling.interval.ms=120000
      
      # The partition metrics window size in milliseconds
      partition.metrics.window.ms=300000
      
      # The number of partition metric windows to keep in memory
      num.partition.metrics.windows=1
      
      # The minimum partition metric samples required for a partition in each window
      min.samples.per.partition.metrics.window=1
      
      # The broker metrics window size in milliseconds
      broker.metrics.window.ms=300000
      
      # The number of broker metric windows to keep in memory
      num.broker.metrics.windows=20
      
      # The minimum broker metric samples required for a broker in each window
      min.samples.per.broker.metrics.window=1
      
      # The configuration for the BrokerCapacityConfigFileResolver (supports JBOD and non-JBOD broker capacities)
      capacity.config.file=config/capacity.json
      
      # Configurations for the analyzer
      # ===============================
      
      # The list of goals to optimize the Kafka cluster for with pre-computed proposals
      default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
      
      # The list of supported goals
      goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal
      
      # The list of supported hard goals
      hard.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
      
      # The minimum percentage of well monitored partitions out of all the partitions
      min.monitored.partition.percentage=0.95
      
      # The balance threshold for CPU
      cpu.balance.threshold=1.1
      
      # The balance threshold for disk
      disk.balance.threshold=1.1
      
      # The balance threshold for network inbound utilization
      network.inbound.balance.threshold=1.1
      
      # The balance threshold for network outbound utilization
      network.outbound.balance.threshold=1.1
      
      # The balance threshold for the replica count
      replica.count.balance.threshold=1.1
      
      # The capacity threshold for CPU in percentage
      cpu.capacity.threshold=0.8
      
      # The capacity threshold for disk in percentage
      disk.capacity.threshold=0.8
      
      # The capacity threshold for network inbound utilization in percentage
      network.inbound.capacity.threshold=0.8
      
      # The capacity threshold for network outbound utilization in percentage
      network.outbound.capacity.threshold=0.8
      
      # The threshold for the number of replicas per broker
      replica.capacity.threshold=1000
      
      # The weight adjustment in the optimization algorithm
      cpu.low.utilization.threshold=0.0
      disk.low.utilization.threshold=0.0
      network.inbound.low.utilization.threshold=0.0
      network.outbound.low.utilization.threshold=0.0
      
      # The metric anomaly percentile upper threshold
      metric.anomaly.percentile.upper.threshold=90.0
      
      # The metric anomaly percentile lower threshold
      metric.anomaly.percentile.lower.threshold=10.0
      
      # How often should the cached proposal be expired and recalculated if necessary
      proposal.expiration.ms=60000
      
      # The maximum number of replicas that can reside on a broker at any given time.
      max.replicas.per.broker=10000
      
      # The number of threads to use for proposal candidate precomputing.
      num.proposal.precompute.threads=1
      
      # the topics that should be excluded from the partition movement.
      #topics.excluded.from.partition.movement
      
      # Configurations for the executor
      # ===============================
      
      # The max number of partitions to move in/out on a given broker at a given time.
      num.concurrent.partition.movements.per.broker=10
      
      # The interval between two execution progress checks.
      execution.progress.check.interval.ms=10000
      
      # Configurations for anomaly detector
      # ===================================
      
      # The goal violation notifier class
      anomaly.notifier.class=com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier
      
      # The metric anomaly finder class
      metric.anomaly.finder.class=com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomalyFinder
      
      # The anomaly detection interval
      anomaly.detection.interval.ms=10000
      
      # The goal violation to detect.
      anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
      
      # The interested metrics for metric anomaly analyzer.
      metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_MAX,BROKER_PRODUCE_LOCAL_TIME_MS_MEAN,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MAX,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MAX,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_LOG_FLUSH_TIME_MS_MAX,BROKER_LOG_FLUSH_TIME_MS_MEAN
      
      # The zk path to store the anomaly detector state. This is to avoid duplicate anomaly detection due to controller failure.
      anomaly.detection.state.path=/CruiseControlAnomalyDetector/AnomalyDetectorState
      
      # Enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled
      self.healing.enabled=true
      
      # Enable self healing for broker failure detector
      self.healing.broker.failure.enabled=true
      
      # Enable self healing for goal violation detector
      self.healing.goal.violation.enabled=true
      
      # Enable self healing for metric anomaly detector
      self.healing.metric.anomaly.enabled=true
      
      # configurations for the webserver
      # ================================
      
      # HTTP listen port for the Cruise Control
      webserver.http.port=8090
      
      # HTTP listen address for the Cruise Control
      webserver.http.address=0.0.0.0
      
      # Whether CORS support is enabled for API or not
      webserver.http.cors.enabled=false
      
      # Value for Access-Control-Allow-Origin
      webserver.http.cors.origin=http://localhost:8090/
      
      # Value for Access-Control-Request-Method
      webserver.http.cors.allowmethods=OPTIONS,GET,POST
      
      # Headers that should be exposed to the Browser (Webapp)
      # This is a special header that is used by the
      # User Tasks subsystem and should be explicitly
      # Enabled when CORS mode is used as part of the
      # Admin Interface
      webserver.http.cors.exposeheaders=User-Task-ID
      
      # REST API default prefix (dont forget the ending /*)
      webserver.api.urlprefix=/kafkacruisecontrol/*
      
      # Location where the Cruise Control frontend is deployed
      webserver.ui.diskpath=./cruise-control-ui/dist/
      
      # URL path prefix for UI
      webserver.ui.urlprefix=/*
      
      # Time After which request is converted to Async
      webserver.request.maxBlockTimeMs=10000
      
      # Default Session Expiry Period
      webserver.session.maxExpiryTimeMs=60000
      
      # Session cookie path
      webserver.session.path=/
      
      # Server Access Logs
      webserver.accesslog.enabled=true
      
      # Location of HTTP Request Logs
      webserver.accesslog.path=access.log
      
      # HTTP Request Log retention days
      webserver.accesslog.retention.days=14
EOF

Monitor Cluster Deployment

Watch the cluster deployment progress:

# Watch KafkaCluster status
kubectl get kafkacluster kafka -n kafka -w -o wide
# Press Ctrl+C to stop watching when cluster is ready

# Check broker pods
kubectl get pods -n kafka -l kafka_cr=kafka

# Check all resources in kafka namespace
kubectl get all -n kafka

Expected output (after 5-10 minutes):

NAME    AGE   WARNINGS
kafka   10m   

NAME                                    READY   STATUS    RESTARTS   AGE
pod/kafka-101-xyz123                    1/1     Running   0          8m
pod/kafka-102-abc456                    1/1     Running   0          8m
pod/kafka-201-def789                    1/1     Running   0          8m
pod/kafka-202-ghi012                    1/1     Running   0          8m
pod/kafka-301-jkl345                    1/1     Running   0          8m
pod/kafka-302-mno678                    1/1     Running   0          8m
pod/kafka-cruisecontrol-xyz789          1/1     Running   0          6m

2. Configure Monitoring

Create Prometheus ServiceMonitor

Create monitoring configuration for Prometheus to scrape Kafka metrics:

# Create ServiceMonitor for Kafka metrics
kubectl apply -n kafka -f - <<EOF
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: kafka-servicemonitor
  namespace: kafka
  labels:
    app: kafka
    release: monitoring
spec:
  selector:
    matchLabels:
      app: kafka
  endpoints:
  - port: metrics
    interval: 30s
    path: /metrics
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: kafka-jmx-servicemonitor
  namespace: kafka
  labels:
    app: kafka-jmx
    release: monitoring
spec:
  selector:
    matchLabels:
      app: kafka
  endpoints:
  - port: jmx-metrics
    interval: 30s
    path: /metrics
EOF
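
Confirm that both ServiceMonitor objects were created; the names match the manifests just applied:

# ServiceMonitors must exist before Prometheus can discover the Kafka endpoints
kubectl get servicemonitor -n kafka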

Create AlertManager Rules

Set up alerting rules for Kafka monitoring and auto-scaling:

# Create PrometheusRule for Kafka alerting
kubectl apply -n kafka -f - <<EOF
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: kafka-alerts
  namespace: kafka
  labels:
    app: kafka
    release: monitoring
spec:
  groups:
  - name: kafka.rules
    rules:
    - alert: KafkaOfflinePartitions
      expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
      for: 5m
      labels:
        severity: critical
      annotations:
        summary: "Kafka has offline partitions"
        description: "Kafka cluster {{ \$labels.instance }} has {{ \$value }} offline partitions"
    
    - alert: KafkaUnderReplicatedPartitions
      expr: kafka_server_replicamanager_underreplicatedpartitions > 0
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Kafka has under-replicated partitions"
        description: "Kafka cluster {{ \$labels.instance }} has {{ \$value }} under-replicated partitions"
    
    - alert: KafkaHighProducerRequestRate
      expr: rate(kafka_network_requestmetrics_requests_total{request="Produce"}[5m]) > 1000
      for: 10m
      labels:
        severity: warning
        command: "upScale"
      annotations:
        summary: "High Kafka producer request rate"
        description: "Kafka producer request rate is {{ \$value }} requests/sec"
    
    - alert: KafkaLowProducerRequestRate
      expr: rate(kafka_network_requestmetrics_requests_total{request="Produce"}[5m]) < 100
      for: 30m
      labels:
        severity: info
        command: "downScale"
      annotations:
        summary: "Low Kafka producer request rate"
        description: "Kafka producer request rate is {{ \$value }} requests/sec"
EOF
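
Once Prometheus reloads its configuration (usually within a minute or two), the rule group should be visible through its API. A quick check, assuming the Prometheus port-forward used later in this guide is running on localhost:9090:

# List the alert names loaded in the kafka.rules group
curl -s "http://localhost:9090/api/v1/rules" | jq '.data.groups[] | select(.name == "kafka.rules") | .rules[].name'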

3. Load Grafana Dashboard

Apply Complete Kafka Dashboard

The complete Kafka Looking Glass dashboard provides comprehensive monitoring with dozens of panels covering all aspects of Kafka performance:

# Apply the complete Kafka Looking Glass dashboard directly
kubectl apply -n default \
  -f https://raw.githubusercontent.com/amuraru/k8s-kafka-operator/master/grafana-dashboard.yaml

Dashboard Features

The complete Kafka Looking Glass dashboard includes:

Overview Section:

  • Brokers online count
  • Cluster version information
  • Active controllers
  • Topic count
  • Offline partitions
  • Under-replicated partitions

Performance Metrics:

  • Message throughput (in/out per second)
  • Bytes throughput (in/out per second)
  • Request latency breakdown
  • Network request metrics
  • Replication rates

Broker Health:

  • JVM memory usage
  • Garbage collection metrics
  • Thread states
  • Log flush times
  • Disk usage

Topic Analysis:

  • Per-topic throughput
  • Partition distribution
  • Leader distribution
  • Consumer lag metrics

ZooKeeper Integration:

  • ZooKeeper quorum size
  • Leader count
  • Request latency
  • Digest mismatches

Error Monitoring:

  • Offline broker disks
  • Orphan replicas
  • Under-replicated partitions
  • Network issues
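
Most of these panels are built from the JMX exporter metrics scraped by the ServiceMonitors above. If you want to spot-check the underlying data, here are a few illustrative queries against the Prometheus API (assuming the port-forward on localhost:9090; the metric names are the same ones used by the alerting rules and checks in this guide):

# Incoming message rate per broker
curl -sG "http://localhost:9090/api/v1/query" \
  --data-urlencode 'query=rate(kafka_server_brokertopicmetrics_messagesin_total[5m])' | jq .

# Offline partitions across the cluster (should be 0)
curl -sG "http://localhost:9090/api/v1/query" \
  --data-urlencode 'query=sum(kafka_controller_kafkacontroller_offlinepartitionscount)' | jq .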

4. Verify Deployment

Check Cluster Status

# Describe the KafkaCluster
kubectl describe kafkacluster kafka -n kafka

# Check broker distribution across zones
kubectl get pods -n kafka -l kafka_cr=kafka -o wide

# Check persistent volumes
kubectl get pv,pvc -n kafka

Access Cruise Control

# Port forward to Cruise Control (in a separate terminal)
kubectl port-forward -n kafka svc/kafka-cruisecontrol-svc 8090:8090 &

# Check Cruise Control status (optional)
curl -s http://localhost:8090/kafkacruisecontrol/v1/state | jq .

Verify Monitoring Integration

# Check if Prometheus is scraping Kafka metrics
kubectl port-forward -n default svc/monitoring-kube-prometheus-prometheus 9090 &

# Visit http://localhost:9090 and search for kafka_ metrics

Access the Kafka Looking Glass Dashboard

# Get Grafana admin password
kubectl get secret --namespace default monitoring-grafana \
  -o jsonpath="{.data.admin-password}" | base64 --decode
echo ""

# Port forward to Grafana
kubectl port-forward -n default svc/monitoring-grafana 3000:80 &

Visit http://localhost:3000 and:

  1. Login with admin/[password from above]
  2. Navigate to Dashboards → Browse
  3. Look for “Kafka Looking Glass” dashboard
  4. The dashboard should show real-time metrics from your Kafka cluster

The dashboard will automatically detect your cluster using the template variables:

  • Namespace: Should auto-select “kafka”
  • Cluster Name: Should auto-select “kafka”
  • Broker: Shows all brokers (101, 102, 201, 202, 301, 302)
  • Topic: Shows all topics in your cluster

Next Steps

With your Kafka cluster successfully deployed and monitoring configured, you can now proceed to test the deployment. Continue to the Testing and Validation section to create topics and run producer/consumer tests.


Note: The cluster deployment may take 10-15 minutes to complete. The brokers will be distributed across the three availability zones you configured earlier, providing high availability and fault tolerance.

6 - Testing and Validation

Testing and Validation

In this section, you’ll thoroughly test your Kafka cluster deployment by creating topics, running producers and consumers, and performing performance tests. This validates that your cluster is working correctly and can handle production workloads.

Overview

We’ll perform the following tests:

  1. Basic connectivity tests - Verify cluster accessibility
  2. Topic management - Create, list, and configure topics
  3. Producer/Consumer tests - Send and receive messages
  4. Performance testing - Load testing with high throughput
  5. Monitoring validation - Verify metrics collection
  6. Multi-AZ validation - Confirm rack awareness

1. Basic Connectivity Tests

List Existing Topics

First, verify that you can connect to the Kafka cluster:

# List topics using kubectl run
kubectl run kafka-topics --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --list

Expected output:

__CruiseControlMetrics
__CruiseControlModelTrainingSamples
__consumer_offsets

Check Cluster Information

# Get cluster metadata
kubectl run kafka-metadata --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-broker-api-versions.sh \
  --bootstrap-server kafka-headless:29092

2. Topic Management

Create a Test Topic

Create a topic with multiple partitions and replicas:

# Create a test topic with 12 partitions distributed across brokers
kubectl run kafka-topics --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic perf_topic \
  --replica-assignment 101:201:301,102:202:302,101:201:301,102:202:302,101:201:301,102:202:302,101:201:301,102:202:302,101:201:301,102:202:302,101:201:301,102:202:302 \
  --create
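
The --replica-assignment string pins each of the 12 partitions to an explicit broker triple that spans the three availability zones (for example, partition 0 is placed on brokers 101, 201, and 301). As an alternative sketch, assuming rack awareness is enabled in your KafkaCluster spec, you can let Kafka place the replicas automatically (the topic name perf_topic_auto is just an example):

# Create a topic and let Kafka's rack-aware assignment place the replicas
kubectl run kafka-topics-auto --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic perf_topic_auto \
  --partitions 12 \
  --replication-factor 3 \
  --create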

Describe the Topic

# Describe the topic to verify configuration
kubectl run kafka-topics --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic perf_topic \
  --describe

Expected output:

Topic: perf_topic	TopicId: xyz-123-abc	PartitionCount: 12	ReplicationFactor: 3
	Topic: perf_topic	Partition: 0	Leader: 101	Replicas: 101,201,301	Isr: 101,201,301
	Topic: perf_topic	Partition: 1	Leader: 102	Replicas: 102,202,302	Isr: 102,202,302
	...

Configure Topic Retention

# Set custom retention period (12 minutes for testing)
kubectl run kafka-configs --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-configs.sh \
  --bootstrap-server kafka-headless:29092 \
  --alter --entity-name perf_topic \
  --entity-type topics \
  --add-config retention.ms=720000
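
To confirm the override took effect, describe the topic-level configuration:

# Describe dynamic config overrides for perf_topic
kubectl run kafka-configs-describe --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-configs.sh \
  --bootstrap-server kafka-headless:29092 \
  --entity-type topics \
  --entity-name perf_topic \
  --describe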

3. Producer/Consumer Tests

Simple Message Test

Start a Producer

# Start a simple producer (run in one terminal)
kubectl run kafka-producer -i --tty \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic perf_topic

Type some test messages:

Hello Kafka!
This is a test message
Testing multi-AZ deployment

Start a Consumer

# Start a consumer (run in another terminal)
kubectl run kafka-consumer -i --tty \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic perf_topic \
  --from-beginning

You should see the messages you sent from the producer.

Clean Up Test Pods

# Clean up the test pods
kubectl delete pod kafka-producer --ignore-not-found=true
kubectl delete pod kafka-consumer --ignore-not-found=true

4. Performance Testing

Producer Performance Test

Run a high-throughput producer test:

# Start producer performance test
kubectl run kafka-producer-perf \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-producer-perf-test.sh \
  --producer-props bootstrap.servers=kafka-headless:29092 acks=all \
  --topic perf_topic \
  --record-size 1000 \
  --throughput 29000 \
  --num-records 2110000

Expected output:

100000 records sent, 28500.0 records/sec (27.18 MB/sec), 2.1 ms avg latency, 45 ms max latency.
200000 records sent, 29000.0 records/sec (27.66 MB/sec), 1.8 ms avg latency, 38 ms max latency.
...

Consumer Performance Test

In another terminal, run a consumer performance test:

# Start consumer performance test
kubectl run kafka-consumer-perf \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-consumer-perf-test.sh \
  --bootstrap-server kafka-headless:29092 \
  --group perf-consume \
  --messages 10000000 \
  --topic perf_topic \
  --show-detailed-stats \
  --from-latest \
  --timeout 100000

Expected output:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2024-01-15 10:30:00:000, 2024-01-15 10:30:10:000, 95.37, 9.54, 100000, 10000.0, 1500, 8500, 11.22, 11764.7
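
While the tests run, you can also check how far the perf-consume group is lagging behind the producer:

# Inspect offsets and lag for the performance test consumer group
kubectl run kafka-perf-group --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server kafka-headless:29092 \
  --group perf-consume \
  --describe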

5. Monitoring Validation

Check Kafka Metrics in Prometheus

# Port forward to Prometheus (if not already done)
kubectl port-forward -n default svc/monitoring-kube-prometheus-prometheus 9090 &

# Check if Kafka metrics are being collected
curl -s "http://localhost:9090/api/v1/query?query=kafka_server_brokertopicmetrics_messagesin_total" | jq .

Access Grafana Dashboard

# Port forward to Grafana (if not already done)
kubectl port-forward -n default svc/monitoring-grafana 3000:80 &

# Get Grafana admin password
kubectl get secret --namespace default monitoring-grafana \
  -o jsonpath="{.data.admin-password}" | base64 --decode
echo ""

Visit http://localhost:3000 and:

  1. Login with admin/[password]
  2. Navigate to Dashboards
  3. Look for “Kafka Looking Glass” dashboard
  4. Verify metrics are being displayed

Check AlertManager

# Port forward to AlertManager
kubectl port-forward -n default svc/monitoring-kube-prometheus-alertmanager 9093 &

Visit http://localhost:9093 to see any active alerts.
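
If you prefer the command line, the same port-forward exposes the AlertManager v2 API (a minimal check; the jq filter just prints alert names):

# List currently firing alerts
curl -s http://localhost:9093/api/v2/alerts | jq '.[].labels.alertname'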

6. Multi-AZ Validation

Verify Broker Distribution

Check that brokers are distributed across availability zones:

# Check broker pod distribution
kubectl get pods -n kafka -l kafka_cr=kafka -o wide \
  --sort-by='.spec.nodeName'

# Check node labels
kubectl get nodes \
  --label-columns=topology.kubernetes.io/zone \
  -l topology.kubernetes.io/zone

Verify Rack Awareness

# Check if rack awareness is working by examining topic partition distribution
kubectl run kafka-topics --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic perf_topic \
  --describe

Verify that replicas are distributed across different broker IDs (which correspond to different AZs).
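
You can also confirm the rack assignment directly from a broker's effective configuration, assuming rack awareness is enabled in your KafkaCluster spec (broker.rack should reflect the broker's zone):

# Show the effective broker.rack value for broker 101
kubectl run kafka-rack-check --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-configs.sh \
  --bootstrap-server kafka-headless:29092 \
  --entity-type brokers \
  --entity-name 101 \
  --describe --all | grep broker.rack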

7. Advanced Testing

Test Topic Creation via CRD

Create a topic using Kubernetes CRD:

# Create topic using KafkaTopic CRD
kubectl apply -n kafka -f - <<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: test-topic-crd
  namespace: kafka
spec:
  clusterRef:
    name: kafka
  name: test-topic-crd
  partitions: 6
  replicationFactor: 2
  config:
    "retention.ms": "604800000"
    "cleanup.policy": "delete"
EOF

Verify CRD Topic Creation

# Check KafkaTopic resource
kubectl get kafkatopic -n kafka

# Verify topic exists in Kafka
kubectl run kafka-topics --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --list | grep test-topic-crd

Test Consumer Groups

# Create multiple consumers in the same group
for i in {1..3}; do
  kubectl run kafka-consumer-group-$i \
    --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
    --restart=Never \
    -- /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka-headless:29092 \
    --topic perf_topic \
    --group test-group &
done

# Check consumer group status
kubectl run kafka-consumer-groups --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server kafka-headless:29092 \
  --group test-group \
  --describe

8. Performance Metrics Summary

After running the performance tests, you should see metrics similar to:

Producer Performance

  • Throughput: 25,000-30,000 records/sec
  • Latency: 1-3 ms average
  • Record Size: 1KB

Consumer Performance

  • Throughput: 10,000+ records/sec
  • Lag: Minimal (< 100 records)

Resource Utilization

  • CPU: 20-40% per broker
  • Memory: 2-3GB per broker
  • Disk I/O: Moderate

9. Cleanup Test Resources

# Clean up performance test pods
kubectl delete pod kafka-producer-perf --ignore-not-found=true
kubectl delete pod kafka-consumer-perf --ignore-not-found=true

# Clean up consumer group pods
for i in {1..3}; do
  kubectl delete pod kafka-consumer-group-$i --ignore-not-found=true
done

# Optionally delete test topics
kubectl delete kafkatopic test-topic-crd -n kafka

Troubleshooting

Producer/Consumer Connection Issues

# Check broker connectivity
kubectl run kafka-test --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /bin/bash

# Inside the pod, test connectivity (telnet may not be available in the Kafka image; bash's /dev/tcp is a fallback)
timeout 5 bash -c 'echo > /dev/tcp/kafka-headless/29092' && echo "broker port reachable"

Performance Issues

# Check broker resource usage
kubectl top pods -n kafka

# Check broker logs
kubectl logs -n kafka kafka-101-xyz123

# Check JVM metrics
kubectl exec -n kafka kafka-101-xyz123 -- jps -v

Monitoring Issues

# Check ServiceMonitor
kubectl get servicemonitor -n kafka

# Check Prometheus targets
curl -s "http://localhost:9090/api/v1/targets" | jq '.data.activeTargets[] | select(.labels.job | contains("kafka"))'

Next Steps

With your Kafka cluster thoroughly tested and validated, you can now explore disaster recovery scenarios. Continue to the Disaster Recovery Scenarios section to test failure handling and recovery procedures.


Note: Keep the performance test results for comparison after implementing any configuration changes or during disaster recovery testing.

7 - Disaster Recovery Scenarios

Disaster Recovery Scenarios

In this section, you’ll test various failure scenarios to understand how the Koperator handles disasters and recovers from failures. This is crucial for understanding the resilience of your Kafka deployment and validating that data persistence works correctly.

Overview

We’ll test the following disaster scenarios:

  1. Broker JVM crash - Process failure within a pod
  2. Broker pod deletion - Kubernetes pod failure
  3. Node failure simulation - Worker node unavailability
  4. Persistent volume validation - Data persistence across failures
  5. Network partition simulation - Connectivity issues
  6. ZooKeeper failure - Dependency service failure

Prerequisites

Before starting disaster recovery tests, ensure you have:

# Verify cluster is healthy
kubectl get kafkacluster kafka -n kafka
kubectl get pods -n kafka -l kafka_cr=kafka

# Create a test topic with data
kubectl run kafka-producer-dr --rm -i \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic disaster-recovery-test <<EOF
message-1-before-disaster
message-2-before-disaster
message-3-before-disaster
EOF

1. Initial State Documentation

Record Current State

Document the initial state before testing disasters:

echo "=== Initial Kafka Cluster State ==="

# Get broker pods
echo "Kafka Broker Pods:"
kubectl get pods -l kafka_cr=kafka -n kafka -o wide

# Get persistent volumes
echo -e "\nPersistent Volumes:"
kubectl get pv | grep kafka

# Get persistent volume claims
echo -e "\nPersistent Volume Claims:"
kubectl get pvc -n kafka | grep kafka

# Get broker services
echo -e "\nKafka Services:"
kubectl get svc -n kafka | grep kafka

# Save state to file for comparison
kubectl get pods -l kafka_cr=kafka -n kafka -o yaml > /tmp/kafka-pods-initial.yaml
kubectl get pvc -n kafka -o yaml > /tmp/kafka-pvc-initial.yaml

Expected output:

Kafka Broker Pods:
NAME         READY   STATUS    RESTARTS   AGE   IP           NODE
kafka-101    1/1     Running   0          30m   10.244.1.5   kafka-worker
kafka-102    1/1     Running   0          30m   10.244.2.5   kafka-worker2
kafka-201    1/1     Running   0          30m   10.244.3.5   kafka-worker3
kafka-202    1/1     Running   0          30m   10.244.4.5   kafka-worker4
kafka-301    1/1     Running   0          30m   10.244.5.5   kafka-worker5
kafka-302    1/1     Running   0          30m   10.244.6.5   kafka-worker6

2. Broker JVM Crash Test

Simulate JVM Crash

Kill the Java process inside a broker pod:

# Get a broker pod name
BROKER_POD=$(kubectl get pods -n kafka -l kafka_cr=kafka -o jsonpath='{.items[0].metadata.name}')
echo "Testing JVM crash on pod: $BROKER_POD"

# Kill the Java process (PID 1 in the container)
kubectl exec -n kafka $BROKER_POD -- kill 1

# Monitor pod restart
kubectl get pods -n kafka -l kafka_cr=kafka -w
# Press Ctrl+C after observing the restart

Verify Recovery

# Check if pod restarted
kubectl get pods -n kafka -l kafka_cr=kafka
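
# The broker container's restart count should have increased by one
# (index 0 assumes the Kafka container is listed first; adjust if sidecars are present)
kubectl get pod -n kafka $BROKER_POD -o jsonpath='{.status.containerStatuses[0].restartCount}'
echo ""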

# Verify the same PVC is reused
kubectl describe pod -n kafka $BROKER_POD | grep -A 5 "Volumes:"

# Test data persistence
kubectl run kafka-consumer-dr --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic disaster-recovery-test \
  --from-beginning \
  --timeout-ms 10000

Expected Result: ✅ PASSED - Pod restarts, same PVC is reused, data is preserved.

3. Broker Pod Deletion Test

Delete a Broker Pod

# Get another broker pod
BROKER_POD_2=$(kubectl get pods -n kafka -l kafka_cr=kafka -o jsonpath='{.items[1].metadata.name}')
echo "Testing pod deletion on: $BROKER_POD_2"

# Record the PVC before deletion
kubectl get pod -n kafka $BROKER_POD_2 -o yaml | grep -A 10 "volumes:" > /tmp/pvc-before-deletion.yaml

# Delete the pod
kubectl delete pod -n kafka $BROKER_POD_2

# Monitor recreation
kubectl get pods -n kafka -l kafka_cr=kafka -w
# Press Ctrl+C after new pod is running

Verify Pod Recreation

# Check new pod is running
kubectl get pods -n kafka -l kafka_cr=kafka

# Verify PVC reattachment (the recreated pod gets a new name suffix, so match on the broker ID prefix)
BROKER_PREFIX=$(echo $BROKER_POD_2 | cut -d- -f1-2)
NEW_BROKER_POD=$(kubectl get pods -n kafka -l kafka_cr=kafka -o name | grep "$BROKER_PREFIX" | cut -d/ -f2)
kubectl get pod -n kafka $NEW_BROKER_POD -o yaml | grep -A 10 "volumes:" > /tmp/pvc-after-deletion.yaml

# Compare PVC usage
echo "PVC comparison:"
diff /tmp/pvc-before-deletion.yaml /tmp/pvc-after-deletion.yaml || echo "PVCs are identical - Good!"

# Test cluster functionality
kubectl run kafka-test-after-deletion --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --list

Expected Result: ✅ PASSED - New pod created, same PVC reattached, cluster functional.

4. Node Failure Simulation

Cordon and Drain a Node

# Get a worker node with Kafka pods
NODE_WITH_KAFKA=$(kubectl get pods -n kafka -l kafka_cr=kafka -o wide | grep kafka | head -1 | awk '{print $7}')
echo "Simulating failure on node: $NODE_WITH_KAFKA"

# Get pods on this node before cordoning
echo "Pods on node before cordoning:"
kubectl get pods -n kafka -l kafka_cr=kafka -o wide | grep $NODE_WITH_KAFKA

# Cordon the node (prevent new pods)
kubectl cordon $NODE_WITH_KAFKA

# Drain the node (evict existing pods)
kubectl drain $NODE_WITH_KAFKA --ignore-daemonsets --delete-emptydir-data --force

Monitor Pod Rescheduling

# Watch pods being rescheduled
kubectl get pods -n kafka -l kafka_cr=kafka -o wide -w
# Press Ctrl+C after pods are rescheduled

# Verify pods moved to other nodes
echo "Pods after node drain:"
kubectl get pods -n kafka -l kafka_cr=kafka -o wide | grep -v $NODE_WITH_KAFKA

Restore Node

# Uncordon the node
kubectl uncordon $NODE_WITH_KAFKA

# Verify node is ready
kubectl get nodes | grep $NODE_WITH_KAFKA

Expected Result: ✅ PASSED - Pods rescheduled to healthy nodes, PVCs reattached, cluster remains functional.

5. Persistent Volume Validation

Detailed PVC Analysis

echo "=== Persistent Volume Analysis ==="

# List all Kafka PVCs
kubectl get pvc -n kafka | grep kafka

# Check PV reclaim policy
kubectl get pv | grep kafka | head -3

# Verify PVC-PV binding
for pvc in $(kubectl get pvc -n kafka -o jsonpath='{.items[*].metadata.name}' | tr ' ' '\n' | grep kafka); do
  echo "PVC: $pvc"
  kubectl get pvc -n kafka $pvc -o jsonpath='{.spec.volumeName}'
  echo ""
done

Test Data Persistence Across Multiple Failures

# Create test data
kubectl run kafka-persistence-test --rm -i \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic persistence-test <<EOF
persistence-message-1
persistence-message-2
persistence-message-3
EOF

# Delete multiple broker pods simultaneously
kubectl delete pods -n kafka -l kafka_cr=kafka --grace-period=0 --force

# Wait for recreation
kubectl wait --for=condition=Ready pod -l kafka_cr=kafka -n kafka --timeout=300s

# Verify data survived
kubectl run kafka-persistence-verify --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic persistence-test \
  --from-beginning \
  --timeout-ms 10000

Expected Result: ✅ PASSED - All messages preserved across multiple pod deletions.

6. Network Partition Simulation

Create Network Policy to Isolate Broker

# Create a network policy that isolates broker 101
# Note: this only takes effect if your CNI enforces NetworkPolicy (kind's default CNI does not)
kubectl apply -n kafka -f - <<EOF
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: isolate-broker
  namespace: kafka
spec:
  podSelector:
    matchLabels:
      brokerId: "101"
  policyTypes:
  - Ingress
  - Egress
  ingress: []
  egress: []
EOF

Monitor Cluster Behavior

# Check cluster state during network partition
kubectl run kafka-network-test --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic persistence-test \
  --describe

# Check under-replicated partitions
kubectl logs -n kafka kafka-101 | grep -i "under.replicated" | tail -5

Remove Network Partition

# Remove the network policy
kubectl delete networkpolicy isolate-broker -n kafka

# Verify cluster recovery
sleep 30
kubectl run kafka-recovery-test --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic persistence-test \
  --describe

Expected Result: ✅ PASSED - Cluster detects partition, maintains availability, recovers when partition is resolved.

7. ZooKeeper Failure Test

Scale Down ZooKeeper

# Check current ZooKeeper state
kubectl get pods -n zookeeper

# Scale down ZooKeeper to 1 replica (simulating failure)
kubectl patch zookeepercluster zk -n zookeeper --type='merge' -p='{"spec":{"replicas":1}}'

# Monitor Kafka behavior
kubectl logs -n kafka kafka-101 | grep -i zookeeper | tail -10

Test Kafka Functionality During ZK Degradation

# Try to create a topic (should fail or be delayed)
timeout 30 kubectl run kafka-zk-test --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic zk-failure-test \
  --create --partitions 3 --replication-factor 2 || echo "Topic creation failed as expected"

# Test existing topic access (should still work)
kubectl run kafka-existing-test --rm -i \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic persistence-test <<EOF
message-during-zk-failure
EOF

Restore ZooKeeper

# Scale ZooKeeper back to 3 replicas
kubectl patch zookeepercluster zk -n zookeeper --type='merge' -p='{"spec":{"replicas":3}}'

# Wait for ZooKeeper recovery
kubectl wait --for=condition=Ready pod -l app=zookeeper -n zookeeper --timeout=300s

# Verify Kafka functionality restored
kubectl run kafka-zk-recovery --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --topic zk-recovery-test \
  --create --partitions 3 --replication-factor 2

Expected Result: ✅ PASSED - Kafka maintains existing functionality during ZK degradation, full functionality restored after ZK recovery.

8. Disaster Recovery Summary

Generate Recovery Report

echo "=== Disaster Recovery Test Summary ==="
echo "Test Date: $(date)"
echo ""

echo "1. Broker JVM Crash Test: PASSED"
echo "   - Pod restarted automatically"
echo "   - PVC reused successfully"
echo "   - Data preserved"
echo ""

echo "2. Broker Pod Deletion Test: PASSED"
echo "   - New pod created automatically"
echo "   - PVC reattached successfully"
echo "   - Cluster remained functional"
echo ""

echo "3. Node Failure Simulation: PASSED"
echo "   - Pods rescheduled to healthy nodes"
echo "   - PVCs reattached successfully"
echo "   - No data loss"
echo ""

echo "4. Persistent Volume Validation: PASSED"
echo "   - Data survived multiple pod deletions"
echo "   - PVC-PV bindings maintained"
echo "   - Storage reclaim policy working"
echo ""

echo "5. Network Partition Test: PASSED"
echo "   - Cluster detected partition"
echo "   - Maintained availability"
echo "   - Recovered after partition resolution"
echo ""

echo "6. ZooKeeper Failure Test: PASSED"
echo "   - Existing functionality maintained during ZK degradation"
echo "   - Full functionality restored after ZK recovery"
echo ""

# Final cluster health check
echo "=== Final Cluster Health Check ==="
kubectl get kafkacluster kafka -n kafka
kubectl get pods -n kafka -l kafka_cr=kafka
kubectl get pvc -n kafka | grep kafka

9. Recovery Time Objectives (RTO) Analysis

Based on the tests, typical recovery times are:

  • JVM Crash Recovery: 30-60 seconds
  • Pod Deletion Recovery: 60-120 seconds
  • Node Failure Recovery: 2-5 minutes
  • Network Partition Recovery: 30-60 seconds
  • ZooKeeper Recovery: 1-3 minutes

10. Cleanup

# Clean up test topics
kubectl run kafka-cleanup --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --delete --topic disaster-recovery-test

kubectl run kafka-cleanup-2 --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --delete --topic persistence-test

# Remove temporary files
rm -f /tmp/kafka-pods-initial.yaml /tmp/kafka-pvc-initial.yaml
rm -f /tmp/pvc-before-deletion.yaml /tmp/pvc-after-deletion.yaml

Next Steps

With disaster recovery scenarios tested and validated, you now have confidence in your Kafka cluster’s resilience. Continue to the Troubleshooting section to learn about common issues and debugging techniques.


Key Takeaway: The Koperator provides excellent resilience with automatic recovery, persistent data storage, and minimal downtime during various failure scenarios.

8 - Troubleshooting Guide

Troubleshooting Guide

This section provides comprehensive troubleshooting guidance for common issues you might encounter during the Kafka deployment and operation. It includes diagnostic commands, common error patterns, and resolution strategies.

Overview

Common categories of issues:

  1. Cluster Setup Issues - Problems during initial deployment
  2. Connectivity Issues - Network and service discovery problems
  3. Performance Issues - Throughput and latency problems
  4. Storage Issues - Persistent volume and disk problems
  5. Monitoring Issues - Metrics collection and dashboard problems
  6. Operator Issues - Koperator-specific problems

1. Diagnostic Commands

Essential Debugging Commands

# Set namespace context for convenience
kubectl config set-context --current --namespace=kafka

# Quick cluster health check
echo "=== Cluster Health Overview ==="
kubectl get kafkacluster kafka -o wide
kubectl get pods -l kafka_cr=kafka
kubectl get svc | grep kafka
kubectl get pvc | grep kafka

Detailed Diagnostics

# Comprehensive cluster diagnostics
function kafka_diagnostics() {
    echo "=== Kafka Cluster Diagnostics ==="
    echo "Timestamp: $(date)"
    echo ""
    
    echo "1. KafkaCluster Resource:"
    kubectl describe kafkacluster kafka
    echo ""
    
    echo "2. Broker Pods:"
    kubectl get pods -l kafka_cr=kafka -o wide
    echo ""
    
    echo "3. Pod Events:"
    kubectl get events --sort-by=.metadata.creationTimestamp | grep kafka | tail -10
    echo ""
    
    echo "4. Persistent Volumes:"
    kubectl get pv | grep kafka
    echo ""
    
    echo "5. Services:"
    kubectl get svc | grep kafka
    echo ""
    
    echo "6. Operator Status:"
    kubectl get pods -l app.kubernetes.io/instance=kafka-operator
    echo ""
}

# Run diagnostics
kafka_diagnostics

2. Cluster Setup Issues

Issue: Koperator Pod Not Starting

Symptoms:

  • Operator pod in CrashLoopBackOff or ImagePullBackOff
  • KafkaCluster resource not being processed

Diagnosis:

# Check operator pod status
kubectl get pods -l app.kubernetes.io/instance=kafka-operator

# Check operator logs
kubectl logs -l app.kubernetes.io/instance=kafka-operator -c manager --tail=50

# Check operator events
kubectl describe pod -l app.kubernetes.io/instance=kafka-operator

Common Solutions:

# 1. Restart operator deployment
kubectl rollout restart deployment kafka-operator-operator

# 2. Check RBAC permissions
kubectl auth can-i create kafkaclusters --as=system:serviceaccount:kafka:kafka-operator-operator

# 3. Reinstall operator
helm uninstall kafka-operator -n kafka
helm install kafka-operator oci://ghcr.io/adobe/helm-charts/kafka-operator \
  --namespace=kafka \
  --set webhook.enabled=false \
  --version 0.28.0-adobe-20250923

Issue: Kafka Brokers Not Starting

Symptoms:

  • Broker pods stuck in Pending or Init state
  • Brokers failing health checks

Diagnosis:

# Check broker pod status
kubectl get pods -l kafka_cr=kafka -o wide

# Check specific broker logs
BROKER_POD=$(kubectl get pods -l kafka_cr=kafka -o jsonpath='{.items[0].metadata.name}')
kubectl logs $BROKER_POD --tail=100

# Check pod events
kubectl describe pod $BROKER_POD

Common Solutions:

# 1. Check resource constraints
kubectl describe nodes | grep -A 5 "Allocated resources"

# 2. Check storage class
kubectl get storageclass

# 3. Check ZooKeeper connectivity
kubectl run zk-test --rm -i --tty=true \
  --image=busybox \
  --restart=Never \
  -- telnet zk-client.zookeeper 2181

# 4. Force broker recreation
kubectl delete pod $BROKER_POD

3. Connectivity Issues

Issue: Cannot Connect to Kafka Cluster

Symptoms:

  • Timeout errors when connecting to Kafka
  • DNS resolution failures

Diagnosis:

# Test DNS resolution
kubectl run dns-test --rm -i --tty=true \
  --image=busybox \
  --restart=Never \
  -- nslookup kafka-headless.kafka.svc.cluster.local

# Test port connectivity
kubectl run port-test --rm -i --tty=true \
  --image=busybox \
  --restart=Never \
  -- telnet kafka-headless 29092

# Check service endpoints
kubectl get endpoints kafka-headless

Solutions:

# 1. Verify service configuration
kubectl get svc kafka-headless -o yaml

# 2. Check network policies
kubectl get networkpolicy -A

# 3. Restart CoreDNS (if DNS issues)
kubectl rollout restart deployment coredns -n kube-system

Issue: External Access Not Working

Diagnosis:

# Check external services
kubectl get svc | grep LoadBalancer

# Check ingress configuration
kubectl get ingress -A

# Test external connectivity
kubectl run external-test --rm -i --tty=true \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-broker-api-versions.sh \
  --bootstrap-server <EXTERNAL_IP>:<EXTERNAL_PORT>

4. Performance Issues

Issue: High Latency or Low Throughput

Diagnosis:

# Check broker resource usage
kubectl top pods -n kafka

# Check JVM metrics
kubectl exec -n kafka $BROKER_POD -- jstat -gc 1

# Check disk I/O
kubectl exec -n kafka $BROKER_POD -- iostat -x 1 5

# Check network metrics
kubectl exec -n kafka $BROKER_POD -- ss -tuln

Performance Tuning:

# 1. Increase broker resources
kubectl patch kafkacluster kafka --type='merge' -p='
{
  "spec": {
    "brokerConfigGroups": {
      "default": {
        "resourceRequirements": {
          "requests": {
            "cpu": "2",
            "memory": "4Gi"
          },
          "limits": {
            "cpu": "4",
            "memory": "8Gi"
          }
        }
      }
    }
  }
}'

# 2. Optimize JVM settings
kubectl patch kafkacluster kafka --type='merge' -p='
{
  "spec": {
    "brokerConfigGroups": {
      "default": {
        "jvmPerformanceOpts": "-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Xms4g -Xmx4g"
      }
    }
  }
}'

Issue: Disk Space Problems

Diagnosis:

# Check disk usage in broker pods
for pod in $(kubectl get pods -l kafka_cr=kafka -o jsonpath='{.items[*].metadata.name}'); do
  echo "=== $pod ==="
  kubectl exec $pod -- df -h /kafka-logs
done

# Check PVC usage
kubectl get pvc | grep kafka

Solutions:

# 1. Increase PVC size (if storage class supports expansion)
kubectl patch pvc kafka-101-storage-0 -p='{"spec":{"resources":{"requests":{"storage":"20Gi"}}}}'

# 2. Configure log retention
kubectl patch kafkacluster kafka --type='merge' -p='
{
  "spec": {
    "brokerConfigGroups": {
      "default": {
        "config": {
          "log.retention.hours": "168",
          "log.segment.bytes": "1073741824",
          "log.retention.check.interval.ms": "300000"
        }
      }
    }
  }
}'

5. Monitoring Issues

Issue: Metrics Not Appearing in Prometheus

Diagnosis:

# Check ServiceMonitor
kubectl get servicemonitor -n kafka

# Check Prometheus targets
kubectl port-forward -n default svc/monitoring-kube-prometheus-prometheus 9090 &
curl -s "http://localhost:9090/api/v1/targets" | jq '.data.activeTargets[] | select(.labels.job | contains("kafka"))'

# Check metrics endpoints
kubectl exec -n kafka $BROKER_POD -- curl -s localhost:9020/metrics | head -10

Solutions:

# 1. Recreate ServiceMonitor
kubectl delete servicemonitor kafka-servicemonitor -n kafka
kubectl apply -n kafka -f - <<EOF
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: kafka-servicemonitor
  namespace: kafka
  labels:
    app: kafka
    release: monitoring
spec:
  selector:
    matchLabels:
      app: kafka
  endpoints:
  - port: metrics
    interval: 30s
    path: /metrics
EOF

# 2. Check Prometheus configuration (the Prometheus CR lives in the default namespace)
kubectl get prometheus -n default -o yaml | grep -A 10 serviceMonitorSelector

Issue: Grafana Dashboard Not Loading

Diagnosis:

# Check Grafana pod (the monitoring stack lives in the default namespace)
kubectl get pods -n default -l app.kubernetes.io/name=grafana

# Check dashboard ConfigMap
kubectl get configmap -n default -l grafana_dashboard=1

# Check Grafana logs
kubectl logs -n default -l app.kubernetes.io/name=grafana

Solutions:

# 1. Restart Grafana
kubectl rollout restart deployment monitoring-grafana -n default

# 2. Recreate the dashboard ConfigMap
kubectl delete configmap kafka-looking-glass-dashboard -n default
# Then re-apply the dashboard manifest from the deployment section

6. Operator Issues

Issue: KafkaCluster Resource Not Reconciling

Diagnosis:

# Check operator logs for errors
kubectl logs -l app.kubernetes.io/instance=kafka-operator -c manager --tail=100

# Check KafkaCluster status
kubectl describe kafkacluster kafka

# Check operator events
kubectl get events --field-selector involvedObject.kind=KafkaCluster

Solutions:

# 1. Restart operator
kubectl rollout restart deployment kafka-operator-operator

# 2. Check CRD versions
kubectl get crd kafkaclusters.kafka.banzaicloud.io -o yaml | grep version

# 3. Force reconciliation
kubectl annotate kafkacluster kafka kubectl.kubernetes.io/restartedAt="$(date +%Y-%m-%dT%H:%M:%S%z)"

7. Common Error Patterns

Error: “No space left on device”

Solution:

# Check disk usage
kubectl exec -n kafka $BROKER_POD -- df -h

# Clean up old log segments
kubectl exec -n kafka $BROKER_POD -- find /kafka-logs -name "*.log" -mtime +7 -delete

# Increase PVC size or configure retention

Error: “Connection refused”

Solution:

# Check if broker is listening
kubectl exec -n kafka $BROKER_POD -- netstat -tuln | grep 9092

# Check broker configuration
kubectl exec -n kafka $BROKER_POD -- cat /opt/kafka/config/server.properties | grep listeners

# Restart broker if needed
kubectl delete pod $BROKER_POD

Error: “ZooKeeper connection timeout”

Solution:

# Check ZooKeeper status
kubectl get pods -n zookeeper

# Test ZooKeeper connectivity
kubectl run zk-test --rm -i --tty=true \
  --image=busybox \
  --restart=Never \
  -- telnet zk-client.zookeeper 2181

# Check ZooKeeper logs
kubectl logs -n zookeeper zk-0

8. Monitoring Access

Quick Access to Monitoring Tools

# Function to start all monitoring port-forwards
function start_monitoring() {
    echo "Starting monitoring port-forwards..."
    
    # Prometheus
    kubectl port-forward -n default svc/monitoring-kube-prometheus-prometheus 9090 &
    echo "Prometheus: http://localhost:9090"
    
    # Grafana
    kubectl port-forward -n default svc/monitoring-grafana 3000:80 &
    echo "Grafana: http://localhost:3000"
    echo "Grafana password: $(kubectl get secret --namespace default monitoring-grafana -o jsonpath="{.data.admin-password}" | base64 --decode)"
    
    # AlertManager
    kubectl port-forward -n default svc/monitoring-kube-prometheus-alertmanager 9093 &
    echo "AlertManager: http://localhost:9093"
    
    # Cruise Control
    kubectl port-forward -n kafka svc/kafka-cruisecontrol-svc 8090:8090 &
    echo "Cruise Control: http://localhost:8090"
    
    echo "All monitoring tools are now accessible!"
}

# Run the function
start_monitoring

Stop All Port-Forwards

# Function to stop all port-forwards
function stop_monitoring() {
    echo "Stopping all port-forwards..."
    pkill -f "kubectl port-forward"
    echo "All port-forwards stopped."
}
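
Call stop_monitoring when you are finished debugging:

# Stop the port-forwards started by start_monitoring
stop_monitoring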

9. Emergency Procedures

Complete Cluster Reset

# WARNING: This will delete all data!
function emergency_reset() {
    echo "WARNING: This will delete all Kafka data!"
    read -p "Are you sure? (yes/no): " confirm
    
    if [ "$confirm" = "yes" ]; then
        # Delete KafkaCluster
        kubectl delete kafkacluster kafka -n kafka
        
        # Delete all Kafka pods
        kubectl delete pods -l kafka_cr=kafka -n kafka --force --grace-period=0
        
        # Delete PVCs (this deletes data!)
        kubectl delete pvc -l app=kafka -n kafka
        
        # Recreate cluster
        echo "Recreate your KafkaCluster resource to start fresh"
    else
        echo "Reset cancelled"
    fi
}

Backup Critical Data

# Backup ZooKeeper data
kubectl exec -n zookeeper zk-0 -- tar czf /tmp/zk-backup.tar.gz /data

# Copy backup locally
kubectl cp zookeeper/zk-0:/tmp/zk-backup.tar.gz ./zk-backup-$(date +%Y%m%d).tar.gz

# Backup Kafka topic metadata
kubectl run kafka-backup --rm -i \
  --image=ghcr.io/adobe/koperator/kafka:2.13-3.9.1 \
  --restart=Never \
  -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-headless:29092 \
  --list > kafka-topics-backup-$(date +%Y%m%d).txt

10. Getting Help

Collect Support Information

# Generate support bundle
function collect_support_info() {
    local output_dir="kafka-support-$(date +%Y%m%d-%H%M%S)"
    mkdir -p $output_dir
    
    # Cluster information
    kubectl cluster-info > $output_dir/cluster-info.txt
    kubectl get nodes -o wide > $output_dir/nodes.txt
    
    # Kafka resources
    kubectl get kafkacluster kafka -n kafka -o yaml > $output_dir/kafkacluster.yaml
    kubectl get pods -n kafka -o wide > $output_dir/kafka-pods.txt
    kubectl get svc -n kafka > $output_dir/kafka-services.txt
    kubectl get pvc -n kafka > $output_dir/kafka-pvcs.txt
    
    # Logs
    kubectl logs -l app.kubernetes.io/instance=kafka-operator -c manager --tail=1000 > $output_dir/operator-logs.txt
    
    # Events
    kubectl get events -n kafka --sort-by=.metadata.creationTimestamp > $output_dir/kafka-events.txt
    
    # Create archive
    tar czf $output_dir.tar.gz $output_dir
    rm -rf $output_dir
    
    echo "Support bundle created: $output_dir.tar.gz"
}

# Run support collection
collect_support_info

Next Steps

You’ve now completed the comprehensive Kafka on Kubernetes tutorial! For production deployments, consider:

  1. Security hardening - Enable SSL/TLS, SASL authentication
  2. Backup strategies - Implement regular data backups
  3. Monitoring alerts - Configure production alerting rules
  4. Capacity planning - Size resources for production workloads
  5. Disaster recovery - Plan for multi-region deployments

Remember: Always test changes in a development environment before applying them to production clusters.