CSC334 getXGridOutput.py

From CSclasswiki
Jump to: navigation, search

--Thiebaut 15:24, 10 June 2010 (UTC)--D. Thiebaut 10:51, 4 November 2008 (UTC)


#! /usr/bin/env python -iu
# -iu : unbuffered stdin: allows 1 line of input to be processed
#       before end of file on stdin.
#
# getXGridOutput.py
# D. Thiebaut
#
# Modification 6/10/10: added code to make sure the input has received the
#                      closing } of an xgrid response before issuing a new
#                      command
# Modification 6/8/10: makes program react faster by unbuffering
#                      stdin
# Modification 6/3/10: added "-out ." to the xgrid command that gets the
#                      results, in case output files are generated.
# 
# Version 5: 6/10/10
# 
# Version string shown by the -?, -h and -H command-line options.
VERSION="V5 (6/10/2010)"

import sys
import os
import time


from subprocess import Popen, PIPE
 
# jobStatus maps a job id to a 3-item list [ status, start time, end time ];
# main() creates each entry as [ RUNNING, 0, 0 ] and getJobsStatus() fills it in.
jobStatus = {}
RUNNING   = 1      # status value: job registered, not yet reported Finished/Failed
STOPPED   = 0      # status value: xgrid reported the job as Finished or Failed
STARTTIMES= []     # start times (as returned by time.mktime) of the jobs whose results were printed
ENDTIMES  = []     # matching end times; main() uses both lists for the total execution time
LOG       = False  # set to True by the -log command-line switch; log() is a no-op otherwise
 
def _parseXGridTime( stamp ):
    """Converts a recorded time stamp ("2008-09-27 19:09:27", optionally
    wrapped in double quotes) into the float returned by time.mktime.

    Returns None when the stamp was never recorded (it is still the 0
    placeholder stored when the job was registered) or cannot be parsed."""
    try:
        stamp = stamp.strip().strip( '"' )
        return time.mktime( time.strptime( stamp, "%Y-%m-%d %H:%M:%S" ) )
    except ( AttributeError, TypeError, ValueError, OverflowError ):
        # AttributeError: not a string (placeholder); the others: bad stamp
        return None


def printResults( jobId, debug=False ):
    """Prints the execution time and the output of job #jobId.

    The start and end times are read from jobStatus[ jobId ]
    (format of time: 2008-09-27 19:09:27).  When both are usable they are
    also appended to STARTTIMES and ENDTIMES.  When either one is missing or
    malformed, the execution time is reported as unknown instead of raising,
    so that the results of the job are still retrieved.

    jobId: integer id of a job present in jobStatus
    debug: when True, prints tracing information
    """
    # NOTE: print( x ) with a single argument outputs exactly what the
    # print statement would.
    if debug: print( "--> printResults" )

    status, startTime, endTime = jobStatus[ jobId ]
    if debug:
        print( "[status, startTime, endTime] =  %s %s %s"
               % ( status, startTime, endTime ) )

    time1 = _parseXGridTime( startTime )
    time2 = _parseXGridTime( endTime )
    if time1 is None or time2 is None:
        print( "Job %d stopped: Execution time: unknown" % jobId )
    else:
        STARTTIMES.append( time1 )
        ENDTIMES.append( time2 )
        print( "Job %d stopped: Execution time: %f seconds" % ( jobId, time2-time1 ) )

    #--- get the results; "-out ." is there in case output files are generated ---
    output = Popen( ["xgrid", "-job", "results", "-id", "%d" % jobId, "-out", "." ],
                    stdout=PIPE ).communicate()[0]
    for line in output.split( '\n' ):
        print( line )
 
