GCC Code Coverage Report


Directory: ./
File: tasks/akimov_i_star/mpi/src/ops_mpi.cpp
Date: 2026-01-10 02:40:41
Exec Total Coverage
Lines: 104 173 60.1%
Functions: 13 13 100.0%
Branches: 53 160 33.1%

Line Branch Exec Source
1 #include "akimov_i_star/mpi/include/ops_mpi.hpp"
2
3 #include <mpi.h>
4
5 #include <algorithm>
6 #include <array>
7 #include <cctype>
8 #include <cstddef>
9 #include <cstdlib>
10 #include <sstream>
11 #include <string>
12 #include <vector>
13
14 #include "akimov_i_star/common/include/common.hpp"
15
16 namespace akimov_i_star {
17
18 namespace {
19
20 std::string ToString(const InType &buf) {
21 3 return {buf.begin(), buf.end()};
22 }
23
24
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 inline void Trim(std::string &s) {
25 const char *ws = " \t\n\r\f\v";
26 auto first = s.find_first_not_of(ws);
27
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (first == std::string::npos) {
28 s.clear();
29 return;
30 }
31 auto last = s.find_last_not_of(ws);
32 4 s = s.substr(first, last - first + 1);
33 }
34
35 2 std::vector<AkimovIStarMPI::Op> ParseOpsFromString(const std::string &s) {
36 2 std::vector<AkimovIStarMPI::Op> res;
37
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 std::istringstream ss(s);
38 std::string line;
39
3/4
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 2 times.
4 while (std::getline(ss, line)) {
40
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 Trim(line);
41
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (line.empty()) {
42 2 continue;
43 }
44
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 const std::string prefix = "send:";
45
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (!line.starts_with(prefix)) {
46 2 continue;
47 }
48 std::string rest = line.substr(prefix.size());
49 size_t p1 = rest.find(':');
50 if (p1 == std::string::npos) {
51 continue;
52 }
53 size_t p2 = rest.find(':', p1 + 1);
54 if (p2 == std::string::npos) {
55 continue;
56 }
57 std::string srcs = rest.substr(0, p1);
58 std::string dsts = rest.substr(p1 + 1, p2 - (p1 + 1));
59 std::string msg = rest.substr(p2 + 1);
60 Trim(srcs);
61 Trim(dsts);
62 Trim(msg);
63 try {
64 int src = std::stoi(srcs);
65 int dst = std::stoi(dsts);
66 res.push_back(AkimovIStarMPI::Op{.src = src, .dst = dst, .msg = msg});
67 } catch (...) {
68 continue;
69 }
70 }
71 2 return res;
72 2 }
73
74 int CountDstZero(const std::vector<AkimovIStarMPI::Op> &ops) {
75 int cnt = 0;
76 for (const auto &op : ops) {
77 if (op.dst == 0) {
78 ++cnt;
79 }
80 }
81 return cnt;
82 }
83
84 1 void SendOutgoingToCenter(int myrank, const std::vector<AkimovIStarMPI::Op> &ops) {
85 const int center = 0;
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 for (const auto &op : ops) {
87 if (op.src != myrank) {
88 continue;
89 }
90 std::array<int, 2> header{op.dst, static_cast<int>(op.msg.size())};
91 MPI_Send(header.data(), static_cast<int>(header.size()), MPI_INT, center, 0, MPI_COMM_WORLD);
92 if (header[1] > 0) {
93 MPI_Send(op.msg.data(), header[1], MPI_CHAR, center, 0, MPI_COMM_WORLD);
94 }
95 }
96 1 }
97
98 1 int ReceiveForwardedFromCenter(int expected) {
99 int recvd = 0;
100 const int center = 0;
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 for (int i = 0; i < expected; ++i) {
102 std::array<int, 2> header{0, 0};
103 MPI_Recv(header.data(), static_cast<int>(header.size()), MPI_INT, center, MPI_ANY_TAG, MPI_COMM_WORLD,
104 MPI_STATUS_IGNORE);
105 int payload_len = header[1];
106 std::string payload;
107 payload.resize(payload_len);
108 if (payload_len > 0) {
109 MPI_Recv(payload.data(), payload_len, MPI_CHAR, center, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
110 }
111 ++recvd;
112 }
113 1 return recvd;
114 }
115
116 1 void CenterProcessLocalOutgoing(const std::vector<AkimovIStarMPI::Op> &ops, int &received_count) {
117 const int center = 0;
118
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 for (const auto &op : ops) {
119 if (op.src != center) {
120 continue;
121 }
122 if (op.dst == center) {
123 ++received_count;
124 } else {
125 std::array<int, 2> header{op.src, static_cast<int>(op.msg.size())};
126 MPI_Send(header.data(), static_cast<int>(header.size()), MPI_INT, op.dst, 0, MPI_COMM_WORLD);
127 if (header[1] > 0) {
128 MPI_Send(op.msg.data(), header[1], MPI_CHAR, op.dst, 0, MPI_COMM_WORLD);
129 }
130 }
131 }
132 1 }
133
134 1 void CenterReceiveAndForward(int recv_from_others, int &received_count) {
135 const int center = 0;
136
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 for (int i = 0; i < recv_from_others; ++i) {
137 std::array<int, 2> header{0, 0};
138 MPI_Recv(header.data(), static_cast<int>(header.size()), MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
139 MPI_STATUS_IGNORE);
140 int dst = header[0];
141 int payload_len = header[1];
142 std::string payload;
143 payload.resize(payload_len);
144 if (payload_len > 0) {
145 MPI_Recv(payload.data(), payload_len, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
146 }
147 if (dst == center) {
148 ++received_count;
149 } else {
150 std::array<int, 2> fwd_header{0, payload_len};
151 MPI_Send(fwd_header.data(), static_cast<int>(fwd_header.size()), MPI_INT, dst, 0, MPI_COMM_WORLD);
152 if (payload_len > 0) {
153 MPI_Send(payload.data(), payload_len, MPI_CHAR, dst, 0, MPI_COMM_WORLD);
154 }
155 }
156 }
157 1 }
158
159 1 int CountMessagesForCenterFromInput(const InType &input) {
160 int total_expected_for_center = 0;
161 1 std::string s(input.begin(), input.end());
162
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 std::istringstream ss(s);
163 std::string line;
164
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 const std::string prefix = "send:";
165
166
3/4
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
✓ Branch 4 taken 1 times.
4 while (std::getline(ss, line)) {
167 size_t start = line.find_first_not_of(" \t\r\n");
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (start == std::string::npos) {
169 continue;
170 }
171 size_t end = line.find_last_not_of(" \t\r\n");
172
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 std::string t = line.substr(start, end - start + 1);
173
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!t.starts_with(prefix)) {
174 continue;
175 }
176
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 std::string rest = t.substr(prefix.size());
177 3 size_t p1 = rest.find(':');
178
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (p1 == std::string::npos) {
179 continue;
180 }
181 3 size_t p2 = rest.find(':', p1 + 1);
182
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (p2 == std::string::npos) {
183 continue;
184 }
185
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 std::string dsts = rest.substr(p1 + 1, p2 - (p1 + 1));
186 size_t sd = dsts.find_first_not_of(" \t\r\n");
187
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (sd == std::string::npos) {
188 continue;
189 }
190 size_t ed = dsts.find_last_not_of(" \t\r\n");
191
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 std::string dsttrim = dsts.substr(sd, ed - sd + 1);
192 try {
193 int dst = std::stoi(dsttrim);
194
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (dst == 0) {
195 ++total_expected_for_center;
196 }
197 } catch (...) {
198 continue;
199 }
200 }
201
202 1 return total_expected_for_center;
203 1 }
204
205 2 void ProcessMultiProcMode(int rank, const std::vector<AkimovIStarMPI::Op> &ops, int &received_count) {
206 2 int local_send_count = 0;
207 int local_expected_recv = 0;
208
209
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 for (const auto &op : ops) {
210 if (op.src == rank) {
211 ++local_send_count;
212 }
213 if (op.dst == rank) {
214 ++local_expected_recv;
215 }
216 }
217
218 2 int total_sends = 0;
219 2 MPI_Allreduce(&local_send_count, &total_sends, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
220
221 const int center = 0;
222
223
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (rank != center) {
224 1 SendOutgoingToCenter(rank, ops);
225 1 received_count = ReceiveForwardedFromCenter(local_expected_recv);
226 } else {
227 1 CenterProcessLocalOutgoing(ops, received_count);
228 int center_local_sends = 0;
229
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 for (const auto &op : ops) {
230 if (op.src == center) {
231 ++center_local_sends;
232 }
233 }
234 1 int recv_from_others = total_sends - center_local_sends;
235 1 CenterReceiveAndForward(recv_from_others, received_count);
236 }
237 2 }
238
239 } // namespace
240
241
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 AkimovIStarMPI::AkimovIStarMPI(const InType &in) {
242 SetTypeOfTask(GetStaticTypeOfTask());
243
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 GetInput() = in;
244 2 GetOutput() = 0;
245 2 }
246
247 2 bool AkimovIStarMPI::ValidationImpl() {
248 2 int rank = 0;
249 2 int mpi_initialized = 0;
250 2 MPI_Initialized(&mpi_initialized);
251
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (mpi_initialized == 0) {
252 return true;
253 }
254 2 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
255
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (rank == 0) {
256 1 return !GetInput().empty();
257 }
258 return true;
259 }
260
261 2 bool AkimovIStarMPI::PreProcessingImpl() {
262 2 int rank = 0;
263 2 int size = 1;
264 2 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
265 2 MPI_Comm_size(MPI_COMM_WORLD, &size);
266
267 input_buffer_.clear();
268 ops_.clear();
269
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 received_count_ = 0;
270
271 std::string raw;
272
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (rank == 0) {
273 2 raw = ToString(GetInput());
274 }
275
276 2 int len = static_cast<int>(raw.size());
277
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD);
278
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 len = std::max(len, 0);
279 std::string buf;
280
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 buf.resize(len);
281
2/4
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
4 MPI_Bcast((len != 0) ? buf.data() : nullptr, len, MPI_CHAR, 0, MPI_COMM_WORLD);
282
283 input_buffer_.clear();
284
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (len > 0) {
285
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 input_buffer_.assign(buf.begin(), buf.end());
286 }
287
288 std::string parsed = ToString(input_buffer_);
289
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ops_ = ParseOpsFromString(parsed);
290
291 2 return true;
292 }
293
294 2 bool AkimovIStarMPI::RunImpl() {
295 2 int rank = 0;
296 2 int size = 1;
297 2 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
298 2 MPI_Comm_size(MPI_COMM_WORLD, &size);
299
300
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (size == 1) {
301 received_count_ = CountDstZero(ops_);
302 } else {
303 2 ProcessMultiProcMode(rank, ops_, received_count_);
304 }
305
306 2 int total_expected_for_center = 0;
307
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (rank == 0) {
308 1 total_expected_for_center = CountMessagesForCenterFromInput(GetInput());
309 }
310
311 2 MPI_Bcast(&total_expected_for_center, 1, MPI_INT, 0, MPI_COMM_WORLD);
312
313 2 GetOutput() = total_expected_for_center;
314
315 2 return true;
316 }
317
318 2 bool AkimovIStarMPI::PostProcessingImpl() {
319 2 return true;
320 }
321
322 } // namespace akimov_i_star
323