Skip to content

Commit a3d7e77

Browse files
committed
Simple setup with a producer and a consumer
0 parents  commit a3d7e77

File tree

7 files changed

+287
-0
lines changed

7 files changed

+287
-0
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.idea
2+
target
3+
*.iml

README.md

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
## Start Kafka and Zookeeper
2+
3+
Start the Kafka and Zookeeper containers
4+
```shell script
5+
docker-compose up -d
6+
```
7+
8+
## SSH into Kafka container
9+
10+
SSH into the Kafka container
11+
```shell script
12+
docker exec -it kafka /bin/sh
13+
```
14+
15+
## Kafka CLI examples
16+
#### Simple producer and consumer
17+
18+
Create an `orders` topic
19+
```shell script
20+
$KAFKA_HOME/bin/kafka-topics.sh --create --topic orders --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
21+
```
22+
Write messages to the topic
23+
```shell script
24+
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=orders --broker-list localhost:9092
25+
```
26+
Consume messages sent by the producer from the beginning
27+
```shell script
28+
$KAFKA_HOME/bin/kafka-console-consumer.sh --topic=orders --from-beginning --bootstrap-server localhost:9092
29+
```
30+
31+
#### Producer and consumer within a consumer group
32+
33+
Create a `locations` topic
34+
```shell script
35+
$KAFKA_HOME/bin/kafka-topics.sh --create --topic locations --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
36+
```
37+
Write messages to the topic
38+
```shell script
39+
$KAFKA_HOME/bin/kafka-console-producer.sh --topic locations --broker-list localhost:9092
40+
```
41+
Consume messages within a consumer group
42+
```shell script
43+
$KAFKA_HOME/bin/kafka-console-consumer.sh --topic locations --group group-ABC --bootstrap-server localhost:9092
44+
```
45+
46+
#### Retrieve and describe topics
47+
48+
Retrieve the list of topics
49+
```shell script
50+
$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
51+
```
52+
Describe the `orders` topic
53+
```shell script
54+
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
55+
```
56+
57+
#### Retrieve and describe consumer groups
58+
59+
Retrieve the list of consumer groups
60+
```shell script
61+
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
62+
```
63+
```shell script
64+
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
65+
```
66+
67+
#### Manage offset
68+
69+
Reset offsets
70+
```shell script
71+
$KAFKA_HOME/bin/kafka-consumer-groups.sh --reset-offsets --to-offset 0 --bootstrap-server localhost:9092 --execute --group group-ABC --topic locations
72+
```
73+
Reset offset of a specific topic:partition
74+
```shell script
75+
$KAFKA_HOME/bin/kafka-consumer-groups.sh --reset-offsets --to-offset 1 --bootstrap-server localhost:9092 --execute --group group-ABC --topic locations:2
76+
```
77+
Shift offset by 'n', where 'n' can be positive or negative
78+
```shell script
79+
$KAFKA_HOME/bin/kafka-consumer-groups.sh --reset-offsets --shift-by -2 --bootstrap-server localhost:9092 --execute --group group-ABC --topic locations
80+
```
81+
82+
#### Find Kafka cluster version
83+
```shell script
84+
$KAFKA_HOME/bin/kafka-consumer-groups.sh --version
85+
```
86+
87+
## Stop Kafka and Zookeeper
88+
89+
Stop the containers
90+
```shell script
91+
docker-compose down
92+
```
93+
94+
---
95+
96+
## Setup Kafka Tool UI
97+
98+
#### Download
99+
https://www.kafkatool.com/download.html
100+
101+
#### Configure connection to Kafka cluster
102+
Configure the Properties tab with
103+
- Cluster name
104+
- Kafka cluster version ([see here](#find-kafka-cluster-version))
105+
- Zookeeper Host (localhost)
106+
- Zookeeper Port (2181)

docker-compose.yml

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
version: '3'
2+
3+
services:
4+
zookeeper:
5+
image: wurstmeister/zookeeper
6+
container_name: zookeeper
7+
ports:
8+
- "2181:2181"
9+
kafka:
10+
image: wurstmeister/kafka
11+
container_name: kafka
12+
ports:
13+
- "9092:9092"
14+
environment:
15+
KAFKA_ADVERTISED_HOST_NAME: localhost
16+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
17+
KAFKA_CREATE_TOPICS: "orders:3:1"

pom.xml

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>marc.destefanis</groupId>
8+
<artifactId>kafka-examples</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>13</maven.compiler.source>
13+
<maven.compiler.target>13</maven.compiler.target>
14+
<maven.compiler.release>13</maven.compiler.release>
15+
<kafka.clients.version>2.5.0</kafka.clients.version>
16+
<logback.version>1.2.3</logback.version>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.apache.kafka</groupId>
22+
<artifactId>kafka-clients</artifactId>
23+
<version>${kafka.clients.version}</version>
24+
</dependency>
25+
26+
<dependency>
27+
<groupId>ch.qos.logback</groupId>
28+
<artifactId>logback-classic</artifactId>
29+
<version>${logback.version}</version>
30+
</dependency>
31+
</dependencies>
32+
33+
<build>
34+
<plugins>
35+
<plugin>
36+
<groupId>org.apache.maven.plugins</groupId>
37+
<artifactId>maven-compiler-plugin</artifactId>
38+
<version>3.8.0</version>
39+
<configuration>
40+
<release>${maven.compiler.release}</release>
41+
<source>${maven.compiler.source}</source>
42+
<target>${maven.compiler.target}</target>
43+
</configuration>
44+
</plugin>
45+
</plugins>
46+
</build>
47+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package kafka.examples;
2+
3+
import org.apache.kafka.clients.consumer.Consumer;
4+
import org.apache.kafka.clients.consumer.ConsumerConfig;
5+
import org.apache.kafka.clients.consumer.ConsumerRecord;
6+
import org.apache.kafka.clients.consumer.KafkaConsumer;
7+
import org.apache.kafka.common.serialization.StringDeserializer;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.time.Duration;
12+
import java.util.Collections;
13+
import java.util.Properties;
14+
15+
public class ConsumerDemo {
16+
private static final Logger LOG = LoggerFactory.getLogger(ConsumerDemo.class);
17+
18+
public static void main(String[] args) {
19+
Properties kafkaProps = new Properties();
20+
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
21+
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
22+
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
23+
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-demo-group");
24+
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
25+
26+
Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
27+
28+
consumer.subscribe(Collections.singletonList("orders"));
29+
30+
while(true) {
31+
consumer.poll(Duration.ofMillis(1000))
32+
.forEach(ConsumerDemo::printRecord);
33+
}
34+
}
35+
36+
private static void printRecord(ConsumerRecord<String, String> record) {
37+
LOG.info(record.toString());
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package kafka.examples;
2+
3+
import org.apache.kafka.clients.producer.Callback;
4+
import org.apache.kafka.clients.producer.KafkaProducer;
5+
import org.apache.kafka.clients.producer.ProducerConfig;
6+
import org.apache.kafka.clients.producer.ProducerRecord;
7+
import org.apache.kafka.clients.producer.RecordMetadata;
8+
import org.apache.kafka.common.serialization.StringSerializer;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.util.Properties;
13+
14+
public class ProducerDemo {
15+
private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class);
16+
private static final String TOPIC_NAME = "orders";
17+
18+
public static void main(String[] args) {
19+
Properties kafkaProps = new Properties();
20+
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
21+
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
22+
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
23+
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
24+
25+
fireAndForgetMessage(producer, "Fire and forget message...");
26+
synchronousSend(producer, "Synchronous message...");
27+
asynchronousSend(producer, "Asynchronous message....");
28+
}
29+
30+
public static void fireAndForgetMessage(KafkaProducer<String, String> producer, String message) {
31+
LOG.info("Fire and forget: {}", message);
32+
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
33+
producer.send(record);
34+
producer.flush();
35+
}
36+
37+
public static void synchronousSend(KafkaProducer<String, String> producer, String message) {
38+
LOG.info("synchronous send: {}", message);
39+
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
40+
try {
41+
LOG.info(producer.send(record).get().toString());
42+
} catch (Exception e) {
43+
e.printStackTrace();
44+
}
45+
}
46+
47+
public static void asynchronousSend(KafkaProducer<String, String> producer, String message) {
48+
LOG.info("asynchronous send: {}", message);
49+
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
50+
producer.send(record, new OrdersProducerCallback());
51+
producer.flush();
52+
}
53+
54+
private static class OrdersProducerCallback implements Callback {
55+
56+
@Override
57+
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
58+
if (e != null) {
59+
LOG.error(e.toString());
60+
} // else the message has been sent successfully
61+
}
62+
}
63+
}

src/main/resources/logback.xml

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<configuration>
2+
3+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
4+
<encoder>
5+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
6+
</encoder>
7+
</appender>
8+
9+
<root level="info">
10+
<appender-ref ref="STDOUT" />
11+
</root>
12+
</configuration>

0 commit comments

Comments
 (0)