summaryrefslogtreecommitdiff
path: root/recv.ha
blob: d1efe59241262607a4ba12682b4ea7d60893e005 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use errors;
use io;
use mcproto;
use net;

type Receiver = struct {
	buf: []u8,
	start: size,
	avail: size,
	packet_length: (size | void),
};

fn newrecv(bufsize: size) Receiver = {
	assert(bufsize & (bufsize - 1) == 0, "bufsize must be a power of 2");

	return Receiver {
		buf = alloc([0...], bufsize),
		start = 0,
		avail = 0,
		packet_length = void,
	};
};

fn recv_finish(recv: *Receiver) void = {
	free(recv.buf);
};

fn recv_poll(recv: *Receiver, s: io::handle, out: []u8) (size | void) = {
	for (true) {
		for (true) match (recv.packet_length) {
		case void =>
			match (recv_read_varint(recv)) {
			case void =>
				break;
			case let length: i32 =>
				if (length: u32 > len(recv.buf)) {
					die("packet too long: {}", length);
				};
				recv.packet_length = length: u32;
			};
		case let length: size =>
			if (recv.avail < length) {
				break;
			};
			recv.packet_length = void;

			assert(len(out) >= length,
				"not enough space in output buffer");

			let payload = out[..0];
			const pl_end = recv.start + length & len(recv.buf) - 1;
			if (recv.start < pl_end || length == 0) {
				static append(payload,
					recv.buf[recv.start..pl_end]...);
			} else {
				static append(payload,
					recv.buf[recv.start..]...);
				static append(payload,
					recv.buf[..pl_end]...);
			};
			assert(len(payload) == length);
			recv_advance(recv, length);

			return length;
		};

		assert(recv.avail < len(recv.buf));
		const end = recv.start + recv.avail & len(recv.buf) - 1;
		const limit = if (recv.start <= end) len(recv.buf)
			else recv.start;
		match (io::read(s, recv.buf[end..limit])) {
		case let nread: size =>
			recv.avail += nread;
		case io::EOF =>
			die("disconnected");
		case errors::again =>
			return;
		case let err: io::error =>
			die("read: {}", io::strerror(err));
		};
	};
};

fn recv_read_varint(recv: *Receiver) (i32 | void) = {
	let res = 0u32;

	for (let i = 0u32; i < 3; i += 1) {
		if (i >= recv.avail) {
			return;
		};
		const b = recv.buf[recv.start + i & len(recv.buf) - 1];
		res |= (b & 0x7f): u32 << (7 * i);
		if (b & 0x80 == 0) {
			recv_advance(recv, i + 1);
			return res: i32;
		};
	};

	die("varint should have ended by now");
};

fn recv_advance(recv: *Receiver, n: size) void = {
	recv.start = recv.start + n & len(recv.buf) - 1;
	recv.avail -= n;
};