Files
tg_crawl/main.py

545 lines
17 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Telegram Multi-Source Scraper (MySQL)
------------------------------------
支持多个群组/频道作为数据源,统一入 MySQL 并带来源标识。
时间窗口和频率控制都从 config.json 读取。
"""
import asyncio
import json
import logging
import os
import re
from datetime import datetime, timezone
import pymysql
from telethon import TelegramClient
# =======================
# Configuration
# =======================
# SECURITY NOTE: the Telegram API credentials used to be hard-coded here.
# They remain as fallback defaults for backward compatibility, but should
# be supplied via environment variables rather than committed to source.
API_ID = int(os.getenv("TELEGRAM_API_ID", "30954205"))
API_HASH = os.getenv("TELEGRAM_API_HASH", "cc422183f2ed2233dfa05ea7a25abe3f")
SESSION_NAME = "scraper"
CONFIG_FILE = "config.json"
DEFAULT_SOURCES = ["@DeJob_Global_group"]
# None means "no limit" when backfilling history for the first time.
INITIAL_BACKFILL_LIMIT = None
# Phone-number-shaped digit runs: optional "+", digits mixed with
# spaces/dashes/parens, at least 8 characters, not embedded in a longer
# digit run (guarded by the lookbehind/lookahead).
PHONE_RE = re.compile(r"(?<!\d)(?:\+?\d[\d \-()]{6,}\d)(?!\d)")
# =======================
# Logging
# =======================
def setup_logger() -> logging.Logger:
    """Return the shared "scraper" logger, configured exactly once.

    Output goes to both the console and logs/app.log (UTF-8). Repeated
    calls hand back the already-configured logger without attaching
    duplicate handlers.
    """
    os.makedirs("logs", exist_ok=True)
    log = logging.getLogger("scraper")
    log.setLevel(logging.INFO)
    # Guard against double configuration on repeated calls / re-imports.
    if not log.handlers:
        fmt = logging.Formatter(
            "[%(asctime)s] [%(levelname)s] %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        for handler in (
            logging.StreamHandler(),
            logging.FileHandler("logs/app.log", encoding="utf-8"),
        ):
            handler.setFormatter(fmt)
            log.addHandler(handler)
    return log


logger = setup_logger()
# =======================
# Config loading
# =======================
def parse_datetime(raw: str, *, is_end: bool = False) -> datetime:
    """Parse a config timestamp string into an aware UTC datetime.

    Accepts ISO-8601 strings (containing "T", an offset, or a trailing
    "Z"), bare dates ("YYYY-MM-DD"), and "YYYY-MM-DD HH:MM:SS". A bare
    date with is_end=True is pushed to 23:59:59 so the whole day is
    covered. Naive inputs are assumed to already be in UTC.
    """
    text = raw.strip()
    looks_iso = "T" in text or "+" in text or text.endswith("Z")
    if looks_iso:
        parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
    elif len(text) == 10:
        parsed = datetime.strptime(text, "%Y-%m-%d")
        if is_end:
            # A date-only window end means "end of that day".
            parsed = parsed.replace(hour=23, minute=59, second=59)
    else:
        parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
def load_runtime_config() -> tuple[
list[str], datetime | None, datetime | None, dict, dict, dict
]:
if not os.path.exists(CONFIG_FILE):
raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}")
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
cfg = json.load(f)
sources = cfg.get("sources", DEFAULT_SOURCES)
if not isinstance(sources, list):
raise ValueError("配置错误: sources 必须是数组")
sources = [str(s).strip() for s in sources if str(s).strip()]
if not sources:
raise ValueError("配置错误: sources 不能为空")
window = cfg.get("time_window", {})
if not isinstance(window, dict):
raise ValueError("配置错误: time_window 必须是对象")
enabled = bool(window.get("enabled", False))
start_raw = str(window.get("start", "") or "").strip()
end_raw = str(window.get("end", "") or "").strip()
if enabled:
start_dt = parse_datetime(start_raw, is_end=False) if start_raw else None
end_dt = parse_datetime(end_raw, is_end=True) if end_raw else None
else:
start_dt = None
end_dt = None
if start_dt and end_dt and start_dt > end_dt:
raise ValueError("配置错误: time_window.start 不能晚于 time_window.end")
throttle = cfg.get("throttle", {})
if not isinstance(throttle, dict):
raise ValueError("配置错误: throttle 必须是对象")
throttle_cfg = {
"enabled": bool(throttle.get("enabled", True)),
"per_message_delay_sec": float(throttle.get("per_message_delay_sec", 0.35)),
"between_sources_delay_sec": float(throttle.get("between_sources_delay_sec", 2.0)),
}
mysql_cfg = cfg.get("mysql", {})
if not isinstance(mysql_cfg, dict):
raise ValueError("配置错误: mysql 必须是对象")
mysql_final = {
"host": mysql_cfg.get("host") or os.getenv("MYSQL_HOST", "127.0.0.1"),
"port": int(mysql_cfg.get("port") or os.getenv("MYSQL_PORT", "3306")),
"user": mysql_cfg.get("user") or os.getenv("MYSQL_USER", "jobs_user"),
"password": mysql_cfg.get("password") or os.getenv("MYSQL_PASSWORD", ""),
"database": mysql_cfg.get("database") or os.getenv("MYSQL_DATABASE", "jobs"),
"charset": mysql_cfg.get("charset") or os.getenv("MYSQL_CHARSET", "utf8mb4"),
}
if not mysql_final["password"]:
raise ValueError("配置错误: mysql.password 不能为空")
backfill = cfg.get("backfill", {})
if not isinstance(backfill, dict):
raise ValueError("配置错误: backfill 必须是对象")
backfill_enabled = bool(backfill.get("enabled", False))
backfill_start_raw = str(backfill.get("start", "") or "").strip()
backfill_end_raw = str(backfill.get("end", "") or "").strip()
backfill_sources = backfill.get("sources", [])
if backfill_sources and not isinstance(backfill_sources, list):
raise ValueError("配置错误: backfill.sources 必须是数组")
backfill_sources = [str(s).strip() for s in backfill_sources if str(s).strip()]
if backfill_enabled:
bf_start = parse_datetime(backfill_start_raw, is_end=False) if backfill_start_raw else None
bf_end = parse_datetime(backfill_end_raw, is_end=True) if backfill_end_raw else None
if bf_start and bf_end and bf_start > bf_end:
raise ValueError("配置错误: backfill.start 不能晚于 backfill.end")
else:
bf_start = None
bf_end = None
backfill_cfg = {
"enabled": backfill_enabled,
"start_dt": bf_start,
"end_dt": bf_end,
"sources": backfill_sources,
"ignore_sync_state": bool(backfill.get("ignore_sync_state", True)),
}
return sources, start_dt, end_dt, throttle_cfg, mysql_final, backfill_cfg
# =======================
# MySQL storage
# =======================
class MySQLStore:
    """Thin wrapper around a pymysql connection for message persistence.

    Owns schema creation and the two tables used by the scraper:
    ``messages`` (deduplicated per (source, message_id)) and
    ``sync_state`` (incremental high-water mark per source).
    May also be used as a context manager: connects on enter, closes
    on exit.
    """

    def __init__(self, cfg: dict):
        # cfg keys: host, port, user, password, database, charset.
        self.cfg = cfg
        self.conn: pymysql.connections.Connection | None = None

    def __enter__(self) -> "MySQLStore":
        self.connect()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def connect(self):
        """Open (or re-open) the connection and pin the session to UTC."""
        # Drop any previous connection instead of leaking it on reconnect.
        self.close()
        self.conn = pymysql.connect(
            host=self.cfg["host"],
            port=self.cfg["port"],
            user=self.cfg["user"],
            password=self.cfg["password"],
            database=self.cfg["database"],
            charset=self.cfg["charset"],
            autocommit=True,
        )
        # Force session timestamps to UTC for NOW()/CURRENT_TIMESTAMP consistency.
        with self.conn.cursor() as cursor:
            cursor.execute("SET time_zone = '+00:00'")

    def close(self):
        """Close the connection if open; safe to call repeatedly."""
        if self.conn:
            self.conn.close()
            self.conn = None

    def _cursor(self):
        """Return a fresh cursor; raises RuntimeError if not connected."""
        if not self.conn:
            raise RuntimeError("数据库未连接")
        return self.conn.cursor()

    def init_db(self):
        """Create the messages and sync_state tables if they do not exist."""
        with self._cursor() as cursor:
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS messages (
                    id BIGINT PRIMARY KEY AUTO_INCREMENT,
                    source VARCHAR(255) NOT NULL,
                    chat_id BIGINT NULL,
                    message_id BIGINT NOT NULL,
                    content LONGTEXT,
                    date DATETIME NOT NULL,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE KEY uk_source_message (source, message_id),
                    KEY idx_source_date (source, date)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS sync_state (
                    source VARCHAR(255) PRIMARY KEY,
                    last_message_id BIGINT NOT NULL DEFAULT 0,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                        ON UPDATE CURRENT_TIMESTAMP
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
        logger.info("MySQL 数据库初始化完成")

    def get_last_message_id(self, source: str) -> int:
        """Return the highest synced message id for *source* (0 when unknown)."""
        with self._cursor() as cursor:
            cursor.execute(
                "SELECT COALESCE(last_message_id, 0) FROM sync_state WHERE source = %s",
                (source,),
            )
            row = cursor.fetchone()
            return int(row[0]) if row else 0

    def set_last_message_id(self, source: str, message_id: int):
        """Upsert the sync high-water mark for *source*."""
        with self._cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sync_state (source, last_message_id, updated_at)
                VALUES (%s, %s, NOW())
                ON DUPLICATE KEY UPDATE
                    last_message_id = VALUES(last_message_id),
                    updated_at = NOW()
                """,
                (source, message_id),
            )

    def save_message(
        self, source: str, chat_id: int, message_id: int, content: str, date_str: str
    ) -> bool:
        """Insert one message; returns True if inserted, False on duplicate.

        Uses INSERT IGNORE against uk_source_message, so rowcount == 1
        distinguishes a fresh insert from an already-stored message.
        """
        with self._cursor() as cursor:
            cursor.execute(
                """
                INSERT IGNORE INTO messages (source, chat_id, message_id, content, date)
                VALUES (%s, %s, %s, %s, %s)
                """,
                (source, chat_id, message_id, content, date_str),
            )
            return cursor.rowcount == 1
