Files
tg_crawl/import_excel_jobs.py

679 lines
24 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Import external Excel jobs into MySQL.
Rules:
- Internship rows (实习/intern) are stored into `internship_jobs_raw` only.
- Non-internship rows are normalized and upserted into `structured_jobs`.
"""
import argparse
import hashlib
import json
import logging
import os
import re
from datetime import date, datetime, timedelta, timezone
import pymysql
from openpyxl import load_workbook
# JSON settings file opened by load_mysql_config(); the path is relative, so it
# is resolved against the current working directory.
CONFIG_FILE = "config.json"
# Matches an http(s) URL in free text; the match stops at the first whitespace
# character or closing parenthesis. Case-insensitive.
URL_RE = re.compile(r"https?://[^\s)]+", re.IGNORECASE)
# Matches an e-mail address (local part, "@", domain, alphabetic TLD of two or
# more letters). Case-insensitive.
EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)
def setup_logger() -> logging.Logger:
    """Return the "import_excel_jobs" logger, configuring it on first use.

    Ensures the ``logs`` directory exists, sets the level to INFO and, when the
    logger has no handlers yet, attaches a console handler and a UTF-8 file
    handler writing to ``logs/import_excel_jobs.log``. Calling it again returns
    the already-configured logger without adding duplicate handlers.
    """
    os.makedirs("logs", exist_ok=True)
    log = logging.getLogger("import_excel_jobs")
    log.setLevel(logging.INFO)
    if not log.handlers:
        formatter = logging.Formatter(
            "[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
        )
        # Console first, then file: same attachment order on every run.
        targets = (
            logging.StreamHandler(),
            logging.FileHandler("logs/import_excel_jobs.log", encoding="utf-8"),
        )
        for handler in targets:
            handler.setFormatter(formatter)
        for handler in targets:
            log.addHandler(handler)
    return log
# Module-wide logger; configured once at import time by setup_logger().
logger = setup_logger()
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the importer.

    Options:
        --file    Path of a single Excel file (default: empty string).
        --dir     Directory scanned for Excel files when --file is empty
                  (default: "sheets").
        --sheet   Sheet name; empty string means the active sheet.
        --source  Value written into structured_jobs.source
                  (default: "@excel_import").
    """
    cli = argparse.ArgumentParser(description="Import jobs from Excel into MySQL")
    option_table = (
        ("--file", {"default": "", "help": "Excel file path (.xlsx)"}),
        (
            "--dir",
            {
                "default": "sheets",
                "help": "Directory containing Excel files (.xlsx/.xlsm). Used when --file is empty.",
            },
        ),
        ("--sheet", {"default": "", "help": "Sheet name, default active sheet"}),
        (
            "--source",
            {
                "default": "@excel_import",
                "help": "source value written into structured_jobs.source",
            },
        ),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def load_mysql_config() -> dict:
    """Load MySQL connection settings from CONFIG_FILE with env-var fallbacks.

    Each setting is taken from the ``mysql`` object of the JSON file when it is
    present and truthy; otherwise from the matching ``MYSQL_*`` environment
    variable; otherwise from a built-in default.

    Returns:
        Dict with keys host, port (int), user, password, database, charset.

    Raises:
        FileNotFoundError: CONFIG_FILE does not exist.
        ValueError: ``mysql`` is not a JSON object, the port is not an integer,
            or the password is empty / still the "CHANGE_ME" placeholder.
    """
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}")
    with open(CONFIG_FILE, "r", encoding="utf-8") as handle:
        section = json.load(handle).get("mysql", {})
    if not isinstance(section, dict):
        raise ValueError("配置错误: mysql 必须是对象")

    # (setting name, environment variable, default), in output key order.
    fallbacks = (
        ("host", "MYSQL_HOST", "127.0.0.1"),
        ("port", "MYSQL_PORT", "3306"),
        ("user", "MYSQL_USER", "jobs_user"),
        ("password", "MYSQL_PASSWORD", ""),
        ("database", "MYSQL_DATABASE", "jobs"),
        ("charset", "MYSQL_CHARSET", "utf8mb4"),
    )
    settings = {
        name: section.get(name) or os.getenv(env_name, default)
        for name, env_name, default in fallbacks
    }
    settings["port"] = int(settings["port"])

    password = settings["password"]
    if not password or password == "CHANGE_ME":
        raise ValueError("请先在 config.json 里填写 mysql.password")
    return settings
