Eagerly flush as much data as possible from write queue

This commit implements a policy of writing as many queued buffers as the
writable socket allows, until non-writable, or the write queue is empty.
This has two objectives: 1) any allocated buffers for data copies will
be freed sooner, 2) the write output will be more responsive, fixing at
least one known delay in WiFi scanlist output in YUBOX Now.
This commit is contained in:
Alex Villacís Lasso
2021-08-03 18:16:32 -05:00
parent 01f436bbcb
commit 78953656ef
2 changed files with 86 additions and 42 deletions

View File

@@ -567,11 +567,9 @@ bool AsyncClient::_sockIsWriteable(void)
int sockerr;
socklen_t len;
bool activity = false;
bool hasErr = false;
int sent_errno = 0;
size_t sent_cb_length = 0;
uint32_t sent_cb_delay = 0;
std::deque<notify_writebuf> notifylist;
// Socket is now writeable. What should we do?
switch (_conn_state) {
@@ -602,29 +600,11 @@ bool AsyncClient::_sockIsWriteable(void)
xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY);
if (_writeQueue.size() > 0) {
activity = _flushWriteQueue();
if (_writeQueue.front().write_errno != 0) {
hasErr = true;
sent_errno = _writeQueue.front().write_errno;
}
if (!hasErr && _writeQueue.front().written >= _writeQueue.front().length) {
// Buffer has been fully written to the socket
if (_writeQueue.front().written_at > _rx_last_packet) {
_rx_last_packet = _writeQueue.front().written_at;
}
if (_writeQueue.front().owned) ::free(_writeQueue.front().data);
sent_cb_length = _writeQueue.front().length;
sent_cb_delay = _writeQueue.front().written_at - _writeQueue.front().queued_at;
_writeQueue.pop_front();
}
_collectNotifyWrittenBuffers(notifylist, sent_errno);
}
xSemaphoreGive(_write_mutex);
if (hasErr) {
_error(sent_errno);
} else if (sent_cb_length > 0 && _sent_cb) {
_sent_cb(_sent_cb_arg, this, sent_cb_length, sent_cb_delay);
}
_notifyWrittenBuffers(notifylist, sent_errno);
break;
}
@@ -638,32 +618,88 @@ bool AsyncClient::_flushWriteQueue(void)
if (_socket == -1) return false;
auto & qwb = _writeQueue.front();
for (auto it = _writeQueue.begin(); it != _writeQueue.end(); it++) {
// Abort iteration if error found while writing a buffer
if (it->write_errno != 0) break;
if (qwb.write_errno == 0 && qwb.written < qwb.length) {
uint8_t * p = qwb.data + qwb.written;
size_t n = qwb.length - qwb.written;
errno = 0; ssize_t r = lwip_write(_socket, p, n);
// Skip over head buffers already fully written
if (it->written >= it->length) continue;
if (r >= 0) {
// Written some data into the socket
qwb.written += r;
_writeSpaceRemaining += r;
activity = true;
bool keep_writing = true;
do {
uint8_t * p = it->data + it->written;
size_t n = it->length - it->written;
errno = 0; ssize_t r = lwip_write(_socket, p, n);
if (qwb.written >= qwb.length) {
qwb.written_at = millis();
if (r >= 0) {
// Written some data into the socket
it->written += r;
_writeSpaceRemaining += r;
activity = true;
if (it->written >= it->length) {
it->written_at = millis();
if (it->owned) ::free(it->data);
it->data = NULL;
}
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Socket is full, could not write anything
keep_writing = false;
} else {
// A write error happened that should be reported
it->write_errno = errno;
keep_writing = false;
log_e("socket %d lwip_write() failed errno=%d", _socket, it->write_errno);
}
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Socket is full, could not write anything
} else {
qwb.write_errno = errno;
}
} while (keep_writing && it->written < it->length);
}
return activity;
}
// This method MUST be called with _write_mutex held
void AsyncClient::_collectNotifyWrittenBuffers(std::deque<notify_writebuf> & notifyqueue, int & write_errno)
{
write_errno = 0;
notifyqueue.clear();
while (_writeQueue.size() > 0) {
if (_writeQueue.front().write_errno != 0) {
write_errno = _writeQueue.front().write_errno;
return;
}
if (_writeQueue.front().written >= _writeQueue.front().length) {
// Collect information on fully-written buffer, and stash it into notify queue
if (_writeQueue.front().written_at > _rx_last_packet) {
_rx_last_packet = _writeQueue.front().written_at;
}
if (_writeQueue.front().owned && _writeQueue.front().data != NULL) ::free(_writeQueue.front().data);
notify_writebuf noti;
noti.length = _writeQueue.front().length;
noti.delay = _writeQueue.front().written_at - _writeQueue.front().queued_at;
_writeQueue.pop_front();
notifyqueue.push_back(noti);
} else {
// Found first not-fully-written buffer, stop here
return;
}
}
}
void AsyncClient::_notifyWrittenBuffers(std::deque<notify_writebuf> & notifyqueue, int write_errno)
{
while (notifyqueue.size() > 0) {
if (notifyqueue.front().length > 0 && _sent_cb) {
_sent_cb(_sent_cb_arg, this, notifyqueue.front().length, notifyqueue.front().delay);
}
notifyqueue.pop_front();
}
if (write_errno != 0) _error(write_errno);
}
void AsyncClient::_sockIsReadable(void)
{
_rx_last_packet = millis();
@@ -811,7 +847,7 @@ bool AsyncClient::send()
tv.tv_sec = 0;
tv.tv_usec = 0;
// TODO: data was already queued, what should be done here?
// Write as much data as possible from queue if socket is writable
xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY);
int r = select(_socket + 1, NULL, &sockSet_w, NULL, &tv);
if (r > 0) _flushWriteQueue();
@@ -826,7 +862,7 @@ void AsyncClient::_clearWriteQueue(void)
xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY);
while (_writeQueue.size() > 0) {
if (_writeQueue.front().owned) {
::free(_writeQueue.front().data);
if (_writeQueue.front().data != NULL) ::free(_writeQueue.front().data);
}
_writeQueue.pop_front();
}

View File

@@ -184,6 +184,12 @@ class AsyncClient : public AsyncSocketBase
// If false, app owns the memory and should ensure it remains valid until acked
} queued_writebuf;
// Internal struct used to implement sent buffer notification
typedef struct {
uint32_t length;
uint32_t delay;
} notify_writebuf;
// Queue of buffers to write to socket
SemaphoreHandle_t _write_mutex;
std::deque<queued_writebuf> _writeQueue;
@@ -200,6 +206,8 @@ class AsyncClient : public AsyncSocketBase
void _removeAllCallbacks(void);
bool _flushWriteQueue(void);
void _clearWriteQueue(void);
void _collectNotifyWrittenBuffers(std::deque<notify_writebuf> &, int &);
void _notifyWrittenBuffers(std::deque<notify_writebuf> &, int);
friend void _tcpsock_dns_found(const char * name, struct ip_addr * ipaddr, void * arg);
};