Skip to content

EricLondon/kafka-log-aggregator

Folders and files

NameName
Last commit message
Last commit date
Jun 3, 2024
Jul 26, 2018
Jun 3, 2024
Jul 26, 2018
Jul 26, 2018
Mar 20, 2025
Jul 26, 2018
Jul 26, 2018
Jul 26, 2018
Jul 26, 2018
Jul 26, 2018

Repository files navigation

Kafka streams Java application to aggregate messages using a session window

See Blog post here: http://ericlondon.com/2018/07/26/kafka-streams-java-application-to-aggregate-messages-using-a-session-window.html

# start zookeeper
zookeeper-server-start $KAFKA_CONF/zookeeper.properties

# start kafka
$KAFKA_HOME/bin/kafka-server-start $KAFKA_CONF/server.properties

# create topics
$KAFKA_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-input-stream
$KAFKA_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-output-stream

# start console consumers
$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic log-input-stream
$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic log-output-stream

# build jar
./gradlew clean shadowJar

# run app
java -cp "./build/libs/*" LogAggregatorApp

# execute producer
cd ruby && ./producer.rb

# agg output
[{"code":401,"message":"Unauthorized","count":8},{"code":500,"message":"Internal Server Error","count":8},{"code":301,"message":"Moved Permanently","count":6},{"code":403,"message":"Forbidden","count":6},{"code":503,"message":"Service Unavailable","count":4},{"code":400,"message":"Bad Request","count":4},{"code":200,"message":"OK","count":4},{"code":418,"message":"I\u0027m a teapot","count":3},{"code":422,"message":"Unprocessable Entity","count":3},{"code":304,"message":"Not Modified","count":2}]
[{"code":503,"message":"Service Unavailable","count":7},{"code":302,"message":"Found","count":7},{"code":301,"message":"Moved Permanently","count":6},{"code":304,"message":"Not Modified","count":5},{"code":418,"message":"I\u0027m a teapot","count":5},{"code":400,"message":"Bad Request","count":5},{"code":422,"message":"Unprocessable Entity","count":4},{"code":200,"message":"OK","count":4},{"code":403,"message":"Forbidden","count":3},{"code":401,"message":"Unauthorized","count":2}]