"""Scraper for the Guangdong public resource trading platform (广东省公共资源交易平台).

Fetches 中标结果 (award result) notices from the platform's search API for a
date range, merges them incrementally with an existing CSV file, and writes
the combined results back out (newest first).
"""

import argparse
import asyncio
import csv
import json
import os
import sys
import urllib.parse
from datetime import datetime, date, timedelta

import aiohttp

API_BASE_URL = "https://ygp.gdzwfw.gov.cn/ggzy-portal"


async def delay(ms: int) -> None:
    """Async delay in milliseconds."""
    await asyncio.sleep(ms / 1000)


def parse_args():
    """Parse command-line arguments for date range and output path."""
    parser = argparse.ArgumentParser(description="广东省公共资源交易平台数据抓取")
    parser.add_argument("--start-date", help="开始日期 (YYYY-MM-DD)")
    parser.add_argument("--end-date", help="结束日期 (YYYY-MM-DD)")
    parser.add_argument("--output", "-o", default="results.csv",
                        help="输出CSV文件路径 (默认: results.csv)")
    return parser.parse_args()


def parse_api_date(date_str):
    """Parse a date string from the API (YYYYMMDDHHMMSS) to a ``date``.

    Returns None for empty or malformed input.
    """
    if not date_str:
        return None
    try:
        return datetime.strptime(date_str, "%Y%m%d%H%M%S").date()
    except ValueError:
        return None


def parse_csv_datetime(date_str):
    """Parse a CSV datetime string (YYYY-MM-DD HH:mm:ss) to a ``date``.

    Returns None for empty or malformed input.
    """
    if not date_str:
        return None
    try:
        return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S").date()
    except ValueError:
        return None


def format_datetime(date_str):
    """Format an API date string (YYYYMMDDHHMMSS) as YYYY-MM-DD HH:mm:ss.

    Falls back to returning the input unchanged when it does not match the
    expected format, so the raw value is still visible in the output.
    """
    if not date_str:
        return ""
    try:
        dt = datetime.strptime(date_str, "%Y%m%d%H%M%S")
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        return date_str


def read_existing_csv(csv_path):
    """Read an existing results CSV.

    Returns a 3-tuple:
      - existing_data: list of rows (excluding the header)
      - min_date: oldest publish date found in the file (or None)
      - max_date: newest publish date found in the file (or None)

    Any read/parse failure is reported to stderr and treated as "no existing
    data" so the crawl can proceed from scratch.
    """
    if not os.path.exists(csv_path):
        return [], None, None

    existing_data = []
    min_date = None
    max_date = None
    try:
        with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
            reader = csv.reader(f)
            header = next(reader, None)  # Skip header
            if not header:
                return [], None, None
            for row in reader:
                if len(row) >= 2:
                    existing_data.append(row)
                    # Column index 1 is 发布时间 (publish time).
                    row_date = parse_csv_datetime(row[1])
                    if row_date:
                        if min_date is None or row_date < min_date:
                            min_date = row_date
                        if max_date is None or row_date > max_date:
                            max_date = row_date
    except Exception as e:
        print(f"Warning: Error reading existing CSV: {e}", file=sys.stderr)
        return [], None, None

    return existing_data, min_date, max_date


def construct_detail_url(item):
    """Construct the detail page URL based on item data.

    URL format derived from useJump-b2a96f17.js:
    https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{tradingType}?noticeId=...
    Route config: /new/jygg/:edition(v1|v2|v3)/:tradingType
    """
    edition = item.get("edition", "v3")
    trading_type = item.get("noticeSecondType", "A")
    base_url = f"https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{trading_type}"
    params = {
        "noticeId": item.get("noticeId", ""),
        "projectCode": item.get("projectCode", ""),
        "bizCode": item.get("tradingProcess", item.get("bizCode", "")),
        "siteCode": item.get("regionCode", item.get("siteCode", "")),
        "publishDate": item.get("publishDate", ""),
        "source": item.get("pubServicePlat", ""),
        "titleDetails": item.get("noticeSecondTypeDesc", ""),
        "classify": item.get("projectType", ""),
    }
    # Drop empty values so the query string stays compact.
    params = {k: v for k, v in params.items() if v}
    query = urllib.parse.urlencode(params)
    return f"{base_url}?{query}"


def build_search_payload(page_num=1, publish_start_time="", publish_end_time=""):
    """Build the search API payload.

    Args:
        page_num: Page number
        publish_start_time: Start time in format YYYYMMDDHHMMSS
        publish_end_time: End time in format YYYYMMDDHHMMSS
    """
    return {
        "pageNo": page_num,
        "pageSize": 50,  # Fixed page size 50
        "keyword": "",
        "siteCode": "44",
        "secondType": "",
        "tradingProcess": "513,2C52,3C52",  # Fixed to search for 中标结果
        "thirdType": "[]",
        "projectType": "",
        "publishStartTime": publish_start_time,  # Format: YYYYMMDDHHMMSS
        "publishEndTime": publish_end_time,  # Format: YYYYMMDDHHMMSS
        "type": "trading-type",
    }


def process_items(items):
    """Process a batch of items - API already filters by tradingProcess and date range."""
    page_results = []
    for item in items:
        title = item.get("noticeTitle", "")
        pub_date_str = item.get("publishDate", "")
        page_results.append({
            "项目标题": title,
            "发布时间": format_datetime(pub_date_str),
            "详细链接": construct_detail_url(item),
        })
    return page_results


