Compare commits

..

No commits in common. "8d302a8b55e446fbeed4bc337622c5787fdf0125" and "e8239d2099b5c98e359c1888340a3f41ca1b848c" have entirely different histories.

3 changed files with 144 additions and 204 deletions

View File

@ -8,13 +8,12 @@
- **日期过滤**:支持指定开始和结束日期,默认为抓取当天数据。 - **日期过滤**:支持指定开始和结束日期,默认为抓取当天数据。
- **自动分页**:自动处理多页数据抓取。 - **自动分页**:自动处理多页数据抓取。
- **动态构造 URL**:根据接口返回字段自动生成可直接访问的详情页链接。 - **动态构造 URL**:根据接口返回字段自动生成可直接访问的详情页链接。
- **纯 HTTP 请求**:直接使用 aiohttp 调用官方 API,无需浏览器,轻量高效。 - **处理动态加密**:利用 Playwright 驱动浏览器,自动处理 API 请求头中的加密签名。
- **CSV 实时保存**:数据实时保存到 CSV 文件,同时输出到终端。
- **自定义输出路径**:支持通过参数指定输出文件路径。
## 环境要求 ## 环境要求
- Python 3.8+ - Python 3.8+
- Chromium 浏览器 (由 Playwright 自动安装)
## 安装步骤 ## 安装步骤
@ -35,6 +34,11 @@
pip install -r requirements.txt pip install -r requirements.txt
``` ```
4. **安装 Playwright 浏览器内核**
```bash
playwright install chromium
```
## 使用方法 ## 使用方法
### 1. 抓取今天发布的数据 (默认) ### 1. 抓取今天发布的数据 (默认)
@ -49,25 +53,18 @@ python ygp_crawler.py
python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04 python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04
``` ```
### 3. 自定义输出文件路径 ### 3. 保存抓取结果
使用 `-o` 或 `--output` 参数指定输出 CSV 文件的路径(默认为 `results.csv`)。 脚本将结果以 JSON 格式输出到控制台。可以使用重定向将其保存到文件中。
```bash ```bash
python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04 -o my_data.csv python ygp_crawler.py --start-date 2026-02-01 > results.jsonl
``` ```
## 数据输出示例 ## 数据输出示例
### 终端输出(JSON 格式,便于阅读)
```json ```json
{ {
"项目标题": "某某项目中标结果公示", "项目标题": "某某项目中标结果公示",
"发布时间": "2026-02-04 17:30:02", "发布时间": "20260204173002",
"详细链接": "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=..." "详细链接": "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=..."
} }
``` ```
### CSV 文件格式
```csv
项目标题,发布时间,详细链接
某某项目中标结果公示,2026-02-04 17:30:02,https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?...
```

View File

@ -1 +1 @@
aiohttp playwright

View File

@ -1,28 +1,17 @@
import argparse import argparse
import asyncio import asyncio
import csv
import json import json
import sys import sys
import urllib.parse import urllib.parse
from datetime import datetime, date from datetime import datetime, date
import aiohttp from playwright.async_api import async_playwright
async def delay(ms: int):
    """Suspend the calling coroutine for ``ms`` milliseconds."""
    # asyncio.sleep takes seconds, so convert from milliseconds first.
    seconds = ms / 1000
    await asyncio.sleep(seconds)
API_BASE_URL = "https://ygp.gdzwfw.gov.cn/ggzy-portal"
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(description="广东省公共资源交易平台数据抓取") parser = argparse.ArgumentParser(description="广东省公共资源交易平台数据抓取")
parser.add_argument("--start-date", help="开始日期 (YYYY-MM-DD)") parser.add_argument("--start-date", help="开始日期 (YYYY-MM-DD)")
parser.add_argument("--end-date", help="结束日期 (YYYY-MM-DD)") parser.add_argument("--end-date", help="结束日期 (YYYY-MM-DD)")
parser.add_argument("--output", "-o", default="results.csv", help="输出CSV文件路径 (默认: results.csv)")
return parser.parse_args() return parser.parse_args()
def parse_api_date(date_str): def parse_api_date(date_str):
"""Parses date string from API (YYYYMMDDHHMMSS) to date object.""" """Parses date string from API (YYYYMMDDHHMMSS) to date object."""
if not date_str: if not date_str:
@ -32,139 +21,34 @@ def parse_api_date(date_str):
except ValueError: except ValueError:
return None return None
def format_datetime(date_str):
    """Format an API timestamp (YYYYMMDDHHMMSS) as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        date_str: Timestamp value taken from an API item. Non-string values
            (for example a JSON number) are converted with ``str()`` before
            parsing instead of raising ``TypeError``.

    Returns:
        The reformatted timestamp; an empty string for falsy input; or the
        input text unchanged when it does not match the expected layout.
    """
    if not date_str:
        return ""
    # strptime only accepts str, so normalise first; a str input is unchanged.
    text = str(date_str)
    try:
        parsed = datetime.strptime(text, "%Y%m%d%H%M%S")
    except ValueError:
        # Unknown layout: pass the raw text through rather than lose it.
        return text
    return parsed.strftime("%Y-%m-%d %H:%M:%S")
