LSF job manager

Under:

The LSF job manager code below is from Globus 2.4.3.

use Globus::GRAM::Error;
use Globus::GRAM::JobState;
use Globus::GRAM::JobManager;
use Globus::Core::Paths;

use Config;

package Globus::GRAM::JobManager::lsf;

# This job manager inherits from the generic GRAM job manager class.
@ISA = ('Globus::GRAM::JobManager');

# Site configuration shared by submit(), poll() and cancel() below.
my ($lsf_profile, $mpirun, $bsub, $bjobs, $bkill);

BEGIN
{
    # Directory holding the LSF command-line tools used by this module.
    my $lsf_bindir = '/usr/lsf/5.1/linux2.4-glibc2.2-x86/bin';

    $lsf_profile = '/usr/lsf/conf/profile.lsf';

    # The literal value 'no' makes submit() reject jobs of type 'mpi'.
    $mpirun = 'no';

    # Every LSF command line starts by sourcing the LSF profile in the shell
    # that runs it, followed by the absolute path of the tool.
    my $command_prefix = ". $lsf_profile && $lsf_bindir";

    $bsub  = "$command_prefix/bsub";
    $bjobs = "$command_prefix/bjobs";
    $bkill = "$command_prefix/bkill";
}

# submit()
#
# Validate the job description held in $self->{JobDescription}, write a
# /bin/sh batch script carrying "#BSUB" directives into the GASS cache, and
# feed that script to bsub on standard input.
#
# Returns:
#   { JOB_ID => <id>, JOB_STATE => PENDING } when bsub exits with status 0
#   and prints an "is submitted" line containing "<id>"; otherwise one of the
#   Globus::GRAM::Error values returned below.
sub submit
{
    my $self = shift;
    my $description = $self->{JobDescription};
    # Use the job contact as the cache tag when the description has none.
    my $tag = $description->cache_tag() || $ENV{GLOBUS_GRAM_JOB_CONTACT};
    my $cache_pgm = "$Globus::Core::Paths::bindir/globus-gass-cache";
    my $errfile = '';
    my $queue;
    my $cpu_time;
    my $wall_time;
    my $script_url;
    my $lsf_job_script_name;
    my $library_path;
    my @library_vars;
    my @arguments;
    my $args = '';

    $self->log('Entering lsf submit');

    # check jobtype
    if(defined($description->jobtype()))
    {
        if($description->jobtype !~ /^(mpi|single|multiple)$/)
        {
            return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED();
        }
        elsif($description->jobtype() eq 'mpi' && $mpirun eq 'no')
        {
            return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED();
        }
    }
    if($description->directory eq '')
    {
        return Globus::GRAM::Error::RSL_DIRECTORY();
    }
    if((! -d $description->directory) || (! -r $description->directory))
    {
        return Globus::GRAM::Error::BAD_DIRECTORY();
    }

    # make sure the files are accessible (NFS sync) when you check for them
    $self->nfssync( $description->executable() )
        unless $description->executable() eq '';
    $self->nfssync( $description->stdin() )
        unless $description->stdin() eq '';

    if($description->executable eq '')
    {
        return Globus::GRAM::Error::RSL_EXECUTABLE();
    }
    elsif(! -f $description->executable())
    {
        return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
    }
    elsif(! -x $description->executable())
    {
        return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
    }
    elsif($description->stdin() eq '')
    {
        return Globus::GRAM::Error::RSL_STDIN();
    }
    elsif(! -r $description->stdin())
    {
        return Globus::GRAM::Error::STDIN_NOT_FOUND();
    }

    # A limit of 0 means "emit no directive", i.e. use the queue default.
    $self->log('Determining job max time cpu from job description');
    if(defined($description->max_cpu_time()))
    {
        $cpu_time = $description->max_cpu_time();
        $self->log(" using maxcputime of $cpu_time");
    }
    elsif(defined($description->max_time()))
    {
        $cpu_time = $description->max_time();
        $self->log(" using maxtime of $cpu_time");
    }
    else
    {
        $cpu_time = 0;
        $self->log(' using queue default');
    }

    $self->log('Determining job max wall time limit from job description');
    if(defined($description->max_wall_time()))
    {
        $wall_time = $description->max_wall_time();
        $self->log(" using maxwalltime of $wall_time");
    }
    else
    {
        $wall_time = 0;
        $self->log(' using queue default');
    }

    if($description->queue() ne '')
    {
        $queue = $description->queue();
    }

    $self->log('Building job script');

    # Reserve a cache entry for the script, then ask the cache for the local
    # file name that backs it.
    $script_url = "$tag/lsf_job_script.$$";
    $self->fork_and_exec_cmd( $cache_pgm, '-add', '-t', $tag,
                              '-n', $script_url, 'file:/dev/null' );
    $lsf_job_script_name = $self->pipe_out_cmd( $cache_pgm, '-query', '-t',
                                                $tag, $script_url );
    $lsf_job_script_name = '' unless defined($lsf_job_script_name);
    chomp($lsf_job_script_name);
    if($lsf_job_script_name eq '')
    {
        return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
    }

    # The lexical handle is closed automatically on the early error returns
    # further down.
    open(my $job_fh, '>', $lsf_job_script_name)
        or return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();

    print {$job_fh} <<"EOF";
#! /bin/sh
#
# LSF batch job script built by Globus Job Manager
#
EOF

    if(defined($queue))
    {
        print {$job_fh} "#BSUB -q $queue\n";
    }
    if(defined($description->project()))
    {
        print {$job_fh} '#BSUB -P ', $description->project(), "\n";
    }

    if($cpu_time != 0)
    {
        # For 'multiple' jobs the limit is scaled by the process count.
        my $total_cpu_time = $cpu_time;
        if($description->jobtype() eq 'multiple')
        {
            $total_cpu_time = $cpu_time * $description->count();
        }
        print {$job_fh} "#BSUB -c ${total_cpu_time}\n";
    }

    if($wall_time != 0)
    {
        print {$job_fh} "#BSUB -W $wall_time\n";
    }

    if($description->max_memory() != 0)
    {
        my $max_memory = $description->max_memory() * 1024;
        my $total_max_memory = $max_memory;

        if($description->jobtype() eq 'multiple')
        {
            $total_max_memory = $max_memory * $description->count();
        }
        print {$job_fh} "#BSUB -M ${total_max_memory}\n";
    }
    print {$job_fh} '#BSUB -i ', $description->stdin(), "\n";
    print {$job_fh} '#BSUB -e ', $description->stderr(), "\n";
    print {$job_fh} '#BSUB -o ', $description->stdout(), "\n";
    print {$job_fh} "#BSUB -N\n";
    print {$job_fh} '#BSUB -n ', $description->count(), "\n";

    foreach my $tuple ($description->environment())
    {
        if(!ref($tuple) || scalar(@$tuple) != 2)
        {
            return Globus::GRAM::Error::RSL_ENVIRONMENT();
        }
        print {$job_fh} $tuple->[0], '=', $tuple->[1],
                        '; export ', $tuple->[0], "\n";
    }

    $library_path = join(':', $description->library_path());
    @library_vars = ('LD_LIBRARY_PATH');

    # $^O is used instead of $Config{osname}: the file's "use Config" runs
    # before the package statement, so %Config is not visible under that
    # short name inside this package. perlvar documents both as identical.
    if($^O eq 'irix')
    {
        push(@library_vars, 'LD_LIBRARYN32_PATH', 'LD_LIBRARY64_PATH');
    }

    foreach my $libvar (@library_vars)
    {
        # Double quotes in the generated test let the shell expand the
        # variable; each variable is extended with its own previous value.
        print {$job_fh} <<"EOF";

if test "X\${$libvar}" != "X"; then
$libvar="\${$libvar}:$library_path"
else
$libvar="$library_path"
fi
export $libvar
EOF
    }

    print {$job_fh} "\n#Change to directory requested by user\n";
    print {$job_fh} 'cd ', $description->directory(), "\n";

    @arguments = $description->arguments();

    foreach my $argument (@arguments)
    {
        if(ref($argument))
        {
            return Globus::GRAM::Error::RSL_ARGUMENTS();
        }
    }
    # Test for a non-empty first argument rather than a true one, so that a
    # leading argument of "0" no longer discards the whole argument list.
    if(defined($arguments[0]) && $arguments[0] ne '')
    {
        foreach my $argument (@arguments)
        {
            my $quoted = $argument;

            # Escape the characters that stay special inside a
            # double-quoted /bin/sh word.
            $quoted =~ s/\\/\\\\/g;
            $quoted =~ s/\$/\\\$/g;
            $quoted =~ s/"/\\\"/g;
            $quoted =~ s/`/\\\`/g;

            $args .= '"' . $quoted . '" ';
        }
    }

    if($description->jobtype() eq 'mpi')
    {
        print {$job_fh} "$mpirun -np ", $description->count(), ' ';
        print {$job_fh} $description->executable(), " $args \n";
    }
    elsif($description->jobtype() eq 'multiple')
    {
        # Start count() background copies and wait for all of them.
        for(my $i = 0; $i < $description->count(); $i++)
        {
            print {$job_fh} $description->executable(), " $args &\n";
        }
        print {$job_fh} "wait\n";
    }
    else
    {
        print {$job_fh} $description->executable(), " $args\n";
    }

    # Buffered write errors only surface at close time.
    close($job_fh)
        or return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
    chmod 0755, $lsf_job_script_name;

    if($description->logfile() ne '')
    {
        $errfile = "2>" . $description->logfile();
    }
    $self->nfssync( $lsf_job_script_name );

    # Backticks run the command through the shell, which sources the LSF
    # profile that $bsub starts with.
    my $bsub_output = `$bsub < $lsf_job_script_name $errfile`;
    my $bsub_status = $?;
    $bsub_output = '' unless defined($bsub_output);

    my ($submitted_line) = grep(/is submitted/, split(/\n/, $bsub_output));

    # Report success only when the job id was really parsed from this
    # output; $1 is never read after a failed match.
    if($bsub_status == 0
       && defined($submitted_line)
       && $submitted_line =~ m/<([^>]*)>/)
    {
        return {
            JOB_ID => $1,
            JOB_STATE => Globus::GRAM::JobState::PENDING()
        };
    }

    $self->fork_and_exec_cmd( $cache_pgm, '-cleanup-url',
                              "$tag/lsf_job_script.$$" );

    return Globus::GRAM::Error::INVALID_SCRIPT_REPLY();
}

# poll()
#
# Run bjobs for the job id stored in the job description and translate the
# reported LSF status into a GRAM job state.
#
# Returns:
#   { JOB_STATE => <state> }  for PEND, RUN, DONE and the suspended states,
#                             and when bjobs exits with code 255;
#   {}                        when the status is UNKWN or unrecognised, which
#                             tells the job manager to keep its previous
#                             state;
#   a Globus::GRAM::Error     for EXIT and ZOMBI.
sub poll
{
    # The LSF bjobs command is used to obtain the current
    # status of the job. This status is then returned.
    #
    # The Status field can contain one of the following strings:
    #
    # string   stands for                 Globus context meaning
    # --------------------------------------------------------------------
    # RUN      Running                    ACTIVE
    # PEND     Waiting to be scheduled    PENDING
    # USUSP    Suspended while running    SUSPENDED
    # PSUSP    Suspended while pending    SUSPENDED
    # SSUSP    Suspended by system        SUSPENDED
    # DONE     Completed successfully     DONE
    # EXIT     Completed unsuccessfully   FAILED
    # UNKWN    Unknown state              *ignore*
    # ZOMBI    Unknown state              FAILED

    my $self = shift;
    my $description = $self->{JobDescription};
    my $job_id = $description->jobid();
    my $state;
    my $job_finished = 0;

    $self->log("polling job $job_id");

    # Needs to be back-ticks so that a shell sources the LSF profile.
    my @bjobs_output = `$bjobs $job_id 2>/dev/null`;

    # Get the exit code of the bjobs command. For more info, search for
    # $CHILD_ERROR in the perlvar documentation.
    my $exit_code = $? >> 8;

    # First line mentioning the job id. The id is quoted so that characters
    # such as "[" in it are matched literally, and a lexical is used so the
    # caller's $_ is left untouched.
    my ($status_line) = grep(/\Q$job_id\E/, @bjobs_output);

    # Verifying that the job is no longer there.
    # return code 255 = "Job <123> is not found"
    if($exit_code == 255)
    {
        $self->log("bjobs rc is 255 == Job <123> is not found == DONE");
        $state = Globus::GRAM::JobState::DONE();
        $job_finished = 1;
    }
    else
    {
        # Get the 3rd field (status); empty when bjobs printed no line for
        # this job, which ends up in the "unknown response" branch.
        my $status = defined($status_line)
                   ? (split(/\s+/, $status_line))[2]
                   : undef;
        $status = '' unless defined($status);

        if($status =~ /PEND/)
        {
            $state = Globus::GRAM::JobState::PENDING();
        }
        elsif($status =~ /DONE/)
        {
            $state = Globus::GRAM::JobState::DONE();
            $job_finished = 1;
        }
        elsif($status =~ /USUSP|SSUSP|PSUSP/)
        {
            $state = Globus::GRAM::JobState::SUSPENDED();
        }
        elsif($status =~ /RUN/)
        {
            $state = Globus::GRAM::JobState::ACTIVE();
        }
        elsif($status =~ /EXIT/)
        {
            return Globus::GRAM::Error::JOB_EXIT_CODE_NON_ZERO();
        }
        elsif($status =~ /UNKWN/)
        {
            # We want the JM to ignore this poll and keep the same state
            # as the previous state. Returning an empty hash will do the job.
            $self->log("bjobs returned the UNKWN state. Telling JM to ignore this poll");
            return {};
        }
        elsif($status =~ /ZOMBI/)
        {
            return Globus::GRAM::Error::LOCAL_SCHEDULER_ERROR();
        }
        else
        {
            # This else is reached by an unknown response from lsf.
            # It could be that LSF was temporarily unavailable, but that it
            # can recover and the submitted job is fine.
            # We want the JM to ignore this poll and keep the same state
            # as the previous state. Returning an empty hash will do the job.
            $self->log("bjobs returned an unknown response. Telling JM to ignore this poll");
            return {};
        }
    }

    # Both DONE paths sync the job's output files before reporting.
    if($job_finished)
    {
        $self->nfssync( $description->stdout() )
            if $description->stdout() ne '';
        $self->nfssync( $description->stderr() )
            if $description->stderr() ne '';
    }

    return {JOB_STATE => $state};
}

# cancel()
#
# Ask LSF to kill the job whose id is stored in the job description.
#
# Returns { JOB_STATE => FAILED } when the bkill command line exits with
# status 0, and Globus::GRAM::Error::JOB_CANCEL_FAILED() otherwise.
sub cancel
{
    my ($self) = @_;
    my $job_id = $self->{JobDescription}->jobid();

    $self->log("cancel job $job_id");

    # Single-string system() hands the command line to the shell, which is
    # what sources the LSF profile that $bkill starts with. All output of
    # bkill is discarded; only the exit status is used.
    my $kill_status = system("$bkill $job_id >/dev/null 2>/dev/null");

    return Globus::GRAM::Error::JOB_CANCEL_FAILED()
        unless $kill_status == 0;

    return { JOB_STATE => Globus::GRAM::JobState::FAILED };
}

1;