#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Telegram multi-source scraper (MySQL backend).

Scrapes messages from multiple Telegram groups/channels into one MySQL
database, tagging every row with its source.  The time window, throttle
settings, MySQL credentials and backfill options are all read from
``config.json``.
"""

import asyncio
import json
import logging
import os
import re
from datetime import datetime, timezone

import pymysql
from telethon import TelegramClient

# =======================
# Configuration
# =======================
# NOTE(review): hard-coded Telegram API credentials are a security risk;
# they can now be overridden via TG_API_ID / TG_API_HASH environment
# variables (defaults keep the original behaviour).
API_ID = int(os.getenv("TG_API_ID", "30954205"))
API_HASH = os.getenv("TG_API_HASH", "cc422183f2ed2233dfa05ea7a25abe3f")
SESSION_NAME = "scraper"
CONFIG_FILE = "config.json"
DEFAULT_SOURCES = ["@DeJob_Global_group"]
# None => no message-count cap during a full-history backfill.
INITIAL_BACKFILL_LIMIT = None

# Loose phone-number matcher applied to JSON-serialized media/action
# payloads: optional "+", then 7-19 digits with common separators, not
# embedded in a longer digit run.
# NOTE(review): the original pattern was corrupted in transit; this is a
# reconstruction — confirm against the original intent.
PHONE_RE = re.compile(r"(?<!\d)\+?\d[\d\-\s()]{5,17}\d(?!\d)")


# =======================
# Logging
# =======================
def setup_logger() -> logging.Logger:
    """Create the shared "scraper" logger (console + logs/app.log).

    Idempotent: if handlers are already attached the logger is returned
    unchanged, so repeated calls never duplicate log output.
    """
    os.makedirs("logs", exist_ok=True)
    logger = logging.getLogger("scraper")
    logger.setLevel(logging.INFO)
    if logger.handlers:
        return logger
    formatter = logging.Formatter(
        "[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    file_handler = logging.FileHandler("logs/app.log", encoding="utf-8")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    return logger


logger = setup_logger()


# =======================
# Config loading
# =======================
def parse_datetime(raw: str, *, is_end: bool = False) -> datetime:
    """Parse a config timestamp string into an aware UTC datetime.

    Accepts ISO-8601 (with ``T``/offset/``Z``), bare dates (``YYYY-MM-DD``)
    or ``YYYY-MM-DD HH:MM:SS``.  A bare date with ``is_end=True`` is pushed
    to 23:59:59 so it covers the whole day.  Naive values are assumed UTC.
    """
    raw = raw.strip()
    if "T" in raw or "+" in raw or raw.endswith("Z"):
        dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    elif len(raw) == 10:
        dt = datetime.strptime(raw, "%Y-%m-%d")
        if is_end:
            dt = dt.replace(hour=23, minute=59, second=59)
    else:
        dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    else:
        dt = dt.astimezone(timezone.utc)
    return dt


def _parse_sources(cfg: dict) -> list[str]:
    """Validate and normalize the ``sources`` list from the config."""
    sources = cfg.get("sources", DEFAULT_SOURCES)
    if not isinstance(sources, list):
        raise ValueError("配置错误: sources 必须是数组")
    sources = [str(s).strip() for s in sources if str(s).strip()]
    if not sources:
        raise ValueError("配置错误: sources 不能为空")
    return sources


def _parse_time_window(cfg: dict) -> tuple[datetime | None, datetime | None]:
    """Extract the optional ``time_window`` as (start, end) UTC datetimes."""
    window = cfg.get("time_window", {})
    if not isinstance(window, dict):
        raise ValueError("配置错误: time_window 必须是对象")
    enabled = bool(window.get("enabled", False))
    start_raw = str(window.get("start", "") or "").strip()
    end_raw = str(window.get("end", "") or "").strip()
    if enabled:
        start_dt = parse_datetime(start_raw, is_end=False) if start_raw else None
        end_dt = parse_datetime(end_raw, is_end=True) if end_raw else None
    else:
        start_dt = None
        end_dt = None
    if start_dt and end_dt and start_dt > end_dt:
        raise ValueError("配置错误: time_window.start 不能晚于 time_window.end")
    return start_dt, end_dt


def _parse_throttle(cfg: dict) -> dict:
    """Extract throttle settings with safe defaults."""
    throttle = cfg.get("throttle", {})
    if not isinstance(throttle, dict):
        raise ValueError("配置错误: throttle 必须是对象")
    return {
        "enabled": bool(throttle.get("enabled", True)),
        "per_message_delay_sec": float(throttle.get("per_message_delay_sec", 0.35)),
        "between_sources_delay_sec": float(
            throttle.get("between_sources_delay_sec", 2.0)
        ),
    }


def _parse_mysql(cfg: dict) -> dict:
    """Merge MySQL settings: config.json first, environment as fallback."""
    mysql_cfg = cfg.get("mysql", {})
    if not isinstance(mysql_cfg, dict):
        raise ValueError("配置错误: mysql 必须是对象")
    mysql_final = {
        "host": mysql_cfg.get("host") or os.getenv("MYSQL_HOST", "127.0.0.1"),
        "port": int(mysql_cfg.get("port") or os.getenv("MYSQL_PORT", "3306")),
        "user": mysql_cfg.get("user") or os.getenv("MYSQL_USER", "jobs_user"),
        "password": mysql_cfg.get("password") or os.getenv("MYSQL_PASSWORD", ""),
        "database": mysql_cfg.get("database") or os.getenv("MYSQL_DATABASE", "jobs"),
        "charset": mysql_cfg.get("charset") or os.getenv("MYSQL_CHARSET", "utf8mb4"),
    }
    if not mysql_final["password"]:
        raise ValueError("配置错误: mysql.password 不能为空")
    return mysql_final


def _parse_backfill(cfg: dict) -> dict:
    """Extract backfill options (one-off historical re-scrape)."""
    backfill = cfg.get("backfill", {})
    if not isinstance(backfill, dict):
        raise ValueError("配置错误: backfill 必须是对象")
    backfill_enabled = bool(backfill.get("enabled", False))
    backfill_start_raw = str(backfill.get("start", "") or "").strip()
    backfill_end_raw = str(backfill.get("end", "") or "").strip()
    backfill_sources = backfill.get("sources", [])
    if backfill_sources and not isinstance(backfill_sources, list):
        raise ValueError("配置错误: backfill.sources 必须是数组")
    backfill_sources = [str(s).strip() for s in backfill_sources if str(s).strip()]
    if backfill_enabled:
        bf_start = (
            parse_datetime(backfill_start_raw, is_end=False)
            if backfill_start_raw
            else None
        )
        bf_end = (
            parse_datetime(backfill_end_raw, is_end=True) if backfill_end_raw else None
        )
        if bf_start and bf_end and bf_start > bf_end:
            raise ValueError("配置错误: backfill.start 不能晚于 backfill.end")
    else:
        bf_start = None
        bf_end = None
    return {
        "enabled": backfill_enabled,
        "start_dt": bf_start,
        "end_dt": bf_end,
        "sources": backfill_sources,
        "ignore_sync_state": bool(backfill.get("ignore_sync_state", True)),
    }


def load_runtime_config() -> tuple[
    list[str], datetime | None, datetime | None, dict, dict, dict
]:
    """Load and validate config.json.

    Returns (sources, window_start, window_end, throttle_cfg, mysql_cfg,
    backfill_cfg).  Raises FileNotFoundError if the file is missing and
    ValueError on any structurally invalid section.
    """
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}")
    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        cfg = json.load(f)

    sources = _parse_sources(cfg)
    start_dt, end_dt = _parse_time_window(cfg)
    throttle_cfg = _parse_throttle(cfg)
    mysql_final = _parse_mysql(cfg)
    backfill_cfg = _parse_backfill(cfg)
    return sources, start_dt, end_dt, throttle_cfg, mysql_final, backfill_cfg


# =======================
# MySQL storage
# =======================
class MySQLStore:
    """Thin persistence layer over PyMySQL (autocommit, UTC session)."""

    def __init__(self, cfg: dict):
        self.cfg = cfg
        self.conn: pymysql.connections.Connection | None = None

    def connect(self):
        """Open the connection and pin the session time zone to UTC."""
        self.conn = pymysql.connect(
            host=self.cfg["host"],
            port=self.cfg["port"],
            user=self.cfg["user"],
            password=self.cfg["password"],
            database=self.cfg["database"],
            charset=self.cfg["charset"],
            autocommit=True,
        )
        # Force session timestamps to UTC so NOW()/CURRENT_TIMESTAMP match
        # the UTC datetimes we store.
        with self.conn.cursor() as cursor:
            cursor.execute("SET time_zone = '+00:00'")

    def close(self):
        if self.conn:
            self.conn.close()
            self.conn = None

    def _cursor(self):
        if not self.conn:
            raise RuntimeError("数据库未连接")
        return self.conn.cursor()

    def init_db(self):
        """Create the messages and sync_state tables if absent."""
        with self._cursor() as cursor:
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS messages (
                    id BIGINT PRIMARY KEY AUTO_INCREMENT,
                    source VARCHAR(255) NOT NULL,
                    chat_id BIGINT NULL,
                    message_id BIGINT NOT NULL,
                    content LONGTEXT,
                    date DATETIME NOT NULL,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE KEY uk_source_message (source, message_id),
                    KEY idx_source_date (source, date)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS sync_state (
                    source VARCHAR(255) PRIMARY KEY,
                    last_message_id BIGINT NOT NULL DEFAULT 0,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                        ON UPDATE CURRENT_TIMESTAMP
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
        logger.info("MySQL 数据库初始化完成")

    def get_last_message_id(self, source: str) -> int:
        """Return the last synced message id for *source* (0 if unseen)."""
        with self._cursor() as cursor:
            cursor.execute(
                "SELECT COALESCE(last_message_id, 0) FROM sync_state WHERE source = %s",
                (source,),
            )
            row = cursor.fetchone()
            return int(row[0]) if row else 0

    def set_last_message_id(self, source: str, message_id: int):
        """Upsert the sync cursor for *source*."""
        with self._cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sync_state (source, last_message_id, updated_at)
                VALUES (%s, %s, NOW())
                ON DUPLICATE KEY UPDATE
                    last_message_id = VALUES(last_message_id),
                    updated_at = NOW()
                """,
                (source, message_id),
            )

    def save_message(
        self, source: str, chat_id: int, message_id: int, content: str, date_str: str
    ) -> bool:
        """Insert one message; returns True only when a new row was added.

        INSERT IGNORE + the (source, message_id) unique key makes this
        idempotent across re-scrapes.
        """
        with self._cursor() as cursor:
            cursor.execute(
                """
                INSERT IGNORE INTO messages (source, chat_id, message_id, content, date)
                VALUES (%s, %s, %s, %s, %s)
                """,
                (source, chat_id, message_id, content, date_str),
            )
            return cursor.rowcount == 1


