summaryrefslogtreecommitdiff
path: root/recv.ha
diff options
context:
space:
mode:
Diffstat (limited to 'recv.ha')
-rw-r--r--recv.ha105
1 files changed, 105 insertions, 0 deletions
diff --git a/recv.ha b/recv.ha
new file mode 100644
index 0000000..d1efe59
--- /dev/null
+++ b/recv.ha
@@ -0,0 +1,105 @@
+use errors;
+use io;
+use mcproto;
+use net;
+
+type Receiver = struct {
+ buf: []u8,
+ start: size,
+ avail: size,
+ packet_length: (size | void),
+};
+
+fn newrecv(bufsize: size) Receiver = {
+ assert(bufsize & (bufsize - 1) == 0, "bufsize must be a power of 2");
+
+ return Receiver {
+ buf = alloc([0...], bufsize),
+ start = 0,
+ avail = 0,
+ packet_length = void,
+ };
+};
+
+fn recv_finish(recv: *Receiver) void = {
+ free(recv.buf);
+};
+
+fn recv_poll(recv: *Receiver, s: io::handle, out: []u8) (size | void) = {
+ for (true) {
+ for (true) match (recv.packet_length) {
+ case void =>
+ match (recv_read_varint(recv)) {
+ case void =>
+ break;
+ case let length: i32 =>
+ if (length: u32 > len(recv.buf)) {
+ die("packet too long: {}", length);
+ };
+ recv.packet_length = length: u32;
+ };
+ case let length: size =>
+ if (recv.avail < length) {
+ break;
+ };
+ recv.packet_length = void;
+
+ assert(len(out) >= length,
+ "not enough space in output buffer");
+
+ let payload = out[..0];
+ const pl_end = recv.start + length & len(recv.buf) - 1;
+ if (recv.start < pl_end || length == 0) {
+ static append(payload,
+ recv.buf[recv.start..pl_end]...);
+ } else {
+ static append(payload,
+ recv.buf[recv.start..]...);
+ static append(payload,
+ recv.buf[..pl_end]...);
+ };
+ assert(len(payload) == length);
+ recv_advance(recv, length);
+
+ return length;
+ };
+
+ assert(recv.avail < len(recv.buf));
+ const end = recv.start + recv.avail & len(recv.buf) - 1;
+ const limit = if (recv.start <= end) len(recv.buf)
+ else recv.start;
+ match (io::read(s, recv.buf[end..limit])) {
+ case let nread: size =>
+ recv.avail += nread;
+ case io::EOF =>
+ die("disconnected");
+ case errors::again =>
+ return;
+ case let err: io::error =>
+ die("read: {}", io::strerror(err));
+ };
+ };
+};
+
+fn recv_read_varint(recv: *Receiver) (i32 | void) = {
+ let res = 0u32;
+
+ for (let i = 0u32; i < 3; i += 1) {
+ if (i >= recv.avail) {
+ return;
+ };
+ const b = recv.buf[recv.start + i & len(recv.buf) - 1];
+ res |= (b & 0x7f): u32 << (7 * i);
+ if (b & 0x80 == 0) {
+ recv_advance(recv, i + 1);
+ return res: i32;
+ };
+ };
+
+ die("varint should have ended by now");
+};
+
+fn recv_advance(recv: *Receiver, n: size) void = {
+ recv.start = recv.start + n & len(recv.buf) - 1;
+ recv.avail -= n;
+};