Skip to content

Commit cd89286

Browse files
committed
Add meta-output-dir to aggregate workflow
1 parent bfad3df commit cd89286

1 file changed

Lines changed: 70 additions & 22 deletions

File tree

Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <unordered_map>
2424
#include <vector>
2525
#include <fmt/format.h>
26+
#include <filesystem>
27+
#include <fstream>
2628
#include "TMemFile.h"
2729
#include "TParameter.h"
2830
#include "Framework/Task.h"
@@ -45,6 +47,7 @@
4547
#include "DetectorsBase/GRPGeomHelper.h"
4648
#include "MemoryResources/MemoryResources.h"
4749
#include "CommonUtils/StringUtils.h"
50+
#include "DetectorsCommonDataFormats/FileMetaData.h"
4851

4952
using namespace o2::framework;
5053
using o2::header::gDataOriginTPC;
@@ -91,9 +94,13 @@ class TPCAggregateCMVDevice : public o2::framework::Task
9194
{
9295
o2::base::GRPGeomHelper::instance().setRequest(mCCDBRequest);
9396
mOutputDir = ic.options().get<std::string>("output-dir");
94-
if ((mOutputDir != "none") && (mOutputDir != "/dev/null")) {
97+
if (mOutputDir != "/dev/null") {
9598
mOutputDir = o2::utils::Str::rectifyDirectory(mOutputDir);
9699
}
100+
mMetaFileDir = ic.options().get<std::string>("meta-output-dir");
101+
if (mMetaFileDir != "/dev/null") {
102+
mMetaFileDir = o2::utils::Str::rectifyDirectory(mMetaFileDir);
103+
}
97104
mUseCompressionVarint = ic.options().get<bool>("use-compression-varint");
98105
mUseSparse = ic.options().get<bool>("use-sparse");
99106
mUseCompressionHuffman = ic.options().get<bool>("use-compression-huffman");
@@ -128,6 +135,15 @@ class TPCAggregateCMVDevice : public o2::framework::Task
128135
return;
129136
}
130137

138+
if (mSetDataTakingCont) {
139+
mDataTakingContext = pc.services().get<DataTakingContext>();
140+
mSetDataTakingCont = false;
141+
}
142+
143+
if (!mRun) {
144+
mRun = processing_helpers::getRunNumber(pc);
145+
}
146+
131147
const auto currTF = processing_helpers::getCurrentTF(pc);
132148

133149
if (mTFFirst == -1) {
@@ -252,13 +268,16 @@ class TPCAggregateCMVDevice : public o2::framework::Task
252268
CMVPerTFCompressed compressed{};
253269
};
254270

255-
const int mLaneId{0}; ///< aggregate lane index (matches the distribute output lane)
256-
std::vector<uint32_t> mCRUs{}; ///< CRUs expected on this lane (sorted for binary_search)
257-
const unsigned int mTimeFrames{}; ///< number of CMV batches per calibration interval (= total TFs / nTFsBuffer)
258-
const bool mSendCCDB{false}; ///< send serialised TTree to the CCDB populator
259-
const bool mUsePreciseTimestamp{false}; ///< use orbit-reset info forwarded by the distribute lane for precise CCDB timestamps
260-
const int mNTFsBuffer{1}; ///< number of real TFs packed into one CMV batch (must match TPCFLPCMVSpec)
261-
std::string mOutputDir{"none"}; ///< directory to write local ROOT files ("none" or "/dev/null" to disable)
271+
const int mLaneId{0}; ///< aggregate lane index (matches the distribute output lane)
272+
std::vector<uint32_t> mCRUs{}; ///< CRUs expected on this lane (sorted for binary_search)
273+
const unsigned int mTimeFrames{}; ///< number of CMV batches per calibration interval (= total TFs / nTFsBuffer)
274+
const bool mSendCCDB{false}; ///< send serialised TTree to the CCDB populator
275+
const bool mUsePreciseTimestamp{false}; ///< use orbit-reset info forwarded by the distribute lane for precise CCDB timestamps
276+
const int mNTFsBuffer{1}; ///< number of real TFs packed into one CMV batch (must match TPCFLPCMVSpec)
277+
std::string mOutputDir{}; ///< directory to write local ROOT files ("/dev/null" to disable)
278+
std::string mMetaFileDir{}; ///< directory to write calibration metadata files ("/dev/null" to disable)
279+
o2::framework::DataTakingContext mDataTakingContext{};
280+
bool mSetDataTakingCont{true}; ///< flag to capture DataTakingContext only once
262281
bool mUseCompressionVarint{false}; ///< delta+zigzag+varint compression for all values (dense path); combined with mUseSparse → sparse+varint
263282
bool mUseSparse{false}; ///< sparse encoding (skip zero time bins); alone = raw uint16; combined with varint/Huffman → sparse+compressed
264283
bool mUseCompressionHuffman{false}; ///< Huffman encoding; combined with mUseSparse → sparse+Huffman
@@ -285,6 +304,10 @@ class TPCAggregateCMVDevice : public o2::framework::Task
285304
uint32_t mLastOrbitStep{0}; ///< cached orbit stride from the last complete batch; fallback for the EOS partial batch
286305
uint32_t mLastSeenTF{0}; ///< last TF counter seen in run(); used to compute lastTF metadata in the TTree
287306
unsigned int mIntervalTFCount{0}; ///< number of TTree entries filled for the current interval
307+
uint64_t mRun{0}; ///< run number, captured once per run
308+
uint32_t mIntervalFirstOrbit{0}; ///< first orbit of the first TF in the current interval
309+
uint32_t mIntervalLastOrbit{0}; ///< first orbit of the last TF in the current interval
310+
bool mIntervalOrbitSet{false}; ///< true once first orbit has been captured for the current interval
288311
dataformats::Pair<long, int> mTFInfo{}; ///< orbit-reset time (ms) and NHBFPerTF forwarded by distribute lane 0 for precise timestamps
289312
std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest; ///< GRPECS request so GRPGeomHelper::getNHBFPerTF() is valid in this process
290313
std::unique_ptr<TTree> mIntervalTree{}; ///< in-memory TTree accumulating one entry per real TF; serialised to CCDB/disk at interval end
@@ -457,6 +480,11 @@ class TPCAggregateCMVDevice : public o2::framework::Task
457480

458481
const auto firstOrbit = static_cast<uint32_t>(orbitInfo >> 32);
459482
const auto firstBC = static_cast<uint16_t>(orbitInfo & 0xFFFFu);
483+
if (!mIntervalOrbitSet) {
484+
mIntervalFirstOrbit = firstOrbit;
485+
mIntervalOrbitSet = true;
486+
}
487+
mIntervalLastOrbit = firstOrbit + static_cast<uint32_t>(nTFsInBatch - 1) * orbitStep;
460488
const uint8_t flags = buildCompressionFlags();
461489
std::vector<PreparedTF> prepared(nTFsInBatch);
462490
const int nThreads = std::max(1, std::min(mThreads, nTFsInBatch));
@@ -533,27 +561,43 @@ class TPCAggregateCMVDevice : public o2::framework::Task
533561
LOGP(detail, "CMVPerTF TTree lane {}: {} entries, firstTF={}, lastTF={}", mLaneId, mIntervalTFCount, mIntervalFirstTF, lastTF);
534562
auto start = timer::now();
535563

536-
const bool writeToDisk = (mOutputDir != "none") && (mOutputDir != "/dev/null");
537-
if (writeToDisk) {
538-
const std::string fname = fmt::format("{}CMV_timestamp{}.root", mOutputDir, mTimestampStart);
564+
const int nHBFPerTF = o2::base::GRPGeomHelper::instance().getNHBFPerTF();
565+
const long timeStampEnd = mTimestampStart + static_cast<long>(mIntervalTFCount * nHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-3);
566+
567+
if (mOutputDir != "/dev/null") {
568+
const std::string calibFName = fmt::format("CMV_run_{}_orbit_{}_{}_timestamp_{}_{}.root",
569+
mRun, mIntervalFirstOrbit, mIntervalLastOrbit, mTimestampStart, timeStampEnd);
539570
try {
540-
CMVPerTF::writeToFile(fname, mIntervalTree);
541-
LOGP(detail, "CMV file written to {}", fname);
571+
CMVPerTF::writeToFile(mOutputDir + calibFName, mIntervalTree);
572+
LOGP(detail, "CMV file written to {}", mOutputDir + calibFName);
542573
} catch (const std::exception& e) {
543-
LOGP(error, "Failed to write CMV file {}: {}", fname, e.what());
574+
LOGP(error, "Failed to write CMV file {}: {}", mOutputDir + calibFName, e.what());
544575
}
545-
}
546576

547-
if (!mSendCCDB) {
548-
if (!writeToDisk) {
549-
LOGP(warning, "Neither CCDB output nor output-dir is enabled for aggregate lane {}, skipping CMV export", mLaneId);
577+
if (mMetaFileDir != "/dev/null") {
578+
o2::dataformats::FileMetaData calMetaData;
579+
calMetaData.fillFileData(mOutputDir + calibFName);
580+
calMetaData.setDataTakingContext(mDataTakingContext);
581+
calMetaData.type = "calib";
582+
calMetaData.priority = "low";
583+
auto metaFileNameTmp = fmt::format("{}{}.tmp", mMetaFileDir, calibFName);
584+
auto metaFileName = fmt::format("{}{}.done", mMetaFileDir, calibFName);
585+
try {
586+
std::ofstream metaFileOut(metaFileNameTmp);
587+
metaFileOut << calMetaData;
588+
metaFileOut.close();
589+
std::filesystem::rename(metaFileNameTmp, metaFileName);
590+
} catch (std::exception const& e) {
591+
LOG(error) << "Failed to store CMV meta data file " << metaFileName << ", reason: " << e.what();
592+
}
550593
}
594+
}
595+
596+
if ((!mSendCCDB) && (mOutputDir == "/dev/null")) {
597+
LOGP(warning, "Neither CCDB output nor output-dir is enabled for aggregate lane {}, skipping CMV export", mLaneId);
551598
return;
552599
}
553600

554-
// use the actual number of TFs (mIntervalTFCount) so the CCDB validity end is correct for partial last intervals
555-
const int nHBFPerTF = o2::base::GRPGeomHelper::instance().getNHBFPerTF();
556-
const long timeStampEnd = mTimestampStart + static_cast<long>(mIntervalTFCount * nHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-3);
557601
if (timeStampEnd <= mTimestampStart) {
558602
LOGP(warning, "Invalid CCDB timestamp range start:{} end:{}, skipping upload", mTimestampStart, timeStampEnd);
559603
return;
@@ -604,6 +648,9 @@ class TPCAggregateCMVDevice : public o2::framework::Task
604648
mLastOrbitStep = 0;
605649
mLastSeenTF = 0;
606650
mIntervalTFCount = 0;
651+
mIntervalFirstOrbit = 0;
652+
mIntervalLastOrbit = 0;
653+
mIntervalOrbitSet = false;
607654
mCurrentTF = CMVPerTF{};
608655
mCurrentCompressedTF = CMVPerTFCompressed{};
609656
initIntervalTree();
@@ -647,7 +694,8 @@ inline DataProcessorSpec getTPCAggregateCMVSpec(const int lane,
647694
inputSpecs,
648695
outputSpecs,
649696
AlgorithmSpec{adaptFromTask<TPCAggregateCMVDevice>(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer, ccdbRequest)},
650-
Options{{"output-dir", VariantType::String, "none", {"CMV output directory, must exist"}},
697+
Options{{"output-dir", VariantType::String, "/dev/null", {"CMV output directory, must exist (if not /dev/null)"}},
698+
{"meta-output-dir", VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}},
651699
{"nthreads-compression", VariantType::Int, 1, {"Number of threads used for CMV per timeframe preprocessing and compression"}},
652700
{"use-sparse", VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}},
653701
{"use-compression-varint", VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}},

0 commit comments

Comments
 (0)