qpandalite.task.ibm.task 源代码

"""IBM Quantum backend for task submission and querying via Qiskit.

Submits quantum circuits (OriginIR -> Qiskit QuantumCircuit) to IBM Quantum
devices using the ``qiskit`` and ``qiskit_ibm_provider`` packages.
No raw REST calls — all communication goes through the qiskit SDK.

Configuration is loaded from environment variables (preferred) or from
``ibm_online_config.json`` (deprecated fallback):

    IBM_TOKEN: IBM Quantum API token

Public API:
    - submit_task          — Submit circuit(s) for execution on IBM Quantum.
    - query_by_taskid      — Query task status by job ID (non-blocking).
    - query_by_taskid_sync — Blocking query with polling.
    - query_all_tasks      — Query all locally recorded tasks.
    - query_all_task       — Deprecated alias of query_all_tasks.
"""

from __future__ import annotations

__all__ = [
    "submit_task",
    "query_by_taskid",
    "query_by_taskid_sync",
    "query_all_tasks",
    "query_all_task",
]

import time
import warnings
from pathlib import Path
from typing import List, Union

import qiskit
import qiskit_ibm_provider

from qpandalite.task.adapters import QiskitAdapter
from qpandalite.task.config import load_ibm_config
from qpandalite.task.task_utils import (
    load_all_online_info,
    make_savepath,
    write_taskinfo,
)

# ---------------------------------------------------------------------------
# Provider initialization (lazy via _get_adapter)
# ---------------------------------------------------------------------------

_adapter: QiskitAdapter | None = None


def _get_adapter() -> QiskitAdapter:
    """Return the module-wide QiskitAdapter, constructing it on first call.

    The instance is kept in the module-level ``_adapter`` variable, so every
    later call hands back the same object instead of building a new one.
    """
    global _adapter
    if _adapter is not None:
        return _adapter
    _adapter = QiskitAdapter()
    return _adapter


# ---------------------------------------------------------------------------
# Task query
# ---------------------------------------------------------------------------


def query_by_taskid_single(taskid: str) -> dict:
    """Fetch the current state of one IBM Quantum job without waiting.

    Args:
        taskid: Job ID to look up; must be non-empty.

    Returns:
        Whatever the shared adapter's ``query`` method returns for ``taskid``.

    Raises:
        ValueError: If ``taskid`` is falsy (for example ``""`` or ``None``).
    """
    if taskid:
        return _get_adapter().query(taskid)
    raise ValueError("Task id ??")


def query_by_taskid(
    taskid: Union[List[str], str],
) -> dict:
    """Query task status by job ID (non-blocking).

    Supports a single job ID or a list of job IDs.

    Args:
        taskid: One job ID, or a non-empty list of job IDs.

    Returns:
        For a single ID, the adapter's query result, unchanged.

        For a list, a dict ``{"status": ..., "result": [...]}``. ``status`` is
        ``"failed"`` as soon as one job reports a failure (remaining jobs are
        not queried), ``"running"`` if at least one job is still pending, and
        ``"success"`` otherwise. ``result`` collects the results of the
        successful jobs seen while the aggregate status was still
        ``"success"``.

    Raises:
        ValueError: If ``taskid`` is empty, or is neither a ``str`` nor a
            ``list``.
    """
    if not taskid:
        raise ValueError("Task id ??")
    if not isinstance(taskid, (list, str)):
        # Reject unusable input before the adapter is created.
        raise ValueError("Invalid Taskid")

    adapter = _get_adapter()
    if isinstance(taskid, str):
        return adapter.query(taskid)

    # NOTE(review): the adapter's status vocabulary is not visible from this
    # module. This function already tests for lower-case "success" and
    # query_by_taskid_sync tests for "running"/"failed", so both the
    # lower-case states and the upper-case IBM job states are recognised.
    failed_states = ("failed", "ERROR", "CANCELLED")
    pending_states = (
        "running",
        "INITIALIZING",
        "QUEUED",
        "VALIDATING",
        "RUNNING",
    )

    taskinfo: dict = {"status": "success", "result": []}
    for taskid_i in taskid:
        taskinfo_i = adapter.query(taskid_i)
        status_i = taskinfo_i["status"]
        if status_i in failed_states:
            # One failure decides the aggregate; stop querying.
            taskinfo["status"] = "failed"
            break
        if status_i in pending_states:
            taskinfo["status"] = "running"
        elif status_i == "success" and taskinfo["status"] == "success":
            taskinfo["result"].extend(taskinfo_i.get("result", []))
    return taskinfo
