From 4a458a897b963ddf079b826494b5738e891f54ed Mon Sep 17 00:00:00 2001
From: 秦秋旭
Date: Wed, 4 Feb 2026 21:14:18 +0800
Subject: [PATCH] Incremental crawling and data sorting
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md      |  23 ++++
 ygp_crawler.py | 196 ++++++++++++++++++++++++++++++++++++-------------
 2 files changed, 166 insertions(+), 53 deletions(-)

diff --git a/README.md b/README.md
index 21de2fe..1a59f6a 100644
--- a/README.md
+++ b/README.md
@@ -11,6 +11,8 @@
 - **Pure HTTP requests**: calls the official API directly with aiohttp; no browser needed, lightweight and efficient.
 - **Real-time CSV saving**: data is written to the CSV file as it arrives and printed to the terminal at the same time.
 - **Custom output path**: the output file path can be set with a command-line argument.
+- **Incremental crawling**: derives the date range from an existing CSV file automatically and fetches only new data, avoiding duplicates.
+- **Data ordering**: new data first, old data after, sorted by publication time in descending order.
 
 ## Requirements
 
@@ -55,6 +57,27 @@
 python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04
 python ygp_crawler.py --start-date 2026-02-01 --end-date 2026-02-04 -o my_data.csv
 ```
 
+### 4. Incremental crawling
+Use the `-i` or `--incremental` flag to enable incremental mode. The script reads the existing CSV file, works out the date range on its own, and fetches only new data.
+
+**Automatic date range (recommended)**:
+```bash
+# Start from the day after the newest date already on file and crawl up to today
+python ygp_crawler.py -i
+```
+
+**Manual date range**:
+```bash
+# Specify the date range explicitly while in incremental mode
+python ygp_crawler.py -i --start-date 2026-02-01 --end-date 2026-02-04
+```
+
+**Incremental crawling behavior**:
+- Detects the date range covered by the existing CSV data automatically
+- Deduplicates automatically (keyed on the detail URL)
+- Inserts new data at the top of the file and keeps old data below
+- Exits on its own when there is no new data to fetch
+
 ## Output examples
 ### Terminal output (JSON format, for readability)
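Before the code diff, it is worth spelling out the rule the README describes: incremental mode resumes from the day after the newest saved record and stops at today. A minimal sketch of that date arithmetic, with a hypothetical helper name (`next_crawl_range` is illustrative, not part of the patch):

```python
from datetime import date, timedelta

def next_crawl_range(csv_max_date, today=None):
    """Incremental window: the day after the newest saved record, through today.

    Returns None when the saved data already covers today.
    """
    today = today or date.today()
    if csv_max_date is None:
        # No usable dates in the existing CSV: fall back to crawling just today,
        # mirroring the script's default range.
        return today, today
    start = csv_max_date + timedelta(days=1)
    if start > today:
        return None  # nothing new to fetch; the script exits with status 0
    return start, today

# Existing data ends on 2026-02-03 and "today" is 2026-02-04:
print(next_crawl_range(date(2026, 2, 3), date(2026, 2, 4)))
# -> (datetime.date(2026, 2, 4), datetime.date(2026, 2, 4))
```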
diff --git a/ygp_crawler.py b/ygp_crawler.py
index abc3cb3..be0ce83 100644
--- a/ygp_crawler.py
+++ b/ygp_crawler.py
@@ -2,9 +2,10 @@
 import argparse
 import asyncio
 import csv
 import json
+import os
 import sys
 import urllib.parse
-from datetime import datetime, date
+from datetime import datetime, date, timedelta
 
 import aiohttp
@@ -12,6 +13,7 @@
 async def delay(ms: int):
     """Async delay in milliseconds."""
     await asyncio.sleep(ms / 1000)
 
+
 API_BASE_URL = "https://ygp.gdzwfw.gov.cn/ggzy-portal"
 
@@ -20,6 +22,7 @@ def parse_args():
     parser.add_argument("--start-date", help="Start date (YYYY-MM-DD)")
     parser.add_argument("--end-date", help="End date (YYYY-MM-DD)")
     parser.add_argument("--output", "-o", default="results.csv", help="Output CSV file path (default: results.csv)")
+    parser.add_argument("--incremental", "-i", action="store_true", help="Enable incremental crawl mode")
     return parser.parse_args()
 
@@ -33,6 +36,16 @@
     return None
 
 
+def parse_csv_datetime(date_str):
+    """Parses a CSV datetime string (YYYY-MM-DD HH:mm:ss) to a date object."""
+    if not date_str:
+        return None
+    try:
+        return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S").date()
+    except ValueError:
+        return None
+
+
 def format_datetime(date_str):
     """Formats API date string (YYYYMMDDHHMMSS) to YYYY-MM-DD HH:mm:ss."""
     if not date_str:
@@ -44,6 +57,44 @@
         return date_str
 
 
+def read_existing_csv(csv_path):
+    """
+    Reads the existing CSV file and returns:
+    - existing_data: list of rows (excluding the header)
+    - min_date: oldest date in the file
+    - max_date: newest date in the file
+    """
+    if not os.path.exists(csv_path):
+        return [], None, None
+
+    existing_data = []
+    min_date = None
+    max_date = None
+
+    try:
+        with open(csv_path, "r", encoding="utf-8", newline="") as f:
+            reader = csv.reader(f)
+            header = next(reader, None)  # Skip header
+            if not header:
+                return [], None, None
+
+            for row in reader:
+                if len(row) >= 2:
+                    existing_data.append(row)
+                    # Parse the date from the row (index 1 is 发布时间)
+                    row_date = parse_csv_datetime(row[1])
+                    if row_date:
+                        if min_date is None or row_date < min_date:
+                            min_date = row_date
+                        if max_date is None or row_date > max_date:
+                            max_date = row_date
+    except Exception as e:
+        print(f"Warning: Error reading existing CSV: {e}", file=sys.stderr)
+        return [], None, None
+
+    return existing_data, min_date, max_date
+
+
 def construct_detail_url(item):
     """Constructs the detail page URL based on item data.
 
@@ -75,10 +126,7 @@
 
 
 def build_search_payload(page_num=1, page_size=10):
-    """Build the search API payload.
-
-    Based on analysis of the frontend code (JyggFilter component).
-    """
+    """Build the search API payload."""
     return {
         "pageNo": page_num,
         "pageSize": page_size,
@@ -162,6 +210,23 @@
     return None
 
 
+def deduplicate_results(new_results, existing_data):
+    """Remove duplicates from new_results based on existing_data."""
+    # Create a set of existing URLs for fast lookup
+    existing_urls = set()
+    for row in existing_data:
+        if len(row) >= 3:
+            existing_urls.add(row[2])  # 详细链接 is the 3rd column
+
+    # Filter out duplicates
+    unique_results = []
+    for r in new_results:
+        if r["详细链接"] not in existing_urls:
+            unique_results.append(r)
+
+    return unique_results
+
+
 async def run():
     args = parse_args()
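`deduplicate_results` keys on the detail URL (the third CSV column) and collects the existing URLs into a set first, so each membership test is O(1) and the whole pass stays linear instead of rescanning the existing rows once per fetched record. A self-contained illustration of the same idea; the rows and URLs below are made up:

```python
# Rows follow the patch's CSV layout: [项目标题, 发布时间, 详细链接].
existing_data = [
    ["Project A", "2026-02-01 09:00:00", "https://example.com/detail/1"],
]
new_results = [
    {"项目标题": "Project A", "发布时间": "2026-02-01 09:00:00", "详细链接": "https://example.com/detail/1"},
    {"项目标题": "Project B", "发布时间": "2026-02-04 10:30:00", "详细链接": "https://example.com/detail/2"},
]

# Set membership is O(1), so deduplication is O(len(existing) + len(new)).
existing_urls = {row[2] for row in existing_data if len(row) >= 3}
unique = [r for r in new_results if r["详细链接"] not in existing_urls]
print([r["项目标题"] for r in unique])  # ['Project B']
```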
@@ -169,12 +234,27 @@
     start_date = today
     end_date = today
 
+    # Read existing CSV if in incremental mode
+    existing_data = []
+    csv_min_date = None
+    csv_max_date = None
+
+    if args.incremental and os.path.exists(args.output):
+        existing_data, csv_min_date, csv_max_date = read_existing_csv(args.output)
+        if csv_min_date and csv_max_date:
+            print(f"Existing data range: {csv_min_date} to {csv_max_date}", file=sys.stderr)
+            print(f"Existing records: {len(existing_data)}", file=sys.stderr)
+
+    # Determine date range
     if args.start_date:
         try:
             start_date = datetime.strptime(args.start_date, "%Y-%m-%d").date()
         except ValueError:
             print(f"Error: Invalid start date format: {args.start_date}", file=sys.stderr)
             sys.exit(1)
+    elif args.incremental and csv_max_date:
+        # In incremental mode without explicit start_date, fetch from max_date+1 to today
+        start_date = csv_max_date + timedelta(days=1)
 
     if args.end_date:
         try:
@@ -184,72 +264,82 @@
             sys.exit(1)
 
     if start_date > end_date:
-        print(f"Error: Start date {start_date} is after end date {end_date}", file=sys.stderr)
-        sys.exit(1)
+        print(f"Info: Start date {start_date} is after end date {end_date}, no new data to fetch.", file=sys.stderr)
+        print(f"Existing records: {len(existing_data)}", file=sys.stderr)
+        sys.exit(0)
 
     print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr)
     print(f"Output file: {args.output}", file=sys.stderr)
+    if args.incremental:
+        print(f"Incremental mode: ON", file=sys.stderr)
 
-    # Open CSV file and write header
-    csv_file = open(args.output, "w", newline="", encoding="utf-8")
-    csv_writer = csv.writer(csv_file)
-    csv_writer.writerow(["项目标题", "发布时间", "详细链接"])
-    csv_file.flush()
+    # Collect all new results first
+    new_results = []
 
-    try:
-        async with aiohttp.ClientSession() as session:
-            page_num = 1
-            total_results = 0
+    async with aiohttp.ClientSession() as session:
+        page_num = 1
 
-            while True:
-                print(f"Processing page {page_num}...", file=sys.stderr)
+        while True:
+            print(f"Processing page {page_num}...", file=sys.stderr)
 
-                await delay(500)  # Initial delay before first request too
+            await delay(500)
 
-                resp = await fetch_page(session, page_num)
+            resp = await fetch_page(session, page_num)
 
-                if resp is None:
-                    print("Failed to fetch data. Stopping.", file=sys.stderr)
-                    break
+            if resp is None:
+                print("Failed to fetch data. Stopping.", file=sys.stderr)
+                break
 
-                # API returns {errcode, errmsg, data}
-                data = resp.get("data", {})
-                items = data.get("pageData", [])
+            data = resp.get("data", {})
+            items = data.get("pageData", [])
 
-                if not items:
-                    print("No more items.", file=sys.stderr)
-                    break
+            if not items:
+                print("No more items.", file=sys.stderr)
+                break
 
-                results, stop = process_items(items, start_date, end_date)
-                for r in results:
-                    # Print to console (as JSON for readability)
-                    print(json.dumps(r, ensure_ascii=False))
-                    sys.stdout.flush()
+            results, stop = process_items(items, start_date, end_date)
+            new_results.extend(results)
 
-                    # Write to CSV immediately
-                    csv_writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
-                    csv_file.flush()
-                    total_results += 1
+            # Print to console immediately
+            for r in results:
+                print(json.dumps(r, ensure_ascii=False))
+                sys.stdout.flush()
 
-                if stop:
-                    print("Date range satisfied. Stopping.", file=sys.stderr)
-                    break
+            if stop:
+                print("Date range satisfied. Stopping.", file=sys.stderr)
+                break
 
-                # Check if we've reached the last page
-                # API returns pageTotal, not pages
-                total = int(data.get("total", 0))
-                pages = data.get("pageTotal", 0)
+            pages = data.get("pageTotal", 0)
 
-                if page_num >= pages:
-                    print("Reached last page.", file=sys.stderr)
-                    break
+            if page_num >= pages:
+                print("Reached last page.", file=sys.stderr)
+                break
 
-                page_num += 1
-                await delay(1000)  # 1s delay between requests
+            page_num += 1
+            await delay(1000)
 
-            print(f"\nTotal results saved: {total_results}", file=sys.stderr)
-    finally:
-        csv_file.close()
+    print(f"\nNew results fetched: {len(new_results)}", file=sys.stderr)
+
+    # Deduplicate if in incremental mode
+    if args.incremental and existing_data:
+        new_results = deduplicate_results(new_results, existing_data)
+        print(f"After deduplication: {len(new_results)}", file=sys.stderr)
+
+    # Write to CSV: new data first, then existing data
+    with open(args.output, "w", newline="", encoding="utf-8") as csv_file:
+        csv_writer = csv.writer(csv_file)
+        csv_writer.writerow(["项目标题", "发布时间", "详细链接"])
+
+        # Write new results first (newer data)
+        for r in new_results:
+            csv_writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
+
+        # Write existing data (older data)
+        for row in existing_data:
+            csv_writer.writerow(row[:3])  # Only write first 3 columns
+
+    total_records = len(new_results) + len(existing_data)
+    print(f"Total records in CSV: {total_records}", file=sys.stderr)
 
 
 if __name__ == "__main__":
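The rewritten `run()` no longer streams rows into the CSV as they arrive; it collects everything, deduplicates, and then rewrites the whole file with the new rows on top and the prior rows below. A quick smoke test of that write order, substituting `io.StringIO` for the real file and using invented sample rows:

```python
import csv
import io

# Invented sample data: one freshly fetched record, one previously saved row.
new_results = [
    {"项目标题": "Project B", "发布时间": "2026-02-04 10:30:00", "详细链接": "https://example.com/detail/2"},
]
existing_data = [
    ["Project A", "2026-02-01 09:00:00", "https://example.com/detail/1"],
]

buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["项目标题", "发布时间", "详细链接"])
for r in new_results:        # new rows first (newest data on top)
    writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
for row in existing_data:    # prior rows keep their old order below
    writer.writerow(row[:3])

print(buf.getvalue())
# 项目标题,发布时间,详细链接
# Project B,2026-02-04 10:30:00,https://example.com/detail/2
# Project A,2026-02-01 09:00:00,https://example.com/detail/1
```

One consequence of the wholesale rewrite is that an interruption between opening the output file and finishing the loop can lose the previously saved rows; writing to a temporary file and renaming it over the original would make the rewrite atomic.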