def deleteJob( jobId, debug=False ):
    """Asks xgrid to delete job #jobId and always returns 1.

    The request is logged first; whatever xgrid writes on its standard
    output is read and discarded."""
    if debug:
        print( "-->deleteJob" )
    log( "Deleting jobId %d" % jobId )

    #--- run the delete command and wait until xgrid is done ---
    command = [ "xgrid", "-job", "delete", "-id", "%d" % jobId ]
    xgrid = Popen( command, stdout=PIPE )
    xgrid.communicate()
    return 1
 
 
def _attributeTimestamp( line ):
    """Returns the "YYYY-MM-DD HH:MM:SS" time stamp held by an xgrid
    attribute line of the form   name = value;

    The value may or may not be enclosed in double quotes, and whatever
    follows the time stamp is dropped (presumably a time-zone offset --
    confirm against real xgrid output).  Unlike a fixed-width slice from the
    end of the line, this does not depend on the quoting or on a trailing
    newline, so the seconds are never truncated."""
    value = line[ line.find( "=" )+1 : ].strip()
    value = value.rstrip( ";" ).strip().strip( '"' )
    return value[ : len( "YYYY-MM-DD HH:MM:SS" ) ]


def getJobsStatus( jobIds, debug=False ):
    """Polls all the jobs and sees if they are still running.

    Jobs already marked STOPPED in jobStatus are only counted.  For every
    other job, "xgrid -job attributes" is run and its output scanned for the
    dateStarted, dateStopped and jobStatus attributes.  A job reported as
    Finished has its results printed and is then deleted; a job reported as
    Failed is announced and deleted.

    jobIds: list of integer job ids, all present in jobStatus
    debug:  when True, echoes the xgrid output and tracing information
    Returns the number of stopped jobs."""
    noStoppedJobs = 0

    # NOTE: print( x ) with a single argument outputs exactly what the
    # print statement would.
    if debug: print( "-->getJobStatus( %s )" % str( jobIds ) )

    for jobId in jobIds:

        #--- don't poll the xgrid if the job is already stopped ---
        status = jobStatus[ jobId ]
        if status[0] == STOPPED:
            log( "Job %d has stopped" % jobId )
            noStoppedJobs += 1
            continue

        #--- poll xgrid ---
        output = Popen( ["xgrid", "-job", "attributes", "-id", "%d" % jobId],
                        stdout=PIPE ).communicate()[0]

        #--- actions are delayed until the whole output has been read, so ---
        #--- that both time stamps are recorded before results are printed ---
        pending = []
        for line in output.split( '\n' ):
            if debug: print( line )

            if "dateStarted" in line:
                status[1] = _attributeTimestamp( line )
            if "dateStopped" in line:
                status[2] = _attributeTimestamp( line )

            if "jobStatus" not in line:
                continue

            #--- if the job is now finished record it and display its output ---
            if "Finished" in line:
                log( "Job %d is finished" % jobId )
                status[0] = STOPPED
                noStoppedJobs += 1
                pending.append( printResults )
                pending.append( deleteJob )

            if "Failed" in line:
                log( "Job %d has *** FAILED ***" % jobId )
                print( "Job %d has *** FAILED ***" % jobId )
                status[0] = STOPPED
                noStoppedJobs += 1
                pending.append( deleteJob )

        #--- now that output is fully read, execute the action(s) in order ---
        for action in pending:
            action( jobId, debug )

        #--- record status ---
        jobStatus[ jobId ] = status

    #--- return # of stopped jobs ---
    return noStoppedJobs
 
def log( s ):
    """Appends s to the log file, prefixed by a "YYYY-MM-DD HH:MM:SS:" stamp
    of the local time and followed by a newline.

    Does nothing unless the global LOG is true.  The log file is opened
    relative to the current directory and is named after the script: the
    part of the script's file name located before its first '.', followed
    by ".log".

    s: the message to record"""
    if not LOG:
        return

    scriptName = sys.argv[0].split( '/' )[-1]
    logName = scriptName.split( '.' )[0] + ".log"
    stamp = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime() )

    logFile = open( logName, "at" )
    try:
        logFile.write( "%s:%s\n" % ( stamp, s ) )
    finally:
        # close the file even if the write fails
        logFile.close()

