commit 5efb8fc9edf2b0c728e19b2b140320be9708fe76 Author: BoliviaYu Date: Thu Feb 26 20:00:06 2026 +0800 first init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a1446cc --- /dev/null +++ b/.gitignore @@ -0,0 +1,46 @@ +# Python cache +__pycache__/ +*.py[cod] +*.pyo +*.pyd + +# Virtual environments +.venv/ +venv/ +env/ + +# Logs +logs/ +*.log + +# Local databases +*.db +*.sqlite +*.sqlite3 + +# Telegram / runtime session files +*.session +*.session-journal + +# Local lock / pid style runtime artifacts +*.lock + +# Tooling +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +.coverage +htmlcov/ + +# IDE/editor +.vscode/ +.idea/ +*.swp +*.swo +.DS_Store + +# Local secrets/config overrides +.env +.env.* +config.local.json +config.json diff --git a/README.md b/README.md new file mode 100644 index 0000000..bbf162d --- /dev/null +++ b/README.md @@ -0,0 +1,291 @@ +# jobs_robots + +本项目用于采集 Telegram 招聘数据并进行结构化清洗,当前统一使用 MySQL。 + +## 1. 当前流程 + +1. `main.py` +- 从 `config.json` 读取数据源、时间窗口、限频、MySQL 配置。 +- 爬取 Telegram 消息,写入 MySQL `messages`。 +- 维护每个来源的增量游标到 `sync_state`。 + +2. `clean_to_structured.py` +- 从 MySQL `messages` 增量读取新增消息(按 `messages.id` + `clean_state` 检查点)。 +- 按来源规则清洗(`@DeJob_official` 有专用规则,其他走通用规则)。 +- 仅保留招聘类数据,写入 MySQL `structured_jobs`。 + +3. `run_daily_incremental.sh` +- 每日调度入口。 +- 运行前自动更新 `config.json` 时间窗口(滚动窗口)。 +- 依次执行 `main.py` 和 `clean_to_structured.py`。 + +## 2. 
数据库表 + +### 2.1 原始层 + +- `messages` + - 原始消息存储(`source + message_id` 唯一) +- `sync_state` + - Telegram 增量抓取游标 + +### 2.2 清洗层 + +- `structured_jobs` + - 结构化岗位数据(`source + message_id` 唯一) +- `clean_state` + - 清洗增量检查点(`pipeline_name -> last_message_row_id`) + +## 2.3 字段级数据字典(详细) + +### messages(Telegram 原始消息) + +- `id`(BIGINT, PK, 自增) + 含义:MySQL 行主键,清洗增量检查点使用这个字段。 + 示例:`530812` + +- `source`(VARCHAR) + 含义:消息来源标识(频道/群组),通常是 `@xxx`。 + 示例:`@DeJob_official` + +- `chat_id`(BIGINT, 可空) + 含义:Telegram 实体 ID。 + 示例:`-1001234567890` + +- `message_id`(BIGINT) + 含义:该 source 内部的消息 ID。 + 约束:与 `source` 组成唯一键。 + +- `content`(LONGTEXT, 可空) + 含义:抓取到的消息正文(含非文本补充段,如 `MEDIA_JSON`)。 + 示例:招聘 markdown 文本 + `[MEDIA_TYPE] ...` + +- `date`(DATETIME) + 含义:消息时间(UTC)。 + 示例:`2026-02-26 09:31:10` + +- `created_at`(DATETIME) + 含义:该条记录写入数据库时间。 + +### sync_state(抓取增量状态) + +- `source`(VARCHAR, PK) + 含义:来源标识,与 `messages.source` 对应。 + +- `last_message_id`(BIGINT) + 含义:该来源已抓取到的最大 message_id。 + 用途:下次抓取时只拉 `message_id > last_message_id`。 + +- `updated_at`(DATETIME) + 含义:该来源游标最近更新时间。 + +### structured_jobs(清洗后结构化岗位) + +- `id`(BIGINT, PK, 自增) + 含义:结构化表主键。 + +- `source`(VARCHAR) + 含义:来源标识。 + 示例:`@DeJob_official` + +- `source_channel`(VARCHAR, 可空) + 含义:来源品牌/渠道归类。 + 示例:`DeJob` + +- `parser_name`(VARCHAR) + 含义:使用的解析器名称。 + 示例:`dejob_official` / `generic` + +- `parser_version`(VARCHAR) + 含义:解析器版本号,用于规则演进追踪。 + 示例:`v1` + +- `chat_id`(BIGINT, 可空) + 含义:原始 Telegram chat_id。 + +- `message_id`(BIGINT) + 含义:原始消息 ID(source 内)。 + 约束:与 `source` 组成唯一键。 + +- `message_date`(DATETIME) + 含义:原始消息时间(UTC)。 + +- `job_type`(VARCHAR, 可空) + 含义:岗位类型标记。当前仅保留 `招聘`。 + 示例:`招聘` + +- `company_name`(VARCHAR, 可空) + 含义:公司/项目方名称。 + 示例:`88EX` + +- `industry_tags_json`(JSON) + 含义:行业/赛道标签数组。 + 示例:`[\"CEX\",\"Infra\"]` + +- `company_intro`(LONGTEXT, 可空) + 含义:公司简介文本。 + +- `company_url`(TEXT, 可空) + 含义:公司官网/介绍页链接。 + +- `work_mode`(VARCHAR) + 含义:办公模式。 + 枚举:`remote | onsite | hybrid | unknown` + +- `job_nature`(VARCHAR) + 含义:用工性质。 + 枚举:`full_time | part_time | 
contract | intern | freelance | unknown` + +- `job_location_text`(VARCHAR, 可空) + 含义:主地点文本(首个地点)。 + +- `job_location_tags_json`(JSON, 可空) + 含义:地点标签数组。无地点时为 `NULL`(不是空数组)。 + +- `employment_type_raw`(TEXT, 可空) + 含义:原始“合作方式”文本,便于回溯规则。 + 示例:`🛵 合作方式:#全职 #远程 #吉隆坡` + +- `position_name`(VARCHAR, 可空) + 含义:岗位主名称。 + 示例:`社区运营` + +- `position_tags_json`(JSON) + 含义:岗位标签数组。 + 示例:`[\"社区运营\",\"运营\"]` + +- `salary_raw`(TEXT, 可空) + 含义:薪资原始字符串。 + 示例:`$1000 - $3000 / month` + +- `salary_currency`(VARCHAR, 可空) + 含义:薪资币种(已识别)。 + 示例:`USD` + +- `salary_min`(BIGINT, 可空) + 含义:薪资下限数值。 + +- `salary_max`(BIGINT, 可空) + 含义:薪资上限数值。 + +- `salary_period`(VARCHAR, 可空) + 含义:薪资周期。 + 枚举:`month | year | day | NULL` + +- `responsibilities_json`(JSON) + 含义:岗位职责数组(按条目拆分)。 + +- `requirements_json`(JSON) + 含义:岗位要求数组(按条目拆分)。 + +- `apply_email`(VARCHAR, 可空) + 含义:投递邮箱。 + +- `apply_telegram`(VARCHAR, 可空) + 含义:投递 Telegram 用户名。 + 示例:`@lulu_lucky1` + +- `job_source_url`(TEXT, 可空) + 含义:岗位来源原文链接(如 DeJob 详情页)。 + +- `body_text`(LONGTEXT) + 含义:清洗后的主体文本(去除部分技术元段)。 + +- `raw_content`(LONGTEXT) + 含义:原始消息内容快照(用于审计/回刷)。 + +- `cleaned_at`(DATETIME) + 含义:最近清洗/更新该条结构化记录的时间。 + +### clean_state(清洗增量状态) + +- `pipeline_name`(VARCHAR, PK) + 含义:清洗流程标识。 + 示例:`structured_cleaner_v1` + +- `last_message_row_id`(BIGINT) + 含义:已处理到的 `messages.id` 最大值。 + 用途:下次清洗只处理更大的 `messages.id`。 + +- `updated_at`(DATETIME) + 含义:检查点更新时间。 + +## 3. 配置文件说明(config.json) + +关键字段: + +- `sources`: Telegram 来源列表 +- `time_window.enabled`: 是否启用时间窗口 +- `time_window.start` / `time_window.end`: 抓取窗口(脚本会每日自动刷新) +- `daily_window_days`: 滚动窗口天数(当前默认 `2`) +- `throttle`: 限频配置 + - `enabled` + - `per_message_delay_sec` + - `between_sources_delay_sec` +- `mysql`: MySQL 连接配置 + - `host`, `port`, `user`, `password`, `database`, `charset` + +## 4. 
运行方式 + +### 4.1 手动运行 + +```bash +uv run main.py +uv run clean_to_structured.py +``` + +### 4.2 每日定时(推荐) + +脚本:`run_daily_incremental.sh` + +示例 crontab(每天 01:10): + +```cron +10 1 * * * /home/liam/code/python/jobs_robots/run_daily_incremental.sh +``` + +日志文件: + +- `logs/app.log` +- `logs/clean_to_structured.log` +- `logs/daily_job.log` + +## 5. 增量策略 + +### Telegram 抓取增量 + +- 依据 `sync_state.last_message_id` +- 每个来源独立增量 + +### 清洗增量 + +- 依据 `clean_state.last_message_row_id` +- 每次只处理 `messages.id > checkpoint` +- 成功后更新 checkpoint + +## 6. 字段约定(结构化就业类型) + +`structured_jobs` 使用拆分字段,不再依赖 `employment_type_json`: + +- `work_mode`: `remote | onsite | hybrid | unknown` +- `job_nature`: `full_time | part_time | contract | intern | freelance | unknown` +- `job_location_text`: 主地点文本 +- `job_location_tags_json`: 地点数组(无地点为 `NULL`) +- `employment_type_raw`: 原始“合作方式”行 + +## 7. 常见问题 + +1. 为什么当天凌晨跑出来窗口看着不对? +- 当前滚动窗口按 UTC 日期更新。如果需要按本地时区(如 Asia/Shanghai)可再改。 + +2. 为什么清洗没有新增? +- 看 `clean_state` 检查点是否已经到最新。 +- 看 `messages` 是否有新数据。 + +3. 为什么 MySQL 报字段超长/类型错误? +- 优先看对应脚本日志,字段已做多数保护;若仍报错,保留错误堆栈并反馈。 + +## 8. 协作建议 + +- 改规则时优先只改对应来源 parser,避免影响全局。 +- 改字段前先确认 `structured_jobs` 兼容性与迁移策略。 +- 所有定时行为以 `run_daily_incremental.sh` 为统一入口,避免多处调度冲突。 diff --git a/clean_to_structured.py b/clean_to_structured.py new file mode 100644 index 0000000..3f39cda --- /dev/null +++ b/clean_to_structured.py @@ -0,0 +1,812 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Source-aware structured cleaning (MySQL). + +Input table: messages +Output table: structured_jobs +""" + +import json +import logging +import os +import re +from dataclasses import dataclass + +import pymysql + +CONFIG_FILE = "config.json" +PIPELINE_NAME = "structured_cleaner_v1" + +URL_RE = re.compile(r"https?://[^\s)]+", re.IGNORECASE) +EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE) +TG_RE = re.compile(r"(? 
def load_mysql_config() -> dict:
    """Load the MySQL connection settings.

    Values come from the ``mysql`` section of config.json, falling back to
    ``MYSQL_*`` environment variables, then to hard defaults. Raises
    FileNotFoundError when config.json is missing and ValueError when the
    section is malformed or the password is empty.
    """
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}")

    with open(CONFIG_FILE, "r", encoding="utf-8") as fh:
        cfg = json.load(fh)

    section = cfg.get("mysql", {})
    if not isinstance(section, dict):
        raise ValueError("配置错误: mysql 必须是对象")

    def pick(key: str, env: str, default: str):
        # config.json wins; empty/missing values fall through to the env var.
        return section.get(key) or os.getenv(env, default)

    result = {
        "host": pick("host", "MYSQL_HOST", "127.0.0.1"),
        "port": int(pick("port", "MYSQL_PORT", "3306")),
        "user": pick("user", "MYSQL_USER", "jobs_user"),
        "password": pick("password", "MYSQL_PASSWORD", ""),
        "database": pick("database", "MYSQL_DATABASE", "jobs"),
        "charset": pick("charset", "MYSQL_CHARSET", "utf8mb4"),
    }
    if not result["password"]:
        raise ValueError("配置错误: mysql.password 不能为空")
    return result