def connect_mysql(cfg: dict):
    """Open an autocommit PyMySQL connection and pin its session time zone to UTC.

    Args:
        cfg: Mapping with the keys host, port, user, password, database and
            charset (the shape returned by load_mysql_config).

    Returns:
        The open connection.
    """
    connect_kwargs = {
        key: cfg[key]
        for key in ("host", "port", "user", "password", "database", "charset")
    }
    connection = pymysql.connect(autocommit=True, **connect_kwargs)
    with connection.cursor() as cursor:
        cursor.execute("SET time_zone = '+00:00'")
    return connection
def init_tables(conn):
    """Create ``internship_jobs_raw`` if needed and upgrade an older schema.

    After the ``CREATE TABLE IF NOT EXISTS`` a list of ``ALTER TABLE``
    statements is replayed so that a table created by an earlier version of
    this script gains the newer columns and the fingerprint unique key.

    The ALTER statements are best-effort and never abort the import:
    - MySQL errors 1060 (duplicate column name) and 1061 (duplicate key name)
      mean the step was already applied and are skipped silently;
    - any other database error is logged as a warning, so a real failure
      (missing privilege, duplicate fingerprints blocking the unique key, ...)
      is visible instead of being swallowed.

    Args:
        conn: Open PyMySQL connection.
    """
    # ER_DUP_FIELDNAME, ER_DUP_KEYNAME: the migration step is already in place.
    already_applied_codes = {1060, 1061}
    compat_statements = [
        "ALTER TABLE internship_jobs_raw ADD COLUMN fingerprint CHAR(64) NULL",
        "ALTER TABLE internship_jobs_raw ADD COLUMN updated_at_raw VARCHAR(128) NULL",
        "ALTER TABLE internship_jobs_raw ADD COLUMN updated_at_utc DATETIME NULL",
        "ALTER TABLE internship_jobs_raw ADD COLUMN industry VARCHAR(255) NULL",
        "ALTER TABLE internship_jobs_raw ADD COLUMN location_text VARCHAR(255) NULL",
        "ALTER TABLE internship_jobs_raw ADD COLUMN apply_email VARCHAR(255) NULL",
        "ALTER TABLE internship_jobs_raw ADD UNIQUE KEY uk_internship_fingerprint (fingerprint)",
    ]
    with conn.cursor() as cur:
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS internship_jobs_raw (
                id BIGINT PRIMARY KEY AUTO_INCREMENT,
                source VARCHAR(255) NOT NULL,
                fingerprint CHAR(64) NOT NULL,
                source_file VARCHAR(512) NOT NULL,
                sheet_name VARCHAR(255) NOT NULL,
                `row_number` INT NOT NULL,
                updated_at_raw VARCHAR(128) NULL,
                updated_at_utc DATETIME NULL,
                industry VARCHAR(255) NULL,
                title VARCHAR(512) NULL,
                company VARCHAR(255) NULL,
                employment_type VARCHAR(255) NULL,
                location_text VARCHAR(255) NULL,
                apply_email VARCHAR(255) NULL,
                job_source_url TEXT NULL,
                raw_row_json JSON NOT NULL,
                imported_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uk_internship_fingerprint (fingerprint),
                KEY idx_internship_source (source)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
        )
        # Compatibility for an existing old table schema.
        for sql in compat_statements:
            try:
                cur.execute(sql)
            except pymysql.MySQLError as exc:
                code = exc.args[0] if exc.args else None
                if code in already_applied_codes:
                    continue
                logger.warning(
                    "schema compatibility step failed (continuing): sql=%s, error=%s",
                    sql,
                    exc,
                )
def norm_header(v) -> str:
    """Normalize a header cell: falsy -> "", else lowercased with all whitespace removed."""
    lowered = str(v or "").strip().lower()
    return re.sub(r"\s+", "", lowered)
