ygp-gdzwfw-gov-cn/main.py
秦秋旭 5acb847bc1 优化爬虫:增量爬取、API参数优化、Excel兼容
主要变更:
1. 重命名 ygp_crawler.py -> main.py
2. API参数优化:
   - tradingProcess 固定传 "513,2C52,3C52" 精准筛选中标结果
   - pageSize 固定为 50 提高抓取效率
   - 通过 publishStartTime/publishEndTime 传入时间范围
3. 默认查询最近3个月(原为当天)
4. 增量爬取改为默认开启(移除 -i 参数)
5. CSV文件添加 UTF-8 BOM,Excel可直接打开
6. 更新 README.md 文档
7. 添加前端 JS 代码参考文件到 assets/ 目录

使用方法:
- 增量更新:python main.py
- 全量查询:rm results.csv && python main.py

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 00:37:07 +08:00

334 lines
11 KiB
Python

import argparse
import asyncio
import csv
import json
import os
import sys
import urllib.parse
from datetime import datetime, date, timedelta
import aiohttp
async def delay(ms: int):
    """Suspend the current task for *ms* milliseconds."""
    seconds = ms / 1000
    await asyncio.sleep(seconds)
# Root URL of the portal backend API; every request path is built on top of it.
API_BASE_URL = "https://ygp.gdzwfw.gov.cn/ggzy-portal"
def parse_args():
    """Declare and evaluate the command-line interface.

    Returns:
        argparse.Namespace with ``start_date``, ``end_date`` and ``output``.
    """
    cli = argparse.ArgumentParser(description="广东省公共资源交易平台数据抓取")
    option_specs = (
        (("--start-date",), {"help": "开始日期 (YYYY-MM-DD)"}),
        (("--end-date",), {"help": "结束日期 (YYYY-MM-DD)"}),
        (("--output", "-o"), {"default": "results.csv", "help": "输出CSV文件路径 (默认: results.csv)"}),
    )
    for flags, kwargs in option_specs:
        cli.add_argument(*flags, **kwargs)
    return cli.parse_args()
def parse_api_date(date_str):
    """Convert an API timestamp string (YYYYMMDDHHMMSS) to a ``date``.

    Returns ``None`` for empty or malformed input instead of raising.
    """
    if date_str:
        try:
            return datetime.strptime(date_str, "%Y%m%d%H%M%S").date()
        except ValueError:
            pass
    return None
def parse_csv_datetime(date_str):
    """Convert a CSV timestamp string (YYYY-MM-DD HH:mm:ss) to a ``date``.

    Returns ``None`` when the string is empty or does not match the format.
    """
    if not date_str:
        return None
    try:
        parsed = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None
    return parsed.date()
def format_datetime(date_str):
    """Re-render an API timestamp (YYYYMMDDHHMMSS) as ``YYYY-MM-DD HH:mm:ss``.

    Empty input yields ``""``; unparseable input is passed through unchanged.
    """
    if not date_str:
        return ""
    try:
        parsed = datetime.strptime(date_str, "%Y%m%d%H%M%S")
    except ValueError:
        return date_str
    return parsed.strftime("%Y-%m-%d %H:%M:%S")
def read_existing_csv(csv_path):
    """Load previously crawled rows from *csv_path*.

    Returns a tuple ``(rows, oldest, newest)`` where ``rows`` holds every data
    row (header excluded) and ``oldest``/``newest`` bound the dates found in
    the 发布时间 column. A missing or unreadable file yields ``([], None, None)``.
    """
    if not os.path.exists(csv_path):
        return [], None, None
    rows = []
    oldest = None
    newest = None
    try:
        with open(csv_path, "r", encoding="utf-8-sig", newline="") as handle:
            reader = csv.reader(handle)
            header = next(reader, None)
            if not header:
                # Empty file (or blank first line): nothing to carry over.
                return [], None, None
            for record in reader:
                if len(record) < 2:
                    continue
                rows.append(record)
                # Column index 1 is 发布时间; widen the observed date span.
                when = parse_csv_datetime(record[1])
                if when is not None:
                    if oldest is None or when < oldest:
                        oldest = when
                    if newest is None or when > newest:
                        newest = when
    except Exception as e:
        # Best-effort: a corrupt file falls back to a full re-crawl.
        print(f"Warning: Error reading existing CSV: {e}", file=sys.stderr)
        return [], None, None
    return rows, oldest, newest
