Skip to content

gce thread lock -> process lock

Aaron Graubert requested to merge github/fork/Marlin-Na/patch-gce into master

Created by: Marlin-Na

Looks like that googleapiclient package is not only thread-unsafe, but also process-unsafe. Try use process lock instead of thread lock.

When running with prefect server, each task may initialize its own backend instance, which leads to BrokenPipe error if happening concurrently.

Sample traceback:

[20210331-13:14:57] [ACBH:dRangerAlgModule.normal.dRangerPreprocess_scatter] Unexpected error: BrokenPipeError(32, 'Broken pipe')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/jma/hg19_WGS_pipeline_wolF/sv_pipeline.py", line 46, in run
    result_df = super().run(**{k:v for k, v in locals().items() if k not in ["self", "__class__"]})
  File "/usr/local/lib/python3.8/dist-packages/wolf/task.py", line 537, in run
    task = self._get_runnable_copy(bound_inputs=task_inputs)
  File "/usr/local/lib/python3.8/dist-packages/wolf/task.py", line 472, in _get_runnable_copy
    task.backend = canine.orchestrator.BACKENDS[conf['type']](**conf)
  File "/usr/local/lib/python3.8/dist-packages/canine/backends/dockerTransient.py", line 63, in __init__
    self.config["image"] = self.get_latest_image(self.config["image_family"])["name"] if image is None else image
  File "/usr/local/lib/python3.8/dist-packages/canine/backends/dockerTransient.py", line 257, in get_latest_image
    ans = gce.images().getFromFamily(family = image_family, project = self.config["project"]).execute()
  File "/usr/local/lib/python3.8/dist-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/googleapiclient/http.py", line 905, in execute
    resp, content = _retry_request(
  File "/usr/local/lib/python3.8/dist-packages/googleapiclient/http.py", line 207, in _retry_request
    raise exception
  File "/usr/local/lib/python3.8/dist-packages/googleapiclient/http.py", line 176, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/google_auth_httplib2.py", line 209, in request
    self.credentials.before_request(self._request, method, uri, request_headers)
  File "/usr/local/lib/python3.8/dist-packages/google/auth/credentials.py", line 133, in before_request
    self.refresh(request)
  File "/usr/local/lib/python3.8/dist-packages/google/oauth2/credentials.py", line 208, in refresh
    access_token, refresh_token, expiry, grant_response = _client.refresh_grant(
  File "/usr/local/lib/python3.8/dist-packages/google/oauth2/_client.py", line 248, in refresh_grant
    response_data = _token_endpoint_request(request, token_uri, body)
  File "/usr/local/lib/python3.8/dist-packages/google/oauth2/_client.py", line 105, in _token_endpoint_request
      response = request(method="POST", url=token_uri, headers=headers, body=body)
  File "/usr/local/lib/python3.8/dist-packages/google_auth_httplib2.py", line 119, in __call__
    response, data = self.http.request(
  File "/usr/local/lib/python3.8/dist-packages/httplib2/__init__.py", line 1708, in request
    (response, content) = self._request(
  File "/usr/local/lib/python3.8/dist-packages/httplib2/__init__.py", line 1424, in _request
    (response, content) = self._conn_request(conn, request_uri, method, body, headers)
  File "/usr/local/lib/python3.8/dist-packages/httplib2/__init__.py", line 1347, in _conn_request
    conn.request(method, request_uri, body, headers)
  File "/usr/lib/python3.8/http/client.py", line 1255, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1301, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1250, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1049, in _send_output
    self.send(chunk)
  File "/usr/lib/python3.8/http/client.py", line 971, in send
    self.sock.sendall(data)
  File "/usr/lib/python3.8/ssl.py", line 1204, in sendall
    v = self.send(byte_view[count:])
  File "/usr/lib/python3.8/ssl.py", line 1173, in send
    return self._sslobj.write(data)
BrokenPipeError: [Errno 32] Broken pipe

Merge request reports