MCPcopy
hub / github.com/sansan0/TrendRadar / LocalStorageBackend

Class LocalStorageBackend

trendradar/storage/local.py:26–494  ·  view source on GitHub ↗

本地存储后端 使用 SQLite 数据库存储新闻数据,支持: - 按日期组织的 SQLite 数据库文件 - 可选的 TXT 快照(用于调试) - HTML 报告生成

Source from the content-addressed store, hash-verified

24
25
26class LocalStorageBackend(SQLiteStorageMixin, StorageBackend):
27 """
28 本地存储后端
29
30 使用 SQLite 数据库存储新闻数据,支持:
31 - 按日期组织的 SQLite 数据库文件
32 - 可选的 TXT 快照(用于调试)
33 - HTML 报告生成
34 """
35
36 def __init__(
37 self,
38 data_dir: str = "output",
39 enable_txt: bool = True,
40 enable_html: bool = True,
41 timezone: str = DEFAULT_TIMEZONE,
42 ):
43 """
44 初始化本地存储后端
45
46 Args:
47 data_dir: 数据目录路径
48 enable_txt: 是否启用 TXT 快照
49 enable_html: 是否启用 HTML 报告
50 timezone: 时区配置
51 """
52 self.data_dir = Path(data_dir)
53 self.enable_txt = enable_txt
54 self.enable_html = enable_html
55 self.timezone = timezone
56 self._db_connections: Dict[str, sqlite3.Connection] = {}
57
58 @property
59 def backend_name(self) -> str:
60 return "local"
61
62 @property
63 def supports_txt(self) -> bool:
64 return self.enable_txt
65
66 # ========================================
67 # SQLiteStorageMixin 抽象方法实现
68 # ========================================
69
70 def _get_configured_time(self) -> datetime:
71 """获取配置时区的当前时间"""
72 return get_configured_time(self.timezone)
73
74 def _format_date_folder(self, date: Optional[str] = None) -> str:
75 """格式化日期文件夹名 (ISO 格式: YYYY-MM-DD)"""
76 return format_date_folder(date, self.timezone)
77
78 def _format_time_filename(self) -> str:
79 """格式化时间文件名 (格式: HH-MM)"""
80 return format_time_filename(self.timezone)
81
82 def _get_db_path(self, date: Optional[str] = None, db_type: str = "news") -> Path:
83 """

Callers 2

trigger_crawlMethod · 0.90
get_backendMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected