XRootD
Loading...
Searching...
No Matches
XrdPfcFile.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19
20#include "XrdPfcFile.hh"
21#include "XrdPfcIO.hh"
22#include "XrdPfcTrace.hh"
23#include <cstdio>
24#include <sstream>
25#include <fcntl.h>
26#include <assert.h>
27#include "XrdCl/XrdClLog.hh"
29#include "XrdCl/XrdClFile.hh"
31#include "XrdSys/XrdSysTimer.hh"
32#include "XrdOss/XrdOss.hh"
33#include "XrdOuc/XrdOucEnv.hh"
35#include "XrdPfc.hh"
36
37
38using namespace XrdPfc;
39
40namespace
41{
42
// Retry limit for writing a block to disk; referenced by the write-queue
// machinery (not used in the portion of the file visible here).
43const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
44
// File-local shortcut to the singleton Cache instance.
45Cache* cache() { return &Cache::GetInstance(); }
46
47}
48
// Identifier string used by the TRACE/TRACEF macros to tag log lines from File.
49const char *File::m_traceID = "File";
50
51//------------------------------------------------------------------------------
52
// Construct a File caching 'path', covering the byte range starting at
// iOffset with length iFileSize. All members start in a closed/empty state;
// Open() (normally via the FileOpen() factory) must succeed before use.
53File::File(const std::string& path, long long iOffset, long long iFileSize) :
54 m_ref_cnt(0),
55 m_data_file(0),
56 m_info_file(0),
57 m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
58 m_filename(path),
59 m_offset(iOffset),
60 m_file_size(iFileSize),
61 m_current_io(m_io_set.end()),
62 m_ios_in_detach(0),
63 m_non_flushed_cnt(0),
64 m_in_sync(false),
65 m_detach_time_logged(false),
66 m_in_shutdown(false),
67 m_state_cond(0),
68 m_block_size(0),
69 m_num_blocks(0),
70 m_prefetch_state(kOff),
71 m_prefetch_bytes(0),
72 m_prefetch_read_cnt(0),
73 m_prefetch_hit_cnt(0),
74 m_prefetch_score(0)
75{}
76
77File::~File()
78{
79 if (m_info_file)
80 {
81 TRACEF(Debug, "~File() close info ");
82 m_info_file->Close();
83 delete m_info_file;
84 m_info_file = NULL;
85 }
86
87 if (m_data_file)
88 {
89 TRACEF(Debug, "~File() close output ");
90 m_data_file->Close();
91 delete m_data_file;
92 m_data_file = NULL;
93 }
94
95 TRACEF(Debug, "~File() ended, prefetch score = " << m_prefetch_score);
96}
97
98//------------------------------------------------------------------------------
99
100File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
101{
102 File *file = new File(path, offset, fileSize);
103 if ( ! file->Open())
104 {
105 delete file;
106 file = 0;
107 }
108 return file;
109}
110
111//------------------------------------------------------------------------------
112
// NOTE(review): the function header line is missing from this extract. Per
// the comments below this is the emergency-shutdown entry point invoked from
// Cache::Unlink() -- confirm the exact signature against the upstream source.
114{
115 // Called from Cache::Unlink() when the file is currently open.
116 // Cache::Unlink is also called on FSync error and when wrong number of bytes
117 // is received from a remote read.
118 //
119 // From this point onward the file will not be written to, cinfo file will
120 // not be updated, and all new read requests will return -ENOENT.
121 //
122 // File's entry in the Cache's active map is set to nullptr and will be
123 // removed from there shortly, in any case, well before this File object
124 // shuts down. So we do not communicate to Cache about our destruction when
125 // it happens.
126
127 {
128 XrdSysCondVarHelper _lck(m_state_cond);
129
// Flag consulted throughout this file (Read/ReadV/WriteToDisk/Sync) to
// refuse further work on this object.
130 m_in_shutdown = true;
131
// Stop prefetching unless it is already stopped or the file is complete.
132 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
133 {
134 m_prefetch_state = kStopped;
135 cache()->DeRegisterPrefetchFile(this);
136 }
137 }
138
139}
140
141//------------------------------------------------------------------------------
142
// NOTE(review): the function header line is missing from this extract. The
// body returns (by value) the Stats delta accumulated since the previous
// call, updating the m_last_stats snapshot -- confirm name/signature upstream.
144{
145 // Not locked, only used from Cache / Purge thread.
146
// Start from the previous snapshot ...
147 Stats delta = m_last_stats;
148
// ... take a fresh snapshot ...
149 m_last_stats = m_stats.Clone();
150
// ... and turn 'delta' into the difference between the two.
151 delta.DeltaToReference(m_last_stats);
152
153 return delta;
154}
155
156//------------------------------------------------------------------------------
157
// NOTE(review): the function header line is missing from this extract. The
// trace message identifies this as BlockRemovedFromWriteQ(Block* b): a block
// has left the cache's write queue, so its queue reference is dropped here.
159{
160 TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
161
162 XrdSysCondVarHelper _lck(m_state_cond);
163 dec_ref_count(b);
164}
165
166void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
167{
168 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
169
170 XrdSysCondVarHelper _lck(m_state_cond);
171
172 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
173 {
174 dec_ref_count(*i);
175 }
176}
177
178//------------------------------------------------------------------------------
179
// NOTE(review): the function header line is missing from this extract. The
// body records the IO object's current remote location into this File's
// location set -- presumably ioUpdated(IO *io) or similar; confirm upstream.
181{
182 std::string loc(io->GetLocation());
183 XrdSysCondVarHelper _lck(m_state_cond);
184 insert_remote_location(loc);
185}
186
187//------------------------------------------------------------------------------
188
// NOTE(review): the function header line is missing from this extract. The
// trace messages identify this as ioActive(IO *io); it reports whether the
// given IO still has outstanding activity (reads / prefetches / unflushed
// blocks) so the caller can delay detaching it.
190{
191 // Returns true if delay is needed.
192
193 TRACEF(Debug, "ioActive start for io " << io);
194
195 std::string loc(io->GetLocation());
196
197 {
198 XrdSysCondVarHelper _lck(m_state_cond);
199
200 IoSet_i mi = m_io_set.find(io);
201
202 if (mi != m_io_set.end())
203 {
204 unsigned int n_active_reads = io->m_active_read_reqs;
205
206 TRACE(Info, "ioActive for io " << io <<
207 ", active_reads " << n_active_reads <<
208 ", active_prefetches " << io->m_active_prefetches <<
209 ", allow_prefetching " << io->m_allow_prefetching <<
210 ", ios_in_detach " << m_ios_in_detach);
211 TRACEF(Info,
212 "\tio_map.size() " << m_io_set.size() <<
213 ", block_map.size() " << m_block_map.size() << ", file");
214
215 insert_remote_location(loc);
216
// This IO must no longer be used for prefetching and is marked detaching.
217 io->m_allow_prefetching = false;
218 io->m_in_detach = true;
219
220 // Check if any IO is still available for prfetching. If not, stop it.
221 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
222 {
223 if ( ! select_current_io_or_disable_prefetching(false) )
224 {
225 TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
226 }
227 }
228
229 // On last IO, consider write queue blocks. Note, this also contains
230 // blocks being prefetched.
231
232 bool io_active_result;
233
// Active when: it has in-flight reads; or it is the last non-detaching IO
// and any blocks are still in the block map; or it has in-flight prefetches.
234 if (n_active_reads > 0)
235 {
236 io_active_result = true;
237 }
238 else if (m_io_set.size() - m_ios_in_detach == 1)
239 {
240 io_active_result = ! m_block_map.empty();
241 }
242 else
243 {
244 io_active_result = io->m_active_prefetches > 0;
245 }
246
247 if ( ! io_active_result)
248 {
249 ++m_ios_in_detach;
250 }
251
252 TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
253
254 return io_active_result;
255 }
256 else
257 {
258 TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
259 return false;
260 }
261 }
262}
263
264//------------------------------------------------------------------------------
265
// NOTE(review): the function header line is missing from this extract. The
// body re-arms the "detach time logged" latch (set in FinalizeSyncBeforeExit)
// so a later detach writes IO stats again -- confirm name upstream.
267{
268 XrdSysCondVarHelper _lck(m_state_cond);
269 m_detach_time_logged = false;
270}
271
// NOTE(review): the function header line is missing from this extract. The
// trace messages identify this as FinalizeSyncBeforeExit(); it returns bool.
273{
274 // Returns true if sync is required.
275 // This method is called after corresponding IO is detached from PosixCache.
276
277 XrdSysCondVarHelper _lck(m_state_cond);
// In shutdown nothing is written out, so no sync is ever requested.
278 if ( ! m_in_shutdown)
279 {
// Sync is needed if blocks were written during a previous sync, if there are
// unflushed blocks, or if detach-time stats have not yet been recorded.
280 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
281 {
282 Stats loc_stats = m_stats.Clone();
283 m_cfi.WriteIOStatDetach(loc_stats);
284 m_detach_time_logged = true;
285 m_in_sync = true;
286 TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
287 return true;
288 }
289 }
290 TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
291 return false;
292}
293
294//------------------------------------------------------------------------------
295
// NOTE(review): the function header line is missing from this extract. The
// trace messages identify this as AddIO(IO *io); registers a new IO client.
297{
298 // Called from Cache::GetFile() when a new IO asks for the file.
299
300 TRACEF(Debug, "AddIO() io = " << (void*)io);
301
302 time_t now = time(0);
303 std::string loc(io->GetLocation());
304
305 m_state_cond.Lock();
306
307 IoSet_i mi = m_io_set.find(io);
308
309 if (mi == m_io_set.end())
310 {
311 m_io_set.insert(io);
312 io->m_attach_time = now;
313 m_stats.IoAttach();
314
315 insert_remote_location(loc);
316
// First IO after prefetching stopped: restart prefetching for this file.
317 if (m_prefetch_state == kStopped)
318 {
319 m_prefetch_state = kOn;
320 cache()->RegisterPrefetchFile(this);
321 }
322 }
323 else
324 {
325 TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
326 }
327
328 m_state_cond.UnLock();
329}
330
331//------------------------------------------------------------------------------
332
// NOTE(review): the function header line is missing from this extract. The
// trace messages identify this as RemoveIO(IO *io); deregisters an IO client.
334{
335 // Called from Cache::ReleaseFile.
336
337 TRACEF(Debug, "RemoveIO() io = " << (void*)io);
338
339 time_t now = time(0);
340
341 m_state_cond.Lock();
342
343 IoSet_i mi = m_io_set.find(io);
344
345 if (mi != m_io_set.end())
346 {
// Keep the round-robin prefetch cursor valid when its element is erased.
347 if (mi == m_current_io)
348 {
349 ++m_current_io;
350 }
351
352 m_stats.IoDetach(now - io->m_attach_time);
353 m_io_set.erase(mi);
354 --m_ios_in_detach;
355
// Removing the last IO: prefetching should already have been stopped by
// ioActive(); complain and stop it here if not.
356 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
357 {
358 TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
359 m_prefetch_state = kStopped;
360 cache()->DeRegisterPrefetchFile(this);
361 }
362 }
363 else
364 {
365 TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
366 }
367
368 m_state_cond.UnLock();
369}
370
371//------------------------------------------------------------------------------
372
// Open (and if needed create) the local data file and its companion cinfo
// file, validating any pre-existing state. Returns true on success; on
// failure sets errno and leaves m_data_file / m_info_file null.
373bool File::Open()
374{
375 // Sets errno accordingly.
376
377 static const char *tpfx = "Open() ";
378
379 TRACEF(Dump, tpfx << "open file for disk cache");
380
// NOTE(review): a line is missing from this extract here. 'conf' used below
// is presumably a reference to Cache's Configuration -- confirm upstream.
382
383 XrdOss &myOss = * Cache::GetInstance().GetOss();
384 const char *myUser = conf.m_username.c_str();
385 XrdOucEnv myEnv;
386 struct stat data_stat, info_stat;
387
388 std::string ifn = m_filename + Info::s_infoExtension;
389
390 bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
391 bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
392
393 // Create the data file itself.
394 char size_str[32]; sprintf(size_str, "%lld", m_file_size);
395 myEnv.Put("oss.asize", size_str);
396 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
397
398 int res;
399
400 if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
401 {
402 TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
403 errno = -res;
404 return false;
405 }
406
407 m_data_file = myOss.newFile(myUser);
408 if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
409 {
410 TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
411 errno = -res;
412 delete m_data_file; m_data_file = 0;
413 return false;
414 }
415
// Now the companion cinfo (metadata) file, placed in the meta space.
416 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
417 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
418 if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
419 {
420 TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
421 errno = -res;
422 m_data_file->Close(); delete m_data_file; m_data_file = 0;
423 return false;
424 }
425
426 m_info_file = myOss.newFile(myUser);
427 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
428 {
429 TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
430 errno = -res;
431 delete m_info_file; m_info_file = 0;
432 m_data_file->Close(); delete m_data_file; m_data_file = 0;
433 return false;
434 }
435
436 bool initialize_info_file = true;
437
438 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
439 {
440 TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
441 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
442 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");
443
444 // Check if data file exists and is of reasonable size.
445 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
446 {
447 initialize_info_file = false;
448 } else {
449 TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
450 m_cfi.ResetAllAccessStats();
451 m_data_file->Ftruncate(0);
452 }
453 }
454
// If the stored checksum state differs from the configured one, decide
// between resetting the cache entry and downgrading the recorded state.
455 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
456 {
// NOTE(review): the first line of this inner 'if' condition is missing from
// this extract -- confirm the full condition against the upstream source.
458 conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
459 {
460 TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
461 initialize_info_file = true;
462 m_cfi.ResetAllAccessStats();
463 m_data_file->Ftruncate(0);
464 } else {
465 // TODO: If the file is complete, we don't need to reset net cksums.
466 m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
467 }
468 }
469
470 if (initialize_info_file)
471 {
472 m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
473 m_cfi.SetCkSumState(conf.get_cs_Chk());
474 m_cfi.ResetNoCkSumTime();
475 m_cfi.Write(m_info_file, ifn.c_str());
476 m_info_file->Fsync();
477 cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
478 TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
479 }
480
// Publish block geometry and initial prefetch state under the lock.
481 m_cfi.WriteIOStatAttach();
482 m_state_cond.Lock();
483 m_block_size = m_cfi.GetBufferSize();
484 m_num_blocks = m_cfi.GetNBlocks();
485 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
486 m_state_cond.UnLock();
487
488 return true;
489}
490
491int File::Fstat(struct stat &sbuff)
492{
493 // Stat on an open file.
494 // Corrects size to actual full size of the file.
495 // Sets atime to 0 if the file is only partially downloaded, in accordance
496 // with pfc.onlyifcached settings.
497 // Called from IO::Fstat() and Cache::Stat() when the file is active.
498 // Returns 0 on success, -errno on error.
499
500 int res;
501
502 if ((res = m_data_file->Fstat(&sbuff))) return res;
503
504 sbuff.st_size = m_file_size;
505
506 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
507 if ( ! is_cached)
508 sbuff.st_atime = 0;
509
510 return 0;
511}
512
513//==============================================================================
514// Read and helpers
515//==============================================================================
516
517bool File::overlap(int blk, // block to query
518 long long blk_size, //
519 long long req_off, // offset of user request
520 int req_size, // size of user request
521 // output:
522 long long &off, // offset in user buffer
523 long long &blk_off, // offset in block
524 int &size) // size to copy
525{
526 const long long beg = blk * blk_size;
527 const long long end = beg + blk_size;
528 const long long req_end = req_off + req_size;
529
530 if (req_off < end && req_end > beg)
531 {
532 const long long ovlp_beg = std::max(beg, req_off);
533 const long long ovlp_end = std::min(end, req_end);
534
535 off = ovlp_beg - req_off;
536 blk_off = ovlp_beg - beg;
537 size = (int) (ovlp_end - ovlp_beg);
538
539 assert(size <= blk_size);
540 return true;
541 }
542 else
543 {
544 return false;
545 }
546}
547
548//------------------------------------------------------------------------------
549
// Allocate RAM for block 'i' and register it in m_block_map; the network
// read itself is issued later by ProcessBlockRequests(). Returns nullptr if
// RAM could not be granted or Block allocation failed.
550Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
551{
552 // Must be called w/ state_cond locked.
553 // Checks on size etc should be done before.
554 //
555 // Reference count is 0 so increase it in calling function if you want to
556 // catch the block while still in memory.
557
558 const long long off = i * m_block_size;
559 const int last_block = m_num_blocks - 1;
560 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
561
562 int blk_size, req_size;
563 if (i == last_block) {
// Last block may be short; with network checksums the request size is
// rounded up to the next 4 KiB (0x1000) page boundary.
564 blk_size = req_size = m_file_size - off;
565 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
566 } else {
567 blk_size = req_size = m_block_size;
568 }
569
570 Block *b = 0;
571 char *buf = cache()->RequestRAM(req_size);
572
573 if (buf)
574 {
575 b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
576
577 if (b)
578 {
579 m_block_map[i] = b;
580
581 // Actual Read request is issued in ProcessBlockRequests().
582
// Too many in-flight blocks: put prefetching on hold until blocks drain.
583 if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
584 {
585 m_prefetch_state = kHold;
586 cache()->DeRegisterPrefetchFile(this);
587 }
588 }
589 else
590 {
591 TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
592 }
593 }
594
595 return b;
596}
597
// Issue the remote read for a single prepared block, via pgRead (with
// checksums) when network checksumming is requested, plain Read otherwise.
598void File::ProcessBlockRequest(Block *b)
599{
600 // This *must not* be called with block_map locked.
601
// NOTE(review): a line is missing from this extract here -- 'brh' used below
// (the per-block response handler passed to pgRead/Read) is presumably
// created on the missing line; confirm against the upstream source.
603
604 if (XRD_TRACE What >= TRACE_Dump) {
605 char buf[256];
606 snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
607 b->get_offset()/m_block_size, b, b->m_prefetch, b->get_offset(), b->get_req_size(), b->get_buff(), brh);
608 TRACEF(Dump, "ProcessBlockRequest() " << buf);
609 }
610
611 if (b->req_cksum_net())
612 {
613 b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
614 b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
615 } else {
616 b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
617 }
618}
619
620void File::ProcessBlockRequests(BlockList_t& blks)
621{
622 // This *must not* be called with block_map locked.
623
624 for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
625 {
626 ProcessBlockRequest(*bi);
627 }
628}
629
630//------------------------------------------------------------------------------
631
632void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
633{
634 int n_chunks = ioVec.size();
635 int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
636
637 TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
638 ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
639
640 DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
641
642 int pos = 0;
643 while (n_chunks > XrdProto::maxRvecsz) {
644 io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
645 pos += XrdProto::maxRvecsz;
646 n_chunks -= XrdProto::maxRvecsz;
647 }
648 io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
649}
650
651//------------------------------------------------------------------------------
652
653int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
654{
655 TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
656
657 long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
658
659 if (rs < 0)
660 {
661 TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
662 return rs;
663 }
664
665 if (rs != expected_size)
666 {
667 TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
668 return -EIO;
669 }
670
671 return (int) rs;
672}
673
674//------------------------------------------------------------------------------
675
676int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
677{
678 // rrc_func is ONLY called from async processing.
679 // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
680 // This streamlines implementation of synchronous IO::Read().
681
682 TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
683
684 m_state_cond.Lock();
685
686 if (m_in_shutdown || io->m_in_detach)
687 {
688 m_state_cond.UnLock();
689 return m_in_shutdown ? -ENOENT : -EBADF;
690 }
691
692 // Shortcut -- file is fully downloaded.
693
694 if (m_cfi.IsComplete())
695 {
696 m_state_cond.UnLock();
697 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
698 if (ret > 0) m_stats.AddBytesHit(ret);
699 return ret;
700 }
701
702 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
703
704 return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
705}
706
707//------------------------------------------------------------------------------
708
709int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
710{
711 TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
712
713 m_state_cond.Lock();
714
715 if (m_in_shutdown || io->m_in_detach)
716 {
717 m_state_cond.UnLock();
718 return m_in_shutdown ? -ENOENT : -EBADF;
719 }
720
721 // Shortcut -- file is fully downloaded.
722
723 if (m_cfi.IsComplete())
724 {
725 m_state_cond.UnLock();
726 int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
727 if (ret > 0) m_stats.AddBytesHit(ret);
728 return ret;
729 }
730
731 return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
732}
733
734//------------------------------------------------------------------------------
735
// Core read engine shared by Read() and ReadV(): classifies every requested
// block as in-RAM / on-disk / to-be-fetched, issues remote and direct reads,
// then serves the RAM and disk portions synchronously. Returns bytes read,
// -errno, or -EWOULDBLOCK when remote completions are still outstanding.
736int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
737 ReadReqRH *rh, const char *tpfx)
738{
739 // Non-trivial processing for Read and ReadV.
740 // Entered under lock.
741 //
742 // loop over reqired blocks:
743 // - if on disk, ok;
744 // - if in ram or incoming, inc ref-count
745 // - otherwise request and inc ref count (unless RAM full => request direct)
746 // unlock
747
748 int prefetch_cnt = 0;
749
750 ReadRequest *read_req = nullptr;
751 BlockList_t blks_to_request; // blocks we are issuing a new remote request for
752
// Finished in-RAM blocks, with the chunk copies to perform from each.
753 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
754
755 std::vector<XrdOucIOVec> iovec_disk;
756 std::vector<XrdOucIOVec> iovec_direct;
757 int iovec_disk_total = 0;
758 int iovec_direct_total = 0;
759
760 for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
761 {
762 const XrdOucIOVec &iov = readV[iov_idx];
763 long long iUserOff = iov.offset;
764 int iUserSize = iov.size;
765 char *iUserBuff = iov.data;
766
767 const int idx_first = iUserOff / m_block_size;
768 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
769
770 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
771
// Tracks the classification of the previous block so adjacent chunks of the
// same kind can be merged into one IOVec entry below.
772 enum LastBlock_e { LB_other, LB_disk, LB_direct };
773
774 LastBlock_e lbe = LB_other;
775
776 for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
777 {
778 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
779 BlockMap_i bi = m_block_map.find(block_idx);
780
781 // overlap and read
782 long long off; // offset in user buffer
783 long long blk_off; // offset in block
784 int size; // size to copy
785
786 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
787
788 // In RAM or incoming?
789 if (bi != m_block_map.end())
790 {
791 inc_ref_count(bi->second);
792 TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
793
794 if (bi->second->is_finished())
795 {
796 // note, blocks with error should not be here !!!
797 // they should be either removed or reissued in ProcessBlockResponse()
798 assert(bi->second->is_ok());
799
800 blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
801
802 if (bi->second->m_prefetch)
803 ++prefetch_cnt;
804 }
805 else
806 {
807 if ( ! read_req)
808 read_req = new ReadRequest(io, rh);
809
810 // We have a lock on state_cond --> as we register the request before releasing the lock,
811 // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
812
813 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
814 ++read_req->m_n_chunk_reqs;
815 }
816
817 lbe = LB_other;
818 }
819 // On disk?
820 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
821 {
822 TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
823
// Merge with the previous chunk when it was also served from disk.
824 if (lbe == LB_disk)
825 iovec_disk.back().size += size;
826 else
827 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
828 iovec_disk_total += size;
829
830 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
831 ++prefetch_cnt;
832
833 lbe = LB_disk;
834 }
835 // Neither ... then we have to go get it ...
836 else
837 {
838 if ( ! read_req)
839 read_req = new ReadRequest(io, rh);
840
841 // Is there room for one more RAM Block?
842 Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
843 if (b)
844 {
845 TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
846 inc_ref_count(b);
847 blks_to_request.push_back(b);
848
849 b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
850 ++read_req->m_n_chunk_reqs;
851
852 lbe = LB_other;
853 }
854 else // Nope ... read this directly without caching.
855 {
856 TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
857
858 iovec_direct_total += size;
859 read_req->m_direct_done = false;
860
861 // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
862 // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
863 // is determined in the RequestBlocksDirect().
864 if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
865 iovec_direct.back().size += size;
866 } else {
867 long long in_offset = block_idx * m_block_size + blk_off;
868 char *out_pos = iUserBuff + off;
869 while (size > XrdProto::maxRVdsz) {
870 iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
871 in_offset += XrdProto::maxRVdsz;
872 out_pos += XrdProto::maxRVdsz;
873 size -= XrdProto::maxRVdsz;
874 }
875 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
876 }
877
878 lbe = LB_direct;
879 }
880 }
881 } // end for over blocks in an IOVec
882 } // end for over readV IOVec
883
884 inc_prefetch_hit_cnt(prefetch_cnt);
885
886 m_state_cond.UnLock();
887
888 // First, send out remote requests for new blocks.
889 if ( ! blks_to_request.empty())
890 {
891 ProcessBlockRequests(blks_to_request);
892 blks_to_request.clear();
893 }
894
895 // Second, send out remote direct read requests.
896 if ( ! iovec_direct.empty())
897 {
898 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
899
900 TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
901 }
902
903 // Begin synchronous part where we process data that is already in RAM or on disk.
904
905 long long bytes_read = 0;
906 int error_cond = 0; // to be set to -errno
907
908 // Third, process blocks that are available in RAM.
909 if ( ! blks_ready.empty())
910 {
911 for (auto &bvi : blks_ready)
912 {
913 for (auto &cr : bvi.second)
914 {
915 TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
916 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
917 bytes_read += cr.m_size;
918 }
919 }
920 }
921
922 // Fourth, read blocks from disk.
923 if ( ! iovec_disk.empty())
924 {
925 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
926 TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
927 if (rc >= 0)
928 {
929 bytes_read += rc;
930 }
931 else
932 {
933 error_cond = rc;
934 TRACEF(Error, tpfx << "failed read from disk");
935 }
936 }
937
938 // End synchronous part -- update with sync stats and determine actual state of this read.
939 // Note: remote reads might have already finished during disk-read!
940
941 m_state_cond.Lock();
942
// Drop the references taken above on finished in-RAM blocks.
943 for (auto &bvi : blks_ready)
944 dec_ref_count(bvi.first, (int) bvi.second.size());
945
946 if (read_req)
947 {
948 read_req->m_bytes_read += bytes_read;
949 if (error_cond)
950 read_req->update_error_cond(error_cond);
951 read_req->m_stats.m_BytesHit += bytes_read;
952 read_req->m_sync_done = true;
953
954 if (read_req->is_complete())
955 {
956 // Almost like FinalizeReadRequest(read_req) -- but no callout!
957 m_state_cond.UnLock();
958
959 m_stats.AddReadStats(read_req->m_stats);
960
961 int ret = read_req->return_value();
962 delete read_req;
963 return ret;
964 }
965 else
966 {
967 m_state_cond.UnLock();
968 return -EWOULDBLOCK;
969 }
970 }
971 else
972 {
// Everything was served synchronously from RAM / disk.
973 m_stats.m_BytesHit += bytes_read;
974 m_state_cond.UnLock();
975
976 // !!! No callout.
977
978 return error_cond ? error_cond : bytes_read;
979 }
980}
981
982
983//==============================================================================
984// WriteBlock and Sync
985//==============================================================================
986
// NOTE(review): the function header line is missing from this extract. The
// trace messages identify this as WriteToDisk(Block* b): persist one block's
// buffer to the local data file and update cinfo bookkeeping.
988{
989 // write block buffer into disk file
990 long long offset = b->m_offset - m_offset;
991 long long size = b->get_size();
992 ssize_t retval;
993
// Use page-aware pgWrite when the cache keeps checksums; pass the block's
// checksum vector only if it actually has one.
994 if (m_cfi.IsCkSumCache())
995 if (b->has_cksums())
996 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
997 else
998 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
999 else
1000 retval = m_data_file->Write(b->get_buff(), offset, size);
1001
1002 if (retval < size)
1003 {
1004 if (retval < 0)
1005 {
1006 GetLog()->Emsg("WriteToDisk()", -retval, "write block to disk", GetLocalPath().c_str());
1007 }
1008 else
1009 {
1010 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1011 }
1012
1013 XrdSysCondVarHelper _lck(m_state_cond);
1014
// On failure only the reference is dropped; the written bit stays unset.
1015 dec_ref_count(b);
1016
1017 return;
1018 }
1019
1020 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1021
1022 // Set written bit.
1023 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1024
1025 bool schedule_sync = false;
1026 {
1027 XrdSysCondVarHelper _lck(m_state_cond);
1028
1029 m_cfi.SetBitWritten(blk_idx);
1030
1031 if (b->m_prefetch)
1032 {
1033 m_cfi.SetBitPrefetch(blk_idx);
1034 }
// Network checksums were requested but this block arrived without them:
// drop the net-checksum claim from the cinfo file.
1035 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1036 {
1037 m_cfi.ResetCkSumNet();
1038 }
1039
1040 dec_ref_count(b);
1041
1042 // Set synced bit or stash block index if in actual sync.
1043 // Synced state is only written out to cinfo file when data file is synced.
1044 if (m_in_sync)
1045 {
1046 m_writes_during_sync.push_back(blk_idx);
1047 }
1048 else
1049 {
1050 m_cfi.SetBitSynced(blk_idx);
1051 ++m_non_flushed_cnt;
// Request a sync when the file became complete or the flush threshold was
// reached -- unless the file is already shutting down.
1052 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1053 ! m_in_shutdown)
1054 {
1055 schedule_sync = true;
1056 m_in_sync = true;
1057 m_non_flushed_cnt = 0;
1058 }
1059 }
1060 }
1061
// The sync itself runs outside the lock, via the cache's scheduler.
1062 if (schedule_sync)
1063 {
1064 cache()->ScheduleFileSync(this);
1065 }
1066}
1067
1068//------------------------------------------------------------------------------
1069
// NOTE(review): the function header line is missing from this extract. The
// trace messages identify this as Sync(): fsync the data file, persist cinfo,
// and flush sync-state bookkeeping; on error, unlink and shut the file down.
1071{
1072 TRACEF(Dump, "Sync()");
1073
1074 int ret = m_data_file->Fsync();
1075 bool errorp = false;
1076 if (ret == XrdOssOK)
1077 {
// Data is safely on disk -- now record stats and state in the cinfo file.
1078 Stats loc_stats = m_stats.Clone();
1079 m_cfi.WriteIOStat(loc_stats);
1080 m_cfi.Write(m_info_file, m_filename.c_str());
1081 int cret = m_info_file->Fsync();
1082 if (cret != XrdOssOK)
1083 {
1084 TRACEF(Error, "Sync cinfo file sync error " << cret);
1085 errorp = true;
1086 }
1087 }
1088 else
1089 {
1090 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1091 errorp = true;
1092 }
1093
1094 if (errorp)
1095 {
1096 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1097
1098 // Unlink will also call this->initiate_emergency_shutdown()
1099 Cache::GetInstance().UnlinkFile(m_filename, false);
1100
1101 XrdSysCondVarHelper _lck(&m_state_cond);
1102
1103 m_writes_during_sync.clear();
1104 m_in_sync = false;
1105
1106 return;
1107 }
1108
1109 int written_while_in_sync;
1110 bool resync = false;
1111 {
1112 XrdSysCondVarHelper _lck(&m_state_cond);
// Blocks written while the sync was running were stashed by WriteToDisk();
// mark them synced now and count them as pending for the next flush.
1113 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1114 {
1115 m_cfi.SetBitSynced(*i);
1116 }
1117 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1118 m_writes_during_sync.clear();
1119
1120 // If there were writes during sync and the file is now complete,
1121 // let us call Sync again without resetting the m_in_sync flag.
1122 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1123 resync = true;
1124 else
1125 m_in_sync = false;
1126 }
1127 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1128
1129 if (resync)
1130 Sync();
1131}
1132
1133
1134//==============================================================================
1135// Block processing
1136//==============================================================================
1137
1138void File::free_block(Block* b)
1139{
1140 // Method always called under lock.
1141 int i = b->m_offset / m_block_size;
1142 TRACEF(Dump, "free_block block " << b << " idx = " << i);
1143 size_t ret = m_block_map.erase(i);
1144 if (ret != 1)
1145 {
1146 // assert might be a better option than a warning
1147 TRACEF(Error, "free_block did not erase " << i << " from map");
1148 }
1149 else
1150 {
1151 cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1152 delete b;
1153 }
1154
1155 if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
1156 {
1157 m_prefetch_state = kOn;
1158 cache()->RegisterPrefetchFile(this);
1159 }
1160}
1161
1162//------------------------------------------------------------------------------
1163
1164bool File::select_current_io_or_disable_prefetching(bool skip_current)
1165{
1166 // Method always called under lock. It also expects prefetch to be active.
1167
1168 int io_size = (int) m_io_set.size();
1169 bool io_ok = false;
1170
1171 if (io_size == 1)
1172 {
1173 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1174 if (io_ok)
1175 {
1176 m_current_io = m_io_set.begin();
1177 }
1178 }
1179 else if (io_size > 1)
1180 {
1181 IoSet_i mi = m_current_io;
1182 if (skip_current && mi != m_io_set.end()) ++mi;
1183
1184 for (int i = 0; i < io_size; ++i)
1185 {
1186 if (mi == m_io_set.end()) mi = m_io_set.begin();
1187
1188 if ((*mi)->m_allow_prefetching)
1189 {
1190 m_current_io = mi;
1191 io_ok = true;
1192 break;
1193 }
1194 ++mi;
1195 }
1196 }
1197
1198 if ( ! io_ok)
1199 {
1200 m_current_io = m_io_set.end();
1201 m_prefetch_state = kStopped;
1202 cache()->DeRegisterPrefetchFile(this);
1203 }
1204
1205 return io_ok;
1206}
1207
1208//------------------------------------------------------------------------------
1209
1210void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1211{
1212 // Called from DirectResponseHandler.
1213 // NOT under lock.
1214
1215 if (error_cond)
1216 TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1217
1218 m_state_cond.Lock();
1219
1220 if (error_cond)
1221 rreq->update_error_cond(error_cond);
1222 else {
1223 rreq->m_stats.m_BytesBypassed += bytes_read;
1224 rreq->m_bytes_read += bytes_read;
1225 }
1226
1227 rreq->m_direct_done = true;
1228
1229 bool rreq_complete = rreq->is_complete();
1230
1231 m_state_cond.UnLock();
1232
1233 if (rreq_complete)
1234 FinalizeReadRequest(rreq);
1235}
1236
void File::ProcessBlockError(Block *b, ReadRequest *rreq)
{
   // Called from ProcessBlockResponse().
   // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
   // Does not manage m_read_req.
   // Will not complete the request.

   TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
          " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));

   // Propagate the block's error code into the read request ...
   rreq->update_error_cond(b->get_error());
   // ... and account for this chunk request as processed.
   --rreq->m_n_chunk_reqs;

   // Release this request's reference on the block.
   dec_ref_count(b);
}
1252
1253void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1254{
1255 // Called from ProcessBlockResponse().
1256 // NOT under lock as it does memcopy ofor exisf block data.
1257 // Acquires lock for block, m_read_req and rreq state update.
1258
1259 ReadRequest *rreq = creq.m_read_req;
1260
1261 TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1262 memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1263
1264 m_state_cond.Lock();
1265
1266 rreq->m_bytes_read += creq.m_size;
1267
1268 if (b->get_req_id() == (void*) rreq)
1269 rreq->m_stats.m_BytesMissed += creq.m_size;
1270 else
1271 rreq->m_stats.m_BytesHit += creq.m_size;
1272
1273 --rreq->m_n_chunk_reqs;
1274
1275 if (b->m_prefetch)
1276 inc_prefetch_hit_cnt(1);
1277
1278 dec_ref_count(b);
1279
1280 bool rreq_complete = rreq->is_complete();
1281
1282 m_state_cond.UnLock();
1283
1284 if (rreq_complete)
1285 FinalizeReadRequest(rreq);
1286}
1287
1288void File::FinalizeReadRequest(ReadRequest *rreq)
1289{
1290 // called from ProcessBlockResponse()
1291 // NOT under lock -- does callout
1292
1293 m_stats.AddReadStats(rreq->m_stats);
1294
1295 rreq->m_rh->Done(rreq->return_value());
1296 delete rreq;
1297}
1298
void File::ProcessBlockResponse(Block *b, int res)
{
   // Completion of a remote block read: res is the number of bytes received,
   // or a negative errno. Handles prefetch bookkeeping, success notification
   // of subscribed chunk requests, error propagation, and reissue of the
   // request on another IO when possible. Callouts happen outside the lock.
   static const char* tpfx = "ProcessBlockResponse ";

   TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);

   if (res >= 0 && res != b->get_size())
   {
      // Incorrect number of bytes received, apparently size of the file on the remote
      // is different than what the cache expects it to be.
      // NOTE: UnlinkFile is called before taking m_state_cond -- it also
      // initiates emergency shutdown of this File object.
      TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
      Cache::GetInstance().UnlinkFile(m_filename, false);
   }

   m_state_cond.Lock();

   // Deregister block from IO's prefetch count, if needed.
   if (b->m_prefetch)
   {
      IO *io = b->get_io();
      IoSet_i mi = m_io_set.find(io);
      if (mi != m_io_set.end())
      {
         --io->m_active_prefetches;

         // If failed and IO is still prefetching -- disable prefetching on this IO.
         if (res < 0 && io->m_allow_prefetching)
         {
            TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
            io->m_allow_prefetching = false;

            // Check if any IO is still available for prefetching. If not, stop it.
            if (m_prefetch_state == kOn || m_prefetch_state == kHold)
            {
               if ( ! select_current_io_or_disable_prefetching(false) )
               {
                  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
               }
            }
         }

         // If failed with no subscribers -- delete the block and exit.
         if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
         {
            free_block(b);
            m_state_cond.UnLock();
            return;
         }
         m_prefetch_bytes += b->get_size();
      }
      else
      {
         TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
      }
   }

   if (res == b->get_size())
   {
      // Success path: the full block arrived.
      b->set_downloaded();
      TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
      if ( ! m_in_shutdown)
      {
         // Increase ref-count for the writer.
         inc_ref_count(b);
         m_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
         cache()->AddWriteTask(b, true);
      }

      // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
      vChunkRequest_t creqs_to_notify;
      creqs_to_notify.swap( b->m_chunk_reqs );

      m_state_cond.UnLock();

      for (auto &creq : creqs_to_notify)
      {
         ProcessBlockSuccess(b, creq);
      }
   }
   else
   {
      // Failure path: negative res is an errno; a short read (res >= 0 but
      // smaller than the block) is converted to a remote-I/O error below.
      if (res < 0) {
         bool new_error = b->get_io()->register_block_error(res);
         int tlvl = new_error ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << ", error=" << res);
      } else {
         bool first_p = b->get_io()->register_incomplete_read();
         int tlvl = first_p ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
// EREMOTEIO is not available on all platforms -- fall back to EIO there.
#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
         res = -EIO;
#else
         res = -EREMOTEIO;
#endif
      }
      b->set_error(res);

      // Loop over Block's chunk-reqs vector, error out ones with the same IO.
      // Collect others with a different IO, the first of them will be used to reissue the request.
      // This is then done outside of lock.
      std::list<ReadRequest*> rreqs_to_complete;
      vChunkRequest_t creqs_to_keep;

      for(ChunkRequest &creq : b->m_chunk_reqs)
      {
         ReadRequest *rreq = creq.m_read_req;

         if (rreq->m_io == b->get_io())
         {
            ProcessBlockError(b, rreq);
            if (rreq->is_complete())
            {
               rreqs_to_complete.push_back(rreq);
            }
         }
         else
         {
            creqs_to_keep.push_back(creq);
         }
      }

      bool reissue = false;
      if ( ! creqs_to_keep.empty())
      {
         ReadRequest *rreq = creqs_to_keep.front().m_read_req;

         TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
                b->get_io() << " - reissuing request with my io " << rreq->m_io);

         b->reset_error_and_set_io(rreq->m_io, rreq);
         b->m_chunk_reqs.swap( creqs_to_keep );
         reissue = true;
      }

      m_state_cond.UnLock();

      for (auto rreq : rreqs_to_complete)
         FinalizeReadRequest(rreq);

      if (reissue)
         ProcessBlockRequest(b);
   }
}
1444
1445//------------------------------------------------------------------------------
1446
const char* File::lPath() const
{
   // Local path of the cached file, used for logging / tracing.
   return m_filename.c_str();
}
1451
1452//------------------------------------------------------------------------------
1453
1454int File::offsetIdx(int iIdx) const
1455{
1456 return iIdx - m_offset/m_block_size;
1457}
1458
1459
1460//------------------------------------------------------------------------------
1461
1463{
1464 // Check that block is not on disk and not in RAM.
1465 // TODO: Could prefetch several blocks at once!
1466 // blks_max could be an argument
1467
1468 BlockList_t blks;
1469
1470 TRACEF(DumpXL, "Prefetch() entering.");
1471 {
1472 XrdSysCondVarHelper _lck(m_state_cond);
1473
1474 if (m_prefetch_state != kOn)
1475 {
1476 return;
1477 }
1478
1479 if ( ! select_current_io_or_disable_prefetching(true) )
1480 {
1481 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1482 return;
1483 }
1484
1485 // Select block(s) to fetch.
1486 for (int f = 0; f < m_num_blocks; ++f)
1487 {
1488 if ( ! m_cfi.TestBitWritten(f))
1489 {
1490 int f_act = f + m_offset / m_block_size;
1491
1492 BlockMap_i bi = m_block_map.find(f_act);
1493 if (bi == m_block_map.end())
1494 {
1495 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1496 if (b)
1497 {
1498 TRACEF(Dump, "Prefetch take block " << f_act);
1499 blks.push_back(b);
1500 // Note: block ref_cnt not increased, it will be when placed into write queue.
1501
1502 inc_prefetch_read_cnt(1);
1503 }
1504 else
1505 {
1506 // This shouldn't happen as prefetching stops when RAM is 70% full.
1507 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1508 }
1509 break;
1510 }
1511 }
1512 }
1513
1514 if (blks.empty())
1515 {
1516 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1517 m_prefetch_state = kComplete;
1518 cache()->DeRegisterPrefetchFile(this);
1519 }
1520 else
1521 {
1522 (*m_current_io)->m_active_prefetches += (int) blks.size();
1523 }
1524 }
1525
1526 if ( ! blks.empty())
1527 {
1528 ProcessBlockRequests(blks);
1529 }
1530}
1531
1532
1533//------------------------------------------------------------------------------
1534
1536{
1537 return m_prefetch_score;
1538}
1539
1541{
1542 return Cache::GetInstance().GetLog();
1543}
1544
1549
1550void File::insert_remote_location(const std::string &loc)
1551{
1552 if ( ! loc.empty())
1553 {
1554 size_t p = loc.find_first_of('@');
1555 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1556 }
1557}
1558
1559std::string File::GetRemoteLocations() const
1560{
1561 std::string s;
1562 if ( ! m_remote_locations.empty())
1563 {
1564 size_t sl = 0;
1565 int nl = 0;
1566 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1567 {
1568 sl += i->size();
1569 }
1570 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1571 s = '[';
1572 int j = 1;
1573 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1574 {
1575 s += '"'; s += *i; s += '"';
1576 if (j < nl) s += ',';
1577 }
1578 s += ']';
1579 }
1580 else
1581 {
1582 s = "[]";
1583 }
1584 return s;
1585}
1586
1587//==============================================================================
1588//======================= RESPONSE HANDLERS ==============================
1589//==============================================================================
1590
1592{
1593 m_block->m_file->ProcessBlockResponse(m_block, res);
1594 delete this;
1595}
1596
1597//------------------------------------------------------------------------------
1598
1600{
1601 m_mutex.Lock();
1602
1603 int n_left = --m_to_wait;
1604
1605 if (res < 0) {
1606 if (m_errno == 0) m_errno = res; // store first reported error
1607 } else {
1608 m_bytes_read += res;
1609 }
1610
1611 m_mutex.UnLock();
1612
1613 if (n_left == 0)
1614 {
1615 m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1616 delete this;
1617 }
1618}
#define TRACE_Debug
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define TRACE_Error
Definition XrdPfcTrace.hh:7
#define TRACE_Dump
#define TRACEF(act, x)
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
#define stat(a, b)
Definition XrdPosix.hh:96
#define XRD_TRACE
bool Debug
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Fsync()
Definition XrdOss.hh:144
virtual int Ftruncate(unsigned long long flen)
Definition XrdOss.hh:164
virtual int Fstat(struct stat *buf)
Definition XrdOss.hh:136
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual ssize_t Read(off_t offset, size_t size)
Definition XrdOss.hh:281
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition XrdOss.cc:198
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition XrdOss.hh:345
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
long long m_offset
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:319
XrdSysTrace * GetTrace()
Definition XrdPfc.hh:402
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:163
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1179
XrdOss * GetOss() const
Definition XrdPfc.hh:389
XrdSysError * GetLog()
Definition XrdPfc.hh:401
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
XrdSysTrace * GetTrace()
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
XrdSysError * GetLog()
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void initiate_emergency_shutdown()
void Sync()
Sync file cache inf o and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
void RemoveIO(IO *io)
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Stats DeltaStatsFromLastCall()
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
bool register_incomplete_read()
Definition XrdPfcIO.hh:92
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:30
bool register_block_error(int res)
Definition XrdPfcIO.hh:95
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:72
const char * GetLocation()
Definition XrdPfcIO.hh:46
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
void ResetCkSumNet()
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
bool IsCkSumNet() const
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void ResetNoCkSumTime()
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddWriteStats(long long bytes_written, int n_cks_errs)
void AddBytesHit(long long bh)
long long m_BytesHit
number of bytes served from disk
void IoDetach(int duration)
void DeltaToReference(const Stats &ref)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * > BlockList_t
XrdSysTrace * GetTrace()
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
Definition XProtocol.hh:688
static const int maxRvecsz
Definition XProtocol.hh:686
long long offset
ReadRequest * m_read_req
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:56
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition XrdPfc.hh:109
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition XrdPfc.hh:74
CkSumCheck_e get_cs_Chk() const
Definition XrdPfc.hh:67
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:106
bool should_uvkeep_purge(time_t delta) const
Definition XrdPfc.hh:76
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:82
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:83
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
unsigned short m_seq_id
Definition XrdPfcFile.hh:64
void update_error_cond(int ec)
Definition XrdPfcFile.hh:92
bool is_complete() const
Definition XrdPfcFile.hh:94
int return_value() const
Definition XrdPfcFile.hh:95
long long m_bytes_read
Definition XrdPfcFile.hh:79