260327-窗口迭代逻辑修复,不是每天只跑一条
This commit is contained in:
17
main.py
17
main.py
@@ -403,7 +403,8 @@ async def scrape_one_source(
|
|||||||
f"[{source_key}] 窗口增量模式 start={start_dt} end={end_dt} (UTC), "
|
f"[{source_key}] 窗口增量模式 start={start_dt} end={end_dt} (UTC), "
|
||||||
f"message_id > {last_id}"
|
f"message_id > {last_id}"
|
||||||
)
|
)
|
||||||
iterator = client.iter_messages(entity, min_id=last_id, reverse=True)
|
# reverse=False: 从新到旧遍历,结合时间窗口可快速终止,避免扫全量历史。
|
||||||
|
iterator = client.iter_messages(entity, min_id=last_id, reverse=False)
|
||||||
else:
|
else:
|
||||||
last_id = store.get_last_message_id(source_key)
|
last_id = store.get_last_message_id(source_key)
|
||||||
logger.info(f"[{source_key}] 增量模式,从 message_id > {last_id} 开始")
|
logger.info(f"[{source_key}] 增量模式,从 message_id > {last_id} 开始")
|
||||||
@@ -411,6 +412,10 @@ async def scrape_one_source(
|
|||||||
|
|
||||||
async for message in iterator:
|
async for message in iterator:
|
||||||
scanned += 1
|
scanned += 1
|
||||||
|
msg_id = int(message.id)
|
||||||
|
if msg_id > max_seen_id:
|
||||||
|
max_seen_id = msg_id
|
||||||
|
|
||||||
message_dt = message.date.astimezone(timezone.utc)
|
message_dt = message.date.astimezone(timezone.utc)
|
||||||
|
|
||||||
if window_mode:
|
if window_mode:
|
||||||
@@ -419,16 +424,12 @@ async def scrape_one_source(
|
|||||||
if start_dt and message_dt < start_dt:
|
if start_dt and message_dt < start_dt:
|
||||||
break
|
break
|
||||||
|
|
||||||
msg_id = int(message.id)
|
|
||||||
msg_date = message_dt.strftime("%Y-%m-%d %H:%M:%S")
|
msg_date = message_dt.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
content = build_message_content(message)
|
content = build_message_content(message)
|
||||||
|
|
||||||
if store.save_message(source_key, chat_id, msg_id, content, msg_date):
|
if store.save_message(source_key, chat_id, msg_id, content, msg_date):
|
||||||
inserted += 1
|
inserted += 1
|
||||||
|
|
||||||
if msg_id > max_seen_id:
|
|
||||||
max_seen_id = msg_id
|
|
||||||
|
|
||||||
if scanned % 200 == 0:
|
if scanned % 200 == 0:
|
||||||
logger.info(f"[{source_key}] 进度: 扫描 {scanned} 条, 新增 {inserted} 条")
|
logger.info(f"[{source_key}] 进度: 扫描 {scanned} 条, 新增 {inserted} 条")
|
||||||
|
|
||||||
@@ -441,6 +442,12 @@ async def scrape_one_source(
|
|||||||
if max_seen_id > old_last:
|
if max_seen_id > old_last:
|
||||||
store.set_last_message_id(source_key, max_seen_id)
|
store.set_last_message_id(source_key, max_seen_id)
|
||||||
|
|
||||||
|
if window_mode and scanned <= 1:
|
||||||
|
logger.warning(
|
||||||
|
f"[{source_key}] 本次仅扫描 {scanned} 条消息,请检查源最近是否活跃,"
|
||||||
|
"或确认 time_window 配置与系统时钟是否正确。"
|
||||||
|
)
|
||||||
|
|
||||||
logger.info(f"[{source_key}] 完成: 扫描 {scanned} 条, 新增 {inserted} 条")
|
logger.info(f"[{source_key}] 完成: 扫描 {scanned} 条, 新增 {inserted} 条")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,27 @@ if [[ ! -x "$PY_BIN" ]]; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
# Dependency preflight (avoid silent runtime failures after cron upgrades)
|
||||||
|
if ! "$PY_BIN" - <<'PY' >> "$LOG_DIR/daily_job.log" 2>&1
|
||||||
|
import importlib
|
||||||
|
missing = []
|
||||||
|
for name in ("telethon", "pymysql", "cryptography"):
|
||||||
|
try:
|
||||||
|
importlib.import_module(name)
|
||||||
|
except Exception:
|
||||||
|
missing.append(name)
|
||||||
|
|
||||||
|
if missing:
|
||||||
|
print(f"missing python packages: {', '.join(missing)}")
|
||||||
|
raise SystemExit(1)
|
||||||
|
|
||||||
|
print("python dependency preflight passed")
|
||||||
|
PY
|
||||||
|
then
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S')] dependency preflight failed, exit" >> "$LOG_DIR/daily_job.log"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
# Auto-advance time window to a rolling daily range.
|
# Auto-advance time window to a rolling daily range.
|
||||||
"$PY_BIN" - <<'PY'
|
"$PY_BIN" - <<'PY'
|
||||||
import json
|
import json
|
||||||
|
|||||||
Reference in New Issue
Block a user