# HG changeset patch # User Mychaela Falconia # Date 1720205448 0 # Node ID d10ea5dc61b38a80a921776a1ed79e0c6f87a139 # Parent e3ab549d6a0f8746176dfb6027ca7ee5c6318a79 twjit: initial import from previous work repository The initial development of twjit was done in a private branch of osmo-bts git repository; the intent was to prototype twjit in osmo-bts. However, as I am getting a better feel for the full scope of the problem (producing a replacement for libortp), I changed course to the present themwi-rtp-lib (libtwrtp) approach. diff -r e3ab549d6a0f -r d10ea5dc61b3 include/twjit.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/twjit.h Fri Jul 05 18:50:48 2024 +0000 @@ -0,0 +1,135 @@ +/* + * Themyscira Wireless jitter buffer implementation: definition of + * structures and API functions. + */ + +#pragma once + +#include +#include + +#include +#include + +/* + * twjit configuration tunings, usually set via vty. + */ +struct twrtp_jibuf_config { + /* buffer depth: starting minimum and high watermark */ + uint16_t bd_start; + uint16_t bd_hiwat; + /* interval for thinning of too-deep standing queue */ + uint16_t thinning_int; + /* guard against time traveler RTP packets */ + uint16_t max_future_sec; + /* min and max time delta in starting state, 0 means not set */ + uint16_t start_min_delta; + uint16_t start_max_delta; +}; + +/* + * Stats collected during the lifetime of a twjit instance. + */ +struct twrtp_jibuf_stats { + /* normal operation */ + uint32_t delivered_pkt; + uint32_t delivered_bytes; + uint32_t handovers; + /* undesirable, but not totally unexpected */ + uint32_t too_old; + uint32_t underruns; + uint32_t output_gaps; + uint32_t thinning_drops; + /* unusual error events */ + uint32_t bad_packets; + uint32_t duplicate_ts; + /* independent analysis of Rx packet stream */ + uint32_t ssrc_changes; + uint32_t seq_skips; + uint32_t seq_backwards; + uint32_t seq_repeats; + uint32_t intentional_gaps; + uint32_t ts_resets; +}; + +/* + * Each twjit instance has two sub-buffers; each subbuf is a queue of + * received RTP packets that have the same SSRC and whose timestamps + * increment in the expected cadence, with each ts delta being an + * integral multiple of the samples-per-quantum constant. + */ +struct twrtp_jibuf_sub { + uint32_t ssrc; + uint32_t head_ts; + struct llist_head queue; + uint32_t depth; + /* last packet arrival time, used only in starting state */ + struct timeval last_arrival; + uint32_t delta_ms; + /* thinning mechanism */ + uint16_t drop_int_count; + /* running config for this subbuf */ + struct twrtp_jibuf_config conf; +}; + +/* + * Each twjit instance is in one of 4 fundamental states at any moment, + * as enumerated here. + */ +enum twrtp_jibuf_state { + TWJIT_STATE_EMPTY, + TWJIT_STATE_HUNT, + TWJIT_STATE_FLOWING, + TWJIT_STATE_HANDOVER, +}; + +/* Main structure for one instance of twjit */ +struct twrtp_jibuf_inst { + /* pointer to config structure given to twrtp_jibuf_create(), + * memory must remain valid, but content can change at any time. */ + struct twrtp_jibuf_config *ext_config; + /* count of RTP timestamp units per quantum */ + uint32_t ts_quantum; + /* quanta per second, used to scale max_future_sec */ + uint32_t quanta_per_sec; + /* operational state */ + enum twrtp_jibuf_state state; + struct twrtp_jibuf_sub sb[2]; + uint8_t read_sb; /* 0 or 1 */ + uint8_t write_sb; /* ditto */ + /* Rx packet stream analysis */ + uint32_t last_ssrc; + uint32_t last_ts; + uint16_t last_seq; + bool got_first_packet; + /* stats over lifetime of this instance */ + struct twrtp_jibuf_stats stats; +}; + +/* twjit module API functions */ + +struct twrtp_jibuf_inst *twrtp_jibuf_create(void *ctx, uint16_t quantum_ms, + uint32_t quantum_ts_inc, + struct twrtp_jibuf_config *config); + +void twrtp_jibuf_destroy(struct twrtp_jibuf_inst *twjit); + +struct msgb; + +/* RTP input, takes ownership of msgb */ +void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg); + +/* output function, to be called by TDM/GSM/etc fixed-timing side */ +struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit); + +/* vty configuration functions */ + +void twrtp_jibuf_init_defaults(struct twrtp_jibuf_config *config); + +void twrtp_jibuf_vty_init(int twjit_node); + +struct vty; + +int twrtp_jibuf_config_write(struct vty *vty, + const struct twrtp_jibuf_config *conf, + const char *name, const char *prefix); diff -r e3ab549d6a0f -r d10ea5dc61b3 src/Makefile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/Makefile Fri Jul 05 18:50:48 2024 +0000 @@ -0,0 +1,15 @@ +OBJS= twjit.o +LIB= libtwrtp.a + +include ../config.defs + +CPPFLAGS=${OSMO_INCLUDE} -I../build-inc + +all: ${LIB} + +${LIB}: ${OBJS} + ar rcu $@ ${OBJS} + ranlib $@ + +clean: + rm -f *.[oa] errs diff -r e3ab549d6a0f -r d10ea5dc61b3 src/twjit.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/twjit.c Fri Jul 05 18:50:48 2024 +0000 @@ -0,0 +1,393 @@ +/* + * Themyscira Wireless jitter buffer implementation: code body. + */ + +#include +#include +#include +#include /* for network byte order functions */ + +#include +#include +#include +#include +#include + +/* FIXME: this libosmo-netif dependency needs to be removed */ +#include + +#include + +void twrtp_jibuf_init_defaults(struct twrtp_jibuf_config *config) +{ + memset(config, 0, sizeof(struct twrtp_jibuf_config)); + config->bd_start = 2; /* smallest allowed */ + config->bd_hiwat = 3; /* Nstart+1 is practically-useful minimum */ + config->thinning_int = 17; /* prime number, usually 340 ms */ + config->max_future_sec = 10; /* 10 s is a long time for voice */ +} + +/* create and destroy functions */ + +struct twrtp_jibuf_inst * +twrtp_jibuf_create(void *ctx, uint16_t quantum_ms, uint32_t quantum_ts_inc, + struct twrtp_jibuf_config *config) +{ + struct twrtp_jibuf_inst *twjit; + + twjit = talloc_zero(ctx, struct twrtp_jibuf_inst); + if (!twjit) + return NULL; + + twjit->ext_config = config; + twjit->ts_quantum = quantum_ts_inc; + twjit->quanta_per_sec = 1000 / quantum_ms; + twjit->state = TWJIT_STATE_EMPTY; + INIT_LLIST_HEAD(&twjit->sb[0].queue); + INIT_LLIST_HEAD(&twjit->sb[1].queue); + + return twjit; +} + +void twrtp_jibuf_destroy(struct twrtp_jibuf_inst *twjit) +{ + msgb_queue_free(&twjit->sb[0].queue); + msgb_queue_free(&twjit->sb[1].queue); + talloc_free(twjit); +} + +/* RTP input to twjit */ + +static void get_current_time(struct timeval *tp) +{ + struct timespec now; + + osmo_clock_gettime(CLOCK_MONOTONIC, &now); + tp->tv_sec = now.tv_sec; + tp->tv_usec = now.tv_nsec / 1000; +} + +static void +init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, + uint32_t rx_ssrc, uint32_t rx_ts, + const struct timeval *new_time) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + + OSMO_ASSERT(llist_empty(&sb->queue)); + OSMO_ASSERT(sb->depth == 0); + /* all good, proceed */ + sb->ssrc = rx_ssrc; + sb->head_ts = rx_ts; + msgb_enqueue(&sb->queue, msg); + sb->depth = 1; + memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); + memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); + sb->drop_int_count = 0; +} + +enum input_decision { + INPUT_CONTINUE, + INPUT_TOO_OLD, + INPUT_RESET, +}; + +static enum input_decision +check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, + uint32_t rx_ssrc, uint32_t rx_ts, + const struct timeval *new_time) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + struct timeval time_delta; + int32_t ts_delta; + + if (rx_ssrc != sb->ssrc) + return INPUT_RESET; + if (starting) { + timersub(new_time, &sb->last_arrival, &time_delta); + sb->delta_ms = time_delta.tv_sec * 1000 + + time_delta.tv_usec / 1000; + memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); + } + ts_delta = (int32_t)(rx_ts - sb->head_ts); + if (ts_delta < 0) + return INPUT_TOO_OLD; + if (ts_delta % twjit->ts_quantum) + return INPUT_RESET; + if (starting) { + if (sb->conf.start_max_delta && + sb->delta_ms > sb->conf.start_max_delta) + return INPUT_RESET; + } else { + uint32_t fwd = ts_delta / twjit->ts_quantum; + + if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec) + return INPUT_RESET; + } + return INPUT_CONTINUE; +} + +static void toss_write_queue(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + + msgb_queue_free(&sb->queue); + sb->depth = 0; +} + +static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit, + struct msgb *new_msg, uint32_t rx_ts) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + uint32_t ts_delta = rx_ts - sb->head_ts; + uint32_t ins_depth = ts_delta / twjit->ts_quantum; + struct msgb *old_msg; + uint32_t old_ts_delta; + + /* are we increasing total depth, and can we do simple tail append? */ + if (ins_depth >= sb->depth) { + msgb_enqueue(&sb->queue, new_msg); + sb->depth = ins_depth + 1; + return; + } + /* nope - do it the hard way */ + llist_for_each_entry(old_msg, &sb->queue, list) { + old_ts_delta = old_msg->cb[0] - sb->head_ts; + if (old_ts_delta == ts_delta) { + /* two packets with the same timestamp! */ + twjit->stats.duplicate_ts++; + msgb_free(new_msg); + return; + } + if (old_ts_delta > ts_delta) + break; + } + llist_add_tail(&new_msg->list, &old_msg->list); +} + +static void trim_starting_sb(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + struct msgb *msg; + uint32_t msg_ts, ts_adv, quantum_adv; + + while (sb->depth > sb->conf.bd_start) { + msg = msgb_dequeue(&sb->queue); + OSMO_ASSERT(msg); + msgb_free(msg); + OSMO_ASSERT(!llist_empty(&sb->queue)); + msg = llist_entry(sb->queue.next, struct msgb, list); + msg_ts = msg->cb[0]; + ts_adv = msg_ts - sb->head_ts; + quantum_adv = ts_adv / twjit->ts_quantum; + OSMO_ASSERT(sb->depth > quantum_adv); + sb->head_ts = msg_ts; + sb->depth -= quantum_adv; + } +} + +static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, + uint16_t rx_seq, uint32_t rx_ts) +{ + int16_t seq_delta; + int32_t ts_delta; + + if (twjit->last_ssrc != rx_ssrc) { + twjit->stats.ssrc_changes++; + return; + } + seq_delta = (int16_t)(rx_seq - twjit->last_seq); + if (seq_delta < 0) + twjit->stats.seq_backwards++; + else if (seq_delta == 0) + twjit->stats.seq_repeats++; + else if (seq_delta == 1) { + ts_delta = (int32_t)(rx_ts - twjit->last_ts); + if (ts_delta != twjit->ts_quantum) { + if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) + twjit->stats.intentional_gaps++; + else + twjit->stats.ts_resets++; + } + } else + twjit->stats.seq_skips++; +} + +void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) +{ + struct rtp_hdr *rtph; + uint32_t rx_ssrc, rx_ts; + uint16_t rx_seq; + struct timeval new_time; + enum input_decision id; + + rtph = osmo_rtp_get_hdr(msg); + if (!rtph) { + /* invalid packet, couldn't even get header from it */ + twjit->stats.bad_packets++; + msgb_free(msg); + return; + } + rx_ssrc = ntohl(rtph->ssrc); + rx_ts = ntohl(rtph->timestamp); + rx_seq = ntohs(rtph->sequence); + get_current_time(&new_time); + if (twjit->got_first_packet) + rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); + twjit->last_ssrc = rx_ssrc; + twjit->last_seq = rx_seq; + twjit->last_ts = rx_ts; + twjit->got_first_packet = true; + msg->cb[0] = rx_ts; + + switch (twjit->state) { + case TWJIT_STATE_EMPTY: + /* first packet into totally empty buffer */ + twjit->state = TWJIT_STATE_HUNT; + twjit->write_sb = 0; + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); + return; + case TWJIT_STATE_HUNT: + case TWJIT_STATE_HANDOVER: + id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, + &new_time); + if (id == INPUT_TOO_OLD) { + msgb_free(msg); + return; + } + if (id == INPUT_RESET) { + toss_write_queue(twjit); + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, + &new_time); + return; + } + insert_pkt_write_sb(twjit, msg, rx_ts); + trim_starting_sb(twjit); + return; + case TWJIT_STATE_FLOWING: + id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, + &new_time); + if (id == INPUT_TOO_OLD) { + twjit->stats.too_old++; + msgb_free(msg); + return; + } + if (id == INPUT_RESET) { + twjit->state = TWJIT_STATE_HANDOVER; + twjit->write_sb = !twjit->write_sb; + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, + &new_time); + twjit->stats.handovers++; + return; + } + insert_pkt_write_sb(twjit, msg, rx_ts); + return; + default: + OSMO_ASSERT(0); + } +} + +/* Output from twjit to the fixed timing system */ + +static bool starting_sb_is_ready(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + + if (sb->depth < sb->conf.bd_start) + return false; + if (sb->delta_ms < sb->conf.start_min_delta) + return false; + return true; +} + +static bool read_sb_is_empty(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; + + return sb->depth == 0; +} + +static struct msgb *pull_from_read_sb(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; + struct msgb *msg; + + OSMO_ASSERT(!llist_empty(&sb->queue)); + OSMO_ASSERT(sb->depth > 0); + msg = llist_entry(sb->queue.next, struct msgb, list); + if (msg->cb[0] == sb->head_ts) { + llist_del(&msg->list); + twjit->stats.delivered_pkt++; + twjit->stats.delivered_bytes += msg->len; + } else { + msg = NULL; + twjit->stats.output_gaps++; + } + sb->head_ts += twjit->ts_quantum; + sb->depth--; + return msg; +} + +static void read_sb_thinning(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; + struct msgb *msg; + + if (sb->drop_int_count) { + sb->drop_int_count--; + return; + } + if (sb->depth <= sb->conf.bd_hiwat) + return; + twjit->stats.thinning_drops++; + msg = pull_from_read_sb(twjit); + if (msg) + msgb_free(msg); + sb->drop_int_count = sb->conf.thinning_int - 2; +} + +static void toss_read_queue(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; + + msgb_queue_free(&sb->queue); + sb->depth = 0; +} + +struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit) +{ + switch (twjit->state) { + case TWJIT_STATE_EMPTY: + return NULL; + case TWJIT_STATE_HUNT: + if (!starting_sb_is_ready(twjit)) + return NULL; + twjit->state = TWJIT_STATE_FLOWING; + twjit->read_sb = twjit->write_sb; + return pull_from_read_sb(twjit); + case TWJIT_STATE_FLOWING: + if (read_sb_is_empty(twjit)) { + twjit->state = TWJIT_STATE_EMPTY; + twjit->stats.underruns++; + return NULL; + } + read_sb_thinning(twjit); + return pull_from_read_sb(twjit); + case TWJIT_STATE_HANDOVER: + if (starting_sb_is_ready(twjit)) { + toss_read_queue(twjit); + twjit->state = TWJIT_STATE_FLOWING; + twjit->read_sb = twjit->write_sb; + return pull_from_read_sb(twjit); + } + if (read_sb_is_empty(twjit)) { + twjit->state = TWJIT_STATE_HUNT; + twjit->stats.underruns++; + return NULL; + } + read_sb_thinning(twjit); + return pull_from_read_sb(twjit); + default: + OSMO_ASSERT(0); + } +}