FreeCalypso > hg > themwi-rtp-lib
comparison src/twjit_in.c @ 9:117fa99ff871
twjit: implement proper analytics
author | Mychaela Falconia <falcon@freecalypso.org> |
---|---|
date | Fri, 05 Jul 2024 23:49:37 +0000 |
parents | 95f6c8ce33b0 |
children | e60df79cbe9f |
comparison
equal
deleted
inserted
replaced
8:95f6c8ce33b0 | 9:117fa99ff871 |
---|---|
15 /* FIXME: this libosmo-netif dependency needs to be removed */ | 15 /* FIXME: this libosmo-netif dependency needs to be removed */ |
16 #include <osmocom/netif/rtp.h> | 16 #include <osmocom/netif/rtp.h> |
17 | 17 |
18 #include <themwi/rtp/twjit.h> | 18 #include <themwi/rtp/twjit.h> |
19 | 19 |
20 static void get_current_time(struct timeval *tp) | 20 /* raw analytics on the Rx packet stream */ |
21 { | 21 |
22 struct timespec now; | 22 static void analytics_init(struct twrtp_jibuf_inst *twjit, uint16_t rx_seq) |
23 | 23 { |
24 osmo_clock_gettime(CLOCK_MONOTONIC, &now); | 24 struct twrtp_jibuf_rr_info *rri = &twjit->rr_info; |
25 tp->tv_sec = now.tv_sec; | 25 |
26 tp->tv_usec = now.tv_nsec / 1000; | 26 rri->rx_packets = 1; |
27 } | 27 rri->base_seq = rx_seq; |
28 rri->max_seq_ext = rx_seq; | |
29 rri->expected_pkt = 1; | |
30 rri->jitter_accum = 0; | |
31 } | |
32 | |
33 static void analytics_cont(struct twrtp_jibuf_inst *twjit, uint16_t rx_seq, | |
34 uint32_t rx_ts, struct timespec *now) | |
35 { | |
36 struct twrtp_jibuf_rr_info *rri = &twjit->rr_info; | |
37 uint16_t seq_ext_lo = rri->max_seq_ext; | |
38 uint16_t seq_ext_hi = rri->max_seq_ext >> 16; | |
39 int16_t seq_delta = (int16_t)(rx_seq - twjit->last_seq); | |
40 int16_t seq_delta2 = (int16_t)(rx_seq - seq_ext_lo); | |
41 int32_t ts_delta = (int32_t)(rx_ts - twjit->last_ts); | |
42 struct timespec time_delta; | |
43 uint32_t time_delta_tsu; | |
44 int32_t jitter_new, ts_delta_clamp; | |
45 | |
46 /* analytics for our own stats */ | |
47 if (seq_delta < 0) | |
48 twjit->stats.seq_backwards++; | |
49 else if (seq_delta == 0) | |
50 twjit->stats.seq_repeats++; | |
51 else if (seq_delta == 1) { | |
52 if (ts_delta != twjit->ts_quantum) { | |
53 if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) | |
54 twjit->stats.intentional_gaps++; | |
55 else | |
56 twjit->stats.ts_resets++; | |
57 } | |
58 } else | |
59 twjit->stats.seq_skips++; | |
60 | |
61 /* analytics for RTCP RR: packet counts */ | |
62 rri->rx_packets++; | |
63 if (seq_delta2 > 0) { | |
64 if (rx_seq < seq_ext_lo) | |
65 seq_ext_hi++; | |
66 seq_ext_lo = rx_seq; | |
67 rri->max_seq_ext = ((uint32_t) seq_ext_hi << 16) | seq_ext_lo; | |
68 rri->expected_pkt = rri->max_seq_ext - rri->base_seq + 1; | |
69 } | |
70 | |
71 /* time-of-arrival analytics */ | |
72 time_delta.tv_sec = now->tv_sec - twjit->last_arrival.tv_sec; | |
73 time_delta.tv_nsec = now->tv_nsec - twjit->last_arrival.tv_nsec; | |
74 if (time_delta.tv_nsec < 0) { | |
75 time_delta.tv_sec--; | |
76 time_delta.tv_nsec += 1000000000; | |
77 } | |
78 /* to avoid overflows in downstream math, clamp to 1 hour */ | |
79 if (time_delta.tv_sec >= 3600) { | |
80 time_delta.tv_sec = 3600; | |
81 time_delta.tv_nsec = 0; | |
82 } | |
83 /* convert to RTP timestamp units */ | |
84 time_delta_tsu = time_delta.tv_sec * twjit->ts_units_per_sec + | |
85 time_delta.tv_nsec / twjit->ns_to_ts_units; | |
86 twjit->last_arrival_delta = time_delta_tsu; | |
87 /* jitter calculation for RTCP RR */ | |
88 ts_delta_clamp = twjit->ts_units_per_sec * 3600; | |
89 if (ts_delta > ts_delta_clamp) | |
90 ts_delta = ts_delta_clamp; | |
91 else if (ts_delta < -ts_delta_clamp) | |
92 ts_delta = -ts_delta_clamp; | |
93 jitter_new = time_delta_tsu - ts_delta; | |
94 if (jitter_new < 0) | |
95 jitter_new = -jitter_new; | |
96 rri->jitter_accum += jitter_new - ((rri->jitter_accum + 8) >> 4); | |
97 } | |
98 | |
99 /* actual twjit input logic */ | |
28 | 100 |
29 static void | 101 static void |
30 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, | 102 init_subbuf_first_packet(struct twrtp_jibuf_inst *twjit, struct msgb *msg, |
31 uint32_t rx_ssrc, uint32_t rx_ts, | 103 uint32_t rx_ssrc, uint32_t rx_ts) |
32 const struct timeval *new_time) | |
33 { | 104 { |
34 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | 105 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; |
35 | 106 |
36 OSMO_ASSERT(llist_empty(&sb->queue)); | 107 OSMO_ASSERT(llist_empty(&sb->queue)); |
37 OSMO_ASSERT(sb->depth == 0); | 108 OSMO_ASSERT(sb->depth == 0); |
38 /* all good, proceed */ | 109 /* all good, proceed */ |
39 sb->ssrc = rx_ssrc; | 110 sb->ssrc = rx_ssrc; |
40 sb->head_ts = rx_ts; | 111 sb->head_ts = rx_ts; |
41 msgb_enqueue(&sb->queue, msg); | 112 msgb_enqueue(&sb->queue, msg); |
42 sb->depth = 1; | 113 sb->depth = 1; |
43 memcpy(&sb->last_arrival, &new_time, sizeof(struct timeval)); | |
44 memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); | 114 memcpy(&sb->conf, twjit->ext_config, sizeof(struct twrtp_jibuf_config)); |
45 sb->drop_int_count = 0; | 115 sb->drop_int_count = 0; |
46 } | 116 } |
47 | 117 |
48 enum input_decision { | 118 enum input_decision { |
51 INPUT_RESET, | 121 INPUT_RESET, |
52 }; | 122 }; |
53 | 123 |
54 static enum input_decision | 124 static enum input_decision |
55 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, | 125 check_input_for_subbuf(struct twrtp_jibuf_inst *twjit, bool starting, |
56 uint32_t rx_ssrc, uint32_t rx_ts, | 126 uint32_t rx_ssrc, uint32_t rx_ts) |
57 const struct timeval *new_time) | 127 { |
58 { | 128 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; |
59 struct twrtp_jibuf_sub *sb = &twjit->sb[twjit->write_sb]; | |
60 struct timeval time_delta; | |
61 int32_t ts_delta; | 129 int32_t ts_delta; |
62 | 130 |
63 if (rx_ssrc != sb->ssrc) | 131 if (rx_ssrc != sb->ssrc) |
64 return INPUT_RESET; | 132 return INPUT_RESET; |
65 if (starting) { | 133 sb->delta_ms = twjit->last_arrival_delta / twjit->ts_units_per_ms; |
66 timersub(new_time, &sb->last_arrival, &time_delta); | |
67 sb->delta_ms = time_delta.tv_sec * 1000 + | |
68 time_delta.tv_usec / 1000; | |
69 memcpy(&sb->last_arrival, new_time, sizeof(struct timeval)); | |
70 } | |
71 ts_delta = (int32_t)(rx_ts - sb->head_ts); | 134 ts_delta = (int32_t)(rx_ts - sb->head_ts); |
72 if (ts_delta < 0) | 135 if (ts_delta < 0) |
73 return INPUT_TOO_OLD; | 136 return INPUT_TOO_OLD; |
74 if (ts_delta % twjit->ts_quantum) | 137 if (ts_delta % twjit->ts_quantum) |
75 return INPUT_RESET; | 138 return INPUT_RESET; |
144 sb->head_ts = msg_ts; | 207 sb->head_ts = msg_ts; |
145 sb->depth -= quantum_adv; | 208 sb->depth -= quantum_adv; |
146 } | 209 } |
147 } | 210 } |
148 | 211 |
149 static void rx_raw_analytics(struct twrtp_jibuf_inst *twjit, uint32_t rx_ssrc, | |
150 uint16_t rx_seq, uint32_t rx_ts) | |
151 { | |
152 int16_t seq_delta; | |
153 int32_t ts_delta; | |
154 | |
155 if (twjit->last_ssrc != rx_ssrc) { | |
156 twjit->stats.ssrc_changes++; | |
157 return; | |
158 } | |
159 seq_delta = (int16_t)(rx_seq - twjit->last_seq); | |
160 if (seq_delta < 0) | |
161 twjit->stats.seq_backwards++; | |
162 else if (seq_delta == 0) | |
163 twjit->stats.seq_repeats++; | |
164 else if (seq_delta == 1) { | |
165 ts_delta = (int32_t)(rx_ts - twjit->last_ts); | |
166 if (ts_delta != twjit->ts_quantum) { | |
167 if (ts_delta > 0 && (ts_delta % twjit->ts_quantum) == 0) | |
168 twjit->stats.intentional_gaps++; | |
169 else | |
170 twjit->stats.ts_resets++; | |
171 } | |
172 } else | |
173 twjit->stats.seq_skips++; | |
174 } | |
175 | |
176 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) | 212 void twrtp_jibuf_input(struct twrtp_jibuf_inst *twjit, struct msgb *msg) |
177 { | 213 { |
178 struct rtp_hdr *rtph; | 214 struct rtp_hdr *rtph; |
179 uint32_t rx_ssrc, rx_ts; | 215 uint32_t rx_ssrc, rx_ts; |
180 uint16_t rx_seq; | 216 uint16_t rx_seq; |
181 struct timeval new_time; | 217 struct timespec now; |
182 enum input_decision id; | 218 enum input_decision id; |
183 | 219 |
184 rtph = osmo_rtp_get_hdr(msg); | 220 rtph = osmo_rtp_get_hdr(msg); |
185 if (!rtph) { | 221 if (!rtph) { |
186 /* invalid packet, couldn't even get header from it */ | 222 /* invalid packet, couldn't even get header from it */ |
189 return; | 225 return; |
190 } | 226 } |
191 rx_ssrc = ntohl(rtph->ssrc); | 227 rx_ssrc = ntohl(rtph->ssrc); |
192 rx_ts = ntohl(rtph->timestamp); | 228 rx_ts = ntohl(rtph->timestamp); |
193 rx_seq = ntohs(rtph->sequence); | 229 rx_seq = ntohs(rtph->sequence); |
194 get_current_time(&new_time); | 230 osmo_clock_gettime(CLOCK_MONOTONIC, &now); |
195 if (twjit->got_first_packet) | 231 if (!twjit->got_first_packet) { |
196 rx_raw_analytics(twjit, rx_ssrc, rx_seq, rx_ts); | 232 analytics_init(twjit, rx_seq); |
233 twjit->got_first_packet = true; | |
234 } else if (rx_ssrc != twjit->last_ssrc) { | |
235 twjit->stats.ssrc_changes++; | |
236 analytics_init(twjit, rx_seq); | |
237 } else | |
238 analytics_cont(twjit, rx_seq, rx_ts, &now); | |
197 twjit->last_ssrc = rx_ssrc; | 239 twjit->last_ssrc = rx_ssrc; |
198 twjit->last_seq = rx_seq; | 240 twjit->last_seq = rx_seq; |
199 twjit->last_ts = rx_ts; | 241 twjit->last_ts = rx_ts; |
200 twjit->got_first_packet = true; | 242 memcpy(&twjit->last_arrival, &now, sizeof(struct timespec)); |
201 msg->cb[0] = rx_ts; | 243 msg->cb[0] = rx_ts; |
202 | 244 |
203 switch (twjit->state) { | 245 switch (twjit->state) { |
204 case TWJIT_STATE_EMPTY: | 246 case TWJIT_STATE_EMPTY: |
205 /* first packet into totally empty buffer */ | 247 /* first packet into totally empty buffer */ |
206 twjit->state = TWJIT_STATE_HUNT; | 248 twjit->state = TWJIT_STATE_HUNT; |
207 twjit->write_sb = 0; | 249 twjit->write_sb = 0; |
208 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, &new_time); | 250 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts); |
209 return; | 251 return; |
210 case TWJIT_STATE_HUNT: | 252 case TWJIT_STATE_HUNT: |
211 case TWJIT_STATE_HANDOVER: | 253 case TWJIT_STATE_HANDOVER: |
212 id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts, | 254 id = check_input_for_subbuf(twjit, true, rx_ssrc, rx_ts); |
213 &new_time); | |
214 if (id == INPUT_TOO_OLD) { | 255 if (id == INPUT_TOO_OLD) { |
215 msgb_free(msg); | 256 msgb_free(msg); |
216 return; | 257 return; |
217 } | 258 } |
218 if (id == INPUT_RESET) { | 259 if (id == INPUT_RESET) { |
219 toss_write_queue(twjit); | 260 toss_write_queue(twjit); |
220 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, | 261 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts); |
221 &new_time); | |
222 return; | 262 return; |
223 } | 263 } |
224 insert_pkt_write_sb(twjit, msg, rx_ts); | 264 insert_pkt_write_sb(twjit, msg, rx_ts); |
225 trim_starting_sb(twjit); | 265 trim_starting_sb(twjit); |
226 return; | 266 return; |
227 case TWJIT_STATE_FLOWING: | 267 case TWJIT_STATE_FLOWING: |
228 id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts, | 268 id = check_input_for_subbuf(twjit, false, rx_ssrc, rx_ts); |
229 &new_time); | |
230 if (id == INPUT_TOO_OLD) { | 269 if (id == INPUT_TOO_OLD) { |
231 twjit->stats.too_old++; | 270 twjit->stats.too_old++; |
232 msgb_free(msg); | 271 msgb_free(msg); |
233 return; | 272 return; |
234 } | 273 } |
235 if (id == INPUT_RESET) { | 274 if (id == INPUT_RESET) { |
236 twjit->state = TWJIT_STATE_HANDOVER; | 275 twjit->state = TWJIT_STATE_HANDOVER; |
237 twjit->write_sb = !twjit->write_sb; | 276 twjit->write_sb = !twjit->write_sb; |
238 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts, | 277 init_subbuf_first_packet(twjit, msg, rx_ssrc, rx_ts); |
239 &new_time); | |
240 twjit->stats.handovers++; | 278 twjit->stats.handovers++; |
241 return; | 279 return; |
242 } | 280 } |
243 insert_pkt_write_sb(twjit, msg, rx_ts); | 281 insert_pkt_write_sb(twjit, msg, rx_ts); |
244 return; | 282 return; |