qpandalite.task.adapters.quafu_adapter 源代码

"""Quafu backend adapter.

Translates OriginIR circuits to Quafu QuantumCircuit objects and submits
via the ``quafu`` package (User / Task API).  No raw REST calls.

Installation:
    pip install qpandalite[quafu]
"""

from __future__ import annotations

__all__ = ["QuafuAdapter"]

from typing import TYPE_CHECKING, Any

from qpandalite.task.adapters.base import (
    TASK_STATUS_FAILED,
    TASK_STATUS_RUNNING,
    TASK_STATUS_SUCCESS,
    QuantumAdapter,
)
from qpandalite.task.config import load_quafu_config
from qpandalite.task.optional_deps import MissingDependencyError, check_quafu

if TYPE_CHECKING:
    import quafu


[文档] class QuafuAdapter(QuantumAdapter): """Adapter for the BAQIS Quafu (ScQ) quantum cloud platform. Raises: MissingDependencyError: If quafu package is not installed. """ name = "quafu" # Valid chip IDs VALID_CHIP_IDS = frozenset( {"ScQ-P10", "ScQ-P18", "ScQ-P136", "ScQ-P10C", "Dongling"} ) # Upper limit on the number of groups retained in _task_history. # Beyond this threshold the oldest entry is evicted to avoid unbounded # memory growth in long-running processes. _MAX_HISTORY_GROUPS: int = 100 @property def api_token(self) -> str: """Return the API token used for Quafu authentication. Returns: str: The Quafu API token. """ return self._api_token def __init__(self) -> None: # Check if quafu is available if not check_quafu(): raise MissingDependencyError("quafu", "quafu") config = load_quafu_config() self._api_token: str = config["api_token"] # Internal task history: group_name -> {taskid: task_index} # Updated on each submit_batch call so retrieve() can work without # requiring the caller to pass history. self._task_history: dict[str, dict[str, int]] = {} # Track insertion order so we can evict the oldest group when the # cap is reached (simple FIFO). self._history_order: list[str] = [] from quafu import QuantumCircuit, Task, User self._QuantumCircuit = QuantumCircuit self._Task = Task self._User = User
[文档] def is_available(self) -> bool: """Check if the Quafu adapter is available (quafu package installed). Returns: bool: True if the quafu package was successfully imported. """ return check_quafu()
# ------------------------------------------------------------------------- # Circuit translation # -------------------------------------------------------------------------
[文档] def translate_circuit(self, originir: str) -> "QuantumCircuit": """Translate an OriginIR string to a Quafu QuantumCircuit.""" from qpandalite.originir.originir_line_parser import OriginIR_LineParser lines = originir.splitlines() qc: "QuantumCircuit | None" = None for line in lines: try: operation, qubit, cbit, parameter, dagger_flag, control_qubits = OriginIR_LineParser.parse_line(line) except NotImplementedError: raise RuntimeError( f"Unknown OriginIR operation in quafu adapter: {line.strip()}" ) from None if operation == "QINIT": qc = self._QuantumCircuit(int(qubit)) # type: ignore[arg-type] continue if qc is None: raise RuntimeError("QINIT must appear before any gate operation.") qc = self._reconstruct_qasm(qc, operation, qubit, cbit, parameter) if qc is None: raise RuntimeError("OriginIR string produced no circuit.") return qc
def _reconstruct_qasm( self, qc: "QuantumCircuit", operation: str | None, qubit: int | list[int], cbit: int | None, parameter: float | list[float] | None, ) -> "QuantumCircuit": """Append a single gate to a Quafu QuantumCircuit based on parsed OriginIR. This method is called internally by translate_circuit() for each line of OriginIR after QINIT. It maps OriginIR gate names to Quafu QuantumCircuit method calls. Args: qc: The Quafu QuantumCircuit to modify (modified in-place). operation: The gate operation name (e.g., 'RX', 'H', 'CNOT'). qubit: Target qubit index or list of indices for multi-qubit gates. cbit: Classical bit index for MEASURE operations. parameter: Rotation angle for parametric gates (e.g., RX, RY, RZ). Returns: The modified QuantumCircuit (same object as input). Raises: RuntimeError: If the operation is not supported by this adapter. Note: Supported gates: RX, RY, RZ, H, X, CZ, CNOT, MEASURE. CREG and None operations are silently ignored. """ if operation == "RX": qc.rx(int(qubit), parameter) # type: ignore[arg-type] elif operation == "RY": qc.ry(int(qubit), parameter) # type: ignore[arg-type] elif operation == "RZ": qc.rz(int(qubit), parameter) # type: ignore[arg-type] elif operation == "H": qc.h(int(qubit)) # type: ignore[arg-type] elif operation == "X": qc.x(int(qubit)) # type: ignore[arg-type] elif operation == "CZ": qc.cz(int(qubit[0]), int(qubit[1])) # type: ignore[index] elif operation == "CNOT": qc.cnot(int(qubit[0]), int(qubit[1])) # type: ignore[index] elif operation == "MEASURE": qc.measure([int(qubit)], [int(cbit)]) # type: ignore[list-item] elif operation is None or operation == "CREG": pass else: raise RuntimeError( f"Unknown OriginIR operation in quafu adapter: {operation}." ) return qc # ------------------------------------------------------------------------- # Task submission # -------------------------------------------------------------------------
[文档] def submit( self, circuit: "QuantumCircuit", *, shots: int = 10000, **kwargs: Any ) -> str: """Submit a single circuit to Quafu.""" chip_id: str | None = kwargs.get("chip_id") auto_mapping: bool = kwargs.get("auto_mapping", True) task_name: str | None = kwargs.get("task_name") if chip_id not in self.VALID_CHIP_IDS: raise RuntimeError( r"Invalid chip_id. " r"Current quafu chip_id list: " r"['ScQ-P10','ScQ-P18','ScQ-P136', 'ScQ-P10C', 'Dongling']" ) user = self._User(api_token=self._api_token) user.save_apitoken() task = self._Task() task.config(backend=chip_id, shots=shots, compile=auto_mapping) result = task.send(circuit, wait=False, name=task_name) # type: ignore[arg-type] return result.taskid
[文档] def submit_batch( self, circuits: list["QuantumCircuit"], *, shots: int = 10000, **kwargs: Any ) -> list[str]: """Submit multiple circuits as a group to Quafu.""" chip_id: str | None = kwargs.get("chip_id") auto_mapping: bool = kwargs.get("auto_mapping", True) task_name: str | None = kwargs.get("task_name") group_name: str | None = kwargs.get("group_name") if chip_id not in self.VALID_CHIP_IDS: raise RuntimeError( r"Invalid chip_id. " r"Current quafu chip_id list: " r"['ScQ-P10','ScQ-P18','ScQ-P136', 'ScQ-P10C', 'Dongling']" ) user = self._User(api_token=self._api_token) user.save_apitoken() task = self._Task() task.config(backend=chip_id, shots=shots, compile=auto_mapping) taskids: list[str] = [] for index, c in enumerate(circuits): result = task.send( c, wait=False, name=f"{task_name}-{index}", group=group_name # type: ignore[arg-type] ) taskids.append(result.taskid) # Maintain history so query() can retrieve without caller-supplied history. # Apply FIFO eviction when the cap is reached. if group_name: if group_name not in self._task_history: if len(self._history_order) >= self._MAX_HISTORY_GROUPS: oldest = self._history_order.pop(0) self._task_history.pop(oldest, None) self._task_history[group_name] = {} self._history_order.append(group_name) for i, taskid in enumerate(taskids): self._task_history[group_name][taskid] = i return taskids
# ------------------------------------------------------------------------- # Task query # -------------------------------------------------------------------------
[文档] def query(self, taskid: str) -> dict[str, Any]: """Query a single Quafu task's status via SDK ``Task.retrieve()``. Uses the internally maintained history dict so the caller does not need to pass any additional context. """ user = self._User(api_token=self._api_token) user.save_apitoken() task = self._Task() # Build a minimal history dict: try all known groups. # Task.retrieve(taskid, history) will look up the taskid in history. for group_name, id_to_idx in self._task_history.items(): if taskid in id_to_idx: result = task.retrieve(taskid, history={group_name: {taskid: id_to_idx[taskid]}}) return self._result_to_dict(result) # Fallback: try without history (may work if server accepts taskid alone) result = task.retrieve(taskid) return self._result_to_dict(result)
def _result_to_dict(self, result) -> dict[str, Any]: """Convert a Quafu ExecResult to the adapter's standard result dict. This method normalizes Quafu's task status strings and extracts measurement results when the task has completed successfully. Args: result: A quafu.ExecResult object from Task.retrieve(). Returns: dict with keys: - ``status``: ``'success'`` | ``'failed'`` | ``'running'`` - ``result``: dict with ``counts`` and ``probabilities`` (when success) Note: The Quafu status strings are mapped as follows: - 'Completed' -> 'success' - 'Running', 'In Queue' -> 'running' - 'Failed', 'Canceled' -> 'failed' """ status_map = { "Completed": TASK_STATUS_SUCCESS, "Running": TASK_STATUS_RUNNING, "In Queue": TASK_STATUS_RUNNING, "Failed": TASK_STATUS_FAILED, "Canceled": TASK_STATUS_FAILED, } status_str = result.task_status status = status_map.get(status_str, TASK_STATUS_RUNNING) if status == TASK_STATUS_SUCCESS: return { "status": status, "result": { "counts": result.counts, "probabilities": result.probabilities, }, } return {"status": status}
[文档] def query_batch(self, taskids: list[str]) -> dict[str, Any]: """Query multiple Quafu tasks and merge results.""" taskinfo: dict[str, Any] = {"status": TASK_STATUS_SUCCESS, "result": []} for taskid in taskids: result_i = self.query(taskid) if result_i["status"] == TASK_STATUS_FAILED: taskinfo["status"] = TASK_STATUS_FAILED break elif result_i["status"] == TASK_STATUS_RUNNING: taskinfo["status"] = TASK_STATUS_RUNNING if taskinfo["status"] == TASK_STATUS_SUCCESS: taskinfo["result"].append(result_i.get("result", {})) return taskinfo