MCPcopy
hub / github.com/tanelpoder/0xtools / build_dynamic_query

Method build_dynamic_query

xtop/core/query_builder.py:138–261  ·  view source on GitHub ↗

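Build a dynamic query based on requested columns. Args: group_cols — columns to group by; where_clause — WHERE clause conditions; low_time — start time for data range; high_time — end time for data range; latency_columns — additional latency/aggregate columns to include; limit — row limit for results. Returns the complete SQL query string.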

(self, 
                           group_cols: List[str],
                           where_clause: str = "1=1",
                           low_time: Optional[datetime] = None,
                           high_time: Optional[datetime] = None,
                           latency_columns: Optional[List[str]] = None,
                           limit: Optional[int] = None)

Source from the content-addressed store, hash-verified

136 return f"NULL AS {output_alias}"
137
138 def build_dynamic_query(self,
139 group_cols: List[str],
140 where_clause: str = "1=1",
141 low_time: Optional[datetime] = None,
142 high_time: Optional[datetime] = None,
143 latency_columns: Optional[List[str]] = None,
144 limit: Optional[int] = None) -> str:
145 """
146 Build a dynamic query based on requested columns.
147
148 Args:
149 group_cols: Columns to group by
150 where_clause: WHERE clause conditions
151 low_time: Start time for data range
152 high_time: End time for data range
153 latency_columns: Additional latency/aggregate columns to include
154 limit: Row limit for results
155
156 Returns:
157 Complete SQL query string
158 """
159 # Standardize column names to lowercase
160 group_cols = [col.lower() for col in group_cols]
161 # Determine all requested columns
162 all_columns = set(group_cols)
163 all_columns.update(['samples', 'avg_threads']) # Always include these
164 if latency_columns:
165 all_columns.update([col.lower() for col in latency_columns])
166
167 # Determine required data sources
168 required_sources = self._determine_required_sources(all_columns)
169
170 # Check for histogram requirements
171 # Check for histogram columns (case-insensitive)
172 all_columns_lower = {col.lower() for col in all_columns}
173 need_sc_histogram = 'sclat_histogram' in all_columns_lower
174 need_io_histogram = 'iolat_histogram' in all_columns_lower
175
176 # Build the query parts
177 ctes = []
178
179 # 1. Build enriched_samples CTE with all computed columns
180 enriched_cte = self._build_enriched_samples_cte(low_time, high_time)
181 ctes.append(f"enriched_samples AS (\n{enriched_cte}\n)")
182
183 # 2. Build base_samples CTE with JOINs and filters
184 base_cte = self._build_base_samples_cte(
185 required_sources, where_clause, low_time, high_time,
186 need_sc_histogram, need_io_histogram
187 )
188 ctes.append(f"base_samples AS (\n{base_cte}\n)")
189
190 # 3. Add histogram CTEs if needed
191 if need_sc_histogram:
192 hist_cte = self._build_histogram_cte(
193 'sc', group_cols, 'sc_duration_ns', 'sc_lat_bkt_us'
194 )
195 ctes.extend(hist_cte)

Callers

nothing calls this directly

Calls 7

_build_histogram_cte · Method · 0.95
_build_final_select · Method · 0.95
_build_group_by · Method · 0.95

Tested by

no test coverage detected