from dagster import Array, Bool, Field, In, Noneable, Nothing, Out, Output, op
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster_census.resources import DEFAULT_POLL_INTERVAL
from dagster_census.types import CensusOutput
from dagster_census.utils import generate_materialization
[docs]
@op(
    required_resource_keys={"census"},
    ins={"start_after": In(Nothing)},
    out=Out(
        CensusOutput,
        description=(
            "Parsed json dictionary representing the details of the Census sync after "
            "the sync successfully completes."
        ),
    ),
    config_schema={
        "sync_id": Field(
            int,
            is_required=True,
            description="Id of the parent sync.",
        ),
        "force_full_sync": Field(
            config=Bool,
            default_value=False,
            description=(
                "If this trigger request should be a Full Sync. "
                "Note that some sync configurations such as Append do not support full syncs."
            ),
        ),
        "poll_interval": Field(
            float,
            default_value=DEFAULT_POLL_INTERVAL,
            description="The time (in seconds) to wait between successive polls.",
        ),
        "poll_timeout": Field(
            Noneable(float),
            default_value=None,
            description=(
                "The maximum time to wait before this operation is timed out. By "
                "default, this will never time out."
            ),
        ),
        "yield_materializations": Field(
            config=Bool,
            default_value=True,
            description=(
                "If True, materializations corresponding to the results of the Census sync will "
                "be yielded when the op executes."
            ),
        ),
        "asset_key_prefix": Field(
            config=Array(str),
            default_value=["census"],
            description=(
                "If provided and yield_materializations is True, these components will be used to "
                "prefix the generated asset keys."
            ),
        ),
    },
    tags={COMPUTE_KIND_TAG: "census"},
)
def census_trigger_sync_op(context):
    """Executes a Census sync for a given ``sync_id`` and polls until that sync completes, raising
    an error if it is unsuccessful.
    It outputs a :py:class:`~dagster_census.CensusOutput` which contains the details of the Census
    sync after it successfully completes.
    It requires the use of the :py:class:`~dagster_census.census_resource`, which allows it to
    communicate with the Census API.
    **Examples:**
    .. code-block:: python
        from dagster import job
        from dagster_census import census_resource, census_sync_op
        my_census_resource = census_resource.configured(
            {
                "api_key": {"env": "CENSUS_API_KEY"},
            }
        )
        sync_foobar = census_sync_op.configured({"sync_id": "foobar"}, name="sync_foobar")
        @job(resource_defs={"census": my_census_resource})
        def my_simple_census_job():
            sync_foobar()
    """
    census_output = context.resources.census.trigger_sync_and_poll(
        sync_id=context.op_config["sync_id"],
        force_full_sync=context.op_config["force_full_sync"],
        poll_interval=context.op_config["poll_interval"],
        poll_timeout=context.op_config["poll_timeout"],
    )
    if context.op_config["yield_materializations"]:
        yield generate_materialization(
            census_output, asset_key_prefix=context.op_config["asset_key_prefix"]
        )
    yield Output(census_output)