qpandalite.task.originq_dummy.task 源代码

"""OriginQ dummy backend — local simulator that emulates the Origin Quantum Cloud API.

This module provides the same public interface as ``origin_qcloud`` but
executes circuits on a local simulator instead of a real quantum device.
It is primarily intended for development, testing, and debugging without
consuming real quantum computing resources.

Noise simulation is supported through ``ErrorLoader`` configurations passed
via ``**kwargs`` (``generic_error``, ``gatetype_error``,
``gate_specific_error``, ``readout_error``).

Requires the ``QPandaLiteCpp`` C++ extension for the underlying simulator.

Public API:
    - submit_task — Submit circuit(s) for simulated execution.
    - query_by_taskid — Retrieve simulated results by task ID.
    - query_by_taskid_sync — Blocking version of ``query_by_taskid``.
    - query_all_tasks — Query all locally recorded dummy tasks.
"""

import datetime
import time
from typing import List, Union
import warnings
from qpandalite.originir import OriginIR_LineParser, OriginIR_BaseParser
import qpandalite.simulator as sim
from qpandalite.simulator.error_model import ErrorLoader, ErrorLoader_GateSpecificError
from qpandalite.simulator.originir_simulator import OriginIR_NoisySimulator
try:
    from qpandalite.simulator.originir_simulator import OriginIR_Simulator
except ImportError as e:
    raise ImportError('You must install QPandaLiteCpp to enable the simulation.')

from pathlib import Path
import os
import random
import json
import hashlib
from json.decoder import JSONDecodeError

from ..task_utils import timestr, make_savepath, load_all_online_info, write_taskinfo
from ..config import load_dummy_config

try:
    _dummy_config = load_dummy_config()
except ImportError as e:
    warnings.warn(ImportWarning(f'originq_dummy config not available: {e}. '
                'Using empty defaults (all qubits allowed, no topology restriction).'))
    _dummy_config = {
        'available_qubits': [],
        'available_topology': [],
        'task_group_size': 200,
    }

available_qubits = _dummy_config['available_qubits']
available_topology = _dummy_config['available_topology']
default_task_group_size = _dummy_config['task_group_size']
    
