import argparse
import asyncio
import json
import sys
import urllib.parse
from datetime import datetime, date

import aiohttp

API_BASE_URL = "https://ygp.gdzwfw.gov.cn/ggzy-portal"


async def delay(ms: int):
    """Async delay in milliseconds."""
    await asyncio.sleep(ms / 1000)


def parse_args():
    """Parse command-line arguments (optional start/end dates, YYYY-MM-DD)."""
    parser = argparse.ArgumentParser(description="广东省公共资源交易平台数据抓取")
    parser.add_argument("--start-date", help="开始日期 (YYYY-MM-DD)")
    parser.add_argument("--end-date", help="结束日期 (YYYY-MM-DD)")
    return parser.parse_args()


def parse_api_date(date_str):
    """Parse an API timestamp string (YYYYMMDDHHMMSS) into a ``date``.

    Returns None for empty input or a string that does not match the format.
    """
    if not date_str:
        return None
    try:
        return datetime.strptime(date_str, "%Y%m%d%H%M%S").date()
    except ValueError:
        return None


def construct_detail_url(item):
    """Construct the detail page URL for one search-result item.

    URL format derived from useJump-b2a96f17.js:
      https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{tradingType}?noticeId=...
    Route config: /new/jygg/:edition(v1|v2|v3)/:tradingType

    Empty query parameters are dropped before URL-encoding.
    """
    edition = item.get("edition", "v3")
    trading_type = item.get("noticeSecondType", "A")
    base_url = f"https://ygp.gdzwfw.gov.cn/#/44/new/jygg/{edition}/{trading_type}"
    params = {
        "noticeId": item.get("noticeId", ""),
        "projectCode": item.get("projectCode", ""),
        # Some responses use tradingProcess/regionCode, older ones bizCode/siteCode.
        "bizCode": item.get("tradingProcess", item.get("bizCode", "")),
        "siteCode": item.get("regionCode", item.get("siteCode", "")),
        "publishDate": item.get("publishDate", ""),
        "source": item.get("pubServicePlat", ""),
        "titleDetails": item.get("noticeSecondTypeDesc", ""),
        "classify": item.get("projectType", "")
    }
    params = {k: v for k, v in params.items() if v}
    query = urllib.parse.urlencode(params)
    return f"{base_url}?{query}"


def build_search_payload(page_num=1, page_size=10):
    """Build the search API payload.

    Based on analysis of the frontend code (JyggFilter component).
    siteCode "44" is the Guangdong province site.
    """
    return {
        "pageNo": page_num,
        "pageSize": page_size,
        "keyword": "",
        "siteCode": "44",
        "secondType": "",
        "tradingProcess": "",
        "thirdType": "[]",
        "projectType": "",
        "publishStartTime": "",
        "publishEndTime": "",
        "type": "trading-type"
    }


def process_items(items, start_date, end_date):
    """Filter one page of items by publish date and title keyword.

    Returns (page_results, stop_signal):
      page_results -- list of dicts for items whose publish date falls in
                      [start_date, end_date] and whose title contains "中标结果"
      stop_signal  -- True when even the newest item on the page is older than
                      start_date, i.e. paging further (results are newest-first)
                      cannot yield more matches.
    """
    page_results = []
    max_date_on_page = None

    for item in items:
        title = item.get("noticeTitle", "")
        pub_date_str = item.get("publishDate", "")
        item_date = parse_api_date(pub_date_str)
        if item_date is None:
            # Unparseable/missing dates are skipped entirely.
            continue
        if max_date_on_page is None or item_date > max_date_on_page:
            max_date_on_page = item_date
        if item_date > end_date or item_date < start_date:
            continue
        if "中标结果" in title:
            page_results.append({
                "项目标题": title,
                "发布时间": pub_date_str,
                "详细链接": construct_detail_url(item)
            })

    # Stop only when ALL items on this page predate start_date. If the newest
    # item is older than start_date then every item is, and none can have been
    # collected above (they are filtered by the same bound), so this single
    # check is equivalent to the min/max/no-results combination.
    stop_signal = max_date_on_page is not None and max_date_on_page < start_date
    return page_results, stop_signal


async def fetch_page(session, page_num, page_size=10):
    """Fetch a single page of data from the API.

    Returns the decoded JSON response, or None on HTTP/network errors
    (errors are logged to stderr; caller decides whether to stop).
    """
    url = f"{API_BASE_URL}/search/v2/items"
    payload = build_search_payload(page_num, page_size)
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        "Accept": "application/json, text/plain, */*",
        "Origin": "https://ygp.gdzwfw.gov.cn",
        "Referer": "https://ygp.gdzwfw.gov.cn/"
    }
    try:
        async with session.post(url, json=payload, headers=headers) as response:
            if response.status != 200:
                print(f"API Error: {response.status}", file=sys.stderr)
                return None
            return await response.json()
    except Exception as e:
        # Best-effort crawl: log and let the caller stop gracefully.
        print(f"Error fetching page {page_num}: {e}", file=sys.stderr)
        return None


async def run():
    """Entry point: crawl pages until the date range is exhausted.

    Matching results are printed as JSON lines to stdout; progress and
    errors go to stderr.
    """
    args = parse_args()
    today = date.today()
    start_date = today
    end_date = today

    if args.start_date:
        try:
            start_date = datetime.strptime(args.start_date, "%Y-%m-%d").date()
        except ValueError:
            print(f"Error: Invalid start date format: {args.start_date}", file=sys.stderr)
            sys.exit(1)
    if args.end_date:
        try:
            end_date = datetime.strptime(args.end_date, "%Y-%m-%d").date()
        except ValueError:
            print(f"Error: Invalid end date format: {args.end_date}", file=sys.stderr)
            sys.exit(1)
    if start_date > end_date:
        print(f"Error: Start date {start_date} is after end date {end_date}", file=sys.stderr)
        sys.exit(1)

    print(f"Crawling range: {start_date} to {end_date}", file=sys.stderr)

    async with aiohttp.ClientSession() as session:
        page_num = 1
        while True:
            print(f"Processing page {page_num}...", file=sys.stderr)
            await delay(500)  # Initial delay before first request too

            resp = await fetch_page(session, page_num)
            if resp is None:
                print("Failed to fetch data. Stopping.", file=sys.stderr)
                break

            # API returns {errcode, errmsg, data}
            data = resp.get("data", {})
            items = data.get("pageData", [])
            if not items:
                print("No more items.", file=sys.stderr)
                break

            results, stop = process_items(items, start_date, end_date)
            for r in results:
                print(json.dumps(r, ensure_ascii=False))
            if stop:
                print("Date range satisfied. Stopping.", file=sys.stderr)
                break

            # Check if we've reached the last page.
            # API returns pageTotal, not pages.
            pages = data.get("pageTotal", 0)
            if page_num >= pages:
                print("Reached last page.", file=sys.stderr)
                break

            page_num += 1
            await delay(1000)  # 1s delay between requests


if __name__ == "__main__":
    asyncio.run(run())