added some really ugly code to allow errors to propogate to

clients when writing to a rsync server

it works like this:

- we have an extra pipe from the receiver to the generator
- the server always runs with multiplexing on
- errors from the generator go down the multiplexed connection
- errors from the receiver go over the pipe, and from there to
  the multiplexed conn

it required some incredibly ugly code. damn.
This commit is contained in:
Andrew Tridgell
2000-01-23 07:36:56 +00:00
parent b0f3f5784c
commit 554e0a8dd0
6 changed files with 144 additions and 27 deletions

View File

@@ -101,7 +101,7 @@ int start_socket_client(char *host, char *path, int argc, char *argv[])
}
io_printf(fd,"\n");
if (remote_version > 17 && !am_sender)
if (remote_version >= 22 || (remote_version > 17 && !am_sender))
io_start_multiplex_in(fd);
return client_run(fd, fd, -1, argc, argv);
@@ -316,9 +316,17 @@ static int rsync_module(int fd, int i)
argp = argv + optind;
optind = 0;
if (remote_version > 17 && am_sender)
if (remote_version >= 22 || (remote_version > 17 && am_sender))
io_start_multiplex_out(fd);
if (read_only) {
extern int am_sender;
if (!am_sender) {
rprintf(FERROR,"ERROR: module is read only\n");
return -1;
}
}
if (!ret) {
option_error();
}

94
io.c
View File

