multicrab.py
unknown
python
3 years ago
16 kB
6
Indexable
#!/usr/bin/env python """ This is a small script that does the equivalent of multicrab. """ import os import sys from optparse import OptionParser import json import copy from multiprocessing import Process from CRABAPI.RawCommand import crabCommand from CRABClient.ClientExceptions import ClientException from httplib import HTTPException def merge_dicts(*dict_args): """ Given any number of dicts, shallow copy and merge into a new dict, precedence goes to key value pairs in latter dicts. """ result = {} for dictionary in dict_args: result.update(dictionary) return result def getOptions(): """ Parse and return the arguments provided by the user. """ usage = ("Usage: %prog --crabCmd CMD [--workArea WAD --crabCmdOpts OPTS --sampleType TYPE --era ERA --subEra SUBERA --dataTier DATATIER]" "\nThe multicrab command executes 'crab CMD OPTS' for each project directory contained in WAD" "\nUse multicrab -h for help") parser = OptionParser(usage=usage) parser.add_option('-c', '--crabCmd', dest = 'crabCmd', default = 'submit', help = "crab command", metavar = 'CMD') parser.add_option('-w', '--workArea', dest = 'workArea', default = 'crab', help = "work area directory. Default: 'crab'.", metavar = 'WAD') parser.add_option('-o', '--crabCmdOpts', dest = 'crabCmdOpts', default = '', help = "options for crab command CMD", metavar = 'OPTS') parser.add_option('-p', '--particle', dest = 'particle', default = 'muon', help = "Particle to run T&P on. Currently 'muon' is the only supported option.", metavar = 'PART') parser.add_option('-r', '--resonance', dest = 'resonance', default = 'Z', help = "Resonance to run T&P with. Options are 'Z' (default) and 'JPsi'.", metavar = 'RESON') parser.add_option('-e', '--era', dest = 'era', default = 'Run2018', help = "Era to run samples over. Options are 'Run2018'/'Run2017'/'Run2016'/'Run2018_UL'/'Run2017_UL'/'Run2016_UL'/'Run2016_UL_HIPM'. Default is 'Run2018'.", metavar = 'ERA') parser.add_option('-s', '--subEra', dest = 'subEra', default = 'all', help = "Sub-era to process: 'all' (default), custom (e.g. 'Run2016B').", metavar = 'SUBERA') parser.add_option('-d', '--dataTier', dest = 'dataTier', default = 'AOD', help = "Data tier: AOD (default) or MINIAOD.", metavar = 'DATATIER') parser.add_option('-t', '--sampleType', dest = 'sampleType', default = 'all', help = "Samples to process: 'all' (default), 'data', data_dm, 'mc'. Only allowed if subEra='all'.", metavar = 'TYPE') parser.add_option('-k', '--storageSite', dest = 'storageSite', default = 'CERN', help = "Storage site: 'CERN' (default), 'CERNBOX' (note: requires permission), 'FNAL' or 'IMPERIAL'.", metavar = 'STORAGE') parser.add_option('-n', '--numThreads', dest = 'numThreads', default = 1, help = "Number of CMSSW threads (default 1)", metavar = 'NUMTHRD') parser.add_option('-f', '--configFile', dest = 'configFile', default = os.path.join(os.environ['CMSSW_BASE'], 'src/MuonAnalysis/MuonAnalyzer/test/run_muonAnalyzer_cfg.py'), help = "CMSSW cfg file to use (default 'run_muonAnalyzer_cfg.py' in /test).", metavar = 'CFG') parser.add_option('-x', '--customSuffix', dest = 'customSuffix', default = '', help = "Custom CRAB name suffix to output dataset tag", metavar = 'SUFFIX') parser.add_option('-b', '--eraDB', dest = 'eraDB', default = '', help = "Database file. default: data/samples/PART/RESON/ERA/database.json", metavar = 'ERA_DB_FILE') parser.add_option('--splittingData', dest = 'splittingData', default = 'LumiBased', help = "job splitting option for data", metavar = 'SPLITTING_DATA') parser.add_option('--unitsPerJobData', dest = 'unitsPerJobData', type = 'int', default = 100, help = "unitsPerJob option for data", metavar = 'UNIT_PER_JOB_DATA') parser.add_option('--splittingMC', dest = 'splittingMC', default = 'LumiBased', help = "job splitting option for MC", metavar = 'SPLITTING_MC') parser.add_option('--unitsPerJobMC', dest = 'unitsPerJobMC', type = 'int', default = 100, help = "unitsPerJob option for MC", metavar = 'UNIT_PER_JOB_MC') parser.add_option('--dryrun', dest = 'dryrun', action = 'store_true', default = False, help = "print out CRAB configuration instead of submitting it") (options, arguments) = parser.parse_args() if arguments: parser.error("Found positional argument(s): %s." % (arguments)) if not options.crabCmd: parser.error("(-c CMD, --crabCmd=CMD) option not provided.") if options.crabCmd != 'submit': if not os.path.isdir(options.workArea): parser.error("'%s' is not a valid directory." % (options.workArea)) if options.crabCmd == 'submit' and options.dataTier not in ['AOD', 'MINIAOD']: parser.error('dataTier must be "AOD" or "MINIAOD"') return options def main(): # Read options options = getOptions() doData = options.sampleType in ['all', 'data', 'data_dm'] doMC = options.sampleType in ['all', 'mc'] doDataDM = options.sampleType == 'data_dm' particle = options.particle resonance = options.resonance era = options.era subEra = options.subEra dataTier = options.dataTier customSuffix = options.customSuffix numThreads = options.numThreads storageSite = options.storageSite crabCmd = options.crabCmd crabCmdOpts = options.crabCmdOpts workArea = options.workArea configFile = options.configFile # The submit command needs special treatment. if crabCmd == 'submit': #-------------------------------------------------------- # This is the base config: #-------------------------------------------------------- from CRABClient.UserUtilities import config, getUsername config = config() config.General.workArea = workArea config.General.transferOutputs = True config.General.transferLogs = True config.JobType.pluginName = 'Analysis' config.JobType.psetName = configFile config.JobType.numCores = numThreads config.JobType.allowUndistributedCMSSW = True #config.JobType.maxMemoryMB = 4000 config.Data.publication = False config.Data.allowNonValidInputDataset = True # for validation samples if storageSite == 'FNAL': # Requires write access to FNAL EOS space config.Site.storageSite = 'T3_US_FNALLPC' config.Data.outLFNDirBase = '/store/user/%s/TnP_ntuples/%s/%s/%s/%s' % (getUsername(), particle, resonance, era, dataTier) elif storageSite == 'CERN': # default option # Requires write access to Muon POG EOS space at CERN config.Site.storageSite = 'T2_CH_CERN' config.Data.outLFNDirBase = '/store/group/phys_muon/%s/TnP_ntuples/%s/%s/%s/%s' % (getUsername(), particle, resonance, era, dataTier) elif storageSite == 'CERNBOX': # CERNBOX write access from CRAB requires special permission from CERN IT # See https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRAB3FAQ#Can_I_send_CRAB_output_to_CERNBO # and to ask permission: https://cern.service-now.com/service-portal?id=sc_cat_item&name=request-map-dn-to-gridmap&se=CERNBox-Service config.Site.storageSite = 'T2_CH_CERNBOX' config.Data.outLFNDirBase = '/store/user/%s/TnP_ntuples/%s/%s/%s/%s' % (getUsername(), particle, resonance, era, dataTier) elif storageSite == 'IMPERIAL': #Requires write access to Imperial College grid space config.Site.storageSite = 'T2_UK_London_IC' config.Data.outLFNDirBase = '/store/user/%s/TnP_ntuples/%s/%s/%s/%s' % (getUsername(), particle, resonance, era, dataTier) #config.Site.ignoreGlobalBlacklist = True #config.Data.ignoreLocality = True #config.Site.whitelist = ['T2_US_*'] #-------------------------------------------------------- if options.eraDB != '': if not os.path.isfile(options.eraDB): print 'Error!! database file "{}" does not exist. Please check argument.'.format(options.eraDB) else: sample_db = options.eraDB else: sample_db = os.path.join(os.environ['CMSSW_BASE'], "src/MuonAnalysis/MuonAnalyzer/data/samples", particle, resonance, era, "database.json") with open(sample_db, 'r') as db_file: db = json.load(db_file) samples = {} try: suberas = db['suberas'][dataTier] if subEra == 'all': samples = suberas else: samples = dict({subEra: suberas[subEra]}) except: print "Error!! Requested era+sub-era is likely not valid. Please check argument." sys.exit() for subera_name, subera_cfg in samples.items(): isData = 'Run' in subera_name globalTag = subera_cfg['globalTag'] if 'globalTag' in subera_cfg else '' input_dataset = subera_cfg['dataset'] datatier = input_dataset.split('/')[-1] if 'AOD' not in datatier or 'NANOAOD' in datatier: print 'Input dataset is not AOD(SIM) or MINIAOD(SIM). Ignoring...' continue isFullAOD = False if 'MINIAOD' in datatier else True if isData and not doData: continue if not isData and not doMC: continue # if sampleType is data_dm, submit only DoubleMuon datasets isDataDM = ('Run' in subera_name and 'DM' in subera_name) if doDataDM and not isDataDM: continue config.Data.lumiMask = '' if isData: if 'UL' in era: if '2018' in era: config.Data.lumiMask = '/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/certification/Collisions18/13TeV/Legacy_2018/Cert_314472-325175_13TeV_Legacy2018_Collisions18_JSON.txt' elif '2017' in era: config.Data.lumiMask = '/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/certification/Collisions17/13TeV/Legacy_2017/Cert_294927-306462_13TeV_UL2017_Collisions17_GoldenJSON.txt' elif '2016' in era: config.Data.lumiMask = '/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/certification/Collisions16/13TeV/Legacy_2016/Cert_271036-284044_13TeV_Legacy2016_Collisions16_JSON.txt' else: if '2018' in era: config.Data.lumiMask = '/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/certification/Collisions18/13TeV/PromptReco/Cert_314472-325175_13TeV_PromptReco_Collisions18_JSON.txt' elif '2017' in era: config.Data.lumiMask = '/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/certification/Collisions17/13TeV/Final/Cert_294927-306462_13TeV_PromptReco_Collisions17_JSON.txt' elif '2016' in era: config.Data.lumiMask = '/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/certification/Collisions16/13TeV/Final/Cert_271036-284044_13TeV_PromptReco_Collisions16_JSON.txt' #config.Data.splitting = 'Automatic' # Not working after rucio transition config.Data.splitting = options.splittingData config.Data.unitsPerJob = options.unitsPerJobData else: config.Data.splitting = options.splittingMC config.Data.unitsPerJob = options.unitsPerJobMC config.JobType.pyCfgParams = [ 'resonance={}'.format(resonance), 'isFullAOD={}'.format(isFullAOD), 'isMC={}'.format(not isData), 'globalTag={}'.format(globalTag), 'numThreads={}'.format(numThreads), 'era={}'.format(era), 'includeJets={}'.format(False), 'fromCRAB={}'.format(True) ] config.Data.inputDataset = input_dataset requestName = '_'.join(['TnP_ntuplizer', particle, resonance, era, dataTier, subera_name]) if options.eraDB != '': requestName += '_prompt' config.General.requestName = '_'.join([requestName, customSuffix]) if customSuffix != '' else requestName #config.Data.outputDatasetTag = sample (default CRAB dataset tag is 'crab_' + requestName) # If we need to pull input files from a list file instead of CRAB: # config.Data.userInputFiles = open(basedir + sample + '.list').readlines() # Submit. def submit(config, options): try: print "Submitting for input dataset %s with options %s" % (input_dataset, options.crabCmdOpts) if options.dryrun: print '-'*50 print config else: crabCommand(options.crabCmd, config = config, *options.crabCmdOpts.split()) except HTTPException as hte: print "Submission for input dataset %s failed: %s" % (input_dataset, hte.headers) except ClientException as cle: print "Submission for input dataset %s failed: %s" % (input_dataset, cle) # Need to submit using multiprocessing module because of CRAB issue with different configs p = Process(target=submit, args=(config,options,)) p.start() p.join() # All other commands can be simply executed. elif workArea: for dir in os.listdir(workArea): projDir = os.path.join(workArea, dir) if not os.path.isdir(projDir): continue # Execute the crab command. msg = "Executing (the equivalent of): crab %s --dir %s %s" % (crabCmd, projDir, crabCmdOpts) print "-"*len(msg) print msg print "-"*len(msg) try: crabCommand(crabCmd, dir = projDir, *crabCmdOpts.split()) except HTTPException as hte: print "Failed executing command %s for task %s: %s" % (crabCmd, projDir, hte.headers) except ClientException as cle: print "Failed executing command %s for task %s: %s" % (crabCmd, projDir, cle) if __name__ == '__main__': main()
Editor is loading...