def dedupe(values: list[str]) -> list[str]:
    """Drop falsy entries and duplicates, keeping first-occurrence order.

    dict.fromkeys preserves insertion order (Python 3.7+), so it performs
    the order-stable dedup; the comprehension then filters empty strings.
    """
    return [v for v in dict.fromkeys(values) if v]
# Scraper-injected metadata line prefixes (see build_message_content in main.py).
_META_PREFIXES = (
    "[MEDIA_TYPE] ",
    "[ACTION_TYPE] ",
    "[MEDIA_JSON] ",
    "[ACTION_JSON] ",
    "phones=",
    "[MEDIA_TEXT] ",
)


def strip_meta_lines(content: str) -> str:
    """Remove scraper metadata lines from a raw message.

    Lines starting with any technical prefix ([MEDIA_*]/[ACTION_*]/phones=)
    are dropped; remaining lines are right-trimmed and rejoined. A None or
    empty input yields "".
    """
    # str.startswith accepts a tuple of prefixes — one check per line.
    kept = [
        ln.rstrip()
        for ln in (content or "").splitlines()
        if not ln.startswith(_META_PREFIXES)
    ]
    return "\n".join(kept).strip()
def extract_job_source_url(body_text: str) -> str | None:
    """Return the first URL on a line mentioning 岗位来源, else None."""
    for line in body_text.splitlines():
        if "岗位来源" not in line:
            continue
        match = URL_RE.search(line)
        if match:
            return match.group(0)
    return None
