Plan 9 from Bell Labs’s /usr/web/sources/plan9/sys/src/9/ip/rudp.c

Copyright © 2009 Alcatel-Lucent.
Distributed under the Lucent Public License version 1.02.
Download the Plan 9 distribution.


/*
 *  Reliable User Datagram Protocol, currently only for IPv4.
 *  This protocol is compatible with UDP's packet format.
 *  It could be done over UDP if need be.
 */
#include	"u.h"
#include	"../port/lib.h"
#include	"mem.h"
#include	"dat.h"
#include	"fns.h"
#include	"../port/error.h"

#include	"ip.h"

#define DEBUG	0
#define DPRINT if(DEBUG)print

#define SEQDIFF(a,b) ( (a)>=(b)?\
			(a)-(b):\
			0xffffffffUL-((b)-(a)) )
#define INSEQ(a,start,end) ( (start)<=(end)?\
				((a)>(start)&&(a)<=(end)):\
				((a)>(start)||(a)<=(end)) )
#define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd)
#define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )

enum
{
	UDP_PHDRSIZE	= 12,	/* pseudo header */
//	UDP_HDRSIZE	= 20,	/* pseudo header + udp header */
	UDP_RHDRSIZE	= 36,	/* pseudo header + udp header + rudp header */
	UDP_IPHDR	= 8,	/* ip header */
	IP_UDPPROTO	= 254,
	UDP_USEAD7	= 52,	/* size of new ipv6 headers struct */

	Rudprxms	= 200,
	Rudptickms	= 50,
	Rudpmaxxmit	= 10,
	Maxunacked	= 100,
};

#define Hangupgen	0xffffffff	/* used only in hangup messages */

typedef struct Udphdr Udphdr;
struct Udphdr
{
	/* ip header */
	uchar	vihl;		/* Version and header length */
	uchar	tos;		/* Type of service */
	uchar	length[2];	/* packet length */
	uchar	id[2];		/* Identification */
	uchar	frag[2];	/* Fragment information */

	/* pseudo header starts here */
	uchar	Unused;
	uchar	udpproto;	/* Protocol */
	uchar	udpplen[2];	/* Header plus data length */
	uchar	udpsrc[4];	/* Ip source */
	uchar	udpdst[4];	/* Ip destination */

	/* udp header */
	uchar	udpsport[2];	/* Source port */
	uchar	udpdport[2];	/* Destination port */
	uchar	udplen[2];	/* data length */
	uchar	udpcksum[2];	/* Checksum */
};

typedef struct Rudphdr Rudphdr;
struct Rudphdr
{
	/* ip header */
	uchar	vihl;		/* Version and header length */
	uchar	tos;		/* Type of service */
	uchar	length[2];	/* packet length */
	uchar	id[2];		/* Identification */
	uchar	frag[2];	/* Fragment information */

	/* pseudo header starts here */
	uchar	Unused;
	uchar	udpproto;	/* Protocol */
	uchar	udpplen[2];	/* Header plus data length */
	uchar	udpsrc[4];	/* Ip source */
	uchar	udpdst[4];	/* Ip destination */

	/* udp header */
	uchar	udpsport[2];	/* Source port */
	uchar	udpdport[2];	/* Destination port */
	uchar	udplen[2];	/* data length (includes rudp header) */
	uchar	udpcksum[2];	/* Checksum */

	/* rudp header */
	uchar	relseq[4];	/* id of this packet (or 0) */
	uchar	relsgen[4];	/* generation/time stamp */
	uchar	relack[4];	/* packet being acked (or 0) */
	uchar	relagen[4];	/* generation/time stamp */
};


/*
 *  one state structure per destination
 */
typedef struct Reliable Reliable;
struct Reliable
{
	Ref;

	Reliable *next;

	uchar	addr[IPaddrlen];	/* always V6 when put here */
	ushort	port;

	Block	*unacked;	/* unacked msg list */
	Block	*unackedtail;	/*  and its tail */

	int	timeout;	/* time since first unacked msg sent */
	int	xmits;		/* number of times first unacked msg sent */

