// @(#)root/proof:$Name: $:$Id: TProofServ.cxx,v 1.3 2000/06/13 09:43:33 brun Exp $
// Author: Fons Rademakers 16/02/97
/*************************************************************************
* Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
//////////////////////////////////////////////////////////////////////////
// //
// TProofServ //
// //
// TProofServ is the PROOF server. It can act either as the master //
// server or as a slave server, depending on its startup arguments. It //
// receives and handles message coming from the client or from the //
// master server. //
// //
//////////////////////////////////////////////////////////////////////////
#ifdef WIN32
#include <io.h>
typedef long off_t;
#endif
#include "TProofServ.h"
#include "TProof.h"
#include "TROOT.h"
#include "TFile.h"
#include "TSysEvtHandler.h"
#include "TSystem.h"
#include "TInterpreter.h"
#include "TException.h"
#include "TSocket.h"
#include "TStopwatch.h"
#include "TMessage.h"
#include "TEnv.h"
#include "TError.h"
#include "TTree.h"
TProofServ *gProofServ;
//______________________________________________________________________________
void ProofErrorHandler(int level, Bool_t abort, const char *location, const char *msg)
{
// The PROOF error handler function. It prints the message on stderr and
// if abort is set it aborts the application.
if (level < gErrorIgnoreLevel)
return;
const char *type = 0;
ELogLevel loglevel = kLogWarning;
if (level >= kWarning) {
loglevel = kLogWarning;
type = "Warning";
}
if (level >= kError) {
loglevel = kLogErr;
type = "Error";
}
if (level >= kSysError) {
loglevel = kLogErr;
type = "SysError";
}
if (level >= kFatal) {
//loglevel = kLogEmerg;
loglevel = kLogErr;
type = "Fatal";
}
char *bp;
if (!location || strlen(location) == 0) {
fprintf(stderr, "%s: %sn", type, msg);
bp = Form("%s:%s:%s", gProofServ->GetUser(), type, msg);
} else {
fprintf(stderr, "%s in <%s>: %sn", type, location, msg);
bp = Form("%s:%s:%s:%s", gProofServ->GetUser(), type, location, msg);
}
fflush(stderr);
gSystem->Syslog(loglevel, bp);
if (abort) {
gProofServ->GetSocket()->Send(kPROOF_FATAL);
fprintf(stderr, "abortingn");
fflush(stderr);
gSystem->StackTrace();
gSystem->Abort();
}
}
//----- Interrupt signal handler -----------------------------------------------
//______________________________________________________________________________
class TProofInterruptHandler : public TSignalHandler {
public:
TProofInterruptHandler() : TSignalHandler(kSigUrgent, kFALSE) { }
Bool_t Notify();
};
//______________________________________________________________________________
Bool_t TProofInterruptHandler::Notify()
{
gProofServ->HandleUrgentData();
if (TROOT::Initialized()) {
Throw(GetSignal());
}
return kTRUE;
}
//----- Socket Input handler --------------------------------------------
//______________________________________________________________________________
class TSocketInputHandler : public TFileHandler {
public:
TSocketInputHandler(int fd) : TFileHandler(fd, 1) { }
Bool_t Notify();
Bool_t ReadNotify() { return Notify(); }
};
//______________________________________________________________________________
Bool_t TSocketInputHandler::Notify()
{
gProofServ->HandleSocketInput();
return kTRUE;
}
ClassImp(TProofServ)
//______________________________________________________________________________
TProofServ::TProofServ(int *argc, char **argv)
: TApplication("proofserv", argc, argv)
{
// Create an application environment. The TProofServ environment provides
// an eventloop via inheritance of TApplication.
// Make sure all registered dictionaries have been initialized
gInterpreter->InitializeDictionaries();
// abort on kSysError's or higher and set error handler
gErrorAbortLevel = kSysError;
SetErrorHandler(ProofErrorHandler);
fNcmd = 0;
fInterrupt = kFALSE;
fLogLevel = 1;
fRealTime = 0.0;
fCpuTime = 0.0;
fSocket = new TSocket(0);
Setup();
RedirectOutput();
// Load user functions
const char *logon;
logon = gEnv->GetValue("Proof.Load", (char*)0);
if (logon && !gSystem->AccessPathName(logon, kReadPermission))
ProcessLine(Form(".L %s",logon));
// Execute logon macro
logon = gEnv->GetValue("Proof.Logon", (char*)0);
if (logon && !NoLogOpt() && !gSystem->AccessPathName(logon, kReadPermission))
ProcessFile(logon);
gInterpreter->SaveContext();
gInterpreter->SaveGlobalsContext();
// Install interrupt and terminal input handlers
TProofInterruptHandler *ih = new TProofInterruptHandler;
gSystem->AddSignalHandler(ih);
TSocketInputHandler *th = new TSocketInputHandler(0);
gSystem->AddFileHandler(th);
gProofServ = this;
}
//______________________________________________________________________________
TProofServ::~TProofServ()
{
// Cleanup. Not really necessary since after this dtor there is no
// live anyway.
delete fSocket;
}
//______________________________________________________________________________
TObject *TProofServ::Get(const char *namecycle)
{
// Get object with name "name;cycle" (e.g. "aap;2") from master or client.
// This method is called by TDirectory::Get() in case the object can not
// be found locally.
fSocket->Send(namecycle, kPROOF_GETOBJECT);
TMessage *mess;
if (fSocket->Recv(mess) < 0)
return 0;
TObject *idcur = 0;
if (mess->What() == kMESS_OBJECT)
idcur = mess->ReadObject(mess->GetClass());
delete mess;
return idcur;
}
//______________________________________________________________________________
void TProofServ::GetLimits(Int_t dim, Int_t nentries, Int_t *nbins, Double_t *vmin, Double_t *vmax)
{
// Get limits of histogram from master. This method is called by
// TTree::TakeEstimate().
TMessage mess(kPROOF_LIMITS);
mess << dim << nentries << nbins[0] << vmin[0] << vmax[0];
if (dim == 2)
mess << nbins[1] << vmin[1] << vmax[1];
if (dim == 3)
mess << nbins[2] << vmin[2] << vmax[2];
fSocket->Send(mess);
TMessage *answ;
if (fSocket->Recv(answ) != -1) {
(*answ) >> nbins[0] >> vmin[0] >> vmax[0];
if (dim == 2)
(*answ) >> nbins[1] >> vmin[1] >> vmax[1];
if (dim == 3)
(*answ) >> nbins[2] >> vmin[2] >> vmax[2];
delete answ;
}
}
//______________________________________________________________________________
Bool_t TProofServ::GetNextPacket(Int_t &nentries, Stat_t &firstentry)
{
// Get next range of entries to be processed on this server.
fSocket->Send(kPROOF_GETPACKET);
TMessage *mess;
if (fSocket->Recv(mess) < 0)
return kFALSE;
(*mess) >> nentries >> firstentry >> fEntriesProcessed;
if (nentries == -1)
return kFALSE;
return kTRUE;
}
//______________________________________________________________________________
void TProofServ::GetOptions(int *argc, char **argv)
{
// Get and handle command line options.
for (int i = 1; i < *argc; i++) {
if (!strcmp(argv[i], "proofserv") || !strcmp(argv[i], "proofslave")) {
fService = argv[i];
fMasterServ = kTRUE;
if (!strcmp(argv[i], "proofslave")) fMasterServ = kFALSE;
} else {
fConfDir = argv[i];
}
}
}
//______________________________________________________________________________
void TProofServ::HandleSocketInput()
{
// Handle input coming from the client or from the master server.
static TStopwatch timer;
TMessage *mess;
char str[2048];
Int_t what;
if (fSocket->Recv(mess) < 0)
return; // do something more intelligent here
what = mess->What();
timer.Start();
fNcmd++;
switch (what) {
case kMESS_CINT:
mess->ReadString(str, sizeof(str));
if (fLogLevel > 1) printf("Processing: %s...n", str);
//gSystem->Syslog(kLogInfo, "%s", str);
ProcessLine(str);
SendLogFile();
break;
case kMESS_STRING:
mess->ReadString(str, sizeof(str));
break;
case kMESS_OBJECT:
mess->ReadObject(mess->GetClass());
break;
case kPROOF_GROUPVIEW:
mess->ReadString(str, sizeof(str));
sscanf(str, "%d %d", &fGroupId, &fGroupSize);
break;
case kPROOF_LOGLEVEL:
mess->ReadString(str, sizeof(str));
sscanf(str, "%d", &fLogLevel);
break;
case kPROOF_PING:
// do nothing
break;
case kPROOF_RESET:
mess->ReadString(str, sizeof(str));
Reset(str);
break;
case kPROOF_STATUS:
SendStatus();
break;
case kPROOF_STOP:
Terminate(0);
break;
case kPROOF_TREEDRAW:
mess->ReadString(str, sizeof(str));
{
Int_t maxv, est;
char name[64];
sscanf(str, "%s %d %d", name, &maxv, &est);
TTree *t = (TTree*)gDirectory->Get(name);
if (t) {
t->SetMaxVirtualSize(maxv);
t->SetEstimate(est);
}
}
break;
default:
Error("HandleSocketInput", "unknown command");
break;
}
fRealTime += (Float_t)timer.RealTime();
fCpuTime += (Float_t)timer.CpuTime();
delete mess;
}
//______________________________________________________________________________
void TProofServ::HandleUrgentData()
{
// Handle Out-Of-Band data sent by the master or client.
char oob_byte;
int n, nch, wasted = 0;
const int kBufSize = 1024;
char waste[kBufSize];
if (fLogLevel > 5)
printf("HandleUrgentData()...");
// Receive the OOB byte
while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
if (n == -2) { // EWOULDBLOCK
//
// The OOB data has not yet arrived: flush the input stream
//
// In some systems (Solaris) regular recv() does not return upon
// receipt of the oob byte, which makes the below call to recv()
// block indefinitely if there are no other data in the queue.
// FIONREAD ioctl can be used to check if there are actually any
// data to be flushed. If not, wait for a while for the oob byte
// to arrive and try to read it again.
//
fSocket->GetOption(kBytesToRead, nch);
if (nch == 0) {
gSystem->Sleep(1000);
continue;
}
if (nch > kBufSize) nch = kBufSize;
n = fSocket->RecvRaw(waste, nch);
if (n <= 0) {
Error("HandleUrgentData", "error receiving waste");
break;
}
wasted = 1;
} else {
Error("HandleUrgentData", "error receiving OOB");
return;
}
}
if (fLogLevel > 5)
printf("got OOB byte: %dn", oob_byte);
switch (oob_byte) {
case TProof::kHardInterrupt:
if (IsMaster())
gSystem->Syslog(kLogInfo, "*** Master: Hard Interrupt");
else
gSystem->Syslog(kLogInfo, Form("*** Slave %d: Hard Interrupt", fOrdinal));
// If master server, propagate interrupt to slaves
if (IsMaster() && gProof)
gProof->Interrupt(TProof::kHardInterrupt);
// Flush input socket
while (1) {
int atmark;
fSocket->GetOption(kAtMark, atmark);
if (atmark) {
// Send the OOB byte back so that the client knows where
// to stop flushing its input stream of obsolete messages
n = fSocket->SendRaw(&oob_byte, 1, kOob);
if (n <= 0)
Error("HandleUrgentData", "error sending OOB");
break;
}
// find out number of bytes to read before atmark
fSocket->GetOption(kBytesToRead, nch);
if (nch == 0) {
gSystem->Sleep(1000);
continue;
}
if (nch > kBufSize) nch = kBufSize;
n = fSocket->RecvRaw(waste, nch);
if (n <= 0) {
Error("HandleUrgentData", "error receiving waste (2)");
break;
}
}
break;
case TProof::kSoftInterrupt:
if (IsMaster())
gSystem->Syslog(kLogInfo, "Master: Soft Interrupt");
else
gSystem->Syslog(kLogInfo, Form("Slave %d: Soft Interrupt", fOrdinal));
// If master server, propagate interrupt to slaves
if (IsMaster() && gProof)
gProof->Interrupt(TProof::kSoftInterrupt);
if (wasted) {
Error("HandleUrgentData", "soft interrupt flushed stream");
break;
}
Interrupt();
break;
case TProof::kShutdownInterrupt:
if (IsMaster())
gSystem->Syslog(kLogInfo, "Master: Shutdown Interrupt");
else
gSystem->Syslog(kLogInfo, Form("Slave %d: Shutdown Interrupt", fOrdinal));
// If master server, propagate interrupt to slaves
if (IsMaster() && gProof)
gProof->Interrupt(TProof::kShutdownInterrupt);
Terminate(0); // will not return from here....
break;
default:
Error("HandleUrgentData", "unexpected OOB byte");
break;
}
SendLogFile();
}
//______________________________________________________________________________
void TProofServ::Print(Option_t *)
{
// Print status of slave server.
Printf("This is slave %s", gSystem->HostName());
}
//______________________________________________________________________________
void TProofServ::RedirectOutput()
{
// Redirect stdout to a log file. This log file will be flushed to the
// client or master after each command.
// Duplicate the initial (0) socket, this will yield a socket with a
// descriptor >0, which will free id=0 for stdout.
int isock;
if ((isock = dup(fSocket->GetDescriptor())) < 0)
SysError("RedirectOutput", "could not duplicate output socket");
fSocket->SetDescriptor(isock);
// Remove all previous log files and create new log files.
char logfile[512];
if (IsMaster()) {
gSystem->Exec(Form("/bin/rm -f %s/proof_*.log", fLogDir.Data()));
sprintf(logfile, "%s/proof_%d.log", fLogDir.Data(), gSystem->GetPid());
} else {
gSystem->Exec(Form("/bin/rm -f %s/proofs%d_*.log", fLogDir.Data(),
fOrdinal));
sprintf(logfile, "%s/proofs%d_%d.log", fLogDir.Data(), fOrdinal,
gSystem->GetPid());
}
if ((freopen(logfile, "w", stdout)) == 0)
SysError("RedirectOutput", "could not freopen stdout");
if ((dup2(fileno(stdout), fileno(stderr))) < 0)
SysError("RedirectOutput", "could not redirect stderr");
if ((fLogFile = fopen(logfile, "r")) == 0)
SysError("RedirectOutput", "could not open logfile");
#if 0
// Send message of the day to the client.
if (IsMaster()) CatMotd();
#endif
}
//______________________________________________________________________________
void TProofServ::Reset(const char *dir)
{
// Reset PROOF environment to be ready for execution of next command.
// First go to new directory.
gDirectory->cd(dir);
// Clear interpreter environment.
gROOT->Reset();
// Make sure current directory is empty (don't delete anything when
// we happen to be in the ROOT memory only directory!?)
if (gDirectory != gROOT) {
TObject *obj;
TIter next(gDirectory->GetList());
while ((obj = next()))
if (!obj->InheritsFrom(TTree::Class())) {
gDirectory->GetList()->Remove(obj);
delete obj;
}
}
}
//______________________________________________________________________________
void TProofServ::Run(Bool_t retrn)
{
// Main server eventloop.
TApplication::Run(retrn);
}
//______________________________________________________________________________
void TProofServ::SendLogFile()
{
// Send log file to master.
// Determine the number of bytes left to be read from the log file.
fflush(stdout);
off_t ltot, lnow;
Int_t left;
ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
lnow = lseek(fileno(fLogFile), (off_t) 0, SEEK_CUR);
left = Int_t(ltot - lnow);
if (left <= 0)
fSocket->Send(kPROOF_LOGDONE);
else {
fSocket->Send(kPROOF_LOGFILE);
while (left > 0) {
char line[256];
if (fgets(line, sizeof(line), fLogFile) == 0) {
left = 0;
} else {
left -= strlen(line);
fSocket->Send(line);
}
if (!left)
fSocket->Send(kPROOF_LOGDONE);
}
}
}
//______________________________________________________________________________
void TProofServ::SendStatus()
{
// Send status of slave server to master or client.
char str[64];
sprintf(str, "%g %.3f %.3f", TFile::GetFileBytesRead(), fRealTime, fCpuTime);
fSocket->Send(str, kPROOF_STATUS);
}
//______________________________________________________________________________
void TProofServ::Setup()
{
// Print the ProofServ logo on standard output.
char str[512];
if (IsMaster()) {
sprintf(str, "**** Welcome to the Proof server @ %s ****", gSystem->HostName());
} else {
sprintf(str, "*** Proof slave server @ %s started ****", gSystem->HostName());
}
fSocket->Send(str);
fSocket->Recv(str, sizeof(str));
char user[16], vers[16], userpass[64], curdir[256];
if (IsMaster()) {
sscanf(str, "%s %s %s %d", user, vers, userpass, &fProtocol);
fUserPass = userpass;
} else {
sscanf(str, "%s %s %s %d %d %d", user, vers, curdir, &fProtocol,
&fMasterPid, &fOrdinal);
}
fUser = user;
fVersion = vers;
// deny write access for group and world
gSystem->Umask(022);
if (IsMaster())
gSystem->Openlog("proofserv", kLogPid | kLogCons, kLogLocal6);
else
gSystem->Openlog("proofslave", kLogPid | kLogCons, kLogLocal7);
// Set $HOME and $PATH. The HOME directory was already set to the
// user's home directory by proofd.
gSystem->Setenv("HOME", gSystem->HomeDirectory());
#ifdef R__UNIX
gSystem->Setenv("PATH", "/bin:/usr/bin:/usr/contrib/bin:/usr/local/bin");
#endif
// set the working directory to $HOME/proof
char workdir[256];
sprintf(workdir, "%s/proof", gSystem->HomeDirectory());
if (gSystem->AccessPathName(workdir)) {
gSystem->MakeDirectory(workdir);
if (!gSystem->ChangeDirectory(workdir)) {
SysError("Setup", "can not change working directory");
}
} else {
if (!gSystem->ChangeDirectory(workdir)) {
gSystem->Unlink(workdir);
gSystem->MakeDirectory(workdir);
if (!gSystem->ChangeDirectory(workdir)) {
SysError("Setup", "can not change working directory");
}
}
}
// for master server the work and log directory are the same
fLogDir = workdir;
// Slave servers set their work directory to the work directory of the
// master server.
if (!IsMaster()) {
if (!gSystem->ChangeDirectory(curdir))
SysError("Setup", "can not change to the current directory");
}
// Incoming OOB should generate a SIGURG
fSocket->SetOption(kProcessGroup, gSystem->GetPid());
// Send packages of immediately to reduce latency
fSocket->SetOption(kNoDelay, 1);
}
//______________________________________________________________________________
void TProofServ::Terminate(int status)
{
// Terminate the proof server.
gSystem->Exit(status);
}
//______________________________________________________________________________
Bool_t TProofServ::IsActive()
{
// Static function that returns kTRUE in case we are a PROOF server.
return gProofServ ? kTRUE : kFALSE;
}
//______________________________________________________________________________
TProofServ *TProofServ::This()
{
// Static function returning pointer to global object gProofServ.
// Mainly for use via CINT, where the gProofServ symbol might be
// deleted from the symbol table.
return gProofServ;
}
ROOT page - Class index - Top of the page
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.