StRoot  1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
MySQLAppender.cxx
1 /***************************************************************************
2  MySQLappender.cpp - class MySQLAppender
3  -------------------
4  begin : jeu mai 8 2003
5  copyright : (C) 2003 by Michael CATANZARITI
6  email : mcatan@free.fr
7  ***************************************************************************/
8 
9 /***************************************************************************
10  * Copyright (C) The Apache Software Foundation. All rights reserved. *
11  * *
12  * This software is published under the terms of the Apache Software *
13  * License version 1.1, a copy of which has been included with this *
14  * distribution in the LICENSE.txt file. *
15  ***************************************************************************/
16 #ifdef WIN32
17 #include <windows.h>
18 #endif
19 
20 #include "MySQLAppender.h"
21 #include "TSystem.h"
22 #include "TString.h"
23 
24 #if 1
25 // def HAVE_MySQL
26 
27 #include <log4cxx/helpers/loglog.h>
28 #include <log4cxx/helpers/optionconverter.h>
29 #include <log4cxx/patternlayout.h>
30 
31 using namespace log4cxx;
32 using namespace log4cxx::helpers;
33 using namespace log4cxx::db;
34 using namespace log4cxx::spi;
35 
36 IMPLEMENT_LOG4CXX_OBJECT(MySQLAppender)
37 
38 //_________________________________________________________________________
40 : connection(0), bufferSize(5),fLastId(0),fIsConnectionOpen(false)
41 {
42  // fprintf(stderr,"MySQLAppender::MySQLAppender() \n");
43 }
44 
45 //_________________________________________________________________________
46 MySQLAppender::~MySQLAppender()
47 {
48  // fprintf(stderr,"MySQLAppender::~MySQLAppender()\n" );
49  finalize();
50 }
51 
52 //_________________________________________________________________________
53 void MySQLAppender::setOption(const String& option,
54  const String& value)
55 {
56  if (equalsIgnoreCase(option, _T("buffersize")))
57  {
58  setBufferSize((size_t)OptionConverter::toInt(value, 1));
59  }
60  else if (equalsIgnoreCase(option, _T("password")))
61  {
62  setPassword(value);
63  }
64  else if (equalsIgnoreCase(option, _T("sql")))
65  {
66  setSql(value);
67  }
68  else if (equalsIgnoreCase(option, _T("url"))
69  || equalsIgnoreCase(option, _T("dns")))
70  {
71  setURL(value);
72  }
73  else if (equalsIgnoreCase(option, _T("user")))
74  {
75  setUser(value);
76  }
77  else
78  {
79  AppenderSkeleton::setOption(name, value);
80  }
81 }
82 
83 //_________________________________________________________________________
84 void MySQLAppender::append(const spi::LoggingEventPtr& event)
85 {
86  buffer.push_back(event);
87 
88  if (buffer.size() >= bufferSize)
89  flushBuffer();
90 }
91 
92 //_________________________________________________________________________
93 // String MySQLAppender::getLogStatement(const spi::LoggingEventPtr& event) const
94 String MySQLAppender::getLogStatement(const spi::LoggingEventPtr& event)
95 {
96 #if (STAR_LOG4CXX_VERSION == 9)
97  StringBuffer sbuf;
98  ((MySQLAppender*)this)->getLayout()->format(sbuf, event);
99  return sbuf.str();
100 #else
101  String sbuf;
102  ((MySQLAppender*)this)->getLayout()->format(sbuf, event,pool);
103  return sbuf;
104 #endif
105 }
106 
107 //_________________________________________________________________________
108 unsigned int MySQLAppender::execute(const String& sql)
109 {
110  unsigned int ret=1;
111  if (getConnection()) {
112 // fprintf(stderr,"MYSQL: ---- > execute the MySQL query <%s> \n\n",sql.c_str());
113 // String query = "INSERT INTO StarLogger VALUES (\"";
114 // query += sql;
115 // query += "\");";
116 
117  String query = sql; //
118  if (( ret = mysql_query(connection,query.c_str()) )) {
119  fprintf(stderr, "MYSQL QUERY: %s \n",mysql_error(connection));
120  } else {
121 //
122 // unsigned int last = mysql_insert_id(connection);
123 // if (last && !fLastId) fLastId = last;
124 // fprintf(stderr," ID = %d\n",fLastId);
125  }
126  }
127 // fprintf(stderr,"MYSQL: ---- > return=%d \n",ret);
128  return ret;
129  //tcout << _T("Execute: ") << sql << std::endl;
130 }
131 
132 //_________________________________________________________________________
133 /* The default behavior holds a single connection open until the appender is closed (typically when garbage collected). */
134 void MySQLAppender::closeConnection()
135 {
136  if (fIsConnectionOpen) {
137  // fprintf(stderr," ++++++++ ----> closing the connection %p\n", (void *)connection);
138  mysql_close(connection);
139  if (mysql_errno(connection)) fprintf(stderr,"MYSQL close ERROR %s \n",mysql_error(connection));
140  connection = 0;
141  fIsConnectionOpen = false;
142  }
143 }
144 
145 //_________________________________________________________________________
146 MYSQL *MySQLAppender::getConnection()
147 {
148  if (!fIsConnectionOpen) {
149 
150  if ( !(connection= mysql_init(connection)) ) {
151  fprintf(stderr,"MYSQL: ---- > No init connection \n");
152  } else {
153 
154  const char *host = "heston.star.bnl.gov";
155  const char *user = "StarLogger";
156  const char *passwd = "logger";
157  const char *db = "logger";
158  unsigned int port = 3306;
159 // fprintf(stderr,"MYSQL: ---- > Establishing MySQL connection open %d \n", fIsConnectionOpen);
160  if (!(mysql_real_connect(connection
161  , host
162  , user
163  , passwd
164  , db
165  , port
166  , 0,0
167  )))
168  {
169  fprintf(stderr, "MYSQL: ---- > No connection: %s \n",mysql_error(connection));
170  connection = 0;
171  fIsConnectionOpen = false;
172  } else {
173  fIsConnectionOpen = true;
174  }
175  }
176  }
177  return connection;
178 }
179 
180 //_________________________________________________________________________
181 void MySQLAppender::close()
182 {
183  flushBuffer();
184  closeConnection();
185  this->closed = true;
186 }
187 //_________________________________________________________________________
188 static void ReplaceVariable(TString &string, const char *var)
189 {
190 // replace the $VAR with its value if any
191  TString spec;
192  const char *varValue = gSystem->Getenv(var);
193  if (!varValue) {
194  // Special cases
195  spec = var;
196  if (spec == "REQUESTID") {
197  spec.Form("%d",gSystem->GetPid());
198  varValue= spec.Data();
199  } else if (spec == "JOBINDEX") {
200  spec.Form("%d",0);
201  varValue= spec.Data();
202  }
203  }
204 
205  if (varValue) {
206  TString fullName = "$"; fullName += var;
207  // fullName.ToUpper();
208  string.ReplaceAll(fullName,varValue);
209  }
210 }
211 //_________________________________________________________________________
212 void MySQLAppender::flushBuffer()
213 {
214  //Do the actual logging
215  //removes.ensureCapacity(buffer.size());
216  static bool TaskEntryDone = false;
217  std::list<spi::LoggingEventPtr>::iterator i;
218  if ( getConnection()) {
219  for (i = buffer.begin(); i != buffer.end(); i++)
220  {
221  TString expandCommand;
222  String sql;
223  if (!TaskEntryDone) {
224 
226  expandCommand =
227 // expandCommand ="INSERT IGNORE TaskDescription (taskId, jobID_MD5, nProcesses, submissionTime, time, TaskUser,JobName,JobDescription,TaskJobUser)"
228 #ifdef OLDTABLE
229  "INSERT DELAYED IGNORE TaskDescription (TaskDescriptionID, TaskRequestID_MD5, TaskSize, TaskRemainSize, EntryTime, UpdateTime, TaskUser,TaskDescription,TaskCredential,BrokerID)"
230  " VALUES ( DEFAULT, \"$REQUESTID\", \"$SUMS_nProcesses\",\"$SUMS_nProcesses\",\"$SUBMIT_TIME\",DEFAULT,\"$SUMS_USER\",\"$SUMS_name\",\"$SUMS_AUTHENTICATED_USER\",\"SUMS\");";
231 #else
232  "INSERT DELAYED IGNORE Tasks (taskID, brokerTaskID, taskName, taskSize, taskRemainSize, submitTime, updateTime, requesterID,taskDescription)"
233  " VALUES ( DEFAULT, \"$REQUESTID\", \"Short name of task\", \"$SUMS_nProcesses\",\"$SUMS_nProcesses\",\"$SUBMIT_TIME\",DEFAULT,\"$SUMS_USER\",\"$SUMS_name\");";
234 #endif
235 // Edit meta symbols
236 //-----------------------
237 // $hostid = $HOSTNAME
238 // $JobUser = $USER
239 // $SUMSJobId = $REQUESTID
240 // $SUMSProcessID = $JOBINDEX (was $PROCESSID)
241 
242  ReplaceVariable(expandCommand, "REQUESTID");
243  ReplaceVariable(expandCommand, "SUMS_nProcesses");
244  ReplaceVariable(expandCommand, "SUBMIT_TIME");
245 
246  ReplaceVariable(expandCommand, "SUMS_name");
247  ReplaceVariable(expandCommand, "SUMS_USER");
248  ReplaceVariable(expandCommand, "SUMS_AUTHENTICATED_USER");
249  sql = expandCommand.Data();
250  if (!execute(sql)) TaskEntryDone = true;
251  }
252 // -- TaskDescription block
253  if (TaskEntryDone) {
254 //--- Job description
255 
256 // expandCommand ="INSERT IGNORE INTO JobDescription SET ";
257 
258 #ifdef OLDTABLE
259  expandCommand ="INSERT DELAYED IGNORE INTO JobDescription SET ";
260 
261  expandCommand += "TaskDescriptionID = (SELECT TaskDescriptionID FROM TaskDescription WHERE TaskRequestID_MD5=\"$REQUESTID\")";
262  expandCommand += ", ";
263  expandCommand += "TaskRequestID_MD5=\"$REQUESTID\"";
264  expandCommand += ", ";
265  expandCommand += "BrokerProcessID=\"$JOBINDEX\"";
266  expandCommand += ", ";
267  expandCommand += "JobLocationURL=\"$HOSTNAME\"";
268  expandCommand += ", ";
269  expandCommand += "JobUser=\"$USER\"";
270  expandCommand += "; ";
271 #else
272  expandCommand ="INSERT DELAYED IGNORE INTO Jobs SET ";
273 
274  expandCommand += "taskID = (SELECT taskID FROM Tasks WHERE brokerTaskID=\"$REQUESTID\")";
275  expandCommand += ", ";
276  expandCommand += "brokerJobID=\"$JOBINDEX\"";
277  expandCommand += ", ";
278  expandCommand += "startTime=NOW()";
279  expandCommand += ", ";
280  expandCommand += "nodeLocation=\"$HOSTNAME\"";
281  expandCommand += ", ";
282  expandCommand += "stateID=\"4\""; // (4, 'Active', 'Scheduler running job', 'ucmAdmin', '2007-07-12 10:25:35')
283  expandCommand += ", ";
284  expandCommand += "executionUserName=\"$USER\"";
285  expandCommand += "; ";
286 #endif
287 // Edit meta symbols
288 //-----------------------
289 // $hostid = $HOSTNAME
290 // $JobUser = $USER
291 // $SUMSJobId = $REQUESTID
292 // $SUMSProcessID = $JOBINDEX (was $PROCESSID)
293 
294  ReplaceVariable(expandCommand, "USER");
295  ReplaceVariable(expandCommand, "HOSTNAME");
296  ReplaceVariable(expandCommand, "REQUESTID");
297  ReplaceVariable(expandCommand, "JOBINDEX");
298  sql = expandCommand.Data();
299  if (!execute(sql) ) {
300 
301 
302 // Job tracking block
303  const LoggingEventPtr& logEvent = *i;
304  String sql = getLogStatement(logEvent);
305  expandCommand = sql.c_str();
306  // fprintf(stderr," MYSQL QUERY: <%s>\n", sql.c_str());
307  ReplaceVariable(expandCommand, "REQUESTID");
308  ReplaceVariable(expandCommand, "JOBINDEX");
309 
310  sql = expandCommand.Data();
311  if (!execute(sql)) {
312 #ifdef NEWTABLE_EXPANSION
313  expandCommand = "UPDATE LOW_PRIORITY IGNORE Jobs SET updateTime=NOW() WHERE brokerJobID=\"$JOBINDEX\" AND taskID=(SELECT taskID FROM Tasks WHERE brokerTaskID=\"$REQUESTID\");";
314  ReplaceVariable(expandCommand, "REQUESTID");
315  ReplaceVariable(expandCommand, "JOBINDEX");
316  if (execute(sql)) {
317  fprintf(stderr," MYSQL ----> can not update the Jobs record%s \n", expandCommand.c_str());
318  }
319 #endif
320  } else {
321  // clear the buffer of reported events
322  fprintf(stderr," MYSQL ----> skip and lose event \n");
323  }
324  }
325  }
326  }
327  buffer.clear();
328  }
329  closeConnection();
330 }
331 
332 //_________________________________________________________________________
333 void MySQLAppender::setSql(const String& s)
334 {
335  sqlStatement = s;
336  if (getLayout() == 0)
337  {
338  this->setLayout(new PatternLayout(s));
339  }
340  else
341  {
342  PatternLayoutPtr patternLayout = this->getLayout();
343  if (patternLayout != 0)
344  {
345  patternLayout->setConversionPattern(s);
346  }
347  }
348 }
349 #if (STAR_LOG4CXX_VERSION == 10)
350 //_________________________________________________________________________
351 void MySQLAppender::append(const spi::LoggingEventPtr& event, log4cxx::helpers::Pool& p)
352 {
353  append(event);
354 }
355 #endif
356 #endif //HAVE_MySQL