comparison src/twjit_in.c @ 9:117fa99ff871

twjit: implement proper analytics
author Mychaela Falconia <falcon@freecalypso.org>
date Fri, 05 Jul 2024 23:49:37 +0000
parents 95f6c8ce33b0
children e60df79cbe9f
comparison
equal deleted inserted replaced
8:95f6c8ce33b0 9:117fa99ff871
15 /* FIXME: this libosmo-netif dependency needs to be removed */ 15 /* FIXME: this libosmo-netif dependency needs to be removed */
16 #include <osmocom/netif/rtp.h> 16 #include <osmocom/netif/rtp.h>
17 17
18 #include <themwi/rtp/twjit.h> 18 #include <themwi/rtp/twjit.h>
19 19
20 static void get_current_time(struct timeval *tp) 20 /* raw analytics on the Rx packet stream */
21 { 21
22 struct timespec now; 22 static void analytics_init(struct twrtp_jibuf_inst *twjit, uint16_t rx_seq)
23 23 {
24 osmo_clock_gettime(CLOCK_MONOTONIC, &now); 24 struct twrtp_jibuf_rr_info *rri = &twjit->rr_info;
25 tp->tv_sec = now.tv_sec; 25
26 tp->tv_usec = now.tv_nsec / 1000; 26 rri->rx_packets = 1;
27 } 27 rri->base_seq = rx_seq;
28 rri->max_seq_ext = rx_seq;
29 rri->expected_pkt = 1;
30 rri->jitter_accum = 0;
31 }
32
33 static void analytics_cont(struct twrtp_jibuf_inst *twjit, uint16_t rx_seq,
34 uint32_t rx_ts, struct timespec *now)
35 {
36 struct twrtp_jibuf_rr_info *rri = &twjit->rr_info;
37 uint16_t seq_ext_lo = rri->max_seq_ext;
38 uint16_t seq_ext_hi = rri->max_seq_ext >> 16;
39 int16_t seq_delta = (int16_t)(rx_seq - twjit->last_seq);
40 int16_t seq_delta2 = (int16_t)(rx_seq - seq_ext_lo);
41 int32_t ts_delta = (int32_t)(rx_ts - twjit->last_ts);
42 struct timespec time_delta;
43 uint32_t time_delta_tsu;
44 int32_t jitter_new, ts_delta_clamp;
45
46 /* analytics for our own stats */
47 if (seq_delta < 0)
48 twjit->stats.seq_backwards++;
49 else if (seq_delta == 0)
50 twjit->stats.seq_repeats++;
51 else if (seq_delta == 1) {
52 if (ts_delta != twjit->ts_quantum) {
53 if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0)
54 twjit->stats.intentional_gaps++;
55 else
56 twjit->stats.ts_resets++;
57 }
58 } else
59 twjit->stats.seq_skips++;
60
61 /* analytics for RTCP RR: packet counts */
62 rri->rx_packets++;
63 if (seq_delta2 > 0) {
64 if (rx_seq < seq_ext_lo)
65 seq_ext_hi++;
66 seq_ext_lo = rx_seq;
67 rri->max_seq_ext = ((uint32_t) seq_ext_hi << 16) | seq_ext_lo;
68 rri->expected_pkt = rri->max_seq_ext - rri->base_seq + 1;
69 }
70
71 /* time-of-arrival analytics */
72 time_delta.tv_sec = now->tv_sec - twjit->last_arrival.tv_sec;
73 time_delta.tv_nsec = now->tv_nsec - twjit->last_arrival.tv_nsec;
74 if (time_delta.tv_nsec < 0) {
75 time_delta.tv_sec--;
76 time_delta.tv_nsec += 1000000000;
77 }
78 /* to avoid overflows in downstream math, clamp to 1 hour */
79 if (time_delta.tv_sec >= 3600) {
80 time_delta.tv_sec = 3600;
81 time_delta.tv_nsec = 0;
82 }
83 /* convert to RTP timestamp units */
84 time_delta_tsu = time_delta.tv_sec * twjit->ts_units_per_sec +
85 time_delta.tv_nsec / twjit->ns_to_ts_units;
86 twjit->last_arrival_delta = time_delta_tsu;
87 /* jitter calculation for RTCP RR */
88 ts_delta_clamp = twjit->ts_units_per_sec * 3600;
89 if (ts_delta > ts_delta_clamp)
90 ts_delta = ts_delta_clamp;
91 else if (ts_delta < -ts_delta_clamp)
92 ts_delta = -ts_delta_clamp;
93 jitter_new = time_delta_tsu - ts_delta;
94 if (jitter_new < 0)
95 jitter_new = -jitter_new;
96 rri->jitter_accum += jitter_new - ((rri->jitter_accum + 8) >> 4);
97 }
98
99 /* actual twjit input logic */
28 100
29 static void 101 static void
30 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, 102 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg,
31 uint32_t rx_ssrc, uint32_t rx_ts, 103 uint32_t rx_ssrc, uint32_t rx_ts)
32 const struct timeval *new_time)
33 { 104 {
34 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; 105 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
35 106
36 OSMO_ASSERT(llist_empty(&sb->queue)); 107 OSMO_ASSERT(llist_empty(&sb->queue));
37 OSMO_ASSERT(sb->depth == 0); 108 OSMO_ASSERT(sb->depth == 0);
38 /* all good, proceed */ 109 /* all good, proceed */
39 sb->ssrc = rx_ssrc; 110 sb->ssrc = rx_ssrc;
40 sb->head_ts = rx_ts; 111 sb->head_ts = rx_ts;
41 msgb_enqueue(&sb->queue, msg); 112 msgb_enqueue(&sb->queue, msg);
42 sb->depth = 1; 113 sb->depth = 1;
43 memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval));
44 memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); 114 memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config));
45 sb->drop_int_count = 0; 115 sb->drop_int_count = 0;
46 } 116 }
47 117
48 enum input_decision { 118 enum input_decision {
51 INPUT_RESET, 121 INPUT_RESET,
52 }; 122 };
53 123
54 static enum input_decision 124 static enum input_decision
55 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, 125 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting,
56 uint32_t rx_ssrc, uint32_t rx_ts, 126 uint32_t rx_ssrc, uint32_t rx_ts)
57 const struct timeval *new_time) 127 {
58 { 128 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
59 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb];
60 struct timeval time_delta;
61 int32_t ts_delta; 129 int32_t ts_delta;
62 130
63 if (rx_ssrc != sb->ssrc) 131 if (rx_ssrc != sb->ssrc)
64 return INPUT_RESET; 132 return INPUT_RESET;
65 if (starting) { 133 sb->delta_ms = twjit->last_arrival_delta / twjit->ts_units_per_ms;
66 timersub(new_time, &sb->last_arrival, &time_delta);
67 sb->delta_ms = time_delta.tv_sec * 1000 +
68 time_delta.tv_usec / 1000;
69 memcpy(&sb->last_arrival, new_time, sizeof(struct timeval));
70 }
71 ts_delta = (int32_t)(rx_ts - sb->head_ts); 134 ts_delta = (int32_t)(rx_ts - sb->head_ts);
72 if (ts_delta < 0) 135 if (ts_delta < 0)
73 return INPUT_TOO_OLD; 136 return INPUT_TOO_OLD;
74 if (ts_delta % twjit->ts_quantum) 137 if (ts_delta % twjit->ts_quantum)
75 return INPUT_RESET; 138 return INPUT_RESET;
144 sb->head_ts = msg_ts; 207 sb->head_ts = msg_ts;
145 sb->depth -= quantum_adv; 208 sb->depth -= quantum_adv;
146 } 209 }
147 } 210 }
148 211
149 static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc,
150 uint16_t rx_seq, uint32_t rx_ts)
151 {
152 int16_t seq_delta;
153 int32_t ts_delta;
154
155 if (twjit->last_ssrc != rx_ssrc) {
156 twjit->stats.ssrc_changes++;
157 return;
158 }
159 seq_delta = (int16_t)(rx_seq - twjit->last_seq);
160 if (seq_delta < 0)
161 twjit->stats.seq_backwards++;
162 else if (seq_delta == 0)
163 twjit->stats.seq_repeats++;
164 else if (seq_delta == 1) {
165 ts_delta = (int32_t)(rx_ts - twjit->last_ts);
166 if (ts_delta != twjit->ts_quantum) {
167 if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0)
168 twjit->stats.intentional_gaps++;
169 else
170 twjit->stats.ts_resets++;
171 }
172 } else
173 twjit->stats.seq_skips++;
174 }
175
176 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) 212 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg)
177 { 213 {
178 struct rtp_hdr *rtph; 214 struct rtp_hdr *rtph;
179 uint32_t rx_ssrc, rx_ts; 215 uint32_t rx_ssrc, rx_ts;
180 uint16_t rx_seq; 216 uint16_t rx_seq;
181 struct timeval new_time; 217 struct timespec now;
182 enum input_decision id; 218 enum input_decision id;
183 219
184 rtph = osmo_rtp_get_hdr(msg); 220 rtph = osmo_rtp_get_hdr(msg);
185 if (!rtph) { 221 if (!rtph) {
186 /* invalid packet, couldn't even get header from it */ 222 /* invalid packet, couldn't even get header from it */
189 return; 225 return;
190 } 226 }
191 rx_ssrc = ntohl(rtph->ssrc); 227 rx_ssrc = ntohl(rtph->ssrc);
192 rx_ts = ntohl(rtph->timestamp); 228 rx_ts = ntohl(rtph->timestamp);
193 rx_seq = ntohs(rtph->sequence); 229 rx_seq = ntohs(rtph->sequence);
194 get_current_time(&new_time); 230 osmo_clock_gettime(CLOCK_MONOTONIC, &now);
195 if (twjit->got_first_packet) 231 if (!twjit->got_first_packet) {
196 rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); 232 analytics_init(twjit, rx_seq);
233 twjit->got_first_packet = true;
234 } else if (rx_ssrc != twjit->last_ssrc) {
235 twjit->stats.ssrc_changes++;
236 analytics_init(twjit, rx_seq);
237 } else
238 analytics_cont(twjit, rx_seq, rx_ts, &now);
197 twjit->last_ssrc = rx_ssrc; 239 twjit->last_ssrc = rx_ssrc;
198 twjit->last_seq = rx_seq; 240 twjit->last_seq = rx_seq;
199 twjit->last_ts = rx_ts; 241 twjit->last_ts = rx_ts;
200 twjit->got_first_packet = true; 242 memcpy(&twjit->last_arrival, &now, sizeof(struct timespec));
201 msg->cb[0] = rx_ts; 243 msg->cb[0] = rx_ts;
202 244
203 switch (twjit->state) { 245 switch (twjit->state) {
204 case TWJIT_STATE_EMPTY: 246 case TWJIT_STATE_EMPTY:
205 /* first packet into totally empty buffer */ 247 /* first packet into totally empty buffer */
206 twjit->state = TWJIT_STATE_HUNT; 248 twjit->state = TWJIT_STATE_HUNT;
207 twjit->write_sb = 0; 249 twjit->write_sb = 0;
208 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); 250 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts);
209 return; 251 return;
210 case TWJIT_STATE_HUNT: 252 case TWJIT_STATE_HUNT:
211 case TWJIT_STATE_HANDOVER: 253 case TWJIT_STATE_HANDOVER:
212 id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, 254 id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts);
213 &new_time);
214 if (id == INPUT_TOO_OLD) { 255 if (id == INPUT_TOO_OLD) {
215 msgb_free(msg); 256 msgb_free(msg);
216 return; 257 return;
217 } 258 }
218 if (id == INPUT_RESET) { 259 if (id == INPUT_RESET) {
219 toss_write_queue(twjit); 260 toss_write_queue(twjit);
220 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, 261 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts);
221 &new_time);
222 return; 262 return;
223 } 263 }
224 insert_pkt_write_sb(twjit, msg, rx_ts); 264 insert_pkt_write_sb(twjit, msg, rx_ts);
225 trim_starting_sb(twjit); 265 trim_starting_sb(twjit);
226 return; 266 return;
227 case TWJIT_STATE_FLOWING: 267 case TWJIT_STATE_FLOWING:
228 id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, 268 id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts);
229 &new_time);
230 if (id == INPUT_TOO_OLD) { 269 if (id == INPUT_TOO_OLD) {
231 twjit->stats.too_old++; 270 twjit->stats.too_old++;
232 msgb_free(msg); 271 msgb_free(msg);
233 return; 272 return;
234 } 273 }
235 if (id == INPUT_RESET) { 274 if (id == INPUT_RESET) {
236 twjit->state = TWJIT_STATE_HANDOVER; 275 twjit->state = TWJIT_STATE_HANDOVER;
237 twjit->write_sb = !twjit->write_sb; 276 twjit->write_sb = !twjit->write_sb;
238 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, 277 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts);
239 &new_time);
240 twjit->stats.handovers++; 278 twjit->stats.handovers++;
241 return; 279 return;
242 } 280 }
243 insert_pkt_write_sb(twjit, msg, rx_ts); 281 insert_pkt_write_sb(twjit, msg, rx_ts);
244 return; 282 return;