#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Import external Excel jobs into MySQL.

Rules:
- Internship rows (实习/intern) are stored into `internship_jobs_raw` only.
- Non-internship rows are normalized and upserted into `structured_jobs`.
"""

import argparse
import hashlib
import json
import logging
import os
import re
from datetime import date, datetime, timedelta, timezone

import pymysql
from openpyxl import load_workbook

CONFIG_FILE = "config.json"

# Compiled once at module level; both are used per-row in the import loop.
URL_RE = re.compile(r"https?://[^\s)]+", re.IGNORECASE)
EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)


def setup_logger() -> logging.Logger:
    """Return a console+file logger, configuring it only on first call.

    Handlers are attached once; re-imports reuse the existing configuration
    so log lines are not duplicated.
    """
    os.makedirs("logs", exist_ok=True)
    logger = logging.getLogger("import_excel_jobs")
    logger.setLevel(logging.INFO)
    if logger.handlers:
        # Already configured -- avoid stacking duplicate handlers.
        return logger
    fmt = logging.Formatter(
        "[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
    ch = logging.StreamHandler()
    ch.setFormatter(fmt)
    fh = logging.FileHandler("logs/import_excel_jobs.log", encoding="utf-8")
    fh.setFormatter(fmt)
    logger.addHandler(ch)
    logger.addHandler(fh)
    return logger


logger = setup_logger()


def parse_args() -> argparse.Namespace:
    """Parse command-line options for the importer."""
    parser = argparse.ArgumentParser(description="Import jobs from Excel into MySQL")
    parser.add_argument("--file", default="", help="Excel file path (.xlsx)")
    parser.add_argument(
        "--dir",
        default="sheets",
        help="Directory containing Excel files (.xlsx/.xlsm). Used when --file is empty.",
    )
    parser.add_argument("--sheet", default="", help="Sheet name, default active sheet")
    parser.add_argument(
        "--source",
        default="@excel_import",
        help="source value written into structured_jobs.source",
    )
    return parser.parse_args()


def load_mysql_config() -> dict:
    """Load MySQL connection settings from config.json, with env fallbacks.

    Raises:
        FileNotFoundError: config.json is missing.
        ValueError: the `mysql` section is malformed or the password is unset.
    """
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}")
    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        cfg = json.load(f)
    mysql_cfg = cfg.get("mysql", {})
    if not isinstance(mysql_cfg, dict):
        raise ValueError("配置错误: mysql 必须是对象")
    # config.json wins; environment variables are the fallback.
    result = {
        "host": mysql_cfg.get("host") or os.getenv("MYSQL_HOST", "127.0.0.1"),
        "port": int(mysql_cfg.get("port") or os.getenv("MYSQL_PORT", "3306")),
        "user": mysql_cfg.get("user") or os.getenv("MYSQL_USER", "jobs_user"),
        "password": mysql_cfg.get("password") or os.getenv("MYSQL_PASSWORD", ""),
        "database": mysql_cfg.get("database") or os.getenv("MYSQL_DATABASE", "jobs"),
        "charset": mysql_cfg.get("charset") or os.getenv("MYSQL_CHARSET", "utf8mb4"),
    }
    if not result["password"] or result["password"] == "CHANGE_ME":
        raise ValueError("请先在 config.json 里填写 mysql.password")
    return result


def connect_mysql(cfg: dict):
    """Open an autocommit pymysql connection pinned to the UTC session zone."""
    conn = pymysql.connect(
        host=cfg["host"],
        port=cfg["port"],
        user=cfg["user"],
        password=cfg["password"],
        database=cfg["database"],
        charset=cfg["charset"],
        autocommit=True,
    )
    with conn.cursor() as cur:
        # All DATETIME values in this importer are UTC strings.
        cur.execute("SET time_zone = '+00:00'")
    return conn


def init_tables(conn):
    """Create `internship_jobs_raw` if absent and patch older schemas.

    The ALTER statements are best-effort upgrades for pre-existing tables;
    each one fails harmlessly (duplicate column/key) when already applied.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS internship_jobs_raw (
                id BIGINT PRIMARY KEY AUTO_INCREMENT,
                source VARCHAR(255) NOT NULL,
                fingerprint CHAR(64) NOT NULL,
                source_file VARCHAR(512) NOT NULL,
                sheet_name VARCHAR(255) NOT NULL,
                `row_number` INT NOT NULL,
                updated_at_raw VARCHAR(128) NULL,
                updated_at_utc DATETIME NULL,
                industry VARCHAR(255) NULL,
                title VARCHAR(512) NULL,
                company VARCHAR(255) NULL,
                employment_type VARCHAR(255) NULL,
                location_text VARCHAR(255) NULL,
                apply_email VARCHAR(255) NULL,
                job_source_url TEXT NULL,
                raw_row_json JSON NOT NULL,
                imported_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uk_internship_fingerprint (fingerprint),
                KEY idx_internship_source (source)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
        )
        # Compatibility for an existing old table schema.
        for sql in [
            "ALTER TABLE internship_jobs_raw ADD COLUMN fingerprint CHAR(64) NULL",
            "ALTER TABLE internship_jobs_raw ADD COLUMN updated_at_raw VARCHAR(128) NULL",
            "ALTER TABLE internship_jobs_raw ADD COLUMN updated_at_utc DATETIME NULL",
            "ALTER TABLE internship_jobs_raw ADD COLUMN industry VARCHAR(255) NULL",
            "ALTER TABLE internship_jobs_raw ADD COLUMN location_text VARCHAR(255) NULL",
            "ALTER TABLE internship_jobs_raw ADD COLUMN apply_email VARCHAR(255) NULL",
            "ALTER TABLE internship_jobs_raw ADD UNIQUE KEY uk_internship_fingerprint (fingerprint)",
        ]:
            try:
                cur.execute(sql)
            except Exception:
                # Deliberate best-effort: duplicate-column/duplicate-key
                # errors mean the upgrade was already applied.
                pass