	ulong	sndseq;		/* next packet to be sent */
	ulong	sndgen;		/*  and its generation */

	ulong	rcvseq;		/* last packet received */
	ulong	rcvgen;		/*  and its generation */

	ulong	acksent;	/* last ack sent */
	ulong	ackrcvd;	/* last msg for which ack was rcvd */

	/* flow control */
	QLock	lock;
	Rendez	vous;
	int	blocked;
};



/* MIB II counters */
typedef struct Rudpstats Rudpstats;
struct Rudpstats
{
	ulong	rudpInDatagrams;
	ulong	rudpNoPorts;
	ulong	rudpInErrors;
	ulong	rudpOutDatagrams;
};

typedef struct Rudppriv Rudppriv;
struct Rudppriv
{
	Ipht	ht;

	/* MIB counters */
	Rudpstats	ustats;

	/* non-MIB stats */
	ulong	csumerr;		/* checksum errors */
	ulong	lenerr;			/* short packet */
	ulong	rxmits;			/* # of retransmissions */
	ulong	orders;			/* # of out of order pkts */

	/* keeping track of the ack kproc */
	int	ackprocstarted;
	QLock	apl;
};


static ulong generation = 0;
static Rendez rend;

/*
 *  protocol specific part of Conv
 */
typedef struct Rudpcb Rudpcb;
struct Rudpcb
{
	QLock;
	uchar	headers;
	uchar	randdrop;
	Reliable *r;
};

/*
 * local functions
 */
void	relsendack(Conv*, Reliable*, int);
int	reliput(Conv*, Block*, uchar*, ushort);
Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
void	relput(Reliable*);
void	relforget(Conv *, uchar*, int, int);
void	relackproc(void *);
void	relackq(Reliable *, Block*);
void	relhangup(Conv *, Reliable*);
void	relrexmit(Conv *, Reliable*);
void	relput(Reliable*);
void	rudpkick(void *x);

static void
rudpstartackproc(Proto *rudp)
{
	Rudppriv *rpriv;
	char kpname[KNAMELEN];

	rpriv = rudp->priv;
	if(rpriv->ackprocstarted == 0){
		qlock(&rpriv->apl);
		if(rpriv->ackprocstarted == 0){
			snprint(kpname, sizeof kpname, "#I%drudpack",
				rudp->f->dev);
			kproc(kpname, relackproc, rudp);
			rpriv->ackprocstarted = 1;
		}
		qunlock(&rpriv->apl);
	}
}

static char*
rudpconnect(Conv *c, char **argv, int argc)
{
	char *e;
	Rudppriv *upriv;

	upriv = c->p->priv;
	rudpstartackproc(c->p);
	e = Fsstdconnect(c, argv, argc);
	Fsconnected(c, e);
	iphtadd(&upriv->ht, c);

	return e;
}


static int
rudpstate(Conv *c, char *state, int n)
{
	Rudpcb *ucb;
	Reliable *r;
	int m;

	m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
	ucb = (Rudpcb*)c->ptcl;
	qlock(ucb);
	for(r = ucb->r; r; r = r->next)
		m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
	m += snprint(state+m, n-m, "\n");
	qunlock(ucb);
	return m;
}

static char*
rudpannounce(Conv *c, char** argv, int argc)
{
	char *e;
	Rudppriv *upriv;

	upriv = c->p->priv;
	rudpstartackproc(c->p);
	e = Fsstdannounce(c, argv, argc);
	if(e != nil)
		return e;
	Fsconnected(c, nil);
	iphtadd(&upriv->ht, c);

	return nil;
}

static void
rudpcreate(Conv *c)
{
	c->rq = qopen(64*1024, Qmsg, 0, 0);
	c->wq = qopen(64*1024, Qkick, rudpkick, c);
}

