-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathconsume_pause_resume.php
108 lines (93 loc) · 2.46 KB
/
consume_pause_resume.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
<?php
require("../qbus.php");
# Report every category of PHP error while this example runs.
ini_set('error_reporting', E_ALL);

# Three positional arguments are required in addition to the script name, so
# $argv must hold at least four entries before $argv[1..3] can be read.
if (count($argv) < 4)
{
    print "Invalid parameter!\n";
    # Show the name this script was actually invoked with.
    printf("usage: php %s <cluster> <topic> <group>\n", $argv[0]);
    exit(1);
}

# Positional arguments: cluster name, topic to consume, consumer group.
$cluster = $argv[1];
$topic = $argv[2];
$group = $argv[3];
# $msg_batch_size is the number of messages in one batch: every time that many
# messages have been consumed, consumption of the topic is paused.
# Consumption is resumed $pause_seconds seconds later; the pause stands in for
# the time an application would spend processing the batch.
$msg_batch_size = 2;
$pause_seconds = 3;

# Create the consumer and initialise it for the requested cluster; give up
# immediately when initialisation reports failure.
$consumer = new QbusConsumer();
if (!$consumer->init($cluster, "./consumer.log", "./consumer.config")) {
    echo "Consumer.init() failed\n";
    exit(1);
}

# The subscribe/pause/resume calls take a StringVector, so wrap the single
# topic in one.
$topics = new StringVector();
$topics->push($topic);
if (!$consumer->subscribe($group, $topics)) {
    echo "Consumer.subscribe() failed\n";
    exit(1);
}
# Returns the current time as "Y-m-d H:i:s.mmm", where "mmm" is the
# millisecond part, zero-padded to three digits.
function now()
{
    # microtime() without arguments yields "<fraction> <unix seconds>".
    $parts = explode(' ', microtime());
    $millis = (int)($parts[0] * 1000);
    $stamp = date("Y-m-d H:i:s", $parts[1]);
    return sprintf("%s.%03d", $stamp, $millis);
}
# Loop flag for the main consume loop; cleared by sig_handler on SIGINT.
$run = true;

# Signal handler shared by SIGINT and SIGALRM.
#
# SIGINT  - clears the global $run flag so the consume loop ends.
# SIGALRM - delivered when the alarm armed after pausing fires; resumes
#           consumption of the paused topics and exits with status 1 if
#           resume() reports failure.
# Any other signal is reported and the script exits with status 1.
#
# $signo: number of the signal being delivered.
function sig_handler($signo)
{
    switch ($signo) {
        case SIGINT:
            global $run;
            $run = false;
            break;
        case SIGALRM:
            global $consumer;
            global $topics;
            global $pause_seconds;
            # The alarm marks the end of the pause, so report that consumption
            # resumes *after* $pause_seconds seconds rather than "for" them.
            printf("%s | Resume consuming %s after %d seconds\n", now(), $topics->get(0), $pause_seconds);
            if (!$consumer->resume($topics))
            {
                print "Consumer.resume() failed\n";
                exit(1);
            }
            break;
        default:
            printf("Unknown signal: %d\n", $signo);
            exit(1);
    }
}
# Route both SIGINT (interrupt) and SIGALRM (pause timer) to sig_handler.
pcntl_signal(SIGINT, "sig_handler");
pcntl_signal(SIGALRM, "sig_handler");

if (!$consumer->start()) {
    echo "Consumer.start() failed\n";
    exit(1);
}
echo "Start to consume...\n";

$msg_info = new QbusMsgContentInfo();
$msg_cnt = 0;
while ($run) {
    # Ticks give PHP the opportunity to run the registered signal handlers
    # between statements of this loop.
    declare(ticks = 1);
    if ($consumer->consume($msg_info)) {
        $msg_cnt += 1;
        # In a real application the message would be stored in a user-defined
        # data structure here, so that messages can be processed in batches.
        printf("[%d] %s | %s\n", $msg_cnt, $topics->get(0), $msg_info->msg);
        if ($msg_cnt % $msg_batch_size === 0) {
            # A full batch has been received: pause the topic and arm an alarm
            # whose SIGALRM makes sig_handler resume consumption.
            printf("%s | Pause consuming %s for %d seconds\n", now(), $topics->get(0), $pause_seconds);
            if (!$consumer->pause($topics)) {
                echo "Consumer.pause() failed\n";
                exit(1);
            }
            pcntl_alarm($pause_seconds);
        }
    }
}

# The loop ended because $run was cleared: drop the message holder, stop the
# consumer and drop the reference to it.
$msg_info = null;
$consumer->stop();
$consumer = null;
echo "\nStopped\n";
?>