def construct_detail_url(item):
    """Build the detail-page URL for a search result *item*.

    URL format derived from useJump-b2a96f17.js:
    https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{tradingType}?noticeId=...
    Route config: /new/jygg/:edition(v1|v2|v3)/:tradingType
    """
    edition = item.get("edition", "v3")
    trading_type = item.get("noticeSecondType", "A")
    prefix = f"https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{trading_type}"
    # Candidate query parameters, in the order the frontend emits them;
    # falsy values are dropped before encoding.
    candidates = (
        ("noticeId", item.get("noticeId", "")),
        ("projectCode", item.get("projectCode", "")),
        ("bizCode", item.get("tradingProcess", item.get("bizCode", ""))),
        ("siteCode", item.get("regionCode", item.get("siteCode", ""))),
        ("publishDate", item.get("publishDate", "")),
        ("source", item.get("pubServicePlat", "")),
        ("titleDetails", item.get("noticeSecondTypeDesc", "")),
        ("classify", item.get("projectType", "")),
    )
    query = urllib.parse.urlencode([(key, value) for key, value in candidates if value])
    return f"{prefix}?{query}"
def build_search_payload(page_num=1, publish_start_time="", publish_end_time="",
                         page_size=50, trading_process="513,2C52,3C52"):
    """Build the search API payload.

    Generalized: page size and process filter are now parameters with the
    previous hard-coded values as defaults, so existing callers are unaffected.

    Args:
        page_num: 1-based page number.
        publish_start_time: Start time in format YYYYMMDDHHMMSS ("" = open).
        publish_end_time: End time in format YYYYMMDDHHMMSS ("" = open).
        page_size: Number of results per page (default 50 for throughput).
        trading_process: Comma-separated process codes; the default selects
            中标结果 (award results) only.

    Returns:
        dict ready to be POSTed as JSON to the /search/v2/items endpoint.
    """
    return {
        "pageNo": page_num,
        "pageSize": page_size,
        "keyword": "",
        "siteCode": "44",  # matches the /#/44/ site segment used in detail URLs
        "secondType": "",
        "tradingProcess": trading_process,
        "thirdType": "[]",
        "projectType": "",
        "publishStartTime": publish_start_time,  # Format: YYYYMMDDHHMMSS
        "publishEndTime": publish_end_time,  # Format: YYYYMMDDHHMMSS
        "type": "trading-type"
    }
def process_items(items):
    """Map raw API items to output rows (title / publish time / detail URL).

    The API already filters by tradingProcess and date range, so no further
    filtering happens here.
    """
    return [
        {
            "项目标题": item.get("noticeTitle", ""),
            "发布时间": format_datetime(item.get("publishDate", "")),
            "详细链接": construct_detail_url(item),
        }
        for item in items
    ]
async def fetch_page(session, page_num, publish_start_time="", publish_end_time=""):
    """POST one page of the search API; return parsed JSON or None on failure."""
    endpoint = f"{API_BASE_URL}/search/v2/items"
    body = build_search_payload(page_num, publish_start_time, publish_end_time)
    request_headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        "Accept": "application/json, text/plain, */*",
        "Origin": "https://ygp.gdzwfw.gov.cn",
        "Referer": "https://ygp.gdzwfw.gov.cn/"
    }
    try:
        async with session.post(endpoint, json=body, headers=request_headers) as response:
            if response.status == 200:
                return await response.json()
            print(f"API Error: {response.status}", file=sys.stderr)
            return None
    except Exception as e:
        # Network-level failures are reported but never propagate to the caller.
        print(f"Error fetching page {page_num}: {e}", file=sys.stderr)
        return None
def deduplicate_results(new_results, existing_data):
    """Drop entries from *new_results* whose detail URL is already persisted.

    *existing_data* is a list of CSV rows; the 详细链接 (detail URL) lives in
    the 3rd column and serves as the uniqueness key.
    """
    seen_urls = {row[2] for row in existing_data if len(row) >= 3}
    return [entry for entry in new_results if entry["详细链接"] not in seen_urls]
def format_api_datetime(dt):
    """Render *dt* in the API's compact YYYYMMDDHHMMSS form."""
    return f"{dt:%Y%m%d%H%M%S}"