def norm_value(v) -> str | None:
if v is None:
return None
s = str(v).strip()
return s or None
def first_match(data: dict, keys: list[str]) -> str | None:
for k in keys:
if k in data and data[k]:
return data[k]
return None
def infer_work_mode(text: str) -> str:
    """Classify free text as "remote", "onsite", "hybrid" (both) or "unknown" (neither).

    Matching is a case-insensitive substring search over fixed Chinese and
    English keyword lists.
    """
    lowered = (text or "").lower()
    remote_words = ("远程", "remote", "wfh", "home office")
    onsite_words = ("实地", "onsite", "on-site", "现场", "坐班")
    mentions_remote = any(word in lowered for word in remote_words)
    mentions_onsite = any(word in lowered for word in onsite_words)
    outcome = {
        (True, True): "hybrid",
        (True, False): "remote",
        (False, True): "onsite",
        (False, False): "unknown",
    }
    return outcome[(mentions_remote, mentions_onsite)]
# Ordered (label, pattern) rules for infer_job_nature; the first match wins.
# "intern" is matched as a whole word (optionally intern(s)/internship(s)) so
# that "internet", "internal" and "international" are not read as internships.
# ASCII-letter lookarounds are used instead of \b because CJK characters count
# as word characters and would hide the boundary in mixed text.
_JOB_NATURE_RULES = (
    ("full_time", re.compile(r"全职|full[\s_-]?time")),
    ("part_time", re.compile(r"兼职|part[\s_-]?time")),
    ("contract", re.compile(r"合同|contract")),
    ("intern", re.compile(r"实习|(?<![a-z])intern(?:s|ship|ships)?(?![a-z])")),
    ("freelance", re.compile(r"freelance|自由职业")),
)


def infer_job_nature(text: str) -> str:
    """Infer the employment nature from free text.

    Rules are checked in priority order: full_time, part_time, contract,
    intern, freelance. Matching is case-insensitive. "full time"/"part time"
    are also recognised when written with a hyphen, an underscore or no
    separator.

    Args:
        text: Free text (title, employment type, description); None is
            treated as empty.

    Returns:
        One of "full_time", "part_time", "contract", "intern", "freelance"
        or "unknown" when nothing matches.
    """
    lowered = (text or "").lower()
    for label, pattern in _JOB_NATURE_RULES:
        if pattern.search(lowered):
            return label
    return "unknown"
# Whole-word "intern" / "interns" / "internship(s)". ASCII-letter lookarounds
# are used instead of \b because CJK characters count as word characters, which
# would hide the boundary in mixed text such as "招intern".
_INTERNSHIP_WORD_RE = re.compile(r"(?<![a-z])intern(?:s|ship|ships)?(?![a-z])")


def is_internship(text: str) -> bool:
    """Return True when the text describes an internship.

    A row counts as an internship when it contains "实习" or the English word
    "intern" (also "interns", "internship", "internships"), case-insensitively.
    Words that merely start with "intern" -- "internet", "internal",
    "international" -- do not count; a plain substring test would otherwise
    divert ordinary jobs into the internship table.

    Args:
        text: Free text to inspect; None is treated as empty.
    """
    lowered = (text or "").lower()
    return "实习" in lowered or _INTERNSHIP_WORD_RE.search(lowered) is not None
def normalize_url(raw: str | None) -> str | None:
if not raw:
return None
s = raw.strip()
if s.lower().startswith(("http://", "https://")):
return s
if s.lower().startswith("www."):
return "https://" + s
if " " not in s and "." in s:
return "https://" + s
return None
def extract_url_from_detail(detail: str | None) -> str | None:
    """Return the first URL_RE match found in ``detail``, or None if there is none."""
    found = URL_RE.search(detail) if detail else None
    return found.group(0) if found else None
def extract_title_from_detail(detail: str | None, company: str | None) -> str | None:
if not detail:
return None
for ln in str(detail).splitlines():
t = re.sub(r"\s+", " ", ln).strip()
if not t:
continue
# remove common wrappers/prefixes
t = t.replace("", "").replace("", "").strip("-— ")
if company and t.startswith(company):
t = t[len(company) :].strip("-—: ")
# keep first short sentence as title
if len(t) <= 120:
return t
return t[:120]
return None
def extract_email_from_text(text: str | None) -> str | None:
    """Return the first EMAIL_RE match found in ``text``, or None if there is none."""
    found = EMAIL_RE.search(text) if text else None
    return found.group(0) if found else None
