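Build the base_samples CTE with JOINs and filters: selects es.* plus columns from each required source (syscend, iorqend, kstacks, ...), using NULL fallbacks for columns whose join or schema column is unavailable.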
(self, required_sources: Set[str],
where_clause: str,
low_time: Optional[datetime],
high_time: Optional[datetime],
need_sc_histogram: bool,
need_io_histogram: bool)
| 455 | FROM ({base_samples}) AS samples""" |
| 456 | |
| 457 | def _build_base_samples_cte(self, required_sources: Set[str], |
| 458 | where_clause: str, |
| 459 | low_time: Optional[datetime], |
| 460 | high_time: Optional[datetime], |
| 461 | need_sc_histogram: bool, |
| 462 | need_io_histogram: bool) -> str: |
| 463 | """Build base_samples CTE with JOINs and filters""" |
| 464 | # Start with SELECT columns |
| 465 | select_cols = ["es.*"] |
| 466 | |
| 467 | # Determine join availability based on schema info |
| 468 | syscend_join = 'syscend' in required_sources and self._has_columns('syscend', ['tid', 'sysc_seq_num']) |
| 469 | iorq_join = 'iorqend' in required_sources and self._has_columns('iorqend', ['insert_tid', 'iorq_seq_num']) |
| 470 | kstack_join = 'kstacks' in required_sources and self._has_column('kstacks', 'kstack_hash') |
| 471 | ustack_join = 'ustacks' in required_sources and self._has_column('ustacks', 'ustack_hash') |
| 472 | partitions_join = ( |
| 473 | 'partitions' in required_sources |
| 474 | and iorq_join |
| 475 | and self._has_columns('iorqend', ['dev_maj', 'dev_min']) |
| 476 | and self._has_columns('partitions', ['dev_maj', 'dev_min']) |
| 477 | ) |
| 478 | |
| 479 | def project(source: str, alias: str, column: str, output_alias: str, join_available: bool) -> str: |
| 480 | if not join_available: |
| 481 | return f"NULL AS {output_alias}" |
| 482 | return self._column_expr(source, alias, column, output_alias) |
| 483 | |
| 484 | # Add columns from joined sources (use NULL fallbacks when missing) |
| 485 | if 'syscend' in required_sources: |
| 486 | select_cols.append(project('syscend', 'sc', 'duration_ns', 'sc_duration_ns', syscend_join and self._has_column('syscend', 'duration_ns'))) |
| 487 | select_cols.append(project('syscend', 'sc', 'type', 'sc_type', syscend_join)) |
| 488 | if need_sc_histogram and syscend_join and self._has_column('syscend', 'duration_ns'): |
| 489 | bucket_calc = self.fragments.load('histogram_buckets') |
| 490 | bucket_calc = bucket_calc.replace('#DURATION_COLUMN#', 'sc.duration_ns') |
| 491 | bucket_calc = bucket_calc.replace('lat_bkt_us', 'sc_lat_bkt_us') |
| 492 | select_cols.append(bucket_calc) |
| 493 | elif need_sc_histogram: |
| 494 | select_cols.append('NULL AS sc_lat_bkt_us') |
| 495 | |
| 496 | if 'iorqend' in required_sources: |
| 497 | select_cols.append(project('iorqend', 'io', 'duration_ns', 'io_duration_ns', iorq_join and self._has_column('iorqend', 'duration_ns'))) |
| 498 | select_cols.append(project('iorqend', 'io', 'service_ns', 'io_service_ns', iorq_join and self._has_column('iorqend', 'service_ns'))) |
| 499 | select_cols.append(project('iorqend', 'io', 'queued_ns', 'io_queued_ns', iorq_join and self._has_column('iorqend', 'queued_ns'))) |
| 500 | select_cols.append(project('iorqend', 'io', 'bytes', 'io_bytes', iorq_join and self._has_column('iorqend', 'bytes'))) |
| 501 | select_cols.append(project('iorqend', 'io', 'dev_maj', 'io_dev_maj', iorq_join and self._has_column('iorqend', 'dev_maj'))) |
| 502 | select_cols.append(project('iorqend', 'io', 'dev_min', 'io_dev_min', iorq_join and self._has_column('iorqend', 'dev_min'))) |
| 503 | select_cols.append(project('iorqend', 'io', 'iorq_flags', 'iorq_flags', iorq_join and self._has_column('iorqend', 'iorq_flags'))) |
| 504 | if need_io_histogram and iorq_join and self._has_column('iorqend', 'duration_ns'): |
| 505 | bucket_calc = self.fragments.load('histogram_buckets') |
| 506 | bucket_calc = bucket_calc.replace('#DURATION_COLUMN#', 'io.duration_ns') |
| 507 | bucket_calc = bucket_calc.replace('lat_bkt_us', 'io_lat_bkt_us') |
| 508 | select_cols.append(bucket_calc) |
| 509 | elif need_io_histogram: |
| 510 | select_cols.append('NULL AS io_lat_bkt_us') |
| 511 | |
| 512 | if 'kstacks' in required_sources: |
| 513 | select_cols.append(project('kstacks', 'ks', 'kstack_hash', 'KSTACK_HASH', kstack_join)) |
| 514 | select_cols.append(project('kstacks', 'ks', 'kstack_syms', 'KSTACK_SYMS', kstack_join and self._has_column('kstacks', 'kstack_syms'))) |