34#include <netinet/in.h>
// Definition of the static data member XrdCmsNode::LastFree, starting at 0.
// A later fragment in this listing copies it into old_free and then assigns
// DiskFree to it.
81int XrdCmsNode::LastFree = 0;
// Canned error texts returned in a selection response, with their lengths.
// Elsewhere in this file each text is copied into Sel.Resp.Data and the
// matching length is stored in Sel.Resp.DLen. Every length is strlen + 1 so
// that the terminating NUL byte is counted as part of the payload.
//
// msrcmsg / msrclen : multi-source access is not supported by the cluster.
// mtrymsg / mtrylen : the retry limit was exceeded.
//
// The size_t -> int conversion is spelled out with static_cast; both texts
// are short literals, so the value always fits in an int.
const char *msrcmsg = "Cluster does not support multi-source access.";
int         msrclen = static_cast<int>(strlen(msrcmsg)) + 1;
const char *mtrymsg = "Cluster retry limit exceeded.";
int         mtrylen = static_cast<int>(strlen(mtrymsg)) + 1;
99 int port,
int lvl,
int id)
102 static const SMask_t smask_1(1);
106 NodeMask = (
id < 0 ? 0 : smask_1 << id);
110 myNID = strdup(nid ? nid :
"?");
111 if ((myCID = index(myNID,
' '))) myCID++;
118 setName(lnkp, theIF, (nid ? port : 0));
135 if (cidP) {cidP->
RemNode(
this); cidP = 0;}
137 if (myNID) free(myNID);
138 if (myName)free(myName);
148 const char *hname = lnkp->
Host();
153 {
if (!strcmp(myName,hname) && port == netIF.
Port()
165 if (theIF && !netIF.
InDomain(&netID)) theIF = 0;
166 netIF.
SetIF(&netID, theIF, port);
171 myName = strdup(hname);
172 myNlen = strlen(hname);
174 if (!port) strcpy(buff, lnkp->
ID);
175 else sprintf(buff,
"%s:%d", lnkp->
ID, port);
177 Ident = strdup(buff);
187 static const int warnIntvl = 60;
188 int totWait = 0, tmoWarn = 60;
213 if (totWait >= tmoWarn)
214 {
unsigned int theCnt = refCnt;
216 tmoWarn += warnIntvl;
226 if (doDel)
delete this;
227 else {
char eBuff[256];
228 snprintf(eBuff,
sizeof(eBuff),
229 " (%p) delete timeout; node object lost!",
this);
238void XrdCmsNode::DeleteWarn(
unsigned int lkVal)
245 snprintf(eBuff,
sizeof(eBuff),
"delete sync stall; refs = %u", lkVal);
260 if (needLock) nodeMutex.
Lock();
274 if (needLock) nodeMutex.
UnLock();
291 DiskUtil =
static_cast<int>(Arg.dskUtil);
319 if (!mode && !getMode(Arg.
Mode, mode))
return "invalid mode";
328 return (rc ? fsFail(Arg.
Ident,
"chmod", Arg.
Path, rc) : 0);
344 Say.
Emsg(
"Node", Link->
Name(),
"requested a disconnect");
367 static const SMask_t allNodes(~0);
409 static const SMask_t allNodes(~0);
460 uint32_t pcpu, pnet, pxeq, pmem, ppag, pdsk;
484 DEBUGR(
"cpu=" <<pcpu <<
" net=" <<pnet <<
" xeq=" <<pxeq
485 <<
" mem=" <<pmem <<
" pag=" <<ppag <<
" dsk=" <<pdsk
486 <<
"% " <<
DiskFree <<
"MB load=" <<myLoad <<
" mass=" <<myMass);
506 long long tRefs = Cluster.
Refs();
507 long long nRefs =
static_cast<long long>(RefTotW + RefTotR)*100;
508 long long sRefs =
static_cast<long long>(Share) * Shrin * 100;
509 int myShr = (Share ? Share : 100);
510 if (tRefs) {nRefs /= tRefs; sRefs /= tRefs;}
511 else nRefs = sRefs = 0;
512 snprintf(buff,
sizeof(buff)-1,
513 "load=%d; cpu=%d net=%d inq=%d mem=%d pag=%d dsk=%d utl=%d "
514 "shr=[%d %lld %lld] ref=[%d %d]",
515 myLoad, pcpu, pnet, pxeq, pmem, ppag, Arg.
dskFree, pdsk,
516 myShr, nRefs, sRefs, RefTotR+RefR, RefTotW+RefW);
539 struct iovec ioV[2] = {{(
char *)&Arg.
Request,
sizeof(Arg.
Request)},
542 char eBuff[128], theopts[8], *toP = theopts;
546 bool lsuniq =
false, oksel =
false, lsall = (*Arg.
Path ==
'*');
561 {lsuniq =
true; *toP++=
'u';}
589 reqInfo.
lsLU =
static_cast<char>(lsopts);
600 if ((rc = Cluster.
Locate(Sel)))
603 bytes =
sizeof(Resp.Val); Why =
"delay ";
608 bytes =
strlcpy(Resp.outbuff,
"No servers have access to the file",
609 sizeof(Resp.outbuff)) +
sizeof(Resp.Val) + 1;
611 }
else {Why =
"?"; bytes = 0;}
616 {
if (!Sel.
Vec.hf || !(sP=Cluster.
List(Sel.
Vec.hf, lsopts, oksel)))
621 sprintf(eBuff,
"No servers are reachable via %s network",
626 eTxt =
"No servers have the file";
628 bytes =
strlcpy(Resp.outbuff, eTxt,
629 sizeof(Resp.outbuff)) +
sizeof(Resp.Val) + 1;
636 {Resp.Val = htonl(rc);
639 bytes =
do_LocFmt(Resp.outbuff, sP, Sel.
Vec.pf, Sel.
Vec.wf, lsall,lsuniq)
640 +
sizeof(Resp.Val) + 1;
648 ioV[1].iov_len = bytes;
675 {
if (haverw) pP->
Status |= Skip;
676 else {
if (xP) xP->
Status |= Skip;
678 haverw = (pP->
Mask & wfVec) != 0;
692 if (sP->
Status & Hung) *oP = tolower(*oP);
693 *(oP+1) = (sP->
Mask & wfVec ?
'w' :
'r');
695 if (sP->
next) *oP++ =
' ';
696 pP = sP; sP = sP->
next;
delete pP;
700 {
if (!(sP->
Status & Skip))
702 if (sP->
Mask & pfVec) *oP = tolower(*oP);
703 *(oP+1) = (sP->
Mask & wfVec ?
'w' :
'r');
705 if (sP->
next) *oP++ =
' ';
707 pP = sP; sP = sP->
next;
delete pP;
736 if (!mode && !getMode(Arg.
Mode, mode))
return "invalid mode";
745 return (rc ? fsFail(Arg.
Ident,
"mkdir", Arg.
Path, rc) : 0);
768 if (!mode && !getMode(Arg.
Mode, mode))
return "invalid mode";
777 return (rc ? fsFail(Arg.
Ident,
"mkpath", Arg.
Path, rc) : 0);
789 static const SMask_t allNodes(~0);
813 if ((rc = Cluster.
Select(Sel2)))
814 {
if (rc > 0) {Arg.waitVal = rc;
return "!mv";}
815 else if (Sel2.
Vec.hf)
817 return "target file exists";
833 return (rc ? fsFail(Arg.
Ident,
"mv", Arg.
Path, rc) : 0);
850 Link->
Send((
char *)&pongIt,
sizeof(pongIt));
919 static const SMask_t allNodes(~0);
942 return (rc ? fsFail(Arg.
Ident,
"rm", Arg.
Path, rc) : 0);
954 static const SMask_t allNodes(~0);
977 return (rc ? fsFail(Arg.
Ident,
"rmdir", Arg.
Path, rc) : 0);
985 char *Avoid,
bool &doRedir)
994 do {
if ((Comma = index(Avoid,
','))) *Comma =
'\0';
995 if (*Avoid ==
'+') Sel.
nmask |= Cluster.
getMask(Avoid+1);
996 else if (!avoidAddr.
Set(Avoid,0))
998 Avoid = Comma+1; avNum++;
999 }
while(Comma && *Avoid);
1021 strncpy(Sel.
Resp.Data, msrcmsg,
sizeof(Sel.
Resp.Data));
1022 Sel.
Resp.DLen = msrclen;
1037 strncpy(Sel.
Resp.Data, mtrymsg,
sizeof(Sel.
Resp.Data));
1038 Sel.
Resp.DLen = mtrylen;
1062 struct iovec ioV[2];
1063 char theopts[16], *toP = theopts;
1133 bool doRedir =
false;
1140 if (!doRedir && (rc || (rc = Cluster.
Select(Sel))))
1152 Sel.
Resp.Port = rtEC[rtRC];
1155 Sel.
Resp.DLen = sprintf(Sel.
Resp.Data,
"%s",
"Item not found.")+1;
1160 }
else if (!Sel.
Resp.DLen)
return 0;
1163 <<Sel.
Resp.Port <<
" for " <<Arg.
Path);
1168 bytes = Sel.
Resp.DLen+
sizeof(Sel.
Resp.Port);
1170 Sel.
Resp.Port = htonl(Sel.
Resp.Port);
1174 ioV[0].iov_base = (
char *)&Arg.
Request;
1175 ioV[0].iov_len =
sizeof(Arg.
Request);
1176 ioV[1].iov_base = (
char *)&Sel.
Resp;
1177 ioV[1].iov_len = bytes;
1223 rc = Cluster.
Select(Scl);
1225 DEBUGR(
"coloc to " <<Arg.clPath <<
" delayed " <<rc <<
" seconds");
1229 else Sel.
nmask = ~Scl.smask;
1234 if ((rc = Cluster.
Select(Sel)))
1238 DEBUGR(
"prep delayed " <<rc <<
" seconds");
1242 PrepQ.
Inform(
"unavail", &Arg);
1259 struct iovec xmsg[2];
1261 char buff[
sizeof(int)*2+2], *bp = buff;
1262 int blen, maxfr, tutil;
1271 DEBUGR(maxfr <<
"MB free; " <<tutil <<
"% util");
1277 mySpace.Hdr.datalen = htons(
static_cast<unsigned short>(blen));
1283 else {xmsg[0].iov_base = (
char *)&mySpace;
1284 xmsg[0].iov_len =
sizeof(mySpace);
1285 xmsg[1].iov_base = buff;
1286 xmsg[1].iov_len = blen;
1287 mySpace.Hdr.datalen = htons(
static_cast<unsigned short>(blen));
1288 Link->
Send(xmsg, 2);
1302 struct iovec xmsg[2];
1321 pinfo.
rovec = NodeMask;
1332 {
TRACER(Files,Arg.
Path <<
" responding have!");
1333 xmsg[0].iov_base = (
char *)&Arg.
Request;
1334 xmsg[0].iov_len =
sizeof(Arg.
Request);
1335 xmsg[1].iov_base = Arg.
Buff;
1336 xmsg[1].iov_len = Arg.
Dlen;
1339 Link->
Send(xmsg, 2);
1351 static const SMask_t allNodes(~0);
1359 <<int(rP->Mod) <<
" rc=" <<rc <<
" path=" <<rP->
Path);
1360 Sel.Path.Hash = rP->
Sid;
1404 static const SMask_t allNodes(~0);
1412 {
DEBUGR(
"Path find failed for state " <<Arg.
Path);
1418 Sel.
Vec.hf = Sel.
Vec.pf = Sel.
Vec.bf = 0;
1436 {
if (retc < 0)
return 0;
1460 if (!retc || Sel.
Vec.bf != 0)
1483 struct iovec ioV[3] = {{(
char *)&Arg.
Request,
sizeof(Arg.
Request)},
1484 {(
char *)&Zero,
sizeof(Zero)},
1485 {(
char *)&buff, 0}};
1495 {bytes = sprintf(buff,
"A %lld %lld %d",
1498 : theSpace.
wFree)) + 1;
1500 bytes = sprintf(buff,
"%d %d %d %d %d %d",
1504 }
else bytes =
strlcpy(buff,
"-1 -1 -1 -1 -1 -1",
sizeof(buff)) + 1;
1508 ioV[2].iov_len = bytes;
1509 bytes +=
sizeof(Zero);
1524 static const unsigned short szLen =
sizeof(
kXR_unt32);
1526 static int statsz = 0;
1527 static int statln = 0;
1528 static char *statbuff = 0;
1529 static time_t statlast = 0;
1532 struct iovec ioV[3] = {{(
char *)&Arg.
Request,
sizeof(Arg.
Request)},
1533 {(
char *)&theSize,
sizeof(theSize)},
1541 if (!statsz || !statbuff)
1542 {statsz = Cluster.
Stats(0,0);
1543 statbuff = (
char *)malloc(statsz);
1544 theSize = htonl(statsz);
1550 {ioV[1].iov_len =
sizeof(theSize);
1561 if (statlast+9 >= tNow)
1562 {statln = Cluster.
Stats(statbuff, statsz); statlast = tNow;}
1566 ioV[2].iov_base = statbuff;
1567 ioV[2].iov_len = statln;
1568 Arg.
Request.
datalen = htons(
static_cast<unsigned short>(szLen+statln));
1590 const char *srvMsg, *stgMsg;
1596 int add2Activ, add2Stage, port;
1600 DEBUGR( (Reset ?
"reset " :
"")
1601 <<(Resume ?
"resume " : (Suspend ?
"suspend " :
""))
1602 <<(Stage ?
"stage " : (noStage ?
"nostage " :
"")));
1614 if (noStage) {add2Stage = -1;
isNoStage = 1; stgMsg=
"staging suspended";}
1615 else {add2Stage = 1;
isNoStage = 0; stgMsg=
"staging resumed";}
1616 else {add2Stage = 0; stgMsg = 0;}
1621 if (Suspend) {add2Activ = -1;
1622 Cluster.
SLock(
true,
false);
1624 Cluster.
SLock(
false);
1625 srvMsg=
"service suspended";
1628 else {add2Activ = 1;
1629 Cluster.
SLock(
true,
false);
1630 isBad &= ~isSuspend;
1631 Cluster.
SLock(
false);
1632 srvMsg=
"service resumed";
1633 stgMsg = (
isNoStage ?
"(no staging)" :
"(staging)");
1635 if (port && port != netIF.
Port())
1637 DEBUGR(
"set data port to " <<port);
1640 else {add2Activ = 0; srvMsg = 0;}
1644 if (
isOffline) {srvMsg =
"service offline"; stgMsg = 0;}
1645 else if (
isBad &
isDisabled) {srvMsg =
"service disabled"; stgMsg = 0;}
1646 else if (
isBad &
isBlisted ) {srvMsg =
"service blacklisted"; stgMsg = 0;}
1650 if (add2Activ || add2Stage)
1667 long long Size = -1;
1678 if (Size < 0 && !getSize(Arg.
Mode, Size))
return "invalid size";
1687 return (rc ? fsFail(Arg.
Ident,
"trunc", Arg.
Path, rc) : 0);
1713 return ".redirected";
1754 struct iovec xmsg[2];
1756 char respbuff[
sizeof(loadbuff)+2+
sizeof(
int)+2], *bp = respbuff;
1757 int blen, maxfr, pcpu, pnet, pxeq, pmem, ppag, pdsk;
1761 maxfr =
Meter.
Report(pcpu, pnet, pxeq, pmem, ppag, pdsk);
1772 myLoad.
Hdr.
datalen = htons(
static_cast<unsigned short>(blen));
1774 xmsg[0].iov_base = (
char *)&myLoad;
1775 xmsg[0].iov_len =
sizeof(myLoad);
1776 xmsg[1].iov_base = respbuff;
1777 xmsg[1].iov_len = blen;
1778 if (lp) lp->
Send(xmsg, 2);
1783 DEBUG(
"cpu=" <<pcpu <<
" net=" <<pnet <<
" xeq=" <<pxeq
1784 <<
" mem=" <<pmem <<
" pag=" <<ppag <<
" dsk=" <<pdsk <<
' ' <<maxfr);
1800 {old_free = LastFree; LastFree =
DiskFree;}
1821int XrdCmsNode::fsExec(
XrdOucProg *Prog,
char *Arg1,
char *Arg2)
1824 char Pfn1[PfnSZ], Pfn2[PfnSZ];
1842 return Prog->
Run(Arg1, Arg2);
1849const char *XrdCmsNode::fsFail(
const char *Who,
const char *What,
1850 const char *
Path,
int rc)
1857 if (rc == fsL2PFail1) return "lfn2pfn path1 failed";
1858 if (rc == fsL2PFail2) return "lfn2pfn path2 failed";
1859 if (rc != ENOENT)
Say.Emsg("Node", rc, What,
Path);
1860 else {
struct {
const char *Ident;} Arg = {Who};
1861 DEBUGR(
"rc=" <<rc <<
' ' <<What <<
' ' <<
Path);
1870int XrdCmsNode::getMode(
const char *theMode, mode_t &
Mode)
1876 if (!(
Mode = strtol(theMode, &eP, 8)) || *eP || (
Mode >> 9))
return 0;
1884int XrdCmsNode::getSize(
const char *theSize,
long long &Size)
1890 if (!(Size = strtoll(theSize, &eP, 10)) || *eP)
return 0;
1907 {
if (!(slash = index(spos,
'/')))
return;
1908 acount--; spos = slash+1;
1921 if (Sel.
Path.
Val[i] ==
'/') i--;
1924 {
if (Sel.
Path.
Val[i] ==
'/' && !(++acount))
break;}
unsigned long long SMask_t
#define XrdCmsMAX_PATH_LEN
const char * XrdSysE2T(int errcode)
int Exists(XrdCmsRRData &Arg, XrdCmsPInfo &Who, int noLim=0)
void Bounce(SMask_t smask, int SNum)
int GetFile(XrdCmsSelect &Sel, SMask_t mask)
int DelFile(XrdCmsSelect &Sel, SMask_t mask)
int AddFile(XrdCmsSelect &Sel, SMask_t mask)
XrdCmsNode * RemNode(XrdCmsNode *nP)
SMask_t getMask(const XrdNetAddr *addr)
void SLock(bool dolock, bool wrmode=true)
void Space(XrdCms::SpaceData &sData, SMask_t smask)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
int Select(XrdCmsSelect &Sel)
int Locate(XrdCmsSelect &Sel)
static const int EReplete
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
XrdCmsSelected * List(SMask_t mask, CmsLSOpts opts, bool &oksel)
static const int RetryErr
static const int Wait4CBk
int Stats(char *bfr, int bln)
XrdOucName2Name * lcl_N2N
void Add(const XrdNetAddr *netAddr, char *redList, int manport, int lvl)
static void Inform(const char *What, const char *Data, int Dlen)
void Record(int pcpu, int pnet, int pxeq, int pmem, int ppag, int pdsk)
int Report(int &pcpu, int &pnet, int &pxeq, int &pmem, int &ppag, int &pdsk)
int FreeSpace(int &tutil)
int calcLoad(uint32_t pcpu, uint32_t pio, uint32_t pload, uint32_t pmem, uint32_t ppag)
const char * do_PrepDel(XrdCmsRRData &Arg)
int do_StateFWD(XrdCmsRRData &Arg)
const char * do_Gone(XrdCmsRRData &Arg)
const char * do_Locate(XrdCmsRRData &Arg)
const char * do_Update(XrdCmsRRData &Arg)
const char * do_Try(XrdCmsRRData &Arg)
const char * do_State(XrdCmsRRData &Arg)
void Delete(XrdSysRWLock &gMutex)
static void do_StateDFS(XrdCmsBaseFR *rP, int rc)
const char * do_Space(XrdCmsRRData &Arg)
int do_SelAvoid(XrdCmsRRData &Arg, XrdCmsSelect &Sel, char *Avoid, bool &doRedir)
const char * do_Select(XrdCmsRRData &Arg)
const char * do_Mv(XrdCmsRRData &Arg)
const char * do_Trunc(XrdCmsRRData &Arg)
static void Report_Usage(XrdLink *lp)
const char * do_Usage(XrdCmsRRData &Arg)
const char * do_Chmod(XrdCmsRRData &Arg)
static const char isDisabled
const char * do_Load(XrdCmsRRData &Arg)
static int do_SelPrep(XrdCmsPrepArgs &Arg)
const char * do_Rm(XrdCmsRRData &Arg)
const char * do_PrepAdd(XrdCmsRRData &Arg)
const char * do_Ping(XrdCmsRRData &Arg)
const char * do_Have(XrdCmsRRData &Arg)
static const char isSuspend
const char * do_Stats(XrdCmsRRData &Arg)
const char * do_Disc(XrdCmsRRData &Arg)
const char * do_Avail(XrdCmsRRData &Arg)
static int do_LocFmt(char *buff, XrdCmsSelected *sP, SMask_t pf, SMask_t wf, bool lsall=false, bool lsuniq=false)
void Disc(const char *reason=0, int needLock=1)
const char * do_Mkpath(XrdCmsRRData &Arg)
XrdCmsNode(XrdLink *lnkp, const char *theIF=0, const char *sid=0, int port=0, int lvl=0, int id=-1)
const char * do_Pong(XrdCmsRRData &Arg)
void setName(XrdLink *lnkp, const char *theIF, int port)
const char * do_Mkdir(XrdCmsRRData &Arg)
const char * do_StatFS(XrdCmsRRData &Arg)
const char * do_Rmdir(XrdCmsRRData &Arg)
static const char isDoomed
static const char isBlisted
const char * do_Status(XrdCmsRRData &Arg)
int Find(const char *pname, XrdCmsPInfo &masks)
void Inform(const char *cmd, XrdCmsPrepArgs *pargs)
struct XrdCmsSelect::@93 Resp
struct XrdCmsSelect::@92 Vec
void Update(StateType StateT, int ActivVal, int StageVal=0)
void sendState(XrdLink *Link)
int setEtext(const char *text)
int Close(bool defer=false)
const XrdNetAddr * NetAddr() const
char * ID
Pointer to the client's link identity.
int Send(const char *buff, int blen)
const char * Host() const
const char * Name() const
int Same(const XrdNetAddrInfo *ipAddr, bool plusPort=false)
const char * Set(const char *hSpec, int pNum=PortInSpec)
bool SetIF(XrdNetAddrInfo *src, const char *ifList, int port=0, netType nettype=netDefault, const char *xName=0)
static bool InDomain(XrdNetAddrInfo *epaddr)
static const char * Name(ifType ifT)
ifType
The enum that is used to index into ifData to get the appropriate interface.
static void Privatize(ifType &x)
virtual int Mkdir(const char *path, mode_t mode, int mkpath=0, XrdOucEnv *envP=0)=0
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Remdir(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual int Truncate(const char *path, unsigned long long fsize, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
virtual int lfn2pfn(const char *lfn, char *buff, int blen)=0
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
static int Pack(struct iovec **, const char *, unsigned short &buff)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
static const unsigned char kYR_Version