#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Source-aware structured cleaning (MySQL). Input table: messages Output table: structured_jobs """ import json import logging import os import re from dataclasses import dataclass import pymysql CONFIG_FILE = "config.json" PIPELINE_NAME = "structured_cleaner_v1" URL_RE = re.compile(r"https?://[^\s)]+", re.IGNORECASE) EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE) TG_RE = re.compile(r"(? logging.Logger: os.makedirs("logs", exist_ok=True) logger = logging.getLogger("clean_to_structured") logger.setLevel(logging.INFO) if logger.handlers: return logger fmt = logging.Formatter( "[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) ch = logging.StreamHandler() ch.setFormatter(fmt) fh = logging.FileHandler("logs/clean_to_structured.log", encoding="utf-8") fh.setFormatter(fmt) logger.addHandler(ch) logger.addHandler(fh) return logger logger = setup_logger() def load_mysql_config() -> dict: if not os.path.exists(CONFIG_FILE): raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}") with open(CONFIG_FILE, "r", encoding="utf-8") as f: cfg = json.load(f) mysql_cfg = cfg.get("mysql", {}) if not isinstance(mysql_cfg, dict): raise ValueError("配置错误: mysql 必须是对象") result = { "host": mysql_cfg.get("host") or os.getenv("MYSQL_HOST", "127.0.0.1"), "port": int(mysql_cfg.get("port") or os.getenv("MYSQL_PORT", "3306")), "user": mysql_cfg.get("user") or os.getenv("MYSQL_USER", "jobs_user"), "password": mysql_cfg.get("password") or os.getenv("MYSQL_PASSWORD", ""), "database": mysql_cfg.get("database") or os.getenv("MYSQL_DATABASE", "jobs"), "charset": mysql_cfg.get("charset") or os.getenv("MYSQL_CHARSET", "utf8mb4"), } if not result["password"]: raise ValueError("配置错误: mysql.password 不能为空") return result def connect_mysql(cfg: dict): conn = pymysql.connect( host=cfg["host"], port=cfg["port"], user=cfg["user"], password=cfg["password"], database=cfg["database"], charset=cfg["charset"], autocommit=True, ) with conn.cursor() as cur: cur.execute("SET time_zone = '+00:00'") return conn def init_target_db(conn): with conn.cursor() as cur: cur.execute( """ CREATE TABLE IF NOT EXISTS structured_jobs ( id BIGINT PRIMARY KEY AUTO_INCREMENT, source VARCHAR(255) NOT NULL, source_channel VARCHAR(255) NULL, parser_name VARCHAR(64) NOT NULL, parser_version VARCHAR(32) NOT NULL, chat_id BIGINT NULL, message_id BIGINT NOT NULL, message_date DATETIME NOT NULL, job_type VARCHAR(64) NULL, company_name VARCHAR(255) NULL, industry_tags_json JSON NOT NULL, company_intro LONGTEXT NULL, company_url TEXT NULL, work_mode VARCHAR(32) NOT NULL, job_nature VARCHAR(32) NOT NULL, job_location_text VARCHAR(255) NULL, job_location_tags_json JSON NULL, employment_type_raw TEXT NULL, position_name VARCHAR(255) NULL, position_tags_json JSON NOT NULL, salary_raw TEXT NULL, salary_currency VARCHAR(16) NULL, salary_min BIGINT NULL, salary_max BIGINT NULL, salary_period VARCHAR(16) NULL, responsibilities_json JSON NOT NULL, requirements_json JSON NOT NULL, apply_email VARCHAR(255) NULL, apply_telegram VARCHAR(255) NULL, job_source_url TEXT NULL, body_text LONGTEXT NOT NULL, raw_content LONGTEXT NOT NULL, cleaned_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_source_message (source, message_id), KEY idx_structured_source_date (source, message_date) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS clean_state ( pipeline_name VARCHAR(128) PRIMARY KEY, last_message_row_id BIGINT NOT NULL DEFAULT 0, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """ ) def dedupe(values: list[str]) -> list[str]: seen = set() out = [] for v in values: if not v: continue if v in seen: continue seen.add(v) out.append(v) return out def clean_md_text(s: str) -> str: s = re.sub(r"\*+", "", s) s = re.sub(r"~+", "", s) s = s.replace("`", "").strip() return re.sub(r"\s+", " ", s).strip() def normalize_md_line(s: str) -> str: s = s.replace("**", "").replace("`", "") s = s.replace("~~", "") s = s.replace("\u3000", " ") return re.sub(r"\s+", " ", s).strip() def clean_company_name(s: str | None) -> str | None: if not s: return None s = clean_md_text(s) s = s.strip(" -|::#") return s or None def infer_salary_currency(text: str) -> str | None: low = text.lower() if any(k in low for k in ["usd", "us$", "dollar"]) or "$" in text: return "USD" if any(k in text for k in ["¥", "¥", "人民币"]) or "cny" in low: return "CNY" if "元" in text or "万" in text: return "CNY" if "k" in low and any(k in low for k in ["month", "year", "day"]): return "USD" if "eur" in low or "€" in text: return "EUR" if "hkd" in low or "hk$" in low: return "HKD" if "sgd" in low or "s$" in low: return "SGD" # No explicit marker: infer by language. if re.search(r"[\u4e00-\u9fff]", text): return "CNY" return "USD" def parse_salary( raw: str | None, ) -> tuple[str | None, str | None, int | None, int | None, str | None]: if not raw: return None, None, None, None, None text = clean_md_text(raw) lower = text.lower() currency = infer_salary_currency(text) num_tokens = re.findall(r"(\d+(?:\.\d+)?)\s*([kKwW万]?)", text.replace(",", "")) salary_min = None salary_max = None if num_tokens: vals: list[tuple[float, str]] = [(float(n), u) for n, u in num_tokens] if len(vals) >= 2: u1 = vals[0][1] u2 = vals[1][1] if not u1 and u2: vals[0] = (vals[0][0], u2) if not u2 and u1: vals[1] = (vals[1][0], u1) def scaled(v: float, unit: str) -> int: m = 1 if unit in ("k", "K"): m = 1000 elif unit in ("w", "W", "万"): m = 10000 return int(v * m) salary_min = scaled(vals[0][0], vals[0][1]) if len(vals) >= 1 else None salary_max = scaled(vals[1][0], vals[1][1]) if len(vals) >= 2 else None period = None if "month" in lower or "每月" in text or "月" in text: period = "month" elif "year" in lower or "年" in text: period = "year" elif "day" in lower or "日" in text: period = "day" return text, currency, salary_min, salary_max, period def strip_meta_lines(content: str) -> str: lines = [] for ln in (content or "").splitlines(): if ln.startswith("[MEDIA_TYPE] "): continue if ln.startswith("[ACTION_TYPE] "): continue if ln.startswith("[MEDIA_JSON] "): continue if ln.startswith("[ACTION_JSON] "): continue if ln.startswith("phones="): continue if ln.startswith("[MEDIA_TEXT] "): continue lines.append(ln.rstrip()) return "\n".join(lines).strip() def preprocess_body_text(body_text: str) -> str: text = re.sub(r"\[([^\]]+)\]\((https?://[^)\s]+)\)", r"\1 \2", body_text) text = text.replace("**", "").replace("__", "").replace("~~", "").replace("`", "") lines = [re.sub(r"[ \t]+", " ", ln).strip() for ln in text.splitlines()] return "\n".join(lines).strip() def extract_section(body_text: str, section_name: str) -> str | None: lines = body_text.splitlines() start = None for i, ln in enumerate(lines): if section_name in ln: start = i + 1 break if start is None: return None collected = [] for ln in lines[start:]: if any(k in ln for k in SECTION_KEYS): break collected.append(ln) text = "\n".join(collected).strip() return text or None def extract_first_url(text: str | None) -> str | None: if not text: return None urls = URL_RE.findall(text) return urls[0] if urls else None def extract_job_source_url(body_text: str) -> str | None: for ln in body_text.splitlines(): if "岗位来源" in ln: urls = URL_RE.findall(ln) if urls: return urls[0] return None def extract_company_name_dejob(body_text: str) -> str | None: for ln in body_text.splitlines(): if "🏡" in ln: no_md = clean_md_text(ln) no_md = no_md.replace("🏡", "").strip() if "#" in no_md: no_md = no_md.split("#", 1)[0].strip() return clean_company_name(no_md) return None def extract_tags_after_key(line: str, key: str) -> list[str]: if key not in line: return [] frag = normalize_md_line(line.split(key, 1)[1]) tags = [clean_md_text(t).replace("·", " ").strip() for t in MD_TAG_RE.findall(frag)] return dedupe([t for t in tags if t]) def extract_list_section(body_text: str, key: str) -> list[str]: sec = extract_section(body_text, key) if not sec: return [] items = [] for ln in sec.splitlines(): t = clean_md_text(ln) t = re.sub(r"^\d+️⃣?\s*", "", t) t = re.sub(r"^\d+[\.、]\s*", "", t) if t: items.append(t) return items def extract_position_name_dejob(body_text: str) -> str | None: for ln in body_text.splitlines(): if "待招岗位" in ln: tags = extract_tags_after_key(ln, "待招岗位") if tags: return tags[0] return None def extract_apply_email(body_text: str) -> str | None: emails = EMAIL_RE.findall(body_text) return emails[0] if emails else None def extract_apply_telegram(body_text: str) -> str | None: for ln in body_text.splitlines(): if "Telegram" in ln: m = TG_RE.search(ln) if m: return m.group(0) handles = TG_RE.findall(body_text) return handles[0] if handles else None def extract_urls(body_text: str) -> list[str]: return dedupe(URL_RE.findall(body_text)) def extract_first_url_by_keyword(body_text: str, keywords: list[str]) -> str | None: urls = extract_urls(body_text) for u in urls: lu = u.lower() if any(k.lower() in lu for k in keywords): return u return None def extract_first_nonempty_line(body_text: str) -> str | None: for ln in body_text.splitlines(): t = clean_md_text(ln) if t: return t return None def normalize_possible_url(raw: str) -> str | None: token = clean_md_text(raw or "") if not token: return None token = token.strip("()[]<>.,;\"' ") if not token: return None if token.lower().startswith(("http://", "https://")): return token if token.lower().startswith("www."): return "https://" + token # simple domain-style fallback, e.g. company.com/apply if " " not in token and "." in token and "/" in token: return "https://" + token if " " not in token and re.fullmatch(r"[A-Za-z0-9.-]+\.[A-Za-z]{2,}", token): return "https://" + token return None def extract_apply_link(body_text: str) -> str | None: # Priority 1: explicit apply-like lines. for ln in body_text.splitlines(): low = ln.lower() if "apply" not in low and "申请" not in ln and "投递" not in ln: continue # direct URL in line line_urls = URL_RE.findall(ln) if line_urls: return line_urls[0] # try parse right side after ':' / '-' if ":" in ln: rhs = ln.split(":", 1)[1] elif ":" in ln: rhs = ln.split(":", 1)[1] elif "-" in ln: rhs = ln.split("-", 1)[1] else: rhs = ln for token in re.split(r"\s+", rhs.strip()): u = normalize_possible_url(token) if u: return u # Priority 2: first URL that looks like an apply page. for u in extract_urls(body_text): lu = u.lower() if "apply" in lu or "job" in lu or "careers" in lu: return u return None def infer_employment_fields( tags: list[str], raw_line: str | None ) -> tuple[str, str, str | None, list[str] | None, str | None]: mode_remote = {"远程", "remote", "居家", "在家办公", "home office", "wfh"} mode_onsite = {"实地", "现场", "线下", "onsite", "on-site", "坐班", "到岗"} nature_map = { "全职": "full_time", "兼职": "part_time", "实习": "intern", "合同": "contract", "contract": "contract", "自由职业": "freelance", "freelance": "freelance", } nature_priority = ["full_time", "part_time", "contract", "intern", "freelance"] normalized = [] for t in tags: n = clean_md_text(t).replace("·", " ").strip() if n: normalized.append(n) normalized = dedupe(normalized) has_remote = False has_onsite = False natures_found = [] locations = [] for tag in normalized: low = tag.lower() if low in mode_remote or tag in mode_remote: has_remote = True continue if low in mode_onsite or tag in mode_onsite: has_onsite = True continue mapped = nature_map.get(tag) or nature_map.get(low) if mapped: natures_found.append(mapped) continue locations.append(tag) if has_remote and has_onsite: work_mode = "hybrid" elif has_remote: work_mode = "remote" elif has_onsite: work_mode = "onsite" else: work_mode = "unknown" job_nature = "unknown" for cand in nature_priority: if cand in natures_found: job_nature = cand break location_tags_raw = dedupe(locations) location_text = location_tags_raw[0] if location_tags_raw else None location_tags: list[str] | None = location_tags_raw if location_tags_raw else None raw = clean_md_text(raw_line) if raw_line else None return work_mode, job_nature, location_text, location_tags, raw def parse_dejob_official( source: str, chat_id: int | None, message_id: int, message_date: str, body_text: str, raw_content: str, ) -> StructuredJob: job_type = "招聘" if ("招聘" in body_text or "Recruitment" in body_text) else None company_name = extract_company_name_dejob(body_text) industry_tags = [] for ln in body_text.splitlines(): if "🏡" in ln: norm_ln = normalize_md_line(ln) industry_tags = [ clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) ] industry_tags = dedupe([t for t in industry_tags if t]) break cooperation_tags = [] cooperation_line = None for ln in body_text.splitlines(): if "合作方式" in ln: cooperation_line = ln norm_ln = normalize_md_line(ln) cooperation_tags = [ clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) ] cooperation_tags = dedupe([t for t in cooperation_tags if t]) break ( work_mode, job_nature, job_location_text, job_location_tags, employment_type_raw, ) = infer_employment_fields(cooperation_tags, cooperation_line) position_tags = [] for ln in body_text.splitlines(): if "待招岗位" in ln: norm_ln = normalize_md_line(ln) position_tags = [ clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) ] position_tags = dedupe([t for t in position_tags if t]) break position_name = extract_position_name_dejob(body_text) intro_sec = extract_section(body_text, "简介") company_url = extract_first_url(intro_sec) or extract_first_url(body_text) company_intro = None if intro_sec: intro_lines = [ln for ln in intro_sec.splitlines() if not URL_RE.search(ln)] company_intro = clean_md_text("\n".join(intro_lines)) or None salary_line = None for ln in body_text.splitlines(): if "薪酬" in ln or "Salary" in ln or "salary" in ln: salary_line = ln break salary_raw, salary_currency, salary_min, salary_max, salary_period = parse_salary( salary_line ) responsibilities = extract_list_section(body_text, "岗位职责") requirements = extract_list_section(body_text, "岗位要求") apply_email = extract_apply_email(body_text) apply_tg = extract_apply_telegram(body_text) job_source_url = extract_job_source_url(body_text) return StructuredJob( source=source, source_channel="DeJob", parser_name="dejob_official", parser_version="v1", chat_id=chat_id, message_id=message_id, message_date=message_date, job_type=job_type, company_name=company_name, industry_tags=industry_tags, company_intro=company_intro, company_url=company_url, work_mode=work_mode, job_nature=job_nature, job_location_text=job_location_text, job_location_tags=job_location_tags, employment_type_raw=employment_type_raw, position_name=position_name, position_tags=position_tags, salary_raw=salary_raw, salary_currency=salary_currency, salary_min=salary_min, salary_max=salary_max, salary_period=salary_period, responsibilities=responsibilities, requirements=requirements, apply_email=apply_email, apply_telegram=apply_tg, job_source_url=job_source_url, body_text=body_text or "empty_message", raw_content=raw_content, ) def parse_generic( source: str, chat_id: int | None, message_id: int, message_date: str, body_text: str, raw_content: str, ) -> StructuredJob: hashtags = [h.replace("·", " ").strip() for h in HASHTAG_RE.findall(body_text)] hashtags = dedupe([h for h in hashtags if h]) urls = URL_RE.findall(body_text) emails = EMAIL_RE.findall(body_text) tgs = TG_RE.findall(body_text) title = None for ln in body_text.splitlines(): t = clean_md_text(ln) if t: title = t[:120] break salary_line = None for ln in body_text.splitlines(): if any(k in ln.lower() for k in ("salary", "薪资", "薪酬", "k/", "$")): salary_line = ln break salary_raw, salary_currency, salary_min, salary_max, salary_period = parse_salary( salary_line ) job_type = "招聘" if ("招聘" in body_text or "recruit" in body_text.lower()) else None ( work_mode, job_nature, job_location_text, job_location_tags, employment_type_raw, ) = infer_employment_fields(hashtags, None) return StructuredJob( source=source, source_channel=None, parser_name="generic", parser_version="v1", chat_id=chat_id, message_id=message_id, message_date=message_date, job_type=job_type, company_name=None, industry_tags=hashtags, company_intro=None, company_url=urls[0] if urls else None, work_mode=work_mode, job_nature=job_nature, job_location_text=job_location_text, job_location_tags=job_location_tags, employment_type_raw=employment_type_raw, position_name=title, position_tags=hashtags, salary_raw=salary_raw, salary_currency=salary_currency, salary_min=salary_min, salary_max=salary_max, salary_period=salary_period, responsibilities=[], requirements=[], apply_email=emails[0] if emails else None, apply_telegram=tgs[0] if tgs else None, job_source_url=None, body_text=body_text or "empty_message", raw_content=raw_content, ) def parse_dejob_global( source: str, chat_id: int | None, message_id: int, message_date: str, body_text: str, raw_content: str, ) -> StructuredJob: job_type = "招聘" if ("招聘" in body_text or "recruit" in body_text.lower()) else None company_name = extract_company_name_dejob(body_text) industry_tags = [] for ln in body_text.splitlines(): if "🏡" in ln: norm_ln = normalize_md_line(ln) industry_tags = [ clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) ] industry_tags = dedupe([t for t in industry_tags if t]) break if not industry_tags: industry_tags = [h.replace("·", " ").strip() for h in HASHTAG_RE.findall(body_text)] industry_tags = dedupe([h for h in industry_tags if h]) cooperation_tags = [] cooperation_line = None for ln in body_text.splitlines(): low = ln.lower() if "合作方式" in ln or "fulltime" in low or "parttime" in low or "remote" in low: cooperation_line = ln norm_ln = normalize_md_line(ln) cooperation_tags = [ clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) ] cooperation_tags = dedupe([t for t in cooperation_tags if t]) break ( work_mode, job_nature, job_location_text, job_location_tags, employment_type_raw, ) = infer_employment_fields(cooperation_tags, cooperation_line) position_tags = [] for ln in body_text.splitlines(): if "待招岗位" in ln or "📚" in ln: norm_ln = normalize_md_line(ln) position_tags = [ clean_md_text(t).replace("·", " ") for t in MD_TAG_RE.findall(norm_ln) ] position_tags = dedupe([t for t in position_tags if t]) break if not position_tags: position_tags = industry_tags position_name = position_tags[0] if position_tags else extract_first_nonempty_line(body_text) intro_sec = extract_section(body_text, "Introduction") or extract_section(body_text, "简介") urls = extract_urls(body_text) company_url = extract_first_url_by_keyword(body_text, ["dejob.top/jobDetail"]) if company_url and urls: for u in urls: if "dejob.top/jobDetail" not in u: company_url = u break if not company_url: company_url = extract_first_url(intro_sec) or (urls[0] if urls else None) company_intro = None if intro_sec: intro_lines = [ln for ln in intro_sec.splitlines() if not URL_RE.search(ln)] company_intro = clean_md_text("\n".join(intro_lines)) or None salary_line = None for ln in body_text.splitlines(): if "薪酬" in ln or "salary" in ln.lower(): salary_line = ln break salary_raw, salary_currency, salary_min, salary_max, salary_period = parse_salary( salary_line ) responsibilities = extract_list_section(body_text, "岗位职责") or extract_list_section( body_text, "Responsibilities" ) requirements = extract_list_section(body_text, "岗位要求") or extract_list_section( body_text, "Requirements" ) apply_email = extract_apply_email(body_text) apply_tg = extract_apply_telegram(body_text) job_source_url = extract_first_url_by_keyword(body_text, ["dejob.top/jobDetail"]) if not job_source_url: urls = extract_urls(body_text) job_source_url = urls[0] if urls else None return StructuredJob( source=source, source_channel="DeJob", parser_name="dejob_global", parser_version="v1", chat_id=chat_id, message_id=message_id, message_date=message_date, job_type=job_type, company_name=company_name, industry_tags=industry_tags, company_intro=company_intro, company_url=company_url, work_mode=work_mode, job_nature=job_nature, job_location_text=job_location_text, job_location_tags=job_location_tags, employment_type_raw=employment_type_raw, position_name=position_name, position_tags=position_tags, salary_raw=salary_raw, salary_currency=salary_currency, salary_min=salary_min, salary_max=salary_max, salary_period=salary_period, responsibilities=responsibilities, requirements=requirements, apply_email=apply_email, apply_telegram=apply_tg, job_source_url=job_source_url or company_url, body_text=body_text or "empty_message", raw_content=raw_content, ) def parse_remote_cn( source: str, chat_id: int | None, message_id: int, message_date: str, body_text: str, raw_content: str, ) -> StructuredJob: lines = [clean_md_text(ln) for ln in body_text.splitlines() if clean_md_text(ln)] title = lines[0] if lines else None hashtags = [h.replace("·", " ").strip() for h in HASHTAG_RE.findall(body_text)] hashtags = dedupe([h for h in hashtags if h]) ( work_mode, job_nature, job_location_text, job_location_tags, employment_type_raw, ) = infer_employment_fields(hashtags, None) summary_line = None for ln in lines: if ln.startswith("摘要:"): summary_line = ln break salary_raw, salary_currency, salary_min, salary_max, salary_period = parse_salary( summary_line ) urls = extract_urls(body_text) apply_email = extract_apply_email(body_text) apply_tg = extract_apply_telegram(body_text) # remote_cn often places the detail link right below the title line. top_url = None raw_lines = [ln.strip() for ln in body_text.splitlines() if ln.strip()] for ln in raw_lines[:6]: found = URL_RE.findall(ln) if found: top_url = found[0] break job_source_url = ( top_url or extract_first_url_by_keyword(body_text, ["remote-info.cn/jobs/"]) or (urls[0] if urls else None) ) job_type = "招聘" if ("招聘" in body_text or "job" in body_text.lower()) else None return StructuredJob( source=source, source_channel="remote_cn", parser_name="remote_cn", parser_version="v1", chat_id=chat_id, message_id=message_id, message_date=message_date, job_type=job_type, company_name=None, industry_tags=hashtags, company_intro=summary_line.replace("摘要:", "", 1).strip() if summary_line else None, company_url=job_source_url or (urls[0] if urls else None), work_mode=work_mode, job_nature=job_nature, job_location_text=job_location_text, job_location_tags=job_location_tags, employment_type_raw=employment_type_raw, position_name=title, position_tags=hashtags, salary_raw=salary_raw, salary_currency=salary_currency, salary_min=salary_min, salary_max=salary_max, salary_period=salary_period, responsibilities=[], requirements=[], apply_email=apply_email, apply_telegram=apply_tg, job_source_url=job_source_url, body_text=body_text or "empty_message", raw_content=raw_content, ) def parse_cryptojobslist_source( source: str, chat_id: int | None, message_id: int, message_date: str, body_text: str, raw_content: str, ) -> StructuredJob: lines = [clean_md_text(ln) for ln in body_text.splitlines() if clean_md_text(ln)] title = lines[0] if lines else None urls = extract_urls(body_text) hashtags = [h.replace("·", " ").strip() for h in HASHTAG_RE.findall(body_text)] hashtags = dedupe([h for h in hashtags if h]) ( work_mode, job_nature, job_location_text, job_location_tags, employment_type_raw, ) = infer_employment_fields(hashtags, None) salary_line = None for ln in lines: if any(k in ln.lower() for k in ("salary", "$", "usd")): salary_line = ln break salary_raw, salary_currency, salary_min, salary_max, salary_period = parse_salary( salary_line ) apply_email = extract_apply_email(body_text) apply_tg = extract_apply_telegram(body_text) apply_link = extract_apply_link(body_text) job_source_url = ( apply_link or extract_first_url_by_keyword(body_text, ["cryptojobslist.com"]) or (urls[0] if urls else None) ) job_type = "招聘" if ("job" in body_text.lower() or "hiring" in body_text.lower()) else None return StructuredJob( source=source, source_channel="cryptojobslist", parser_name="cryptojobslist", parser_version="v1", chat_id=chat_id, message_id=message_id, message_date=message_date, job_type=job_type, company_name=None, industry_tags=hashtags, company_intro=None, company_url=job_source_url or (urls[0] if urls else None), work_mode=work_mode, job_nature=job_nature, job_location_text=job_location_text, job_location_tags=job_location_tags, employment_type_raw=employment_type_raw, position_name=title, position_tags=hashtags, salary_raw=salary_raw, salary_currency=salary_currency, salary_min=salary_min, salary_max=salary_max, salary_period=salary_period, responsibilities=[], requirements=[], apply_email=apply_email, apply_telegram=apply_tg, job_source_url=job_source_url, body_text=body_text or "empty_message", raw_content=raw_content, ) def route_parse(row: tuple) -> StructuredJob: source, chat_id, message_id, content, message_date = row raw_content = content or "" body_text = preprocess_body_text(strip_meta_lines(raw_content)) if source == "@DeJob_official": return parse_dejob_official( source, chat_id, message_id, message_date, body_text, raw_content ) if source == "@DeJob_Global_group": return parse_dejob_global( source, chat_id, message_id, message_date, body_text, raw_content ) if source == "@remote_cn": return parse_remote_cn( source, chat_id, message_id, message_date, body_text, raw_content ) if source == "@cryptojobslist": return parse_cryptojobslist_source( source, chat_id, message_id, message_date, body_text, raw_content ) return parse_generic(source, chat_id, message_id, message_date, body_text, raw_content) def upsert_structured(conn, item: StructuredJob): with conn.cursor() as cur: cur.execute( """ INSERT INTO structured_jobs ( source, source_channel, parser_name, parser_version, chat_id, message_id, message_date, job_type, company_name, industry_tags_json, company_intro, company_url, work_mode, job_nature, job_location_text, job_location_tags_json, employment_type_raw, position_name, position_tags_json, salary_raw, salary_currency, salary_min, salary_max, salary_period, responsibilities_json, requirements_json, apply_email, apply_telegram, job_source_url, body_text, raw_content ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE source_channel=VALUES(source_channel), parser_name=VALUES(parser_name), parser_version=VALUES(parser_version), chat_id=VALUES(chat_id), message_date=VALUES(message_date), job_type=VALUES(job_type), company_name=VALUES(company_name), industry_tags_json=VALUES(industry_tags_json), company_intro=VALUES(company_intro), company_url=VALUES(company_url), work_mode=VALUES(work_mode), job_nature=VALUES(job_nature), job_location_text=VALUES(job_location_text), job_location_tags_json=VALUES(job_location_tags_json), employment_type_raw=VALUES(employment_type_raw), position_name=VALUES(position_name), position_tags_json=VALUES(position_tags_json), salary_raw=VALUES(salary_raw), salary_currency=VALUES(salary_currency), salary_min=VALUES(salary_min), salary_max=VALUES(salary_max), salary_period=VALUES(salary_period), responsibilities_json=VALUES(responsibilities_json), requirements_json=VALUES(requirements_json), apply_email=VALUES(apply_email), apply_telegram=VALUES(apply_telegram), job_source_url=VALUES(job_source_url), body_text=VALUES(body_text), raw_content=VALUES(raw_content), cleaned_at=CURRENT_TIMESTAMP """, ( item.source, item.source_channel, item.parser_name, item.parser_version, item.chat_id, item.message_id, item.message_date, item.job_type, item.company_name, json.dumps(item.industry_tags, ensure_ascii=False), item.company_intro, item.company_url, item.work_mode, item.job_nature, item.job_location_text, json.dumps(item.job_location_tags, ensure_ascii=False) if item.job_location_tags is not None else None, item.employment_type_raw, item.position_name, json.dumps(item.position_tags, ensure_ascii=False), item.salary_raw, item.salary_currency, item.salary_min, item.salary_max, item.salary_period, json.dumps(item.responsibilities, ensure_ascii=False), json.dumps(item.requirements, ensure_ascii=False), item.apply_email, item.apply_telegram, item.job_source_url, item.body_text, item.raw_content, ), ) def is_recruitment_job(item: StructuredJob) -> bool: return item.job_type == "招聘" def has_usable_job_link(item: StructuredJob) -> bool: return bool((item.job_source_url or "").strip()) def get_last_processed_row_id(conn, pipeline_name: str) -> int: with conn.cursor() as cur: cur.execute( "SELECT COALESCE(last_message_row_id, 0) FROM clean_state WHERE pipeline_name=%s", (pipeline_name,), ) row = cur.fetchone() return int(row[0]) if row else 0 def set_last_processed_row_id(conn, pipeline_name: str, row_id: int): with conn.cursor() as cur: cur.execute( """ INSERT INTO clean_state (pipeline_name, last_message_row_id, updated_at) VALUES (%s, %s, NOW()) ON DUPLICATE KEY UPDATE last_message_row_id=VALUES(last_message_row_id), updated_at=NOW() """, (pipeline_name, row_id), ) def main(): mysql_cfg = load_mysql_config() conn = connect_mysql(mysql_cfg) try: init_target_db(conn) last_row_id = get_last_processed_row_id(conn, PIPELINE_NAME) logger.info(f"增量清洗起点 messages.id > {last_row_id}") with conn.cursor() as src_cur: src_cur.execute( """ SELECT id, source, chat_id, message_id, content, date FROM messages WHERE id > %s ORDER BY id ASC """, (last_row_id,), ) rows = src_cur.fetchall() processed = 0 inserted = 0 skipped_non_recruit = 0 skipped_no_link = 0 by_parser = {} max_row_id = last_row_id for row in rows: row_id, source, chat_id, message_id, content, message_date = row item = route_parse((source, chat_id, message_id, content, message_date)) processed += 1 by_parser[item.parser_name] = by_parser.get(item.parser_name, 0) + 1 if row_id > max_row_id: max_row_id = row_id if not is_recruitment_job(item): skipped_non_recruit += 1 continue if not has_usable_job_link(item): skipped_no_link += 1 continue upsert_structured(conn, item) inserted += 1 if processed % 500 == 0: logger.info( f"[clean] processed={processed}, inserted={inserted}, " f"skipped_non_recruit={skipped_non_recruit}, skipped_no_link={skipped_no_link}" ) if max_row_id > last_row_id: set_last_processed_row_id(conn, PIPELINE_NAME, max_row_id) logger.info(f"更新检查点 last_message_row_id={max_row_id}") with conn.cursor() as cur: cur.execute("SELECT count(*) FROM structured_jobs") total = cur.fetchone()[0] logger.info( "[done] " f"structured_jobs={total}, inserted={inserted}, skipped_non_recruit={skipped_non_recruit}, " f"skipped_no_link={skipped_no_link}, " f"target=mysql.structured_jobs, parsers={by_parser}" ) if processed == 0: logger.info("无新增消息,清洗完成") except Exception: logger.exception("清洗任务失败") raise finally: conn.close() if __name__ == "__main__": main()