MCPcopy
hub / github.com/PostHog/posthog / Trends

Class Trends

posthog/queries/trends/trends.py:34–254  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

32
33
34class Trends(TrendsTotalVolume, Lifecycle, TrendsFormula):
def _get_sql_for_entity(self, filter: Filter, team: Team, entity: Entity) -> Tuple[str, str, Dict, Callable]:
    """Build the query for a single entity and label it with its query type.

    The query flavour is chosen in this order:

    1. breakdown   -- the filter has a breakdown and its display type is not
       one of ``NON_BREAKDOWN_DISPLAY_TYPES``;
    2. lifecycle   -- the filter is shown as ``TRENDS_LIFECYCLE``;
    3. total volume -- everything else.

    Returns:
        A ``(query_type, sql, params, parse_function)`` tuple, where
        ``query_type`` is one of ``"trends_breakdown"``,
        ``"trends_lifecycle"`` or ``"trends_total_volume"``.
    """
    if filter.breakdown and filter.display not in NON_BREAKDOWN_DISPLAY_TYPES:
        breakdown_query = TrendsBreakdown(
            entity,
            filter,
            team,
            using_person_on_events=team.person_on_events_querying_enabled,
        )
        sql, params, parse_function = breakdown_query.get_query()
        return "trends_breakdown", sql, params, parse_function

    if filter.shown_as == TRENDS_LIFECYCLE:
        sql, params, parse_function = self._format_lifecycle_query(entity, filter, team)
        return "trends_lifecycle", sql, params, parse_function

    sql, params, parse_function = self._total_volume_query(entity, filter, team)
    return "trends_total_volume", sql, params, parse_function
49
50 # Use cached result even on refresh if team has strict caching enabled
def get_cached_result(self, filter: Filter, team: Team) -> Optional[List[Dict[str, Any]]]:
    """Return the cached result for ``filter`` if it is usable, else ``None``.

    A cached result is only considered when the team has strict caching
    enabled, the filter has no breakdown, and the display type is
    ``TRENDS_LINEAR``. Even then it is returned only if the cache holds a
    non-empty ``"result"`` entry and ``is_present_timerange`` accepts it.
    """
    eligible = team.strict_caching_enabled and not filter.breakdown and filter.display == TRENDS_LINEAR
    if not eligible:
        return None

    # The cache key combines the serialized filter with the team's primary key.
    package = get_safe_cache(generate_cache_key(f"{filter.toJSON()}_{team.pk}"))
    if not package or not isinstance(package, dict):
        return None

    result = package.get("result")
    if not result:
        return None

    if self.is_present_timerange(result, filter, team):
        return result
    return None
70
71 # Determine if the current timerange is present in the cache
def is_present_timerange(self, cached_result: List[Dict[str, Any]], filter: Filter, team: Team) -> bool:
    """Tell whether the cached result covers the filter's current time range.

    Only the first series in ``cached_result`` is inspected. It must carry
    non-empty ``"days"`` and ``"data"`` entries of equal length; otherwise
    the cache is treated as not covering the range and ``False`` is returned.

    The last entry of ``"days"`` is parsed, interpreted as wall-clock time in
    the team's timezone, and handed to ``is_filter_date_present`` together
    with ``filter``; that helper's verdict is returned.
    """
    if not cached_result:
        return False

    first_series = cached_result[0]
    days = first_series.get("days")
    data = first_series.get("data")
    if not days or not data or len(days) != len(data):
        return False

    # Drop any offset the string carried so the value is a plain wall-clock
    # time; the team's timezone is attached below.
    latest_wall_time = parser.parse(days[-1]).replace(tzinfo=None)

    # pytz zones must be attached with localize(): passing a pytz zone via
    # datetime.replace(tzinfo=...) picks the zone's first historical offset
    # (typically LMT, off by some minutes) instead of the offset in effect
    # on that date.
    latest_date = pytz.timezone(team.timezone).localize(latest_wall_time)

    return is_filter_date_present(filter, latest_date)
89
90 # Use a condensed filter if a cached result exists in the current timerange
91 def adjusted_filter(self, filter: Filter, team: Team) -> Tuple[Filter, Optional[Dict[str, Any]]]:

Calls

no outgoing calls

Used in the wild — real call sites across dependent graphs

searching dependent graphs…