diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 0b5bab1..d0713a3 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -567,11 +567,9 @@ bool AsyncClient::_sockIsWriteable(void) int sockerr; socklen_t len; bool activity = false; - bool hasErr = false; int sent_errno = 0; - size_t sent_cb_length = 0; - uint32_t sent_cb_delay = 0; + std::deque notifylist; // Socket is now writeable. What should we do? switch (_conn_state) { @@ -602,29 +600,11 @@ bool AsyncClient::_sockIsWriteable(void) xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY); if (_writeQueue.size() > 0) { activity = _flushWriteQueue(); - if (_writeQueue.front().write_errno != 0) { - hasErr = true; - sent_errno = _writeQueue.front().write_errno; - } - - if (!hasErr && _writeQueue.front().written >= _writeQueue.front().length) { - // Buffer has been fully written to the socket - if (_writeQueue.front().written_at > _rx_last_packet) { - _rx_last_packet = _writeQueue.front().written_at; - } - if (_writeQueue.front().owned) ::free(_writeQueue.front().data); - sent_cb_length = _writeQueue.front().length; - sent_cb_delay = _writeQueue.front().written_at - _writeQueue.front().queued_at; - _writeQueue.pop_front(); - } + _collectNotifyWrittenBuffers(notifylist, sent_errno); } xSemaphoreGive(_write_mutex); - if (hasErr) { - _error(sent_errno); - } else if (sent_cb_length > 0 && _sent_cb) { - _sent_cb(_sent_cb_arg, this, sent_cb_length, sent_cb_delay); - } + _notifyWrittenBuffers(notifylist, sent_errno); break; } @@ -638,32 +618,88 @@ bool AsyncClient::_flushWriteQueue(void) if (_socket == -1) return false; - auto & qwb = _writeQueue.front(); + for (auto it = _writeQueue.begin(); it != _writeQueue.end(); it++) { + // Abort iteration if error found while writing a buffer + if (it->write_errno != 0) break; - if (qwb.write_errno == 0 && qwb.written < qwb.length) { - uint8_t * p = qwb.data + qwb.written; - size_t n = qwb.length - qwb.written; - errno = 0; ssize_t r = lwip_write(_socket, p, n); + // Skip over head buffers already fully written + if (it->written >= it->length) continue; - if (r >= 0) { - // Written some data into the socket - qwb.written += r; - _writeSpaceRemaining += r; - activity = true; + bool keep_writing = true; + do { + uint8_t * p = it->data + it->written; + size_t n = it->length - it->written; + errno = 0; ssize_t r = lwip_write(_socket, p, n); - if (qwb.written >= qwb.length) { - qwb.written_at = millis(); + if (r >= 0) { + // Written some data into the socket + it->written += r; + _writeSpaceRemaining += r; + activity = true; + + if (it->written >= it->length) { + it->written_at = millis(); + if (it->owned) ::free(it->data); + it->data = NULL; + } + } else if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Socket is full, could not write anything + keep_writing = false; + } else { + // A write error happened that should be reported + it->write_errno = errno; + keep_writing = false; + log_e("socket %d lwip_write() failed errno=%d", _socket, it->write_errno); } - } else if (errno == EAGAIN || errno == EWOULDBLOCK) { - // Socket is full, could not write anything - } else { - qwb.write_errno = errno; - } + } while (keep_writing && it->written < it->length); } return activity; } +// This method MUST be called with _write_mutex held +void AsyncClient::_collectNotifyWrittenBuffers(std::deque & notifyqueue, int & write_errno) +{ + write_errno = 0; + notifyqueue.clear(); + + while (_writeQueue.size() > 0) { + if (_writeQueue.front().write_errno != 0) { + write_errno = _writeQueue.front().write_errno; + return; + } + + if (_writeQueue.front().written >= _writeQueue.front().length) { + // Collect information on fully-written buffer, and stash it into notify queue + if (_writeQueue.front().written_at > _rx_last_packet) { + _rx_last_packet = _writeQueue.front().written_at; + } + if (_writeQueue.front().owned && _writeQueue.front().data != NULL) ::free(_writeQueue.front().data); + + notify_writebuf noti; + noti.length = _writeQueue.front().length; + noti.delay = _writeQueue.front().written_at - _writeQueue.front().queued_at; + _writeQueue.pop_front(); + notifyqueue.push_back(noti); + } else { + // Found first not-fully-written buffer, stop here + return; + } + } +} + +void AsyncClient::_notifyWrittenBuffers(std::deque & notifyqueue, int write_errno) +{ + while (notifyqueue.size() > 0) { + if (notifyqueue.front().length > 0 && _sent_cb) { + _sent_cb(_sent_cb_arg, this, notifyqueue.front().length, notifyqueue.front().delay); + } + notifyqueue.pop_front(); + } + + if (write_errno != 0) _error(write_errno); +} + void AsyncClient::_sockIsReadable(void) { _rx_last_packet = millis(); @@ -811,7 +847,7 @@ bool AsyncClient::send() tv.tv_sec = 0; tv.tv_usec = 0; - // TODO: data was already queued, what should be done here? + // Write as much data as possible from queue if socket is writable xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY); int r = select(_socket + 1, NULL, &sockSet_w, NULL, &tv); if (r > 0) _flushWriteQueue(); @@ -826,7 +862,7 @@ void AsyncClient::_clearWriteQueue(void) xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY); while (_writeQueue.size() > 0) { if (_writeQueue.front().owned) { - ::free(_writeQueue.front().data); + if (_writeQueue.front().data != NULL) ::free(_writeQueue.front().data); } _writeQueue.pop_front(); } diff --git a/src/AsyncTCP.h b/src/AsyncTCP.h index d83d87b..6c24749 100644 --- a/src/AsyncTCP.h +++ b/src/AsyncTCP.h @@ -184,6 +184,12 @@ class AsyncClient : public AsyncSocketBase // If false, app owns the memory and should ensure it remains valid until acked } queued_writebuf; + // Internal struct used to implement sent buffer notification + typedef struct { + uint32_t length; + uint32_t delay; + } notify_writebuf; + // Queue of buffers to write to socket SemaphoreHandle_t _write_mutex; std::deque _writeQueue; @@ -200,6 +206,8 @@ class AsyncClient : public AsyncSocketBase void _removeAllCallbacks(void); bool _flushWriteQueue(void); void _clearWriteQueue(void); + void _collectNotifyWrittenBuffers(std::deque &, int &); + void _notifyWrittenBuffers(std::deque &, int); friend void _tcpsock_dns_found(const char * name, struct ip_addr * ipaddr, void * arg); };