#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Telegram Multi-Source Scraper (MySQL)
------------------------------------
Scrapes several Telegram groups/channels and stores every message in one
MySQL database, tagged with the source it came from.
The time window and the rate limiting are both read from config.json.
"""

import asyncio
import json
import logging
import os
import re
from datetime import datetime, timedelta, timezone

import pymysql
from telethon import TelegramClient

# =======================
# Configuration
# =======================
# SECURITY NOTE(review): these Telegram API credentials are committed in
# source. They are kept as fallbacks so existing deployments keep working, but
# they should be rotated and supplied through the environment instead.
API_ID = int(os.getenv("TELEGRAM_API_ID") or 30954205)
API_HASH = os.getenv("TELEGRAM_API_HASH") or "cc422183f2ed2233dfa05ea7a25abe3f"
SESSION_NAME = "scraper"
CONFIG_FILE = "config.json"
DEFAULT_SOURCES = ["@DeJob_Global_group"]
INITIAL_BACKFILL_LIMIT = None

# NOTE(review): the original pattern was lost when this file was extracted
# (only the leading `(?` survived). This is a reconstruction: an optional "+",
# then 8-20 digits/separators that start and end with a digit and are not
# glued to other digits. Confirm against the original before relying on it.
PHONE_RE = re.compile(r"(?<!\d)\+?\d[\d\s\-()]{6,18}\d(?!\d)")

# How many scanned messages between progress log lines / checkpoint saves.
_PROGRESS_INTERVAL = 200


# =======================
# Logging
# =======================
def setup_logger() -> logging.Logger:
    """Return the ``scraper`` logger, configuring it on first use.

    Creates the ``logs`` directory if needed. On the first call a console
    handler and a UTF-8 file handler (``logs/app.log``) are attached; later
    calls return the already-configured logger without adding handlers again.
    """
    os.makedirs("logs", exist_ok=True)
    log = logging.getLogger("scraper")
    log.setLevel(logging.INFO)
    if log.handlers:
        return log

    formatter = logging.Formatter(
        "[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    log.addHandler(console_handler)

    file_handler = logging.FileHandler("logs/app.log", encoding="utf-8")
    file_handler.setFormatter(formatter)
    log.addHandler(file_handler)
    return log


logger = setup_logger()


# =======================
# Config loading
# =======================
def parse_datetime(raw: str, *, is_end: bool = False) -> datetime:
    """Parse a configured timestamp into a timezone-aware UTC ``datetime``.

    Accepted inputs, tried in this order:

    * a date only (``YYYY-MM-DD``); with ``is_end=True`` it is moved to
      23:59:59 so that the whole day is included;
    * any string ``datetime.fromisoformat`` understands (a trailing ``Z`` is
      treated as ``+00:00``), e.g. ``2024-01-01T10:00:00+08:00`` or
      ``2024-01-01 08:00:00-05:00``;
    * the legacy ``%Y-%m-%d %H:%M:%S`` format.

    Naive values are interpreted as UTC; aware values are converted to UTC.

    Raises:
        ValueError: if ``raw`` matches none of the accepted formats.
    """
    raw = raw.strip()
    try:
        dt = datetime.strptime(raw, "%Y-%m-%d")
    except ValueError:
        iso_text = raw[:-1] + "+00:00" if raw.endswith("Z") else raw
        try:
            dt = datetime.fromisoformat(iso_text)
        except ValueError:
            # Last resort: the lenient legacy format (accepts e.g. "2024-1-1 1:2:3").
            dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")
    else:
        if is_end:
            dt = dt.replace(hour=23, minute=59, second=59)

    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def load_runtime_config() -> tuple[list[str], datetime | None, datetime | None, dict, dict]:
    """Read and validate ``CONFIG_FILE``.

    Returns:
        ``(sources, start_dt, end_dt, throttle_cfg, mysql_cfg)`` where
        ``start_dt``/``end_dt`` are ``None`` unless ``time_window.enabled`` is
        truthy and the corresponding bound is set. MySQL settings missing from
        the file fall back to the ``MYSQL_*`` environment variables and then to
        built-in defaults.

    Raises:
        FileNotFoundError: if the config file does not exist.
        ValueError: if a section has the wrong type, ``sources`` is empty, the
            window start is after its end, a timestamp cannot be parsed, or no
            MySQL password is configured.
    """
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}")

    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        cfg = json.load(f)
    # A JSON array/scalar at the root would otherwise fail later with an
    # unhelpful AttributeError on cfg.get().
    if not isinstance(cfg, dict):
        raise ValueError("配置错误: 配置文件根节点必须是对象")

    sources = cfg.get("sources", DEFAULT_SOURCES)
    if not isinstance(sources, list):
        raise ValueError("配置错误: sources 必须是数组")
    sources = [str(s).strip() for s in sources if str(s).strip()]
    if not sources:
        raise ValueError("配置错误: sources 不能为空")

    window = cfg.get("time_window", {})
    if not isinstance(window, dict):
        raise ValueError("配置错误: time_window 必须是对象")
    enabled = bool(window.get("enabled", False))
    start_raw = str(window.get("start", "") or "").strip()
    end_raw = str(window.get("end", "") or "").strip()
    if enabled:
        start_dt = parse_datetime(start_raw, is_end=False) if start_raw else None
        end_dt = parse_datetime(end_raw, is_end=True) if end_raw else None
    else:
        start_dt = None
        end_dt = None
    if start_dt and end_dt and start_dt > end_dt:
        raise ValueError("配置错误: time_window.start 不能晚于 time_window.end")

    throttle = cfg.get("throttle", {})
    if not isinstance(throttle, dict):
        raise ValueError("配置错误: throttle 必须是对象")
    throttle_cfg = {
        "enabled": bool(throttle.get("enabled", True)),
        "per_message_delay_sec": float(throttle.get("per_message_delay_sec", 0.35)),
        "between_sources_delay_sec": float(throttle.get("between_sources_delay_sec", 2.0)),
    }

    mysql_cfg = cfg.get("mysql", {})
    if not isinstance(mysql_cfg, dict):
        raise ValueError("配置错误: mysql 必须是对象")
    # Precedence for every key: config file, then environment, then default.
    mysql_final = {
        "host": mysql_cfg.get("host") or os.getenv("MYSQL_HOST", "127.0.0.1"),
        "port": int(mysql_cfg.get("port") or os.getenv("MYSQL_PORT", "3306")),
        "user": mysql_cfg.get("user") or os.getenv("MYSQL_USER", "jobs_user"),
        "password": mysql_cfg.get("password") or os.getenv("MYSQL_PASSWORD", ""),
        "database": mysql_cfg.get("database") or os.getenv("MYSQL_DATABASE", "jobs"),
        "charset": mysql_cfg.get("charset") or os.getenv("MYSQL_CHARSET", "utf8mb4"),
    }
    if not mysql_final["password"]:
        raise ValueError("配置错误: mysql.password 不能为空")

    return sources, start_dt, end_dt, throttle_cfg, mysql_final


# =======================
# MySQL storage
# =======================
class MySQLStore:
    """Thin wrapper around one PyMySQL connection.

    Holds the scraped messages (``messages`` table) and the per-source
    incremental checkpoint (``sync_state`` table). The connection is opened
    with ``autocommit=True``, so every statement is committed immediately.
    """

    def __init__(self, cfg: dict):
        """Store the connection settings; no connection is opened yet."""
        self.cfg = cfg
        self.conn: pymysql.connections.Connection | None = None

    def connect(self):
        """Open the MySQL connection described by the stored settings."""
        self.conn = pymysql.connect(
            host=self.cfg["host"],
            port=self.cfg["port"],
            user=self.cfg["user"],
            password=self.cfg["password"],
            database=self.cfg["database"],
            charset=self.cfg["charset"],
            autocommit=True,
        )

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.conn:
            self.conn.close()
            self.conn = None

    def _cursor(self):
        """Return a new cursor, raising ``RuntimeError`` when not connected."""
        if not self.conn:
            raise RuntimeError("数据库未连接")
        return self.conn.cursor()

    def init_db(self):
        """Create the ``messages`` and ``sync_state`` tables if they are missing."""
        with self._cursor() as cursor:
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS messages (
                    id BIGINT PRIMARY KEY AUTO_INCREMENT,
                    source VARCHAR(255) NOT NULL,
                    chat_id BIGINT NULL,
                    message_id BIGINT NOT NULL,
                    content LONGTEXT,
                    date DATETIME NOT NULL,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE KEY uk_source_message (source, message_id),
                    KEY idx_source_date (source, date)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS sync_state (
                    source VARCHAR(255) PRIMARY KEY,
                    last_message_id BIGINT NOT NULL DEFAULT 0,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                        ON UPDATE CURRENT_TIMESTAMP
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
        logger.info("MySQL 数据库初始化完成")

    def get_last_message_id(self, source: str) -> int:
        """Return the stored checkpoint for ``source``, or 0 if there is none."""
        with self._cursor() as cursor:
            cursor.execute(
                "SELECT COALESCE(last_message_id, 0) FROM sync_state WHERE source = %s",
                (source,),
            )
            row = cursor.fetchone()
        return int(row[0]) if row else 0

    def set_last_message_id(self, source: str, message_id: int):
        """Insert or overwrite the checkpoint for ``source``."""
        with self._cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sync_state (source, last_message_id, updated_at)
                VALUES (%s, %s, NOW())
                ON DUPLICATE KEY UPDATE
                    last_message_id = VALUES(last_message_id),
                    updated_at = NOW()
                """,
                (source, message_id),
            )

    def save_message(
        self, source: str, chat_id: int, message_id: int, content: str, date_str: str
    ) -> bool:
        """Insert one message; return ``True`` only if a new row was written.

        ``INSERT IGNORE`` together with the ``(source, message_id)`` unique key
        makes re-scraping the same message a no-op that returns ``False``.
        """
        with self._cursor() as cursor:
            cursor.execute(
                """
                INSERT IGNORE INTO messages (source, chat_id, message_id, content, date)
                VALUES (%s, %s, %s, %s, %s)
                """,
                (source, chat_id, message_id, content, date_str),
            )
            return cursor.rowcount == 1


