Consumer Manager¶
This kafka tool provides the ability to view and manipulate consumer offsets for a specific consumer group. For a given cluster, this tool provides us with the following functionalities:
- Manipulating consumer-groups: Listing consumer-groups subscribed to the cluster. Copying, deleting and renaming of the group.
- Manipulating offsets: For a given consumer-group, fetching current offsets, low and high watermarks for topics and partitions subscribed to the group. Setting, advancing, rewinding, saving and restoring of current-offsets.
- Manipulating topics: For a given consumer-group and cluster, listing and unsubscribing topics.
- Offset storage choice: Supports Kafka 0.8.2 and 0.9.0, using offsets stored in either Zookeeper or Kafka. Version 0 and 2 of the Kafka Protocol are supported for committing offsets.
Subcommands¶
- copy_group
- delete_group
- list_groups
- list_topics
- offset_advance
- offset_get
- offset_restore
- offset_rewind
- offset_save
- offset_set
- rename_group
- unsubscribe_topics
Listing consumer groups¶
The list_groups
command shows all of the consumer groups that exist in
the cluster.
$ kafka-consumer-manager --cluster-type=test list_groups
Consumer Groups:
group1
group2
group3
If list_groups
is called with the --storage
option, then the groups will
only be fetched from Zookeeper or Kafka.
Listing topics¶
For information about the topics subscribed by a consumer group, the list_topics subcommand can be used.
$ kafka-consumer-manager --cluster-type=test list_topics group3
Consumer Group ID: group3
Topic: topic_foo
Partitions: [0, 1, 2, 3, 4, 5]
Topic: topic_bar
Partitions: [0, 1, 2]
Getting consumer offsets¶
The offset_get
subcommand gets information about a specific consumer group.
The most basic usage is to call offset_get
with a consumer group id.
$ kafka-consumer-manager --cluster-type test --cluster-name my_cluster offset_get my_group
Cluster name: my_cluster, consumer group: my_group
Topic Name: topic1
Partition ID: 0
High Watermark: 787656
Low Watermark: 787089
Current Offset: 787645
The offsets for all topics in the consumer group will be shown by default.
A single topic can be specified using the --topic
option. If a topic is
specified, then a list of partitions can also be specified using the
--partitions
option.
By default, the offsets will be fetched from both Zookeeper and Kafka’s
internal offset storage. A specific offset storage location can be speficied
using the --storage
option.
Manipulating consumer offsets¶
The offsets for a consumer group can also be saved into a json file.
$ kafka-consumer-manager --cluster-type test --cluster-name my_cluster offset_save my_group my_offsets.json
Cluster name: my_cluster, consumer group: my_group
Consumer offset data saved in json-file my_offsets.json
The save offsets file can then be used to restore the consumer group.
$ kafka-consumer-manager --cluster-type test --cluster-name my_cluster offset_restore my_offsets.json
Restored to new offsets {u'topic1': {0: 425447}}
The offsets can also be set directly using the offset_set
command. This
command takes a group id, and a set of topics, partitions, and offsets.
$ kafka-consumer-manager --cluster-type test --cluster-name my_cluster offset_set my_group topic1.0.38531
There is also an offset_advance
command, which will advance the current offset
to the same value as the high watermark of a topic, and an offset_rewind
command, which will rewind to the low watermark.
If the offset needs to be modified for a consumer group does not already
exist, then the --force
option can be used. This option can be used with
offset_set
, offset_rewind
, and offset_advance
.
Copying or renaming consumer group¶
Consumer groups can have metadata copied into a new group using the
copy_group
subcommand.
$ kafka-consumer-manager --cluster-type=test copy_group my_group1 my_group2
They can be renamed using rename_group
.
$ kafka-consumer-manager --cluster-type=test rename_group my_group1 my_group2
When the group is copied, if a topic is specified using the --topic
option,
then only the offsets for that topic will be copied. If a topic is specified,
then a set of partitions of that topic can also be specified using the
--partitions
option.
Deleting or unsubscribing consumer groups¶
A consumer group can be deleted using the delete_group
subcommand.
$ kafka-consumer-manager --cluster-type=test delete_group my_group
A consumer group be unsubscribed from topics using the unsubscribe_topics
subcommand. If a single topic is specified using the --topic
option, then
the group will be unsubscribed from only that topic.