Skip to content

prefect.server.api.flows

Routes for interacting with flow objects.

count_flows async

Count flows.

Source code in prefect/server/api/flows.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@router.post("/count")
async def count_flows(
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count flows.
    """
    async with db.session_context() as session:
        return await models.flows.count_flows(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
        )

create_flow async

Gracefully creates a new flow from the provided schema. If a flow with the same name already exists, the existing flow is returned.

Source code in prefect/server/api/flows.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@router.post("/")
async def create_flow(
    flow: schemas.actions.FlowCreate,
    response: Response,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.Flow:
    """Gracefully creates a new flow from the provided schema. If a flow with the
    same name already exists, the existing flow is returned.
    """
    # hydrate the input model into a full flow model
    flow = schemas.core.Flow(**flow.dict())

    now = pendulum.now("UTC")

    async with db.session_context(begin_transaction=True) as session:
        model = await models.flows.create_flow(session=session, flow=flow)

    if model.created >= now:
        response.status_code = status.HTTP_201_CREATED
    return model

delete_flow async

Delete a flow by id.

Source code in prefect/server/api/flows.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_flow(
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
):
    """
    Delete a flow by id.
    """
    async with db.session_context(begin_transaction=True) as session:
        result = await models.flows.delete_flow(session=session, flow_id=flow_id)
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )

read_flow async

Get a flow by id.

Source code in prefect/server/api/flows.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@router.get("/{id}")
async def read_flow(
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.Flow:
    """
    Get a flow by id.
    """
    async with db.session_context() as session:
        flow = await models.flows.read_flow(session=session, flow_id=flow_id)
    if not flow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )
    return flow

read_flow_by_name async

Get a flow by name.

Source code in prefect/server/api/flows.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@router.get("/name/{name}")
async def read_flow_by_name(
    name: str = Path(..., description="The name of the flow"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.Flow:
    """
    Get a flow by name.
    """
    async with db.session_context() as session:
        flow = await models.flows.read_flow_by_name(session=session, name=name)
    if not flow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )
    return flow

read_flows async

Query for flows.

Source code in prefect/server/api/flows.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
@router.post("/filter")
async def read_flows(
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    sort: schemas.sorting.FlowSort = Body(schemas.sorting.FlowSort.NAME_ASC),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.core.Flow]:
    """
    Query for flows.
    """
    async with db.session_context() as session:
        return await models.flows.read_flows(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            sort=sort,
            offset=offset,
            limit=limit,
        )

update_flow async

Updates a flow.

Source code in prefect/server/api/flows.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@router.patch("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def update_flow(
    flow: schemas.actions.FlowUpdate,
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
):
    """
    Updates a flow.
    """
    async with db.session_context(begin_transaction=True) as session:
        result = await models.flows.update_flow(
            session=session, flow=flow, flow_id=flow_id
        )
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )