diff --git a/README.md b/README.md index bbf162d..778f339 100644 --- a/README.md +++ b/README.md @@ -1,241 +1,95 @@ # jobs_robots -本项目用于采集 Telegram 招聘数据并进行结构化清洗,当前统一使用 MySQL。 +Telegram 招聘数据采集与清洗项目,当前主流程为: -## 1. 当前流程 +1. 抓取原始消息到本地 MySQL +2. 清洗为结构化岗位数据 +3. 每日定时增量执行 +4. 同步本地 MySQL 到云端 MySQL -1. `main.py` -- 从 `config.json` 读取数据源、时间窗口、限频、MySQL 配置。 -- 爬取 Telegram 消息,写入 MySQL `messages`。 -- 维护每个来源的增量游标到 `sync_state`。 +## 1. 项目结构 -2. `clean_to_structured.py` -- 从 MySQL `messages` 增量读取新增消息(按 `messages.id` + `clean_state` 检查点)。 -- 按来源规则清洗(`@DeJob_official` 有专用规则,其他走通用规则)。 -- 仅保留招聘类数据,写入 MySQL `structured_jobs`。 +- `main.py`: Telegram 增量爬取,写入 `messages`,维护 `sync_state` +- `clean_to_structured.py`: 按来源规则清洗,写入 `structured_jobs`,维护 `clean_state` +- `import_excel_jobs.py`: 读取 `sheets/` Excel,导入结构化数据,实习数据落 `internship_jobs_raw` +- `sync_to_cloud_mysql.py`: 本地 MySQL -> 云端 MySQL 增量同步 +- `run_daily_incremental.sh`: 每日调度入口(滚动窗口、抓取、清洗、云同步) +- `config.json`: 运行配置(本地使用) +- `config.example.json`: 配置模板 -3. `run_daily_incremental.sh` -- 每日调度入口。 -- 运行前自动更新 `config.json` 时间窗口(滚动窗口)。 -- 依次执行 `main.py` 和 `clean_to_structured.py`。 +## 2. 环境要求 -## 2. 数据库表 +- Python `>=3.13` +- MySQL 8.x(本地) +- MySQL 8.x(云端,可选) +- 已完成 Telethon 登录(项目目录下会生成 `scraper.session`) -### 2.1 原始层 +依赖安装: -- `messages` - - 原始消息存储(`source + message_id` 唯一) -- `sync_state` - - Telegram 增量抓取游标 +```bash +uv sync +``` -### 2.2 清洗层 +## 3. 配置说明 -- `structured_jobs` - - 结构化岗位数据(`source + message_id` 唯一) -- `clean_state` - - 清洗增量检查点(`pipeline_name -> last_message_row_id`) +先复制模板并修改: -## 2.3 字段级数据字典(详细) - -### messages(Telegram 原始消息) - -- `id`(BIGINT, PK, 自增) - 含义:MySQL 行主键,清洗增量检查点使用这个字段。 - 示例:`530812` - -- `source`(VARCHAR) - 含义:消息来源标识(频道/群组),通常是 `@xxx`。 - 示例:`@DeJob_official` - -- `chat_id`(BIGINT, 可空) - 含义:Telegram 实体 ID。 - 示例:`-1001234567890` - -- `message_id`(BIGINT) - 含义:该 source 内部的消息 ID。 - 约束:与 `source` 组成唯一键。 - -- `content`(LONGTEXT, 可空) - 含义:抓取到的消息正文(含非文本补充段,如 `MEDIA_JSON`)。 - 示例:招聘 markdown 文本 + `[MEDIA_TYPE] ...` - -- `date`(DATETIME) - 含义:消息时间(UTC)。 - 示例:`2026-02-26 09:31:10` - -- `created_at`(DATETIME) - 含义:该条记录写入数据库时间。 - -### sync_state(抓取增量状态) - -- `source`(VARCHAR, PK) - 含义:来源标识,与 `messages.source` 对应。 - -- `last_message_id`(BIGINT) - 含义:该来源已抓取到的最大 message_id。 - 用途:下次抓取时只拉 `message_id > last_message_id`。 - -- `updated_at`(DATETIME) - 含义:该来源游标最近更新时间。 - -### structured_jobs(清洗后结构化岗位) - -- `id`(BIGINT, PK, 自增) - 含义:结构化表主键。 - -- `source`(VARCHAR) - 含义:来源标识。 - 示例:`@DeJob_official` - -- `source_channel`(VARCHAR, 可空) - 含义:来源品牌/渠道归类。 - 示例:`DeJob` - -- `parser_name`(VARCHAR) - 含义:使用的解析器名称。 - 示例:`dejob_official` / `generic` - -- `parser_version`(VARCHAR) - 含义:解析器版本号,用于规则演进追踪。 - 示例:`v1` - -- `chat_id`(BIGINT, 可空) - 含义:原始 Telegram chat_id。 - -- `message_id`(BIGINT) - 含义:原始消息 ID(source 内)。 - 约束:与 `source` 组成唯一键。 - -- `message_date`(DATETIME) - 含义:原始消息时间(UTC)。 - -- `job_type`(VARCHAR, 可空) - 含义:岗位类型标记。当前仅保留 `招聘`。 - 示例:`招聘` - -- `company_name`(VARCHAR, 可空) - 含义:公司/项目方名称。 - 示例:`88EX` - -- `industry_tags_json`(JSON) - 含义:行业/赛道标签数组。 - 示例:`[\"CEX\",\"Infra\"]` - -- `company_intro`(LONGTEXT, 可空) - 含义:公司简介文本。 - -- `company_url`(TEXT, 可空) - 含义:公司官网/介绍页链接。 - -- `work_mode`(VARCHAR) - 含义:办公模式。 - 枚举:`remote | onsite | hybrid | unknown` - -- `job_nature`(VARCHAR) - 含义:用工性质。 - 枚举:`full_time | part_time | contract | intern | freelance | unknown` - -- `job_location_text`(VARCHAR, 可空) - 含义:主地点文本(首个地点)。 - -- `job_location_tags_json`(JSON, 可空) - 含义:地点标签数组。无地点时为 `NULL`(不是空数组)。 - -- `employment_type_raw`(TEXT, 可空) - 含义:原始“合作方式”文本,便于回溯规则。 - 示例:`🛵 合作方式:#全职 #远程 #吉隆坡` - -- 
`position_name`(VARCHAR, 可空) - 含义:岗位主名称。 - 示例:`社区运营` - -- `position_tags_json`(JSON) - 含义:岗位标签数组。 - 示例:`[\"社区运营\",\"运营\"]` - -- `salary_raw`(TEXT, 可空) - 含义:薪资原始字符串。 - 示例:`$1000 - $3000 / month` - -- `salary_currency`(VARCHAR, 可空) - 含义:薪资币种(已识别)。 - 示例:`USD` - -- `salary_min`(BIGINT, 可空) - 含义:薪资下限数值。 - -- `salary_max`(BIGINT, 可空) - 含义:薪资上限数值。 - -- `salary_period`(VARCHAR, 可空) - 含义:薪资周期。 - 枚举:`month | year | day | NULL` - -- `responsibilities_json`(JSON) - 含义:岗位职责数组(按条目拆分)。 - -- `requirements_json`(JSON) - 含义:岗位要求数组(按条目拆分)。 - -- `apply_email`(VARCHAR, 可空) - 含义:投递邮箱。 - -- `apply_telegram`(VARCHAR, 可空) - 含义:投递 Telegram 用户名。 - 示例:`@lulu_lucky1` - -- `job_source_url`(TEXT, 可空) - 含义:岗位来源原文链接(如 DeJob 详情页)。 - -- `body_text`(LONGTEXT) - 含义:清洗后的主体文本(去除部分技术元段)。 - -- `raw_content`(LONGTEXT) - 含义:原始消息内容快照(用于审计/回刷)。 - -- `cleaned_at`(DATETIME) - 含义:最近清洗/更新该条结构化记录的时间。 - -### clean_state(清洗增量状态) - -- `pipeline_name`(VARCHAR, PK) - 含义:清洗流程标识。 - 示例:`structured_cleaner_v1` - -- `last_message_row_id`(BIGINT) - 含义:已处理到的 `messages.id` 最大值。 - 用途:下次清洗只处理更大的 `messages.id`。 - -- `updated_at`(DATETIME) - 含义:检查点更新时间。 - -## 3. 配置文件说明(config.json) +```bash +cp config.example.json config.json +``` 关键字段: -- `sources`: Telegram 来源列表 -- `time_window.enabled`: 是否启用时间窗口 -- `time_window.start` / `time_window.end`: 抓取窗口(脚本会每日自动刷新) -- `daily_window_days`: 滚动窗口天数(当前默认 `2`) -- `throttle`: 限频配置 - - `enabled` - - `per_message_delay_sec` - - `between_sources_delay_sec` -- `mysql`: MySQL 连接配置 - - `host`, `port`, `user`, `password`, `database`, `charset` +- `sources`: 要抓取的 Telegram 来源列表 +- `time_window`: 抓取时间窗口 +- `daily_window_days`: 每日滚动窗口天数(默认 `2`) +- `backfill`: 回补配置 +- `throttle`: 限频配置,降低封号风险 +- `mysql`: 本地 MySQL 连接 +- `mysql_cloud`: 云端 MySQL 连接(用于同步) ## 4. 运行方式 -### 4.1 手动运行 +### 4.1 手动执行 ```bash uv run main.py uv run clean_to_structured.py +uv run sync_to_cloud_mysql.py ``` -### 4.2 每日定时(推荐) +如果在 cron/非交互环境,建议用 venv Python: -脚本:`run_daily_incremental.sh` +```bash +.venv/bin/python main.py +.venv/bin/python clean_to_structured.py +.venv/bin/python sync_to_cloud_mysql.py +``` + +### 4.2 Excel 导入 + +默认读取 `sheets/` 下文件: + +```bash +uv run import_excel_jobs.py +``` + +指定文件/工作表: + +```bash +uv run import_excel_jobs.py --file /path/to/jobs.xlsx --sheet Sheet1 --source @excel_import +``` + +导入规则: + +- 普通岗位:清洗后写入 `structured_jobs` +- 实习岗位:写入 `internship_jobs_raw`,不进入结构化主表 + +### 4.3 每日定时(推荐) + +调度脚本: + +- `/home/liam/code/python/jobs_robots/run_daily_incremental.sh` 示例 crontab(每天 01:10): @@ -243,49 +97,114 @@ uv run clean_to_structured.py 10 1 * * * /home/liam/code/python/jobs_robots/run_daily_incremental.sh ``` -日志文件: +脚本执行顺序: -- `logs/app.log` -- `logs/clean_to_structured.log` -- `logs/daily_job.log` +1. 自动更新 `config.json` 的 `time_window.start/end`(按 `daily_window_days`) +2. 运行 `main.py` 增量抓取 +3. 运行 `clean_to_structured.py` 增量清洗 +4. 若 `mysql_cloud` 已配置,运行 `sync_to_cloud_mysql.py` 同步云端 -## 5. 增量策略 +## 5. 增量与回补策略 -### Telegram 抓取增量 +### 5.1 抓取增量 -- 依据 `sync_state.last_message_id` -- 每个来源独立增量 +- 状态表:`sync_state` +- 游标字段:`last_message_id` +- 粒度:每个 source 独立 -### 清洗增量 +### 5.2 清洗增量 -- 依据 `clean_state.last_message_row_id` -- 每次只处理 `messages.id > checkpoint` -- 成功后更新 checkpoint +- 状态表:`clean_state` +- 游标字段:`last_message_row_id`(对应 `messages.id`) +- 规则:仅处理 `messages.id > checkpoint` -## 6. 
字段约定(结构化就业类型) +### 5.3 回补(Backfill) -`structured_jobs` 使用拆分字段,不再依赖 `employment_type_json`: +在 `config.json` 设置: -- `work_mode`: `remote | onsite | hybrid | unknown` -- `job_nature`: `full_time | part_time | contract | intern | freelance | unknown` -- `job_location_text`: 主地点文本 -- `job_location_tags_json`: 地点数组(无地点为 `NULL`) -- `employment_type_raw`: 原始“合作方式”行 +- `backfill.enabled = true` +- `backfill.start / backfill.end` +- `backfill.sources` +- `backfill.ignore_sync_state`(回补时是否忽略抓取游标) -## 7. 常见问题 +回补结束后建议关闭 `backfill.enabled`,恢复日常增量。 -1. 为什么当天凌晨跑出来窗口看着不对? -- 当前滚动窗口按 UTC 日期更新。如果需要按本地时区(如 Asia/Shanghai)可再改。 +## 6. 本地到云端同步 -2. 为什么清洗没有新增? -- 看 `clean_state` 检查点是否已经到最新。 -- 看 `messages` 是否有新数据。 +脚本:`sync_to_cloud_mysql.py` -3. 为什么 MySQL 报字段超长/类型错误? -- 优先看对应脚本日志,字段已做多数保护;若仍报错,保留错误堆栈并反馈。 +同步规则: -## 8. 协作建议 +- `messages`: 按本地 `id` 增量,云端按 `(source, message_id)` upsert +- `structured_jobs`: 按本地 `id` 增量 + `cleaned_at` 补偿更新 +- `sync_state` / `clean_state`: 小表全量 upsert +- `internship_jobs_raw`: 存在则按 `id` 增量 upsert -- 改规则时优先只改对应来源 parser,避免影响全局。 -- 改字段前先确认 `structured_jobs` 兼容性与迁移策略。 -- 所有定时行为以 `run_daily_incremental.sh` 为统一入口,避免多处调度冲突。 +状态表(云端): + +- `cloud_sync_state` + +注意: + +- 同步脚本会自动在云端补齐缺失目标表(从本地表结构复制 DDL) +- `mysql_cloud` 未配置时,日常脚本会跳过云同步 + +## 7. 数据库表与字段含义 + +### 7.1 原始层 + +- `messages` + - 原始消息正文、媒体补充文本、来源、消息时间 + - 唯一键:`(source, message_id)` +- `sync_state` + - 每个 source 的抓取游标 + +### 7.2 清洗层 + +- `structured_jobs` + - 清洗后结构化岗位数据 + - 唯一键:`(source, message_id)` + - 关键字段: + - `source`, `source_channel` + - `company_name`, `position_name` + - `work_mode`(`remote|onsite|hybrid|unknown`) + - `job_nature`(`full_time|part_time|contract|intern|freelance|unknown`) + - `job_location_text`, `job_location_tags_json`(无地点为 `NULL`) + - `apply_email`, `apply_telegram`, `job_source_url` + - `salary_raw`, `salary_currency`, `salary_min`, `salary_max`, `salary_period` + - `body_text`, `raw_content`, `cleaned_at` +- `clean_state` + - 清洗检查点 +- `internship_jobs_raw` + - Excel 导入时保留的实习原始数据 + +## 8. 日志 + +- `logs/app.log`: 抓取日志 +- `logs/clean_to_structured.log`: 清洗日志 +- `logs/sync_to_cloud_mysql.log`: 云同步日志 +- `logs/daily_job.log`: 每日调度总日志 + +## 9. 常见问题 + +1. `uv: command not found`(cron) +- 使用 `.venv/bin/python` 运行,已在 `run_daily_incremental.sh` 中处理。 + +2. `Table 'jobs.messages' doesn't exist`(云同步) +- 云端目标库为空。新版同步脚本会自动建表后再同步。 + +3. `Public Key Retrieval is not allowed`(DBeaver 连 MySQL) +- 连接参数添加 `allowPublicKeyRetrieval=true&useSSL=false`(排障用)。 + +4. `ERROR 1410 You are not allowed to create a user with GRANT` +- 先 `CREATE USER`,再 `GRANT`,不要用旧式 `GRANT ... IDENTIFIED BY ...`。 + +5. 清洗无新增 +- 检查 `messages` 是否有新数据。 +- 检查 `clean_state.last_message_row_id` 是否已到最新。 + +## 10. 
协作规范建议 + +- 新增来源规则时,优先增加 source 专用 parser,避免影响已有来源。 +- 结构字段变更前,先确认 `structured_jobs` 迁移策略和历史兼容。 +- 定时任务统一走 `run_daily_incremental.sh`,避免多个入口重复执行。 diff --git a/clean_to_structured.py b/clean_to_structured.py index 3f39cda..0be6ddf 100644 --- a/clean_to_structured.py +++ b/clean_to_structured.py @@ -362,6 +362,81 @@ def extract_apply_telegram(body_text: str) -> str | None: return handles[0] if handles else None +def extract_urls(body_text: str) -> list[str]: + return dedupe(URL_RE.findall(body_text)) + + +def extract_first_url_by_keyword(body_text: str, keywords: list[str]) -> str | None: + urls = extract_urls(body_text) + for u in urls: + lu = u.lower() + if any(k.lower() in lu for k in keywords): + return u + return None + + +def extract_first_nonempty_line(body_text: str) -> str | None: + for ln in body_text.splitlines(): + t = clean_md_text(ln) + if t: + return t + return None + + +def normalize_possible_url(raw: str) -> str | None: + token = clean_md_text(raw or "") + if not token: + return None + token = token.strip("()[]<>.,;\"' ") + if not token: + return None + if token.lower().startswith(("http://", "https://")): + return token + if token.lower().startswith("www."): + return "https://" + token + # simple domain-style fallback, e.g. company.com/apply + if " " not in token and "." in token and "/" in token: + return "https://" + token + if " " not in token and re.fullmatch(r"[A-Za-z0-9.-]+\.[A-Za-z]{2,}", token): + return "https://" + token + return None + + +def extract_apply_link(body_text: str) -> str | None: + # Priority 1: explicit apply-like lines. + for ln in body_text.splitlines(): + low = ln.lower() + if "apply" not in low and "申请" not in ln and "投递" not in ln: + continue + + # direct URL in line + line_urls = URL_RE.findall(ln) + if line_urls: + return line_urls[0] + + # try parse right side after ':' / '-' + if ":" in ln: + rhs = ln.split(":", 1)[1] + elif ":" in ln: + rhs = ln.split(":", 1)[1] + elif "-" in ln: + rhs = ln.split("-", 1)[1] + else: + rhs = ln + + for token in re.split(r"\s+", rhs.strip()): + u = normalize_possible_url(token) + if u: + return u + + # Priority 2: first URL that looks like an apply page. 
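+    # Heuristic fallback: even without an explicit apply line, a body whose
+    # only link is e.g. https://example.com/careers/backend (hypothetical URL)
+    # still yields an application page here.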
+ for u in extract_urls(body_text): + lu = u.lower() + if "apply" in lu or "job" in lu or "careers" in lu: + return u + return None + + def infer_employment_fields( tags: list[str], raw_line: str | None ) -> tuple[str, str, str | None, list[str] | None, str | None]: @@ -615,6 +690,300 @@ def parse_generic( ) +def parse_dejob_global( + source: str, + chat_id: int | None, + message_id: int, + message_date: str, + body_text: str, + raw_content: str, +) -> StructuredJob: + job_type = "招聘" if ("招聘" in body_text or "recruit" in body_text.lower()) else None + + company_name = extract_company_name_dejob(body_text) + + industry_tags = [] + for ln in body_text.splitlines(): + if "🏡" in ln: + norm_ln = normalize_md_line(ln) + industry_tags = [ + clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) + ] + industry_tags = dedupe([t for t in industry_tags if t]) + break + if not industry_tags: + industry_tags = [h.replace("·", " ").strip() for h in HASHTAG_RE.findall(body_text)] + industry_tags = dedupe([h for h in industry_tags if h]) + + cooperation_tags = [] + cooperation_line = None + for ln in body_text.splitlines(): + low = ln.lower() + if "合作方式" in ln or "fulltime" in low or "parttime" in low or "remote" in low: + cooperation_line = ln + norm_ln = normalize_md_line(ln) + cooperation_tags = [ + clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) + ] + cooperation_tags = dedupe([t for t in cooperation_tags if t]) + break + ( + work_mode, + job_nature, + job_location_text, + job_location_tags, + employment_type_raw, + ) = infer_employment_fields(cooperation_tags, cooperation_line) + + position_tags = [] + for ln in body_text.splitlines(): + if "待招岗位" in ln or "📚" in ln: + norm_ln = normalize_md_line(ln) + position_tags = [ + clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) + ] + position_tags = dedupe([t for t in position_tags if t]) + break + if not position_tags: + position_tags = industry_tags + + position_name = position_tags[0] if position_tags else extract_first_nonempty_line(body_text) + + intro_sec = extract_section(body_text, "Introduction") or extract_section(body_text, "简介") + urls = extract_urls(body_text) + company_url = extract_first_url_by_keyword(body_text, ["dejob.top/jobDetail"]) + if company_url and urls: + for u in urls: + if "dejob.top/jobDetail" not in u: + company_url = u + break + if not company_url: + company_url = extract_first_url(intro_sec) or (urls[0] if urls else None) + + company_intro = None + if intro_sec: + intro_lines = [ln for ln in intro_sec.splitlines() if not URL_RE.search(ln)] + company_intro = clean_md_text("\n".join(intro_lines)) or None + + salary_line = None + for ln in body_text.splitlines(): + if "薪酬" in ln or "salary" in ln.lower(): + salary_line = ln + break + salary_raw, salary_min, salary_max, salary_period = parse_salary(salary_line) + salary_currency = "USD" if salary_raw and "$" in salary_raw else None + + responsibilities = extract_list_section(body_text, "岗位职责") or extract_list_section( + body_text, "Responsibilities" + ) + requirements = extract_list_section(body_text, "岗位要求") or extract_list_section( + body_text, "Requirements" + ) + + apply_email = extract_apply_email(body_text) + apply_tg = extract_apply_telegram(body_text) + job_source_url = extract_first_url_by_keyword(body_text, ["dejob.top/jobDetail"]) + if not job_source_url: + urls = extract_urls(body_text) + job_source_url = urls[0] if urls else None + + return StructuredJob( + source=source, + source_channel="DeJob", + 
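+        # parser_name / parser_version record which rule set produced this
+        # row, so per-source rule changes can be traced and re-run later.
+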
parser_name="dejob_global", + parser_version="v1", + chat_id=chat_id, + message_id=message_id, + message_date=message_date, + job_type=job_type, + company_name=company_name, + industry_tags=industry_tags, + company_intro=company_intro, + company_url=company_url, + work_mode=work_mode, + job_nature=job_nature, + job_location_text=job_location_text, + job_location_tags=job_location_tags, + employment_type_raw=employment_type_raw, + position_name=position_name, + position_tags=position_tags, + salary_raw=salary_raw, + salary_currency=salary_currency, + salary_min=salary_min, + salary_max=salary_max, + salary_period=salary_period, + responsibilities=responsibilities, + requirements=requirements, + apply_email=apply_email, + apply_telegram=apply_tg, + job_source_url=job_source_url or company_url, + body_text=body_text or "empty_message", + raw_content=raw_content, + ) + + +def parse_remote_cn( + source: str, + chat_id: int | None, + message_id: int, + message_date: str, + body_text: str, + raw_content: str, +) -> StructuredJob: + lines = [clean_md_text(ln) for ln in body_text.splitlines() if clean_md_text(ln)] + title = lines[0] if lines else None + + hashtags = [h.replace("·", " ").strip() for h in HASHTAG_RE.findall(body_text)] + hashtags = dedupe([h for h in hashtags if h]) + + ( + work_mode, + job_nature, + job_location_text, + job_location_tags, + employment_type_raw, + ) = infer_employment_fields(hashtags, None) + + summary_line = None + for ln in lines: + if ln.startswith("摘要:"): + summary_line = ln + break + salary_raw, salary_min, salary_max, salary_period = parse_salary(summary_line) + salary_currency = "USD" if salary_raw and "$" in salary_raw else None + + urls = extract_urls(body_text) + apply_email = extract_apply_email(body_text) + apply_tg = extract_apply_telegram(body_text) + + # remote_cn often places the detail link right below the title line. 
+ top_url = None + raw_lines = [ln.strip() for ln in body_text.splitlines() if ln.strip()] + for ln in raw_lines[:6]: + found = URL_RE.findall(ln) + if found: + top_url = found[0] + break + + job_source_url = ( + top_url + or extract_first_url_by_keyword(body_text, ["remote-info.cn/jobs/"]) + or (urls[0] if urls else None) + ) + + job_type = "招聘" if ("招聘" in body_text or "job" in body_text.lower()) else None + + return StructuredJob( + source=source, + source_channel="remote_cn", + parser_name="remote_cn", + parser_version="v1", + chat_id=chat_id, + message_id=message_id, + message_date=message_date, + job_type=job_type, + company_name=None, + industry_tags=hashtags, + company_intro=summary_line.replace("摘要:", "", 1).strip() if summary_line else None, + company_url=job_source_url or (urls[0] if urls else None), + work_mode=work_mode, + job_nature=job_nature, + job_location_text=job_location_text, + job_location_tags=job_location_tags, + employment_type_raw=employment_type_raw, + position_name=title, + position_tags=hashtags, + salary_raw=salary_raw, + salary_currency=salary_currency, + salary_min=salary_min, + salary_max=salary_max, + salary_period=salary_period, + responsibilities=[], + requirements=[], + apply_email=apply_email, + apply_telegram=apply_tg, + job_source_url=job_source_url, + body_text=body_text or "empty_message", + raw_content=raw_content, + ) + + +def parse_cryptojobslist_source( + source: str, + chat_id: int | None, + message_id: int, + message_date: str, + body_text: str, + raw_content: str, +) -> StructuredJob: + lines = [clean_md_text(ln) for ln in body_text.splitlines() if clean_md_text(ln)] + title = lines[0] if lines else None + urls = extract_urls(body_text) + hashtags = [h.replace("·", " ").strip() for h in HASHTAG_RE.findall(body_text)] + hashtags = dedupe([h for h in hashtags if h]) + + ( + work_mode, + job_nature, + job_location_text, + job_location_tags, + employment_type_raw, + ) = infer_employment_fields(hashtags, None) + + salary_line = None + for ln in lines: + if any(k in ln.lower() for k in ("salary", "$", "usd")): + salary_line = ln + break + salary_raw, salary_min, salary_max, salary_period = parse_salary(salary_line) + salary_currency = "USD" if salary_raw and "$" in salary_raw else None + + apply_email = extract_apply_email(body_text) + apply_tg = extract_apply_telegram(body_text) + apply_link = extract_apply_link(body_text) + job_source_url = ( + apply_link + or extract_first_url_by_keyword(body_text, ["cryptojobslist.com"]) + or (urls[0] if urls else None) + ) + + job_type = "招聘" if ("job" in body_text.lower() or "hiring" in body_text.lower()) else None + + return StructuredJob( + source=source, + source_channel="cryptojobslist", + parser_name="cryptojobslist", + parser_version="v1", + chat_id=chat_id, + message_id=message_id, + message_date=message_date, + job_type=job_type, + company_name=None, + industry_tags=hashtags, + company_intro=None, + company_url=job_source_url or (urls[0] if urls else None), + work_mode=work_mode, + job_nature=job_nature, + job_location_text=job_location_text, + job_location_tags=job_location_tags, + employment_type_raw=employment_type_raw, + position_name=title, + position_tags=hashtags, + salary_raw=salary_raw, + salary_currency=salary_currency, + salary_min=salary_min, + salary_max=salary_max, + salary_period=salary_period, + responsibilities=[], + requirements=[], + apply_email=apply_email, + apply_telegram=apply_tg, + job_source_url=job_source_url, + body_text=body_text or "empty_message", + 
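+        # raw_content keeps the untouched message snapshot for audit/re-clean.
+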
raw_content=raw_content, + ) + + def route_parse(row: tuple) -> StructuredJob: source, chat_id, message_id, content, message_date = row raw_content = content or "" @@ -624,6 +993,18 @@ def route_parse(row: tuple) -> StructuredJob: return parse_dejob_official( source, chat_id, message_id, message_date, body_text, raw_content ) + if source == "@DeJob_Global_group": + return parse_dejob_global( + source, chat_id, message_id, message_date, body_text, raw_content + ) + if source == "@remote_cn": + return parse_remote_cn( + source, chat_id, message_id, message_date, body_text, raw_content + ) + if source == "@cryptojobslist": + return parse_cryptojobslist_source( + source, chat_id, message_id, message_date, body_text, raw_content + ) return parse_generic(source, chat_id, message_id, message_date, body_text, raw_content) @@ -715,6 +1096,10 @@ def is_recruitment_job(item: StructuredJob) -> bool: return item.job_type == "招聘" +def has_usable_job_link(item: StructuredJob) -> bool: + return bool((item.job_source_url or "").strip()) + + def get_last_processed_row_id(conn, pipeline_name: str) -> int: with conn.cursor() as cur: cur.execute( @@ -763,6 +1148,7 @@ def main(): processed = 0 inserted = 0 skipped_non_recruit = 0 + skipped_no_link = 0 by_parser = {} max_row_id = last_row_id @@ -778,12 +1164,17 @@ def main(): skipped_non_recruit += 1 continue + if not has_usable_job_link(item): + skipped_no_link += 1 + continue + upsert_structured(conn, item) inserted += 1 if processed % 500 == 0: logger.info( - f"[clean] processed={processed}, inserted={inserted}, skipped_non_recruit={skipped_non_recruit}" + f"[clean] processed={processed}, inserted={inserted}, " + f"skipped_non_recruit={skipped_non_recruit}, skipped_no_link={skipped_no_link}" ) if max_row_id > last_row_id: @@ -797,6 +1188,7 @@ def main(): logger.info( "[done] " f"structured_jobs={total}, inserted={inserted}, skipped_non_recruit={skipped_non_recruit}, " + f"skipped_no_link={skipped_no_link}, " f"target=mysql.structured_jobs, parsers={by_parser}" ) if processed == 0: diff --git a/config.example.json b/config.example.json index 186a193..6083768 100644 --- a/config.example.json +++ b/config.example.json @@ -11,6 +11,13 @@ "end": "2026-02-26" }, "daily_window_days": 2, + "backfill": { + "enabled": false, + "start": "", + "end": "", + "sources": [], + "ignore_sync_state": true + }, "throttle": { "enabled": true, "per_message_delay_sec": 0.05, @@ -23,5 +30,13 @@ "password": "CHANGE_ME", "database": "jobs", "charset": "utf8mb4" + }, + "mysql_cloud": { + "host": "CLOUD_DB_HOST", + "port": 3306, + "user": "jobs_user", + "password": "CHANGE_ME", + "database": "jobs", + "charset": "utf8mb4" } } diff --git a/import_excel_jobs.py b/import_excel_jobs.py new file mode 100644 index 0000000..d28bc6f --- /dev/null +++ b/import_excel_jobs.py @@ -0,0 +1,675 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Import external Excel jobs into MySQL. + +Rules: +- Internship rows (实习/intern) are stored into `internship_jobs_raw` only. +- Non-internship rows are normalized and upserted into `structured_jobs`. 
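+
+Usage sketch (file and sheet names are examples):
+
+    uv run import_excel_jobs.py
+    uv run import_excel_jobs.py --file sheets/jobs.xlsx --sheet Sheet1 --source @excel_import
+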
+""" + +import argparse +import hashlib +import json +import logging +import os +import re +from datetime import date, datetime, timedelta, timezone + +import pymysql +from openpyxl import load_workbook + +CONFIG_FILE = "config.json" +URL_RE = re.compile(r"https?://[^\s)]+", re.IGNORECASE) +EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE) + + +def setup_logger() -> logging.Logger: + os.makedirs("logs", exist_ok=True) + logger = logging.getLogger("import_excel_jobs") + logger.setLevel(logging.INFO) + if logger.handlers: + return logger + + fmt = logging.Formatter( + "[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + ch = logging.StreamHandler() + ch.setFormatter(fmt) + fh = logging.FileHandler("logs/import_excel_jobs.log", encoding="utf-8") + fh.setFormatter(fmt) + logger.addHandler(ch) + logger.addHandler(fh) + return logger + + +logger = setup_logger() + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Import jobs from Excel into MySQL") + parser.add_argument("--file", default="", help="Excel file path (.xlsx)") + parser.add_argument( + "--dir", + default="sheets", + help="Directory containing Excel files (.xlsx/.xlsm). Used when --file is empty.", + ) + parser.add_argument("--sheet", default="", help="Sheet name, default active sheet") + parser.add_argument( + "--source", + default="@excel_import", + help="source value written into structured_jobs.source", + ) + return parser.parse_args() + + +def load_mysql_config() -> dict: + if not os.path.exists(CONFIG_FILE): + raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}") + + with open(CONFIG_FILE, "r", encoding="utf-8") as f: + cfg = json.load(f) + + mysql_cfg = cfg.get("mysql", {}) + if not isinstance(mysql_cfg, dict): + raise ValueError("配置错误: mysql 必须是对象") + + result = { + "host": mysql_cfg.get("host") or os.getenv("MYSQL_HOST", "127.0.0.1"), + "port": int(mysql_cfg.get("port") or os.getenv("MYSQL_PORT", "3306")), + "user": mysql_cfg.get("user") or os.getenv("MYSQL_USER", "jobs_user"), + "password": mysql_cfg.get("password") or os.getenv("MYSQL_PASSWORD", ""), + "database": mysql_cfg.get("database") or os.getenv("MYSQL_DATABASE", "jobs"), + "charset": mysql_cfg.get("charset") or os.getenv("MYSQL_CHARSET", "utf8mb4"), + } + if not result["password"] or result["password"] == "CHANGE_ME": + raise ValueError("请先在 config.json 里填写 mysql.password") + return result + + +def connect_mysql(cfg: dict): + return pymysql.connect( + host=cfg["host"], + port=cfg["port"], + user=cfg["user"], + password=cfg["password"], + database=cfg["database"], + charset=cfg["charset"], + autocommit=True, + ) + + +def init_tables(conn): + with conn.cursor() as cur: + cur.execute( + """ + CREATE TABLE IF NOT EXISTS internship_jobs_raw ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + source VARCHAR(255) NOT NULL, + fingerprint CHAR(64) NOT NULL, + source_file VARCHAR(512) NOT NULL, + sheet_name VARCHAR(255) NOT NULL, + `row_number` INT NOT NULL, + updated_at_raw VARCHAR(128) NULL, + updated_at_utc DATETIME NULL, + industry VARCHAR(255) NULL, + title VARCHAR(512) NULL, + company VARCHAR(255) NULL, + employment_type VARCHAR(255) NULL, + location_text VARCHAR(255) NULL, + apply_email VARCHAR(255) NULL, + job_source_url TEXT NULL, + raw_row_json JSON NOT NULL, + imported_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY uk_internship_fingerprint (fingerprint), + KEY idx_internship_source (source) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """ + ) + # Compatibility for 
an existing old table schema. + for sql in [ + "ALTER TABLE internship_jobs_raw ADD COLUMN fingerprint CHAR(64) NULL", + "ALTER TABLE internship_jobs_raw ADD COLUMN updated_at_raw VARCHAR(128) NULL", + "ALTER TABLE internship_jobs_raw ADD COLUMN updated_at_utc DATETIME NULL", + "ALTER TABLE internship_jobs_raw ADD COLUMN industry VARCHAR(255) NULL", + "ALTER TABLE internship_jobs_raw ADD COLUMN location_text VARCHAR(255) NULL", + "ALTER TABLE internship_jobs_raw ADD COLUMN apply_email VARCHAR(255) NULL", + "ALTER TABLE internship_jobs_raw ADD UNIQUE KEY uk_internship_fingerprint (fingerprint)", + ]: + try: + cur.execute(sql) + except Exception: + pass + + +def norm_header(v) -> str: + s = str(v or "").strip().lower() + s = re.sub(r"\s+", "", s) + return s + + +def norm_value(v) -> str | None: + if v is None: + return None + s = str(v).strip() + return s or None + + +def first_match(data: dict, keys: list[str]) -> str | None: + for k in keys: + if k in data and data[k]: + return data[k] + return None + + +def infer_work_mode(text: str) -> str: + t = (text or "").lower() + has_remote = any(k in t for k in ["远程", "remote", "wfh", "home office"]) + has_onsite = any(k in t for k in ["实地", "onsite", "on-site", "现场", "坐班"]) + if has_remote and has_onsite: + return "hybrid" + if has_remote: + return "remote" + if has_onsite: + return "onsite" + return "unknown" + + +def infer_job_nature(text: str) -> str: + t = (text or "").lower() + if "全职" in t or "full time" in t: + return "full_time" + if "兼职" in t or "part time" in t: + return "part_time" + if "合同" in t or "contract" in t: + return "contract" + if "实习" in t or "intern" in t: + return "intern" + if "freelance" in t or "自由职业" in t: + return "freelance" + return "unknown" + + +def is_internship(text: str) -> bool: + t = (text or "").lower() + return ("实习" in t) or ("intern" in t) + + +def normalize_url(raw: str | None) -> str | None: + if not raw: + return None + s = raw.strip() + if s.lower().startswith(("http://", "https://")): + return s + if s.lower().startswith("www."): + return "https://" + s + if " " not in s and "." 
in s: + return "https://" + s + return None + + +def extract_url_from_detail(detail: str | None) -> str | None: + if not detail: + return None + m = URL_RE.search(detail) + if m: + return m.group(0) + return None + + +def extract_title_from_detail(detail: str | None, company: str | None) -> str | None: + if not detail: + return None + for ln in str(detail).splitlines(): + t = re.sub(r"\s+", " ", ln).strip() + if not t: + continue + # remove common wrappers/prefixes + t = t.replace("【", "").replace("】", "").strip("-— ") + if company and t.startswith(company): + t = t[len(company) :].strip("-—:: ") + # keep first short sentence as title + if len(t) <= 120: + return t + return t[:120] + return None + + +def extract_email_from_text(text: str | None) -> str | None: + if not text: + return None + m = EMAIL_RE.search(text) + if m: + return m.group(0) + return None + + +def normalize_location_text(raw: str | None) -> str | None: + if not raw: + return None + s = str(raw).strip() + s = s.replace(";", ";").replace(",", ",") + s = re.sub(r"\s+", " ", s) + return s or None + + +def parse_datetime_value(raw) -> str: + now_utc = datetime.now(timezone.utc) + + if raw is None: + return now_utc.strftime("%Y-%m-%d %H:%M:%S") + + if isinstance(raw, datetime): + dt = raw if raw.tzinfo else raw.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + + if isinstance(raw, date): + dt = datetime(raw.year, raw.month, raw.day, tzinfo=timezone.utc) + return dt.strftime("%Y-%m-%d %H:%M:%S") + + if isinstance(raw, (int, float)): + # Excel serial date: days since 1899-12-30 + try: + base = datetime(1899, 12, 30, tzinfo=timezone.utc) + dt = base + timedelta(days=float(raw)) + return dt.strftime("%Y-%m-%d %H:%M:%S") + except Exception: + return now_utc.strftime("%Y-%m-%d %H:%M:%S") + + s = str(raw).strip() + if not s: + return now_utc.strftime("%Y-%m-%d %H:%M:%S") + + # ISO-like strings first + try: + iso = s.replace("Z", "+00:00").replace("T", " ") + dt = datetime.fromisoformat(iso) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + else: + dt = dt.astimezone(timezone.utc) + return dt.strftime("%Y-%m-%d %H:%M:%S") + except Exception: + pass + + normalized = ( + s.replace("年", "-") + .replace("月", "-") + .replace("日", "") + .replace("/", "-") + .replace(".", "-") + ) + normalized = re.sub(r"\s+", " ", normalized).strip() + + for fmt in [ + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y-%m-%d", + "%Y%m%d%H%M%S", + "%Y%m%d%H%M", + "%Y%m%d", + ]: + try: + dt = datetime.strptime(normalized, fmt).replace(tzinfo=timezone.utc) + return dt.strftime("%Y-%m-%d %H:%M:%S") + except Exception: + continue + + logger.warning(f"无法解析时间,使用当前UTC时间兜底: raw={s}") + return now_utc.strftime("%Y-%m-%d %H:%M:%S") + + +def make_message_id(source: str, title: str | None, company: str | None, link: str | None) -> int: + key = f"{source}|{title or ''}|{company or ''}|{link or ''}" + digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:16] + return int(digest, 16) & ((1 << 63) - 1) + + +def make_fingerprint(payload: dict) -> str: + key = "|".join( + [ + str(payload.get("source") or ""), + str(payload.get("updated_at_raw") or ""), + str(payload.get("industry") or ""), + str(payload.get("company") or ""), + str(payload.get("title") or ""), + str(payload.get("employment_type") or ""), + str(payload.get("location_text") or ""), + str(payload.get("apply_email") or ""), + str(payload.get("job_source_url") or ""), + str(payload.get("detail_text") or ""), + ] + ) + return 
hashlib.sha256(key.encode("utf-8")).hexdigest() + + +def upsert_structured(conn, item: dict): + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO structured_jobs ( + source, source_channel, parser_name, parser_version, chat_id, message_id, + message_date, job_type, company_name, industry_tags_json, company_intro, + company_url, work_mode, job_nature, job_location_text, job_location_tags_json, + employment_type_raw, position_name, position_tags_json, + salary_raw, salary_currency, salary_min, salary_max, salary_period, + responsibilities_json, requirements_json, apply_email, apply_telegram, + job_source_url, body_text, raw_content + ) VALUES ( + %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, + %s, %s, %s, + %s, %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + source_channel=VALUES(source_channel), + parser_name=VALUES(parser_name), + parser_version=VALUES(parser_version), + chat_id=VALUES(chat_id), + message_date=VALUES(message_date), + job_type=VALUES(job_type), + company_name=VALUES(company_name), + industry_tags_json=VALUES(industry_tags_json), + company_intro=VALUES(company_intro), + company_url=VALUES(company_url), + work_mode=VALUES(work_mode), + job_nature=VALUES(job_nature), + job_location_text=VALUES(job_location_text), + job_location_tags_json=VALUES(job_location_tags_json), + employment_type_raw=VALUES(employment_type_raw), + position_name=VALUES(position_name), + position_tags_json=VALUES(position_tags_json), + salary_raw=VALUES(salary_raw), + salary_currency=VALUES(salary_currency), + salary_min=VALUES(salary_min), + salary_max=VALUES(salary_max), + salary_period=VALUES(salary_period), + responsibilities_json=VALUES(responsibilities_json), + requirements_json=VALUES(requirements_json), + apply_email=VALUES(apply_email), + apply_telegram=VALUES(apply_telegram), + job_source_url=VALUES(job_source_url), + body_text=VALUES(body_text), + raw_content=VALUES(raw_content), + cleaned_at=CURRENT_TIMESTAMP + """, + ( + item["source"], + item["source_channel"], + item["parser_name"], + item["parser_version"], + item["chat_id"], + item["message_id"], + item["message_date"], + item["job_type"], + item["company_name"], + item["industry_tags_json"], + item["company_intro"], + item["company_url"], + item["work_mode"], + item["job_nature"], + item["job_location_text"], + item["job_location_tags_json"], + item["employment_type_raw"], + item["position_name"], + item["position_tags_json"], + item["salary_raw"], + item["salary_currency"], + item["salary_min"], + item["salary_max"], + item["salary_period"], + item["responsibilities_json"], + item["requirements_json"], + item["apply_email"], + item["apply_telegram"], + item["job_source_url"], + item["body_text"], + item["raw_content"], + ), + ) + + +def insert_internship_raw(conn, item: dict): + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO internship_jobs_raw ( + source, source_file, sheet_name, `row_number`, + fingerprint, updated_at_raw, updated_at_utc, industry, title, company, + employment_type, location_text, apply_email, job_source_url, raw_row_json + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + updated_at_raw=VALUES(updated_at_raw), + updated_at_utc=VALUES(updated_at_utc), + industry=VALUES(industry), + title=VALUES(title), + company=VALUES(company), + employment_type=VALUES(employment_type), + location_text=VALUES(location_text), + apply_email=VALUES(apply_email), + job_source_url=VALUES(job_source_url), + 
raw_row_json=VALUES(raw_row_json),
+                imported_at=CURRENT_TIMESTAMP
+            """,
+            (
+                item["source"],
+                item["source_file"],
+                item["sheet_name"],
+                item["row_number"],
+                item["fingerprint"],
+                item["updated_at_raw"],
+                item["updated_at_utc"],
+                item["industry"],
+                item["title"],
+                item["company"],
+                item["employment_type"],
+                item["location_text"],
+                item["apply_email"],
+                item["job_source_url"],
+                item["raw_row_json"],
+            ),
+        )
+
+
+def list_excel_files(dir_path: str) -> list[str]:
+    if not os.path.isdir(dir_path):
+        return []
+    files = []
+    for name in sorted(os.listdir(dir_path)):
+        full = os.path.join(dir_path, name)
+        if not os.path.isfile(full):
+            continue
+        lower = name.lower()
+        if lower.endswith(".xlsx") or lower.endswith(".xlsm"):
+            files.append(full)
+    return files
+
+
+def import_one_file(conn, args: argparse.Namespace, file_path: str) -> tuple[int, int, int, int]:
+    wb = load_workbook(file_path, data_only=True)
+    ws = wb[args.sheet] if args.sheet else wb.active
+
+    rows = list(ws.iter_rows(values_only=True))
+    if not rows:
+        logger.info(f"Excel 为空,跳过: file={file_path}")
+        return 0, 0, 0, 0
+
+    headers = [norm_header(v) for v in rows[0]]
+    logger.info(f"file={file_path}, sheet={ws.title}, columns={len(headers)}, rows={len(rows)-1}")
+
+    imported = internship_saved = skipped_empty = 0
+    skipped_no_link = 0
+
+    for idx, row in enumerate(rows[1:], start=2):
+        raw = {headers[i]: norm_value(row[i]) if i < len(row) else None for i in range(len(headers))}
+        # Expected headers in the source sheet:
+        # 表格更新时间, 行业, 公司, 职位详情, 工作形式, 工作地点_标准, 投递邮箱
+        updated_raw = first_match(raw, ["表格更新时间", "更新时间", "date", "postedat"])
+        industry = first_match(raw, ["行业", "industry"])
+        company = first_match(raw, ["公司", "公司名称", "company", "companyname"])
+        detail = first_match(raw, ["职位详情", "岗位详情", "职位描述", "description", "jd", "详情"])
+        employment = first_match(
+            raw,
+            ["工作形式", "合作方式", "用工类型", "岗位性质", "employment", "employmenttype", "jobtype"],
+        )
+        location = first_match(raw, ["工作地点_标准", "工作地点", "地点", "城市", "location", "worklocation"])
+        email = first_match(raw, ["投递邮箱", "邮箱", "email", "applyemail"])
+        salary = first_match(raw, ["薪资", "薪酬", "salary", "compensation"])
+        tg = first_match(raw, ["telegram", "tg", "联系方式telegram"])
+
+        title = extract_title_from_detail(detail, company)
+        link = normalize_url(extract_url_from_detail(detail))
+        if not email:
+            email = extract_email_from_text(detail)
+        location = normalize_location_text(location)
+        posted = parse_datetime_value(updated_raw)
+
+        # Build a combined text for nature detection.
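+        # The internship split keys off any of the three fields; e.g. a
+        # hypothetical title "运营实习生" routes the row to internship handling
+        # even when the 工作形式 column is empty.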
+ nature_text = " | ".join([x for x in [title, employment, detail] if x]) + + if not any([title, company, link, detail]): + skipped_empty += 1 + continue + + fp_payload = { + "source": args.source, + "updated_at_raw": updated_raw, + "industry": industry, + "company": company, + "title": title, + "employment_type": employment, + "location_text": location, + "apply_email": email, + "job_source_url": link, + "detail_text": detail, + } + fingerprint = make_fingerprint(fp_payload) + + if is_internship(nature_text): + insert_internship_raw( + conn, + { + "source": args.source, + "source_file": os.path.abspath(file_path), + "sheet_name": ws.title, + "row_number": idx, + "fingerprint": fingerprint, + "updated_at_raw": updated_raw, + "updated_at_utc": posted, + "industry": industry, + "title": title, + "company": company, + "employment_type": employment, + "location_text": location, + "apply_email": email, + "job_source_url": link, + "raw_row_json": json.dumps(raw, ensure_ascii=False), + }, + ) + internship_saved += 1 + continue + + if not link: + skipped_no_link += 1 + continue + + message_id = make_message_id(args.source, title, company, link) + work_mode = infer_work_mode(nature_text) + job_nature = infer_job_nature(nature_text) + + upsert_structured( + conn, + { + "source": args.source, + "source_channel": "excel_import", + "parser_name": "excel_import", + "parser_version": "v1", + "chat_id": None, + "message_id": message_id, + "message_date": posted, + "job_type": "招聘", + "company_name": company, + "industry_tags_json": json.dumps([industry], ensure_ascii=False) if industry else json.dumps([], ensure_ascii=False), + "company_intro": None, + "company_url": link, + "work_mode": work_mode, + "job_nature": job_nature, + "job_location_text": location, + "job_location_tags_json": json.dumps([location], ensure_ascii=False) if location else None, + "employment_type_raw": employment, + "position_name": title, + "position_tags_json": json.dumps([], ensure_ascii=False), + "salary_raw": salary, + "salary_currency": "USD" if salary and "$" in salary else None, + "salary_min": None, + "salary_max": None, + "salary_period": None, + "responsibilities_json": json.dumps([], ensure_ascii=False), + "requirements_json": json.dumps([], ensure_ascii=False), + "apply_email": email, + "apply_telegram": tg, + "job_source_url": link, + "body_text": detail or title or "excel_import", + "raw_content": json.dumps(raw, ensure_ascii=False), + }, + ) + imported += 1 + + if (imported + internship_saved) % 200 == 0: + logger.info( + "progress rows=" + f"{idx-1}, imported={imported}, internship_saved={internship_saved}, " + f"skipped_empty={skipped_empty}, skipped_no_link={skipped_no_link}" + ) + + logger.info( + "done file=" + f"{file_path}, imported={imported}, internship_saved={internship_saved}, " + f"skipped_empty={skipped_empty}, skipped_no_link={skipped_no_link}, sheet={ws.title}" + ) + return imported, internship_saved, skipped_empty, skipped_no_link + + +def main(): + args = parse_args() + + files: list[str] + if args.file: + if not os.path.exists(args.file): + raise FileNotFoundError(f"Excel 文件不存在: {args.file}") + files = [args.file] + else: + files = list_excel_files(args.dir) + if not files: + raise FileNotFoundError(f"目录中未找到 Excel 文件: {args.dir}") + + mysql_cfg = load_mysql_config() + conn = connect_mysql(mysql_cfg) + init_tables(conn) + + total_imported = total_internship = total_skipped = total_skipped_no_link = 0 + try: + for f in files: + imported, internship_saved, skipped_empty, skipped_no_link = 
import_one_file( + conn, args, f + ) + total_imported += imported + total_internship += internship_saved + total_skipped += skipped_empty + total_skipped_no_link += skipped_no_link + finally: + conn.close() + + logger.info( + "all_done " + f"files={len(files)}, imported={total_imported}, internship_saved={total_internship}, " + f"skipped_empty={total_skipped}, skipped_no_link={total_skipped_no_link}" + ) + + +if __name__ == "__main__": + main() diff --git a/main.py b/main.py index 7a1d2f3..2b14b01 100644 --- a/main.py +++ b/main.py @@ -84,7 +84,9 @@ def parse_datetime(raw: str, *, is_end: bool = False) -> datetime: return dt -def load_runtime_config() -> tuple[list[str], datetime | None, datetime | None, dict, dict]: +def load_runtime_config() -> tuple[ + list[str], datetime | None, datetime | None, dict, dict, dict +]: if not os.path.exists(CONFIG_FILE): raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}") @@ -142,7 +144,36 @@ def load_runtime_config() -> tuple[list[str], datetime | None, datetime | None, if not mysql_final["password"]: raise ValueError("配置错误: mysql.password 不能为空") - return sources, start_dt, end_dt, throttle_cfg, mysql_final + backfill = cfg.get("backfill", {}) + if not isinstance(backfill, dict): + raise ValueError("配置错误: backfill 必须是对象") + + backfill_enabled = bool(backfill.get("enabled", False)) + backfill_start_raw = str(backfill.get("start", "") or "").strip() + backfill_end_raw = str(backfill.get("end", "") or "").strip() + backfill_sources = backfill.get("sources", []) + if backfill_sources and not isinstance(backfill_sources, list): + raise ValueError("配置错误: backfill.sources 必须是数组") + backfill_sources = [str(s).strip() for s in backfill_sources if str(s).strip()] + + if backfill_enabled: + bf_start = parse_datetime(backfill_start_raw, is_end=False) if backfill_start_raw else None + bf_end = parse_datetime(backfill_end_raw, is_end=True) if backfill_end_raw else None + if bf_start and bf_end and bf_start > bf_end: + raise ValueError("配置错误: backfill.start 不能晚于 backfill.end") + else: + bf_start = None + bf_end = None + + backfill_cfg = { + "enabled": backfill_enabled, + "start_dt": bf_start, + "end_dt": bf_end, + "sources": backfill_sources, + "ignore_sync_state": bool(backfill.get("ignore_sync_state", True)), + } + + return sources, start_dt, end_dt, throttle_cfg, mysql_final, backfill_cfg # ======================= @@ -337,6 +368,7 @@ async def scrape_one_source( raw_source: str, start_dt: datetime | None, end_dt: datetime | None, + ignore_sync_state: bool, throttle_cfg: dict, ): try: @@ -358,9 +390,17 @@ async def scrape_one_source( use_throttle = bool(throttle_cfg.get("enabled", True)) per_message_delay = float(throttle_cfg.get("per_message_delay_sec", 0.0)) - if window_mode: + if window_mode and ignore_sync_state: logger.info(f"[{source_key}] 时间窗口模式 start={start_dt} end={end_dt} (UTC)") iterator = client.iter_messages(entity, limit=INITIAL_BACKFILL_LIMIT) + elif window_mode: + # 用于日常窗口抓取,仍可依赖 sync_state 避免重复扫过大历史。 + last_id = store.get_last_message_id(source_key) + logger.info( + f"[{source_key}] 窗口增量模式 start={start_dt} end={end_dt} (UTC), " + f"message_id > {last_id}" + ) + iterator = client.iter_messages(entity, min_id=last_id, reverse=True) else: last_id = store.get_last_message_id(source_key) logger.info(f"[{source_key}] 增量模式,从 message_id > {last_id} 开始") @@ -392,7 +432,8 @@ async def scrape_one_source( if use_throttle and per_message_delay > 0: await asyncio.sleep(per_message_delay) - if not window_mode and max_seen_id > 0: + should_update_sync = (not 
window_mode) or (window_mode and not ignore_sync_state) + if should_update_sync and max_seen_id > 0: old_last = store.get_last_message_id(source_key) if max_seen_id > old_last: store.set_last_message_id(source_key, max_seen_id) @@ -404,6 +445,7 @@ async def run_scraper( sources: list[str], start_dt: datetime | None, end_dt: datetime | None, + ignore_sync_state: bool, throttle_cfg: dict, store: MySQLStore, ): @@ -420,7 +462,15 @@ async def run_scraper( between_sources_delay = float(throttle_cfg.get("between_sources_delay_sec", 0.0)) for idx, source in enumerate(sources): - await scrape_one_source(client, store, source, start_dt, end_dt, throttle_cfg) + await scrape_one_source( + client, + store, + source, + start_dt, + end_dt, + ignore_sync_state, + throttle_cfg, + ) if use_throttle and between_sources_delay > 0 and idx < len(sources) - 1: logger.info(f"源切换等待 {between_sources_delay:.2f}s 以降低风控") @@ -433,7 +483,28 @@ async def run_scraper( # 主程序入口 # ======================= def main(): - sources, start_dt, end_dt, throttle_cfg, mysql_cfg = load_runtime_config() + ( + sources, + start_dt, + end_dt, + throttle_cfg, + mysql_cfg, + backfill_cfg, + ) = load_runtime_config() + + if backfill_cfg["enabled"]: + if backfill_cfg["sources"]: + sources = backfill_cfg["sources"] + start_dt = backfill_cfg["start_dt"] + end_dt = backfill_cfg["end_dt"] + ignore_sync_state = bool(backfill_cfg["ignore_sync_state"]) + logger.info( + "回补模式启用: " + f"sources={sources}, start={start_dt}, end={end_dt}, " + f"ignore_sync_state={ignore_sync_state}" + ) + else: + ignore_sync_state = False logger.info("程序启动") logger.info(f"本次数据源: {sources}") @@ -450,7 +521,16 @@ def main(): store.connect() try: store.init_db() - asyncio.run(run_scraper(sources, start_dt, end_dt, throttle_cfg, store)) + asyncio.run( + run_scraper( + sources, + start_dt, + end_dt, + ignore_sync_state, + throttle_cfg, + store, + ) + ) finally: store.close() diff --git a/pyproject.toml b/pyproject.toml index e457885..5804787 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,4 +9,5 @@ dependencies = [ "pymysql>=1.1.1", "requests>=2.32.0", "beautifulsoup4>=4.12.0", + "openpyxl>=3.1.0", ] diff --git a/run_daily_incremental.sh b/run_daily_incremental.sh index ee2f3bf..7054c02 100755 --- a/run_daily_incremental.sh +++ b/run_daily_incremental.sh @@ -5,6 +5,7 @@ PROJECT_DIR="/home/liam/code/python/jobs_robots" LOG_DIR="$PROJECT_DIR/logs" LOCK_FILE="$PROJECT_DIR/.daily_job.lock" TS="$(date '+%Y-%m-%d %H:%M:%S')" +PY_BIN="$PROJECT_DIR/.venv/bin/python" mkdir -p "$LOG_DIR" @@ -19,8 +20,13 @@ cd "$PROJECT_DIR" echo "[$TS] daily job start" >> "$LOG_DIR/daily_job.log" +if [[ ! -x "$PY_BIN" ]]; then + echo "[$TS] python not found: $PY_BIN" >> "$LOG_DIR/daily_job.log" + exit 1 +fi + # Auto-advance time window to a rolling daily range. 
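+# The heredoc below recomputes time_window.start/end from daily_window_days;
+# per its datetime/timezone imports the window rolls on UTC days.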
-.venv/bin/python - <<'PY' +"$PY_BIN" - <<'PY' import json from datetime import datetime, timezone, timedelta @@ -50,9 +56,25 @@ print( PY # 1) Crawl TG incremental -uv run main.py >> "$LOG_DIR/daily_job.log" 2>&1 +"$PY_BIN" main.py >> "$LOG_DIR/daily_job.log" 2>&1 # 2) Clean dejob_official and others into structured table -uv run clean_to_structured.py >> "$LOG_DIR/daily_job.log" 2>&1 +"$PY_BIN" clean_to_structured.py >> "$LOG_DIR/daily_job.log" 2>&1 + +# 3) Sync local MySQL to cloud MySQL (only when mysql_cloud is configured) +if "$PY_BIN" - <<'PY' +import json +with open("config.json", "r", encoding="utf-8") as f: + cfg = json.load(f) +cloud = cfg.get("mysql_cloud") or {} +ok = bool(cloud.get("host") and cloud.get("user") and cloud.get("database")) +ok = ok and bool(cloud.get("password")) and cloud.get("password") != "CHANGE_ME" +raise SystemExit(0 if ok else 1) +PY +then + "$PY_BIN" sync_to_cloud_mysql.py >> "$LOG_DIR/daily_job.log" 2>&1 +else + echo "[$(date '+%Y-%m-%d %H:%M:%S')] skip cloud sync: mysql_cloud not configured" >> "$LOG_DIR/daily_job.log" +fi echo "[$(date '+%Y-%m-%d %H:%M:%S')] daily job done" >> "$LOG_DIR/daily_job.log" diff --git a/sheets/国内远程实习完整版.xlsx b/sheets/国内远程实习完整版.xlsx new file mode 100644 index 0000000..f5e4a35 Binary files /dev/null and b/sheets/国内远程实习完整版.xlsx differ diff --git a/sync_to_cloud_mysql.py b/sync_to_cloud_mysql.py new file mode 100644 index 0000000..9b294d8 --- /dev/null +++ b/sync_to_cloud_mysql.py @@ -0,0 +1,528 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Sync local MySQL data to cloud MySQL daily (incremental + upsert). + +Synced tables: +- messages (incremental by local id) +- sync_state (full upsert, small table) +- structured_jobs (incremental by local id + recent updates by cleaned_at) +- clean_state (full upsert, small table) +- internship_jobs_raw (incremental by local id, if table exists) + +State is stored on cloud DB in table `cloud_sync_state`. 
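+
+Typical runs (run_daily_incremental.sh invokes the same script):
+
+    uv run sync_to_cloud_mysql.py
+    .venv/bin/python sync_to_cloud_mysql.py
+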
+""" + +import json +import logging +import os +from datetime import datetime, timezone + +import pymysql + +CONFIG_FILE = "config.json" +PIPELINE_NAME = "local_to_cloud_mysql_v1" +BATCH_SIZE = 1000 + + +def qid(name: str) -> str: + return f"`{name.replace('`', '``')}`" + + +def setup_logger() -> logging.Logger: + os.makedirs("logs", exist_ok=True) + logger = logging.getLogger("sync_to_cloud_mysql") + logger.setLevel(logging.INFO) + if logger.handlers: + return logger + + fmt = logging.Formatter( + "[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + ch = logging.StreamHandler() + ch.setFormatter(fmt) + fh = logging.FileHandler("logs/sync_to_cloud_mysql.log", encoding="utf-8") + fh.setFormatter(fmt) + logger.addHandler(ch) + logger.addHandler(fh) + return logger + + +logger = setup_logger() + + +def load_config() -> tuple[dict, dict]: + if not os.path.exists(CONFIG_FILE): + raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}") + + with open(CONFIG_FILE, "r", encoding="utf-8") as f: + cfg = json.load(f) + + local_cfg = cfg.get("mysql", {}) + cloud_cfg = cfg.get("mysql_cloud", {}) + + if not isinstance(local_cfg, dict) or not isinstance(cloud_cfg, dict): + raise ValueError("配置错误: mysql / mysql_cloud 必须是对象") + + def norm(db: dict, env_prefix: str, defaults: dict) -> dict: + out = { + "host": db.get("host") or os.getenv(f"{env_prefix}_HOST", defaults["host"]), + "port": int(db.get("port") or os.getenv(f"{env_prefix}_PORT", defaults["port"])), + "user": db.get("user") or os.getenv(f"{env_prefix}_USER", defaults["user"]), + "password": db.get("password") or os.getenv(f"{env_prefix}_PASSWORD", ""), + "database": db.get("database") or os.getenv(f"{env_prefix}_DATABASE", defaults["database"]), + "charset": db.get("charset") or os.getenv(f"{env_prefix}_CHARSET", "utf8mb4"), + } + if not out["password"] or out["password"] == "CHANGE_ME": + raise ValueError(f"配置错误: {env_prefix}.password 不能为空") + return out + + local = norm(local_cfg, "MYSQL_LOCAL", {"host": "127.0.0.1", "port": "3306", "user": "jobs_user", "database": "jobs"}) + cloud = norm(cloud_cfg, "MYSQL_CLOUD", {"host": "127.0.0.1", "port": "3306", "user": "jobs_user", "database": "jobs"}) + return local, cloud + + +def connect_mysql(cfg: dict): + return pymysql.connect( + host=cfg["host"], + port=cfg["port"], + user=cfg["user"], + password=cfg["password"], + database=cfg["database"], + charset=cfg["charset"], + autocommit=True, + cursorclass=pymysql.cursors.DictCursor, + ) + + +def ensure_cloud_tables(cloud_conn): + with cloud_conn.cursor() as cur: + cur.execute( + """ + CREATE TABLE IF NOT EXISTS cloud_sync_state ( + pipeline_name VARCHAR(128) PRIMARY KEY, + last_messages_id BIGINT NOT NULL DEFAULT 0, + last_structured_jobs_id BIGINT NOT NULL DEFAULT 0, + last_internship_id BIGINT NOT NULL DEFAULT 0, + last_structured_sync_at DATETIME NULL, + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP + ON UPDATE CURRENT_TIMESTAMP + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """ + ) + + +def ensure_destination_tables(local_conn, cloud_conn): + required = ["messages", "sync_state", "structured_jobs", "clean_state"] + optional = ["internship_jobs_raw"] + + for table in required + optional: + if not table_exists(local_conn, table): + if table in required: + raise RuntimeError(f"本地缺少必要表: {table}") + logger.info(f"skip create cloud table {table}: local table not exists") + continue + + if table_exists(cloud_conn, table): + continue + + with local_conn.cursor() as cur: + cur.execute(f"SHOW CREATE TABLE `{table}`") + row = 
cur.fetchone() + ddl = row.get("Create Table") if isinstance(row, dict) else None + if not ddl: + raise RuntimeError(f"无法读取本地表结构: {table}") + + with cloud_conn.cursor() as cur: + cur.execute(ddl) + logger.info(f"created cloud table from local ddl: {table}") + + +def ensure_cloud_column(local_conn, cloud_conn, table: str, column: str): + if not table_exists(local_conn, table) or not table_exists(cloud_conn, table): + return + + with local_conn.cursor() as cur: + cur.execute(f"SHOW COLUMNS FROM {qid(table)} LIKE %s", (column,)) + local_col = cur.fetchone() + if not local_col: + return + + with cloud_conn.cursor() as cur: + cur.execute(f"SHOW COLUMNS FROM {qid(table)} LIKE %s", (column,)) + cloud_col = cur.fetchone() + if cloud_col: + return + + # Build minimal compatible ADD COLUMN from local definition. + col_type = local_col.get("Type") + nullable = local_col.get("Null", "YES") == "YES" + default_val = local_col.get("Default") + extra = local_col.get("Extra") or "" + + parts = [f"ALTER TABLE {qid(table)} ADD COLUMN {qid(column)} {col_type}"] + parts.append("NULL" if nullable else "NOT NULL") + if default_val is not None: + if isinstance(default_val, str): + escaped = default_val.replace("'", "''") + parts.append(f"DEFAULT '{escaped}'") + else: + parts.append(f"DEFAULT {default_val}") + if extra: + parts.append(extra) + + sql = " ".join(parts) + with cloud_conn.cursor() as cur: + cur.execute(sql) + logger.info(f"added missing cloud column: {table}.{column}") + + +def get_cloud_state(cloud_conn) -> dict: + with cloud_conn.cursor() as cur: + cur.execute( + """ + SELECT last_messages_id, last_structured_jobs_id, last_internship_id, last_structured_sync_at + FROM cloud_sync_state + WHERE pipeline_name=%s + """, + (PIPELINE_NAME,), + ) + row = cur.fetchone() + + if not row: + return { + "last_messages_id": 0, + "last_structured_jobs_id": 0, + "last_internship_id": 0, + "last_structured_sync_at": None, + } + return row + + +def set_cloud_state(cloud_conn, state: dict): + with cloud_conn.cursor() as cur: + cur.execute( + """ + INSERT INTO cloud_sync_state ( + pipeline_name, last_messages_id, last_structured_jobs_id, + last_internship_id, last_structured_sync_at, updated_at + ) VALUES (%s, %s, %s, %s, %s, NOW()) + ON DUPLICATE KEY UPDATE + last_messages_id=VALUES(last_messages_id), + last_structured_jobs_id=VALUES(last_structured_jobs_id), + last_internship_id=VALUES(last_internship_id), + last_structured_sync_at=VALUES(last_structured_sync_at), + updated_at=NOW() + """, + ( + PIPELINE_NAME, + state["last_messages_id"], + state["last_structured_jobs_id"], + state["last_internship_id"], + state["last_structured_sync_at"], + ), + ) + + +def table_exists(conn, table: str) -> bool: + with conn.cursor() as cur: + cur.execute("SHOW TABLES LIKE %s", (table,)) + return cur.fetchone() is not None + + +def sync_messages(local_conn, cloud_conn, last_id: int) -> int: + logger.info(f"sync messages from id > {last_id}") + max_id = last_id + total = 0 + while True: + with local_conn.cursor() as cur: + cur.execute( + """ + SELECT id, source, chat_id, message_id, content, date, created_at + FROM messages + WHERE id > %s + ORDER BY id ASC + LIMIT %s + """, + (max_id, BATCH_SIZE), + ) + rows = cur.fetchall() + + if not rows: + break + + with cloud_conn.cursor() as cur: + for r in rows: + cur.execute( + """ + INSERT INTO messages (source, chat_id, message_id, content, date, created_at) + VALUES (%s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + chat_id=VALUES(chat_id), + content=VALUES(content), + date=VALUES(date) 
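+                        -- created_at is intentionally left untouched on
+                        -- conflict, so the cloud row keeps its first-seen time
+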
+ """, + ( + r["source"], + r["chat_id"], + r["message_id"], + r["content"], + r["date"], + r["created_at"], + ), + ) + max_id = max(max_id, int(r["id"])) + total += 1 + + logger.info(f"messages synced batch={len(rows)}, total={total}, max_id={max_id}") + + return max_id + + +def sync_small_table_full(local_conn, cloud_conn, table: str): + if not table_exists(local_conn, table): + logger.info(f"skip {table}: local table not exists") + return + + with local_conn.cursor() as cur: + cur.execute(f"SELECT * FROM {table}") + rows = cur.fetchall() + + if not rows: + logger.info(f"{table}: no rows") + return + + cols = list(rows[0].keys()) + col_sql = ", ".join(qid(c) for c in cols) + val_sql = ", ".join(["%s"] * len(cols)) + + # primary key handling for known small tables + if table == "sync_state": + pk = "source" + elif table == "clean_state": + pk = "pipeline_name" + else: + pk = cols[0] + + update_cols = [c for c in cols if c != pk] + update_sql = ", ".join([f"{qid(c)}=VALUES({qid(c)})" for c in update_cols]) + + with cloud_conn.cursor() as cur: + for r in rows: + cur.execute( + f""" + INSERT INTO {qid(table)} ({col_sql}) VALUES ({val_sql}) + ON DUPLICATE KEY UPDATE {update_sql} + """, + tuple(r[c] for c in cols), + ) + + logger.info(f"{table}: synced rows={len(rows)}") + + +def sync_structured_jobs(local_conn, cloud_conn, last_id: int, last_sync_at) -> tuple[int, str]: + logger.info( + f"sync structured_jobs from id > {last_id}, last_sync_at={last_sync_at}" + ) + + max_id = last_id + total = 0 + touched_ids = set() + + # 1) incremental new rows by id + while True: + with local_conn.cursor() as cur: + cur.execute( + """ + SELECT * FROM structured_jobs + WHERE id > %s + ORDER BY id ASC + LIMIT %s + """, + (max_id, BATCH_SIZE), + ) + rows = cur.fetchall() + + if not rows: + break + + _upsert_structured_rows(cloud_conn, rows) + for r in rows: + rid = int(r["id"]) + max_id = max(max_id, rid) + touched_ids.add(rid) + total += len(rows) + logger.info(f"structured_jobs incremental batch={len(rows)}, total={total}, max_id={max_id}") + + # 2) update window by cleaned_at (catch updated existing rows) + if last_sync_at is None: + last_sync_at = "1970-01-01 00:00:00" + if isinstance(last_sync_at, datetime): + last_sync_at = last_sync_at.strftime("%Y-%m-%d %H:%M:%S") + + while True: + with local_conn.cursor() as cur: + cur.execute( + """ + SELECT * FROM structured_jobs + WHERE cleaned_at > %s + ORDER BY cleaned_at ASC, id ASC + LIMIT %s + """, + (last_sync_at, BATCH_SIZE), + ) + rows = cur.fetchall() + + if not rows: + break + + # avoid repeated heavy updates in loop: only keep rows not already touched in incremental loop + delta = [r for r in rows if int(r["id"]) not in touched_ids] + if delta: + _upsert_structured_rows(cloud_conn, delta) + total += len(delta) + logger.info(f"structured_jobs update-window batch={len(delta)}, total={total}") + + # move cursor forward to avoid pagination loops on timestamp boundary + last_row_cleaned_at = rows[-1]["cleaned_at"] + if isinstance(last_row_cleaned_at, datetime): + last_sync_at = last_row_cleaned_at.strftime("%Y-%m-%d %H:%M:%S") + elif last_row_cleaned_at: + last_sync_at = str(last_row_cleaned_at) + + if len(rows) < BATCH_SIZE: + break + + now_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + return max_id, now_utc + + +def _upsert_structured_rows(cloud_conn, rows: list[dict]): + if not rows: + return + cols = list(rows[0].keys()) + # id is local technical key, do not sync into cloud table (cloud has own auto id) + cols = [c for c in cols if c != 
"id"] + + col_sql = ", ".join(qid(c) for c in cols) + val_sql = ", ".join(["%s"] * len(cols)) + update_cols = [c for c in cols if c not in ("source", "message_id")] + update_sql = ", ".join( + [f"{qid(c)}=VALUES({qid(c)})" for c in update_cols] + + [f"{qid('cleaned_at')}=CURRENT_TIMESTAMP"] + ) + + with cloud_conn.cursor() as cur: + for r in rows: + cur.execute( + f""" + INSERT INTO {qid('structured_jobs')} ({col_sql}) + VALUES ({val_sql}) + ON DUPLICATE KEY UPDATE {update_sql} + """, + tuple(r[c] for c in cols), + ) + + +def sync_internship_raw(local_conn, cloud_conn, last_id: int) -> int: + if not table_exists(local_conn, "internship_jobs_raw"): + logger.info("skip internship_jobs_raw: local table not exists") + return last_id + + max_id = last_id + total = 0 + while True: + with local_conn.cursor() as cur: + cur.execute( + """ + SELECT * FROM internship_jobs_raw + WHERE id > %s + ORDER BY id ASC + LIMIT %s + """, + (max_id, BATCH_SIZE), + ) + rows = cur.fetchall() + + if not rows: + break + + cols = [c for c in rows[0].keys() if c != "id"] + col_sql = ", ".join(qid(c) for c in cols) + val_sql = ", ".join(["%s"] * len(cols)) + update_cols = [c for c in cols if c != "fingerprint"] + update_sql = ", ".join( + [f"{qid(c)}=VALUES({qid(c)})" for c in update_cols] + + [f"{qid('imported_at')}=CURRENT_TIMESTAMP"] + ) + + with cloud_conn.cursor() as cur: + for r in rows: + cur.execute( + f""" + INSERT INTO {qid('internship_jobs_raw')} ({col_sql}) + VALUES ({val_sql}) + ON DUPLICATE KEY UPDATE {update_sql} + """, + tuple(r[c] for c in cols), + ) + max_id = max(max_id, int(r["id"])) + total += 1 + + logger.info(f"internship_jobs_raw batch={len(rows)}, total={total}, max_id={max_id}") + + return max_id + + +def main(): + local_cfg, cloud_cfg = load_config() + logger.info( + "sync start: " + f"local={local_cfg['host']}:{local_cfg['port']}/{local_cfg['database']} -> " + f"cloud={cloud_cfg['host']}:{cloud_cfg['port']}/{cloud_cfg['database']}" + ) + + local_conn = connect_mysql(local_cfg) + cloud_conn = connect_mysql(cloud_cfg) + + try: + ensure_cloud_tables(cloud_conn) + ensure_destination_tables(local_conn, cloud_conn) + ensure_cloud_column(local_conn, cloud_conn, "internship_jobs_raw", "updated_at_utc") + state = get_cloud_state(cloud_conn) + logger.info(f"state before sync: {state}") + + state["last_messages_id"] = sync_messages( + local_conn, cloud_conn, int(state["last_messages_id"]) + ) + + sync_small_table_full(local_conn, cloud_conn, "sync_state") + sync_small_table_full(local_conn, cloud_conn, "clean_state") + + last_structured_id, last_structured_sync_at = sync_structured_jobs( + local_conn, + cloud_conn, + int(state["last_structured_jobs_id"]), + state["last_structured_sync_at"], + ) + state["last_structured_jobs_id"] = last_structured_id + state["last_structured_sync_at"] = last_structured_sync_at + + state["last_internship_id"] = sync_internship_raw( + local_conn, cloud_conn, int(state["last_internship_id"]) + ) + + set_cloud_state(cloud_conn, state) + logger.info(f"state after sync: {state}") + logger.info("sync done") + except Exception: + logger.exception("sync failed") + raise + finally: + local_conn.close() + cloud_conn.close() + + +if __name__ == "__main__": + main()