qpandalite.task.quafu.task 源代码

"""BAQIS Quafu (ScQ) quantum cloud platform backend.

All HTTP communication is routed through ``QuafuAdapter`` in the
``adapters`` layer — the raw ``requests.post`` to the Quafu URL has been
removed from this module.

Configuration is loaded from environment variables (preferred) or from
``quafu_online_config.json`` (deprecated fallback):

    QUAFU_API_TOKEN: API token for Quafu

Public API:
    - submit_task           — Submit circuit(s) for execution on Quafu.
    - query_by_taskid       — Query task status by task ID once (non-blocking).
    - query_by_taskid_sync  — Synchronously query task status (blocking).
    - query_task_by_group   — Retrieve all tasks in a named group.
    - query_all_tasks       — Query all locally recorded tasks.
"""

from __future__ import annotations

# Names exported by a star-import of this module.
__all__ = [
    "Translation_OriginIR_to_QuafuCircuit",
    "submit_task",
    "query_by_taskid",
    "query_by_taskid_sync",
    "query_task_by_group",
    "query_task_by_group_sync",
    "query_all_tasks",
    "query_all_task",  # deprecated alias of query_all_tasks
]

import json
import os
import time
import warnings
from pathlib import Path
from typing import Any, Union

from qpandalite.task.adapters import QuafuAdapter
from qpandalite.task.task_utils import load_all_online_info, write_taskinfo

# Module-level singleton adapter. Starts as ``None``; ``_get_adapter()``
# replaces it with a ``QuafuAdapter`` instance on first use.
_adapter: QuafuAdapter | None = None


def _get_adapter() -> QuafuAdapter:
    """Return the shared ``QuafuAdapter``, building it on the first call."""
    global _adapter
    if _adapter is not None:
        # Already constructed by an earlier call: reuse it.
        return _adapter
    _adapter = QuafuAdapter()
    return _adapter


# Re-export the translation class for backwards compatibility
class Translation_OriginIR_to_QuafuCircuit:
    """Backwards-compatible facade for OriginIR-to-Quafu circuit translation.

    Both static methods simply forward to the shared adapter returned by
    ``_get_adapter()``.

    Deprecated: use ``adapter.translate_circuit(originir)`` instead.
    """

    @staticmethod
    def reconstruct_qasm(
        qc,
        operation: str | None,
        qubit: int | list[int],
        cbit: int | None,
        parameter: float | list[float] | None,
    ):
        """Forward all arguments unchanged to the adapter's ``_reconstruct_qasm``."""
        return _get_adapter()._reconstruct_qasm(
            qc, operation, qubit, cbit, parameter
        )

    @staticmethod
    def translate(originir: str):
        """Return ``translate_circuit(originir)`` from the shared adapter."""
        return _get_adapter().translate_circuit(originir)
# --------------------------------------------------------------------------- # Task submission # ---------------------------------------------------------------------------
def _append_task_records(savepath: Path, records: list[dict[str, Any]]) -> None:
    """Append one JSON line per record to ``online_info.txt`` under *savepath*."""
    _ensure_savepath(savepath)
    with open(savepath / "online_info.txt", "a", encoding="utf-8") as fp:
        fp.writelines(json.dumps(record) + "\n" for record in records)


