Pipette¶
pipette package¶
Submodules¶
pipette.pipette module¶
Pipette speeds up your experiments by figuring out which parts of your workflow have to be re-run, and which parts can be read from earlier results. It also organizes the results of the tasks in the workflow, so that you can run different steps on different machines, locally, or in the cloud. You no longer need to track files in spreadsheets or invent clever naming schemes to keep everything straight.
-
class
pipette.pipette.
Task
(**kwargs)¶ Bases:
typing.Generic
The base class for pipette Tasks.
When you are writing your own tasks, at a minimum, override VERSION_TAG and INPUTS. You usually also want to set DEFAULTS and an OUTPUT_FORMAT.
Here is a complete example of a task:
    from typing import Dict

    import pipette

    class PredictNextChampion(pipette.Task[str]):
        VERSION_TAG = "002arsenal"
        INPUTS = {
            "goals_per_team": pipette.Task[Dict[str, int]],
            "wins_per_team": pipette.Task[Dict[str, int]],
            "b": float
        }
        DEFAULTS = {
            "b": 0.37
        }
        OUTPUT_FORMAT = pipette.jsonFormat

        def do(self, goals_per_team: Dict[str, int], wins_per_team: Dict[str, int], b: float):
            results = []
            for team, goals in goals_per_team.items():
                # Score each team from its goals, weighted by b, plus its wins.
                results.append((goals * b + wins_per_team.get(team, 0), team))
            results.sort(reverse=True)
            # Return the name of the highest-scoring team.
            return results[0][1]
-
CACHE_RESULTS
= True¶ If this is set to False, pipette will never cache the results of this step. That might be useful for debugging, or for steps that run quickly.
Of course, the whole point of pipette is to cache results, so I don’t expect this to be used much.
-
DEFAULTS
= {}¶ Specifies the defaults for input parameters for this task.
Here is an example:
    DEFAULTS = {
        "count_home_goals": True,
        "count_away_goals": True,
        # You could specify another task as a default argument, but we won't do that in the example.
        #"matches": SimplifyTask(csv_file_path="season-1819.csv")
    }
-
INPUTS
= {}¶ Specifies the possible input parameters for this task, mapping the parameters’ names to their types.
Here is an example:
    INPUTS = {
        "count_home_goals": bool,
        "count_away_goals": bool,
        "matches": pipette.Task[Iterable[Match]]    # You can specify another task as an input like this.
    }
-
NO_HASH_INPUTS
= {}¶ Specifies the names of inputs that, when set to the values given here, are not used to compute the unique name for the output.
This is useful when you want to add a parameter, but you don’t want to invalidate existing output that you created when you didn’t have that parameter yet.
Note that the value for this can be different from the value in DEFAULTS. You can always change the values in DEFAULTS, but you can never change the values in NO_HASH_INPUTS. You can only add new values to NO_HASH_INPUTS.
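The effect of NO_HASH_INPUTS can be illustrated with a small sketch. This is not pipette's implementation: the helper output_name, the JSON-plus-SHA-256 hashing, the name layout, and the parameter "smoothing" are all assumptions made for illustration. It shows only the principle that an input set to its listed value is left out of the hash, so the name of existing output does not change.

```python
import hashlib
import json

def output_name(task_name, version_tag, inputs, no_hash_inputs):
    """Hypothetical helper: derive a stable output name from a task's inputs."""
    # Drop every input whose value equals the one listed in no_hash_inputs.
    hashed = {
        key: value
        for key, value in inputs.items()
        if not (key in no_hash_inputs and no_hash_inputs[key] == value)
    }
    payload = json.dumps(hashed, sort_keys=True).encode("utf-8")
    digest = hashlib.sha256(payload).hexdigest()[:16]
    return f"{task_name}-{version_tag}-{digest}"

# Output produced before the hypothetical "smoothing" parameter existed.
old_name = output_name("PredictNextChampion", "002arsenal", {"b": 0.37}, {})

# After adding the parameter, its listed value leaves the name unchanged ...
same_name = output_name(
    "PredictNextChampion", "002arsenal",
    {"b": 0.37, "smoothing": 0.0}, {"smoothing": 0.0})

# ... while any other value produces a new name.
new_name = output_name(
    "PredictNextChampion", "002arsenal",
    {"b": 0.37, "smoothing": 0.5}, {"smoothing": 0.0})
```

Under these assumptions, changing the listed value later would silently map new runs onto old output, which is one way to see why listed values must never change.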
-
OUTPUT_FORMAT
= <pipette.pipette.DillFormat object>¶ Specifies the output format of the results of this task.
Good choices are:
pipette.jsonlGzFormat for things you also want to manipulate outside of Python, e.g., with jq
pipette.jsonFormat for small things, like final scores, or individual strings
pipette.dillFormat for arbitrary Python objects
-
VERSION_TAG
= NotImplemented¶ VERSION_TAG specifies the version of the task. Bump it when the output changes in a significant way. Doing so causes the task itself, and all downstream tasks, to be re-run. By convention, the tags look like "001foo", with a number up front, and a micro-description afterwards.
It is recommended, but not required, that version tags increase lexicographically with each iteration.
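A short illustration of why the zero-padded number matters for lexicographic ordering. The tag names below are made up for the example; only Python's ordinary string comparison is involved.

```python
# Zero-padded tags sort in the order they were created.
padded = ["001foo", "002bar", "010baz"]
assert sorted(padded) == padded

# Without padding, string comparison puts "10baz" before "1foo" and "2bar".
unpadded = ["1foo", "2bar", "10baz"]
unpadded_sorted = sorted(unpadded)
```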
-
dependencies
() → Dict[str, List[pipette.pipette.Task]]¶ Returns all tasks that this task depends on.
This is extracted from the inputs given to the task.
Results come back in a dictionary mapping the name of the input parameter to a list of tasks that are specified under that name. The same task might show up multiple times.
-
do
(**inputs)¶ Do the actual work of the task.
This receives the parameters that were defined in the INPUTS dict. Pipette performs some rudimentary type checking on these inputs before passing them to this function.
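What such a rudimentary check might look like is sketched below. The helper check_inputs is hypothetical and handles only plain Python types; how pipette itself checks inputs, including task-typed inputs such as pipette.Task[...], is not described here.

```python
from typing import Any, Dict

def check_inputs(declared: Dict[str, type], given: Dict[str, Any]) -> None:
    """Hypothetical checker: raise if an input is unknown or has the wrong type."""
    for name, value in given.items():
        if name not in declared:
            raise ValueError(f"Unknown input: {name}")
        if not isinstance(value, declared[name]):
            raise TypeError(
                f"Input {name} should be {declared[name].__name__}, "
                f"got {type(value).__name__}")

declared = {"b": float, "count_home_goals": bool}

# Matching types pass silently.
check_inputs(declared, {"b": 0.37, "count_home_goals": True})

# A string where a float is declared is rejected.
try:
    check_inputs(declared, {"b": "0.37"})
    error_message = None
except TypeError as error:
    error_message = str(error)
```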
-
flat_unique_dependencies
() → Iterable[pipette.pipette.Task]¶ Returns an iterable of tasks that this task depends on.
This is extracted from the inputs given to the task.
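The relationship between the dictionary returned by dependencies() and a flat, de-duplicated view can be sketched with plain data. Strings stand in for Task objects, and flatten_unique is a hypothetical helper, not part of pipette; the ordering it produces is an assumption.

```python
from typing import Dict, Iterable, List

def flatten_unique(dependencies: Dict[str, List[str]]) -> Iterable[str]:
    """Hypothetical helper: yield each dependency once, in first-seen order."""
    seen = set()
    for tasks in dependencies.values():
        for task in tasks:
            if task not in seen:
                seen.add(task)
                yield task

# Strings stand in for Task objects; "SimplifyTask" appears under two inputs.
by_input = {
    "goals_per_team": ["GoalsPerTeam", "SimplifyTask"],
    "wins_per_team": ["WinsPerTeam", "SimplifyTask"],
}
flat = list(flatten_unique(by_input))
```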
-
static
hash_object
(o: Any) → str¶ Returns a 16-character hash code of arbitrary Python objects.
-
incomplete_dependencies
() → Dict[str, List[pipette.pipette.Task]]¶
-
incomplete_dependency_count
() → int¶
-
output_exists
() → bool¶ Returns whether or not the output for this task already exists.
-
output_locked
() → bool¶ Returns whether or not the output for this task is locked.
Outputs should only be locked while a process is working on producing this output.
-
output_name
()¶ Returns the name of the results of this task.
This works whether or not the task has been completed yet.
-
output_url
() → str¶ Returns a copy-and-paste friendly rendition of the output file of this task.
-
printable_inputs
() → str¶
-
recursive_unique_dependencies
() → Iterable[pipette.pipette.Task]¶
-
results
()¶ Returns the results of this task.
This also runs all tasks that this task depends on, and caches all results, including the result from this task itself.
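The run-dependencies-then-cache behavior can be sketched with a toy model. ToyTask, the in-memory cache dictionary, and the results function below are stand-ins invented for this example; pipette's own tasks and stores are more involved.

```python
from typing import Callable, Dict, List

class ToyTask:
    """Hypothetical stand-in for a task: a name, upstream tasks, and a function."""
    def __init__(self, name: str, inputs: List["ToyTask"], fn: Callable[..., int]):
        self.name = name
        self.inputs = inputs
        self.fn = fn

cache: Dict[str, int] = {}   # stands in for a store
runs: List[str] = []         # records which tasks actually executed

def results(task: ToyTask) -> int:
    # Reuse an earlier result if there is one.
    if task.name in cache:
        return cache[task.name]
    # Otherwise compute the dependencies first, then the task itself.
    upstream = [results(dependency) for dependency in task.inputs]
    runs.append(task.name)
    cache[task.name] = task.fn(*upstream)
    return cache[task.name]

goals = ToyTask("goals", [], lambda: 3)
wins = ToyTask("wins", [], lambda: 2)
score = ToyTask("score", [goals, wins], lambda g, w: g + w)

first = results(score)    # runs goals, wins, then score
second = results(score)   # served from the cache, nothing re-runs
```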
-
serialized_task_config
() → str¶ Returns a serialized configuration of this task.
You can use the result of this to transfer the input parameters to another machine and run the task there.
-
-
class
pipette.pipette.
TaskStub
¶ Bases:
tuple
We use this to cut off the dependency chain for tasks that are already done.
-
file_name
¶ Alias for field number 0
-
format
¶ Alias for field number 1
-
-
pipette.pipette.
create_from_serialized_task_config
(task_config: str) → pipette.pipette.Task¶ Creates a task using the serialized form from Task.serialized_task_config().
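The idea of moving a task's configuration between machines as a string can be sketched as a round trip. The exact encoding pipette uses is not described here; the JSON, zlib, and URL-safe Base64 combination below, along with the function names, is an assumption chosen only to produce a compact, shell-friendly string.

```python
import base64
import json
import zlib

def serialize_config(config: dict) -> str:
    """Hypothetical encoder: JSON, compressed, then URL-safe Base64."""
    raw = json.dumps(config, sort_keys=True).encode("utf-8")
    return base64.urlsafe_b64encode(zlib.compress(raw)).decode("ascii")

def deserialize_config(text: str) -> dict:
    """Hypothetical decoder: the inverse of serialize_config."""
    raw = zlib.decompress(base64.urlsafe_b64decode(text.encode("ascii")))
    return json.loads(raw.decode("utf-8"))

config = {"task": "PredictNextChampion", "version": "002arsenal", "b": 0.37}
encoded = serialize_config(config)      # safe to paste into a command line
decoded = deserialize_config(encoded)   # identical to the original config
```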
-
pipette.pipette.
to_graphviz
(task_or_tasks: Union[pipette.pipette.Task, Iterable[pipette.pipette.Task]]) → str¶ Returns the complete task graph, in Dot format, ready for Graphviz.
You can paste the results into http://www.webgraphviz.com to see the graph in rendered form.
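For readers unfamiliar with Dot, the sketch below builds a small graph description by hand. The helper to_dot and the edge direction (from a dependency to the task that consumes it) are assumptions for illustration; the text that to_graphviz actually produces may be laid out differently.

```python
from typing import Dict, List

def to_dot(edges: Dict[str, List[str]]) -> str:
    """Hypothetical helper: render a task -> dependencies mapping as Dot text."""
    lines = ["digraph tasks {"]
    for task, dependencies in edges.items():
        for dependency in dependencies:
            # An edge points from a dependency to the task that consumes it.
            lines.append(f'  "{dependency}" -> "{task}";')
    lines.append("}")
    return "\n".join(lines)

dot = to_dot({"PredictNextChampion": ["GoalsPerTeam", "WinsPerTeam"]})
```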
-
pipette.pipette.
main
(args: List[str], tasks: Union[pipette.pipette.Task, List[pipette.pipette.Task], None] = None) → int¶ A main function that can operate on lists of tasks.
There are two uses for this. One is as a standalone program, which lets you do things like:
python -m pipette run "tests.premierleague.PredictNextChampion-002arsenal(eNprYI4QYGBgMCu3sMg3rqjKNzVISjbJrihkqC1k1IhgBEolFTK529966L5OpOphBB9QID0_Mac4viC1KL4kNTG3kNmbMTBCGyjuDhIPSC0KAYrqGhgYpmUWFZfo5iQVVeZkF5uU5uYUp5unFuhlFefnFbIkF2QWpJaUpOpBaS4voLBbflFuYglXIatmYyFbWyF7YyFHBC_Q5PLMPCQLOUEWagGFw4HCGPZl5hSX56fkmZVlmhRmpGWaQ-zjygCax91YyFPaVsirBwA5FU99)"
You can also use it as a substitute for your own main method, by putting this at the bottom of your script:
    if __name__ == "__main__":
        import sys
        pipette.main(sys.argv, tasks)
-
class
pipette.pipette.
Store
¶ Bases:
object
A key/value store that Pipette uses to store the results from Tasks.
-
exists
(name: str) → bool¶ Returns True if the given result already exists in the store.
-
id
() → str¶ Every store has an id. It is a unique string that helps recognize when the store changes.
-
locked
(name: str) → bool¶ Returns True if the given result is locked in the store.
Results are locked from the moment a task starts working on them until it completes. This prevents multiple processes from working on the same task at the same time and overwriting each other’s results.
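One common way to get this kind of mutual exclusion on a filesystem is an exclusively created lock file. The sketch below is an assumption about how such locking could work, not a description of what any pipette store does; try_lock, unlock, and the file name result.lock are hypothetical.

```python
import os
import tempfile

def try_lock(lock_path: str) -> bool:
    """Hypothetical helper: return True if this process acquired the lock."""
    try:
        # O_EXCL makes creation fail if the file already exists.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    return True

def unlock(lock_path: str) -> None:
    os.remove(lock_path)

with tempfile.TemporaryDirectory() as directory:
    lock_path = os.path.join(directory, "result.lock")
    first_attempt = try_lock(lock_path)    # acquires the lock
    second_attempt = try_lock(lock_path)   # fails, the result is locked
    unlock(lock_path)
    third_attempt = try_lock(lock_path)    # succeeds again after unlocking
```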
-
read
(name: str, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>) → Any¶ Reads a result from the store.
-
url_for_name
(name: str) → str¶ Returns a copy-and-paste worthy URL for the result with the given name.
-
write
(name: str, content: Any, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>) → None¶ Writes a result to the store.
While the writing is going on, this locks the results it is writing to, so that no other task writes to the same result at the same time.
-
-
class
pipette.pipette.
LocalStore
(base_path: Union[str, pathlib.Path])¶ Bases:
pipette.pipette.Store
A store that stores the files in a local filesystem.
This is particularly effective when the local filesystem is NFS-mounted and available from multiple machines.
It is safe, though useless, to create multiple instances of LocalStore that all use the same directory.
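The general shape of a file-backed key/value store is sketched below. ToyLocalStore is a hypothetical class written for this example: it always stores JSON, uses a ".json" suffix, and writes through a temporary file followed by a rename. None of these choices are claimed to match LocalStore.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Union

class ToyLocalStore:
    """Hypothetical file-backed key/value store; results are saved as JSON."""
    def __init__(self, base_path: Union[str, Path]):
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def _path(self, name: str) -> Path:
        return self.base_path / (name + ".json")

    def exists(self, name: str) -> bool:
        return self._path(name).exists()

    def write(self, name: str, content: Any) -> None:
        # Write to a temporary file first, then rename, so readers never
        # see a half-written result.
        temporary = self._path(name).with_suffix(".tmp")
        temporary.write_text(json.dumps(content), encoding="utf-8")
        temporary.replace(self._path(name))

    def read(self, name: str) -> Any:
        return json.loads(self._path(name).read_text(encoding="utf-8"))

with tempfile.TemporaryDirectory() as directory:
    store = ToyLocalStore(directory)
    existed_before = store.exists("champion")
    store.write("champion", "Arsenal")
    existed_after = store.exists("champion")
    champion = store.read("champion")
```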
-
exists
(name: str) → bool¶ Returns True if the given result already exists in the store.
-
id
() → str¶ Every store has an id. It is a unique string that helps recognize when the store changes.
-
locked
(name: str) → bool¶ Returns True if the given result is locked in the store.
Results are locked from the moment a task starts working on them until it completes. This prevents multiple processes from working on the same task at the same time and overwriting each other’s results.
-
read
(name: str, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>) → Any¶ Reads a result from the store.
-
url_for_name
(name: str) → str¶ Returns a copy-and-paste worthy URL for the result with the given name.
-
write
(name: str, content: Any, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>) → None¶ Writes a result to the store.
While the writing is going on, this locks the results it is writing to, so that no other task writes to the same result at the same time.
-
-
class
pipette.pipette.
BeakerStore
(local_cache_path: Union[None, str, pathlib.Path] = None)¶ Bases:
pipette.pipette.Store
A store that stores results in Beaker.
For this to work, the beaker command line tool needs to be on the path.
Optionally, this store supports a local cache. Using this cache is highly recommended, because it makes locking much more reliable. It is particularly effective to put the cache on an NFS-mounted drive, so multiple machines can share it.
Do not use the same directory both as a local cache for BeakerStore, and as the storage location for LocalStore. I don’t know what happens when you do that.
-
exists
(name: str) → bool¶ Returns True if the given result already exists in the store.
-
id
() → str¶ There is only one beaker, so this always returns the same id.
-
locked
(name: str) → bool¶ Returns True if the given result is locked in the store.
Results are locked from the moment a task starts working on them until it completes. This prevents multiple processes from working on the same task at the same time and overwriting each other’s results.
-
read
(name: str, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>) → Any¶ Reads a result from the store.
-
write
(name: str, content: Any, format: pipette.pipette.Format = <pipette.pipette.DillFormat object>)¶ Writes a result to the store.
While the writing is going on, this locks the results it is writing to, so that no other task writes to the same result at the same time.
-
-
class
pipette.pipette.
Format
¶ Bases:
typing.Generic
Base class for file formats.
To implement, override SUFFIX, read(), and write(). Formats are usually singleton classes, and are instantiated right here in the module.
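A custom format following this recipe might look like the sketch below. The Format class here is a local stand-in that mirrors the documented members so the example runs on its own; TextFormat and textFormat are hypothetical names, not formats shipped with pipette.

```python
import io
from typing import BinaryIO, Generic, TypeVar

T = TypeVar("T")

class Format(Generic[T]):
    """Stand-in for the documented base class, so the sketch runs on its own."""
    SUFFIX = NotImplemented

    def read(self, input: BinaryIO) -> T:
        raise NotImplementedError()

    def write(self, input: T, output: BinaryIO) -> None:
        raise NotImplementedError()

class TextFormat(Format[str]):
    """Hypothetical format that stores a single string as UTF-8 text."""
    SUFFIX = ".txt"

    def read(self, input: BinaryIO) -> str:
        return input.read().decode("utf-8")

    def write(self, input: str, output: BinaryIO) -> None:
        output.write(input.encode("utf-8"))

# A single shared instance, following the singleton convention described above.
textFormat = TextFormat()

buffer = io.BytesIO()
textFormat.write("Arsenal", buffer)
buffer.seek(0)
round_trip = textFormat.read(buffer)
```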
-
SUFFIX
= NotImplemented¶
-
read
(input: BinaryIO) → T¶ Reads input, parses it, and returns it.
-
write
(input: T, output: BinaryIO) → None¶ Writes the given input out to disk.
-
-
class
pipette.pipette.
DillFormat
¶ Bases:
pipette.pipette.Format
A format that uses dill to serialize arbitrary Python objects.
This format has special handling for iterable types. It takes care not to read the entire iterable into memory during either reading or writing.
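The streaming idea can be sketched with the standard library. This example uses pickle as a stand-in for dill, and the helpers write_items and read_items are hypothetical; it shows one way to serialize an iterable item by item, not how DillFormat is implemented.

```python
import io
import pickle
from typing import Any, BinaryIO, Iterable, Iterator

def write_items(items: Iterable[Any], output: BinaryIO) -> None:
    # Each item is serialized on its own, so the iterable is never
    # materialized as a list.
    for item in items:
        pickle.dump(item, output)

def read_items(input: BinaryIO) -> Iterator[Any]:
    # Yield items one at a time until the stream is exhausted.
    while True:
        try:
            yield pickle.load(input)
        except EOFError:
            return

buffer = io.BytesIO()
write_items((n * n for n in range(4)), buffer)   # a generator, not a list
buffer.seek(0)
squares = list(read_items(buffer))
```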
To use this format, simply refer to pipette.dillFormat.
-
SUFFIX
= '.dill'¶
-
read
(input: BinaryIO) → Any¶ Reads input, parses it, and returns it.
-
write
(input: Any, output: BinaryIO) → None¶ Writes the given input out to disk.
-
-
class
pipette.pipette.
DillIterableFormat
¶ Bases:
pipette.pipette.Format
-
SUFFIX
= '.dill'¶
-
read
(input: BinaryIO) → Iterable[Any]¶ Reads input, parses it, and returns it.
-
write
(input: Iterable[Any], output: BinaryIO) → None¶ Writes the given input out to disk.
-
-
class
pipette.pipette.
JsonFormat
¶ Bases:
pipette.pipette.Format
A format that serializes Python objects with JSON.
If you are looking to serialize lists of things, you probably want JsonlFormat or JsonlGzFormat.
To use this format, simply refer to pipette.jsonFormat.
-
SUFFIX
= '.json'¶
-
read
(input: BinaryIO) → Any¶ Reads input, parses it, and returns it.
-
write
(input: Any, output: BinaryIO) → None¶ Writes the given input out to disk.
-
-
class
pipette.pipette.
JsonlFormat
¶ Bases:
pipette.pipette.Format
A format that serializes lists of Python objects to JSON, one line per item.
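The one-line-per-item layout can be sketched as follows. The helpers write_jsonl and read_jsonl and the sample records are made up for illustration; they show the JSON Lines convention in general, not the internals of JsonlFormat.

```python
import io
import json
from typing import Any, BinaryIO, Iterable, Iterator

def write_jsonl(items: Iterable[Any], output: BinaryIO) -> None:
    # One JSON document per line, each terminated by a newline.
    for item in items:
        output.write(json.dumps(item).encode("utf-8"))
        output.write(b"\n")

def read_jsonl(input: BinaryIO) -> Iterator[Any]:
    # Parse line by line, skipping blank lines.
    for line in input:
        if line.strip():
            yield json.loads(line.decode("utf-8"))

records = [{"team": "A", "goals": 3}, {"team": "B", "goals": 1}]
buffer = io.BytesIO()
write_jsonl(records, buffer)
raw = buffer.getvalue()
buffer.seek(0)
loaded = list(read_jsonl(buffer))
```

Because every line is a complete JSON document, tools that work line by line can process such a file without loading all of it.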
To use this format, simply refer to pipette.jsonlFormat.
-
SUFFIX
= '.jsonl'¶
-
read
(input: BinaryIO) → Iterable[Any]¶ Reads input, parses it, and returns it.
-
write
(input: Iterable[Any], output: BinaryIO) → None¶ Writes the given input out to disk.
-
-
class
pipette.pipette.
TsvFormat
¶ Bases:
pipette.pipette.Format
-
SUFFIX
= '.tsv'¶
-
read
(input: BinaryIO) → Iterable[List[str]]¶ Reads input, parses it, and returns it.
-
write
(input: Iterable[List[str]], output: BinaryIO) → None¶ Writes the given input out to disk.
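The signatures above suggest rows of strings. A minimal sketch of that shape, assuming tab-separated columns with one row per line and UTF-8 encoding, is shown below; write_tsv and read_tsv are hypothetical helpers built on the standard csv module, and TsvFormat's actual quoting and escaping rules are not described here.

```python
import csv
import io
from typing import BinaryIO, Iterable, Iterator, List

def write_tsv(rows: Iterable[List[str]], output: BinaryIO) -> None:
    for row in rows:
        # Format one row as text, then write it out as UTF-8 bytes.
        text = io.StringIO()
        csv.writer(text, delimiter="\t", lineterminator="\n").writerow(row)
        output.write(text.getvalue().encode("utf-8"))

def read_tsv(input: BinaryIO) -> Iterator[List[str]]:
    # Decode each line lazily and let the csv module split the columns.
    lines = (line.decode("utf-8") for line in input)
    for row in csv.reader(lines, delimiter="\t"):
        yield row

buffer = io.BytesIO()
write_tsv([["team", "goals"], ["A", "3"]], buffer)
raw = buffer.getvalue()
buffer.seek(0)
rows = list(read_tsv(buffer))
```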
-
-
pipette.pipette.
random_string
(length: int = 16) → str¶ Returns a random string of readable characters.