diff --git a/src/equihash.rs b/src/equihash.rs index dc2d706..47e2eb1 100644 --- a/src/equihash.rs +++ b/src/equihash.rs @@ -244,25 +244,88 @@ fn repack_index(src: &[u8], dst: &mut [u32]) { repack_index_scalar(src, dst); } +/// A raw `*mut u32` shared across rayon workers for the parallel scatter below. +/// Sound only because the workers write provably disjoint position sets. +#[derive(Clone, Copy)] +struct OrderPtr(*mut u32); +unsafe impl Send for OrderPtr {} +unsafe impl Sync for OrderPtr {} + /// Partition the `n` entries into `TOP_BUCKETS` runs by the high `TOP_BITS` of /// their (dense) leading block. Returns `(starts, order)`, where partition `v` /// owns the input indices `order[starts[v]..starts[v + 1]]`. The histogram /// passes stream over `keys[]` (4 bytes/entry) instead of striding the slots. +/// +/// Parallel counting sort: the input is split into one contiguous chunk per +/// rayon worker. Each chunk histograms its slice (phase 1), a small serial pass +/// turns those into per-chunk base offsets within each bucket's output region +/// (phase 2), and each chunk scatters its entries into `order` (phase 3). Chunk +/// `c`'s bucket-`b` writes land in `[off[c][b], off[c+1][b])`, disjoint from +/// every other chunk and bucket, so the concurrent writes never alias. Entries +/// within a bucket end up chunk-major rather than index-major; that reordering +/// is immaterial — `count_pairs`/`low_group` depend only on the multiset of low +/// keys, and final solutions are canonicalised, de-duplicated, and verified. fn partition_top(keys: &[u32], n: usize) -> (Vec, Vec) { let mut starts = vec![0u32; TOP_BUCKETS + 1]; - for k in 0..n { - starts[(keys[k] >> LOW_BITS) as usize + 1] += 1; - } - for i in 0..TOP_BUCKETS { - starts[i + 1] += starts[i]; - } let mut order = vec![0u32; n]; - let mut cur = starts.clone(); // small: TOP_BUCKETS + 1 entries - for k in 0..n { - let b = (keys[k] >> LOW_BITS) as usize; - order[cur[b] as usize] = k as u32; - cur[b] += 1; + if n == 0 { + return (starts, order); } + + let nthreads = rayon::current_num_threads().max(1); + let chunk = n.div_ceil(nthreads); + let nchunks = n.div_ceil(chunk); + + // Phase 1 (parallel): per-chunk top-bucket histograms. + let local_hists: Vec> = (0..nchunks) + .into_par_iter() + .map(|c| { + let lo = c * chunk; + let hi = ((c + 1) * chunk).min(n); + let mut h = vec![0u32; TOP_BUCKETS]; + for &key in &keys[lo..hi] { + h[(key >> LOW_BITS) as usize] += 1; + } + h + }) + .collect(); + + // Phase 2 (serial, nchunks * TOP_BUCKETS work): bucket starts, then each + // chunk's per-bucket base offset within its bucket's output region. + let mut totals = vec![0u32; TOP_BUCKETS]; + for h in &local_hists { + for b in 0..TOP_BUCKETS { + totals[b] += h[b]; + } + } + for b in 0..TOP_BUCKETS { + starts[b + 1] = starts[b] + totals[b]; + } + let mut offsets = vec![vec![0u32; TOP_BUCKETS]; nchunks]; + let mut running = starts[..TOP_BUCKETS].to_vec(); // running[b] starts at starts[b] + for c in 0..nchunks { + for b in 0..TOP_BUCKETS { + offsets[c][b] = running[b]; + running[b] += local_hists[c][b]; + } + } + + // Phase 3 (parallel): each chunk scatters into its disjoint sub-ranges. + let optr = OrderPtr(order.as_mut_ptr()); + offsets.into_par_iter().enumerate().for_each(|(c, mut cur)| { + let optr = optr; // capture the whole (Sync) wrapper, not the inner ptr + let lo = c * chunk; + let hi = ((c + 1) * chunk).min(n); + let base = optr.0; + for k in lo..hi { + let b = (keys[k] >> LOW_BITS) as usize; + // SAFETY: `cur[b]` ranges over `[off[c][b], off[c+1][b])`, a range + // owned exclusively by chunk `c` and within `order`'s bounds. + unsafe { *base.add(cur[b] as usize) = k as u32 }; + cur[b] += 1; + } + }); + (starts, order) } @@ -403,7 +466,13 @@ unsafe fn emit_bucket( /// directly into one pre-sized arena — so there is no per-partition allocation /// or final concatenation copy. Returns `(keys_out, slots_out, parents)`. fn collide(keys: &[u32], slots: &[u32], n: usize, clamp: usize) -> (Vec, Vec, Vec) { + // Sub-phase timing, gated on `EQ_PROFILE`. Prints partition / count / alloc / + // emit splits so we can see which part of the round dominates. + let prof = std::env::var_os("EQ_PROFILE").is_some(); + let t0 = std::time::Instant::now(); + let (starts, order) = partition_top(keys, n); + let t_part = std::time::Instant::now(); // Pass 1: per-partition child counts (histogram-derived, no reordering). let counts: Vec = (0..TOP_BUCKETS) @@ -416,6 +485,7 @@ fn collide(keys: &[u32], slots: &[u32], n: usize, clamp: usize) -> (Vec, Ve }, ) .collect(); + let t_count = std::time::Instant::now(); let mut out_starts = vec![0usize; TOP_BUCKETS + 1]; for v in 0..TOP_BUCKETS { @@ -446,6 +516,8 @@ fn collide(keys: &[u32], slots: &[u32], n: usize, clamp: usize) -> (Vec, Ve } } + let t_alloc = std::time::Instant::now(); + // Pass 2: group each partition and emit its colliding pairs in place. kparts .into_par_iter() @@ -461,6 +533,17 @@ fn collide(keys: &[u32], slots: &[u32], n: usize, clamp: usize) -> (Vec, Ve debug_assert_eq!(w, kout.len()); }); + if prof { + let ms = |a: std::time::Instant, b: std::time::Instant| (b - a).as_secs_f64() * 1000.0; + eprintln!( + " [collide n={n}] partition {:6.1} count {:6.1} alloc {:6.1} emit(group+xor) {:6.1} ms", + ms(t0, t_part), + ms(t_part, t_count), + ms(t_count, t_alloc), + ms(t_alloc, std::time::Instant::now()), + ); + } + (keys_out, slots_out, parents) } @@ -532,6 +615,22 @@ pub fn solve(header: &[u8]) -> Vec> { pub fn solve_with(header: &[u8], clamp: Option) -> Vec> { let clamp = clamp.unwrap_or(usize::MAX); + // Optional per-phase timing, gated on `EQ_PROFILE` (any value). Zero cost + // when unset — the only overhead is one env lookup. Each `phase(label, n)` + // prints the wall time since the previous call and the live entry count. + let prof = std::env::var_os("EQ_PROFILE").is_some(); + let mut t_last = std::time::Instant::now(); + let mut phase = |label: &str, n: usize| { + if prof { + let now = std::time::Instant::now(); + eprintln!( + " [profile] {label:<13} {:8.1} ms (n={n})", + (now - t_last).as_secs_f64() * 1000.0 + ); + t_last = now; + } + }; + // ---- round 0: hash every index into NBLK0 big-endian 24-bit blocks, stored // in padded 8-word slots with the leading block mirrored into `keys`. Entry // k corresponds to leaf index k (the gen order), so no leaf table is needed. @@ -567,20 +666,23 @@ pub fn solve_with(header: &[u8], clamp: Option) -> Vec> { // parents[t] (t = 0..K-2) maps a round-(t+1) entry to its two round-t parents. // `keys`/`slots` ping-pong between rounds (the previous buffers are freed as // the new ones replace them). + phase("round0-hash", n0); let mut parents: Vec> = Vec::with_capacity(K - 1); let mut n = n0; - for _ in 0..(K - 1) { + for r in 0..(K - 1) { let (ok, os, op) = collide(&keys, &slots, n, clamp); n = op.len(); parents.push(op); keys = ok; slots = os; + phase(&format!("collide r{}", r + 1), n); if n == 0 { return Vec::new(); } } let candidates = collide_final(&keys, &slots, n, clamp); + phase("collide-final", candidates.len()); if candidates.is_empty() { return Vec::new(); } @@ -605,8 +707,11 @@ pub fn solve_with(header: &[u8], clamp: Option) -> Vec> { // refs are now round-0 indices == leaf indices. recovered.extend_from_slice(&refs); } + phase("recover", recovered.len() / SOLUTION_INDICES); - filter_candidates(&base_state(header), &recovered) + let result = filter_candidates(&base_state(header), &recovered); + phase("filter+verify", result.len()); + result } /// Re-order recovered leaf indices into the canonical solution ordering: at @@ -780,4 +885,97 @@ mod tests { } } } + + /// A deterministic dense 140-byte header for benchmarking, filled by an + /// xorshift64* stream keyed on `seed` (the same scheme as `main::pseudo_header`) + /// so each nonce exercises a realistic, fully-populated header rather than the + /// degenerate all-`0x42` one. + #[cfg(test)] + fn bench_header(seed: u64) -> Vec { + let mut header = vec![0u8; HEADER_LEN]; + let mut x = seed.wrapping_mul(0x9E3779B97F4A7C15).wrapping_add(1); + for b in header.iter_mut() { + x ^= x >> 12; + x ^= x << 25; + x ^= x >> 27; + *b = (x.wrapping_mul(0x2545F4914F6CDD1D) >> 33) as u8; + } + header + } + + /// Steady-state throughput baseline for the CPU solver. Ignored by default; + /// run it in release (the only meaningful configuration — `debug_assert`s in + /// the collision hot path are off and `opt-level=3`/LTO is on) with output + /// visible: + /// + /// ```text + /// cargo test --release full_solve_baseline -- --ignored --nocapture + /// ``` + /// + /// Tunables via env: + /// * `EQ_BENCH_ITERS` — timed solves per clamp (default 8). + /// * `EQ_BENCH_CLAMPS` — comma-separated clamp sweep (default `32`, the + /// value the miner and `--selftest` use). + /// + /// Each timed iteration uses a distinct nonce so the figures average over + /// per-header variance; one warm-up solve per clamp (pages in the ~GB of + /// round buffers, warms caches) is excluded. This measures one solve at a + /// time across the whole rayon pool — i.e. solver latency and its memory + /// behaviour, which is exactly what the planned radix-scatter change targets + /// — not the aggregate multi-group mining rate `--bench` reports. + #[test] + #[ignore] + fn full_solve_baseline() { + use std::time::Instant; + + let iters: usize = std::env::var("EQ_BENCH_ITERS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(8); + let clamps: Vec = std::env::var("EQ_BENCH_CLAMPS") + .ok() + .map(|v| v.split(',').filter_map(|s| s.trim().parse().ok()).collect()) + .filter(|v: &Vec| !v.is_empty()) + .unwrap_or_else(|| vec![32]); + + eprintln!( + "CPU solver baseline — {iters} timed solve(s)/clamp, clamps={clamps:?}, \ + rayon threads={}", + rayon::current_num_threads() + ); + + for &clamp in &clamps { + // Warm-up (excluded): fault in buffers, warm caches. + let _ = solve_with(&bench_header(0), Some(clamp)); + + let mut times = Vec::with_capacity(iters); + let mut total_sols = 0usize; + for i in 0..iters { + let header = bench_header(i as u64 + 1); + let base = base_state(&header); + let t = Instant::now(); + let sols = solve_with(&header, Some(clamp)); + times.push(t.elapsed().as_secs_f64()); + for s in &sols { + assert!(is_valid_solution(&base, s), "solver returned an invalid solution"); + } + total_sols += sols.len(); + } + + times.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let n = times.len(); + let sum: f64 = times.iter().sum(); + let mean = 1000.0 * sum / n as f64; + let median = 1000.0 * times[n / 2]; + let min = 1000.0 * times[0]; + let max = 1000.0 * times[n - 1]; + let solves_per_s = n as f64 / sum; + eprintln!( + "clamp={clamp:>3}: {solves_per_s:6.2} solve/s | mean {mean:6.0} ms \ + median {median:6.0} ms min {min:6.0} ms max {max:6.0} ms | \ + {total_sols} sol over {n} solve(s) ({:.2}/solve)", + total_sols as f64 / n as f64, + ); + } + } }