def construct_detail_url(item): def construct_detail_url(item):
"""Constructs the detail page URL based on item data. """Constructs the detail page URL based on item data."""
# Pattern derived from analysis:
# https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=...
URL format derived from useJump-b2a96f17.js: base_url = "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A"
https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{tradingType}?noticeId=...
Route config: /new/jygg/:edition(v1|v2|v3)/:tradingType
"""
edition = item.get("edition", "v3")
trading_type = item.get("noticeSecondType", "A")
base_url = f"https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{trading_type}"
# Map API fields to URL parameters
# Note: 'nodeId' is missing as it's not in the list response.
# We assume the page handles missing nodeId or it's not strictly required for direct access.
params = { params = {
"noticeId": item.get("noticeId", ""), "noticeId": item.get("noticeId", ""),
"projectCode": item.get("projectCode", ""), "projectCode": item.get("projectCode", ""),
"bizCode": item.get("tradingProcess", item.get("bizCode", "")), "bizCode": item.get("tradingProcess", ""),
"siteCode": item.get("regionCode", item.get("siteCode", "")), "siteCode": item.get("siteCode", ""),
"publishDate": item.get("publishDate", ""), "publishDate": item.get("publishDate", ""),
"source": item.get("pubServicePlat", ""), "source": item.get("pubServicePlat", ""),
"titleDetails": item.get("noticeSecondTypeDesc", ""), "titleDetails": item.get("noticeSecondTypeDesc", ""),
"classify": item.get("projectType", "") "classify": item.get("projectType", "")
} }
params = {k: v for k, v in params.items() if v}
query = urllib.parse.urlencode(params) query = urllib.parse.urlencode(params)
return f"{base_url}?{query}" return f"{base_url}?{query}"
def build_search_payload(page_num=1, page_size=10):
    """Return the JSON body for the search request.

    Only the paging fields vary with the arguments; every filter field is
    sent with the fixed value below. (Per the original note, the field set
    was derived from the frontend's JyggFilter component.)
    """
    paging = {"pageNo": page_num, "pageSize": page_size}
    fixed_filters = {
        "keyword": "",
        "siteCode": "44",
        "secondType": "",
        "tradingProcess": "",
        "thirdType": "[]",
        "projectType": "",
        "publishStartTime": "",
        "publishEndTime": "",
        "type": "trading-type",
    }
    # Paging keys come first so the key order matches the original literal.
    return {**paging, **fixed_filters}
def process_items(items, start_date, end_date):
    """Filter one page of API items by date range and title keyword.

    Args:
        items: Iterable of item dicts; ``noticeTitle`` and ``publishDate``
            are read from each one.
        start_date: Inclusive lower bound, compared against the value
            returned by ``parse_api_date``.
        end_date: Inclusive upper bound, compared the same way.

    Returns:
        A ``(page_results, stop_signal)`` tuple. ``page_results`` holds one
        dict per kept item with the keys "项目标题", "发布时间" and "详细链接".
        ``stop_signal`` is True only when nothing was kept and the newest
        parseable date on the page is older than ``start_date``.
    """
    page_results = []
    newest_on_page = None

    for item in items:
        # ``or ""`` also covers an explicit null title, which would
        # otherwise make the ``in`` test below raise TypeError.
        title = item.get("noticeTitle") or ""
        pub_date_str = item.get("publishDate", "")
        item_date = parse_api_date(pub_date_str)

        # Items whose date cannot be parsed skip the range check and are
        # judged on the title alone.
        if item_date is not None:
            if newest_on_page is None or item_date > newest_on_page:
                newest_on_page = item_date
            if not start_date <= item_date <= end_date:
                continue

        if "中标结果" in title:
            page_results.append({
                "项目标题": title,
                "发布时间": format_datetime(pub_date_str),
                "详细链接": construct_detail_url(item),
            })

    # If the newest item is older than start_date, every dated item is too,
    # so tracking the maximum in the same pass is sufficient.
    stop_signal = (
        not page_results
        and newest_on_page is not None
        and newest_on_page < start_date
    )
    return page_results, stop_signal
async def fetch_page(session, page_num, page_size=10):
    """POST one search request and return the decoded JSON body.

    Args:
        session: Client session whose ``post()`` is used as an async
            context manager.
        page_num: Page number forwarded to ``build_search_payload``.
        page_size: Page size forwarded to ``build_search_payload``.

    Returns:
        The parsed JSON response, or ``None`` when the status is not 200 or
        any exception is raised while sending or decoding. Both failure
        cases are reported on stderr.
    """
    endpoint = f"{API_BASE_URL}/search/v2/items"
    body = build_search_payload(page_num, page_size)
    request_headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        "Accept": "application/json, text/plain, */*",
        "Origin": "https://ygp.gdzwfw.gov.cn",
        "Referer": "https://ygp.gdzwfw.gov.cn/",
    }

    try:
        async with session.post(endpoint, json=body, headers=request_headers) as resp:
            if resp.status == 200:
                return await resp.json()
            print(f"API Error: {resp.status}", file=sys.stderr)
            return None
    except Exception as exc:
        # Best-effort fetch: report the failure and let the caller stop.
        print(f"Error fetching page {page_num}: {exc}", file=sys.stderr)
        return None
