changeset 5:1bb26347e253

twjit: split into separate base/in/out modules
author Mychaela Falconia <falcon@freecalypso.org>
date Fri, 05 Jul 2024 21:24:56 +0000
parents be04d84f5468
children 668b84c52094
files src/Makefile src/twjit.c src/twjit_in.c src/twjit_out.c
diffstat 4 files changed, 366 insertions(+), 343 deletions(-) [+]
line wrap: on
line diff
--- a/src/Makefile	Fri Jul 05 18:52:34 2024 +0000
+++ b/src/Makefile	Fri Jul 05 21:24:56 2024 +0000
@@ -1,4 +1,4 @@
-OBJS=	twjit.o
+OBJS=	twjit.o twjit_in.o twjit_out.o
 LIB=	libtwrtp.a
 
 include ../config.defs
--- a/src/twjit.c	Fri Jul 05 18:52:34 2024 +0000
+++ b/src/twjit.c	Fri Jul 05 21:24:56 2024 +0000
@@ -1,21 +1,16 @@
 /*
- * Themyscira Wireless jitter buffer implementation: code body.
+ * Themyscira Wireless jitter buffer implementation: main body.
  */
 
 #include <stdint.h>
 #include <stdbool.h>
 #include <string.h>
-#include <arpa/inet.h>	/* for network byte order functions */
 
 #include <osmocom/core/linuxlist.h>
 #include <osmocom/core/msgb.h>
 #include <osmocom/core/talloc.h>
-#include <osmocom/core/timer.h>
 #include <osmocom/core/utils.h>
 
-/* FIXME: this libosmo-netif dependency needs to be removed */
-#include <osmocom/netif/rtp.h>
-
 #include <themwi/rtp/twjit.h>
 
 void twrtp_jibuf_init_defaults(struct twrtp_jibuf_config *config)
@@ -55,339 +50,3 @@
 	msgb_queue_free(&twjit->sb[1].queue);
 	talloc_free(twjit);
 }
