#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Telegram Multi-Source Scraper (MySQL)
------------------------------------
支持多个群组/频道作为数据源,统一入 MySQL 并带来源标识。
时间窗口和频率控制都从 config.json 读取。
"""
import asyncio
import json
import logging
import os
import re
from datetime import datetime, timezone

import pymysql
from telethon import TelegramClient

# =======================
# Configuration
# =======================
# NOTE(security): the Telegram API credentials used to be hard-coded only.
# They can now be overridden via environment variables; the original
# literals remain as fallbacks so existing deployments behave identically.
API_ID = int(os.getenv("TELEGRAM_API_ID", "30954205"))
API_HASH = os.getenv("TELEGRAM_API_HASH", "cc422183f2ed2233dfa05ea7a25abe3f")
SESSION_NAME = "scraper"
CONFIG_FILE = "config.json"

# Used when config.json does not provide a "sources" list.
DEFAULT_SOURCES = ["@DeJob_Global_group"]
# None => no message-count cap when backfilling full history.
INITIAL_BACKFILL_LIMIT = None
# Phone-number-like runs of 8+ digits (spaces/dashes/parens allowed inside);
# the (?<!\d)/(?!\d) guards prevent matching inside a longer digit run.
PHONE_RE = re.compile(r"(?<!\d)(?:\+?\d[\d \-()]{6,}\d)(?!\d)")
# =======================
# Logging
# =======================
def setup_logger() -> logging.Logger:
    """Create (once) and return the shared "scraper" logger.

    Output goes both to the console and to logs/app.log. Calling this
    again returns the already-configured logger without stacking
    duplicate handlers.
    """
    os.makedirs("logs", exist_ok=True)

    log = logging.getLogger("scraper")
    log.setLevel(logging.INFO)

    # Configured on a previous call — reuse as-is.
    if log.handlers:
        return log

    fmt = logging.Formatter(
        "[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )

    # Console first, then file — same order as before.
    for handler in (
        logging.StreamHandler(),
        logging.FileHandler("logs/app.log", encoding="utf-8"),
    ):
        handler.setFormatter(fmt)
        log.addHandler(handler)

    return log


logger = setup_logger()
# =======================
# Config parsing
# =======================
def parse_datetime(raw: str, *, is_end: bool = False) -> datetime:
    """Parse a config timestamp string into an aware UTC datetime.

    Accepted inputs:
      * "YYYY-MM-DD"           — a bare date; when *is_end* is True the
        time is pushed to 23:59:59 so the whole day is included.
      * any ISO-8601 timestamp — "T" or space separator, optional
        fractional seconds, optional offset ("+08:00", "-05:00") or a
        trailing "Z".
      * "YYYY-MM-DD HH:MM:SS"  — legacy space-separated form.

    Naive inputs are assumed to already be UTC; aware inputs are
    converted to UTC.
    """
    raw = raw.strip()

    if len(raw) == 10:
        # Bare date.
        dt = datetime.strptime(raw, "%Y-%m-%d")
        if is_end:
            dt = dt.replace(hour=23, minute=59, second=59)
    else:
        # fromisoformat handles "T"/space separators, fractional seconds
        # and +/- offsets.  The previous detection ("T"/"+"/"Z" in the
        # string) missed negative offsets such as
        # "2024-01-01 12:00:00-05:00" and crashed in strptime.  "Z" is
        # normalised first because fromisoformat only accepts it on 3.11+.
        try:
            dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")

    # Normalise to UTC.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    else:
        dt = dt.astimezone(timezone.utc)

    return dt
def _load_config_dict() -> dict:
    """Read and JSON-decode CONFIG_FILE, failing loudly when missing."""
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}")
    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        return json.load(f)


def _parse_sources(cfg: dict) -> list[str]:
    """Validate the top-level "sources" list; falls back to DEFAULT_SOURCES."""
    sources = cfg.get("sources", DEFAULT_SOURCES)
    if not isinstance(sources, list):
        raise ValueError("配置错误: sources 必须是数组")
    sources = [str(s).strip() for s in sources if str(s).strip()]
    if not sources:
        raise ValueError("配置错误: sources 不能为空")
    return sources


def _parse_time_window(cfg: dict) -> tuple[datetime | None, datetime | None]:
    """Parse the optional "time_window" section into aware UTC datetimes."""
    window = cfg.get("time_window", {})
    if not isinstance(window, dict):
        raise ValueError("配置错误: time_window 必须是对象")

    enabled = bool(window.get("enabled", False))
    start_raw = str(window.get("start", "") or "").strip()
    end_raw = str(window.get("end", "") or "").strip()

    start_dt: datetime | None = None
    end_dt: datetime | None = None
    if enabled:
        start_dt = parse_datetime(start_raw, is_end=False) if start_raw else None
        end_dt = parse_datetime(end_raw, is_end=True) if end_raw else None

    if start_dt and end_dt and start_dt > end_dt:
        raise ValueError("配置错误: time_window.start 不能晚于 time_window.end")

    return start_dt, end_dt


def _parse_throttle(cfg: dict) -> dict:
    """Normalise the "throttle" section, applying defaults."""
    throttle = cfg.get("throttle", {})
    if not isinstance(throttle, dict):
        raise ValueError("配置错误: throttle 必须是对象")

    return {
        "enabled": bool(throttle.get("enabled", True)),
        "per_message_delay_sec": float(throttle.get("per_message_delay_sec", 0.35)),
        "between_sources_delay_sec": float(throttle.get("between_sources_delay_sec", 2.0)),
    }


def _parse_mysql(cfg: dict) -> dict:
    """Resolve MySQL connection settings (config value first, then env var)."""
    mysql_cfg = cfg.get("mysql", {})
    if not isinstance(mysql_cfg, dict):
        raise ValueError("配置错误: mysql 必须是对象")

    mysql_final = {
        "host": mysql_cfg.get("host") or os.getenv("MYSQL_HOST", "127.0.0.1"),
        "port": int(mysql_cfg.get("port") or os.getenv("MYSQL_PORT", "3306")),
        "user": mysql_cfg.get("user") or os.getenv("MYSQL_USER", "jobs_user"),
        "password": mysql_cfg.get("password") or os.getenv("MYSQL_PASSWORD", ""),
        "database": mysql_cfg.get("database") or os.getenv("MYSQL_DATABASE", "jobs"),
        "charset": mysql_cfg.get("charset") or os.getenv("MYSQL_CHARSET", "utf8mb4"),
    }

    # Refuse to start with an empty password rather than failing later.
    if not mysql_final["password"]:
        raise ValueError("配置错误: mysql.password 不能为空")

    return mysql_final


def _parse_backfill(cfg: dict) -> dict:
    """Normalise the "backfill" section (one-off historical re-scrapes)."""
    backfill = cfg.get("backfill", {})
    if not isinstance(backfill, dict):
        raise ValueError("配置错误: backfill 必须是对象")

    enabled = bool(backfill.get("enabled", False))
    start_raw = str(backfill.get("start", "") or "").strip()
    end_raw = str(backfill.get("end", "") or "").strip()

    sources = backfill.get("sources", [])
    if sources and not isinstance(sources, list):
        raise ValueError("配置错误: backfill.sources 必须是数组")
    sources = [str(s).strip() for s in sources if str(s).strip()]

    bf_start: datetime | None = None
    bf_end: datetime | None = None
    if enabled:
        bf_start = parse_datetime(start_raw, is_end=False) if start_raw else None
        bf_end = parse_datetime(end_raw, is_end=True) if end_raw else None
        if bf_start and bf_end and bf_start > bf_end:
            raise ValueError("配置错误: backfill.start 不能晚于 backfill.end")

    return {
        "enabled": enabled,
        "start_dt": bf_start,
        "end_dt": bf_end,
        "sources": sources,
        "ignore_sync_state": bool(backfill.get("ignore_sync_state", True)),
    }


def load_runtime_config() -> tuple[
    list[str], datetime | None, datetime | None, dict, dict, dict
]:
    """Load CONFIG_FILE and return the runtime configuration.

    Returns (sources, start_dt, end_dt, throttle_cfg, mysql_cfg,
    backfill_cfg); datetimes are aware UTC or None when the window is
    disabled.  Raises FileNotFoundError when the config file is missing
    and ValueError on any malformed section.
    """
    cfg = _load_config_dict()
    # Sections are validated in the same order as the original code so
    # the first error reported is unchanged.
    return (
        _parse_sources(cfg),
        *_parse_time_window(cfg),
        _parse_throttle(cfg),
        _parse_mysql(cfg),
        _parse_backfill(cfg),
    )
# =======================
# MySQL storage
# =======================
class MySQLStore:
    """Thin persistence layer over a single PyMySQL connection.

    Owns two tables: ``messages`` (scraped content, deduplicated per
    (source, message_id)) and ``sync_state`` (per-source high-water mark
    for incremental scraping).
    """

    def __init__(self, cfg: dict):
        # Connection parameters: host/port/user/password/database/charset.
        self.cfg = cfg
        self.conn: pymysql.connections.Connection | None = None

    def connect(self):
        """Open the connection (autocommit, so no explicit commits needed)."""
        self.conn = pymysql.connect(
            host=self.cfg["host"],
            port=self.cfg["port"],
            user=self.cfg["user"],
            password=self.cfg["password"],
            database=self.cfg["database"],
            charset=self.cfg["charset"],
            autocommit=True,
        )

    def close(self):
        """Close the connection if open; safe to call repeatedly."""
        if self.conn:
            self.conn.close()
            self.conn = None

    def _cursor(self):
        """Return a fresh cursor, failing fast when not connected."""
        if not self.conn:
            raise RuntimeError("数据库未连接")
        return self.conn.cursor()

    def init_db(self):
        """Create the ``messages`` and ``sync_state`` tables when missing."""
        with self._cursor() as cur:
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS messages (
                    id BIGINT PRIMARY KEY AUTO_INCREMENT,
                    source VARCHAR(255) NOT NULL,
                    chat_id BIGINT NULL,
                    message_id BIGINT NOT NULL,
                    content LONGTEXT,
                    date DATETIME NOT NULL,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE KEY uk_source_message (source, message_id),
                    KEY idx_source_date (source, date)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS sync_state (
                    source VARCHAR(255) PRIMARY KEY,
                    last_message_id BIGINT NOT NULL DEFAULT 0,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                        ON UPDATE CURRENT_TIMESTAMP
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
        logger.info("MySQL 数据库初始化完成")

    def get_last_message_id(self, source: str) -> int:
        """Return the stored high-water message id for *source* (0 if unknown)."""
        with self._cursor() as cur:
            cur.execute(
                "SELECT COALESCE(last_message_id, 0) FROM sync_state WHERE source = %s",
                (source,),
            )
            result = cur.fetchone()
        return int(result[0]) if result else 0

    def set_last_message_id(self, source: str, message_id: int):
        """Upsert the high-water message id for *source*."""
        with self._cursor() as cur:
            cur.execute(
                """
                INSERT INTO sync_state (source, last_message_id, updated_at)
                VALUES (%s, %s, NOW())
                ON DUPLICATE KEY UPDATE
                    last_message_id = VALUES(last_message_id),
                    updated_at = NOW()
                """,
                (source, message_id),
            )

    def save_message(
        self, source: str, chat_id: int, message_id: int, content: str, date_str: str
    ) -> bool:
        """Insert one message; returns True when it was new (not a duplicate)."""
        with self._cursor() as cur:
            cur.execute(
                """
                INSERT IGNORE INTO messages (source, chat_id, message_id, content, date)
                VALUES (%s, %s, %s, %s, %s)
                """,
                (source, chat_id, message_id, content, date_str),
            )
            # INSERT IGNORE reports 0 affected rows on a duplicate key.
            return cur.rowcount == 1
# =======================
|
|
# 内容解析
|
|
# =======================
|
|
def _collect_text_values(obj, out):
|
|
if obj is None:
|
|
return
|
|
if isinstance(obj, str):
|
|
value = obj.strip()
|
|
if value:
|
|
out.add(value)
|
|
return
|
|
if isinstance(obj, dict):
|
|
for v in obj.values():
|
|
_collect_text_values(v, out)
|
|
return
|
|
if isinstance(obj, (list, tuple, set)):
|
|
for item in obj:
|
|
_collect_text_values(item, out)
|
|
|
|
|
|
def _to_safe_dict(obj):
|
|
if obj is None:
|
|
return None
|
|
if hasattr(obj, "to_dict"):
|
|
try:
|
|
return obj.to_dict()
|
|
except Exception:
|
|
return {"raw": str(obj)}
|
|
return {"raw": str(obj)}
|
|
|
|
|
|
def build_message_content(message) -> str:
    """Flatten a Telegram message into one storable text blob.

    The blob contains, in order and only when present: the message text,
    media type/JSON dump, service-action type/JSON dump, any phone
    numbers found, and all free-text strings extracted from the media /
    action payloads.  An entirely empty message yields "empty_message".
    """
    text = (message.text or message.message or "").strip()

    media = message.media
    action = message.action
    media_dict = _to_safe_dict(media)
    action_dict = _to_safe_dict(action)
    # Serialise each payload exactly once and reuse the result: the old
    # code ran json.dumps twice per payload (once for the stored blob,
    # once for phone-number extraction).
    media_json = json.dumps(media_dict, ensure_ascii=False, default=str)
    action_json = json.dumps(action_dict, ensure_ascii=False, default=str)

    parts = []
    if text:
        parts.append(text)
    if media:
        parts.append(f"[MEDIA_TYPE] {type(media).__name__}")
        parts.append("[MEDIA_JSON] " + media_json)
    if action:
        parts.append(f"[ACTION_TYPE] {type(action).__name__}")
        parts.append("[ACTION_JSON] " + action_json)

    # Phone numbers: the explicit contact field first, then a regex pass
    # over the serialised payloads.
    phones = set()
    if media and getattr(media, "phone_number", None):
        phones.add(str(media.phone_number))

    for raw in (media_json, action_json):
        if raw and raw != "null":  # json.dumps(None) == "null"
            for hit in PHONE_RE.findall(raw):
                digits = re.sub(r"[^\d+]", "", hit)
                if digits:
                    phones.add(digits)

    if phones:
        parts.append("phones=" + ",".join(sorted(phones)))

    # Free-text strings buried anywhere inside the payloads.
    extracted_texts = set()
    _collect_text_values(media_dict, extracted_texts)
    _collect_text_values(action_dict, extracted_texts)
    if extracted_texts:
        parts.append("[MEDIA_TEXT] " + " | ".join(sorted(extracted_texts)))

    if not parts:
        parts.append("empty_message")

    return "\n".join(parts)
# =======================
|
|
# 抓取逻辑
|
|
# =======================
|
|
def _normalize_source_key(entity, raw_source: str) -> str:
|
|
username = getattr(entity, "username", None)
|
|
if username:
|
|
return f"@{username}"
|
|
return raw_source
|
|
|
|
|
|
async def scrape_one_source(
    client,
    store: MySQLStore,
    raw_source: str,
    start_dt: datetime | None,
    end_dt: datetime | None,
    ignore_sync_state: bool,
    throttle_cfg: dict,
):
    """Scrape one group/channel into MySQL.

    Three modes:
      * backfill (window + ignore_sync_state): full history scan,
        iterating newest -> oldest;
      * windowed incremental (window, sync_state honoured): only ids
        above the stored high-water mark, iterating oldest -> newest;
      * plain incremental: same oldest -> newest iteration, no window.

    The sync_state high-water mark is only advanced in the two
    incremental modes, so a backfill never disturbs daily scraping.
    """
    try:
        entity = await client.get_entity(raw_source)
    except Exception as e:
        logger.error(f"[{raw_source}] 获取实体失败: {e}")
        return

    source_key = _normalize_source_key(entity, raw_source)
    chat_id = int(getattr(entity, "id", 0) or 0)

    logger.info(f"[{source_key}] 开始抓取")

    scanned = 0
    inserted = 0
    max_seen_id = 0

    window_mode = bool(start_dt or end_dt)
    use_throttle = bool(throttle_cfg.get("enabled", True))
    per_message_delay = float(throttle_cfg.get("per_message_delay_sec", 0.0))

    if window_mode and ignore_sync_state:
        logger.info(f"[{source_key}] 时间窗口模式 start={start_dt} end={end_dt} (UTC)")
        # Telethon default order: newest message first.
        iterator = client.iter_messages(entity, limit=INITIAL_BACKFILL_LIMIT)
        oldest_first = False
    elif window_mode:
        # Daily windowed scraping still relies on sync_state so we do not
        # rescan the whole history.
        last_id = store.get_last_message_id(source_key)
        logger.info(
            f"[{source_key}] 窗口增量模式 start={start_dt} end={end_dt} (UTC), "
            f"message_id > {last_id}"
        )
        # reverse=True => oldest message first.
        iterator = client.iter_messages(entity, min_id=last_id, reverse=True)
        oldest_first = True
    else:
        last_id = store.get_last_message_id(source_key)
        logger.info(f"[{source_key}] 增量模式,从 message_id > {last_id} 开始")
        iterator = client.iter_messages(entity, min_id=last_id, reverse=True)
        oldest_first = True

    async for message in iterator:
        scanned += 1
        message_dt = message.date.astimezone(timezone.utc)

        if window_mode:
            # BUG FIX: the skip/stop decisions must follow the iteration
            # direction.  The old code always applied the newest-first
            # logic, so with reverse=True (oldest-first) it broke out at
            # the first pre-window message and never reached the window.
            if oldest_first:
                if start_dt and message_dt < start_dt:
                    continue  # not yet inside the window
                if end_dt and message_dt > end_dt:
                    break  # past the window end, nothing newer qualifies
            else:
                if end_dt and message_dt > end_dt:
                    continue  # newer than the window, keep descending
                if start_dt and message_dt < start_dt:
                    break  # older than the window start, stop

        msg_id = int(message.id)
        msg_date = message_dt.strftime("%Y-%m-%d %H:%M:%S")
        content = build_message_content(message)

        if store.save_message(source_key, chat_id, msg_id, content, msg_date):
            inserted += 1

        if msg_id > max_seen_id:
            max_seen_id = msg_id

        if scanned % 200 == 0:
            logger.info(f"[{source_key}] 进度: 扫描 {scanned} 条, 新增 {inserted} 条")

        if use_throttle and per_message_delay > 0:
            await asyncio.sleep(per_message_delay)

    # Only the incremental modes may advance the high-water mark.
    should_update_sync = (not window_mode) or (window_mode and not ignore_sync_state)
    if should_update_sync and max_seen_id > 0:
        old_last = store.get_last_message_id(source_key)
        if max_seen_id > old_last:
            store.set_last_message_id(source_key, max_seen_id)

    logger.info(f"[{source_key}] 完成: 扫描 {scanned} 条, 新增 {inserted} 条")
async def run_scraper(
    sources: list[str],
    start_dt: datetime | None,
    end_dt: datetime | None,
    ignore_sync_state: bool,
    throttle_cfg: dict,
    store: MySQLStore,
):
    """Start the Telegram client and scrape every configured source in turn.

    A configurable pause between sources lowers the rate-limit risk.
    The client is always disconnected, even when a source raises.
    """
    client = TelegramClient(SESSION_NAME, API_ID, API_HASH)

    try:
        await client.start()
        logger.info("Telegram 客户端启动成功")
    except Exception as e:
        logger.error(f"Telegram 登录失败: {e}")
        return

    use_throttle = bool(throttle_cfg.get("enabled", True))
    between_sources_delay = float(throttle_cfg.get("between_sources_delay_sec", 0.0))

    try:
        for idx, source in enumerate(sources):
            await scrape_one_source(
                client,
                store,
                source,
                start_dt,
                end_dt,
                ignore_sync_state,
                throttle_cfg,
            )

            if use_throttle and between_sources_delay > 0 and idx < len(sources) - 1:
                logger.info(f"源切换等待 {between_sources_delay:.2f}s 以降低风控")
                await asyncio.sleep(between_sources_delay)
    finally:
        # BUG FIX: previously an exception while scraping skipped the
        # disconnect and leaked the connection; it must run on every
        # exit path.
        await client.disconnect()
# =======================
# Entry point
# =======================
def main():
    """Load configuration, optionally switch to backfill mode, run the scraper."""
    (
        sources,
        start_dt,
        end_dt,
        throttle_cfg,
        mysql_cfg,
        backfill_cfg,
    ) = load_runtime_config()

    ignore_sync_state = False
    if backfill_cfg["enabled"]:
        # Backfill overrides the regular sources/window from the config.
        if backfill_cfg["sources"]:
            sources = backfill_cfg["sources"]
        start_dt = backfill_cfg["start_dt"]
        end_dt = backfill_cfg["end_dt"]
        ignore_sync_state = bool(backfill_cfg["ignore_sync_state"])
        logger.info(
            "回补模式启用: "
            f"sources={sources}, start={start_dt}, end={end_dt}, "
            f"ignore_sync_state={ignore_sync_state}"
        )

    logger.info("程序启动")
    logger.info(f"本次数据源: {sources}")
    if start_dt or end_dt:
        logger.info(f"时间窗口(UTC): start={start_dt}, end={end_dt}")
    logger.info(
        "频率控制: "
        f"enabled={throttle_cfg['enabled']}, "
        f"per_message_delay_sec={throttle_cfg['per_message_delay_sec']}, "
        f"between_sources_delay_sec={throttle_cfg['between_sources_delay_sec']}"
    )

    store = MySQLStore(mysql_cfg)
    store.connect()
    try:
        store.init_db()
        asyncio.run(
            run_scraper(
                sources,
                start_dt,
                end_dt,
                ignore_sync_state,
                throttle_cfg,
                store,
            )
        )
    finally:
        store.close()

    logger.info("程序结束")
if __name__ == "__main__":
    main()
|