def normalize_location_text(raw: str | None) -> str | None:
if not raw:
return None
s = str(raw).strip()
s = s.replace("", ";").replace("", ",")
s = re.sub(r"\s+", " ", s)
return s or None
def parse_datetime_value(raw) -> str:
    """Convert an Excel cell value into a ``YYYY-MM-DD HH:MM:SS`` UTC string.

    Accepted inputs:
    - None or blank text: the current UTC time;
    - datetime: naive values are taken as UTC, aware values are converted;
    - date: midnight UTC of that day;
    - int/float: Excel serial date (days since 1899-12-30);
    - str: ISO-like text first (a trailing "Z" and the "T" separator are
      accepted), then ``Y-M-D[ H:M[:S]]`` where the date separators may be
      "-", "/", "." or the CJK markers 年/月/日, then compact
      ``YYYYMMDD[HHMM[SS]]``.

    Anything that cannot be parsed is logged and replaced by the current UTC
    time, so the function never raises for bad data.

    NOTE(review): the 年/月/日 markers are the assumed intent -- the reviewed
    copy had empty search strings, which insert "-" between every character and
    make every non-ISO date unparseable; confirm against the upstream file.
    """
    out_fmt = "%Y-%m-%d %H:%M:%S"
    now_text = datetime.now(timezone.utc).strftime(out_fmt)
    if raw is None:
        return now_text
    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(raw, datetime):
        aware = raw if raw.tzinfo else raw.replace(tzinfo=timezone.utc)
        return aware.astimezone(timezone.utc).strftime(out_fmt)
    if isinstance(raw, date):
        return datetime(raw.year, raw.month, raw.day, tzinfo=timezone.utc).strftime(out_fmt)
    if isinstance(raw, (int, float)):
        # Excel serial date: days since 1899-12-30.
        try:
            base = datetime(1899, 12, 30, tzinfo=timezone.utc)
            return (base + timedelta(days=float(raw))).strftime(out_fmt)
        except Exception:
            # NaN, infinity or an out-of-range serial number.
            return now_text
    text = str(raw).strip()
    if not text:
        return now_text
    # ISO-like strings first.
    try:
        parsed = datetime.fromisoformat(text.replace("Z", "+00:00").replace("T", " "))
    except Exception:
        parsed = None
    if parsed is not None:
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        else:
            parsed = parsed.astimezone(timezone.utc)
        return parsed.strftime(out_fmt)
    # Map the other supported date separators onto "-".
    normalized = (
        text.replace("年", "-")
        .replace("月", "-")
        .replace("日", "")
        .replace("/", "-")
        .replace(".", "-")
    )
    normalized = re.sub(r"\s+", " ", normalized).strip()
    for fmt in (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d",
        "%Y%m%d%H%M%S",
        "%Y%m%d%H%M",
        "%Y%m%d",
    ):
        try:
            parsed = datetime.strptime(normalized, fmt).replace(tzinfo=timezone.utc)
        except Exception:
            continue
        return parsed.strftime(out_fmt)
    logger.warning(f"无法解析时间使用当前UTC时间兜底: raw={text}")
    return now_text
def make_message_id(source: str, title: str | None, company: str | None, link: str | None) -> int:
key = f"{source}|{title or ''}|{company or ''}|{link or ''}"
digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]
return int(digest, 16) & ((1 << 63) - 1)
def make_fingerprint(payload: dict) -> str:
    """Return the SHA-256 hex digest identifying a row.

    The listed payload fields are stringified (missing or falsy values become
    empty strings), joined with "|" in a fixed order, and hashed as UTF-8.
    """
    field_order = (
        "source",
        "updated_at_raw",
        "industry",
        "company",
        "title",
        "employment_type",
        "location_text",
        "apply_email",
        "job_source_url",
        "detail_text",
    )
    joined = "|".join(str(payload.get(name) or "") for name in field_order)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()
