# tg_crawl/import_excel_jobs.py — 679 lines, 24 KiB, Python.
# NOTE(review): this copy was taken from a web code viewer; its "ambiguous
# Unicode characters" warning indicates several CJK punctuation characters
# were stripped in transit, which is why some str.replace("", ...) calls
# below have empty first arguments.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Import external Excel jobs into MySQL.
Rules:
- Internship rows (实习/intern) are stored into `internship_jobs_raw` only.
- Non-internship rows are normalized and upserted into `structured_jobs`.
"""
import argparse
import hashlib
import json
import logging
import os
import re
from datetime import date, datetime, timedelta, timezone
import pymysql
from openpyxl import load_workbook
CONFIG_FILE = "config.json"
URL_RE = re.compile(r"https?://[^\s)]+", re.IGNORECASE)
EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)
def setup_logger() -> logging.Logger:
    """Create (or return the already-configured) module logger.

    Output goes to both stderr and logs/import_excel_jobs.log.
    """
    os.makedirs("logs", exist_ok=True)
    log = logging.getLogger("import_excel_jobs")
    log.setLevel(logging.INFO)
    if log.handlers:
        # Already configured (e.g. module imported twice): avoid duplicates.
        return log
    formatter = logging.Formatter(
        "[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
    for handler in (
        logging.StreamHandler(),
        logging.FileHandler("logs/import_excel_jobs.log", encoding="utf-8"),
    ):
        handler.setFormatter(formatter)
        log.addHandler(handler)
    return log
# Module-wide logger, created once at import time (setup_logger is idempotent).
logger = setup_logger()
def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the importer."""
    parser = argparse.ArgumentParser(description="Import jobs from Excel into MySQL")
    # (flag, keyword arguments) pairs; defaults mirror the documented behavior.
    option_specs = [
        ("--file", {"default": "", "help": "Excel file path (.xlsx)"}),
        (
            "--dir",
            {
                "default": "sheets",
                "help": "Directory containing Excel files (.xlsx/.xlsm). Used when --file is empty.",
            },
        ),
        ("--sheet", {"default": "", "help": "Sheet name, default active sheet"}),
        (
            "--source",
            {
                "default": "@excel_import",
                "help": "source value written into structured_jobs.source",
            },
        ),
    ]
    for flag, kwargs in option_specs:
        parser.add_argument(flag, **kwargs)
    return parser.parse_args()
def load_mysql_config() -> dict:
    """Read MySQL connection settings from config.json, env vars as fallback.

    Raises:
        FileNotFoundError: config.json does not exist.
        ValueError: the "mysql" entry is not an object, or the password is
            empty / still the CHANGE_ME placeholder.
    """
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}")
    with open(CONFIG_FILE, "r", encoding="utf-8") as fh:
        mysql_cfg = json.load(fh).get("mysql", {})
    if not isinstance(mysql_cfg, dict):
        raise ValueError("配置错误: mysql 必须是对象")

    def pick(key: str, env: str, default: str = ""):
        # Config value wins; empty/missing values fall back to env, then default.
        return mysql_cfg.get(key) or os.getenv(env, default)

    result = {
        "host": pick("host", "MYSQL_HOST", "127.0.0.1"),
        "port": int(pick("port", "MYSQL_PORT", "3306")),
        "user": pick("user", "MYSQL_USER", "jobs_user"),
        "password": pick("password", "MYSQL_PASSWORD"),
        "database": pick("database", "MYSQL_DATABASE", "jobs"),
        "charset": pick("charset", "MYSQL_CHARSET", "utf8mb4"),
    }
    if not result["password"] or result["password"] == "CHANGE_ME":
        raise ValueError("请先在 config.json 里填写 mysql.password")
    return result
def connect_mysql(cfg: dict):
    """Open an autocommit MySQL connection and pin the session to UTC."""
    params = {
        key: cfg[key]
        for key in ("host", "port", "user", "password", "database", "charset")
    }
    connection = pymysql.connect(autocommit=True, **params)
    with connection.cursor() as cursor:
        # Keep all DATETIME reads/writes in UTC regardless of server default.
        cursor.execute("SET time_zone = '+00:00'")
    return connection