"坐班", "到岗"} + nature_map = { + "全职": "full_time", + "兼职": "part_time", + "实习": "intern", + "合同": "contract", + "contract": "contract", + "自由职业": "freelance", + "freelance": "freelance", + } + nature_priority = ["full_time", "part_time", "contract", "intern", "freelance"] + + normalized = [] + for t in tags: + n = clean_md_text(t).replace("·", " ").strip() + if n: + normalized.append(n) + normalized = dedupe(normalized) + + has_remote = False + has_onsite = False + natures_found = [] + locations = [] + + for tag in normalized: + low = tag.lower() + if low in mode_remote or tag in mode_remote: + has_remote = True + continue + if low in mode_onsite or tag in mode_onsite: + has_onsite = True + continue + + mapped = nature_map.get(tag) or nature_map.get(low) + if mapped: + natures_found.append(mapped) + continue + + locations.append(tag) + + if has_remote and has_onsite: + work_mode = "hybrid" + elif has_remote: + work_mode = "remote" + elif has_onsite: + work_mode = "onsite" + else: + work_mode = "unknown" + + job_nature = "unknown" + for cand in nature_priority: + if cand in natures_found: + job_nature = cand + break + + location_tags_raw = dedupe(locations) + location_text = location_tags_raw[0] if location_tags_raw else None + location_tags: list[str] | None = location_tags_raw if location_tags_raw else None + raw = clean_md_text(raw_line) if raw_line else None + + return work_mode, job_nature, location_text, location_tags, raw + + +def parse_dejob_official( + source: str, + chat_id: int | None, + message_id: int, + message_date: str, + body_text: str, + raw_content: str, +) -> StructuredJob: + job_type = "招聘" if ("招聘" in body_text or "Recruitment" in body_text) else None + + company_name = extract_company_name_dejob(body_text) + + industry_tags = [] + for ln in body_text.splitlines(): + if "🏡" in ln: + norm_ln = normalize_md_line(ln) + industry_tags = [ + clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) + ] + industry_tags = dedupe([t for t in 
industry_tags if t]) + break + + cooperation_tags = [] + cooperation_line = None + for ln in body_text.splitlines(): + if "合作方式" in ln: + cooperation_line = ln + norm_ln = normalize_md_line(ln) + cooperation_tags = [ + clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) + ] + cooperation_tags = dedupe([t for t in cooperation_tags if t]) + break + ( + work_mode, + job_nature, + job_location_text, + job_location_tags, + employment_type_raw, + ) = infer_employment_fields(cooperation_tags, cooperation_line) + + position_tags = [] + for ln in body_text.splitlines(): + if "待招岗位" in ln: + norm_ln = normalize_md_line(ln) + position_tags = [ + clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) + ] + position_tags = dedupe([t for t in position_tags if t]) + break + + position_name = extract_position_name_dejob(body_text) + + intro_sec = extract_section(body_text, "简介") + company_url = extract_first_url(intro_sec) or extract_first_url(body_text) + company_intro = None + if intro_sec: + intro_lines = [ln for ln in intro_sec.splitlines() if not URL_RE.search(ln)] + company_intro = clean_md_text("\n".join(intro_lines)) or None + + salary_line = None + for ln in body_text.splitlines(): + if "薪酬" in ln or "Salary" in ln or "salary" in ln: + salary_line = ln + break + + salary_raw, salary_min, salary_max, salary_period = parse_salary(salary_line) + salary_currency = "USD" if salary_raw and "$" in salary_raw else None + + responsibilities = extract_list_section(body_text, "岗位职责") + requirements = extract_list_section(body_text, "岗位要求") + + apply_email = extract_apply_email(body_text) + apply_tg = extract_apply_telegram(body_text) + job_source_url = extract_job_source_url(body_text) + + return StructuredJob( + source=source, + source_channel="DeJob", + parser_name="dejob_official", + parser_version="v1", + chat_id=chat_id, + message_id=message_id, + message_date=message_date, + job_type=job_type, + company_name=company_name, + 
industry_tags=industry_tags, + company_intro=company_intro, + company_url=company_url, + work_mode=work_mode, + job_nature=job_nature, + job_location_text=job_location_text, + job_location_tags=job_location_tags, + employment_type_raw=employment_type_raw, + position_name=position_name, + position_tags=position_tags, + salary_raw=salary_raw, + salary_currency=salary_currency, + salary_min=salary_min, + salary_max=salary_max, + salary_period=salary_period, + responsibilities=responsibilities, + requirements=requirements, + apply_email=apply_email, + apply_telegram=apply_tg, + job_source_url=job_source_url, + body_text=body_text or "empty_message", + raw_content=raw_content, + ) + + +def parse_generic( + source: str, + chat_id: int | None, + message_id: int, + message_date: str, + body_text: str, + raw_content: str, +) -> StructuredJob: + hashtags = [h.replace("·", " ").strip() for h in HASHTAG_RE.findall(body_text)] + hashtags = dedupe([h for h in hashtags if h]) + + urls = URL_RE.findall(body_text) + emails = EMAIL_RE.findall(body_text) + tgs = TG_RE.findall(body_text) + + title = None + for ln in body_text.splitlines(): + t = clean_md_text(ln) + if t: + title = t[:120] + break + + salary_line = None + for ln in body_text.splitlines(): + if any(k in ln.lower() for k in ("salary", "薪资", "薪酬", "k/", "$")): + salary_line = ln + break + + salary_raw, salary_min, salary_max, salary_period = parse_salary(salary_line) + salary_currency = "USD" if salary_raw and "$" in salary_raw else None + + job_type = "招聘" if ("招聘" in body_text or "recruit" in body_text.lower()) else None + ( + work_mode, + job_nature, + job_location_text, + job_location_tags, + employment_type_raw, + ) = infer_employment_fields(hashtags, None) + + return StructuredJob( + source=source, + source_channel=None, + parser_name="generic", + parser_version="v1", + chat_id=chat_id, + message_id=message_id, + message_date=message_date, + job_type=job_type, + company_name=None, + industry_tags=hashtags, + 
def route_parse(row: tuple) -> StructuredJob:
    """Dispatch one raw messages row to its source-specific parser.

    The row tuple is (source, chat_id, message_id, content, message_date).
    @DeJob_official gets the dedicated parser; every other source uses the
    generic one. Both receive the cleaned body plus the raw snapshot.
    """
    source, chat_id, message_id, content, message_date = row
    raw_content = content or ""
    cleaned = strip_meta_lines(raw_content)
    body_text = preprocess_body_text(cleaned)

    parser = parse_dejob_official if source == "@DeJob_official" else parse_generic
    return parser(source, chat_id, message_id, message_date, body_text, raw_content)
+ message_date=VALUES(message_date), + job_type=VALUES(job_type), + company_name=VALUES(company_name), + industry_tags_json=VALUES(industry_tags_json), + company_intro=VALUES(company_intro), + company_url=VALUES(company_url), + work_mode=VALUES(work_mode), + job_nature=VALUES(job_nature), + job_location_text=VALUES(job_location_text), + job_location_tags_json=VALUES(job_location_tags_json), + employment_type_raw=VALUES(employment_type_raw), + position_name=VALUES(position_name), + position_tags_json=VALUES(position_tags_json), + salary_raw=VALUES(salary_raw), + salary_currency=VALUES(salary_currency), + salary_min=VALUES(salary_min), + salary_max=VALUES(salary_max), + salary_period=VALUES(salary_period), + responsibilities_json=VALUES(responsibilities_json), + requirements_json=VALUES(requirements_json), + apply_email=VALUES(apply_email), + apply_telegram=VALUES(apply_telegram), + job_source_url=VALUES(job_source_url), + body_text=VALUES(body_text), + raw_content=VALUES(raw_content), + cleaned_at=CURRENT_TIMESTAMP + """, + ( + item.source, + item.source_channel, + item.parser_name, + item.parser_version, + item.chat_id, + item.message_id, + item.message_date, + item.job_type, + item.company_name, + json.dumps(item.industry_tags, ensure_ascii=False), + item.company_intro, + item.company_url, + item.work_mode, + item.job_nature, + item.job_location_text, + json.dumps(item.job_location_tags, ensure_ascii=False) + if item.job_location_tags is not None + else None, + item.employment_type_raw, + item.position_name, + json.dumps(item.position_tags, ensure_ascii=False), + item.salary_raw, + item.salary_currency, + item.salary_min, + item.salary_max, + item.salary_period, + json.dumps(item.responsibilities, ensure_ascii=False), + json.dumps(item.requirements, ensure_ascii=False), + item.apply_email, + item.apply_telegram, + item.job_source_url, + item.body_text, + item.raw_content, + ), + ) + + +def is_recruitment_job(item: StructuredJob) -> bool: + return item.job_type == 
"招聘" + + +def get_last_processed_row_id(conn, pipeline_name: str) -> int: + with conn.cursor() as cur: + cur.execute( + "SELECT COALESCE(last_message_row_id, 0) FROM clean_state WHERE pipeline_name=%s", + (pipeline_name,), + ) + row = cur.fetchone() + return int(row[0]) if row else 0 + + +def set_last_processed_row_id(conn, pipeline_name: str, row_id: int): + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO clean_state (pipeline_name, last_message_row_id, updated_at) + VALUES (%s, %s, NOW()) + ON DUPLICATE KEY UPDATE + last_message_row_id=VALUES(last_message_row_id), + updated_at=NOW() + """, + (pipeline_name, row_id), + ) + + +def main(): + mysql_cfg = load_mysql_config() + conn = connect_mysql(mysql_cfg) + + try: + init_target_db(conn) + last_row_id = get_last_processed_row_id(conn, PIPELINE_NAME) + logger.info(f"增量清洗起点 messages.id > {last_row_id}") + + with conn.cursor() as src_cur: + src_cur.execute( + """ + SELECT id, source, chat_id, message_id, content, date + FROM messages + WHERE id > %s + ORDER BY id ASC + """, + (last_row_id,), + ) + rows = src_cur.fetchall() + + processed = 0 + inserted = 0 + skipped_non_recruit = 0 + by_parser = {} + max_row_id = last_row_id + + for row in rows: + row_id, source, chat_id, message_id, content, message_date = row + item = route_parse((source, chat_id, message_id, content, message_date)) + processed += 1 + by_parser[item.parser_name] = by_parser.get(item.parser_name, 0) + 1 + if row_id > max_row_id: + max_row_id = row_id + + if not is_recruitment_job(item): + skipped_non_recruit += 1 + continue + + upsert_structured(conn, item) + inserted += 1 + + if processed % 500 == 0: + logger.info( + f"[clean] processed={processed}, inserted={inserted}, skipped_non_recruit={skipped_non_recruit}" + ) + + if max_row_id > last_row_id: + set_last_processed_row_id(conn, PIPELINE_NAME, max_row_id) + logger.info(f"更新检查点 last_message_row_id={max_row_id}") + + with conn.cursor() as cur: + cur.execute("SELECT count(*) FROM 
structured_jobs") + total = cur.fetchone()[0] + + logger.info( + "[done] " + f"structured_jobs={total}, inserted={inserted}, skipped_non_recruit={skipped_non_recruit}, " + f"target=mysql.structured_jobs, parsers={by_parser}" + ) + if processed == 0: + logger.info("无新增消息,清洗完成") + except Exception: + logger.exception("清洗任务失败") + raise + finally: + conn.close() + + +if __name__ == "__main__": + main() diff --git a/config.example.json b/config.example.json new file mode 100644 index 0000000..186a193 --- /dev/null +++ b/config.example.json @@ -0,0 +1,27 @@ +{ + "sources": [ + "@DeJob_Global_group", + "@DeJob_official", + "@cryptojobslist", + "@remote_cn" + ], + "time_window": { + "enabled": true, + "start": "2026-02-25", + "end": "2026-02-26" + }, + "daily_window_days": 2, + "throttle": { + "enabled": true, + "per_message_delay_sec": 0.05, + "between_sources_delay_sec": 3.0 + }, + "mysql": { + "host": "127.0.0.1", + "port": 3306, + "user": "jobs_user", + "password": "CHANGE_ME", + "database": "jobs", + "charset": "utf8mb4" + } +} diff --git a/main.py b/main.py new file mode 100644 index 0000000..7a1d2f3 --- /dev/null +++ b/main.py @@ -0,0 +1,461 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Telegram Multi-Source Scraper (MySQL) +------------------------------------ +支持多个群组/频道作为数据源,统一入 MySQL 并带来源标识。 +时间窗口和频率控制都从 config.json 读取。 +""" + +import asyncio +import json +import logging +import os +import re +from datetime import datetime, timezone + +import pymysql +from telethon import TelegramClient + +# ======================= +# 配置区域 +# ======================= +API_ID = 30954205 +API_HASH = "cc422183f2ed2233dfa05ea7a25abe3f" +SESSION_NAME = "scraper" +CONFIG_FILE = "config.json" + +DEFAULT_SOURCES = ["@DeJob_Global_group"] +INITIAL_BACKFILL_LIMIT = None +PHONE_RE = re.compile(r"(? 
def parse_datetime(raw: str, *, is_end: bool = False) -> datetime:
    """Parse a config timestamp into a timezone-aware UTC datetime.

    Accepts ISO-8601 strings (containing ``T``, an offset, or a trailing
    ``Z``), bare dates ``YYYY-MM-DD`` (expanded to 23:59:59 when *is_end*
    is true), and ``YYYY-MM-DD HH:MM:SS``. Naive values are assumed UTC;
    aware values are converted to UTC.
    """
    text = raw.strip()

    looks_iso = "T" in text or "+" in text or text.endswith("Z")
    if looks_iso:
        # fromisoformat (< 3.11) does not accept a literal "Z" suffix.
        parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
    elif len(text) == 10:
        parsed = datetime.strptime(text, "%Y-%m-%d")
        if is_end:
            # Push end-of-window dates to the last second of the day.
            parsed = parsed.replace(hour=23, minute=59, second=59)
    else:
        parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")

    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
parse_datetime(start_raw, is_end=False) if start_raw else None + end_dt = parse_datetime(end_raw, is_end=True) if end_raw else None + else: + start_dt = None + end_dt = None + + if start_dt and end_dt and start_dt > end_dt: + raise ValueError("配置错误: time_window.start 不能晚于 time_window.end") + + throttle = cfg.get("throttle", {}) + if not isinstance(throttle, dict): + raise ValueError("配置错误: throttle 必须是对象") + + throttle_cfg = { + "enabled": bool(throttle.get("enabled", True)), + "per_message_delay_sec": float(throttle.get("per_message_delay_sec", 0.35)), + "between_sources_delay_sec": float(throttle.get("between_sources_delay_sec", 2.0)), + } + + mysql_cfg = cfg.get("mysql", {}) + if not isinstance(mysql_cfg, dict): + raise ValueError("配置错误: mysql 必须是对象") + + mysql_final = { + "host": mysql_cfg.get("host") or os.getenv("MYSQL_HOST", "127.0.0.1"), + "port": int(mysql_cfg.get("port") or os.getenv("MYSQL_PORT", "3306")), + "user": mysql_cfg.get("user") or os.getenv("MYSQL_USER", "jobs_user"), + "password": mysql_cfg.get("password") or os.getenv("MYSQL_PASSWORD", ""), + "database": mysql_cfg.get("database") or os.getenv("MYSQL_DATABASE", "jobs"), + "charset": mysql_cfg.get("charset") or os.getenv("MYSQL_CHARSET", "utf8mb4"), + } + + if not mysql_final["password"]: + raise ValueError("配置错误: mysql.password 不能为空") + + return sources, start_dt, end_dt, throttle_cfg, mysql_final + + +# ======================= +# MySQL 存储 +# ======================= +class MySQLStore: + def __init__(self, cfg: dict): + self.cfg = cfg + self.conn: pymysql.connections.Connection | None = None + + def connect(self): + self.conn = pymysql.connect( + host=self.cfg["host"], + port=self.cfg["port"], + user=self.cfg["user"], + password=self.cfg["password"], + database=self.cfg["database"], + charset=self.cfg["charset"], + autocommit=True, + ) + + def close(self): + if self.conn: + self.conn.close() + self.conn = None + + def _cursor(self): + if not self.conn: + raise RuntimeError("数据库未连接") + return 
self.conn.cursor() + + def init_db(self): + with self._cursor() as cursor: + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS messages ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + source VARCHAR(255) NOT NULL, + chat_id BIGINT NULL, + message_id BIGINT NOT NULL, + content LONGTEXT, + date DATETIME NOT NULL, + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY uk_source_message (source, message_id), + KEY idx_source_date (source, date) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """ + ) + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS sync_state ( + source VARCHAR(255) PRIMARY KEY, + last_message_id BIGINT NOT NULL DEFAULT 0, + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP + ON UPDATE CURRENT_TIMESTAMP + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """ + ) + logger.info("MySQL 数据库初始化完成") + + def get_last_message_id(self, source: str) -> int: + with self._cursor() as cursor: + cursor.execute( + "SELECT COALESCE(last_message_id, 0) FROM sync_state WHERE source = %s", + (source,), + ) + row = cursor.fetchone() + return int(row[0]) if row else 0 + + def set_last_message_id(self, source: str, message_id: int): + with self._cursor() as cursor: + cursor.execute( + """ + INSERT INTO sync_state (source, last_message_id, updated_at) + VALUES (%s, %s, NOW()) + ON DUPLICATE KEY UPDATE + last_message_id = VALUES(last_message_id), + updated_at = NOW() + """, + (source, message_id), + ) + + def save_message( + self, source: str, chat_id: int, message_id: int, content: str, date_str: str + ) -> bool: + with self._cursor() as cursor: + cursor.execute( + """ + INSERT IGNORE INTO messages (source, chat_id, message_id, content, date) + VALUES (%s, %s, %s, %s, %s) + """, + (source, chat_id, message_id, content, date_str), + ) + return cursor.rowcount == 1 + + +# ======================= +# 内容解析 +# ======================= +def _collect_text_values(obj, out): + if obj is None: + return + if isinstance(obj, str): + value = obj.strip() + if value: + 
out.add(value) + return + if isinstance(obj, dict): + for v in obj.values(): + _collect_text_values(v, out) + return + if isinstance(obj, (list, tuple, set)): + for item in obj: + _collect_text_values(item, out) + + +def _to_safe_dict(obj): + if obj is None: + return None + if hasattr(obj, "to_dict"): + try: + return obj.to_dict() + except Exception: + return {"raw": str(obj)} + return {"raw": str(obj)} + + +def build_message_content(message) -> str: + text = (message.text or message.message or "").strip() + + parts = [] + media = message.media + action = message.action + media_dict = _to_safe_dict(media) + action_dict = _to_safe_dict(action) + + if text: + parts.append(text) + if media: + parts.append(f"[MEDIA_TYPE] {type(media).__name__}") + parts.append( + "[MEDIA_JSON] " + json.dumps(media_dict, ensure_ascii=False, default=str) + ) + if action: + parts.append(f"[ACTION_TYPE] {type(action).__name__}") + parts.append( + "[ACTION_JSON] " + json.dumps(action_dict, ensure_ascii=False, default=str) + ) + + phones = set() + if media and hasattr(media, "phone_number") and media.phone_number: + phones.add(str(media.phone_number)) + + for raw in ( + json.dumps(media_dict, ensure_ascii=False, default=str), + json.dumps(action_dict, ensure_ascii=False, default=str), + ): + if raw and raw != "null": + for hit in PHONE_RE.findall(raw): + clean = re.sub(r"[^\d+]", "", hit) + if clean: + phones.add(clean) + + if phones: + parts.append("phones=" + ",".join(sorted(phones))) + + extracted_texts = set() + _collect_text_values(media_dict, extracted_texts) + _collect_text_values(action_dict, extracted_texts) + if extracted_texts: + parts.append("[MEDIA_TEXT] " + " | ".join(sorted(extracted_texts))) + + if not parts: + parts.append("empty_message") + + return "\n".join(parts) + + +# ======================= +# 抓取逻辑 +# ======================= +def _normalize_source_key(entity, raw_source: str) -> str: + username = getattr(entity, "username", None) + if username: + return f"@{username}" 
async def scrape_one_source(
    client,
    store: MySQLStore,
    raw_source: str,
    start_dt: datetime | None,
    end_dt: datetime | None,
    throttle_cfg: dict,
):
    """Fetch messages for one Telegram source and persist them via *store*.

    Two modes:
      * window mode (start_dt/end_dt set): scan newest-first, bounded by
        INITIAL_BACKFILL_LIMIT, keeping only messages inside the window;
      * incremental mode: resume from the per-source cursor in ``sync_state``
        (oldest-first) and advance the cursor afterwards.

    A failure to resolve the entity is logged and skipped so one bad source
    does not abort the whole run.
    """
    try:
        entity = await client.get_entity(raw_source)
    except Exception as e:
        logger.error(f"[{raw_source}] 获取实体失败: {e}")
        return

    source_key = _normalize_source_key(entity, raw_source)
    chat_id = int(getattr(entity, "id", 0) or 0)

    logger.info(f"[{source_key}] 开始抓取")

    scanned = 0
    inserted = 0
    max_seen_id = 0

    window_mode = bool(start_dt or end_dt)
    use_throttle = bool(throttle_cfg.get("enabled", True))
    per_message_delay = float(throttle_cfg.get("per_message_delay_sec", 0.0))

    if window_mode:
        logger.info(f"[{source_key}] 时间窗口模式 start={start_dt} end={end_dt} (UTC)")
        iterator = client.iter_messages(entity, limit=INITIAL_BACKFILL_LIMIT)
    else:
        last_id = store.get_last_message_id(source_key)
        logger.info(f"[{source_key}] 增量模式,从 message_id > {last_id} 开始")
        iterator = client.iter_messages(entity, min_id=last_id, reverse=True)

    async for message in iterator:
        scanned += 1
        message_dt = message.date.astimezone(timezone.utc)

        if window_mode:
            # Window iteration is newest-first: skip messages newer than the
            # window end; stop entirely once older than the window start.
            if end_dt and message_dt > end_dt:
                continue
            if start_dt and message_dt < start_dt:
                break

        msg_id = int(message.id)
        msg_date = message_dt.strftime("%Y-%m-%d %H:%M:%S")
        content = build_message_content(message)

        if store.save_message(source_key, chat_id, msg_id, content, msg_date):
            inserted += 1

        if msg_id > max_seen_id:
            max_seen_id = msg_id

        if scanned % 200 == 0:
            logger.info(f"[{source_key}] 进度: 扫描 {scanned} 条, 新增 {inserted} 条")

        if use_throttle and per_message_delay > 0:
            # Per-message pacing to reduce flood-wait risk.
            await asyncio.sleep(per_message_delay)

    if not window_mode and max_seen_id > 0:
        # Only ever advance the cursor; never move it backwards.
        old_last = store.get_last_message_id(source_key)
        if max_seen_id > old_last:
            store.set_last_message_id(source_key, max_seen_id)

    logger.info(f"[{source_key}] 完成: 扫描 {scanned} 条, 新增 {inserted} 条")


