Files
tg_crawl/sync_to_cloud_mysql.py
2026-03-05 23:55:18 +08:00

529 lines
17 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sync local MySQL data to cloud MySQL daily (incremental + upsert).
Synced tables:
- messages (incremental by local id)
- sync_state (full upsert, small table)
- structured_jobs (incremental by local id + recent updates by cleaned_at)
- clean_state (full upsert, small table)
- internship_jobs_raw (incremental by local id, if table exists)
State is stored on cloud DB in table `cloud_sync_state`.
"""
import json
import logging
import os
from datetime import datetime, timezone
import pymysql
# Path of the JSON configuration file; relative, so it is resolved against
# the current working directory of the process.
CONFIG_FILE = "config.json"
# Value of `pipeline_name` identifying this pipeline's row in `cloud_sync_state`.
PIPELINE_NAME = "local_to_cloud_mysql_v1"
# Maximum number of rows read from the local database per SELECT (used as LIMIT).
BATCH_SIZE = 1000
def qid(name: str) -> str:
    """Quote *name* as a MySQL identifier.

    Backticks inside the name are doubled, then the whole name is wrapped
    in backticks, so the result can be spliced into SQL text wherever an
    identifier (table or column name) is expected.
    """
    escaped = name.replace("`", "``")
    return "`" + escaped + "`"
def setup_logger() -> logging.Logger:
    """Return the ``sync_to_cloud_mysql`` logger, configuring it on first use.

    Ensures the ``logs`` directory exists, sets the level to INFO and, when
    the logger has no handlers yet, attaches a console handler and a UTF-8
    file handler writing to ``logs/sync_to_cloud_mysql.log``. Later calls
    return the same logger without adding handlers again.
    """
    os.makedirs("logs", exist_ok=True)
    log = logging.getLogger("sync_to_cloud_mysql")
    log.setLevel(logging.INFO)

    if not log.handlers:
        formatter = logging.Formatter(
            "[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
        )
        console = logging.StreamHandler()
        logfile = logging.FileHandler("logs/sync_to_cloud_mysql.log", encoding="utf-8")
        # Console first, then file: same attachment order as before.
        for handler in (console, logfile):
            handler.setFormatter(formatter)
            log.addHandler(handler)

    return log
# Module-level logger used by the functions in this file; configured at import time.
logger = setup_logger()
def load_config() -> tuple[dict, dict]:
    """Load connection settings for the local and the cloud MySQL server.

    Reads ``CONFIG_FILE`` (JSON) and takes the ``mysql`` section for the
    local server and ``mysql_cloud`` for the cloud server. For every
    setting, a truthy value in the file wins; otherwise the environment
    variable ``MYSQL_LOCAL_<KEY>`` / ``MYSQL_CLOUD_<KEY>`` is consulted, and
    finally a built-in default.

    Returns:
        ``(local, cloud)`` dicts with keys ``host``, ``port`` (int),
        ``user``, ``password``, ``database`` and ``charset``.

    Raises:
        FileNotFoundError: the config file does not exist.
        ValueError: a section is not a JSON object, or a password is empty
            or still the ``CHANGE_ME`` placeholder.
    """
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"未找到配置文件: {CONFIG_FILE}")
    with open(CONFIG_FILE, "r", encoding="utf-8") as handle:
        raw = json.load(handle)

    sections = (raw.get("mysql", {}), raw.get("mysql_cloud", {}))
    if not all(isinstance(section, dict) for section in sections):
        raise ValueError("配置错误: mysql / mysql_cloud 必须是对象")

    # Last-resort values per key; password has no usable default on purpose.
    fallbacks = {
        "host": "127.0.0.1",
        "port": "3306",
        "user": "jobs_user",
        "password": "",
        "database": "jobs",
        "charset": "utf8mb4",
    }

    def resolve(section: dict, env_prefix: str) -> dict:
        settings = {}
        for key in ("host", "port", "user", "password", "database", "charset"):
            value = section.get(key) or os.getenv(
                f"{env_prefix}_{key.upper()}", fallbacks[key]
            )
            settings[key] = int(value) if key == "port" else value
        password = settings["password"]
        if not password or password == "CHANGE_ME":
            raise ValueError(f"配置错误: {env_prefix}.password 不能为空")
        return settings

    local_section, cloud_section = sections
    return resolve(local_section, "MYSQL_LOCAL"), resolve(cloud_section, "MYSQL_CLOUD")
def connect_mysql(cfg: dict):
    """Open a PyMySQL connection described by *cfg*.

    *cfg* must contain ``host``, ``port``, ``user``, ``password``,
    ``database`` and ``charset``. The connection is opened with autocommit
    enabled and ``DictCursor`` as cursor class, so fetched rows are dicts
    keyed by column name.
    """
    wanted = ("host", "port", "user", "password", "database", "charset")
    params = {key: cfg[key] for key in wanted}
    return pymysql.connect(
        autocommit=True,
        cursorclass=pymysql.cursors.DictCursor,
        **params,
    )
def ensure_cloud_tables(cloud_conn):
    """Create the ``cloud_sync_state`` bookkeeping table on the cloud DB if absent.

    The table holds one row per pipeline name with the last synced ids and
    the last structured-jobs sync timestamp.
    """
    create_state_table = """
        CREATE TABLE IF NOT EXISTS cloud_sync_state (
            pipeline_name VARCHAR(128) PRIMARY KEY,
            last_messages_id BIGINT NOT NULL DEFAULT 0,
            last_structured_jobs_id BIGINT NOT NULL DEFAULT 0,
            last_internship_id BIGINT NOT NULL DEFAULT 0,
            last_structured_sync_at DATETIME NULL,
            updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                ON UPDATE CURRENT_TIMESTAMP
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
    """
    with cloud_conn.cursor() as cursor:
        cursor.execute(create_state_table)
def ensure_destination_tables(local_conn, cloud_conn):
    """Create any synced table missing on the cloud DB from the local DDL.

    For each synced table: if it is missing locally, a required table
    raises and an optional one is skipped with a log line. If it already
    exists on the cloud it is left alone. Otherwise the local
    ``SHOW CREATE TABLE`` output is executed verbatim on the cloud.

    Raises:
        RuntimeError: a required table is missing locally, or the local
            DDL could not be read.
    """
    required = ["messages", "sync_state", "structured_jobs", "clean_state"]
    optional = ["internship_jobs_raw"]

    for table in [*required, *optional]:
        present_locally = table_exists(local_conn, table)
        if not present_locally and table in required:
            raise RuntimeError(f"本地缺少必要表: {table}")
        if not present_locally:
            logger.info(f"skip create cloud table {table}: local table not exists")
            continue
        if table_exists(cloud_conn, table):
            continue

        with local_conn.cursor() as reader:
            reader.execute(f"SHOW CREATE TABLE `{table}`")
            described = reader.fetchone()
        # DictCursor rows expose the statement under the "Create Table" key.
        ddl = described.get("Create Table") if isinstance(described, dict) else None
        if not ddl:
            raise RuntimeError(f"无法读取本地表结构: {table}")

        with cloud_conn.cursor() as writer:
            writer.execute(ddl)
        logger.info(f"created cloud table from local ddl: {table}")
def ensure_cloud_column(local_conn, cloud_conn, table: str, column: str):
if not table_exists(local_conn, table) or not table_exists(cloud_conn, table):
return
with local_conn.cursor() as cur:
cur.execute(f"SHOW COLUMNS FROM {qid(table)} LIKE %s", (column,))
local_col = cur.fetchone()
if not local_col:
return
with cloud_conn.cursor() as cur:
cur.execute(f"SHOW COLUMNS FROM {qid(table)} LIKE %s", (column,))
cloud_col = cur.fetchone()
if cloud_col:
return
# Build minimal compatible ADD COLUMN from local definition.
col_type = local_col.get("Type")
nullable = local_col.get("Null", "YES") == "YES"
default_val = local_col.get("Default")
extra = local_col.get("Extra") or ""
parts = [f"ALTER TABLE {qid(table)} ADD COLUMN {qid(column)} {col_type}"]
parts.append("NULL" if nullable else "NOT NULL")
if default_val is not None:
if isinstance(default_val, str):
escaped = default_val.replace("'", "''")
parts.append(f"DEFAULT '{escaped}'")
else:
parts.append(f"DEFAULT {default_val}")
if extra:
parts.append(extra)
sql = " ".join(parts)
with cloud_conn.cursor() as cur:
cur.execute(sql)
logger.info(f"added missing cloud column: {table}.{column}")
def get_cloud_state(cloud_conn) -> dict:
    """Fetch this pipeline's progress markers from ``cloud_sync_state``.

    Returns the stored row (``last_messages_id``,
    ``last_structured_jobs_id``, ``last_internship_id``,
    ``last_structured_sync_at``). When no row exists for
    ``PIPELINE_NAME`` yet, returns zeros for the ids and ``None`` for the
    timestamp.
    """
    query = """
        SELECT last_messages_id, last_structured_jobs_id, last_internship_id, last_structured_sync_at
        FROM cloud_sync_state
        WHERE pipeline_name=%s
    """
    with cloud_conn.cursor() as cursor:
        cursor.execute(query, (PIPELINE_NAME,))
        saved = cursor.fetchone()

    if saved:
        return saved
    # First run: nothing has been synced yet.
    fresh = dict.fromkeys(
        ("last_messages_id", "last_structured_jobs_id", "last_internship_id"), 0
    )
    fresh["last_structured_sync_at"] = None
    return fresh
def set_cloud_state(cloud_conn, state: dict):
    """Persist this pipeline's progress markers into ``cloud_sync_state``.

    Inserts the row for ``PIPELINE_NAME`` or, when it already exists,
    overwrites all four markers; ``updated_at`` is set to ``NOW()`` either
    way. *state* must contain ``last_messages_id``,
    ``last_structured_jobs_id``, ``last_internship_id`` and
    ``last_structured_sync_at``.
    """
    marker_keys = (
        "last_messages_id",
        "last_structured_jobs_id",
        "last_internship_id",
        "last_structured_sync_at",
    )
    upsert = """
        INSERT INTO cloud_sync_state (
            pipeline_name, last_messages_id, last_structured_jobs_id,
            last_internship_id, last_structured_sync_at, updated_at
        ) VALUES (%s, %s, %s, %s, %s, NOW())
        ON DUPLICATE KEY UPDATE
            last_messages_id=VALUES(last_messages_id),
            last_structured_jobs_id=VALUES(last_structured_jobs_id),
            last_internship_id=VALUES(last_internship_id),
            last_structured_sync_at=VALUES(last_structured_sync_at),
            updated_at=NOW()
    """
    params = (PIPELINE_NAME, *(state[key] for key in marker_keys))
    with cloud_conn.cursor() as cursor:
        cursor.execute(upsert, params)
def table_exists(conn, table: str) -> bool:
    """Return True when *table* exists in the connection's current database.

    Uses an exact-name lookup in ``information_schema.tables``. A
    ``SHOW TABLES LIKE`` pattern is unsuitable here because ``_`` and ``%``
    are wildcards in LIKE, so a name such as ``sync_state`` would also
    match ``syncXstate``.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = DATABASE() AND table_name = %s
            LIMIT 1
            """,
            (table,),
        )
        return cur.fetchone() is not None
def sync_messages(local_conn, cloud_conn, last_id: int) -> int:
    """Copy local ``messages`` rows with ``id > last_id`` to the cloud.

    Rows are read in ascending id order, ``BATCH_SIZE`` at a time, and
    upserted one by one on the cloud; the local ``id`` itself is not
    written. On a duplicate key only ``chat_id``, ``content`` and ``date``
    are refreshed.

    Returns:
        The highest local id processed, or *last_id* when nothing was new.
    """
    logger.info(f"sync messages from id > {last_id}")

    select_sql = """
        SELECT id, source, chat_id, message_id, content, date, created_at
        FROM messages
        WHERE id > %s
        ORDER BY id ASC
        LIMIT %s
    """
    upsert_sql = """
        INSERT INTO messages (source, chat_id, message_id, content, date, created_at)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            chat_id=VALUES(chat_id),
            content=VALUES(content),
            date=VALUES(date)
    """
    # Same order as the column list of the INSERT above.
    insert_fields = ("source", "chat_id", "message_id", "content", "date", "created_at")

    high_water = last_id
    synced = 0
    while True:
        with local_conn.cursor() as reader:
            reader.execute(select_sql, (high_water, BATCH_SIZE))
            batch = reader.fetchall()
        if not batch:
            return high_water

        with cloud_conn.cursor() as writer:
            for row in batch:
                writer.execute(upsert_sql, tuple(row[field] for field in insert_fields))
                high_water = max(high_water, int(row["id"]))
                synced += 1
        logger.info(f"messages synced batch={len(batch)}, total={synced}, max_id={high_water}")
def sync_small_table_full(local_conn, cloud_conn, table: str):
    """Upsert every row of a small local *table* into the cloud table of the same name.

    The whole table is read in one query, so this is only meant for small
    tables. Skips (with a log line) when the table does not exist locally
    or has no rows. On a duplicate key every column except the key column
    is overwritten. The key column is ``source`` for ``sync_state``,
    ``pipeline_name`` for ``clean_state`` and the first selected column for
    any other table.
    """
    if not table_exists(local_conn, table):
        logger.info(f"skip {table}: local table not exists")
        return

    with local_conn.cursor() as cur:
        # Quote the table name like every other statement in this function.
        cur.execute(f"SELECT * FROM {qid(table)}")
        rows = cur.fetchall()
    if not rows:
        logger.info(f"{table}: no rows")
        return

    cols = list(rows[0].keys())
    col_sql = ", ".join(qid(c) for c in cols)
    val_sql = ", ".join(["%s"] * len(cols))

    # primary key handling for known small tables
    known_pks = {"sync_state": "source", "clean_state": "pipeline_name"}
    pk = known_pks.get(table, cols[0])

    update_cols = [c for c in cols if c != pk]
    if update_cols:
        update_sql = ", ".join(f"{qid(c)}=VALUES({qid(c)})" for c in update_cols)
    else:
        # The table consists of the key column only. An empty UPDATE list
        # is a syntax error, so use a no-op assignment instead.
        update_sql = f"{qid(pk)}={qid(pk)}"

    upsert_sql = f"""
        INSERT INTO {qid(table)} ({col_sql}) VALUES ({val_sql})
        ON DUPLICATE KEY UPDATE {update_sql}
    """
    with cloud_conn.cursor() as cur:
        for r in rows:
            cur.execute(upsert_sql, tuple(r[c] for c in cols))
    logger.info(f"{table}: synced rows={len(rows)}")
def sync_structured_jobs(local_conn, cloud_conn, last_id: int, last_sync_at) -> tuple[int, str]:
    """Sync ``structured_jobs`` to the cloud: new rows by id, then re-cleaned rows.

    Phase 1 upserts every local row with ``id > last_id`` in id order.
    Phase 2 upserts rows whose ``cleaned_at`` is later than *last_sync_at*
    (rows already sent in phase 1 are not sent again).

    Phase 2 pages with a ``(cleaned_at, id)`` keyset cursor. Paging on
    ``cleaned_at`` alone drops rows whenever more than ``BATCH_SIZE`` rows
    share one timestamp, because the next page starts strictly after that
    timestamp.

    Args:
        last_id: highest local id already synced.
        last_sync_at: previous watermark (``datetime``, string or ``None``
            for "never synced").

    Returns:
        ``(max_id, watermark)``. ``watermark`` is the largest ``cleaned_at``
        seen, formatted ``YYYY-MM-DD HH:MM:SS``, or the previous watermark
        when nothing newer was found. It is taken from the data rather than
        from the Python wall clock, so it stays in the same clock and time
        zone as the ``cleaned_at`` values it is compared with on the next
        run.
    """
    logger.info(
        f"sync structured_jobs from id > {last_id}, last_sync_at={last_sync_at}"
    )
    max_id = last_id
    total = 0
    touched_ids = set()

    # 1) incremental new rows by id
    while True:
        with local_conn.cursor() as cur:
            cur.execute(
                """
                SELECT * FROM structured_jobs
                WHERE id > %s
                ORDER BY id ASC
                LIMIT %s
                """,
                (max_id, BATCH_SIZE),
            )
            rows = cur.fetchall()
        if not rows:
            break
        _upsert_structured_rows(cloud_conn, rows)
        for r in rows:
            rid = int(r["id"])
            max_id = max(max_id, rid)
            touched_ids.add(rid)
        total += len(rows)
        logger.info(f"structured_jobs incremental batch={len(rows)}, total={total}, max_id={max_id}")

    # 2) update window by cleaned_at (catch updated existing rows)
    cursor_ts = last_sync_at if last_sync_at is not None else "1970-01-01 00:00:00"
    cursor_id = None  # None => first page: strictly after the stored watermark
    while True:
        with local_conn.cursor() as cur:
            if cursor_id is None:
                cur.execute(
                    """
                    SELECT * FROM structured_jobs
                    WHERE cleaned_at > %s
                    ORDER BY cleaned_at ASC, id ASC
                    LIMIT %s
                    """,
                    (cursor_ts, BATCH_SIZE),
                )
            else:
                # Resume exactly after the last row of the previous page,
                # including rows that share its cleaned_at value.
                cur.execute(
                    """
                    SELECT * FROM structured_jobs
                    WHERE cleaned_at > %s OR (cleaned_at = %s AND id > %s)
                    ORDER BY cleaned_at ASC, id ASC
                    LIMIT %s
                    """,
                    (cursor_ts, cursor_ts, cursor_id, BATCH_SIZE),
                )
            rows = cur.fetchall()
        if not rows:
            break

        # avoid repeated heavy updates in loop: only keep rows not already touched in incremental loop
        delta = [r for r in rows if int(r["id"]) not in touched_ids]
        if delta:
            _upsert_structured_rows(cloud_conn, delta)
            total += len(delta)
        logger.info(f"structured_jobs update-window batch={len(delta)}, total={total}")

        # Keep the raw value as the cursor: truncating it to whole seconds
        # could make the next query return the same page again.
        cursor_ts = rows[-1]["cleaned_at"]
        cursor_id = int(rows[-1]["id"])
        if len(rows) < BATCH_SIZE:
            break

    if isinstance(cursor_ts, datetime):
        watermark = cursor_ts.strftime("%Y-%m-%d %H:%M:%S")
    else:
        watermark = str(cursor_ts)
    return max_id, watermark
def _upsert_structured_rows(cloud_conn, rows: list[dict]):
    """Upsert *rows* into the cloud ``structured_jobs`` table, one statement per row.

    The column list is taken from the first row. ``id`` is left out because
    it is the local technical key and the cloud table assigns its own. On a
    duplicate key every column except ``source`` and ``message_id`` is
    overwritten, and ``cleaned_at`` is finally set to ``CURRENT_TIMESTAMP``.
    An empty *rows* list is a no-op.
    """
    if not rows:
        return

    fields = [name for name in rows[0].keys() if name != "id"]
    key_fields = ("source", "message_id")

    assignments = [
        f"{qid(name)}=VALUES({qid(name)})"
        for name in fields
        if name not in key_fields
    ]
    assignments.append(f"{qid('cleaned_at')}=CURRENT_TIMESTAMP")

    column_list = ", ".join(map(qid, fields))
    placeholders = ", ".join("%s" for _ in fields)
    assignment_list = ", ".join(assignments)
    statement = f"""
        INSERT INTO {qid('structured_jobs')} ({column_list})
        VALUES ({placeholders})
        ON DUPLICATE KEY UPDATE {assignment_list}
    """

    with cloud_conn.cursor() as writer:
        for row in rows:
            writer.execute(statement, tuple(row[name] for name in fields))
def sync_internship_raw(local_conn, cloud_conn, last_id: int) -> int:
    """Copy local ``internship_jobs_raw`` rows with ``id > last_id`` to the cloud.

    Returns *last_id* unchanged (after a log line) when the table does not
    exist locally. Otherwise rows are read in id order, ``BATCH_SIZE`` at a
    time, and upserted without their local ``id``. On a duplicate key every
    column except ``fingerprint`` is overwritten and ``imported_at`` is set
    to ``CURRENT_TIMESTAMP``.

    Returns:
        The highest local id processed.
    """
    if not table_exists(local_conn, "internship_jobs_raw"):
        logger.info("skip internship_jobs_raw: local table not exists")
        return last_id

    select_sql = """
        SELECT * FROM internship_jobs_raw
        WHERE id > %s
        ORDER BY id ASC
        LIMIT %s
    """
    high_water = last_id
    synced = 0
    while True:
        with local_conn.cursor() as reader:
            reader.execute(select_sql, (high_water, BATCH_SIZE))
            batch = reader.fetchall()
        if not batch:
            return high_water

        fields = [name for name in batch[0].keys() if name != "id"]
        assignments = [
            f"{qid(name)}=VALUES({qid(name)})"
            for name in fields
            if name != "fingerprint"
        ]
        assignments.append(f"{qid('imported_at')}=CURRENT_TIMESTAMP")

        column_list = ", ".join(map(qid, fields))
        placeholders = ", ".join("%s" for _ in fields)
        assignment_list = ", ".join(assignments)
        statement = f"""
            INSERT INTO {qid('internship_jobs_raw')} ({column_list})
            VALUES ({placeholders})
            ON DUPLICATE KEY UPDATE {assignment_list}
        """

        with cloud_conn.cursor() as writer:
            for row in batch:
                writer.execute(statement, tuple(row[name] for name in fields))
                high_water = max(high_water, int(row["id"]))
                synced += 1
        logger.info(f"internship_jobs_raw batch={len(batch)}, total={synced}, max_id={high_water}")
def _run_sync(local_conn, cloud_conn):
    """Run every sync step once, saving the progress markers after each table."""
    ensure_cloud_tables(cloud_conn)
    ensure_destination_tables(local_conn, cloud_conn)
    ensure_cloud_column(local_conn, cloud_conn, "internship_jobs_raw", "updated_at_utc")

    state = get_cloud_state(cloud_conn)
    logger.info(f"state before sync: {state}")

    state["last_messages_id"] = sync_messages(
        local_conn, cloud_conn, int(state["last_messages_id"])
    )
    # Checkpoint: a failure in a later step must not discard this progress.
    set_cloud_state(cloud_conn, state)

    sync_small_table_full(local_conn, cloud_conn, "sync_state")
    sync_small_table_full(local_conn, cloud_conn, "clean_state")

    last_structured_id, last_structured_sync_at = sync_structured_jobs(
        local_conn,
        cloud_conn,
        int(state["last_structured_jobs_id"]),
        state["last_structured_sync_at"],
    )
    state["last_structured_jobs_id"] = last_structured_id
    state["last_structured_sync_at"] = last_structured_sync_at
    set_cloud_state(cloud_conn, state)

    state["last_internship_id"] = sync_internship_raw(
        local_conn, cloud_conn, int(state["last_internship_id"])
    )
    set_cloud_state(cloud_conn, state)

    logger.info(f"state after sync: {state}")
    logger.info("sync done")


def main():
    """Entry point: load the config, connect to both servers and run the sync.

    Both connections are closed on every exit path: the local one even when
    connecting to the cloud fails, and each one independently of whether
    closing the other raises. Any failure after the config is loaded,
    including a failed connection attempt, is logged with its traceback and
    re-raised.
    """
    local_cfg, cloud_cfg = load_config()
    logger.info(
        "sync start: "
        f"local={local_cfg['host']}:{local_cfg['port']}/{local_cfg['database']} -> "
        f"cloud={cloud_cfg['host']}:{cloud_cfg['port']}/{cloud_cfg['database']}"
    )
    try:
        local_conn = connect_mysql(local_cfg)
        try:
            cloud_conn = connect_mysql(cloud_cfg)
            try:
                _run_sync(local_conn, cloud_conn)
            finally:
                cloud_conn.close()
        finally:
            local_conn.close()
    except Exception:
        logger.exception("sync failed")
        raise
# Run the sync only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()