# =======================
# Content extraction
# =======================
def _collect_text_values(obj, out):
    """Recursively gather every non-empty string found in *obj* into *out*."""
    if obj is None:
        return
    if isinstance(obj, str):
        value = obj.strip()
        if value:
            out.add(value)
        return
    if isinstance(obj, dict):
        for v in obj.values():
            _collect_text_values(v, out)
        return
    if isinstance(obj, (list, tuple, set)):
        for item in obj:
            _collect_text_values(item, out)


def _to_safe_dict(obj):
    """Best-effort conversion of a Telethon object to a plain dict."""
    if obj is None:
        return None
    if hasattr(obj, "to_dict"):
        try:
            return obj.to_dict()
        except Exception:
            return {"raw": str(obj)}
    return {"raw": str(obj)}


def build_message_content(message) -> str:
    """Flatten a Telegram message (text + media + action) into one string.

    The result concatenates: the plain text, media/action type tags and
    JSON dumps, any phone numbers found in those payloads, and every
    string value recursively extracted from them.  Never returns "" —
    an otherwise empty message yields "empty_message".
    """
    text = (message.text or message.message or "").strip()
    parts = []
    media = message.media
    action = message.action
    media_dict = _to_safe_dict(media)
    action_dict = _to_safe_dict(action)
    # Serialize once; reused below for both output and phone scanning.
    media_json = json.dumps(media_dict, ensure_ascii=False, default=str)
    action_json = json.dumps(action_dict, ensure_ascii=False, default=str)

    if text:
        parts.append(text)
    if media:
        parts.append(f"[MEDIA_TYPE] {type(media).__name__}")
        parts.append("[MEDIA_JSON] " + media_json)
    if action:
        parts.append(f"[ACTION_TYPE] {type(action).__name__}")
        parts.append("[ACTION_JSON] " + action_json)

    phones = set()
    if media and hasattr(media, "phone_number") and media.phone_number:
        phones.add(str(media.phone_number))
    for raw in (media_json, action_json):
        if raw and raw != "null":
            for hit in PHONE_RE.findall(raw):
                clean = re.sub(r"[^\d+]", "", hit)
                if clean:
                    phones.add(clean)
    if phones:
        parts.append("phones=" + ",".join(sorted(phones)))

    extracted_texts = set()
    _collect_text_values(media_dict, extracted_texts)
    _collect_text_values(action_dict, extracted_texts)
    if extracted_texts:
        parts.append("[MEDIA_TEXT] " + " | ".join(sorted(extracted_texts)))

    if not parts:
        parts.append("empty_message")
    return "\n".join(parts)


