Source code for streamsets.sdk.codegen.generator

#  IBM Confidential
#  PID 5900-BAF
#  Copyright StreamSets Inc., an IBM Company 2025
from pathlib import Path
from typing import Union

from streamsets.sdk.codegen.processors import PipelineProcessor
from streamsets.sdk.codegen.sources import Source, source_factory
from streamsets.sdk.sch_models import Pipeline


class PythonGenerator:
    """Entry point for generating a Python script from a pipeline.

    The source is resolved and loaded when the generator is constructed;
    the script itself is produced and written when :py:meth:`save` is called.

    Args:
        source (:obj:`Union[str, Path, Pipeline]`): Source from which the pipeline definition is loaded.
        destination (:obj:`Union[str, Path]`): Location where the generated script is saved.
        **kwargs (:obj:`Union[str, object]`): Additional configuration values. The keys read here are
            ``sch_credential_id``, ``sch_token`` and ``aster_url``.
    """

    def __init__(
        self,
        source: Union[str, Path, Pipeline],
        destination: Union[str, Path],
        **kwargs: Union[str, object],
    ) -> None:
        self._source: Source = source_factory(source)
        self._destination = Path(destination)

        # When the caller does not supply credentials, the literal strings
        # below are handed to the processor instead; ``aster_url`` defaults to None.
        credential_id = kwargs.get("sch_credential_id", "SCH_CREDENTIAL_ID")
        token = kwargs.get("sch_token", "SCH_TOKEN")
        aster = kwargs.get("aster_url")

        # The source is loaded here, in the constructor, not lazily in save().
        loaded_source = self._source.load()
        self._processor = PipelineProcessor(
            source_data=loaded_source,
            sch_credential_id=credential_id,
            sch_token=token,
            aster_url=aster,
        )

    def save(self) -> Path:
        """Run the code generation and save the result to the destination.

        Returns:
            Path to the location where the generated script was saved.
        """
        return self._processor.run().save(self._destination)