Real-time CSV saving, custom output path, timestamp formatting
秦秋旭 2026-02-04 21:06:11 +08:00
parent f6b5644442
commit 8d302a8b55
2 changed files with 77 additions and 35 deletions
File: README.md
@@ -9,6 +9,8 @@
 - **Automatic pagination**: multi-page scraping is handled automatically.
 - **Dynamic URL construction**: directly accessible detail-page links are generated from fields in the API response.
 - **Pure HTTP requests**: the official API is called directly via aiohttp; no browser is needed, keeping the crawler lightweight and efficient.
+- **Real-time CSV saving**: results are written to a CSV file as they arrive and printed to the terminal at the same time.
+- **Custom output path**: the output file path can be set with a command-line argument.

 ## Requirements
@@ -47,18 +49,25 @@ python ygp_crawler.py
 python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04
 ```

-### 3. Saving the results
+### 3. Customizing the output file path

-The script prints the results to the console as JSON; use shell redirection to save them to a file:
+Use the `-o` / `--output` option to set the path of the output CSV file (default: `results.csv`):

 ```bash
-python ygp_crawler.py --start-date 2026-02-01 > results.jsonl
+python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04 -o my_data.csv
 ```

 ## Sample output

+### Terminal output (JSON, for readability)
+
 ```json
 {
   "项目标题": "某某项目中标结果公示",
-  "发布时间": "20260204173002",
+  "发布时间": "2026-02-04 17:30:02",
   "详细链接": "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=..."
 }
 ```
+
+### CSV file format
+
+```csv
+项目标题,发布时间,详细链接
+某某项目中标结果公示,2026-02-04 17:30:02,https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?...
+```
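Because rows are flushed as they are scraped, the output file can be consumed at any point with the standard library alone. A minimal reader sketch, assuming the default `results.csv` and the header shown above:

```python
import csv

# Read the crawler's output back in. The filename and the three column
# names match the header the crawler writes (see the diff above).
with open("results.csv", newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        print(row["发布时间"], row["项目标题"], row["详细链接"])
```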
File: ygp_crawler.py
@@ -1,5 +1,6 @@
 import argparse
 import asyncio
+import csv
 import json
 import sys
 import urllib.parse
@@ -18,6 +19,7 @@ def parse_args():
     parser = argparse.ArgumentParser(description="广东省公共资源交易平台数据抓取")
     parser.add_argument("--start-date", help="开始日期 (YYYY-MM-DD)")
     parser.add_argument("--end-date", help="结束日期 (YYYY-MM-DD)")
+    parser.add_argument("--output", "-o", default="results.csv", help="输出CSV文件路径 (默认: results.csv)")
     return parser.parse_args()
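Since the long flag comes first, argparse exposes the value as `args.output`. A quick standalone check of the new option's behavior (hypothetical snippet, not part of the crawler):

```python
import argparse

# Reproduce just the new option to show the default and the short flag.
parser = argparse.ArgumentParser()
parser.add_argument("--output", "-o", default="results.csv",
                    help="输出CSV文件路径 (默认: results.csv)")

print(parser.parse_args([]).output)                # results.csv
print(parser.parse_args(["-o", "my.csv"]).output)  # my.csv
```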
@@ -31,6 +33,17 @@ def parse_api_date(date_str):
     return None


+def format_datetime(date_str):
+    """Formats API date string (YYYYMMDDHHMMSS) to YYYY-MM-DD HH:mm:ss."""
+    if not date_str:
+        return ""
+    try:
+        dt = datetime.strptime(date_str, "%Y%m%d%H%M%S")
+        return dt.strftime("%Y-%m-%d %H:%M:%S")
+    except ValueError:
+        return date_str
+
+
 def construct_detail_url(item):
     """Constructs the detail page URL based on item data.
@@ -104,7 +117,7 @@ def process_items(items, start_date, end_date):
         if "中标结果" in title:
             page_results.append({
                 "项目标题": title,
-                "发布时间": pub_date_str,
+                "发布时间": format_datetime(pub_date_str),
                 "详细链接": construct_detail_url(item)
             })
@@ -175,48 +188,68 @@ async def run():
         sys.exit(1)

     print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr)
+    print(f"Output file: {args.output}", file=sys.stderr)

-    async with aiohttp.ClientSession() as session:
-        page_num = 1
-
-        while True:
-            print(f"Processing page {page_num}...", file=sys.stderr)
-
-            await delay(500)  # Initial delay before first request too
-
-            resp = await fetch_page(session, page_num)
-            if resp is None:
-                print("Failed to fetch data. Stopping.", file=sys.stderr)
-                break
-
-            # API returns {errcode, errmsg, data}
-            data = resp.get("data", {})
-            items = data.get("pageData", [])
-
-            if not items:
-                print("No more items.", file=sys.stderr)
-                break
-
-            results, stop = process_items(items, start_date, end_date)
-            for r in results:
-                print(json.dumps(r, ensure_ascii=False))
-
-            if stop:
-                print("Date range satisfied. Stopping.", file=sys.stderr)
-                break
-
-            # Check if we've reached the last page
-            # API returns pageTotal, not pages
-            total = int(data.get("total", 0))
-            pages = data.get("pageTotal", 0)
-            if page_num >= pages:
-                print("Reached last page.", file=sys.stderr)
-                break
-
-            page_num += 1
-            await delay(1000)  # 1s delay between requests
+    # Open CSV file and write header
+    csv_file = open(args.output, "w", newline="", encoding="utf-8")
+    csv_writer = csv.writer(csv_file)
+    csv_writer.writerow(["项目标题", "发布时间", "详细链接"])
+    csv_file.flush()
+
+    try:
+        async with aiohttp.ClientSession() as session:
+            page_num = 1
+            total_results = 0
+
+            while True:
+                print(f"Processing page {page_num}...", file=sys.stderr)
+
+                await delay(500)  # Initial delay before first request too
+
+                resp = await fetch_page(session, page_num)
+                if resp is None:
+                    print("Failed to fetch data. Stopping.", file=sys.stderr)
+                    break
+
+                # API returns {errcode, errmsg, data}
+                data = resp.get("data", {})
+                items = data.get("pageData", [])
+
+                if not items:
+                    print("No more items.", file=sys.stderr)
+                    break
+
+                results, stop = process_items(items, start_date, end_date)
+                for r in results:
+                    # Print to console (as JSON for readability)
+                    print(json.dumps(r, ensure_ascii=False))
+                    sys.stdout.flush()
+                    # Write to CSV immediately
+                    csv_writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
+                    csv_file.flush()
+                    total_results += 1
+
+                if stop:
+                    print("Date range satisfied. Stopping.", file=sys.stderr)
+                    break
+
+                # Check if we've reached the last page
+                # API returns pageTotal, not pages
+                total = int(data.get("total", 0))
+                pages = data.get("pageTotal", 0)
+                if page_num >= pages:
+                    print("Reached last page.", file=sys.stderr)
+                    break
+
+                page_num += 1
+                await delay(1000)  # 1s delay between requests
+
+        print(f"\nTotal results saved: {total_results}", file=sys.stderr)
+    finally:
+        csv_file.close()

 if __name__ == "__main__":
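The per-row `flush()` means everything scraped so far is already on disk if the run is interrupted. The same open/write/flush pattern can equally be written with a `with` statement instead of the explicit `try`/`finally`; a sketch under that assumption, with a sample row mirroring the README (`save_rows` is a hypothetical name):

```python
import csv

# Sketch of the commit's save pattern in isolation: a with statement in
# place of the explicit try/finally, per-row flush kept as in run().
def save_rows(rows, path="results.csv"):
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["项目标题", "发布时间", "详细链接"])
        f.flush()  # header reaches disk before any crawling starts
        for title, pub_time, url in rows:
            writer.writerow([title, pub_time, url])
            f.flush()  # per-row flush: partial results survive an interrupt

# Hypothetical sample row, mirroring the README example above.
save_rows([("某某项目中标结果公示", "2026-02-04 17:30:02",
            "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=...")])
```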