using System.Reactive.Concurrency;

namespace OF_DL.Utils;

/// <summary>
/// A <see cref="Stream"/> wrapper that forwards every operation to an inner stream while
/// capping the average number of bytes read or written per second.
/// </summary>
/// <remarks>
/// The cap is enforced against a running total: after each transfer the stream compares the
/// time that <em>should</em> have elapsed for the bytes processed so far with the time that
/// actually elapsed on the scheduler's stopwatch, and waits for the difference.
/// Reads and writes share the same byte counter. The wrapper does not dispose the inner stream.
/// </remarks>
public class ThrottledStream : Stream
{
    private readonly int maxBytesPerSecond;
    private readonly Stream parent;
    private readonly IScheduler scheduler;
    private readonly bool shouldThrottle;
    private readonly IStopwatch stopwatch;

    // Total bytes counted since construction (reads and writes combined).
    private long processed;

    /// <summary>
    /// Creates a throttled wrapper around <paramref name="parent"/>.
    /// </summary>
    /// <param name="parent">The stream that actually performs the I/O.</param>
    /// <param name="maxBytesPerSecond">
    /// Target average rate. A value of zero or less disables throttling, because no finite
    /// delay can be derived from it.
    /// </param>
    /// <param name="scheduler">Supplies the stopwatch and the delay used by synchronous calls.</param>
    /// <param name="shouldThrottle">When <c>false</c> the stream is a plain pass-through.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="parent"/> or <paramref name="scheduler"/> is <c>null</c>.
    /// </exception>
    public ThrottledStream(Stream parent, int maxBytesPerSecond, IScheduler scheduler, bool shouldThrottle)
    {
        ArgumentNullException.ThrowIfNull(parent);
        ArgumentNullException.ThrowIfNull(scheduler);

        // A non-positive rate would make the delay computation divide by zero (NaN/Infinity,
        // which TimeSpan.FromSeconds rejects) or go negative, so treat it as "unlimited".
        this.shouldThrottle = shouldThrottle && maxBytesPerSecond > 0;
        this.maxBytesPerSecond = maxBytesPerSecond;
        this.parent = parent;
        this.scheduler = scheduler;
        stopwatch = scheduler.StartStopwatch();
        processed = 0;
    }

    /// <summary>
    /// Creates a throttled wrapper around <paramref name="parent"/> using
    /// <see cref="Scheduler.Immediate"/> for timing.
    /// </summary>
    /// <param name="parent">The stream that actually performs the I/O.</param>
    /// <param name="maxBytesPerSecond">Target average rate; zero or less disables throttling.</param>
    /// <param name="shouldThrottle">When <c>false</c> the stream is a plain pass-through.</param>
    public ThrottledStream(Stream parent, int maxBytesPerSecond, bool shouldThrottle)
        : this(parent, maxBytesPerSecond, Scheduler.Immediate, shouldThrottle)
    {
    }

    /// <inheritdoc/>
    public override bool CanRead => parent.CanRead;

    /// <inheritdoc/>
    public override bool CanSeek => parent.CanSeek;

    /// <inheritdoc/>
    public override bool CanWrite => parent.CanWrite;

    /// <inheritdoc/>
    public override long Length => parent.Length;

    /// <inheritdoc/>
    public override long Position
    {
        get => parent.Position;
        set => parent.Position = value;
    }

    /// <summary>
    /// Counts <paramref name="bytes"/> towards the running total and returns how long the
    /// caller must wait to stay at or below the configured rate.
    /// </summary>
    /// <param name="bytes">Number of bytes just read or about to be written.</param>
    /// <returns>
    /// The outstanding delay; zero or negative when no wait is needed or throttling is off.
    /// </returns>
    private TimeSpan NextDelay(int bytes)
    {
        if (!shouldThrottle)
        {
            return TimeSpan.Zero;
        }

        processed += bytes;
        TimeSpan targetTime = TimeSpan.FromSeconds((double)processed / maxBytesPerSecond);
        return targetTime - stopwatch.Elapsed;
    }

    /// <summary>
    /// Blocks the calling thread for as long as needed to keep the average rate under the limit.
    /// </summary>
    /// <param name="bytes">Number of bytes just read or about to be written.</param>
    protected void Throttle(int bytes)
    {
        TimeSpan delay = NextDelay(bytes);
        if (delay <= TimeSpan.Zero)
        {
            return;
        }

        // The Rx awaiter only schedules the delay once a continuation is registered through
        // OnCompleted; calling GetResult() on it directly returns without waiting. Register a
        // continuation that signals an event and block on that event instead.
        using ManualResetEventSlim elapsed = new(false);
        scheduler.Sleep(delay).GetAwaiter().OnCompleted(elapsed.Set);
        elapsed.Wait();
    }

    /// <summary>
    /// Asynchronously waits for as long as needed to keep the average rate under the limit.
    /// </summary>
    /// <param name="bytes">Number of bytes just read or about to be written.</param>
    /// <param name="cancellationToken">Cancels the wait; cancellation surfaces as an exception.</param>
    private async Task ThrottleAsync(int bytes, CancellationToken cancellationToken)
    {
        TimeSpan delay = NextDelay(bytes);
        if (delay > TimeSpan.Zero)
        {
            // Task.Delay (rather than the injected scheduler) keeps this wait non-blocking.
            // The caller's token is honoured so a cancelled transfer does not sit out the delay.
            await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
        }
    }

    /// <inheritdoc/>
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
        CancellationToken cancellationToken)
    {
        int read = await parent.ReadAsync(buffer.AsMemory(offset, count), cancellationToken)
            .ConfigureAwait(false);
        await ThrottleAsync(read, cancellationToken).ConfigureAwait(false);
        return read;
    }

    /// <inheritdoc/>
    public override async ValueTask<int> ReadAsync(Memory<byte> buffer,
        CancellationToken cancellationToken = default)
    {
        int bytesRead = await parent.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
        await ThrottleAsync(bytesRead, cancellationToken).ConfigureAwait(false);
        return bytesRead;
    }

    /// <inheritdoc/>
    public override async Task WriteAsync(byte[] buffer, int offset, int count,
        CancellationToken cancellationToken)
    {
        // Writes are delayed before the data is handed to the inner stream.
        await ThrottleAsync(count, cancellationToken).ConfigureAwait(false);
        await parent.WriteAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer,
        CancellationToken cancellationToken = default)
    {
        await ThrottleAsync(buffer.Length, cancellationToken).ConfigureAwait(false);
        await parent.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public override void Flush() => parent.Flush();

    /// <inheritdoc/>
    public override Task FlushAsync(CancellationToken cancellationToken) =>
        parent.FlushAsync(cancellationToken);

    /// <inheritdoc/>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int read = parent.Read(buffer, offset, count);
        Throttle(read);
        return read;
    }

    /// <inheritdoc/>
    public override long Seek(long offset, SeekOrigin origin) => parent.Seek(offset, origin);

    /// <inheritdoc/>
    public override void SetLength(long value) => parent.SetLength(value);

    /// <inheritdoc/>
    public override void Write(byte[] buffer, int offset, int count)
    {
        Throttle(count);
        parent.Write(buffer, offset, count);
    }
}