def query_by_taskid_sync(
    taskid: Union[List[str], str],
    interval: float = 2.0,
    timeout: float = 60.0,
    retry: int = 5,
) -> list:
    """Query task status by job ID (blocking) until completion or timeout.

    Polls :func:`query_by_taskid` repeatedly. The first poll happens
    immediately, so an already finished job (or invalid input) is reported
    without waiting for ``interval``.

    Args:
        taskid: One job ID or a list of job IDs, as accepted by
            :func:`query_by_taskid`.
        interval: Seconds to sleep between two polls.
        timeout: Seconds after which polling is abandoned.
        retry: How many polls with an unrecognised status are tolerated
            before giving up.

    Returns:
        The ``"result"`` entry of the final query result, or ``[]`` when that
        entry is missing.

    Raises:
        TimeoutError: If the job has not finished once ``timeout`` seconds
            have elapsed.
        RuntimeError: If the status is ``"failed"``, or if an unrecognised
            status is seen after ``retry`` is used up.
    """
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        taskinfo = query_by_taskid(taskid)
        status = taskinfo["status"]

        if status == "success":
            return taskinfo.get("result", [])
        if status == "failed":
            raise RuntimeError(
                f"Failed to execute, errorinfo = {taskinfo.get('result')}"
            )
        if status != "running":
            # Neither pending nor terminal: count it against the retry budget.
            if retry <= 0:
                print("Retry count exhausted.")
                raise RuntimeError("Retry count exhausted.")
            retry -= 1
            print(f"Query failed. Retry remains {retry} times.")

        if time.monotonic() > deadline:
            raise TimeoutError("Reach the maximum timeout.")
        time.sleep(interval)
# ---------------------------------------------------------------------------
# Task submission
# ---------------------------------------------------------------------------


def _translate_circuit(originir: str) -> qiskit.QuantumCircuit:
    """Convert a textual circuit to a Qiskit QuantumCircuit.

    Text whose first statement is an ``OPENQASM`` version declaration is
    treated as QASM and parsed directly. Any other text is loaded as OriginIR
    through ``Circuit.load_originir`` and its ``qasm`` attribute is parsed.

    Args:
        originir: Circuit source text (OriginIR, or QASM starting with an
            ``OPENQASM`` declaration).

    Returns:
        The circuit built by ``qiskit.QuantumCircuit.from_qasm_str``.
    """
    # First line that is neither blank nor a ``//`` comment.
    first_statement = next(
        (
            text
            for text in (line.strip() for line in originir.splitlines())
            if text and not text.startswith("//")
        ),
        "",
    )
    if first_statement.startswith("OPENQASM"):
        # Already QASM: routing it through the OriginIR loader would reject it.
        return qiskit.QuantumCircuit.from_qasm_str(originir)

    from qpandalite.circuit_builder.qcircuit import Circuit

    circuit = Circuit()
    circuit.load_originir(originir)
    return qiskit.QuantumCircuit.from_qasm_str(circuit.qasm)
