changeset 9:117fa99ff871

twjit: implement proper analytics
author Mychaela Falconia <falcon@freecalypso.org>
date Fri, 05 Jul 2024 23:49:37 +0000
parents 95f6c8ce33b0
children e60df79cbe9f
files include/twjit.h src/twjit_in.c
diffstat 2 files changed, 114 insertions(+), 62 deletions(-) [+]
line wrap: on
line diff
--- a/include/twjit.h	Fri Jul 05 21:57:51 2024 +0000
+++ b/include/twjit.h	Fri Jul 05 23:49:37 2024 +0000
@@ -53,6 +53,18 @@
 };
 
 /*
+ * Info collected from the incoming RTP data stream
+ * for the purpose of generating RTCP reception report blocks.
+ */
+struct twrtp_jibuf_rr_info {
+	uint32_t rx_packets;
+	uint32_t base_seq;
+	uint32_t max_seq_ext;
+	uint32_t expected_pkt;
+	uint32_t jitter_accum;
+};
+
+/*
  * Each twjit instance has two sub-buffers; each subbuf is a queue of
  * received RTP packets that have the same SSRC and whose timestamps
  * increment in the expected cadence, with each ts delta being an
@@ -63,9 +75,7 @@
 	uint32_t head_ts;
 	struct llist_head queue;
 	uint32_t depth;
-	/* last packet arrival time, used only in starting state */
-	struct timeval last_arrival;
-	uint32_t delta_ms;
+	uint32_t delta_ms;		/* used only in starting state */
 	/* thinning mechanism */
 	uint16_t drop_int_count;
 	/* running config for this subbuf */
@@ -101,11 +111,15 @@
 	struct twrtp_jibuf_sub sb[2];
 	uint8_t read_sb;	/* 0 or 1 */
 	uint8_t write_sb;	/* ditto */
-	/* Rx packet stream analysis */
+	/* info about the most recent Rx packet */
 	uint32_t last_ssrc;
 	uint32_t last_ts;
 	uint16_t last_seq;
 	bool got_first_packet;
+	struct timespec last_arrival;
+	uint32_t last_arrival_delta;
+	/* analytics for RTCP RR */
+	struct twrtp_jibuf_rr_info rr_info;
 	/* stats over lifetime of this instance */
 	struct twrtp_jibuf_stats stats;
 };
--- a/src/twjit_in.c	Fri Jul 05 21:57:51 2024 +0000
+++ b/src/twjit_in.c	Fri Jul 05 23:49:37 2024 +0000
@@ -17,19 +17,90 @@
 
 #include <themwi/rtp/twjit.h>
 