static void
rudpclose(Conv *c)
{
	Rudpcb *ucb;
	Reliable *r, *nr;
	Rudppriv *upriv;

	upriv = c->p->priv;
	iphtrem(&upriv->ht, c);

	/* force out any delayed acks */
	ucb = (Rudpcb*)c->ptcl;
	qlock(ucb);
	for(r = ucb->r; r; r = r->next){
		if(r->acksent != r->rcvseq)
			relsendack(c, r, 0);
	}
	qunlock(ucb);

	qclose(c->rq);
	qclose(c->wq);
	qclose(c->eq);
	ipmove(c->laddr, IPnoaddr);
	ipmove(c->raddr, IPnoaddr);
	c->lport = 0;
	c->rport = 0;

	ucb->headers = 0;
	ucb->randdrop = 0;
	qlock(ucb);
	for(r = ucb->r; r; r = nr){
		if(r->acksent != r->rcvseq)
			relsendack(c, r, 0);
		nr = r->next;
		relhangup(c, r);
		relput(r);
	}
	ucb->r = 0;

	qunlock(ucb);
}

/*
 *  randomly don't send packets
 */
static void
doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
{
	Rudpcb *ucb;

	ucb = (Rudpcb*)c->ptcl;
	if(ucb->randdrop && nrand(100) < ucb->randdrop)
		freeblist(bp);
	else
		ipoput4(f, bp, x, ttl, tos, nil);
}

int
flow(void *v)
{
	Reliable *r = v;

	return UNACKED(r) <= Maxunacked;
}

void
rudpkick(void *x)
{
	Conv *c = x;
	Udphdr *uh;
	ushort rport;
	uchar laddr[IPaddrlen], raddr[IPaddrlen];
	Block *bp;
	Rudpcb *ucb;
	Rudphdr *rh;
	Reliable *r;
	int dlen, ptcllen;
	Rudppriv *upriv;
	Fs *f;

	upriv = c->p->priv;
	f = c->p->f;

	netlog(c->p->f, Logrudp, "rudp: kick\n");
	bp = qget(c->wq);
	if(bp == nil)
		return;

	ucb = (Rudpcb*)c->ptcl;
	switch(ucb->headers) {
	case 7:
		/* get user specified addresses */
		bp = pullupblock(bp, UDP_USEAD7);
		if(bp == nil)
			return;
		ipmove(raddr, bp->rp);
		bp->rp += IPaddrlen;
		ipmove(laddr, bp->rp);
		bp->rp += IPaddrlen;
		/* pick interface closest to dest */
		if(ipforme(f, laddr) != Runi)
			findlocalip(f, laddr, raddr);
		bp->rp += IPaddrlen;		/* Ignore ifc address */
		rport = nhgets(bp->rp);
		bp->rp += 2+2;			/* Ignore local port */
		break;
	default:
		ipmove(raddr, c->raddr);
		ipmove(laddr, c->laddr);
		rport = c->rport;
		break;
	}

	dlen = blocklen(bp);

	/* Make space to fit rudp & ip header */
	bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
	if(bp == nil)
		return;

	uh = (Udphdr *)(bp->rp);
	uh->vihl = IP_VER4;

	rh = (Rudphdr*)uh;

	ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
	uh->Unused = 0;
	uh->udpproto = IP_UDPPROTO;
	uh->frag[0] = 0;
	uh->frag[1] = 0;
	hnputs(uh->udpplen, ptcllen);
	switch(ucb->headers){
	case 7:
		v6tov4(uh->udpdst, raddr);
		hnputs(uh->udpdport, rport);
		v6tov4(uh->udpsrc, laddr);
		break;
	default:
		v6tov4(uh->udpdst, c->raddr);
		hnputs(uh->udpdport, c->rport);
		if(ipcmp(c->laddr, IPnoaddr) == 0)
			findlocalip(f, c->laddr, c->raddr);
		v6tov4(uh->udpsrc, c->laddr);
		break;
	}
	hnputs(uh->udpsport, c->lport);
	hnputs(uh->udplen, ptcllen);
	uh->udpcksum[0] = 0;
	uh->udpcksum[1] = 0;

	qlock(ucb);
	r = relstate(ucb, raddr, rport, "kick");
	r->sndseq = NEXTSEQ(r->sndseq);
	hnputl(rh->relseq, r->sndseq);
	hnputl(rh->relsgen, r->sndgen);

	hnputl(rh->relack, r->rcvseq);  /* ACK last rcvd packet */
	hnputl(rh->relagen, r->rcvgen);

	if(r->rcvseq != r->acksent)
		r->acksent = r->rcvseq;

	hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));

	relackq(r, bp);
	qunlock(ucb);

	upriv->ustats.rudpOutDatagrams++;

	DPRINT("sent: %lud/%lud, %lud/%lud\n",
		r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);

	doipoput(c, f, bp, 0, c->ttl, c->tos);

	if(waserror()) {
		relput(r);
		qunlock(&r->lock);
		nexterror();
	}

	/* flow control of sorts */
	qlock(&r->lock);
	if(UNACKED(r) > Maxunacked){
		r->blocked = 1;
		sleep(&r->vous, flow, r);
		r->blocked = 0;
	}

	qunlock(&r->lock);
	relput(r);
	poperror();
}

