FreeCalypso > hg > themwi-rtp-lib
changeset 5:1bb26347e253
twjit: split into separate base/in/out modules
author | Mychaela Falconia <falcon@freecalypso.org> |
---|---|
date | Fri, 05 Jul 2024 21:24:56 +0000 |
parents | be04d84f5468 |
children | 668b84c52094 |
files | src/Makefile src/twjit.c src/twjit_in.c src/twjit_out.c |
diffstat | 4 files changed, 366 insertions(+), 343 deletions(-) [+] |
line wrap: on
line diff
--- a/src/Makefile Fri Jul 05 18:52:34 2024 +0000 +++ b/src/Makefile Fri Jul 05 21:24:56 2024 +0000 @@ -1,4 +1,4 @@ -OBJS= twjit.o +OBJS= twjit.o twjit_in.o twjit_out.o LIB= libtwrtp.a include ../config.defs
--- a/src/twjit.c Fri Jul 05 18:52:34 2024 +0000 +++ b/src/twjit.c Fri Jul 05 21:24:56 2024 +0000 @@ -1,21 +1,16 @@ /* - * Themyscira Wireless jitter buffer implementation: code body. + * Themyscira Wireless jitter buffer implementation: main body. */ #include <stdint.h> #include <stdbool.h> #include <string.h> -#include <arpa/inet.h> /* for network byte order functions */ #include <osmocom/core/linuxlist.h> #include <osmocom/core/msgb.h> #include <osmocom/core/talloc.h> -#include <osmocom/core/timer.h> #include <osmocom/core/utils.h> -/* FIXME: this libosmo-netif dependency needs to be removed */ -#include <osmocom/netif/rtp.h> - #include <themwi/rtp/twjit.h> void twrtp_jibuf_init_defaults(struct twrtp_jibuf_config *config) @@ -55,339 +50,3 @@ msgb_queue_free(&twjit->sb[1].queue); talloc_free(twjit); } - -/* RTP input to twjit */ - -static void get_current_time(struct timeval *tp) -{ - struct timespec now; - - osmo_clock_gettime(CLOCK_MONOTONIC, &now); - tp->tv_sec = now.tv_sec; - tp->tv_usec = now.tv_nsec / 1000; -} - -static void -init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, - uint32_t rx_ssrc, uint32_t rx_ts, - const struct timeval *new_time) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - - OSMO_ASSERT(llist_empty(&sb->queue)); - OSMO_ASSERT(sb->depth == 0); - /* all good, proceed */ - sb->ssrc = rx_ssrc; - sb->head_ts = rx_ts; - msgb_enqueue(&sb->queue, msg); - sb->depth = 1; - memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); - memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); - sb->drop_int_count = 0; -} - -enum input_decision { - INPUT_CONTINUE, - INPUT_TOO_OLD, - INPUT_RESET, -}; - -static enum input_decision -check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, - uint32_t rx_ssrc, uint32_t rx_ts, - const struct timeval *new_time) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - struct timeval time_delta; - int32_t ts_delta; - - if (rx_ssrc != sb->ssrc) - return INPUT_RESET; - if (starting) { - timersub(new_time, &sb->last_arrival, &time_delta); - sb->delta_ms = time_delta.tv_sec * 1000 + - time_delta.tv_usec / 1000; - memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); - } - ts_delta = (int32_t)(rx_ts - sb->head_ts); - if (ts_delta < 0) - return INPUT_TOO_OLD; - if (ts_delta % twjit->ts_quantum) - return INPUT_RESET; - if (starting) { - if (sb->conf.start_max_delta && - sb->delta_ms > sb->conf.start_max_delta) - return INPUT_RESET; - } else { - uint32_t fwd = ts_delta / twjit->ts_quantum; - - if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec) - return INPUT_RESET; - } - return INPUT_CONTINUE; -} - -static void toss_write_queue(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - - msgb_queue_free(&sb->queue); - sb->depth = 0; -} - -static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit, - struct msgb *new_msg, uint32_t rx_ts) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - uint32_t ts_delta = rx_ts - sb->head_ts; - uint32_t ins_depth = ts_delta / twjit->ts_quantum; - struct msgb *old_msg; - uint32_t old_ts_delta; - - /* are we increasing total depth, and can we do simple tail append? */ - if (ins_depth >= sb->depth) { - msgb_enqueue(&sb->queue, new_msg); - sb->depth = ins_depth + 1; - return; - } - /* nope - do it the hard way */ - llist_for_each_entry(old_msg, &sb->queue, list) { - old_ts_delta = old_msg->cb[0] - sb->head_ts; - if (old_ts_delta == ts_delta) { - /* two packets with the same timestamp! */ - twjit->stats.duplicate_ts++; - msgb_free(new_msg); - return; - } - if (old_ts_delta > ts_delta) - break; - } - llist_add_tail(&new_msg->list, &old_msg->list); -} - -static void trim_starting_sb(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - struct msgb *msg; - uint32_t msg_ts, ts_adv, quantum_adv; - - while (sb->depth > sb->conf.bd_start) { - msg = msgb_dequeue(&sb->queue); - OSMO_ASSERT(msg); - msgb_free(msg); - OSMO_ASSERT(!llist_empty(&sb->queue)); - msg = llist_entry(sb->queue.next, struct msgb, list); - msg_ts = msg->cb[0]; - ts_adv = msg_ts - sb->head_ts; - quantum_adv = ts_adv / twjit->ts_quantum; - OSMO_ASSERT(sb->depth > quantum_adv); - sb->head_ts = msg_ts; - sb->depth -= quantum_adv; - } -} - -static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, - uint16_t rx_seq, uint32_t rx_ts) -{ - int16_t seq_delta; - int32_t ts_delta; - - if (twjit->last_ssrc != rx_ssrc) { - twjit->stats.ssrc_changes++; - return; - } - seq_delta = (int16_t)(rx_seq - twjit->last_seq); - if (seq_delta < 0) - twjit->stats.seq_backwards++; - else if (seq_delta == 0) - twjit->stats.seq_repeats++; - else if (seq_delta == 1) { - ts_delta = (int32_t)(rx_ts - twjit->last_ts); - if (ts_delta != twjit->ts_quantum) { - if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) - twjit->stats.intentional_gaps++; - else - twjit->stats.ts_resets++; - } - } else - twjit->stats.seq_skips++; -} - -void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) -{ - struct rtp_hdr *rtph; - uint32_t rx_ssrc, rx_ts; - uint16_t rx_seq; - struct timeval new_time; - enum input_decision id; - - rtph = osmo_rtp_get_hdr(msg); - if (!rtph) { - /* invalid packet, couldn't even get header from it */ - twjit->stats.bad_packets++; - msgb_free(msg); - return; - } - rx_ssrc = ntohl(rtph->ssrc); - rx_ts = ntohl(rtph->timestamp); - rx_seq = ntohs(rtph->sequence); - get_current_time(&new_time); - if (twjit->got_first_packet) - rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); - twjit->last_ssrc = rx_ssrc; - twjit->last_seq = rx_seq; - twjit->last_ts = rx_ts; - twjit->got_first_packet = true; - msg->cb[0] = rx_ts; - - switch (twjit->state) { - case TWJIT_STATE_EMPTY: - /* first packet into totally empty buffer */ - twjit->state = TWJIT_STATE_HUNT; - twjit->write_sb = 0; - init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); - return; - case TWJIT_STATE_HUNT: - case TWJIT_STATE_HANDOVER: - id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, - &new_time); - if (id == INPUT_TOO_OLD) { - msgb_free(msg); - return; - } - if (id == INPUT_RESET) { - toss_write_queue(twjit); - init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, - &new_time); - return; - } - insert_pkt_write_sb(twjit, msg, rx_ts); - trim_starting_sb(twjit); - return; - case TWJIT_STATE_FLOWING: - id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, - &new_time); - if (id == INPUT_TOO_OLD) { - twjit->stats.too_old++; - msgb_free(msg); - return; - } - if (id == INPUT_RESET) { - twjit->state = TWJIT_STATE_HANDOVER; - twjit->write_sb = !twjit->write_sb; - init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, - &new_time); - twjit->stats.handovers++; - return; - } - insert_pkt_write_sb(twjit, msg, rx_ts); - return; - default: - OSMO_ASSERT(0); - } -} - -/* Output from twjit to the fixed timing system */ - -static bool starting_sb_is_ready(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - - if (sb->depth < sb->conf.bd_start) - return false; - if (sb->delta_ms < sb->conf.start_min_delta) - return false; - return true; -} - -static bool read_sb_is_empty(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; - - return sb->depth == 0; -} - -static struct msgb *pull_from_read_sb(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; - struct msgb *msg; - - OSMO_ASSERT(!llist_empty(&sb->queue)); - OSMO_ASSERT(sb->depth > 0); - msg = llist_entry(sb->queue.next, struct msgb, list); - if (msg->cb[0] == sb->head_ts) { - llist_del(&msg->list); - twjit->stats.delivered_pkt++; - twjit->stats.delivered_bytes += msg->len; - } else { - msg = NULL; - twjit->stats.output_gaps++; - } - sb->head_ts += twjit->ts_quantum; - sb->depth--; - return msg; -} - -static void read_sb_thinning(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; - struct msgb *msg; - - if (sb->drop_int_count) { - sb->drop_int_count--; - return; - } - if (sb->depth <= sb->conf.bd_hiwat) - return; - twjit->stats.thinning_drops++; - msg = pull_from_read_sb(twjit); - if (msg) - msgb_free(msg); - sb->drop_int_count = sb->conf.thinning_int - 2; -} - -static void toss_read_queue(struct twrtp_jibuf_inst *twjit) -{ - struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; - - msgb_queue_free(&sb->queue); - sb->depth = 0; -} - -struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit) -{ - switch (twjit->state) { - case TWJIT_STATE_EMPTY: - return NULL; - case TWJIT_STATE_HUNT: - if (!starting_sb_is_ready(twjit)) - return NULL; - twjit->state = TWJIT_STATE_FLOWING; - twjit->read_sb = twjit->write_sb; - return pull_from_read_sb(twjit); - case TWJIT_STATE_FLOWING: - if (read_sb_is_empty(twjit)) { - twjit->state = TWJIT_STATE_EMPTY; - twjit->stats.underruns++; - return NULL; - } - read_sb_thinning(twjit); - return pull_from_read_sb(twjit); - case TWJIT_STATE_HANDOVER: - if (starting_sb_is_ready(twjit)) { - toss_read_queue(twjit); - twjit->state = TWJIT_STATE_FLOWING; - twjit->read_sb = twjit->write_sb; - return pull_from_read_sb(twjit); - } - if (read_sb_is_empty(twjit)) { - twjit->state = TWJIT_STATE_HUNT; - twjit->stats.underruns++; - return NULL; - } - read_sb_thinning(twjit); - return pull_from_read_sb(twjit); - default: - OSMO_ASSERT(0); - } -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/twjit_in.c Fri Jul 05 21:24:56 2024 +0000 @@ -0,0 +1,247 @@ +/* + * Themyscira Wireless jitter buffer implementation: RTP input processing. + */ + +#include <stdint.h> +#include <stdbool.h> +#include <string.h> +#include <arpa/inet.h> /* for network byte order functions */ + +#include <osmocom/core/linuxlist.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/timer.h> +#include <osmocom/core/utils.h> + +/* FIXME: this libosmo-netif dependency needs to be removed */ +#include <osmocom/netif/rtp.h> + +#include <themwi/rtp/twjit.h> + +static void get_current_time(struct timeval *tp) +{ + struct timespec now; + + osmo_clock_gettime(CLOCK_MONOTONIC, &now); + tp->tv_sec = now.tv_sec; + tp->tv_usec = now.tv_nsec / 1000; +} + +static void +init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, + uint32_t rx_ssrc, uint32_t rx_ts, + const struct timeval *new_time) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + + OSMO_ASSERT(llist_empty(&sb->queue)); + OSMO_ASSERT(sb->depth == 0); + /* all good, proceed */ + sb->ssrc = rx_ssrc; + sb->head_ts = rx_ts; + msgb_enqueue(&sb->queue, msg); + sb->depth = 1; + memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); + memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); + sb->drop_int_count = 0; +} + +enum input_decision { + INPUT_CONTINUE, + INPUT_TOO_OLD, + INPUT_RESET, +}; + +static enum input_decision +check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, + uint32_t rx_ssrc, uint32_t rx_ts, + const struct timeval *new_time) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + struct timeval time_delta; + int32_t ts_delta; + + if (rx_ssrc != sb->ssrc) + return INPUT_RESET; + if (starting) { + timersub(new_time, &sb->last_arrival, &time_delta); + sb->delta_ms = time_delta.tv_sec * 1000 + + time_delta.tv_usec / 1000; + memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); + } + ts_delta = (int32_t)(rx_ts - sb->head_ts); + if (ts_delta < 0) + return INPUT_TOO_OLD; + if (ts_delta % twjit->ts_quantum) + return INPUT_RESET; + if (starting) { + if (sb->conf.start_max_delta && + sb->delta_ms > sb->conf.start_max_delta) + return INPUT_RESET; + } else { + uint32_t fwd = ts_delta / twjit->ts_quantum; + + if (fwd >= sb->conf.max_future_sec * twjit->quanta_per_sec) + return INPUT_RESET; + } + return INPUT_CONTINUE; +} + +static void toss_write_queue(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + + msgb_queue_free(&sb->queue); + sb->depth = 0; +} + +static void insert_pkt_write_sb(struct twrtp_jibuf_inst *twjit, + struct msgb *new_msg, uint32_t rx_ts) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + uint32_t ts_delta = rx_ts - sb->head_ts; + uint32_t ins_depth = ts_delta / twjit->ts_quantum; + struct msgb *old_msg; + uint32_t old_ts_delta; + + /* are we increasing total depth, and can we do simple tail append? */ + if (ins_depth >= sb->depth) { + msgb_enqueue(&sb->queue, new_msg); + sb->depth = ins_depth + 1; + return; + } + /* nope - do it the hard way */ + llist_for_each_entry(old_msg, &sb->queue, list) { + old_ts_delta = old_msg->cb[0] - sb->head_ts; + if (old_ts_delta == ts_delta) { + /* two packets with the same timestamp! */ + twjit->stats.duplicate_ts++; + msgb_free(new_msg); + return; + } + if (old_ts_delta > ts_delta) + break; + } + llist_add_tail(&new_msg->list, &old_msg->list); +} + +static void trim_starting_sb(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + struct msgb *msg; + uint32_t msg_ts, ts_adv, quantum_adv; + + while (sb->depth > sb->conf.bd_start) { + msg = msgb_dequeue(&sb->queue); + OSMO_ASSERT(msg); + msgb_free(msg); + OSMO_ASSERT(!llist_empty(&sb->queue)); + msg = llist_entry(sb->queue.next, struct msgb, list); + msg_ts = msg->cb[0]; + ts_adv = msg_ts - sb->head_ts; + quantum_adv = ts_adv / twjit->ts_quantum; + OSMO_ASSERT(sb->depth > quantum_adv); + sb->head_ts = msg_ts; + sb->depth -= quantum_adv; + } +} + +static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, + uint16_t rx_seq, uint32_t rx_ts) +{ + int16_t seq_delta; + int32_t ts_delta; + + if (twjit->last_ssrc != rx_ssrc) { + twjit->stats.ssrc_changes++; + return; + } + seq_delta = (int16_t)(rx_seq - twjit->last_seq); + if (seq_delta < 0) + twjit->stats.seq_backwards++; + else if (seq_delta == 0) + twjit->stats.seq_repeats++; + else if (seq_delta == 1) { + ts_delta = (int32_t)(rx_ts - twjit->last_ts); + if (ts_delta != twjit->ts_quantum) { + if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) + twjit->stats.intentional_gaps++; + else + twjit->stats.ts_resets++; + } + } else + twjit->stats.seq_skips++; +} + +void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) +{ + struct rtp_hdr *rtph; + uint32_t rx_ssrc, rx_ts; + uint16_t rx_seq; + struct timeval new_time; + enum input_decision id; + + rtph = osmo_rtp_get_hdr(msg); + if (!rtph) { + /* invalid packet, couldn't even get header from it */ + twjit->stats.bad_packets++; + msgb_free(msg); + return; + } + rx_ssrc = ntohl(rtph->ssrc); + rx_ts = ntohl(rtph->timestamp); + rx_seq = ntohs(rtph->sequence); + get_current_time(&new_time); + if (twjit->got_first_packet) + rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); + twjit->last_ssrc = rx_ssrc; + twjit->last_seq = rx_seq; + twjit->last_ts = rx_ts; + twjit->got_first_packet = true; + msg->cb[0] = rx_ts; + + switch (twjit->state) { + case TWJIT_STATE_EMPTY: + /* first packet into totally empty buffer */ + twjit->state = TWJIT_STATE_HUNT; + twjit->write_sb = 0; + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); + return; + case TWJIT_STATE_HUNT: + case TWJIT_STATE_HANDOVER: + id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, + &new_time); + if (id == INPUT_TOO_OLD) { + msgb_free(msg); + return; + } + if (id == INPUT_RESET) { + toss_write_queue(twjit); + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, + &new_time); + return; + } + insert_pkt_write_sb(twjit, msg, rx_ts); + trim_starting_sb(twjit); + return; + case TWJIT_STATE_FLOWING: + id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, + &new_time); + if (id == INPUT_TOO_OLD) { + twjit->stats.too_old++; + msgb_free(msg); + return; + } + if (id == INPUT_RESET) { + twjit->state = TWJIT_STATE_HANDOVER; + twjit->write_sb = !twjit->write_sb; + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, + &new_time); + twjit->stats.handovers++; + return; + } + insert_pkt_write_sb(twjit, msg, rx_ts); + return; + default: + OSMO_ASSERT(0); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/twjit_out.c Fri Jul 05 21:24:56 2024 +0000 @@ -0,0 +1,117 @@ +/* + * Themyscira Wireless jitter buffer implementation: + * output to the fixed timing system. + */ + +#include <stdint.h> +#include <stdbool.h> +#include <string.h> + +#include <osmocom/core/linuxlist.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/utils.h> + +#include <themwi/rtp/twjit.h> + +static bool starting_sb_is_ready(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; + + if (sb->depth < sb->conf.bd_start) + return false; + if (sb->delta_ms < sb->conf.start_min_delta) + return false; + return true; +} + +static bool read_sb_is_empty(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; + + return sb->depth == 0; +} + +static struct msgb *pull_from_read_sb(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; + struct msgb *msg; + + OSMO_ASSERT(!llist_empty(&sb->queue)); + OSMO_ASSERT(sb->depth > 0); + msg = llist_entry(sb->queue.next, struct msgb, list); + if (msg->cb[0] == sb->head_ts) { + llist_del(&msg->list); + twjit->stats.delivered_pkt++; + twjit->stats.delivered_bytes += msg->len; + } else { + msg = NULL; + twjit->stats.output_gaps++; + } + sb->head_ts += twjit->ts_quantum; + sb->depth--; + return msg; +} + +static void read_sb_thinning(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; + struct msgb *msg; + + if (sb->drop_int_count) { + sb->drop_int_count--; + return; + } + if (sb->depth <= sb->conf.bd_hiwat) + return; + twjit->stats.thinning_drops++; + msg = pull_from_read_sb(twjit); + if (msg) + msgb_free(msg); + sb->drop_int_count = sb->conf.thinning_int - 2; +} + +static void toss_read_queue(struct twrtp_jibuf_inst *twjit) +{ + struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->read_sb]; + + msgb_queue_free(&sb->queue); + sb->depth = 0; +} + +struct msgb *twrtp_jibuf_output(struct twrtp_jibuf_inst *twjit) +{ + switch (twjit->state) { + case TWJIT_STATE_EMPTY: + return NULL; + case TWJIT_STATE_HUNT: + if (!starting_sb_is_ready(twjit)) + return NULL; + twjit->state = TWJIT_STATE_FLOWING; + twjit->read_sb = twjit->write_sb; + return pull_from_read_sb(twjit); + case TWJIT_STATE_FLOWING: + if (read_sb_is_empty(twjit)) { + twjit->state = TWJIT_STATE_EMPTY; + twjit->stats.underruns++; + return NULL; + } + read_sb_thinning(twjit); + return pull_from_read_sb(twjit); + case TWJIT_STATE_HANDOVER: + if (starting_sb_is_ready(twjit)) { + toss_read_queue(twjit); + twjit->state = TWJIT_STATE_FLOWING; + twjit->read_sb = twjit->write_sb; + return pull_from_read_sb(twjit); + } + if (read_sb_is_empty(twjit)) { + twjit->state = TWJIT_STATE_HUNT; + twjit->stats.underruns++; + return NULL; + } + read_sb_thinning(twjit); + return pull_from_read_sb(twjit); + default: + OSMO_ASSERT(0); + } +}