A job instance can be created by adding a model to a survey with the by() method, or by using the Jobs class to create a job directly. For example:
from edsl import QuestionFreeText, Survey, Model, Jobs

q = QuestionFreeText(
    question_name = "example",
    question_text = "What is your favorite color?",
)
survey = Survey(questions = [q])
model = Model("gpt-4o", service_name = "openai")
job = survey.by(model)

# or using the Jobs class
job = Jobs(survey).by(model)
It can be useful to work with jobs when you want to run a survey with multiple models, agents, or scenarios, or when you want to manage the execution of a survey in a more structured way. There are several methods available in the Jobs class for managing jobs, such as by(), run(), list(), and fetch():
  • The by() method is used to add a model to the survey and create a job instance.
  • The run() method is used to execute the job.
  • The list() method is used to list details of jobs that have been posted to Coop.
  • The fetch() method is used to retrieve jobs that have been posted to Coop.
For example, to run the above job and store the results:
results = job.run()
To retrieve details about your 10 most recent jobs posted to Coop:
from edsl import Jobs

jobs = Jobs.list()
The following information will be returned:
Column                     Description
uuid                       The UUID of the job.
description                A description of the job, if any.
status                     The status of the job (e.g., running, completed, failed).
cost_credits               The cost of the job in credits.
iterations                 The number of iterations the job has run.
results_uuid               The UUID of the results for the job.
latest_error_report_uuid   The UUID of the latest error report for the job, if any.
latest_failure_reason      The reason for the latest failure, if any.
version                    The EDSL version used to create the job.
created_at                 The date and time the job was created.
You can also specify the page_size parameter to limit the number of jobs returned, and the page parameter to paginate through the jobs:
jobs = Jobs.list(page_size=5, page=2)
You can also filter jobs by their status using the status parameter:
jobs = Jobs.list(status="running")
You can filter jobs by description using the search_query parameter:
jobs = Jobs.list(search_query="testing")
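These filters can be combined in a single call; for example (a sketch, assuming the parameters compose):
jobs = Jobs.list(status="failed", search_query="testing", page_size=5)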
To fetch the Jobs objects directly, you can use the fetch() method:
from edsl import Jobs

jobs = Jobs.list(page_size=1).fetch()
Or to fetch the associated results:
from edsl import Jobs

jobs = Jobs.list(page_size=1).fetch_results()

Prompts

It can also be useful to work with Jobs objects in order to inspect user and system prompts before running the job. For example, here we create a survey and use the job to inspect the prompts:
from edsl import QuestionFreeText, Survey, Agent, Model, Jobs

q = QuestionFreeText(
    question_name = "example",
    question_text = "What is your favorite color?",
)

survey = Survey(questions = [q])

agent = Agent(traits = {"persona":"You are an artist."})

model = Model("gpt-4o", service_name = "openai")

job = survey.by(agent).by(model)

# Inspect the prompts
job.show_prompts()
This will return the following information:
user_prompt       What is your favorite color?
system_prompt     You are answering questions as if you were a human. Do not break character. Your traits: {'persona': 'You are an artist.'}
interview_index   0
question_name     example
scenario_index    0
agent_index       0
model             gpt-4o
estimated_cost    0.000373
cache_keys        ['e549b646508cfd459f88379649ebe8ba']
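To work with prompts programmatically rather than printing them, the prompts() method (documented in the Jobs class reference below) returns the same information as a Dataset:
# Get the prompts as a Dataset for further analysis
prompts = job.prompts()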

Jobs class

class edsl.jobs.Jobs(survey: Survey, agents: list['Agent'] | 'AgentList' | None = None, models: 'ModelList' | list['LanguageModel'] | None = None, scenarios: 'ScenarioList' | list['Scenario'] | None = None)[source]

Bases: Base

A collection of agents, scenarios, models, and a survey that orchestrates interviews. The Jobs class is the central component for running large-scale experiments or simulations in EDSL. It manages the execution of interviews where agents interact with surveys through language models, possibly in different scenarios.

Key responsibilities:
  1. Managing collections of agents, scenarios, and models
  2. Configuring execution parameters (caching, API keys, etc.)
  3. Managing parallel execution of interviews
  4. Handling remote cache and inference capabilities
  5. Collecting and organizing results

A typical workflow involves:
  1. Creating a survey with questions
  2. Creating a Jobs instance with that survey
  3. Adding agents, scenarios, and models using the by() method
  4. Running the job with run() or run_async()
  5. Analyzing the results

Jobs implements a fluent interface pattern, where methods return self to allow method chaining for concise, readable configuration.

__init__(survey: Survey, agents: list['Agent'] | 'AgentList' | None = None, models: 'ModelList' | list['LanguageModel'] | None = None, scenarios: 'ScenarioList' | list['Scenario'] | None = None)[source]

Initialize a Jobs instance with a survey and optional components. The Jobs constructor requires a survey and optionally accepts collections of agents, models, and scenarios. If any of these optional components are not provided, they can be added later using the by() method or will be automatically populated with defaults when the job is run.

Parameters

  • survey (Survey): The survey containing questions to be used in the job
  • agents (Union[list[Agent], AgentList], optional): The agents that will take the survey
  • models (Union[ModelList, list[LanguageModel]], optional): The language models to use
  • scenarios (Union[ScenarioList, list[Scenario]], optional): The scenarios to run

Raises

ValueError: If the survey contains questions with invalid names (e.g., names containing template variables)

Examples

>>> from edsl.surveys import Survey
>>> from edsl.questions import QuestionFreeText
>>> q = QuestionFreeText(question_name="name", question_text="What is your name?")
>>> s = Survey(questions=[q])
>>> j = Jobs(survey = s)
>>> q = QuestionFreeText(question_name="{{ bad_name }}", question_text="What is your name?")
>>> s = Survey(questions=[q])
>>> # Constructing Jobs(survey = s) would raise a ValueError (the question name contains a template variable)

Notes

  • The survey’s questions must have valid names without templating variables
  • If agents, models, or scenarios are not provided, defaults will be used when running
  • Upon initialization, a RunConfig is created with default environment and parameters

add_running_env(running_env: RunEnvironment) → Jobs[source]

Add a running environment to the job.

Args:

running_env: A RunEnvironment object containing details about the execution environment like API keys and other configuration.

Returns:

Jobs: The Jobs instance with the updated running environment.

Example:

>>> from edsl import Cache
>>> from edsl.jobs.data_structures import RunEnvironment  # import path assumed
>>> job = Jobs.example()
>>> my_cache = Cache.example()
>>> env = RunEnvironment(cache=my_cache)
>>> j = job.add_running_env(env)
>>> j.run_config.environment.cache == my_cache
True

property agents[source]

Get the agents associated with this job.

Returns

AgentList: The agents for this job.
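
For example:

>>> from edsl.jobs import Jobs
>>> agents = Jobs.example().agents   # AgentList of the agents attached to the example job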

all_question_parameters() → set[source]

Return all the fields in the questions in the survey.
>>> from edsl.jobs import Jobs
>>> Jobs.example().all_question_parameters()
{'period'}

by(*args: 'Agent' | 'Scenario' | 'LanguageModel' | Sequence['Agent' | 'Scenario' | 'LanguageModel']) → Jobs[source]

Add agents, scenarios, and language models to a job using a fluent interface. This method is the primary way to configure a Jobs instance with components. It intelligently handles different types of objects and collections, making it easy to build complex job configurations with a concise syntax.

Parameters

*args (Union[Agent, Scenario, LanguageModel, Sequence[Union[Agent, Scenario, LanguageModel]]]): Objects or sequences of objects to add to the job. Supported types are Agent, Scenario, LanguageModel, and sequences of these.

Returns

Jobs: The Jobs instance (self) for method chaining

Examples

>>> from edsl.surveys import Survey
>>> from edsl.questions import QuestionFreeText
>>> q = QuestionFreeText(question_name="name", question_text="What is your name?")
>>> j = Jobs(survey = Survey(questions=[q]))
>>> j
Jobs(survey=Survey(...), agents=AgentList([]), models=ModelList([]), scenarios=ScenarioList([]))
>>> from edsl.agents import Agent; a = Agent(traits = {"status": "Sad"})
>>> j.by(a).agents
AgentList([Agent(traits = {'status': 'Sad'})])
# Adding multiple components at once
>>> from edsl.language_models import Model
>>> from edsl.scenarios import Scenario
>>> j = Jobs.example()
>>> _ = j.by(Agent(traits={"mood": "happy"})).by(Model(temperature=0.7)).by(Scenario({"time": "morning"}))

# Adding a sequence of the same type
>>> agents = [Agent(traits={"age": i}) for i in range(5)]
>>> _ = j.by(agents)

Notes

  • All objects must implement 'get_value', 'set_value', and 'add' methods
  • Agent traits: When adding agents with traits to existing agents, the traits are combined (see the sketch after this list). Avoid overlapping trait names to prevent unexpected behavior.
  • Scenario traits: When adding scenarios with traits to existing scenarios, new traits overwrite existing ones with the same name.
  • Models: New models with the same attributes will override existing models.
  • The method detects object types automatically and routes them to the appropriate collection (agents, scenarios, or models).
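
A minimal sketch of the agent-trait behavior noted above (the merging outcome described in the comment is an inference from that note):

>>> from edsl.jobs import Jobs
>>> from edsl.agents import Agent
>>> j = Jobs.example()                          # the example job already has agents
>>> j2 = j.by(Agent(traits={"mood": "happy"}))  # new traits merge into the existing agents' traits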

code()[source]

Return the code to create this instance.

static compute_job_cost(job_results: Results) → float[source]

Compute the cost of a completed job in USD.
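
A minimal sketch (job_results is a hypothetical Results object from a job you have already run):

# Realized cost, in USD, of a completed job
cost_usd = Jobs.compute_job_cost(job_results)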

create_bucket_collection() → BucketCollection[source]

Create a collection of buckets for each model. These buckets are used to track API calls and token usage. For test models and scripted response models, infinity buckets are used to avoid rate limiting delays.
>>> from edsl.jobs import Jobs
>>> from edsl import Model
>>> j = Jobs.example().by(Model(temperature = 1), Model(temperature = 0.5))
>>> bc = j.create_bucket_collection()
>>> bc
BucketCollection(...)

duplicate()[source]

Create a duplicate copy of this Jobs instance.

Returns

Jobs: A new Jobs instance that is a copy of this one.

estimate_job_cost(iterations: int = 1) → dict[source]

Estimate the cost of running the job.
Parameters:
  • iterations – the number of iterations to run
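
For example, to preview costs before running (a sketch; the structure of the returned dict is not shown here):

>>> from edsl.jobs import Jobs
>>> estimate = Jobs.example().estimate_job_cost()   # dict of estimated token usage and cost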

estimate_job_cost_from_external_prices(price_lookup: dict, iterations: int = 1) → dict[source]

Estimate the cost of running the job using external price lookup.

Args:

price_lookup: Dictionary containing price information.
iterations: Number of iterations to run.

Returns:

dict: Cost estimation details.

static estimate_prompt_cost(system_prompt: str, user_prompt: str, price_lookup: dict, inference_service: str, model: str) → dict[source]

Estimate the cost of running the prompts.
Parameters:
  • system_prompt – the system prompt
  • user_prompt – the user prompt
  • price_lookup – the price lookup
  • inference_service – the inference service
  • model – the model name

classmethod example(throw_exception_probability: float = 0.0, randomize: bool = False, test_model=False) → Jobs[source]

Return an example Jobs instance. Parameters:
  • throw_exception_probability – the probability that an exception will be thrown when answering a question. This is useful for testing error handling.
  • randomize – whether to randomize the job by adding a random string to the period
  • test_model – whether to use a test model
>>> Jobs.example()
Jobs(...)

classmethod from_dict(data: dict) → Jobs[source]

Create a Jobs instance from a dictionary.

classmethod from_interviews(interview_list: list['Interview']) → Jobs[source]

Return a Jobs instance from a list of interviews. This is useful when you have, say, a list of failed interviews and you want to create a new job with only those interviews.
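
A minimal sketch (for illustration it rebuilds a job from all of an existing job's interviews; a real use would pass only the failed ones):

>>> from edsl.jobs import Jobs
>>> j = Jobs.example()
>>> j2 = Jobs.from_interviews(j.interviews())   # new job containing exactly these interviews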

generate_interviews() → Generator[source]

Generate interviews.
Note: This sets the agents, models, and scenarios if they have not been set; this is a side effect of the method. It is useful because a user can create a job without setting the agents, models, or scenarios, and the job will still run, with defaults filled in.

html()[source]

Return the HTML representations for each scenario.

humanize(project_name: str = 'Project', scenario_list_method: Literal['randomize', 'loop', 'single_scenario', 'ordered'] | None = None, survey_description: str | None = None, survey_alias: str | None = None, survey_visibility: Literal['private', 'public', 'unlisted'] | None = 'unlisted', scenario_list_description: str | None = None, scenario_list_alias: str | None = None, scenario_list_visibility: Literal['private', 'public', 'unlisted'] | None = 'unlisted')[source]

Send the survey and scenario list to Coop. Then, create a project on Coop so you can share the survey with human respondents.
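
For example (a sketch; this posts to Coop, so it assumes a valid Coop account and API key):

job.humanize(
    project_name = "Favorite colors",
    survey_description = "A survey about color preferences",
    survey_visibility = "unlisted",
)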

inspect()[source]

Create an interactive inspector widget for this job.

interviews() → list[source]

Return a list of edsl.jobs.interviews.Interview objects. It returns one Interview for each combination of Agent, Scenario, and LanguageModel. If any of Agents, Scenarios, or LanguageModels are missing, it fills in with defaults.
>>> from edsl.jobs import Jobs
>>> j = Jobs.example()
>>> len(j.interviews())
4
>>> j.interviews()[0]
Interview(agent = Agent(traits = {'status': 'Joyful'}), survey = Survey(...), scenario = Scenario({'period': 'morning'}), model = Model(...))

property models[source]

Get the models associated with this job.

Returns

ModelList: The models for this job.

property num_interviews: int[source]

Calculate the total number of interviews that will be run.
>>> Jobs.example().num_interviews
4
This is the product of the number of scenarios, agents, and models, multiplied by the number of iterations specified in the run configuration.

prompts(iterations=1) → Dataset[source]

Return a Dataset of prompts that will be used.
>>> from edsl.jobs import Jobs
>>> Jobs.example().prompts()
Dataset(...)

push(*args, **kwargs) → None[source]

Push the job to the remote server.

replace_missing_objects() → None[source]

If the agents, models, or scenarios are not set, replace them with defaults.

run(*, config: RunConfig) → 'Results' | None[source]

Run the job by conducting interviews and return their results. This is the main entry point for executing a job. It processes all interviews (combinations of agents, scenarios, and models) and returns a Results object containing all responses and metadata.

Parameters

  • config (RunConfig): Configuration object containing runtime parameters and environment settings
  • n (int, optional): Number of iterations to run each interview (default: 1)
  • progress_bar (bool, optional): Whether to show a progress bar (default: False)
  • stop_on_exception (bool, optional): Whether to stop the job if an exception is raised (default: False)
  • check_api_keys (bool, optional): Whether to verify API keys before running (default: False)
  • verbose (bool, optional): Whether to print extra messages during execution (default: True)
  • print_exceptions (bool, optional): Whether to print exceptions as they occur (default: True)
  • remote_cache_description (str, optional): Description for entries in the remote cache
  • remote_inference_description (str, optional): Description for the remote inference job
  • remote_inference_results_visibility (VisibilityType, optional): Visibility of results on Coop ("private", "public", "unlisted")
  • disable_remote_cache (bool, optional): Whether to disable the remote cache (default: False)
  • disable_remote_inference (bool, optional): Whether to disable remote inference (default: False)
  • fresh (bool, optional): Whether to ignore the cache and force new results (default: False)
  • skip_retry (bool, optional): Whether to skip retrying failed interviews (default: False)
  • raise_validation_errors (bool, optional): Whether to raise validation errors (default: False)
  • background (bool, optional): Whether to run in background mode (default: False)
  • job_uuid (str, optional): UUID for the job, used for tracking
  • cache (Cache, optional): Cache object to store results
  • bucket_collection (BucketCollection, optional): Object to track API calls
  • key_lookup (KeyLookup, optional): Object to manage API keys
  • memory_threshold (int, optional): Memory threshold in bytes for the Results object's SQLList, controlling when data is offloaded to SQLite storage
  • new_format (bool, optional): If True, uses the remote_inference_create method; if False, uses the old_remote_inference_create method (default: True)
  • expected_parrot_api_key (str, optional): Custom EXPECTED_PARROT_API_KEY to use for this job run

Returns

Results: A Results object containing all responses and metadata

Notes

  • This method will first try to use remote inference if available
  • If remote inference is not available, it will run locally
  • For long-running jobs, consider using progress_bar=True
  • For maximum performance, ensure appropriate caching is configured
Example:
>>> from edsl.jobs import Jobs
>>> from edsl.caching import Cache
>>> job = Jobs.example()
>>> from edsl import Model
>>> m = Model('test')
>>> results = job.by(m).run(cache=Cache(), progress_bar=False, n=2, disable_remote_inference=True)
...

run_async(*, config: RunConfig) → Results[source]

Asynchronously runs the job by conducting interviews and returns their results. This method is the asynchronous version of run(). It has the same functionality and parameters but can be awaited in an async context for better integration with asynchronous code.

Parameters

  • config (RunConfig): Configuration object containing runtime parameters and environment settings
  • n (int, optional): Number of iterations to run each interview (default: 1)
  • progress_bar (bool, optional): Whether to show a progress bar (default: False)
  • stop_on_exception (bool, optional): Whether to stop the job if an exception is raised (default: False)
  • check_api_keys (bool, optional): Whether to verify API keys before running (default: False)
  • verbose (bool, optional): Whether to print extra messages during execution (default: True)
  • print_exceptions (bool, optional): Whether to print exceptions as they occur (default: True)
  • remote_cache_description (str, optional): Description for entries in the remote cache
  • remote_inference_description (str, optional): Description for the remote inference job
  • remote_inference_results_visibility (VisibilityType, optional): Visibility of results on Coop ("private", "public", "unlisted")
  • disable_remote_cache (bool, optional): Whether to disable the remote cache (default: False)
  • disable_remote_inference (bool, optional): Whether to disable remote inference (default: False)
  • fresh (bool, optional): Whether to ignore the cache and force new results (default: False)
  • skip_retry (bool, optional): Whether to skip retrying failed interviews (default: False)
  • raise_validation_errors (bool, optional): Whether to raise validation errors (default: False)
  • background (bool, optional): Whether to run in background mode (default: False)
  • job_uuid (str, optional): UUID for the job, used for tracking
  • cache (Cache, optional): Cache object to store results
  • bucket_collection (BucketCollection, optional): Object to track API calls
  • key_lookup (KeyLookup, optional): Object to manage API keys
  • memory_threshold (int, optional): Memory threshold in bytes for the Results object's SQLList, controlling when data is offloaded to SQLite storage
  • new_format (bool, optional): If True, uses the remote_inference_create method; if False, uses the old_remote_inference_create method (default: True)
  • expected_parrot_api_key (str, optional): Custom EXPECTED_PARROT_API_KEY to use for this job run

Returns

Results: A Results object containing all responses and metadata

Notes

  • This method should be used in async contexts (e.g., with await)
  • For non-async contexts, use the run() method instead
  • This method is particularly useful in notebook environments or async applications
Example:
>>> import asyncio
>>> from edsl.jobs import Jobs
>>> from edsl.caching import Cache
>>> job = Jobs.example()
>>> # In an async context
>>> async def run_job():
...     results = await job.run_async(cache=Cache(), progress_bar=True)
...     return results

property scenarios: ScenarioList[source]

Get the scenarios associated with this job.

Returns

ScenarioList: The scenarios for this job.

show_flow(filename: str | None = None) → None[source]

Visualize either the Job dependency/post-processing flow or the underlying survey flow. The method automatically decides which flow to render:
  1. If the job has dependencies created via Jobs.to() (i.e. _depends_on is not None) or has post-run methods queued in _post_run_methods, the job flow (dependencies → post-processing chain) is rendered using edsl.jobs.job_flow_visualization.JobsFlowVisualization.
  2. Otherwise, it falls back to the original behaviour and shows the survey question flow using edsl.surveys.survey_flow_visualization.SurveyFlowVisualization.
>>> from edsl.jobs import Jobs
>>> job = Jobs.example()
>>> job.show_flow()  # Visualises survey flow (no deps/post-run methods)
>>> job2 = job.select('how_feeling').to_pandas()  # add post-run methods
>>> job2.show_flow()  # Now visualises job flow

show_prompts(all: bool = False) → None[source]

Print the prompts.

table()[source]

Return a table view of the job’s prompts.

Returns

Table representation of the job’s prompts.

then(method_name, *args, **kwargs) → Jobs[source]

Schedule a method to be called on the results object after the job runs. This allows for method chaining like: jobs.then('to_scenario_list').then('to_pandas').then('head', 10)

Args:

method_name: Name of the method to call on the results
*args: Positional arguments to pass to the method
**kwargs: Keyword arguments to pass to the method
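
A minimal sketch (each chained name must be a valid method on the results object, or on the previous call's return value):

>>> from edsl.jobs import Jobs
>>> job = Jobs.example().then('to_pandas').then('head', 10)
>>> # After job.run(), the results are converted to a DataFrame and truncated to 10 rows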

to(question_or_survey_or_jobs: 'Question' | 'Survey' | 'Jobs') → Jobs[source]

Create a new Jobs instance from self and a target object. The target can be one of the following:
  • Question – A single question which will be wrapped in a one-question survey.
  • Survey – A survey object that will be used directly.
  • Jobs – An existing Jobs object. In this case the target Jobs is returned unchanged, but its _depends_on attribute is set to reference self, establishing an execution dependency chain.

Args:

question_or_survey_or_jobs (Union[Question, Survey, Jobs]): The object used to build (or identify) the new Jobs instance.

Returns:

Jobs: A new Jobs instance that depends on the current instance, or the target Jobs instance when the target itself is a Jobs.

Raises:

ValueError: If question_or_survey_or_jobs is not one of the supported types.

Examples:

The following doctest demonstrates sending one job to another and verifying the dependency link via the private _depends_on attribute:
>>> from edsl.jobs import Jobs
>>> base_job = Jobs.example()
>>> downstream_job = Jobs.example()
>>> new_job = base_job.to(downstream_job)
>>> new_job is downstream_job  # the same object is returned
True
>>> new_job._depends_on is base_job  # dependency recorded
True

to_dict(add_edsl_version=True, full_dict=None)[source]

Convert the Jobs instance to a dictionary representation.

Args:

add_edsl_version: Whether to include EDSL version information.
full_dict: Additional dictionary to merge (currently unused).

Returns:

dict: Dictionary representation of this Jobs instance.
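
For example, a serialization round trip with from_dict():

>>> from edsl.jobs import Jobs
>>> j = Jobs.example()
>>> j2 = Jobs.from_dict(j.to_dict())   # reconstructs an equivalent Jobs instance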

use_remote_cache() → bool[source]

Determine whether to use remote cache for this job.

Returns

bool: True if remote cache should be used, False otherwise.

using(obj) → Jobs[source]

Add a Cache, BucketCollection, or KeyLookup object to the job.

Args:

obj: The object to add to the job’s configuration. Must be one of: Cache, BucketCollection, or KeyLookup.

Returns:

Jobs: The Jobs instance with the updated configuration object.
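
For example (a sketch mirroring the add_running_env example above):

>>> from edsl import Cache
>>> job = Jobs.example().using(Cache.example())   # attach a Cache to the job's configuration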

using_bucket_collection(bucket_collection: BucketCollection) → Jobs[source]

Add a BucketCollection object to the job.

Args:

bucket_collection: The BucketCollection object to add to the job’s configuration.

Returns:

Jobs: The Jobs instance with the updated bucket collection.

using_cache(cache: Cache) → Jobs[source]

Add a Cache object to the job.

Args:

cache: The Cache object to add to the job’s configuration.

Returns:

Jobs: The Jobs instance with the updated cache.

using_key_lookup(key_lookup: KeyLookup) → Jobs[source]

Add a KeyLookup object to the job.

Args:

key_lookup: The KeyLookup object to add to the job’s configuration.

Returns:

Jobs: The Jobs instance with the updated key lookup.

where(expression: str) → Jobs[source]

Filter the agents, scenarios, and models based on a condition.
Parameters:
  • expression – a condition to filter the agents, scenarios, and models
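
A minimal sketch (the expression syntax shown here, referencing the 'period' scenario field of the example job, is an assumption):

>>> from edsl.jobs import Jobs
>>> j = Jobs.example().where("period == 'morning'")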