FreeCalypso > hg > themwi-rtp-lib
view src/twjit.c @ 3:d10ea5dc61b3
twjit: initial import from previous work repository
The initial development of twjit was done in a private branch of
osmo-bts git repository; the intent was to prototype twjit in osmo-bts.
However, as I am getting a better feel for the full scope of the problem
(producing a replacement for libortp), I changed course to the present
themwi-rtp-lib (libtwrtp) approach.
author | Mychaela Falconia <falcon@freecalypso.org> |
---|---|
date | Fri, 05 Jul 2024 18:50:48 +0000 |
parents | |
children | 1bb26347e253 |
line wrap: on
line source
/* * Themyscira Wireless jitter buffer implementation: code body. */ #include <stdint.h> #include <stdbool.h> #include <string.h> #include <arpa/inet.h> /* for network byte order functions */ #include <osmocom/core/linuxlist.h> #include <osmocom/core/msgb.h> #include <osmocom/core/talloc.h> #include <osmocom/core/timer.h> #include <osmocom/core/utils.h> /* FIXME: this libosmo-netif dependency needs to be removed */ #include <osmocom/netif/rtp.h> #include <themwi/rtp/twjit.h> void twrtp_jibuf_init_defaults(struct twrtp_jibuf_config *config) { memset(config, 0, sizeof(struct twrtp_jibuf_config)); config->bd_start = 2; /* smallest allowed */ config->bd_hiwat = 3; /* Nstart+1 is practically-useful minimum */ config->thinning_int = 17; /* prime number, usually 340 ms */ config->max_future_sec = 10; /* 10 s is a long time for voice */ } /* create and destroy functions */ struct twrtp_jibuf_inst * twrtp_jibuf_create(void *ctx, uint16_t quantum_ms, uint32_t quantum_ts_inc, struct twrtp_jibuf_config *config) { struct twrtp_jibuf_inst *twjit; twjit = talloc_zero(ctx, struct twrtp_jibuf_inst); if (!twjit) return NULL; twjit->ext_config = config; twjit->ts_quantum = quantum_ts_inc; twjit->quanta_per_sec = 1000 / quantum_ms; twjit->state = TWJIT_STATE_EMPTY; INIT_LLIST_HEAD(&twjit->sb[0].queue); INIT_LLIST_HEAD(&twjit->sb[1].queue); return twjit; } void twrtp_jibuf_destroy(struct twrtp_jibuf_inst *twjit) { msgb_queue_free(&twjit->sb[0].queue); msgb_queue_free(&twjit->sb[1].queue); talloc_free(twjit); } /* RTP input to twjit */ static void get_current_time(struct timeval *tp) { struct timespec now; osmo_clock_gettime(CLOCK_MONOTONIC, &now); tp->tv_sec = now.tv_sec; tp->tv_usec = now.tv_nsec / 1000; } static void init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, uint32_t rx_ssrc, uint32_t rx_ts, const struct timeval *new_time) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; OSMO_ASSERT(llist_empty(&sb->queue)); OSMO_ASSERT(sb->depth == 0); /* all good, proceed */ sb->ssrc = rx_ssrc; sb->head_ts = rx_ts; msgb_enqueue(&sb->queue, msg); sb->depth = 1; memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); sb->drop_int_count = 0; } enum input_decision { INPUT_CONTINUE, INPUT_TOO_OLD, INPUT_RESET, }; static enum input_decision check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, uint32_t rx_ssrc, uint32_t rx_ts, const struct timeval *new_time) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; struct timeval time_delta; int32_t ts_delta; if (rx_ssrc != sb->ssrc) return INPUT_RESET; if (starting) { timersub(new_time, &sb->last_arrival, &time_delta); sb->delta_ms = time_delta.tv_sec * 1000 + time_delta.tv_usec / 1000; memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); } ts_delta = (int32_t)(rx_ts - sb->head_ts); if (ts_delta < 0) return INPUT_TOO_OLD; if (ts_delta % twjit->ts_quantum) return INPUT_RESET; if (starting) { if (sb->conf.start_max_delta && sb->delta_ms > sb->conf.start_max_delta) return INPUT_RESET; } else { uint32_t fwd = ts_delta / twjit->ts_quantum; if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec) return INPUT_RESET; } return INPUT_CONTINUE; } static void toss_write_queue(struct twrtp_jibuf_inst *twjit) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; msgb_queue_free(&sb->queue); sb->depth = 0; } static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit, struct msgb *new_msg, uint32_t rx_ts) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; uint32_t ts_delta = rx_ts - sb->head_ts; uint32_t ins_depth = ts_delta / twjit->ts_quantum; struct msgb *old_msg; uint32_t old_ts_delta; /* are we increasing total depth, and can we do simple tail append? */ if (ins_depth >= sb->depth) { msgb_enqueue(&sb->queue, new_msg); sb->depth = ins_depth + 1; return; } /* nope - do it the hard way */ llist_for_each_entry(old_msg, &sb->queue, list) { old_ts_delta = old_msg->cb[0] - sb->head_ts; if (old_ts_delta == ts_delta) { /* two packets with the same timestamp! */ twjit->stats.duplicate_ts++; msgb_free(new_msg); return; } if (old_ts_delta > ts_delta) break; } llist_add_tail(&new_msg->list, &old_msg->list); } static void trim_starting_sb(struct twrtp_jibuf_inst *twjit) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; struct msgb *msg; uint32_t msg_ts, ts_adv, quantum_adv; while (sb->depth > sb->conf.bd_start) { msg = msgb_dequeue(&sb->queue); OSMO_ASSERT(msg); msgb_free(msg); OSMO_ASSERT(!llist_empty(&sb->queue)); msg = llist_entry(sb->queue.next, struct msgb, list); msg_ts = msg->cb[0]; ts_adv = msg_ts - sb->head_ts; quantum_adv = ts_adv / twjit->ts_quantum; OSMO_ASSERT(sb->depth > quantum_adv); sb->head_ts = msg_ts; sb->depth -= quantum_adv; } } static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, uint16_t rx_seq, uint32_t rx_ts) { int16_t seq_delta; int32_t ts_delta; if (twjit->last_ssrc != rx_ssrc) { twjit->stats.ssrc_changes++; return; } seq_delta = (int16_t)(rx_seq - twjit->last_seq); if (seq_delta < 0) twjit->stats.seq_backwards++; else if (seq_delta == 0) twjit->stats.seq_repeats++; else if (seq_delta == 1) { ts_delta = (int32_t)(rx_ts - twjit->last_ts); if (ts_delta != twjit->ts_quantum) { if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) twjit->stats.intentional_gaps++; else twjit->stats.ts_resets++; } } else twjit->stats.seq_skips++; } void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) { struct rtp_hdr *rtph; uint32_t rx_ssrc, rx_ts; uint16_t rx_seq; struct timeval new_time; enum input_decision id; rtph = osmo_rtp_get_hdr(msg); if (!rtph) { /* invalid packet, couldn't even get header from it */ twjit->stats.bad_packets++; msgb_free(msg); return; } rx_ssrc = ntohl(rtph->ssrc); rx_ts = ntohl(rtph->timestamp); rx_seq = ntohs(rtph->sequence); get_current_time(&new_time); if (twjit->got_first_packet) rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); twjit->last_ssrc = rx_ssrc; twjit->last_seq = rx_seq; twjit->last_ts = rx_ts; twjit->got_first_packet = true; msg->cb[0] = rx_ts; switch (twjit->state) { case TWJIT_STATE_EMPTY: /* first packet into totally empty buffer */ twjit->state = TWJIT_STATE_HUNT; twjit->write_sb = 0; init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); return; case TWJIT_STATE_HUNT: case TWJIT_STATE_HANDOVER: id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, &new_time); if (id == INPUT_TOO_OLD) { msgb_free(msg); return; } if (id == INPUT_RESET) { toss_write_queue(twjit); init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); return; } insert_pkt_write_sb(twjit, msg, rx_ts); trim_starting_sb(twjit); return; case TWJIT_STATE_FLOWING: id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, &new_time); if (id == INPUT_TOO_OLD) { twjit->stats.too_old++; msgb_free(msg); return; } if (id == INPUT_RESET) { twjit->state = TWJIT_STATE_HANDOVER; twjit->write_sb = !twjit->write_sb; init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); twjit->stats.handovers++; return; } insert_pkt_write_sb(twjit, msg, rx_ts); return; default: OSMO_ASSERT(0); } } /* Output from twjit to the fixed timing system */ static bool starting_sb_is_ready(struct twrtp_jibuf_inst *twjit) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; if (sb->depth < sb->conf.bd_start) return false; if (sb->delta_ms < sb->conf.start_min_delta) return false; return true; } static bool read_sb_is_empty(struct twrtp_jibuf_inst *twjit) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; return sb->depth == 0; } static struct msgb *pull_from_read_sb(struct twrtp_jibuf_inst *twjit) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; struct msgb *msg; OSMO_ASSERT(!llist_empty(&sb->queue)); OSMO_ASSERT(sb->depth > 0); msg = llist_entry(sb->queue.next, struct msgb, list); if (msg->cb[0] == sb->head_ts) { llist_del(&msg->list); twjit->stats.delivered_pkt++; twjit->stats.delivered_bytes += msg->len; } else { msg = NULL; twjit->stats.output_gaps++; } sb->head_ts += twjit->ts_quantum; sb->depth--; return msg; } static void read_sb_thinning(struct twrtp_jibuf_inst *twjit) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; struct msgb *msg; if (sb->drop_int_count) { sb->drop_int_count--; return; } if (sb->depth <= sb->conf.bd_hiwat) return; twjit->stats.thinning_drops++; msg = pull_from_read_sb(twjit); if (msg) msgb_free(msg); sb->drop_int_count = sb->conf.thinning_int - 2; } static void toss_read_queue(struct twrtp_jibuf_inst *twjit) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; msgb_queue_free(&sb->queue); sb->depth = 0; } struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit) { switch (twjit->state) { case TWJIT_STATE_EMPTY: return NULL; case TWJIT_STATE_HUNT: if (!starting_sb_is_ready(twjit)) return NULL; twjit->state = TWJIT_STATE_FLOWING; twjit->read_sb = twjit->write_sb; return pull_from_read_sb(twjit); case TWJIT_STATE_FLOWING: if (read_sb_is_empty(twjit)) { twjit->state = TWJIT_STATE_EMPTY; twjit->stats.underruns++; return NULL; } read_sb_thinning(twjit); return pull_from_read_sb(twjit); case TWJIT_STATE_HANDOVER: if (starting_sb_is_ready(twjit)) { toss_read_queue(twjit); twjit->state = TWJIT_STATE_FLOWING; twjit->read_sb = twjit->write_sb; return pull_from_read_sb(twjit); } if (read_sb_is_empty(twjit)) { twjit->state = TWJIT_STATE_HUNT; twjit->stats.underruns++; return NULL; } read_sb_thinning(twjit); return pull_from_read_sb(twjit); default: OSMO_ASSERT(0); } }