# Columns written by upsert_structured, in INSERT order. The column list, the
# placeholders, the ON DUPLICATE KEY assignments and the parameter tuple are all
# derived from this one tuple so they cannot drift apart.
_STRUCTURED_COLUMNS = (
    "source",
    "source_channel",
    "parser_name",
    "parser_version",
    "chat_id",
    "message_id",
    "message_date",
    "job_type",
    "company_name",
    "industry_tags_json",
    "company_intro",
    "company_url",
    "work_mode",
    "job_nature",
    "job_location_text",
    "job_location_tags_json",
    "employment_type_raw",
    "position_name",
    "position_tags_json",
    "salary_raw",
    "salary_currency",
    "salary_min",
    "salary_max",
    "salary_period",
    "responsibilities_json",
    "requirements_json",
    "apply_email",
    "apply_telegram",
    "job_source_url",
    "body_text",
    "raw_content",
)
# Columns that are inserted but never reassigned when the row already exists
# (presumably the table's unique key -- the table definition is not in this file).
_STRUCTURED_NOT_UPDATED = frozenset({"source", "message_id"})


def upsert_structured(conn, item: dict):
    """Insert one job into ``structured_jobs`` or refresh the existing row.

    Runs ``INSERT ... ON DUPLICATE KEY UPDATE``: on a duplicate key every
    column except ``source`` and ``message_id`` is overwritten with the new
    value and ``cleaned_at`` is set to the current timestamp.

    Args:
        conn: Open PyMySQL connection.
        item: Mapping holding a value for every name in _STRUCTURED_COLUMNS.

    Raises:
        KeyError: ``item`` lacks one of the columns.
    """
    columns = _STRUCTURED_COLUMNS
    column_list = ", ".join(f"`{name}`" for name in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    assignments = ", ".join(
        f"`{name}`=VALUES(`{name}`)"
        for name in columns
        if name not in _STRUCTURED_NOT_UPDATED
    )
    sql = (
        f"INSERT INTO structured_jobs ({column_list}) "
        f"VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {assignments}, cleaned_at=CURRENT_TIMESTAMP"
    )
    params = tuple(item[name] for name in columns)
    with conn.cursor() as cur:
        cur.execute(sql, params)
# Columns written by insert_internship_raw, in INSERT order. The column list,
# the placeholders, the ON DUPLICATE KEY assignments and the parameter tuple are
# all derived from this one tuple so they cannot drift apart.
_INTERNSHIP_COLUMNS = (
    "source",
    "source_file",
    "sheet_name",
    "row_number",
    "fingerprint",
    "updated_at_raw",
    "updated_at_utc",
    "industry",
    "title",
    "company",
    "employment_type",
    "location_text",
    "apply_email",
    "job_source_url",
    "raw_row_json",
)
# Inserted once and left untouched when the fingerprint already exists.
_INTERNSHIP_NOT_UPDATED = frozenset(
    {"source", "source_file", "sheet_name", "row_number", "fingerprint"}
)


def insert_internship_raw(conn, item: dict):
    """Insert one internship row into ``internship_jobs_raw`` or refresh it.

    Runs ``INSERT ... ON DUPLICATE KEY UPDATE``: on a duplicate key the
    descriptive columns are overwritten and ``imported_at`` is set to the
    current timestamp, while source, source_file, sheet_name, row_number and
    fingerprint keep their stored values.

    Args:
        conn: Open PyMySQL connection.
        item: Mapping holding a value for every name in _INTERNSHIP_COLUMNS.

    Raises:
        KeyError: ``item`` lacks one of the columns.
    """
    columns = _INTERNSHIP_COLUMNS
    # Every name is back-quoted: `row_number` needs it, and quoting is harmless
    # for the rest.
    column_list = ", ".join(f"`{name}`" for name in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    assignments = ", ".join(
        f"`{name}`=VALUES(`{name}`)"
        for name in columns
        if name not in _INTERNSHIP_NOT_UPDATED
    )
    sql = (
        f"INSERT INTO internship_jobs_raw ({column_list}) "
        f"VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {assignments}, imported_at=CURRENT_TIMESTAMP"
    )
    params = tuple(item[name] for name in columns)
    with conn.cursor() as cur:
        cur.execute(sql, params)
def list_excel_files(dir_path: str) -> list[str]:
    """List the Excel workbooks directly inside ``dir_path``.

    Returns the paths (``dir_path`` joined with the file name) of regular
    files ending in ``.xlsx`` or ``.xlsm`` (case-insensitive), sorted by name.
    Sub-directories are not searched. A missing directory yields ``[]``.

    Files whose name starts with ``~$`` are skipped: Excel creates these lock
    files next to a workbook that is currently open, and they are not valid
    workbooks, so loading one would abort the whole import.
    """
    if not os.path.isdir(dir_path):
        return []
    found = []
    for name in sorted(os.listdir(dir_path)):
        if name.startswith("~$"):
            continue
        if not name.lower().endswith((".xlsx", ".xlsm")):
            continue
        full_path = os.path.join(dir_path, name)
        if os.path.isfile(full_path):
            found.append(full_path)
    return found
def import_one_file(conn, args: argparse.Namespace, file_path: str) -> tuple[int, int, int, int]:
    """Import one Excel file into MySQL.

    Reads the sheet named by ``args.sheet`` (or the active sheet), treats the
    first row as headers and processes every following row:
    - rows with no title, company, link or detail are counted as empty;
    - rows recognised as internships are stored in ``internship_jobs_raw``;
    - other rows need a URL in the detail text, otherwise they are counted as
      "no link"; rows with a URL are upserted into ``structured_jobs``.

    Args:
        conn: Open PyMySQL connection.
        args: Parsed CLI options; ``sheet`` and ``source`` are used.
        file_path: Path of the workbook to import.

    Returns:
        ``(imported, internship_saved, skipped_empty, skipped_no_link)``.
        An empty sheet yields four zeros.
    """
    wb = load_workbook(file_path, data_only=True)
    ws = wb[args.sheet] if args.sheet else wb.active
    rows = list(ws.iter_rows(values_only=True))
    if not rows:
        logger.info(f"Excel 为空,跳过: file={file_path}")
        # Four counters, matching the annotated return type and the
        # four-name unpacking done by the caller.
        return 0, 0, 0, 0
    headers = [norm_header(v) for v in rows[0]]
    logger.info(f"file={file_path}, sheet={ws.title}, columns={len(headers)}, rows={len(rows)-1}")
    imported = internship_saved = skipped_empty = 0
    skipped_no_link = 0
    # Data rows start at spreadsheet row 2 (row 1 holds the headers).
    for idx, row in enumerate(rows[1:], start=2):
        # Rows shorter than the header row are padded with None.
        raw = {
            headers[i]: norm_value(row[i]) if i < len(row) else None
            for i in range(len(headers))
        }
        # Exact headers of the reference sheet:
        # 表格更新时间, 行业, 公司, 职位详情, 工作形式, 工作地点_标准, 投递邮箱
        updated_raw = first_match(raw, ["表格更新时间", "更新时间", "date", "postedat"])
        industry = first_match(raw, ["行业", "industry"])
        company = first_match(raw, ["公司", "公司名称", "company", "companyname"])
        detail = first_match(raw, ["职位详情", "岗位详情", "职位描述", "description", "jd", "详情"])
        employment = first_match(
            raw,
            ["工作形式", "合作方式", "用工类型", "岗位性质", "employment", "employmenttype", "jobtype"],
        )
        location = first_match(raw, ["工作地点_标准", "工作地点", "地点", "城市", "location", "worklocation"])
        email = first_match(raw, ["投递邮箱", "邮箱", "email", "applyemail"])
        salary = first_match(raw, ["薪资", "薪酬", "salary", "compensation"])
        tg = first_match(raw, ["telegram", "tg", "联系方式telegram"])
        title = extract_title_from_detail(detail, company)
        link = normalize_url(extract_url_from_detail(detail))
        if not email:
            # Fall back to an address mentioned inside the description.
            email = extract_email_from_text(detail)
        location = normalize_location_text(location)
        posted = parse_datetime_value(updated_raw)
        # Build a combined text for nature detection.
        nature_text = " | ".join([x for x in [title, employment, detail] if x])
        if not any([title, company, link, detail]):
            skipped_empty += 1
            continue
        fp_payload = {
            "source": args.source,
            "updated_at_raw": updated_raw,
            "industry": industry,
            "company": company,
            "title": title,
            "employment_type": employment,
            "location_text": location,
            "apply_email": email,
            "job_source_url": link,
            "detail_text": detail,
        }
        fingerprint = make_fingerprint(fp_payload)
        if is_internship(nature_text):
            # Internships are kept raw only and never reach structured_jobs.
            insert_internship_raw(
                conn,
                {
                    "source": args.source,
                    "source_file": os.path.abspath(file_path),
                    "sheet_name": ws.title,
                    "row_number": idx,
                    "fingerprint": fingerprint,
                    "updated_at_raw": updated_raw,
                    "updated_at_utc": posted,
                    "industry": industry,
                    "title": title,
                    "company": company,
                    "employment_type": employment,
                    "location_text": location,
                    "apply_email": email,
                    "job_source_url": link,
                    "raw_row_json": json.dumps(raw, ensure_ascii=False),
                },
            )
            internship_saved += 1
            continue
        if not link:
            # The link is part of the message id, so rows without one are skipped.
            skipped_no_link += 1
            continue
        message_id = make_message_id(args.source, title, company, link)
        work_mode = infer_work_mode(nature_text)
        job_nature = infer_job_nature(nature_text)
        upsert_structured(
            conn,
            {
                "source": args.source,
                "source_channel": "excel_import",
                "parser_name": "excel_import",
                "parser_version": "v1",
                "chat_id": None,
                "message_id": message_id,
                "message_date": posted,
                "job_type": "招聘",
                "company_name": company,
                "industry_tags_json": json.dumps([industry], ensure_ascii=False) if industry else json.dumps([], ensure_ascii=False),
                "company_intro": None,
                "company_url": link,
                "work_mode": work_mode,
                "job_nature": job_nature,
                "job_location_text": location,
                "job_location_tags_json": json.dumps([location], ensure_ascii=False) if location else None,
                "employment_type_raw": employment,
                "position_name": title,
                "position_tags_json": json.dumps([], ensure_ascii=False),
                "salary_raw": salary,
                "salary_currency": "USD" if salary and "$" in salary else None,
                "salary_min": None,
                "salary_max": None,
                "salary_period": None,
                "responsibilities_json": json.dumps([], ensure_ascii=False),
                "requirements_json": json.dumps([], ensure_ascii=False),
                "apply_email": email,
                "apply_telegram": tg,
                "job_source_url": link,
                "body_text": detail or title or "excel_import",
                "raw_content": json.dumps(raw, ensure_ascii=False),
            },
        )
        imported += 1
        if (imported + internship_saved) % 200 == 0:
            logger.info(
                "progress rows="
                f"{idx-1}, imported={imported}, internship_saved={internship_saved}, "
                f"skipped_empty={skipped_empty}, skipped_no_link={skipped_no_link}"
            )
    logger.info(
        "done file="
        f"{file_path}, imported={imported}, internship_saved={internship_saved}, "
        f"skipped_empty={skipped_empty}, skipped_no_link={skipped_no_link}, sheet={ws.title}"
    )
    return imported, internship_saved, skipped_empty, skipped_no_link
def main():
    """Command-line entry point: import one Excel file or a directory of them.

    Resolves the input files from ``--file`` or ``--dir``, connects to MySQL,
    prepares the tables, imports every file and logs the summed counters.
    The connection is closed whether table setup or any import fails.

    Raises:
        FileNotFoundError: ``--file`` does not exist, or ``--dir`` contains no
            Excel files.
    """
    args = parse_args()
    files: list[str]
    if args.file:
        if not os.path.exists(args.file):
            raise FileNotFoundError(f"Excel 文件不存在: {args.file}")
        files = [args.file]
    else:
        files = list_excel_files(args.dir)
        if not files:
            raise FileNotFoundError(f"目录中未找到 Excel 文件: {args.dir}")
    mysql_cfg = load_mysql_config()
    conn = connect_mysql(mysql_cfg)
    total_imported = total_internship = total_skipped = total_skipped_no_link = 0
    try:
        # Inside the try block so the connection is released even when the
        # schema setup raises.
        init_tables(conn)
        for f in files:
            imported, internship_saved, skipped_empty, skipped_no_link = import_one_file(
                conn, args, f
            )
            total_imported += imported
            total_internship += internship_saved
            total_skipped += skipped_empty
            total_skipped_no_link += skipped_no_link
    finally:
        conn.close()
    logger.info(
        "all_done "
        f"files={len(files)}, imported={total_imported}, internship_saved={total_internship}, "
        f"skipped_empty={total_skipped}, skipped_no_link={total_skipped_no_link}"
    )
# Run the importer only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()