Plan 9 from Bell Labs’s /usr/web/sources/contrib/ericvh/go-plan9/src/pkg/rpc/client.go

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package rpc

import (
	"bufio";
	"gob";
	"http";
	"io";
	"log";
	"net";
	"os";
	"sync";
)

// Call represents an active RPC.
// A Call is created by Client.Go, registered in the client's pending
// table by send, and completed either by the client's input loop or
// immediately when the client has already shut down.
type Call struct {
	ServiceMethod	string;		// The name of the service and method to call.
	Args		interface{};	// The argument to the function (*struct).
	Reply		interface{};	// The reply from the function (*struct); the response body is decoded into it.
	Error		os.Error;	// After completion, the error status (nil on success).
	Done		chan *Call;	// Receives this *Call when the call is complete; check Error for the status.
	seq		uint64;	// Sequence number assigned by send; matched against Response.Seq by input.
}

// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client.
type Client struct {
	mutex		sync.Mutex;	// protects pending, seq; send and input also hold it while accessing shutdown
	shutdown	os.Error;	// non-nil if the client is shut down; set by input when its read loop ends
	sending		sync.Mutex;	// held by send while a request header and its arguments are encoded
	seq		uint64;	// sequence number given to the next call registered by send
	conn		io.ReadWriteCloser;	// the connection handed to NewClient
	enc		*gob.Encoder;	// encodes requests onto conn
	dec		*gob.Decoder;	// decodes responses from conn
	pending		map[uint64]*Call;	// calls registered by send and not yet completed, keyed by seq
}

func (client *Client) send(c *Call) {
	// Register this call.
	client.mutex.Lock();
	if client.shutdown != nil {
		c.Error = client.shutdown;
		client.mutex.Unlock();
		_ = c.Done <- c;	// do not block
		return;
	}
	c.seq = client.seq;
	client.seq++;
	client.pending[c.seq] = c;
	client.mutex.Unlock();

	// Encode and send the request.
	request := new(Request);
	client.sending.Lock();
	request.Seq = c.seq;
	request.ServiceMethod = c.ServiceMethod;
	client.enc.Encode(request);
	err := client.enc.Encode(c.Args);
	if err != nil {
		panicln("rpc: client encode error:", err.String())
	}
	client.sending.Unlock();
}

// input is the client's read loop; NewClient runs it in its own goroutine.
// It decodes response headers, pairs each with the pending Call that has
// the same sequence number, decodes the body into that call's Reply and
// signals the call's Done channel.
//
// The loop stops on the first decode error or on a response whose
// sequence number matches no pending call.  The client is then marked
// shut down and every call still pending is completed with that error.
func (client *Client) input() {
	var err os.Error;
	for err == nil {
		response := new(Response);
		err = client.dec.Decode(response);
		if err != nil {
			if err == os.EOF {
				err = io.ErrUnexpectedEOF
			}
			break;
		}
		seq := response.Seq;
		client.mutex.Lock();
		c, ok := client.pending[seq];
		if ok {
			client.pending[seq] = c, false
		}
		client.mutex.Unlock();
		if !ok {
			// There is no Call to decode the body into, so the stream
			// cannot be resynchronized.  Treat it as a protocol error
			// rather than dereferencing a missing Call.
			err = os.ErrorString("rpc: response for unknown call");
			break;
		}
		err = client.dec.Decode(c.Reply);
		// An error string from the server takes precedence.  Otherwise
		// the call's status is the result of decoding the reply, so a
		// reply that failed to decode is not reported as success.
		if response.Error != "" {
			c.Error = os.ErrorString(response.Error)
		} else {
			c.Error = err
		}
		// We don't want to block here.  It is the caller's responsibility to make
		// sure the channel has enough buffer space. See comment in Go().
		_ = c.Done <- c;	// do not block
	}
	// Terminate pending calls.
	client.mutex.Lock();
	client.shutdown = err;
	for _, call := range client.pending {
		call.Error = err;
		_ = call.Done <- call;	// do not block
	}
	client.mutex.Unlock();
	log.Stderr("rpc: client protocol error:", err);
}

// NewClient builds a Client that talks to the set of services at the
// other end of conn.  It attaches a gob encoder and decoder to conn,
// creates an empty pending-call table, and starts the goroutine that
// reads responses before returning the Client.
func NewClient(conn io.ReadWriteCloser) *Client {
	c := new(Client);
	c.pending = make(map[uint64]*Call);
	c.conn = conn;
	c.dec = gob.NewDecoder(conn);
	c.enc = gob.NewEncoder(conn);
	go c.input();
	return c;
}

// DialHTTP connects to an HTTP RPC server at the specified network address.
// It dials the address, sends a CONNECT request for rpcPath, and requires
// the HTTP response status to equal connected before handing the
// connection to NewClient.
//
// A dial failure is returned as is.  A failure to write the request, a
// failure to read the response, or an unexpected status closes the
// connection and is returned wrapped in a *net.OpError whose operation
// is "dial-http".
func DialHTTP(network, address string) (*Client, os.Error) {
	conn, err := net.Dial(network, "", address);
	if err != nil {
		return nil, err
	}
	// Check the write: if the request never went out, report that error
	// rather than whatever the following read would fail with.
	_, err = io.WriteString(conn, "CONNECT "+rpcPath+" HTTP/1.0\n\n");
	if err != nil {
		conn.Close();
		return nil, &net.OpError{"dial-http", network + " " + address, nil, err};
	}

	// Require successful HTTP response
	// before switching to RPC protocol.
	resp, err := http.ReadResponse(bufio.NewReader(conn));
	if err == nil && resp.Status == connected {
		return NewClient(conn), nil
	}
	if err == nil {
		err = os.ErrorString("unexpected HTTP response: " + resp.Status)
	}
	conn.Close();
	return nil, &net.OpError{"dial-http", network + " " + address, nil, err};
}

// Dial opens a connection to the RPC server at the given network address
// and wraps it in a new Client.  If the connection cannot be made, the
// dial error is returned unchanged together with a nil Client.
func Dial(network, address string) (*Client, os.Error) {
	conn, err := net.Dial(network, "", address);
	if err == nil {
		return NewClient(conn), nil
	}
	return nil, err;
}

// Go invokes the function asynchronously.  It returns the Call structure representing
// the invocation.  The done channel will signal when the call is complete by returning
// the same Call object.  If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	c := new(Call);
	c.ServiceMethod = serviceMethod;
	c.Args = args;
	c.Reply = reply;
	if done == nil {
		done = make(chan *Call, 10)	// buffered.
	} else {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel.  If the channel
		// is totally unbuffered, it's best not to run at all.
		if cap(done) == 0 {
			log.Crash("rpc: done channel is unbuffered")
		}
	}
	c.Done = done;
	if client.shutdown != nil {
		c.Error = client.shutdown;
		_ = c.Done <- c;	// do not block
		return c;
	}
	client.send(c);
	return c;
}

// Call invokes the named function, waits for it to complete, and returns its error status.
// It issues the request through Go with a freshly allocated done channel
// and blocks until the Call is delivered on it.  If the client has shut
// down, the shutdown error is returned.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) os.Error {
	// client.shutdown is not read here without the mutex.  A client that
	// has shut down is detected under the lock on the Go/send path, which
	// completes the call with the same shutdown error.
	call := <-client.Go(serviceMethod, args, reply, nil).Done;
	return call.Error;
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to webmaster@9p.io.