Skip to content

Commit 0d558b4

Browse files
committed
Add test
1 parent 319967c commit 0d558b4

File tree

2 files changed

+189
-0
lines changed

2 files changed

+189
-0
lines changed

tests/produce_consume.phpt

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
--TEST--
2+
Produce, consume
3+
--SKIPIF--
4+
<?php
5+
file_exists(__DIR__."/test_env.php") || die("skip");
6+
--FILE--
7+
<?php
8+
9+
require __DIR__."/test_env.php";
10+
11+
$delivered = 0;
12+
13+
$conf = new RdKafka\Conf();
14+
$conf->set('broker.version.fallback', TEST_KAFKA_BROKER_VERSION);
15+
$conf->setErrorCb(function ($producer, $err, $errstr) {
16+
printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
17+
exit;
18+
});
19+
$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) {
20+
if ($msg->err) {
21+
throw new Exception("Message delivery failed: " . $msg->errstr());
22+
}
23+
$delivered++;
24+
});
25+
26+
$producer = new RdKafka\Producer($conf);
27+
28+
if ($producer->addBrokers(TEST_KAFKA_BROKERS) < 1) {
29+
echo "Failed adding brokers\n";
30+
exit;
31+
}
32+
33+
$topicName = sprintf("test_rdkafka_%s", uniqid());
34+
35+
$topic = $producer->newTopic($topicName);
36+
37+
if (!$producer->getMetadata(false, $topic, 2*1000)) {
38+
echo "Failed to get metadata, is broker down?\n";
39+
}
40+
41+
for ($i = 0; $i < 10; $i++) {
42+
$topic->produce(0, 0, "message $i");
43+
$producer->poll(0);
44+
}
45+
46+
while ($producer->getOutQLen()) {
47+
$producer->poll(50);
48+
}
49+
50+
printf("%d messages delivered\n", $delivered);
51+
52+
$consumer = new RdKafka\Consumer($conf);
53+
$consumer->addBrokers(TEST_KAFKA_BROKERS);
54+
55+
$topic = $consumer->newTopic($topicName);
56+
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
57+
58+
$messages = [];
59+
60+
while (true) {
61+
$msg = $topic->consume(0, 60*1000);
62+
if (!$msg) {
63+
continue;
64+
}
65+
switch ($msg->err) {
66+
case RD_KAFKA_RESP_ERR_NO_ERROR:
67+
printf("Got message: %s\n", $msg->payload);
68+
break;
69+
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
70+
echo "EOF\n";
71+
break 2;
72+
default:
73+
throw new Exception($message->errstr());
74+
}
75+
}
76+
--EXPECT--
77+
10 messages delivered
78+
Got message: message 0
79+
Got message: message 1
80+
Got message: message 2
81+
Got message: message 3
82+
Got message: message 4
83+
Got message: message 5
84+
Got message: message 6
85+
Got message: message 7
86+
Got message: message 8
87+
Got message: message 9
88+
EOF

tests/produce_consume_queue.phpt

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
--TEST--
2+
Produce, consume queue
3+
--SKIPIF--
4+
<?php
5+
file_exists(__DIR__."/test_env.php") || die("skip");
6+
--FILE--
7+
<?php
8+
9+
require __DIR__."/test_env.php";
10+
11+
$delivered = 0;
12+
13+
$conf = new RdKafka\Conf();
14+
$conf->set('broker.version.fallback', TEST_KAFKA_BROKER_VERSION);
15+
$conf->setErrorCb(function ($producer, $err, $errstr) {
16+
printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
17+
exit;
18+
});
19+
$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) {
20+
if ($msg->err) {
21+
throw new Exception("Message delivery failed: " . $msg->errstr());
22+
}
23+
$delivered++;
24+
});
25+
26+
$producer = new RdKafka\Producer($conf);
27+
28+
if ($producer->addBrokers(TEST_KAFKA_BROKERS) < 1) {
29+
echo "Failed adding brokers\n";
30+
exit;
31+
}
32+
33+
$topicNames = [
34+
sprintf("test_rdkafka_0_%s", uniqid()),
35+
sprintf("test_rdkafka_1_%s", uniqid()),
36+
];
37+
38+
$topics = array_map(function ($topicName) use ($producer) {
39+
return $producer->newTopic($topicName);
40+
}, $topicNames);
41+
42+
if (!$producer->getMetadata(false, reset($topics), 2*1000)) {
43+
echo "Failed to get metadata, is broker down?\n";
44+
}
45+
46+
for ($i = 0; $i < 10; $i++) {
47+
$topics[$i%2]->produce(0, 0, "message $i");
48+
$producer->poll(0);
49+
}
50+
51+
while ($producer->getOutQLen()) {
52+
$producer->poll(50);
53+
}
54+
55+
printf("%d messages delivered\n", $delivered);
56+
57+
$consumer = new RdKafka\Consumer($conf);
58+
$consumer->addBrokers(TEST_KAFKA_BROKERS);
59+
60+
$queue = $consumer->newQueue();
61+
62+
array_walk($topicNames, function ($topicName) use ($consumer, $queue) {
63+
$topic = $consumer->newTopic($topicName);
64+
$topic->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
65+
});
66+
67+
$messages = [];
68+
$eof = 0;
69+
70+
while ($eof < 2) {
71+
$msg = $queue->consume(60*1000);
72+
if (!$msg) {
73+
continue;
74+
}
75+
switch ($msg->err) {
76+
case RD_KAFKA_RESP_ERR_NO_ERROR:
77+
$messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
78+
break;
79+
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
80+
$eof++;
81+
break;
82+
default:
83+
throw new Exception($message->errstr());
84+
}
85+
}
86+
87+
sort($messages);
88+
echo implode("\n", $messages), "\n";
89+
90+
--EXPECTF--
91+
10 messages delivered
92+
Got message: message 0 from test_rdkafka_0_%s
93+
Got message: message 1 from test_rdkafka_1_%s
94+
Got message: message 2 from test_rdkafka_0_%s
95+
Got message: message 3 from test_rdkafka_1_%s
96+
Got message: message 4 from test_rdkafka_0_%s
97+
Got message: message 5 from test_rdkafka_1_%s
98+
Got message: message 6 from test_rdkafka_0_%s
99+
Got message: message 7 from test_rdkafka_1_%s
100+
Got message: message 8 from test_rdkafka_0_%s
101+
Got message: message 9 from test_rdkafka_1_%s

0 commit comments

Comments
 (0)