FreeCalypso > hg > themwi-rtp-lib
diff src/twjit.c @ 5:1bb26347e253
twjit: split into separate base/in/out modules
author | Mychaela Falconia <falcon@freecalypso.org> |
---|---|
date | Fri, 05 Jul 2024 21:24:56 +0000 |
parents | d10ea5dc61b3 |
children | 668b84c52094 |
line wrap: on
line diff
--- a/src/twjit.c Fri Jul 05 18:52:34 2024 +0000 +++ b/src/twjit.c Fri Jul 05 21:24:56 2024 +0000 @@ -1,21 +1,16 @@ /* - * Themyscira Wireless jitter buffer implementation: code body. + * Themyscira Wireless jitter buffer implementation: main body. */ #include <stdint.h> #include <stdbool.h> #include <string.h> -#include <arpa/inet.h> /* for network byte order functions */ #include <osmocom/core/linuxlist.h> #include <osmocom/core/msgb.h> #include <osmocom/core/talloc.h> -#include <osmocom/core/timer.h> #include <osmocom/core/utils.h> -/* FIXME: this libosmo-netif dependency needs to be removed */ -#include <osmocom/netif/rtp.h> - #include <themwi/rtp/twjit.h> void twrtp_jibuf_init_defaults(struct twrtp_jibuf_config *config) @@ -55,339 +50,3 @@ msgb_queue_free(&twjit->sb[1].queue); talloc_free(twjit); } - -/* RTP input to twjit */ - -static void get_current_time(struct timeval *tp) -{ - struct timespec now; - - osmo_clock_gettime(CLOCK_MONOTONIC, &now); - tp->tv_sec = now.tv_sec; - tp->tv_usec = now.tv_nsec / 1000; -} - -static void -init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, - uint32_t rx_ssrc, uint32_t rx_ts, - const struct timeval *new_time) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - - OSMO_ASSERT(llist_empty(&sb->queue)); - OSMO_ASSERT(sb->depth == 0); - /* all good, proceed */ - sb->ssrc = rx_ssrc; - sb->head_ts = rx_ts; - msgb_enqueue(&sb->queue, msg); - sb->depth = 1; - memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); - memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); - sb->drop_int_count = 0; -} - -enum input_decision { - INPUT_CONTINUE, - INPUT_TOO_OLD, - INPUT_RESET, -}; - -static enum input_decision -check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, - uint32_t rx_ssrc, uint32_t rx_ts, - const struct timeval *new_time) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - struct timeval time_delta; - int32_t ts_delta; - - if (rx_ssrc != sb->ssrc) - return INPUT_RESET; - if (starting) { - timersub(new_time, &sb->last_arrival, &time_delta); - sb->delta_ms = time_delta.tv_sec * 1000 + - time_delta.tv_usec / 1000; - memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); - } - ts_delta = (int32_t)(rx_ts - sb->head_ts); - if (ts_delta < 0) - return INPUT_TOO_OLD; - if (ts_delta % twjit->ts_quantum) - return INPUT_RESET; - if (starting) { - if (sb->conf.start_max_delta && - sb->delta_ms > sb->conf.start_max_delta) - return INPUT_RESET; - } else { - uint32_t fwd = ts_delta / twjit->ts_quantum; - - if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec) - return INPUT_RESET; - } - return INPUT_CONTINUE; -} - -static void toss_write_queue(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - - msgb_queue_free(&sb->queue); - sb->depth = 0; -} - -static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit, - struct msgb *new_msg, uint32_t rx_ts) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - uint32_t ts_delta = rx_ts - sb->head_ts; - uint32_t ins_depth = ts_delta / twjit->ts_quantum; - struct msgb *old_msg; - uint32_t old_ts_delta; - - /* are we increasing total depth, and can we do simple tail append? */ - if (ins_depth >= sb->depth) { - msgb_enqueue(&sb->queue, new_msg); - sb->depth = ins_depth + 1; - return; - } - /* nope - do it the hard way */ - llist_for_each_entry(old_msg, &sb->queue, list) { - old_ts_delta = old_msg->cb[0] - sb->head_ts; - if (old_ts_delta == ts_delta) { - /* two packets with the same timestamp! */ - twjit->stats.duplicate_ts++; - msgb_free(new_msg); - return; - } - if (old_ts_delta > ts_delta) - break; - } - llist_add_tail(&new_msg->list, &old_msg->list); -} - -static void trim_starting_sb(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - struct msgb *msg; - uint32_t msg_ts, ts_adv, quantum_adv; - - while (sb->depth > sb->conf.bd_start) { - msg = msgb_dequeue(&sb->queue); - OSMO_ASSERT(msg); - msgb_free(msg); - OSMO_ASSERT(!llist_empty(&sb->queue)); - msg = llist_entry(sb->queue.next, struct msgb, list); - msg_ts = msg->cb[0]; - ts_adv = msg_ts - sb->head_ts; - quantum_adv = ts_adv / twjit->ts_quantum; - OSMO_ASSERT(sb->depth > quantum_adv); - sb->head_ts = msg_ts; - sb->depth -= quantum_adv; - } -} - -static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, - uint16_t rx_seq, uint32_t rx_ts) -{ - int16_t seq_delta; - int32_t ts_delta; - - if (twjit->last_ssrc != rx_ssrc) { - twjit->stats.ssrc_changes++; - return; - } - seq_delta = (int16_t)(rx_seq - twjit->last_seq); - if (seq_delta < 0) - twjit->stats.seq_backwards++; - else if (seq_delta == 0) - twjit->stats.seq_repeats++; - else if (seq_delta == 1) { - ts_delta = (int32_t)(rx_ts - twjit->last_ts); - if (ts_delta != twjit->ts_quantum) { - if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) - twjit->stats.intentional_gaps++; - else - twjit->stats.ts_resets++; - } - } else - twjit->stats.seq_skips++; -} - -void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) -{ - struct rtp_hdr *rtph; - uint32_t rx_ssrc, rx_ts; - uint16_t rx_seq; - struct timeval new_time; - enum input_decision id; - - rtph = osmo_rtp_get_hdr(msg); - if (!rtph) { - /* invalid packet, couldn't even get header from it */ - twjit->stats.bad_packets++; - msgb_free(msg); - return; - } - rx_ssrc = ntohl(rtph->ssrc); - rx_ts = ntohl(rtph->timestamp); - rx_seq = ntohs(rtph->sequence); - get_current_time(&new_time); - if (twjit->got_first_packet) - rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); - twjit->last_ssrc = rx_ssrc; - twjit->last_seq = rx_seq; - twjit->last_ts = rx_ts; - twjit->got_first_packet = true; - msg->cb[0] = rx_ts; - - switch (twjit->state) { - case TWJIT_STATE_EMPTY: - /* first packet into totally empty buffer */ - twjit->state = TWJIT_STATE_HUNT; - twjit->write_sb = 0; - init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); - return; - case TWJIT_STATE_HUNT: - case TWJIT_STATE_HANDOVER: - id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, - &new_time); - if (id == INPUT_TOO_OLD) { - msgb_free(msg); - return; - } - if (id == INPUT_RESET) { - toss_write_queue(twjit); - init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, - &new_time); - return; - } - insert_pkt_write_sb(twjit, msg, rx_ts); - trim_starting_sb(twjit); - return; - case TWJIT_STATE_FLOWING: - id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, - &new_time); - if (id == INPUT_TOO_OLD) { - twjit->stats.too_old++; - msgb_free(msg); - return; - } - if (id == INPUT_RESET) { - twjit->state = TWJIT_STATE_HANDOVER; - twjit->write_sb = !twjit->write_sb; - init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, - &new_time); - twjit->stats.handovers++; - return; - } - insert_pkt_write_sb(twjit, msg, rx_ts); - return; - default: - OSMO_ASSERT(0); - } -} - -/* Output from twjit to the fixed timing system */ - -static bool starting_sb_is_ready(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - - if (sb->depth < sb->conf.bd_start) - return false; - if (sb->delta_ms < sb->conf.start_min_delta) - return false; - return true; -} - -static bool read_sb_is_empty(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; - - return sb->depth == 0; -} - -static struct msgb *pull_from_read_sb(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; - struct msgb *msg; - - OSMO_ASSERT(!llist_empty(&sb->queue)); - OSMO_ASSERT(sb->depth > 0); - msg = llist_entry(sb->queue.next, struct msgb, list); - if (msg->cb[0] == sb->head_ts) { - llist_del(&msg->list); - twjit->stats.delivered_pkt++; - twjit->stats.delivered_bytes += msg->len; - } else { - msg = NULL; - twjit->stats.output_gaps++; - } - sb->head_ts += twjit->ts_quantum; - sb->depth--; - return msg; -} - -static void read_sb_thinning(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; - struct msgb *msg; - - if (sb->drop_int_count) { - sb->drop_int_count--; - return; - } - if (sb->depth <= sb->conf.bd_hiwat) - return; - twjit->stats.thinning_drops++; - msg = pull_from_read_sb(twjit); - if (msg) - msgb_free(msg); - sb->drop_int_count = sb->conf.thinning_int - 2; -} - -static void toss_read_queue(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; - - msgb_queue_free(&sb->queue); - sb->depth = 0; -} - -struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit) -{ - switch (twjit->state) { - case TWJIT_STATE_EMPTY: - return NULL; - case TWJIT_STATE_HUNT: - if (!starting_sb_is_ready(twjit)) - return NULL; - twjit->state = TWJIT_STATE_FLOWING; - twjit->read_sb = twjit->write_sb; - return pull_from_read_sb(twjit); - case TWJIT_STATE_FLOWING: - if (read_sb_is_empty(twjit)) { - twjit->state = TWJIT_STATE_EMPTY; - twjit->stats.underruns++; - return NULL; - } - read_sb_thinning(twjit); - return pull_from_read_sb(twjit); - case TWJIT_STATE_HANDOVER: - if (starting_sb_is_ready(twjit)) { - toss_read_queue(twjit); - twjit->state = TWJIT_STATE_FLOWING; - twjit->read_sb = twjit->write_sb; - return pull_from_read_sb(twjit); - } - if (read_sb_is_empty(twjit)) { - twjit->state = TWJIT_STATE_HUNT; - twjit->stats.underruns++; - return NULL; - } - read_sb_thinning(twjit); - return pull_from_read_sb(twjit); - default: - OSMO_ASSERT(0); - } -}