view src/twjit.c @ 3:d10ea5dc61b3

twjit: initial import from previous work repository The initial development of twjit was done in a private branch of osmo-bts git repository; the intent was to prototype twjit in osmo-bts. However, as I am getting a better feel for the full scope of the problem (producing a replacement for libortp), I changed course to the present themwi-rtp-lib (libtwrtp) approach.
author Mychaela Falconia <falcon@freecalypso.org>
date Fri, 05 Jul 2024 18:50:48 +0000
parents
children 1bb26347e253
line wrap: on
line source

/*
 * Themyscira Wireless jitter buffer implementation: code body.
 */

#include <stdint.h>
#include <stdbool.h>
#include <string.h>
#include <arpa/inet.h>	/* for network byte order functions */

#include <osmocom/core/linuxlist.h>
#include <osmocom/core/msgb.h>
#include <osmocom/core/talloc.h>
#include <osmocom/core/timer.h>
#include <osmocom/core/utils.h>

/* FIXME: this libosmo-netif dependency needs to be removed */
#include <osmocom/netif/rtp.h>

#include <themwi/rtp/twjit.h>

void twrtp_jibuf_init_defaults(struct twrtp_jibuf_config *config)
{
	memset(config, 0, sizeof(struct twrtp_jibuf_config));
	config->bd_start = 2;	/* smallest allowed */
	config->bd_hiwat = 3;	/* Nstart+1 is practically-useful minimum */
	config->thinning_int = 17;	/* prime number, usually 340 ms */
	config->max_future_sec = 10;	/* 10 s is a long time for voice */
}

/* create and destroy functions */

struct twrtp_jibuf_inst *
twrtp_jibuf_create(void *ctx, uint16_t quantum_ms, uint32_t quantum_ts_inc,
		   struct twrtp_jibuf_config *config)
{
	struct twrtp_jibuf_inst *twjit;

	twjit = talloc_zero(ctx, struct twrtp_jibuf_inst);
	if (!twjit)
		return NULL;

	twjit->ext_config = config;
	twjit->ts_quantum = quantum_ts_inc;
	twjit->quanta_per_sec = 1000 / quantum_ms;
	twjit->state = TWJIT_STATE_EMPTY;
	INIT_LLIST_HEAD(&twjit->sb[0].queue);
	INIT_LLIST_HEAD(&twjit->sb[1].queue);

	return twjit;
}

void twrtp_jibuf_destroy(struct twrtp_jibuf_inst *twjit)
{
	msgb_queue_free(&twjit->sb[0].queue);
	msgb_queue_free(&twjit->sb[1].queue);
	talloc_free(twjit);
}

/* RTP input to twjit */

static void get_current_time(struct timeval *tp)
{
	struct timespec now;

	osmo_clock_gettime(CLOCK_MONOTONIC, &now);
	tp->tv_sec = now.tv_sec;
	tp->tv_usec = now.tv_nsec / 1000;
}

static void
init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg,
			 uint32_t rx_ssrc, uint32_t rx_ts,
			 const struct timeval *new_time)
{
	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];

	OSMO_ASSERT(llist_empty(&sb->queue));
	OSMO_ASSERT(sb->depth == 0);
	/* all good, proceed */
	sb->ssrc = rx_ssrc;
	sb->head_ts = rx_ts;
	msgb_enqueue(&sb->queue, msg);
	sb->depth = 1;
	memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval));
	memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config));
	sb->drop_int_count = 0;
}

enum input_decision {
	INPUT_CONTINUE,
	INPUT_TOO_OLD,
	INPUT_RESET,
};

static enum input_decision
check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting,
			uint32_t rx_ssrc, uint32_t rx_ts,
			const struct timeval *new_time)
{
	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
	struct timeval time_delta;
	int32_t ts_delta;

	if (rx_ssrc != sb->ssrc)
		return INPUT_RESET;
	if (starting) {
		timersub(new_time, &sb->last_arrival, &time_delta);
		sb->delta_ms = time_delta.tv_sec * 1000 +
				time_delta.tv_usec / 1000;
		memcpy(&sb->last_arrival, new_time, sizeof(struct timeval));
	}
	ts_delta = (int32_t)(rx_ts - sb->head_ts);
	if (ts_delta < 0)
		return INPUT_TOO_OLD;
	if (ts_delta % twjit->ts_quantum)
		return INPUT_RESET;
	if (starting) {
		if (sb->conf.start_max_delta &&
		    sb->delta_ms > sb->conf.start_max_delta)
			return INPUT_RESET;
	} else {
		uint32_t fwd = ts_delta / twjit->ts_quantum;

		if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec)
			return INPUT_RESET;
	}
	return INPUT_CONTINUE;
}

