Files
tg_crawl/README.md

305 lines
11 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# jobs_robots
Telegram 招聘数据采集与清洗项目,当前主流程为:
1. 抓取原始消息到本地 MySQL
2. 清洗为结构化岗位数据
3. 每日定时增量执行
4. 同步本地 MySQL 到云端 MySQL
## 1. 项目结构
- `main.py`: Telegram 增量爬取,写入 `messages`,维护 `sync_state`
- `clean_to_structured.py`: 按来源规则清洗,写入 `structured_jobs`,维护 `clean_state`
- `import_excel_jobs.py`: 读取 `sheets/` Excel导入结构化数据实习数据落 `internship_jobs_raw`
- `sync_to_cloud_mysql.py`: 本地 MySQL -> 云端 MySQL 增量同步
- `run_daily_incremental.sh`: 每日调度入口(滚动窗口、抓取、清洗、云同步)
- `config.json`: 运行配置(本地使用)
- `config.example.json`: 配置模板
## 2. 环境要求
- Python `>=3.13`
- MySQL 8.x本地
- MySQL 8.x云端可选
- 已完成 Telethon 登录(项目目录下会生成 `scraper.session`
依赖安装:
```bash
uv sync
```
## 3. 配置说明
先复制模板并修改:
```bash
cp config.example.json config.json
```
关键字段:
- `sources`: 要抓取的 Telegram 来源列表
- `time_window`: 抓取时间窗口
- `daily_window_days`: 每日滚动窗口天数(默认 `2`
- `backfill`: 回补配置
- `throttle`: 限频配置,降低封号风险
- `mysql`: 本地 MySQL 连接
- `mysql_cloud`: 云端 MySQL 连接(用于同步)
时间规范:
- 项目统一使用 UTC 存储所有 `DATETIME`
- 各脚本连接 MySQL 后会执行 `SET time_zone = '+00:00'`
- `NOW()` / `CURRENT_TIMESTAMP` 产生的时间也按 UTC 写入
## 4. 运行方式
### 4.1 手动执行
```bash
uv run main.py
uv run clean_to_structured.py
uv run sync_to_cloud_mysql.py
```
如果在 cron/非交互环境,建议用 venv Python
```bash
.venv/bin/python main.py
.venv/bin/python clean_to_structured.py
.venv/bin/python sync_to_cloud_mysql.py
```
### 4.2 Excel 导入
默认读取 `sheets/` 下文件:
```bash
uv run import_excel_jobs.py
```
指定文件/工作表:
```bash
uv run import_excel_jobs.py --file /path/to/jobs.xlsx --sheet Sheet1 --source @excel_import
```
导入规则:
- 普通岗位:清洗后写入 `structured_jobs`
- 实习岗位:写入 `internship_jobs_raw`,不进入结构化主表
### 4.3 每日定时(推荐)
调度脚本:
- `/home/liam/code/python/jobs_robots/run_daily_incremental.sh`
示例 crontab每天 01:10
```cron
10 1 * * * /home/liam/code/python/jobs_robots/run_daily_incremental.sh
```
脚本执行顺序:
1. 自动更新 `config.json``time_window.start/end`(按 `daily_window_days`
2. 运行 `main.py` 增量抓取
3. 运行 `clean_to_structured.py` 增量清洗
4.`mysql_cloud` 已配置,运行 `sync_to_cloud_mysql.py` 同步云端
## 5. 增量与回补策略
### 5.1 抓取增量
- 状态表:`sync_state`
- 游标字段:`last_message_id`
- 粒度:每个 source 独立
### 5.2 清洗增量
- 状态表:`clean_state`
- 游标字段:`last_message_row_id`(对应 `messages.id`
- 规则:仅处理 `messages.id > checkpoint`
### 5.3 回补Backfill
`config.json` 设置:
- `backfill.enabled = true`
- `backfill.start / backfill.end`
- `backfill.sources`
- `backfill.ignore_sync_state`(回补时是否忽略抓取游标)
回补结束后建议关闭 `backfill.enabled`,恢复日常增量。
## 6. 本地到云端同步
脚本:`sync_to_cloud_mysql.py`
同步规则:
- `messages`: 按本地 `id` 增量,云端按 `(source, message_id)` upsert
- `structured_jobs`: 按本地 `id` 增量 + `cleaned_at` 补偿更新
- `sync_state` / `clean_state`: 小表全量 upsert
- `internship_jobs_raw`: 存在则按 `id` 增量 upsert
状态表(云端):
- `cloud_sync_state`
注意:
- 同步脚本会自动在云端补齐缺失目标表(从本地表结构复制 DDL
- `mysql_cloud` 未配置时,日常脚本会跳过云同步
## 7. 数据库表与字段含义
### 7.1 `messages`(原始消息)
唯一键:`uk_source_message (source, message_id)`
| 字段 | 类型 | 含义 | 备注 |
|---|---|---|---|
| `id` | BIGINT, PK, AUTO_INCREMENT | 本地行主键 | 清洗增量以此为游标来源 |
| `source` | VARCHAR(255), NOT NULL | 数据来源标识 | 如 `@DeJob_official` |
| `chat_id` | BIGINT, NULL | Telegram chat id | 可空 |
| `message_id` | BIGINT, NOT NULL | Telegram 消息 id | 在同一 source 下唯一 |
| `content` | LONGTEXT | 原始正文 | 含媒体补充文本/结构化片段 |
| `date` | DATETIME, NOT NULL | 消息发布时间 | 统一按 UTC 入库 |
| `created_at` | DATETIME, NOT NULL | 入库时间 | 默认 `CURRENT_TIMESTAMP` |
### 7.2 `sync_state`(抓取增量状态)
主键:`source`
| 字段 | 类型 | 含义 | 备注 |
|---|---|---|---|
| `source` | VARCHAR(255), PK | 来源标识 | 对应 `messages.source` |
| `last_message_id` | BIGINT, NOT NULL | 已抓取到的最大 message_id | 抓取增量游标 |
| `updated_at` | DATETIME, NOT NULL | 状态更新时间 | 自动更新 |
### 7.3 `structured_jobs`(清洗后岗位)
唯一键:`uk_source_message (source, message_id)`
| 字段 | 类型 | 含义 | 备注 |
|---|---|---|---|
| `id` | BIGINT, PK, AUTO_INCREMENT | 结构化主键 | 内部主键 |
| `source` | VARCHAR(255), NOT NULL | 来源标识 | 如 `@DeJob_official` |
| `source_channel` | VARCHAR(255), NULL | 来源渠道归一名 | 如 `DeJob` |
| `parser_name` | VARCHAR(64), NOT NULL | 解析器名称 | 如 `dejob_official` |
| `parser_version` | VARCHAR(32), NOT NULL | 解析器版本 | 规则演进追踪 |
| `chat_id` | BIGINT, NULL | 原始 chat id | 可空 |
| `message_id` | BIGINT, NOT NULL | 原始消息 id | 与 `source` 组成唯一键 |
| `message_date` | DATETIME, NOT NULL | 原始消息时间 | UTC |
| `job_type` | VARCHAR(64), NULL | 岗位类型 | 目前主要是 `招聘` |
| `company_name` | VARCHAR(255), NULL | 公司名 | 清洗后的公司名 |
| `industry_tags_json` | JSON, NOT NULL | 行业标签数组 | 例如 `["CEX"]` |
| `company_intro` | LONGTEXT, NULL | 公司简介 | 可空 |
| `company_url` | TEXT, NULL | 公司官网/主页 | 可空 |
| `work_mode` | VARCHAR(32), NOT NULL | 办公模式 | `remote/onsite/hybrid/unknown` |
| `job_nature` | VARCHAR(32), NOT NULL | 用工性质 | `full_time/part_time/contract/intern/freelance/unknown` |
| `job_location_text` | VARCHAR(255), NULL | 主地点文本 | 可空 |
| `job_location_tags_json` | JSON, NULL | 地点标签数组 | 无地点时为 `NULL` |
| `employment_type_raw` | TEXT, NULL | 原始合作方式文本 | 用于回溯 |
| `position_name` | VARCHAR(255), NULL | 岗位主名称 | 可空 |
| `position_tags_json` | JSON, NOT NULL | 岗位标签数组 | 例如 `["运营"]` |
| `salary_raw` | TEXT, NULL | 原始薪资文本 | 可空 |
| `salary_currency` | VARCHAR(16), NULL | 薪资币种 | 如 `USD` |
| `salary_min` | BIGINT, NULL | 薪资下限 | 解析值 |
| `salary_max` | BIGINT, NULL | 薪资上限 | 解析值 |
| `salary_period` | VARCHAR(16), NULL | 薪资周期 | `month/year/day` |
| `responsibilities_json` | JSON, NOT NULL | 岗位职责数组 | 可为空数组 |
| `requirements_json` | JSON, NOT NULL | 岗位要求数组 | 可为空数组 |
| `apply_email` | VARCHAR(255), NULL | 投递邮箱 | 可空 |
| `apply_telegram` | VARCHAR(255), NULL | TG 联系方式 | 可空 |
| `job_source_url` | TEXT, NULL | 职位详情链接 | 清洗后可用链接 |
| `body_text` | LONGTEXT, NOT NULL | 清洗后的正文 | 去除技术元信息 |
| `raw_content` | LONGTEXT, NOT NULL | 原始内容快照 | 审计/回刷使用 |
| `cleaned_at` | DATETIME, NOT NULL | 最近清洗时间 | 自动更新 |
### 7.4 `clean_state`(清洗增量状态)
主键:`pipeline_name`
| 字段 | 类型 | 含义 | 备注 |
|---|---|---|---|
| `pipeline_name` | VARCHAR(128), PK | 清洗流程名 | 当前 `structured_cleaner_v1` |
| `last_message_row_id` | BIGINT, NOT NULL | 已处理到的 `messages.id` | 清洗增量游标 |
| `updated_at` | DATETIME, NOT NULL | 状态更新时间 | 自动更新 |
### 7.5 `internship_jobs_raw`(实习原始导入表)
唯一键:`uk_internship_fingerprint (fingerprint)`
| 字段 | 类型 | 含义 | 备注 |
|---|---|---|---|
| `id` | BIGINT, PK, AUTO_INCREMENT | 主键 | 内部主键 |
| `source` | VARCHAR(255), NOT NULL | 数据来源 | 默认 `@excel_import` |
| `fingerprint` | CHAR(64), NOT NULL | 去重指纹 | SHA-256 |
| `source_file` | VARCHAR(512), NOT NULL | 来源文件绝对路径 | Excel 文件 |
| `sheet_name` | VARCHAR(255), NOT NULL | 工作表名 | Excel sheet |
| `row_number` | INT, NOT NULL | 原始行号 | Excel 行 |
| `updated_at_raw` | VARCHAR(128), NULL | 原始更新时间文本 | 保留原值 |
| `updated_at_utc` | DATETIME, NULL | 标准化更新时间 | 统一 UTC 格式 |
| `industry` | VARCHAR(255), NULL | 行业字段 | 原始/轻清洗 |
| `title` | VARCHAR(512), NULL | 岗位标题 | 从详情提取 |
| `company` | VARCHAR(255), NULL | 公司名 | 原始/轻清洗 |
| `employment_type` | VARCHAR(255), NULL | 用工形式原文 | 如全职/兼职/实习 |
| `location_text` | VARCHAR(255), NULL | 地点文本 | 轻清洗 |
| `apply_email` | VARCHAR(255), NULL | 邮箱 | 提取或原始 |
| `job_source_url` | TEXT, NULL | 职位链接 | 从详情提取 |
| `raw_row_json` | JSON, NOT NULL | 原始行 JSON | 全量保留 |
| `imported_at` | DATETIME, NOT NULL | 导入时间 | 自动更新时间 |
### 7.6 `cloud_sync_state`(云同步状态)
主键:`pipeline_name`
| 字段 | 类型 | 含义 | 备注 |
|---|---|---|---|
| `pipeline_name` | VARCHAR(128), PK | 同步流程名 | 当前 `local_to_cloud_mysql_v1` |
| `last_messages_id` | BIGINT, NOT NULL | `messages` 已同步最大本地 id | 云同步游标 |
| `last_structured_jobs_id` | BIGINT, NOT NULL | `structured_jobs` 已同步最大本地 id | 云同步游标 |
| `last_internship_id` | BIGINT, NOT NULL | `internship_jobs_raw` 已同步最大本地 id | 云同步游标 |
| `last_structured_sync_at` | DATETIME, NULL | 结构化更新时间窗口游标 | 补偿更新使用 |
| `updated_at` | DATETIME, NOT NULL | 同步状态更新时间 | 自动更新 |
## 8. 日志
- `logs/app.log`: 抓取日志
- `logs/clean_to_structured.log`: 清洗日志
- `logs/sync_to_cloud_mysql.log`: 云同步日志
- `logs/daily_job.log`: 每日调度总日志
## 9. 常见问题
1. `uv: command not found`cron
- 使用 `.venv/bin/python` 运行,已在 `run_daily_incremental.sh` 中处理。
2. `Table 'jobs.messages' doesn't exist`(云同步)
- 云端目标库为空。新版同步脚本会自动建表后再同步。
3. `Public Key Retrieval is not allowed`DBeaver 连 MySQL
- 连接参数添加 `allowPublicKeyRetrieval=true&useSSL=false`(排障用)。
4. `ERROR 1410 You are not allowed to create a user with GRANT`
-`CREATE USER`,再 `GRANT`,不要用旧式 `GRANT ... IDENTIFIED BY ...`
5. 清洗无新增
- 检查 `messages` 是否有新数据。
- 检查 `clean_state.last_message_row_id` 是否已到最新。
6. 历史数据有 UTC+8 和 UTC 混用怎么办
- 新版脚本已统一写入 UTC。
- 历史数据需一次性迁移后再对齐分析口径(建议先备份再修复)。
## 10. 协作规范建议
- 新增来源规则时,优先增加 source 专用 parser避免影响已有来源。
- 结构字段变更前,先确认 `structured_jobs` 迁移策略和历史兼容。
- 定时任务统一走 `run_daily_incremental.sh`,避免多个入口重复执行。