Skip to main content

Your submission was sent successfully! Close

Thank you for signing up for our newsletter!
In these regular emails you will find the latest updates from Canonical and upcoming events where you can meet our team.Close

Thank you for contacting us. A member of our team will be in touch shortly. Close

This is part of the Charmed Kafka K8s Tutorial. Please refer to the overview page for more information and the overview of the content.

Partition rebalancing and reassignment

By default, when scaling-out a Kafka cluster, partitions allocated to the original brokers are not reallocated to make use of any new storage volumes and brokers, which can result in over-provisioning of resources. The inverse is also true when scaling-in, which can result in under-replicated partitions at best, and permanent data loss at worst.

To address this, we can make use of LinkedIn’s Cruise Control, which is bundled as part of the Charmed Kafka Snap and rock.

At a high level, Cruise Control is made up of the following five components:

  • Workload Monitor - responsible for the metrics collection from Kafka
  • Analyzer - generates allocation proposals based on configured Goals
  • Anomaly Detector - detects failures in brokers, disks, metrics or goals and (optionally) self-heals
  • Webserver - a REST API for user operations
  • Executor - issues re-allocation commands to Kafka

Deploying partition balancer

The Charmed Kafka charms have a configuration option roles, which takes a list of possible values. Different roles can be configured to run on the same machine, or as separate Juju applications.

The two possible roles are:

  • broker - running Apache Kafka
  • balancer - running Cruise Control

Note: It is recommended to deploy a separate Juju application for running Cruise Control in production environments.

For the purposes of this tutorial, we will be deploying a single Charmed Kafka K8s unit to serve as the balancer:

juju deploy kafka-k8s --config roles=balancer -n 1 cruise-control

Earlier in the tutorial, we covered enabling TLS encryption, so we will repeat that step here for the new cruise-control application:

juju relate cruise-control:certificates self-signed-certificates

Now, to make the new cruise-control application aware of the existing Kafka cluster, we will relate the two applications using the peer_cluster relation interface, ensuring that the broker cluster is using the peer-cluster relation-endpoint, and the balancer cluster is using the peer-cluster-orchestrator relation-endpoint:

juju relate kafka-k8s:peer-cluster cruise-control:peer-cluster-orchestrator

Adding new brokers

Let’s assume that we have three kafka-k8s units already, and have an active client application writing messages to an existing topic (as per the Relate applications tutorial page earlier). Let’s scale the kafka-k8s application out to four units:

juju scale-application kafka-k8s 4

We can see that, by default, no partitions are allocated by checking the log directory assignment for the new unit 3:

juju ssh --container kafka kafka-k8s/leader \
    '/opt/kafka/bin/kafka-log-dirs.sh' \
    '--describe' \
    '--bootstrap-server localhost:9093' \
    '--command-config /etc/kafka/client.properties' \
    | tail -n +3 | jq -c '.brokers[] | select(.broker == 3)' | jq

This should produce output similar to the result seen below, with no partitions allocated by default:

{
  "broker": 3,
  "logDirs": [
    {
      "error": null,
      "logDir": "/var/lib/kafka/data",
      "partitions": []
    }
  ]
}

Now, to rebalance some existing partitions from brokers 0, 1 and 2, let’s allocate them to broker 3:

juju run cruise-control/0 rebalance mode=add brokerid=3 --wait=2m

NOTE - If this action fails with a message similar to Cruise Control balancer service has not yet collected enough data to provide a partition reallocation proposal, wait 20 minutes or so and try again. Cruise Control takes a while to collect sufficient metrics from the Kafka cluster during a cold deployment

By default, the rebalance action runs as a ‘dryrun’, where the returned result is what would happen were the partition rebalance actually executed. In the action result, one can find some rich information on the proposed allocation.

For example, the summary section might look similar to this:

summary:
  datatomovemb: "0"
  excludedbrokersforleadership: '[]'
  excludedbrokersforreplicamove: '[]'
  excludedtopics: '[]'
  intrabrokerdatatomovemb: "0"
  monitoredpartitionspercentage: "100.0"
  numintrabrokerreplicamovements: "0"
  numleadermovements: "0"
  numreplicamovements: "76"
  ondemandbalancednessscoreafter: "78.8683072916115"
  ondemandbalancednessscorebefore: "68.01755475998335"
  provisionrecommendation: ""
  provisionstatus: RIGHT_SIZED
  recentwindows: "1"

If we are happy with this proposal, we can re-run the action, but this time instructing the charm to actually execute the proposal:

juju run cruise-control/0 rebalance mode=add dryrun=false brokerid=3 --wait=10m

