fs: dlm: wait until all midcomms nodes detect version

The current dlm version detection is very complex due to backwards
compatablilty with earlier dlm protocol versions. It takes some time to
detect if a peer node has a specific DLM version. If it's not detected,
we just cut the socket connection. There could be cases where the local
node has not detected the version yet, but the peer node has.  In these
cases, we are trying to shutdown the dlm connection with a FIN/ACK message
exchange to be sure the other peer is ready to shutdown the connection on
dlm application level.  However this mechanism is only available on DLM
protocol version 3.2 and we need to be sure the DLM version is detected
before.

To make it more robust we introduce a a "best effort" wait to wait for the
version detection before shutdown the dlm connection. This need to be
done before the kthread recoverd for recovery handling is stopped,
because recovery handling will trigger enough messages to have a version
detection going on.

It is a corner case which was detected by modprobe dlm_locktroture module
and rmmod dlm_locktorture module directly afterwards (in a looping
behaviour). In practice probably nobody would leave a lockspace immediately
after joining it.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
Alexander Aring 2023-01-12 17:18:44 -05:00 committed by David Teigland
parent 89835b064f
commit b8b750e0c9
3 changed files with 27 additions and 0 deletions

View file

@ -820,6 +820,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
return rv;
}
if (ls_count == 1)
dlm_midcomms_version_wait();
dlm_device_deregister(ls);
if (force < 3 && dlm_user_daemon_available())

View file

@ -657,6 +657,7 @@ static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
switch (node->version) {
case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_2;
wake_up(&node->shutdown_wait);
log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
node->nodeid);
break;
@ -826,6 +827,7 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
switch (node->version) {
case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_1;
wake_up(&node->shutdown_wait);
log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1,
node->nodeid);
break;
@ -1386,6 +1388,27 @@ static void midcomms_node_release(struct rcu_head *rcu)
kfree(node);
}
void dlm_midcomms_version_wait(void)
{
struct midcomms_node *node;
int i, idx, ret;
idx = srcu_read_lock(&nodes_srcu);
for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
ret = wait_event_timeout(node->shutdown_wait,
node->version != DLM_VERSION_NOT_SET ||
node->state == DLM_CLOSED ||
test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
DLM_SHUTDOWN_TIMEOUT);
if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags))
pr_debug("version wait timed out for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
}
}
srcu_read_unlock(&nodes_srcu, idx);
}
static void midcomms_shutdown(struct midcomms_node *node)
{
int ret;

View file

@ -20,6 +20,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc);
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
int namelen);
void dlm_midcomms_version_wait(void);
int dlm_midcomms_close(int nodeid);
int dlm_midcomms_start(void);
void dlm_midcomms_stop(void);