From 8d302a8b55e446fbeed4bc337622c5787fdf0125 Mon Sep 17 00:00:00 2001
From: 秦秋旭
Date: Wed, 4 Feb 2026 21:06:11 +0800
Subject: [PATCH] Save to CSV in real time, custom output path, timestamp formatting
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md      | 17 ++++++---
 ygp_crawler.py | 95 ++++++++++++++++++++++++++++++++++----------------
 2 files changed, 77 insertions(+), 35 deletions(-)

diff --git a/README.md b/README.md
index 9a3c4df..21de2fe 100644
--- a/README.md
+++ b/README.md
@@ -9,6 +9,8 @@
 - **Automatic pagination**: multi-page crawling is handled automatically.
 - **Dynamic URL construction**: directly accessible detail-page links are built from fields returned by the API.
 - **Pure HTTP requests**: calls the official API directly with aiohttp; no browser needed, lightweight and efficient.
+- **Real-time CSV saving**: results are written to a CSV file as they are fetched, and also printed to the terminal.
+- **Custom output path**: the output file path can be set via a command-line argument.
 
 ## Requirements
 
@@ -47,18 +49,25 @@
 python ygp_crawler.py
 python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04
 ```
 
-### 3. Saving the results
-The script prints the results to the console as JSON. Use shell redirection to save them to a file.
+### 3. Customizing the output file path
+Use the `-o` / `--output` argument to specify the path of the output CSV file (default: `results.csv`).
 ```bash
-python ygp_crawler.py --start-date 2026-02-01 > results.jsonl
+python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04 -o my_data.csv
 ```
 
 ## Output examples
+### Terminal output (JSON, for readability)
 ```json
 {
   "项目标题": "某某项目中标结果公示",
-  "发布时间": "20260204173002",
+  "发布时间": "2026-02-04 17:30:02",
   "详细链接": "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=..."
 }
 ```
+
+### CSV file format
+```csv
+项目标题,发布时间,详细链接
+某某项目中标结果公示,2026-02-04 17:30:02,https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?...
+```
diff --git a/ygp_crawler.py b/ygp_crawler.py
index 58eb77d..abc3cb3 100644
--- a/ygp_crawler.py
+++ b/ygp_crawler.py
@@ -1,5 +1,6 @@
 import argparse
 import asyncio
+import csv
 import json
 import sys
 import urllib.parse
@@ -18,6 +19,7 @@ def parse_args():
     parser = argparse.ArgumentParser(description="广东省公共资源交易平台数据抓取")
     parser.add_argument("--start-date", help="开始日期 (YYYY-MM-DD)")
     parser.add_argument("--end-date", help="结束日期 (YYYY-MM-DD)")
+    parser.add_argument("--output", "-o", default="results.csv", help="输出CSV文件路径 (默认: results.csv)")
     return parser.parse_args()
 
 
@@ -31,6 +33,17 @@ def parse_api_date(date_str):
     return None
 
 
+def format_datetime(date_str):
+    """Formats API date string (YYYYMMDDHHMMSS) to YYYY-MM-DD HH:mm:ss."""
+    if not date_str:
+        return ""
+    try:
+        dt = datetime.strptime(date_str, "%Y%m%d%H%M%S")
+        return dt.strftime("%Y-%m-%d %H:%M:%S")
+    except ValueError:
+        return date_str
+
+
 def construct_detail_url(item):
     """Constructs the detail page URL based on item data.
@@ -104,7 +117,7 @@ def process_items(items, start_date, end_date):
         if "中标结果" in title:
             page_results.append({
                 "项目标题": title,
-                "发布时间": pub_date_str,
+                "发布时间": format_datetime(pub_date_str),
                 "详细链接": construct_detail_url(item)
             })
 
@@ -175,48 +188,68 @@ async def run():
         sys.exit(1)
 
     print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr)
+    print(f"Output file: {args.output}", file=sys.stderr)
 
-    async with aiohttp.ClientSession() as session:
-        page_num = 1
+    # Open CSV file and write header
+    csv_file = open(args.output, "w", newline="", encoding="utf-8")
+    csv_writer = csv.writer(csv_file)
+    csv_writer.writerow(["项目标题", "发布时间", "详细链接"])
+    csv_file.flush()
 
-        while True:
-            print(f"Processing page {page_num}...", file=sys.stderr)
+    try:
+        async with aiohttp.ClientSession() as session:
+            page_num = 1
+            total_results = 0
 
-            await delay(500)  # Initial delay before first request too
+            while True:
+                print(f"Processing page {page_num}...", file=sys.stderr)
 
-            resp = await fetch_page(session, page_num)
+                await delay(500)  # Initial delay before first request too
 
-            if resp is None:
-                print("Failed to fetch data. Stopping.", file=sys.stderr)
-                break
+                resp = await fetch_page(session, page_num)
 
-            # API returns {errcode, errmsg, data}
-            data = resp.get("data", {})
-            items = data.get("pageData", [])
+                if resp is None:
+                    print("Failed to fetch data. Stopping.", file=sys.stderr)
+                    break
 
-            if not items:
-                print("No more items.", file=sys.stderr)
-                break
+                # API returns {errcode, errmsg, data}
+                data = resp.get("data", {})
+                items = data.get("pageData", [])
 
-            results, stop = process_items(items, start_date, end_date)
-            for r in results:
-                print(json.dumps(r, ensure_ascii=False))
+                if not items:
+                    print("No more items.", file=sys.stderr)
+                    break
 
-            if stop:
-                print("Date range satisfied. Stopping.", file=sys.stderr)
-                break
+                results, stop = process_items(items, start_date, end_date)
+                for r in results:
+                    # Print to console (as JSON for readability)
+                    print(json.dumps(r, ensure_ascii=False))
+                    sys.stdout.flush()
 
-            # Check if we've reached the last page
-            # API returns pageTotal, not pages
-            total = int(data.get("total", 0))
-            pages = data.get("pageTotal", 0)
+                    # Write to CSV immediately
+                    csv_writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
+                    csv_file.flush()
+                    total_results += 1
 
-            if page_num >= pages:
-                print("Reached last page.", file=sys.stderr)
-                break
+                if stop:
+                    print("Date range satisfied. Stopping.", file=sys.stderr)
+                    break
 
-            page_num += 1
-            await delay(1000)  # 1s delay between requests
+                # Check if we've reached the last page
+                # API returns pageTotal, not pages
+                total = int(data.get("total", 0))
+                pages = data.get("pageTotal", 0)
+
+                if page_num >= pages:
+                    print("Reached last page.", file=sys.stderr)
+                    break
+
+                page_num += 1
+                await delay(1000)  # 1s delay between requests
+
+            print(f"\nTotal results saved: {total_results}", file=sys.stderr)
+    finally:
+        csv_file.close()
 
 
 if __name__ == "__main__":
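For reviewers who want to exercise the two additions in isolation, here is a minimal standalone sketch (not part of the patch) combining the `format_datetime` conversion with the flush-per-row CSV write. `format_datetime` is copied from the patch; the sample row is hypothetical and mirrors the README example, and the output filename `demo.csv` is arbitrary.

```python
import csv
from datetime import datetime


def format_datetime(date_str):
    """Convert the API timestamp (YYYYMMDDHHMMSS) to 'YYYY-MM-DD HH:MM:SS'."""
    if not date_str:
        return ""
    try:
        return datetime.strptime(date_str, "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        # Fall back to the raw value if the API ever returns an unexpected format.
        return date_str


# Hypothetical row standing in for one item produced by process_items().
rows = [
    {
        "项目标题": "某某项目中标结果公示",
        "发布时间": "20260204173002",
        "详细链接": "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=...",
    },
]

with open("demo.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["项目标题", "发布时间", "详细链接"])  # same header columns as the patch
    for r in rows:
        writer.writerow([r["项目标题"], format_datetime(r["发布时间"]), r["详细链接"]])
        f.flush()  # flush after every row so an interrupted crawl still leaves usable partial output
```

Flushing after each row trades a little extra I/O for durability, which is the point of the real-time saving feature: a crawl that dies mid-run still leaves a readable `results.csv`. If the file is mainly opened in Excel, `encoding="utf-8-sig"` may be worth considering so the Chinese headers are detected correctly; the patch as written uses plain `utf-8`.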