StRoot  1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
TxUCMCollector.cxx
1 /*****************************************************************
2  * @file TxUCMCollector.cxx
3  * @author Roopa Pundaleeka
4  *
5  * @(#)cpp/api:$Id: TxUCMCollector.cxx,v 1.26 2010/09/17 20:15:15 fine Exp $
6  *
7  * Please see TxUCMCollector.h for more documentation.
8  * "Translated" from the original TxUCMCOllector.java version
9  */
11 #include "TxUCMCollector.h"
12 #include "StDbFieldI.h"
13 #include "FieldList.h"
14 #include "StDbFieldIIterator.h"
15 #include "StUcmTasks.h"
16 #include "StUcmJobs.h"
17 #include "StUcmEvents.h"
18 
19 #include <stdlib.h>
20 #include <iostream>
21 #include <sstream>
22 #include <log4cxx/logger.h>
23 #include <log4cxx/consoleappender.h>
24 #include <log4cxx/patternlayout.h>
25 #include <log4cxx/helpers/loglog.h>
26 #include <log4cxx/helpers/optionconverter.h>
27 #include <log4cxx/helpers/stringhelper.h>
28 #include <stdio.h>
29 using namespace log4cxx;
30 using namespace log4cxx::helpers;
31 using namespace log4cxx::spi;
32 using namespace std;
33 
34 using namespace TxLogging;
35 using namespace StDbField;
36 
// TRY and CATCH(a) expand to nothing: they only mark where the Java original
// had try/catch blocks, so a block tagged TRY runs without any exception guard.
37 #define TRY
38 #define CATCH(a)
39 
// Field names of a parsed log message; most are used in this file as keys of
// msgHashMap (fgEvent is not referenced in the visible part of the file).
40  const char *TxUCMCollector::fgTs = "ts";
41  const char *TxUCMCollector::fgEvent = "event";
42  const char *TxUCMCollector::fgBJobID = "broker.job.id";
43  const char *TxUCMCollector::fgBTaskID = "broker.task.id";
44  const char *TxUCMCollector::fgRequester = "requester.name";
45  const char *TxUCMCollector::fgContext = "context";
46  const char *TxUCMCollector::fgLevel = "level";
47  const char *TxUCMCollector::fgStage = "stage";
48  const char *TxUCMCollector::fgKey = "key";
49  const char *TxUCMCollector::fgValue = "value";
50 
// Values of the fgKey field that processMessage() compares against to pick a
// handler. fgAppStart and fgAppEnd are not compared anywhere in the visible
// code; such messages take the default branch (addEvent()).
51  const char *TxUCMCollector::fgNewTask = "com.txcorp.ucm.newtask";
52  const char *TxUCMCollector::fgUpdateTask = "com.txcorp.ucm.updatetask";
53  const char *TxUCMCollector::fgAddJob = "com.txcorp.ucm.addjob";
54  const char *TxUCMCollector::fgUpdateJob = "com.txcorp.ucm.updatejob";
55  const char *TxUCMCollector::fgSiteLocation = "com.txcorp.ucm.job.siteLocation";
56  const char *TxUCMCollector::fgStateID = "com.txcorp.ucm.job.stateID";
57  const char *TxUCMCollector::fgGridJobID = "com.txcorp.ucm.job.gridJobID";
58  const char *TxUCMCollector::fgAppStart = "com.txcorp.ucm.app.start";
59  const char *TxUCMCollector::fgAppEnd = "com.txcorp.ucm.app.end";
60 
// Status file name and its property keys.
// NOTE(review): only the FUTURE-disabled code below mentions a status file
// (under the names statusFile, statusFileName, ...), so these constants look
// unused in the active build - confirm.
61  const char *TxUCMCollector::fgStatusFile = "txucmcollectorstatus.properties";
62  const char *TxUCMCollector::fgStatusFileName = "current.logfile.name";
63  const char *TxUCMCollector::fgStatusFileModTime = "current.logfile.modtime";
64  const char *TxUCMCollector::fgStatusFilePos = "current.logfile.pos";
65 
66 
67 //_____________________________________________________________________________
namespace {
   /// Formats \a i as a decimal string (file-local helper).
   /// \param i value to format
   /// \return the "%d" representation of \a i
   std::string itoa(int i)
   {
      // 32 bytes hold any int; snprintf keeps the write bounded by the buffer
      // size instead of trusting the format the way sprintf does.
      char buffer[32];
      snprintf(buffer, sizeof(buffer), "%d", i);
      return std::string(buffer);
   }
}
76 
77 //_________________________________________________________________________
78 MYSQL *TxUCMCollector::getConnection()
79 {
80  const char *host = "heston.star.bnl.gov";
81  const char *user = "StarLogger";
82  const char *passwd = "logger";
83  return getConnection(host,user,passwd);
84 }
85 
86 //_________________________________________________________________________
87 MYSQL *TxUCMCollector::getConnection (const string&dbUrl,const string&dbUsername
88  , const string&dbPassword)
89 {
90  return getConnection (dbUrl.c_str(),dbUsername.c_str(),dbPassword.c_str());
91 }
92 
93 //_________________________________________________________________________
94 MYSQL *TxUCMCollector::getConnection (const char *cdbUrl,const char *cdbUsername
95  , const char *cdbPassword)
96 {
97  if (!fIsConnectionOpen) {
98  if ( !(connection= mysql_init(connection)) ) {
99  log->error("MYSQL: ---- > No init connection");
100  } else {
101  const char *host = cdbUrl;
102  const char *user = cdbUsername;
103  const char *passwd = cdbPassword;
104  const char *db = dbName.c_str();
105  unsigned int port = 3306;
106  // fprintf(stderr,"TxUCMCollector::getConnection: ---- > Establishing MySQL connection open %d \n", fIsConnectionOpen);
107  if (!(mysql_real_connect(connection
108  , host
109  , user
110  , passwd
111  , db
112  , port
113  , 0,0
114  )))
115  {
116  string error = __FUNCTION__
117  + string("host: ") + host
118  + string("; user: ") + user
119  + string(" passwd: ") + passwd
120  + string(" db: ") + db
121  + string(" port:") + itoa(port)
122  + " error: " + mysql_error(connection);
123  log->debug(error.c_str());
124  connection = 0;
125  fIsConnectionOpen = false;
126  } else {
127  string error = "Ok connection to Db : "
128  + string("host: <") + host
129  + string("> user: <") + user
130  + string("> passwd: <") + passwd
131  + string("> db: <") + db
132  + string("> port: ") + itoa(port);
133  log->debug(error.c_str());
134  fIsConnectionOpen = true;
135  }
136  }
137  }
138  return connection;
139 }
140 
141 //_________________________________________________________________________
142 unsigned int TxUCMCollector::execute(const string &sql)
143 {
144  return execute(sql.c_str());
145 }
146 
147 //_________________________________________________________________________
148 unsigned int TxUCMCollector::execute(const char *sql)
149 {
150  unsigned int ret=1;
151  if (getConnection()) {
152  String query = sql;
153  log->debug(string("TxUCMCollector::execute ") + sql);
154  if (fResult) {
155  mysql_free_result(fResult);
156  fResult = 0;
157  }
158  if (( ret = mysql_query(connection,query.c_str()) )) {
159  log->error(std::string("MYSQL QUERY:") + mysql_error(connection));
160  } else {
161  fResult = mysql_store_result(connection);
162  }
163  }
164  return ret;
165 }
166 //_________________________________________________________________________
167 /* The default behavior holds a single connection open until the appender is closed (typically when garbage collected). */
168 void TxUCMCollector::closeConnection()
169 {
170  if (fIsConnectionOpen) {
171  if (fResult) {
172  mysql_free_result(fResult);
173  fResult = 0;
174  }
175  mysql_close(connection);
176  if (mysql_errno(connection)) fprintf(stderr,"MYSQL close ERROR %s \n",mysql_error(connection));
177  connection = 0;
178  fIsConnectionOpen = false;
179  }
180 }
181 
182 //______________________________________________________________________
183 TxUCMCollector::TxUCMCollector ()
184 : connection()
185  ,fResult(),fField(),fRow()
186  ,fIsConnectionOpen(false), sleepTime(10),currLogFilePos(0)
187  ,fBrokerJobID(-1), fDbJobID(-1)
188 
189 {
190  // init the logger
191  log = Logger::getLogger(_T("TxUCMCollector"));
192  // check for appender
193  AppenderList apps = log->getAllAppenders();
194  if (!apps.size()) {
195  // make one
196  ConsoleAppenderPtr appender = new ConsoleAppender(
197  new PatternLayout("TxUCMCollector: %-3c{2}:%-5p - %m%n"));
198  appender->setName(_T("TxUCMCollectorAppender"));
199  log->addAppender(appender);
200  }
201  // log->setLevel(Level::DEBUG);
202 }
203 
204 TxUCMCollector::~TxUCMCollector ()
205 {
206  closeConnection();
207 }
208 
212 boolean endsWith (std::string str, const char *suffix)
213 {
214  return StringHelper::endsWith(str, _T(suffix));
215 }
216 
220 static string trim (std::string str)
221 {
222  return StringHelper::trim(str);
223 }
224 
/// Splits \a str at every occurrence of \a sep.
///
/// \param str  text to split
/// \param sep  separator; it is not included in the tokens
/// \return the tokens in order. Empty tokens between two separators are kept,
///         but an empty tail (input ending with \a sep, or empty input)
///         produces no token. An empty \a sep yields the whole input as a
///         single token (it used to loop forever).
static std::vector<std::string> split(const std::string &str, const std::string &sep)
{
   std::vector<std::string> splits;

   // find("") matches at every position without advancing, so an empty
   // separator must be handled before entering the loop.
   if (sep.empty()) {
      if (!str.empty()) splits.push_back(str);
      return splits;
   }

   std::string::size_type tokenStart = 0;
   std::string::size_type sepPos     = 0;
   while ((sepPos = str.find(sep, tokenStart)) != std::string::npos) {
      splits.push_back(str.substr(tokenStart, sepPos - tokenStart));
      tokenStart = sepPos + sep.size();
   }
   // add the tail
   if (tokenStart < str.size()) splits.push_back(str.substr(tokenStart));

   return splits;
}
247 //________________________________________________
// NOTE(review): the line carrying this function's signature is missing from
// this listing. Judging from the debug message below this is presumably the
// body of TxUCMCollector::initDb() - confirm against the header.
// The body calls init() and loadDatabase(), terminates the process with
// exit(1) when the combined result is false, and otherwise returns it.
249  boolean success = false;
250  TRY
251  {
252  log->debug (" TxUCMCollector::initDb . . . ");
253  success = this->init ();
// Bitwise '&' does not short-circuit: loadDatabase() is called even when
// init() returned false.
254  success = success & this->loadDatabase ();
255  }
256 // CATCH (Exception e) {
257 // log->error ("Failed to initialize and/or load database");
258 // log->error (e.getMessage());
259 // }
260 
261  // if any failures, exit
262  if (!success) {
263  log->error("Failed to initialize, review log file");
// TRY expands to nothing, so this is a plain block around exit(1).
264  TRY {
265  exit (1);
266  }
267 // CATCH (SecurityException se) {
268 // log->error ("Problem, exiting");
269 // }
270  }
271  return success;
272 }
273 
279 //________________________________________________
// Sets dbName, dbUrl, dbUsername and dbPassword to hard-coded values and
// always returns true.
// The '#if 0' branch is never compiled; it is the untranslated Java code that
// read the same settings (plus logs.dir and sleep.time.sec) from
// txucmcollector.properties.
280 boolean TxUCMCollector::init ()
281 // throws IOException, Exception
282 {
283  boolean success = false;
284 #if 0
285  Properties properties = new Properties();
286  TRY {
287  properties.load (new FileInputStream ("txucmcollector.properties"));
288 
289  // Read db info from properties file
290  dbName = properties.getProperty ("db.name");
291  dbUrl = "jdbc:mysql://" + properties.getProperty ("db.host") + ":"
292  + properties.getProperty ("db.port") + "/" + dbName;
293  dbUsername = properties.getProperty ("db.username");
294  dbPassword = properties.getProperty ("db.password");
295 
296  // Read log file directory and sleep time as set by user
297  // from properties file
298  logsDir = properties.getProperty ("logs.dir");
299  sleepTime = (new Long (properties.getProperty ("sleep.time.sec"))).longValue ();
300 
301  success = true;
302  }
303 // CATCH (IOException ioe)
304 // {
305 // log->error (ioe.getMessage ());
306 // throw ioe;
307 // }
308 // CATCH (Exception e)
309 // {
310 // log->error (e.getMessage ());
311 // throw e;
312 // }
313 #else
// Active branch: fixed database name, host and account.
314  dbName = "logger";
315  dbUrl = "heston.star.bnl.gov";
316  dbUsername = "StarLogger";
317  dbPassword = "logger";
318 #endif
// Assigns true and returns it; the initial 'false' is never returned.
319  return success=true;
320  }
321 
322 
328 //________________________________________________
329 boolean TxUCMCollector::loadDatabase()
330 // throws ClassNotFoundException, IllegalAccessException,
331 // SQLException, InstantiationException
332 {
333  boolean success = false;
334 
335  TRY {
336  log->debug ("dbName: " + dbName);
337  log->debug ("dbUrl: " + dbUrl);
338 
339  connection = getConnection (dbUrl.c_str(), dbUsername.c_str(), dbPassword.c_str());
340  log->debug ("Successfully loaded database: dbName = " + dbName);
341 
342  success = true;
343  }
344 // CATCH (ClassNotFoundException cnfe)
345 // {
346 // log->error("Mysql driver class not found");
347 // throw cnfe;
348 // } CATCH (InstantiationException ie)
349 // {
350 //
351 // log->error("Failed to instantiate MySQL driver");
352 // throw ie;
353 // } CATCH (IllegalAccessException iae) {
354 // log->error("Failed to get access to MySQL driver");
355 // throw iae;
356 // } CATCH (SQLException se) {
357 // log->error("Failed to load driver, or get DB connection");
358 // throw se;
359 // }
360 
361  return success;
362  }
363 
369 //________________________________________________
// NOTE(review): the line carrying this function's signature is missing from
// this listing, so its name and return type cannot be told from here.
// Unless FUTURE is defined the body is an empty block. The disabled code is
// untranslated Java: an endless loop that picks a log file, processes it and
// either renames it to *.archive or stores its name, modification time and
// position in a properties file.
371  TRY {
372 #ifdef FUTURE
373  while (true) {
374  // set new file to be processed
375  setCurrLogFile ();
376 
377  // Start processing the oldest log file first
378  processLogFile ();
379  // processing this file is done, now check to see if it is
380  // older than 24 hours. If so, rename to *.log->archive, so it
381  // wont show up in the processing again.
382  File file = new File (currLogFile);
383  if ((Calendar.getInstance ().getTimeInMillis () - file.lastModified ())
384  >= (24 * 60 * 60 * 1000)) {
385  file.renameTo (new File (currLogFile + ".archive"));
386  }
387  // the file that was just processed is not over a day
388  // old. So, store its name, the last modified time and
389  // the current position in a properties file.
390  else {
391  Properties properties = new Properties();
392  TRY {
393  properties.setProperty (statusFileName, currLogFile);
394  properties.setProperty (statusFileModTime,
395  new Long (file.lastModified ()).toString ());
396  properties.setProperty (statusFilePos,
397  new Long (currLogFilePos).toString ());
398  properties.store (new FileOutputStream (statusFile), null);
399 
400  // reset position
401  currLogFilePos = 0;
402  } CATCH (IOException ioe) {
403  log->error ("Failed to write current file info to <" +
404  statusFile + ">");
405  log->error (ioe.getMessage ());
406  }
407  }
408  }
409 #endif
410  }
411 // CATCH (Exception e) {
412 // log->error ("Failed while processing log file <" + currLogFile + ">");
413 // log->error (e.getMessage ());
414 // System.exit (2);
415 // }
416 }
417 
418 
434 //________________________________________________
// Does nothing unless FUTURE is defined: the whole body sits inside
// '#ifdef FUTURE' and is still untranslated Java.
// The disabled code first consults the status properties file for a log file
// that is still being processed (sleeping while that file is unchanged), and
// then scans the logs directory for the oldest *.log file, sleeping and
// retrying while none is found.
435 void TxUCMCollector::setCurrLogFile () {
436  TRY {
437 #ifdef FUTURE
438 
439  //
440  // Read the status properties file, if it exists, to get
441  // the last file that was not processed completely or was
442  // not archived
443  //
444  if (new File (statusFile).exists ()) {
445  Properties properties = new Properties();
446  properties.load (new FileInputStream (statusFile));
447 
448  currLogFile = properties.getProperty (statusFileName);
449  if (currLogFile != null) {
450  const char * strTime = properties.getProperty (statusFileModTime);
451  long longTime = (new Long (strTime)).longValue ();
452  currLogFilePos =
453  (new Long (properties.getProperty (statusFilePos))).longValue ();
454 
455  // Sleep if the file being processed has not changed
456  if (new File (currLogFile).exists ()) {
457  while (true) {
458  if (new File (currLogFile).lastModified () == longTime) {
459  System.out.println ("Waiting for " + currLogFile + " to be updated...");
460  log->info ("Waiting for " + currLogFile + " to be updated...");
461  Thread.sleep (sleepTime * 1000);
462  }
463  else {
464  break;
465  }
466  }
467  }
468  // Seems like the status file is outdated. Delete it
469  else {
470  new File (statusFile).delete ();
471  }
472  }
473  }
474 
475  //
476  // Look at the logs dir for new files
477  //
// NOTE(review): these two assignments discard the file name and position
// restored from the status file above - confirm this is intended before
// enabling the code.
478  currLogFile = "";
479  currLogFilePos = 0;
480  File logFileDir = new File (logsDir);
481 
482  while (true) {
483  FilenameFilter filter = new FilenameFilter() {
484  boolean accept (File file, const char * name) {
485  return name.endsWith (".log");
486  }
487  };
488 
489  File[] logFiles = logFileDir.listFiles (filter);
490 
491  System.out.println ("Found " + logFiles.length + " log files in " + logsDir);
492  log->info ("Found " + logFiles.length + " log files in " + logsDir);
493 
494  // If there are log files available to be processed,
495  // find the oldest one and continue processing it.
496  if (logFiles.length > 0) {
497  File file = logFiles [0];
498  for (int i = 1; i < logFiles.length; i++) {
499  if (file.lastModified () > logFiles [i].lastModified ()) {
500  file = logFiles [i];
501  }
502  }
503 
504  // oldest .log file in the logs dir is set to be processed
505  currLogFile = file.toString ();
506  break;
507  }
508  // If there are no log files available in the
509  // directory, sleep for a while and check again
510  else {
511  System.out.println ("Waiting for new log files in " + logsDir);
512  log->info ("Waiting for new log files in " + logsDir);
513  Thread.sleep (sleepTime * 1000);
514  }
515  }
516 #endif
517  }
518 // CATCH (IOException ioe) {
519 // log->error (ioe.getMessage ());
520 // }
521 // CATCH (Exception e) {
522 // log->error (e.getMessage ());
523 // }
524 }
525 
531 //________________________________________________
// Does nothing unless FUTURE is defined: the body is untranslated Java kept
// inside '#ifdef FUTURE'.
// The disabled code reads currLogFile line by line, skips the first
// currLogFilePos lines, passes every further line to processMessage() and
// counts it in currLogFilePos.
532 void TxUCMCollector::processLogFile () {
533  TRY {
534 #ifdef FUTURE
535  System.out.println ("processing " + currLogFile);
536 
537  BufferedReader in = new BufferedReader (new FileReader (currLogFile));
538  const char * str;
539 
540  // Skip to the next message that needs to be processed
541  for (int i = 0; i < currLogFilePos; i++) {
542  in.readLine ();
543  }
544 
545  while ((str = in.readLine()) != null) {
546  processMessage (str);
547  currLogFilePos ++;
548  }
549  in.close();
550 #endif
551  }
552 // CATCH (IOException e) {
553 // log->error ("Failed while processing log file <" + currLogFile + ">");
554 // log->error (e.getMessage ());
555 // System.exit (2);
556 // }
557 }
558 
559 
567 //________________________________________________
568 void TxUCMCollector::processMessage (const string &msg)
569 { processMessage(msg.c_str()); }
570 
571 //________________________________________________
572 void TxUCMCollector::processMessage (const char * msg) {
573  //
574  // Assumption: Header and message are separated by ":"
575  //
576  // If message has a colon, make sure it is at the end of the
577  // header and not a part of the message itself.
578  //
579  msgHashMap.clear();
580  vector<std::string> keysNVals;
581  std::string message = msg;
582  size_t hdrDelimIndex = message.find(':');
583  if (hdrDelimIndex != string::npos && hdrDelimIndex < message.find("=\"")) {
584  keysNVals = split(message.substr(hdrDelimIndex + 1),"\" ");
585  } else {
586  //
587  // Taking care of where there is no header at all
588  //
589  keysNVals = split(message,"\" ");
590  }
591  //const char *[] keysNVals = (message.split (":") [1]).split ("\" ");
592  log->debug(_T("TxUCMCollector::processMessage: ")+ message);
593  for (size_t i = 0; i < keysNVals.size(); i++) {
594 
595  // get the key value pairs separated by ="
596  vector<std::string> keyNVal = split(keysNVals [i],"=\"");
597 
598  // take care of the case where the value is null
599  std::string value = (keyNVal.size() == 2)
600  ? trim(keyNVal [1])
601  : " ";
602 
603  // remove trailing double quotes if any
604  value = endsWith(value,"\"")
605  ? value.substr (0, value.size() - 1)
606  : value;
607  // add it to the message hash map
608  msgHashMap.insert(pair<std::string,std::string>(trim(keyNVal [0]), value));
609  log->debug(string("next pair: ") + trim(keyNVal [0]) + "<" + value + ">");
610  }
611  //
612  // Check for special messages:
613  //
614  std::string keyVal = msgHashMap[string(fgKey)];
615  if (keyVal.empty()) {
616  char buffer[20];
617  sprintf(buffer,"%d",(int)keysNVals.size());
618  log->error (string("Wrong message format: \"")
619  + message
620  + "\" par:"
621  + buffer
622  + "does not contains any <"
623  + fgKey
624  + ">");
625 
626  return;
627  }
628 
629  //
630  // newTask = "com.txcorp.ucm.newtask";
631  //
632  if (keyVal == fgNewTask ) this->createNewTask ();
633  //
634  // updateTask = "com.txcorp.ucm.updatetask";
635  //
636  else if (keyVal == fgUpdateTask) this->updateTask ();
637  //
638  // addJob = "com.txcorp.ucm.addjob";
639  //
640  else if (keyVal == fgAddJob) this->addJob ();
641  //
642  // updateJob = "com.txcorp.ucm.updatejob";
643  //
644  else if (keyVal == fgUpdateJob) this->updateJob ();
645  //
646  // siteLocation = "com.txcorp.ucm.job.siteLocation";
647  //
648  else if (keyVal == fgSiteLocation) this->setJobsField ("siteLocation");
649  //
650  // stateID = "com.txcorp.ucm.job.stateID";
651  //
652  else if (keyVal == fgStateID) this->setJobsField ("stateID");
653  //
654  // gridJobID = "com.txcorp.ucm.job.gridJobID";
655  //
656  else if (keyVal == fgGridJobID) this->setJobsField ("gridJobID");
657  //
658  // appStart = "com.txcorp.ucm.app.start";
659  // appEnd = "com.txcorp.ucm.app.end";
660  // and all others just go into the associated events table
661  //
662  else this->addEvent ();
663 }
664 
665 
671 //________________________________________________
672  void TxUCMCollector::createNewTask () {
673  string newTaskKeys = "brokerTaskID, requesterID";
674  string newTaskVals = "'" + msgHashMap[fgBTaskID] + "'" +
675  ", '" + msgHashMap[fgRequester] + "'";
676 
677  // Insert new record only if it does not exist
678  if (!this->recordExists (string("brokerTaskID = \"") + msgHashMap[fgBTaskID] + "\"",
679  "Tasks")) {
680  vector <string> newTask = split(msgHashMap[fgValue],"', ");
681  for (size_t i = 0; i < newTask.size(); i++) {
682  vector <string> taskKeyNVal = split(newTask [i],"='");
683  if (taskKeyNVal.size() == 2) {
684  newTaskKeys += ", " + trim(taskKeyNVal [0]);
685 
686  newTaskVals += ", '";
687  newTaskVals += endsWith (trim(taskKeyNVal [1]),"'")
688  ? taskKeyNVal [1].substr (0, taskKeyNVal[1].size()-1)
689  : trim(taskKeyNVal [1]);
690  newTaskVals += "'";
691  }
692  }
693  insertRecord (string("(") + newTaskKeys + ") VALUES (" + newTaskVals + ")",
694  "Tasks");
695 
696  this->createJobsTable ();
697  this->createEventsTable ();
698  }
699  else {
700  log->debug (string("Record with brokerTaskID = ") + msgHashMap[fgBTaskID] +
701  " already exists");
702 
703  }
704  }
710 //________________________________________________
711  void TxUCMCollector::updateTask () {
712  // Update only if a record exists. If it does not exist, just
713  // create a new task entry
714  if (this->recordExists (string("brokerTaskID = \"") + msgHashMap[fgBTaskID] + "\"",
715  "Tasks")) {
716  updateRecord (msgHashMap[fgValue],
717  "Tasks",
718  string("brokerTaskID = '") + msgHashMap[fgBTaskID] + "'");
719  }
720  else {
721  log->error (string("Record with brokerTaskID = ") + msgHashMap[fgBTaskID] +
722  " does not exist, so creating a new record instead of updating");
723  this->createNewTask ();
724  // Try one more time
725  if (this->recordExists (string("brokerTaskID = \"") + msgHashMap[fgBTaskID] + "\"",
726  "Tasks")) {
727  updateRecord (msgHashMap[fgValue],
728  "Tasks",
729  string("brokerTaskID = '") + msgHashMap[fgBTaskID] + "'");
730  }
731  }
732  }
733 
740 //________________________________________________
741 void TxUCMCollector::addJob () {
742  // Check if a task table entry exists corresponding to this
743  // job. If not, create one for this new job. Also create new
744  // jobs and events tables
745  if (!this->recordExists (string("brokerTaskID = \"") + msgHashMap[fgBTaskID] + "\"",
746  "Tasks")) {
747  log->info (msgHashMap[fgBTaskID]
748  + " does not exist in Tasks table");
749  insertRecord (string("(brokerTaskID, requesterID) VALUES ") +
750  "('" + msgHashMap[fgBTaskID] + "', " +
751  "'" + msgHashMap[fgRequester] + "')",
752  "Tasks");
753  }
754  // Create new tables
755  this->createJobsTable ();
756  this->createEventsTable ();
757 
758  // Insert new record only if it does not exist
759  if (!this->recordExists (string("brokerJobID = \"") + msgHashMap[fgBJobID] + "\"",
760  jobTableName())) {
761 
762  std::string newJobKeys = "taskID, brokerJobID";
763  std::string newJobVals = string("(SELECT taskID FROM Tasks WHERE brokerTaskID=") +
764  "'" + msgHashMap[fgBTaskID] + "')" +
765  ", '" + msgHashMap[fgBJobID] + "'";
766 
767  vector<std::string> newJob = split(msgHashMap[fgValue],"', ");
768 
769  for (size_t i = 0; i < newJob.size(); i++) {
770  vector<std::string> jobKeyNVal = split(newJob [i],"='");
771  if (jobKeyNVal.size() == 2) {
772  newJobKeys += ", " + trim(jobKeyNVal [0]);
773 
774  newJobVals += ", '";
775  newJobVals += endsWith(trim(jobKeyNVal [1]),"'")
776  ? jobKeyNVal [1].substr (0, jobKeyNVal.size() - 1)
777  : trim(jobKeyNVal [1]);
778  newJobVals += "'";
779  }
780  }
781 
782  insertRecord (string("(") + newJobKeys + ") VALUES (" + newJobVals + ")",
783  jobTableName());
784  } else {
785  log->debug ("Record with brokerJobID = " + msgHashMap[fgBJobID] +
786  " already exists");
787  }
788 }
789 
790 //________________________________________________
791 string TxUCMCollector::tableNamePrefix(const char *prefix) const
792 {
793  string fullTableName =
794  string(prefix)
795  + string("_") + msgHashMap.find(fgRequester)->second
796  + string("_") + msgHashMap.find(fgBTaskID)->second;
797  ((TxUCMCollector*) this)->log->debug(string(__FUNCTION__)+ "<" + fullTableName + ">");
798  return fullTableName;
799 }
800 
801 //________________________________________________
802 string TxUCMCollector::jobTableName() const
803 {
804  return tableNamePrefix("Jobs");
805 }
806 //________________________________________________
807 std::string TxUCMCollector::eventTableName() const
808 {
809  return tableNamePrefix("Events");
810 }
811 
817 //________________________________________________
818 void TxUCMCollector::updateJob () {
819  // Update only if a record exists. If it does not exist, just
820  // create a new entry
821  if (this->recordExists (string("brokerJobID = \"") + msgHashMap[fgBJobID] + "\"",
822  jobTableName() ) ) {
823  updateRecord ( msgHashMap[fgValue], jobTableName(),
824  "brokerJobID = '" + msgHashMap[fgBJobID] + "'");
825  } else {
826  log->debug ("Record with brokerJobID = " + msgHashMap[fgBJobID] +
827  " does not exist, so creating a new record instead of updating");
828  this->addJob ();
829  }
830 }
831 
836 void TxUCMCollector::setJobsField (const string &fieldName) {
837  setJobsField (fieldName.c_str());
838 }
839 
840 void TxUCMCollector::setJobsField (const char * fieldName) {
841  if (this->recordExists (string("brokerJobID = \"") + msgHashMap[fgBJobID] + "\"",
842  jobTableName() )) {
843  updateRecord (string(fieldName) + " = '" + msgHashMap[fgValue] + "'",
844  jobTableName() ,
845  "brokerJobID = '" + msgHashMap[fgBJobID] + "'");
846  } else {
847  log->error (string("Record with brokerJobID = ") + msgHashMap[fgBJobID] +
848  " does not exist, so creating a new record instead of updating");
849  }
850 }
851 
858 //________________________________________________
859 void TxUCMCollector::addEvent () {
860  // Check if a task table entry exists corresponding to this
861  // event/job. If not, create one for this new job. Also create new
862  // jobs and events tables
863  if (!this->recordExists (string("brokerTaskID = \"") + msgHashMap[fgBTaskID] + "\"",
864  "Tasks")) {
865  log->info (msgHashMap[fgBTaskID]
866  + " does not exist in Tasks table");
867  insertRecord (string("(brokerTaskID, requesterID) VALUES ") +
868  "('" + msgHashMap[fgBTaskID] + "', " +
869  "'" + msgHashMap[fgRequester] + "')",
870  "Tasks");
871  }
872 
873  // Create new tables
874  this->createJobsTable ();
875  this->createEventsTable ();
876 
877  // Check if a jobs table entry exists corresponding to this
878  // event. If not, create one for this new job. Also create new
879  // jobs and events tables if they dont exist already
880  if (!this->recordExists (string("brokerJobID = \"") + msgHashMap[fgBJobID] + "\"",
881  jobTableName())) {
882 
883  log->info (msgHashMap[fgBTaskID]
884  + " does not exist in Jobs table");
885 
886  std::string newJobKeys = "taskID, brokerJobID";
887  std::string newJobVals = string("(SELECT taskID FROM Tasks WHERE brokerTaskID=")
888  + "'" + msgHashMap[fgBTaskID] + "')"
889  + ", '" + msgHashMap[fgBJobID] + "'";
890 
891  insertRecord (string("(taskID, brokerJobID) VALUES ") +
892  "((SELECT taskID FROM Tasks WHERE brokerTaskID=" +
893  "'" + msgHashMap[fgBTaskID] + "')" +
894  ", '" + msgHashMap[fgBJobID] + "')",
895  jobTableName());
896  }
897 
898  // Insert the new events table record
899 
900  std::string newEventKeys = "jobID, levelID, context, time, stageID, messageKey, messageValue";
901  std::string newEventVals = string("(SELECT jobID FROM `") + jobTableName() + "` WHERE brokerJobID=" +
902  "'" + msgHashMap[fgBJobID] + "')" +
903  ", '" + msgHashMap[fgLevel] + "'" +
904  ", '" + msgHashMap[fgContext] + "'" +
905  ", '" + msgHashMap[fgTs] + "'" +
906  ", '" + msgHashMap[fgStage] + "'" +
907  ", '" + msgHashMap[fgKey] + "'" +
908  ", '" + msgHashMap[fgValue] + "'";
909 
910  insertRecord (string("(") + newEventKeys + ") VALUES (" + newEventVals + ")",
911  eventTableName());
912  static string FAILED_JOB_ATTRIBUTE;
913  if ( FAILED_JOB_ATTRIBUTE.empty() ) {
914  stringstream oss(FAILED_JOB_ATTRIBUTE);
915  oss << TxEventLog::FAILED;
916  }
917  static string DONE_JOB_ATTRIBUTE;
918  if ( DONE_JOB_ATTRIBUTE.empty() ) {
919  stringstream oss(DONE_JOB_ATTRIBUTE);
920  oss << TxEventLog::DONE;
921  }
922  if (msgHashMap[fgStage] == FAILED_JOB_ATTRIBUTE || msgHashMap[fgStage] == DONE_JOB_ATTRIBUTE ) {
923  updateRecord ("taskRemainSize=taskRemainSize-1"
924  , "Task"
925  , string("brokerTaskID=") + "'" + msgHashMap[fgBTaskID] + "'" );
926  }
927 }
928 
932 //________________________________________________
933 void TxUCMCollector::createJobsTable () {
934  // create new jobs table if it does not exist
935  std::string tableName = "`" + jobTableName() + "` ";
936  this->createTable (tableName , std::string("jobspattern"));
937 // this->createTable (tableName + fgJobsTableCols);
938 }
939 
943 //________________________________________________
944 void TxUCMCollector::createEventsTable () {
945  // create new events table if it does not exist
946  std::string tableName = "`" + eventTableName() + "` ";
947  this->createTable (tableName, std::string("eventspattern"));
948 // this->createTable (tableName + fgEventsTableCols);
949 }
950 
958 void TxUCMCollector::insertRecord (const string &insertStr, const string &tableName)
959 {
960  insertRecord(insertStr.c_str(),tableName.c_str());
961 }
962 
963 void TxUCMCollector::insertRecord (const char * insertStr, const char * tableName) {
964  TRY{
965 // Statement stmt = connection->createStatement();
966  if (!execute(string("INSERT INTO `") + tableName + "` " + insertStr))
967  log->debug (string("Created new record for ") + tableName + ": " + insertStr);
968  else
969  log->error (string(mysql_error(connection)) + " the new record for " + tableName + ": " + insertStr);
970  closeConnection();
971  }
972 // CATCH(SQLException se) {
973 // log->error (se.getMessage ());
974 // }
975 }
976 
985 //________________________________________________
986 void TxUCMCollector::updateRecord (const string&updateStr, const string&tableName, const string&condition)
987 {
988  updateRecord (updateStr.c_str(), tableName.c_str(), condition.c_str());
989 }
990 //________________________________________________
991 void TxUCMCollector::updateRecord (const char * updateStr, const char * tableName, const char * condition)
992 {
993  TRY{
994  // Statement stmt = connection->createStatement();
995  if (!execute(string("UPDATE `") + tableName
996  + "` SET " + updateStr
997  + " WHERE " + condition))
998  log->debug(string("Updated new record for ") + tableName + " with values: " + updateStr);
999  closeConnection();
1000  }
1001 // CATCH(SQLException se) {
1002 // log->error (se.getMessage ());
1003 // }
1004  }
1005 
1012 //________________________________________________
1013 boolean TxUCMCollector::recordExists (const string&selectStr, const string&tableName)
1014 {
1015  return
1016  recordExists (selectStr.c_str(),tableName.c_str());
1017 }
1018 
1019 //________________________________________________
1020 boolean TxUCMCollector::recordExists (const char * selectStr, const char * tableName) {
1021  unsigned long nRows = 0;
1022  TRY
1023  {
1024  execute(string("SELECT * FROM `") + tableName
1025  + "` WHERE " + selectStr);
1026  if (fResult) {
1027  nRows = mysql_num_rows(fResult);
1028  }
1029  }
1030  closeConnection();
1031 // CATCH(SQLException se) {
1032 // log->error (se.getMessage ());
1033 // exists = false;
1034 // }
1035 
1036  return nRows ? true: false;
1037 }
1038 
1039 //___________________________________________________________________________________
1046 void TxUCMCollector::createTable (const string&table,const string&like)
1047 {
1048  const char *likestr = like.empty() ? 0 : like.c_str();
1049  createTable (table.c_str(), likestr);
1050 }
1051 
1052 //___________________________________________________________________________________
1053 void TxUCMCollector::createTable (const char * table, const char *like) {
1054  TRY{
1055  std::string query = string("CREATE TABLE IF NOT EXISTS ") + table;
1056  if (like && like[0]) query += std::string(" LIKE `") + like + "`";
1057  if (!execute(query))
1058  log->debug (string("Created new table: ") + table);
1059  }
1060  closeConnection();
1061 // CATCH(SQLException se) {
1062 // log->info (se.getMessage ());
1063 // }
1064 }
1065 
1071 //___________________________________________________________________________________
// Does nothing unless FUTURE is defined; `options` is unused in the active
// build. The disabled code is untranslated Java that prints a help text for
// the given options.
1072 void TxUCMCollector::usage (Options options)
1073 {
1074 #ifdef FUTURE
1075  // Use the built-in formatter class
1076  HelpFormatter formatter = new HelpFormatter ();
1077  formatter.printHelp ("Tx UCM Collector", options);
1078 #endif
1079 }
1080 
1081 //___________________________________________________________________________________
1082 StUcmTasks *TxUCMCollector::getTaskList(int limit, int offset)
1083 {
1084  static StUcmTasks tasks;
1085  fillTaskList(tasks,limit,offset);
1086  return &tasks;
1087 }
1088 
1089 //___________________________________________________________________________________
1090 StUcmJobs *TxUCMCollector::getJobList(StRecord *task, int limit, int offset)
1091 {
1092  static StUcmJobs jobs;
1093  fillJobList(jobs,limit,offset,task);
1094  return &jobs;
1095 }
1096 //___________________________________________________________________________________
1097 int TxUCMCollector::getJobId(const char *reqName, const char *taskBrokerID, int brokerJobID)
1098 {
1099  int id = -1;
1100  setRequesterName(reqName);
1101  setBrokerTaskID (taskBrokerID);
1102  setBrokerJobID (brokerJobID);
1103  try {
1104  string where = string(" brokerJobID='") + itoa(brokerJobID) + "' ";
1105  queryTable(jobTableName().c_str(),0,0,where.c_str());
1106  if (fResult) {
1107  int nRows = mysql_num_rows(fResult) ;
1108  if (nRows<=0 || nRows >1 ) {
1109  log->error(string("Can not fetch the job id for the ") + taskBrokerID + " broker id=" + itoa(brokerJobID) + " nrow=" + itoa(nRows));
1110  } else {
1111  StRecord *job = new StRecord;
1112  fillFields(job->getFields());
1113  id = job->getField("jobID")->toInt();
1114  }
1115  }
1116  } catch (const StDataException &e) {
1117  log->error(e.getDescription() );
1118  }
1119 
1120  setDbJobID(id);
1121  return id;
1122 }
1123 
1124 //___________________________________________________________________________________
1125 StUcmEvents *TxUCMCollector::getEventList(StRecord *job,int limit, int offset)
1126 {
1127  static StUcmEvents events;
1128  fillEventList(events,limit,offset);
1129  return &events;
1130 }
1131 
1132 //___________________________________________________________________________________
1133 int TxUCMCollector::fillTaskList(StUcmTasks &tasks, int limit, int offset)
1134 {
1135  RecordList &l = tasks.getTasks();
1136  return fillUcmList("Tasks",l,limit,offset);
1137 }
1138 
1139 //___________________________________________________________________________________
1140 int TxUCMCollector::fillUcmList(const char *type, RecordList &records, int limit, int offset)
1141 {
1142  my_ulonglong nRows = 0;
1143  records.Clear();
1144  try {
1145  if (string(type) == "Tasks") {
1146  queryTaskTable(limit,offset);
1147  } else if (string(type) == "Jobs" ) {
1148  queryJobTable(limit,offset);
1149  } else if (string(type) == "Events" ) {
1150  queryEventTable(limit,offset);
1151  } else {
1152  queryTable(type,limit,offset);
1153  }
1154  if (fResult) {
1155  nRows = mysql_num_rows(fResult) ;
1156  log->debug(string(itoa(nRows)) + " rows from " + itoa(offset) + " row" );
1157  for (my_ulonglong i=0; i<nRows;++i)
1158  {
1159  StRecord *task = new StRecord;
1160  fillFields(task->getFields());
1161  records.push_back(task);
1162  }
1163  }
1164  } catch (const StDataException &e) {
1165  log->error(e.getDescription() );
1166  }
1167  return nRows;
1168 }
1169 
1170 //___________________________________________________________________________________
1171 int TxUCMCollector::fillJobList(StUcmJobs &jobs, int limit, int offset)
1172 {
1173  RecordList &l = jobs.getJobs();
1174  return fillUcmList("Jobs",l,limit,offset);
1175 }
1176 
1177 //___________________________________________________________________________________
1178 void TxUCMCollector::setBrokerTaskID(const StRecord *task)
1179 {
1180  const char *requestId = 0;
1181  if (task) {
1182  requestId = task->getField("brokerTaskID")->getValueAsString();
1183  setBrokerTaskID(requestId);
1184  }
1185 }
1186 
1187 //___________________________________________________________________________________
1188 void TxUCMCollector::setBrokerJobID(const StRecord *job)
1189 {
1190  if (job) {
1191  int brokerJobID = job->getField("brokerJobID")->toInt();
1192  setBrokerJobID(brokerJobID);
1193  int jobID = job->getField("jobID")->toInt();
1194  setDbJobID(jobID);
1195  }
1196 }
1197 //___________________________________________________________________________________
1198 int TxUCMCollector::fillJobList(StUcmJobs &jobs, int limit, int offset,const StRecord *task)
1199 {
1200  int row = 0;
1201  RecordList &l = jobs.getJobs();
1202  setBrokerTaskID(task);
1203  row = fillUcmList("Jobs",l,limit,offset);
1204 // if (!row) { //old style to be done yet if needed
1205 // requestId = task->getField("taskID")->getValueAsString();
1206 // }
1207  return row;
1208 }
1209 
1210 
1211 //___________________________________________________________________________________
1212 int TxUCMCollector::fillEventList(StUcmEvents &events, int limit, int offset,const StRecord *job)
1213 {
1214  RecordList &l = events.getEvents();
1215  if (job) setBrokerJobID (job);
1216  return fillUcmList("Events",l,limit,offset);
1217 }
1218 
1219 //___________________________________________________________________________________
1220 void TxUCMCollector::fillFields(FieldList &fields)
1221 {
1222  fRow = mysql_fetch_row(fResult);
1223  if (fRow) {
1224  fField = mysql_fetch_fields(fResult);
1225  unsigned int n_fields = mysql_num_fields(fResult);
1226  log->debug(string("Fetching ") + itoa(n_fields) + " fields ");
1227  for (unsigned int i=0;i<n_fields; ++i)
1228  {
1229  fields.push_back(createField(i));
1230  }
1231  } else {
1232  log->error(mysql_error(connection));
1233  }
1234 }
1235 //___________________________________________________________________________________
1236 StDbFieldI::EDataType TxUCMCollector::MapSqlField(enum_field_types type)
1237 {
1238  StDbFieldI::EDataType ucmType = StDbFieldI::kINVALID;
1239  switch (type) {
1240  case MYSQL_TYPE_TINY: case MYSQL_TYPE_SHORT:case MYSQL_TYPE_LONG:
1241  ucmType =StDbFieldI::kINT;
1242  break;
1243  case MYSQL_TYPE_TIMESTAMP: case MYSQL_TYPE_DATE: case MYSQL_TYPE_TIME:
1244  case MYSQL_TYPE_DATETIME: case MYSQL_TYPE_YEAR:
1245  ucmType = StDbFieldI::kUNIXTIME;
1246  break;
1247 #if 0
1248  case ucmType =StDbFieldI::kLONG; break;
1249  case ucmType =StDbFieldI::kULONG; break;
1250  case ucmType =StDbFieldI::kDOUBLE; break;
1251 #endif
1252  case MYSQL_TYPE_STRING: case MYSQL_TYPE_VAR_STRING:
1253  ucmType =StDbFieldI::kCHAR; break;
1254  // case ucmType =StDbFieldI::kSTRING; break;
1255  default: break;
1256  }
1257  return ucmType;
1258 }
1259 
1260 //___________________________________________________________________________________
1261 StDbFieldI *TxUCMCollector::createField(unsigned int fieldIndx)
1262 {
1263  StDbFieldI *field = new StDbFieldI(fField[fieldIndx].org_name, fRow[fieldIndx],MapSqlField(fField[fieldIndx].type),1);
1264  return field;
1265 }
1266 
1267 
1272  void TxUCMCollector::printVersion(){
1273  log->info("Tx UCM Collector, version 0.4");
1274  }
1275 
1276 
// Command-line entry point of the collector.
//   args - command-line arguments.
// The whole body sits inside #ifdef FUTURE, so the function is empty unless
// FUTURE is defined. The guarded text is still Java-flavoured (e.g.
// `Option h = new Option (...)`, `System.out.println`, `null`).
// NOTE(review): it does not look like valid C++ as written - confirm before
// defining FUTURE.
// Flow of the guarded code: declare the options h/v/d/t/m, parse `args`,
// then handle -m (message text plus remaining arguments), -h (usage),
// -v (version) and -d (debug flag); finally create a collector and, if
// initDb() succeeds, either start processing or process the single message.
1280  void TxUCMCollector::main(const char *args[]) {
1281 #ifdef FUTURE
1282  boolean debug = false;
1283  Option h = new Option ("h", "help", false, "print this message");
1284  Option v = new Option ("v", "version", false, "print the version information");
1285  Option d = new Option ("d", "debug", false, "set log level to debug");
1286  Option t = new Option ("t", "test", false, "run the setUpTest()");
1287  Option m = new Option ("m", "message", true, "parse one message and exit");
1288 
1289  // create options list
1290  Options options = new Options ();
1291  options.addOption (h);
1292  options.addOption (v);
1293  options.addOption (d);
1294  options.addOption (t);
1295  options.addOption (m);
1296 
1297  CommandLineParser parser = new PosixParser();
1298  CommandLine cmd;
1299  const char * jobMessage=null;
1300  TRY{
1301  cmd = parser.parse (options, args);
1302  }
1303  CATCH (ParseException pe) {
1304  System.out.println ("** Error ** : Check your input parametres:");
1305  usage (options);
1306  return;
1307  }
1308 
1309  if (cmd.hasOption ("m")) {
1310  //useString (options);
1311  jobMessage = cmd.getOptionValue("m");
1312  if (jobMessage.isEmpty () ) {
1313  System.out.println ("Error: no message has been provided");
1314  usage (options);
1315  return;
1316  }
1317  const char * jobMessagesArgs[] = cmd.getArgs() ;
1318  int n = 0;
1319  for (n=0; n < jobMessagesArgs.length; n++)
1320  jobMessage += " " + jobMessagesArgs[n] ;
1321  }
1322  if (cmd.hasOption ("h")) {
1323  usage (options);
1324  return;
1325  }
1326 
1327  if (cmd.hasOption ("v")) {
1328  printVersion ();
1329  return;
1330  }
1331 
1332  if (cmd.hasOption ("d") ){
1333  // set log level to debug
1334  debug = true;
1335  if (debug) {
1336  System.out.println("option:debug");
1337  }
1338  }
1339 
1340  // create the collector object
1341  TxUCMCollector collector = new TxUCMCollector ();
1342  if (collector.initDb() ) {
1343  if (jobMessage == null ) {
1344  // start processing log files and creating database entries
1345  collector.startProcess ();
1346  } else {
1347  collector.processMessage(jobMessage);
1348  }
1349  }
1350  if (debug) {
1351  System.out.println ("created collector object");
1352  }
1353 #endif
1354 }// end main()
1355  // Task table column names
// Parenthesised, comma-separated list of the eleven Tasks-table column names,
// each wrapped in single quotes.
// NOTE(review): MySQL parses single-quoted text as string literals rather
// than identifiers - confirm how this list is spliced into SQL.
1356  const char * TxUCMCollector::fgTaskCols = "('taskID', 'brokerTaskID', 'brokerID', "
1357  "'requesterID', 'taskName', 'taskDescription', 'taskSize', "
1358  "'taskRemainSize', 'submitTime', 'updateTime', 'archiveFlag')";
1359 
1360  // Jobs table columns
// Parenthesised column-definition list for a jobs table, written in MySQL
// CREATE TABLE syntax; the purpose of each column is given in its own COMMENT
// clause. jobID is the auto-increment key, and the last entry declares a
// unique index named jobID over (brokerJobID, taskID).
// NOTE(review): presumably appended after "CREATE TABLE <name>" - confirm at
// the call site.
1361  const char *TxUCMCollector::fgJobsTableCols =
1362  "("
1363  "jobID int(11) NOT NULL AUTO_INCREMENT KEY COMMENT 'ID of job when entry is created, unique within table', "
1364  "updateTime timestamp NOT NULL default CURRENT_TIMESTAMP COMMENT 'Time that job execution state was last updated', "
1365  "brokerJobID int(11) NOT NULL COMMENT 'ID of job as assigned by Broker', "
1366  "taskID int(11) NOT NULL COMMENT 'Foreign key reference to Tasks table', "
1367  "gridJobID varchar(64) default NULL COMMENT 'ID for job as assigned by Grid Resource Allocation Manager (GRAM)', "
1368  "localJobID int(11) default NULL COMMENT 'ID for job as assigned by local resource manager or scheduler', "
1369  "gridSubmitTime datetime default NULL COMMENT 'Time that job was submitted to the GRAM', "
1370  "localSubmitTime datetime default NULL COMMENT 'Time that job was submitted to the local resource manager or scheduler', "
1371  "siteLocation varchar(64) default NULL COMMENT 'Physical local of job, could be grid site or local cluster description', "
1372  "queue varchar(64) default NULL COMMENT 'Name and short description of queue that shedules job', "
1373  "queuePosition int(11) default NULL COMMENT 'Integer slot position of job in local resource manager or scheduler', "
1374  "nodeLocation varchar(64) default NULL COMMENT 'Name of worker node that job lands on', "
1375  "startTime datetime default NULL COMMENT 'Time that job started execution', "
1376  "executionUserName varchar(32) default NULL COMMENT 'A login ID on the local resource site & worker node that actually executes', "
1377  "stateID int(11) NOT NULL default '1' COMMENT 'Foreign key reference to StateDictionary table', "
1378  "CONSTRAINT UNIQUE INDEX jobID (brokerJobID, taskID)"
1379  ")";
1380 
1381  // Jobs table column names
// Parenthesised, comma-separated list of Jobs-table column names, each in
// single quotes. The updateTime column defined in fgJobsTableCols is not
// part of this list.
// NOTE(review): MySQL parses single-quoted text as string literals rather
// than identifiers - confirm how this list is spliced into SQL.
1382  const char * TxUCMCollector::fgJobCols = "('jobID', 'brokerJobID', 'taskID', "
1383  "'gridJobID', 'localJobID', 'gridSubmitTime', "
1384  "'localSubmitTime', 'siteLocation', 'queue', 'queuePosition', "
1385  "'nodeLocation', 'startTime', 'executionUserName', 'stateID')";
1386 
1387 
1388  // Events table columns
// Parenthesised column-definition list for an events table, written in MySQL
// CREATE TABLE syntax; the purpose of each column is given in its own COMMENT
// clause. eventID is the auto-increment key, and the last entry adds a
// (non-unique) index on jobID.
// NOTE(review): presumably appended after "CREATE TABLE <name>" - confirm at
// the call site.
1389  const char * TxUCMCollector::fgEventsTableCols =
1390  "("
1391  "eventID int(11) NOT NULL AUTO_INCREMENT KEY COMMENT 'ID of event when entry is created, unique within table', "
1392  "time timestamp NOT NULL default CURRENT_TIMESTAMP COMMENT 'Time that event was recorded by the Tracking Library', "
1393  "jobID int(11) NOT NULL COMMENT 'Job that this message is associated with', "
1394  "levelID int(11) NOT NULL COMMENT 'The ID of the log level of the event (WARNING, DEBUG, ERROR, etc.)', "
1395  "context VARCHAR(40) NOT NULL COMMENT 'The bulk category of the log event or the facilty or code where the event happens', "
1396  "stageID int(11) NOT NULL COMMENT 'The ID of the logging stage of the event (i.e., START, STATUS, or END)', "
1397  "messageKey VARCHAR(40) COMMENT 'A user defined property key or SYSTEM for system event', "
1398  "messageValue VARCHAR(120) COMMENT 'A user defined property value or textual content of a log message for a system event', "
1399  "cpuLoad double default NULL COMMENT 'Optional benchmarking value, program CPU load in %', "
1400  "totalMem int(11) default NULL COMMENT 'Optional benchmarking value, total system memory in KiB', "
1401  "usedMem int(11) default NULL COMMENT 'Optional benchmarking value, total used memory in KiB', "
1402  "appMem int(11) default NULL COMMENT 'Optional benchmarking value, total app memory in KiB', "
1403  "INDEX (jobID)"
1404  ")";
1405 
1406  // Events table column names
// Parenthesised, comma-separated list of the twelve Events-table column
// names, each in single quotes. The order differs from fgEventsTableCols:
// 'time' is listed after 'context' here.
// NOTE(review): MySQL parses single-quoted text as string literals rather
// than identifiers - confirm how this list is spliced into SQL.
1407  const char * TxUCMCollector::fgEventCols = "('eventID', 'jobID', 'levelID', "
1408  "'context', 'time', 'stageID', 'messageKey', 'messageValue', "
1409  "'cpuLoad', 'totalMem', 'usedMem', 'appMem')";
1410 //_____________________________________________________________________________
1411 int TxUCMCollector::queryTableSize(const char *tableName,const StRecord *where)
1412 {
1413  int size = 0;
1414  if(string(tableName) == "Jobs") {
1415  setBrokerTaskID(where);
1416  } else if (string(tableName) == "Events") {
1417  setBrokerJobID(where);
1418  }
1419  size = queryTableSize(tableName);
1420  return size;
1421 }
1422 //_____________________________________________________________________________
1423 int TxUCMCollector::queryTableSize(const char *tableName, const char *where)
1424 {
1425  int size = 0;
1426  string whichTask;
1427  bool countSelectedEvents = false;
1428  if ( !where ) {
1429  if ((string(tableName) == "Tasks" ) && (msgHashMap.find(fgRequester) != msgHashMap.end())) {
1430  whichTask=string("requesterID='")+ msgHashMap[fgRequester]+"' ";
1431  where = whichTask.c_str();
1432  } else if (string(tableName) == "Jobs" ) {
1433  tableName = jobTableName().c_str();
1434  } else if (string(tableName) == "Events" ) {
1435  tableName = eventTableName().c_str();
1436  countSelectedEvents = (fDbJobID >= 0 );
1437  }
1438  }
1439  string query = string("select count(*) from `")+ tableName + "`";
1440  if (where &&where[0]) {
1441  query += string(" WHERE ") + where;
1442  } else if (countSelectedEvents) {
1443  query += string(" WHERE ") + string("jobID='")+ itoa(fDbJobID)+"' ";
1444  }
1445  execute (query);
1446  if (fResult) {
1447  if (mysql_num_rows(fResult)==1) {
1448  fRow = mysql_fetch_row(fResult);
1449  if (!fRow) {
1450  log->error(mysql_error(connection));
1451  } else {
1452  size = atoi(*fRow);
1453  } //
1454  } else {
1455  log->error(string("wrong result for <") + query + ">");
1456  }
1457  }
1458  return size;
1459 }
1460 //_____________________________________________________________________________
1461 void TxUCMCollector::queryTable(const char *tableName, int limit, int offset, const char *where)
1462 {
1463  // limit = 0 - no limit
1464  string query = string("select * from `") + tableName + "` ";
1465  if (where && where[0] ) {
1466  query += string("WHERE ") + where;
1467  }
1468  if (limit > 0) query += " LIMIT " + itoa(limit);
1469  if (offset > 0) {
1470  if(limit <= 0 ) query += "LIMIT 9999999 ";
1471  query += " OFFSET " + itoa(offset);
1472  }
1473  execute (query);
1474 }
1475 
1476 //_____________________________________________________________________________
1477 void TxUCMCollector::queryTaskTable(int limit, int offset)
1478 {
1479  string where=string(" requesterID='")+ msgHashMap[fgRequester]+"' ";
1480  where += string(" ORDER BY ") + "taskID" + " DESC ";
1481 // where += " ORDER BY " + fgTaskCols[0] + " DESC ";
1482  queryTable("Tasks",limit,offset,where.c_str());
1483 }
1484 //_____________________________________________________________________________
1485 void TxUCMCollector::queryJobTable(int limit, int offset)
1486 {
1487  queryTable(jobTableName().c_str(),limit,offset);
1488  if (!fResult) {
1489  string where = " taskID=47948 ";
1490  queryTable("Jobs",limit,offset, where.c_str()); // old style
1491  }
1492 }
1493 
1494 //_____________________________________________________________________________
1495 void TxUCMCollector::queryJobTable(const char *taskID,int limit, int offset)
1496 {
1497  queryTable(jobTableName().c_str(),limit,offset);
1498  if (!fResult) {
1499  string where = string(" taskID='") + taskID + "' ";
1500  queryTable("Jobs",limit,offset, where.c_str()); // old style
1501  }
1502 }
1503 //_____________________________________________________________________________
1504 void TxUCMCollector::queryEventTable(int limit, int offset)
1505 {
1506  string where=string("jobID='")+ itoa(fDbJobID)+"' ";
1507  queryTable(eventTableName().c_str(),limit,offset,where.c_str());
1508  if (!fResult) queryTable("Messages",limit,offset,where.c_str()); // old style
1509 }
1510 
1511 //_____________________________________________________________________________
1512 void TxUCMCollector::queryEventTable(const char *jobDbID, int limit, int offset)
1513 {
1514  string where=string(" jobID='")+ jobDbID+"' ";
1515  queryTable(eventTableName().c_str(),limit,offset,where.c_str());
1516  if (!fResult) queryTable("Messages",limit,offset,where.c_str()); // old style
1517 }
1518 
1519 // TxLogEvent interface implementation
1520 
1521 
// No-op in TxUCMCollector: the body is empty, so `message` is ignored.
1522 void TxUCMCollector::writeDown(const std::string& message)
1523 {
1524 
1525 }
1526 //___________________________________________________________________________________________________
// No-op in TxUCMCollector: the body is empty, so `envBrokerTaskID` is ignored.
1527 void TxUCMCollector::setEnvBrokerTaskID (const std::string& envBrokerTaskID)
1528 {
1529 }
1530 
1531 //___________________________________________________________________________________________________
// No-op in TxUCMCollector: the body is empty, so `envBrokerJobID` is ignored.
1532 void TxUCMCollector::setEnvBrokerJobID (const std::string& envBrokerJobID)
1533 {
1534 }
1535 
1536 //___________________________________________________________________________________________________
1537 void TxUCMCollector::setBrokerTaskID (const std::string& brokerTaskID)
1538 {
1539  msgHashMap[fgBTaskID] = brokerTaskID;
1540 }
1541 
1542 //___________________________________________________________________________________________________
1543 void TxUCMCollector::setBrokerJobID (int brokerJobID)
1544 {
1545  fBrokerJobID = brokerJobID;
1546 }
1547 
1548 //___________________________________________________________________________________________________
1549 void TxUCMCollector::setDbJobID (int dbJobID)
1550 {
1551  fDbJobID = dbJobID;
1552 }
1553 
1554 //___________________________________________________________________________________________________
1555 void TxUCMCollector::setRequesterName (const std::string& requester)
1556 {
1557  msgHashMap[fgRequester] = requester;
1558 }
1559 
1560 //___________________________________________________________________________________________________
// No-op in TxUCMCollector: the body is empty, so `context` is ignored.
1561 void TxUCMCollector::setContext (const std::string& context)
1562 {
1563 }
1564 
1565 //___________________________________________________________________________________________________
// No-op in TxUCMCollector: the body is empty, so `key` and `value` are ignored.
1566 void TxUCMCollector::logStart (const std::string& key, const std::string& value)
1567 {
1568 }
1569 
1570 //___________________________________________________________________________________________________
// No-op in TxUCMCollector: the body is empty, so `key` and `value` are ignored.
1571 void TxUCMCollector::logJobAttribute (const std::string& key , const std::string&value)
1572 {
1573 
1574 }
1575 
1576 //___________________________________________________________________________________________________
// No-op in TxUCMCollector: the body is empty, so `url` is ignored.
1577 void TxUCMCollector::logJobSubmitLocation (const std::string&url)
1578 {
1579 
1580 }
1581 
1582 //___________________________________________________________________________________________________
// No-op in TxUCMCollector: the body is empty, so `url` is ignored.
1583 void TxUCMCollector::setJobSubmitLocation (const std::string& url)
1584 {
1585 
1586 }
1587 
1588 //___________________________________________________________________________________________________
// No-op in TxUCMCollector: the body is empty, so `size` is ignored.
1589 void TxUCMCollector::logTask (unsigned int size)
1590 {
1591 }
1592 
1593 //___________________________________________________________________________________________________
// No-op in TxUCMCollector: the body is empty, so `taskAttributes` is ignored.
1594 void TxUCMCollector::logTask (const std::string& taskAttributes)
1595 {
1596 }
1597 
1598 //___________________________________________________________________________________________________
1600 {
1601 }
1602 
1603 //___________________________________________________________________________________________________
// No-op in TxUCMCollector: the body is empty, so `state` is ignored.
1604 void TxUCMCollector::setJobSubmitState (State state)
1605 {
1606 }
1607 
// No-op in TxUCMCollector: the body is empty, so `ID` is ignored.
1608 void TxUCMCollector::logJobSubmitID (const std::string& ID)
1609 {
1610 }
1611 
// No-op in TxUCMCollector: the body is empty, so `ID` is ignored.
1612 void TxUCMCollector::setJobSubmitID (const std::string& ID)
1613 {
1614 }
1615 
// Message-text overload of logEvent.
// No-op in TxUCMCollector: the body is empty, so all arguments are ignored.
1616 void TxUCMCollector::logEvent (const std::string& logMsg,
1617  Level level,Stage stage, const std::string& msgContext)
1618 {
1619 }
1620 
// Key/value overload of logEvent.
// No-op in TxUCMCollector: the body is empty, so all arguments are ignored.
1621 void TxUCMCollector::logEvent (const std::string& userKey,
1622  const std::string& userValue, Level level, Stage stage,
1623  const std::string& msgContext)
1624 {
1625 }
1626 
// No-op in TxUCMCollector: the body is empty, so `key` and `value` are ignored.
1627 void TxUCMCollector::logEnd (const std::string& key, const std::string& value)
1628 {
1629 }
1630 
1631 TXEVENT_DEFAULT_IMPLEMENTAION(TxUCMCollector)
1632 
virtual void setContext(const std::string &context)
static void main(const char *args[])
void processMessage(const std::string &message)
virtual void logEvent(const std::string &logMsg, Level level=LEVEL_INFO, Stage stage=STATUS, const std::string &msgContext=TxUCMConstants::defaultContext)
StDbFieldI * getField(const char *name) const
Definition: StRecord.cxx:129
virtual std::string getDescription() const
virtual void logTask(unsigned int size=1)
virtual void setEnvBrokerJobID(const std::string &envBrokerJobID)
virtual void setEnvBrokerTaskID(const std::string &envBrokerTaskID)
virtual void logEnd(const std::string &key, const std::string &value)
virtual void logJobSubmitState(State state)
const char * getValueAsString() const
Definition: StDbFieldI.cxx:213
virtual void logJobSubmitID(const std::string &ID)
virtual void setRequesterName(const std::string &requester)
virtual void logStart(const std::string &key, const std::string &value)
TxLogging::FieldList & getFields()
Definition: StRecord.cxx:124
virtual void setDbJobID(int dbJobID)
virtual void logJobAttribute(const std::string &key, const std::string &value)
virtual void logJobSubmitLocation(const std::string &url)
virtual StUcmTasks * getTaskList()