Pipette

pipette package

Submodules

pipette.pipette module

Pipette speeds up your experiments by figuring out which parts of your workflow have to be re-run, and which parts can be read from earlier results. It also organizes the results of the tasks in the workflow, so that you can run different steps on different machines, locally, or in the cloud. You no longer need to track files in spreadsheets, and invent clever naming schemes to keep everything straight.

class pipette.pipette.Task(**kwargs)

Bases: typing.Generic

The base class for pipette Tasks.

When you are writing your own tasks, at a minimum, override VERSION_TAG and INPUTS. You usually also want to set DEFAULTS, and an OUTPUT_FORMAT.

Here is a complete example of a task:

class PredictNextChampion(pipette.Task[str]):
    VERSION_TAG = "002arsenal"
    INPUTS = {
        "goals_per_team": pipette.Task[Dict[str, int]],
        "wins_per_team": pipette.Task[Dict[str, int]],
        "b": float
    }
    DEFAULTS = {
        "b": 0.37
    }
    OUTPUT_FORMAT = pipette.jsonFormat

    def do(self, goals_per_team: Dict[str, int], wins_per_team: Dict[str, int], b: float):
        results = []
        for team, goals in goals_per_team.items():
            results.append((goals * b + wins_per_team.get(team, 0), team))
        results.sort(reverse=True)
        return results[0][1]
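The scoring inside do() can be tried as plain Python, independent of pipette. The team numbers below are invented for illustration:

```python
# Standalone sketch of the scoring logic in do(); the numbers are made up.
goals_per_team = {"Arsenal": 73, "Spurs": 67, "Chelsea": 63}
wins_per_team = {"Arsenal": 21, "Spurs": 23, "Chelsea": 21}
b = 0.37

results = []
for team, goals in goals_per_team.items():
    # Score each team by weighted goals plus wins, then pick the best.
    results.append((goals * b + wins_per_team.get(team, 0), team))
results.sort(reverse=True)
prediction = results[0][1]  # → "Arsenal"
```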

CACHE_RESULTS = True

If this is set to False, pipette will never cache the results of this step. That might be useful for debugging, or for steps that run quickly.

Of course, the whole point of pipette is to cache results, so I don’t expect this to be used much.

DEFAULTS = {}

Specifies the defaults for input parameters for this task.

Here is an example:

DEFAULTS = {
    "count_home_goals": True,
    "count_away_goals": True,
    # You could specify another task as a default argument, but we won't do that in the example.
    #"matches": SimplifyTask(csv_file_path="season-1819.csv")
}

INPUTS = {}

Specifies the possible input parameters for this task, mapping the parameters’ names to their types.

Here is an example:

INPUTS = {
    "count_home_goals": bool,
    "count_away_goals": bool,
    "matches": pipette.Task[Iterable[Match]]    # You can specify another task as an input like this.
}

NO_HASH_INPUTS = {}

Specifies the names of inputs that, when set to the values given here, are not used to compute the unique name for the output.

This is useful when you want to add a parameter, but you don’t want to invalidate existing output that you created when you didn’t have that parameter yet.

Note that the value for this can be different from the value in DEFAULTS. You can always change the values in DEFAULTS, but you can never change the values in NO_HASH_INPUTS. You can only add new values to NO_HASH_INPUTS.
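The mechanism can be sketched in plain Python. This is not pipette's actual hashing, just the idea; the "normalize" parameter is a hypothetical late addition:

```python
import hashlib
import json

# Hypothetical: "normalize" was added after some outputs already existed.
NO_HASH_INPUTS = {"normalize": False}

def output_hash(inputs: dict) -> str:
    # Drop any input whose value equals its registered NO_HASH_INPUTS value,
    # so old outputs keep their names when the parameter stays at that value.
    hashed = {k: v for k, v in inputs.items()
              if k not in NO_HASH_INPUTS or NO_HASH_INPUTS[k] != v}
    blob = json.dumps(hashed, sort_keys=True).encode()
    return hashlib.sha1(blob).hexdigest()[:16]

before = output_hash({"b": 0.37})                     # before "normalize" existed
after = output_hash({"b": 0.37, "normalize": False})  # after adding it
assert before == after                                # old outputs stay valid
assert output_hash({"b": 0.37, "normalize": True}) != before
```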

OUTPUT_FORMAT = <pipette.pipette.DillFormat object>

Specifies the output format of the results of this task.

Good choices are

  • pipette.jsonlGzFormat for things you also want to manipulate outside of Python, e.g., with jq

  • pipette.jsonFormat for small things, like final scores, or individual strings

  • pipette.dillFormat for arbitrary Python objects

VERSION_TAG = NotImplemented

VERSION_TAG specifies the version of the task. Bump it when the output changes in a significant way; this causes the task itself, and all downstream tasks, to be re-run. By convention, tags look like "001foo", with a number up front and a micro-description after it.

It is recommended, but not required, that version tags increase lexicographically with each iteration.

dependencies() → Dict[str, List[pipette.pipette.Task]]

Returns all tasks that this task depends on.

This is extracted from the inputs given to the task.

Results come back in a dictionary mapping the name of the input parameter to a list of tasks that are specified under that name. The same task might show up multiple times.

do(**inputs)

Do the actual work of the task.

This receives the parameters that were defined in the INPUTS dict. Pipette performs some rudimentary type checking on these inputs before passing them to this function.

flat_unique_dependencies() → Iterable[pipette.pipette.Task]

Returns an iterable of tasks that this task depends on.

This is extracted from the inputs given to the task.

static hash_object(o: Any) → str

Returns a 16-character hash code of arbitrary Python objects.

incomplete_dependencies() → Dict[str, List[pipette.pipette.Task]]
incomplete_dependency_count() → int
output_exists() → bool

Returns whether or not the output for this task already exists.

output_locked() → bool

Returns whether or not the output for this task is locked.

Outputs should only be locked while a process is working on producing this output.

output_name()

Returns the name of the results of this task.

This works whether or not the task has been completed yet.

output_url() → str

Returns a copy-and-paste friendly rendition of the output file of this task.

printable_inputs() → str
recursive_unique_dependencies() → Iterable[pipette.pipette.Task]
results()

Returns the results of this task.

This also runs all tasks that this task depends on, and caches all results, including the result from this task itself.

serialized_task_config() → str

Returns a serialized configuration of this task.

You can use the result of this to transfer the input parameters to another machine and run the task there.
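A round trip of this kind can be sketched with standard-library tools. Pipette's actual encoding is different; this only shows the shape of the idea:

```python
import base64
import json
import zlib

def serialize(config: dict) -> str:
    # Compress and base64-encode so the config survives copy-and-paste.
    raw = json.dumps(config, sort_keys=True).encode()
    return base64.urlsafe_b64encode(zlib.compress(raw)).decode()

def deserialize(s: str) -> dict:
    return json.loads(zlib.decompress(base64.urlsafe_b64decode(s)))

cfg = {"task": "PredictNextChampion", "b": 0.37}
assert deserialize(serialize(cfg)) == cfg
```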

class pipette.pipette.TaskStub

Bases: tuple

We use this to cut off the dependency chain for tasks that are already done.

file_name

Alias for field number 0

format

Alias for field number 1

pipette.pipette.create_from_serialized_task_config(task_config: str) → pipette.pipette.Task

Creates a task using the serialized form from Task.serialized_task_config().

pipette.pipette.to_graphviz(task_or_tasks: Union[pipette.pipette.Task, Iterable[pipette.pipette.Task]]) → str

Returns the complete task graph, in Dot format, ready for Graphviz.

You can paste the results into http://www.webgraphviz.com to see the graph in rendered form.
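The Dot output has roughly this shape. A minimal generator for a two-task graph, with task names borrowed from the examples above:

```python
def to_dot(edges):
    # Emit one Dot edge per (upstream, downstream) task pair.
    lines = ["digraph tasks {"]
    for src, dst in edges:
        lines.append(f'    "{src}" -> "{dst}";')
    lines.append("}")
    return "\n".join(lines)

dot = to_dot([("SimplifyTask", "PredictNextChampion")])
```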

pipette.pipette.main(args: List[str], tasks: Union[pipette.pipette.Task, List[pipette.pipette.Task], None] = None) → int

A main function that can operate on lists of tasks.

There are two uses for this. One is as a standalone program, which lets you do things like:

python -m pipette run "tests.premierleague.PredictNextChampion-002arsenal(eNprYI4QYGBgMCu3sMg3rqjKNzVISjbJrihkqC1k1IhgBEolFTK529966L5OpOphBB9QID0_Mac4viC1KL4kNTG3kNmbMTBCGyjuDhIPSC0KAYrqGhgYpmUWFZfo5iQVVeZkF5uU5uYUp5unFuhlFefnFbIkF2QWpJaUpOpBaS4voLBbflFuYglXIatmYyFbWyF7YyFHBC_Q5PLMPCQLOUEWagGFw4HCGPZl5hSX56fkmZVlmhRmpGWaQ-zjygCax91YyFPaVsirBwA5FU99)"

You can also use it as a substitute for your own main method, by putting this at the bottom of your script:

if __name__ == "__main__":
    import sys
    pipette.main(sys.argv, tasks)

class pipette.pipette.Store

Bases: object

A key/value store that Pipette uses to store the results from Tasks.

exists(name: str) → bool

Returns True if the given result already exists in the store.

id() → str

Every store has an id. It is a unique string that helps recognize when the store changes.

locked(name: str) → bool

Returns True if the given result is locked in the store.

Results get locked when a task starts working on them, but is not yet complete. This prevents multiple processes from working on the same task at the same time, and overwriting each other’s results.

read(name: str, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>) → Any

Reads a result from the store.

url_for_name(name: str) → str

Returns a copy-and-paste worthy URL for the result with the given name.

write(name: str, content: Any, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>) → None

Writes a result to the store.

While the writing is going on, this locks the results it is writing to, so that no other task writes to the same result at the same time.

class pipette.pipette.LocalStore(base_path: Union[str, pathlib.Path])

Bases: pipette.pipette.Store

A store that stores the files in a local filesystem.

This is particularly effective when the local filesystem is NFS-mounted and available from multiple machines.

It is safe, though useless, to create multiple instances of LocalStore that all use the same directory.
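The locking described under write() can be sketched with an exclusive lock file. This is an assumption about the mechanism; pipette's actual on-disk protocol may differ:

```python
import os
import tempfile

def locked_write(path: str, content: bytes) -> None:
    lock = path + ".lock"
    # O_EXCL makes creation fail if another process already holds the lock.
    fd = os.open(lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    try:
        with open(path, "wb") as f:
            f.write(content)
    finally:
        os.close(fd)
        os.remove(lock)

with tempfile.TemporaryDirectory() as d:
    target = os.path.join(d, "result.json")
    locked_write(target, b"{}")
    assert open(target, "rb").read() == b"{}"
```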

exists(name: str) → bool

Returns True if the given result already exists in the store.

id() → str

Every store has an id. It is a unique string that helps recognize when the store changes.

locked(name: str) → bool

Returns True if the given result is locked in the store.

Results get locked when a task starts working on them, but is not yet complete. This prevents multiple processes from working on the same task at the same time, and overwriting each other’s results.

read(name: str, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>) → Any

Reads a result from the store.

url_for_name(name: str) → str

Returns a copy-and-paste worthy URL for the result with the given name.

write(name: str, content: Any, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>) → None

Writes a result to the store.

While the writing is going on, this locks the results it is writing to, so that no other task writes to the same result at the same time.

class pipette.pipette.BeakerStore(local_cache_path: Union[None, str, pathlib.Path] = None)

Bases: pipette.pipette.Store

A store that stores results in Beaker.

For this to work, the beaker command line tool needs to be on the path.

Optionally, this store supports a local cache. Using this cache is highly recommended, because it makes locking much more reliable. It is particularly effective to put the cache on an NFS-mounted drive, so multiple machines can share it.

Do not use the same directory both as a local cache for BeakerStore, and as the storage location for LocalStore. I don’t know what happens when you do that.

exists(name: str) → bool

Returns True if the given result already exists in the store.

id() → str

There is only one beaker, so this always returns the same id.

locked(name: str) → bool

Returns True if the given result is locked in the store.

Results get locked when a task starts working on them, but is not yet complete. This prevents multiple processes from working on the same task at the same time, and overwriting each other’s results.

read(name: str, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>) → Any

Reads a result from the store.

write(name: str, content: Any, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>)

Writes a result to the store.

While the writing is going on, this locks the results it is writing to, so that no other task writes to the same result at the same time.

class pipette.pipette.Format

Bases: typing.Generic

Base class for file formats.

To implement, override SUFFIX, read(), and write(). Formats are usually singleton classes, and are instantiated right here in the module.

SUFFIX = NotImplemented
read(input: BinaryIO) → T

Reads input, parses it, and returns it.

write(input: T, output: BinaryIO) → None

Writes the given input out to disk.

class pipette.pipette.DillFormat

Bases: pipette.pipette.Format

A format that uses dill to serialize arbitrary Python objects.

This format has special handling for iterable types. It takes care not to read the entire iterable into memory during either reading or writing.

To use this format, simply refer to pipette.dillFormat.
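The streaming behavior can be illustrated with the standard library's pickle, which dill extends to cover objects pickle cannot handle:

```python
import io
import pickle

def write_items(items, output):
    # Serialize one item at a time; the whole iterable never sits in memory.
    for item in items:
        pickle.dump(item, output)

def read_items(input):
    # Yield items lazily until the stream is exhausted.
    while True:
        try:
            yield pickle.load(input)
        except EOFError:
            return

buf = io.BytesIO()
write_items(range(3), buf)
buf.seek(0)
assert list(read_items(buf)) == [0, 1, 2]
```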

SUFFIX = '.dill'
read(input: BinaryIO) → Any

Reads input, parses it, and returns it.

write(input: Any, output: BinaryIO) → None

Writes the given input out to disk.

class pipette.pipette.DillIterableFormat

Bases: pipette.pipette.Format

A format that uses dill to serialize an iterable of Python objects, one item at a time.

SUFFIX = '.dill'
read(input: BinaryIO) → Iterable[Any]

Reads input, parses it, and returns it.

write(input: Iterable[Any], output: BinaryIO) → None

Writes the given input out to disk.

class pipette.pipette.JsonFormat

Bases: pipette.pipette.Format

A format that serializes a Python object to JSON.

If you are looking to serialize lists of things, you probably want JsonlFormat or JsonlGzFormat.

To use this format, simply refer to pipette.jsonFormat.

SUFFIX = '.json'
read(input: BinaryIO) → Any

Reads input, parses it, and returns it.

write(input: Any, output: BinaryIO) → None

Writes the given input out to disk.

class pipette.pipette.JsonlFormat

Bases: pipette.pipette.Format

A format that serializes lists of Python objects to JSON, one line per item.

To use this format, simply refer to pipette.jsonlFormat.
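The one-line-per-item layout can be sketched like this (not pipette's actual implementation):

```python
import io
import json

def write_jsonl(items, output):
    for item in items:
        # One JSON document per line.
        output.write((json.dumps(item) + "\n").encode())

def read_jsonl(input):
    for line in input:
        yield json.loads(line)

buf = io.BytesIO()
write_jsonl([{"team": "Arsenal", "wins": 21}, {"team": "Spurs", "wins": 23}], buf)
buf.seek(0)
rows = list(read_jsonl(buf))
assert rows[0]["team"] == "Arsenal"
```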

SUFFIX = '.jsonl'
read(input: BinaryIO) → Iterable[Any]

Reads input, parses it, and returns it.

write(input: Iterable[Any], output: BinaryIO) → None

Writes the given input out to disk.

class pipette.pipette.TsvFormat

Bases: pipette.pipette.Format

A format that reads and writes tab-separated values, one row per line, with each row represented as a list of strings.

SUFFIX = '.tsv'
read(input: BinaryIO) → Iterable[List[str]]

Reads input, parses it, and returns it.

write(input: Iterable[List[str]], output: BinaryIO) → None

Writes the given input out to disk.

pipette.pipette.random_string(length: int = 16) → str

Returns a random string of readable characters.
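A plausible implementation sketch; the character set pipette actually uses is an assumption here:

```python
import random
import string

def random_string(length: int = 16) -> str:
    # "Readable characters" assumed to mean lowercase letters and digits.
    alphabet = string.ascii_lowercase + string.digits
    return "".join(random.choices(alphabet, k=length))

token = random_string()
assert len(token) == 16
```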