def submit_task(
    circuit: str | list[str] | None = None,
    task_name: str | None = None,
    chip_id: str | None = None,
    shots: int = 10000,
    auto_mapping: bool = True,
    savepath: Path | str | None = None,
    group_name: str | None = None,
    **kwargs,
) -> str | list[str]:
    """Submit one or more quantum circuits for execution on the Quafu platform.

    A single string is translated and sent with ``adapter.submit``; a list is
    translated element-wise and sent with ``adapter.submit_batch``. In both
    cases one JSON line per task is appended to ``online_info.txt`` inside
    the record directory.

    Args:
        circuit: OriginIR circuit string or list of strings.
        task_name: Human-readable task name.
        chip_id: Target chip ID (e.g. ``'ScQ-P10'``).
        shots: Number of measurement shots.
        auto_mapping: Whether to enable automatic qubit mapping / compilation.
        savepath: Directory for local task records. A falsy value selects
            ``<cwd>/quafu_online_info``.
        group_name: Optional group name for batch submissions. Only used
            (and recorded) when ``circuit`` is a list.
        **kwargs: Accepted for backward compatibility; currently ignored.

    Returns:
        str or list[str]: Task ID(s) for the submitted circuit(s).

    Raises:
        ValueError: If ``circuit`` is neither a string nor a list.
    """
    # Validate before touching the adapter so bad input fails fast, without
    # first constructing the adapter.
    if not isinstance(circuit, (str, list)):
        raise ValueError("Input must be a valid originir string or list of strings.")

    adapter = _get_adapter()
    record_dir = Path(savepath) if savepath else Path.cwd() / "quafu_online_info"

    if isinstance(circuit, str):
        qc = adapter.translate_circuit(circuit)
        taskid = adapter.submit(
            circuit=qc,
            shots=shots,
            chip_id=chip_id,
            auto_mapping=auto_mapping,
            task_name=task_name,
        )
        _append_task_records(
            record_dir,
            [{"taskid": taskid, "taskname": task_name, "backend": chip_id}],
        )
        return taskid

    circuits_qc = [adapter.translate_circuit(c) for c in circuit]
    taskids = adapter.submit_batch(
        circuits=circuits_qc,
        shots=shots,
        chip_id=chip_id,
        auto_mapping=auto_mapping,
        task_name=task_name,
        group_name=group_name,
    )
    _append_task_records(
        record_dir,
        [
            {
                "groupname": group_name,
                "taskid": task_id,
                "taskname": task_name,
                "backend": chip_id,
            }
            for task_id in taskids
        ],
    )
    return taskids
