diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 7ed632a..9188098 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -598,30 +598,20 @@ bool AsyncClient::_sockIsWriteable(void) // Socket can accept some new data... xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY); if (_writeQueue.size() > 0) { - if (_writeQueue.front().written < _writeQueue.front().length) { - uint8_t * p = _writeQueue.front().data + _writeQueue.front().written; - size_t n = _writeQueue.front().length - _writeQueue.front().written; - errno = 0; ssize_t r = lwip_write(_socket, p, n); - - if (r >= 0) { - // Written some data into the socket - _writeQueue.front().written += r; - _writeSpaceRemaining += r; - activity = true; - } else if (errno == EAGAIN || errno == EWOULDBLOCK) { - // Socket is full, could not write anything - } else { - hasErr = true; - sent_errno = errno; - } + activity = _flushWriteQueue(); + if (_writeQueue.front().write_errno != 0) { + hasErr = true; + sent_errno = _writeQueue.front().write_errno; } if (!hasErr && _writeQueue.front().written >= _writeQueue.front().length) { // Buffer has been fully written to the socket - _rx_last_packet = millis(); + if (_writeQueue.front().written_at > _rx_last_packet) { + _rx_last_packet = _writeQueue.front().written_at; + } if (_writeQueue.front().owned) ::free(_writeQueue.front().data); sent_cb_length = _writeQueue.front().length; - sent_cb_delay = millis() - _writeQueue.front().queued_at; + sent_cb_delay = _writeQueue.front().written_at - _writeQueue.front().queued_at; _writeQueue.pop_front(); } } @@ -639,6 +629,38 @@ bool AsyncClient::_sockIsWriteable(void) return activity; } +bool AsyncClient::_flushWriteQueue(void) +{ + bool activity = false; + + if (_socket == -1) return false; + + auto & qwb = _writeQueue.front(); + + if (qwb.write_errno == 0 && qwb.written < qwb.length) { + uint8_t * p = qwb.data + qwb.written; + size_t n = qwb.length - qwb.written; + errno = 0; ssize_t r = lwip_write(_socket, p, n); + + if (r >= 0) { + // Written some data into the socket + qwb.written += r; + _writeSpaceRemaining += r; + activity = true; + + if (qwb.written >= qwb.length) { + qwb.written_at = millis(); + } + } else if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Socket is full, could not write anything + } else { + qwb.write_errno = errno; + } + } + + return activity; +} + void AsyncClient::_sockIsReadable(void) { _rx_last_packet = millis(); @@ -766,6 +788,8 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) n_entry.length = will_send; n_entry.written = 0; n_entry.queued_at = millis(); + n_entry.written_at = 0; + n_entry.write_errno = 0; xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY); _writeQueue.push_back(n_entry); @@ -778,7 +802,19 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) bool AsyncClient::send() { + fd_set sockSet_w; + struct timeval tv; + + FD_ZERO(&sockSet_w); + FD_SET(_socket, &sockSet_w); + tv.tv_sec = 0; + tv.tv_usec = 0; + // TODO: data was already queued, what should be done here? + xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY); + int r = select(_socket + 1, NULL, &sockSet_w, NULL, &tv); + if (r > 0) _flushWriteQueue(); + xSemaphoreGive(_write_mutex); return true; } diff --git a/src/AsyncTCP.h b/src/AsyncTCP.h index d922449..2e0f2ae 100644 --- a/src/AsyncTCP.h +++ b/src/AsyncTCP.h @@ -137,6 +137,12 @@ class AsyncClient : public AsyncSocketBase const char * errorToString(int8_t error); // const char * stateToString(); + protected: + bool _sockIsWriteable(void); + void _sockIsReadable(void); + void _sockPoll(void); + void _sockDelayedConnect(void); + private: AcConnectHandler _connect_cb; @@ -175,6 +181,8 @@ class AsyncClient : public AsyncSocketBase uint32_t length; // Length of data queued for write uint32_t written; // Length of data written to socket so far uint32_t queued_at;// Timestamp at which this data buffer was queued + uint32_t written_at; // Timestamp at which this data buffer was completely written + int write_errno; // If != 0, errno value while writing this buffer bool owned; // If true, we malloc'ed the data and should be freed after completely written. // If false, app owns the memory and should ensure it remains valid until acked } queued_writebuf; @@ -189,11 +197,8 @@ class AsyncClient : public AsyncSocketBase void _error(int8_t err); void _close(void); - bool _sockIsWriteable(void); - void _sockIsReadable(void); - void _sockPoll(void); - void _sockDelayedConnect(void); void _removeAllCallbacks(void); + bool _flushWriteQueue(void); void _clearWriteQueue(void); friend void _tcpsock_dns_found(const char * name, struct ip_addr * ipaddr, void * arg);