Parallelize partition_top; add solver benchmark + phase profiling

partition_top was the only serial stage in the otherwise rayon-parallel
collision rounds — plain `for k in 0..n` count and scatter loops that left
15/16 cores idle for ~35% of every round (~650 ms). Replace it with a
parallel counting sort: per-chunk top-bucket histograms, a small serial
pass to compute per-chunk base offsets, then a disjoint-region scatter through a
shared raw pointer (each chunk writes a provably non-overlapping set of
positions). Entries within a bucket become chunk-major rather than
index-major, which is immaterial: count_pairs/low_group depend only on the
low-key multiset, and solutions are canonicalized, de-duplicated, and
verified downstream.

Measured (16 threads): partition_top ~650 -> ~100 ms/round (6.5x),
collide-final ~1.18 -> ~0.59 s, full solve ~13.4 -> ~9.2 s (-31%,
0.07 -> 0.11 solve/s), with identical solution yield and all validity
tests passing.

Also add (gated/ignored, no production-path behavior change):
- full_solve_baseline: an #[ignore] throughput benchmark over realistic
  dense headers (EQ_BENCH_ITERS / EQ_BENCH_CLAMPS).
- EQ_PROFILE-gated per-phase and per-collide-sub-phase timing in solve_with.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
jackpotincorporated
2026-06-06 10:42:54 -04:00
parent 6a753b62fa
commit 501527d3cb
+211 -13
View File
@@ -244,25 +244,88 @@ fn repack_index(src: &[u8], dst: &mut [u32]) {
repack_index_scalar(src, dst);
}
/// Thin `Copy` wrapper around a raw `*mut u32` so the pointer can be handed to
/// rayon workers for the parallel scatter below. Raw pointers are neither
/// `Send` nor `Sync` by default; the manual impls opt this wrapper back in.
/// Sound only because the workers write provably disjoint position sets.
#[derive(Copy, Clone)]
struct OrderPtr(*mut u32);