def _parseJobId( line ):
    """Returns the integer found after the '=' of a jobIdentifier line
    (presumably of the form   jobIdentifier = 123;   -- confirm against real
    xgrid output).  Works with or without a trailing newline, ';' or
    double quotes; raises ValueError if what is left is not an integer."""
    value = line[ line.find( "=" )+1 : ].strip()
    return int( value.rstrip( ";" ).strip().strip( '"' ) )


def main( debug=False ):
    """ ---------------------------- M A I N ---------------------------
    Reads the output of xgrid commands on stdin, registers every job whose
    jobIdentifier appears there, and polls the registered jobs until stdin
    is exhausted and all of them have stopped.  Ends by printing (and
    logging) the total execution time.

    Command-line switches:
      -log          turns logging on
      -debug        turns tracing on
      -?, -h, -H    print the syntax and version, then return

    debug: initial value of the tracing flag
    """
    global LOG

    jobIds = []
    stdinActive = True
    allJobsCompleted = False
    insideResponse = False

    # NOTE: print( x ) with a single argument outputs exactly what the
    # print statement would.
    for arg in sys.argv[1:]:
        if arg.lower() == "-log":
            LOG = True
        if arg in [ "-?", "-h", "-H" ]:
            print( "Syntax: xgrid command | getXGridOutput.py [-log] [-debug]\n" )
            print( "Version: %s \n\n" % VERSION )
            return
        if arg.lower() == "-debug":
            debug = True

    log( "\n" + 60*'-' )
    log( "Starting" )
    log( 60*'-' )

    while True:
        line = sys.stdin.readline()
        stdinActive = bool( line )
        if not stdinActive:
            if debug: print( "EOF on stdin" )
            #--- a response cut short by EOF will never get its closing '}':
            #--- stop waiting for it, or the jobs would never be polled again
            insideResponse = False

        if line.startswith( "{" ):
            insideResponse = True

        if debug: print( "line =  %s" % line )

        if "jobIdentifier" in line:
            jobId = _parseJobId( line )
            jobIds.append( jobId )
            jobStatus[ jobId ] = [ RUNNING, 0, 0 ] # running, start time, end time
            #--- a job that was just registered cannot be known as completed
            allJobsCompleted = False
            log( "Found job identifier Id %d" % jobId )

        if allJobsCompleted and not stdinActive:
            log( "No more jobs, no more stdin lines to read... stopping." )
            break

        #--- wait till we have the full response, which ends by '}' by itself on a line
        line = line.strip()
        if line == "}":
            insideResponse = False

        #--- keep reading lines until we've finished with the current XGrid answer
        if insideResponse:
            time.sleep( 0.1 )
            continue

        #--- get job status ---
        noStoppedJobs = getJobsStatus( jobIds, debug )
        if debug: print( "noStoppedJobs =  %s" % noStoppedJobs )

        #--- recomputed at every poll so that jobs submitted later are waited for
        allJobsCompleted = ( noStoppedJobs == len( jobIds ) )
        if allJobsCompleted:
            if debug: print( "all current jobs have completed..." )
            log( "All current jobs have completed" )

        #--- wait before polling the xgrid again (longer when tracing) ---
        if debug:
            time.sleep( 1 )
        else:
            time.sleep( 0.1 )

    #--- print total execution time ---
    try:
        elapsed = max( ENDTIMES ) - min( STARTTIMES )
    except ValueError:
        # no job produced a usable pair of time stamps: the lists are empty
        print( "\nCould not print total execution time\n\n" )
        log( "Could not print total execution time\n\n" )
    else:
        summary = "Total execution time of " + sys.argv[0] + ": %f seconds\n\n" % elapsed
        print( "\n" + summary )
        log( summary )
 
 
 
# Script entry point: run with tracing off (call main( True ) to trace).
main( False )