Plan 9 from Bell Labs’s /usr/web/sources/contrib/ericvh/go-plan9/src/pkg/io/pipe.go

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Pipe adapter to connect code expecting an io.Read
// with code expecting an io.Write.

package io

import (
	"os";
	"sync";
)

type pipeReturn struct {
	n	int;
	err	os.Error;
}

// Shared pipe structure.
type pipe struct {
	rclosed	bool;			// Read end closed?
	rerr	os.Error;		// Error supplied to CloseReader
	wclosed	bool;			// Write end closed?
	werr	os.Error;		// Error supplied to CloseWriter
	wpend	[]byte;			// Written data waiting to be read.
	wtot	int;			// Bytes consumed so far in current write.
	cr	chan []byte;		// Write sends data here...
	cw	chan pipeReturn;	// ... and reads the n, err back from here.
}

func (p *pipe) Read(data []byte) (n int, err os.Error) {
	if p == nil || p.rclosed {
		return 0, os.EINVAL
	}

	// Wait for next write block if necessary.
	if p.wpend == nil {
		if !p.wclosed {
			p.wpend = <-p.cr
		}
		if p.wpend == nil {
			return 0, p.werr
		}
		p.wtot = 0;
	}

	// Read from current write block.
	n = len(data);
	if n > len(p.wpend) {
		n = len(p.wpend)
	}
	for i := 0; i < n; i++ {
		data[i] = p.wpend[i]
	}
	p.wtot += n;
	p.wpend = p.wpend[n:];

	// If write block is done, finish the write.
	if len(p.wpend) == 0 {
		p.wpend = nil;
		p.cw <- pipeReturn{p.wtot, nil};
		p.wtot = 0;
	}

	return n, nil;
}

func (p *pipe) Write(data []byte) (n int, err os.Error) {
	if p == nil || p.wclosed {
		return 0, os.EINVAL
	}
	if p.rclosed {
		return 0, p.rerr
	}

	// Send data to reader.
	p.cr <- data;

	// Wait for reader to finish copying it.
	res := <-p.cw;
	return res.n, res.err;
}

func (p *pipe) CloseReader(rerr os.Error) os.Error {
	if p == nil || p.rclosed {
		return os.EINVAL
	}

	// Stop any future writes.
	p.rclosed = true;
	if rerr == nil {
		rerr = os.EPIPE
	}
	p.rerr = rerr;

	// Stop the current write.
	if !p.wclosed {
		p.cw <- pipeReturn{p.wtot, rerr}
	}

	return nil;
}

func (p *pipe) CloseWriter(werr os.Error) os.Error {
	if werr == nil {
		werr = os.EOF
	}
	if p == nil || p.wclosed {
		return os.EINVAL
	}

	// Stop any future reads.
	p.wclosed = true;
	p.werr = werr;

	// Stop the current read.
	if !p.rclosed {
		p.cr <- nil
	}

	return nil;
}

// Read/write halves of the pipe.
// They are separate structures for two reasons:
//  1.  If one end becomes garbage without being Closed,
//      its finisher can Close so that the other end
//      does not hang indefinitely.
//  2.  Clients cannot use interface conversions on the
//      read end to find the Write method, and vice versa.

// A PipeReader is the read half of a pipe.
type PipeReader struct {
	lock	sync.Mutex;
	p	*pipe;
}

// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is nil.
func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
	r.lock.Lock();
	defer r.lock.Unlock();

	return r.p.Read(data);
}

// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error os.EPIPE.
func (r *PipeReader) Close() os.Error {
	r.lock.Lock();
	defer r.lock.Unlock();

	return r.p.CloseReader(nil);
}

// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error rerr.
func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
	r.lock.Lock();
	defer r.lock.Unlock();

	return r.p.CloseReader(rerr);
}

func (r *PipeReader) finish()	{ r.Close() }

// Write half of pipe.
type PipeWriter struct {
	lock	sync.Mutex;
	p	*pipe;
}

// Write implements the standard Write interface:
// it writes data to the pipe, blocking until readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is os.EPIPE.
func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
	w.lock.Lock();
	defer w.lock.Unlock();

	return w.p.Write(data);
}

// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and a nil error.
func (w *PipeWriter) Close() os.Error {
	w.lock.Lock();
	defer w.lock.Unlock();

	return w.p.CloseWriter(nil);
}

// CloseWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error werr.
func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
	w.lock.Lock();
	defer w.lock.Unlock();

	return w.p.CloseWriter(werr);
}

func (w *PipeWriter) finish()	{ w.Close() }

// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
// Reads on one end are matched with writes on the other,
// copying data directly between the two; there is no internal buffering.
func Pipe() (*PipeReader, *PipeWriter) {
	p := new(pipe);
	p.cr = make(chan []byte, 1);
	p.cw = make(chan pipeReturn, 1);
	r := new(PipeReader);
	r.p = p;
	w := new(PipeWriter);
	w.p = p;
	return r, w;
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to webmaster@9p.io.