async def fetch_page(session, page_num, publish_start_time="", publish_end_time=""):
    """Fetch a single page of data from the API.

    Returns the decoded JSON response, or None on HTTP/network errors
    (which are reported to stderr).
    """
    url = f"{API_BASE_URL}/search/v2/items"
    payload = build_search_payload(page_num, publish_start_time, publish_end_time)
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        "Accept": "application/json, text/plain, */*",
        "Origin": "https://ygp.gdzwfw.gov.cn",
        "Referer": "https://ygp.gdzwfw.gov.cn/",
    }
    try:
        async with session.post(url, json=payload, headers=headers) as response:
            if response.status != 200:
                print(f"API Error: {response.status}", file=sys.stderr)
                return None
            return await response.json()
    except Exception as e:
        print(f"Error fetching page {page_num}: {e}", file=sys.stderr)
        return None


def deduplicate_results(new_results, existing_data):
    """Remove duplicates from new_results based on existing_data.

    Rows are considered duplicates when their detail URL (详细链接, third CSV
    column) already appears in the existing data.
    """
    # Create a set of existing URLs for fast lookup
    existing_urls = set()
    for row in existing_data:
        if len(row) >= 3:
            existing_urls.add(row[2])  # 详细链接 is the 3rd column

    # Filter out duplicates
    return [r for r in new_results if r["详细链接"] not in existing_urls]


def format_api_datetime(dt):
    """Format datetime to API format YYYYMMDDHHMMSS."""
    return dt.strftime("%Y%m%d%H%M%S")


async def run():
    """Main entry point: determine the date range, crawl, dedupe, write CSV."""
    args = parse_args()
    today = date.today()

    # Default: last 3 months
    three_months_ago = today - timedelta(days=90)
    start_date = three_months_ago
    end_date = today

    # Always use incremental mode - read existing CSV if exists
    existing_data = []
    csv_min_date = None
    csv_max_date = None
    if os.path.exists(args.output):
        existing_data, csv_min_date, csv_max_date = read_existing_csv(args.output)
        if csv_min_date and csv_max_date:
            print(f"Existing data range: {csv_min_date} to {csv_max_date}", file=sys.stderr)
            print(f"Existing records: {len(existing_data)}", file=sys.stderr)

    # Determine date range: explicit CLI dates win, otherwise resume
    # from the day after the newest record already on disk.
    if args.start_date:
        try:
            start_date = datetime.strptime(args.start_date, "%Y-%m-%d").date()
        except ValueError:
            print(f"Error: Invalid start date format: {args.start_date}", file=sys.stderr)
            sys.exit(1)
    elif csv_max_date:
        # Without explicit start_date, fetch from max_date+1 to today
        start_date = csv_max_date + timedelta(days=1)

    if args.end_date:
        try:
            end_date = datetime.strptime(args.end_date, "%Y-%m-%d").date()
        except ValueError:
            print(f"Error: Invalid end date format: {args.end_date}", file=sys.stderr)
            sys.exit(1)

    if start_date > end_date:
        print(f"Info: Start date {start_date} is after end date {end_date}, no new data to fetch.", file=sys.stderr)
        print(f"Existing records: {len(existing_data)}", file=sys.stderr)
        sys.exit(0)

    # Format time for API: YYYYMMDDHHMMSS (full-day bounds, no microseconds)
    start_datetime = datetime.combine(start_date, datetime.min.time())
    end_datetime = datetime.combine(end_date, datetime.max.time().replace(microsecond=0))
    publish_start_time = format_api_datetime(start_datetime)
    publish_end_time = format_api_datetime(end_datetime)

    print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr)
    print(f"API time range: {publish_start_time} to {publish_end_time}", file=sys.stderr)
    print(f"Output file: {args.output}", file=sys.stderr)

    # Collect all new results first
    new_results = []
    async with aiohttp.ClientSession() as session:
        page_num = 1
        while True:
            print(f"Processing page {page_num}...", file=sys.stderr)
            await delay(500)
            resp = await fetch_page(session, page_num, publish_start_time, publish_end_time)
            if resp is None:
                print("Failed to fetch data. Stopping.", file=sys.stderr)
                break

            # Guard with `or {}` / `or []`: the API may return "data": null,
            # in which case .get("data", {}) would still yield None.
            data = resp.get("data") or {}
            items = data.get("pageData") or []
            if not items:
                print("No more items.", file=sys.stderr)
                break

            results = process_items(items)
            new_results.extend(results)

            # Print to console immediately
            for r in results:
                print(json.dumps(r, ensure_ascii=False))
            sys.stdout.flush()

            pages = data.get("pageTotal", 0)
            if page_num >= pages:
                print("Reached last page.", file=sys.stderr)
                break
            page_num += 1
            await delay(1000)

    print(f"\nNew results fetched: {len(new_results)}", file=sys.stderr)

    # Always deduplicate against existing data
    if existing_data:
        new_results = deduplicate_results(new_results, existing_data)
        print(f"After deduplication: {len(new_results)}", file=sys.stderr)

    # Write to CSV: new data first, then existing data
    # Use utf-8-sig to add BOM for Excel to auto-detect encoding and delimiter
    with open(args.output, "w", newline="", encoding="utf-8-sig") as csv_file:
        csv_writer = csv.writer(csv_file, delimiter=",", quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerow(["项目标题", "发布时间", "详细链接"])
        # Write new results first (newer data)
        for r in new_results:
            csv_writer.writerow([r["项目标题"], r["发布时间"], r["详细链接"]])
        # Write existing data (older data)
        for row in existing_data:
            csv_writer.writerow(row[:3])  # Only write first 3 columns

    total_records = len(new_results) + len(existing_data)
    print(f"Total records in CSV: {total_records}", file=sys.stderr)


if __name__ == "__main__":
    asyncio.run(run())