gce thread lock -> process lock
Created by: Marlin-Na
Looks like that googleapiclient package is not only thread-unsafe, but also process-unsafe. Try use process lock instead of thread lock.
When running with prefect server, each task may initialize its own backend instance, which leads to BrokenPipe error if happening concurrently.
Sample traceback:
[20210331-13:14:57] [ACBH:dRangerAlgModule.normal.dRangerPreprocess_scatter] Unexpected error: BrokenPipeError(32, 'Broken pipe')
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/home/jma/hg19_WGS_pipeline_wolF/sv_pipeline.py", line 46, in run
result_df = super().run(**{k:v for k, v in locals().items() if k not in ["self", "__class__"]})
File "/usr/local/lib/python3.8/dist-packages/wolf/task.py", line 537, in run
task = self._get_runnable_copy(bound_inputs=task_inputs)
File "/usr/local/lib/python3.8/dist-packages/wolf/task.py", line 472, in _get_runnable_copy
task.backend = canine.orchestrator.BACKENDS[conf['type']](**conf)
File "/usr/local/lib/python3.8/dist-packages/canine/backends/dockerTransient.py", line 63, in __init__
self.config["image"] = self.get_latest_image(self.config["image_family"])["name"] if image is None else image
File "/usr/local/lib/python3.8/dist-packages/canine/backends/dockerTransient.py", line 257, in get_latest_image
ans = gce.images().getFromFamily(family = image_family, project = self.config["project"]).execute()
File "/usr/local/lib/python3.8/dist-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
return wrapped(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/googleapiclient/http.py", line 905, in execute
resp, content = _retry_request(
File "/usr/local/lib/python3.8/dist-packages/googleapiclient/http.py", line 207, in _retry_request
raise exception
File "/usr/local/lib/python3.8/dist-packages/googleapiclient/http.py", line 176, in _retry_request
resp, content = http.request(uri, method, *args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/google_auth_httplib2.py", line 209, in request
self.credentials.before_request(self._request, method, uri, request_headers)
File "/usr/local/lib/python3.8/dist-packages/google/auth/credentials.py", line 133, in before_request
self.refresh(request)
File "/usr/local/lib/python3.8/dist-packages/google/oauth2/credentials.py", line 208, in refresh
access_token, refresh_token, expiry, grant_response = _client.refresh_grant(
File "/usr/local/lib/python3.8/dist-packages/google/oauth2/_client.py", line 248, in refresh_grant
response_data = _token_endpoint_request(request, token_uri, body)
File "/usr/local/lib/python3.8/dist-packages/google/oauth2/_client.py", line 105, in _token_endpoint_request
response = request(method="POST", url=token_uri, headers=headers, body=body)
File "/usr/local/lib/python3.8/dist-packages/google_auth_httplib2.py", line 119, in __call__
response, data = self.http.request(
File "/usr/local/lib/python3.8/dist-packages/httplib2/__init__.py", line 1708, in request
(response, content) = self._request(
File "/usr/local/lib/python3.8/dist-packages/httplib2/__init__.py", line 1424, in _request
(response, content) = self._conn_request(conn, request_uri, method, body, headers)
File "/usr/local/lib/python3.8/dist-packages/httplib2/__init__.py", line 1347, in _conn_request
conn.request(method, request_uri, body, headers)
File "/usr/lib/python3.8/http/client.py", line 1255, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/usr/lib/python3.8/http/client.py", line 1301, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/usr/lib/python3.8/http/client.py", line 1250, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/usr/lib/python3.8/http/client.py", line 1049, in _send_output
self.send(chunk)
File "/usr/lib/python3.8/http/client.py", line 971, in send
self.sock.sendall(data)
File "/usr/lib/python3.8/ssl.py", line 1204, in sendall
v = self.send(byte_view[count:])
File "/usr/lib/python3.8/ssl.py", line 1173, in send
return self._sslobj.write(data)
BrokenPipeError: [Errno 32] Broken pipe