diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 77fe1d9..7ed632a 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -81,11 +81,12 @@ void _asynctcpsock_task(void *) fd_set sockSet_w; int max_sock = 0; - std::list dnsSockList; + std::list sockList; + + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); // Collect all of the active sockets into socket set FD_ZERO(&sockSet_r); FD_ZERO(&sockSet_w); - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) { if ((*it)->_socket != -1) { FD_SET((*it)->_socket, &sockSet_r); @@ -93,13 +94,7 @@ void _asynctcpsock_task(void *) (*it)->_selected = true; if (max_sock <= (*it)->_socket) max_sock = (*it)->_socket + 1; } - - // Collect socket that has finished resolving DNS (with or without error) - if ((*it)->_isdnsfinished) { - dnsSockList.push_back(*it); - } } - xSemaphoreGive(_asyncsock_mutex); // Wait for activity on all monitored sockets struct timeval tv; @@ -109,27 +104,17 @@ void _asynctcpsock_task(void *) int r = select(max_sock, &sockSet_r, &sockSet_w, NULL, &tv); // Check all sockets to see which ones are active - std::list wrSockList; - std::list rdSockList; - uint32_t nActive = 0; if (r > 0) { - // Collect all writable and readable sockets - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + // Collect and notify all writable sockets for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) { if ((*it)->_selected && FD_ISSET((*it)->_socket, &sockSet_w)) { - wrSockList.push_back(*it); - } - if ((*it)->_selected && FD_ISSET((*it)->_socket, &sockSet_r)) { - rdSockList.push_back(*it); + sockList.push_back(*it); } } - xSemaphoreGive(_asyncsock_mutex); - - // Notify all collected writable sockets - for (it = wrSockList.begin(); it != wrSockList.end(); it++) { + for (it = sockList.begin(); it != sockList.end(); it++) { #if CONFIG_ASYNC_TCP_USE_WDT - if(esp_task_wdt_add(NULL) != ESP_OK){ + if (esp_task_wdt_add(NULL) != ESP_OK) { log_e("Failed to add async task to WDT"); } #endif @@ -138,17 +123,22 @@ void _asynctcpsock_task(void *) nActive++; } #if CONFIG_ASYNC_TCP_USE_WDT - if(esp_task_wdt_delete(NULL) != ESP_OK){ + if (esp_task_wdt_delete(NULL) != ESP_OK) { log_e("Failed to remove loop task from WDT"); } #endif } - wrSockList.clear(); + sockList.clear(); - // Notify all collected readable sockets - for (it = rdSockList.begin(); it != rdSockList.end(); it++) { + // Collect and notify all readable sockets + for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) { + if ((*it)->_selected && FD_ISSET((*it)->_socket, &sockSet_r)) { + sockList.push_back(*it); + } + } + for (it = sockList.begin(); it != sockList.end(); it++) { #if CONFIG_ASYNC_TCP_USE_WDT - if(esp_task_wdt_add(NULL) != ESP_OK){ + if (esp_task_wdt_add(NULL) != ESP_OK) { log_e("Failed to add async task to WDT"); } #endif @@ -156,16 +146,22 @@ void _asynctcpsock_task(void *) (*it)->_sockIsReadable(); nActive++; #if CONFIG_ASYNC_TCP_USE_WDT - if(esp_task_wdt_delete(NULL) != ESP_OK){ + if (esp_task_wdt_delete(NULL) != ESP_OK) { log_e("Failed to remove loop task from WDT"); } #endif } - rdSockList.clear(); + sockList.clear(); } - // Notify all sockets waiting for DNS completion - for (it = dnsSockList.begin(); it != dnsSockList.end(); it++) { + // Collect and notify all sockets waiting for DNS completion + for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) { + // Collect socket that has finished resolving DNS (with or without error) + if ((*it)->_isdnsfinished) { + sockList.push_back(*it); + } + } + for (it = sockList.begin(); it != sockList.end(); it++) { #if CONFIG_ASYNC_TCP_USE_WDT if(esp_task_wdt_add(NULL) != ESP_OK){ log_e("Failed to add async task to WDT"); @@ -179,6 +175,9 @@ void _asynctcpsock_task(void *) } #endif } + sockList.clear(); + + xSemaphoreGiveRecursive(_asyncsock_mutex); uint32_t t2 = millis(); @@ -187,20 +186,18 @@ void _asynctcpsock_task(void *) uint32_t d = (nActive == 0 && t2 - t1 < 125) ? 125 - (t2 - t1) : 1; delay(d); - // Collect all pollable sockets - std::list pollSockList; - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + // Collect and run activity poll on all pollable sockets + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) { (*it)->_selected = false; if (millis() - (*it)->_sock_lastactivity >= 125) { (*it)->_sock_lastactivity = millis(); - pollSockList.push_back(*it); + sockList.push_back(*it); } } - xSemaphoreGive(_asyncsock_mutex); // Run activity poll on all pollable sockets - for (it = pollSockList.begin(); it != pollSockList.end(); it++) { + for (it = sockList.begin(); it != sockList.end(); it++) { #if CONFIG_ASYNC_TCP_USE_WDT if(esp_task_wdt_add(NULL) != ESP_OK){ log_e("Failed to add async task to WDT"); @@ -213,7 +210,9 @@ void _asynctcpsock_task(void *) } #endif } - pollSockList.clear(); + sockList.clear(); + + xSemaphoreGiveRecursive(_asyncsock_mutex); } vTaskDelete(NULL); @@ -222,16 +221,16 @@ void _asynctcpsock_task(void *) AsyncSocketBase::AsyncSocketBase() { - if (_asyncsock_mutex == NULL) _asyncsock_mutex = xSemaphoreCreateMutex(); + if (_asyncsock_mutex == NULL) _asyncsock_mutex = xSemaphoreCreateRecursiveMutex(); _sock_lastactivity = millis(); _selected = false; // Add this base socket to the monitored list auto & _socketBaseList = AsyncSocketBase::_getSocketBaseList(); - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _socketBaseList.push_back(this); - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); } std::list & AsyncSocketBase::_getSocketBaseList(void) @@ -245,9 +244,9 @@ AsyncSocketBase::~AsyncSocketBase() { // Remove this base socket from the monitored list auto & _socketBaseList = AsyncSocketBase::_getSocketBaseList(); - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _socketBaseList.remove(this); - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); } @@ -276,10 +275,10 @@ AsyncClient::AsyncClient(int sockfd) int r = fcntl( sockfd, F_SETFL, fcntl( sockfd, F_GETFL, 0 ) | O_NONBLOCK ); // Updating state visible to asyncTcpSock task - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _conn_state = 4; _socket = sockfd; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); } } @@ -493,10 +492,10 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port) } // Updating state visible to asyncTcpSock task - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _conn_state = 2; _socket = sockfd; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); // Socket is now connecting. Should become writable in asyncTcpSock task //Serial.printf("\twaiting for connect finished on socket: %d\r\n", _socket); @@ -537,9 +536,9 @@ void _tcpsock_dns_found(const char * name, struct ip_addr * ipaddr, void * arg) } // Updating state visible to asyncTcpSock task - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); c->_isdnsfinished = true; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); // TODO: actually use name } @@ -712,11 +711,11 @@ void AsyncClient::_removeAllCallbacks(void) void AsyncClient::_close(void) { //Serial.print("AsyncClient::_close: "); Serial.println(_socket); - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _conn_state = 0; lwip_close_r(_socket); _socket = -1; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); _clearWriteQueue(); if (_discard_cb) _discard_cb(_discard_cb_arg, this); @@ -725,11 +724,11 @@ void AsyncClient::_close(void) void AsyncClient::_error(int8_t err) { - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _conn_state = 0; lwip_close_r(_socket); _socket = -1; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); _clearWriteQueue(); if (_error_cb) _error_cb(_error_cb_arg, this, err); @@ -938,18 +937,18 @@ void AsyncServer::begin() int r = fcntl(sockfd, F_SETFL, O_NONBLOCK); // Updating state visible to asyncTcpSock task - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _socket = sockfd; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); } void AsyncServer::end() { if (_socket == -1) return; - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); lwip_close_r(_socket); _socket = -1; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); } void AsyncServer::_sockIsReadable(void)