-
-/* RTP input to twjit */
-
-static void get_current_time(struct timeval *tp)
-{
-	struct timespec now;
-
-	osmo_clock_gettime(CLOCK_MONOTONIC, &now);
-	tp->tv_sec = now.tv_sec;
-	tp->tv_usec = now.tv_nsec / 1000;
-}
-
-static void
-init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg,
-			 uint32_t rx_ssrc, uint32_t rx_ts,
-			 const struct timeval *new_time)
-{
-	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
-
-	OSMO_ASSERT(llist_empty(&sb->queue));
-	OSMO_ASSERT(sb->depth == 0);
-	/* all good, proceed */
-	sb->ssrc = rx_ssrc;
-	sb->head_ts = rx_ts;
-	msgb_enqueue(&sb->queue, msg);
-	sb->depth = 1;
-	memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval));
-	memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config));
-	sb->drop_int_count = 0;
-}
-
-enum input_decision {
-	INPUT_CONTINUE,
-	INPUT_TOO_OLD,
-	INPUT_RESET,
-};
-
-static enum input_decision
-check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting,
-			uint32_t rx_ssrc, uint32_t rx_ts,
-			const struct timeval *new_time)
-{
-	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
-	struct timeval time_delta;
-	int32_t ts_delta;
-
-	if (rx_ssrc != sb->ssrc)
-		return INPUT_RESET;
-	if (starting) {
-		timersub(new_time, &sb->last_arrival, &time_delta);
-		sb->delta_ms = time_delta.tv_sec * 1000 +
-				time_delta.tv_usec / 1000;
-		memcpy(&sb->last_arrival, new_time, sizeof(struct timeval));
-	}
-	ts_delta = (int32_t)(rx_ts - sb->head_ts);
-	if (ts_delta < 0)
-		return INPUT_TOO_OLD;
-	if (ts_delta % twjit->ts_quantum)
-		return INPUT_RESET;
-	if (starting) {
-		if (sb->conf.start_max_delta &&
-		    sb->delta_ms > sb->conf.start_max_delta)
-			return INPUT_RESET;
-	} else {
-		uint32_t fwd = ts_delta / twjit->ts_quantum;
-
-		if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec)
-			return INPUT_RESET;
-	}
-	return INPUT_CONTINUE;
-}
-
-static void toss_write_queue(struct twrtp_jibuf_inst *twjit)
-{
-	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
-
-	msgb_queue_free(&sb->queue);
-	sb->depth = 0;
-}
-
-static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit,
-				struct msgb *new_msg, uint32_t rx_ts)
-{
-	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
-	uint32_t ts_delta = rx_ts - sb->head_ts;
-	uint32_t ins_depth = ts_delta / twjit->ts_quantum;
-	struct msgb *old_msg;
-	uint32_t old_ts_delta;
-
-	/* are we increasing total depth, and can we do simple tail append? */
-	if (ins_depth >= sb->depth) {
-		msgb_enqueue(&sb->queue, new_msg);
-		sb->depth = ins_depth + 1;
-		return;
-	}
-	/* nope - do it the hard way */
-	llist_for_each_entry(old_msg, &sb->queue, list) {
-		old_ts_delta = old_msg->cb[0] - sb->head_ts;
-		if (old_ts_delta == ts_delta) {
-			/* two packets with the same timestamp! */
-			twjit->stats.duplicate_ts++;
-			msgb_free(new_msg);
-			return;
-		}
-		if (old_ts_delta > ts_delta)
-			break;
-	}
-	llist_add_tail(&new_msg->list, &old_msg->list);
-}
-
-static void trim_starting_sb(struct twrtp_jibuf_inst *twjit)
-{
-	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
-	struct msgb *msg;
-	uint32_t msg_ts, ts_adv, quantum_adv;
-
-	while (sb->depth > sb->conf.bd_start) {
-		msg = msgb_dequeue(&sb->queue);
-		OSMO_ASSERT(msg);
-		msgb_free(msg);
-		OSMO_ASSERT(!llist_empty(&sb->queue));
-		msg = llist_entry(sb->queue.next, struct msgb, list);
-		msg_ts = msg->cb[0];
-		ts_adv = msg_ts - sb->head_ts;
-		quantum_adv = ts_adv / twjit->ts_quantum;
-		OSMO_ASSERT(sb->depth > quantum_adv);
-		sb->head_ts = msg_ts;
-		sb->depth -= quantum_adv;
-	}
-}
-
-static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc,
-			     uint16_t rx_seq, uint32_t rx_ts)
-{
-	int16_t seq_delta;
-	int32_t ts_delta;
-
-	if (twjit->last_ssrc != rx_ssrc) {
-		twjit->stats.ssrc_changes++;
-		return;
-	}
-	seq_delta = (int16_t)(rx_seq - twjit->last_seq);
-	if (seq_delta < 0)
-		twjit->stats.seq_backwards++;
-	else if (seq_delta == 0)
-		twjit->stats.seq_repeats++;
-	else if (seq_delta == 1) {
-		ts_delta = (int32_t)(rx_ts - twjit->last_ts);
-		if (ts_delta != twjit->ts_quantum) {
-			if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0)
-				twjit->stats.intentional_gaps++;
-			else
-				twjit->stats.ts_resets++;
-		}
-	} else
-		twjit->stats.seq_skips++;
-}
-
-void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg)
-{
-	struct rtp_hdr *rtph;
-	uint32_t rx_ssrc, rx_ts;
-	uint16_t rx_seq;
-	struct timeval new_time;
-	enum input_decision id;
-
-	rtph = osmo_rtp_get_hdr(msg);
-	if (!rtph) {
-		/* invalid packet, couldn't even get header from it */
-		twjit->stats.bad_packets++;
-		msgb_free(msg);
-		return;
-	}
-	rx_ssrc = ntohl(rtph->ssrc);
-	rx_ts = ntohl(rtph->timestamp);
-	rx_seq = ntohs(rtph->sequence);
-	get_current_time(&new_time);
-	if (twjit->got_first_packet)
-		rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts);
-	twjit->last_ssrc = rx_ssrc;
-	twjit->last_seq = rx_seq;
-	twjit->last_ts = rx_ts;
-	twjit->got_first_packet = true;
-	msg->cb[0] = rx_ts;
-
-	switch (twjit->state) {
-	case TWJIT_STATE_EMPTY:
-		/* first packet into totally empty buffer */
-		twjit->state = TWJIT_STATE_HUNT;
-		twjit->write_sb = 0;
-		init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time);
-		return;
-	case TWJIT_STATE_HUNT:
-	case TWJIT_STATE_HANDOVER:
-		id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts,
-					    &new_time);
-		if (id == INPUT_TOO_OLD) {
-			msgb_free(msg);
-			return;
-		}
-		if (id == INPUT_RESET) {
-			toss_write_queue(twjit);
-			init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
-						 &new_time);
-			return;
-		}
-		insert_pkt_write_sb(twjit, msg, rx_ts);
-		trim_starting_sb(twjit);
-		return;
-	case TWJIT_STATE_FLOWING:
-		id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts,
-					    &new_time);
-		if (id == INPUT_TOO_OLD) {
-			twjit->stats.too_old++;
-			msgb_free(msg);
-			return;
-		}
-		if (id == INPUT_RESET) {
-			twjit->state = TWJIT_STATE_HANDOVER;
-			twjit->write_sb = !twjit->write_sb;
-			init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
-						 &new_time);
-			twjit->stats.handovers++;
-			return;
-		}
-		insert_pkt_write_sb(twjit, msg, rx_ts);
-		return;
-	default:
-		OSMO_ASSERT(0);
-	}
-}
-
-/* Output from twjit to the fixed timing system */
-
-static bool starting_sb_is_ready(struct twrtp_jibuf_inst *twjit)
-{
-	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
-
-	if (sb->depth < sb->conf.bd_start)
-		return false;
-	if (sb->delta_ms < sb->conf.start_min_delta)
-		return false;
-	return true;
-}
-
-static bool read_sb_is_empty(struct twrtp_jibuf_inst *twjit)
-{
-	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
-
-	return sb->depth == 0;
-}
-
-static struct msgb *pull_from_read_sb(struct twrtp_jibuf_inst *twjit)
-{
-	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
-	struct msgb *msg;
-
-	OSMO_ASSERT(!llist_empty(&sb->queue));
-	OSMO_ASSERT(sb->depth > 0);
-	msg = llist_entry(sb->queue.next, struct msgb, list);
-	if (msg->cb[0] == sb->head_ts) {
-		llist_del(&msg->list);
-		twjit->stats.delivered_pkt++;
-		twjit->stats.delivered_bytes += msg->len;
-	} else {
-		msg = NULL;
-		twjit->stats.output_gaps++;
-	}
-	sb->head_ts += twjit->ts_quantum;
-	sb->depth--;
-	return msg;
-}
-
-static void read_sb_thinning(struct twrtp_jibuf_inst *twjit)
-{
-	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
-	struct msgb *msg;
-
-	if (sb->drop_int_count) {
-		sb->drop_int_count--;
-		return;
-	}
-	if (sb->depth <= sb->conf.bd_hiwat)
-		return;
-	twjit->stats.thinning_drops++;
-	msg = pull_from_read_sb(twjit);
-	if (msg)
-		msgb_free(msg);
-	sb->drop_int_count = sb->conf.thinning_int - 2;
-}
-
-static void toss_read_queue(struct twrtp_jibuf_inst *twjit)
-{
-	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
-
-	msgb_queue_free(&sb->queue);
-	sb->depth = 0;
-}
-
-struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit)
-{
-	switch (twjit->state) {
-	case TWJIT_STATE_EMPTY:
-		return NULL;
-	case TWJIT_STATE_HUNT:
-		if (!starting_sb_is_ready(twjit))
-			return NULL;
-		twjit->state = TWJIT_STATE_FLOWING;
-		twjit->read_sb = twjit->write_sb;
-		return pull_from_read_sb(twjit);
-	case TWJIT_STATE_FLOWING:
-		if (read_sb_is_empty(twjit)) {
-			twjit->state = TWJIT_STATE_EMPTY;
-			twjit->stats.underruns++;
-			return NULL;
-		}
-		read_sb_thinning(twjit);
-		return pull_from_read_sb(twjit);
-	case TWJIT_STATE_HANDOVER:
-		if (starting_sb_is_ready(twjit)) {
-			toss_read_queue(twjit);
-			twjit->state = TWJIT_STATE_FLOWING;
-			twjit->read_sb = twjit->write_sb;
-			return pull_from_read_sb(twjit);
-		}
-		if (read_sb_is_empty(twjit)) {
-			twjit->state = TWJIT_STATE_HUNT;
-			twjit->stats.underruns++;
-			return NULL;
-		}
-		read_sb_thinning(twjit);
-		return pull_from_read_sb(twjit);
-	default:
-		OSMO_ASSERT(0);
-	}
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/twjit_in.c	Fri Jul 05 21:24:56 2024 +0000
@@ -0,0 +1,247 @@
+/*
+ * Themyscira Wireless jitter buffer implementation: RTP input processing.
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <string.h>
+#include <arpa/inet.h>	/* for network byte order functions */
+
+#include <osmocom/core/linuxlist.h>
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/timer.h>
+#include <osmocom/core/utils.h>
+
+/* FIXME: this libosmo-netif dependency needs to be removed */
+#include <osmocom/netif/rtp.h>
+
+#include <themwi/rtp/twjit.h>
+
+static void get_current_time(struct timeval *tp)
+{
+	struct timespec now;
+
+	osmo_clock_gettime(CLOCK_MONOTONIC, &now);
+	tp->tv_sec = now.tv_sec;
+	tp->tv_usec = now.tv_nsec / 1000;
+}
+
+static void
+init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg,
+			 uint32_t rx_ssrc, uint32_t rx_ts,
+			 const struct timeval *new_time)
+{
+	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
+
+	OSMO_ASSERT(llist_empty(&sb->queue));
+	OSMO_ASSERT(sb->depth == 0);
+	/* all good, proceed */
+	sb->ssrc = rx_ssrc;
+	sb->head_ts = rx_ts;
+	msgb_enqueue(&sb->queue, msg);
+	sb->depth = 1;
+	memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval));
+	memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config));
+	sb->drop_int_count = 0;
+}
+
+enum input_decision {
+	INPUT_CONTINUE,
+	INPUT_TOO_OLD,
+	INPUT_RESET,
+};
+
+static enum input_decision
+check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting,
+			uint32_t rx_ssrc, uint32_t rx_ts,
+			const struct timeval *new_time)
+{
+	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
+	struct timeval time_delta;
+	int32_t ts_delta;
+
+	if (rx_ssrc != sb->ssrc)
+		return INPUT_RESET;
+	if (starting) {
+		timersub(new_time, &sb->last_arrival, &time_delta);
+		sb->delta_ms = time_delta.tv_sec * 1000 +
+				time_delta.tv_usec / 1000;
+		memcpy(&sb->last_arrival, new_time, sizeof(struct timeval));
+	}
+	ts_delta = (int32_t)(rx_ts - sb->head_ts);
+	if (ts_delta < 0)
+		return INPUT_TOO_OLD;
+	if (ts_delta % twjit->ts_quantum)
+		return INPUT_RESET;
+	if (starting) {
+		if (sb->conf.start_max_delta &&
+		    sb->delta_ms > sb->conf.start_max_delta)
+			return INPUT_RESET;
+	} else {
+		uint32_t fwd = ts_delta / twjit->ts_quantum;
+
+		if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec)
+			return INPUT_RESET;
+	}
+	return INPUT_CONTINUE;
+}
+
+static void toss_write_queue(struct twrtp_jibuf_inst *twjit)
+{
+	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
+
+	msgb_queue_free(&sb->queue);
+	sb->depth = 0;
+}
+
+static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit,
+				struct msgb *new_msg, uint32_t rx_ts)
+{
+	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
+	uint32_t ts_delta = rx_ts - sb->head_ts;
+	uint32_t ins_depth = ts_delta / twjit->ts_quantum;
+	struct msgb *old_msg;
+	uint32_t old_ts_delta;
+
+	/* are we increasing total depth, and can we do simple tail append? */
+	if (ins_depth >= sb->depth) {
+		msgb_enqueue(&sb->queue, new_msg);
+		sb->depth = ins_depth + 1;
+		return;
+	}
+	/* nope - do it the hard way */
+	llist_for_each_entry(old_msg, &sb->queue, list) {
+		old_ts_delta = old_msg->cb[0] - sb->head_ts;
+		if (old_ts_delta == ts_delta) {
+			/* two packets with the same timestamp! */
+			twjit->stats.duplicate_ts++;
+			msgb_free(new_msg);
+			return;
+		}
+		if (old_ts_delta > ts_delta)
+			break;
+	}
+	llist_add_tail(&new_msg->list, &old_msg->list);
+}
+
+static void trim_starting_sb(struct twrtp_jibuf_inst *twjit)
+{
+	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
+	struct msgb *msg;
+	uint32_t msg_ts, ts_adv, quantum_adv;
+
+	while (sb->depth > sb->conf.bd_start) {
+		msg = msgb_dequeue(&sb->queue);
+		OSMO_ASSERT(msg);
+		msgb_free(msg);
+		OSMO_ASSERT(!llist_empty(&sb->queue));
+		msg = llist_entry(sb->queue.next, struct msgb, list);
+		msg_ts = msg->cb[0];
+		ts_adv = msg_ts - sb->head_ts;
+		quantum_adv = ts_adv / twjit->ts_quantum;
+		OSMO_ASSERT(sb->depth > quantum_adv);
+		sb->head_ts = msg_ts;
+		sb->depth -= quantum_adv;
+	}
+}
+
+static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc,
+			     uint16_t rx_seq, uint32_t rx_ts)
+{
+	int16_t seq_delta;
+	int32_t ts_delta;
+
+	if (twjit->last_ssrc != rx_ssrc) {
+		twjit->stats.ssrc_changes++;
+		return;
+	}
+	seq_delta = (int16_t)(rx_seq - twjit->last_seq);
+	if (seq_delta < 0)
+		twjit->stats.seq_backwards++;
+	else if (seq_delta == 0)
+		twjit->stats.seq_repeats++;
+	else if (seq_delta == 1) {
+		ts_delta = (int32_t)(rx_ts - twjit->last_ts);
+		if (ts_delta != twjit->ts_quantum) {
+			if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0)
+				twjit->stats.intentional_gaps++;
+			else
+				twjit->stats.ts_resets++;
+		}
+	} else
+		twjit->stats.seq_skips++;
+}
+
+void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg)
+{
+	struct rtp_hdr *rtph;
+	uint32_t rx_ssrc, rx_ts;
+	uint16_t rx_seq;
+	struct timeval new_time;
+	enum input_decision id;
+
+	rtph = osmo_rtp_get_hdr(msg);
+	if (!rtph) {
+		/* invalid packet, couldn't even get header from it */
+		twjit->stats.bad_packets++;
+		msgb_free(msg);
+		return;
+	}
+	rx_ssrc = ntohl(rtph->ssrc);
+	rx_ts = ntohl(rtph->timestamp);
+	rx_seq = ntohs(rtph->sequence);
+	get_current_time(&new_time);
+	if (twjit->got_first_packet)
+		rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts);
+	twjit->last_ssrc = rx_ssrc;
+	twjit->last_seq = rx_seq;
+	twjit->last_ts = rx_ts;
+	twjit->got_first_packet = true;
+	msg->cb[0] = rx_ts;
+
+	switch (twjit->state) {
+	case TWJIT_STATE_EMPTY:
+		/* first packet into totally empty buffer */
+		twjit->state = TWJIT_STATE_HUNT;
+		twjit->write_sb = 0;
+		init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time);
+		return;
+	case TWJIT_STATE_HUNT:
+	case TWJIT_STATE_HANDOVER:
+		id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts,
+					    &new_time);
+		if (id == INPUT_TOO_OLD) {
+			msgb_free(msg);
+			return;
+		}
+		if (id == INPUT_RESET) {
+			toss_write_queue(twjit);
+			init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
+						 &new_time);
+			return;
+		}
+		insert_pkt_write_sb(twjit, msg, rx_ts);
+		trim_starting_sb(twjit);
+		return;
+	case TWJIT_STATE_FLOWING:
+		id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts,
+					    &new_time);
+		if (id == INPUT_TOO_OLD) {
+			twjit->stats.too_old++;
+			msgb_free(msg);
+			return;
+		}
+		if (id == INPUT_RESET) {
+			twjit->state = TWJIT_STATE_HANDOVER;
+			twjit->write_sb = !twjit->write_sb;
+			init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
+						 &new_time);
+			twjit->stats.handovers++;
+			return;
+		}
+		insert_pkt_write_sb(twjit, msg, rx_ts);
+		return;
+	default:
+		OSMO_ASSERT(0);
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/twjit_out.c	Fri Jul 05 21:24:56 2024 +0000
@@ -0,0 +1,117 @@
+/*
+ * Themyscira Wireless jitter buffer implementation:
+ * output to the fixed timing system.
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <osmocom/core/linuxlist.h>
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/utils.h>
+
+#include <themwi/rtp/twjit.h>
+
+static bool starting_sb_is_ready(struct twrtp_jibuf_inst *twjit)
+{
+	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
+
+	if (sb->depth < sb->conf.bd_start)
+		return false;
+	if (sb->delta_ms < sb->conf.start_min_delta)
+		return false;
+	return true;
+}
+
+static bool read_sb_is_empty(struct twrtp_jibuf_inst *twjit)
+{
+	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
+
+	return sb->depth == 0;
+}
+
+static struct msgb *pull_from_read_sb(struct twrtp_jibuf_inst *twjit)
+{
+	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
+	struct msgb *msg;
+
+	OSMO_ASSERT(!llist_empty(&sb->queue));
+	OSMO_ASSERT(sb->depth > 0);
+	msg = llist_entry(sb->queue.next, struct msgb, list);
+	if (msg->cb[0] == sb->head_ts) {
+		llist_del(&msg->list);
+		twjit->stats.delivered_pkt++;
+		twjit->stats.delivered_bytes += msg->len;
+	} else {
+		msg = NULL;
+		twjit->stats.output_gaps++;
+	}
+	sb->head_ts += twjit->ts_quantum;
+	sb->depth--;
+	return msg;
+}
+
+static void read_sb_thinning(struct twrtp_jibuf_inst *twjit)
+{
+	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
+	struct msgb *msg;
+
+	if (sb->drop_int_count) {
+		sb->drop_int_count--;
+		return;
+	}
+	if (sb->depth <= sb->conf.bd_hiwat)
+		return;
+	twjit->stats.thinning_drops++;
+	msg = pull_from_read_sb(twjit);
+	if (msg)
+		msgb_free(msg);
+	sb->drop_int_count = sb->conf.thinning_int - 2;
+}
+
+static void toss_read_queue(struct twrtp_jibuf_inst *twjit)
+{
+	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
+
+	msgb_queue_free(&sb->queue);
+	sb->depth = 0;
+}
+
+struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit)
+{
+	switch (twjit->state) {
+	case TWJIT_STATE_EMPTY:
+		return NULL;
+	case TWJIT_STATE_HUNT:
+		if (!starting_sb_is_ready(twjit))
+			return NULL;
+		twjit->state = TWJIT_STATE_FLOWING;
+		twjit->read_sb = twjit->write_sb;
+		return pull_from_read_sb(twjit);
+	case TWJIT_STATE_FLOWING:
+		if (read_sb_is_empty(twjit)) {
+			twjit->state = TWJIT_STATE_EMPTY;
+			twjit->stats.underruns++;
+			return NULL;
+		}
+		read_sb_thinning(twjit);
+		return pull_from_read_sb(twjit);
+	case TWJIT_STATE_HANDOVER:
+		if (starting_sb_is_ready(twjit)) {
+			toss_read_queue(twjit);
+			twjit->state = TWJIT_STATE_FLOWING;
+			twjit->read_sb = twjit->write_sb;
+			return pull_from_read_sb(twjit);
+		}
+		if (read_sb_is_empty(twjit)) {
+			twjit->state = TWJIT_STATE_HUNT;
+			twjit->stats.underruns++;
+			return NULL;
+		}
+		read_sb_thinning(twjit);
+		return pull_from_read_sb(twjit);
+	default:
+		OSMO_ASSERT(0);
+	}
+}