
在apache beam dataflow应用中,直接通过自定义管道选项传递环境变量到工作器可能无法按预期生效。本文将深入探讨dataflow配置传递机制,并推荐使用beam内置的`pipelineoptions`结合`argparse`来定义和访问应用程序级参数,确保配置在所有工作器中正确且一致地可用,避免因环境变量缺失导致的启动错误。
当您在本地环境中运行一个Apache Beam管道时,操作系统环境变量是可用的。然而,当管道部署到Google Cloud Dataflow服务时,Dataflow工作器是独立的虚拟机实例,它们的环境与您提交管道的本地环境是隔离的。这意味着,您在本地设置的环境变量,或者作为PipelineOptions的直接关键字参数传递的自定义变量,通常不会自动作为操作系统的环境变量在Dataflow工作器上可用。
Dataflow的PipelineOptions主要用于配置Beam运行器本身的行为(例如项目ID、区域、临时存储位置等),以及提供Beam管道内部逻辑所需的参数。如果应用程序的某个Python包(如uplight-telemetry)期望读取特定的操作系统环境变量来获取配置,那么仅仅将其作为PipelineOptions的自定义属性传递是不够的,因为它不会被解析并设置为工作器进程的环境变量。
为了在Dataflow工作器中可靠地访问应用程序所需的配置,最佳实践是利用Beam的PipelineOptions机制,并通过argparse库定义自定义参数。这样,这些参数会在管道启动时被解析,并可以通过PipelineOptions对象在管道的任何部分(例如在DoFn中)访问。
首先,创建一个继承自apache_beam.options.pipeline_options.PipelineOptions的子类,并使用argparse添加您需要的自定义参数。
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import argparse
import os
class CustomPipelineOptions(PipelineOptions):
    """
    自定义管道选项,用于传递应用程序特定参数。
    """
    @classmethod
    def _add_argparse_args(cls, parser):
        super()._add_argparse_args(parser)
        parser.add_argument(
            '--otel_service_name',
            dest='otel_service_name',
            default='default-service',
            help='OpenTelemetry服务名称。'
        )
        parser.add_argument(
            '--otel_resource_attributes',
            dest='otel_resource_attributes',
            default='key1=value1,key2=value2',
            help='OpenTelemetry资源属性,格式为key=value,key2=value2。'
        )
        # 您可以根据需要添加更多自定义参数在您的管道代码中,您可以创建CustomPipelineOptions实例,并从其中获取这些参数。在DoFn等转换中,PipelineOptions对象通常可以通过DoFn的setup方法或直接在process方法中访问。
class ProcessBillRequests:
    class FetchBillInformation(beam.DoFn):
        def setup(self):
            # 在DoFn初始化时获取管道选项
            # self.pipeline_options = self.get_options() # Beam 2.x 推荐的方式
            # 或者通过main函数传递
            pass
        def process(self, element, otel_service_name, otel_resource_attributes):
            # 在这里使用 otel_service_name 和 otel_resource_attributes
            print(f"Processing with OTEL_SERVICE_NAME: {otel_service_name}")
            print(f"Processing with OTEL_RESOURCE_ATTRIBUTES: {otel_resource_attributes}")
            # 您的业务逻辑...
            yield element
    @staticmethod
    def parse_bill_data_requests(data):
        # 示例解析函数
        return data
def run_pipeline():
    # 从命令行或程序中解析管道选项
    # 注意:这里我们使用 CustomPipelineOptions 而不是 ProcessBillRequests.CustomOptions
    pipeline_options = CustomPipelineOptions()
    # 设置DataflowRunner所需的标准选项
    # 从环境变量获取或直接指定
    gcp_project_id = os.getenv("GCP_PROJECT_ID", "your-gcp-project")
    job_name = "process-bills-job"
    tas_gcs_bucket_name_prefix = os.getenv("TAS_GCS_BUCKET_NAME_PREFIX", "your-bucket-prefix")
    up_platform_env = os.getenv("UP_PLATFORM_ENV", "dev")
    service_account = os.getenv("SERVICE_ACCOUNT_EMAIL", "your-service-account@your-project.iam.gserviceaccount.com")
    subnetwork_url = os.getenv("SUBNETWORK_URL", None) # 例如 "regions/us-east1/subnetworks/default"
    uplight_telemetry_tar_file_path = "path/to/uplight-telemetry.tar.gz" # 替换为实际路径
    setup_file_path = "./setup.py" # 替换为实际路径
    # 将自定义选项的值传递给DataflowRunner的参数
    # DataflowRunner会从 pipeline_options 对象中解析这些值
    dataflow_options = pipeline_options.view_as(StandardOptions)
    dataflow_options.runner = 'DataflowRunner'
    dataflow_options.project = gcp_project_id
    dataflow_options.region = "us-east1"
    dataflow_options.job_name = job_name
    dataflow_options.temp_location = f'gs://{tas_gcs_bucket_name_prefix}{up_platform_env}/temp'
    dataflow_options.staging_location = f'gs://{tas_gcs_bucket_name_prefix}{up_platform_env}/staging'
    dataflow_options.save_main_session = True
    dataflow_options.service_account_email = service_account
    if subnetwork_url:
        dataflow_options.subnetwork = subnetwork_url
    dataflow_options.extra_packages = [uplight_telemetry_tar_file_path]
    dataflow_options.setup_file = setup_file_path
    # 获取自定义参数的值
    otel_service_name = pipeline_options.otel_service_name
    otel_resource_attributes = pipeline_options.otel_resource_attributes
    with beam.Pipeline(options=pipeline_options) as pipeline:
        read_from_db = beam.Create(["record1", "record2"]) # 模拟从DB读取
        result = (
            pipeline
            | "ReadPendingRecordsFromDB" >> read_from_db
            | "Parse input PCollection" >> beam.Map(ProcessBillRequests.parse_bill_data_requests)
            # 将自定义参数作为额外参数传递给DoFn
            | "Fetch bills " >> beam.ParDo(
                ProcessBillRequests.FetchBillInformation(),
                otel_service_name=otel_service_name,
                otel_resource_attributes=otel_resource_attributes
            )
        )
        pipeline.run().wait_until_finish()
if __name__ == '__main__':
    run_pipeline()当您运行此管道时,可以通过命令行参数传递自定义值:
python your_pipeline_file.py \
    --runner=DataflowRunner \
    --project=your-gcp-project \
    --region=us-east1 \
    --job_name=my-bill-processing-job \
    --temp_location=gs://your-bucket/temp \
    --staging_location=gs://your-bucket/staging \
    --otel_service_name=my-billing-service \
    --otel_resource_attributes="env=prod,version=1.0"pipeline_options = CustomPipelineOptions([
    '--runner=DataflowRunner',
    # ... 其他选项
    '--worker_env={"OTEL_SERVICE_NAME": "my-billing-service", "OTEL_RESOURCE_ATTRIBUTES": "env=prod"}'
])然而,这通常不是推荐的首选方法,因为它将应用程序配置与环境配置混淆,并且在某些情况下可能无法完全兼容。优先使用Beam的PipelineOptions机制。
在Apache Beam Dataflow应用中,为了确保应用程序级别的配置(例如OTEL_SERVICE_NAME)在所有工作器中正确可用,应采用PipelineOptions结合argparse来定义和传递这些参数。通过将这些参数作为显式的PipelineOptions属性,并在管道的DoFn中直接访问它们,可以构建更健壮、可维护且易于调试的Dataflow管道,避免因环境隔离导致的问题。
以上就是Dataflow工作器中环境变量配置的最佳实践的详细内容,更多请关注php中文网其它相关文章!
 
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
 
                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号