# =======================
# Content parsing
# =======================
def _collect_text_values(obj, out):
    """Recursively add every non-empty stripped string inside *obj* to set *out*.

    Walks nested dicts, lists, tuples and sets; every other value type
    (numbers, None, ...) is ignored.
    """
    if obj is None:
        return
    if isinstance(obj, str):
        stripped = obj.strip()
        if stripped:
            out.add(stripped)
    elif isinstance(obj, dict):
        for child in obj.values():
            _collect_text_values(child, out)
    elif isinstance(obj, (list, tuple, set)):
        for child in obj:
            _collect_text_values(child, out)
def _to_safe_dict(obj):
    """Best-effort conversion of a (Telethon) object into a plain dict.

    Returns None for None, ``obj.to_dict()`` when available, and falls
    back to ``{"raw": str(obj)}`` when there is no to_dict or it raises.
    """
    if obj is None:
        return None
    to_dict = getattr(obj, "to_dict", None)
    if to_dict is not None:
        try:
            return to_dict()
        except Exception:
            pass
    return {"raw": str(obj)}
def build_message_content(message) -> str:
    """Flatten a Telethon message into a single text payload for storage.

    The payload contains, in order: the plain text, media type + JSON
    dump, action type + JSON dump, any phone numbers found (an explicit
    contact field, plus regex hits inside the serialized media/action),
    and every string value extracted from the media/action structures.
    Falls back to "empty_message" when nothing at all was found.
    """
    parts: list[str] = []

    text = (message.text or message.message or "").strip()
    if text:
        parts.append(text)

    media = message.media
    action = message.action
    media_dict = _to_safe_dict(media)
    action_dict = _to_safe_dict(action)
    # Serialize once; the dumps are reused for both the payload and the
    # phone-number scan below.
    media_json = json.dumps(media_dict, ensure_ascii=False, default=str)
    action_json = json.dumps(action_dict, ensure_ascii=False, default=str)

    if media:
        parts.append(f"[MEDIA_TYPE] {type(media).__name__}")
        parts.append("[MEDIA_JSON] " + media_json)
    if action:
        parts.append(f"[ACTION_TYPE] {type(action).__name__}")
        parts.append("[ACTION_JSON] " + action_json)

    # Phone numbers: explicit contact field first, then phone-shaped
    # digit runs anywhere in the serialized payloads.
    phones: set[str] = set()
    if media and getattr(media, "phone_number", None):
        phones.add(str(media.phone_number))
    for blob in (media_json, action_json):
        if blob and blob != "null":
            for hit in PHONE_RE.findall(blob):
                digits = re.sub(r"[^\d+]", "", hit)
                if digits:
                    phones.add(digits)
    if phones:
        parts.append("phones=" + ",".join(sorted(phones)))

    # Surface every plain string nested inside the media/action structures.
    found_texts: set[str] = set()
    _collect_text_values(media_dict, found_texts)
    _collect_text_values(action_dict, found_texts)
    if found_texts:
        parts.append("[MEDIA_TEXT] " + " | ".join(sorted(found_texts)))

    if not parts:
        parts.append("empty_message")
    return "\n".join(parts)
