Use recursive mutex, call callbacks with mutex held

Prior to this commit, the mutex that protected the global client socket
list was non-recursive. This forced the release of the mutex before
iterating through sockets on which to call callbacks. However, releasing
the mutex allowed destructors to proceed, therefore possibly
invalidating the object pointers copied to the local list. This in turn
risked calling callbacks on already destroyed objects. Fixed by the use
of a recursive mutex that allows holding the mutex during the callback
invocation.
This commit is contained in:
Alex Villacís Lasso
2021-02-17 11:42:44 -05:00
parent d5cf17ff65
commit f69bcbd5ad

View File

@@ -81,11 +81,12 @@ void _asynctcpsock_task(void *)
fd_set sockSet_w;
int max_sock = 0;
std::list<AsyncSocketBase *> dnsSockList;
std::list<AsyncSocketBase *> sockList;
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
// Collect all of the active sockets into socket set
FD_ZERO(&sockSet_r); FD_ZERO(&sockSet_w);
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) {
if ((*it)->_socket != -1) {
FD_SET((*it)->_socket, &sockSet_r);
@@ -93,13 +94,7 @@ void _asynctcpsock_task(void *)
(*it)->_selected = true;
if (max_sock <= (*it)->_socket) max_sock = (*it)->_socket + 1;
}
// Collect socket that has finished resolving DNS (with or without error)
if ((*it)->_isdnsfinished) {
dnsSockList.push_back(*it);
}
}
xSemaphoreGive(_asyncsock_mutex);
// Wait for activity on all monitored sockets
struct timeval tv;
@@ -109,25 +104,15 @@ void _asynctcpsock_task(void *)
int r = select(max_sock, &sockSet_r, &sockSet_w, NULL, &tv);
// Check all sockets to see which ones are active
std::list<AsyncSocketBase *> wrSockList;
std::list<AsyncSocketBase *> rdSockList;
uint32_t nActive = 0;
if (r > 0) {
// Collect all writable and readable sockets
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
// Collect and notify all writable sockets
for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) {
if ((*it)->_selected && FD_ISSET((*it)->_socket, &sockSet_w)) {
wrSockList.push_back(*it);
}
if ((*it)->_selected && FD_ISSET((*it)->_socket, &sockSet_r)) {
rdSockList.push_back(*it);
sockList.push_back(*it);
}
}
xSemaphoreGive(_asyncsock_mutex);
// Notify all collected writable sockets
for (it = wrSockList.begin(); it != wrSockList.end(); it++) {
for (it = sockList.begin(); it != sockList.end(); it++) {
#if CONFIG_ASYNC_TCP_USE_WDT
if (esp_task_wdt_add(NULL) != ESP_OK) {
log_e("Failed to add async task to WDT");
@@ -143,10 +128,15 @@ void _asynctcpsock_task(void *)
}
#endif
}
wrSockList.clear();
sockList.clear();
// Notify all collected readable sockets
for (it = rdSockList.begin(); it != rdSockList.end(); it++) {
// Collect and notify all readable sockets
for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) {
if ((*it)->_selected && FD_ISSET((*it)->_socket, &sockSet_r)) {
sockList.push_back(*it);
}
}
for (it = sockList.begin(); it != sockList.end(); it++) {
#if CONFIG_ASYNC_TCP_USE_WDT
if (esp_task_wdt_add(NULL) != ESP_OK) {
log_e("Failed to add async task to WDT");
@@ -161,11 +151,17 @@ void _asynctcpsock_task(void *)
}
#endif
}
rdSockList.clear();
sockList.clear();
}
// Notify all sockets waiting for DNS completion
for (it = dnsSockList.begin(); it != dnsSockList.end(); it++) {
// Collect and notify all sockets waiting for DNS completion
for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) {
// Collect socket that has finished resolving DNS (with or without error)
if ((*it)->_isdnsfinished) {
sockList.push_back(*it);
}
}
for (it = sockList.begin(); it != sockList.end(); it++) {
#if CONFIG_ASYNC_TCP_USE_WDT
if(esp_task_wdt_add(NULL) != ESP_OK){
log_e("Failed to add async task to WDT");
@@ -179,6 +175,9 @@ void _asynctcpsock_task(void *)
}
#endif
}
sockList.clear();
xSemaphoreGiveRecursive(_asyncsock_mutex);
uint32_t t2 = millis();
@@ -187,20 +186,18 @@ void _asynctcpsock_task(void *)
uint32_t d = (nActive == 0 && t2 - t1 < 125) ? 125 - (t2 - t1) : 1;
delay(d);
// Collect all pollable sockets
std::list<AsyncSocketBase *> pollSockList;
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
// Collect and run activity poll on all pollable sockets
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
for (it = _socketBaseList.begin(); it != _socketBaseList.end(); it++) {
(*it)->_selected = false;
if (millis() - (*it)->_sock_lastactivity >= 125) {
(*it)->_sock_lastactivity = millis();
pollSockList.push_back(*it);
sockList.push_back(*it);
}
}
xSemaphoreGive(_asyncsock_mutex);
// Run activity poll on all pollable sockets
for (it = pollSockList.begin(); it != pollSockList.end(); it++) {
for (it = sockList.begin(); it != sockList.end(); it++) {
#if CONFIG_ASYNC_TCP_USE_WDT
if(esp_task_wdt_add(NULL) != ESP_OK){
log_e("Failed to add async task to WDT");
@@ -213,7 +210,9 @@ void _asynctcpsock_task(void *)
}
#endif
}
pollSockList.clear();
sockList.clear();
xSemaphoreGiveRecursive(_asyncsock_mutex);
}
vTaskDelete(NULL);
@@ -222,16 +221,16 @@ void _asynctcpsock_task(void *)
AsyncSocketBase::AsyncSocketBase()
{
if (_asyncsock_mutex == NULL) _asyncsock_mutex = xSemaphoreCreateMutex();
if (_asyncsock_mutex == NULL) _asyncsock_mutex = xSemaphoreCreateRecursiveMutex();
_sock_lastactivity = millis();
_selected = false;
// Add this base socket to the monitored list
auto & _socketBaseList = AsyncSocketBase::_getSocketBaseList();
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
_socketBaseList.push_back(this);
xSemaphoreGive(_asyncsock_mutex);
xSemaphoreGiveRecursive(_asyncsock_mutex);
}
std::list<AsyncSocketBase *> & AsyncSocketBase::_getSocketBaseList(void)
@@ -245,9 +244,9 @@ AsyncSocketBase::~AsyncSocketBase()
{
// Remove this base socket from the monitored list
auto & _socketBaseList = AsyncSocketBase::_getSocketBaseList();
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
_socketBaseList.remove(this);
xSemaphoreGive(_asyncsock_mutex);
xSemaphoreGiveRecursive(_asyncsock_mutex);
}
@@ -276,10 +275,10 @@ AsyncClient::AsyncClient(int sockfd)
int r = fcntl( sockfd, F_SETFL, fcntl( sockfd, F_GETFL, 0 ) | O_NONBLOCK );
// Updating state visible to asyncTcpSock task
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
_conn_state = 4;
_socket = sockfd;
xSemaphoreGive(_asyncsock_mutex);
xSemaphoreGiveRecursive(_asyncsock_mutex);
}
}
@@ -493,10 +492,10 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port)
}
// Updating state visible to asyncTcpSock task
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
_conn_state = 2;
_socket = sockfd;
xSemaphoreGive(_asyncsock_mutex);
xSemaphoreGiveRecursive(_asyncsock_mutex);
// Socket is now connecting. Should become writable in asyncTcpSock task
//Serial.printf("\twaiting for connect finished on socket: %d\r\n", _socket);
@@ -537,9 +536,9 @@ void _tcpsock_dns_found(const char * name, struct ip_addr * ipaddr, void * arg)
}
// Updating state visible to asyncTcpSock task
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
c->_isdnsfinished = true;
xSemaphoreGive(_asyncsock_mutex);
xSemaphoreGiveRecursive(_asyncsock_mutex);
// TODO: actually use name
}
@@ -712,11 +711,11 @@ void AsyncClient::_removeAllCallbacks(void)
void AsyncClient::_close(void)
{
//Serial.print("AsyncClient::_close: "); Serial.println(_socket);
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
_conn_state = 0;
lwip_close_r(_socket);
_socket = -1;
xSemaphoreGive(_asyncsock_mutex);
xSemaphoreGiveRecursive(_asyncsock_mutex);
_clearWriteQueue();
if (_discard_cb) _discard_cb(_discard_cb_arg, this);
@@ -725,11 +724,11 @@ void AsyncClient::_close(void)
void AsyncClient::_error(int8_t err)
{
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
_conn_state = 0;
lwip_close_r(_socket);
_socket = -1;
xSemaphoreGive(_asyncsock_mutex);
xSemaphoreGiveRecursive(_asyncsock_mutex);
_clearWriteQueue();
if (_error_cb) _error_cb(_error_cb_arg, this, err);
@@ -938,18 +937,18 @@ void AsyncServer::begin()
int r = fcntl(sockfd, F_SETFL, O_NONBLOCK);
// Updating state visible to asyncTcpSock task
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
_socket = sockfd;
xSemaphoreGive(_asyncsock_mutex);
xSemaphoreGiveRecursive(_asyncsock_mutex);
}
void AsyncServer::end()
{
if (_socket == -1) return;
xSemaphoreTake(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
xSemaphoreTakeRecursive(_asyncsock_mutex, (TickType_t)portMAX_DELAY);
lwip_close_r(_socket);
_socket = -1;
xSemaphoreGive(_asyncsock_mutex);
xSemaphoreGiveRecursive(_asyncsock_mutex);
}
void AsyncServer::_sockIsReadable(void)