def norm_header(v) -> str:
    """Normalize a header cell: lowercase and strip ALL whitespace."""
    s = str(v or "").strip().lower()
    s = re.sub(r"\s+", "", s)
    return s


def norm_value(v) -> str | None:
    """Stringify and trim a cell value; empty cells become None."""
    if v is None:
        return None
    s = str(v).strip()
    return s or None


def first_match(data: dict, keys: list[str]) -> str | None:
    """Return the first truthy value for any of `keys` in `data`, else None."""
    for k in keys:
        if k in data and data[k]:
            return data[k]
    return None


def infer_work_mode(text: str) -> str:
    """Classify work mode from free text: remote / onsite / hybrid / unknown."""
    t = (text or "").lower()
    has_remote = any(k in t for k in ["远程", "remote", "wfh", "home office"])
    has_onsite = any(k in t for k in ["实地", "onsite", "on-site", "现场", "坐班"])
    if has_remote and has_onsite:
        return "hybrid"
    if has_remote:
        return "remote"
    if has_onsite:
        return "onsite"
    return "unknown"


def infer_job_nature(text: str) -> str:
    """Classify job nature keywords; first match wins, else 'unknown'."""
    t = (text or "").lower()
    if "全职" in t or "full time" in t:
        return "full_time"
    if "兼职" in t or "part time" in t:
        return "part_time"
    if "合同" in t or "contract" in t:
        return "contract"
    if "实习" in t or "intern" in t:
        return "intern"
    if "freelance" in t or "自由职业" in t:
        return "freelance"
    return "unknown"


def is_internship(text: str) -> bool:
    """True when the text mentions an internship (实习/intern)."""
    t = (text or "").lower()
    return ("实习" in t) or ("intern" in t)


def normalize_url(raw: str | None) -> str | None:
    """Coerce a raw cell into an https URL, or None when it cannot be one.

    Bare domains ("example.com") and "www." prefixes get an https scheme;
    anything containing spaces or lacking a dot is rejected.
    """
    if not raw:
        return None
    s = raw.strip()
    if s.lower().startswith(("http://", "https://")):
        return s
    if s.lower().startswith("www."):
        return "https://" + s
    if " " not in s and "." in s:
        return "https://" + s
    return None