# =======================
# Scraping logic
# =======================
def _normalize_source_key(entity, raw_source: str) -> str:
    """Prefer the entity's canonical "@username"; fall back to the raw config value."""
    username = getattr(entity, "username", None)
    return f"@{username}" if username else raw_source
async def scrape_one_source(
    client,
    store: MySQLStore,
    raw_source: str,
    start_dt: datetime | None,
    end_dt: datetime | None,
    ignore_sync_state: bool,
    throttle_cfg: dict,
):
    """Scrape one chat/channel into MySQL.

    Modes:
      * window + ignore_sync_state (backfill): iterate newest -> oldest
        over the [start_dt, end_dt] window; sync_state is not updated.
      * window only: incremental (message_id > last synced), iterating
        oldest -> newest via reverse=True.
      * no window: plain incremental, oldest -> newest.

    BUGFIX: the window skip/stop checks are now direction-aware. The two
    reverse=True branches iterate oldest -> newest, but the old logic
    assumed newest -> oldest, so the first message older than start_dt
    hit ``break`` and aborted the whole scan before reaching the window.
    """
    try:
        entity = await client.get_entity(raw_source)
    except Exception as e:
        logger.error(f"[{raw_source}] 获取实体失败: {e}")
        return
    source_key = _normalize_source_key(entity, raw_source)
    chat_id = int(getattr(entity, "id", 0) or 0)
    logger.info(f"[{source_key}] 开始抓取")
    scanned = 0
    inserted = 0
    max_seen_id = 0
    window_mode = bool(start_dt or end_dt)
    use_throttle = bool(throttle_cfg.get("enabled", True))
    per_message_delay = float(throttle_cfg.get("per_message_delay_sec", 0.0))
    if window_mode and ignore_sync_state:
        logger.info(f"[{source_key}] 时间窗口模式 start={start_dt} end={end_dt} (UTC)")
        # Telethon default order: newest -> oldest.
        ascending = False
        iterator = client.iter_messages(entity, limit=INITIAL_BACKFILL_LIMIT)
    elif window_mode:
        # Daily windowed scrape: still lean on sync_state to avoid
        # re-scanning a huge history.
        last_id = store.get_last_message_id(source_key)
        logger.info(
            f"[{source_key}] 窗口增量模式 start={start_dt} end={end_dt} (UTC), "
            f"message_id > {last_id}"
        )
        # reverse=True yields oldest -> newest.
        ascending = True
        iterator = client.iter_messages(entity, min_id=last_id, reverse=True)
    else:
        last_id = store.get_last_message_id(source_key)
        logger.info(f"[{source_key}] 增量模式,从 message_id > {last_id} 开始")
        ascending = True
        iterator = client.iter_messages(entity, min_id=last_id, reverse=True)
    async for message in iterator:
        scanned += 1
        message_dt = message.date.astimezone(timezone.utc)
        if window_mode:
            if ascending:
                # oldest -> newest: skip until the window starts, stop
                # once we are past its end.
                if start_dt and message_dt < start_dt:
                    continue
                if end_dt and message_dt > end_dt:
                    break
            else:
                # newest -> oldest: skip until the window end, stop once
                # we are before its start.
                if end_dt and message_dt > end_dt:
                    continue
                if start_dt and message_dt < start_dt:
                    break
        msg_id = int(message.id)
        msg_date = message_dt.strftime("%Y-%m-%d %H:%M:%S")
        content = build_message_content(message)
        if store.save_message(source_key, chat_id, msg_id, content, msg_date):
            inserted += 1
        if msg_id > max_seen_id:
            max_seen_id = msg_id
        if scanned % 200 == 0:
            logger.info(f"[{source_key}] 进度: 扫描 {scanned} 条, 新增 {inserted}")
        if use_throttle and per_message_delay > 0:
            await asyncio.sleep(per_message_delay)
    # Backfill runs never touch sync_state; incremental runs advance the
    # high-water mark monotonically.
    should_update_sync = (not window_mode) or (window_mode and not ignore_sync_state)
    if should_update_sync and max_seen_id > 0:
        old_last = store.get_last_message_id(source_key)
        if max_seen_id > old_last:
            store.set_last_message_id(source_key, max_seen_id)
    logger.info(f"[{source_key}] 完成: 扫描 {scanned} 条, 新增 {inserted}")
