Flush some writable data on AsyncClient::send()

This commit is contained in:
Alex Villacís Lasso
2021-02-27 20:55:02 -05:00
parent f69bcbd5ad
commit 0ad4a10c88
2 changed files with 63 additions and 22 deletions

View File

@@ -598,30 +598,20 @@ bool AsyncClient::_sockIsWriteable(void)
// Socket can accept some new data...
xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY);
if (_writeQueue.size() > 0) {
if (_writeQueue.front().written < _writeQueue.front().length) {
uint8_t * p = _writeQueue.front().data + _writeQueue.front().written;
size_t n = _writeQueue.front().length - _writeQueue.front().written;
errno = 0; ssize_t r = lwip_write(_socket, p, n);
if (r >= 0) {
// Written some data into the socket
_writeQueue.front().written += r;
_writeSpaceRemaining += r;
activity = true;
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Socket is full, could not write anything
} else {
hasErr = true;
sent_errno = errno;
}
activity = _flushWriteQueue();
if (_writeQueue.front().write_errno != 0) {
hasErr = true;
sent_errno = _writeQueue.front().write_errno;
}
if (!hasErr && _writeQueue.front().written >= _writeQueue.front().length) {
// Buffer has been fully written to the socket
_rx_last_packet = millis();
if (_writeQueue.front().written_at > _rx_last_packet) {
_rx_last_packet = _writeQueue.front().written_at;
}
if (_writeQueue.front().owned) ::free(_writeQueue.front().data);
sent_cb_length = _writeQueue.front().length;
sent_cb_delay = millis() - _writeQueue.front().queued_at;
sent_cb_delay = _writeQueue.front().written_at - _writeQueue.front().queued_at;
_writeQueue.pop_front();
}
}
@@ -639,6 +629,38 @@ bool AsyncClient::_sockIsWriteable(void)
return activity;
}
bool AsyncClient::_flushWriteQueue(void)
{
bool activity = false;
if (_socket == -1) return false;
auto & qwb = _writeQueue.front();
if (qwb.write_errno == 0 && qwb.written < qwb.length) {
uint8_t * p = qwb.data + qwb.written;
size_t n = qwb.length - qwb.written;
errno = 0; ssize_t r = lwip_write(_socket, p, n);
if (r >= 0) {
// Written some data into the socket
qwb.written += r;
_writeSpaceRemaining += r;
activity = true;
if (qwb.written >= qwb.length) {
qwb.written_at = millis();
}
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Socket is full, could not write anything
} else {
qwb.write_errno = errno;
}
}
return activity;
}
void AsyncClient::_sockIsReadable(void)
{
_rx_last_packet = millis();
@@ -766,6 +788,8 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags)
n_entry.length = will_send;
n_entry.written = 0;
n_entry.queued_at = millis();
n_entry.written_at = 0;
n_entry.write_errno = 0;
xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY);
_writeQueue.push_back(n_entry);
@@ -778,7 +802,19 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags)
bool AsyncClient::send()
{
fd_set sockSet_w;
struct timeval tv;
FD_ZERO(&sockSet_w);
FD_SET(_socket, &sockSet_w);
tv.tv_sec = 0;
tv.tv_usec = 0;
// TODO: data was already queued, what should be done here?
xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY);
int r = select(_socket + 1, NULL, &sockSet_w, NULL, &tv);
if (r > 0) _flushWriteQueue();
xSemaphoreGive(_write_mutex);
return true;
}

View File

@@ -137,6 +137,12 @@ class AsyncClient : public AsyncSocketBase
const char * errorToString(int8_t error);
// const char * stateToString();
protected:
bool _sockIsWriteable(void);
void _sockIsReadable(void);
void _sockPoll(void);
void _sockDelayedConnect(void);
private:
AcConnectHandler _connect_cb;
@@ -175,6 +181,8 @@ class AsyncClient : public AsyncSocketBase
uint32_t length; // Length of data queued for write
uint32_t written; // Length of data written to socket so far
uint32_t queued_at;// Timestamp at which this data buffer was queued
uint32_t written_at; // Timestamp at which this data buffer was completely written
int write_errno; // If != 0, errno value while writing this buffer
bool owned; // If true, we malloc'ed the data and should be freed after completely written.
// If false, app owns the memory and should ensure it remains valid until acked
} queued_writebuf;
@@ -189,11 +197,8 @@ class AsyncClient : public AsyncSocketBase
void _error(int8_t err);
void _close(void);
bool _sockIsWriteable(void);
void _sockIsReadable(void);
void _sockPoll(void);
void _sockDelayedConnect(void);
void _removeAllCallbacks(void);
bool _flushWriteQueue(void);
void _clearWriteQueue(void);
friend void _tcpsock_dns_found(const char * name, struct ip_addr * ipaddr, void * arg);