async def run():
    """Entry point: resolve the crawl window, fetch every result page,
    merge with previously saved rows, and rewrite the output CSV.

    Incremental behavior: when the output file already exists, crawling
    resumes from the day after the newest persisted 发布时间 unless an
    explicit --start-date overrides it.
    """
    args = parse_args()
    today = date.today()
    # Default: last 3 months (approximated as 90 days).
    three_months_ago = today - timedelta(days=90)
    start_date = three_months_ago
    end_date = today
    # Always use incremental mode - read existing CSV if exists
    existing_data = []
    csv_min_date = None
    csv_max_date = None
    if os.path.exists(args.output):
        existing_data, csv_min_date, csv_max_date = read_existing_csv(args.output)
        if csv_min_date and csv_max_date:
            print(f"Existing data range: {csv_min_date} to {csv_max_date}", file=sys.stderr)
            print(f"Existing records: {len(existing_data)}", file=sys.stderr)
    # Determine date range: explicit CLI dates win; otherwise resume after
    # the newest saved record (if any), else fall back to the 90-day default.
    if args.start_date:
        try:
            start_date = datetime.strptime(args.start_date, "%Y-%m-%d").date()
        except ValueError:
            print(f"Error: Invalid start date format: {args.start_date}", file=sys.stderr)
            sys.exit(1)
    elif csv_max_date:
        # Without explicit start_date, fetch from max_date+1 to today
        start_date = csv_max_date + timedelta(days=1)
    if args.end_date:
        try:
            end_date = datetime.strptime(args.end_date, "%Y-%m-%d").date()
        except ValueError:
            print(f"Error: Invalid end date format: {args.end_date}", file=sys.stderr)
            sys.exit(1)
    if start_date > end_date:
        # Already up to date (or inverted range): keep the file untouched.
        print(f"Info: Start date {start_date} is after end date {end_date}, no new data to fetch.", file=sys.stderr)
        print(f"Existing records: {len(existing_data)}", file=sys.stderr)
        sys.exit(0)
    # Format time for API: YYYYMMDDHHMMSS — full days, 00:00:00 to 23:59:59.
    start_datetime = datetime.combine(start_date, datetime.min.time())
    end_datetime = datetime.combine(end_date, datetime.max.time().replace(microsecond=0))
    publish_start_time = format_api_datetime(start_datetime)
    publish_end_time = format_api_datetime(end_datetime)
    print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr)
    print(f"API time range: {publish_start_time} to {publish_end_time}", file=sys.stderr)
    print(f"Output file: {args.output}", file=sys.stderr)
    # Collect all new results first
    new_results = []
    async with aiohttp.ClientSession() as session:
        page_num = 1
        while True:
            print(f"Processing page {page_num}...", file=sys.stderr)
            # Throttle before every request to stay polite to the API.
            await delay(500)
            resp = await fetch_page(session, page_num, publish_start_time, publish_end_time)
            if resp is None:
                print("Failed to fetch data. Stopping.", file=sys.stderr)
                break
            data = resp.get("data", {})
            items = data.get("pageData", [])
            if not items:
                print("No more items.", file=sys.stderr)
                break
            results = process_items(items)
            new_results.extend(results)
            # Print to console immediately (stdout carries JSON lines;
            # all progress/diagnostics go to stderr).
            for r in results:
                print(json.dumps(r, ensure_ascii=False))
            sys.stdout.flush()
            pages = data.get("pageTotal", 0)
            if page_num >= pages:
                print("Reached last page.", file=sys.stderr)
                break
            page_num += 1
            # Extra pause between pages on top of the per-request delay.
            await delay(1000)
    print(f"\nNew results fetched: {len(new_results)}", file=sys.stderr)
    # Always deduplicate against existing data (keyed on detail URL).
    if existing_data:
        new_results = deduplicate_results(new_results, existing_data)
        print(f"After deduplication: {len(new_results)}", file=sys.stderr)
    # Write to CSV: new data first, then existing data
    # Use utf-8-sig to add BOM for Excel to auto-detect encoding and delimiter
    with open(args.output, "w", newline="", encoding="utf-8-sig") as csv_file:
        csv_writer = csv.writer(csv_file, delimiter=",", quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerow(["项目标题", "发布时间", "详细链接"])
        # Write new results first (newer data)
        for r in new_results:
            csv_writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
        # Write existing data (older data)
        for row in existing_data:
            csv_writer.writerow(row[:3])  # Only write first 3 columns
    total_records = len(new_results) + len(existing_data)
    print(f"Total records in CSV: {total_records}", file=sys.stderr)
if __name__ == "__main__":
    # Script entry point: drive the async crawler on a fresh event loop.
    asyncio.run(run())