SGE Job Manager

The SGE job manager code was developed by the London eScience Centre as part of the UK Grid/eScience effort. It is provided as-is for quick code inspection. The version below is the one integrated in VDT 1.5.2 (post OSG 0.4.1) and includes patches contributed by the RHIC/STAR VO; consult the SGE Job Manager patch for more information.
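
As a quick orientation, the sketch below shows roughly what the batch script produced by submit() looks like for a simple single-CPU job. All concrete values (executable, limits, paths) are made up for illustration; the directives and their order follow the print statements in the module, and the environment-variable exports that would normally appear before the settings.sh line are omitted.

#!/bin/sh
# Grid Engine batch job script built by Globus job manager

#$ -S /bin/sh
#$ -m n
#$ -l h_rt=1:30:00
#$ -l h_cpu=1:30:00
#$ -l h_data=512M
#$ -o /home/user/.globus/job/host.example.org/12345.1144432774/stdout.real
#$ -e /home/user/.globus/job/host.example.org/12345.1144432774/stderr.real
. /common/sge/6.0u4/default/common/settings.sh
# Change to directory requested by user
cd /home/user
/bin/echo "hello" < /dev/null

The Perl module that builds, submits, polls, and cancels such scripts follows.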

#
# Marko Krznaric
# London eScience Centre
# June 2003
#
# Contributions by David McBride
# London eScience Centre
# Oct 2003

use Globus::GRAM::Error;
use Globus::GRAM::JobState;
use Globus::GRAM::JobManager;
use Globus::Core::Paths;

use IO::File;
use Config;
use POSIX;

package Globus::GRAM::JobManager::sge;

@ISA = qw(Globus::GRAM::JobManager);


my ($qsub, $qstat, $qdel, $qselect, $qhost, $qconf, $qacct,
$mpirun, $sun_mprun, $mpi_pe, $supported_job_types,
$cat,
$SGE_ROOT, $SGE_CELL,
$SGE_MODE, $SGE_RELEASE);

BEGIN
{
$qsub = '/common/sge/6.0u4/bin/lx24-x86//qsub';
$qstat = '/common/sge/6.0u4/bin/lx24-x86//qstat';
$qdel = '/common/sge/6.0u4/bin/lx24-x86//qdel';
$qselect = '/common/sge/6.0u4/bin/lx24-x86//qselect';
$qhost = '/common/sge/6.0u4/bin/lx24-x86//qhost';
$qconf = '/common/sge/6.0u4/bin/lx24-x86//qconf';
$qacct = '/common/sge/6.0u4/bin/lx24-x86//qacct';
#
$mpirun = '/usr/bin/mpirun';
$sun_mprun = 'no';
$mpi_pe = '';
#
if(($mpirun eq "no") && ($sun_mprun eq "no"))
{ $supported_job_types = "(single|multiple)"; }
else
{ $supported_job_types = "(mpi|single|multiple)"; }
#
$cat = '/bin/cat';
#
$SGE_ROOT = '/common/sge/6.0u4';
$SGE_CELL = 'default';
$SGE_MODE = 'SGE';
$SGE_RELEASE = '6.0u4';
}
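
#
# The absolute tool paths and SGE_* values above are site-specific and are
# presumably filled in for the local cluster when the job manager is set up;
# the values shown are the ones shipped with this particular install
# ($SGE_ROOT/bin/lx24-x86, SGE 6.0u4).  $SGE_MODE distinguishes plain "SGE"
# from the Enterprise Edition ("SGEEE"), which is what the project (-P)
# handling in submit() checks for.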


#########################################################################
#
# SUBMIT
#
########################################################################
sub submit
{
my $self = shift;
my $description = $self->{JobDescription};
my $tag = $description->cache_tag() || $ENV{GLOBUS_GRAM_JOB_CONTACT};
my $status;
my $sge_job_script;
my $sge_job_script_name;
my $errfile = "";
my $queue;
my $job_id;
my $rsh_env;
my $script_url;
my @arguments;
my $email_when = "";
my $cache_pgm = "$Globus::Core::Paths::bindir/globus-gass-cache";
my %library_vars;

$self->log("Entering SGE submit");

#####
# check jobtype
#
if(defined($description->jobtype()))
{
if($description->jobtype !~ /^$supported_job_types$/)
{
return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED;
}
}

#####
# check directory
#
if( $description->directory eq "")
{
return Globus::GRAM::Error::RSL_DIRECTORY();
}
chdir $description->directory() or
return Globus::GRAM::Error::BAD_DIRECTORY();

#####
# check executable
#
if( $description->executable eq "")
{
return Globus::GRAM::Error::RSL_EXECUTABLE();
}
elsif(! -f $description->executable())
{
return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
}
elsif(! -x $description->executable())
{
return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
}
elsif( $description->stdin() eq "")
{
return Globus::GRAM::Error::RSL_STDIN;
}
elsif(! -r $description->stdin())
{
return Globus::GRAM::Error::STDIN_NOT_FOUND();
}


#####
# RSL attributes max_cpu_time/max_wall_time (given in minutes)
# explicitly set the maximum cpu/wall time. max_time can be used
# for both max_cpu_time and max_wall_time

#####
# determining max_wall_time
#
$self->log("Determining job WALL time");
if(defined($description->max_wall_time()))
{
$wall_time = $description->max_wall_time();
$self->log(" using max_wall_time of $wall_time minutes");
}
elsif(defined($description->max_time()))
{
$wall_time = $description->max_time();
$self->log(" using max_wall_time of $wall_time minutes");
}
else
{
$wall_time = 0;
$self->log(" using queue default");
}

#####
# determining max_cpu_time
#
$self->log("Determining job CPU time");
if(defined($description->max_cpu_time()))
{
$cpu_time = $description->max_cpu_time();
$self->log(" using max_cpu_time of $cpu_time minutes");
}
elsif(defined($description->max_time()))
{
$cpu_time = $description->max_time();
$self->log(" using max_cpu_time of $cpu_time minutes");
}
else
{
$cpu_time = 0;
$self->log(" using queue default");
}


#####
# start building job script
#
$self->log('Building job script');


#####
# open script file
#
$script_url = "$tag/sge_job_script.$$";
system("$cache_pgm -add -t $tag -n $script_url file:/dev/null");
$sge_job_script_name = `$cache_pgm -query -t $tag $script_url`;
chomp($sge_job_script_name);
if($sge_job_script_name eq "")
{
return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
}

$sge_job_script = new IO::File($sge_job_script_name, '>');
# $self->log(" script location: $sge_job_script_name");


#####
# Writing script header
#
$sge_job_script->print("#!/bin/sh\n");
$sge_job_script->print("# Grid Engine batch job script built by ");
$sge_job_script->print("Globus job manager\n");
$sge_job_script->print("\n");
$sge_job_script->print("#\$ -S /bin/sh\n");


#####
# Whom to send email and when
#
if($description->email_address() ne '')
{
$sge_job_script->print("#\$ -M ". $description->email_address() ."\n");
$self->log("Monitoring job by email");
$self->log(" email address: " . $description->email_address());
}
if($description->emailonabort() eq 'yes')
{
$email_when .= 'a';
$self->log(" email when job is aborted");
}
if($description->emailonexecution() eq 'yes')
{
$email_when .= 'b';
$self->log(" email at the beginning of the job");
}
if($description->emailontermination() eq 'yes')
{
$email_when .= 'e';
$self->log(" email at the end of the job");
}
if($description->emailonsuspend() eq 'yes')
{
$email_when .= 's';
$self->log(" email when job is suspended");
}
if($email_when eq '')
{
$email_when = 'n';
$self->log(" email(s) will not be sent");
}
$sge_job_script->print("#\$ -m $email_when\n");


#####
# Defines a list of queues used to execute this job
#
if(defined($description->queue()))
{
$sge_job_script->print("#\$ -q " . $description->queue() . "\n");
$self->log("Using the following queues:");
$self->log(" " . $description->queue);
}


#####
# Writing project info, but only with SGEEE. Otherwise ignore and warn!
#
$self->log("Checking project details");
if(defined($description->project()))
{
if( $SGE_MODE eq "SGEEE" )
{
$self->log(" SGE Enterprise Edition: projects supported");
$self->log(" Job assigned to " . $description->project());
$sge_job_script->print("#\$ -P ". $description->project() ."\n");
}
else
{
$self->log(" SGE Regular Edition: NO project support");
$self->log(" WARNING: Project set to " . $description->project());
$self->log(" but it will be ignored by Grid Engine");
$sge_job_script->print("#\$ -P ". $description->project() ."\n");
}
}
else
{
$self->log(" Project not specified");
}


#####
# wall_time was in minutes. Converting to SGE time format (h:m:s)
#
if($wall_time != 0)
{
$wall_m = $wall_time % 60;
$wall_h = ( $wall_time - $wall_m ) / 60;

$self->log("Using max WALL time (h:m:s) of $wall_h:$wall_m:00");
$sge_job_script->print("#\$ -l h_rt=$wall_h:$wall_m:00\n");
}

#####
# cpu_time was in minutes. Converting to SGE time format (h:m:s)
#
if($cpu_time != 0)
{
$cpu_m = $cpu_time % 60;
$cpu_h = ( $cpu_time - $cpu_m ) / 60;

$self->log("Using max CPU time (h:m:s) of $cpu_h:$cpu_m:00");
$sge_job_script->print("#\$ -l h_cpu=$cpu_h:$cpu_m:00\n");
}
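
#
# Example of the conversion above: a hypothetical max_wall_time (or
# max_cpu_time) of 90 minutes gives 90 % 60 = 30 and (90 - 30) / 60 = 1,
# i.e. "-l h_rt=1:30:00" (or "-l h_cpu=1:30:00").  A value of 0 means
# "use the queue default" and emits no limit at all.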


#####
# RSL attribute for max_memory is given in MB
#
$max_memory = $description->max_memory();
if($max_memory != 0)
{
$self->log("Total max memory flag is set to $max_memory Mb");
$sge_job_script->print("#\$ -l h_data=$max_memory" . "M\n");
}
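
#
# Example: a hypothetical max_memory of 512 in the RSL becomes "#$ -l h_data=512M".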



#####
# Where to write output and error?
#
if(($description->jobtype() eq "single") && ($description->count() > 1))
{
#####
# It's a single job and we use job arrays
#
$sge_job_script->print("#\$ -o "
. $description->stdout() . ".\$TASK_ID\n") if (! -b $description->stdout() );
$sge_job_script->print("#\$ -e "
. $description->stderr() . ".\$TASK_ID\n") if (! -b $description->stderr() );
}
else
{
# [dwm] Don't use real output paths; copy the output there later.
# Globus doesn't seem to handle streaming of the output
# properly, which can result in the output being lost.
# FIXME: We would prefer continuous streaming. Try to determine
# precisely what's failing so that we can fix the problem.
# See Globus bug #1288.
$sge_job_script->print("#\$ -o " . $description->stdout() . ".real\n") if (! -b $description->stdout() );
$sge_job_script->print("#\$ -e " . $description->stderr() . ".real\n") if (! -b $description->stderr() );
}
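
#
# Illustration: for the job-array case the directives come out as
#   #$ -o <stdout-path>.$TASK_ID
# so every task writes its own file, while in all other cases output goes
# to <stdout-path>.real and poll() appends it to the real GASS cache file
# once the job has finished.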


#####
# Constructing the job environment
#

$library_vars{LD_LIBRARY_PATH} = 0;
if($Config{osname} eq 'irix')
{
$library_vars{LD_LIBRARYN32_PATH} = 0;
$library_vars{LD_LIBRARY64_PATH} = 0;
}

foreach my $tuple ($description->environment())
{
if(!ref($tuple) || scalar(@$tuple) != 2)
{
return Globus::GRAM::Error::RSL_ENVIRONMENT();
}
if(exists($library_vars{$tuple->[0]}))
{
$tuple->[1] .= ":$library_string";
$library_vars{$tuple->[0]} = 1;
}

push(@new_env, $tuple->[0] . "=" . $tuple->[1]);

$tuple->[0] =~ s/\\/\\\\/g;
$tuple->[0] =~ s/\$/\\\$/g;
$tuple->[0] =~ s/"/\\\"/g;
$tuple->[0] =~ s/`/\\\`/g;

$tuple->[1] =~ s/\\/\\\\/g;
$tuple->[1] =~ s/\$/\\\$/g;
$tuple->[1] =~ s/"/\\\"/g;
$tuple->[1] =~ s/`/\\\`/g;


#####
# Special treatment for GRD_PE or SGE_PE.
# If jobType is mpi, this can conflict with the default PE.
# In that case, we override the default PE. Please note that this
# can be overridden by the RSL attribute parallel_environment!
#
if (($tuple->[0] eq "GRD_PE") || ($tuple->[0] eq "SGE_PE"))
{
if($description->jobtype() eq "mpi")
{
$mpi_pe = $tuple->[1];
}
else
{
$sge_job_script->print("#\$ -pe " . $tuple->[1] . " " .
$description->count() . "\n");
}
}
else
{
$sge_job_script->print($tuple->[0] . "=" . $tuple->[1]
. "; export " . $tuple->[0] . "\n");
}

}
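
#
# Example of the quoting above: a hypothetical tuple ("FOO", 'a"b$c') is
# written into the job script as
#   FOO=a\"b\$c; export FOO
# so the shell reconstructs the literal value when the script runs.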

foreach (keys %library_vars)
{
if($library_vars{$_} == 0)
{
push(@new_env, $_ . "=" . $library_path);
$sge_job_script->print("$_=$library_path;\n" . "export $_;\n");
}
}


#####
# Load SGE settings
#
$sge_job_script->print(". $SGE_ROOT/$SGE_CELL/common/settings.sh\n");


#####
# Change to the directory requested by the user
#
$sge_job_script->print("# Change to directory requested by user\n");
$sge_job_script->print('cd ' . $description->directory() . "\n");


#####
# Transforming arguments
#
@arguments = $description->arguments();

foreach(@arguments)
{
if(ref($_))
{
return Globus::GRAM::Error::RSL_ARGUMENTS;
}
}
if($arguments[0])
{
foreach(@arguments)
{
$self->log("Transforming argument \"$_\"");
$_ =~ s/\\/\\\\/g;
$_ =~ s/\$/\\\$/g;
$_ =~ s/"/\\\"/g;
$_ =~ s/`/\\\`/g;
$self->log("Transformed to \"$_\"");

$args .= '"' . $_ . '" ';
}
}
else
{
$args = '';
}


#####
# Determining job request type.
#
$self->log("Determining job type");
$self->log(" Job is of type " . $description->jobtype());
if($description->jobtype() eq "mpi")
{
#####
# It's an MPI job
#

#####
# Check if RSL attribute parallel_environment is provided
#
if($description->parallel_environment())
{
$mpi_pe = $description->parallel_environment();
}

if(!$mpi_pe || $mpi_pe eq "NONE"){
$self->log("ERROR: Parallel Environment (PE) failure!");
$self->log(" MPI job was submitted, but no PE set");
$self->log(" by neither user nor administrator");
return Globus::GRAM::Error::INVALID_SCRIPT_REPLY;
}
else
{
$self->log(" PE is $mpi_pe");
$sge_job_script->print("#\$ -pe $mpi_pe "
. $description->count() . "\n");
}

if (($sun_mprun eq "no") && ($mpirun eq "no"))
{
return Globus::GRAM::Error::INVALID_SCRIPT_REPLY;
}
elsif ($sun_mprun ne "no")
{
#####
# Using Sun's MPI.
#
$sge_job_script->print("$sun_mprun -np ". $description->count() ." "
. $description->executable() . " $args < "
. $description->stdin() . "\n");
}
else
{
#####
# Using non-Sun MPI.
#
$sge_job_script->print("$mpirun -np ". $description->count() . " "
. $description->executable() . " $args < "
. $description->stdin() . "\n");
}
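
#
# Example: a hypothetical 4-way MPI job with executable /path/a.out and one
# argument would yield a command line such as
#   /usr/bin/mpirun -np 4 /path/a.out "arg1" < /dev/null
# (or the $sun_mprun equivalent when Sun MPI is configured).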
}
elsif($description->jobtype() eq "multiple")
{
#####
# It's a multiple job
#
$self->log(" forking multiple requests");
for(my $i = 0; $i < $description->count(); $i++)
{
$sge_job_script->print($description->executable() . " $args < "
. $description->stdin() . "&\n");
}
$sge_job_script->print("wait\n");
}
elsif($description->count() > 1)
{
#####
# (single & count>1) -> Using job arrays
#
$self->log(" using job arrays with count " . $description->count());
$sge_job_script->print("#\$ -t 1-" . $description->count() . "\n");
$sge_job_script->print($description->executable() . " $args < "
. $description->stdin() . "\n");
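
#
# Example: jobtype=single with count=10 emits "#$ -t 1-10" plus a single
# command line; SGE then runs ten array tasks, each writing to the
# <stdout-path>.$TASK_ID file arranged earlier.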
}
else
{
#####
# Single execution job
#
$sge_job_script->print($description->executable() . " $args < "
. $description->stdin() . "\n");
}

#####
# SGE job script is successfully built! :-)
#
$self->log("SGE job script successfully built! :-)");
$sge_job_script->close();

if($description->logfile() ne "")
{
$errfile = "2>>" . $description->logfile();
}


#####
# Submitting a job
#
$self->log("Submitting a job");
# [dwm] Set SGE_ROOT in the environment; environment now appears to get
# emptied in GT3.
$ENV{"SGE_ROOT"} = $SGE_ROOT;
$ENV{"SGE_CELL"} = $SGE_CELL;
if ( -r "$ENV{HOME}/.chos" ){
$chos=`cat $ENV{HOME}/.chos`;
$chos=~s/\n.*//;
$ENV{CHOS}=$chos;
}
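
#
# Note: ~/.chos, when present, typically holds a single environment name on
# its first line; only that line is kept and exported as $CHOS so that a
# CHOS-aware site can start the batch job in the matching runtime
# environment.  On hosts without ~/.chos this block is a no-op.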

chomp($job_id = `$qsub $sge_job_script_name`);

if($? == 0)
{
$self->log(" successfully submitted");

# get job ID
$job_id = (split(/\s+/, $job_id))[2];

# in the case we used job arrays
if ($job_id =~ m/\./)
{
$job_id = $`;
}
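
# qsub normally reports something along the lines of
#   Your job 4242 ("sge_job_script.12345") has been submitted
# so element [2] of the whitespace-split line is the job ID.  For job
# arrays the ID field looks like "4242.1-10:1", hence the extra step that
# strips everything from the first "." onward.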

#system("$cache_pgm -cleanup-url $script_url");

# [dwm]
# FIXME: Cheat and append the stdout and stderr paths to the returned $jobid.
# (For use in poll(). This is to work around a Globus bug.)
# DANGER! This may fail badly if the job ID, stdout path or stderr path
# contain a "|" (pipe) character!
# Currently, this shouldn't happen.
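# For example, the value handed back might look like
#   4242|/home/user/.globus/job/host.example.org/.../stdout|/home/user/.globus/job/host.example.org/.../stderr
# and poll() splits it on "|" again to recover all three pieces.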

return {JOB_ID => $job_id . "|" . $description->stdout() . "|" . $description->stderr(),
JOB_STATE => Globus::GRAM::JobState::PENDING };
}
else
{
$self->log(" ERROR: job submission failed");
if ($description->project())
{
$self->log(" check if the project specified does exist");
}
}

# system("$cache_pgm -cleanup-url $tag/sge_job_script.$$");


#####
# If we reach this - invalid script response
#
return Globus::GRAM::Error::INVALID_SCRIPT_REPLY;
}


#########################################################################
#
# POLL
#
########################################################################
sub poll
{
my $self = shift;
my $description = $self->{JobDescription};
my $job_id = $description->job_id();
my $state;
my $status_line;

my $job_out = "$$.log"; # make a default name for polling stdout
my $job_err = "$$.err"; # make a default name for polling stderr

my $desc_job_id = $description->job_id(); # need to print this out for debug purposes
my $time = `/bin/date`;

# Assign the real value coming from $description if it is not a special device
#$job_out = $description->stdout() if (! -b $description->stdout() );
#$job_err = $description->stderr() if (! -b $description->stderr() );


# system ("/bin/echo -n polling job $job_id from PID $$ at '' >> $job_out");
# system ("/bin/echo `/bin/date` >> $job_out");

# [dwm] Cheat and separate the job_out path from the job_id() string.
# This is to work around the fact that we don't get job_out() and _err()
# given to us!
# See Globus bug #1287.
$job_id =~ /(.*)\|(.*)\|(.*)/;
$job_id = $1;
$job_out = $2;
$job_err = $3;

# So, if stdout/stderr were special block devices, we would recover them
# once more here, BUT since we only ever redirect to them via ">>" there
# is no need for extra checks.
#
# HOWEVER, job_out/job_err normally come in the form of
# /home/atems/.globus/job/rhic23.physics.wayne.edu/20993.1144432774/stdout
# /home/atems/.globus/job/rhic23.physics.wayne.edu/20993.1144432774/stderr
# so the $cat calls below may append multiple lines (no cleaning is done).



$self->log("polling job $job_id");
# open (JO, ">>$job_out");
# print JO "polling job $job_id from PID $$ [ $desc_job_id ] at $time";
# close (JO);
# system ("/bin/echo -n polling job $job_id from PID $$ [".$description->job_id()."] >> $job_out");
# system ("/bin/echo -n polling job $job_id from PID $$ originally $desc_job_id at '' >> $job_out");
# system ("/bin/echo `/bin/date` >> $job_out");

# system ("/bin/echo polling job $job_id from PID $$ $0 at $^T >> $job_out");
# local $time = `\bin\date`;
# system ("/bin/echo polling job $job_id from PID $$ at $time >> $job_out");

# [dwm] Replacement state checking code.
$ENV{"SGE_ROOT"} = $SGE_ROOT;
$ENV{"SGE_CELL"} = $SGE_CELL;

my ($tries)=0;
my ($notexist);

POLL_AGAIN:
my (@output) = `$qstat -j $job_id 2>&1`;# Query job_id by number.
my ($status,$stsstr)=($?,$!);

if ($#output == -1 ){
if ($tries < 2){
sleep(2); # sleep 2 seconds
$tries++;
# open (JO, ">>$job_out");
# print JO "ERROR::Poll: On ".localtime()." we detected a failed polling. ",
# "Errors are [$status] [$stsstr]\n";
# close(JO);
goto POLL_AGAIN;
} else {
$notexist = "error"; # ensures that a queue failure will equate
# to job error from the Condor side
}
} else {
# there is a result
$notexist = $output[0]; # Obtain first line of output (STDOUT or STDERR)

# open (JO, ">>$job_out"); # print the qstat output
# print JO "The output of qstat has $#output+1 lines [$status] [$stsstr]\n";
# for ($i=0 ; $i<=$#output ; $i++) {
# print JO "Line $i: $output[$i]";
# }
# close (JO);
}
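
# When the job has already left SGE, "qstat -j <id>" typically prints an
# error along the lines of
#   Following jobs do not exist: 4242
# which is what the /do not exist/ test below keys on; any other output
# means SGE still knows about the job.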


if ($notexist =~ /do not exist/) # Check to see if first line indicates job doesn't exist any more.
{
# Job no longer exists in SGE. It must have finished.
$self->log("Job $job_id has completed.");
system ("/bin/echo Job $job_id has completed. >> $job_out");
$state = Globus::GRAM::JobState::DONE;

$self->log("Writing job STDOUT and STDERR to cache files.");
system ("/bin/echo Writing job STDOUT and STDERR to cache files. >> $job_out");

if(($description->jobtype() eq "single") && ($description->count() > 1))
#####
# Jobtype is single and count>1. Therefore, we used job arrays. We
# need to merge individual output/error files into one.
#
{
# [dwm] Use append, not overwrite to work around file streaming issues.
system ("$cat $job_out.* >> $job_out"); unlink(glob("$job_out.*"));
system ("$cat $job_err.* >> $job_err"); unlink(glob("$job_err.*"));
}
else
{
# [dwm] We still need to append the job output to the GASS cache file.
# We can't let SGE do this directly because it appears to
# *overwrite* the file, not append to it -- which the Globus
# file streaming components don't seem to handle properly.
# So append the output manually now.
system("$cat $job_out.real >> $job_out"); unlink("$job_out.real");
system("$cat $job_err.real >> $job_err"); unlink("$job_err.real");
}

$self->log("Returning job success.");
system ("/bin/echo Returning job success. >> $job_out");
}
else
{
# SGE still knows about the job, hence it cannot have completed yet.
# Determine its current state and notify any interested parties.

$_ = pop @output; # Obtain scheduler details from output, if any.

# FIXME:
# Unfortunately, the nice single-character state field isn't printed
# if we do a lookup on a specific job with qstat, so we have to guess
# a little more than we would like.
# This is probably best fixed in SGE itself.

if (/"error"/) {
$self->log(" Job $job_id has failed!");
system ("/bin/echo Job $job_id has failed! >> $job_out");
$state = Globus::GRAM::JobState::FAILED;
}
elsif(/queue|pending/) {
$self->log(" Job is still queued for execution.");
system ("/bin/echo Job is still queued for execution. >> $job_out");
$state = Globus::GRAM::JobState::PENDING;
}
elsif(/hold|suspend/) {
$self->log(" Job is suspended.");
system ("/bin/echo Job is suspended. >> $job_out");
$state = Globus::GRAM::JobState::SUSPENDED;
}
else {
$self->log(" Job is running.");
system ("/bin/echo Job is running. >> $job_out");
$state = Globus::GRAM::JobState::ACTIVE;
}
}

return {JOB_STATE => $state};
}



#########################################################################
#
# CANCEL
#
########################################################################
sub cancel
{
my $self = shift;
my $description = $self->{JobDescription};
my $job_id = $description->jobid();

$self->log("cancel job $job_id");

# system("$qdel $job_id >/dev/null 2>/dev/null");
$ENV{"SGE_ROOT"} = $SGE_ROOT;
$ENV{"SGE_CELL"} = $SGE_CELL;
$job_id =~ /(.*)\|(.*)\|(.*)/;
$job_id = $1;
system("$qdel $job_id");

if($? == 0)
{
return { JOB_STATE => Globus::GRAM::JobState::FAILED }
}

return Globus::GRAM::Error::JOB_CANCEL_FAILED();
}

1;