def extract_url_from_detail(detail: str | None) -> str | None:
    """Return the first http(s) URL found in the detail text, else None."""
    if not detail:
        return None
    m = URL_RE.search(detail)
    if m:
        return m.group(0)
    return None


def extract_title_from_detail(detail: str | None, company: str | None) -> str | None:
    """Derive a job title from the first non-empty line of the detail text.

    Strips common 【】 wrappers and a leading company-name prefix, then
    caps the result at 120 characters.
    """
    if not detail:
        return None
    for ln in str(detail).splitlines():
        t = re.sub(r"\s+", " ", ln).strip()
        if not t:
            continue
        # remove common wrappers/prefixes
        t = t.replace("【", "").replace("】", "").strip("-— ")
        if company and t.startswith(company):
            t = t[len(company) :].strip("-—:: ")
        # keep first short sentence as title
        if len(t) <= 120:
            return t
        return t[:120]
    return None


def extract_email_from_text(text: str | None) -> str | None:
    """Return the first email address found in `text`, else None."""
    if not text:
        return None
    m = EMAIL_RE.search(text)
    if m:
        return m.group(0)
    return None


def normalize_location_text(raw: str | None) -> str | None:
    """Trim a location cell, normalize punctuation, collapse whitespace.

    Fullwidth CJK separators (；，) are mapped to their ASCII equivalents
    so downstream consumers can split on ';' and ','.
    """
    if not raw:
        return None
    s = str(raw).strip()
    # NOTE(review): the original line was a no-op self-replace; the intent
    # (fullwidth -> ASCII punctuation) is restored here with explicit escapes.
    s = s.replace("\uff1b", ";").replace("\uff0c", ",")
    s = re.sub(r"\s+", " ", s)
    return s or None


def parse_datetime_value(raw) -> str:
    """Convert many date representations into a 'YYYY-MM-DD HH:MM:SS' UTC string.

    Accepts datetime/date objects, Excel serial numbers (days since
    1899-12-30), ISO-like strings, and Chinese 年/月/日 forms. Falls back to
    the current UTC time when nothing parses.
    """
    now_utc = datetime.now(timezone.utc)
    if raw is None:
        return now_utc.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(raw, datetime):
        # Naive datetimes are assumed to already be UTC.
        dt = raw if raw.tzinfo else raw.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(raw, date):
        dt = datetime(raw.year, raw.month, raw.day, tzinfo=timezone.utc)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(raw, (int, float)):
        # Excel serial date: days since 1899-12-30
        try:
            base = datetime(1899, 12, 30, tzinfo=timezone.utc)
            dt = base + timedelta(days=float(raw))
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except Exception:
            return now_utc.strftime("%Y-%m-%d %H:%M:%S")
    s = str(raw).strip()
    if not s:
        return now_utc.strftime("%Y-%m-%d %H:%M:%S")
    # ISO-like strings first
    try:
        iso = s.replace("Z", "+00:00").replace("T", " ")
        dt = datetime.fromisoformat(iso)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        else:
            dt = dt.astimezone(timezone.utc)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception:
        pass
    # Map Chinese date markers and alternative separators onto '-'.
    normalized = (
        s.replace("年", "-")
        .replace("月", "-")
        .replace("日", "")
        .replace("/", "-")
        .replace(".", "-")
    )
    normalized = re.sub(r"\s+", " ", normalized).strip()
    for fmt in [
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d",
        "%Y%m%d%H%M%S",
        "%Y%m%d%H%M",
        "%Y%m%d",
    ]:
        try:
            dt = datetime.strptime(normalized, fmt).replace(tzinfo=timezone.utc)
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except Exception:
            continue
    logger.warning(f"无法解析时间,使用当前UTC时间兜底: raw={s}")
    return now_utc.strftime("%Y-%m-%d %H:%M:%S")


def make_message_id(source: str, title: str | None, company: str | None, link: str | None) -> int:
    """Derive a stable positive 63-bit id from source/title/company/link."""
    key = f"{source}|{title or ''}|{company or ''}|{link or ''}"
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]
    # Mask to 63 bits so the value fits a signed BIGINT.
    return int(digest, 16) & ((1 << 63) - 1)


