Comprehensive Guide on IO Managers in Dagster

Last updated: Aug 11, 2023
Tags: Dagster

What is an IOManager in Dagster?

The IO manager handles where data is read from as well as where it is written to. Recall from our Getting Started with Dagster guide that Dagster writes assets and run-related data to a temporary folder that is deleted when the Dagster server is terminated.

We can also store our output files permanently in a remote blob storage (e.g. Google Cloud Storage) or in a persistent folder within our local repository that will not be deleted even after terminating the Dagster server. One way of achieving this is by using IO managers.

Setting up our Dagster environment

To demonstrate how to use IOManagers in Dagster, we need to first set up our own Dagster environment. We will use the same simple setup as the one used in our Getting Started with Dagster guide:

my_dagster_code_location
├── __init__.py
└── my_assets.py

Where the __init__.py is:

from dagster import Definitions, load_assets_from_modules
from . import my_assets

all_assets = load_assets_from_modules([my_assets])
defs = Definitions(assets=all_assets)

And the my_assets.py is:

from dagster import asset
import pandas as pd

@asset(name="iris_data")
def get_iris_data():
    return pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")

@asset(name="setosa")
def get_setosa(iris_data):
    return iris_data.query("species == 0")

For an explanation of what all of this code means, please refer to our Getting Started with Dagster guide.

Saving to a persistent local folder

As a simple example, let's use the FilesystemIOManager to write our assets to a persistent local folder. All IO managers are Dagster resources, which means we must register them in our Definitions object in the root __init__.py file like so:

from dagster import Definitions, load_assets_from_modules, FilesystemIOManager
from . import my_assets

all_assets = load_assets_from_modules([my_assets])
io_manager = FilesystemIOManager(base_dir="my_data")

defs = Definitions(
    assets=all_assets,
    resources={
        "io_manager": io_manager,
    },
)

By default, all our assets (defined in my_assets.py) will use the IOManager defined under the io_manager key unless specified otherwise.

Now, launch the Dagster UI server like so:

dagster dev -m my_dagster_code_location
...
2023-07-15 13:22:52 +0800 - dagit - INFO - Serving dagit on http://127.0.0.1:3000 in process 49252

Head over to the Dagster UI and click on the Materialize all button. We should see a new folder called my_data, and within it, our materialized assets like so:

my_dagster_code_location
├── __init__.py
└── my_assets.py
my_data
├── iris_data
└── setosa

Here, the path of the my_data directory is determined based on the path we were at when we launched the Dagster server. Since we launched the server in the root project directory, this is where Dagster created the my_data directory.

We can also see the path of our iris_data asset in the Dagster UI by clicking on the iris_data node in the asset graph.

By default, the materialized assets are in pickle format. We will later define our custom IO manager to save our assets in another format (csv). Note that even when we terminate the Dagster server, the my_data folder will not be deleted.
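
Since the default format is pickle, we can sanity-check a materialized asset outside of Dagster. A minimal sketch, assuming the my_data folder above:

import pickle

# The FilesystemIOManager pickles each asset into a file named after its asset key
with open("my_data/iris_data", "rb") as f:
    iris_data = pickle.load(f)

print(type(iris_data))  # <class 'pandas.core.frame.DataFrame'>
print(iris_data.head())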

Defining a custom IO manager

To define a custom IO manager, we can create a class that extends Dagster's ConfigurableIOManager class. As an example, let's define an IO manager in a new file called my_io_manager.py that writes to and reads from a Pandas DataFrame as a CSV file:

from dagster import ConfigurableIOManager, InputContext, OutputContext
import pandas as pd

class MyIOManager(ConfigurableIOManager):

    def _get_path(self, context) -> str:
        return "/".join(context.asset_key.path) + ".csv"

    def handle_output(self, context: OutputContext, df: pd.DataFrame):
        df.to_csv(self._get_path(context))

    def load_input(self, context: InputContext) -> pd.DataFrame:
        return pd.read_csv(self._get_path(context))

Note the following:

  • the methods handle_output() and load_input() are mandatory. These are the two main methods that perform the write and read, respectively.

  • the first parameter of handle_output() and load_input() is Dagster's context object, which includes useful information such as the name of the asset that is to be written/read. The second parameter depends on our use case - since our assets are Pandas DataFrames, this parameter will be a Pandas DataFrame.

  • the context.asset_key.path property is a list holding the path components of the asset key. In our case, it is a simple list containing only the name of the asset (["iris_data"] or ["setosa"]). A small test sketch follows this list.
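
We can unit-test this IO manager without launching Dagster by building the context objects ourselves. A minimal sketch, assuming Dagster's build_output_context() and build_input_context() testing utilities (the asset key "demo" is just for illustration):

import pandas as pd
from dagster import AssetKey, build_input_context, build_output_context

from my_dagster_code_location.my_io_manager import MyIOManager

manager = MyIOManager()

# Write a small DataFrame as if Dagster were materializing an asset named "demo"
df = pd.DataFrame({"species": [0, 1, 2]})
manager.handle_output(build_output_context(asset_key=AssetKey("demo")), df)

# Read it back the way a downstream asset would receive it
loaded = manager.load_input(build_input_context(asset_key=AssetKey("demo")))
print(loaded)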

Since IO managers are Dagster resources, we must update our Definitions in the __init__.py file like so:

from dagster import Definitions, load_assets_from_modules
from . import my_assets
from .my_io_manager import MyIOManager

all_assets = load_assets_from_modules([my_assets])

defs = Definitions(
    assets=all_assets,
    resources={
        "my_io_manager": MyIOManager()
    }
)

Next, we must add a new property io_manager_key to the asset decorators in our my_assets.py file like so:

from dagster import asset
import pandas as pd

@asset(name="iris_data", io_manager_key="my_io_manager")
def get_iris_data():
    return pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")

@asset(name="setosa", io_manager_key="my_io_manager")
def get_setosa(iris_data):
    return iris_data.query("species == 0")

Note that if we had used the key io_manager instead of my_io_manager, we would not have to add the io_manager_key property to the asset decorators, since all assets use the IO manager defined under io_manager by default (see the sketch below).
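
For reference, that alternative binding would look like so (a sketch that replaces the my_io_manager binding above):

from dagster import Definitions, load_assets_from_modules
from . import my_assets
from .my_io_manager import MyIOManager

all_assets = load_assets_from_modules([my_assets])

defs = Definitions(
    assets=all_assets,
    resources={
        # Binding under "io_manager" overrides the default IO manager globally,
        # so no io_manager_key is needed on individual assets
        "io_manager": MyIOManager(),
    },
)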

Now, head over to the Dagster UI, reload the definitions and materialize all assets. We should now be able to see our materialized assets in CSV format:

my_dagster_code_location
├── __init__.py
├── my_io_manager.py
└── my_assets.py
iris_data.csv
setosa.csv
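
One caveat with this CSV round trip: df.to_csv() writes the DataFrame index as an extra column by default, which pd.read_csv() then loads back as an Unnamed: 0 column. A quick demonstration (demo.csv is a throwaway file):

import pandas as pd

df = pd.DataFrame({"species": [0, 1]})

df.to_csv("demo.csv")  # default: the index is written as a column
print(pd.read_csv("demo.csv").columns.tolist())  # ['Unnamed: 0', 'species']

df.to_csv("demo.csv", index=False)  # index suppressed
print(pd.read_csv("demo.csv").columns.tolist())  # ['species']

Passing index=False in handle_output() avoids the extra column.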

Passing in a configuration

We can customize our IO managers programmatically by defining custom properties. To demonstrate, let's create a new property called path_prefix, which indicates where we wish to store and read the CSV file:

from dagster import ConfigurableIOManager, InputContext, OutputContext
import pandas as pd
import os

class MyIOManager(ConfigurableIOManager):

    path_prefix: str = ""

    def _get_path(self, context) -> str:
        if self.path_prefix:  # os.makedirs("") raises an error, so guard the empty default
            os.makedirs(self.path_prefix, exist_ok=True)  # make sure the directory exists
        return self.path_prefix + "/".join(context.asset_key.path) + ".csv"

    def handle_output(self, context: OutputContext, df: pd.DataFrame):
        df.to_csv(self._get_path(context))

    def load_input(self, context: InputContext) -> pd.DataFrame:
        return pd.read_csv(self._get_path(context))

In our __init__.py file, we can pass in the parameter path_prefix when instantiating the MyIOManager like so:

from dagster import Definitions, load_assets_from_modules
from . import my_assets
from .my_io_manager import MyIOManager

all_assets = load_assets_from_modules([my_assets])

defs = Definitions(
    assets=all_assets,
    resources={
        "my_io_manager": MyIOManager(path_prefix="my_data/")
    }
)

Now, when we materialize our assets, we will get:

...
my_data
├── iris_data.csv
└── setosa.csv

Passing in a configuration using environment variables

To demonstrate how we can read environment variables in Dagster, suppose we have a .env file like so (dagster dev automatically loads a .env file found in the directory it is launched from):

DATA_PATH=my_data/

The current file structure of our project looks like so:

.env
my_dagster_code_location
├── __init__.py
├── my_io_manager.py
└── my_assets.py

In our __init__.py, we can read the environment variable using Dagster's EnvVar class like so:

from dagster import Definitions, load_assets_from_modules, EnvVar
from . import my_assets
from .my_io_manager import MyIOManager

all_assets = load_assets_from_modules([my_assets])

defs = Definitions(
    assets=all_assets,
    resources={
        "my_io_manager": MyIOManager(path_prefix=EnvVar("DATA_PATH"))
    }
)

Materializing our assets will give us the following:

.env
my_dagster_code_location/
my_data
├── iris_data.csv
└── setosa.csv

We could read environment variables using Python's os.environ / os.getenv() or the dotenv package, but using Dagster's EnvVar is the standard practice when using Dagster. This is because the names of the environment variables will appear in the Dagster UI only when using EnvVar.
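
For comparison, here is what the plain-Python approach would look like (a sketch assuming the python-dotenv package is installed); note that the value is resolved once at definition time and never surfaces in the Dagster UI:

import os

from dotenv import load_dotenv

from dagster import Definitions, load_assets_from_modules
from . import my_assets
from .my_io_manager import MyIOManager

load_dotenv()  # read .env into os.environ

all_assets = load_assets_from_modules([my_assets])

defs = Definitions(
    assets=all_assets,
    resources={
        # The raw string is resolved here, so the UI cannot show
        # which environment variable it came from
        "my_io_manager": MyIOManager(path_prefix=os.getenv("DATA_PATH", "")),
    },
)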

Let's see our environment variable in the Dagster UI. Click on Deployment in the header, then click on our code location and open the Resources tab. Clicking on our custom resource manager shows the configuration (defined as an environment variable) for this resource.

We see that the path_prefix configuration parameter is assigned the value of DATA_PATH, which is defined as an environment variable!

Logging asset metadata

When defining custom IO managers, we can also log metadata about our outputs. Let's extend our previous example such that we also log some metadata about our output DataFrames:

from dagster import ConfigurableIOManager, InputContext, OutputContext
import pandas as pd
import os

class MyIOManager(ConfigurableIOManager):

    path_prefix: str = ""

    def _get_path(self, context) -> str:
        if self.path_prefix:
            os.makedirs(self.path_prefix, exist_ok=True)
        return self.path_prefix + "/".join(context.asset_key.path) + ".csv"

    def handle_output(self, context: OutputContext, df: pd.DataFrame):
        context.add_output_metadata({
            "nrows": len(df),
            "preview": df.head().to_markdown(),
        })
        df.to_csv(self._get_path(context))

    def load_input(self, context: InputContext) -> pd.DataFrame:
        return pd.read_csv(self._get_path(context))

Now, materialize our assets using the Dagster UI. If we click on our materialized assets, we should see the logged metadata.
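
Dagster also provides typed wrappers through its MetadataValue class; wrapping the preview in MetadataValue.md() tells the UI to render the string as a Markdown table. A sketch of the handle_output() variant:

from dagster import MetadataValue

# Inside MyIOManager
def handle_output(self, context: OutputContext, df: pd.DataFrame):
    context.add_output_metadata({
        "nrows": len(df),
        # Rendered as a Markdown table in the Dagster UI
        "preview": MetadataValue.md(df.head().to_markdown()),
    })
    df.to_csv(self._get_path(context))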

Using built-in blob storage IO managers

Dagster provides IO managers that connect to popular blob storages such as Google Cloud Storage, AWS S3 and Azure Blob Storage. In this section, we will demonstrate how to use the Azure Blob Storage IO manager, but setting up the other two IO managers should be very similar.

WARNING

The built-in IO manager for Azure Blob Storage only works with Azure Data Lake Storage Gen2 (adls2), that is, a storage account with the hierarchical namespace enabled. It does not work with legacy Azure Blob Storage unless you manually upgrade the storage account via the Azure portal.

The built-in IO managers are not part of the core dagster package, so we must install them separately:

pip install dagster-azure

Now, consider the following main.py file:

from dagster import Definitions, asset
from dagster_azure.adls2 import adls2_pickle_io_manager, adls2_resource

@asset
def asset_one():
    return 3

@asset
def asset_two(asset_one):
    return asset_one + 2

defs = Definitions(
    assets=[asset_one, asset_two],
    resources={
        "io_manager": adls2_pickle_io_manager.configured(
            {
                "adls2_file_system": "democontainer",  # name of the container
                "adls2_prefix": "my_dagster",  # name of the directory to place data
            }
        ),
        "adls2": adls2_resource.configured({
            "storage_account": "demostorageskytowner",
            "credential": {
                "sas": "?sv=2022-11-02&ss=bfqt&s*****"
            }
        }),
    },
)

Note the following:

  • under resources, we must specify both io_manager and adls2.

  • under io_manager, we supply the container name and the directory where the data will be stored.

  • under adls2, we specify the storage account name as well as the credential (SAS token). The token is hard-coded here for simplicity; see the sketch below for loading it from an environment variable instead.
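
A sketch of that safer variant, assuming the SAS token is stored in a hypothetical AZURE_SAS_TOKEN environment variable:

import os

from dagster_azure.adls2 import adls2_resource

# AZURE_SAS_TOKEN is a hypothetical environment variable we assume is set;
# this keeps the secret out of source control
adls2 = adls2_resource.configured({
    "storage_account": "demostorageskytowner",
    "credential": {
        "sas": os.environ["AZURE_SAS_TOKEN"],
    },
})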

Now, launch the Dagster UI like so:

dagster dev -f main.py

Let's materialize all the assets using the Materialize all button. Here's what happens:

  1. asset_one is materialized and written to Azure Blob Storage.

  2. asset_one is read from Azure Blob Storage and asset_two is materialized.

  3. asset_two is written to Azure Blob Storage.

In our Azure Blob Storage, we should see our my_dagster folder at the root level of the container, and within the my_dagster directory, both our assets.

These are pickle files - if we wish to output them in other formats (e.g. CSV or Parquet), we must write our own custom IO manager. Personally, I find these built-in IO managers neither intuitive nor flexible, so I would go ahead and write my own custom IO manager; a rough sketch of one follows.
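
As a starting point, here is a rough sketch that pairs ConfigurableIOManager with the azure-storage-file-datalake client to store DataFrames as CSVs. The account URL, container name, and credential handling are assumptions to adapt to your setup:

import io

import pandas as pd
from azure.storage.filedatalake import DataLakeServiceClient
from dagster import ConfigurableIOManager, InputContext, OutputContext

class ADLS2CsvIOManager(ConfigurableIOManager):

    account_url: str  # e.g. "https://demostorageskytowner.dfs.core.windows.net"
    sas_token: str    # in practice, load this via EnvVar
    file_system: str  # container name, e.g. "democontainer"
    prefix: str = "my_dagster"

    def _file_client(self, context):
        client = DataLakeServiceClient(account_url=self.account_url, credential=self.sas_token)
        path = self.prefix + "/" + "/".join(context.asset_key.path) + ".csv"
        return client.get_file_system_client(self.file_system).get_file_client(path)

    def handle_output(self, context: OutputContext, df: pd.DataFrame):
        # Serialize the DataFrame to CSV bytes and upload, overwriting any previous version
        data = df.to_csv(index=False).encode("utf-8")
        self._file_client(context).upload_data(data, overwrite=True)

    def load_input(self, context: InputContext) -> pd.DataFrame:
        downloaded = self._file_client(context).download_file().readall()
        return pd.read_csv(io.BytesIO(downloaded))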

Published by Isshin Inada