static void toss_write_queue(struct twrtp_jibuf_inst *twjit)
{
	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];

	msgb_queue_free(&sb->queue);
	sb->depth = 0;
}

static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit,
				struct msgb *new_msg, uint32_t rx_ts)
{
	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
	uint32_t ts_delta = rx_ts - sb->head_ts;
	uint32_t ins_depth = ts_delta / twjit->ts_quantum;
	struct msgb *old_msg;
	uint32_t old_ts_delta;

	/* are we increasing total depth, and can we do simple tail append? */
	if (ins_depth >= sb->depth) {
		msgb_enqueue(&sb->queue, new_msg);
		sb->depth = ins_depth + 1;
		return;
	}
	/* nope - do it the hard way */
	llist_for_each_entry(old_msg, &sb->queue, list) {
		old_ts_delta = old_msg->cb[0] - sb->head_ts;
		if (old_ts_delta == ts_delta) {
			/* two packets with the same timestamp! */
			twjit->stats.duplicate_ts++;
			msgb_free(new_msg);
			return;
		}
		if (old_ts_delta > ts_delta)
			break;
	}
	llist_add_tail(&new_msg->list, &old_msg->list);
}

static void trim_starting_sb(struct twrtp_jibuf_inst *twjit)
{
	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
	struct msgb *msg;
	uint32_t msg_ts, ts_adv, quantum_adv;

	while (sb->depth > sb->conf.bd_start) {
		msg = msgb_dequeue(&sb->queue);
		OSMO_ASSERT(msg);
		msgb_free(msg);
		OSMO_ASSERT(!llist_empty(&sb->queue));
		msg = llist_entry(sb->queue.next, struct msgb, list);
		msg_ts = msg->cb[0];
		ts_adv = msg_ts - sb->head_ts;
		quantum_adv = ts_adv / twjit->ts_quantum;
		OSMO_ASSERT(sb->depth > quantum_adv);
		sb->head_ts = msg_ts;
		sb->depth -= quantum_adv;
	}
}

static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc,
			     uint16_t rx_seq, uint32_t rx_ts)
{
	int16_t seq_delta;
	int32_t ts_delta;

	if (twjit->last_ssrc != rx_ssrc) {
		twjit->stats.ssrc_changes++;
		return;
	}
	seq_delta = (int16_t)(rx_seq - twjit->last_seq);
	if (seq_delta < 0)
		twjit->stats.seq_backwards++;
	else if (seq_delta == 0)
		twjit->stats.seq_repeats++;
	else if (seq_delta == 1) {
		ts_delta = (int32_t)(rx_ts - twjit->last_ts);
		if (ts_delta != twjit->ts_quantum) {
			if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0)
				twjit->stats.intentional_gaps++;
			else
				twjit->stats.ts_resets++;
		}
	} else
		twjit->stats.seq_skips++;
}

