diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index e0f301c..5871d68 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -271,6 +271,7 @@ AsyncClient::AsyncClient(int sockfd) , _writeSpaceRemaining(TCP_SND_BUF) , _conn_state(0) { + _write_mutex = xSemaphoreCreateMutex(); if (sockfd != -1) { int r = fcntl( sockfd, F_SETFL, fcntl( sockfd, F_GETFL, 0 ) | O_NONBLOCK ); @@ -564,25 +565,23 @@ bool AsyncClient::_sockIsWriteable(void) bool activity = false; bool hasErr = false; - //Serial.print("AsyncClient::_sockIsWriteable: "); Serial.println(_socket); + int sent_errno = 0; + size_t sent_cb_length = 0; + uint32_t sent_cb_delay = 0; // Socket is now writeable. What should we do? switch (_conn_state) { case 2: case 3: - //Serial.println("\tconnect end"); // Socket has finished connecting. What happened? len = (socklen_t)sizeof(int); res = getsockopt(_socket, SOL_SOCKET, SO_ERROR, &sockerr, &len); if (res < 0) { - //Serial.printf("\terrno=%d\r\n", errno); _error(errno); } else if (sockerr != 0) { - //Serial.printf("\tsockerr=%d\r\n", errno); _error(sockerr); } else { // Socket is now fully connected - //Serial.println("SUCCESS"); _conn_state = 4; activity = true; _rx_last_packet = millis(); @@ -596,29 +595,23 @@ bool AsyncClient::_sockIsWriteable(void) case 4: default: // Socket can accept some new data... - //Serial.printf("\tbefore: remaining %d\r\n", _writeSpaceRemaining); + xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY); if (_writeQueue.size() > 0) { - //Serial.printf("\tbuffers remaining: %d\r\n", _writeQueue.size()); if (_writeQueue.front().written < _writeQueue.front().length) { uint8_t * p = _writeQueue.front().data + _writeQueue.front().written; size_t n = _writeQueue.front().length - _writeQueue.front().written; - //Serial.printf("\tlwip_write(%p, %d) ... ", p, n); errno = 0; ssize_t r = lwip_write(_socket, p, n); - //Serial.printf("r=%d errno=%d\r\n", r, errno); if (r >= 0) { // Written some data into the socket _writeQueue.front().written += r; _writeSpaceRemaining += r; activity = true; - //Serial.printf("\tduring: remaining %d\r\n", _writeSpaceRemaining); } else if (errno == EAGAIN || errno == EWOULDBLOCK) { - // Socket is full again - //Serial.println("\tEAGAIN"); - break; // NOTE: breaks from switch() + // Socket is full, could not write anything } else { hasErr = true; - _error(errno); + sent_errno = errno; } } @@ -626,14 +619,18 @@ bool AsyncClient::_sockIsWriteable(void) // Buffer has been fully written to the socket _rx_last_packet = millis(); if (_writeQueue.front().owned) ::free(_writeQueue.front().data); - if (_sent_cb) { - _sent_cb(_sent_cb_arg, this, _writeQueue.front().length, (millis() - _writeQueue.front().queued_at)); - } - + sent_cb_length = _writeQueue.front().length; + uint32_t sent_cb_delay = millis() - _writeQueue.front().queued_at; _writeQueue.pop_front(); } } - //Serial.printf("\tafter: remaining %d\r\n", _writeSpaceRemaining); + xSemaphoreGive(_write_mutex); + + if (hasErr) { + _error(sent_errno); + } else if (sent_cb_length > 0 && _sent_cb) { + _sent_cb(_sent_cb_arg, this, sent_cb_length, sent_cb_delay); + } break; } @@ -643,12 +640,8 @@ bool AsyncClient::_sockIsWriteable(void) void AsyncClient::_sockIsReadable(void) { - //Serial.print("AsyncClient::_sockIsReadable: "); Serial.println(_socket); - _rx_last_packet = millis(); - //Serial.print("\tlwip_read ... "); errno = 0; ssize_t r = lwip_read(_socket, _readBuffer, MAX_PAYLOAD_SIZE); - //Serial.printf("r=%d errno=%d\r\n", r, errno); if (r > 0) { if(_recv_cb) { _recv_cb(_recv_cb_arg, this, _readBuffer, r); @@ -658,7 +651,7 @@ void AsyncClient::_sockIsReadable(void) _close(); } else if (r < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { - //Serial.println("\tEAGAIN"); + // Do nothing, will try later } else { _error(errno); } @@ -671,22 +664,22 @@ void AsyncClient::_sockPoll(void) uint32_t now = millis(); - //Serial.print("AsyncClient::_sockPoll: "); Serial.println(_socket); - // ACK Timeout - simulated by write queue staleness - if (_writeQueue.size() > 0 && !_ack_timeout_signaled && _ack_timeout && (now - _writeQueue.front().queued_at) >= _ack_timeout) { + xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY); + uint32_t sent_delay = now - _writeQueue.front().queued_at; + if (_writeQueue.size() > 0 && !_ack_timeout_signaled && _ack_timeout && sent_delay >= _ack_timeout) { _ack_timeout_signaled = true; //log_w("ack timeout %d", pcb->state); - //Serial.println("\tACK TIMEOUT"); + xSemaphoreGive(_write_mutex); if(_timeout_cb) - _timeout_cb(_timeout_cb_arg, this, (now - _writeQueue.front().queued_at)); + _timeout_cb(_timeout_cb_arg, this, sent_delay); return; } + xSemaphoreGive(_write_mutex); // RX Timeout if (_rx_since_timeout && (now - _rx_last_packet) >= (_rx_since_timeout * 1000)) { //log_w("rx timeout %d", pcb->state); - //Serial.println("\tRX TIMEOUT"); _close(); return; } @@ -753,9 +746,12 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) n_entry.written = 0; n_entry.queued_at = millis(); + xSemaphoreTake(_write_mutex, (TickType_t)portMAX_DELAY); _writeQueue.push_back(n_entry); _writeSpaceRemaining -= will_send; _ack_timeout_signaled = false; + xSemaphoreGive(_write_mutex); + return will_send; } diff --git a/src/AsyncTCP.h b/src/AsyncTCP.h index 6ac9601..b6691db 100644 --- a/src/AsyncTCP.h +++ b/src/AsyncTCP.h @@ -180,6 +180,7 @@ class AsyncClient : public AsyncSocketBase } queued_writebuf; // Queue of buffers to write to socket + SemaphoreHandle_t _write_mutex; std::deque _writeQueue; bool _ack_timeout_signaled = false;