FreeCalypso > hg > themwi-rtp-lib
diff src/twjit_in.c @ 5:1bb26347e253
twjit: split into separate base/in/out modules
author | Mychaela Falconia <falcon@freecalypso.org> |
---|---|
date | Fri, 05 Jul 2024 21:24:56 +0000 |
parents | src/twjit.c@d10ea5dc61b3 |
children | 95f6c8ce33b0 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/twjit_in.c Fri Jul 05 21:24:56 2024 +0000 @@ -0,0 +1,247 @@ +/* + * Themyscira Wireless jitter buffer implementation: RTP input processing. + */ + +#include <stdint.h> +#include <stdbool.h> +#include <string.h> +#include <arpa/inet.h> /* for network byte order functions */ + +#include <osmocom/core/linuxlist.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/timer.h> +#include <osmocom/core/utils.h> + +/* FIXME: this libosmo-netif dependency needs to be removed */ +#include <osmocom/netif/rtp.h> + +#include <themwi/rtp/twjit.h> + +static void get_current_time(struct timeval *tp) +{ + struct timespec now; + + osmo_clock_gettime(CLOCK_MONOTONIC, &now); + tp->tv_sec = now.tv_sec; + tp->tv_usec = now.tv_nsec / 1000; +} + +static void +init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, + uint32_t rx_ssrc, uint32_t rx_ts, + const struct timeval *new_time) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + + OSMO_ASSERT(llist_empty(&sb->queue)); + OSMO_ASSERT(sb->depth == 0); + /* all good, proceed */ + sb->ssrc = rx_ssrc; + sb->head_ts = rx_ts; + msgb_enqueue(&sb->queue, msg); + sb->depth = 1; + memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); + memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); + sb->drop_int_count = 0; +} + +enum input_decision { + INPUT_CONTINUE, + INPUT_TOO_OLD, + INPUT_RESET, +}; + +static enum input_decision +check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, + uint32_t rx_ssrc, uint32_t rx_ts, + const struct timeval *new_time) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + struct timeval time_delta; + int32_t ts_delta; + + if (rx_ssrc != sb->ssrc) + return INPUT_RESET; + if (starting) { + timersub(new_time, &sb->last_arrival, &time_delta); + sb->delta_ms = time_delta.tv_sec * 1000 + + time_delta.tv_usec / 1000; + memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); + } + ts_delta = (int32_t)(rx_ts - sb->head_ts); + if (ts_delta < 0) + return INPUT_TOO_OLD; + if (ts_delta % twjit->ts_quantum) + return INPUT_RESET; + if (starting) { + if (sb->conf.start_max_delta && + sb->delta_ms > sb->conf.start_max_delta) + return INPUT_RESET; + } else { + uint32_t fwd = ts_delta / twjit->ts_quantum; + + if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec) + return INPUT_RESET; + } + return INPUT_CONTINUE; +} + +static void toss_write_queue(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + + msgb_queue_free(&sb->queue); + sb->depth = 0; +} + +static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit, + struct msgb *new_msg, uint32_t rx_ts) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + uint32_t ts_delta = rx_ts - sb->head_ts; + uint32_t ins_depth = ts_delta / twjit->ts_quantum; + struct msgb *old_msg; + uint32_t old_ts_delta; + + /* are we increasing total depth, and can we do simple tail append? */ + if (ins_depth >= sb->depth) { + msgb_enqueue(&sb->queue, new_msg); + sb->depth = ins_depth + 1; + return; + } + /* nope - do it the hard way */ + llist_for_each_entry(old_msg, &sb->queue, list) { + old_ts_delta = old_msg->cb[0] - sb->head_ts; + if (old_ts_delta == ts_delta) { + /* two packets with the same timestamp! */ + twjit->stats.duplicate_ts++; + msgb_free(new_msg); + return; + } + if (old_ts_delta > ts_delta) + break; + } + llist_add_tail(&new_msg->list, &old_msg->list); +} + +static void trim_starting_sb(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + struct msgb *msg; + uint32_t msg_ts, ts_adv, quantum_adv; + + while (sb->depth > sb->conf.bd_start) { + msg = msgb_dequeue(&sb->queue); + OSMO_ASSERT(msg); + msgb_free(msg); + OSMO_ASSERT(!llist_empty(&sb->queue)); + msg = llist_entry(sb->queue.next, struct msgb, list); + msg_ts = msg->cb[0]; + ts_adv = msg_ts - sb->head_ts; + quantum_adv = ts_adv / twjit->ts_quantum; + OSMO_ASSERT(sb->depth > quantum_adv); + sb->head_ts = msg_ts; + sb->depth -= quantum_adv; + } +} + +static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, + uint16_t rx_seq, uint32_t rx_ts) +{ + int16_t seq_delta; + int32_t ts_delta; + + if (twjit->last_ssrc != rx_ssrc) { + twjit->stats.ssrc_changes++; + return; + } + seq_delta = (int16_t)(rx_seq - twjit->last_seq); + if (seq_delta < 0) + twjit->stats.seq_backwards++; + else if (seq_delta == 0) + twjit->stats.seq_repeats++; + else if (seq_delta == 1) { + ts_delta = (int32_t)(rx_ts - twjit->last_ts); + if (ts_delta != twjit->ts_quantum) { + if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) + twjit->stats.intentional_gaps++; + else + twjit->stats.ts_resets++; + } + } else + twjit->stats.seq_skips++; +} + +void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) +{ + struct rtp_hdr *rtph; + uint32_t rx_ssrc, rx_ts; + uint16_t rx_seq; + struct timeval new_time; + enum input_decision id; + + rtph = osmo_rtp_get_hdr(msg); + if (!rtph) { + /* invalid packet, couldn't even get header from it */ + twjit->stats.bad_packets++; + msgb_free(msg); + return; + } + rx_ssrc = ntohl(rtph->ssrc); + rx_ts = ntohl(rtph->timestamp); + rx_seq = ntohs(rtph->sequence); + get_current_time(&new_time); + if (twjit->got_first_packet) + rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); + twjit->last_ssrc = rx_ssrc; + twjit->last_seq = rx_seq; + twjit->last_ts = rx_ts; + twjit->got_first_packet = true; + msg->cb[0] = rx_ts; + + switch (twjit->state) { + case TWJIT_STATE_EMPTY: + /* first packet into totally empty buffer */ + twjit->state = TWJIT_STATE_HUNT; + twjit->write_sb = 0; + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); + return; + case TWJIT_STATE_HUNT: + case TWJIT_STATE_HANDOVER: + id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, + &new_time); + if (id == INPUT_TOO_OLD) { + msgb_free(msg); + return; + } + if (id == INPUT_RESET) { + toss_write_queue(twjit); + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, + &new_time); + return; + } + insert_pkt_write_sb(twjit, msg, rx_ts); + trim_starting_sb(twjit); + return; + case TWJIT_STATE_FLOWING: + id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, + &new_time); + if (id == INPUT_TOO_OLD) { + twjit->stats.too_old++; + msgb_free(msg); + return; + } + if (id == INPUT_RESET) { + twjit->state = TWJIT_STATE_HANDOVER; + twjit->write_sb = !twjit->write_sb; + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, + &new_time); + twjit->stats.handovers++; + return; + } + insert_pkt_write_sb(twjit, msg, rx_ts); + return; + default: + OSMO_ASSERT(0); + } +}