Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit c25cf0a

Browse files
committedMar 24, 2022
WIP
1 parent 5eccd06 commit c25cf0a

File tree

9 files changed

+938
-702
lines changed

9 files changed

+938
-702
lines changed
 

‎include/scoreboard.h

+1
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ struct process_score {
149149
apr_uint32_t keep_alive; /* async connections in keep alive */
150150
apr_uint32_t suspended; /* connections suspended by some module */
151151
apr_uint32_t read_line; /* async connections doing read line */
152+
apr_uint32_t pending; /* connections waiting a worker */
152153
};
153154

154155
/* Scoreboard is now in 'local' memory, since it isn't updated once created,

‎modules/filters/mod_reqtimeout.c

+23-17
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ static void extend_timeout(reqtimeout_con_cfg *ccfg, apr_bucket_brigade *bb)
9797
}
9898
}
9999

100-
static apr_status_t check_time_left(reqtimeout_con_cfg *ccfg,
101-
apr_time_t now)
100+
static apr_status_t check_and_update_time_left(reqtimeout_con_cfg *ccfg,
101+
apr_time_t now)
102102
{
103103
if (!now)
104104
now = apr_time_now();
@@ -209,11 +209,11 @@ static apr_status_t reqtimeout_filter(ap_filter_t *f,
209209
/* set new timeout */
210210
now = apr_time_now();
211211
ccfg->timeout_at = now + apr_time_from_sec(ccfg->cur_stage.timeout);
212-
ccfg->cur_stage.timeout = 0;
213212
if (ccfg->cur_stage.max_timeout > 0) {
214213
ccfg->max_timeout_at = now + apr_time_from_sec(ccfg->cur_stage.max_timeout);
215214
ccfg->cur_stage.max_timeout = 0;
216215
}
216+
ccfg->cur_stage.timeout = 0;
217217
}
218218
else if (ccfg->timeout_at == 0) {
219219
/* no timeout set, or in between requests */
@@ -227,25 +227,27 @@ static apr_status_t reqtimeout_filter(ap_filter_t *f,
227227
rv = apr_socket_timeout_get(ccfg->socket, &saved_sock_timeout);
228228
AP_DEBUG_ASSERT(rv == APR_SUCCESS);
229229

230-
rv = check_time_left(ccfg, now);
230+
rv = check_and_update_time_left(ccfg, now);
231231
if (rv != APR_SUCCESS)
232232
goto cleanup;
233233

234234
if (mode == AP_MODE_GETLINE && block == APR_BLOCK_READ) {
235+
apr_off_t remaining = HUGE_STRING_LEN;
236+
#if APR_MAJOR_VERSION < 2
237+
apr_int32_t nsds;
238+
apr_interval_time_t poll_timeout;
239+
apr_pollfd_t pollset;
240+
pollset.p = NULL;
241+
#endif
242+
235243
/*
236244
* For a blocking AP_MODE_GETLINE read, apr_brigade_split_line()
237245
* would loop until a whole line has been read. As this would make it
238246
* impossible to enforce a total timeout, we only do non-blocking
239247
* reads.
240248
*/
241-
apr_off_t remaining = HUGE_STRING_LEN;
242249
do {
243250
apr_off_t bblen;
244-
#if APR_MAJOR_VERSION < 2
245-
apr_int32_t nsds;
246-
apr_interval_time_t poll_timeout;
247-
apr_pollfd_t pollset;
248-
#endif
249251

250252
rv = ap_get_brigade(f->next, bb, AP_MODE_GETLINE, APR_NONBLOCK_READ, remaining);
251253
if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) {
@@ -282,10 +284,12 @@ static apr_status_t reqtimeout_filter(ap_filter_t *f,
282284

283285
/* ... and wait for more */
284286
#if APR_MAJOR_VERSION < 2
285-
pollset.p = f->c->pool;
286-
pollset.desc_type = APR_POLL_SOCKET;
287-
pollset.reqevents = APR_POLLIN|APR_POLLHUP;
288-
pollset.desc.s = ccfg->socket;
287+
if (pollset.p == NULL) {
288+
pollset.p = f->c->pool;
289+
pollset.desc_type = APR_POLL_SOCKET;
290+
pollset.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
291+
pollset.desc.s = ccfg->socket;
292+
}
289293
apr_socket_timeout_get(ccfg->socket, &poll_timeout);
290294
rv = apr_poll(&pollset, 1, &nsds, poll_timeout);
291295
#else
@@ -294,7 +298,7 @@ static apr_status_t reqtimeout_filter(ap_filter_t *f,
294298
if (rv != APR_SUCCESS)
295299
break;
296300

297-
rv = check_time_left(ccfg, 0);
301+
rv = check_and_update_time_left(ccfg, 0);
298302
if (rv != APR_SUCCESS)
299303
break;
300304

@@ -306,12 +310,14 @@ static apr_status_t reqtimeout_filter(ap_filter_t *f,
306310
}
307311
else { /* mode != AP_MODE_GETLINE */
308312
rv = ap_get_brigade(f->next, bb, mode, block, readbytes);
313+
309314
/* Don't extend the timeout in speculative mode, wait for
310315
* the real (relevant) bytes to be asked later, within the
311316
* currently allotted time.
312317
*/
313-
if (ccfg->cur_stage.rate_factor && rv == APR_SUCCESS
314-
&& mode != AP_MODE_SPECULATIVE) {
318+
if (rv == APR_SUCCESS
319+
&& mode != AP_MODE_SPECULATIVE
320+
&& ccfg->cur_stage.rate_factor) {
315321
extend_timeout(ccfg, bb);
316322
}
317323
}

‎modules/generators/mod_status.c

+15-6
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,8 @@ static int status_handler(request_rec *r)
557557
ap_rputs("</dl>", r);
558558

559559
if (is_async) {
560-
int read_line = 0, write_completion = 0, lingering_close = 0, keep_alive = 0,
560+
int read_line = 0, write_completion = 0,
561+
pending = 0, keep_alive = 0, lingering_close = 0,
561562
connections = 0, stopping = 0, procs = 0;
562563
/*
563564
* These differ from 'busy' and 'ready' in how gracefully finishing
@@ -574,13 +575,15 @@ static int status_handler(request_rec *r)
574575
"<th colspan=\"3\">Async connections</th></tr>\n"
575576
"<tr><th>total</th><th>accepting</th>"
576577
"<th>busy</th><th>idle</th>"
577-
"<th>reading</th><th>writing</th><th>keep-alive</th><th>closing</th></tr>\n", r);
578+
"<th>reading</th><th>writing</th>"
579+
"<th>pending</th><th>keep-alive</th><th>closing</th></tr>\n", r);
578580
for (i = 0; i < server_limit; ++i) {
579581
ps_record = ap_get_scoreboard_process(i);
580582
if (ps_record->pid) {
581583
connections += ps_record->connections;
582584
read_line += ps_record->read_line;
583585
write_completion += ps_record->write_completion;
586+
pending += ps_record->pending;
584587
keep_alive += ps_record->keep_alive;
585588
lingering_close += ps_record->lingering_close;
586589
busy_workers += thread_busy_buffer[i];
@@ -601,7 +604,8 @@ static int status_handler(request_rec *r)
601604
"<td>%s%s</td>"
602605
"<td>%u</td><td>%s</td>"
603606
"<td>%u</td><td>%u</td>"
604-
"<td>%u</td><td>%u</td><td>%u</td><td>%u</td>"
607+
"<td>%u</td><td>%u</td>"
608+
"<td>%u</td><td>%u</td><td>%u</td>"
605609
"</tr>\n",
606610
i, ps_record->pid,
607611
dying, old,
@@ -611,6 +615,7 @@ static int status_handler(request_rec *r)
611615
thread_idle_buffer[i],
612616
ps_record->read_line,
613617
ps_record->write_completion,
618+
ps_record->pending,
614619
ps_record->keep_alive,
615620
ps_record->lingering_close);
616621
}
@@ -621,12 +626,14 @@ static int status_handler(request_rec *r)
621626
"<td>%d</td><td>%d</td>"
622627
"<td>%d</td><td>&nbsp;</td>"
623628
"<td>%d</td><td>%d</td>"
624-
"<td>%d</td><td>%d</td><td>%d</td><td>%d</td>"
629+
"<td>%d</td><td>%d</td>"
630+
"<td>%d</td><td>%d</td><td>%d</td>"
625631
"</tr>\n</table>\n",
626632
procs, stopping,
627633
connections,
628634
busy_workers, idle_workers,
629-
read_line, write_completion, keep_alive, lingering_close);
635+
read_line, write_completion,
636+
pending, keep_alive, lingering_close);
630637
}
631638
else {
632639
ap_rprintf(r, "Processes: %d\n"
@@ -636,12 +643,14 @@ static int status_handler(request_rec *r)
636643
"ConnsTotal: %d\n"
637644
"ConnsAsyncReading: %d\n"
638645
"ConnsAsyncWriting: %d\n"
646+
"ConnsAsyncPending: %d\n"
639647
"ConnsAsyncKeepAlive: %d\n"
640648
"ConnsAsyncClosing: %d\n",
641649
procs, stopping,
642650
busy_workers, idle_workers,
643651
connections,
644-
read_line, write_completion, keep_alive, lingering_close);
652+
read_line, write_completion,
653+
pending, keep_alive, lingering_close);
645654
}
646655
}
647656

‎modules/lua/lua_request.c

+8
Original file line numberDiff line numberDiff line change
@@ -1264,10 +1264,18 @@ static int lua_ap_scoreboard_process(lua_State *L)
12641264
lua_pushnumber(L, ps_record->suspended);
12651265
lua_settable(L, -3);
12661266

1267+
lua_pushstring(L, "read_line");
1268+
lua_pushnumber(L, ps_record->read_line);
1269+
lua_settable(L, -3);
1270+
12671271
lua_pushstring(L, "write_completion");
12681272
lua_pushnumber(L, ps_record->write_completion);
12691273
lua_settable(L, -3);
12701274

1275+
lua_pushstring(L, "pending");
1276+
lua_pushnumber(L, ps_record->pending);
1277+
lua_settable(L, -3);
1278+
12711279
lua_pushstring(L, "not_accepting");
12721280
lua_pushnumber(L, ps_record->not_accepting);
12731281
lua_settable(L, -3);

‎server/mpm/event/event.c

+579-545
Large diffs are not rendered by default.

‎server/mpm/worker/worker.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t *thd, void * dummy)
690690
accept_mutex_error("unlock", rv, process_slot);
691691
}
692692
if (csd != NULL) {
693-
rv = ap_queue_push_socket(worker_queue, csd, NULL, ptrans);
693+
rv = ap_queue_push_socket(worker_queue, csd, ptrans);
694694
if (rv) {
695695
/* trash the connection; we couldn't queue the connected
696696
* socket to a worker

‎server/mpm_fdqueue.c

+246-110
Large diffs are not rendered by default.

‎server/mpm_fdqueue.h

+64-22
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@
4040
#include <apr_thread_cond.h>
4141
#include <apr_network_io.h>
4242

43+
struct fd_queue_t; /* opaque */
4344
struct fd_queue_info_t; /* opaque */
4445
struct fd_queue_elem_t; /* opaque */
46+
typedef struct fd_queue_t fd_queue_t;
4547
typedef struct fd_queue_info_t fd_queue_info_t;
4648
typedef struct fd_queue_elem_t fd_queue_elem_t;
4749

@@ -50,9 +52,11 @@ AP_DECLARE(apr_status_t) ap_queue_info_create(fd_queue_info_t **queue_info,
5052
int max_recycled_pools);
5153
AP_DECLARE(apr_status_t) ap_queue_info_set_idle(fd_queue_info_t *queue_info,
5254
apr_pool_t *pool_to_recycle);
55+
AP_DECLARE(apr_status_t) ap_queue_info_get_idler(fd_queue_info_t *queue_info);
5356
AP_DECLARE(apr_status_t) ap_queue_info_try_get_idler(fd_queue_info_t *queue_info);
5457
AP_DECLARE(apr_status_t) ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info,
5558
int *had_to_block);
59+
AP_DECLARE(apr_int32_t) ap_queue_info_avail(fd_queue_info_t *queue_info);
5660
AP_DECLARE(apr_uint32_t) ap_queue_info_num_idlers(fd_queue_info_t *queue_info);
5761
AP_DECLARE(apr_status_t) ap_queue_info_term(fd_queue_info_t *queue_info);
5862

@@ -62,6 +66,43 @@ AP_DECLARE(void) ap_queue_info_push_pool(fd_queue_info_t *queue_info,
6266
apr_pool_t *pool_to_recycle);
6367
AP_DECLARE(void) ap_queue_info_free_idle_pools(fd_queue_info_t *queue_info);
6468

69+
enum fd_queue_event_type_e
70+
{
71+
FD_QUEUE_EVENT_SOCKET,
72+
FD_QUEUE_EVENT_TIMER,
73+
};
74+
typedef enum fd_queue_event_type_e fd_queue_event_type_e;
75+
76+
struct sock_event_t;
77+
struct timer_event_t;
78+
struct fd_queue_event_t
79+
{
80+
/* queue container (used internally) */
81+
fd_queue_elem_t *elem;
82+
83+
/* called back when (de)queuing (under the queue lock) */
84+
void (*cb)(void *baton, int add);
85+
void *baton;
86+
87+
/* event data */
88+
fd_queue_event_type_e type;
89+
union {
90+
struct sock_event_t *se;
91+
struct timer_event_t *te;
92+
} data;
93+
};
94+
typedef struct fd_queue_event_t fd_queue_event_t;
95+
96+
struct sock_event_t
97+
{
98+
apr_socket_t *sd;
99+
void *sd_baton;
100+
apr_pool_t *p;
101+
102+
fd_queue_event_t qe; /* in-queue */
103+
};
104+
typedef struct sock_event_t sock_event_t;
105+
65106
struct timer_event_t
66107
{
67108
APR_RING_ENTRY(timer_event_t) link;
@@ -71,36 +112,37 @@ struct timer_event_t
71112
int canceled;
72113
apr_array_header_t *pfds;
73114
apr_interval_time_t timeout;
115+
116+
fd_queue_event_t qe; /* in-queue */
74117
};
75118
typedef struct timer_event_t timer_event_t;
76119

77-
struct fd_queue_t
78-
{
79-
APR_RING_HEAD(timers_t, timer_event_t) timers;
80-
fd_queue_elem_t *data;
81-
unsigned int nelts;
82-
unsigned int bounds;
83-
unsigned int in;
84-
unsigned int out;
85-
apr_thread_mutex_t *one_big_mutex;
86-
apr_thread_cond_t *not_empty;
87-
volatile int terminated;
88-
};
89-
typedef struct fd_queue_t fd_queue_t;
120+
apr_status_t ap_queue_create(fd_queue_t **pqueue,
121+
int capacity, apr_pool_t *p);
122+
123+
/* mpm_event API */
124+
AP_DECLARE(apr_status_t) ap_queue_push_event(fd_queue_t *queue,
125+
fd_queue_event_t *event);
126+
AP_DECLARE(apr_status_t) ap_queue_pop_event(fd_queue_t *queue,
127+
fd_queue_event_t **event);
128+
AP_DECLARE(apr_status_t) ap_queue_lock(fd_queue_t *queue);
129+
AP_DECLARE(apr_status_t) ap_queue_unlock(fd_queue_t *queue);
130+
AP_DECLARE(void) ap_queue_kill_event_locked(fd_queue_t *queue,
131+
fd_queue_event_t *event);
90132

91-
AP_DECLARE(apr_status_t) ap_queue_create(fd_queue_t **pqueue,
92-
int capacity, apr_pool_t *p);
93-
AP_DECLARE(apr_status_t) ap_queue_push_socket(fd_queue_t *queue,
94-
apr_socket_t *sd, void *sd_baton,
95-
apr_pool_t *p);
96-
AP_DECLARE(apr_status_t) ap_queue_push_timer(fd_queue_t *queue,
97-
timer_event_t *te);
133+
/* mpm_worker API */
134+
AP_DECLARE(apr_status_t) ap_queue_push_something(fd_queue_t *queue,
135+
apr_socket_t *sd, void *sd_baton,
136+
apr_pool_t *p, timer_event_t *te);
98137
AP_DECLARE(apr_status_t) ap_queue_pop_something(fd_queue_t *queue,
99138
apr_socket_t **sd, void **sd_baton,
100139
apr_pool_t **p, timer_event_t **te);
101-
#define ap_queue_pop_socket(q_, s_, p_) \
102-
ap_queue_pop_something((q_), (s_), NULL, (p_), NULL)
140+
#define ap_queue_push_socket(q_, s_, p_) \
141+
ap_queue_push_something((q_), (s_), NULL, (p_), NULL)
142+
#define ap_queue_pop_socket(q_, s_, p_) \
143+
ap_queue_pop_something((q_), (s_), NULL, (p_), NULL)
103144

145+
/* common API */
104146
AP_DECLARE(apr_status_t) ap_queue_interrupt_all(fd_queue_t *queue);
105147
AP_DECLARE(apr_status_t) ap_queue_interrupt_one(fd_queue_t *queue);
106148
AP_DECLARE(apr_status_t) ap_queue_term(fd_queue_t *queue);

‎support/ab.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -1819,7 +1819,7 @@ static void read_connection(struct connection * c)
18191819
}
18201820

18211821
/* are we done? */
1822-
if (started >= requests && (c->bread >= c->length)) {
1822+
if (done >= requests && (c->bread >= c->length)) {
18231823
close_connection(c);
18241824
}
18251825

0 commit comments

Comments
 (0)
Please sign in to comment.