PipelineTasks and Gen3 middleware


Caveats

  • This is a high-level overview of PipelineTasks and the accompanying middleware
  • Not intended to be a tutorial
  • Formal documentation is forthcoming
  • The roadmap for the future will be discussed later this afternoon

What is a PipelineTask?

  • Input and output are configurable and introspectable

  • Has a defined unit of work

  • Input and output are handled through the Butler

  • Together, these properties enable an activator to run generic PipelineTasks

What makes a PipelineTask

class MergeDetectionsConfig(PipelineTaskConfig):
...
    catalogs = InputDatasetField(
        doc="Detection Catalogs to be merged",
        nameTemplate="{inputCoaddName}Coadd_det",
        storageClass="SourceCatalog",
        dimensions=("Tract", "Patch", "SkyMap", "AbstractFilter")
    )

    skyMap = InputDatasetField(
        doc="SkyMap to be used in merging",
        nameTemplate="{inputCoaddName}Coadd_skyMap",
        storageClass="SkyMap",
        dimensions=("SkyMap",),
        scalar=True
    )

    outputCatalog = OutputDatasetField(
        doc="Merged Detection catalog",
        nameTemplate="{outputCoaddName}Coadd_mergeDet",
        storageClass="SourceCatalog",
        dimensions=("Tract", "Patch", "SkyMap"),
        scalar=True
    )
    ...
    def setDefaults(self):
        ...
        self.quantum.dimensions = ("Tract", "Patch", "SkyMap")
        ...

Config classes define the input and output datasets that a task will use,
and the unit of data (the quantum) on which processing occurs
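
To make that concrete, here is a minimal, self-contained sketch of the kind of metadata a generic activator can read off such a config without knowing anything else about the task. ToyDatasetField and declared_inputs are hypothetical stand-ins for illustration, not the real API.

from dataclasses import dataclass
from typing import Tuple

# Toy stand-in for a declared dataset field; the real InputDatasetField
# carries the same kind of metadata (hypothetical class, for illustration).
@dataclass
class ToyDatasetField:
    name: str                     # dataset type name after template expansion
    storageClass: str             # how the butler (de)serializes the data
    dimensions: Tuple[str, ...]   # data units the dataset is keyed on

# What an activator can learn purely by introspecting the config:
declared_inputs = {
    "catalogs": ToyDatasetField("deepCoadd_det", "SourceCatalog",
                                ("Tract", "Patch", "SkyMap", "AbstractFilter")),
    "skyMap": ToyDatasetField("deepCoadd_skyMap", "SkyMap", ("SkyMap",)),
}

for label, field in declared_inputs.items():
    print(f"{label}: dataset={field.name}, dims={field.dimensions}")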

class MergeDetectionsTask(PipelineTask, CmdLineTask):
...
    def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
        packedId, maxBits = butler.registry.packDataId("TractPatch", inputDataIds["outputCatalog"])
        inputData["skySeed"] = packedId
        inputData["idFactory"] = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
        catalogDict = {dataId['abstract_filter']: cat
                       for dataId, cat in zip(inputDataIds['catalogs'],
                                              inputData['catalogs'])}
        inputData['catalogs'] = catalogDict
        ...
        return self.run(**inputData)

    def run(self, catalogs, skyInfo, idFactory, skySeed):
        ...
        return Struct(outputCatalog=mergedList)

Tasks define an entry point where input manipulation may happen,
and the run method returns a Struct whose fields match the config's declared outputs
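
As a concrete illustration of that input manipulation, the toy snippet below replays the catalog-regrouping step from adaptArgsAndRun above; the literal strings stand in for real SourceCatalogs.

# Toy data shaped the way the activator delivers it: parallel lists of
# data IDs and catalogs, keyed by the config field name "catalogs".
inputDataIds = {"catalogs": [{"abstract_filter": "r"}, {"abstract_filter": "i"}]}
inputData = {"catalogs": ["r-band detections", "i-band detections"]}

# The same regrouping done in adaptArgsAndRun above: reshape the parallel
# lists into a dict keyed by filter, which is the form run() expects.
catalogDict = {dataId["abstract_filter"]: cat
               for dataId, cat in zip(inputDataIds["catalogs"],
                                      inputData["catalogs"])}
print(catalogDict)  # {'r': 'r-band detections', 'i': 'i-band detections'}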

Running PipelineTasks

  • PipelineTasks are run by an activator
  • There are many different types of activators
  • A single invocation can run any number of tasks (see the sketch below)
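
All activators share the same core loop, sketched below with a hypothetical run_quantum helper. It is simplified to one data ID per dataset; real activators add quantum-graph building, parallelism, and error handling.

# Hypothetical sketch of the loop at the heart of every activator:
# fetch the declared inputs, hand them to the task, persist the
# declared outputs (simplified to a single data ID per dataset).
def run_quantum(task, inputIds, outputIds, butler):
    # Fetch each declared input from the butler by name and data ID.
    inputData = {name: butler.get(name, dataId)
                 for name, dataId in inputIds.items()}
    # The task's entry point reshapes the inputs and calls run().
    result = task.adaptArgsAndRun(inputData, inputIds, outputIds, butler)
    # Each declared output appears as an attribute on the returned Struct.
    for name, dataId in outputIds.items():
        butler.put(getattr(result, name), name, dataId)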

Anatomy of the "laptop" activator command

pipetask -b $CI_HSC_DIR/DATA/butler.yaml -i input_collection -o output_collection -t mergeDetections.MergeDetectionsTask:mdt -C mdt:config_file 

  • -b $CI_HSC_DIR/DATA/butler.yaml: Butler configuration
  • -i input_collection: input location
  • -o output_collection: output location
  • -t mergeDetections.MergeDetectionsTask: task to run
  • :mdt: label assigned to that task
  • -C mdt:config_file: config file applied to the task labeled mdt

Full ci_hsc command

pipetask -d "Patch.patch = 69" -j 8 -b $CI_HSC_DIR/DATA/butler.yaml -p lsst.meas.base -p lsst.ip.isr -p lsst.pipe.tasks \
-i 'raw','calib',ref/ps1_pv3_3pi_20170110,demo_collection_input -o demo_collection run --register-dataset-types \
-t isrTask.IsrTask:isr -C isr:$DEMO_HSC_PIPELINETASK_DIR/config/isr.py \
-t characterizeImage.CharacterizeImageTask:cit -C cit:$DEMO_HSC_PIPELINETASK_DIR/config/charImage.py \
-t calibrate.CalibrateTask:ct -C ct:$DEMO_HSC_PIPELINETASK_DIR/config/calibrate.py \
-t makeCoaddTempExp.MakeWarpTask:mwt -C mwt:$DEMO_HSC_PIPELINETASK_DIR/config/makeWarp.py \
-t assembleCoadd.CompareWarpAssembleCoaddTask:cwact -C cwact:$DEMO_HSC_PIPELINETASK_DIR/config/compareWarpAssembleCoadd.py \
-t multiBand.DetectCoaddSourcesTask \
-t mergeDetections.MergeDetectionsTask:mdt -C mdt:$DEMO_HSC_PIPELINETASK_DIR/config/mergeDetections.py \
-t deblendCoaddSourcesPipeline.DeblendCoaddSourcesSingleTask \
-t multiBand.MeasureMergedCoaddSourcesTask:mmcst -C mmcst:$DEMO_HSC_PIPELINETASK_DIR/config/measureMerged.py \
-t mergeMeasurements.MergeMeasurementsTask:mmt -C mmt:$DEMO_HSC_PIPELINETASK_DIR/config/mergeCoaddMeasurements.py \
-t forcedPhotCcd.ForcedPhotCcdTask:fpccdt -C fpccdt:$DEMO_HSC_PIPELINETASK_DIR/config/forcedPhotCcd.py \
-t forcedPhotCoadd.ForcedPhotCoaddTask:fpct -C fpct:$DEMO_HSC_PIPELINETASK_DIR/config/forcedPhotCoadd.py

What's New

  • Data are associated with spatial regions for easy querying (see the query sketch after this list)
  • Butler knows about dimensions other than direct observational dimensions
  • Much faster butler startup times!
  • Dynamic execution based on input constraints
  • More extensible to new file formats
  • Better provenance about exactly what was run (coming soon)
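
As a taste of what this querying looks like, here is a sketch in the spirit of the Gen3 registry API. The queryDatasets method is real daf_butler, but its exact signature and where-clause syntax have shifted across versions, and the repo path, collection, and data ID values below are illustrative.

from lsst.daf.butler import Butler

butler = Butler("DATA/butler.yaml")

# Ask the registry which merged-detection catalogs exist for one
# tract/patch, across all filters, without touching the filesystem.
# (Collection, skymap, and data ID values are illustrative.)
refs = butler.registry.queryDatasets(
    "deepCoadd_mergeDet",
    collections="demo_collection",
    where="tract = 0 AND patch = 69 AND skymap = 'ci_hsc'",
)
for ref in refs:
    print(ref.dataId)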

Advantages of the new system

  • Easier task writing
  • Easier to run a series of processing steps (less developer time)
  • Ability to query more metadata and relationships
  • Ability to efficiently query which datasets are present given constraints
  • Visualization of pipeline flow to aid development
  • Easier to introduce and integrate new dataset types
  • Pipelines can be defined via configuration rather than by hard-coding specific tasks