def make_fingerprint(payload: dict) -> str:
    """SHA-256 fingerprint over the row fields used for deduplication."""
    key = "|".join(
        [
            str(payload.get("source") or ""),
            str(payload.get("updated_at_raw") or ""),
            str(payload.get("industry") or ""),
            str(payload.get("company") or ""),
            str(payload.get("title") or ""),
            str(payload.get("employment_type") or ""),
            str(payload.get("location_text") or ""),
            str(payload.get("apply_email") or ""),
            str(payload.get("job_source_url") or ""),
            str(payload.get("detail_text") or ""),
        ]
    )
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


def upsert_structured(conn, item: dict):
    """Insert or update one normalized row in `structured_jobs`.

    Relies on the table's unique key to trigger ON DUPLICATE KEY UPDATE;
    `source` and `message_id` identify the row and are never overwritten.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO structured_jobs (
                source, source_channel, parser_name, parser_version,
                chat_id, message_id, message_date, job_type,
                company_name, industry_tags_json, company_intro, company_url,
                work_mode, job_nature, job_location_text, job_location_tags_json,
                employment_type_raw, position_name, position_tags_json,
                salary_raw, salary_currency, salary_min, salary_max, salary_period,
                responsibilities_json, requirements_json,
                apply_email, apply_telegram, job_source_url,
                body_text, raw_content
            ) VALUES (
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
            )
            ON DUPLICATE KEY UPDATE
                source_channel=VALUES(source_channel),
                parser_name=VALUES(parser_name),
                parser_version=VALUES(parser_version),
                chat_id=VALUES(chat_id),
                message_date=VALUES(message_date),
                job_type=VALUES(job_type),
                company_name=VALUES(company_name),
                industry_tags_json=VALUES(industry_tags_json),
                company_intro=VALUES(company_intro),
                company_url=VALUES(company_url),
                work_mode=VALUES(work_mode),
                job_nature=VALUES(job_nature),
                job_location_text=VALUES(job_location_text),
                job_location_tags_json=VALUES(job_location_tags_json),
                employment_type_raw=VALUES(employment_type_raw),
                position_name=VALUES(position_name),
                position_tags_json=VALUES(position_tags_json),
                salary_raw=VALUES(salary_raw),
                salary_currency=VALUES(salary_currency),
                salary_min=VALUES(salary_min),
                salary_max=VALUES(salary_max),
                salary_period=VALUES(salary_period),
                responsibilities_json=VALUES(responsibilities_json),
                requirements_json=VALUES(requirements_json),
                apply_email=VALUES(apply_email),
                apply_telegram=VALUES(apply_telegram),
                job_source_url=VALUES(job_source_url),
                body_text=VALUES(body_text),
                raw_content=VALUES(raw_content),
                cleaned_at=CURRENT_TIMESTAMP
            """,
            (
                item["source"],
                item["source_channel"],
                item["parser_name"],
                item["parser_version"],
                item["chat_id"],
                item["message_id"],
                item["message_date"],
                item["job_type"],
                item["company_name"],
                item["industry_tags_json"],
                item["company_intro"],
                item["company_url"],
                item["work_mode"],
                item["job_nature"],
                item["job_location_text"],
                item["job_location_tags_json"],
                item["employment_type_raw"],
                item["position_name"],
                item["position_tags_json"],
                item["salary_raw"],
                item["salary_currency"],
                item["salary_min"],
                item["salary_max"],
                item["salary_period"],
                item["responsibilities_json"],
                item["requirements_json"],
                item["apply_email"],
                item["apply_telegram"],
                item["job_source_url"],
                item["body_text"],
                item["raw_content"],
            ),
        )