void
rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
{
	int len, olen, ottl;
	Udphdr *uh;
	Conv *c;
	Rudpcb *ucb;
	uchar raddr[IPaddrlen], laddr[IPaddrlen];
	ushort rport, lport;
	Rudppriv *upriv;
	Fs *f;
	uchar *p;

	upriv = rudp->priv;
	f = rudp->f;

	upriv->ustats.rudpInDatagrams++;

	uh = (Udphdr*)(bp->rp);

	/* Put back pseudo header for checksum
	 * (remember old values for icmpnoconv())
	 */
	ottl = uh->Unused;
	uh->Unused = 0;
	len = nhgets(uh->udplen);
	olen = nhgets(uh->udpplen);
	hnputs(uh->udpplen, len);

	v4tov6(raddr, uh->udpsrc);
	v4tov6(laddr, uh->udpdst);
	lport = nhgets(uh->udpdport);
	rport = nhgets(uh->udpsport);

	if(nhgets(uh->udpcksum)) {
		if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
			upriv->ustats.rudpInErrors++;
			upriv->csumerr++;
			netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
			DPRINT("rudp: checksum error %I\n", raddr);
			freeblist(bp);
			return;
		}
	}

	qlock(rudp);

	c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
	if(c == nil){
		/* no conversation found */
		upriv->ustats.rudpNoPorts++;
		qunlock(rudp);
		netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
			laddr, lport);
		uh->Unused = ottl;
		hnputs(uh->udpplen, olen);
		icmpnoconv(f, bp);
		freeblist(bp);
		return;
	}
	ucb = (Rudpcb*)c->ptcl;
	qlock(ucb);
	qunlock(rudp);

	if(reliput(c, bp, raddr, rport) < 0){
		qunlock(ucb);
		freeb(bp);
		return;
	}

	/*
	 * Trim the packet down to data size
	 */

	len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
	bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
	if(bp == nil) {
		netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
			raddr, rport, laddr, lport);
		DPRINT("rudp: len err %I.%d -> %I.%d\n",
			raddr, rport, laddr, lport);
		upriv->lenerr++;
		return;
	}

	netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
		raddr, rport, laddr, lport, len);

	switch(ucb->headers){
	case 7:
		/* pass the src address */
		bp = padblock(bp, UDP_USEAD7);
		p = bp->rp;
		ipmove(p, raddr); p += IPaddrlen;
		ipmove(p, laddr); p += IPaddrlen;
		ipmove(p, ifc->lifc->local); p += IPaddrlen;
		hnputs(p, rport); p += 2;
		hnputs(p, lport);
		break;
	default:
		/* connection oriented rudp */
		if(ipcmp(c->raddr, IPnoaddr) == 0){
			/* save the src address in the conversation */
		 	ipmove(c->raddr, raddr);
			c->rport = rport;

			/* reply with the same ip address (if not broadcast) */
			if(ipforme(f, laddr) == Runi)
				ipmove(c->laddr, laddr);
			else
				v4tov6(c->laddr, ifc->lifc->local);
		}
		break;
	}
	if(bp->next)
		bp = concatblock(bp);

	if(qfull(c->rq)) {
		netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
			laddr, lport);
		freeblist(bp);
	}
	else
		qpass(c->rq, bp);

	qunlock(ucb);
}

