Home Kafka notes
Post
Cancel

Kafka notes

Kafka

Scripts directory

1
/opt/bitnami/kafka/bin

Logging into the Kafka Container

1
docker exec -it kafka-broker /bin/bash
1
cd /opt/bitnami/kafka/bin

Creating new Topics

1
2
3
4
5
6
7
8
9
10
11
12
13
./kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --create \
    --topic kafka.learning.tweets \
    --partitions 1 \
    --replication-factor 1

./kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --create \
    --topic kafka.learning.alerts \
    --partitions 1 \
    --replication-factor 1

Listing Topics

1
2
3
./kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --list

Getting details about a Topic

1
2
3
./kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --describe

Publishing Messages to Topics

1
2
3
./kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic kafka.learning.tweets

Consuming Messages from Topics

1
2
3
4
./kafka-console-consumer.sh \
    --bootstrap-server localhost:29092 \
    --topic kafka.learning.tweets \
    --from-beginning

Deleting Topics

1
2
3
4
./kafka-topics.sh \
    --bootstrap-server localhost:29092 \
    --delete \
    --topic kafka.learning.alerts

Create a Topic with multiple partitions

1
2
3
4
5
6
./kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --create \
    --topic kafka.learning.orders \
    --partitions 3 \
    --replication-factor 1

Check topic partitioning

1
2
3
4
./kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --topic kafka.learning.orders \
    --describe

Publishing Messages to Topics with keys

1
2
3
4
5
./kafka-console-producer.sh \
    --bootstrap-server localhost:29092 \
    --property "parse.key=true" \
    --property "key.separator=:" \
    --topic kafka.learning.orders

Consume messages using a consumer group

1
2
3
4
5
6
7
./kafka-console-consumer.sh \
    --bootstrap-server localhost:29092 \
    --topic kafka.learning.orders \
    --group test-consumer-group \
    --property print.key=true \
    --property key.separator=" = " \
    --from-beginning

Check current status of offsets

1
2
3
4
./kafka-consumer-groups.sh \
    --bootstrap-server localhost:29092 \
    --describe \
    --all-groups

Dockerfile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
version: '2'
services:

#Zookeeper Service.
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    restart: "no"
    ports:
      - '2181:2181'
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    container_name: zookeeper

#Kafka Service
  kafka:
    image: 'bitnami/kafka:latest'
    restart: "no"
    ports:
      - '9092:9092'
      - '29092:29092'
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:29092,EXTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:29092,EXTERNAL://docker-pve.localdomain:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      - ALLOW_PLAINTEXT_LISTENER=yes
      
    container_name: kafka-broker
    
    depends_on:
      - "zookeeper"
    
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

Errors

Connection problems

  • thrd:localhost:9092/1001]: localhost:9092/1001: Connect to ipv4#127.0.0.1:9092 failed: Unknown error (after 2047ms in state CONNECT)

  • The advertised broker address is set to localhost, needs docker host DNS address
  • Update this line to contain docker address instead, example, KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:29092,EXTERNAL://docker-pve.localdomain:9092

C# Sample code

1
Install-Package 'Confluent.Kafka'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public static async Task PublishAsync()
{
    var config = new ProducerConfig
    {
        BootstrapServers = "docker-pve.localdomain:9092",
        ClientId = Dns.GetHostName()
    };


    using (var producer = new ProducerBuilder<Null, string>(config).Build())
    {
        for (var i = 0; i < 2000; i++)
        {
            await producer.ProduceAsync("weblogs", new Message<Null, string> { Value = $"hello world - {i}" }).ContinueWith(task =>
            {
                if (task.IsFaulted)
                {

                }
                else
                {

                    Console.WriteLine($"Wrote to offset: {task.Result.Offset}");
                }
            });
        }
    }
}

public static void ConsumeAsync()
{
    var config = new ConsumerConfig
    {
        BootstrapServers = "docker-pve.localdomain:9092",
        GroupId = "foo",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    bool cancelled = false;
    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        consumer.Subscribe("weblogs");

        while (!cancelled)
        {
            var result = consumer.Consume(100);

            if (result != null)
            {
                var msg = result.Message.Value;
                Console.WriteLine(msg);
            }

        }

        consumer.Close();
    }
}
This post is licensed under CC BY 4.0 by the author.