From f69bcbd5adf6f3692493d5912a238ed7c8a4be3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Villac=C3=ADs=20Lasso?= Date: Wed, 17 Feb 2021 11:42:44 -0500 Subject: [PATCH] Use recursive mutex, call callbacks with mutex held Prior to this commit, the mutex that protected the global client socket list was non-recursive. This forced the release of the mutex before iterating through sockets on which to call callbacks. However, releasing the mutex allowed destructors to proceed, therefore possibly invalidating the object pointers copied to the local list. This in turn risked calling callbacks on already destroyed objects. Fixed by the use of a recursive mutex that allows holding the mutex during the callback invocation. --- src/AsyncTCP.cpp | 113 +++++++++++++++++++++++------------------------ 1 file changed, 56 insertions(+), 57 deletions(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 77fe1d9..7ed632a 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -81,11 +81,12 @@ void _asynctcpsock_task(void *) fd_set sockSet_w; int max_sock = 0; - std::list dnsSockList; + std::list sockList; + + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); // Collect all of the active sockets into socket set FD_ZERO(&sockSet_r); FD_ZERO(&sockSet_w); - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) { if ((*it)->_socket != -1) { FD_SET((*it)->_socket, &sockSet_r); @@ -93,13 +94,7 @@ void _asynctcpsock_task(void *) (*it)->_selected = true; if (max_sock <= (*it)->_socket) max_sock = (*it)->_socket + 1; } - - // Collect socket that has finished resolving DNS (with or without error) - if ((*it)->_isdnsfinished) { - dnsSockList.push_back(*it); - } } - xSemaphoreGive(_asyncsock_mutex); // Wait for activity on all monitored sockets struct timeval tv; @@ -109,27 +104,17 @@ void _asynctcpsock_task(void *) int r = select(max_sock, &sockSet_r, &sockSet_w, NULL, &tv); // Check all sockets to see which ones are active - std::list wrSockList; - std::list rdSockList; - uint32_t nActive = 0; if (r > 0) { - // Collect all writable and readable sockets - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + // Collect and notify all writable sockets for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) { if ((*it)->_selected && FD_ISSET((*it)->_socket, &sockSet_w)) { - wrSockList.push_back(*it); - } - if ((*it)->_selected && FD_ISSET((*it)->_socket, &sockSet_r)) { - rdSockList.push_back(*it); + sockList.push_back(*it); } } - xSemaphoreGive(_asyncsock_mutex); - - // Notify all collected writable sockets - for (it = wrSockList.begin(); it != wrSockList.end(); it++) { + for (it = sockList.begin(); it != sockList.end(); it++) { #if CONFIG_ASYNC_TCP_USE_WDT - if(esp_task_wdt_add(NULL) != ESP_OK){ + if (esp_task_wdt_add(NULL) != ESP_OK) { log_e("Failed to add async task to WDT"); } #endif @@ -138,17 +123,22 @@ void _asynctcpsock_task(void *) nActive++; } #if CONFIG_ASYNC_TCP_USE_WDT - if(esp_task_wdt_delete(NULL) != ESP_OK){ + if (esp_task_wdt_delete(NULL) != ESP_OK) { log_e("Failed to remove loop task from WDT"); } #endif } - wrSockList.clear(); + sockList.clear(); - // Notify all collected readable sockets - for (it = rdSockList.begin(); it != rdSockList.end(); it++) { + // Collect and notify all readable sockets + for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) { + if ((*it)->_selected && FD_ISSET((*it)->_socket, &sockSet_r)) { + sockList.push_back(*it); + } + } + for (it = sockList.begin(); it != sockList.end(); it++) { #if CONFIG_ASYNC_TCP_USE_WDT - if(esp_task_wdt_add(NULL) != ESP_OK){ + if (esp_task_wdt_add(NULL) != ESP_OK) { log_e("Failed to add async task to WDT"); } #endif @@ -156,16 +146,22 @@ void _asynctcpsock_task(void *) (*it)->_sockIsReadable(); nActive++; #if CONFIG_ASYNC_TCP_USE_WDT - if(esp_task_wdt_delete(NULL) != ESP_OK){ + if (esp_task_wdt_delete(NULL) != ESP_OK) { log_e("Failed to remove loop task from WDT"); } #endif } - rdSockList.clear(); + sockList.clear(); } - // Notify all sockets waiting for DNS completion - for (it = dnsSockList.begin(); it != dnsSockList.end(); it++) { + // Collect and notify all sockets waiting for DNS completion + for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) { + // Collect socket that has finished resolving DNS (with or without error) + if ((*it)->_isdnsfinished) { + sockList.push_back(*it); + } + } + for (it = sockList.begin(); it != sockList.end(); it++) { #if CONFIG_ASYNC_TCP_USE_WDT if(esp_task_wdt_add(NULL) != ESP_OK){ log_e("Failed to add async task to WDT"); @@ -179,6 +175,9 @@ void _asynctcpsock_task(void *) } #endif } + sockList.clear(); + + xSemaphoreGiveRecursive(_asyncsock_mutex); uint32_t t2 = millis(); @@ -187,20 +186,18 @@ void _asynctcpsock_task(void *) uint32_t d = (nActive == 0 && t2 - t1 < 125) ? 125 - (t2 - t1) : 1; delay(d); - // Collect all pollable sockets - std::list pollSockList; - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + // Collect and run activity poll on all pollable sockets + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) { (*it)->_selected = false; if (millis() - (*it)->_sock_lastactivity >= 125) { (*it)->_sock_lastactivity = millis(); - pollSockList.push_back(*it); + sockList.push_back(*it); } } - xSemaphoreGive(_asyncsock_mutex); // Run activity poll on all pollable sockets - for (it = pollSockList.begin(); it != pollSockList.end(); it++) { + for (it = sockList.begin(); it != sockList.end(); it++) { #if CONFIG_ASYNC_TCP_USE_WDT if(esp_task_wdt_add(NULL) != ESP_OK){ log_e("Failed to add async task to WDT"); @@ -213,7 +210,9 @@ void _asynctcpsock_task(void *) } #endif } - pollSockList.clear(); + sockList.clear(); + + xSemaphoreGiveRecursive(_asyncsock_mutex); } vTaskDelete(NULL); @@ -222,16 +221,16 @@ void _asynctcpsock_task(void *) AsyncSocketBase::AsyncSocketBase() { - if (_asyncsock_mutex == NULL) _asyncsock_mutex = xSemaphoreCreateMutex(); + if (_asyncsock_mutex == NULL) _asyncsock_mutex = xSemaphoreCreateRecursiveMutex(); _sock_lastactivity = millis(); _selected = false; // Add this base socket to the monitored list auto & _socketBaseList = AsyncSocketBase::_getSocketBaseList(); - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _socketBaseList.push_back(this); - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); } std::list & AsyncSocketBase::_getSocketBaseList(void) @@ -245,9 +244,9 @@ AsyncSocketBase::~AsyncSocketBase() { // Remove this base socket from the monitored list auto & _socketBaseList = AsyncSocketBase::_getSocketBaseList(); - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _socketBaseList.remove(this); - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); } @@ -276,10 +275,10 @@ AsyncClient::AsyncClient(int sockfd) int r = fcntl( sockfd, F_SETFL, fcntl( sockfd, F_GETFL, 0 ) | O_NONBLOCK ); // Updating state visible to asyncTcpSock task - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _conn_state = 4; _socket = sockfd; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); } } @@ -493,10 +492,10 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port) } // Updating state visible to asyncTcpSock task - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _conn_state = 2; _socket = sockfd; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); // Socket is now connecting. Should become writable in asyncTcpSock task //Serial.printf("\twaiting for connect finished on socket: %d\r\n", _socket); @@ -537,9 +536,9 @@ void _tcpsock_dns_found(const char * name, struct ip_addr * ipaddr, void * arg) } // Updating state visible to asyncTcpSock task - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); c->_isdnsfinished = true; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); // TODO: actually use name } @@ -712,11 +711,11 @@ void AsyncClient::_removeAllCallbacks(void) void AsyncClient::_close(void) { //Serial.print("AsyncClient::_close: "); Serial.println(_socket); - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _conn_state = 0; lwip_close_r(_socket); _socket = -1; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); _clearWriteQueue(); if (_discard_cb) _discard_cb(_discard_cb_arg, this); @@ -725,11 +724,11 @@ void AsyncClient::_close(void) void AsyncClient::_error(int8_t err) { - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _conn_state = 0; lwip_close_r(_socket); _socket = -1; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); _clearWriteQueue(); if (_error_cb) _error_cb(_error_cb_arg, this, err); @@ -938,18 +937,18 @@ void AsyncServer::begin() int r = fcntl(sockfd, F_SETFL, O_NONBLOCK); // Updating state visible to asyncTcpSock task - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); _socket = sockfd; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); } void AsyncServer::end() { if (_socket == -1) return; - xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY); + xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY); lwip_close_r(_socket); _socket = -1; - xSemaphoreGive(_asyncsock_mutex); + xSemaphoreGiveRecursive(_asyncsock_mutex); } void AsyncServer::_sockIsReadable(void)