Skip to content

Commit b0761e6

Browse files
committed
WIP
1 parent ad3df96 commit b0761e6

6 files changed

+68
-23
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ please review the `iodine -h` command line options for more details on these and
4747

4848
## Iodine - a fast & powerful HTTP + WebSockets server/client with native Pub/Sub
4949

50-
Iodine includes a light and fast HTTP and Websocket server written in C that was written to support both the [NeoRack](https://github.com/boazsegev/neorack) and [Rack interface specifications](http://www.rubydoc.info/github/rack/rack/master/file/SPEC) and the [WebSocket](https://github.com/boazsegev/neorack/blob/master/extensions/websockets.md) and [SSE](https://github.com/boazsegev/neorack/blob/master/extensions/sse.md) extension draft.
50+
Iodine includes a light and fast HTTP and WebSocket server written in C that was written to support both the [NeoRack Specifications](https://github.com/boazsegev/neorack) (with [WebSocket](https://github.com/boazsegev/neorack/blob/master/extensions/websockets.md) / [SSE](https://github.com/boazsegev/neorack/blob/master/extensions/sse.md)) and [Rack specifications](https://github.com/rack/rack/blob/main/SPEC.rdoc) (with the experimental [WebSocket / SSE Rack draft](./Old-WebSocket-Draft.md)).
5151

5252
Iodine also supports native process cluster Pub/Sub and a native RedisEngine to easily scale iodine's Pub/Sub horizontally.
5353

ext/iodine/iodine.h

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ static rb_encoding *IodineUTF8Encoding;
5050
static rb_encoding *IodineBinaryEncoding;
5151

5252
static VALUE IODINE_CONNECTION_ENV_TEMPLATE = Qnil;
53+
static VALUE IODINE_RACK_PROTOCOL_STR;
5354
static VALUE IODINE_RACK_HIJACK_SYM;
5455
static VALUE IODINE_RACK_HIJACK_STR;
5556
static VALUE IODINE_RACK_UPGRADE_STR;

ext/iodine/iodine_connection.h

+57-18
Original file line numberDiff line numberDiff line change
@@ -903,10 +903,12 @@ static VALUE iodine_handler_deafult_on_http__each_body(VALUE s, VALUE info_) {
903903
static VALUE iodine_connection_env_get(VALUE self);
904904
static VALUE iodine_connection_handler_set(VALUE client, VALUE handler);
905905
static VALUE iodine_connection_rack_hijack(VALUE self);
906+
static int iodine_connection_rack_hijack_partial(VALUE self, VALUE proc);
906907
static VALUE iodine_connection_peer_addr(VALUE self);
907908
static VALUE iodine_handler_deafult_on_http(VALUE handler, VALUE client) {
908909
/* RACK specification: https://github.com/rack/rack/blob/main/SPEC.rdoc */
909910
VALUE returned_value = Qnil;
911+
VALUE partial_hijack = Qnil;
910912
iodine_connection_s *c = iodine_connection_ptr(client);
911913
if (!c->http)
912914
return returned_value;
@@ -923,7 +925,7 @@ static VALUE iodine_handler_deafult_on_http(VALUE handler, VALUE client) {
923925
return returned_value;
924926
if (!RB_TYPE_P(r, RUBY_T_ARRAY))
925927
goto rack_error;
926-
if (RARRAY_LEN(r) != 3)
928+
if (RARRAY_LEN(r) < 3)
927929
goto rack_error;
928930
{ /* handle status */
929931
VALUE s = RARRAY_PTR(r)[0];
@@ -941,18 +943,27 @@ static VALUE iodine_handler_deafult_on_http(VALUE handler, VALUE client) {
941943
{ /* handle headers */
942944
VALUE hdr = RARRAY_PTR(r)[1];
943945
if (RB_TYPE_P(hdr, RUBY_T_HASH)) {
946+
partial_hijack = rb_hash_delete(hdr, IODINE_RACK_HIJACK_STR);
944947
rb_hash_foreach(hdr,
945948
iodine_handler_deafult_on_http__header,
946949
(VALUE)c->http);
947950
} else if (RB_TYPE_P(hdr, RUBY_T_ARRAY)) {
948951
for (size_t i = 0; i < (size_t)RARRAY_LEN(hdr); ++i) {
949952
VALUE t = RARRAY_PTR(hdr)[i];
950-
if (!RB_TYPE_P(t, RUBY_T_ARRAY) || RARRAY_LEN(t) != 2)
953+
if (!RB_TYPE_P(t, RUBY_T_ARRAY) || RARRAY_LEN(t) != 2 ||
954+
!RB_TYPE_P(RARRAY_PTR(t)[0], RUBY_T_STRING))
951955
goto rack_error;
952-
iodine_connection___add_header(
953-
c->http,
954-
(fio_str_info_s)IODINE_RSTR_INFO(RARRAY_PTR(t)[0]),
955-
(fio_str_info_s)IODINE_RSTR_INFO(RARRAY_PTR(t)[1]));
956+
fio_str_info_s hn = (fio_str_info_s)IODINE_RSTR_INFO(RARRAY_PTR(t)[0]);
957+
if (!FIO_STR_INFO_IS_EQ(
958+
(fio_buf_info_s)IODINE_RSTR_INFO(RARRAY_PTR(t)[0]),
959+
(fio_buf_info_s)IODINE_RSTR_INFO(IODINE_RACK_HIJACK_STR))) {
960+
iodine_connection___add_header(
961+
c->http,
962+
hn,
963+
(fio_str_info_s)IODINE_RSTR_INFO(RARRAY_PTR(t)[1]));
964+
continue;
965+
}
966+
partial_hijack = RARRAY_PTR(t)[1];
956967
}
957968
} else {
958969
FIO_LOG_ERROR("Rack application response headers type error");
@@ -986,23 +997,21 @@ static VALUE iodine_handler_deafult_on_http(VALUE handler, VALUE client) {
986997
.dealloc = (void (*)(void *))fio_bstr_free,
987998
.finish = 1);
988999
}
989-
} else if (RB_TYPE_P(bd, RUBY_T_STRING)) { /* streaming body... */
1000+
} else if (RB_TYPE_P(bd, RUBY_T_STRING)) { /* a simple String */
9901001
fio_http_write(c->http,
9911002
.buf = RSTRING_PTR(bd),
9921003
.len = (size_t)RSTRING_LEN(bd),
9931004
.copy = 1,
9941005
.finish = 1);
995-
} else if (bd == Qnil) {
996-
/* do nothing? no body. */
1006+
} else if (bd == Qnil) { /* do nothing? no body. */
9971007
fio_http_write(c->http, .finish = 1);
9981008
} else { /* streaming body – answers to `call` */
999-
VALUE nio = iodine_connection_rack_hijack(client);
1000-
if (rb_check_funcall(bd, IODINE_CALL_ID, 1, &nio) == RUBY_Qundef) {
1001-
/* failed, close connection (now owned by Ruby) */
1002-
rb_check_funcall(nio, IODINE_CLOSE_ID, 0, NULL);
1003-
}
1009+
partial_hijack = bd;
10041010
}
10051011
}
1012+
if (partial_hijack && partial_hijack != Qnil &&
1013+
iodine_connection_rack_hijack_partial(client, partial_hijack))
1014+
goto rack_error;
10061015

10071016
rb_check_funcall(RARRAY_PTR(r)[2], IODINE_CLOSE_ID, 0, NULL);
10081017

@@ -1652,13 +1661,13 @@ static void *iodine_io_http_on_eventsource_internal(void *info_) {
16521661
.channel = i->event,
16531662
.message = i->data,
16541663
};
1655-
VALUE args[] = {connection, iodine_pubsub_msg_create(&msg)};
1664+
VALUE args[] = {connection, iodine_pubsub_msg_new(&msg)};
16561665
iodine_pubsub_msg_id_set(args[1], rb_str_new(i->id.buf, i->id.len));
16571666
iodine_ruby_call_inside(c->store[IODINE_CONNECTION_STORE_handler],
16581667
IODINE_ON_EVENTSOURCE_ID,
16591668
2,
16601669
args);
1661-
STORE.release(args[1]); /* Store.hold(m) called iodine_pubsub_msg_create */
1670+
STORE.release(args[1]); /* Store.hold(m) called iodine_pubsub_msg_new */
16621671
return NULL;
16631672
}
16641673
/** Called when an EventSource event is received. */
@@ -1737,8 +1746,8 @@ Subscription Helpers
17371746

17381747
static void *iodine_connection_on_pubsub_in_gvl(void *m_) {
17391748
fio_msg_s *m = (fio_msg_s *)m_;
1740-
VALUE msg = iodine_pubsub_msg_create(m);
1741-
/* TODO! move callback to async queue. */
1749+
VALUE msg = iodine_pubsub_msg_new(m);
1750+
/* TODO! move callback to async queue? Is this possible? */
17421751
iodine_ruby_call_inside((VALUE)m->udata, IODINE_CALL_ID, 1, &msg);
17431752
STORE.release(msg);
17441753
return m_;
@@ -2410,6 +2419,36 @@ static VALUE iodine_connection_rack_hijack(VALUE self) {
24102419
return nio;
24112420
}
24122421

2422+
/* TODO: Partial Hijack */
2423+
FIO_SFUNC int iodine_connection_rack_hijack_partial(VALUE self, VALUE proc) {
2424+
iodine_connection_s *c = iodine_connection_ptr(self);
2425+
if (!c->http && !c->io)
2426+
return -1;
2427+
if (!c->io)
2428+
c->io = fio_http_io(c->http);
2429+
if (!rb_respond_to(proc, IODINE_CALL_ID))
2430+
goto error;
2431+
int new_fd = fio_sock_dup(fio_io_fd(c->io));
2432+
if (new_fd == -1)
2433+
goto error;
2434+
VALUE nio = rb_io_fdopen(new_fd, O_RDWR, NULL);
2435+
if (!nio || nio == Qnil)
2436+
goto error_after_open;
2437+
fio_http_write(c->http, .finish = 1);
2438+
if (c->store[IODINE_CONNECTION_STORE_env] &&
2439+
RB_TYPE_P(c->store[IODINE_CONNECTION_STORE_env], RUBY_T_HASH)) {
2440+
rb_hash_aset(c->store[IODINE_CONNECTION_STORE_env],
2441+
STORE.frozen_str(FIO_STR_INFO1((char *)"rack.hijack_io")),
2442+
nio);
2443+
}
2444+
rb_funcallv(proc, IODINE_CALL_ID, 1, &nio);
2445+
return 0;
2446+
error_after_open:
2447+
fio_sock_close(new_fd);
2448+
error:
2449+
return -1;
2450+
}
2451+
24132452
/* *****************************************************************************
24142453
Ruby Public API - Pub/Sub
24152454
***************************************************************************** */

ext/iodine/iodine_pubsub_eng.h

+8-3
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ static void iodine_pubsub_eng___punsubscribe(const fio_pubsub_engine_s *eng,
114114

115115
static void *iodine_pubsub_eng___publish__in_GC(void *a_) {
116116
iodine_pubsub_eng___args_s *args = (iodine_pubsub_eng___args_s *)a_;
117-
VALUE msg = iodine_pubsub_msg_create(args->msg);
117+
VALUE msg = iodine_pubsub_msg_new(args->msg);
118118
iodine_ruby_call_inside(args->eng->handler, rb_intern("publish"), 1, &msg);
119119
STORE.release(msg);
120120
return NULL;
@@ -230,10 +230,15 @@ static VALUE iodine_pubsub_eng_initialize(VALUE self) {
230230

231231
static VALUE iodine_pubsub_eng_default_set(VALUE klass, VALUE eng) {
232232
fio_pubsub_engine_s *e = FIO_PUBSUB_CLUSTER;
233-
if (!IODINE_STORE_IS_SKIP(eng))
233+
ID name = rb_intern(IODINE_PUBSUB_DEFAULT_NM);
234+
if (!IODINE_STORE_IS_SKIP(eng)) {
234235
e = iodine_pubsub_eng_get(eng)->ptr;
236+
}
235237
FIO_PUBSUB_DEFAULT = e;
236-
ID name = rb_intern(IODINE_PUBSUB_DEFAULT_NM);
238+
VALUE old = rb_const_get(iodine_rb_IODINE_BASE, name);
239+
if ((uintptr_t)old > 15)
240+
STORE.release(old);
241+
STORE.hold(eng);
237242
rb_const_remove(iodine_rb_IODINE_BASE, name);
238243
rb_const_set(iodine_rb_IODINE_BASE, name, eng);
239244
return eng;

ext/iodine/iodine_pubsub_msg.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ static iodine_pubsub_msg_s *iodine_pubsub_msg_get(VALUE self) {
7676
return m;
7777
}
7878

79-
static VALUE iodine_pubsub_msg_create(fio_msg_s *msg) {
79+
static VALUE iodine_pubsub_msg_new(fio_msg_s *msg) {
8080
VALUE m = rb_obj_alloc(iodine_rb_IODINE_PUBSUB_MSG);
8181
STORE.hold(m);
8282
iodine_pubsub_msg_s *c = iodine_pubsub_msg_get(m);

logo.png

-134 KB
Binary file not shown.

0 commit comments

Comments
 (0)