async def run_scraper(
    sources: list[str],
    start_dt: datetime | None,
    end_dt: datetime | None,
    throttle_cfg: dict,
    store: MySQLStore,
):
    """Start the Telegram client and scrape every configured source in order.

    Fix: the client is now disconnected in a ``finally`` block, so an
    exception raised while scraping one source no longer leaks the open
    Telegram connection (previously ``disconnect()`` was only reached on the
    fully successful path).
    """
    client = TelegramClient(SESSION_NAME, API_ID, API_HASH)

    try:
        await client.start()
        logger.info("Telegram 客户端启动成功")
    except Exception as e:
        logger.error(f"Telegram 登录失败: {e}")
        return

    use_throttle = bool(throttle_cfg.get("enabled", True))
    between_sources_delay = float(throttle_cfg.get("between_sources_delay_sec", 0.0))

    try:
        for idx, source in enumerate(sources):
            await scrape_one_source(client, store, source, start_dt, end_dt, throttle_cfg)

            # Pause between sources (not after the last) to lower risk-control
            # pressure on the account.
            if use_throttle and between_sources_delay > 0 and idx < len(sources) - 1:
                logger.info(f"源切换等待 {between_sources_delay:.2f}s 以降低风控")
                await asyncio.sleep(between_sources_delay)
    finally:
        await client.disconnect()