def submit_task(
    circuit,
    task_name: str | None = None,
    tasktype=None,
    chip_id: str | None = None,
    shots: int = 1000,
    circuit_optimize: bool = True,
    measurement_amend: bool = False,
    auto_mapping: bool = False,
    specified_block=None,
    savepath=None,
    **kwargs,
) -> str:
    """Submit one circuit, or a list of circuits, to IBM Quantum as one batch.

    Every string is converted with ``_translate_circuit``; Qiskit
    ``QuantumCircuit`` objects are used as they are. The batch is handed to
    the shared adapter's ``submit_batch`` and a JSON line
    ``{"taskid": ..., "taskname": ...}`` is appended to ``online_info.txt``
    in the record directory.

    Args:
        circuit: A circuit string, a Qiskit QuantumCircuit, or a list of
            these.
        task_name: Human-readable task name; forwarded to the adapter and
            stored in the local record.
        tasktype: Reserved (unused).
        chip_id: Forwarded to the adapter (intended as the IBM backend name).
        shots: Forwarded to the adapter (number of measurement shots).
        circuit_optimize: Forwarded to the adapter (intended to switch
            transpiler optimisation on or off).
        measurement_amend: Reserved (unused).
        auto_mapping: Forwarded to the adapter (qubit mapping strategy).
        specified_block: Reserved (unused).
        savepath: Directory for local task records; defaults to
            ``online_info`` under the current working directory.
        **kwargs: Accepted and ignored.

    Returns:
        The job ID returned by the adapter's ``submit_batch``.

    Raises:
        TypeError: If ``circuit`` (or an element of the list) is neither a
            string nor a Qiskit QuantumCircuit.
    """
    import json

    adapter = _get_adapter()
    record_dir = Path.cwd() / "online_info" if not savepath else Path(savepath)

    # Treat a lone circuit as a one-element batch; only the wording of the
    # type error differs between the two input shapes.
    if isinstance(circuit, list):
        candidates = circuit
        type_error = (
            "Each circuit must be an OriginIR string, "
            "Qiskit QuantumCircuit, or QASM string."
        )
    else:
        candidates = [circuit]
        type_error = (
            "Circuit must be an OriginIR string, "
            "Qiskit QuantumCircuit, QASM string, or list of these."
        )

    new_circuit: list[qiskit.QuantumCircuit] = []
    for candidate in candidates:
        if isinstance(candidate, str):
            new_circuit.append(_translate_circuit(candidate))
        elif isinstance(candidate, qiskit.circuit.QuantumCircuit):
            new_circuit.append(candidate)
        else:
            raise TypeError(type_error)

    # Final guard on the translated circuits.
    for translated in new_circuit:
        if not isinstance(translated, qiskit.circuit.QuantumCircuit):
            raise TypeError(
                "All circuits must be Qiskit QuantumCircuit type or valid QASM."
            )

    job_id = adapter.submit_batch(
        circuits=new_circuit,
        shots=shots,
        chip_id=chip_id,
        circuit_optimize=circuit_optimize,
        auto_mapping=auto_mapping,
        task_name=task_name,
    )

    # A Path is always truthy, so the record is written unconditionally.
    record = {"taskid": job_id, "taskname": task_name}
    make_savepath(record_dir)
    with open(record_dir / "online_info.txt", "a") as fp:
        fp.write(json.dumps(record) + "\n")
    return job_id
def query_all_tasks(savepath=None) -> tuple[int, int]:
    """Query all locally recorded IBM Quantum tasks and cache results.

    A task whose ``<taskid>.txt`` file already exists in ``savepath`` counts
    as finished and is not queried again. Every other task is queried once.
    Its result is written with ``write_taskinfo`` and counted as finished
    unless the job is still pending, so pending jobs are picked up again on
    the next call.

    Args:
        savepath: Directory holding the local task records; defaults to
            ``online_info`` under the current working directory.

    Returns:
        ``(finished, task_count)``: the number of tasks with a cached result
        and the total number of recorded tasks.
    """
    savepath = Path(savepath) if savepath else Path.cwd() / "online_info"
    online_info = load_all_online_info(savepath)
    task_count = len(online_info)

    # Same pending states that query_by_taskid recognises, plus the
    # lower-case "running" that query_by_taskid_sync checks for.
    pending_states = (
        "running",
        "INITIALIZING",
        "QUEUED",
        "VALIDATING",
        "RUNNING",
    )

    finished = 0
    for task in online_info:
        taskid = task["taskid"]
        if (savepath / f"{taskid}.txt").exists():
            finished += 1
            continue

        ret = query_by_taskid(taskid).copy()
        if ret.get("status") in pending_states:
            # Caching now would freeze the task in its pending state.
            continue
        write_taskinfo(taskid, ret, savepath=savepath)
        finished += 1
    return finished, task_count
def query_all_task(savepath=None) -> tuple[int, int]:
    """Deprecated — use :func:`query_all_tasks` instead.

    Emits a ``DeprecationWarning`` attributed to the caller, then delegates
    to :func:`query_all_tasks` with the same ``savepath``.

    Args:
        savepath: Passed through to :func:`query_all_tasks`.

    Returns:
        The ``(finished, task_count)`` tuple from :func:`query_all_tasks`.
    """
    # stacklevel=2 reports the caller's line, not this wrapper's.
    warnings.warn(
        "Use query_all_tasks instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return query_all_tasks(savepath)