static char *rudpunknown = "unknown rudp ctl request";

char*
rudpctl(Conv *c, char **f, int n)
{
	Rudpcb *ucb;
	uchar ip[IPaddrlen];
	int x;

	ucb = (Rudpcb*)c->ptcl;
	if(n < 1)
		return rudpunknown;

	if(strcmp(f[0], "headers") == 0){
		ucb->headers = 7;		/* new headers format */
		return nil;
	} else if(strcmp(f[0], "hangup") == 0){
		if(n < 3)
			return "bad syntax";
		if (parseip(ip, f[1]) == -1)
			return Ebadip;
		x = atoi(f[2]);
		qlock(ucb);
		relforget(c, ip, x, 1);
		qunlock(ucb);
		return nil;
	} else if(strcmp(f[0], "randdrop") == 0){
		x = 10;			/* default is 10% */
		if(n > 1)
			x = atoi(f[1]);
		if(x > 100 || x < 0)
			return "illegal rudp drop rate";
		ucb->randdrop = x;
		return nil;
	}
	return rudpunknown;
}

void
rudpadvise(Proto *rudp, Block *bp, char *msg)
{
	Udphdr *h;
	uchar source[IPaddrlen], dest[IPaddrlen];
	ushort psource, pdest;
	Conv *s, **p;

	h = (Udphdr*)(bp->rp);

	v4tov6(dest, h->udpdst);
	v4tov6(source, h->udpsrc);
	psource = nhgets(h->udpsport);
	pdest = nhgets(h->udpdport);

	/* Look for a connection */
	for(p = rudp->conv; *p; p++) {
		s = *p;
		if(s->rport == pdest)
		if(s->lport == psource)
		if(ipcmp(s->raddr, dest) == 0)
		if(ipcmp(s->laddr, source) == 0){
			qhangup(s->rq, msg);
			qhangup(s->wq, msg);
			break;
		}
	}
	freeblist(bp);
}

int
rudpstats(Proto *rudp, char *buf, int len)
{
	Rudppriv *upriv;

	upriv = rudp->priv;
	return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
		upriv->ustats.rudpInDatagrams,
		upriv->ustats.rudpNoPorts,
		upriv->ustats.rudpInErrors,
		upriv->ustats.rudpOutDatagrams,
		upriv->rxmits,
		upriv->orders);
}

void
rudpinit(Fs *fs)
{

	Proto *rudp;

	rudp = smalloc(sizeof(Proto));
	rudp->priv = smalloc(sizeof(Rudppriv));
	rudp->name = "rudp";
	rudp->connect = rudpconnect;
	rudp->announce = rudpannounce;
	rudp->ctl = rudpctl;
	rudp->state = rudpstate;
	rudp->create = rudpcreate;
	rudp->close = rudpclose;
	rudp->rcv = rudpiput;
	rudp->advise = rudpadvise;
	rudp->stats = rudpstats;
	rudp->ipproto = IP_UDPPROTO;
	rudp->nc = 32;
	rudp->ptclsize = sizeof(Rudpcb);

	Fsproto(fs, rudp);
}

/*********************************************/
/* Here starts the reliable helper functions */
/*********************************************/
/*
 *  Enqueue a copy of an unacked block for possible retransmissions
 */
void
relackq(Reliable *r, Block *bp)
{
	Block *np;

	np = copyblock(bp, blocklen(bp));
	if(r->unacked)
		r->unackedtail->list = np;
	else {
		/* restart timer */
		r->timeout = 0;
		r->xmits = 1;
		r->unacked = np;
	}
	r->unackedtail = np;
	np->list = nil;
}

/*
 *  retransmit unacked blocks
 */