-static void get_current_time(struct timeval *tp)
+/* raw analytics on the Rx packet stream */
+
+static void analytics_init(struct twrtp_jibuf_inst *twjit, uint16_t rx_seq)
 {
-	struct timespec now;
+	struct twrtp_jibuf_rr_info *rri = &twjit->rr_info;
+
+	rri->rx_packets = 1;
+	rri->base_seq = rx_seq;
+	rri->max_seq_ext = rx_seq;
+	rri->expected_pkt = 1;
+	rri->jitter_accum = 0;
+}
+
+static void analytics_cont(struct twrtp_jibuf_inst *twjit, uint16_t rx_seq,
+			   uint32_t rx_ts, struct timespec *now)
+{
+	struct twrtp_jibuf_rr_info *rri = &twjit->rr_info;
+	uint16_t seq_ext_lo = rri->max_seq_ext;
+	uint16_t seq_ext_hi = rri->max_seq_ext >> 16;
+	int16_t seq_delta = (int16_t)(rx_seq - twjit->last_seq);
+	int16_t seq_delta2 = (int16_t)(rx_seq - seq_ext_lo);
+	int32_t ts_delta = (int32_t)(rx_ts - twjit->last_ts);
+	struct timespec time_delta;
+	uint32_t time_delta_tsu;
+	int32_t jitter_new, ts_delta_clamp;
 
-	osmo_clock_gettime(CLOCK_MONOTONIC, &now);
-	tp->tv_sec = now.tv_sec;
-	tp->tv_usec = now.tv_nsec / 1000;
+	/* analytics for our own stats */
+	if (seq_delta < 0)
+		twjit->stats.seq_backwards++;
+	else if (seq_delta == 0)
+		twjit->stats.seq_repeats++;
+	else if (seq_delta == 1) {
+		if (ts_delta != twjit->ts_quantum) {
+			if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0)
+				twjit->stats.intentional_gaps++;
+			else
+				twjit->stats.ts_resets++;
+		}
+	} else
+		twjit->stats.seq_skips++;
+
+	/* analytics for RTCP RR: packet counts */
+	rri->rx_packets++;
+	if (seq_delta2 > 0) {
+		if (rx_seq < seq_ext_lo)
+			seq_ext_hi++;
+		seq_ext_lo = rx_seq;
+		rri->max_seq_ext = ((uint32_t) seq_ext_hi << 16) | seq_ext_lo;
+		rri->expected_pkt = rri->max_seq_ext - rri->base_seq + 1;
+	}
+
+	/* time-of-arrival analytics */
+	time_delta.tv_sec = now->tv_sec - twjit->last_arrival.tv_sec;
+	time_delta.tv_nsec = now->tv_nsec - twjit->last_arrival.tv_nsec;
+	if (time_delta.tv_nsec < 0) {
+		time_delta.tv_sec--;
+		time_delta.tv_nsec += 1000000000;
+	}
+	/* to avoid overflows in downstream math, clamp to 1 hour */
+	if (time_delta.tv_sec >= 3600) {
+		time_delta.tv_sec = 3600;
+		time_delta.tv_nsec = 0;
+	}
+	/* convert to RTP timestamp units */
+	time_delta_tsu = time_delta.tv_sec * twjit->ts_units_per_sec +
+			 time_delta.tv_nsec / twjit->ns_to_ts_units;
+	twjit->last_arrival_delta = time_delta_tsu;
+	/* jitter calculation for RTCP RR */
+	ts_delta_clamp = twjit->ts_units_per_sec * 3600;
+	if (ts_delta > ts_delta_clamp)
+		ts_delta = ts_delta_clamp;
+	else if (ts_delta < -ts_delta_clamp)
+		ts_delta = -ts_delta_clamp;
+	jitter_new = time_delta_tsu - ts_delta;
+	if (jitter_new < 0)
+		jitter_new = -jitter_new;
+	rri->jitter_accum += jitter_new - ((rri->jitter_accum + 8) >> 4);
 }
 
+/* actual twjit input logic */
+
 static void
 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg,
-			 uint32_t rx_ssrc, uint32_t rx_ts,
-			 const struct timeval *new_time)
+			 uint32_t rx_ssrc, uint32_t rx_ts)
 {
 	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
 
@@ -40,7 +111,6 @@
 	sb->head_ts = rx_ts;
 	msgb_enqueue(&sb->queue, msg);
 	sb->depth = 1;
-	memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval));
 	memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config));
 	sb->drop_int_count = 0;
 }
@@ -53,21 +123,14 @@
 
 static enum input_decision
 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting,
-			uint32_t rx_ssrc, uint32_t rx_ts,
-			const struct timeval *new_time)
+			uint32_t rx_ssrc, uint32_t rx_ts)
 {
 	struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
-	struct timeval time_delta;
 	int32_t ts_delta;
 
 	if (rx_ssrc != sb->ssrc)
 		return INPUT_RESET;
-	if (starting) {
-		timersub(new_time, &sb->last_arrival, &time_delta);
-		sb->delta_ms = time_delta.tv_sec * 1000 +
-				time_delta.tv_usec / 1000;
-		memcpy(&sb->last_arrival, new_time, sizeof(struct timeval));
-	}
+	sb->delta_ms = twjit->last_arrival_delta / twjit->ts_units_per_ms;
 	ts_delta = (int32_t)(rx_ts - sb->head_ts);
 	if (ts_delta < 0)
 		return INPUT_TOO_OLD;
@@ -146,39 +209,12 @@
 	}
 }
 
