import argparse
import asyncio
import csv
import json
import os
import sys
import urllib.parse
from datetime import datetime, date, timedelta

import aiohttp


async def delay(ms: int):
    """Async delay in milliseconds."""
    await asyncio.sleep(ms / 1000)


API_BASE_URL = "https://ygp.gdzwfw.gov.cn/ggzy-portal"


def parse_args():
    parser = argparse.ArgumentParser(description="广东省公共资源交易平台数据抓取")
    parser.add_argument("--start-date", help="开始日期 (YYYY-MM-DD)")
    parser.add_argument("--end-date", help="结束日期 (YYYY-MM-DD)")
    parser.add_argument("--output", "-o", default="results.csv",
                        help="输出CSV文件路径 (默认: results.csv)")
    parser.add_argument("--incremental", "-i", action="store_true",
                        help="启用增量爬取模式")
    return parser.parse_args()


def parse_api_date(date_str):
    """Parses date string from API (YYYYMMDDHHMMSS) to date object."""
    if not date_str:
        return None
    try:
        return datetime.strptime(date_str, "%Y%m%d%H%M%S").date()
    except ValueError:
        return None


def parse_csv_datetime(date_str):
    """Parses CSV datetime string (YYYY-MM-DD HH:mm:ss) to date object."""
    if not date_str:
        return None
    try:
        return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S").date()
    except ValueError:
        return None


def format_datetime(date_str):
    """Formats API date string (YYYYMMDDHHMMSS) to YYYY-MM-DD HH:mm:ss."""
    if not date_str:
        return ""
    try:
        dt = datetime.strptime(date_str, "%Y%m%d%H%M%S")
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        return date_str


def read_existing_csv(csv_path):
    """
    Reads existing CSV file and returns:
    - existing_data: list of rows (excluding header)
    - min_date: oldest date in the file
    - max_date: newest date in the file
    """
    if not os.path.exists(csv_path):
        return [], None, None

    existing_data = []
    min_date = None
    max_date = None

    try:
        with open(csv_path, "r", encoding="utf-8", newline="") as f:
            reader = csv.reader(f)
            header = next(reader, None)  # Skip header
            if not header:
                return [], None, None
            for row in reader:
                if len(row) >= 2:
                    existing_data.append(row)
                    # Parse date from row (index 1 is 发布时间)
                    row_date = parse_csv_datetime(row[1])
                    if row_date:
                        if min_date is None or row_date < min_date:
                            min_date = row_date
                        if max_date is None or row_date > max_date:
                            max_date = row_date
    except Exception as e:
        print(f"Warning: Error reading existing CSV: {e}", file=sys.stderr)
        return [], None, None

    return existing_data, min_date, max_date


def construct_detail_url(item):
    """Constructs the detail page URL based on item data.

    URL format derived from useJump-b2a96f17.js:
    https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{tradingType}?noticeId=...
    Route config: /new/jygg/:edition(v1|v2|v3)/:tradingType
    """
    edition = item.get("edition", "v3")
    trading_type = item.get("noticeSecondType", "A")
    base_url = f"https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{trading_type}"
    params = {
        "noticeId": item.get("noticeId", ""),
        "projectCode": item.get("projectCode", ""),
        "bizCode": item.get("tradingProcess", item.get("bizCode", "")),
        "siteCode": item.get("regionCode", item.get("siteCode", "")),
        "publishDate": item.get("publishDate", ""),
        "source": item.get("pubServicePlat", ""),
        "titleDetails": item.get("noticeSecondTypeDesc", ""),
        "classify": item.get("projectType", "")
    }
    params = {k: v for k, v in params.items() if v}
    query = urllib.parse.urlencode(params)
    return f"{base_url}?{query}"

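# A minimal sketch of the mapping performed by construct_detail_url (field values here are
# hypothetical, not real notice data):
#   construct_detail_url({"noticeId": "abc123", "noticeSecondType": "A", "regionCode": "4401"})
#   -> "https://ygp.gdzwfw.gov.cn/#/44/new/jygg/v3/A?noticeId=abc123&siteCode=4401"
# Empty fields are dropped from the query string; "edition" defaults to "v3" when missing.
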
def build_search_payload(page_num=1, page_size=10):
    """Build the search API payload."""
    return {
        "pageNo": page_num,
        "pageSize": page_size,
        "keyword": "",
        "siteCode": "44",
        "secondType": "",
        "tradingProcess": "",
        "thirdType": "[]",
        "projectType": "",
        "publishStartTime": "",
        "publishEndTime": "",
        "type": "trading-type"
    }


def process_items(items, start_date, end_date):
    """Process a batch of items and filter by date and keyword."""
    page_results = []
    stop_signal = False
    min_date_on_page = None

    for item in items:
        title = item.get("noticeTitle", "")
        pub_date_str = item.get("publishDate", "")
        item_date = parse_api_date(pub_date_str)

        if item_date:
            if min_date_on_page is None or item_date < min_date_on_page:
                min_date_on_page = item_date
            if item_date > end_date:
                continue
            if item_date < start_date:
                continue

        # Keep only award-result notices ("中标结果" = bid award result)
        if "中标结果" in title:
            page_results.append({
                "项目标题": title,                          # project title
                "发布时间": format_datetime(pub_date_str),  # publish time
                "详细链接": construct_detail_url(item)      # detail page link
            })

    # Only stop if all items on this page are older than start_date
    # and there are no matching results
    if min_date_on_page and min_date_on_page < start_date and not page_results:
        # Check if the newest item is also older than start_date
        max_date_on_page = None
        for item in items:
            item_date = parse_api_date(item.get("publishDate", ""))
            if item_date:
                if max_date_on_page is None or item_date > max_date_on_page:
                    max_date_on_page = item_date
        if max_date_on_page and max_date_on_page < start_date:
            stop_signal = True

    return page_results, stop_signal


async def fetch_page(session, page_num, page_size=10):
    """Fetch a single page of data from the API."""
    url = f"{API_BASE_URL}/search/v2/items"
    payload = build_search_payload(page_num, page_size)
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        "Accept": "application/json, text/plain, */*",
        "Origin": "https://ygp.gdzwfw.gov.cn",
        "Referer": "https://ygp.gdzwfw.gov.cn/"
    }
    try:
        async with session.post(url, json=payload, headers=headers) as response:
            if response.status != 200:
                print(f"API Error: {response.status}", file=sys.stderr)
                return None
            return await response.json()
    except Exception as e:
        print(f"Error fetching page {page_num}: {e}", file=sys.stderr)
        return None


def deduplicate_results(new_results, existing_data):
    """Remove duplicates from new_results based on existing_data."""
    # Create a set of existing URLs for fast lookup
    existing_urls = set()
    for row in existing_data:
        if len(row) >= 3:
            existing_urls.add(row[2])  # 详细链接 is the 3rd column

    # Filter out duplicates
    unique_results = []
    for r in new_results:
        if r["详细链接"] not in existing_urls:
            unique_results.append(r)
    return unique_results

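# Response shape assumed by the crawl loop below (a sketch; the key names come from the
# .get() calls in run() and process_items(), the values shown are hypothetical):
#   {
#     "data": {
#       "pageTotal": 120,
#       "pageData": [
#         {"noticeTitle": "...中标结果公告", "publishDate": "20240115093000", "noticeId": "..."}
#       ]
#     }
#   }
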
async def run():
    args = parse_args()
    today = date.today()
    start_date = today
    end_date = today

    # Read existing CSV if in incremental mode
    existing_data = []
    csv_min_date = None
    csv_max_date = None
    if args.incremental and os.path.exists(args.output):
        existing_data, csv_min_date, csv_max_date = read_existing_csv(args.output)
        if csv_min_date and csv_max_date:
            print(f"Existing data range: {csv_min_date} to {csv_max_date}", file=sys.stderr)
            print(f"Existing records: {len(existing_data)}", file=sys.stderr)

    # Determine date range
    if args.start_date:
        try:
            start_date = datetime.strptime(args.start_date, "%Y-%m-%d").date()
        except ValueError:
            print(f"Error: Invalid start date format: {args.start_date}", file=sys.stderr)
            sys.exit(1)
    elif args.incremental and csv_max_date:
        # In incremental mode without explicit start_date, fetch from max_date+1 to today
        start_date = csv_max_date + timedelta(days=1)

    if args.end_date:
        try:
            end_date = datetime.strptime(args.end_date, "%Y-%m-%d").date()
        except ValueError:
            print(f"Error: Invalid end date format: {args.end_date}", file=sys.stderr)
            sys.exit(1)

    if start_date > end_date:
        print(f"Info: Start date {start_date} is after end date {end_date}, no new data to fetch.",
              file=sys.stderr)
        print(f"Existing records: {len(existing_data)}", file=sys.stderr)
        sys.exit(0)

    print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr)
    print(f"Output file: {args.output}", file=sys.stderr)
    if args.incremental:
        print("Incremental mode: ON", file=sys.stderr)

    # Collect all new results first
    new_results = []
    async with aiohttp.ClientSession() as session:
        page_num = 1
        while True:
            print(f"Processing page {page_num}...", file=sys.stderr)
            await delay(500)

            resp = await fetch_page(session, page_num)
            if resp is None:
                print("Failed to fetch data. Stopping.", file=sys.stderr)
                break

            data = resp.get("data", {})
            items = data.get("pageData", [])
            if not items:
                print("No more items.", file=sys.stderr)
                break

            results, stop = process_items(items, start_date, end_date)
            new_results.extend(results)

            # Print to console immediately
            for r in results:
                print(json.dumps(r, ensure_ascii=False))
            sys.stdout.flush()

            if stop:
                print("Date range satisfied. Stopping.", file=sys.stderr)
                break

            pages = data.get("pageTotal", 0)
            if page_num >= pages:
                print("Reached last page.", file=sys.stderr)
                break

            page_num += 1
            await delay(1000)

    print(f"\nNew results fetched: {len(new_results)}", file=sys.stderr)

    # Deduplicate if in incremental mode
    if args.incremental and existing_data:
        new_results = deduplicate_results(new_results, existing_data)
        print(f"After deduplication: {len(new_results)}", file=sys.stderr)

    # Write to CSV: new data first, then existing data
    with open(args.output, "w", newline="", encoding="utf-8") as csv_file:
        csv_writer = csv.writer(csv_file)
        # Columns: 项目标题 (project title), 发布时间 (publish time), 详细链接 (detail link)
        csv_writer.writerow(["项目标题", "发布时间", "详细链接"])
        # Write new results first (newer data)
        for r in new_results:
            csv_writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
        # Write existing data (older data)
        for row in existing_data:
            csv_writer.writerow(row[:3])  # Only write first 3 columns

    total_records = len(new_results) + len(existing_data)
    print(f"Total records in CSV: {total_records}", file=sys.stderr)


if __name__ == "__main__":
    asyncio.run(run())
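
# Usage notes (illustrative invocations; "crawler.py" stands in for whatever this file is named,
# and the dates are examples only):
#   python crawler.py --start-date 2024-01-01 --end-date 2024-01-31 -o results.csv
#   python crawler.py --incremental -o results.csv   # resumes from the newest 发布时间 in results.csv + 1 day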