async def run(): async def run():
args = parse_args() args = parse_args()
# Determine Date Range
today = date.today() today = date.today()
start_date = today start_date = today
end_date = today end_date = today
@ -188,69 +72,128 @@ async def run():
sys.exit(1) sys.exit(1)
print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr) print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr)
print(f"Output file: {args.output}", file=sys.stderr)
# Open CSV file and write header async with async_playwright() as p:
csv_file = open(args.output, "w", newline="", encoding="utf-8") browser = await p.chromium.launch(headless=True)
csv_writer = csv.writer(csv_file) context = await browser.new_context(
csv_writer.writerow(["项目标题", "发布时间", "详细链接"]) user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
csv_file.flush() viewport={"width": 1280, "height": 800}
)
page = await context.new_page()
# Helper to process a batch of items
def process_items(items):
page_results = []
stop_signal = False
min_date_on_page = None
for item in items:
title = item.get("noticeTitle", "")
pub_date_str = item.get("publishDate", "")
item_date = parse_api_date(pub_date_str)
if item_date:
if min_date_on_page is None or item_date < min_date_on_page:
min_date_on_page = item_date
# Date Filter
if item_date > end_date:
continue # Too new, skip
if item_date < start_date:
# Found an item older than start date.
# Since lists are usually ordered, this suggests we might be done.
# However, to be safe (pinned items?), we just skip it here,
# but set a signal that we *might* want to stop if the whole page is old.
pass
# Keyword Filter
if "中标结果" in title:
# Add to results
page_results.append({
"项目标题": title,
"发布时间": pub_date_str,
"详细链接": construct_detail_url(item)
})
# Stop condition: If the newest item on the page (or min_date) is older than start_date?
# Actually, valid items could be anywhere if not strictly sorted.
# But "min_date_on_page < start_date" means the page contains items older than target.
# If the *entire* page is older than start_date, we definitely stop.
# Let's assume strict reverse chronological order for efficiency.
if min_date_on_page and min_date_on_page < start_date:
stop_signal = True
return page_results, stop_signal
try: try:
async with aiohttp.ClientSession() as session: print("Loading page...", file=sys.stderr)
page_num = 1
total_results = 0
# Setup response listener for the INITIAL load
async with page.expect_response("**/search/v2/items", timeout=15000) as response_info:
await page.goto("https://ygp.gdzwfw.gov.cn/#/44/jygg")
response = await response_info.value
data = await response.json()
items = data.get("data", {}).get("pageData", [])
results, stop = process_items(items)
for r in results:
print(json.dumps(r, ensure_ascii=False))
if stop:
print("Date range satisfied (initial page). Stopping.", file=sys.stderr)
await browser.close()
return
# Pagination Loop
page_num = 1
while True: while True:
page_num += 1
print(f"Processing page {page_num}...", file=sys.stderr) print(f"Processing page {page_num}...", file=sys.stderr)
await delay(500) # Initial delay before first request too # Find Next Button
# Selector strategy: The pagination 'next' button.
# Usually .btn-next
next_btn = page.locator(".btn-next")
resp = await fetch_page(session, page_num) # Check if disabled
if await next_btn.get_attribute("disabled") is not None:
if resp is None: print("Reached last page.", file=sys.stderr)
print("Failed to fetch data. Stopping.", file=sys.stderr)
break break
# API returns {errcode, errmsg, data} # Click and Wait for API
data = resp.get("data", {}) try:
items = data.get("pageData", []) async with page.expect_response("**/search/v2/items", timeout=10000) as response_info:
await next_btn.click()
response = await response_info.value
if response.status != 200:
print(f"API Error: {response.status}", file=sys.stderr)
break
data = await response.json()
items = data.get("data", {}).get("pageData", [])
if not items: if not items:
print("No more items.", file=sys.stderr) print("No more items.", file=sys.stderr)
break break
results, stop = process_items(items, start_date, end_date) results, stop = process_items(items)
for r in results: for r in results:
# Print to console (as JSON for readability)
print(json.dumps(r, ensure_ascii=False)) print(json.dumps(r, ensure_ascii=False))
sys.stdout.flush()
# Write to CSV immediately
csv_writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
csv_file.flush()
total_results += 1
if stop: if stop:
print("Date range satisfied. Stopping.", file=sys.stderr) print("Date range satisfied. Stopping.", file=sys.stderr)
break break
# Check if we've reached the last page except Exception as e:
# API returns pageTotal, not pages print(f"Error during pagination: {e}", file=sys.stderr)
total = int(data.get("total", 0))
pages = data.get("pageTotal", 0)
if page_num >= pages:
print("Reached last page.", file=sys.stderr)
break break
page_num += 1 except Exception as e:
await delay(1000) # 1s delay between requests print(f"Fatal Error: {e}", file=sys.stderr)
print(f"\nTotal results saved: {total_results}", file=sys.stderr)
finally:
csv_file.close()
await browser.close()
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(run()) asyncio.run(run())