void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg)
{
	struct rtp_hdr *rtph;
	uint32_t rx_ssrc, rx_ts;
	uint16_t rx_seq;
	struct timeval new_time;
	enum input_decision id;

	rtph = osmo_rtp_get_hdr(msg);
	if (!rtph) {
		/* invalid packet, couldn't even get header from it */
		twjit->stats.bad_packets++;
		msgb_free(msg);
		return;
	}
	rx_ssrc = ntohl(rtph->ssrc);
	rx_ts = ntohl(rtph->timestamp);
	rx_seq = ntohs(rtph->sequence);
	get_current_time(&new_time);
	if (twjit->got_first_packet)
		rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts);
	twjit->last_ssrc = rx_ssrc;
	twjit->last_seq = rx_seq;
	twjit->last_ts = rx_ts;
	twjit->got_first_packet = true;
	msg->cb[0] = rx_ts;

	switch (twjit->state) {
	case TWJIT_STATE_EMPTY:
		/* first packet into totally empty buffer */
		twjit->state = TWJIT_STATE_HUNT;
		twjit->write_sb = 0;
		init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time);
		return;
	case TWJIT_STATE_HUNT:
	case TWJIT_STATE_HANDOVER:
		id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts,
					    &new_time);
		if (id == INPUT_TOO_OLD) {
			msgb_free(msg);
			return;
		}
		if (id == INPUT_RESET) {
			toss_write_queue(twjit);
			init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
						 &new_time);
			return;
		}
		insert_pkt_write_sb(twjit, msg, rx_ts);
		trim_starting_sb(twjit);
		return;
	case TWJIT_STATE_FLOWING:
		id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts,
					    &new_time);
		if (id == INPUT_TOO_OLD) {
			twjit->stats.too_old++;
			msgb_free(msg);
			return;
		}
		if (id == INPUT_RESET) {
			twjit->state = TWJIT_STATE_HANDOVER;
			twjit->write_sb = !twjit->write_sb;
			init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
						 &new_time);
			twjit->stats.handovers++;
			return;
		}
		insert_pkt_write_sb(twjit, msg, rx_ts);
		return;
	default:
		OSMO_ASSERT(0);
	}
}

/* Output from twjit to the fixed timing system */

static bool starting_sb_is_ready(struct twrtp_jibuf_inst *twjit)
{
	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];

	if (sb->depth < sb->conf.bd_start)
		return false;
	if (sb->delta_ms < sb->conf.start_min_delta)
		return false;
	return true;
}

static bool read_sb_is_empty(struct twrtp_jibuf_inst *twjit)
{
	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];

	return sb->depth == 0;
}

static struct msgb *pull_from_read_sb(struct twrtp_jibuf_inst *twjit)
{
	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
	struct msgb *msg;

	OSMO_ASSERT(!llist_empty(&sb->queue));
	OSMO_ASSERT(sb->depth > 0);
	msg = llist_entry(sb->queue.next, struct msgb, list);
	if (msg->cb[0] == sb->head_ts) {
		llist_del(&msg->list);
		twjit->stats.delivered_pkt++;
		twjit->stats.delivered_bytes += msg->len;
	} else {
		msg = NULL;
		twjit->stats.output_gaps++;
	}
	sb->head_ts += twjit->ts_quantum;
	sb->depth--;
	return msg;
}

static void read_sb_thinning(struct twrtp_jibuf_inst *twjit)
{
	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];
	struct msgb *msg;

	if (sb->drop_int_count) {
		sb->drop_int_count--;
		return;
	}
	if (sb->depth <= sb->conf.bd_hiwat)
		return;
	twjit->stats.thinning_drops++;
	msg = pull_from_read_sb(twjit);
	if (msg)
		msgb_free(msg);
	sb->drop_int_count = sb->conf.thinning_int - 2;
}

static void toss_read_queue(struct twrtp_jibuf_inst *twjit)
{
	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb];

	msgb_queue_free(&sb->queue);
	sb->depth = 0;
}

struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit)
{
	switch (twjit->state) {
	case TWJIT_STATE_EMPTY:
		return NULL;
	case TWJIT_STATE_HUNT:
		if (!starting_sb_is_ready(twjit))
			return NULL;
		twjit->state = TWJIT_STATE_FLOWING;
		twjit->read_sb = twjit->write_sb;
		return pull_from_read_sb(twjit);
	case TWJIT_STATE_FLOWING:
		if (read_sb_is_empty(twjit)) {
			twjit->state = TWJIT_STATE_EMPTY;
			twjit->stats.underruns++;
			return NULL;
		}
		read_sb_thinning(twjit);
		return pull_from_read_sb(twjit);
	case TWJIT_STATE_HANDOVER:
		if (starting_sb_is_ready(twjit)) {
			toss_read_queue(twjit);
			twjit->state = TWJIT_STATE_FLOWING;
			twjit->read_sb = twjit->write_sb;
			return pull_from_read_sb(twjit);
		}
		if (read_sb_is_empty(twjit)) {
			twjit->state = TWJIT_STATE_HUNT;
			twjit->stats.underruns++;
			return NULL;
		}
		read_sb_thinning(twjit);
		return pull_from_read_sb(twjit);
	default:
		OSMO_ASSERT(0);
	}
}