# HG changeset patch # User Mychaela Falconia # Date 1720223377 0 # Node ID 117fa99ff871ac76c2a04cda9c2479c4e818b0fa # Parent 95f6c8ce33b04dce46caea354a048b787cb2745f twjit: implement proper analytics diff -r 95f6c8ce33b0 -r 117fa99ff871 include/twjit.h --- a/include/twjit.h Fri Jul 05 21:57:51 2024 +0000 +++ b/include/twjit.h Fri Jul 05 23:49:37 2024 +0000 @@ -53,6 +53,18 @@ }; /* + * Info collected from the incoming RTP data stream + * for the purpose of generating RTCP reception report blocks. + */ +struct twrtp_jibuf_rr_info { + uint32_t rx_packets; + uint32_t base_seq; + uint32_t max_seq_ext; + uint32_t expected_pkt; + uint32_t jitter_accum; +}; + +/* * Each twjit instance has two sub-buffers; each subbuf is a queue of * received RTP packets that have the same SSRC and whose timestamps * increment in the expected cadence, with each ts delta being an @@ -63,9 +75,7 @@ uint32_t head_ts; struct llist_head queue; uint32_t depth; - /* last packet arrival time, used only in starting state */ - struct timeval last_arrival; - uint32_t delta_ms; + uint32_t delta_ms; /* used only in starting state */ /* thinning mechanism */ uint16_t drop_int_count; /* running config for this subbuf */ @@ -101,11 +111,15 @@ struct twrtp_jibuf_sub sb[2]; uint8_t read_sb; /* 0 or 1 */ uint8_t write_sb; /* ditto */ - /* Rx packet stream analysis */ + /* info about the most recent Rx packet */ uint32_t last_ssrc; uint32_t last_ts; uint16_t last_seq; bool got_first_packet; + struct timespec last_arrival; + uint32_t last_arrival_delta; + /* analytics for RTCP RR */ + struct twrtp_jibuf_rr_info rr_info; /* stats over lifetime of this instance */ struct twrtp_jibuf_stats stats; }; diff -r 95f6c8ce33b0 -r 117fa99ff871 src/twjit_in.c --- a/src/twjit_in.c Fri Jul 05 21:57:51 2024 +0000 +++ b/src/twjit_in.c Fri Jul 05 23:49:37 2024 +0000 @@ -17,19 +17,90 @@ #include -static void get_current_time(struct timeval *tp) +/* raw analytics on the Rx packet stream */ + +static void analytics_init(struct twrtp_jibuf_inst *twjit, uint16_t rx_seq) { - struct timespec now; + struct twrtp_jibuf_rr_info *rri = &twjit->rr_info; + + rri->rx_packets = 1; + rri->base_seq = rx_seq; + rri->max_seq_ext = rx_seq; + rri->expected_pkt = 1; + rri->jitter_accum = 0; +} + +static void analytics_cont(struct twrtp_jibuf_inst *twjit, uint16_t rx_seq, + uint32_t rx_ts, struct timespec *now) +{ + struct twrtp_jibuf_rr_info *rri = &twjit->rr_info; + uint16_t seq_ext_lo = rri->max_seq_ext; + uint16_t seq_ext_hi = rri->max_seq_ext >> 16; + int16_t seq_delta = (int16_t)(rx_seq - twjit->last_seq); + int16_t seq_delta2 = (int16_t)(rx_seq - seq_ext_lo); + int32_t ts_delta = (int32_t)(rx_ts - twjit->last_ts); + struct timespec time_delta; + uint32_t time_delta_tsu; + int32_t jitter_new, ts_delta_clamp; - osmo_clock_gettime(CLOCK_MONOTONIC, &now); - tp->tv_sec = now.tv_sec; - tp->tv_usec = now.tv_nsec / 1000; + /* analytics for our own stats */ + if (seq_delta < 0) + twjit->stats.seq_backwards++; + else if (seq_delta == 0) + twjit->stats.seq_repeats++; + else if (seq_delta == 1) { + if (ts_delta != twjit->ts_quantum) { + if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) + twjit->stats.intentional_gaps++; + else + twjit->stats.ts_resets++; + } + } else + twjit->stats.seq_skips++; + + /* analytics for RTCP RR: packet counts */ + rri->rx_packets++; + if (seq_delta2 > 0) { + if (rx_seq < seq_ext_lo) + seq_ext_hi++; + seq_ext_lo = rx_seq; + rri->max_seq_ext = ((uint32_t) seq_ext_hi << 16) | seq_ext_lo; + rri->expected_pkt = rri->max_seq_ext - rri->base_seq + 1; + } + + /* time-of-arrival analytics */ + time_delta.tv_sec = now->tv_sec - twjit->last_arrival.tv_sec; + time_delta.tv_nsec = now->tv_nsec - twjit->last_arrival.tv_nsec; + if (time_delta.tv_nsec < 0) { + time_delta.tv_sec--; + time_delta.tv_nsec += 1000000000; + } + /* to avoid overflows in downstream math, clamp to 1 hour */ + if (time_delta.tv_sec >= 3600) { + time_delta.tv_sec = 3600; + time_delta.tv_nsec = 0; + } + /* convert to RTP timestamp units */ + time_delta_tsu = time_delta.tv_sec * twjit->ts_units_per_sec + + time_delta.tv_nsec / twjit->ns_to_ts_units; + twjit->last_arrival_delta = time_delta_tsu; + /* jitter calculation for RTCP RR */ + ts_delta_clamp = twjit->ts_units_per_sec * 3600; + if (ts_delta > ts_delta_clamp) + ts_delta = ts_delta_clamp; + else if (ts_delta < -ts_delta_clamp) + ts_delta = -ts_delta_clamp; + jitter_new = time_delta_tsu - ts_delta; + if (jitter_new < 0) + jitter_new = -jitter_new; + rri->jitter_accum += jitter_new - ((rri->jitter_accum + 8) >> 4); } +/* actual twjit input logic */ + static void init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, - uint32_t rx_ssrc, uint32_t rx_ts, - const struct timeval *new_time) + uint32_t rx_ssrc, uint32_t rx_ts) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; @@ -40,7 +111,6 @@ sb->head_ts = rx_ts; msgb_enqueue(&sb->queue, msg); sb->depth = 1; - memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); sb->drop_int_count = 0; } @@ -53,21 +123,14 @@ static enum input_decision check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, - uint32_t rx_ssrc, uint32_t rx_ts, - const struct timeval *new_time) + uint32_t rx_ssrc, uint32_t rx_ts) { struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; - struct timeval time_delta; int32_t ts_delta; if (rx_ssrc != sb->ssrc) return INPUT_RESET; - if (starting) { - timersub(new_time, &sb->last_arrival, &time_delta); - sb->delta_ms = time_delta.tv_sec * 1000 + - time_delta.tv_usec / 1000; - memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); - } + sb->delta_ms = twjit->last_arrival_delta / twjit->ts_units_per_ms; ts_delta = (int32_t)(rx_ts - sb->head_ts); if (ts_delta < 0) return INPUT_TOO_OLD; @@ -146,39 +209,12 @@ } } -static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, - uint16_t rx_seq, uint32_t rx_ts) -{ - int16_t seq_delta; - int32_t ts_delta; - - if (twjit->last_ssrc != rx_ssrc) { - twjit->stats.ssrc_changes++; - return; - } - seq_delta = (int16_t)(rx_seq - twjit->last_seq); - if (seq_delta < 0) - twjit->stats.seq_backwards++; - else if (seq_delta == 0) - twjit->stats.seq_repeats++; - else if (seq_delta == 1) { - ts_delta = (int32_t)(rx_ts - twjit->last_ts); - if (ts_delta != twjit->ts_quantum) { - if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) - twjit->stats.intentional_gaps++; - else - twjit->stats.ts_resets++; - } - } else - twjit->stats.seq_skips++; -} - void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) { struct rtp_hdr *rtph; uint32_t rx_ssrc, rx_ts; uint16_t rx_seq; - struct timeval new_time; + struct timespec now; enum input_decision id; rtph = osmo_rtp_get_hdr(msg); @@ -191,13 +227,19 @@ rx_ssrc = ntohl(rtph->ssrc); rx_ts = ntohl(rtph->timestamp); rx_seq = ntohs(rtph->sequence); - get_current_time(&new_time); - if (twjit->got_first_packet) - rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); + osmo_clock_gettime(CLOCK_MONOTONIC, &now); + if (!twjit->got_first_packet) { + analytics_init(twjit, rx_seq); + twjit->got_first_packet = true; + } else if (rx_ssrc != twjit->last_ssrc) { + twjit->stats.ssrc_changes++; + analytics_init(twjit, rx_seq); + } else + analytics_cont(twjit, rx_seq, rx_ts, &now); twjit->last_ssrc = rx_ssrc; twjit->last_seq = rx_seq; twjit->last_ts = rx_ts; - twjit->got_first_packet = true; + memcpy(&twjit->last_arrival, &now, sizeof(struct timespec)); msg->cb[0] = rx_ts; switch (twjit->state) { @@ -205,28 +247,25 @@ /* first packet into totally empty buffer */ twjit->state = TWJIT_STATE_HUNT; twjit->write_sb = 0; - init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts); return; case TWJIT_STATE_HUNT: case TWJIT_STATE_HANDOVER: - id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, - &new_time); + id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts); if (id == INPUT_TOO_OLD) { msgb_free(msg); return; } if (id == INPUT_RESET) { toss_write_queue(twjit); - init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, - &new_time); + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts); return; } insert_pkt_write_sb(twjit, msg, rx_ts); trim_starting_sb(twjit); return; case TWJIT_STATE_FLOWING: - id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, - &new_time); + id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts); if (id == INPUT_TOO_OLD) { twjit->stats.too_old++; msgb_free(msg); @@ -235,8 +274,7 @@ if (id == INPUT_RESET) { twjit->state = TWJIT_STATE_HANDOVER; twjit->write_sb = !twjit->write_sb; - init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, - &new_time); + init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts); twjit->stats.handovers++; return; }