Compare commits


2 Commits

3 changed files with 204 additions and 144 deletions

View File

@@ -8,12 +8,13 @@
- **Date filtering**: Supports specifying a start and end date; defaults to the current day's data.
- **Automatic pagination**: Handles multi-page crawling automatically.
- **Dynamic URL construction**: Builds directly accessible detail-page links from the fields returned by the API.
- **Dynamic signature handling**: Drives a browser via Playwright to handle the encrypted signatures in API request headers automatically.
- **Pure HTTP requests**: Calls the official API directly with aiohttp; no browser needed, lightweight and efficient (a minimal request sketch follows this list).
- **Real-time CSV saving**: Results are written to a CSV file as they arrive and also printed to the terminal.
- **Custom output path**: The output file path can be set via a command-line argument.
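In pure-HTTP mode the whole interaction is a single JSON POST to the portal's search endpoint. Below is a minimal sketch of that call, using the endpoint and payload fields found in ygp_crawler.py; the helper name `fetch_first_page` and the trimmed header set are illustrative only, and the exact parameter set the server requires is an assumption based on that script:
```python
import asyncio
import aiohttp

API_URL = "https://ygp.gdzwfw.gov.cn/ggzy-portal/search/v2/items"

async def fetch_first_page():
    # Payload fields mirror build_search_payload() in ygp_crawler.py;
    # siteCode "44" selects the Guangdong provincial site.
    payload = {
        "pageNo": 1,
        "pageSize": 10,
        "keyword": "",
        "siteCode": "44",
        "secondType": "",
        "tradingProcess": "",
        "thirdType": "[]",
        "projectType": "",
        "publishStartTime": "",
        "publishEndTime": "",
        "type": "trading-type",
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        "Origin": "https://ygp.gdzwfw.gov.cn",
        "Referer": "https://ygp.gdzwfw.gov.cn/",
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(API_URL, json=payload, headers=headers) as resp:
            data = await resp.json()
    # Notices are returned under data["data"]["pageData"].
    for item in data.get("data", {}).get("pageData", []):
        print(item.get("noticeTitle", ""), item.get("publishDate", ""))

if __name__ == "__main__":
    asyncio.run(fetch_first_page())
```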
## Requirements
- Python 3.8+
- Chromium browser (installed automatically by Playwright)
## Installation
@@ -34,11 +35,6 @@
pip install -r requirements.txt
```
4. **Install the Playwright browser binaries**
```bash
playwright install chromium
```
## Usage
### 1. Crawl data published today (default)
@@ -53,18 +49,25 @@ python ygp_crawler.py
python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04
```
### 3. Save the crawl results
The script prints the results to the console as JSON; use shell redirection to save them to a file:
### 3. Customize the output file path
Use the `-o` / `--output` argument to specify the output CSV file path (defaults to `results.csv`):
```bash
python ygp_crawler.py --start-date 2026-02-01 > results.jsonl
python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04 -o my_data.csv
```
## Output examples
### Terminal output (JSON, human-readable)
```json
{
"项目标题": "某某项目中标结果公示",
"发布时间": "20260204173002",
"发布时间": "2026-02-04 17:30:02",
"详细链接": "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=..."
}
```
### CSV file format
```csv
项目标题,发布时间,详细链接
某某项目中标结果公示,2026-02-04 17:30:02,https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?...
```
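If the saved file needs further processing, the header row shown above maps directly onto `csv.DictReader`; a short usage sketch, assuming the default `results.csv` output path:
```python
import csv

# Read the crawler's output back using the column names from the header row.
with open("results.csv", newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        print(row["项目标题"], row["发布时间"], row["详细链接"])
```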

View File

@@ -1 +1 @@
playwright
aiohttp

View File

@@ -1,17 +1,28 @@
import argparse
import asyncio
import csv
import json
import sys
import urllib.parse
from datetime import datetime, date
from playwright.async_api import async_playwright
import aiohttp
async def delay(ms: int):
"""Async delay in milliseconds."""
await asyncio.sleep(ms / 1000)
API_BASE_URL = "https://ygp.gdzwfw.gov.cn/ggzy-portal"
def parse_args():
parser = argparse.ArgumentParser(description="广东省公共资源交易平台数据抓取")
parser.add_argument("--start-date", help="开始日期 (YYYY-MM-DD)")
parser.add_argument("--end-date", help="结束日期 (YYYY-MM-DD)")
parser.add_argument("--output", "-o", default="results.csv", help="输出CSV文件路径 (默认: results.csv)")
return parser.parse_args()
def parse_api_date(date_str):
"""Parses date string from API (YYYYMMDDHHMMSS) to date object."""
if not date_str:
@@ -21,179 +32,225 @@ def parse_api_date(date_str):
except ValueError:
return None
def format_datetime(date_str):
"""Formats API date string (YYYYMMDDHHMMSS) to YYYY-MM-DD HH:mm:ss."""
if not date_str:
return ""
try:
dt = datetime.strptime(date_str, "%Y%m%d%H%M%S")
return dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
return date_str
def construct_detail_url(item):
"""Constructs the detail page URL based on item data."""
# Pattern derived from analysis:
# https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=...
base_url = "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A"
# Map API fields to URL parameters
# Note: 'nodeId' is missing as it's not in the list response.
# We assume the page handles missing nodeId or it's not strictly required for direct access.
"""Constructs the detail page URL based on item data.
URL format derived from useJump-b2a96f17.js:
https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{tradingType}?noticeId=...
Route config: /new/jygg/:edition(v1|v2|v3)/:tradingType
"""
edition = item.get("edition", "v3")
trading_type = item.get("noticeSecondType", "A")
base_url = f"https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{trading_type}"
params = {
"noticeId": item.get("noticeId", ""),
"projectCode": item.get("projectCode", ""),
"bizCode": item.get("tradingProcess", ""),
"siteCode": item.get("siteCode", ""),
"bizCode": item.get("tradingProcess", item.get("bizCode", "")),
"siteCode": item.get("regionCode", item.get("siteCode", "")),
"publishDate": item.get("publishDate", ""),
"source": item.get("pubServicePlat", ""),
"titleDetails": item.get("noticeSecondTypeDesc", ""),
"classify": item.get("projectType", "")
}
params = {k: v for k, v in params.items() if v}
query = urllib.parse.urlencode(params)
return f"{base_url}?{query}"
def build_search_payload(page_num=1, page_size=10):
"""Build the search API payload.
Based on analysis of the frontend code (JyggFilter component).
"""
return {
"pageNo": page_num,
"pageSize": page_size,
"keyword": "",
"siteCode": "44",
"secondType": "",
"tradingProcess": "",
"thirdType": "[]",
"projectType": "",
"publishStartTime": "",
"publishEndTime": "",
"type": "trading-type"
}
def process_items(items, start_date, end_date):
"""Process a batch of items and filter by date and keyword."""
page_results = []
stop_signal = False
min_date_on_page = None
for item in items:
title = item.get("noticeTitle", "")
pub_date_str = item.get("publishDate", "")
item_date = parse_api_date(pub_date_str)
if item_date:
if min_date_on_page is None or item_date < min_date_on_page:
min_date_on_page = item_date
if item_date > end_date:
continue
if item_date < start_date:
continue
if "中标结果" in title:
page_results.append({
"项目标题": title,
"发布时间": format_datetime(pub_date_str),
"详细链接": construct_detail_url(item)
})
# Only stop if all items on this page are older than start_date
# and there are no matching results
if min_date_on_page and min_date_on_page < start_date and not page_results:
# Check if the newest item is also older than start_date
max_date_on_page = None
for item in items:
item_date = parse_api_date(item.get("publishDate", ""))
if item_date:
if max_date_on_page is None or item_date > max_date_on_page:
max_date_on_page = item_date
if max_date_on_page and max_date_on_page < start_date:
stop_signal = True
return page_results, stop_signal
async def fetch_page(session, page_num, page_size=10):
"""Fetch a single page of data from the API."""
url = f"{API_BASE_URL}/search/v2/items"
payload = build_search_payload(page_num, page_size)
headers = {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Accept": "application/json, text/plain, */*",
"Origin": "https://ygp.gdzwfw.gov.cn",
"Referer": "https://ygp.gdzwfw.gov.cn/"
}
try:
async with session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
print(f"API Error: {response.status}", file=sys.stderr)
return None
return await response.json()
except Exception as e:
print(f"Error fetching page {page_num}: {e}", file=sys.stderr)
return None
async def run():
args = parse_args()
# Determine Date Range
today = date.today()
start_date = today
end_date = today
if args.start_date:
try:
start_date = datetime.strptime(args.start_date, "%Y-%m-%d").date()
except ValueError:
print(f"Error: Invalid start date format: {args.start_date}", file=sys.stderr)
sys.exit(1)
if args.end_date:
try:
end_date = datetime.strptime(args.end_date, "%Y-%m-%d").date()
except ValueError:
print(f"Error: Invalid end date format: {args.end_date}", file=sys.stderr)
sys.exit(1)
if start_date > end_date:
print(f"Error: Start date {start_date} is after end date {end_date}", file=sys.stderr)
sys.exit(1)
print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr)
print(f"Output file: {args.output}", file=sys.stderr)
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
viewport={"width": 1280, "height": 800}
)
page = await context.new_page()
# Open CSV file and write header
csv_file = open(args.output, "w", newline="", encoding="utf-8")
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["项目标题", "发布时间", "详细链接"])
csv_file.flush()
# Helper to process a batch of items
def process_items(items):
page_results = []
stop_signal = False
min_date_on_page = None
for item in items:
title = item.get("noticeTitle", "")
pub_date_str = item.get("publishDate", "")
item_date = parse_api_date(pub_date_str)
if item_date:
if min_date_on_page is None or item_date < min_date_on_page:
min_date_on_page = item_date
# Date Filter
if item_date > end_date:
continue # Too new, skip
if item_date < start_date:
# Found an item older than start date.
# Since lists are usually ordered, this suggests we might be done.
# However, to be safe (pinned items?), we just skip it here,
# but set a signal that we *might* want to stop if the whole page is old.
pass
# Keyword Filter
if "中标结果" in title:
# Add to results
page_results.append({
"项目标题": title,
"发布时间": pub_date_str,
"详细链接": construct_detail_url(item)
})
# Stop condition: If the newest item on the page (or min_date) is older than start_date?
# Actually, valid items could be anywhere if not strictly sorted.
# But "min_date_on_page < start_date" means the page contains items older than target.
# If the *entire* page is older than start_date, we definitely stop.
# Let's assume strict reverse chronological order for efficiency.
if min_date_on_page and min_date_on_page < start_date:
stop_signal = True
return page_results, stop_signal
try:
print("Loading page...", file=sys.stderr)
# Setup response listener for the INITIAL load
async with page.expect_response("**/search/v2/items", timeout=15000) as response_info:
await page.goto("https://ygp.gdzwfw.gov.cn/#/44/jygg")
response = await response_info.value
data = await response.json()
items = data.get("data", {}).get("pageData", [])
results, stop = process_items(items)
for r in results:
print(json.dumps(r, ensure_ascii=False))
if stop:
print("Date range satisfied (initial page). Stopping.", file=sys.stderr)
await browser.close()
return
# Pagination Loop
try:
async with aiohttp.ClientSession() as session:
page_num = 1
total_results = 0
while True:
page_num += 1
print(f"Processing page {page_num}...", file=sys.stderr)
# Find Next Button
# Selector strategy: The pagination 'next' button.
# Usually .btn-next
next_btn = page.locator(".btn-next")
# Check if disabled
if await next_btn.get_attribute("disabled") is not None:
await delay(500) # Initial delay before first request too
resp = await fetch_page(session, page_num)
if resp is None:
print("Failed to fetch data. Stopping.", file=sys.stderr)
break
# API returns {errcode, errmsg, data}
data = resp.get("data", {})
items = data.get("pageData", [])
if not items:
print("No more items.", file=sys.stderr)
break
results, stop = process_items(items, start_date, end_date)
for r in results:
# Print to console (as JSON for readability)
print(json.dumps(r, ensure_ascii=False))
sys.stdout.flush()
# Write to CSV immediately
csv_writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
csv_file.flush()
total_results += 1
if stop:
print("Date range satisfied. Stopping.", file=sys.stderr)
break
# Check if we've reached the last page
# API returns pageTotal, not pages
total = int(data.get("total", 0))
pages = data.get("pageTotal", 0)
if page_num >= pages:
print("Reached last page.", file=sys.stderr)
break
# Click and Wait for API
try:
async with page.expect_response("**/search/v2/items", timeout=10000) as response_info:
await next_btn.click()
response = await response_info.value
if response.status != 200:
print(f"API Error: {response.status}", file=sys.stderr)
break
data = await response.json()
items = data.get("data", {}).get("pageData", [])
if not items:
print("No more items.", file=sys.stderr)
break
results, stop = process_items(items)
for r in results:
print(json.dumps(r, ensure_ascii=False))
if stop:
print("Date range satisfied. Stopping.", file=sys.stderr)
break
except Exception as e:
print(f"Error during pagination: {e}", file=sys.stderr)
break
except Exception as e:
print(f"Fatal Error: {e}", file=sys.stderr)
await browser.close()
page_num += 1
await delay(1000) # 1s delay between requests
print(f"\nTotal results saved: {total_results}", file=sys.stderr)
finally:
csv_file.close()
if __name__ == "__main__":
asyncio.run(run())