[文档] class DummyCacheContainer: """In-memory and on-disk cache for dummy simulation results. Stores results keyed by task ID, optionally persisting them to a JSONL file for later retrieval across sessions. """ def __init__(self) -> None: """Initialize an empty cache container.""" self.cached_results = dict() self.dummy_path = None
[文档] def clear_dummy_cache(self): """Remove all cached results from memory.""" self.cached_results = dict()
[文档] def save_dummy_cache(self, extra_savepath): """Dump all cached results to a timestamped JSONL file. Args: extra_savepath (os.PathLike): Output directory. """ if not os.path.exists(extra_savepath): os.makedirs(extra_savepath) extra_savepath = Path(extra_savepath) with open(extra_savepath / f'{timestr()}-dummycache.txt', 'a'): for result in self.cached_results: fp.write(json.dumps(result) + '\n')
[文档] def write_dummy_cache(self, taskid, result_body): """Store a result body under the given task ID. Args: taskid (str): Unique task identifier. result_body (dict): Result payload to cache. Raises: ValueError: If *taskid* already exists in the cache. """ if taskid in self.cached_results: raise ValueError('Impossible to have same taskid in the same cache container.\n' f'taskid (duplicated taskid) = {taskid}\n' f'cached_result[taskid] (cached) = {self.cached_results[taskid]}\n' f'result_body (input) = {result_body}') self.cached_results[taskid] = result_body if self.dummy_path: with open(self.dummy_path / 'dummy_result.jsonl', 'a') as fp: fp.write(json.dumps(result_body) + '\n')
[文档] def load_dummy_cache(self, taskid): """Retrieve a cached result by task ID. Checks in-memory cache first, then falls back to the on-disk JSONL file (if ``dummy_path`` is set). Args: taskid (str): Task identifier to look up. Returns: dict or None: The cached result, or ``None`` if not found. """ if taskid in self.cached_results: return self.cached_results[taskid] if self.dummy_path: with open(self.dummy_path / 'dummy_result.jsonl', 'r') as fp: lines = fp.read().strip().splitlines() for line in lines[::-1]: result = json.loads(line) if result['taskid'] == taskid: return result
dummy_cache_container = DummyCacheContainer()
[文档] def set_dummy_path(dummy_path : os.PathLike): """Set the on-disk storage path for dummy simulation results. Args: dummy_path (os.PathLike): Directory for persisting dummy results. """ _create_dummy_cache(dummy_path) dummy_cache_container.dummy_path = Path(dummy_path)
[文档] def save_dummy_cache(extra_savepath): """Export the current in-memory dummy cache to a file. Args: extra_savepath (os.PathLike): Output directory. """ dummy_cache_container.save_dummy_cache(extra_savepath)
[文档] def clear_dummy(): """Clear all cached dummy simulation results from memory.""" dummy_cache_container.clear_dummy_cache()
def _create_dummy_cache(dummy_path = None): '''Create simulation storage for dummy simulation server. Args: dummy_path (str or Path, optional): Path for dummy storage. Defaults to None. ''' if dummy_path: if not os.path.exists(dummy_path): os.makedirs(dummy_path) if isinstance(dummy_path, str): dummy_path = Path(dummy_path) if not os.path.exists(dummy_path / 'dummy_result.jsonl'): with open(dummy_path / 'dummy_result.jsonl', 'a') as fp: pass def _write_dummy_cache(taskid, taskname, results): '''Write simulation results to dummy server. Args: taskid (str): Taskid. taskname (str): Task name. results (List[Dict[str, List[float]]]): A list of simulation results. Example: [{'key':['001', '101'], 'value':[100, 100]}] ''' result_body = { 'taskid' : taskid, 'taskname' : taskname, 'status' : 'success', 'result' : results } dummy_cache_container.write_dummy_cache(taskid, result_body) def _load_dummy_cache(taskid): '''Load simulation results by taskid. Args: taskid (str): Taskid. Returns: result (Dict): The result which emulates the results produced by query_by_taskid ''' result = dummy_cache_container.load_dummy_cache(taskid) if result is not None: return result raise ValueError(f'Taskid {taskid} is not found. This is impossible in dummy server mode.') def _random_taskid() -> str: '''Create a dummy taskid for every task. Returns: str: Taskid. ''' n = random.random() md5 = hashlib.md5() md5.update(f"{n:.14f}".encode('utf-8')) return md5.hexdigest() def _submit_task_group_dummy_impl( circuits, task_name, shots, auto_mapping, **kwargs ): """Simulate a group of circuits locally and cache the results. If the number of circuits exceeds ``default_task_group_size``, the group is split and processed recursively. Noise parameters are forwarded via ``**kwargs``: - ``generic_error`` — Depolarizing noise applied to all gates. - ``gatetype_error`` — Noise specific to gate types. - ``gate_specific_error`` — Noise specific to individual gates. - ``readout_error`` — Readout (measurement) error rates. Args: circuits (list[str]): OriginIR circuit strings to simulate. task_name (str): Name for the task group. shots (int): Number of shots (unused — probability output). auto_mapping (bool): Whether to use automatic qubit mapping. **kwargs: Noise configuration forwarded to ``ErrorLoader``. Returns: str or list[str]: Task ID(s) for the submitted group(s). """ if len(circuits) > default_task_group_size: # list of circuits groups = [] group = [] for circuit in circuits: if len(group) >= default_task_group_size: groups.append(group) group = [] group.append(circuit) if group: groups.append(group) # recursively call, and return a list of taskid return [_submit_task_group_dummy_impl(group, '{}_{}'.format(task_name, i), shots, auto_mapping, **kwargs) for i, group in enumerate(groups)] # generate taskid taskid = _random_taskid() results = [] generic_error = kwargs.get('generic_error', None) gatetype_error = kwargs.get('gatetype_error', None) gate_specific_error = kwargs.get('gate_specific_error', None) readout_error = kwargs.get('readout_error', []) for circuit in circuits: # If there is noise_description if generic_error or gatetype_error or gate_specific_error or readout_error: error_loader = ErrorLoader_GateSpecificError( generic_error = generic_error, gatetype_error = gatetype_error, gate_specific_error= gate_specific_error ) if auto_mapping: simulator = OriginIR_NoisySimulator( backend_type='density_matrix', error_loader=error_loader, readout_error=readout_error) else: simulator = OriginIR_NoisySimulator( backend_type='density_matrix', error_loader=error_loader, readout_error=readout_error, available_qubits=available_qubits, available_topology=available_topology) else: if auto_mapping: simulator = sim.OriginIR_Simulator() else: simulator = sim.OriginIR_Simulator(available_qubits=available_qubits, available_topology=available_topology) prob_result = simulator.simulate_pmeasure(circuit) n_qubits = simulator.qubit_num key = [] value = [] # get probs from probability list # Note: originq server will directly produce prob list instead of shots list. for i, meas_result in enumerate(prob_result): key.append(hex(i)) value.append(meas_result) results.append({'key':key, 'value': value}) # write cache, ready for loading results _write_dummy_cache(taskid, task_name, results) # print(results) return taskid
[文档] def submit_task( circuit, task_name = None, tasktype = None, # dummy parameter chip_id = None, # dummy parameter shots = 1000, # dummy parameter. Note: originq server will directly output prob instead of shots. circuit_optimize = True, # dummy parameter measurement_amend = False, # dummy parameter auto_mapping = False, specified_block = None, # dummy parameter savepath = Path.cwd() / 'online_info', url = None, # dummy parameter **kwargs ): ''' Submit circuit(s) for dummy (simulated) execution. Accepts a single OriginIR circuit string or a list thereof. The circuits are simulated locally using the built-in simulator and the results are cached for later retrieval via ``query_by_taskid``. Args: circuit (Union[str, List[str]]): One or more OriginIR circuit strings. task_name (str, optional): Human-readable task name. tasktype: Ignored (dummy parameter kept for API compatibility). chip_id: Ignored (dummy parameter). shots (int, optional): Number of shots. Ignored — the simulator returns probabilities directly. circuit_optimize (bool, optional): Ignored (dummy parameter). measurement_amend (bool, optional): Ignored (dummy parameter). auto_mapping (bool, optional): Whether to use automatic qubit mapping in the simulator. specified_block: Ignored (dummy parameter). savepath (os.PathLike, optional): Directory for local task records. url: Ignored (dummy parameter). **kwargs: Noise parameters forwarded to the simulator. Returns: str: The task ID assigned to this submission. Raises: ValueError: If *circuit* is not a ``str`` or ``list`` of ``str``. ''' if isinstance(circuit, list): for c in circuit: if not isinstance(c, str): raise ValueError('Input is not a valid circuit list (a.k.a List[str]).') taskid = _submit_task_group_dummy_impl( circuits = circuit, task_name = task_name, shots = shots, auto_mapping = auto_mapping, **kwargs ) elif isinstance(circuit, str): taskid = _submit_task_group_dummy_impl( circuits = [circuit], task_name = task_name, shots = shots, auto_mapping = auto_mapping, **kwargs ) else: raise ValueError('Input must be a str or List[str], where each str is a valid originir string.') ret = {'taskid': taskid, 'taskname': task_name} if savepath: make_savepath(savepath) with open(savepath / 'online_info.txt', 'a') as fp: fp.write(json.dumps(ret) + '\n') return taskid
[文档] def query_by_taskid(taskid : Union[List[str],str], url = None, # dummy parameter ): '''Query circuit status by taskid (Async). This function will return without waiting. Args: taskid (str): The taskid. Raises: ValueError: Taskid invalid. ValueError: URL invalid. RuntimeError: Error when querying. Returns: Dict[str, dict]: The status and the result status : success | failed | running result (when success): List[Dict[str,list]] result (when failed): {'errcode': str, 'errinfo': str} result (when running): N/A ''' if not taskid: raise ValueError('Task id ??') if isinstance(taskid, list): taskinfo = dict() taskinfo['status'] = 'success' taskinfo['result'] = [] for taskid_i in taskid: taskinfo_i = _load_dummy_cache(taskid_i) if taskinfo_i['status'] == 'failed': # if any task is failed, then this group is failed. taskinfo['status'] = 'failed' break elif taskinfo_i['status'] == 'running': # if any task is running, then set to running taskinfo['status'] = 'running' if taskinfo_i['status'] == 'success': if taskinfo['status'] == 'success': # update if task is successfully finished (so far) taskinfo['result'].extend(taskinfo_i['result']) elif isinstance(taskid, str): taskinfo = _load_dummy_cache(taskid) else: raise ValueError('Invalid Taskid') return taskinfo
[文档] def query_by_taskid_sync(taskid : Union[str, List[str]], interval : float = 2.0, timeout : float = 60.0, retry : int = 0, url = None, # dummy parameter ): '''Query circuit status by taskid (synchronous version), it will wait until the task finished. Args: taskid (str): The taskid. interval (float): Interval time between two queries. (in seconds) timeout (float): Interval time between two queries. (in seconds) Raises: RuntimeError: Taskid invalid. RuntimeError: URL invalid. TimeoutError: Timeout reached Returns: Dict[str, dict]: The status and the result status : success | failed | running result (when success): List[Dict[str,list]] result (when failed): {'errcode': str, 'errinfo': str} result (when running): N/A ''' starttime = time.time() while True: now = time.time() if now - starttime > timeout: raise TimeoutError(f'Reach the maximum timeout.') taskinfo = query_by_taskid(taskid=taskid) if taskinfo['status'] == 'running': continue if taskinfo['status'] == 'success': result = taskinfo['result'] return result if taskinfo['status'] == 'failed': errorinfo = taskinfo['result'] raise RuntimeError(f'Failed to execute, errorinfo = {errorinfo}') time.sleep(interval)
[文档] def query_all_tasks(savepath = None, url = None, # dummy parameter ): '''Query all task info in the savepath. If you only want to query from taskid, then you can use query_by_taskid instead. Args: url (str, optional): The url for querying. Defaults to default_query_url. savepath (PathLikeObject(str, pathlib.Path, etc...), optional): The savepath for loading the online info. Defaults to None. Returns: tuple[int,int]: Two integers (finished task count, all task count) ''' if not savepath: savepath = Path.cwd() / 'online_info' online_info = load_all_online_info(savepath) task_count = len(online_info) finished = 0 for task in online_info: taskid = task['taskid'] if isinstance(taskid, list): status = 'finished' for taskid_i in taskid: if not os.path.exists(savepath / '{}.txt'.format(taskid)): taskinfo = query_by_taskid(taskid=taskid_i) if taskinfo['status'] == 'success' or taskinfo['status'] == 'failed': write_taskinfo(taskid_i, taskinfo, savepath) else: status = 'unfinished' if status == 'finished': finished += 1 elif isinstance(taskid, str): if not os.path.exists(savepath / '{}.txt'.format(taskid)): taskinfo = query_by_taskid(taskid=taskid) if taskinfo['status'] == 'success' or taskinfo['status'] == 'failed': write_taskinfo(taskid, taskinfo, savepath) finished += 1 else: finished += 1 else: raise RuntimeError('Invalid Taskid.') return finished, task_count
[文档] def query_all_task(savepath = None, url = None, # dummy parameter ): '''Deprecated!! Use query_all_tasks instead ''' warnings.warn(DeprecationWarning("Use query_all_tasks instead")) return query_all_tasks(savepath, url)
if __name__ == '__main__': _random_taskid()