"""OriginQ dummy backend — local simulator that emulates the Origin Quantum Cloud API.
This module provides the same public interface as ``origin_qcloud`` but
executes circuits on a local simulator instead of a real quantum device.
It is primarily intended for development, testing, and debugging without
consuming real quantum computing resources.
Noise simulation is supported through ``ErrorLoader`` configurations passed
via ``**kwargs`` (``generic_error``, ``gatetype_error``,
``gate_specific_error``, ``readout_error``).
Requires the ``QPandaLiteCpp`` C++ extension for the underlying simulator.
Public API:
- submit_task — Submit circuit(s) for simulated execution.
- query_by_taskid — Retrieve simulated results by task ID.
- query_by_taskid_sync — Blocking version of ``query_by_taskid``.
- query_all_tasks — Query all locally recorded dummy tasks.
"""
import datetime
import time
from typing import List, Union
import warnings
from qpandalite.originir import OriginIR_LineParser, OriginIR_BaseParser
import qpandalite.simulator as sim
from qpandalite.simulator.error_model import ErrorLoader, ErrorLoader_GateSpecificError
from qpandalite.simulator.originir_simulator import OriginIR_NoisySimulator
try:
from qpandalite.simulator.originir_simulator import OriginIR_Simulator
except ImportError as e:
raise ImportError('You must install QPandaLiteCpp to enable the simulation.')
from pathlib import Path
import os
import random
import json
import hashlib
from json.decoder import JSONDecodeError
from ..task_utils import timestr, make_savepath, load_all_online_info, write_taskinfo
from ..config import load_dummy_config
try:
_dummy_config = load_dummy_config()
except ImportError as e:
warnings.warn(ImportWarning(f'originq_dummy config not available: {e}. '
'Using empty defaults (all qubits allowed, no topology restriction).'))
_dummy_config = {
'available_qubits': [],
'available_topology': [],
'task_group_size': 200,
}
available_qubits = _dummy_config['available_qubits']
available_topology = _dummy_config['available_topology']
default_task_group_size = _dummy_config['task_group_size']
[文档]
class DummyCacheContainer:
"""In-memory and on-disk cache for dummy simulation results.
Stores results keyed by task ID, optionally persisting them to a JSONL
file for later retrieval across sessions.
"""
def __init__(self) -> None:
"""Initialize an empty cache container."""
self.cached_results = dict()
self.dummy_path = None
[文档]
def clear_dummy_cache(self):
"""Remove all cached results from memory."""
self.cached_results = dict()
[文档]
def save_dummy_cache(self, extra_savepath):
"""Dump all cached results to a timestamped JSONL file.
Args:
extra_savepath (os.PathLike): Output directory.
"""
if not os.path.exists(extra_savepath):
os.makedirs(extra_savepath)
extra_savepath = Path(extra_savepath)
with open(extra_savepath / f'{timestr()}-dummycache.txt', 'a'):
for result in self.cached_results:
fp.write(json.dumps(result) + '\n')
[文档]
def write_dummy_cache(self, taskid, result_body):
"""Store a result body under the given task ID.
Args:
taskid (str): Unique task identifier.
result_body (dict): Result payload to cache.
Raises:
ValueError: If *taskid* already exists in the cache.
"""
if taskid in self.cached_results:
raise ValueError('Impossible to have same taskid in the same cache container.\n'
f'taskid (duplicated taskid) = {taskid}\n'
f'cached_result[taskid] (cached) = {self.cached_results[taskid]}\n'
f'result_body (input) = {result_body}')
self.cached_results[taskid] = result_body
if self.dummy_path:
with open(self.dummy_path / 'dummy_result.jsonl', 'a') as fp:
fp.write(json.dumps(result_body) + '\n')
[文档]
def load_dummy_cache(self, taskid):
"""Retrieve a cached result by task ID.
Checks in-memory cache first, then falls back to the on-disk
JSONL file (if ``dummy_path`` is set).
Args:
taskid (str): Task identifier to look up.
Returns:
dict or None: The cached result, or ``None`` if not found.
"""
if taskid in self.cached_results:
return self.cached_results[taskid]
if self.dummy_path:
with open(self.dummy_path / 'dummy_result.jsonl', 'r') as fp:
lines = fp.read().strip().splitlines()
for line in lines[::-1]:
result = json.loads(line)
if result['taskid'] == taskid:
return result
dummy_cache_container = DummyCacheContainer()
[文档]
def set_dummy_path(dummy_path : os.PathLike):
"""Set the on-disk storage path for dummy simulation results.
Args:
dummy_path (os.PathLike): Directory for persisting dummy results.
"""
_create_dummy_cache(dummy_path)
dummy_cache_container.dummy_path = Path(dummy_path)
[文档]
def save_dummy_cache(extra_savepath):
"""Export the current in-memory dummy cache to a file.
Args:
extra_savepath (os.PathLike): Output directory.
"""
dummy_cache_container.save_dummy_cache(extra_savepath)
[文档]
def clear_dummy():
"""Clear all cached dummy simulation results from memory."""
dummy_cache_container.clear_dummy_cache()
def _create_dummy_cache(dummy_path = None):
'''Create simulation storage for dummy simulation server.
Args:
dummy_path (str or Path, optional): Path for dummy storage. Defaults to None.
'''
if dummy_path:
if not os.path.exists(dummy_path):
os.makedirs(dummy_path)
if isinstance(dummy_path, str):
dummy_path = Path(dummy_path)
if not os.path.exists(dummy_path / 'dummy_result.jsonl'):
with open(dummy_path / 'dummy_result.jsonl', 'a') as fp:
pass
def _write_dummy_cache(taskid,
taskname,
results):
'''Write simulation results to dummy server.
Args:
taskid (str): Taskid.
taskname (str): Task name.
results (List[Dict[str, List[float]]]): A list of simulation results. Example: [{'key':['001', '101'], 'value':[100, 100]}]
'''
result_body = {
'taskid' : taskid,
'taskname' : taskname,
'status' : 'success',
'result' : results
}
dummy_cache_container.write_dummy_cache(taskid, result_body)
def _load_dummy_cache(taskid):
'''Load simulation results by taskid.
Args:
taskid (str): Taskid.
Returns:
result (Dict): The result which emulates the results produced by query_by_taskid
'''
result = dummy_cache_container.load_dummy_cache(taskid)
if result is not None:
return result
raise ValueError(f'Taskid {taskid} is not found. This is impossible in dummy server mode.')
def _random_taskid() -> str:
'''Create a dummy taskid for every task.
Returns:
str: Taskid.
'''
n = random.random()
md5 = hashlib.md5()
md5.update(f"{n:.14f}".encode('utf-8'))
return md5.hexdigest()
def _submit_task_group_dummy_impl(
circuits,
task_name,
shots,
auto_mapping,
**kwargs
):
"""Simulate a group of circuits locally and cache the results.
If the number of circuits exceeds ``default_task_group_size``, the
group is split and processed recursively.
Noise parameters are forwarded via ``**kwargs``:
- ``generic_error`` — Depolarizing noise applied to all gates.
- ``gatetype_error`` — Noise specific to gate types.
- ``gate_specific_error`` — Noise specific to individual gates.
- ``readout_error`` — Readout (measurement) error rates.
Args:
circuits (list[str]): OriginIR circuit strings to simulate.
task_name (str): Name for the task group.
shots (int): Number of shots (unused — probability output).
auto_mapping (bool): Whether to use automatic qubit mapping.
**kwargs: Noise configuration forwarded to ``ErrorLoader``.
Returns:
str or list[str]: Task ID(s) for the submitted group(s).
"""
if len(circuits) > default_task_group_size:
# list of circuits
groups = []
group = []
for circuit in circuits:
if len(group) >= default_task_group_size:
groups.append(group)
group = []
group.append(circuit)
if group:
groups.append(group)
# recursively call, and return a list of taskid
return [_submit_task_group_dummy_impl(group,
'{}_{}'.format(task_name, i),
shots,
auto_mapping,
**kwargs) for i, group in enumerate(groups)]
# generate taskid
taskid = _random_taskid()
results = []
generic_error = kwargs.get('generic_error', None)
gatetype_error = kwargs.get('gatetype_error', None)
gate_specific_error = kwargs.get('gate_specific_error', None)
readout_error = kwargs.get('readout_error', [])
for circuit in circuits:
# If there is noise_description
if generic_error or gatetype_error or gate_specific_error or readout_error:
error_loader = ErrorLoader_GateSpecificError(
generic_error = generic_error,
gatetype_error = gatetype_error,
gate_specific_error= gate_specific_error
)
if auto_mapping:
simulator = OriginIR_NoisySimulator(
backend_type='density_matrix',
error_loader=error_loader,
readout_error=readout_error)
else:
simulator = OriginIR_NoisySimulator(
backend_type='density_matrix',
error_loader=error_loader,
readout_error=readout_error,
available_qubits=available_qubits,
available_topology=available_topology)
else:
if auto_mapping:
simulator = sim.OriginIR_Simulator()
else:
simulator = sim.OriginIR_Simulator(available_qubits=available_qubits,
available_topology=available_topology)
prob_result = simulator.simulate_pmeasure(circuit)
n_qubits = simulator.qubit_num
key = []
value = []
# get probs from probability list
# Note: originq server will directly produce prob list instead of shots list.
for i, meas_result in enumerate(prob_result):
key.append(hex(i))
value.append(meas_result)
results.append({'key':key, 'value': value})
# write cache, ready for loading results
_write_dummy_cache(taskid, task_name, results)
# print(results)
return taskid
[文档]
def submit_task(
circuit,
task_name = None,
tasktype = None, # dummy parameter
chip_id = None, # dummy parameter
shots = 1000, # dummy parameter. Note: originq server will directly output prob instead of shots.
circuit_optimize = True, # dummy parameter
measurement_amend = False, # dummy parameter
auto_mapping = False,
specified_block = None, # dummy parameter
savepath = Path.cwd() / 'online_info',
url = None, # dummy parameter
**kwargs
):
'''
Submit circuit(s) for dummy (simulated) execution.
Accepts a single OriginIR circuit string or a list thereof. The
circuits are simulated locally using the built-in simulator and the
results are cached for later retrieval via ``query_by_taskid``.
Args:
circuit (Union[str, List[str]]): One or more OriginIR circuit strings.
task_name (str, optional): Human-readable task name.
tasktype: Ignored (dummy parameter kept for API compatibility).
chip_id: Ignored (dummy parameter).
shots (int, optional): Number of shots. Ignored — the simulator
returns probabilities directly.
circuit_optimize (bool, optional): Ignored (dummy parameter).
measurement_amend (bool, optional): Ignored (dummy parameter).
auto_mapping (bool, optional): Whether to use automatic qubit
mapping in the simulator.
specified_block: Ignored (dummy parameter).
savepath (os.PathLike, optional): Directory for local task records.
url: Ignored (dummy parameter).
**kwargs: Noise parameters forwarded to the simulator.
Returns:
str: The task ID assigned to this submission.
Raises:
ValueError: If *circuit* is not a ``str`` or ``list`` of ``str``.
'''
if isinstance(circuit, list):
for c in circuit:
if not isinstance(c, str):
raise ValueError('Input is not a valid circuit list (a.k.a List[str]).')
taskid = _submit_task_group_dummy_impl(
circuits = circuit,
task_name = task_name,
shots = shots,
auto_mapping = auto_mapping,
**kwargs
)
elif isinstance(circuit, str):
taskid = _submit_task_group_dummy_impl(
circuits = [circuit],
task_name = task_name,
shots = shots,
auto_mapping = auto_mapping,
**kwargs
)
else:
raise ValueError('Input must be a str or List[str], where each str is a valid originir string.')
ret = {'taskid': taskid, 'taskname': task_name}
if savepath:
make_savepath(savepath)
with open(savepath / 'online_info.txt', 'a') as fp:
fp.write(json.dumps(ret) + '\n')
return taskid
[文档]
def query_by_taskid(taskid : Union[List[str],str],
url = None, # dummy parameter
):
'''Query circuit status by taskid (Async). This function will return without waiting.
Args:
taskid (str): The taskid.
Raises:
ValueError: Taskid invalid.
ValueError: URL invalid.
RuntimeError: Error when querying.
Returns:
Dict[str, dict]: The status and the result
status : success | failed | running
result (when success): List[Dict[str,list]]
result (when failed): {'errcode': str, 'errinfo': str}
result (when running): N/A
'''
if not taskid: raise ValueError('Task id ??')
if isinstance(taskid, list):
taskinfo = dict()
taskinfo['status'] = 'success'
taskinfo['result'] = []
for taskid_i in taskid:
taskinfo_i = _load_dummy_cache(taskid_i)
if taskinfo_i['status'] == 'failed':
# if any task is failed, then this group is failed.
taskinfo['status'] = 'failed'
break
elif taskinfo_i['status'] == 'running':
# if any task is running, then set to running
taskinfo['status'] = 'running'
if taskinfo_i['status'] == 'success':
if taskinfo['status'] == 'success':
# update if task is successfully finished (so far)
taskinfo['result'].extend(taskinfo_i['result'])
elif isinstance(taskid, str):
taskinfo = _load_dummy_cache(taskid)
else:
raise ValueError('Invalid Taskid')
return taskinfo
[文档]
def query_by_taskid_sync(taskid : Union[str, List[str]],
interval : float = 2.0,
timeout : float = 60.0,
retry : int = 0,
url = None, # dummy parameter
):
'''Query circuit status by taskid (synchronous version), it will wait until the task finished.
Args:
taskid (str): The taskid.
interval (float): Interval time between two queries. (in seconds)
timeout (float): Interval time between two queries. (in seconds)
Raises:
RuntimeError: Taskid invalid.
RuntimeError: URL invalid.
TimeoutError: Timeout reached
Returns:
Dict[str, dict]: The status and the result
status : success | failed | running
result (when success): List[Dict[str,list]]
result (when failed): {'errcode': str, 'errinfo': str}
result (when running): N/A
'''
starttime = time.time()
while True:
now = time.time()
if now - starttime > timeout:
raise TimeoutError(f'Reach the maximum timeout.')
taskinfo = query_by_taskid(taskid=taskid)
if taskinfo['status'] == 'running':
continue
if taskinfo['status'] == 'success':
result = taskinfo['result']
return result
if taskinfo['status'] == 'failed':
errorinfo = taskinfo['result']
raise RuntimeError(f'Failed to execute, errorinfo = {errorinfo}')
time.sleep(interval)
[文档]
def query_all_tasks(savepath = None,
url = None, # dummy parameter
):
'''Query all task info in the savepath. If you only want to query from taskid, then you can use query_by_taskid instead.
Args:
url (str, optional): The url for querying. Defaults to default_query_url.
savepath (PathLikeObject(str, pathlib.Path, etc...), optional): The savepath for loading the online info. Defaults to None.
Returns:
tuple[int,int]: Two integers (finished task count, all task count)
'''
if not savepath:
savepath = Path.cwd() / 'online_info'
online_info = load_all_online_info(savepath)
task_count = len(online_info)
finished = 0
for task in online_info:
taskid = task['taskid']
if isinstance(taskid, list):
status = 'finished'
for taskid_i in taskid:
if not os.path.exists(savepath / '{}.txt'.format(taskid)):
taskinfo = query_by_taskid(taskid=taskid_i)
if taskinfo['status'] == 'success' or taskinfo['status'] == 'failed':
write_taskinfo(taskid_i, taskinfo, savepath)
else:
status = 'unfinished'
if status == 'finished':
finished += 1
elif isinstance(taskid, str):
if not os.path.exists(savepath / '{}.txt'.format(taskid)):
taskinfo = query_by_taskid(taskid=taskid)
if taskinfo['status'] == 'success' or taskinfo['status'] == 'failed':
write_taskinfo(taskid, taskinfo, savepath)
finished += 1
else:
finished += 1
else:
raise RuntimeError('Invalid Taskid.')
return finished, task_count
[文档]
def query_all_task(savepath = None,
url = None, # dummy parameter
):
'''Deprecated!! Use query_all_tasks instead
'''
warnings.warn(DeprecationWarning("Use query_all_tasks instead"))
return query_all_tasks(savepath, url)
if __name__ == '__main__':
_random_taskid()