def _ensure_savepath(savepath: Path) -> None:
    """Make sure *savepath* exists and contains an ``online_info.txt`` file.

    Missing parent directories are created. An existing ``online_info.txt``
    keeps its contents.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    savepath.mkdir(parents=True, exist_ok=True)
    (savepath / "online_info.txt").touch(exist_ok=True)


# ---------------------------------------------------------------------------
# Task query
# ---------------------------------------------------------------------------


def query_by_taskid_single(
    taskid: str, savepath: Path | str
) -> str | dict[str, Any]:
    """Query a single task's status from the Quafu platform.

    The adapter is queried on every call. When the reported status is neither
    ``"Running"`` nor ``"Failed"``, the result is handed to ``write_taskinfo``
    unless ``<taskid>.txt`` already exists under *savepath*.

    Args:
        taskid (str): The unique task identifier.
        savepath (Path | str): Directory path for saving task results.

    Returns:
        str | dict[str, Any]: One of the following:
            - "Running": the adapter reported status ``"Running"``.
            - "Failed": the adapter reported status ``"Failed"``.
            - dict: the adapter's result dictionary for any other status.
    """
    result = _get_adapter().query(taskid)
    status = result["status"]
    if status == "Running":
        return "Running"
    if status == "Failed":
        return "Failed"

    record_file = Path(savepath) / f"{taskid}.txt"
    if not record_file.exists():
        write_taskinfo(taskid, result, Path(savepath))
    return result
def query_by_taskid(
    taskid: str | list[str],
    savepath: Path | str | None = None,
) -> str | dict[str, Any]:
    """Query task status by task ID (non-blocking, one poll per task).

    Args:
        taskid (str | list[str]): Single task ID or list of task IDs to query.
        savepath (Path | str | None, optional): Directory path for saving task
            results locally. Defaults to ./quafu_online_info/.

    Returns:
        For a **list** of IDs, a dictionary with:
            - "status": "success", "running", or "failed". Querying stops
              at the first failed task.
            - "result": the per-task results collected while the status
              was still "success".

        For a **single string** ID, whatever ``query_by_taskid_single``
        returns: the string ``"Running"`` or ``"Failed"``, or the task's
        result dictionary.

    Raises:
        ValueError: If taskid is empty or has invalid type.
    """
    # Validate first so bad input never triggers adapter construction.
    if not taskid:
        raise ValueError("Task id must not be empty.")
    if not isinstance(taskid, (str, list)):
        raise ValueError("Invalid Taskid")

    savepath = Path(savepath) if savepath else Path.cwd() / "quafu_online_info"

    if isinstance(taskid, str):
        return query_by_taskid_single(taskid, savepath)

    taskinfo: dict[str, Any] = {"status": "success", "result": []}
    for taskid_i in taskid:
        taskinfo_i = query_by_taskid_single(taskid_i, savepath)
        if taskinfo_i == "Failed":
            taskinfo["status"] = "failed"
            break
        if taskinfo_i == "Running":
            # Keep polling the remaining IDs so a later failure is still seen.
            taskinfo["status"] = "running"
        if taskinfo["status"] == "success":
            taskinfo["result"].append(taskinfo_i)
    return taskinfo
def query_by_taskid_sync(
    taskid: str | list[str],
    interval: float = 2.0,
    timeout: float = 60.0,
    retry: int = 5,
    savepath: Path | str | None = None,
) -> list[dict[str, Any]]:
    """Query task status by task ID (blocking) until completion or timeout.

    Repeatedly sleeps for *interval* seconds and then calls
    ``query_by_taskid`` until the task(s) succeed or fail, or until
    *timeout* seconds have elapsed.

    Args:
        taskid (str | list[str]): Single task ID or list of task IDs to query.
        interval (float, optional): Polling interval in seconds. Defaults to 2.0.
        timeout (float, optional): Maximum wait time in seconds. Defaults to 60.0.
        retry (int, optional): Number of additional polls tolerated when a
            query reports an unrecognised status. Defaults to 5.
        savepath (Path | str | None, optional): Directory for local task
            records, forwarded to ``query_by_taskid``. Defaults to None.

    Raises:
        TimeoutError: If the task does not complete within the specified timeout.
        RuntimeError: If the task fails or all retry attempts are exhausted.

    Returns:
        list[dict[str, Any]]: List of task result dictionaries when all tasks
        succeed. A single string ID yields a one-element list.
    """
    starttime = time.time()
    while True:
        if time.time() - starttime > timeout:
            raise TimeoutError("Reach the maximum timeout.")
        time.sleep(interval)

        taskinfo = query_by_taskid(taskid, savepath)

        # query_by_taskid has two return shapes: for a single string ID it
        # yields "Running"/"Failed" or the raw result dict; for a list it
        # yields {"status": ..., "result": [...]}. Normalise both here.
        if isinstance(taskinfo, str):
            status, payload = taskinfo.lower(), None
        elif isinstance(taskid, str):
            status, payload = "success", [taskinfo]
        else:
            status, payload = taskinfo["status"], taskinfo.get("result")

        if status == "running":
            continue
        if status == "success":
            return payload if payload is not None else []
        if status == "failed":
            raise RuntimeError(f"Failed to execute, errorinfo = {payload}")

        # Unrecognised status: poll again a limited number of times.
        if retry > 0:
            retry -= 1
            print(f"Query failed. Retry remains {retry} times.")
        else:
            print("Retry count exhausted.")
            raise RuntimeError("Retry count exhausted.")
def query_task_by_group(
    group_name: str,
    history: dict[str, list[str]] | None = None,
    verbose: bool = True,
    savepath: Path | str | None = None,
) -> list:
    """Retrieve all tasks belonging to a named Quafu group.

    Calls ``quafu.Task.retrieve_group`` for *group_name* and passes every
    returned result to ``write_taskinfo`` under *savepath*.

    Args:
        group_name (str): The name of the task group to query.
        history (dict[str, list[str]] | None, optional): Mapping of group name
            to task IDs. If None, it is rebuilt from the records returned by
            ``load_all_online_info(savepath)``.
        verbose (bool, optional): Forwarded to ``retrieve_group``.
            Defaults to True.
        savepath (Path | str | None, optional): Directory for saving results.
            Defaults to current working directory / "quafu_online_info".

    Raises:
        ValueError: If group_name is empty or not a string.

    Returns:
        list: The value returned by ``retrieve_group``.
    """
    if not isinstance(group_name, str):
        raise ValueError("Invalid group name")
    if not group_name:
        raise ValueError("Group name must not be empty.")

    savepath = Path.cwd() / "quafu_online_info" if savepath is None else Path(savepath)

    if history is None:
        history = {}
        for record in load_all_online_info(savepath):
            # Records written for single submissions carry no "groupname".
            if "groupname" in record:
                history.setdefault(record["groupname"], []).append(record["taskid"])

    # Imported lazily so the module can be imported without the quafu package.
    from quafu import Task, User

    user = User(api_token=_get_adapter().api_token)
    user.save_apitoken()

    group_result = Task().retrieve_group(group_name, history, verbose)
    for result in group_result:
        result_dict = dict(result.__dict__)
        # Dropped before saving (presumably not serialisable; confirm).
        # pop() tolerates results that lack the attribute.
        result_dict.pop("transpiled_circuit", None)
        write_taskinfo(result.taskid, result_dict, savepath=savepath)
    return group_result
def query_task_by_group_sync(
    group_name: str,
    verbose: bool = True,
    savepath: Path | str | None = None,
    interval: float = 2.0,
    timeout: float = 60.0,
    retry: int = 5,
) -> list:
    """Blocking query for all tasks in a named Quafu group.

    Polls ``query_task_by_group`` until the number of results it returns
    equals the number of task IDs recorded locally for *group_name*, or
    until *timeout* seconds have elapsed.

    Args:
        group_name (str): The name of the task group to query.
        verbose (bool, optional): Forwarded to ``query_task_by_group``.
            Defaults to True.
        savepath (Path | str | None, optional): Directory for saving results.
            Defaults to current working directory / "quafu_online_info".
        interval (float, optional): Polling interval in seconds. Defaults to 2.0.
        timeout (float, optional): Maximum wait time in seconds. Defaults to 60.0.
        retry (int, optional): Currently unused; kept for signature
            compatibility. Defaults to 5.

    Raises:
        KeyError: If no task of *group_name* is recorded under *savepath*.
        TimeoutError: If not all tasks complete within the specified timeout.

    Returns:
        list: List of task result objects once the counts match.
    """
    starttime = time.time()
    savepath = Path.cwd() / "quafu_online_info" if savepath is None else Path(savepath)

    history: dict[str, list[str]] = {}
    for record in load_all_online_info(savepath):
        if "groupname" in record:
            history.setdefault(record["groupname"], []).append(record["taskid"])

    # Fail fast: without local records the completion check below could
    # never succeed, so don't sleep or hit the platform first.
    if group_name not in history:
        raise KeyError(
            f"No locally recorded tasks for group {group_name!r} under {savepath}"
        )

    while True:
        if time.time() - starttime > timeout:
            raise TimeoutError("Reach the maximum timeout.")
        time.sleep(interval)

        group_taskinfo = query_task_by_group(group_name, history, verbose, savepath)
        if len(group_taskinfo) == len(history[group_name]):
            return group_taskinfo
def query_all_tasks(savepath: Path | str | None = None) -> None:
    """Query all locally recorded Quafu tasks and cache results.

    Every record returned by ``load_all_online_info(savepath)`` whose
    ``<taskid>.txt`` file does not yet exist is queried once:

    - a task that is still running is skipped, so a later call queries it
      again;
    - a failed task is recorded as an empty dictionary;
    - any other result is written as-is.

    Args:
        savepath (Path | str | None, optional): Directory holding the local
            task records. Defaults to current working directory /
            "quafu_online_info".
    """
    savepath = Path.cwd() / "quafu_online_info" if savepath is None else Path(savepath)

    for record in load_all_online_info(savepath):
        taskid = record["taskid"]
        if (savepath / f"{taskid}.txt").exists():
            continue

        # Forward savepath so the query caches into the same directory.
        ret = query_by_taskid(taskid, savepath)
        if ret is None or ret == "Running":
            # Writing a file now would mark the task as done and stop it
            # from ever being re-queried.
            continue
        if ret == "Failed":
            write_taskinfo(taskid, {}, savepath=savepath)
        else:
            write_taskinfo(taskid, ret, savepath=savepath)
def query_all_task(savepath: Path | str | None = None) -> None:
    """Deprecated alias of :func:`query_all_tasks`.

    Emits a ``DeprecationWarning`` attributed to the caller, then delegates
    with the same *savepath*.
    """
    deprecation = DeprecationWarning("Use query_all_tasks instead")
    warnings.warn(deprecation, stacklevel=2)
    query_all_tasks(savepath)