Compare commits
2 Commits

3 changed files with 204 additions and 144 deletions

README.md
@@ -8,12 +8,13 @@
 - **日期过滤**:支持指定开始和结束日期,默认为抓取当天数据。
 - **自动分页**:自动处理多页数据抓取。
 - **动态构造 URL**:根据接口返回字段自动生成可直接访问的详情页链接。
-- **处理动态加密**:利用 Playwright 驱动浏览器,自动处理 API 请求头中的加密签名。
+- **纯 HTTP 请求**:直接使用 aiohttp 调用官方 API,无需浏览器,轻量高效。
+- **CSV 实时保存**:数据实时保存到 CSV 文件,同时输出到终端。
+- **自定义输出路径**:支持通过参数指定输出文件路径。
 
 ## 环境要求
 
 - Python 3.8+
-- Chromium 浏览器 (由 Playwright 自动安装)
 
 ## 安装步骤
@ -34,11 +35,6 @@
pip install -r requirements.txt pip install -r requirements.txt
``` ```
4. **安装 Playwright 浏览器内核**
```bash
playwright install chromium
```
## 使用方法 ## 使用方法
### 1. 抓取今天发布的数据 (默认) ### 1. 抓取今天发布的数据 (默认)
@@ -53,18 +49,25 @@ python ygp_crawler.py
 python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04
 ```
 
-### 3. 保存抓取结果
+### 3. 自定义输出文件路径
 
-脚本将结果以 JSON 格式输出到控制台。可以使用重定向将其保存到文件中:
+使用 `-o` 或 `--output` 参数指定输出 CSV 文件的路径(默认为 `results.csv`):
 
 ```bash
-python ygp_crawler.py --start-date 2026-02-01 > results.jsonl
+python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04 -o my_data.csv
 ```
 
 ## 数据输出示例
 
+### 终端输出(JSON 格式,便于阅读)
+
 ```json
 {
   "项目标题": "某某项目中标结果公示",
-  "发布时间": "20260204173002",
+  "发布时间": "2026-02-04 17:30:02",
   "详细链接": "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=..."
 }
 ```
+
+### CSV 文件格式
+
+```csv
+项目标题,发布时间,详细链接
+某某项目中标结果公示,2026-02-04 17:30:02,https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?...
+```
requirements.txt

@@ -1 +1 @@
-playwright
+aiohttp
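Swapping playwright for aiohttp means the crawler now posts directly to the search endpoint instead of driving a browser. A rough standalone sketch of one such request, reusing the endpoint, payload shape, and headers that `build_search_payload()` / `fetch_page()` define in ygp_crawler.py below (whether the endpoint accepts a one-off request like this without further headers is an assumption):

```python
import asyncio
import aiohttp

async def main():
    # Endpoint and payload mirror build_search_payload()/fetch_page() in ygp_crawler.py.
    url = "https://ygp.gdzwfw.gov.cn/ggzy-portal/search/v2/items"
    payload = {
        "pageNo": 1, "pageSize": 10, "keyword": "", "siteCode": "44",
        "secondType": "", "tradingProcess": "", "thirdType": "[]",
        "projectType": "", "publishStartTime": "", "publishEndTime": "",
        "type": "trading-type",
    }
    headers = {
        "Content-Type": "application/json",
        "Origin": "https://ygp.gdzwfw.gov.cn",
        "Referer": "https://ygp.gdzwfw.gov.cn/",
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload, headers=headers) as resp:
            data = await resp.json()
    # Expected response shape per the crawler: {"data": {"pageData": [...], "pageTotal": N}}
    print(len(data.get("data", {}).get("pageData", [])))

asyncio.run(main())
```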
ygp_crawler.py

@@ -1,17 +1,28 @@
 import argparse
 import asyncio
+import csv
 import json
 import sys
 import urllib.parse
 from datetime import datetime, date
 
-from playwright.async_api import async_playwright
+import aiohttp
+
+
+async def delay(ms: int):
+    """Async delay in milliseconds."""
+    await asyncio.sleep(ms / 1000)
+
+
+API_BASE_URL = "https://ygp.gdzwfw.gov.cn/ggzy-portal"
 
 
 def parse_args():
     parser = argparse.ArgumentParser(description="广东省公共资源交易平台数据抓取")
     parser.add_argument("--start-date", help="开始日期 (YYYY-MM-DD)")
     parser.add_argument("--end-date", help="结束日期 (YYYY-MM-DD)")
+    parser.add_argument("--output", "-o", default="results.csv", help="输出CSV文件路径 (默认: results.csv)")
     return parser.parse_args()
 
 
 def parse_api_date(date_str):
     """Parses date string from API (YYYYMMDDHHMMSS) to date object."""
     if not date_str:
@@ -21,34 +32,139 @@ def parse_api_date(date_str):
     except ValueError:
         return None
 
 
+def format_datetime(date_str):
+    """Formats API date string (YYYYMMDDHHMMSS) to YYYY-MM-DD HH:mm:ss."""
+    if not date_str:
+        return ""
+    try:
+        dt = datetime.strptime(date_str, "%Y%m%d%H%M%S")
+        return dt.strftime("%Y-%m-%d %H:%M:%S")
+    except ValueError:
+        return date_str
+
+
 def construct_detail_url(item):
-    """Constructs the detail page URL based on item data."""
-    # Pattern derived from analysis:
-    # https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=...
-    base_url = "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A"
+    """Constructs the detail page URL based on item data.
+
+    URL format derived from useJump-b2a96f17.js:
+    https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{tradingType}?noticeId=...
+    Route config: /new/jygg/:edition(v1|v2|v3)/:tradingType
+    """
+    edition = item.get("edition", "v3")
+    trading_type = item.get("noticeSecondType", "A")
+    base_url = f"https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{trading_type}"
 
-    # Map API fields to URL parameters
-    # Note: 'nodeId' is missing as it's not in the list response.
-    # We assume the page handles missing nodeId or it's not strictly required for direct access.
     params = {
         "noticeId": item.get("noticeId", ""),
         "projectCode": item.get("projectCode", ""),
-        "bizCode": item.get("tradingProcess", ""),
-        "siteCode": item.get("siteCode", ""),
+        "bizCode": item.get("tradingProcess", item.get("bizCode", "")),
+        "siteCode": item.get("regionCode", item.get("siteCode", "")),
         "publishDate": item.get("publishDate", ""),
         "source": item.get("pubServicePlat", ""),
         "titleDetails": item.get("noticeSecondTypeDesc", ""),
         "classify": item.get("projectType", "")
     }
+    params = {k: v for k, v in params.items() if v}
 
     query = urllib.parse.urlencode(params)
     return f"{base_url}?{query}"
 
 
+def build_search_payload(page_num=1, page_size=10):
+    """Build the search API payload.
+
+    Based on analysis of the frontend code (JyggFilter component).
+    """
+    return {
+        "pageNo": page_num,
+        "pageSize": page_size,
+        "keyword": "",
+        "siteCode": "44",
+        "secondType": "",
+        "tradingProcess": "",
+        "thirdType": "[]",
+        "projectType": "",
+        "publishStartTime": "",
+        "publishEndTime": "",
+        "type": "trading-type"
+    }
+
+
+def process_items(items, start_date, end_date):
+    """Process a batch of items and filter by date and keyword."""
+    page_results = []
+    stop_signal = False
+    min_date_on_page = None
+
+    for item in items:
+        title = item.get("noticeTitle", "")
+        pub_date_str = item.get("publishDate", "")
+        item_date = parse_api_date(pub_date_str)
+
+        if item_date:
+            if min_date_on_page is None or item_date < min_date_on_page:
+                min_date_on_page = item_date
+            if item_date > end_date:
+                continue
+            if item_date < start_date:
+                continue
+
+        if "中标结果" in title:
+            page_results.append({
+                "项目标题": title,
+                "发布时间": format_datetime(pub_date_str),
+                "详细链接": construct_detail_url(item)
+            })
+
+    # Only stop if all items on this page are older than start_date
+    # and there are no matching results
+    if min_date_on_page and min_date_on_page < start_date and not page_results:
+        # Check if the newest item is also older than start_date
+        max_date_on_page = None
+        for item in items:
+            item_date = parse_api_date(item.get("publishDate", ""))
+            if item_date:
+                if max_date_on_page is None or item_date > max_date_on_page:
+                    max_date_on_page = item_date
+        if max_date_on_page and max_date_on_page < start_date:
+            stop_signal = True
+
+    return page_results, stop_signal
+
+
+async def fetch_page(session, page_num, page_size=10):
+    """Fetch a single page of data from the API."""
+    url = f"{API_BASE_URL}/search/v2/items"
+    payload = build_search_payload(page_num, page_size)
+    headers = {
+        "Content-Type": "application/json",
+        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
+        "Accept": "application/json, text/plain, */*",
+        "Origin": "https://ygp.gdzwfw.gov.cn",
+        "Referer": "https://ygp.gdzwfw.gov.cn/"
+    }
+    try:
+        async with session.post(url, json=payload, headers=headers) as response:
+            if response.status != 200:
+                print(f"API Error: {response.status}", file=sys.stderr)
+                return None
+            return await response.json()
+    except Exception as e:
+        print(f"Error fetching page {page_num}: {e}", file=sys.stderr)
+        return None
+
+
 async def run():
     args = parse_args()
 
-    # Determine Date Range
     today = date.today()
     start_date = today
     end_date = today
@@ -72,128 +188,69 @@ async def run():
         sys.exit(1)
 
     print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr)
+    print(f"Output file: {args.output}", file=sys.stderr)
 
-    async with async_playwright() as p:
-        browser = await p.chromium.launch(headless=True)
-        context = await browser.new_context(
-            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
-            viewport={"width": 1280, "height": 800}
-        )
-        page = await context.new_page()
-
-        # Helper to process a batch of items
-        def process_items(items):
-            page_results = []
-            stop_signal = False
-            min_date_on_page = None
-
-            for item in items:
-                title = item.get("noticeTitle", "")
-                pub_date_str = item.get("publishDate", "")
-                item_date = parse_api_date(pub_date_str)
-
-                if item_date:
-                    if min_date_on_page is None or item_date < min_date_on_page:
-                        min_date_on_page = item_date
-
-                    # Date Filter
-                    if item_date > end_date:
-                        continue  # Too new, skip
-                    if item_date < start_date:
-                        # Found an item older than start date.
-                        # Since lists are usually ordered, this suggests we might be done.
-                        # However, to be safe (pinned items?), we just skip it here,
-                        # but set a signal that we *might* want to stop if the whole page is old.
-                        pass
-
-                # Keyword Filter
-                if "中标结果" in title:
-                    # Add to results
-                    page_results.append({
-                        "项目标题": title,
-                        "发布时间": pub_date_str,
-                        "详细链接": construct_detail_url(item)
-                    })
-
-            # Stop condition: If the newest item on the page (or min_date) is older than start_date?
-            # Actually, valid items could be anywhere if not strictly sorted.
-            # But "min_date_on_page < start_date" means the page contains items older than target.
-            # If the *entire* page is older than start_date, we definitely stop.
-            # Let's assume strict reverse chronological order for efficiency.
-            if min_date_on_page and min_date_on_page < start_date:
-                stop_signal = True
-
-            return page_results, stop_signal
-
-        try:
-            print("Loading page...", file=sys.stderr)
-            # Setup response listener for the INITIAL load
-            async with page.expect_response("**/search/v2/items", timeout=15000) as response_info:
-                await page.goto("https://ygp.gdzwfw.gov.cn/#/44/jygg")
-            response = await response_info.value
-            data = await response.json()
-            items = data.get("data", {}).get("pageData", [])
-
-            results, stop = process_items(items)
-            for r in results:
-                print(json.dumps(r, ensure_ascii=False))
-
-            if stop:
-                print("Date range satisfied (initial page). Stopping.", file=sys.stderr)
-                await browser.close()
-                return
-
-            # Pagination Loop
-            page_num = 1
-            while True:
-                page_num += 1
-                print(f"Processing page {page_num}...", file=sys.stderr)
-
-                # Find Next Button
-                # Selector strategy: The pagination 'next' button.
-                # Usually .btn-next
-                next_btn = page.locator(".btn-next")
-
-                # Check if disabled
-                if await next_btn.get_attribute("disabled") is not None:
-                    print("Reached last page.", file=sys.stderr)
-                    break
-
-                # Click and Wait for API
-                try:
-                    async with page.expect_response("**/search/v2/items", timeout=10000) as response_info:
-                        await next_btn.click()
-                    response = await response_info.value
-
-                    if response.status != 200:
-                        print(f"API Error: {response.status}", file=sys.stderr)
-                        break
-
-                    data = await response.json()
-                    items = data.get("data", {}).get("pageData", [])
-
-                    if not items:
-                        print("No more items.", file=sys.stderr)
-                        break
-
-                    results, stop = process_items(items)
-                    for r in results:
-                        print(json.dumps(r, ensure_ascii=False))
-
-                    if stop:
-                        print("Date range satisfied. Stopping.", file=sys.stderr)
-                        break
-                except Exception as e:
-                    print(f"Error during pagination: {e}", file=sys.stderr)
-                    break
-        except Exception as e:
-            print(f"Fatal Error: {e}", file=sys.stderr)
-
-        await browser.close()
+    # Open CSV file and write header
+    csv_file = open(args.output, "w", newline="", encoding="utf-8")
+    csv_writer = csv.writer(csv_file)
+    csv_writer.writerow(["项目标题", "发布时间", "详细链接"])
+    csv_file.flush()
+
+    try:
+        async with aiohttp.ClientSession() as session:
+            page_num = 1
+            total_results = 0
+            while True:
+                print(f"Processing page {page_num}...", file=sys.stderr)
+                await delay(500)  # Initial delay before first request too
+
+                resp = await fetch_page(session, page_num)
+                if resp is None:
+                    print("Failed to fetch data. Stopping.", file=sys.stderr)
+                    break
+
+                # API returns {errcode, errmsg, data}
+                data = resp.get("data", {})
+                items = data.get("pageData", [])
+
+                if not items:
+                    print("No more items.", file=sys.stderr)
+                    break
+
+                results, stop = process_items(items, start_date, end_date)
+                for r in results:
+                    # Print to console (as JSON for readability)
+                    print(json.dumps(r, ensure_ascii=False))
+                    sys.stdout.flush()
+                    # Write to CSV immediately
+                    csv_writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
+                    csv_file.flush()
+                    total_results += 1
+
+                if stop:
+                    print("Date range satisfied. Stopping.", file=sys.stderr)
+                    break
+
+                # Check if we've reached the last page
+                # API returns pageTotal, not pages
+                total = int(data.get("total", 0))
+                pages = data.get("pageTotal", 0)
+                if page_num >= pages:
+                    print("Reached last page.", file=sys.stderr)
+                    break
+
+                page_num += 1
+                await delay(1000)  # 1s delay between requests
+
+        print(f"\nTotal results saved: {total_results}", file=sys.stderr)
+    finally:
+        csv_file.close()
 
 
 if __name__ == "__main__":
     asyncio.run(run())
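To make the detail-link construction above concrete, here is how the field mapping in `construct_detail_url` plays out on a sample list item. The item below is hypothetical (field names follow the search response the crawler reads; the values are placeholders), so the resulting link is only illustrative:

```python
import urllib.parse

# Hypothetical list item; field names follow the search response used above,
# values are placeholders for illustration only.
item = {
    "edition": "v3",
    "noticeSecondType": "A",
    "noticeId": "abc123",
    "projectCode": "P2026-001",
    "tradingProcess": "JY01",
    "regionCode": "440100",
    "publishDate": "20260204173002",
    "pubServicePlat": "1",
    "noticeSecondTypeDesc": "中标结果公示",
    "projectType": "A",
}

# Same mapping as construct_detail_url(): edition/noticeSecondType pick the route,
# the remaining fields become query parameters (empty values are dropped there).
base = f"https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{item['edition']}/{item['noticeSecondType']}"
params = {
    "noticeId": item["noticeId"],
    "projectCode": item["projectCode"],
    "bizCode": item["tradingProcess"],
    "siteCode": item["regionCode"],
    "publishDate": item["publishDate"],
    "source": item["pubServicePlat"],
    "titleDetails": item["noticeSecondTypeDesc"],
    "classify": item["projectType"],
}
print(f"{base}?{urllib.parse.urlencode(params)}")
# e.g. https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=abc123&projectCode=P2026-001&...
```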