Accessing Execution Context within Assets in Dagster
Accessing execution context in an asset
We can provide some execution context to our assets by passing in a context object as the first argument. For instance, consider the following code_location_file.py file:
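from dagster import asset, Definitions, OpExecutionContext
import pandas as pd

@asset(name="iris_data")
def get_iris_data(context: OpExecutionContext):
    # Log the ID of the current run; the message shows up in the Dagster UI
    context.log.info(f"My run ID is {context.run_id}")
    # Load the iris dataset used throughout this article
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    return df

defs = Definitions(assets=[get_iris_data])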
Here, context is an OpExecutionContext object that has useful properties such as run_id, resources and log. Note that the context object is accessible only if the parameter uses this specific name; a different name such as contextt will not work.
Let's start up the Dagster UI like so:
dagster-webserver -f code_location_file.py
On the UI, materialize the iris_data asset. We should see the logs for our run in the Dagster UI:
Logging using the context object
As we've just demonstrated, the context object has a log property that we can use for logging information, which will be accessible in the Dagster UI. We can log at different levels (debug, info, warning, error, critical, event). Let's modify the code_location_file.py file like so:
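from dagster import asset, Definitions, OpExecutionContext
import pandas as pd

@asset(name="iris_data")
def get_iris_data(context: OpExecutionContext):
    # Each call logs at a different level; both appear in the Dagster UI
    context.log.info("I am an info log")
    context.log.warning("I am a warning log")
    # Load the iris dataset used throughout this article
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    return df

defs = Definitions(assets=[get_iris_data])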
Head over to the Dagster UI and we should see our logged messages:
Note that this view only shows the messages logged by Dagster's logger. For instance, if we call print("Hello") in our code, "Hello" will not appear here. To see our usual stdout and stderr, click on the following button:
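As an aside on why print output is missing from the logger view, here is a minimal sketch using only Python's standard logging module. It is an analogy, not Dagster's implementation: ListHandler and run_step are hypothetical names, and the handler merely stands in for a view that collects logger messages. Messages sent through the logger reach the handler, while print writes to stdout and bypasses it.

```python
import io
import logging
from contextlib import redirect_stdout


class ListHandler(logging.Handler):
    """Hypothetical handler that collects logger messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        # Store the level name together with the rendered message.
        self.messages.append(f"{record.levelname}: {record.getMessage()}")


def run_step():
    """Log two messages and print one line, then return both outputs."""
    logger = logging.getLogger("demo_asset")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep records away from the root logger
    handler = ListHandler()
    logger.addHandler(handler)
    stdout = io.StringIO()
    try:
        with redirect_stdout(stdout):
            logger.info("I am an info log")
            logger.warning("I am a warning log")
            print("Hello")  # goes to stdout, never to the handler
    finally:
        logger.removeHandler(handler)
    return handler.messages, stdout.getvalue()


structured, raw_stdout = run_step()
print(structured)
print(repr(raw_stdout))
```

The two return values stay separate, which mirrors the split between the logger view and the stdout/stderr view described above.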
Accessing resources using the context object
We can access the resources specified in our Definitions object using the context object as well. For instance, suppose we have an object (DatabaseConn) that interacts with a remote database:
from dagster import Definitions, asset, ConfigurableResource
import pandas as pd

class DatabaseConn():
    def __init__(self, username, password) -> None:
        self.username = username
        self.password = password

    def insert_data(self, df: pd.DataFrame):
        print("Inserting data into DB...")

@asset(name="iris_data", required_resource_keys={"my_db_conn"})
def get_iris_data(context):
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    # The resource is looked up by the key declared in required_resource_keys
    context.resources.my_db_conn.insert_data(df)
    return df

defs = Definitions(
    assets=[get_iris_data],
    resources={
        "my_db_conn": DatabaseConn(
            username="robocat",
            password="meow"
        )
    }
)
Here, note the following:
We specified the required_resource_keys property in the asset decorator, which allows us to access the resource within the function via context.resources.{resource_name}.
Without required_resource_keys, we will not be able to access the resource!
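To make the gating idea concrete, here is a minimal sketch in plain Python. It is a hypothetical model and not Dagster's implementation: SketchResources, its constructor arguments and the AttributeError it raises are all assumptions made for illustration. It only shows the general pattern of exposing the resources that were declared as required and rejecting everything else.

```python
class SketchResources:
    """Hypothetical stand-in for context.resources (not Dagster's code)."""

    def __init__(self, defined: dict, required_keys: set):
        # Keep only the resources that were declared as required.
        self._granted = {key: defined[key] for key in required_keys}

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        granted = self.__dict__.get("_granted", {})
        if name in granted:
            return granted[name]
        raise AttributeError(f"resource '{name}' was not declared as required")


# All resources that exist, keyed by name (placeholder values).
defined = {"my_db_conn": "connection object", "other": "unused resource"}

with_key = SketchResources(defined, required_keys={"my_db_conn"})
without_key = SketchResources(defined, required_keys=set())

print(with_key.my_db_conn)                 # declared, so the lookup succeeds
print(hasattr(without_key, "my_db_conn"))  # not declared, so the lookup fails
```

The exception Dagster itself raises in this situation may differ; the point of the sketch is only that a resource defined in Definitions is not automatically visible to every asset.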
Accessing resources using ConfigurableResource
We have just demonstrated how to access a resource using the context object. The other way of accessing the resource is as follows:
from dagster import Definitions, asset, ConfigurableResource
import pandas as pd

class DatabaseConn(ConfigurableResource):
    username: str
    password: str

    def insert_data(self, df: pd.DataFrame):
        print("Inserting data into DB...")

@asset(name="iris_data")
def get_iris_data(my_db_conn: DatabaseConn):
    # Load the iris dataset used throughout this article
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    my_db_conn.insert_data(df)
    return df

defs = Definitions(
    assets=[get_iris_data],
    resources={
        "my_db_conn": DatabaseConn(
            username="robocat",
            password="meow"
        )
    }
)
Here, our resource class DatabaseConn must inherit Dagster's ConfigurableResource so that we can pass in the my_db_conn object as a parameter to the asset; otherwise, an error will be thrown. The parameter name must also match the resource key (my_db_conn) used in Definitions. However, it is bad practice to place our credentials in code. In particular, the Dagster UI will show the credentials openly like so:
To prevent this from happening, this sensitive information should be stored in a separate file (typically a .env file) and imported into the code as environment variables.
Using environment variables
In the Dagster world, objects that are created using ConfigurableResource are referred to as structured Pydantic config objects in the official documentation. What's nice about these Dagster objects is that their properties can be set using Dagster's EnvVar, which takes its value from an environment variable, here one defined in a .env file. Let's now demonstrate this.
Suppose we have the following two files:
.env
main.py
Where the .env file contains the credentials of our database:
DB_USERNAME=robocat
DB_PASSWORD=meow
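To make the KEY=VALUE format of the .env file concrete, here is a minimal sketch of how such a file can be turned into environment variables. The helper load_env_file is hypothetical and written for illustration only; it is not how Dagster loads the file, and real loaders (for example the python-dotenv package) also handle quoting and other edge cases that this sketch ignores.

```python
import os


def load_env_file(text, environ=None):
    """Hypothetical helper: copy KEY=VALUE lines into a mapping."""
    # Default to the real process environment when no mapping is given.
    environ = os.environ if environ is None else environ
    for raw_line in text.splitlines():
        line = raw_line.strip()
        # Skip blank lines, comments and lines without an assignment.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        environ[key.strip()] = value.strip()
    return environ


# Use a plain dict here so the example does not modify os.environ.
env_text = "DB_USERNAME=robocat\nDB_PASSWORD=meow\n"
env = load_env_file(env_text, environ={})
print(sorted(env))
```

Passing a plain dictionary keeps the example free of side effects; omitting the second argument would write into os.environ instead.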
The main.py file is as follows:
from dagster import Definitions, asset, ConfigurableResource, EnvVar
import pandas as pd

class DatabaseConn(ConfigurableResource):
    username: str
    password: str

    def insert_data(self, df: pd.DataFrame):
        print("Inserting data into DB...")

@asset(name="iris_data")
def get_iris_data(my_db_conn: DatabaseConn):
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    my_db_conn.insert_data(df)
    return df

defs = Definitions(
    assets=[get_iris_data],
    resources={
        "my_db_conn": DatabaseConn(
            # Values come from environment variables instead of being hard-coded
            username=EnvVar("DB_USERNAME"),
            password=EnvVar("DB_PASSWORD")
        )
    }
)
In the Dagster UI, we should see the configuration:
Notice how the values of the configuration fields are hidden - this is great for security!
Dagster's EnvVar only works when setting values of configurable Dagster objects (e.g. ConfigurableResource). This means that if our DatabaseConn did not inherit ConfigurableResource, EnvVar would not work.
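For a plain class that does not inherit ConfigurableResource, one alternative is to read the variables directly with Python's os.environ. The sketch below assumes the variables are already present in the process environment; PlainDatabaseConn and conn_from_env are hypothetical names used only for illustration, and this approach does not involve EnvVar at all.

```python
import os
from dataclasses import dataclass, field


@dataclass
class PlainDatabaseConn:
    """Hypothetical plain class that is not a ConfigurableResource."""

    username: str
    # repr=False keeps the password out of printed representations.
    password: str = field(repr=False)


def conn_from_env(environ=None):
    """Build the connection object from DB_USERNAME and DB_PASSWORD."""
    environ = os.environ if environ is None else environ
    missing = [key for key in ("DB_USERNAME", "DB_PASSWORD") if key not in environ]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return PlainDatabaseConn(
        username=environ["DB_USERNAME"],
        password=environ["DB_PASSWORD"],
    )


# A dictionary stands in for the real environment in this example.
conn = conn_from_env({"DB_USERNAME": "robocat", "DB_PASSWORD": "meow"})
print(conn)
```

Failing early when a variable is missing makes a misconfigured environment easier to spot than a connection error later in the run.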