From 7e41e21d57fef013b0769ec5b422a94cc28d28fa Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Thu, 30 Apr 2026 16:49:36 -0700 Subject: [PATCH 1/3] Support staging Python packages given via a URL. --- .../apache_beam/runners/portability/stager.py | 16 +++++++-- .../runners/portability/stager_test.py | 33 +++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 668477ce1461..b1da63cb4e5e 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -230,7 +230,7 @@ def create_job_resources( '--requirements_file command line option.' % setup_options.requirements_file) extra_packages, thinned_requirements_file = ( - Stager._extract_local_packages(setup_options.requirements_file)) + Stager._extract_local_packages(setup_options.requirements_file, temp_dir)) if extra_packages: setup_options.extra_packages = ( setup_options.extra_packages or []) + extra_packages @@ -701,14 +701,26 @@ def _remove_dependency_from_requirements( return tmp_requirements_filename @staticmethod - def _extract_local_packages(requirements_file): + def _extract_local_packages(requirements_file, temp_dir): local_deps = [] pypi_deps = [] with open(requirements_file, 'r') as fin: + staging_temp_dir = tempfile.mkdtemp(dir=temp_dir) for line in fin: dep = line.strip() if os.path.exists(dep): local_deps.append(dep) + elif urlparse(dep).scheme: + last_component = dep.rsplit('/', 1)[-1] + if not last_component: + _LOGGER.warning('Extra package %s has an unexpected format hence will not be downloaded locally.' % dep) + continue + # Download remote package. + _LOGGER.info( + 'Downloading remote extra package: %s locally before staging', dep) + local_file_path = FileSystems.join(staging_temp_dir, last_component) + Stager._download_file(dep, local_file_path) + local_deps.append(local_file_path) else: pypi_deps.append(dep) if local_deps: diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 4ec1c697fbff..405b0ac469f9 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -156,6 +156,39 @@ def test_download_file_unrecognized( self.stager._download_file(from_url, to_path) assert mock_mkdir.called + @mock.patch('apache_beam.runners.portability.stager.Stager._download_file') + def test_extract_local_packages(self, mock_download): + temp_dir = self.make_temp_dir() + req_file = os.path.join(temp_dir, 'requirements.txt') + + local_file = os.path.join(temp_dir, 'local.tar.gz') + self.create_temp_file(local_file, 'nothing') + + url = 'http://example.com/remote.tar.gz' + invalid_url = 'http://example.com/' + pypi_dep = 'pytest' + + contents = '\n'.join([local_file, url, invalid_url, pypi_dep]) + self.create_temp_file(req_file, contents) + + def fake_download(src, dst): + with open(dst, 'w') as f: + f.write('downloaded') + + mock_download.side_effect = fake_download + + local_deps, thinned_req = stager.Stager._extract_local_packages(req_file, temp_dir) + + self.assertEqual(len(local_deps), 2) + self.assertEqual(local_deps[0], local_file) + self.assertTrue(local_deps[1].endswith('remote.tar.gz')) + + mock_download.assert_called_once_with(url, mock.ANY) + + with open(thinned_req, 'r') as f: + lines = f.read().splitlines() + self.assertEqual(lines, [pypi_dep]) + def test_no_staging_location(self): with self.assertRaises(RuntimeError) as cm: self.stager.stage_job_resources([], staging_location=None) From 94f45487a59e978b72b464619ba175a282ba03cc Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Tue, 5 May 2026 09:16:40 -0700 Subject: [PATCH 2/3] Fix yapf --- sdks/python/apache_beam/runners/portability/stager.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index b1da63cb4e5e..f325e612ca49 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -713,11 +713,14 @@ def _extract_local_packages(requirements_file, temp_dir): elif urlparse(dep).scheme: last_component = dep.rsplit('/', 1)[-1] if not last_component: - _LOGGER.warning('Extra package %s has an unexpected format hence will not be downloaded locally.' % dep) + _LOGGER.warning( + 'Extra package %s has an unexpected format hence will not be downloaded locally.' + % dep) continue # Download remote package. _LOGGER.info( - 'Downloading remote extra package: %s locally before staging', dep) + 'Downloading remote extra package: %s locally before staging', + dep) local_file_path = FileSystems.join(staging_temp_dir, last_component) Stager._download_file(dep, local_file_path) local_deps.append(local_file_path) From 68a4e3a792b39545772209d0a5630d2102b0b444 Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Tue, 5 May 2026 13:40:14 -0700 Subject: [PATCH 3/3] Improves URL parsing logic --- sdks/python/apache_beam/runners/portability/stager.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index f325e612ca49..4386e3d36bb7 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -708,10 +708,11 @@ def _extract_local_packages(requirements_file, temp_dir): staging_temp_dir = tempfile.mkdtemp(dir=temp_dir) for line in fin: dep = line.strip() + parsed_url = urlparse(dep) if os.path.exists(dep): local_deps.append(dep) - elif urlparse(dep).scheme: - last_component = dep.rsplit('/', 1)[-1] + elif parsed_url.scheme: + last_component = os.path.basename(parsed_url.path) if not last_component: _LOGGER.warning( 'Extra package %s has an unexpected format hence will not be downloaded locally.'