Job Managers

Several job managers are available as part of any OSG/VDT/Globus deployment. They may restrict access to keywords fundamental to job control and efficiency, or may not work at all.
The pages here document the needed changes or features.

Condor Job Manager

Condor job manager code is provided as-is for quick code inspection. The version below is from the OSG 0.4.1 software stack.

# Globus::GRAM::JobManager::condor package
#
# CVS Information:
# $Source: /home/globdev/CVS/globus-packages/gram/jobmanager/setup/condor/condor.in,v $
# $Date: 2005/05/10 15:26:57 $
# $Revision: 1.15.6.1 $
# $Author: bester $
use Globus::GRAM::Error;
use Globus::GRAM::JobState;
use Globus::GRAM::JobManager;
use Globus::GRAM::StdioMerger;
use Globus::Core::Paths;
use Config;
# NOTE: This package name must match the name of the .pm file!!
package Globus::GRAM::JobManager::condor;
# Inherit the generic GRAM job manager; this module overrides only the
# scheduler-specific methods (new/submit/poll/cancel).
@ISA = qw(Globus::GRAM::JobManager);
# Absolute, site-local paths to the Condor command-line tools this
# module shells out to; fixed at load time.
my ($condor_submit, $condor_rm);
BEGIN
{
$condor_submit = '/home/condor/bin/condor_submit';
$condor_rm = '/home/condor/bin/condor_rm';
}
sub new
{
    # Construct a condor job manager instance.  Beyond the base-class
    # setup this decides where the Condor user log lives, ensures the
    # log exists with permissions that let other users append to it,
    # and sets up a stdio merger for multi-count "multiple" jobs.
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = $class->SUPER::new(@_);
    my $log_dir;
    my $description = $self->{JobDescription};
    my $stdout = $description->stdout();
    my $stderr = $description->stderr();
    my $globus_condor_conf = "$ENV{GLOBUS_LOCATION}/etc/globus-condor.conf";

    # We want to have individual Condor log files for each job for
    # pre-WS GRAM, but still have a single log file for WS GRAM
    # (which uses the SEG to monitor job status).
    if ( !defined( $description->factoryendpoint() ) ) {
        $self->{individual_condor_log} = 1;
    } else {
        $self->{individual_condor_log} = 0;
    }

    # A site-wide log path may be configured in globus-condor.conf as a
    # "log_path=..." line; use the first such line when present.
    if (-r $globus_condor_conf)
    {
        local(*FH);
        # Three-argument open: safer than interpolating the path into
        # the mode string.
        if (open(FH, '<', $globus_condor_conf))
        {
            while(<FH>) {
                chomp;
                if (m/log_path=(.*)$/) {
                    $self->{condor_logfile} = $1;
                    # Bug fix: the original used "break", which is not a
                    # Perl loop-control keyword and dies at runtime with
                    # "Undefined subroutine &main::break".  "last" is
                    # the correct way to stop at the first match.
                    last;
                }
            }
            close(FH);
        }
    }

    # No configured path (or per-job logs requested): place the log in
    # the spool directory if set, otherwise the Globus temp directory.
    if (! exists($self->{condor_logfile}) || $self->{individual_condor_log})
    {
        if(! exists($ENV{GLOBUS_SPOOL_DIR}))
        {
            $log_dir = $Globus::Core::Paths::tmpdir;
        }
        else
        {
            $log_dir = $ENV{GLOBUS_SPOOL_DIR};
        }
        if ( $self->{individual_condor_log} ) {
            $self->{condor_logfile} = "$log_dir/gram_condor_log."
                . $description->uniq_id();
        } else {
            $self->{condor_logfile} = "$log_dir/gram_condor_log";
        }
    }

    if(! -e $self->{condor_logfile})
    {
        # We make sure that the log file exists with the correct
        # permissions. If we just let Condor create it, it will
        # have 664 permissions, and when another user submits a job
        # they will be unable to write to the log file. We create the
        # file in append mode to avoid a race condition, in case
        # multiple instantiations of this script open and write
        # to the log file.
        if ( open(CONDOR_LOG_FILE, '>>', $self->{condor_logfile}) )
        {
            close(CONDOR_LOG_FILE);
        }
        chmod(0666, $self->{condor_logfile});
    }

    # Multi-count "multiple" jobs get one stdout/stderr pair per
    # process; the merger recombines them when the job completes.
    if($description->jobtype() eq 'multiple' && $description->count > 1)
    {
        # Direct method call instead of indirect-object "new Class"
        # syntax, which is fragile in Perl.
        $self->{STDIO_MERGER} =
            Globus::GRAM::StdioMerger->new($self->job_dir(), $stdout, $stderr);
    }
    else
    {
        $self->{STDIO_MERGER} = 0;
    }

    bless $self, $class;

    return $self;
}
sub submit
{
# Build a Condor submit-description file for the job, run condor_submit
# in a forked child, and parse its output for the cluster id.
# Returns {JOB_STATE => PENDING, JOB_ID => ...} on success, or a
# Globus::GRAM::Error value on failure.
my $self = shift;
my $description = $self->{JobDescription};
my @environment;
my $environment_string;
my $script_filename;
my $error_filename;
my $requirements = '';
my $rank = '';
my @arguments;
my $argument_string;
my %library_vars;
my @response_text;
my @submit_attrs;
my $submit_attrs_string;
my $multi_output = 0;
my $failure_text = '';
# Reject jobs that want streaming, if so configured
if ( $description->streamingrequested() &&
$description->streamingdisabled() ) {
$self->log("Streaming is not allowed.");
return Globus::GRAM::Error::OPENING_STDOUT;
}
# Map the GRAM jobtype onto a Condor universe: single/multiple use the
# vanilla universe; jobtype "condor" uses the standard universe.
# NOTE(review): $universe is an undeclared package global (this file
# has no "use strict"), so it persists between calls.
if($description->jobtype() eq 'single' ||
$description->jobtype() eq 'multiple')
{
$universe = 'vanilla';
if ($description->jobtype() eq 'multiple'
&& ($description->count() > 1)) {
# Multiple processes share logical stdout/stderr: give each its
# own file and merge them later via STDIO_MERGER (set up in new).
$multi_output = 1;
}
}
elsif($description->jobtype() eq 'condor')
{
$universe = 'standard'
}
else
{
return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED();
}
# Validate some RSL parameters
if(!defined($description->directory()))
{
return Globus::GRAM::Error::RSL_DIRECTORY;
}
elsif( $description->stdin() eq '')
{
return Globus::GRAM::Error::RSL_STDIN;
}
elsif(ref($description->count()) ||
$description->count() != int($description->count()))
{
# count must be a plain integer scalar, not a list or fraction.
return Globus::GRAM::Error::INVALID_COUNT();
}
elsif( $description->executable eq '')
{
return Globus::GRAM::Error::RSL_EXECUTABLE();
}
# In the standard universe, we can validate stdin and directory
# because they will sent to the execution host by condor transparently.
if($universe eq 'standard')
{
if(! -d $description->directory())
{
return Globus::GRAM::Error::BAD_DIRECTORY;
}
elsif(! -r $description->stdin())
{
return Globus::GRAM::Error::STDIN_NOT_FOUND();
}
elsif(! -f $description->executable())
{
return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
}
elsif(! -x $description->executable())
{
return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
}
}
# Track which library-path variables the user's environment already
# sets (0 = not seen yet); irix needs the extra LD_LIBRARY* variants.
$library_vars{LD_LIBRARY_PATH} = 0;
if($Config{osname} eq 'irix')
{
$library_vars{LD_LIBRARYN32_PATH} = 0;
$library_vars{LD_LIBRARY64_PATH} = 0;
}
@environment = $description->environment();
foreach $tuple (@environment)
{
# Each environment entry must be a [NAME, VALUE] pair.
if(!ref($tuple) || scalar(@$tuple) != 2)
{
return Globus::GRAM::Error::RSL_ENVIRONMENT();
}
if(exists($library_vars{$tuple->[0]}))
{
# NOTE(review): $library_string is never assigned anywhere in this
# file, so this appends ":" plus an empty string.  It looks like it
# was meant to append the GRAM library path -- confirm upstream.
$tuple->[1] .= ":$library_string";
$library_vars{$tuple->[0]} = 1;
}
}
# Any library-path variable the user did not set gets the GRAM library
# path outright.
foreach (keys %library_vars)
{
my $library_path = join(':', $description->library_path());
if($library_vars{$_} == 0)
{
push(@environment, [$_, $library_path]);
}
}
# Condor's "Environment =" line takes semicolon-separated NAME=VALUE pairs.
$environment_string = join(';',
map {$_->[0] . "=" . $_->[1]} @environment);
@arguments = $description->arguments();
foreach (@arguments)
{
# Arguments must be plain scalars, not nested lists.
if(ref($_))
{
return Globus::GRAM::Error::RSL_ARGUMENTS();
}
}
if($#arguments >= 0)
{
# Escape embedded double quotes before joining for the submit file.
$argument_string = join(' ',
map
{
$_ =~ s/"/\\\"/g; #"
$_;
}
@arguments);
}
else
{
$argument_string = '';
}
# Scheduler-specific attributes from the RSL "condorsubmit" clause are
# passed through verbatim, one per line, after validation.
@submit_attrs = $description->condorsubmit();
if(defined($submit_attrs[0]))
{
foreach $tuple (@submit_attrs)
{
if(!ref($tuple) || scalar(@$tuple) != 2)
{
return Globus::GRAM::Error::RSL_SCHEDULER_SPECIFIC();
}
}
$submit_attrs_string = join("\n",
map {$_->[0] . "=" . $_->[1]} @submit_attrs);
}
else
{
$submit_attrs_string = '';
}
# Create script for condor submission
$script_filename = $self->job_dir() . '/scheduler_condor_submit_script';
$error_filename = $self->job_dir() . '/scheduler_condor_submit_stderr';
local(*SCRIPT_FILE);
# NOTE(review): two-argument open; a three-argument open would be safer.
open(SCRIPT_FILE, ">$script_filename")
or return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED;
print SCRIPT_FILE "#\n# description file for condor submission\n#\n";
print SCRIPT_FILE "Universe = $universe\n";
print SCRIPT_FILE "Notification = Never\n";
# Make relative paths absolute: directory is resolved against the
# user's home, executable against the (now absolute) directory.
if ($description->directory() =~ m|^[^/]|)
{
my $home = (getpwuid($<))[7];
$description->add('directory', "$home/".$description->directory());
}
if ($description->executable() =~ m|^[^/]|)
{
$description->add('executable',
$description->directory() . '/' . $description->executable());
}
print SCRIPT_FILE "Executable = " . $description->executable . "\n";
# Constrain matching to hosts with the requested OS/architecture and,
# optionally, a minimum amount of memory (preferring larger hosts).
$requirements = "OpSys == \"" . $description->condor_os() . "\" ";
$requirements .= " && Arch == \"" . $description->condor_arch() . "\" ";
if($description->min_memory() ne '')
{
$requirements .= " && Memory >= " . $description->min_memory();
$rank = "rank = Memory\n";
}
print SCRIPT_FILE "Requirements = $requirements\n";
if($rank ne '')
{
print SCRIPT_FILE "$rank\n";
}
print SCRIPT_FILE "Environment = $environment_string\n";
print SCRIPT_FILE "Arguments = $argument_string\n";
print SCRIPT_FILE "InitialDir = " . $description->directory() . "\n";
print SCRIPT_FILE "Input = " . $description->stdin() . "\n";
# log_xml makes Condor write the user log as XML events, which is what
# poll() parses below.
print SCRIPT_FILE "Log = " . $self->{condor_logfile} . "\n";
print SCRIPT_FILE "log_xml = True\n";
print SCRIPT_FILE "#Extra attributes specified by client\n";
print SCRIPT_FILE "$submit_attrs_string\n";
# One "queue" stanza per requested process; multi_output gives each
# process its own stdout/stderr file for later merging.
for (my $i = 0; $i < $description->count(); $i++) {
if ($multi_output) {
print SCRIPT_FILE "Output = " .
$self->{STDIO_MERGER}->add_file('out') . "\n";
print SCRIPT_FILE "Error = " .
$self->{STDIO_MERGER}->add_file('err') . "\n";
} else {
print SCRIPT_FILE "Output = " . $description->stdout() . "\n";
print SCRIPT_FILE "Error = " . $description->stderr() . "\n";
}
print SCRIPT_FILE "queue 1\n";
}
close(SCRIPT_FILE);
$self->log("About to submit condor job");
# Fork a child whose stdout we read through FH; the child execs
# condor_submit with its stderr redirected to $error_filename.
local(*FH);
my $pid = open(FH, "-|");
if( !defined($pid))
{
# failure to fork
$failure_text = "fork: $!\n";
}
elsif ($pid)
{
my $rc = 0;
$self->log("I am the parent");
# parent
@response_text = <FH>;
$rc = close(FH);
if ((!$rc) && $! == 0) {
$self->log("submission failed!!!");
# condor submission failed with non-zero exit code
$self->nfssync( $error_filename, 0);
# NOTE(review): $rc holds close()'s boolean return value, not the
# child's exit status -- this "== 127" test looks like it was meant
# to examine $? instead; verify against upstream.
if ($rc == 127 && $response_text[0] =~ m/^exec: /) {
# exec failed
$self->log("exec failed\n");
# NOTE(review): join(//, ...) uses an empty pattern as the
# separator; join('', ...) was almost certainly intended.
$failure_text = join(//, @response_text);
@response_text = ();
} elsif (-s $error_filename) {
$self->log("Error file is not empty, and submission failed\n");
# condor_submit stderr is in $error_filename, we'll use that
# as our extended error info
local(*ERR);
open(ERR, "<$error_filename");
# Clearing $/ slurps the whole file in one read.
local($/);
$failure_text = <ERR>;
$self->log("Error text is $failure_text");
close(ERR);
@response_text = ();
} else {
$self->log("Error file is empty, and submission failed\n");
}
} else {
$self->log("\$rc = $rc, \$! = $!");
}
}
else
{
# child
open (STDERR, '>' . $error_filename);
# Unbuffer both streams so nothing is lost across the exec boundary.
select(STDERR); $| = 1;
select(STDOUT); $| = 1;
# Block-form exec avoids the shell; if exec itself fails, report on
# stdout (where the parent is listening) and exit 127.
if (! exec { $condor_submit } $condor_submit, $script_filename)
{
print "exec: $!\n";
exit(127);
}
}
if (@response_text)
{
if ($failure_text ne '') {
$self->log("Strange, $failure_text is defined!");
}
# condor_submit reports "N job(s) submitted to cluster M."; the
# cluster id is the sixth whitespace-separated token (before any dot).
$response_line =(grep(/submitted to cluster/, @response_text))[0];
$job_id = (split(/\./, (split(/\s+/, $response_line))[5]))[0];
if($job_id ne '')
{
$status = Globus::GRAM::JobState::PENDING;
if ($description->emit_condor_processes()) {
# Report one zero-padded cluster.proc.subproc triple per process.
$job_id = join(',', map { sprintf("%03d.%03d.%03d",
$job_id, $_, 0) } (0..($description->count()-1)));
}
return {JOB_STATE => Globus::GRAM::JobState::PENDING,
JOB_ID => $job_id};
}
} elsif ($failure_text ne '') {
# Surface condor_submit's error text on the user's stderr and in the
# GT3 failure message.
$self->log("Writing extended error information to stderr");
local(*ERR);
open(ERR, '>' . $description->stderr());
print ERR $failure_text;
close(ERR);
$failure_text =~ s/\n/ /g;
$self->respond({GT3_FAILURE_MESSAGE => $failure_text });
}
return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
}
sub poll
{
# Determine the job's state by scanning Condor's XML user log for
# event records belonging to our cluster, tallying execute (1),
# evict (4), terminate (5) and abort (9) events.
# Returns {JOB_STATE => ...}.
my $self = shift;
my $description = $self->{JobDescription};
my $state;
my $job_id = $description->job_id();
my ($cluster, $proc, $subproc) = ($job_id, 0, 0);
# NOTE(review): these counters are never initialized to 0, so the ++
# and comparisons below first operate on undef (warnings; undef
# behaves as 0 here).
my $num_done;
my $num_run;
my $num_evict;
my $num_abort;
$self->log("polling job " . $description->jobid());
if (! $description->emit_condor_processes()) {
local(*CONDOR_LOG_FILE);
if ( open(CONDOR_LOG_FILE, '<' . $self->{condor_logfile}) )
{
while (<CONDOR_LOG_FILE>)
{
# "<c>" starts a new event record: tally the previous record (if
# it was for our cluster) before starting a fresh one.
# NOTE(review): $record is an undeclared package global.
if (/<c>/) {
if (defined($record)) {
if ($record->{Cluster} == $cluster)
{
# record Matches our job id
if ($record->{EventTypeNumber} == 1)
{
# execute event
$num_run++;
} elsif ($record->{EventTypeNumber} == 4) {
# evicted event
$num_evict++;
} elsif ($record->{EventTypeNumber} == 5) {
# job terminated event
$num_done++;
} elsif ($record->{EventTypeNumber} == 9) {
# job aborted event
$num_abort++;
}
}
}
$record = {};
} elsif (/<a n="([^"]+)">/) {
# Attribute line: the value is typed as string <s>, integer <i>,
# boolean <b v="t|f"/>, or real <r>.
my $attr = $1;
if (/<s>([^<]+)<\/s>/) {
$record->{$attr} = $1;
} elsif (/<i>([^<]+)<\/i>/) {
$record->{$attr} = int($1);
} elsif (/<b v="([tf])"\/>/) {
$record->{$attr} = ($1 eq 't');
} elsif (/<r>([^<]+)<\/r>/) {
$record->{$attr} = $1;
}
} elsif (/<\/c>/) {
}
}
# Tally the final record, which is not followed by another "<c>".
if (defined($record)) {
if ($record->{Cluster} == $cluster)
{
# record Matches our job id
if ($record->{EventTypeNumber} == 1)
{
# execute event
$num_run++;
} elsif ($record->{EventTypeNumber} == 4) {
$num_evict++;
} elsif ($record->{EventTypeNumber} == 5) {
$num_done++;
} elsif ($record->{EventTypeNumber} == 9) {
$num_abort++;
}
}
}
# NOTE(review): the while loop above already read to EOF, so this
# grep always sees an empty list; @status (an undeclared global)
# appears to be vestigial.
@status = grep(/^[0-9]* \(0*${job_id}/, <CONDOR_LOG_FILE>);
close(CONDOR_LOG_FILE);
}
else
{
# Brace counting shows this else pairs with the open() above: the
# Condor log could not be read, so flush stdio over NFS, drop any
# per-job log, and report DONE.
# NOTE(review): this looks like it may have been intended as the
# else of the emit_condor_processes() test (the WS-GRAM/SEG path);
# when emit_condor_processes() is true, $state stays undef below.
# Verify brace placement against upstream.
$self->nfssync( $description->stdout(), 0 )
if $description->stdout() ne '';
$self->nfssync( $description->stderr(), 0 )
if $description->stderr() ne '';
if ( ${self}->{individual_condor_log} ) {
unlink($self->{condor_logfile});
}
return { JOB_STATE => Globus::GRAM::JobState::DONE };
}
# Derive the job state from the event tallies.
if($num_abort > 0)
{
# Aborted: the per-job log is no longer needed.
if ( ${self}->{individual_condor_log} ) {
unlink($self->{condor_logfile});
}
$state = Globus::GRAM::JobState::FAILED;
}
elsif($num_done == $description->count())
{
# All processes terminated: make stdio visible across NFS first.
$self->nfssync( $description->stdout(), 0 )
if $description->stdout() ne '';
$self->nfssync( $description->stderr(), 0 )
if $description->stderr() ne '';
if ( ${self}->{individual_condor_log} ) {
unlink($self->{condor_logfile});
}
$state = Globus::GRAM::JobState::DONE;
}
elsif($num_run == 0)
{
$state = Globus::GRAM::JobState::PENDING;
}
elsif($num_run > $num_evict)
{
$state = Globus::GRAM::JobState::ACTIVE;
}
else
{
# As many evictions as executions: currently suspended/evicted.
$state = Globus::GRAM::JobState::SUSPENDED;
}
}
# Merge per-process stdio files; the flag tells the merger whether the
# job has finished.
if($self->{STDIO_MERGER}) {
$self->{STDIO_MERGER}->poll($state == Globus::GRAM::JobState::DONE);
}
return { JOB_STATE => $state };
}
sub cancel
{
    # Remove the job from the Condor queue with condor_rm.  A zero exit
    # status means the removal was accepted; the job is then reported
    # as FAILED (the canonical state for a cancelled job).
    my $self = shift;
    my $description = $self->{JobDescription};
    my $condor_id = $description->jobid();

    # Ids may carry a subprocess suffix ("cluster.proc.subproc");
    # condor_rm wants only "cluster.proc", so strip the last component.
    $condor_id = $1 if $condor_id =~ m/(\d+\.\d+)\.\d+/;

    $self->log('cancel job ' . $description->jobid());

    # we do not need to be too efficient here
    $self->log(`$condor_rm $condor_id 2>&1`);

    return { JOB_STATE => Globus::GRAM::JobState::FAILED }
        if $? == 0;

    return Globus::GRAM::Error::JOB_CANCEL_FAILED();
}
1;

LSF job manager

LSF job manager code below is from globus 2.4.3.

use Globus::GRAM::Error;
use Globus::GRAM::JobState;
use Globus::GRAM::JobManager;
use Globus::Core::Paths;

use Config;

package Globus::GRAM::JobManager::lsf;

@ISA = qw(Globus::GRAM::JobManager);

# Site-local LSF configuration.  Each command string is prefixed with
# ". profile.lsf &&" so the LSF environment is sourced whenever the
# command is handed to a shell (via back-ticks or system()).
my ($lsf_profile, $mpirun, $bsub, $bjobs, $bkill);

BEGIN
{
$lsf_profile = '/usr/lsf/conf/profile.lsf';
# 'no' disables the mpi jobtype (checked in submit()).
$mpirun = 'no';
$bsub = ". $lsf_profile && /usr/lsf/5.1/linux2.4-glibc2.2-x86/bin/bsub";
$bjobs = ". $lsf_profile && /usr/lsf/5.1/linux2.4-glibc2.2-x86/bin/bjobs";
$bkill = ". $lsf_profile && /usr/lsf/5.1/linux2.4-glibc2.2-x86/bin/bkill";
}

sub submit
{
# Build an LSF batch script in the GASS cache, submit it with bsub,
# and return {JOB_ID, JOB_STATE => PENDING} on success or a
# Globus::GRAM::Error value on failure.
my $self = shift;
my $description = $self->{JobDescription};
# Low-precedence "or": use the cache tag when set, otherwise fall back
# to the job contact from the environment.
my $tag = $description->cache_tag() or $tag = $ENV{GLOBUS_GRAM_JOB_CONTACT};
my $status;
my $lsf_job_script;
my $lsf_job_script_name;
my $errfile = '';
my $queue;
my $job_id;
my $script_url;
my @arguments;
my $email_when = '';
my $library_path;
my $cache_pgm = "$Globus::Core::Paths::bindir/globus-gass-cache";
my @library_vars;

$self->log('Entering lsf submit');

# check jobtype
if(defined($description->jobtype()))
{
if($description->jobtype !~ /^(mpi|single|multiple)$/)
{
return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED;
}
elsif($description->jobtype() eq 'mpi' && $mpirun eq 'no')
{
# mpi requested but no mpirun is configured for this site.
return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED;
}
}
if( $description->directory eq '')
{
return Globus::GRAM::Error::RSL_DIRECTORY;
}
if((! -d $description->directory) || (! -r $description->directory))
{
return Globus::GRAM::Error::BAD_DIRECTORY;
}

# make sure the files are accessible (NFS sync) when you check for them
$self->nfssync( $description->executable() )
unless $description->executable() eq '';
$self->nfssync( $description->stdin() )
unless $description->stdin() eq '';

if( $description->executable eq '')
{
return Globus::GRAM::Error::RSL_EXECUTABLE();
}
elsif(! -f $description->executable())
{
return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
}
elsif(! -x $description->executable())
{
return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
}
elsif( $description->stdin() eq '')
{
return Globus::GRAM::Error::RSL_STDIN;
}
elsif(! -r $description->stdin())
{
return Globus::GRAM::Error::STDIN_NOT_FOUND();
}

# Time limits are given in minutes; max_cpu_time wins over max_time,
# and 0 means "use the queue default".
# NOTE(review): $cpu_time/$wall_time (and several variables below) are
# undeclared package globals -- this file has no "use strict".
$self->log('Determining job max time cpu from job description');
if(defined($description->max_cpu_time()))
{
$cpu_time = $description->max_cpu_time();
$self->log(" using maxcputime of $cpu_time");
}
elsif(defined($description->max_time()))
{
$cpu_time = $description->max_time();
$self->log(" using maxtime of $cpu_time");
}
else
{
$cpu_time = 0;
$self->log(' using queue default');
}

$self->log('Determining job max wall time limit from job description');
if(defined($description->max_wall_time()))
{
$wall_time = $description->max_wall_time();
$self->log(" using maxwalltime of $wall_time");
}
else
{
$wall_time = 0;
$self->log(' using queue default');
}

if($description->queue() ne '')
{
$queue = $description->queue();
}

$self->log('Building job script');

# Create the script file inside the GASS cache so it is cleaned up
# with the rest of the job's cache entries.
$script_url = "$tag/lsf_job_script.$$";
$self->fork_and_exec_cmd( $cache_pgm, '-add', '-t', $tag,
'-n', $script_url, 'file:/dev/null' );
$lsf_job_script_name = $self->pipe_out_cmd( $cache_pgm, '-query', '-t',
$tag, $script_url );
chomp($lsf_job_script_name);
if($lsf_job_script_name eq '')
{
# FIXME(review): "Globus::GRAM::ERROR" has the wrong case -- the
# package is Globus::GRAM::Error, so reaching this line dies with an
# undefined-subroutine error instead of returning the error cleanly.
return Globus::GRAM::ERROR::TEMP_SCRIPT_FILE_FAILED();
}

local(*JOB);
# NOTE(review): open is unchecked; a failure here surfaces later as a
# bsub error rather than being reported at this point.
open( JOB, '>' . $lsf_job_script_name );
print JOB<<"EOF";
#! /bin/sh
#
# LSF batch job script built by Globus Job Manager
#
EOF

# Translate RSL attributes into #BSUB directives.
if(defined($queue))
{
print JOB "#BSUB -q $queue\n";
}
if(defined($description->project()))
{
print JOB '#BSUB -P ', $description->project(), "\n";
}

# -c: CPU-time limit.  "multiple" jobs run count() copies, so the
# per-process limit is multiplied accordingly.
if($cpu_time != 0)
{
if($description->jobtype() eq 'multiple')
{
$total_cpu_time = $cpu_time * $description->count();
}
else
{
$total_cpu_time = $cpu_time;
}
print JOB "#BSUB -c ${total_cpu_time}\n";
}

# -W: wall-clock limit in minutes.
if($wall_time != 0)
{
print JOB "#BSUB -W $wall_time\n";
}

# -M: memory limit in KB (the RSL gives MB); scaled for "multiple".
if($description->max_memory() != 0)
{
$max_memory = $description->max_memory() * 1024;

if($description->jobtype() eq 'multiple')
{
$total_max_memory = $max_memory * $description->count();
}
else
{
$total_max_memory = $max_memory;
}
print JOB "#BSUB -M ${total_max_memory}\n";
}
print JOB '#BSUB -i ', $description->stdin(), "\n";
print JOB '#BSUB -e ', $description->stderr(), "\n";
print JOB '#BSUB -o ', $description->stdout(), "\n";
print JOB "#BSUB -N\n";
print JOB '#BSUB -n ', $description->count(), "\n";

# Export the user's environment variables from inside the job script.
foreach my $tuple ($description->environment())
{
if(!ref($tuple) || scalar(@$tuple) != 2)
{
return Globus::GRAM::Error::RSL_ENVIRONMENT();
}
print JOB $tuple->[0], '=', $tuple->[1],
'; export ', $tuple->[0], "\n";
}

$library_path = join(':', $description->library_path());
@library_vars = ('LD_LIBRARY_PATH');

if($Config{osname} eq 'irix')
{
push(@library_vars, 'LD_LIBRARYN32_PATH', 'LD_LIBRARY64_PATH');
}

# Emit shell code that appends the GRAM library path to each
# library-path variable, or sets it outright when unset.
# NOTE(review): the "then" branch always expands ${LD_LIBRARY_PATH},
# even for the irix variants -- it presumably should be \${$_};
# confirm against upstream before changing.
foreach (@library_vars)
{
print JOB <<"EOF";

if test 'X\${$_}' != 'X'; then
$_="\${LD_LIBRARY_PATH}:$library_path"
else
$_="$library_path"
fi
export $_
EOF
}

print JOB "\n#Change to directory requested by user\n";
print JOB 'cd ', $description->directory(), "\n";

@arguments = $description->arguments();

foreach(@arguments)
{
if(ref($_))
{
return Globus::GRAM::Error::RSL_ARGUMENTS;
}
}
# Shell-quote each argument before placing it on the command line.
# NOTE(review): the truthiness test treats a first argument of "0" or
# "" as "no arguments at all".
if($arguments[0])
{
foreach(@arguments)
{
$_ =~ s/\\/\\\\/g;
$_ =~ s/\$/\\\$/g;
$_ =~ s/"/\\\"/g; #"
$_ =~ s/`/\\\`/g; #`

$args .= '"' . $_ . '" ';
}
}
else
{
$args = '';
}
# Emit the command line(s): mpi runs through mpirun; "multiple" runs
# count() backgrounded copies followed by wait; otherwise one copy.
if($description->jobtype() eq 'mpi')
{
print JOB "$mpirun -np ", $description->count(), ' ';
print JOB $description->executable(), " $args \n";
}
elsif($description->jobtype() eq 'multiple')
{
for(my $i = 0; $i < $description->count(); $i++)
{
print JOB $description->executable(), " $args &\n";
}
print JOB "wait\n";
}
else
{
print JOB $description->executable(), " $args\n";
}
close(JOB);
chmod 0755, $lsf_job_script_name;

if($description->logfile() ne '')
{
$errfile = "2>" . $description->logfile();
}
# Submit via the shell so the LSF profile in $bsub is sourced; keep
# only the "... is submitted ..." line that bsub prints on success.
$self->nfssync( $lsf_job_script_name );
$job_id = (grep(/is submitted/,
split(/\n/, `$bsub < $lsf_job_script_name $errfile`)))[0];

if($? == 0)
{
# bsub printed e.g. "Job <1234> is submitted to queue <normal>";
# the job id is the text inside the first angle brackets.
$job_id =~ m/<([^>]*)>/;
$job_id = $1;

return {
JOB_ID => $job_id,
JOB_STATE => Globus::GRAM::JobState::PENDING
};
}
# Submission failed: drop the cached script before reporting an error.
#system("$cache_pgm -cleanup-url $tag/lsf_job_script.$$");
$self->fork_and_exec_cmd( $cache_pgm, '-cleanup-url',
"$tag/lsf_job_script.$$" );

return Globus::GRAM::Error::INVALID_SCRIPT_REPLY;
}

sub poll
{
    # Query LSF with bjobs and translate its Status column into a
    # Globus job state:
    #
    #   string  stands for                Globus context meaning
    #   ------------------------------------------------------------
    #   RUN     Running                   ACTIVE
    #   PEND    Waiting to be scheduled   PENDING
    #   USUSP   Suspended while running   SUSPENDED
    #   PSUSP   Suspended while pending   SUSPENDED
    #   SSUSP   Suspended by system       SUSPENDED
    #   DONE    Completed successfully    DONE
    #   EXIT    Completed unsuccessfully  FAILED
    #   UNKWN   Unknown state             *ignore*
    #   ZOMBI   Unknown state             FAILED
    my $self = shift;
    my $description = $self->{JobDescription};
    my $job_id = $description->jobid();

    $self->log("polling job $job_id");

    # Back-ticks (rather than list-form exec) so the LSF profile baked
    # into $bjobs gets sourced; keep the first line mentioning our id.
    my ($job_line) = grep { /$job_id/ } `$bjobs $job_id 2>/dev/null`;

    # High byte of $? is bjobs's exit code (see $CHILD_ERROR in
    # perlvar).  255 means "Job <id> is not found", i.e. the job has
    # left the queue entirely.
    my $bjobs_rc = $? >> 8;

    if ($bjobs_rc == 255)
    {
        $self->log("bjobs rc is 255 == Job <123> is not found == DONE");
        $self->nfssync( $description->stdout() )
            if $description->stdout() ne '';
        $self->nfssync( $description->stderr() )
            if $description->stderr() ne '';
        return { JOB_STATE => Globus::GRAM::JobState::DONE };
    }

    # The status string is the third whitespace-separated column.
    my $lsf_status = (split(/\s+/, $job_line))[2];

    if ($lsf_status =~ /PEND/)
    {
        return { JOB_STATE => Globus::GRAM::JobState::PENDING };
    }
    if ($lsf_status =~ /DONE/)
    {
        # Completed: make the output files visible across NFS first.
        $self->nfssync( $description->stdout() )
            if $description->stdout() ne '';
        $self->nfssync( $description->stderr() )
            if $description->stderr() ne '';
        return { JOB_STATE => Globus::GRAM::JobState::DONE };
    }
    if ($lsf_status =~ /USUSP|SSUSP|PSUSP/)
    {
        return { JOB_STATE => Globus::GRAM::JobState::SUSPENDED };
    }
    if ($lsf_status =~ /RUN/)
    {
        return { JOB_STATE => Globus::GRAM::JobState::ACTIVE };
    }
    if ($lsf_status =~ /EXIT/)
    {
        return Globus::GRAM::Error::JOB_EXIT_CODE_NON_ZERO();
    }
    if ($lsf_status =~ /UNKWN/)
    {
        # Keep the previous state: returning an empty hash tells the
        # job manager to ignore this poll.
        $self->log("bjobs returned the UNKWN state. Telling JM to ignore this poll");
        return {};
    }
    if ($lsf_status =~ /ZOMBI/)
    {
        return Globus::GRAM::Error::LOCAL_SCHEDULER_ERROR();
    }

    # Unrecognized bjobs response -- LSF may have been temporarily
    # unavailable but can recover with the job intact.  Keep the
    # previous state by ignoring this poll.
    $self->log("bjobs returned an unknown response. Telling JM to ignore this poll");
    return {};
}

sub cancel
{
    # Kill the job with bkill.  A zero exit status means the cancel was
    # delivered; the job is then reported as FAILED (the canonical
    # state for a cancelled job).
    my $self = shift;
    my $description = $self->{JobDescription};
    my $job_id = $description->jobid();

    $self->log("cancel job $job_id");

    # Run through the shell (not list-form) so the LSF profile baked
    # into $bkill is sourced first.
    system("$bkill $job_id >/dev/null 2>/dev/null");

    return { JOB_STATE => Globus::GRAM::JobState::FAILED } if $? == 0;

    return Globus::GRAM::Error::JOB_CANCEL_FAILED();
}

1;

SGE Job Manager

SGE job manager code was developed by the UK Grid eScience. It is provided as-is for quick code inspection. The version below is as integrated in VDT 1.5.2 (post OSG 0.4.1). Please note that the version below includes patches provided by the RHIC/STAR VO. Consult SGE Job Manager patch for more information.

#
# Marko Krznaric
# London eScience Centre
# June 2003
#
# Contributions by David McBride
# London eScience Centre
# Oct 2003

use Globus::GRAM::Error;
use Globus::GRAM::JobState;
use Globus::GRAM::JobManager;
use Globus::Core::Paths;

use IO::File;
use Config;
use POSIX;

package Globus::GRAM::JobManager::sge;

@ISA = qw(Globus::GRAM::JobManager);


# Site-local SGE configuration: utility paths, MPI launcher choice,
# and the SGE installation details used when building job scripts.
my ($qsub, $qstat, $qdel, $qselect, $qhost, $qconf, $qacct,
$mpirun, $sun_mprun,
$CAT,
$SGE_ROOT, $SGE_CELL,
$SGE_MODE, $SGE_RELEASE);

BEGIN
{
$qsub = '/common/sge/6.0u4/bin/lx24-x86//qsub';
$qstat = '/common/sge/6.0u4/bin/lx24-x86//qstat';
$qdel = '/common/sge/6.0u4/bin/lx24-x86//qdel';
$qselect = '/common/sge/6.0u4/bin/lx24-x86//qselect';
$qhost = '/common/sge/6.0u4/bin/lx24-x86//qhost';
$qconf = '/common/sge/6.0u4/bin/lx24-x86//qconf';
$qacct = '/common/sge/6.0u4/bin/lx24-x86//qacct';
#
# MPI launchers: 'no' means unavailable; a non-'no' $sun_mprun makes
# submit() prefer Sun's mprun over plain mpirun.
# NOTE(review): $mpi_pe and $supported_job_types are not in the "my"
# list above, so they are package globals shared with submit().
$mpirun = '/usr/bin/mpirun';
$sun_mprun = 'no';
$mpi_pe = '';
#
# The mpi jobtype is only advertised when some launcher exists.
if(($mpirun eq "no") && ($sun_mprun eq "no"))
{ $supported_job_types = "(single|multiple)"; }
else
{ $supported_job_types = "(mpi|single|multiple)"; }
#
# NOTE(review): the declaration above spells this $CAT (upper case)
# but the assignment here is to $cat -- one of the two is a typo.
$cat = '/bin/cat';
#
$SGE_ROOT = '/common/sge/6.0u4';
$SGE_CELL = 'default';
$SGE_MODE = 'SGE';
$SGE_RELEASE = '6.0u4';
}


#########################################################################
#
# SUBMIT
#
########################################################################
sub submit
{
my $self = shift;
my $description = $self->{JobDescription};
my $tag = $description->cache_tag() or $ENV{GLOBUS_GRAM_JOB_CONTACT};
my $status;
my $sge_job_script;
my $sge_job_script_name;
my $errfile = "";
my $queue;
my $job_id;
my $rsh_env;
my $script_url;
my @arguments;
my $email_when = "";
my $cache_pgm = "$Globus::Core::Paths::bindir/globus-gass-cache";
my %library_vars;

$self->log("Entering SGE submit");

#####
# check jobtype
#
if(defined($description->jobtype()))
{
if($description->jobtype !~ /^$supported_job_types$/)
{
return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED;
}
}

#####
# check directory
#
if( $description->directory eq "")
{
return Globus::GRAM::Error::RSL_DIRECTORY();
}
chdir $description->directory() or
return Globus::GRAM::Error::BAD_DIRECTORY();

#####
# check executable
#
if( $description->executable eq "")
{
return Globus::GRAM::Error::RSL_EXECUTABLE();
}
elsif(! -f $description->executable())
{
return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
}
elsif(! -x $description->executable())
{
return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
}
elsif( $description->stdin() eq "")
{
return Globus::GRAM::Error::RSL_STDIN;
}
elsif(! -r $description->stdin())
{
return Globus::GRAM::Error::STDIN_NOT_FOUND();
}


#####
# RSL attributes max_cpu_time/max_wall_time (given in minutes)
# explicitly set the maximum cpu/wall time. max_time can be used
# for both, max_cpu_time and max_wall_time

#####
# determining max_wall_time
#
$self->log("Determining job WALL time");
if(defined($description->max_wall_time()))
{
$wall_time = $description->max_wall_time();
$self->log(" using max_wall_time of $wall_time minutes");
}
elsif(defined($description->max_time()))
{
$wall_time = $description->max_time();
$self->log(" using max_wall_time of $wall_time minutes");
}
else
{
$wall_time = 0;
$self->log(" using queue default");
}

#####
# determining max_cpu_time
#
$self->log("Determining job CPU time");
if(defined($description->max_cpu_time()))
{
$cpu_time = $description->max_cpu_time();
$self->log(" using max_cpu_time of $cpu_time minutes");
}
elsif(defined($description->max_time()))
{
$cpu_time = $description->max_time();
$self->log(" using max_cpu_time of $cpu_time minutes");
}
else
{
$cpu_time = 0;
$self->log(" using queue default");
}


#####
# start building job script
#
$self->log('Building job script');


#####
# open script file
#
$script_url = "$tag/sge_job_script.$$";
system("$cache_pgm -add -t $tag -n $script_url file:/dev/null");
$sge_job_script_name = `$cache_pgm -query -t $tag $script_url`;
chomp($sge_job_script_name);
if($sge_job_script_name eq "")
{
return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
}

$sge_job_script = new IO::File($sge_job_script_name, '>');
# $self->log(" script location: $sge_job_script_name");


#####
# Writing script header
#
$sge_job_script->print("#!/bin/sh\n");
$sge_job_script->print("# Grid Engine batch job script built by ");
$sge_job_script->print("Globus job manager\n");
$sge_job_script->print("\n");
$sge_job_script->print("#\$ -S /bin/sh\n");


#####
# Whom to send email and when
#
if($description->email_address() ne '')
{
$sge_job_script->print("#\$ -M ". $description->email_address() ."\n");
$self->log("Monitoring job by email");
$self->log(" email address: " . $description->email_address());
}
if($description->emailonabort() eq 'yes')
{
$email_when .= 'a';
$self->log(" email when job is aborted");
}
if($description->emailonexecution() eq 'yes')
{
$email_when .= 'b';
$self->log(" email at the beginning of the job");
}
if($description->emailontermination() eq 'yes')
{
$email_when .= 'e';
$self->log(" email at the end of the job");
}
if($description->emailonsuspend() eq 'yes')
{
$email_when .= 's';
$self->log(" email when job is suspended");
}
if($email_when eq '')
{
$email_when = 'n';
$self->log(" email(s) will not be sent");
}
$sge_job_script->print("#\$ -m $email_when\n");


#####
# Defines a list of queues used to execute this job
#
if(defined($description->queue()))
{
$sge_job_script->print("#\$ -q " . $description->queue() . "\n");
$self->log("Using the following queues:");
$self->log(" " . $description->queue);
}


#####
# Writing project info, but only with SGEEE. Otherwise ignore and warn!
#
$self->log("Checking project details");
if(defined($description->project()))
{
if( $SGE_MODE eq "SGEEE" )
{
$self->log(" SGE Enterprise Edition: projects supported");
$self->log(" Job assigned to " . $description->project());
$sge_job_script->print("#\$ -P ". $description->project() ."\n");
}
else
{
$self->log(" SGE Regular Edition: NO project support");
$self->log(" WARNING: Project set to " . $description->project());
$self->log(" but it will be ignored by Grid Engine");
$sge_job_script->print("#\$ -P ". $description->project() ."\n");
}
}
else
{
$self->log(" Project not specified");
}


#####
# wall_time was in minutes. Converting to SGE time format (h:m:s)
#
if($wall_time != 0)
{
$wall_m = $wall_time % 60;
$wall_h = ( $wall_time - $wall_m ) / 60;

$self->log("Using max WALL time (h:m:s) of $wall_h:$wall_m:00");
$sge_job_script->print("#\$ -l h_rt=$wall_h:$wall_m:00\n");
}

#####
# cpu_time was in minutes. Converting to SGE time format (h:m:s)
#
if($cpu_time != 0)
{
$cpu_m = $cpu_time % 60;
$cpu_h = ( $cpu_time - $cpu_m ) / 60;

$self->log("Using max CPU time (h:m:s) of $cpu_h:$cpu_m:00");
$sge_job_script->print("#\$ -l h_cpu=$cpu_h:$cpu_m:00\n");
}


#####
# RSL attribute for max_memory is given in Mb
#
$max_memory = $description->max_memory();
if($max_memory != 0)
{
$self->log("Total max memory flag is set to $max_memory Mb");
$sge_job_script->print("#\$ -l h_data=$max_memory" . "M\n");
}



#####
# Where to write output and error?
#
if(($description->jobtype() eq "single") && ($description->count() > 1))
{
#####
# It's a single job and we use job arrays
#
$sge_job_script->print("#\$ -o "
. $description->stdout() . ".\$TASK_ID\n") if (! -b $description->stdout() );
$sge_job_script->print("#\$ -e "
. $description->stderr() . ".\$TASK_ID\n") if (! -b $description->stderr() );
}
else
{
# [dwm] Don't use real output paths; copy the output there later.
# Globus doesn't seem to handle streaming of the output
# properly and can result in the output being lost.
# FIXME: We would prefer continuous streaming. Try to determine
# precisely what's failing so that we can fix the problem.
# See Globus bug #1288.
$sge_job_script->print("#\$ -o " . $description->stdout() . ".real\n") if (! -b $description->stdout() );
$sge_job_script->print("#\$ -e " . $description->stderr() . ".real\n") if (! -b $description->stderr() );
}


#####
# Constructing the environment variable
#

$library_vars{LD_LIBRARY_PATH} = 0;
if($Config{osname} eq 'irix')
{
$library_vars{LD_LIBRARYN32_PATH} = 0;
$library_vars{LD_LIBRARY64_PATH} = 0;
}

foreach my $tuple ($description->environment())
{
if(!ref($tuple) || scalar(@$tuple) != 2)
{
return Globus::GRAM::Error::RSL_ENVIRONMENT();
}
if(exists($library_vars{$tuple->[0]}))
{
$tuple->[1] .= ":$library_string";
$library_vars{$tuple->[0]} = 1;
}

push(@new_env, $tuple->[0] . "=" . $tuple->[1]);

$tuple->[0] =~ s/\\/\\\\/g;
$tuple->[0] =~ s/\$/\\\$/g;
$tuple->[0] =~ s/"/\\\"/g;
$tuple->[0] =~ s/`/\\\`/g;

$tuple->[1] =~ s/\\/\\\\/g;
$tuple->[1] =~ s/\$/\\\$/g;
$tuple->[1] =~ s/"/\\\"/g;
$tuple->[1] =~ s/`/\\\`/g;


#####
# Special treatment for GRD_PE or SGE_PE.
# If jobType is mpi, this can conflict with the default PE.
# In that case, we override the default PE. Please note, that
# this can be overriden by RSL attribute parallel_envirnment!
#
if (($tuple->[0] eq "GRD_PE") || ($tuple->[0] eq "SGE_PE"))
{
if($description->jobtype() eq "mpi")
{
$mpi_pe = $tuple->[1];
}
else
{
$sge_job_script->print("#\$ -pe " . $tuple->[1] . " " .
$description->count() . "\n");
}
}
else
{
$sge_job_script->print($tuple->[0] . "=" . $tuple->[1]
. "; export " . $tuple->[0] . "\n");
}

}

foreach (keys %library_vars)
{
if($library_vars{$_} == 0)
{
push(@new_env, $_ . "=" . $library_path);
$sge_job_script->print("$_=$library_path;\n" . "export $_;\n");
}
}


#####
# Load SGE settings
#
$sge_job_script->print(". $SGE_ROOT/$SGE_CELL/common/settings.sh\n");


#####
#
#
$sge_job_script->print("# Change to directory requested by user\n");
$sge_job_script->print('cd ' . $description->directory() . "\n");


#####
# Transforing arguments
#
@arguments = $description->arguments();

foreach(@arguments)
{
if(ref($_))
{
return Globus::GRAM::Error::RSL_ARGUMENTS;
}
}
if($arguments[0])
{
foreach(@arguments)
{
$self->log("Transforming argument \"$_\"");
$_ =~ s/\\/\\\\/g;
$_ =~ s/\$/\\\$/g;
$_ =~ s/"/\\\"/g;
$_ =~ s/`/\\\`/g;
$self->log("Transformed to \"$_\"");

$args .= '"' . $_ . '" ';
}
}
else
{
$args = '';
}


#####
# Determining job request type.
#
$self->log("Determining job type");
$self->log(" Job is of type " . $description->jobtype());
if($description->jobtype() eq "mpi")
{
#####
# It's MPI job
#

#####
# Check if RSL attribute parallel_environment is provided
#
if($description->parallel_environment())
{
$mpi_pe = $description->parallel_environment();
}

if(!$mpi_pe || $mpi_pe eq "NONE"){
$self->log("ERROR: Parallel Environment (PE) failure!");
$self->log(" MPI job was submitted, but no PE set");
$self->log(" by neither user nor administrator");
return Globus::GRAM::Error::INVALID_SCRIPT_REPLY;
}
else
{
$self->log(" PE is $mpi_pe");
$sge_job_script->print("#\$ -pe $mpi_pe "
. $description->count() . "\n");
}

if (($sun_mprun eq "no") && ($mpirun eq "no"))
{
return Globus::GRAM::Error::INVALID_SCRIPT_REPLY;
}
elsif ($sun_mprun ne "no")
{
#####
# Using Sun's MPI.
#
$sge_job_script->print("$sun_mprun -np ". $description->count() ." "
. $description->executable() . " $args < "
. $description->stdin() . "\n");
}
else
{
#####
# Using non-Sun's MPI.
#
$sge_job_script->print("$mpirun -np ". $description->count() . " "
. $description->executable() . " $args < "
. $description->stdin() . "\n");
}
}
elsif($description->jobtype() eq "multiple")
{
#####
# It's a multiple job
#
$self->log(" forking multiple requests");
for(my $i = 0; $i < $description->count(); $i++)
{
$sge_job_script->print($description->executable() . " $args < "
. $description->stdin() . "&\n");
}
$sge_job_script->print("wait\n");
}
elsif($description->count() > 1)
{
#####
# (single & count>1) -> Using job arrays
#
$self->log(" using job arrays with count " . $description->count());
$sge_job_script->print("#\$ -t 1-" . $description->count() . "\n");
$sge_job_script->print($description->executable() . " $args < "
. $description->stdin() . "\n");
}
else
{
#####
# Single execution job
#
$sge_job_script->print($description->executable() . " $args < "
. $description->stdin() . "\n");
}

#####
# SGE job script is successfully built! :-)
#
$self->log("SGE job script successfully built! :-)");
$sge_job_script->close();

if($description->logfile() ne "")
{
$errfile = "2>>" . $description->logfile();
}


#####
# Submitting a job
#
$self->log("Submitting a job");
# [dwm] Set SGE_ROOT in the environment; environment now appears to get
# emptied in GT3.
$ENV{"SGE_ROOT"} = $SGE_ROOT;
$ENV{"SGE_CELL"} = $SGE_CELL;
if ( -r "$ENV{HOME}/.chos" ){
$chos=`cat $ENV{HOME}/.chos`;
$chos=~s/\n.*//;
$ENV{CHOS}=$chos;
}

chomp($job_id = `$qsub $sge_job_script_name`);

if($? == 0)
{
$self->log(" successfully submitted");

# get job ID
$job_id = (split(/\s+/, $job_id))[2];

# in the case we used job arrays
if ($job_id =~ m/\./)
{
$job_id = $`;
}

#system("$cache_pgm -cleanup-url $script_url");

# [dwm]
# FIXME: Cheat and append the stdout and stderr paths to the returned $jobid.
# (For use in poll(). This is to workaround a globus bug.)
# DANGER! This may fail badly if the job ID, stdout path or stderr path
# contain a "|" (pipe) character!
# Currently, this shouldn't happen.

return {JOB_ID => $job_id . "|" . $description->stdout() . "|" . $description->stderr(),
JOB_STATE => Globus::GRAM::JobState::PENDING };
}
else
{
$self->log(" ERROR: job submission failed");
if ($description->project())
{
$self->log(" check if the project specified does exist");
}
}

# system("$cache_pgm -cleanup-url $tag/sge_job_script.$$");


#####
# If we reach this - invalid script response
#
return Globus::GRAM::Error::INVALID_SCRIPT_REPLY;
}


