One of the things you’ll see most often when working in Go is use of the `io.Reader` and `io.Writer` interfaces. This makes sense: they’re the workhorse interfaces for data transfer.
Something that’s bugged me about the `io.Reader` interface is that its `Read()` function is blocking – there’s no way to preempt a read once it’s begun. Similarly, there’s no way to `select` on a read, so asynchronously coordinating reads from multiple `io.Reader`s can be a bit tricky.
The one escape hatch that idiomatic Go gives you is `io.ReadCloser`, which in many cases does allow you to preempt a read. Calling `Close()` on the reader will (in some implementations) cancel pending reads. However, you can only do this once; after a reader is closed, subsequent reads should fail.
There are many places in an application I work on that do something effectively like this:
```go
func asyncReader(r io.Reader) {
	ch := make(chan []byte)
	go func() {
		defer close(ch)
		for {
			b := make([]byte, 1024)
			// Point of no return
			n, err := r.Read(b)
			if err != nil {
				return
			}
			ch <- b[:n]
		}
	}()
	select {
	case data := <-ch:
		doSomething(data)
	//...
	// Other asynchronous cases
	//...
	}
}
```
We use a pattern like the above when it’s necessary to intermix a `Read()` with some other asynchronous data source, like a gRPC stream. You spin off a goroutine that reads indefinitely from the reader, sends the data it receives on a channel, and performs some cleanup once the reader closes.
The tricky part comes when you’re trying to clean up this whole system. What if we want to send a signal that ends this process? Consider an altered example:
```go
func asyncReader(r io.Reader, doneCh <-chan struct{}) {
	ch := make(chan []byte)
	continueReading := true
	go func() {
		defer close(ch)
		for continueReading {
			b := make([]byte, 1024)
			// Point of no return
			n, err := r.Read(b)
			if err != nil {
				return
			}
			ch <- b[:n]
		}
	}()
	select {
	case data := <-ch:
		doSomething(data)
	case <-doneCh:
		continueReading = false
		return
	}
}
```
When `doneCh` is closed above, the function `asyncReader` will return. The goroutine we created will also return the next time it evaluates the condition in the `for` loop. But what if the goroutine is blocking on `r.Read()`? Then we have essentially leaked a goroutine. We’re stuck until the reader unblocks.

Is it likely that the reader will block forever? Maybe, maybe not. It depends on the underlying reader. Crucially, you have no guarantee from the interface that it will ever unblock, so the possibility of a leaked goroutine remains.
You’re pretty much stuck at this point if you only have an `io.Reader`. If you can switch to `io.ReadCloser` (and you control the behavior of `Close()`), you can use that as a side-channel to cancel the final read. But that’s about it.
This is frustrating, and it made me wonder why something like the following interface isn’t more common:

```go
type PreemptibleReader interface {
	Read(ctx context.Context, p []byte) (n int, err error)
}
```
If `Read()` took a context argument, then preempting reads would be possible.
## Why `io.Reader` Blocks
Taking a step back, it’s worth pondering for a moment why reads are not preemptible by default. Recall the interface for `io.Reader`:

```go
type Reader interface {
	Read(p []byte) (n int, err error)
}
```
What does this actually mean?¹ In effect, the interface stipulates: “here, take a byte slice `p` and write something to it. Tell me how many bytes you wrote (`n`), and whether an error was encountered somewhere in the process (`err`).”
This interface is incredibly generic, as it needs to accommodate a variety of uses. Everything from an in-memory buffer to an HTTP response body to a database transaction result can be implemented as an `io.Reader`. As such, many `io.Reader` implementations will have internal state.
Modifications to `p` are not atomic either. If you were to cancel a read at some arbitrary point in its execution, you could be left with inconsistent state – both in the target byte slice and in the reader’s internal state, if applicable.
All of this leads to a bunch of complexity if we were to permit arbitrarily preemptible reads. What happens if the process is halfway through altering the byte slice? Does the reader need to zero out the byte slice? What if the reader was about to emit an error and was doing some cleanup before doing so?
Preemption makes things… messy. Granted, there are ways of allowing “graceful cleanup”. Contexts are often used when we want to cancel an operation in Go – such as network requests. Canceling a context doesn’t guarantee that the operation will be terminated immediately – so there’s room for some bookkeeping and cleanup.
I think the thing to remember is that `io.Reader` is designed to work almost anywhere. As such, especially when the actual work done by an `io.Reader` is more akin to a `memcpy` than an HTTP request, the overhead needed to manage contexts and internal bookkeeping would likely introduce nasty performance problems.
## What about `Close()`?
As mentioned above, `Close()` can be used as a mechanism to cancel a pending `Read()`. However, there are still some cases where it’d be nice to preempt a read without closing the underlying reader.
For example, consider a program reading from `os.Stdin`. If you wanted to preempt an in-progress read from `stdin` using `Close()`, then you’d, well, be closing `stdin`. This isn’t great, because once `stdin` is closed, you can’t then re-open it and begin reading again.
Since `Close()` is usually a one-time-only action, it’s not a great candidate for preempting reads in the general case. `Close()` is better categorized as a cleanup method, not a preemption signal.
## How You Can Get Around This Anyways
Given the caveats above, there are still ways to effectively cancel an in-flight read. There’s no getting around the interface limitations of `io.Reader`, but you can smooth things over for the users of the interface.
```go
type CancelableReader struct {
	ctx  context.Context
	data chan []byte
	err  error
	r    io.Reader
}

func (c *CancelableReader) begin() {
	buf := make([]byte, 1024)
	for {
		n, err := c.r.Read(buf)
		if n > 0 {
			tmp := make([]byte, n)
			copy(tmp, buf[:n])
			c.data <- tmp
		}
		if err != nil {
			c.err = err
			close(c.data)
			return
		}
	}
}

func (c *CancelableReader) Read(p []byte) (int, error) {
	select {
	case <-c.ctx.Done():
		return 0, c.ctx.Err()
	case d, ok := <-c.data:
		if !ok {
			return 0, c.err
		}
		// copy returns min(len(p), len(d)), so n is never over-reported
		return copy(p, d), nil
	}
}

func New(ctx context.Context, r io.Reader) *CancelableReader {
	c := &CancelableReader{
		r:    r,
		ctx:  ctx,
		data: make(chan []byte),
	}
	go c.begin()
	return c
}
```
The above is a wrapper for an `io.Reader` that takes a `context.Context` in its constructor. When the context is canceled, any in-flight reads immediately return. With a bit of fiddling, the above can also be adapted to play nicely with an `io.ReadCloser`.
There’s a huge asterisk on this `CancelableReader` wrapper: it still has the goroutine leak. If the underlying `io.Reader` never returns, then the goroutine in `begin()` will never get cleaned up.
At least with this approach, it is a bit clearer where this leak occurs, and you could store some additional state on the struct to track whether the goroutine has finished. Potentially, you could make a pool of these `CancelableReader`s and recycle them if/when the final read returns.
This doesn’t feel like a satisfying conclusion. I’m curious if there are better practices for preempting reads. Maybe the answer is “add enough wrapper layers until the problem goes away”? Maybe the answer is “know something about the underlying reader implementation” – for example, using `syscall.SetNonblock` on `stdin`.
If anyone has a better approach, I’d love to hear it! 😃
Update: Discussion on Reddit, lobste.rs
Update 3/2021: Thanks to Reed for pointing out that the inner loop of `CancelableReader.begin` needs to handle non-nil errors differently when `n > 0`.
---

¹ One thing I like about the simplicity of Go is that it often leads you down these tangential existential questions like, “what does it mean to read from a blocking interface?” 😜 ↩︎