void
relackproc(void *a)
{
	Rudpcb *ucb;
	Proto *rudp;
	Reliable *r;
	Conv **s, *c;

	rudp = (Proto *)a;

loop:
	tsleep(&up->sleep, return0, 0, Rudptickms);

	for(s = rudp->conv; *s; s++) {
		c = *s;
		ucb = (Rudpcb*)c->ptcl;
		qlock(ucb);

		for(r = ucb->r; r; r = r->next) {
			if(r->unacked != nil){
				r->timeout += Rudptickms;
				if(r->timeout > Rudprxms*r->xmits)
					relrexmit(c, r);
			}
			if(r->acksent != r->rcvseq)
				relsendack(c, r, 0);
		}
		qunlock(ucb);
	}
	goto loop;
}

/*
 *  get the state record for a conversation
 */
Reliable*
relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
{
	Reliable *r, **l;

	l = &ucb->r;
	for(r = *l; r; r = *l){
		if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
		    port == r->port)
			break;
		l = &r->next;
	}

	/* no state for this addr/port, create some */
	if(r == nil){
		while(generation == 0)
			generation = rand();

		DPRINT("from %s new state %lud for %I!%ud\n",
		        from, generation, addr, port);

		r = smalloc(sizeof(Reliable));
		memmove(r->addr, addr, IPaddrlen);
		r->port = port;
		r->unacked = 0;
		if(generation == Hangupgen)
			generation++;
		r->sndgen = generation++;
		r->sndseq = 0;
		r->ackrcvd = 0;
		r->rcvgen = 0;
		r->rcvseq = 0;
		r->acksent = 0;
		r->xmits = 0;
		r->timeout = 0;
		r->ref = 0;
		incref(r);	/* one reference for being in the list */

		*l = r;
	}

	incref(r);
	return r;
}

void
relput(Reliable *r)
{
	if(decref(r) == 0)
		free(r);
}

/*
 *  forget a Reliable state
 */
void
relforget(Conv *c, uchar *ip, int port, int originator)
{
	Rudpcb *ucb;
	Reliable *r, **l;

	ucb = (Rudpcb*)c->ptcl;

	l = &ucb->r;
	for(r = *l; r; r = *l){
		if(ipcmp(ip, r->addr) == 0 && port == r->port){
			*l = r->next;
			if(originator)
				relsendack(c, r, 1);
			relhangup(c, r);
			relput(r);	/* remove from the list */
			break;
		}
		l = &r->next;
	}
}

/*
 *  process a rcvd reliable packet. return -1 if not to be passed to user process,
 *  0 therwise.
 *
 *  called with ucb locked.
 */
int
reliput(Conv *c, Block *bp, uchar *addr, ushort port)
{
	Block *nbp;
	Rudpcb *ucb;
	Rudppriv *upriv;
	Udphdr *uh;
	Reliable *r;
	Rudphdr *rh;
	ulong seq, ack, sgen, agen, ackreal;
	int rv = -1;

	/* get fields */
	uh = (Udphdr*)(bp->rp);
	rh = (Rudphdr*)uh;
	seq = nhgetl(rh->relseq);
	sgen = nhgetl(rh->relsgen);
	ack = nhgetl(rh->relack);
	agen = nhgetl(rh->relagen);

	upriv = c->p->priv;
	ucb = (Rudpcb*)c->ptcl;
	r = relstate(ucb, addr, port, "input");

	DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
		seq, sgen, ack, agen, r->sndgen);

	/* if acking an incorrect generation, ignore */
	if(ack && agen != r->sndgen)
		goto out;

	/* Look for a hangup */
	if(sgen == Hangupgen) {
		if(agen == r->sndgen)
			relforget(c, addr, port, 0);
		goto out;
	}

	/* make sure we're not talking to a new remote side */
	if(r->rcvgen != sgen){
		if(seq != 0 && seq != 1)
			goto out;

		/* new connection */
		if(r->rcvgen != 0){
			DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
			relhangup(c, r);
		}
		r->rcvgen = sgen;
	}

	/* dequeue acked packets */
	if(ack && agen == r->sndgen){
		ackreal = 0;
		while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
			nbp = r->unacked;
			r->unacked = nbp->list;
			DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
			       ack, agen, r->sndgen);
			freeb(nbp);
			r->ackrcvd = NEXTSEQ(r->ackrcvd);
			ackreal = 1;
		}

		/* flow control */
		if(UNACKED(r) < Maxunacked/8 && r->blocked)
			wakeup(&r->vous);

		/*
		 *  retransmit next packet if the acked packet
		 *  was transmitted more than once
		 */
		if(ackreal && r->unacked != nil){
			r->timeout = 0;
			if(r->xmits > 1){
				r->xmits = 1;
				relrexmit(c, r);
			}
		}

	}

	/* no message or input queue full */
	if(seq == 0 || qfull(c->rq))
		goto out;

	/* refuse out of order delivery */
	if(seq != NEXTSEQ(r->rcvseq)){
		relsendack(c, r, 0);	/* tell him we got it already */
		upriv->orders++;
		DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
		goto out;
	}
	r->rcvseq = seq;

	rv = 0;
