
Accessing Execution Context within Assets in Dagster

Last updated: Aug 12, 2023

Accessing execution context in an asset

We can provide some execution context to our assets by passing in a context object as the first argument. For instance, consider the following code_location_file.py file:

from dagster import asset, Definitions, OpExecutionContext
import pandas as pd

@asset(name="iris_data")
def get_iris_data(context: OpExecutionContext):
    context.log.info(f"My run ID is {context.run_id}")
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    return df

defs = Definitions(assets=[get_iris_data])

Here, context is an OpExecutionContext object that has useful properties such as run_id, resources and log. Note that the context object is accessible only if we use this specific name, that is, using the name contextt will not work.

Let's start up the Dagster UI like so:

dagster-webserver -f code_location_file.py

On the UI, materialize the iris_data asset. We should see the logs for our run in the Dagster UI:

Logging using the context object

As we've just demonstrated, the context object has a log property that we can use for logging information, which will be accessible in the Dagster UI. We can log at different levels (debug, info, warning, error, critical, event). Let's modify the code_location_file.py file like so:

from dagster import asset, Definitions, OpExecutionContext
import pandas as pd

@asset(name="iris_data")
def get_iris_data(context: OpExecutionContext):
    context.log.info("I am an info log")
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    context.log.warning("I am a warning log")
    return df

defs = Definitions(assets=[get_iris_data])

Head over to the Dagster UI and we should see our logged messages:

Note that this view only shows the messages logged by Dagster's logger. For instance, if we call print("Hello") in our code, "Hello" will not appear here. To see our usual stdout and stderr, click on the following button:
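The split between logger messages and stdout is not specific to Dagster. As a rough illustration using only Python's standard logging module, the sketch below uses a hypothetical in-memory handler (ListHandler, not part of Dagster) as a stand-in for a view that displays only log records. The print call never reaches it:

```python
import contextlib
import io
import logging

# Hypothetical handler that collects log records in a list,
# standing in for a UI panel that shows only logger output.
class ListHandler(logging.Handler):
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(f"{record.levelname}: {record.getMessage()}")

logger = logging.getLogger("sketch_logger")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep records away from the root logger
handler = ListHandler()
logger.addHandler(handler)

# Capture stdout separately so both streams can be compared.
stdout_buffer = io.StringIO()
with contextlib.redirect_stdout(stdout_buffer):
    logger.info("I am an info log")
    print("Hello")
    logger.warning("I am a warning log")

logged_messages = handler.messages
captured_stdout = stdout_buffer.getvalue()
```

Here logged_messages holds the two logger messages, while "Hello" ends up only in captured_stdout.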

Accessing resources using the context object

We can access the resources specified in our Definitions object using the context object as well. For instance, suppose we have an object (DatabaseConn) that interacts with a remote database:

from dagster import Definitions, asset
import pandas as pd

class DatabaseConn:
    def __init__(self, username, password) -> None:
        self.username = username
        self.password = password

    def insert_data(self, df: pd.DataFrame):
        print("Inserting data into DB...")

@asset(name="iris_data", required_resource_keys={"my_db_conn"})
def get_iris_data(context):
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    context.resources.my_db_conn.insert_data(df)
    return df

defs = Definitions(
    assets=[get_iris_data],
    resources={
        "my_db_conn": DatabaseConn(
            username="robocat",
            password="meow"
        )
    }
)

Here, note the following:

  • we specified the required_resource_keys argument in the asset decorator, which allows us to access the resource within the function via context.resources.{resource_name}.

  • without required_resource_keys, we will not be able to access the resource!

Accessing resources using ConfigurableResource

We have just demonstrated how to access resources using the context object. The other way of accessing a resource is to declare it as a parameter of the asset function:

from dagster import Definitions, asset, ConfigurableResource
import pandas as pd

class DatabaseConn(ConfigurableResource):
    username: str
    password: str

    def insert_data(self, df: pd.DataFrame):
        print("Inserting data into DB...")

@asset(name="iris_data")
def get_iris_data(my_db_conn: DatabaseConn):
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    my_db_conn.insert_data(df)
    return df

defs = Definitions(
    assets=[get_iris_data],
    resources={
        "my_db_conn": DatabaseConn(
            username="robocat",
            password="meow"
        )
    }
)

Here, our resource class DatabaseConn must inherit from Dagster's ConfigurableResource so that we can pass in the my_db_conn object as a parameter to the asset - otherwise, an error will be thrown. Note that the parameter name (my_db_conn) must match the resource key in the Definitions object. However, it is bad practice to place our credentials in code. In particular, the Dagster UI will show the credentials openly like so:

To prevent this from happening, sensitive information like this should be stored in a separate file (typically a .env file) and read by the code as environment variables.

Using environment variables

In the official documentation, objects created using ConfigurableResource are referred to as structured Pydantic config objects. What's nice about these objects is that their fields can be set using Dagster's EnvVar, which looks up an environment variable by name. Dagster loads the variables defined in a .env file located in the directory from which it is launched. Let's now demonstrate this.

Suppose we have the following two files:

.env
main.py

The .env file contains the credentials of our database:

DB_USERNAME=robocat
DB_PASSWORD=meow
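To make the file format concrete, here is a rough sketch of how KEY=VALUE lines like these map onto environment variables. The parse_env_lines helper is hypothetical and only illustrates the idea. It is not how Dagster itself loads the file, and it ignores quoting and other details that real .env loaders handle:

```python
import os

def parse_env_lines(lines):
    """Hypothetical helper: turn KEY=VALUE lines into a dict,
    skipping blank lines and lines starting with '#'."""
    values = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values

# The same two lines as in the .env file above.
env_file_lines = ["DB_USERNAME=robocat", "DB_PASSWORD=meow"]

parsed = parse_env_lines(env_file_lines)

# Export the values so that later lookups by name can find them.
os.environ.update(parsed)
```

After this runs, os.environ["DB_USERNAME"] and os.environ["DB_PASSWORD"] hold the values from the file.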

The main.py file is as follows:

from dagster import Definitions, asset, ConfigurableResource, EnvVar
import pandas as pd

class DatabaseConn(ConfigurableResource):
    username: str
    password: str

    def insert_data(self, df: pd.DataFrame):
        print("Inserting data into DB...")

@asset(name="iris_data")
def get_iris_data(my_db_conn: DatabaseConn):
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    my_db_conn.insert_data(df)
    return df

defs = Definitions(
    assets=[get_iris_data],
    resources={
        "my_db_conn": DatabaseConn(
            username=EnvVar("DB_USERNAME"),
            password=EnvVar("DB_PASSWORD")
        )
    }
)

In the Dagster UI, we should see the configuration:

Notice how the values of the configuration fields are hidden - this is great for security!

NOTE

Dagster's EnvVar only works when setting values of configurable Dagster objects (e.g. ConfigurableResource). This means that if our DatabaseConn did not inherit ConfigurableResource, EnvVar would not work.

Published by Isshin Inada