# =======================
# Content parsing
# =======================
def _collect_text_values(obj, out):
    """Recursively add every non-blank string found in ``obj`` to the set ``out``.

    Walks dict values and list/tuple/set items; strings are stripped first.
    Any other type is ignored.
    """
    if obj is None:
        return
    if isinstance(obj, str):
        value = obj.strip()
        if value:
            out.add(value)
        return
    if isinstance(obj, dict):
        for v in obj.values():
            _collect_text_values(v, out)
        return
    if isinstance(obj, (list, tuple, set)):
        for item in obj:
            _collect_text_values(item, out)


def _to_safe_dict(obj):
    """Return ``obj.to_dict()`` when available, else ``{"raw": str(obj)}``.

    ``None`` is passed through as ``None``. A failing ``to_dict()`` also falls
    back to the ``{"raw": ...}`` form so one odd object cannot abort a scrape.
    """
    if obj is None:
        return None
    if hasattr(obj, "to_dict"):
        try:
            return obj.to_dict()
        except Exception:
            return {"raw": str(obj)}
    return {"raw": str(obj)}


def build_message_content(message) -> str:
    """Flatten a message into the newline-joined text stored in ``content``.

    Sections, in order and each only when present: the message text, media
    type + JSON, action type + JSON, ``phones=`` (phone numbers found on the
    media object or matched by ``PHONE_RE`` in the JSON), and ``[MEDIA_TEXT]``
    (all string values found in the media/action dicts). A message with
    nothing at all yields ``"empty_message"``.
    """
    text = (message.text or message.message or "").strip()
    media = message.media
    action = message.action
    media_dict = _to_safe_dict(media)
    action_dict = _to_safe_dict(action)
    # Serialise once; the same JSON is stored and scanned for phone numbers.
    media_json = json.dumps(media_dict, ensure_ascii=False, default=str)
    action_json = json.dumps(action_dict, ensure_ascii=False, default=str)

    parts = []
    if text:
        parts.append(text)
    if media:
        parts.append(f"[MEDIA_TYPE] {type(media).__name__}")
        parts.append("[MEDIA_JSON] " + media_json)
    if action:
        parts.append(f"[ACTION_TYPE] {type(action).__name__}")
        parts.append("[ACTION_JSON] " + action_json)

    phones = set()
    if media and hasattr(media, "phone_number") and media.phone_number:
        phones.add(str(media.phone_number))
    for raw in (media_json, action_json):
        if raw and raw != "null":  # "null" is what a missing media/action dumps to
            for hit in PHONE_RE.findall(raw):
                clean = re.sub(r"[^\d+]", "", hit)
                if clean:
                    phones.add(clean)
    if phones:
        parts.append("phones=" + ",".join(sorted(phones)))

    extracted_texts = set()
    _collect_text_values(media_dict, extracted_texts)
    _collect_text_values(action_dict, extracted_texts)
    if extracted_texts:
        parts.append("[MEDIA_TEXT] " + " | ".join(sorted(extracted_texts)))

    if not parts:
        parts.append("empty_message")
    return "\n".join(parts)


# =======================
# Scraping logic
# =======================
def _normalize_source_key(entity, raw_source: str) -> str:
    """Return ``@username`` when the entity has one, else the configured string."""
    username = getattr(entity, "username", None)
    if username:
        return f"@{username}"
    return raw_source


def _advance_checkpoint(store: MySQLStore, source_key: str, message_id: int) -> None:
    """Persist ``message_id`` as the checkpoint unless a newer one is stored."""
    if message_id > store.get_last_message_id(source_key):
        store.set_last_message_id(source_key, message_id)


async def scrape_one_source(
    client,
    store: MySQLStore,
    raw_source: str,
    start_dt: datetime | None,
    end_dt: datetime | None,
    throttle_cfg: dict,
):
    """Scrape one group/channel into ``store``.

    Two modes:

    * **Window mode** (``start_dt`` or ``end_dt`` given): iterates newest to
      oldest, stores messages inside the window and stops at the first message
      older than ``start_dt``. The incremental checkpoint is not touched.
    * **Incremental mode** (no window): iterates oldest to newest starting
      after the stored checkpoint, and advances the checkpoint periodically
      and at the end.

    A source whose entity cannot be resolved is logged and skipped. Errors
    raised while iterating or saving propagate to the caller.
    """
    try:
        entity = await client.get_entity(raw_source)
    except Exception as e:
        logger.error(f"[{raw_source}] 获取实体失败: {e}")
        return

    source_key = _normalize_source_key(entity, raw_source)
    chat_id = int(getattr(entity, "id", 0) or 0)
    logger.info(f"[{source_key}] 开始抓取")

    scanned = 0
    inserted = 0
    max_seen_id = 0
    window_mode = bool(start_dt or end_dt)
    use_throttle = bool(throttle_cfg.get("enabled", True))
    per_message_delay = float(throttle_cfg.get("per_message_delay_sec", 0.0))

    if window_mode:
        logger.info(f"[{source_key}] 时间窗口模式 start={start_dt} end={end_dt} (UTC)")
        iter_kwargs = {"limit": INITIAL_BACKFILL_LIMIT}
        if end_dt:
            # Start paging at the window end instead of downloading everything
            # newer than it. offset_date is exclusive, hence the extra second;
            # the explicit date check in the loop still guards the boundary.
            iter_kwargs["offset_date"] = end_dt + timedelta(seconds=1)
        iterator = client.iter_messages(entity, **iter_kwargs)
    else:
        last_id = store.get_last_message_id(source_key)
        logger.info(f"[{source_key}] 增量模式,从 message_id > {last_id} 开始")
        iterator = client.iter_messages(entity, min_id=last_id, reverse=True)

    async for message in iterator:
        scanned += 1
        message_dt = message.date.astimezone(timezone.utc)

        if window_mode:
            if end_dt and message_dt > end_dt:
                continue
            if start_dt and message_dt < start_dt:
                # Newest-to-oldest order: everything after this is older still.
                break

        msg_id = int(message.id)
        msg_date = message_dt.strftime("%Y-%m-%d %H:%M:%S")
        content = build_message_content(message)

        if store.save_message(source_key, chat_id, msg_id, content, msg_date):
            inserted += 1
        if msg_id > max_seen_id:
            max_seen_id = msg_id

        if scanned % _PROGRESS_INTERVAL == 0:
            logger.info(f"[{source_key}] 进度: 扫描 {scanned} 条, 新增 {inserted} 条")
            if not window_mode and max_seen_id > 0:
                # Incremental mode walks ids in ascending order, so saving
                # here is safe and keeps an interrupted run from starting over.
                _advance_checkpoint(store, source_key, max_seen_id)

        if use_throttle and per_message_delay > 0:
            await asyncio.sleep(per_message_delay)

    if not window_mode and max_seen_id > 0:
        _advance_checkpoint(store, source_key, max_seen_id)

    logger.info(f"[{source_key}] 完成: 扫描 {scanned} 条, 新增 {inserted} 条")