#########################################################################
#
# POLL
#
########################################################################
sub poll
{
# Poll the current state of an SGE job for pre-WS GRAM.
#
# Returns a hashref { JOB_STATE => Globus::GRAM::JobState::* }.
# The job ID stored in the JobDescription is the compound string
# "sge_id|stdout_path|stderr_path" built by the submit routine
# (workaround for Globus bug #1287), so it is split apart again here.
my $self = shift;
my $description = $self->{JobDescription};
my $job_id = $description->job_id();
my $state;
my $status_line;

my $job_out = "$$.log"; # make a default name for polling stdout
my $job_err = "$$.err"; # make a default name for polling stderr

my $desc_job_id = $description->job_id(); # need to print this out for debug purposes
my $time = `/bin/date`;

# Assign the real value coming from $description if not a special device
#$job_out = $description->stdout() if (! -b $description->stdout() );
#$job_err = $description->stderr() if (! -b $description->stderr() );


# system ("/bin/echo -n polling job $job_id from PID $$ at '' >> $job_out");
# system ("/bin/echo `/bin/date` >> $job_out");

# [dwm] Cheat and separate the job_out path from the job_id() string.
# This is to workaround the fact we don't get job_out() and _err()
# given to us!
# See Globus bug #1287.
# NOTE(review): this match is not checked for success. If the stored
# job ID ever lacks the "id|stdout|stderr" form, $1-$3 are stale (from
# a previous match) or undef here -- confirm submit() always appends
# the two paths.
$job_id =~ /(.*)\|(.*)\|(.*)/;
$job_id = $1;
$job_out = $2;
$job_err = $3;

# So, if it was a special block device, we would recover it once more here
# BUT we would only redirect to it via >> therefore, no need for checks.
#
# HOWEVER, the job_out/job_err would come in the form of
# /home/atems/.globus/job/rhic23.physics.wayne.edu/20993.1144432774/stdout
# /home/atems/.globus/job/rhic23.physics.wayne.edu/20993.1144432774/stderr
#
# so the multiple lines after $cat (no cleaning)



$self->log("polling job $job_id");
# open (JO, ">>$job_out");
# print JO "polling job $job_id from PID $$ [ $desc_job_id ] at $time";
# close (JO);
# system ("/bin/echo -n polling job $job_id from PID $$ [".$description->job_id()."] >> $job_out");
# system ("/bin/echo -n polling job $job_id from PID $$ originally $desc_job_id at '' >> $job_out");
# system ("/bin/echo `/bin/date` >> $job_out");

# system ("/bin/echo polling job $job_id from PID $$ $0 at $^T >> $job_out");
# local $time = `\bin\date`;
# system ("/bin/echo polling job $job_id from PID $$ at $time >> $job_out");

# [dwm] Replacement state checking code.
# qstat needs the SGE environment; the environment can be emptied
# under GT3 (see the matching note in the submit routine).
$ENV{"SGE_ROOT"} = $SGE_ROOT;
$ENV{"SGE_CELL"} = $SGE_CELL;

my ($tries)=0;
my ($notexist);

# Query qstat for this job by number, retrying up to two extra times
# when it produces no output at all (e.g. a transient qmaster outage).
# NOTE(review): the goto below jumps back across the `my (@output)`
# declaration inside the same scope; this happens to work but is
# fragile Perl.
POLL_AGAIN:
my (@output) = `$qstat -j $job_id 2>&1`;# Query job_id by number.
my ($status,$stsstr)=($?,$!);

if ($#output == -1 ){
if ($tries < 2){
sleep(2); # sleep 2 seconds
$tries++;
# open (JO, ">>$job_out");
# print JO "ERROR::Poll: On ".localtime()." we detected a failed polling. ",
# "Errors are [$status] [$stsstr]\n";
# close(JO);
goto POLL_AGAIN;
} else {
# NOTE(review): intent per the comment below is to report a job
# error, but "error" can never match the later /do not exist/ test,
# and the subsequent /"error"/ test runs against $_ (popped from the
# now-empty @output, i.e. undef), so this path actually falls
# through to the ACTIVE state -- verify the intended behavior.
$notexist = "error"; # ensures that a queue failure will equate
# to job error from the Condor side
}
} else {
# there is a result
$notexist = $output[0]; # Obtain first line of output (STDOUT or STDERR)

# open (JO, ">>$job_out"); # print the qstat output
# print JO "The output of qstat has $#output+1 lines [$status] [$stsstr]\n";
# for ($i=0 ; $i<=$#output ; $i++) {
# print JO "Line $i: $output[$i]";
# }
# close (JO);
}


if ($notexist =~ /do not exist/) # Check to see if first line indicates job doesn't exist any more.
{
# Job no longer exists in SGE job manager. It must have finished.
$self->log("Job $job_id has completed.");
system ("/bin/echo Job $job_id has completed. >> $job_out");
$state = Globus::GRAM::JobState::DONE;

$self->log("Writing job STDOUT and STDERR to cache files.");
system ("/bin/echo Writing job STDOUT and STDERR to cache files. >> $job_out");

if(($description->jobtype() eq "single") && ($description->count() > 1))
#####
# Jobtype is single and count>1. Therefore, we used job arrays. We
# need to merge individual output/error files into one.
#
{
# [dwm] Use append, not overwrite to work around file streaming issues.
system ("$cat $job_out.* >> $job_out"); unlink(glob("$job_out.*"));
system ("$cat $job_err.* >> $job_err"); unlink(glob("$job_err.*"));
}
else
{
# [dwm] We still need to append the job output to the GASS cache file.
# We can't let SGE do this directly because it appears to
# *overwrite* the file, not append to it -- which the Globus
# file streaming components don't seem to handle properly.
# So append the output manually now.
system("$cat $job_out.real >> $job_out"); unlink("$job_out.real");
system("$cat $job_err.real >> $job_err"); unlink("$job_err.real");
}

$self->log("Returning job success.");
system ("/bin/echo Returning job success. >> $job_out");
}
else
{
# SGE still knows about the job, hence it cannot have completed yet.
# Determine its current state and notify any interested parties.

$_ = pop @output; # Obtain scheduler details from output, if any.

# FIXME:
# Unfortunately, the nice single-character state field isn't printed
# if we do a lookup on a specific job with qstat, so we have to guess
# a little more than we would like.
# This is probably best fixed in SGE itself.

# NOTE(review): this test requires literal double-quote characters
# around the word error in the qstat output -- confirm against the
# deployed qstat's message format.
if (/"error"/) {
$self->log(" Job $job_id has failed!");
system ("/bin/echo Job $job_id has failed! >> $job_out");
$state = Globus::GRAM::JobState::FAILED;
}
elsif(/queue|pending/) {
$self->log(" Job is still queued for execution.");
system ("/bin/echo Job is still queued for execution. >> $job_out");
$state = Globus::GRAM::JobState::PENDING;
}
elsif(/hold|suspend/) {
$self->log(" Job is suspended.");
system ("/bin/echo Job is suspended. >> $job_out");
$state = Globus::GRAM::JobState::SUSPENDED;
}
else {
# Default: anything else is treated as a running job.
$self->log(" Job is running.");
system ("/bin/echo Job is running. >> $job_out");
$state = Globus::GRAM::JobState::ACTIVE;
}
}

return {JOB_STATE => $state};
}



