mirror of
https://github.com/RsyncProject/rsync.git
synced 2026-05-24 23:05:52 -04:00
Added a message queue for the receiver->generator messages to handle the case
where the message pipe is being used to forward the file-list data.
This commit is contained in:
61
io.c
61
io.c
@@ -133,7 +133,7 @@ struct msg_list {
|
||||
struct msg_list_item *head, *tail;
|
||||
};
|
||||
|
||||
static struct msg_list msg2sndr;
|
||||
static struct msg_list msg_queue;
|
||||
|
||||
static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
|
||||
{
|
||||
@@ -247,17 +247,29 @@ static void msg_list_add(struct msg_list *lst, int code, const char *buf, int le
|
||||
lst->tail = m;
|
||||
}
|
||||
|
||||
static void msg2sndr_flush(void)
|
||||
static void msg_flush(void)
|
||||
{
|
||||
while (msg2sndr.head && io_multiplexing_out) {
|
||||
struct msg_list_item *m = msg2sndr.head;
|
||||
if (!(msg2sndr.head = m->next))
|
||||
msg2sndr.tail = NULL;
|
||||
stats.total_written += m->len;
|
||||
defer_forwarding_messages = 1;
|
||||
writefd_unbuffered(sock_f_out, m->buf, m->len);
|
||||
defer_forwarding_messages = 0;
|
||||
free(m);
|
||||
if (am_generator) {
|
||||
while (msg_queue.head && io_multiplexing_out) {
|
||||
struct msg_list_item *m = msg_queue.head;
|
||||
if (!(msg_queue.head = m->next))
|
||||
msg_queue.tail = NULL;
|
||||
stats.total_written += m->len;
|
||||
defer_forwarding_messages++;
|
||||
writefd_unbuffered(sock_f_out, m->buf, m->len);
|
||||
defer_forwarding_messages--;
|
||||
free(m);
|
||||
}
|
||||
} else {
|
||||
while (msg_queue.head) {
|
||||
struct msg_list_item *m = msg_queue.head;
|
||||
if (!(msg_queue.head = m->next))
|
||||
msg_queue.tail = NULL;
|
||||
defer_forwarding_messages++;
|
||||
writefd_unbuffered(msg_fd_out, m->buf, m->len);
|
||||
defer_forwarding_messages--;
|
||||
free(m);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -379,7 +391,7 @@ static void read_msg_fd(void)
|
||||
no_flush--;
|
||||
msg_fd_in = fd;
|
||||
if (!--defer_forwarding_messages)
|
||||
msg2sndr_flush();
|
||||
msg_flush();
|
||||
}
|
||||
|
||||
/* This is used by the generator to limit how many file transfers can
|
||||
@@ -456,7 +468,7 @@ static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len)
|
||||
defer_forwarding_messages++;
|
||||
writefd_unbuffered(fd, buf, len);
|
||||
if (!--defer_forwarding_messages)
|
||||
msg2sndr_flush();
|
||||
msg_flush();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -467,10 +479,13 @@ int send_msg(enum msgcode code, const char *buf, int len)
|
||||
return io_multiplex_write(code, buf, len);
|
||||
if (!io_multiplexing_out)
|
||||
return 0;
|
||||
msg_list_add(&msg2sndr, code, buf, len);
|
||||
msg_list_add(&msg_queue, code, buf, len);
|
||||
return 1;
|
||||
}
|
||||
mplex_write(msg_fd_out, code, buf, len);
|
||||
if (flist_forward_from >= 0)
|
||||
msg_list_add(&msg_queue, code, buf, len);
|
||||
else
|
||||
mplex_write(msg_fd_out, code, buf, len);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -1254,11 +1269,11 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len)
|
||||
size_t n, total = 0;
|
||||
fd_set w_fds, r_fds, e_fds;
|
||||
int maxfd, count, cnt, using_r_fds;
|
||||
int defer_save = defer_forwarding_messages;
|
||||
int defer_inc = 0;
|
||||
struct timeval tv;
|
||||
|
||||
if (no_flush++)
|
||||
defer_forwarding_messages = 1;
|
||||
defer_forwarding_messages++, defer_inc++;
|
||||
|
||||
while (total < len) {
|
||||
FD_ZERO(&w_fds);
|
||||
@@ -1337,7 +1352,7 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len)
|
||||
}
|
||||
|
||||
total += cnt;
|
||||
defer_forwarding_messages = 1;
|
||||
defer_forwarding_messages++, defer_inc++;
|
||||
|
||||
if (fd == sock_f_out) {
|
||||
if (io_timeout || am_generator)
|
||||
@@ -1347,15 +1362,12 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len)
|
||||
}
|
||||
|
||||
no_flush--;
|
||||
if (!(defer_forwarding_messages = defer_save))
|
||||
msg2sndr_flush();
|
||||
if (!(defer_forwarding_messages -= defer_inc))
|
||||
msg_flush();
|
||||
}
|
||||
|
||||
void io_flush(int flush_it_all)
|
||||
{
|
||||
if (flush_it_all && !defer_forwarding_messages)
|
||||
msg2sndr_flush();
|
||||
|
||||
if (!iobuf_out_cnt || no_flush)
|
||||
return;
|
||||
|
||||
@@ -1364,6 +1376,9 @@ void io_flush(int flush_it_all)
|
||||
else
|
||||
writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
|
||||
iobuf_out_cnt = 0;
|
||||
|
||||
if (flush_it_all && !defer_forwarding_messages)
|
||||
msg_flush();
|
||||
}
|
||||
|
||||
static void writefd(int fd, const char *buf, size_t len)
|
||||
|
||||
Reference in New Issue
Block a user