class BidsOnsetRoutine(BaseDeviceRoutine):
"""Routine for synchronising with hardware and setting the BIDS event time reference."""
categories = ["BIDS"]
targets = ["PsychoPy"]
iconFile = Path(__file__).parent / "BIDS.png"
iconSVG = Path(__file__).parent / "BidsOnsetRoutine.svg"
tooltip = _translate(
"BIDS Onset Timer: synchronise with hardware and set event time reference."
)
plugin = "psychopy-bids"
def __init__(self, exp, name="bidsOnset", deviceLabel=""):
BaseDeviceRoutine.__init__(self, exp, name=name, deviceLabel=deviceLabel)
self.type = "BIDSonset"
# Trigger mode
_trigger_modes = [
"keyboard: receive",
"parallel: receive",
"parallel: send",
]
if SerialDeviceBackend is not None:
_trigger_modes += ["serial: receive", "serial: send"]
self.params["trigger_mode"] = Param(
"keyboard: receive",
valType="str",
inputType="choice",
categ="Device",
allowedVals=_trigger_modes,
label=_translate("Trigger Mode"),
)
# Keyboard params
self.params["trigger_key"] = Param(
"5",
valType="str",
inputType="single",
categ="Device",
hint=_translate("Key to wait for (MRI scanners typically send '5')."),
label=_translate("Trigger Key"),
)
# Parallel port params
# Based on psychopy.experiment.components.parallelOut (ParallelOutComponent)
default_address = (prefs.hardware["parallelPorts"] or ["0x0378"])[0]
self.params["address"] = Param(
default_address,
valType="str",
inputType="single",
categ="Device",
hint=_translate(
"Parallel port address (e.g. 0x0378). Change via Preferences > Hardware."
),
label=_translate("Port Address"),
)
self.params["pin_number"] = Param(
"10",
valType="code",
inputType="single",
categ="Device",
hint=_translate(
"Pin to monitor for incoming trigger (10 = ACK, standard for MRI)."
),
label=_translate("Pin Number"),
)
self.params["start_data"] = Param(
"1",
valType="code",
inputType="single",
categ="Device",
hint=_translate("Integer value to send at trigger onset."),
label=_translate("Start Data"),
)
self.params["stop_data"] = Param(
"0",
valType="code",
inputType="single",
categ="Device",
hint=_translate("Integer value to send after pulse to reset the port."),
label=_translate("Stop Data"),
)
self.params["pulse_duration"] = Param(
"0.005",
valType="code",
inputType="single",
categ="Device",
hint=_translate("Pulse duration in seconds."),
label=_translate("Pulse Duration (s)"),
)
# Serial port params
# Based on psychopy.experiment.components.serialOut (SerialOutComponent)
self.params["trigger_char"] = Param(
"b'1'",
valType="code",
inputType="single",
categ="Device",
hint=_translate(
"Bytes value to wait for (e.g. b'1'). Use None to accept any byte."
),
label=_translate("Trigger Byte"),
)
self.params["send_char"] = Param(
"b'\\x01'",
valType="code",
inputType="single",
categ="Device",
hint=_translate("Bytes value to send (e.g. b'\\x01')."),
label=_translate("Send Byte"),
)
# Shared receive param
self.params["timeout"] = Param(
"None",
valType="code",
inputType="single",
categ="Device",
hint=_translate(
"Max seconds to wait for trigger. None = wait indefinitely."
),
label=_translate("Timeout (s)"),
)
# Screen params
self.params["stim_type"] = Param(
"text",
valType="str",
inputType="choice",
categ="Screen",
allowedVals=["none", "text", "image"],
label=_translate("Show on Screen"),
)
self.params["message"] = Param(
"Waiting for trigger...",
valType="str",
inputType="single",
categ="Screen",
label=_translate("Message"),
)
self.params["msg_font"] = Param(
"Open Sans",
valType="str",
inputType="single",
categ="Screen",
label=_translate("Font"),
)
self.params["msg_height"] = Param(
"0.05",
valType="num",
inputType="single",
categ="Screen",
label=_translate("Letter Height"),
)
self.params["msg_color"] = Param(
"'white'",
valType="code",
inputType="single",
categ="Screen",
label=_translate("Text Color"),
)
self.params["msg_pos"] = Param(
"(0, 0)",
valType="code",
inputType="single",
categ="Screen",
label=_translate("Text Position"),
)
self.params["image"] = Param(
"",
valType="file",
inputType="file",
categ="Screen",
label=_translate("Image"),
)
self.params["img_size"] = Param(
"(0.5, 0.5)",
valType="code",
inputType="single",
categ="Screen",
label=_translate("Image Size"),
)
self.params["img_pos"] = Param(
"(0, 0)",
valType="code",
inputType="single",
categ="Screen",
label=_translate("Image Position"),
)
# Show/hide rules
self.depends = [
# Keyboard
_show_when("trigger_mode", "=='keyboard: receive'", "trigger_key"),
# Parallel
_show_when(
"trigger_mode", " in ['parallel: receive', 'parallel: send']", "address"
),
_show_when("trigger_mode", "=='parallel: receive'", "pin_number"),
_show_when("trigger_mode", "=='parallel: send'", "start_data"),
_show_when("trigger_mode", "=='parallel: send'", "stop_data"),
_show_when(
"trigger_mode",
" in ['parallel: send', 'serial: send']",
"pulse_duration",
),
# Serial
_show_when(
"trigger_mode", " in ['serial: receive', 'serial: send']", "deviceLabel"
),
_show_when("trigger_mode", "=='serial: receive'", "trigger_char"),
_show_when("trigger_mode", "=='serial: send'", "send_char"),
# Timeout for all receive modes
_show_when(
"trigger_mode",
" in ['keyboard: receive', 'parallel: receive', 'serial: receive']",
"timeout",
),
# Screen sub-params
_show_when("stim_type", "=='text'", "message"),
_show_when("stim_type", "=='text'", "msg_font"),
_show_when("stim_type", "=='text'", "msg_height"),
_show_when("stim_type", "=='text'", "msg_color"),
_show_when("stim_type", "=='text'", "msg_pos"),
_show_when("stim_type", "=='image'", "image"),
_show_when("stim_type", "=='image'", "img_size"),
_show_when("stim_type", "=='image'", "img_pos"),
]
# Remove unused base-class timing params
for p in (
"startType",
"startVal",
"startEstim",
"stopVal",
"stopType",
"durationEstim",
"saveStartStop",
"syncScreenRefresh",
):
if p in self.params:
del self.params[p]
# Order within Device tab: mode selector first, then mode-specific hardware params
device_order = [
"trigger_mode",
"trigger_key",
"address",
"pin_number",
"start_data",
"stop_data",
"deviceLabel",
"pulse_duration",
"trigger_char",
"send_char",
"timeout",
]
self.order = [p for p in self.order if p not in device_order] + device_order
# Code generation
def _validateBidsOnsetParams(self):
"""Validate experiment structure required by BidsOnsetRoutine."""
def _names_in_flow(routine_type):
return [
getattr(el, "name", None)
for el in self.exp.flow
if getattr(
self.exp.routines.get(getattr(el, "name", None)),
"type",
None,
)
== routine_type
]
onset_names = _names_in_flow("BIDSonset")
if len(onset_names) > 1:
raise ValueError(
f"[psychopy-bids(onset)] Multiple BidsOnset routines found in the flow: {onset_names}. "
"Only one BidsOnset routine is allowed per experiment."
)
export_names = _names_in_flow("BIDSexport")
if len(export_names) > 1:
raise ValueError(
f"[psychopy-bids(onset)] Multiple BidsExport routines found in the flow: {export_names}. "
"Only one BidsExport routine is allowed per experiment."
)
if not export_names:
raise ValueError(
f"[psychopy-bids(onset)] '{self.params['name'].val}' requires a BidsExportRoutine "
"in the experiment flow. Add one or bids_handler will be undefined at runtime."
)
def writeStartCode(self, buff):
self._validateBidsOnsetParams()
def writeInitCode(self, buff):
mode = self.params["trigger_mode"].val
if "parallel" in mode:
buff.writeIndentedLines(
"from psychopy import parallel\n"
"%(name)s_port = parallel.ParallelPort(address=%(address)s)\n"
% self.params
)
elif "serial" in mode:
buff.writeIndentedLines(
"%(name)s_port = deviceManager.getDevice(%(deviceLabel)s)\n"
"if not %(name)s_port.com.is_open:\n"
" %(name)s_port.com.open()\n" % self.params
)
def writeExperimentEndCode(self, buff):
if "serial" in self.params["trigger_mode"].val:
buff.writeIndentedLines(
"if %(name)s_port.com.is_open:\n"
" %(name)s_port.com.close()\n" % self.params
)
def writeMainCode(self, buff):
mode = self.params["trigger_mode"].val
timeout = self.params["timeout"].val
stim_type = self.params["stim_type"].val
buff.writeIndentedLines(
"# BIDS Onset: wait for / send trigger, then record onset time\n"
)
# Draw stimulus and flip before the trigger action
if stim_type == "text":
buff.writeIndentedLines(
"%(name)s_stim = visual.TextStim(win, text=%(message)s, font=%(msg_font)s,\n"
" height=%(msg_height)s, color=%(msg_color)s, pos=%(msg_pos)s)\n"
"%(name)s_stim.draw()\n" % self.params
)
elif stim_type == "image":
buff.writeIndentedLines(
"%(name)s_stim = visual.ImageStim(win, image=%(image)s,\n"
" size=%(img_size)s, pos=%(img_pos)s)\n"
"%(name)s_stim.draw()\n" % self.params
)
if stim_type != "none":
buff.writeIndentedLines("win.flip()\n")
# Trigger action
if mode == "keyboard: receive":
max_wait_arg = (
"" if timeout in ("None", "", None) else " maxWait=%(timeout)s,\n"
)
buff.writeIndentedLines(
(
"_%(name)s_keys = event.waitKeys(\n"
" keyList=[%(trigger_key)s], timeStamped=globalClock,\n"
+ max_wait_arg
+ ")\n"
"bids_handler.setOnsetReference(\n"
" _%(name)s_keys[0][1] if _%(name)s_keys else globalClock.getTime())\n"
)
% self.params
)
elif mode == "parallel: receive":
pin_number = self.params["pin_number"].val or "10"
max_wait = timeout if timeout not in ("None", "", None) else "3600"
buff.writeIndentedLines(
(
"_%(name)s_prev = %(name)s_port.readPin(" + pin_number + ")\n"
"_%(name)s_t0 = globalClock.getTime()\n"
"while True:\n"
" if %(name)s_port.readPin("
+ pin_number
+ ") != _%(name)s_prev:\n"
" bids_handler.setOnsetReference(globalClock.getTime())\n"
" break\n"
" if globalClock.getTime() - _%(name)s_t0 > " + max_wait + ":\n"
" bids_handler.setOnsetReference(globalClock.getTime())\n"
" break\n"
)
% self.params
)
elif mode == "parallel: send":
buff.writeIndentedLines(
"%(name)s_port.setData(int(%(start_data)s))\n"
"bids_handler.setOnsetReference(globalClock.getTime())\n"
"core.wait(%(pulse_duration)s)\n"
"%(name)s_port.setData(int(%(stop_data)s))\n" % self.params
)
elif mode == "serial: receive":
buff.writeIndentedLines(
(
"_%(name)s_char = %(trigger_char)s\n"
"_%(name)s_t0 = globalClock.getTime()\n"
"while True:\n"
" _%(name)s_resp = %(name)s_port.getResponse(length=-1, timeout=0.001)\n"
" if _%(name)s_resp is not None:\n"
" _%(name)s_char_str = (\n"
" _%(name)s_char.decode() if isinstance(_%(name)s_char, bytes) else _%(name)s_char\n"
" )\n"
" if _%(name)s_char_str is None or _%(name)s_char_str in _%(name)s_resp:\n"
" bids_handler.setOnsetReference(globalClock.getTime())\n"
" break\n"
" if %(timeout)s is not None and "
"globalClock.getTime() - _%(name)s_t0 > %(timeout)s:\n"
" bids_handler.setOnsetReference(globalClock.getTime())\n"
" break\n"
)
% self.params
)
elif mode == "serial: send":
buff.writeIndentedLines(
"%(name)s_port.sendMessage(%(send_char)s)\n"
"bids_handler.setOnsetReference(globalClock.getTime())\n" % self.params
)
if stim_type != "none":
buff.writeIndentedLines("win.flip()\n")
buff.writeIndentedLines("continueRoutine = False\n")