#########################################################################
#
# CANCEL
#
########################################################################
sub cancel
{
# Cancel a previously-submitted SGE job via qdel.
#
# Returns { JOB_STATE => FAILED } on a successful qdel (GRAM reports a
# cancelled job as failed), or the JOB_CANCEL_FAILED error otherwise.
my $self = shift;
my $description = $self->{JobDescription};

# NOTE(review): poll() uses job_id(); presumably JobDescription's
# accessor normalization makes jobid() equivalent -- confirm.
my $job_id = $description->jobid();

$self->log("cancel job $job_id");

# system("$qdel $job_id >/dev/null 2>/dev/null")
# qdel needs the SGE environment; it can be emptied under GT3.
$ENV{"SGE_ROOT"} = $SGE_ROOT;
$ENV{"SGE_CELL"} = $SGE_CELL;

# The stored job ID is the compound "sge_id|stdout|stderr" string built
# at submit time (workaround for Globus bug #1287); recover the bare
# SGE job number before calling qdel.
# Bug fix: guard the match -- previously an unmatched regex left $1
# stale (or undef), so qdel could be invoked with garbage.
if ($job_id =~ /(.*)\|(.*)\|(.*)/)
{
    $job_id = $1;
}

system("$qdel $job_id");

if($? == 0)
{
    # GRAM semantics: a successfully cancelled job is reported FAILED.
    return { JOB_STATE => Globus::GRAM::JobState::FAILED }
}

return Globus::GRAM::Error::JOB_CANCEL_FAILED();
}

1;