async def run_scraper(
    sources: list[str],
    start_dt: datetime | None,
    end_dt: datetime | None,
    ignore_sync_state: bool,
    throttle_cfg: dict,
    store: MySQLStore,
):
    """Start the Telegram client and scrape every source sequentially.

    Applies the optional between-source delay from throttle_cfg. Returns
    early (without raising) if the Telegram login fails.

    FIX: the client is now disconnected in a ``finally`` block, so a
    failure while scraping one source no longer leaks the connection.
    """
    client = TelegramClient(SESSION_NAME, API_ID, API_HASH)
    try:
        await client.start()
        logger.info("Telegram 客户端启动成功")
    except Exception as e:
        logger.error(f"Telegram 登录失败: {e}")
        return
    use_throttle = bool(throttle_cfg.get("enabled", True))
    between_sources_delay = float(throttle_cfg.get("between_sources_delay_sec", 0.0))
    try:
        for idx, source in enumerate(sources):
            await scrape_one_source(
                client,
                store,
                source,
                start_dt,
                end_dt,
                ignore_sync_state,
                throttle_cfg,
            )
            # Pause between sources (but not after the last one) to
            # reduce the chance of rate limiting.
            if use_throttle and between_sources_delay > 0 and idx < len(sources) - 1:
                logger.info(f"源切换等待 {between_sources_delay:.2f}s 以降低风控")
                await asyncio.sleep(between_sources_delay)
    finally:
        await client.disconnect()
