Testing Guide

Testing computations and pipelines in data applications is notoriously challenging. Because of this, they often go relatively untested before hitting production. If there is testing in place, these tests are often slow, not run during common developer workflows, and have limited value because of the inability to simulate conditions in the production environment.

Two underlying facts, more than any others, account for this challenge:

  1. Data applications encode much of their business logic in heavy, external systems. Examples include processing systems like Spark, and data warehouses such as Snowflake and Redshift. It is a) difficult to structure software to isolate these dependencies and b) difficult or impossible to run them in a lightweight manner.

  2. Data applications usually do not control their inputs. They must take the data given and compute over it. This means that the computations encode innumerable implicit, unexpressed assumptions about the incoming data.

Without accounting for these unique properties of data applications, blindly applying traditional software techniques can result in tests that are low-value, ineffective, or more trouble than they are worth.

Environments and Business Logic

As noted above, data applications often rely on and encode their business logic in code that is executed by heavy, external dependencies. This means that it is easy and natural to couple your application to a single operating environment. However, if you do this, any testing requires your production environment, which is a big problem.

To even make local testing possible, one must structure one's software to separate this business logic from the operating environment as cleanly as possible. This is one of the reasons why Dagster threads a context object through its entire computation.

Attached to the context is a set of user-defined resources. Examples of resources include APIs to data warehouses, Spark clusters, s3 sessions, and other external dependencies or services. We can then configure pipelines to run in different “modes”, which can alter which version of a resource is vended to the user.
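
As a concrete illustration, consider a minimal sketch in which a pipeline declares two modes that vend different implementations of the same 'store' resource. The names here (InMemoryStore, ingest_record, and so on) are hypothetical and not part of the example that follows, and the sketch assumes the @resource decorator and required_resource_keys are available in the Dagster version in use:

from dagster import ModeDefinition, pipeline, resource, solid


class InMemoryStore:
    # trivial stand-in suitable for local runs and unit tests
    def __init__(self):
        self.records = []

    def insert(self, record):
        self.records.append(record)


@resource
def in_memory_store(_):
    return InMemoryStore()


@resource
def production_store(init_context):
    # hypothetical: in production this would construct a real warehouse client
    # from init_context.resource_config
    raise NotImplementedError()


@solid(required_resource_keys={'store'})
def ingest_record(context):
    # the solid only sees the resource interface, not which mode supplied it
    context.resources.store.insert({'value': 1})


@pipeline(mode_defs=[
    ModeDefinition(name='unittest', resource_defs={'store': in_memory_store}),
    ModeDefinition(name='production', resource_defs={'store': production_store}),
])
def ingest_pipeline():
    ingest_record()

Selecting the mode at execution time (in dagit or via the Python API) then determines which implementation the solid sees, without any change to the solid's body.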

Unit Tests in Data Applications

Principle: Errors that can be caught by unit tests should be caught by unit tests.

Corollary: Do not attempt to unit test for errors that cannot be caught by unit tests.

Using unit tests without keeping these principles in mind is one of the reasons unit tests are frequently treated with skepticism in the data community. Unit testing is too often interpreted as simulating an external system such as Spark or a data warehouse in a granular manner. Those are very complex systems that are impossible to emulate faithfully. Do not try to do so.

Attempts to do this generally lead down a road where a programmer is spending huge amounts of time manually mocking, stubbing, or simulating the intricate behaviors of an externalized system. This is a massive waste of time and energy. Data applications will not be fully tested by unit tests, and one should not attempt to do so.

Put another way, unit tests are not acceptance tests. They should not be the arbiter of whether a computation is correct. However, unit testing – when properly scoped – is still valuable in data applications.

There are massive classes of errors that can be addressed without interacting with external services: refactoring errors, syntax errors in interpreted languages, configuration errors, graph structure errors, and so on. Errors caught in the fast feedback loop of unit testing can be addressed orders of magnitude faster than those caught during an expensive batch computation in staging or production. With this dynamic in place, engineers feel more empowered to do refactoring and other code quality improvements, because those errors can be caught earlier in the process.

In this domain, unit tests should be viewed primarily as productivity and code quality tools, which in the end lead to more correct computations.

Example

We will be using these concepts to build a reusable Solid that downloads a file from an external s3 bucket that we do not control and caches it in a location that we do control. This location can be either a folder in the local file system or an s3 bucket of our own. By default, the download does not occur if the file is already cached, but this behavior can be changed via configuration.

First Pass

Let’s build a simple version of this Solid:

def file_cache_folder():
    return 'file_cache'

@solid
def cache_file_from_s3(_, s3_coord: S3Coordinate) -> str:
    # we default the target_key to the last component of the s3 key.
    target_key = s3_coord['key'].split('/')[-1]

    # helpful wrapper around tempfile provided by dagster.utils
    with get_temp_file_name() as tmp_file:
        # boto3 hardcoded dependency, difficult to mock
        boto3.client('s3').download_file(
            Bucket=s3_coord['bucket'],
            Key=s3_coord['key'],
            Filename=tmp_file
        )

        # file_cache_folder dependency, difficult to mock
        target_path = os.path.join(file_cache_folder(), target_key)

        with open(tmp_file, 'rb') as tmp_file_object:
            with open(target_path, 'wb') as target_file_object:
                shutil.copyfileobj(tmp_file_object, target_file_object)
                return target_path

Testing this is possible, but it is awkward. Here is how one would write a simple test for this code using the built-in Python mock module.


def test_cache_file_from_s3_step_one_one():
    boto_s3 = mock.MagicMock()
    # mock.patch is difficult to get right and requires monkeypatching of global artifacts
    with get_temp_dir() as temp_dir, mock.patch(
        file_cache_folder.__module__ + '.file_cache_folder',
        new=lambda: temp_dir
    ), mock.patch(
        'boto3.client',
        new=lambda *_args, **_kwargs: boto_s3
    ):
        @solid
        def emit_value(_):
            return {'bucket': 'some-bucket', 'key': 'some-key'}

        @pipeline
        def pipe():

            return cache_file_from_s3(emit_value())

        execute_pipeline(pipe)

        assert boto_s3.download_file.call_count == 1

        assert os.path.exists(os.path.join(temp_dir, 'some-key'))

This is for a simple case. For more complex cases, this would become increasingly difficult, to the point where the cost of building and maintaining the test could very well exceed the value of the test.

We’re going to start introducing Dagster concepts to make this testable and executable in more contexts.

Testing utility: execute_solid

The first thing we are going to do is introduce execute_solid, which is very convenient for unit testing. With it, the author no longer has to construct ephemeral pipelines for the sole purpose of running a test or shimming in a value.

def test_cache_file_from_s3():
    boto_s3 = mock.MagicMock()
    # mock.patch is difficult to get right and requires
    # monkeypatching of global artifacts
    with get_temp_dir() as temp_dir, mock.patch(
        file_cache_folder.__module__ + '.file_cache_folder',
        new=lambda: temp_dir
    ), mock.patch(
        'boto3.client',
        new=lambda *_args, **_kwargs: boto_s3
    ):
        execute_solid(
            cache_file_from_s3,
            input_values={
                's3_coord': {
                    'bucket': 'some-bucket', 'key': 'some-key'
                }
            },
        )

        assert boto_s3.download_file.call_count == 1

        assert os.path.exists(os.path.join(temp_dir, 'some-key'))

Using the FileCache resource

Dagster ships with the FileCache resource out-of-the-box, which is designed to fulfill this exact use case. It provides a layer of indirection that makes it easy to supply test implementations in a structured manner, as well as (as we’ll see later) to swap in different implementations for different environments (such as a cloud environment).
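
The tests that follow assume the solid has been adapted to write through the file_cache resource rather than a hardcoded folder. A version of it might look roughly like the sketch below; the S3 client is still a hardcoded boto3 dependency at this stage, and the exact shape is an assumption rather than the canonical implementation:

@solid
def cache_file_from_s3(context, s3_coord: S3Coordinate) -> str:
    # we default the target_key to the last component of the s3 key.
    target_key = s3_coord['key'].split('/')[-1]

    with get_temp_file_name() as tmp_file:
        # boto3 is still a hardcoded dependency here
        boto3.client('s3').download_file(
            Bucket=s3_coord['bucket'],
            Key=s3_coord['key'],
            Filename=tmp_file,
        )

        # the file_cache resource now decides where the cached copy lives
        with open(tmp_file, 'rb') as tmp_file_object:
            file_handle = context.resources.file_cache.write_file_object(
                target_key, tmp_file_object
            )
            return file_handle.path_desc

With that in place, the test can supply an FSFileCache pointed at a temporary directory: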

def test_cache_file_from_s3():
    boto_s3 = mock.MagicMock()
    with get_temp_dir() as temp_dir, mock.patch(
        'boto3.client', new=lambda *_args, **_kwargs: boto_s3
    ):
        execute_solid(
            cache_file_from_s3,
            # This helper method allows resource instances
            # to be passed in directly
            ModeDefinition.from_resources({
                'file_cache': FSFileCache(temp_dir)
            }),
            input_values={
                's3_coord': {
                    'bucket': 'some-bucket', 'key': 'some-key'
                }
            },
        )

        assert boto_s3.download_file.call_count == 1

        assert os.path.exists(os.path.join(temp_dir, 'some-key'))

Configuration

These concepts also plug into Dagster’s configuration system. Here we run the same test, but by specifying configuration: rather than passing a resource instance directly, we use the configurable fs_file_cache resource definition and supply its target_folder via config.


def test_cache_file_from_s3():
    boto_s3 = mock.MagicMock()
    with get_temp_dir() as temp_dir, mock.patch(
        'boto3.client', new=lambda *_args, **_kwargs: boto_s3
    ):
        execute_solid(
            cache_file_from_s3,
            ModeDefinition(resource_defs={'file_cache': fs_file_cache}),
            environment_dict={
                'resources': {
                    'file_cache': {'config': {'target_folder': temp_dir}}
                },
                'solids': {
                    'cache_file_from_s3': {
                        'inputs': {
                            's3_coord': {
                                'bucket': 'some-bucket',
                                'key': 'some-key'
                            },
                        },
                    },
                },
            },
        )

        assert boto_s3.download_file.call_count == 1

        assert os.path.exists(os.path.join(temp_dir, 'some-key'))

Mocks and Fakes

Now we also add an S3Resource to ease testability. You’ll note that the monkeypatch of a global symbol via mock.patch has been eliminated.
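
S3Resource here is assumed to be little more than a thin wrapper that holds a session and exposes it as .session, so that a solid can call context.resources.s3.session.download_file(...) without constructing its own client. A sketch of that shape (not necessarily the actual implementation):

class S3Resource:
    # holds whatever session it is given -- a real boto3 client in production,
    # a MagicMock or S3FakeSession in tests -- and exposes it as .session
    def __init__(self, session):
        self.session = session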

def unittest_for_local_mode_def(temp_dir, s3_session):
    return ModeDefinition.from_resources({
        'file_cache': FSFileCache(temp_dir),
        's3': S3Resource(s3_session),
    })

def test_cache_file_from_s3():
    s3_session = mock.MagicMock()
    # the mock.patch of the global boto3.client is gone
    with get_temp_dir() as temp_dir:
        execute_solid(
            cache_file_from_s3,
            unittest_for_local_mode_def(temp_dir, s3_session),
            input_values={
                's3_coord': {
                    'bucket': 'some-bucket', 'key': 'some-key'
                }
            },
        )

        assert s3_session.download_file.call_count == 1

        assert os.path.exists(os.path.join(temp_dir, 'some-key'))

However, we can go further. For commonly used resources with well-defined APIs, we encourage the creation of fakes instead of mocks. Furthermore, we hope that an ecosystem of fake resources develops alongside the resources themselves. With a fake, we can write better tests. We’ll demonstrate using S3FakeSession instead of a mock.
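
To make the shape of such a fake concrete, here is a rough sketch of the behavior the tests in this guide rely on; the real S3FakeSession covers more of the boto3 client surface, so treat this as illustrative only:

import io


class S3FakeSession:
    # in-memory stand-in keyed as {bucket: {key: bytes}}
    def __init__(self, buckets=None):
        self.buckets = buckets or {}

    def download_file(self, Bucket, Key, Filename):
        # write the stored bytes to the requested local path
        with open(Filename, 'wb') as f:
            f.write(self.buckets[Bucket][Key])

    def get_object(self, Bucket, Key):
        # mirror the boto3 response shape the tests read from
        return {'Body': io.BytesIO(self.buckets[Bucket][Key])}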

def test_cache_file_from_s3():
    s3_session = S3FakeSession({'some-bucket': {'some-key': b'foo'}})

    with get_temp_dir() as temp_dir:
        execute_solid(
            cache_file_from_s3,
            unittest_for_local_mode_def(temp_dir, s3_session),
            input_values={
                's3_coord': {
                    'bucket': 'some-bucket',
                    'key': 'some-key'
                }
            },
        )

        # now check the local file system that the fake has written to
        target_file = os.path.join(temp_dir, 'some-key')
        assert os.path.exists(target_file)

        with open(target_file, 'rb') as ff:
            assert ff.read() == b'foo'

Multiple Environments

In our last stage, we will make this solid executable in multiple environments. Namely, we will use an s3-based file cache instead of the fs-based file cache and verify that it places files in s3, as its contract promises.

The first step is a minor modification: the solid now returns a FileHandle instead of a str.

@solid
def cache_file_from_s3(context, s3_coord: S3Coordinate) -> FileHandle:
    # we default the target_key to the last component of the s3 key.
    target_key = s3_coord['key'].split('/')[-1]

    with get_temp_file_name() as tmp_file:
        context.resources.s3.session.download_file(
            Bucket=s3_coord['bucket'],
            Key=s3_coord['key'],
            Filename=tmp_file,
        )

        file_cache = context.resources.file_cache
        with open(tmp_file, 'rb') as tmp_file_object:
            # returns a handle rather than a path
            file_handle = file_cache.write_file_object(
                target_key,
                tmp_file_object
            )
            return file_handle

When the s3 file cache is used, write_file_object returns an S3FileHandle; in the local file system case, it returns a LocalFileHandle. From the standpoint of the user, this distinction can effectively be ignored as long as they are interacting with resources, like the FileCache, that deal in file handles.
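
For example, a downstream solid could consume the handle without caring which concrete type it received. This is a hypothetical solid for illustration, assuming FileHandle is usable as an input type:

@solid
def log_cached_file(context, file_handle: FileHandle):
    # path_desc is a human-readable location: a local path for a LocalFileHandle,
    # an s3:// URI for an S3FileHandle
    context.log.info('cached file is at {}'.format(file_handle.path_desc))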

With this we can use an S3FileCache instead:

def unittest_for_aws_mode_def(s3_file_cache_session, s3_session):
    return ModeDefinition.from_resources(
        {
            'file_cache': S3FileCache(
                'file-cache-bucket',
                'file-cache',
                s3_file_cache_session
             ),
            's3': S3Resource(s3_session),
        }
    )


def test_cache_file_from_s3_step_four():
    s3_session = S3FakeSession(
        {'source-bucket': {'source-file': b'foo'}}
    )
    s3_file_cache_session = S3FakeSession()

    solid_result = execute_solid(
        cache_file_from_s3,
        unittest_for_aws_mode_def(s3_file_cache_session, s3_session),
        input_values={
            's3_coord': {
                'bucket': 'source-bucket', 'key': 'source-file'
            }
        },
    )

    path_desc = solid_result.output_value().path_desc

    # returned correct s3 path
    assert path_desc == 's3://file-cache-bucket/file-cache/source-file'

    # confirm that the correct data is in the right spot
    file_cache_obj = s3_file_cache_session.get_object(
        Bucket='file-cache-bucket', Key='file-cache/source-file'
    )

    assert file_cache_obj['Body'].read() == b'foo'

Note how the developer can keep the two fakes separate in these cases, so that each test only has to deal with one thing at a time.

This resource system provides:

  1. An intuitive and logical seam where application designers can provide mock and fake implementations for testing.

  2. This same seam can be used to execute the same business logic in different environments and clouds, giving users the opportunity to build reusable, sharable Solids.

  3. A point of configuration. Resources declare their configuration in a self-describing way, and can be configured in dagit with typeahead support and high-quality error messages.

  4. The foundations of an entire ecosystem of resources and fakes.