| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | #include "akimov_i_star/mpi/include/ops_mpi.hpp" | ||
| 2 | |||
| 3 | #include <mpi.h> | ||
| 4 | |||
| 5 | #include <algorithm> | ||
| 6 | #include <array> | ||
| 7 | #include <cctype> | ||
| 8 | #include <cstddef> | ||
| 9 | #include <cstdlib> | ||
| 10 | #include <sstream> | ||
| 11 | #include <string> | ||
| 12 | #include <vector> | ||
| 13 | |||
| 14 | #include "akimov_i_star/common/include/common.hpp" | ||
| 15 | |||
| 16 | namespace akimov_i_star { | ||
| 17 | |||
| 18 | namespace { | ||
| 19 | |||
| 20 | std::string ToString(const InType &buf) { | ||
| 21 | 3 | return {buf.begin(), buf.end()}; | |
| 22 | } | ||
| 23 | |||
| 24 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | inline void Trim(std::string &s) { |
| 25 | const char *ws = " \t\n\r\f\v"; | ||
| 26 | auto first = s.find_first_not_of(ws); | ||
| 27 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (first == std::string::npos) { |
| 28 | s.clear(); | ||
| 29 | ✗ | return; | |
| 30 | } | ||
| 31 | auto last = s.find_last_not_of(ws); | ||
| 32 | 4 | s = s.substr(first, last - first + 1); | |
| 33 | } | ||
| 34 | |||
| 35 | 2 | std::vector<AkimovIStarMPI::Op> ParseOpsFromString(const std::string &s) { | |
| 36 | 2 | std::vector<AkimovIStarMPI::Op> res; | |
| 37 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | std::istringstream ss(s); |
| 38 | std::string line; | ||
| 39 |
3/4✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 2 times.
|
4 | while (std::getline(ss, line)) { |
| 40 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | Trim(line); |
| 41 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (line.empty()) { |
| 42 | 2 | continue; | |
| 43 | } | ||
| 44 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | const std::string prefix = "send:"; |
| 45 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | if (!line.starts_with(prefix)) { |
| 46 | 2 | continue; | |
| 47 | } | ||
| 48 | ✗ | std::string rest = line.substr(prefix.size()); | |
| 49 | ✗ | size_t p1 = rest.find(':'); | |
| 50 | ✗ | if (p1 == std::string::npos) { | |
| 51 | ✗ | continue; | |
| 52 | } | ||
| 53 | ✗ | size_t p2 = rest.find(':', p1 + 1); | |
| 54 | ✗ | if (p2 == std::string::npos) { | |
| 55 | ✗ | continue; | |
| 56 | } | ||
| 57 | ✗ | std::string srcs = rest.substr(0, p1); | |
| 58 | ✗ | std::string dsts = rest.substr(p1 + 1, p2 - (p1 + 1)); | |
| 59 | ✗ | std::string msg = rest.substr(p2 + 1); | |
| 60 | ✗ | Trim(srcs); | |
| 61 | ✗ | Trim(dsts); | |
| 62 | ✗ | Trim(msg); | |
| 63 | try { | ||
| 64 | int src = std::stoi(srcs); | ||
| 65 | int dst = std::stoi(dsts); | ||
| 66 | ✗ | res.push_back(AkimovIStarMPI::Op{.src = src, .dst = dst, .msg = msg}); | |
| 67 | ✗ | } catch (...) { | |
| 68 | continue; | ||
| 69 | ✗ | } | |
| 70 | } | ||
| 71 | 2 | return res; | |
| 72 | 2 | } | |
| 73 | |||
| 74 | int CountDstZero(const std::vector<AkimovIStarMPI::Op> &ops) { | ||
| 75 | int cnt = 0; | ||
| 76 | ✗ | for (const auto &op : ops) { | |
| 77 | ✗ | if (op.dst == 0) { | |
| 78 | ✗ | ++cnt; | |
| 79 | } | ||
| 80 | } | ||
| 81 | return cnt; | ||
| 82 | } | ||
| 83 | |||
| 84 | 1 | void SendOutgoingToCenter(int myrank, const std::vector<AkimovIStarMPI::Op> &ops) { | |
| 85 | const int center = 0; | ||
| 86 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | for (const auto &op : ops) { |
| 87 | ✗ | if (op.src != myrank) { | |
| 88 | ✗ | continue; | |
| 89 | } | ||
| 90 | ✗ | std::array<int, 2> header{op.dst, static_cast<int>(op.msg.size())}; | |
| 91 | ✗ | MPI_Send(header.data(), static_cast<int>(header.size()), MPI_INT, center, 0, MPI_COMM_WORLD); | |
| 92 | ✗ | if (header[1] > 0) { | |
| 93 | ✗ | MPI_Send(op.msg.data(), header[1], MPI_CHAR, center, 0, MPI_COMM_WORLD); | |
| 94 | } | ||
| 95 | } | ||
| 96 | 1 | } | |
| 97 | |||
| 98 | 1 | int ReceiveForwardedFromCenter(int expected) { | |
| 99 | int recvd = 0; | ||
| 100 | const int center = 0; | ||
| 101 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | for (int i = 0; i < expected; ++i) { |
| 102 | ✗ | std::array<int, 2> header{0, 0}; | |
| 103 | ✗ | MPI_Recv(header.data(), static_cast<int>(header.size()), MPI_INT, center, MPI_ANY_TAG, MPI_COMM_WORLD, | |
| 104 | MPI_STATUS_IGNORE); | ||
| 105 | ✗ | int payload_len = header[1]; | |
| 106 | std::string payload; | ||
| 107 | ✗ | payload.resize(payload_len); | |
| 108 | ✗ | if (payload_len > 0) { | |
| 109 | ✗ | MPI_Recv(payload.data(), payload_len, MPI_CHAR, center, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); | |
| 110 | } | ||
| 111 | ✗ | ++recvd; | |
| 112 | } | ||
| 113 | 1 | return recvd; | |
| 114 | } | ||
| 115 | |||
| 116 | 1 | void CenterProcessLocalOutgoing(const std::vector<AkimovIStarMPI::Op> &ops, int &received_count) { | |
| 117 | const int center = 0; | ||
| 118 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | for (const auto &op : ops) { |
| 119 | ✗ | if (op.src != center) { | |
| 120 | ✗ | continue; | |
| 121 | } | ||
| 122 | ✗ | if (op.dst == center) { | |
| 123 | ✗ | ++received_count; | |
| 124 | } else { | ||
| 125 | ✗ | std::array<int, 2> header{op.src, static_cast<int>(op.msg.size())}; | |
| 126 | ✗ | MPI_Send(header.data(), static_cast<int>(header.size()), MPI_INT, op.dst, 0, MPI_COMM_WORLD); | |
| 127 | ✗ | if (header[1] > 0) { | |
| 128 | ✗ | MPI_Send(op.msg.data(), header[1], MPI_CHAR, op.dst, 0, MPI_COMM_WORLD); | |
| 129 | } | ||
| 130 | } | ||
| 131 | } | ||
| 132 | 1 | } | |
| 133 | |||
| 134 | 1 | void CenterReceiveAndForward(int recv_from_others, int &received_count) { | |
| 135 | const int center = 0; | ||
| 136 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | for (int i = 0; i < recv_from_others; ++i) { |
| 137 | ✗ | std::array<int, 2> header{0, 0}; | |
| 138 | ✗ | MPI_Recv(header.data(), static_cast<int>(header.size()), MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, | |
| 139 | MPI_STATUS_IGNORE); | ||
| 140 | ✗ | int dst = header[0]; | |
| 141 | ✗ | int payload_len = header[1]; | |
| 142 | std::string payload; | ||
| 143 | ✗ | payload.resize(payload_len); | |
| 144 | ✗ | if (payload_len > 0) { | |
| 145 | ✗ | MPI_Recv(payload.data(), payload_len, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); | |
| 146 | } | ||
| 147 | ✗ | if (dst == center) { | |
| 148 | ✗ | ++received_count; | |
| 149 | } else { | ||
| 150 | ✗ | std::array<int, 2> fwd_header{0, payload_len}; | |
| 151 | ✗ | MPI_Send(fwd_header.data(), static_cast<int>(fwd_header.size()), MPI_INT, dst, 0, MPI_COMM_WORLD); | |
| 152 | ✗ | if (payload_len > 0) { | |
| 153 | ✗ | MPI_Send(payload.data(), payload_len, MPI_CHAR, dst, 0, MPI_COMM_WORLD); | |
| 154 | } | ||
| 155 | } | ||
| 156 | } | ||
| 157 | 1 | } | |
| 158 | |||
| 159 | 1 | int CountMessagesForCenterFromInput(const InType &input) { | |
| 160 | int total_expected_for_center = 0; | ||
| 161 | 1 | std::string s(input.begin(), input.end()); | |
| 162 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | std::istringstream ss(s); |
| 163 | std::string line; | ||
| 164 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | const std::string prefix = "send:"; |
| 165 | |||
| 166 |
3/4✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
✓ Branch 4 taken 1 times.
|
4 | while (std::getline(ss, line)) { |
| 167 | size_t start = line.find_first_not_of(" \t\r\n"); | ||
| 168 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (start == std::string::npos) { |
| 169 | ✗ | continue; | |
| 170 | } | ||
| 171 | size_t end = line.find_last_not_of(" \t\r\n"); | ||
| 172 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | std::string t = line.substr(start, end - start + 1); |
| 173 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (!t.starts_with(prefix)) { |
| 174 | ✗ | continue; | |
| 175 | } | ||
| 176 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | std::string rest = t.substr(prefix.size()); |
| 177 | 3 | size_t p1 = rest.find(':'); | |
| 178 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (p1 == std::string::npos) { |
| 179 | ✗ | continue; | |
| 180 | } | ||
| 181 | 3 | size_t p2 = rest.find(':', p1 + 1); | |
| 182 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (p2 == std::string::npos) { |
| 183 | ✗ | continue; | |
| 184 | } | ||
| 185 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | std::string dsts = rest.substr(p1 + 1, p2 - (p1 + 1)); |
| 186 | size_t sd = dsts.find_first_not_of(" \t\r\n"); | ||
| 187 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (sd == std::string::npos) { |
| 188 | ✗ | continue; | |
| 189 | } | ||
| 190 | size_t ed = dsts.find_last_not_of(" \t\r\n"); | ||
| 191 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | std::string dsttrim = dsts.substr(sd, ed - sd + 1); |
| 192 | try { | ||
| 193 | int dst = std::stoi(dsttrim); | ||
| 194 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (dst == 0) { |
| 195 | ✗ | ++total_expected_for_center; | |
| 196 | } | ||
| 197 | ✗ | } catch (...) { | |
| 198 | continue; | ||
| 199 | ✗ | } | |
| 200 | } | ||
| 201 | |||
| 202 | 1 | return total_expected_for_center; | |
| 203 | 1 | } | |
| 204 | |||
| 205 | 2 | void ProcessMultiProcMode(int rank, const std::vector<AkimovIStarMPI::Op> &ops, int &received_count) { | |
| 206 | 2 | int local_send_count = 0; | |
| 207 | int local_expected_recv = 0; | ||
| 208 | |||
| 209 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | for (const auto &op : ops) { |
| 210 | ✗ | if (op.src == rank) { | |
| 211 | ✗ | ++local_send_count; | |
| 212 | } | ||
| 213 | ✗ | if (op.dst == rank) { | |
| 214 | ✗ | ++local_expected_recv; | |
| 215 | } | ||
| 216 | } | ||
| 217 | |||
| 218 | 2 | int total_sends = 0; | |
| 219 | 2 | MPI_Allreduce(&local_send_count, &total_sends, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); | |
| 220 | |||
| 221 | const int center = 0; | ||
| 222 | |||
| 223 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | if (rank != center) { |
| 224 | 1 | SendOutgoingToCenter(rank, ops); | |
| 225 | 1 | received_count = ReceiveForwardedFromCenter(local_expected_recv); | |
| 226 | } else { | ||
| 227 | 1 | CenterProcessLocalOutgoing(ops, received_count); | |
| 228 | int center_local_sends = 0; | ||
| 229 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | for (const auto &op : ops) { |
| 230 | ✗ | if (op.src == center) { | |
| 231 | ✗ | ++center_local_sends; | |
| 232 | } | ||
| 233 | } | ||
| 234 | 1 | int recv_from_others = total_sends - center_local_sends; | |
| 235 | 1 | CenterReceiveAndForward(recv_from_others, received_count); | |
| 236 | } | ||
| 237 | 2 | } | |
| 238 | |||
| 239 | } // namespace | ||
| 240 | |||
| 241 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | AkimovIStarMPI::AkimovIStarMPI(const InType &in) { |
| 242 | SetTypeOfTask(GetStaticTypeOfTask()); | ||
| 243 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | GetInput() = in; |
| 244 | 2 | GetOutput() = 0; | |
| 245 | 2 | } | |
| 246 | |||
| 247 | 2 | bool AkimovIStarMPI::ValidationImpl() { | |
| 248 | 2 | int rank = 0; | |
| 249 | 2 | int mpi_initialized = 0; | |
| 250 | 2 | MPI_Initialized(&mpi_initialized); | |
| 251 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (mpi_initialized == 0) { |
| 252 | return true; | ||
| 253 | } | ||
| 254 | 2 | MPI_Comm_rank(MPI_COMM_WORLD, &rank); | |
| 255 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | if (rank == 0) { |
| 256 | 1 | return !GetInput().empty(); | |
| 257 | } | ||
| 258 | return true; | ||
| 259 | } | ||
| 260 | |||
| 261 | 2 | bool AkimovIStarMPI::PreProcessingImpl() { | |
| 262 | 2 | int rank = 0; | |
| 263 | 2 | int size = 1; | |
| 264 | 2 | MPI_Comm_rank(MPI_COMM_WORLD, &rank); | |
| 265 | 2 | MPI_Comm_size(MPI_COMM_WORLD, &size); | |
| 266 | |||
| 267 | input_buffer_.clear(); | ||
| 268 | ops_.clear(); | ||
| 269 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | received_count_ = 0; |
| 270 | |||
| 271 | std::string raw; | ||
| 272 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | if (rank == 0) { |
| 273 | 2 | raw = ToString(GetInput()); | |
| 274 | } | ||
| 275 | |||
| 276 | 2 | int len = static_cast<int>(raw.size()); | |
| 277 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD); |
| 278 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | len = std::max(len, 0); |
| 279 | std::string buf; | ||
| 280 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | buf.resize(len); |
| 281 |
2/4✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
|
4 | MPI_Bcast((len != 0) ? buf.data() : nullptr, len, MPI_CHAR, 0, MPI_COMM_WORLD); |
| 282 | |||
| 283 | input_buffer_.clear(); | ||
| 284 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | if (len > 0) { |
| 285 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | input_buffer_.assign(buf.begin(), buf.end()); |
| 286 | } | ||
| 287 | |||
| 288 | std::string parsed = ToString(input_buffer_); | ||
| 289 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ops_ = ParseOpsFromString(parsed); |
| 290 | |||
| 291 | 2 | return true; | |
| 292 | } | ||
| 293 | |||
| 294 | 2 | bool AkimovIStarMPI::RunImpl() { | |
| 295 | 2 | int rank = 0; | |
| 296 | 2 | int size = 1; | |
| 297 | 2 | MPI_Comm_rank(MPI_COMM_WORLD, &rank); | |
| 298 | 2 | MPI_Comm_size(MPI_COMM_WORLD, &size); | |
| 299 | |||
| 300 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (size == 1) { |
| 301 | ✗ | received_count_ = CountDstZero(ops_); | |
| 302 | } else { | ||
| 303 | 2 | ProcessMultiProcMode(rank, ops_, received_count_); | |
| 304 | } | ||
| 305 | |||
| 306 | 2 | int total_expected_for_center = 0; | |
| 307 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | if (rank == 0) { |
| 308 | 1 | total_expected_for_center = CountMessagesForCenterFromInput(GetInput()); | |
| 309 | } | ||
| 310 | |||
| 311 | 2 | MPI_Bcast(&total_expected_for_center, 1, MPI_INT, 0, MPI_COMM_WORLD); | |
| 312 | |||
| 313 | 2 | GetOutput() = total_expected_for_center; | |
| 314 | |||
| 315 | 2 | return true; | |
| 316 | } | ||
| 317 | |||
| 318 | 2 | bool AkimovIStarMPI::PostProcessingImpl() { | |
| 319 | 2 | return true; | |
| 320 | } | ||
| 321 | |||
| 322 | } // namespace akimov_i_star | ||
| 323 |