# =======================
# Entry point
# =======================
def main():
    """Load config, apply backfill overrides if enabled, then run the scraper."""
    (
        sources,
        start_dt,
        end_dt,
        throttle_cfg,
        mysql_cfg,
        backfill_cfg,
    ) = load_runtime_config()

    ignore_sync_state = False
    if backfill_cfg["enabled"]:
        # Backfill mode overrides the regular sources/window settings.
        if backfill_cfg["sources"]:
            sources = backfill_cfg["sources"]
        start_dt = backfill_cfg["start_dt"]
        end_dt = backfill_cfg["end_dt"]
        ignore_sync_state = bool(backfill_cfg["ignore_sync_state"])
        logger.info(
            "回补模式启用: "
            f"sources={sources}, start={start_dt}, end={end_dt}, "
            f"ignore_sync_state={ignore_sync_state}"
        )

    logger.info("程序启动")
    logger.info(f"本次数据源: {sources}")
    if start_dt or end_dt:
        logger.info(f"时间窗口(UTC): start={start_dt}, end={end_dt}")
    logger.info(
        "频率控制: "
        f"enabled={throttle_cfg['enabled']}, "
        f"per_message_delay_sec={throttle_cfg['per_message_delay_sec']}, "
        f"between_sources_delay_sec={throttle_cfg['between_sources_delay_sec']}"
    )

    store = MySQLStore(mysql_cfg)
    store.connect()
    try:
        store.init_db()
        asyncio.run(
            run_scraper(
                sources,
                start_dt,
                end_dt,
                ignore_sync_state,
                throttle_cfg,
                store,
            )
        )
    finally:
        store.close()
    logger.info("程序结束")


if __name__ == "__main__":
    main()