Partition rebalances can take quite some time. To monitor the progress, in a separate terminal session, you can check the Juju debug logs to see it in progress:

unit-cruise-control-0: 22:18:41 INFO unit.cruise-control/0.juju-log Waiting for task execution to finish for user_task_id='d3e426a3-6c2e-412e-804c-8a677f2678af'...
unit-cruise-control-0: 22:18:51 INFO unit.cruise-control/0.juju-log Waiting for task execution to finish for user_task_id='d3e426a3-6c2e-412e-804c-8a677f2678af'...
unit-cruise-control-0: 22:19:02 INFO unit.cruise-control/0.juju-log Waiting for task execution to finish for user_task_id='d3e426a3-6c2e-412e-804c-8a677f2678af'...
unit-cruise-control-0: 22:19:12 INFO unit.cruise-control/0.juju-log Waiting for task execution to finish for user_task_id='d3e426a3-6c2e-412e-804c-8a677f2678af'...
...

Once it has been completed, and the action has returned a result, you can now verify the partition movement using the same commands as before:

juju ssh --container kafka kafka-k8s/leader \
    '/opt/kafka/bin/kafka-log-dirs.sh' \
    '--describe' \
    '--bootstrap-server localhost:9093' \
    '--command-config /etc/kafka/client.properties' \
    | tail -n +3 | jq -c '.brokers[] | select(.broker == 3)' | jq

This should produce an output similar to the result seen below, with broker 3 now having assigned partitions present, completing the adding of a new broker to the cluster:

{
  "broker": 3,
  "logDirs": [
    {
      "partitions": [
        {
          "partition": "__KafkaCruise ControlModelTrainingSamples-10",
          "size": 0,
          "offsetLag": 0,
          "isFuture": false
        },
        ...

Removing old brokers

Following on from the previous example of Adding new brokers, let’s proceed to safely remove the new broker unit. Before scaling down the Juju application, we must make sure to carefully move any existing data from units about to be removed, to another unit.

In practice, this means running a rebalance Juju action as seen above, BEFORE scaling down the application. This ensures that data is moved, prior to the unit becoming unreachable and permanently losing the data on it.

NOTE: As partition data is replicated across a finite number of units based on the value of the Kafka cluster’s replication.factor property (default value is 3), it is imperative to remove only one broker at a time, so as to avoid losing all available replicas for a given partition.

To remove the most recent broker unit 3 from the previous example, re-run the action with a new parameter value of mode=remove:

juju run cruise-control/0 rebalance mode=remove dryrun=false brokerid=3 --wait=10m

This does not remove the unit, but moves the partitions from the broker on unit number 3 to other brokers within the cluster.

Once the action has been completed, we can verify that broker 3 no longer has any assigned partitions by doing the following:

juju ssh --container kafka kafka-k8s/leader \
    '/opt/kafka/bin/kafka-log-dirs.sh' \
    '--describe' \
    '--bootstrap-server localhost:9093' \
    '--command-config /etc/kafka/client.properties' \
    | tail -n +3 | jq -c '.brokers[] | select(.broker == 3)' | jq

This should result something similar to the result seen below, with broker 3 now having no partitions assigned, completing the safe reassignment of a broker’s data to other brokers in the cluster

{
  "broker": 3,
  "logDirs": [
    {
      "partitions": [],
      "error": null,
      "logDir": "/var/lib/kafka/data"
    }
  ]
}

Now, it is safe to scale-in the cluster, removing broker number 3:

juju scale-application kafka-k8s 3

Full cluster rebalancing

If a cluster has been running in production for some time, partition allocations may be imbalanced. This can happen naturally as topic load varies, topics are reconfigured to have more/less partitions, or new topics are added/removed etc. As such, it is expected that periodically during the lifetime of a deployment, an administrator will need to redistribute the partition allocation across the existing broker units as part of cluster maintenance.

Unlike Adding new brokers or Removing old brokers, this includes a full re-shuffle of partition allocation across all currently live broker units.

To achieve this, re-run the action with a new parameter value of mode=full. Let’s try it with a dryrun (by default) for now:

juju run cruise-control/0 rebalance mode=full --wait=10m

Looking at the bottom of the output, we can see the value of the balancedness score before and after the proposed ‘full’ rebalance:

summary:
  ...
  ondemandbalancednessscoreafter: "90.06926434109423"
  ondemandbalancednessscorebefore: "85.15942156660185"
  ...

To implement the proposed changes, run:

juju run cruise-control/0 rebalance mode=full dryrun=false --wait=10m

Last updated 2 months ago. Help improve this document in the forum.