# =======================
# Scraping
# =======================
def _normalize_source_key(entity, raw_source: str) -> str:
    """Prefer the canonical @username as the storage key, else the raw source."""
    username = getattr(entity, "username", None)
    if username:
        return f"@{username}"
    return raw_source


async def scrape_one_source(
    client,
    store: MySQLStore,
    raw_source: str,
    start_dt: datetime | None,
    end_dt: datetime | None,
    ignore_sync_state: bool,
    throttle_cfg: dict,
):
    """Scrape a single group/channel into MySQL.

    Three modes:
    - window + ignore_sync_state: full scan (newest-first) bounded by dates;
    - window + sync_state: incremental ascending scan from the last seen id;
    - plain incremental: ascending scan from the last seen id.
    The sync cursor is only advanced in the modes that rely on it.
    """
    try:
        entity = await client.get_entity(raw_source)
    except Exception as e:
        logger.error(f"[{raw_source}] 获取实体失败: {e}")
        return

    source_key = _normalize_source_key(entity, raw_source)
    chat_id = int(getattr(entity, "id", 0) or 0)
    logger.info(f"[{source_key}] 开始抓取")

    scanned = 0
    inserted = 0
    max_seen_id = 0
    window_mode = bool(start_dt or end_dt)
    use_throttle = bool(throttle_cfg.get("enabled", True))
    per_message_delay = float(throttle_cfg.get("per_message_delay_sec", 0.0))

    if window_mode and ignore_sync_state:
        logger.info(f"[{source_key}] 时间窗口模式 start={start_dt} end={end_dt} (UTC)")
        iterator = client.iter_messages(entity, limit=INITIAL_BACKFILL_LIMIT)
        ascending = False  # default iter_messages order is newest -> oldest
    elif window_mode:
        # Daily windowed scrape that still leans on sync_state to avoid
        # rescanning deep history.
        last_id = store.get_last_message_id(source_key)
        logger.info(
            f"[{source_key}] 窗口增量模式 start={start_dt} end={end_dt} (UTC), "
            f"message_id > {last_id}"
        )
        iterator = client.iter_messages(entity, min_id=last_id, reverse=True)
        ascending = True  # reverse=True iterates oldest -> newest
    else:
        last_id = store.get_last_message_id(source_key)
        logger.info(f"[{source_key}] 增量模式,从 message_id > {last_id} 开始")
        iterator = client.iter_messages(entity, min_id=last_id, reverse=True)
        ascending = True

    async for message in iterator:
        scanned += 1
        message_dt = message.date.astimezone(timezone.utc)

        if window_mode:
            # BUGFIX: the original applied descending-order checks to the
            # ascending (reverse=True) branches, which stopped immediately
            # on pre-window messages and never terminated past end_dt.
            # The skip/stop decisions must follow the iteration direction.
            if ascending:
                if end_dt and message_dt > end_dt:
                    break  # past the window; nothing newer can be in range
                if start_dt and message_dt < start_dt:
                    continue  # before the window; later messages may be in range
            else:
                if end_dt and message_dt > end_dt:
                    continue  # after the window; older messages may be in range
                if start_dt and message_dt < start_dt:
                    break  # past the window going backwards; stop

        msg_id = int(message.id)
        msg_date = message_dt.strftime("%Y-%m-%d %H:%M:%S")
        content = build_message_content(message)
        if store.save_message(source_key, chat_id, msg_id, content, msg_date):
            inserted += 1
        if msg_id > max_seen_id:
            max_seen_id = msg_id

        if scanned % 200 == 0:
            logger.info(f"[{source_key}] 进度: 扫描 {scanned} 条, 新增 {inserted} 条")
        if use_throttle and per_message_delay > 0:
            await asyncio.sleep(per_message_delay)

    # Only advance the sync cursor in modes that read it; a one-off backfill
    # (ignore_sync_state) must not disturb the incremental state.
    should_update_sync = (not window_mode) or (window_mode and not ignore_sync_state)
    if should_update_sync and max_seen_id > 0:
        old_last = store.get_last_message_id(source_key)
        if max_seen_id > old_last:
            store.set_last_message_id(source_key, max_seen_id)

    logger.info(f"[{source_key}] 完成: 扫描 {scanned} 条, 新增 {inserted} 条")