def insert_internship_raw(conn, item: dict):
    """Insert or refresh one internship row keyed by its fingerprint."""
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO internship_jobs_raw (
                source, source_file, sheet_name, `row_number`, fingerprint,
                updated_at_raw, updated_at_utc, industry, title, company,
                employment_type, location_text, apply_email, job_source_url,
                raw_row_json
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                updated_at_raw=VALUES(updated_at_raw),
                updated_at_utc=VALUES(updated_at_utc),
                industry=VALUES(industry),
                title=VALUES(title),
                company=VALUES(company),
                employment_type=VALUES(employment_type),
                location_text=VALUES(location_text),
                apply_email=VALUES(apply_email),
                job_source_url=VALUES(job_source_url),
                raw_row_json=VALUES(raw_row_json),
                imported_at=CURRENT_TIMESTAMP
            """,
            (
                item["source"],
                item["source_file"],
                item["sheet_name"],
                item["row_number"],
                item["fingerprint"],
                item["updated_at_raw"],
                item["updated_at_utc"],
                item["industry"],
                item["title"],
                item["company"],
                item["employment_type"],
                item["location_text"],
                item["apply_email"],
                item["job_source_url"],
                item["raw_row_json"],
            ),
        )


def list_excel_files(dir_path: str) -> list[str]:
    """Return sorted .xlsx/.xlsm file paths directly inside `dir_path`."""
    if not os.path.isdir(dir_path):
        return []
    files = []
    for name in sorted(os.listdir(dir_path)):
        full = os.path.join(dir_path, name)
        if not os.path.isfile(full):
            continue
        lower = name.lower()
        if lower.endswith(".xlsx") or lower.endswith(".xlsm"):
            files.append(full)
    return files


def import_one_file(conn, args: argparse.Namespace, file_path: str) -> tuple[int, int, int, int]:
    """Import a single workbook.

    Returns (imported, internship_saved, skipped_empty, skipped_no_link).
    Internship rows go to `internship_jobs_raw`; other rows with a usable
    link are upserted into `structured_jobs`.
    """
    wb = load_workbook(file_path, data_only=True)
    ws = wb[args.sheet] if args.sheet else wb.active
    rows = list(ws.iter_rows(values_only=True))
    if not rows:
        logger.info(f"Excel 为空,跳过: file={file_path}")
        # BUGFIX: original returned a 3-tuple here, but the caller unpacks 4
        # values -- an empty workbook raised ValueError in main().
        return 0, 0, 0, 0
    headers = [norm_header(v) for v in rows[0]]
    logger.info(f"file={file_path}, sheet={ws.title}, columns={len(headers)}, rows={len(rows)-1}")

    imported = internship_saved = skipped_empty = 0
    skipped_no_link = 0
    # start=2: row 1 is the header, so data rows map to spreadsheet rows 2..N.
    for idx, row in enumerate(rows[1:], start=2):
        raw = {headers[i]: norm_value(row[i]) if i < len(row) else None for i in range(len(headers))}

        # For your sheet, exact headers are:
        # 表格更新时间, 行业, 公司, 职位详情, 工作形式, 工作地点_标准, 投递邮箱
        updated_raw = first_match(raw, ["表格更新时间", "更新时间", "date", "postedat"])
        industry = first_match(raw, ["行业", "industry"])
        company = first_match(raw, ["公司", "公司名称", "company", "companyname"])
        detail = first_match(raw, ["职位详情", "岗位详情", "职位描述", "description", "jd", "详情"])
        employment = first_match(
            raw,
            ["工作形式", "合作方式", "用工类型", "岗位性质", "employment", "employmenttype", "jobtype"],
        )
        location = first_match(raw, ["工作地点_标准", "工作地点", "地点", "城市", "location", "worklocation"])
        email = first_match(raw, ["投递邮箱", "邮箱", "email", "applyemail"])
        salary = first_match(raw, ["薪资", "薪酬", "salary", "compensation"])
        tg = first_match(raw, ["telegram", "tg", "联系方式telegram"])

        title = extract_title_from_detail(detail, company)
        link = normalize_url(extract_url_from_detail(detail))
        if not email:
            email = extract_email_from_text(detail)
        location = normalize_location_text(location)
        posted = parse_datetime_value(updated_raw)
        # Build a combined text for nature detection.
        nature_text = " | ".join([x for x in [title, employment, detail] if x])

        if not any([title, company, link, detail]):
            skipped_empty += 1
            continue

        fp_payload = {
            "source": args.source,
            "updated_at_raw": updated_raw,
            "industry": industry,
            "company": company,
            "title": title,
            "employment_type": employment,
            "location_text": location,
            "apply_email": email,
            "job_source_url": link,
            "detail_text": detail,
        }
        fingerprint = make_fingerprint(fp_payload)

        if is_internship(nature_text):
            insert_internship_raw(
                conn,
                {
                    "source": args.source,
                    "source_file": os.path.abspath(file_path),
                    "sheet_name": ws.title,
                    "row_number": idx,
                    "fingerprint": fingerprint,
                    "updated_at_raw": updated_raw,
                    "updated_at_utc": posted,
                    "industry": industry,
                    "title": title,
                    "company": company,
                    "employment_type": employment,
                    "location_text": location,
                    "apply_email": email,
                    "job_source_url": link,
                    "raw_row_json": json.dumps(raw, ensure_ascii=False),
                },
            )
            internship_saved += 1
            continue

        # structured_jobs requires a source URL; rows without one are skipped.
        if not link:
            skipped_no_link += 1
            continue

        message_id = make_message_id(args.source, title, company, link)
        work_mode = infer_work_mode(nature_text)
        job_nature = infer_job_nature(nature_text)
        upsert_structured(
            conn,
            {
                "source": args.source,
                "source_channel": "excel_import",
                "parser_name": "excel_import",
                "parser_version": "v1",
                "chat_id": None,
                "message_id": message_id,
                "message_date": posted,
                "job_type": "招聘",
                "company_name": company,
                "industry_tags_json": json.dumps([industry], ensure_ascii=False) if industry else json.dumps([], ensure_ascii=False),
                "company_intro": None,
                "company_url": link,
                "work_mode": work_mode,
                "job_nature": job_nature,
                "job_location_text": location,
                "job_location_tags_json": json.dumps([location], ensure_ascii=False) if location else None,
                "employment_type_raw": employment,
                "position_name": title,
                "position_tags_json": json.dumps([], ensure_ascii=False),
                "salary_raw": salary,
                # Heuristic: a '$' in the raw salary implies USD; otherwise unknown.
                "salary_currency": "USD" if salary and "$" in salary else None,
                "salary_min": None,
                "salary_max": None,
                "salary_period": None,
                "responsibilities_json": json.dumps([], ensure_ascii=False),
                "requirements_json": json.dumps([], ensure_ascii=False),
                "apply_email": email,
                "apply_telegram": tg,
                "job_source_url": link,
                "body_text": detail or title or "excel_import",
                "raw_content": json.dumps(raw, ensure_ascii=False),
            },
        )
        imported += 1
        if (imported + internship_saved) % 200 == 0:
            logger.info(
                "progress rows="
                f"{idx-1}, imported={imported}, internship_saved={internship_saved}, "
                f"skipped_empty={skipped_empty}, skipped_no_link={skipped_no_link}"
            )

    logger.info(
        "done file="
        f"{file_path}, imported={imported}, internship_saved={internship_saved}, "
        f"skipped_empty={skipped_empty}, skipped_no_link={skipped_no_link}, sheet={ws.title}"
    )
    return imported, internship_saved, skipped_empty, skipped_no_link


def main():
    """Entry point: resolve input files, ensure tables, import each file."""
    args = parse_args()
    files: list[str]
    if args.file:
        if not os.path.exists(args.file):
            raise FileNotFoundError(f"Excel 文件不存在: {args.file}")
        files = [args.file]
    else:
        files = list_excel_files(args.dir)
        if not files:
            raise FileNotFoundError(f"目录中未找到 Excel 文件: {args.dir}")

    mysql_cfg = load_mysql_config()
    conn = connect_mysql(mysql_cfg)
    init_tables(conn)

    total_imported = total_internship = total_skipped = total_skipped_no_link = 0
    try:
        for f in files:
            imported, internship_saved, skipped_empty, skipped_no_link = import_one_file(
                conn, args, f
            )
            total_imported += imported
            total_internship += internship_saved
            total_skipped += skipped_empty
            total_skipped_no_link += skipped_no_link
    finally:
        conn.close()

    logger.info(
        "all_done "
        f"files={len(files)}, imported={total_imported}, internship_saved={total_internship}, "
        f"skipped_empty={total_skipped}, skipped_no_link={total_skipped_no_link}"
    )


if __name__ == "__main__":
    main()