run_scraper( + sources: list[str], + start_dt: datetime | None, + end_dt: datetime | None, + throttle_cfg: dict, + store: MySQLStore, +): + client = TelegramClient(SESSION_NAME, API_ID, API_HASH) + + try: + await client.start() + logger.info("Telegram 客户端启动成功") + except Exception as e: + logger.error(f"Telegram 登录失败: {e}") + return + + use_throttle = bool(throttle_cfg.get("enabled", True)) + between_sources_delay = float(throttle_cfg.get("between_sources_delay_sec", 0.0)) + + for idx, source in enumerate(sources): + await scrape_one_source(client, store, source, start_dt, end_dt, throttle_cfg) + + if use_throttle and between_sources_delay > 0 and idx < len(sources) - 1: + logger.info(f"源切换等待 {between_sources_delay:.2f}s 以降低风控") + await asyncio.sleep(between_sources_delay) + + await client.disconnect() + + +# ======================= +# 主程序入口 +# ======================= +def main(): + sources, start_dt, end_dt, throttle_cfg, mysql_cfg = load_runtime_config() + + logger.info("程序启动") + logger.info(f"本次数据源: {sources}") + if start_dt or end_dt: + logger.info(f"时间窗口(UTC): start={start_dt}, end={end_dt}") + logger.info( + "频率控制: " + f"enabled={throttle_cfg['enabled']}, " + f"per_message_delay_sec={throttle_cfg['per_message_delay_sec']}, " + f"between_sources_delay_sec={throttle_cfg['between_sources_delay_sec']}" + ) + + store = MySQLStore(mysql_cfg) + store.connect() + try: + store.init_db() + asyncio.run(run_scraper(sources, start_dt, end_dt, throttle_cfg, store)) + finally: + store.close() + + logger.info("程序结束") + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e457885 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,12 @@ +[project] +name = "jobs_robots" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.13" +dependencies = [ + "telethon>=1.42.0", + "pymysql>=1.1.1", + "requests>=2.32.0", + "beautifulsoup4>=4.12.0", +] diff --git 
#!/usr/bin/env bash
# Daily scheduler entry point: advance the rolling time window in
# config.json, then run the crawler and the cleaner, all under a flock
# so overlapping cron invocations exit early.
set -euo pipefail