-static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc,
-			     uint16_t rx_seq, uint32_t rx_ts)
-{
-	int16_t seq_delta;
-	int32_t ts_delta;
-
-	if (twjit->last_ssrc != rx_ssrc) {
-		twjit->stats.ssrc_changes++;
-		return;
-	}
-	seq_delta = (int16_t)(rx_seq - twjit->last_seq);
-	if (seq_delta < 0)
-		twjit->stats.seq_backwards++;
-	else if (seq_delta == 0)
-		twjit->stats.seq_repeats++;
-	else if (seq_delta == 1) {
-		ts_delta = (int32_t)(rx_ts - twjit->last_ts);
-		if (ts_delta != twjit->ts_quantum) {
-			if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0)
-				twjit->stats.intentional_gaps++;
-			else
-				twjit->stats.ts_resets++;
-		}
-	} else
-		twjit->stats.seq_skips++;
-}
-
 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg)
 {
 	struct rtp_hdr *rtph;
 	uint32_t rx_ssrc, rx_ts;
 	uint16_t rx_seq;
-	struct timeval new_time;
+	struct timespec now;
 	enum input_decision id;
 
 	rtph = osmo_rtp_get_hdr(msg);
@@ -191,13 +227,19 @@
 	rx_ssrc = ntohl(rtph->ssrc);
 	rx_ts = ntohl(rtph->timestamp);
 	rx_seq = ntohs(rtph->sequence);
-	get_current_time(&new_time);
-	if (twjit->got_first_packet)
-		rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts);
+	osmo_clock_gettime(CLOCK_MONOTONIC, &now);
+	if (!twjit->got_first_packet) {
+		analytics_init(twjit, rx_seq);
+		twjit->got_first_packet = true;
+	} else if (rx_ssrc != twjit->last_ssrc) {
+		twjit->stats.ssrc_changes++;
+		analytics_init(twjit, rx_seq);
+	} else
+		analytics_cont(twjit, rx_seq, rx_ts, &now);
 	twjit->last_ssrc = rx_ssrc;
 	twjit->last_seq = rx_seq;
 	twjit->last_ts = rx_ts;
-	twjit->got_first_packet = true;
+	memcpy(&twjit->last_arrival, &now, sizeof(struct timespec));
 	msg->cb[0] = rx_ts;
 
 	switch (twjit->state) {
@@ -205,28 +247,25 @@
 		/* first packet into totally empty buffer */
 		twjit->state = TWJIT_STATE_HUNT;
 		twjit->write_sb = 0;
-		init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time);
+		init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts);
 		return;
 	case TWJIT_STATE_HUNT:
 	case TWJIT_STATE_HANDOVER:
-		id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts,
-					    &new_time);
+		id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts);
 		if (id == INPUT_TOO_OLD) {
 			msgb_free(msg);
 			return;
 		}
 		if (id == INPUT_RESET) {
 			toss_write_queue(twjit);
-			init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
-						 &new_time);
+			init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts);
 			return;
 		}
 		insert_pkt_write_sb(twjit, msg, rx_ts);
 		trim_starting_sb(twjit);
 		return;
 	case TWJIT_STATE_FLOWING:
-		id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts,
-					    &new_time);
+		id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts);
 		if (id == INPUT_TOO_OLD) {
 			twjit->stats.too_old++;
 			msgb_free(msg);
@@ -235,8 +274,7 @@
 		if (id == INPUT_RESET) {
 			twjit->state = TWJIT_STATE_HANDOVER;
 			twjit->write_sb = !twjit->write_sb;
-			init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts,
-						 &new_time);
+			init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts);
 			twjit->stats.handovers++;
 			return;
 		}