async def run_scraper(
    sources: list[str],
    start_dt: datetime | None,
    end_dt: datetime | None,
    throttle_cfg: dict,
    store: MySQLStore,
):
    """Log in to Telegram and scrape every source in order.

    A login failure is logged and ends the run without raising. Between
    sources the configured delay is applied when throttling is enabled. The
    client is always disconnected, including when a source raises.
    """
    client = TelegramClient(SESSION_NAME, API_ID, API_HASH)
    try:
        try:
            await client.start()
        except Exception as e:
            logger.error(f"Telegram 登录失败: {e}")
            return
        logger.info("Telegram 客户端启动成功")

        use_throttle = bool(throttle_cfg.get("enabled", True))
        between_sources_delay = float(throttle_cfg.get("between_sources_delay_sec", 0.0))
        last_index = len(sources) - 1

        for idx, source in enumerate(sources):
            await scrape_one_source(client, store, source, start_dt, end_dt, throttle_cfg)
            if use_throttle and between_sources_delay > 0 and idx < last_index:
                logger.info(f"源切换等待 {between_sources_delay:.2f}s 以降低风控")
                await asyncio.sleep(between_sources_delay)
    finally:
        await client.disconnect()


# =======================
# Entry point
# =======================
def main():
    """Load the config, open MySQL, run the scraper, and always close MySQL."""
    sources, start_dt, end_dt, throttle_cfg, mysql_cfg = load_runtime_config()

    logger.info("程序启动")
    logger.info(f"本次数据源: {sources}")
    if start_dt or end_dt:
        logger.info(f"时间窗口(UTC): start={start_dt}, end={end_dt}")
    logger.info(
        "频率控制: "
        f"enabled={throttle_cfg['enabled']}, "
        f"per_message_delay_sec={throttle_cfg['per_message_delay_sec']}, "
        f"between_sources_delay_sec={throttle_cfg['between_sources_delay_sec']}"
    )

    store = MySQLStore(mysql_cfg)
    store.connect()
    try:
        store.init_db()
        asyncio.run(run_scraper(sources, start_dt, end_dt, throttle_cfg, store))
    finally:
        store.close()
        logger.info("程序结束")


if __name__ == "__main__":
    main()