[PATCH 9/16] NFSD: RPC init tidyup
[opensuse:kernel.git] net/sunrpc/clnt.c
/*
 *  linux/net/sunrpc/rpcclnt.c
 *
 *  This file contains the high-level RPC interface.
 *  It is modeled as a finite state machine to support both synchronous
 *  and asynchronous requests.
 *
 *  -   RPC header generation and argument serialization.
 *  -   Credential refresh.
 *  -   TCP reconnect handling (when finished).
 *  -   Retry of operation when it is suspected the operation failed because
 *      of uid squashing on the server, or when the credentials were stale
 *      and need to be refreshed, or when a packet was damaged in transit.
 *      This may have to be moved to the VFS layer.
 *
 *  NB: BSD uses a more intelligent approach to guessing when a request
 *  or reply has been lost by keeping the RTO estimate for each procedure.
 *  We currently make do with a constant timeout value.
 *
 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
 */
23
24 #include <asm/system.h>
25
26 #include <linux/types.h>
27 #include <linux/mm.h>
28 #include <linux/slab.h>
29 #include <linux/in.h>
30 #include <linux/utsname.h>
31
32 #include <linux/sunrpc/clnt.h>
33
34 #include <linux/nfs.h>
35
36
37 #define RPC_SLACK_SPACE         512     /* total overkill */
38
39 #ifdef RPC_DEBUG
40 # define RPCDBG_FACILITY        RPCDBG_CALL
41 #endif
42
43 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
44
45
46 static void     call_reserve(struct rpc_task *task);
47 static void     call_reserveresult(struct rpc_task *task);
48 static void     call_allocate(struct rpc_task *task);
49 static void     call_encode(struct rpc_task *task);
50 static void     call_decode(struct rpc_task *task);
51 static void     call_bind(struct rpc_task *task);
52 static void     call_transmit(struct rpc_task *task);
53 static void     call_status(struct rpc_task *task);
54 static void     call_refresh(struct rpc_task *task);
55 static void     call_refreshresult(struct rpc_task *task);
56 static void     call_timeout(struct rpc_task *task);
57 static void     call_reconnect(struct rpc_task *task);
58 static void     child_reconnect(struct rpc_task *);
59 static void     child_reconnect_status(struct rpc_task *);
60 static u32 *    call_header(struct rpc_task *task);
61 static u32 *    call_verify(struct rpc_task *task);
62
63
/*
 * Create an RPC client
 * FIXME: This should also take a flags argument (as in task->tk_flags).
 * It's called (among others) from pmap_create_client, which may in
 * turn be called by an async task. In this case, rpciod should not be
 * made to sleep too long.
 */
struct rpc_clnt *
rpc_create_client(struct rpc_xprt *xprt, char *servname,
                  struct rpc_program *program, u32 vers, int flavor)
{
        struct rpc_version      *version;
        struct rpc_clnt         *clnt = NULL;

        dprintk("RPC: creating %s client for %s (xprt %p)\n",
                program->name, servname, xprt);

        if (!xprt)
                goto out;
        if (vers >= program->nrvers || !(version = program->version[vers]))
                goto out;

        clnt = (struct rpc_clnt *) rpc_allocate(0, sizeof(*clnt));
        if (!clnt)
                goto out_no_clnt;
        memset(clnt, 0, sizeof(*clnt));
        atomic_set(&clnt->cl_users, 0);

        clnt->cl_xprt     = xprt;
        clnt->cl_procinfo = version->procs;
        clnt->cl_maxproc  = version->nrprocs;
        clnt->cl_server   = servname;
        clnt->cl_protname = program->name;
        clnt->cl_port     = xprt->addr.sin_port;
        clnt->cl_prog     = program->number;
        clnt->cl_vers     = version->number;
        clnt->cl_prot     = xprt->prot;
        clnt->cl_stats    = program->stats;
        INIT_RPC_WAITQ(&clnt->cl_bindwait, "bindwait");

        if (!clnt->cl_port)
                clnt->cl_autobind = 1;

        if (!rpcauth_create(flavor, clnt))
                goto out_no_auth;

        /* save the nodename */
        clnt->cl_nodelen = strlen(system_utsname.nodename);
        if (clnt->cl_nodelen > UNX_MAXNODENAME)
                clnt->cl_nodelen = UNX_MAXNODENAME;
        memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen);
out:
        return clnt;

out_no_clnt:
        printk(KERN_INFO "RPC: out of memory in rpc_create_client\n");
        goto out;
out_no_auth:
        printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %d)\n",
                flavor);
        rpc_free(clnt);
        clnt = NULL;
        goto out;
}
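
/*
 * Illustrative sketch (editor's note, not part of the original file):
 * how a caller might create a client with the function above.  The
 * transport helper and program name here are assumptions for
 * illustration only -- they are defined elsewhere (e.g. xprt.c and the
 * NFS client code), not in this file.
 *
 *      struct rpc_xprt *xprt;
 *      struct rpc_clnt *clnt;
 *
 *      xprt = xprt_create_proto(IPPROTO_UDP, &server_addr, NULL);
 *      if (!xprt)
 *              return -EIO;
 *      clnt = rpc_create_client(xprt, hostname, &nfs_program,
 *                               vers, RPC_AUTH_UNIX);
 *      if (!clnt)
 *              return -EIO;
 */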

/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests. Note that we must be certain that cl_oneshot and
 * cl_dead are cleared, or else the client would be destroyed
 * when the last task releases it.
 */
int
rpc_shutdown_client(struct rpc_clnt *clnt)
{
        dprintk("RPC: shutting down %s client for %s\n",
                clnt->cl_protname, clnt->cl_server);
        while (atomic_read(&clnt->cl_users)) {
#ifdef RPC_DEBUG
                dprintk("RPC: rpc_shutdown_client: client %s, tasks=%d\n",
                        clnt->cl_protname, atomic_read(&clnt->cl_users));
#endif
                /* Don't let rpc_release_client destroy us */
                clnt->cl_oneshot = 0;
                clnt->cl_dead = 0;
                rpc_killall_tasks(clnt);
                sleep_on_timeout(&destroy_wait, 1*HZ);
        }
        return rpc_destroy_client(clnt);
}

/*
 * Delete an RPC client
 */
int
rpc_destroy_client(struct rpc_clnt *clnt)
{
        dprintk("RPC: destroying %s client for %s\n",
                        clnt->cl_protname, clnt->cl_server);

        if (clnt->cl_auth) {
                rpcauth_destroy(clnt->cl_auth);
                clnt->cl_auth = NULL;
        }
        if (clnt->cl_xprt) {
                xprt_destroy(clnt->cl_xprt);
                clnt->cl_xprt = NULL;
        }
        rpc_free(clnt);
        return 0;
}

/*
 * Release an RPC client
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
        dprintk("RPC:      rpc_release_client(%p, %d)\n",
                                clnt, atomic_read(&clnt->cl_users));

        if (!atomic_dec_and_test(&clnt->cl_users))
                return;
        wake_up(&destroy_wait);
        if (clnt->cl_oneshot || clnt->cl_dead)
                rpc_destroy_client(clnt);
}

/*
 * Default callback for async RPC calls
 */
static void
rpc_default_callback(struct rpc_task *task)
{
}

/*
 *      Export the signal mask handling for asynchronous code that
 *      sleeps on RPC calls
 */

void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
        unsigned long   sigallow = sigmask(SIGKILL);
        unsigned long   irqflags;

        /* Turn off various signals */
        if (clnt->cl_intr) {
                struct k_sigaction *action = current->sig->action;
                if (action[SIGINT-1].sa.sa_handler == SIG_DFL)
                        sigallow |= sigmask(SIGINT);
                if (action[SIGQUIT-1].sa.sa_handler == SIG_DFL)
                        sigallow |= sigmask(SIGQUIT);
        }
        spin_lock_irqsave(&current->sigmask_lock, irqflags);
        *oldset = current->blocked;
        siginitsetinv(&current->blocked, sigallow & ~oldset->sig[0]);
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sigmask_lock, irqflags);
}

void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
        unsigned long   irqflags;

        spin_lock_irqsave(&current->sigmask_lock, irqflags);
        current->blocked = *oldset;
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sigmask_lock, irqflags);
}

/*
 * New rpc_call implementation
 */
int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
{
        struct rpc_task my_task, *task = &my_task;
        sigset_t        oldset;
        int             status;

        /* If this client is slain all further I/O fails */
        if (clnt->cl_dead)
                return -EIO;

        if (flags & RPC_TASK_ASYNC) {
                printk("rpc_call_sync: Illegal flag combination for synchronous task\n");
                flags &= ~RPC_TASK_ASYNC;
        }

        rpc_clnt_sigmask(clnt, &oldset);

        /* Create/initialize a new RPC task */
        rpc_init_task(task, clnt, NULL, flags);
        rpc_call_setup(task, msg, 0);

        /* Set up the call info struct and execute the task */
        if (task->tk_status == 0)
                status = rpc_execute(task);
        else {
                status = task->tk_status;
                rpc_release_task(task);
        }

        rpc_clnt_sigunmask(clnt, &oldset);

        return status;
}
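
/*
 * Illustrative sketch (editor's note, not part of the original file):
 * issuing a synchronous call through rpc_call_sync().  The procedure
 * number and argument/result types are placeholders; only the
 * rpc_message fields used (rpc_proc, rpc_argp, rpc_resp, rpc_cred)
 * are taken from this file.
 *
 *      struct rpc_message msg = {
 *              .rpc_proc = proc,       // procedure number, e.g. an NFSPROC_* value
 *              .rpc_argp = &args,      // procedure-specific argument struct
 *              .rpc_resp = &res,       // procedure-specific result struct
 *              .rpc_cred = NULL,       // NULL: bind the calling user's cred
 *      };
 *      int status = rpc_call_sync(clnt, &msg, 0);
 *      if (status < 0)
 *              return status;
 */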

/*
 * New rpc_call implementation
 */
int
rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
               rpc_action callback, void *data)
{
        struct rpc_task *task;
        sigset_t        oldset;
        int             status;

        /* If this client is slain all further I/O fails */
        if (clnt->cl_dead)
                return -EIO;

        flags |= RPC_TASK_ASYNC;

        rpc_clnt_sigmask(clnt, &oldset);

        /* Create/initialize a new RPC task */
        if (!callback)
                callback = rpc_default_callback;
        status = -ENOMEM;
        if (!(task = rpc_new_task(clnt, callback, flags)))
                goto out;
        task->tk_calldata = data;

        rpc_call_setup(task, msg, 0);

        /* Set up the call info struct and execute the task */
        if (task->tk_status == 0)
                status = rpc_execute(task);
        else {
                status = task->tk_status;
                rpc_release_task(task);
        }

out:
        rpc_clnt_sigunmask(clnt, &oldset);

        return status;
}
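
/*
 * Illustrative sketch (editor's note, not part of the original file):
 * the completion callback passed to rpc_call_async() is an rpc_action,
 * i.e. it takes the finished task.  The callback name and calldata
 * type below are placeholders.
 *
 *      static void my_rpc_done(struct rpc_task *task)
 *      {
 *              struct my_request *req = (struct my_request *) task->tk_calldata;
 *
 *              if (task->tk_status < 0) {
 *                      // the call failed; tk_status holds the error
 *              }
 *              // on success, results were decoded into msg->rpc_resp
 *      }
 *
 *      rpc_call_async(clnt, &msg, 0, my_rpc_done, req);
 */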


void
rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
{
        task->tk_msg   = *msg;
        task->tk_flags |= flags;
        /* Bind the user cred */
        if (task->tk_msg.rpc_cred != NULL) {
                rpcauth_holdcred(task);
        } else
                rpcauth_bindcred(task);

        if (task->tk_status == 0)
                task->tk_action = call_reserve;
        else
                task->tk_action = NULL;

        /* Increment call count */
        if (task->tk_msg.rpc_proc < task->tk_client->cl_maxproc)
                rpcproc_count(task->tk_client, task->tk_msg.rpc_proc)++;
}

/*
 * Restart an (async) RPC call. Usually called from within the
 * exit handler.
 */
void
rpc_restart_call(struct rpc_task *task)
{
        if (RPC_ASSASSINATED(task))
                return;

        task->tk_action = call_reserve;
        rpcproc_count(task->tk_client, task->tk_msg.rpc_proc)++;
}

/*
 * 1.   Reserve an RPC call slot
 */
static void
call_reserve(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        if (task->tk_msg.rpc_proc > clnt->cl_maxproc) {
                printk(KERN_WARNING "%s (vers %d): bad procedure number %d\n",
                        clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc);
                rpc_exit(task, -EIO);
                return;
        }

        dprintk("RPC: %4d call_reserve\n", task->tk_pid);
        if (!rpcauth_uptodatecred(task)) {
                task->tk_action = call_refresh;
                return;
        }

        task->tk_status  = 0;
        task->tk_action  = call_reserveresult;
        task->tk_timeout = clnt->cl_timeout.to_resrvval;
        clnt->cl_stats->rpccnt++;
        xprt_reserve(task);
}

/*
 * 1b.  Grok the result of xprt_reserve()
 */
static void
call_reserveresult(struct rpc_task *task)
{
        int status = task->tk_status;

        dprintk("RPC: %4d call_reserveresult (status %d)\n",
                                task->tk_pid, task->tk_status);
        /*
         * After a call to xprt_reserve(), we must have either
         * a request slot or else an error status.
         */
        if ((task->tk_status >= 0 && !task->tk_rqstp) ||
            (task->tk_status < 0 && task->tk_rqstp))
                printk(KERN_ERR "call_reserveresult: status=%d, request=%p??\n",
                 task->tk_status, task->tk_rqstp);

        if (task->tk_status >= 0) {
                task->tk_action = call_allocate;
                return;
        }

        task->tk_status = 0;
        switch (status) {
        case -EAGAIN:
        case -ENOBUFS:
                task->tk_timeout = task->tk_client->cl_timeout.to_resrvval;
                task->tk_action = call_reserve;
                break;
        case -ETIMEDOUT:
                dprintk("RPC: task timed out\n");
                task->tk_action = call_timeout;
                break;
        default:
                if (!task->tk_rqstp) {
                        printk(KERN_INFO "RPC: task has no request, exit EIO\n");
                        rpc_exit(task, -EIO);
                } else
                        rpc_exit(task, status);
        }
}

/*
 * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
 *      (Note: buffer memory is freed in rpc_release_task).
 */
static void
call_allocate(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        unsigned int    bufsiz;

        dprintk("RPC: %4d call_allocate (status %d)\n",
                                task->tk_pid, task->tk_status);
        task->tk_action = call_encode;
        if (task->tk_buffer)
                return;

        /* FIXME: compute buffer requirements more exactly using
         * auth->au_wslack */
        bufsiz = rpcproc_bufsiz(clnt, task->tk_msg.rpc_proc) + RPC_SLACK_SPACE;

        if ((task->tk_buffer = rpc_malloc(task, bufsiz << 1)) != NULL)
                return;
        printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);

        if (RPC_IS_ASYNC(task) || !(task->tk_client->cl_intr && signalled())) {
                xprt_release(task);
                task->tk_action = call_reserve;
                rpc_delay(task, HZ>>4);
                return;
        }

        rpc_exit(task, -ERESTARTSYS);
}

/*
 * 3.   Encode arguments of an RPC call
 */
static void
call_encode(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned int    bufsiz;
        kxdrproc_t      encode;
        int             status;
        u32             *p;

        dprintk("RPC: %4d call_encode (status %d)\n",
                                task->tk_pid, task->tk_status);

        task->tk_action = call_bind;

        /* Default buffer setup */
        bufsiz = rpcproc_bufsiz(clnt, task->tk_msg.rpc_proc)+RPC_SLACK_SPACE;
        req->rq_svec[0].iov_base = (void *)task->tk_buffer;
        req->rq_svec[0].iov_len  = bufsiz;
        req->rq_slen             = 0;
        req->rq_snr              = 1;
        req->rq_rvec[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
        req->rq_rvec[0].iov_len  = bufsiz;
        req->rq_rlen             = bufsiz;
        req->rq_rnr              = 1;

        /* Zero buffer so we have automatic zero-padding of opaque & string */
        memset(task->tk_buffer, 0, bufsiz);

        /* Encode header and provided arguments */
        encode = rpcproc_encode(clnt, task->tk_msg.rpc_proc);
        if (!(p = call_header(task))) {
                printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
                rpc_exit(task, -EIO);
        } else
        if (encode && (status = encode(req, p, task->tk_msg.rpc_argp)) < 0) {
                printk(KERN_WARNING "%s: can't encode arguments: %d\n",
                                clnt->cl_protname, -status);
                rpc_exit(task, status);
        }
}
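
/*
 * Illustrative note (editor's note, not part of the original file):
 * call_allocate() obtains a single buffer of 2 * bufsiz bytes, and
 * call_encode() above splits it in half -- the send iovec covers the
 * first half and the receive iovec the second:
 *
 *      tk_buffer: [ rq_svec[0]: bufsiz bytes | rq_rvec[0]: bufsiz bytes ]
 *                   request (encoded args)     reply (raw, decoded later)
 */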

/*
 * 4.   Get the server port number if not yet set
 */
static void
call_bind(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_xprt *xprt = clnt->cl_xprt;

        task->tk_action = (xprt_connected(xprt)) ? call_transmit : call_reconnect;

        if (!clnt->cl_port) {
                task->tk_action = call_reconnect;
                task->tk_timeout = clnt->cl_timeout.to_maxval;
                rpc_getport(task, clnt);
        }
}

/*
 * 4a.  Reconnect to the RPC server (TCP case)
 */
static void
call_reconnect(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_task *child;

        dprintk("RPC: %4d call_reconnect status %d\n",
                                task->tk_pid, task->tk_status);

        task->tk_action = call_transmit;
        if (task->tk_status < 0 || !clnt->cl_xprt->stream)
                return;

        /* Run as a child to ensure it runs as an rpciod task */
        child = rpc_new_child(clnt, task);
        if (child) {
                child->tk_action = child_reconnect;
                rpc_run_child(task, child, NULL);
        }
}

static void child_reconnect(struct rpc_task *task)
{
        task->tk_client->cl_stats->netreconn++;
        task->tk_status = 0;
        task->tk_action = child_reconnect_status;
        xprt_reconnect(task);
}

static void child_reconnect_status(struct rpc_task *task)
{
        if (task->tk_status == -EAGAIN)
                task->tk_action = child_reconnect;
        else
                task->tk_action = NULL;
}

/*
 * 5.   Transmit the RPC request, and wait for reply
 */
static void
call_transmit(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        dprintk("RPC: %4d call_transmit (status %d)\n",
                                task->tk_pid, task->tk_status);

        task->tk_action = call_status;
        if (task->tk_status < 0)
                return;
        xprt_transmit(task);
        if (!rpcproc_decode(clnt, task->tk_msg.rpc_proc)) {
                task->tk_action = NULL;
                rpc_wake_up_task(task);
        }
}

/*
 * 6.   Sort out the RPC call status
 */
static void
call_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_xprt *xprt = clnt->cl_xprt;
        struct rpc_rqst *req;
        int             status = task->tk_status;

        dprintk("RPC: %4d call_status (status %d)\n",
                                task->tk_pid, task->tk_status);

        if (status >= 0) {
                task->tk_action = call_decode;
                return;
        }

        task->tk_status = 0;
        req = task->tk_rqstp;
        switch(status) {
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
                break;
        case -ECONNREFUSED:
        case -ENOTCONN:
                req->rq_bytes_sent = 0;
                if (clnt->cl_autobind || !clnt->cl_port) {
                        clnt->cl_port = 0;
                        task->tk_action = call_bind;
                        break;
                }
                if (xprt->stream) {
                        task->tk_action = call_reconnect;
                        break;
                }
                /*
                 * Sleep and dream of an open connection
                 */
                task->tk_timeout = 5 * HZ;
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        case -ENOMEM:
        case -EAGAIN:
                task->tk_action = call_transmit;
                clnt->cl_stats->rpcretrans++;
                break;
        default:
                if (clnt->cl_chatty)
                        printk("%s: RPC call returned error %d\n",
                               clnt->cl_protname, -status);
                rpc_exit(task, status);
        }
}

/*
 * 6a.  Handle RPC timeout
 *      We do not release the request slot, so we keep using the
 *      same XID for all retransmits.
 */
static void
call_timeout(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;

        if (req) {
                struct rpc_timeout *to = &req->rq_timeout;

                if (xprt_adjust_timeout(to)) {
                        dprintk("RPC: %4d call_timeout (minor timeo)\n",
                                task->tk_pid);
                        goto minor_timeout;
                }
                to->to_retries = clnt->cl_timeout.to_retries;
        }

        dprintk("RPC: %4d call_timeout (major timeo)\n", task->tk_pid);
        if (clnt->cl_softrtry) {
                if (clnt->cl_chatty && !task->tk_exit)
                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
                                clnt->cl_protname, clnt->cl_server);
                rpc_exit(task, -EIO);
                return;
        }
        if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN)) {
                task->tk_flags |= RPC_CALL_MAJORSEEN;
                if (req)
                        printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
                                clnt->cl_protname, clnt->cl_server);
#ifdef RPC_DEBUG
                else
                        printk(KERN_NOTICE "%s: task %d can't get a request slot\n",
                                clnt->cl_protname, task->tk_pid);
#endif
        }
        if (clnt->cl_autobind)
                clnt->cl_port = 0;

minor_timeout:
        if (!req)
                task->tk_action = call_reserve;
        else if (!clnt->cl_port) {
                task->tk_action = call_bind;
                clnt->cl_stats->rpcretrans++;
        } else if (!xprt_connected(clnt->cl_xprt)) {
                task->tk_action = call_reconnect;
                clnt->cl_stats->rpcretrans++;
        } else {
                task->tk_action = call_transmit;
                clnt->cl_stats->rpcretrans++;
        }
        task->tk_status = 0;
}

/*
 * 7.   Decode the RPC reply
 */
static void
call_decode(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        kxdrproc_t      decode = rpcproc_decode(clnt, task->tk_msg.rpc_proc);
        u32             *p;

        dprintk("RPC: %4d call_decode (status %d)\n",
                                task->tk_pid, task->tk_status);

        if (clnt->cl_chatty && (task->tk_flags & RPC_CALL_MAJORSEEN)) {
                printk(KERN_NOTICE "%s: server %s OK\n",
                        clnt->cl_protname, clnt->cl_server);
                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
        }

        if (task->tk_status < 12) {
                if (!clnt->cl_softrtry) {
                        task->tk_action = call_transmit;
                        clnt->cl_stats->rpcretrans++;
                } else {
                        printk(KERN_WARNING "%s: too small RPC reply size (%d bytes)\n",
                                clnt->cl_protname, task->tk_status);
                        rpc_exit(task, -EIO);
                }
                return;
        }

        /* Verify the RPC header */
        if (!(p = call_verify(task)))
                return;

        /*
         * The following is an NFS-specific hack to cater for setuid
         * processes whose uid is mapped to nobody on the server.
         */
        if (task->tk_client->cl_droppriv &&
            (ntohl(*p) == NFSERR_ACCES || ntohl(*p) == NFSERR_PERM)) {
                if (RPC_IS_SETUID(task) && task->tk_suid_retry) {
                        dprintk("RPC: %4d retry squashed uid\n", task->tk_pid);
                        task->tk_flags ^= RPC_CALL_REALUID;
                        task->tk_action = call_encode;
                        task->tk_suid_retry--;
                        return;
                }
        }

        task->tk_action = NULL;

        if (decode)
                task->tk_status = decode(req, p, task->tk_msg.rpc_resp);
        dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
                                        task->tk_status);
}

/*
 * 8.   Refresh the credentials if rejected by the server
 */
static void
call_refresh(struct rpc_task *task)
{
        dprintk("RPC: %4d call_refresh\n", task->tk_pid);

        xprt_release(task);     /* Must do to obtain new XID */
        task->tk_action = call_refreshresult;
        task->tk_status = 0;
        task->tk_client->cl_stats->rpcauthrefresh++;
        rpcauth_refreshcred(task);
}

/*
 * 8a.  Process the results of a credential refresh
 */
static void
call_refreshresult(struct rpc_task *task)
{
        dprintk("RPC: %4d call_refreshresult (status %d)\n",
                                task->tk_pid, task->tk_status);

        if (task->tk_status < 0)
                rpc_exit(task, -EACCES);
        else
                task->tk_action = call_reserve;
}

/*
 * Call header serialization
 */
static u32 *
call_header(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_xprt *xprt = clnt->cl_xprt;
        struct rpc_rqst *req = task->tk_rqstp;
        u32             *p = req->rq_svec[0].iov_base;

        /* FIXME: check buffer size? */
        if (xprt->stream)
                *p++ = 0;               /* fill in later */
        *p++ = req->rq_xid;             /* XID */
        *p++ = htonl(RPC_CALL);         /* CALL */
        *p++ = htonl(RPC_VERSION);      /* RPC version */
        *p++ = htonl(clnt->cl_prog);    /* program number */
        *p++ = htonl(clnt->cl_vers);    /* program version */
        *p++ = htonl(task->tk_msg.rpc_proc);    /* procedure */
        return rpcauth_marshcred(task, p);
}
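
/*
 * Illustrative note (editor's note, not part of the original file):
 * the header built above follows the ONC RPC call message layout, in
 * 32-bit words on the wire:
 *
 *      [ record marker ]  stream (TCP) transports only, filled in later
 *      [ xid           ]  transaction ID from the request slot
 *      [ msg type      ]  RPC_CALL
 *      [ rpc version   ]  RPC_VERSION (2)
 *      [ program       ]  cl_prog
 *      [ version       ]  cl_vers
 *      [ procedure     ]  rpc_proc
 *      [ credential    ]  \  marshalled by rpcauth_marshcred()
 *      [ verifier      ]  /
 */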

/*
 * Reply header verification
 */
static u32 *
call_verify(struct rpc_task *task)
{
        u32     *p = task->tk_rqstp->rq_rvec[0].iov_base, n;

        p += 1; /* skip XID */

        if ((n = ntohl(*p++)) != RPC_REPLY) {
                printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
                goto garbage;
        }
        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
                int     error = -EACCES;

                if ((n = ntohl(*p++)) != RPC_AUTH_ERROR) {
                        printk(KERN_WARNING "call_verify: RPC call rejected: %x\n", n);
                } else
                switch ((n = ntohl(*p++))) {
                case RPC_AUTH_REJECTEDCRED:
                case RPC_AUTH_REJECTEDVERF:
                        if (!task->tk_cred_retry)
                                break;
                        task->tk_cred_retry--;
                        dprintk("RPC: %4d call_verify: retry stale creds\n",
                                                        task->tk_pid);
                        rpcauth_invalcred(task);
                        task->tk_action = call_refresh;
                        return NULL;
                case RPC_AUTH_BADCRED:
                case RPC_AUTH_BADVERF:
                        /* possibly garbled cred/verf? */
                        if (!task->tk_garb_retry)
                                break;
                        task->tk_garb_retry--;
                        dprintk("RPC: %4d call_verify: retry garbled creds\n",
                                                        task->tk_pid);
                        task->tk_action = call_encode;
                        return NULL;
                case RPC_AUTH_TOOWEAK:
                        printk(KERN_NOTICE "call_verify: server requires stronger "
                               "authentication.\n");
                        break;
                default:
                        printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
                        error = -EIO;
                }
                dprintk("RPC: %4d call_verify: call rejected %d\n",
                                                task->tk_pid, n);
                rpc_exit(task, error);
                return NULL;
        }
        if (!(p = rpcauth_checkverf(task, p))) {
                printk(KERN_WARNING "call_verify: auth check failed\n");
                goto garbage;           /* bad verifier, retry */
        }
        switch ((n = ntohl(*p++))) {
        case RPC_SUCCESS:
                return p;
        case RPC_GARBAGE_ARGS:
                break;                  /* retry */
        default:
                printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
                /* Also retry */
        }

garbage:
        dprintk("RPC: %4d call_verify: server saw garbage\n", task->tk_pid);
        task->tk_client->cl_stats->rpcgarbage++;
        if (task->tk_garb_retry) {
                task->tk_garb_retry--;
                dprintk(KERN_WARNING "RPC: garbage, retrying %4d\n", task->tk_pid);
                task->tk_action = call_encode;
                return NULL;
        }
        printk(KERN_WARNING "RPC: garbage, exit EIO\n");
        rpc_exit(task, -EIO);
        return NULL;
}
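
/*
 * Illustrative note (editor's note, not part of the original file):
 * the reply words consumed above follow the ONC RPC reply message
 * layout:
 *
 *      [ xid        ]  skipped here; matched to the request by the transport
 *      [ msg type   ]  must be RPC_REPLY
 *      [ reply stat ]  RPC_MSG_ACCEPTED or denied
 *        accepted:  [ verifier ] [ accept stat: RPC_SUCCESS, RPC_GARBAGE_ARGS, ... ]
 *        denied:    [ reject stat ] [ auth stat, when the reject is RPC_AUTH_ERROR ]
 *
 * On RPC_SUCCESS the returned pointer addresses the first word of the
 * procedure-specific results, which call_decode() hands to the XDR
 * decode routine.
 */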