@@ -38,6 +38,8 @@ extern int io_timeout;
extern struct stats stats;
static int buffer_f_in = -1;
static int io_error_fd = -1;
static int in_read_check;
void setup_readbuffer(int f_in)
{
@@ -64,6 +66,28 @@ static void check_timeout(void)
}
}
/* setup the fd used to propogate errors */
void io_set_error_fd(int fd)
{
io_error_fd = fd;
}
/* read some data from the error fd and write it to FERROR */
static void read_error_fd(void)
{
char buf[200];
int n;
int fd = io_error_fd;
io_error_fd = -1;
n = read(fd, buf, sizeof(buf)-1);
if (n > 0) {
rwrite(FERROR, buf, n);
}
io_error_fd = fd;
}
static char *read_buffer;
static char *read_buffer_p;
@@ -86,17 +110,34 @@ static int read_timeout(int fd, char *buf, int len)
while (ret == 0) {
fd_set fds;
struct timeval tv;
int fd_count = fd+1;
FD_ZERO(&fds);
FD_SET(fd, &fds);
if (io_error_fd != -1) {
FD_SET(io_error_fd, &fds);
if (io_error_fd > fd) fd_count = io_error_fd+1;
}
tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
tv.tv_usec = 0;
if (select(fd+1, &fds, NULL, NULL, &tv) != 1) {
errno = 0;
if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
if (errno == EBADF) {
exit_cleanup(RERR_SOCKETIO);
}
check_timeout();
continue;
}
if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
read_error_fd();
}
if (!FD_ISSET(fd, &fds)) continue;
n = read(fd, buf, len);
if (n > 0) {
@@ -121,7 +162,7 @@ static int read_timeout(int fd, char *buf, int len)
}
/* this prevents us trying to write errors on a dead socket */
io_multiplexing_out = 0;
io_multiplexing_close();
rprintf(FERROR,"read error: %s\n", strerror(errno));
exit_cleanup(RERR_STREAMIO);
@@ -190,6 +231,8 @@ static int read_unbuffered(int fd, char *buf, int len)
rprintf(tag,"%s", line);
remaining = 0;
if (in_read_check) break;
}
return ret;
@@ -203,6 +246,8 @@ static int read_unbuffered(int fd, char *buf, int len)
static void read_check(int f)
{
int n = 8192;
in_read_check = 1;
if (f == -1) return;
@@ -227,6 +272,8 @@ static void read_check(int f)
n = read_unbuffered(f,read_buffer+read_buffer_len,n);
read_buffer_len += n;
in_read_check = 0;
}
@@ -335,7 +382,7 @@ static void writefd_unbuffered(int fd,char *buf,int len)
FD_ZERO(&w_fds);
FD_ZERO(&r_fds);
FD_SET(fd,&w_fds);
fd_count = fd+1;
fd_count = fd;
if (!no_flush_read) {
reading = (buffer_f_in != -1);
@@ -343,19 +390,30 @@ static void writefd_unbuffered(int fd,char *buf,int len)
if (reading) {
FD_SET(buffer_f_in,&r_fds);
if (buffer_f_in > fd)
fd_count = buffer_f_in+1;
if (buffer_f_in > fd_count)
fd_count = buffer_f_in;
}
if (io_error_fd != -1) {
FD_SET(io_error_fd,&r_fds);
if (io_error_fd > fd_count)
fd_count = io_error_fd;
}
tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
tv.tv_usec = 0;
count = select(fd_count,
reading?&r_fds:NULL,
errno = 0;
count = select(fd_count+1,
(reading || io_error_fd != -1)?&r_fds:NULL,
&w_fds,NULL,
&tv);
if (count <= 0) {
if (errno == EBADF) {
exit_cleanup(RERR_SOCKETIO);
}
check_timeout();
continue;
}
@@ -364,6 +422,10 @@ static void writefd_unbuffered(int fd,char *buf,int len)
read_check(buffer_f_in);
}
if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
read_error_fd();
}
if (FD_ISSET(fd, &w_fds)) {
int ret, n = len-total;
@@ -433,7 +495,7 @@ static void writefd(int fd,char *buf,int len)
{
stats.total_written += len;
if (!io_buffer) {
if (!io_buffer || fd != multiplex_out_fd) {
writefd_unbuffered(fd, buf, len);
return;
}
@@ -559,7 +621,7 @@ void io_start_multiplex_in(int fd)
io_multiplexing_in = 1;
}
/* write an message to the error stream */
/* write an message to the multiplexed error stream */
int io_multiplex_write(int f, char *buf, int len)
{
if (!io_multiplexing_out) return 0;
@@ -575,6 +637,20 @@ int io_multiplex_write(int f, char *buf, int len)
return 1;
}
/* write a message to the special error fd */
int io_error_write(int f, char *buf, int len)
{
if (f == -1) return 0;
writefd_unbuffered(f, buf, len);
return 1;
}
/* stop output multiplexing */
void io_multiplexing_close(void)
{
io_multiplexing_out = 0;
}
void io_close_input(int fd)
{
buffer_f_in = -1;

45
log.c
View File

@@ -24,7 +24,7 @@
#include "rsync.h"
static FILE *logfile;
static int log_error_fd = -1;
static void logit(int priority, char *buf)
{
@@ -77,14 +77,18 @@ void log_open(void)
logit(LOG_INFO,"rsyncd started\n");
#endif
}
/* this is the rsync debugging function. Call it with FINFO, FERROR or FLOG */
void rprintf(int fd, const char *format, ...)
/* setup the error file descriptor - used when we are a server
that is receiving files */
void set_error_fd(int fd)
{
log_error_fd = fd;
}
/* this is the underlying (unformatted) rsync debugging function. Call
it with FINFO, FERROR or FLOG */
void rwrite(int fd, char *buf, int len)
{
va_list ap;
char buf[1024];
int len;
FILE *f=NULL;
extern int am_daemon;
extern int quiet;
@@ -92,14 +96,8 @@ void log_open(void)
if (quiet != 0 && fd == FINFO) return;
va_start(ap, format);
len = vslprintf(buf, sizeof(buf), format, ap);
va_end(ap);
if (len < 0) exit_cleanup(RERR_MESSAGEIO);
if (len > sizeof(buf)-1) exit_cleanup(RERR_MESSAGEIO);
buf[len] = 0;
if (fd == FLOG) {
@@ -117,7 +115,9 @@ void log_open(void)
depth++;
log_open();
if (!io_multiplex_write(fd, buf, strlen(buf))) {
if (!io_error_write(log_error_fd, buf, strlen(buf)) &&
!io_multiplex_write(fd, buf, strlen(buf))) {
logit(priority, buf);
}
@@ -143,6 +143,23 @@ void log_open(void)
if (buf[len-1] == '\r' || buf[len-1] == '\n') fflush(f);
}
/* this is the rsync debugging function. Call it with FINFO, FERROR or FLOG */
void rprintf(int fd, const char *format, ...)
{
va_list ap;
char buf[1024];
int len;
va_start(ap, format);
len = vslprintf(buf, sizeof(buf), format, ap);
va_end(ap);
if (len > sizeof(buf)-1) exit_cleanup(RERR_MESSAGEIO);
rwrite(fd, buf, len);
}
void rflush(int fd)
{

16
main.c
View File

@@ -278,6 +278,7 @@ static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name)
int pid;
int status=0;
int recv_pipe[2];
int error_pipe[2];
extern int preserve_hard_links;
if (preserve_hard_links)
@@ -287,13 +288,25 @@ static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name)
rprintf(FERROR,"pipe failed in do_recv\n");
exit_cleanup(RERR_SOCKETIO);
}
if (pipe(error_pipe) < 0) {
rprintf(FERROR,"error pipe failed in do_recv\n");
exit_cleanup(RERR_SOCKETIO);
}
io_flush();
if ((pid=do_fork()) == 0) {
close(recv_pipe[0]);
close(error_pipe[0]);
if (f_in != f_out) close(f_out);
/* we can't let two processes write to the socket at one time */
io_multiplexing_close();
/* set place to send errors */
set_error_fd(error_pipe[1]);
recv_files(f_in,flist,local_name,recv_pipe[1]);
report(f_in);
@@ -302,11 +315,14 @@ static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name)
}
close(recv_pipe[1]);
close(error_pipe[1]);
io_close_input(f_in);
if (f_in != f_out) close(f_in);
io_start_buffering(f_out);
io_set_error_fd(error_pipe[0]);
generate_files(f_out,flist,local_name,recv_pipe[0]);
io_flush();

View File

@@ -468,7 +468,7 @@ int recv_files(int f_in,struct file_list *flist,char *local_name,int f_gen)
finish_transfer(fname, fnametmp, file);
cleanup_disable();
if (!recv_ok) {
if (csum_length == SUM_LENGTH) {
rprintf(FERROR,"ERROR: file corruption in %s. File changed during transfer?\n",

View File

@@ -47,7 +47,7 @@
#define SAME_TIME (1<<7)
/* update this if you make incompatible changes */
#define PROTOCOL_VERSION 21
#define PROTOCOL_VERSION 22
#define MIN_PROTOCOL_VERSION 11
#define MAX_PROTOCOL_VERSION 30