PROJECT_DIR="/home/liam/code/python/jobs_robots"
LOG_DIR="$PROJECT_DIR/logs"
LOCK_FILE="$PROJECT_DIR/.daily_job.lock"
JOB_LOG="$LOG_DIR/daily_job.log"

mkdir -p "$LOG_DIR"

# Fix: take the timestamp at log time. The original captured $TS once at
# startup and reused it, so the "start" line carried a stale time relative
# to the final "done" line (which already recomputed the date).
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" >> "$JOB_LOG"
}

# Prevent overlap if previous run is still active.
exec 9>"$LOCK_FILE"
if ! flock -n 9; then
    log "another job is running, exit"
    exit 0
fi

cd "$PROJECT_DIR"

log "daily job start"

# Auto-advance time window to a rolling daily range.
.venv/bin/python - <<'PY'
import json
from datetime import datetime, timezone, timedelta

cfg_path = 'config.json'
with open(cfg_path, 'r', encoding='utf-8') as f:
    cfg = json.load(f)

window = cfg.setdefault('time_window', {})
if window.get('enabled', False):
    days = int(cfg.get('daily_window_days', 1) or 1)
    if days < 1:
        days = 1
    end_dt = datetime.now(timezone.utc).date()
    start_dt = end_dt - timedelta(days=days - 1)
    window['start'] = start_dt.strftime('%Y-%m-%d')
    window['end'] = end_dt.strftime('%Y-%m-%d')

with open(cfg_path, 'w', encoding='utf-8') as f:
    json.dump(cfg, f, ensure_ascii=False, indent=2)
    f.write('\n')

print(
    "updated time_window: "
    f"start={window.get('start')} end={window.get('end')} "
    f"daily_window_days={cfg.get('daily_window_days', 1)}"
)
PY

# 1) Crawl TG incremental
uv run main.py >> "$JOB_LOG" 2>&1

# 2) Clean dejob_official and others into structured table
uv run clean_to_structured.py >> "$JOB_LOG" 2>&1

log "daily job done"