def init_tables(conn):
    """Create internship_jobs_raw if missing and patch older schemas in place.

    The trailing ALTER statements are best-effort migrations for a pre-existing
    table: on an already-current schema each one fails (duplicate column/key)
    and the error is deliberately swallowed.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS internship_jobs_raw (
                id BIGINT PRIMARY KEY AUTO_INCREMENT,
                source VARCHAR(255) NOT NULL,
                fingerprint CHAR(64) NOT NULL,
                source_file VARCHAR(512) NOT NULL,
                sheet_name VARCHAR(255) NOT NULL,
                `row_number` INT NOT NULL,
                updated_at_raw VARCHAR(128) NULL,
                updated_at_utc DATETIME NULL,
                industry VARCHAR(255) NULL,
                title VARCHAR(512) NULL,
                company VARCHAR(255) NULL,
                employment_type VARCHAR(255) NULL,
                location_text VARCHAR(255) NULL,
                apply_email VARCHAR(255) NULL,
                job_source_url TEXT NULL,
                raw_row_json JSON NOT NULL,
                imported_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uk_internship_fingerprint (fingerprint),
                KEY idx_internship_source (source)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
        )
        # Compatibility for an existing old table schema.
        for sql in [
            "ALTER TABLE internship_jobs_raw ADD COLUMN fingerprint CHAR(64) NULL",
            "ALTER TABLE internship_jobs_raw ADD COLUMN updated_at_raw VARCHAR(128) NULL",
            "ALTER TABLE internship_jobs_raw ADD COLUMN updated_at_utc DATETIME NULL",
            "ALTER TABLE internship_jobs_raw ADD COLUMN industry VARCHAR(255) NULL",
            "ALTER TABLE internship_jobs_raw ADD COLUMN location_text VARCHAR(255) NULL",
            "ALTER TABLE internship_jobs_raw ADD COLUMN apply_email VARCHAR(255) NULL",
            "ALTER TABLE internship_jobs_raw ADD UNIQUE KEY uk_internship_fingerprint (fingerprint)",
        ]:
            try:
                cur.execute(sql)
            except Exception:
                # Expected when the column/key already exists; ignore.
                pass
def norm_header(v) -> str:
    """Normalize a header cell: stringify, lowercase, drop ALL whitespace."""
    return re.sub(r"\s+", "", str(v or "").strip().lower())
def norm_value(v) -> str | None:
if v is None:
return None
s = str(v).strip()
return s or None
def first_match(data: dict, keys: list[str]) -> str | None:
for k in keys:
if k in data and data[k]:
return data[k]
return None
def infer_work_mode(text: str) -> str:
    """Classify free text as remote / onsite / hybrid / unknown via keywords."""
    lowered = (text or "").lower()
    remote_hit = any(kw in lowered for kw in ("远程", "remote", "wfh", "home office"))
    onsite_hit = any(kw in lowered for kw in ("实地", "onsite", "on-site", "现场", "坐班"))
    # Both keyword classes present means a hybrid arrangement.
    return {
        (True, True): "hybrid",
        (True, False): "remote",
        (False, True): "onsite",
        (False, False): "unknown",
    }[(remote_hit, onsite_hit)]
def infer_job_nature(text: str) -> str:
    """Map free text to a job-nature tag; the first matching rule wins."""
    lowered = (text or "").lower()
    # Ordered rules: earlier entries take precedence (e.g. "合同" beats "实习").
    rules = [
        (("全职", "full time"), "full_time"),
        (("兼职", "part time"), "part_time"),
        (("合同", "contract"), "contract"),
        (("实习", "intern"), "intern"),
        (("freelance", "自由职业"), "freelance"),
    ]
    for keywords, tag in rules:
        if any(kw in lowered for kw in keywords):
            return tag
    return "unknown"
def is_internship(text: str) -> bool:
    """True when the text mentions an internship (Chinese or English)."""
    lowered = (text or "").lower()
    return any(marker in lowered for marker in ("实习", "intern"))
def normalize_url(raw: str | None) -> str | None:
if not raw:
return None
s = raw.strip()
if s.lower().startswith(("http://", "https://")):
return s
if s.lower().startswith("www."):
return "https://" + s
if " " not in s and "." in s:
return "https://" + s
return None
def extract_url_from_detail(detail: str | None) -> str | None:
    """Return the first http(s) URL embedded in the detail text, if any."""
    if not detail:
        return None
    match = URL_RE.search(detail)
    return match.group(0) if match else None
def extract_title_from_detail(detail: str | None, company: str | None) -> str | None:
if not detail:
return None
for ln in str(detail).splitlines():
t = re.sub(r"\s+", " ", ln).strip()
if not t:
continue
# remove common wrappers/prefixes
t = t.replace("", "").replace("", "").strip("-— ")
if company and t.startswith(company):
t = t[len(company) :].strip("-—: ")
# keep first short sentence as title
if len(t) <= 120:
return t
return t[:120]
return None
def extract_email_from_text(text: str | None) -> str | None:
    """Return the first e-mail address found in *text*, if any."""
    if not text:
        return None
    match = EMAIL_RE.search(text)
    return match.group(0) if match else None
def normalize_location_text(raw: str | None) -> str | None:
if not raw:
return None
s = str(raw).strip()
s = s.replace("", ";").replace("", ",")
s = re.sub(r"\s+", " ", s)
return s or None
def parse_datetime_value(raw) -> str:
    """Best-effort conversion of an Excel cell to a UTC "YYYY-MM-DD HH:MM:SS" string.

    Handles datetime/date objects, Excel serial numbers, ISO-ish strings and
    common Chinese date formats. Naive datetimes are assumed to already be
    UTC. Falls back to the current UTC time when nothing parses.
    """
    now_utc = datetime.now(timezone.utc)
    fallback = now_utc.strftime("%Y-%m-%d %H:%M:%S")
    if raw is None:
        return fallback
    if isinstance(raw, datetime):
        # Assume naive datetimes are already UTC.
        dt = raw if raw.tzinfo else raw.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(raw, date):
        # Must come after the datetime check (datetime subclasses date).
        dt = datetime(raw.year, raw.month, raw.day, tzinfo=timezone.utc)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(raw, (int, float)):
        # Excel serial date: days since 1899-12-30
        try:
            base = datetime(1899, 12, 30, tzinfo=timezone.utc)
            dt = base + timedelta(days=float(raw))
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except Exception:
            return fallback
    s = str(raw).strip()
    if not s:
        return fallback
    # ISO-like strings first
    try:
        iso = s.replace("Z", "+00:00").replace("T", " ")
        dt = datetime.fromisoformat(iso)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        else:
            dt = dt.astimezone(timezone.utc)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception:
        pass
    # Normalize Chinese date markers and alternate separators before strptime.
    # The CJK characters 年/月/日 were lost in a bad encoding pass: as pasted
    # the code called s.replace("", "-"), which inserts dashes between every
    # character and garbles the value. Restored here.
    normalized = (
        s.replace("年", "-")
        .replace("月", "-")
        .replace("日", "")
        .replace("/", "-")
        .replace(".", "-")
    )
    normalized = re.sub(r"\s+", " ", normalized).strip()
    for fmt in [
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d",
        "%Y%m%d%H%M%S",
        "%Y%m%d%H%M",
        "%Y%m%d",
    ]:
        try:
            dt = datetime.strptime(normalized, fmt).replace(tzinfo=timezone.utc)
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except Exception:
            continue
    logger.warning(f"无法解析时间使用当前UTC时间兜底: raw={s}")
    return fallback
def make_message_id(source: str, title: str | None, company: str | None, link: str | None) -> int:
key = f"{source}|{title or ''}|{company or ''}|{link or ''}"
digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]
return int(digest, 16) & ((1 << 63) - 1)
def make_fingerprint(payload: dict) -> str:
    """SHA-256 hex digest over the identity fields of a row payload.

    Missing or falsy fields contribute an empty string, so rows differing
    only in absent keys hash identically.
    """
    field_names = (
        "source",
        "updated_at_raw",
        "industry",
        "company",
        "title",
        "employment_type",
        "location_text",
        "apply_email",
        "job_source_url",
        "detail_text",
    )
    key = "|".join(str(payload.get(name) or "") for name in field_names)
    return hashlib.sha256(key.encode("utf-8")).hexdigest()
def upsert_structured(conn, item: dict):
    """Insert or refresh one row in structured_jobs.

    The table's unique key (presumably on source/message_id — confirm against
    the schema) drives the ON DUPLICATE KEY UPDATE branch, which overwrites
    every mutable column and stamps cleaned_at.

    Args:
        conn: Open pymysql connection (autocommit).
        item: Dict holding one value per column listed in the INSERT; keys
            must match the names used in the parameter tuple below.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO structured_jobs (
                source, source_channel, parser_name, parser_version, chat_id, message_id,
                message_date, job_type, company_name, industry_tags_json, company_intro,
                company_url, work_mode, job_nature, job_location_text, job_location_tags_json,
                employment_type_raw, position_name, position_tags_json,
                salary_raw, salary_currency, salary_min, salary_max, salary_period,
                responsibilities_json, requirements_json, apply_email, apply_telegram,
                job_source_url, body_text, raw_content
            ) VALUES (
                %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s
            )
            ON DUPLICATE KEY UPDATE
                source_channel=VALUES(source_channel),
                parser_name=VALUES(parser_name),
                parser_version=VALUES(parser_version),
                chat_id=VALUES(chat_id),
                message_date=VALUES(message_date),
                job_type=VALUES(job_type),
                company_name=VALUES(company_name),
                industry_tags_json=VALUES(industry_tags_json),
                company_intro=VALUES(company_intro),
                company_url=VALUES(company_url),
                work_mode=VALUES(work_mode),
                job_nature=VALUES(job_nature),
                job_location_text=VALUES(job_location_text),
                job_location_tags_json=VALUES(job_location_tags_json),
                employment_type_raw=VALUES(employment_type_raw),
                position_name=VALUES(position_name),
                position_tags_json=VALUES(position_tags_json),
                salary_raw=VALUES(salary_raw),
                salary_currency=VALUES(salary_currency),
                salary_min=VALUES(salary_min),
                salary_max=VALUES(salary_max),
                salary_period=VALUES(salary_period),
                responsibilities_json=VALUES(responsibilities_json),
                requirements_json=VALUES(requirements_json),
                apply_email=VALUES(apply_email),
                apply_telegram=VALUES(apply_telegram),
                job_source_url=VALUES(job_source_url),
                body_text=VALUES(body_text),
                raw_content=VALUES(raw_content),
                cleaned_at=CURRENT_TIMESTAMP
            """,
            # Parameter order must mirror the column list above exactly.
            (
                item["source"],
                item["source_channel"],
                item["parser_name"],
                item["parser_version"],
                item["chat_id"],
                item["message_id"],
                item["message_date"],
                item["job_type"],
                item["company_name"],
                item["industry_tags_json"],
                item["company_intro"],
                item["company_url"],
                item["work_mode"],
                item["job_nature"],
                item["job_location_text"],
                item["job_location_tags_json"],
                item["employment_type_raw"],
                item["position_name"],
                item["position_tags_json"],
                item["salary_raw"],
                item["salary_currency"],
                item["salary_min"],
                item["salary_max"],
                item["salary_period"],
                item["responsibilities_json"],
                item["requirements_json"],
                item["apply_email"],
                item["apply_telegram"],
                item["job_source_url"],
                item["body_text"],
                item["raw_content"],
            ),
        )
def insert_internship_raw(conn, item: dict):
    """Insert or refresh one internship row in internship_jobs_raw.

    Deduplication is via the uk_internship_fingerprint unique key: a repeated
    fingerprint updates the existing row and re-stamps imported_at.

    Args:
        conn: Open pymysql connection (autocommit).
        item: Dict with one value per column in the INSERT below.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO internship_jobs_raw (
                source, source_file, sheet_name, `row_number`,
                fingerprint, updated_at_raw, updated_at_utc, industry, title, company,
                employment_type, location_text, apply_email, job_source_url, raw_row_json
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                updated_at_raw=VALUES(updated_at_raw),
                updated_at_utc=VALUES(updated_at_utc),
                industry=VALUES(industry),
                title=VALUES(title),
                company=VALUES(company),
                employment_type=VALUES(employment_type),
                location_text=VALUES(location_text),
                apply_email=VALUES(apply_email),
                job_source_url=VALUES(job_source_url),
                raw_row_json=VALUES(raw_row_json),
                imported_at=CURRENT_TIMESTAMP
            """,
            # Parameter order must mirror the column list above exactly.
            (
                item["source"],
                item["source_file"],
                item["sheet_name"],
                item["row_number"],
                item["fingerprint"],
                item["updated_at_raw"],
                item["updated_at_utc"],
                item["industry"],
                item["title"],
                item["company"],
                item["employment_type"],
                item["location_text"],
                item["apply_email"],
                item["job_source_url"],
                item["raw_row_json"],
            ),
        )
def list_excel_files(dir_path: str) -> list[str]:
    """Return sorted full paths of .xlsx/.xlsm files directly in *dir_path*.

    Returns an empty list when the directory does not exist; subdirectories
    are not descended into.
    """
    if not os.path.isdir(dir_path):
        return []
    return [
        os.path.join(dir_path, name)
        for name in sorted(os.listdir(dir_path))
        if os.path.isfile(os.path.join(dir_path, name))
        and name.lower().endswith((".xlsx", ".xlsm"))
    ]
def import_one_file(conn, args: argparse.Namespace, file_path: str) -> tuple[int, int, int, int]:
    """Import one Excel workbook into MySQL.

    Rows flagged as internships (实习/intern) are archived raw into
    internship_jobs_raw; every other row that carries a usable link is
    normalized and upserted into structured_jobs.

    Args:
        conn: Open pymysql connection (autocommit).
        args: Parsed CLI options; .sheet selects a worksheet, .source labels rows.
        file_path: Path of the .xlsx/.xlsm file to read.

    Returns:
        (imported, internship_saved, skipped_empty, skipped_no_link) counters.
    """
    wb = load_workbook(file_path, data_only=True)
    ws = wb[args.sheet] if args.sheet else wb.active
    rows = list(ws.iter_rows(values_only=True))
    if not rows:
        logger.info(f"Excel 为空,跳过: file={file_path}")
        # Bug fix: this early exit used to return a 3-tuple while the declared
        # return type and main() expect four values, raising ValueError on any
        # empty workbook.
        return 0, 0, 0, 0
    headers = [norm_header(v) for v in rows[0]]
    logger.info(f"file={file_path}, sheet={ws.title}, columns={len(headers)}, rows={len(rows)-1}")
    imported = internship_saved = skipped_empty = 0
    skipped_no_link = 0
    # Data rows start at Excel row 2 (row 1 is the header).
    for idx, row in enumerate(rows[1:], start=2):
        # Map normalized header -> normalized cell value (None for short rows).
        raw = {headers[i]: norm_value(row[i]) if i < len(row) else None for i in range(len(headers))}
        # For your sheet, exact headers are:
        # 表格更新时间, 行业, 公司, 职位详情, 工作形式, 工作地点_标准, 投递邮箱
        updated_raw = first_match(raw, ["表格更新时间", "更新时间", "date", "postedat"])
        industry = first_match(raw, ["行业", "industry"])
        company = first_match(raw, ["公司", "公司名称", "company", "companyname"])
        detail = first_match(raw, ["职位详情", "岗位详情", "职位描述", "description", "jd", "详情"])
        employment = first_match(
            raw,
            ["工作形式", "合作方式", "用工类型", "岗位性质", "employment", "employmenttype", "jobtype"],
        )
        location = first_match(raw, ["工作地点_标准", "工作地点", "地点", "城市", "location", "worklocation"])
        email = first_match(raw, ["投递邮箱", "邮箱", "email", "applyemail"])
        salary = first_match(raw, ["薪资", "薪酬", "salary", "compensation"])
        tg = first_match(raw, ["telegram", "tg", "联系方式telegram"])
        title = extract_title_from_detail(detail, company)
        link = normalize_url(extract_url_from_detail(detail))
        if not email:
            # Fall back to an address embedded in the detail text.
            email = extract_email_from_text(detail)
        location = normalize_location_text(location)
        posted = parse_datetime_value(updated_raw)
        # Build a combined text for nature detection.
        nature_text = " | ".join([x for x in [title, employment, detail] if x])
        if not any([title, company, link, detail]):
            skipped_empty += 1
            continue
        fp_payload = {
            "source": args.source,
            "updated_at_raw": updated_raw,
            "industry": industry,
            "company": company,
            "title": title,
            "employment_type": employment,
            "location_text": location,
            "apply_email": email,
            "job_source_url": link,
            "detail_text": detail,
        }
        fingerprint = make_fingerprint(fp_payload)
        if is_internship(nature_text):
            # Internship rows are archived raw only (see module docstring).
            insert_internship_raw(
                conn,
                {
                    "source": args.source,
                    "source_file": os.path.abspath(file_path),
                    "sheet_name": ws.title,
                    "row_number": idx,
                    "fingerprint": fingerprint,
                    "updated_at_raw": updated_raw,
                    "updated_at_utc": posted,
                    "industry": industry,
                    "title": title,
                    "company": company,
                    "employment_type": employment,
                    "location_text": location,
                    "apply_email": email,
                    "job_source_url": link,
                    "raw_row_json": json.dumps(raw, ensure_ascii=False),
                },
            )
            internship_saved += 1
            continue
        if not link:
            # structured_jobs rows need a source URL; skip rows without one.
            skipped_no_link += 1
            continue
        message_id = make_message_id(args.source, title, company, link)
        work_mode = infer_work_mode(nature_text)
        job_nature = infer_job_nature(nature_text)
        upsert_structured(
            conn,
            {
                "source": args.source,
                "source_channel": "excel_import",
                "parser_name": "excel_import",
                "parser_version": "v1",
                "chat_id": None,
                "message_id": message_id,
                "message_date": posted,
                "job_type": "招聘",
                "company_name": company,
                "industry_tags_json": json.dumps([industry], ensure_ascii=False) if industry else json.dumps([], ensure_ascii=False),
                "company_intro": None,
                "company_url": link,
                "work_mode": work_mode,
                "job_nature": job_nature,
                "job_location_text": location,
                "job_location_tags_json": json.dumps([location], ensure_ascii=False) if location else None,
                "employment_type_raw": employment,
                "position_name": title,
                "position_tags_json": json.dumps([], ensure_ascii=False),
                "salary_raw": salary,
                "salary_currency": "USD" if salary and "$" in salary else None,
                "salary_min": None,
                "salary_max": None,
                "salary_period": None,
                "responsibilities_json": json.dumps([], ensure_ascii=False),
                "requirements_json": json.dumps([], ensure_ascii=False),
                "apply_email": email,
                "apply_telegram": tg,
                "job_source_url": link,
                "body_text": detail or title or "excel_import",
                "raw_content": json.dumps(raw, ensure_ascii=False),
            },
        )
        imported += 1
        # NOTE(review): skipped rows `continue` above, so this progress log
        # only fires on the imported path — the check runs on the same lines
        # as before; preserved as-is.
        if (imported + internship_saved) % 200 == 0:
            logger.info(
                "progress rows="
                f"{idx-1}, imported={imported}, internship_saved={internship_saved}, "
                f"skipped_empty={skipped_empty}, skipped_no_link={skipped_no_link}"
            )
    logger.info(
        "done file="
        f"{file_path}, imported={imported}, internship_saved={internship_saved}, "
        f"skipped_empty={skipped_empty}, skipped_no_link={skipped_no_link}, sheet={ws.title}"
    )
    return imported, internship_saved, skipped_empty, skipped_no_link
def main():
    """CLI entry point: resolve input files, connect to MySQL, import each.

    Raises FileNotFoundError when neither --file nor --dir yields any Excel
    file. The connection is always closed, even on a mid-import failure.
    """
    args = parse_args()
    files: list[str]
    if args.file:
        # An explicit --file wins over the directory scan.
        if not os.path.exists(args.file):
            raise FileNotFoundError(f"Excel 文件不存在: {args.file}")
        files = [args.file]
    else:
        files = list_excel_files(args.dir)
        if not files:
            raise FileNotFoundError(f"目录中未找到 Excel 文件: {args.dir}")
    mysql_cfg = load_mysql_config()
    conn = connect_mysql(mysql_cfg)
    init_tables(conn)
    # Aggregate counters across all files.
    total_imported = total_internship = total_skipped = total_skipped_no_link = 0
    try:
        for f in files:
            imported, internship_saved, skipped_empty, skipped_no_link = import_one_file(
                conn, args, f
            )
            total_imported += imported
            total_internship += internship_saved
            total_skipped += skipped_empty
            total_skipped_no_link += skipped_no_link
    finally:
        # Always release the connection, even if an import fails mid-way.
        conn.close()
    logger.info(
        "all_done "
        f"files={len(files)}, imported={total_imported}, internship_saved={total_internship}, "
        f"skipped_empty={total_skipped}, skipped_no_link={total_skipped_no_link}"
    )


if __name__ == "__main__":
    main()