// SAFETY: the wrapper only carries an address. Every dereference happens in an
// explicit `unsafe` block at the use site, which is responsible for keeping
// concurrent writes disjoint and in bounds.
unsafe impl Sync for OrderPtr {}
// SAFETY: as above — moving the address to another thread is harmless by
// itself; soundness is upheld where the pointer is dereferenced.
unsafe impl Send for OrderPtr {}
/// Partition the `n` entries into `TOP_BUCKETS` runs by the high `TOP_BITS` of
/// their (dense) leading block. Returns `(starts, order)`, where partition `v`
/// owns the input indices `order[starts[v]..starts[v + 1]]`. The histogram
/// passes stream over `keys[]` (4 bytes/entry) instead of striding the slots.
///
/// Parallel counting sort: the input is split into one contiguous chunk per
/// rayon worker. Each chunk histograms its slice (phase 1), a small serial pass
/// turns those into per-chunk base offsets within each bucket's output region
/// (phase 2), and each chunk scatters its entries into `order` (phase 3). Chunk
/// `c`'s bucket-`b` writes land in `[off[c][b], off[c+1][b])`, disjoint from
/// every other chunk and bucket, so the concurrent writes never alias. Entries
/// within a bucket end up chunk-major rather than index-major; that reordering
/// is immaterial — `count_pairs`/`low_group` depend only on the multiset of low
/// keys, and final solutions are canonicalised, de-duplicated, and verified.
fn partition_top(keys: &[u32], n: usize) -> (Vec<u32>, Vec<u32>) {
let mut starts = vec![0u32; TOP_BUCKETS + 1];
for k in 0..n {
starts[(keys[k] >> LOW_BITS) as usize + 1] += 1;
}
for i in 0..TOP_BUCKETS {
starts[i + 1] += starts[i];
}
let mut order = vec![0u32; n];
let mut cur = starts.clone(); // small: TOP_BUCKETS + 1 entries
for k in 0..n {
let b = (keys[k] >> LOW_BITS) as usize;
order[cur[b] as usize] = k as u32;
cur[b] += 1;
if n == 0 {
return (starts, order);
}
let nthreads = rayon::current_num_threads().max(1);
let chunk = n.div_ceil(nthreads);
let nchunks = n.div_ceil(chunk);
// Phase 1 (parallel): per-chunk top-bucket histograms.
let local_hists: Vec<Vec<u32>> = (0..nchunks)
.into_par_iter()
.map(|c| {
let lo = c * chunk;
let hi = ((c + 1) * chunk).min(n);
let mut h = vec![0u32; TOP_BUCKETS];
for &key in &keys[lo..hi] {
h[(key >> LOW_BITS) as usize] += 1;
}
h
})
.collect();
// Phase 2 (serial, nchunks * TOP_BUCKETS work): bucket starts, then each
// chunk's per-bucket base offset within its bucket's output region.
let mut totals = vec![0u32; TOP_BUCKETS];
for h in &local_hists {
for b in 0..TOP_BUCKETS {
totals[b] += h[b];
}
}
for b in 0..TOP_BUCKETS {
starts[b + 1] = starts[b] + totals[b];
}
let mut offsets = vec![vec![0u32; TOP_BUCKETS]; nchunks];
let mut running = starts[..TOP_BUCKETS].to_vec(); // running[b] starts at starts[b]
for c in 0..nchunks {
for b in 0..TOP_BUCKETS {
offsets[c][b] = running[b];
running[b] += local_hists[c][b];
}
}
// Phase 3 (parallel): each chunk scatters into its disjoint sub-ranges.
let optr = OrderPtr(order.as_mut_ptr());
offsets.into_par_iter().enumerate().for_each(|(c, mut cur)| {
let optr = optr; // capture the whole (Sync) wrapper, not the inner ptr
let lo = c * chunk;
let hi = ((c + 1) * chunk).min(n);
let base = optr.0;
for k in lo..hi {
let b = (keys[k] >> LOW_BITS) as usize;
// SAFETY: `cur[b]` ranges over `[off[c][b], off[c+1][b])`, a range
// owned exclusively by chunk `c` and within `order`'s bounds.
unsafe { *base.add(cur[b] as usize) = k as u32 };
cur[b] += 1;
}
});
(starts, order)
}
@@ -403,7 +466,13 @@ unsafe fn emit_bucket(
/// directly into one pre-sized arena — so there is no per-partition allocation
/// or final concatenation copy. Returns `(keys_out, slots_out, parents)`.
fn collide(keys: &[u32], slots: &[u32], n: usize, clamp: usize) -> (Vec<u32>, Vec<u32>, Vec<u64>) {
// Sub-phase timing, gated on `EQ_PROFILE`. Prints partition / count / alloc /
// emit splits so we can see which part of the round dominates.
let prof = std::env::var_os("EQ_PROFILE").is_some();
let t0 = std::time::Instant::now();
let (starts, order) = partition_top(keys, n);
let t_part = std::time::Instant::now();
// Pass 1: per-partition child counts (histogram-derived, no reordering).
let counts: Vec<usize> = (0..TOP_BUCKETS)
@@ -416,6 +485,7 @@ fn collide(keys: &[u32], slots: &[u32], n: usize, clamp: usize) -> (Vec<u32>, Ve
},
)
.collect();
let t_count = std::time::Instant::now();
let mut out_starts = vec![0usize; TOP_BUCKETS + 1];
for v in 0..TOP_BUCKETS {
@@ -446,6 +516,8 @@ fn collide(keys: &[u32], slots: &[u32], n: usize, clamp: usize) -> (Vec<u32>, Ve
}
}
let t_alloc = std::time::Instant::now();
// Pass 2: group each partition and emit its colliding pairs in place.
kparts
.into_par_iter()
@@ -461,6 +533,17 @@ fn collide(keys: &[u32], slots: &[u32], n: usize, clamp: usize) -> (Vec<u32>, Ve
debug_assert_eq!(w, kout.len());
});
if prof {
let ms = |a: std::time::Instant, b: std::time::Instant| (b - a).as_secs_f64() * 1000.0;
eprintln!(
" [collide n={n}] partition {:6.1} count {:6.1} alloc {:6.1} emit(group+xor) {:6.1} ms",
ms(t0, t_part),
ms(t_part, t_count),
ms(t_count, t_alloc),
ms(t_alloc, std::time::Instant::now()),
);
}
(keys_out, slots_out, parents)
}
@@ -532,6 +615,22 @@ pub fn solve(header: &[u8]) -> Vec<Vec<u32>> {
pub fn solve_with(header: &[u8], clamp: Option<usize>) -> Vec<Vec<u32>> {
let clamp = clamp.unwrap_or(usize::MAX);
// Optional per-phase timing, gated on `EQ_PROFILE` (any value). Near-zero
// cost when unset — one env lookup here (plus one per `collide` call) and a
// small label `format!` per round. Each `phase(label, n)` prints the wall
// time since the previous call and the live entry count.
let prof = std::env::var_os("EQ_PROFILE").is_some();
let mut t_last = std::time::Instant::now();
let mut phase = |label: &str, n: usize| {
if prof {
let now = std::time::Instant::now();
eprintln!(
" [profile] {label:<13} {:8.1} ms (n={n})",
(now - t_last).as_secs_f64() * 1000.0
);
t_last = now;
}
};
// ---- round 0: hash every index into NBLK0 big-endian 24-bit blocks, stored
// in padded 8-word slots with the leading block mirrored into `keys`. Entry
// k corresponds to leaf index k (the gen order), so no leaf table is needed.
@@ -567,20 +666,23 @@ pub fn solve_with(header: &[u8], clamp: Option<usize>) -> Vec<Vec<u32>> {
// parents[t] (t = 0..K-2) maps a round-(t+1) entry to its two round-t parents.
// `keys`/`slots` ping-pong between rounds (the previous buffers are freed as
// the new ones replace them).
phase("round0-hash", n0);
let mut parents: Vec<Vec<u64>> = Vec::with_capacity(K - 1);
let mut n = n0;
for _ in 0..(K - 1) {
for r in 0..(K - 1) {
let (ok, os, op) = collide(&keys, &slots, n, clamp);
n = op.len();
parents.push(op);
keys = ok;
slots = os;
phase(&format!("collide r{}", r + 1), n);
if n == 0 {
return Vec::new();
}
}
let candidates = collide_final(&keys, &slots, n, clamp);
phase("collide-final", candidates.len());
if candidates.is_empty() {
return Vec::new();
}
@@ -605,8 +707,11 @@ pub fn solve_with(header: &[u8], clamp: Option<usize>) -> Vec<Vec<u32>> {
// refs are now round-0 indices == leaf indices.
recovered.extend_from_slice(&refs);
}
phase("recover", recovered.len() / SOLUTION_INDICES);
filter_candidates(&base_state(header), &recovered)
let result = filter_candidates(&base_state(header), &recovered);
phase("filter+verify", result.len());
result
}
/// Re-order recovered leaf indices into the canonical solution ordering: at
@@ -780,4 +885,97 @@ mod tests {
}
}
}
/// Build a deterministic, densely populated benchmark header of `HEADER_LEN`
/// bytes from `seed`.
///
/// The bytes come from an xorshift64* stream whose state is initialised from
/// `seed` (noted in the original as the same scheme as `main::pseudo_header`),
/// so every nonce exercises a realistic, fully-populated header rather than
/// the degenerate all-`0x42` one. Each output byte is taken from the high half
/// of the multiplied state (`>> 33`, truncated to `u8`).
#[cfg(test)]
fn bench_header(seed: u64) -> Vec<u8> {
    // Mix the seed and add 1 before starting the stream.
    let mut state = seed.wrapping_mul(0x9E3779B97F4A7C15).wrapping_add(1);
    (0..HEADER_LEN)
        .map(|_| {
            // One xorshift64* step per output byte.
            state ^= state >> 12;
            state ^= state << 25;
            state ^= state >> 27;
            (state.wrapping_mul(0x2545F4914F6CDD1D) >> 33) as u8
        })
        .collect()
}
/// Steady-state throughput baseline for the CPU solver. Ignored by default;
/// run it in release (the only meaningful configuration — `debug_assert`s in
/// the collision hot path are off and `opt-level=3`/LTO is on) with output
/// visible:
///
/// ```text
/// cargo test --release full_solve_baseline -- --ignored --nocapture
/// ```
///
/// Tunables via env:
/// * `EQ_BENCH_ITERS` — timed solves per clamp (default 8). Unparsable or
///   zero values fall back to the default, since the statistics need at least
///   one sample.
/// * `EQ_BENCH_CLAMPS` — comma-separated clamp sweep (default `32`, the
///   value the miner and `--selftest` use). Unparsable entries are skipped; an
///   empty result falls back to the default.
///
/// Each timed iteration uses a distinct nonce so the figures average over
/// per-header variance; one warm-up solve per clamp (pages in the ~GB of
/// round buffers, warms caches) is excluded. This measures one solve at a
/// time across the whole rayon pool — i.e. solver latency and its memory
/// behaviour, which is exactly what the planned radix-scatter change targets
/// — not the aggregate multi-group mining rate `--bench` reports.
///
/// Every returned solution is checked with `is_valid_solution`; the test
/// fails if the solver returns an invalid one.
#[test]
#[ignore]
fn full_solve_baseline() {
    use std::time::Instant;

    // At least one timed solve is required: the summary below indexes into
    // `times` and divides by its length, so `0` is treated as "unset".
    let iters: usize = std::env::var("EQ_BENCH_ITERS")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .filter(|&v| v > 0)
        .unwrap_or(8);
    let clamps: Vec<usize> = std::env::var("EQ_BENCH_CLAMPS")
        .ok()
        .map(|v| v.split(',').filter_map(|s| s.trim().parse().ok()).collect())
        .filter(|v: &Vec<usize>| !v.is_empty())
        .unwrap_or_else(|| vec![32]);

    eprintln!(
        "CPU solver baseline — {iters} timed solve(s)/clamp, clamps={clamps:?}, \
rayon threads={}",
        rayon::current_num_threads()
    );

    for &clamp in &clamps {
        // Warm-up (excluded): fault in buffers, warm caches.
        let _ = solve_with(&bench_header(0), Some(clamp));

        let mut times = Vec::with_capacity(iters);
        let mut total_sols = 0usize;
        for i in 0..iters {
            // Seed 0 is reserved for the warm-up, so timed nonces start at 1.
            let header = bench_header(i as u64 + 1);
            let base = base_state(&header);
            // Only the solve itself is timed; validation happens afterwards.
            let t = Instant::now();
            let sols = solve_with(&header, Some(clamp));
            times.push(t.elapsed().as_secs_f64());
            for s in &sols {
                assert!(is_valid_solution(&base, s), "solver returned an invalid solution");
            }
            total_sols += sols.len();
        }

        // Durations are finite and non-negative; `total_cmp` gives a total
        // order without a panicking `unwrap`.
        times.sort_by(|a, b| a.total_cmp(b));
        let n = times.len();
        let sum: f64 = times.iter().sum();
        let mean = 1000.0 * sum / n as f64;
        // For an even sample count this is the upper of the two middle values.
        let median = 1000.0 * times[n / 2];
        let min = 1000.0 * times[0];
        let max = 1000.0 * times[n - 1];
        let solves_per_s = n as f64 / sum;
        eprintln!(
            "clamp={clamp:>3}: {solves_per_s:6.2} solve/s | mean {mean:6.0} ms \
median {median:6.0} ms min {min:6.0} ms max {max:6.0} ms | \
{total_sols} sol over {n} solve(s) ({:.2}/solve)",
            total_sols as f64 / n as f64,
        );
    }
}
}