jobs_robots

Telegram 招聘数据采集与清洗项目,当前主流程为:

  1. 抓取原始消息到本地 MySQL
  2. 清洗为结构化岗位数据
  3. 每日定时增量执行
  4. 同步本地 MySQL 到云端 MySQL

1. 项目结构

  • main.py: Telegram 增量爬取,写入 messages,维护 sync_state
  • clean_to_structured.py: 按来源规则清洗,写入 structured_jobs,维护 clean_state
  • import_excel_jobs.py: 读取 sheets/ Excel导入结构化数据实习数据落 internship_jobs_raw
  • sync_to_cloud_mysql.py: 本地 MySQL -> 云端 MySQL 增量同步
  • run_daily_incremental.sh: 每日调度入口(滚动窗口、抓取、清洗、云同步)
  • config.json: 运行配置(本地使用)
  • config.example.json: 配置模板

2. 环境要求

  • Python >=3.13
  • MySQL 8.x本地
  • MySQL 8.x云端可选
  • 已完成 Telethon 登录(项目目录下会生成 scraper.session

依赖安装:

uv sync

3. 配置说明

先复制模板并修改:

cp config.example.json config.json

关键字段:

  • sources: 要抓取的 Telegram 来源列表
  • time_window: 抓取时间窗口
  • daily_window_days: 每日滚动窗口天数(默认 2
  • backfill: 回补配置
  • throttle: 限频配置,降低封号风险
  • mysql: 本地 MySQL 连接
  • mysql_cloud: 云端 MySQL 连接(用于同步)

时间规范:

  • 项目统一使用 UTC 存储所有 DATETIME
  • 各脚本连接 MySQL 后会执行 SET time_zone = '+00:00'
  • NOW() / CURRENT_TIMESTAMP 产生的时间也按 UTC 写入

4. 运行方式

4.1 手动执行

uv run main.py
uv run clean_to_structured.py
uv run sync_to_cloud_mysql.py

如果在 cron/非交互环境,建议用 venv Python

.venv/bin/python main.py
.venv/bin/python clean_to_structured.py
.venv/bin/python sync_to_cloud_mysql.py

4.2 Excel 导入

默认读取 sheets/ 下文件:

uv run import_excel_jobs.py

指定文件/工作表:

uv run import_excel_jobs.py --file /path/to/jobs.xlsx --sheet Sheet1 --source @excel_import

导入规则:

  • 普通岗位:清洗后写入 structured_jobs
  • 实习岗位:写入 internship_jobs_raw,不进入结构化主表

4.3 每日定时(推荐)

调度脚本:

  • /home/liam/code/python/jobs_robots/run_daily_incremental.sh

示例 crontab每天 01:10

10 1 * * * /home/liam/code/python/jobs_robots/run_daily_incremental.sh

脚本执行顺序:

  1. 自动更新 config.jsontime_window.start/end(按 daily_window_days
  2. 运行 main.py 增量抓取
  3. 运行 clean_to_structured.py 增量清洗
  4. mysql_cloud 已配置,运行 sync_to_cloud_mysql.py 同步云端

5. 增量与回补策略

5.1 抓取增量

  • 状态表:sync_state
  • 游标字段:last_message_id
  • 粒度:每个 source 独立

5.2 清洗增量

  • 状态表:clean_state
  • 游标字段:last_message_row_id(对应 messages.id
  • 规则:仅处理 messages.id > checkpoint

5.3 回补Backfill

config.json 设置:

  • backfill.enabled = true
  • backfill.start / backfill.end
  • backfill.sources
  • backfill.ignore_sync_state(回补时是否忽略抓取游标)

回补结束后建议关闭 backfill.enabled,恢复日常增量。

6. 本地到云端同步

脚本:sync_to_cloud_mysql.py

同步规则:

  • messages: 按本地 id 增量,云端按 (source, message_id) upsert
  • structured_jobs: 按本地 id 增量 + cleaned_at 补偿更新
  • sync_state / clean_state: 小表全量 upsert
  • internship_jobs_raw: 存在则按 id 增量 upsert

状态表(云端):

  • cloud_sync_state

注意:

  • 同步脚本会自动在云端补齐缺失目标表(从本地表结构复制 DDL
  • mysql_cloud 未配置时,日常脚本会跳过云同步

7. 数据库表与字段含义

7.1 messages(原始消息)

唯一键:uk_source_message (source, message_id)

字段 类型 含义 备注
id BIGINT, PK, AUTO_INCREMENT 本地行主键 清洗增量以此为游标来源
source VARCHAR(255), NOT NULL 数据来源标识 @DeJob_official
chat_id BIGINT, NULL Telegram chat id 可空
message_id BIGINT, NOT NULL Telegram 消息 id 在同一 source 下唯一
content LONGTEXT 原始正文 含媒体补充文本/结构化片段
date DATETIME, NOT NULL 消息发布时间 统一按 UTC 入库
created_at DATETIME, NOT NULL 入库时间 默认 CURRENT_TIMESTAMP

7.2 sync_state(抓取增量状态)

主键:source

字段 类型 含义 备注
source VARCHAR(255), PK 来源标识 对应 messages.source
last_message_id BIGINT, NOT NULL 已抓取到的最大 message_id 抓取增量游标
updated_at DATETIME, NOT NULL 状态更新时间 自动更新

7.3 structured_jobs(清洗后岗位)

唯一键:uk_source_message (source, message_id)

字段 类型 含义 备注
id BIGINT, PK, AUTO_INCREMENT 结构化主键 内部主键
source VARCHAR(255), NOT NULL 来源标识 @DeJob_official
source_channel VARCHAR(255), NULL 来源渠道归一名 DeJob
parser_name VARCHAR(64), NOT NULL 解析器名称 dejob_official
parser_version VARCHAR(32), NOT NULL 解析器版本 规则演进追踪
chat_id BIGINT, NULL 原始 chat id 可空
message_id BIGINT, NOT NULL 原始消息 id source 组成唯一键
message_date DATETIME, NOT NULL 原始消息时间 UTC
job_type VARCHAR(64), NULL 岗位类型 目前主要是 招聘
company_name VARCHAR(255), NULL 公司名 清洗后的公司名
industry_tags_json JSON, NOT NULL 行业标签数组 例如 ["CEX"]
company_intro LONGTEXT, NULL 公司简介 可空
company_url TEXT, NULL 公司官网/主页 可空
work_mode VARCHAR(32), NOT NULL 办公模式 remote/onsite/hybrid/unknown
job_nature VARCHAR(32), NOT NULL 用工性质 full_time/part_time/contract/intern/freelance/unknown
job_location_text VARCHAR(255), NULL 主地点文本 可空
job_location_tags_json JSON, NULL 地点标签数组 无地点时为 NULL
employment_type_raw TEXT, NULL 原始合作方式文本 用于回溯
position_name VARCHAR(255), NULL 岗位主名称 可空
position_tags_json JSON, NOT NULL 岗位标签数组 例如 ["运营"]
salary_raw TEXT, NULL 原始薪资文本 可空
salary_currency VARCHAR(16), NULL 薪资币种 USD
salary_min BIGINT, NULL 薪资下限 解析值
salary_max BIGINT, NULL 薪资上限 解析值
salary_period VARCHAR(16), NULL 薪资周期 month/year/day
responsibilities_json JSON, NOT NULL 岗位职责数组 可为空数组
requirements_json JSON, NOT NULL 岗位要求数组 可为空数组
apply_email VARCHAR(255), NULL 投递邮箱 可空
apply_telegram VARCHAR(255), NULL TG 联系方式 可空
job_source_url TEXT, NULL 职位详情链接 清洗后可用链接
body_text LONGTEXT, NOT NULL 清洗后的正文 去除技术元信息
raw_content LONGTEXT, NOT NULL 原始内容快照 审计/回刷使用
cleaned_at DATETIME, NOT NULL 最近清洗时间 自动更新

7.4 clean_state(清洗增量状态)

主键:pipeline_name

字段 类型 含义 备注
pipeline_name VARCHAR(128), PK 清洗流程名 当前 structured_cleaner_v1
last_message_row_id BIGINT, NOT NULL 已处理到的 messages.id 清洗增量游标
updated_at DATETIME, NOT NULL 状态更新时间 自动更新

7.5 internship_jobs_raw(实习原始导入表)

唯一键:uk_internship_fingerprint (fingerprint)

字段 类型 含义 备注
id BIGINT, PK, AUTO_INCREMENT 主键 内部主键
source VARCHAR(255), NOT NULL 数据来源 默认 @excel_import
fingerprint CHAR(64), NOT NULL 去重指纹 SHA-256
source_file VARCHAR(512), NOT NULL 来源文件绝对路径 Excel 文件
sheet_name VARCHAR(255), NOT NULL 工作表名 Excel sheet
row_number INT, NOT NULL 原始行号 Excel 行
updated_at_raw VARCHAR(128), NULL 原始更新时间文本 保留原值
updated_at_utc DATETIME, NULL 标准化更新时间 统一 UTC 格式
industry VARCHAR(255), NULL 行业字段 原始/轻清洗
title VARCHAR(512), NULL 岗位标题 从详情提取
company VARCHAR(255), NULL 公司名 原始/轻清洗
employment_type VARCHAR(255), NULL 用工形式原文 如全职/兼职/实习
location_text VARCHAR(255), NULL 地点文本 轻清洗
apply_email VARCHAR(255), NULL 邮箱 提取或原始
job_source_url TEXT, NULL 职位链接 从详情提取
raw_row_json JSON, NOT NULL 原始行 JSON 全量保留
imported_at DATETIME, NOT NULL 导入时间 自动更新时间

7.6 cloud_sync_state(云同步状态)

主键:pipeline_name

字段 类型 含义 备注
pipeline_name VARCHAR(128), PK 同步流程名 当前 local_to_cloud_mysql_v1
last_messages_id BIGINT, NOT NULL messages 已同步最大本地 id 云同步游标
last_structured_jobs_id BIGINT, NOT NULL structured_jobs 已同步最大本地 id 云同步游标
last_internship_id BIGINT, NOT NULL internship_jobs_raw 已同步最大本地 id 云同步游标
last_structured_sync_at DATETIME, NULL 结构化更新时间窗口游标 补偿更新使用
updated_at DATETIME, NOT NULL 同步状态更新时间 自动更新

8. 日志

  • logs/app.log: 抓取日志
  • logs/clean_to_structured.log: 清洗日志
  • logs/sync_to_cloud_mysql.log: 云同步日志
  • logs/daily_job.log: 每日调度总日志

9. 常见问题

  1. uv: command not foundcron
  • 使用 .venv/bin/python 运行,已在 run_daily_incremental.sh 中处理。
  1. Table 'jobs.messages' doesn't exist(云同步)
  • 云端目标库为空。新版同步脚本会自动建表后再同步。
  1. Public Key Retrieval is not allowedDBeaver 连 MySQL
  • 连接参数添加 allowPublicKeyRetrieval=true&useSSL=false(排障用)。
  1. ERROR 1410 You are not allowed to create a user with GRANT
  • CREATE USER,再 GRANT,不要用旧式 GRANT ... IDENTIFIED BY ...
  1. 清洗无新增
  • 检查 messages 是否有新数据。
  • 检查 clean_state.last_message_row_id 是否已到最新。
  1. 历史数据有 UTC+8 和 UTC 混用怎么办
  • 新版脚本已统一写入 UTC。
  • 历史数据需一次性迁移后再对齐分析口径(建议先备份再修复)。

10. 协作规范建议

  • 新增来源规则时,优先增加 source 专用 parser避免影响已有来源。
  • 结构字段变更前,先确认 structured_jobs 迁移策略和历史兼容。
  • 定时任务统一走 run_daily_incremental.sh,避免多个入口重复执行。
Description
No description provided
Readme 418 KiB
Languages
Python 97.7%
Shell 2.3%