async def run_scraper(
    sources: list[str],
    start_dt: datetime | None,
    end_dt: datetime | None,
    ignore_sync_state: bool,
    throttle_cfg: dict,
    store: MySQLStore,
):
    """Drive the scrape across all sources with inter-source throttling."""
    client = TelegramClient(SESSION_NAME, API_ID, API_HASH)
    try:
        await client.start()
        logger.info("Telegram 客户端启动成功")
    except Exception as e:
        logger.error(f"Telegram 登录失败: {e}")
        return

    use_throttle = bool(throttle_cfg.get("enabled", True))
    between_sources_delay = float(throttle_cfg.get("between_sources_delay_sec", 0.0))

    # BUGFIX: disconnect in finally so a failure mid-scrape does not leak
    # the Telegram connection.
    try:
        for idx, source in enumerate(sources):
            await scrape_one_source(
                client,
                store,
                source,
                start_dt,
                end_dt,
                ignore_sync_state,
                throttle_cfg,
            )
            if use_throttle and between_sources_delay > 0 and idx < len(sources) - 1:
                logger.info(f"源切换等待 {between_sources_delay:.2f}s 以降低风控")
                await asyncio.sleep(between_sources_delay)
    finally:
        await client.disconnect()


# =======================
# Entry point
# =======================
def main():
    """Load config, apply backfill overrides, then run the scraper."""
    (
        sources,
        start_dt,
        end_dt,
        throttle_cfg,
        mysql_cfg,
        backfill_cfg,
    ) = load_runtime_config()

    if backfill_cfg["enabled"]:
        # Backfill overrides the normal source list / window for this run.
        if backfill_cfg["sources"]:
            sources = backfill_cfg["sources"]
        start_dt = backfill_cfg["start_dt"]
        end_dt = backfill_cfg["end_dt"]
        ignore_sync_state = bool(backfill_cfg["ignore_sync_state"])
        logger.info(
            "回补模式启用: "
            f"sources={sources}, start={start_dt}, end={end_dt}, "
            f"ignore_sync_state={ignore_sync_state}"
        )
    else:
        ignore_sync_state = False

    logger.info("程序启动")
    logger.info(f"本次数据源: {sources}")
    if start_dt or end_dt:
        logger.info(f"时间窗口(UTC): start={start_dt}, end={end_dt}")
    logger.info(
        "频率控制: "
        f"enabled={throttle_cfg['enabled']}, "
        f"per_message_delay_sec={throttle_cfg['per_message_delay_sec']}, "
        f"between_sources_delay_sec={throttle_cfg['between_sources_delay_sec']}"
    )

    store = MySQLStore(mysql_cfg)
    store.connect()
    try:
        store.init_db()
        asyncio.run(
            run_scraper(
                sources,
                start_dt,
                end_dt,
                ignore_sync_state,
                throttle_cfg,
                store,
            )
        )
    finally:
        store.close()
    logger.info("程序结束")


if __name__ == "__main__":
    main()