out:
	relput(r);
	return rv;
}

void
relsendack(Conv *c, Reliable *r, int hangup)
{
	Udphdr *uh;
	Block *bp;
	Rudphdr *rh;
	int ptcllen;
	Fs *f;

	bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
	if(bp == nil)
		return;
	bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
	f = c->p->f;
	uh = (Udphdr *)(bp->rp);
	uh->vihl = IP_VER4;
	rh = (Rudphdr*)uh;

	ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
	uh->Unused = 0;
	uh->udpproto = IP_UDPPROTO;
	uh->frag[0] = 0;
	uh->frag[1] = 0;
	hnputs(uh->udpplen, ptcllen);

	v6tov4(uh->udpdst, r->addr);
	hnputs(uh->udpdport, r->port);
	hnputs(uh->udpsport, c->lport);
	if(ipcmp(c->laddr, IPnoaddr) == 0)
		findlocalip(f, c->laddr, c->raddr);
	v6tov4(uh->udpsrc, c->laddr);
	hnputs(uh->udplen, ptcllen);

	if(hangup)
		hnputl(rh->relsgen, Hangupgen);
	else
		hnputl(rh->relsgen, r->sndgen);
	hnputl(rh->relseq, 0);
	hnputl(rh->relagen, r->rcvgen);
	hnputl(rh->relack, r->rcvseq);

	if(r->acksent < r->rcvseq)
		r->acksent = r->rcvseq;

	uh->udpcksum[0] = 0;
	uh->udpcksum[1] = 0;
	hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));

	DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
	doipoput(c, f, bp, 0, c->ttl, c->tos);
}


/*
 *  called with ucb locked (and c locked if user initiated close)
 */
void
relhangup(Conv *c, Reliable *r)
{
	int n;
	Block *bp;
	char hup[ERRMAX];

	n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
	qproduce(c->eq, hup, n);

	/*
	 *  dump any unacked outgoing messages
	 */
	for(bp = r->unacked; bp != nil; bp = r->unacked){
		r->unacked = bp->list;
		bp->list = nil;
		freeb(bp);
	}

	r->rcvgen = 0;
	r->rcvseq = 0;
	r->acksent = 0;
	if(generation == Hangupgen)
		generation++;
	r->sndgen = generation++;
	r->sndseq = 0;
	r->ackrcvd = 0;
	r->xmits = 0;
	r->timeout = 0;
	wakeup(&r->vous);
}

/*
 *  called with ucb locked
 */
void
relrexmit(Conv *c, Reliable *r)
{
	Rudppriv *upriv;
	Block *np;
	Fs *f;

	upriv = c->p->priv;
	f = c->p->f;
	r->timeout = 0;
	if(r->xmits++ > Rudpmaxxmit){
		relhangup(c, r);
		return;
	}

	upriv->rxmits++;
	np = copyblock(r->unacked, blocklen(r->unacked));
	DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
	doipoput(c, f, np, 0, c->ttl, c->tos);
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2009 Alcatel-Lucent. All Rights Reserved.
Comments to webmaster@plan9.bell-labs.com.