| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | #include "timofeev_n_radix_batcher_sort/all/include/ops_all.hpp" | ||
| 2 | |||
| 3 | #include <mpi.h> | ||
| 4 | |||
| 5 | #include <algorithm> | ||
| 6 | #include <climits> | ||
| 7 | #include <cstddef> | ||
| 8 | #include <cstdint> | ||
| 9 | #include <thread> | ||
| 10 | #include <utility> | ||
| 11 | #include <vector> | ||
| 12 | |||
| 13 | #include "timofeev_n_radix_batcher_sort/common/include/common.hpp" | ||
| 14 | |||
| 15 | namespace timofeev_n_radix_batcher_sort_threads { | ||
| 16 | |||
| 17 | ✗ | TimofeevNRadixBatcherALL::TimofeevNRadixBatcherALL(const InType &in) { | |
| 18 | SetTypeOfTask(GetStaticTypeOfTask()); | ||
| 19 | ✗ | GetInput() = in; | |
| 20 | ✗ | GetOutput() = in; | |
| 21 | ✗ | } | |
| 22 | |||
| 23 | ✗ | bool TimofeevNRadixBatcherALL::ValidationImpl() { | |
| 24 | ✗ | return GetInput().size() >= 2; | |
| 25 | } | ||
| 26 | |||
| 27 | ✗ | bool TimofeevNRadixBatcherALL::PreProcessingImpl() { | |
| 28 | ✗ | return true; | |
| 29 | } | ||
| 30 | |||
| 31 | ✗ | void TimofeevNRadixBatcherALL::CompExch(int &a, int &b, int digit) { | |
| 32 | ✗ | int b_r = b % (digit * 10) / digit; | |
| 33 | ✗ | int a_r = a % (digit * 10) / digit; | |
| 34 | ✗ | if (b_r < a_r) { | |
| 35 | std::swap(a, b); | ||
| 36 | } | ||
| 37 | ✗ | } | |
| 38 | |||
| 39 | ✗ | void TimofeevNRadixBatcherALL::BubbleSort(std::vector<int> &arr, int digit, int left, int right) { | |
| 40 | ✗ | for (int i = left; i <= right; i++) { | |
| 41 | ✗ | for (int j = 0; j + 1 < right - left; j++) { | |
| 42 | ✗ | CompExch(arr[left + j], arr[left + j + 1], digit); | |
| 43 | } | ||
| 44 | } | ||
| 45 | ✗ | } | |
| 46 | |||
| 47 | ✗ | void TimofeevNRadixBatcherALL::ComparR(int &a, int &b) { | |
| 48 | ✗ | if (a > b) { | |
| 49 | std::swap(a, b); | ||
| 50 | } | ||
| 51 | ✗ | } | |
| 52 | |||
| 53 | ✗ | void TimofeevNRadixBatcherALL::OddEvenMerge(std::vector<int> &arr, int lft, int n) { | |
| 54 | ✗ | if (n <= 1) { | |
| 55 | return; | ||
| 56 | } | ||
| 57 | |||
| 58 | ✗ | int otstup = n / 2; | |
| 59 | ✗ | for (int i = 0; i < otstup; i += 1) { | |
| 60 | ✗ | if (arr[lft + i] > arr[lft + otstup + i]) { | |
| 61 | std::swap(arr[lft + i], arr[lft + otstup + i]); | ||
| 62 | } | ||
| 63 | } | ||
| 64 | |||
| 65 | ✗ | for (otstup = n / 4; otstup > 0; otstup /= 2) { | |
| 66 | ✗ | int h = otstup * 2; | |
| 67 | ✗ | for (int start = otstup; start + otstup < n; start += h) { | |
| 68 | ✗ | for (int i = 0; i < otstup; i += 1) { | |
| 69 | ✗ | ComparR(arr[lft + start + i], arr[lft + start + i + otstup]); | |
| 70 | } | ||
| 71 | } | ||
| 72 | } | ||
| 73 | } | ||
| 74 | |||
| 75 | ✗ | void TimofeevNRadixBatcherALL::BubbleSortAux(size_t &num_threads, int &max_x, std::vector<int> &r_in, size_t &piece, | |
| 76 | std::vector<std::thread> &threads) { | ||
| 77 | ✗ | for (size_t t_s = 0; t_s < num_threads; ++t_s) { | |
| 78 | ✗ | threads.emplace_back([&, t_s]() { | |
| 79 | ✗ | for (int k = 1; k <= max_x; k *= 10) { | |
| 80 | ✗ | BubbleSort(r_in, k, static_cast<int>(piece * t_s), static_cast<int>((piece * t_s) + piece)); | |
| 81 | } | ||
| 82 | ✗ | }); | |
| 83 | } | ||
| 84 | ✗ | } | |
| 85 | |||
| 86 | ✗ | void TimofeevNRadixBatcherALL::OddMergeAux(std::vector<int> &r_in, size_t &piece, std::vector<std::thread> &threads, | |
| 87 | size_t &n_n) { | ||
| 88 | ✗ | for (size_t c_p = piece * 2; c_p <= n_n; c_p *= 2) { | |
| 89 | ✗ | threads.clear(); | |
| 90 | |||
| 91 | ✗ | for (size_t i = 0; i < n_n; i += c_p) { | |
| 92 | ✗ | threads.emplace_back([&, i]() { OddEvenMerge(r_in, static_cast<int>(i), static_cast<int>(c_p)); }); | |
| 93 | } | ||
| 94 | |||
| 95 | ✗ | for (auto &th : threads) { | |
| 96 | ✗ | th.join(); | |
| 97 | } | ||
| 98 | } | ||
| 99 | ✗ | } | |
| 100 | |||
| 101 | ✗ | void TimofeevNRadixBatcherALL::ProcessLocalArray(std::vector<int> &local_arr, size_t num_threads) { | |
| 102 | ✗ | int n = static_cast<int>(local_arr.size()); | |
| 103 | ✗ | int max_x = *std::ranges::max_element(local_arr.begin(), local_arr.end()); | |
| 104 | |||
| 105 | ✗ | std::vector<std::thread> threads; | |
| 106 | ✗ | size_t piece = n / num_threads; | |
| 107 | ✗ | threads.reserve(num_threads); | |
| 108 | |||
| 109 | BubbleSortAux(num_threads, max_x, local_arr, piece, threads); | ||
| 110 | |||
| 111 | ✗ | for (auto &th : threads) { | |
| 112 | ✗ | th.join(); | |
| 113 | } | ||
| 114 | |||
| 115 | ✗ | size_t nnn = n; | |
| 116 | ✗ | OddMergeAux(local_arr, piece, threads, nnn); | |
| 117 | ✗ | } | |
| 118 | |||
| 119 | ✗ | void TimofeevNRadixBatcherALL::ProcessLocalArrayWOSort(std::vector<int> &local_arr, size_t num_threads, | |
| 120 | size_t &elements_per_process) { | ||
| 121 | ✗ | int n = static_cast<int>(local_arr.size()); | |
| 122 | |||
| 123 | ✗ | std::vector<std::thread> threads; | |
| 124 | ✗ | threads.reserve(num_threads); | |
| 125 | |||
| 126 | ✗ | size_t nnn = n; | |
| 127 | ✗ | OddMergeAux(local_arr, elements_per_process, threads, nnn); | |
| 128 | ✗ | } | |
| 129 | |||
| 130 | ✗ | void TimofeevNRadixBatcherALL::PrepAux(int &n, int &m, std::vector<int> &in, int &max_x, size_t &num_threads, | |
| 131 | size_t &n_thr, size_t &number_procs) { | ||
| 132 | ✗ | n = static_cast<int>(in.size()); | |
| 133 | ✗ | m = n; | |
| 134 | |||
| 135 | // дополняем | ||
| 136 | ✗ | while (n % 2 == 0) { | |
| 137 | ✗ | n /= 2; | |
| 138 | } | ||
| 139 | ✗ | if (n > 1) { | |
| 140 | n = static_cast<int>(in.size()); | ||
| 141 | int p = 1; | ||
| 142 | ✗ | while (p < n) { | |
| 143 | ✗ | p *= 2; | |
| 144 | } | ||
| 145 | ✗ | n = p; | |
| 146 | } else { | ||
| 147 | ✗ | n = m; | |
| 148 | } | ||
| 149 | |||
| 150 | ✗ | max_x = *std::ranges::max_element(in); | |
| 151 | ✗ | if (n != m) { | |
| 152 | ✗ | in.resize(n, max_x); | |
| 153 | } | ||
| 154 | |||
| 155 | ✗ | n_thr = std::thread::hardware_concurrency(); | |
| 156 | ✗ | while ((n_thr & (n_thr - 1)) != 0 && n_thr > 1) { // и то, и то - степени двойки, поэтому достаточно вот такого цикла | |
| 157 | ✗ | n_thr--; | |
| 158 | } | ||
| 159 | ✗ | if (std::cmp_greater(n_thr, static_cast<size_t>(n))) { | |
| 160 | ✗ | n_thr = n; | |
| 161 | } | ||
| 162 | ✗ | num_threads = 1; | |
| 163 | ✗ | while (num_threads * 2 <= n_thr / number_procs && | |
| 164 | ✗ | n / num_threads >= 4) { // хотя бы не более, чем число n_thr / число пр-ов, и <условие с const> | |
| 165 | ✗ | num_threads *= 2; | |
| 166 | } | ||
| 167 | ✗ | } | |
| 168 | |||
| 169 | ✗ | void TimofeevNRadixBatcherALL::HandleZero(std::vector<int> &global_array, size_t &total_elements, | |
| 170 | size_t &total_elements_primal, int &num_processes, | ||
| 171 | std::vector<int> &local_array, int &maxxx, size_t &num_threads_per_process, | ||
| 172 | size_t &elements_per_process) { | ||
| 173 | ✗ | global_array.assign(GetInput().begin(), GetInput().end()); | |
| 174 | |||
| 175 | ✗ | size_t numnum = std::thread::hardware_concurrency(); | |
| 176 | |||
| 177 | ✗ | int ttttt = static_cast<int>(total_elements); | |
| 178 | ✗ | int mmmmm = static_cast<int>(total_elements_primal); | |
| 179 | ✗ | size_t nnnnn = num_processes; | |
| 180 | |||
| 181 | ✗ | PrepAux(ttttt, mmmmm, global_array, maxxx, num_threads_per_process, numnum, nnnnn); | |
| 182 | |||
| 183 | ✗ | total_elements = ttttt; | |
| 184 | ✗ | total_elements_primal = mmmmm; | |
| 185 | ✗ | num_processes = static_cast<int>(nnnnn); | |
| 186 | |||
| 187 | ✗ | elements_per_process = total_elements / num_processes; | |
| 188 | |||
| 189 | ✗ | local_array.resize(elements_per_process); | |
| 190 | |||
| 191 | ✗ | for (int i = 0; i < num_processes; ++i) { | |
| 192 | ✗ | if (i == 0) { | |
| 193 | ✗ | std::copy(global_array.begin(), global_array.begin() + static_cast<int64_t>(elements_per_process), | |
| 194 | local_array.begin()); | ||
| 195 | } else { | ||
| 196 | ✗ | MPI_Send(&num_threads_per_process, 1, MPI_UNSIGNED_LONG_LONG, i, 0, MPI_COMM_WORLD); | |
| 197 | ✗ | MPI_Send(&elements_per_process, 1, MPI_UNSIGNED_LONG_LONG, i, 0, MPI_COMM_WORLD); | |
| 198 | ✗ | MPI_Send(global_array.data() + static_cast<int64_t>(i * elements_per_process), | |
| 199 | ✗ | static_cast<int>(elements_per_process), MPI_INT, i, 0, MPI_COMM_WORLD); | |
| 200 | } | ||
| 201 | } | ||
| 202 | ✗ | } | |
| 203 | |||
| 204 | ✗ | bool TimofeevNRadixBatcherALL::RunImpl() { | |
| 205 | ✗ | int world_size = 0; | |
| 206 | ✗ | int world_rank = 0; | |
| 207 | ✗ | MPI_Comm_size(MPI_COMM_WORLD, &world_size); | |
| 208 | ✗ | MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); | |
| 209 | |||
| 210 | ✗ | size_t total_elements = GetInput().size(); | |
| 211 | ✗ | size_t total_elements_primal = total_elements; | |
| 212 | ✗ | size_t num_threads_per_process = 1; | |
| 213 | ✗ | int num_processes = world_size; | |
| 214 | ✗ | while ((num_processes & (num_processes - 1)) != 0 && num_processes > 1) { | |
| 215 | ✗ | num_processes--; | |
| 216 | } | ||
| 217 | ✗ | std::vector<int> global_array; | |
| 218 | ✗ | int maxxx = 0; | |
| 219 | ✗ | size_t elements_per_process = 0; | |
| 220 | ✗ | std::vector<int> local_array; | |
| 221 | |||
| 222 | ✗ | if (world_rank == 0) { | |
| 223 | ✗ | HandleZero(global_array, total_elements, total_elements_primal, num_processes, local_array, maxxx, | |
| 224 | num_threads_per_process, elements_per_process); | ||
| 225 | // std::cout << "lbl-1 " << num_threads_per_process << "\n"; | ||
| 226 | ✗ | } else if (world_rank < num_processes) { | |
| 227 | ✗ | MPI_Recv(&num_threads_per_process, 1, MPI_UNSIGNED_LONG_LONG, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); | |
| 228 | ✗ | MPI_Recv(&elements_per_process, 1, MPI_UNSIGNED_LONG_LONG, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); | |
| 229 | ✗ | local_array.resize(elements_per_process); | |
| 230 | ✗ | MPI_Recv(local_array.data(), static_cast<int>(elements_per_process), MPI_INT, 0, 0, MPI_COMM_WORLD, | |
| 231 | MPI_STATUS_IGNORE); | ||
| 232 | } | ||
| 233 | |||
| 234 | ✗ | if (world_rank < num_processes) { | |
| 235 | // std::cout << "lbl-2 " << num_threads_per_process << "\n"; | ||
| 236 | ✗ | ProcessLocalArray(local_array, num_threads_per_process); | |
| 237 | } | ||
| 238 | |||
| 239 | ✗ | if (world_rank == 0) { | |
| 240 | std::ranges::copy(local_array, global_array.begin()); | ||
| 241 | |||
| 242 | ✗ | for (int i = 1; i < num_processes; ++i) { | |
| 243 | ✗ | MPI_Recv(global_array.data() + static_cast<int>(i * elements_per_process), static_cast<int>(elements_per_process), | |
| 244 | MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); | ||
| 245 | } | ||
| 246 | |||
| 247 | ✗ | ProcessLocalArrayWOSort(global_array, num_threads_per_process, elements_per_process); | |
| 248 | |||
| 249 | ✗ | if (total_elements != total_elements_primal) { | |
| 250 | ✗ | global_array.resize(total_elements_primal); | |
| 251 | } | ||
| 252 | |||
| 253 | ✗ | for (int i = 1; i < world_size; ++i) { | |
| 254 | ✗ | MPI_Send(&total_elements_primal, 1, MPI_UNSIGNED_LONG_LONG, i, 0, MPI_COMM_WORLD); | |
| 255 | ✗ | MPI_Send(global_array.data(), static_cast<int>(total_elements_primal), MPI_INT, i, 0, MPI_COMM_WORLD); | |
| 256 | } | ||
| 257 | |||
| 258 | } else { | ||
| 259 | ✗ | if (world_rank < num_processes) { | |
| 260 | ✗ | MPI_Send(local_array.data(), static_cast<int>(elements_per_process), MPI_INT, 0, 0, MPI_COMM_WORLD); | |
| 261 | } | ||
| 262 | ✗ | MPI_Recv(&total_elements_primal, 1, MPI_UNSIGNED_LONG_LONG, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); | |
| 263 | ✗ | global_array.resize(total_elements_primal); | |
| 264 | ✗ | MPI_Recv(global_array.data(), static_cast<int>(total_elements_primal), MPI_INT, 0, 0, MPI_COMM_WORLD, | |
| 265 | MPI_STATUS_IGNORE); | ||
| 266 | } | ||
| 267 | |||
| 268 | ✗ | GetOutput() = global_array; | |
| 269 | ✗ | return true; | |
| 270 | } | ||
| 271 | |||
| 272 | ✗ | bool TimofeevNRadixBatcherALL::PostProcessingImpl() { | |
| 273 | ✗ | return true; | |
| 274 | } | ||
| 275 | |||
| 276 | } // namespace timofeev_n_radix_batcher_sort_threads | ||
| 277 |