Rework recovery control, simplifying the process significantly and
removing a lot of code. This also makes it easier for different
user-space infrastructures to drive the dlm.
Signed-off-by: David Teigland <[email protected]>
Index: linux-2.6.12-mm1/drivers/dlm/ast.c
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/ast.c
+++ linux-2.6.12-mm1/drivers/dlm/ast.c
@@ -61,7 +61,7 @@ static void process_asts(void)
r = lkb->lkb_resource;
ls = r->res_ls;
- if (!test_bit(LSFL_LS_RUN, &ls->ls_flags))
+ if (dlm_locking_stopped(ls))
continue;
list_del(&lkb->lkb_astqueue);
Index: linux-2.6.12-mm1/drivers/dlm/dir.c
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/dir.c
+++ linux-2.6.12-mm1/drivers/dlm/dir.c
@@ -270,7 +270,7 @@ int dlm_recover_directory(struct dlm_ls
;
}
- set_bit(LSFL_DIR_VALID, &ls->ls_flags);
+ dlm_set_recover_status(ls, DLM_RS_DIR);
error = 0;
log_debug(ls, "dlm_recover_directory %d entries", count);
Index: linux-2.6.12-mm1/drivers/dlm/dlm_internal.h
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/dlm_internal.h
+++ linux-2.6.12-mm1/drivers/dlm/dlm_internal.h
@@ -133,7 +133,6 @@ struct dlm_lkbtable {
struct dlm_member {
struct list_head list;
int nodeid;
- int gone_event;
int weight;
};
@@ -145,7 +144,7 @@ struct dlm_recover {
struct list_head list;
int *nodeids;
int node_count;
- int event_id;
+ uint64_t seq;
};
/*
@@ -262,6 +261,7 @@ struct dlm_lkb {
long lkb_astparam; /* caller's ast arg */
};
+
struct dlm_rsb {
struct dlm_ls *res_ls; /* the lockspace */
struct kref res_ref;
@@ -376,12 +376,14 @@ struct dlm_message {
};
-#define DIR_VALID 0x00000001
-#define DIR_ALL_VALID 0x00000002
-#define NODES_VALID 0x00000004
-#define NODES_ALL_VALID 0x00000008
-#define LOCKS_VALID 0x00000010
-#define LOCKS_ALL_VALID 0x00000020
+#define DLM_RS_NODES 0x00000001
+#define DLM_RS_NODES_ALL 0x00000002
+#define DLM_RS_DIR 0x00000004
+#define DLM_RS_DIR_ALL 0x00000008
+#define DLM_RS_LOCKS 0x00000010
+#define DLM_RS_LOCKS_ALL 0x00000020
+#define DLM_RS_DONE 0x00000040
+#define DLM_RS_DONE_ALL 0x00000080
#define DLM_RCOM_STATUS 1
#define DLM_RCOM_NAMES 2
@@ -427,31 +429,6 @@ struct rcom_lock {
char rl_lvb[0];
};
-
-#define LSST_NONE 0
-#define LSST_INIT 1
-#define LSST_INIT_DONE 2
-#define LSST_CLEAR 3
-#define LSST_WAIT_START 4
-#define LSST_RECONFIG_DONE 5
-
-#define LSFL_WORK 0
-#define LSFL_LS_RUN 1
-#define LSFL_LS_STOP 2
-#define LSFL_LS_START 3
-#define LSFL_LS_FINISH 4
-#define LSFL_RCOM_READY 5
-#define LSFL_FINISH_RECOVERY 6
-#define LSFL_DIR_VALID 7
-#define LSFL_ALL_DIR_VALID 8
-#define LSFL_NODES_VALID 9
-#define LSFL_ALL_NODES_VALID 10
-#define LSFL_LS_TERMINATE 11
-#define LSFL_JOIN_DONE 12
-#define LSFL_LEAVE_DONE 13
-#define LSFL_LOCKS_VALID 14
-#define LSFL_ALL_LOCKS_VALID 15
-
struct dlm_ls {
struct list_head ls_list; /* list of lockspaces */
uint32_t ls_global_id; /* global unique lockspace ID */
@@ -487,20 +464,18 @@ struct dlm_ls {
struct dentry *ls_debug_dentry; /* debugfs */
+ wait_queue_head_t ls_uevent_wait; /* user part of join/leave */
+ int ls_uevent_result;
+
/* recovery related */
struct timer_list ls_timer;
- wait_queue_head_t ls_wait_member;
struct task_struct *ls_recoverd_task;
struct semaphore ls_recoverd_active;
- struct list_head ls_recover; /* dlm_recover structs */
spinlock_t ls_recover_lock;
- int ls_last_stop;
- int ls_last_start;
- int ls_last_finish;
- int ls_startdone;
- int ls_state; /* recovery states */
-
+ uint32_t ls_recover_status; /* DLM_RS_ */
+ uint64_t ls_recover_seq;
+ struct dlm_recover *ls_recover_args;
struct rw_semaphore ls_in_recovery; /* block local requests */
struct list_head ls_requestqueue;/* queue remote requests */
struct semaphore ls_requestqueue_lock;
@@ -517,5 +492,21 @@ struct dlm_ls {
char ls_name[1];
};
+#define LSFL_WORK 0
+#define LSFL_RUNNING 1
+#define LSFL_RECOVERY_STOP 2
+#define LSFL_RCOM_READY 3
+#define LSFL_UEVENT_WAIT 4
+
+static inline int dlm_locking_stopped(struct dlm_ls *ls)
+{
+ return !test_bit(LSFL_RUNNING, &ls->ls_flags);
+}
+
+static inline int dlm_recovery_stopped(struct dlm_ls *ls)
+{
+ return test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
+}
+
#endif /* __DLM_INTERNAL_DOT_H__ */
Index: linux-2.6.12-mm1/drivers/dlm/lock.c
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/lock.c
+++ linux-2.6.12-mm1/drivers/dlm/lock.c
@@ -800,7 +800,7 @@ void dlm_scan_rsbs(struct dlm_ls *ls)
{
int i;
- if (!test_bit(LSFL_LS_RUN, &ls->ls_flags))
+ if (dlm_locking_stopped(ls))
return;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
@@ -3086,7 +3086,7 @@ int dlm_receive_message(struct dlm_heade
recovery operation is starting. */
while (1) {
- if (!test_bit(LSFL_LS_RUN, &ls->ls_flags)) {
+ if (dlm_locking_stopped(ls)) {
if (!recovery)
dlm_add_requestqueue(ls, nodeid, hd);
error = -EINTR;
@@ -3293,7 +3293,7 @@ int dlm_recover_waiters_post(struct dlm_
int error = 0, mstype;
while (1) {
- if (!test_bit(LSFL_LS_RUN, &ls->ls_flags)) {
+ if (dlm_locking_stopped(ls)) {
log_debug(ls, "recover_waiters_post aborted");
error = -EINTR;
break;
Index: linux-2.6.12-mm1/drivers/dlm/lockspace.c
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/lockspace.c
+++ linux-2.6.12-mm1/drivers/dlm/lockspace.c
@@ -252,36 +252,46 @@ static int new_lockspace(char *name, int
rwlock_init(&ls->ls_dirtbl[i].lock);
}
- ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
- if (!ls->ls_recover_buf)
- goto out_dirfree;
+ INIT_LIST_HEAD(&ls->ls_waiters);
+ init_MUTEX(&ls->ls_waiters_sem);
- init_waitqueue_head(&ls->ls_wait_member);
INIT_LIST_HEAD(&ls->ls_nodes);
INIT_LIST_HEAD(&ls->ls_nodes_gone);
- INIT_LIST_HEAD(&ls->ls_waiters);
ls->ls_num_nodes = 0;
+ ls->ls_low_nodeid = 0;
+ ls->ls_total_weight = 0;
ls->ls_node_array = NULL;
+ ls->ls_nodeids_next = NULL;
+ ls->ls_nodeids_next_count = 0;
+
+ memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
+ ls->ls_stub_rsb.res_ls = ls;
+
+ ls->ls_debug_dentry = NULL;
+
+ init_waitqueue_head(&ls->ls_uevent_wait);
+ ls->ls_uevent_result = 0;
+
ls->ls_recoverd_task = NULL;
init_MUTEX(&ls->ls_recoverd_active);
- INIT_LIST_HEAD(&ls->ls_recover);
spin_lock_init(&ls->ls_recover_lock);
+ ls->ls_recover_status = 0;
+ ls->ls_recover_seq = 0;
+ ls->ls_recover_args = NULL;
+ init_rwsem(&ls->ls_in_recovery);
+ INIT_LIST_HEAD(&ls->ls_requestqueue);
+ init_MUTEX(&ls->ls_requestqueue_lock);
+
+ ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
+ if (!ls->ls_recover_buf)
+ goto out_dirfree;
+
INIT_LIST_HEAD(&ls->ls_recover_list);
- ls->ls_recover_list_count = 0;
spin_lock_init(&ls->ls_recover_list_lock);
+ ls->ls_recover_list_count = 0;
init_waitqueue_head(&ls->ls_wait_general);
INIT_LIST_HEAD(&ls->ls_root_list);
- INIT_LIST_HEAD(&ls->ls_requestqueue);
- ls->ls_last_stop = 0;
- ls->ls_last_start = 0;
- ls->ls_last_finish = 0;
- init_MUTEX(&ls->ls_waiters_sem);
- init_MUTEX(&ls->ls_requestqueue_lock);
init_rwsem(&ls->ls_root_sem);
- init_rwsem(&ls->ls_in_recovery);
-
- memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
- ls->ls_stub_rsb.res_ls = ls;
down_write(&ls->ls_in_recovery);
@@ -291,8 +301,6 @@ static int new_lockspace(char *name, int
goto out_rcomfree;
}
- ls->ls_state = LSST_INIT;
-
spin_lock(&lslist_lock);
list_add(&ls->ls_list, &lslist);
spin_unlock(&lslist_lock);
@@ -307,22 +315,10 @@ static int new_lockspace(char *name, int
if (error)
goto out_del;
- kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE, NULL);
-
- /* Now we depend on userspace to notice the new ls, join it and
- give us a start or terminate. The ls isn't actually running
- until it receives a start. */
-
- error = wait_event_interruptible(ls->ls_wait_member,
- test_bit(LSFL_JOIN_DONE, &ls->ls_flags));
+ error = dlm_uevent(ls, 1);
if (error)
goto out_unreg;
- if (test_bit(LSFL_LS_TERMINATE, &ls->ls_flags)) {
- error = -ERESTARTSYS;
- goto out_unreg;
- }
-
*lockspace = ls;
return 0;
@@ -402,19 +398,15 @@ static int release_lockspace(struct dlm_
{
struct dlm_lkb *lkb;
struct dlm_rsb *rsb;
- struct dlm_recover *rv;
struct list_head *head;
- int i, error;
+ int i;
int busy = lockspace_busy(ls);
if (busy > force)
return -EBUSY;
- if (force < 3) {
- error = kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE, NULL);
- error = wait_event_interruptible(ls->ls_wait_member,
- test_bit(LSFL_LEAVE_DONE, &ls->ls_flags));
- }
+ if (force < 3)
+ dlm_uevent(ls, 0);
dlm_recoverd_stop(ls);
@@ -486,13 +478,7 @@ static int release_lockspace(struct dlm_
* Free structures on any other lists
*/
- head = &ls->ls_recover;
- while (!list_empty(head)) {
- rv = list_entry(head->next, struct dlm_recover, list);
- list_del(&rv->list);
- kfree(rv);
- }
-
+ kfree(ls->ls_recover_args);
dlm_clear_free_entries(ls);
dlm_clear_members(ls);
dlm_clear_members_gone(ls);
Index: linux-2.6.12-mm1/drivers/dlm/member.c
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/member.c
+++ linux-2.6.12-mm1/drivers/dlm/member.c
@@ -112,18 +112,6 @@ void dlm_clear_members_gone(struct dlm_l
clear_memb_list(&ls->ls_nodes_gone);
}
-void dlm_clear_members_finish(struct dlm_ls *ls, int finish_event)
-{
- struct dlm_member *memb, *safe;
-
- list_for_each_entry_safe(memb, safe, &ls->ls_nodes_gone, list) {
- if (memb->gone_event <= finish_event) {
- list_del(&memb->list);
- kfree(memb);
- }
- }
-}
-
static void make_member_array(struct dlm_ls *ls)
{
struct dlm_member *memb;
@@ -195,7 +183,6 @@ int dlm_recover_members(struct dlm_ls *l
if (!found) {
neg++;
- memb->gone_event = rv->event_id;
dlm_remove_member(ls, memb);
log_debug(ls, "remove member %d", memb->nodeid);
}
@@ -218,7 +205,7 @@ int dlm_recover_members(struct dlm_ls *l
ls->ls_low_nodeid = low;
make_member_array(ls);
- set_bit(LSFL_NODES_VALID, &ls->ls_flags);
+ dlm_set_recover_status(ls, DLM_RS_NODES);
*neg_out = neg;
ping_members(ls);
@@ -228,57 +215,24 @@ int dlm_recover_members(struct dlm_ls *l
return error;
}
-int dlm_recover_members_first(struct dlm_ls *ls, struct dlm_recover *rv)
-{
- int i, error, nodeid, low = -1;
-
- dlm_clear_members(ls);
-
- log_debug(ls, "add members");
-
- for (i = 0; i < rv->node_count; i++) {
- nodeid = rv->nodeids[i];
- dlm_add_member(ls, nodeid);
-
- if (low == -1 || nodeid < low)
- low = nodeid;
- }
- ls->ls_low_nodeid = low;
-
- make_member_array(ls);
- set_bit(LSFL_NODES_VALID, &ls->ls_flags);
-
- ping_members(ls);
-
- error = dlm_recover_members_wait(ls);
- log_debug(ls, "total members %d", ls->ls_num_nodes);
- return error;
-}
-
/*
* Following called from member_sysfs.c
*/
-int dlm_ls_terminate(struct dlm_ls *ls)
-{
- spin_lock(&ls->ls_recover_lock);
- set_bit(LSFL_LS_TERMINATE, &ls->ls_flags);
- set_bit(LSFL_JOIN_DONE, &ls->ls_flags);
- set_bit(LSFL_LEAVE_DONE, &ls->ls_flags);
- spin_unlock(&ls->ls_recover_lock);
- wake_up(&ls->ls_wait_member);
- log_error(ls, "dlm_ls_terminate");
- return 0;
-}
-
int dlm_ls_stop(struct dlm_ls *ls)
{
int new;
+ /*
+ * A stop cancels any recovery that's in progress (see RECOVERY_STOP,
+ * dlm_recovery_stopped()) and prevents any new locks from being
+ * processed (see RUNNING, dlm_locking_stopped()).
+ */
+
spin_lock(&ls->ls_recover_lock);
- ls->ls_last_stop = ls->ls_last_start;
- set_bit(LSFL_LS_STOP, &ls->ls_flags);
- new = test_and_clear_bit(LSFL_LS_RUN, &ls->ls_flags);
+ set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
+ new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
+ ls->ls_recover_seq++;
spin_unlock(&ls->ls_recover_lock);
/*
@@ -295,26 +249,20 @@ int dlm_ls_stop(struct dlm_ls *ls)
/*
* The recoverd suspend/resume makes sure that dlm_recoverd (if
- * running) has noticed the clearing of LS_RUN above and quit
+ * running) has noticed the clearing of RUNNING above and quit
* processing the previous recovery. This will be true for all nodes
- * before any nodes get the start.
+ * before any nodes start the new recovery.
*/
dlm_recoverd_suspend(ls);
- clear_bit(LSFL_LOCKS_VALID, &ls->ls_flags);
- clear_bit(LSFL_ALL_LOCKS_VALID, &ls->ls_flags);
- clear_bit(LSFL_DIR_VALID, &ls->ls_flags);
- clear_bit(LSFL_ALL_DIR_VALID, &ls->ls_flags);
- clear_bit(LSFL_NODES_VALID, &ls->ls_flags);
- clear_bit(LSFL_ALL_NODES_VALID, &ls->ls_flags);
+ ls->ls_recover_status = 0;
dlm_recoverd_resume(ls);
- dlm_recoverd_kick(ls);
return 0;
}
-int dlm_ls_start(struct dlm_ls *ls, int event_nr)
+int dlm_ls_start(struct dlm_ls *ls)
{
- struct dlm_recover *rv;
+ struct dlm_recover *rv, *rv_old;
int error = 0;
rv = kmalloc(sizeof(struct dlm_recover), GFP_KERNEL);
@@ -324,7 +272,9 @@ int dlm_ls_start(struct dlm_ls *ls, int
spin_lock(&ls->ls_recover_lock);
- if (test_bit(LSFL_LS_RUN, &ls->ls_flags)) {
+ /* the lockspace needs to be stopped before it can be started */
+
+ if (!dlm_locking_stopped(ls)) {
spin_unlock(&ls->ls_recover_lock);
log_error(ls, "start ignored: lockspace running");
kfree(rv);
@@ -340,44 +290,21 @@ int dlm_ls_start(struct dlm_ls *ls, int
goto out;
}
- if (event_nr <= ls->ls_last_start) {
- spin_unlock(&ls->ls_recover_lock);
- log_error(ls, "start event_nr %d not greater than last %d",
- event_nr, ls->ls_last_start);
- kfree(rv);
- error = -EINVAL;
- goto out;
- }
-
rv->nodeids = ls->ls_nodeids_next;
ls->ls_nodeids_next = NULL;
rv->node_count = ls->ls_nodeids_next_count;
- rv->event_id = event_nr;
- ls->ls_last_start = event_nr;
- list_add_tail(&rv->list, &ls->ls_recover);
- set_bit(LSFL_LS_START, &ls->ls_flags);
+ rv->seq = ++ls->ls_recover_seq;
+ rv_old = ls->ls_recover_args;
+ ls->ls_recover_args = rv;
spin_unlock(&ls->ls_recover_lock);
- set_bit(LSFL_JOIN_DONE, &ls->ls_flags);
- wake_up(&ls->ls_wait_member);
+ if (rv_old) {
+ kfree(rv_old->nodeids);
+ kfree(rv_old);
+ }
+
dlm_recoverd_kick(ls);
out:
return error;
}
-int dlm_ls_finish(struct dlm_ls *ls, int event_nr)
-{
- spin_lock(&ls->ls_recover_lock);
- if (event_nr != ls->ls_last_start) {
- spin_unlock(&ls->ls_recover_lock);
- log_error(ls, "finish event_nr %d doesn't match start %d",
- event_nr, ls->ls_last_start);
- return -EINVAL;
- }
- ls->ls_last_finish = event_nr;
- set_bit(LSFL_LS_FINISH, &ls->ls_flags);
- spin_unlock(&ls->ls_recover_lock);
- dlm_recoverd_kick(ls);
- return 0;
-}
-
Index: linux-2.6.12-mm1/drivers/dlm/member.h
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/member.h
+++ linux-2.6.12-mm1/drivers/dlm/member.h
@@ -13,15 +13,10 @@
#ifndef __MEMBER_DOT_H__
#define __MEMBER_DOT_H__
-int dlm_ls_terminate(struct dlm_ls *ls);
int dlm_ls_stop(struct dlm_ls *ls);
-int dlm_ls_start(struct dlm_ls *ls, int event_nr);
-int dlm_ls_finish(struct dlm_ls *ls, int event_nr);
-
+int dlm_ls_start(struct dlm_ls *ls);
void dlm_clear_members(struct dlm_ls *ls);
void dlm_clear_members_gone(struct dlm_ls *ls);
-void dlm_clear_members_finish(struct dlm_ls *ls, int finish_event);
-int dlm_recover_members_first(struct dlm_ls *ls, struct dlm_recover *rv);
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out);
int dlm_is_removed(struct dlm_ls *ls, int nodeid);
Index: linux-2.6.12-mm1/drivers/dlm/member_sysfs.c
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/member_sysfs.c
+++ linux-2.6.12-mm1/drivers/dlm/member_sysfs.c
@@ -13,113 +13,33 @@
#include "dlm_internal.h"
#include "member.h"
-/*
-/dlm/lsname/stop RW write 1 to suspend operation
-/dlm/lsname/start RW write event_nr to start recovery
-/dlm/lsname/finish RW write event_nr to finish recovery
-/dlm/lsname/terminate RW write event_nr to term recovery
-/dlm/lsname/done RO event_nr dlm is done processing
-/dlm/lsname/id RW global id of lockspace
-/dlm/lsname/members RW read = current members, write = next members
-*/
-
-static ssize_t dlm_stop_show(struct dlm_ls *ls, char *buf)
-{
- ssize_t ret;
- int val;
-
- spin_lock(&ls->ls_recover_lock);
- val = ls->ls_last_stop;
- spin_unlock(&ls->ls_recover_lock);
- ret = sprintf(buf, "%d\n", val);
- return ret;
-}
-
-static ssize_t dlm_stop_store(struct dlm_ls *ls, const char *buf, size_t len)
+static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
- ssize_t ret = -EINVAL;
+ ssize_t ret = len;
+ int n = simple_strtol(buf, NULL, 0);
- if (simple_strtol(buf, NULL, 0) == 1) {
+ switch (n) {
+ case 0:
dlm_ls_stop(ls);
- ret = len;
+ break;
+ case 1:
+ dlm_ls_start(ls);
+ break;
+ default:
+ ret = -EINVAL;
}
return ret;
}
-static ssize_t dlm_start_show(struct dlm_ls *ls, char *buf)
-{
- ssize_t ret;
- int val;
-
- spin_lock(&ls->ls_recover_lock);
- val = ls->ls_last_start;
- spin_unlock(&ls->ls_recover_lock);
- ret = sprintf(buf, "%d\n", val);
- return ret;
-}
-
-static ssize_t dlm_start_store(struct dlm_ls *ls, const char *buf, size_t len)
-{
- ssize_t ret;
- ret = dlm_ls_start(ls, simple_strtol(buf, NULL, 0));
- return ret ? ret : len;
-}
-
-static ssize_t dlm_finish_show(struct dlm_ls *ls, char *buf)
-{
- ssize_t ret;
- int val;
-
- spin_lock(&ls->ls_recover_lock);
- val = ls->ls_last_finish;
- spin_unlock(&ls->ls_recover_lock);
- ret = sprintf(buf, "%d\n", val);
- return ret;
-}
-
-static ssize_t dlm_finish_store(struct dlm_ls *ls, const char *buf, size_t len)
+static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
- dlm_ls_finish(ls, simple_strtol(buf, NULL, 0));
+ ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
+ set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
+ wake_up(&ls->ls_uevent_wait);
return len;
}
-static ssize_t dlm_terminate_show(struct dlm_ls *ls, char *buf)
-{
- ssize_t ret;
- int val = 0;
-
- spin_lock(&ls->ls_recover_lock);
- if (test_bit(LSFL_LS_TERMINATE, &ls->ls_flags))
- val = 1;
- spin_unlock(&ls->ls_recover_lock);
- ret = sprintf(buf, "%d\n", val);
- return ret;
-}
-
-static ssize_t dlm_terminate_store(struct dlm_ls *ls, const char *buf, size_t len)
-{
- ssize_t ret = -EINVAL;
-
- if (simple_strtol(buf, NULL, 0) == 1) {
- dlm_ls_terminate(ls);
- ret = len;
- }
- return ret;
-}
-
-static ssize_t dlm_done_show(struct dlm_ls *ls, char *buf)
-{
- ssize_t ret;
- int val;
-
- spin_lock(&ls->ls_recover_lock);
- val = ls->ls_startdone;
- spin_unlock(&ls->ls_recover_lock);
- ret = sprintf(buf, "%d\n", val);
- return ret;
-}
-
static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
return sprintf(buf, "%u\n", ls->ls_global_id);
@@ -204,33 +124,14 @@ struct dlm_attr {
ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
-static struct dlm_attr dlm_attr_stop = {
- .attr = {.name = "stop", .mode = S_IRUGO | S_IWUSR},
- .show = dlm_stop_show,
- .store = dlm_stop_store
-};
-
-static struct dlm_attr dlm_attr_start = {
- .attr = {.name = "start", .mode = S_IRUGO | S_IWUSR},
- .show = dlm_start_show,
- .store = dlm_start_store
-};
-
-static struct dlm_attr dlm_attr_finish = {
- .attr = {.name = "finish", .mode = S_IRUGO | S_IWUSR},
- .show = dlm_finish_show,
- .store = dlm_finish_store
-};
-
-static struct dlm_attr dlm_attr_terminate = {
- .attr = {.name = "terminate", .mode = S_IRUGO | S_IWUSR},
- .show = dlm_terminate_show,
- .store = dlm_terminate_store
+static struct dlm_attr dlm_attr_control = {
+ .attr = {.name = "control", .mode = S_IWUSR},
+ .store = dlm_control_store
};
-static struct dlm_attr dlm_attr_done = {
- .attr = {.name = "done", .mode = S_IRUGO},
- .show = dlm_done_show,
+static struct dlm_attr dlm_attr_event = {
+ .attr = {.name = "event_done", .mode = S_IWUSR},
+ .store = dlm_event_store
};
static struct dlm_attr dlm_attr_id = {
@@ -246,11 +147,8 @@ static struct dlm_attr dlm_attr_members
};
static struct attribute *dlm_attrs[] = {
- &dlm_attr_stop.attr,
- &dlm_attr_start.attr,
- &dlm_attr_finish.attr,
- &dlm_attr_terminate.attr,
- &dlm_attr_done.attr,
+ &dlm_attr_control.attr,
+ &dlm_attr_event.attr,
&dlm_attr_id.attr,
&dlm_attr_members.attr,
NULL,
@@ -320,3 +218,22 @@ int dlm_kobject_setup(struct dlm_ls *ls)
return 0;
}
+int dlm_uevent(struct dlm_ls *ls, int in)
+{
+ int error;
+
+ if (in)
+ kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE, NULL);
+ else
+ kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE, NULL);
+
+ error = wait_event_interruptible(ls->ls_uevent_wait,
+ test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
+ if (error)
+ goto out;
+
+ error = ls->ls_uevent_result;
+ out:
+ return error;
+}
+
Index: linux-2.6.12-mm1/drivers/dlm/member_sysfs.h
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/member_sysfs.h
+++ linux-2.6.12-mm1/drivers/dlm/member_sysfs.h
@@ -16,6 +16,7 @@
int dlm_member_sysfs_init(void);
void dlm_member_sysfs_exit(void);
int dlm_kobject_setup(struct dlm_ls *ls);
+int dlm_uevent(struct dlm_ls *ls, int in);
#endif /* __MEMBER_SYSFS_DOT_H__ */
Index: linux-2.6.12-mm1/drivers/dlm/rcom.c
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/rcom.c
+++ linux-2.6.12-mm1/drivers/dlm/rcom.c
@@ -84,31 +84,6 @@ static int check_config(struct dlm_ls *l
return 0;
}
-static int make_status(struct dlm_ls *ls)
-{
- int status = 0;
-
- if (test_bit(LSFL_NODES_VALID, &ls->ls_flags))
- status |= NODES_VALID;
-
- if (test_bit(LSFL_ALL_NODES_VALID, &ls->ls_flags))
- status |= NODES_ALL_VALID;
-
- if (test_bit(LSFL_DIR_VALID, &ls->ls_flags))
- status |= DIR_VALID;
-
- if (test_bit(LSFL_ALL_DIR_VALID, &ls->ls_flags))
- status |= DIR_ALL_VALID;
-
- if (test_bit(LSFL_LOCKS_VALID, &ls->ls_flags))
- status |= LOCKS_VALID;
-
- if (test_bit(LSFL_ALL_LOCKS_VALID, &ls->ls_flags))
- status |= LOCKS_ALL_VALID;
-
- return status;
-}
-
int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
{
struct dlm_rcom *rc;
@@ -119,7 +94,7 @@ int dlm_rcom_status(struct dlm_ls *ls, i
if (nodeid == dlm_our_nodeid()) {
rc = (struct dlm_rcom *) ls->ls_recover_buf;
- rc->rc_result = make_status(ls);
+ rc->rc_result = dlm_recover_status(ls);
goto out;
}
@@ -150,7 +125,7 @@ static void receive_rcom_status(struct d
sizeof(struct rcom_config), &rc, &mh);
if (error)
return;
- rc->rc_result = make_status(ls);
+ rc->rc_result = dlm_recover_status(ls);
make_config(ls, (struct rcom_config *) rc->rc_buf);
send_rcom(ls, mh, rc);
@@ -197,6 +172,7 @@ static void receive_rcom_names(struct dl
struct dlm_mhandle *mh;
int error, inlen, outlen;
int nodeid = rc_in->rc_header.h_nodeid;
+ uint32_t status = dlm_recover_status(ls);
/*
* We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
@@ -205,7 +181,7 @@ static void receive_rcom_names(struct dl
* message from a previous instance of recovery.
*/
- if (!test_bit(LSFL_NODES_VALID, &ls->ls_flags)) {
+ if (!(status & DLM_RS_NODES)) {
log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid);
return;
}
@@ -355,7 +331,9 @@ static void receive_rcom_lock(struct dlm
static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
- if (!test_bit(LSFL_DIR_VALID, &ls->ls_flags)) {
+ uint32_t status = dlm_recover_status(ls);
+
+ if (!(status & DLM_RS_DIR)) {
log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u",
rc_in->rc_header.h_nodeid);
return;
Index: linux-2.6.12-mm1/drivers/dlm/recover.c
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/recover.c
+++ linux-2.6.12-mm1/drivers/dlm/recover.c
@@ -30,19 +30,14 @@
* the one being waited for).
*/
-int dlm_recovery_stopped(struct dlm_ls *ls)
-{
- return test_bit(LSFL_LS_STOP, &ls->ls_flags);
-}
-
/*
- * Wait until given function returns non-zero or lockspace is stopped (LS_STOP
- * set due to failure of a node in ls_nodes). When another function thinks it
- * could have completed the waited-on task, they should wake up ls_wait_general
- * to get an immediate response rather than waiting for the timer to detect the
- * result. A timer wakes us up periodically while waiting to see if we should
- * abort due to a node failure. This should only be called by the dlm_recoverd
- * thread.
+ * Wait until given function returns non-zero or lockspace is stopped
+ * (LSFL_RECOVERY_STOP set due to failure of a node in ls_nodes). When another
+ * function thinks it could have completed the waited-on task, they should wake
+ * up ls_wait_general to get an immediate response rather than waiting for the
+ * timer to detect the result. A timer wakes us up periodically while waiting
+ * to see if we should abort due to a node failure. This should only be called
+ * by the dlm_recoverd thread.
*/
static void dlm_wait_timer_fn(unsigned long data)
@@ -73,11 +68,28 @@ int dlm_wait_function(struct dlm_ls *ls,
/*
* An efficient way for all nodes to wait for all others to have a certain
* status. The node with the lowest nodeid polls all the others for their
- * status (dlm_wait_status_all) and all the others poll the node with the low
- * id for its accumulated result (dlm_wait_status_low).
+ * status (wait_status_all) and all the others poll the node with the low id
+ * for its accumulated result (wait_status_low). When all nodes have set
+ * status flag X, then status flag X_ALL will be set on the low nodeid.
*/
-static int dlm_wait_status_all(struct dlm_ls *ls, unsigned int wait_status)
+uint32_t dlm_recover_status(struct dlm_ls *ls)
+{
+ uint32_t status;
+ spin_lock(&ls->ls_recover_lock);
+ status = ls->ls_recover_status;
+ spin_unlock(&ls->ls_recover_lock);
+ return status;
+}
+
+void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
+{
+ spin_lock(&ls->ls_recover_lock);
+ ls->ls_recover_status |= status;
+ spin_unlock(&ls->ls_recover_lock);
+}
+
+static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
{
struct dlm_rcom *rc = (struct dlm_rcom *) ls->ls_recover_buf;
struct dlm_member *memb;
@@ -105,7 +117,7 @@ static int dlm_wait_status_all(struct dl
return error;
}
-static int dlm_wait_status_low(struct dlm_ls *ls, unsigned int wait_status)
+static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
{
struct dlm_rcom *rc = (struct dlm_rcom *) ls->ls_recover_buf;
int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
@@ -129,46 +141,39 @@ static int dlm_wait_status_low(struct dl
return error;
}
-int dlm_recover_members_wait(struct dlm_ls *ls)
+static int wait_status(struct dlm_ls *ls, uint32_t status)
{
+ uint32_t status_all = status << 1;
int error;
if (ls->ls_low_nodeid == dlm_our_nodeid()) {
- error = dlm_wait_status_all(ls, NODES_VALID);
+ error = wait_status_all(ls, status);
if (!error)
- set_bit(LSFL_ALL_NODES_VALID, &ls->ls_flags);
+ dlm_set_recover_status(ls, status_all);
} else
- error = dlm_wait_status_low(ls, NODES_ALL_VALID);
+ error = wait_status_low(ls, status_all);
return error;
}
-int dlm_recover_directory_wait(struct dlm_ls *ls)
+int dlm_recover_members_wait(struct dlm_ls *ls)
{
- int error;
-
- if (ls->ls_low_nodeid == dlm_our_nodeid()) {
- error = dlm_wait_status_all(ls, DIR_VALID);
- if (!error)
- set_bit(LSFL_ALL_DIR_VALID, &ls->ls_flags);
- } else
- error = dlm_wait_status_low(ls, DIR_ALL_VALID);
+ return wait_status(ls, DLM_RS_NODES);
+}
- return error;
+int dlm_recover_directory_wait(struct dlm_ls *ls)
+{
+ return wait_status(ls, DLM_RS_DIR);
}
int dlm_recover_locks_wait(struct dlm_ls *ls)
{
- int error;
-
- if (ls->ls_low_nodeid == dlm_our_nodeid()) {
- error = dlm_wait_status_all(ls, LOCKS_VALID);
- if (!error)
- set_bit(LSFL_ALL_LOCKS_VALID, &ls->ls_flags);
- } else
- error = dlm_wait_status_low(ls, LOCKS_ALL_VALID);
+ return wait_status(ls, DLM_RS_LOCKS);
+}
- return error;
+int dlm_recover_done_wait(struct dlm_ls *ls)
+{
+ return wait_status(ls, DLM_RS_DONE);
}
/*
@@ -517,7 +522,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
if (error)
recover_list_clear(ls);
else
- set_bit(LSFL_LOCKS_VALID, &ls->ls_flags);
+ dlm_set_recover_status(ls, DLM_RS_LOCKS);
return error;
}
Index: linux-2.6.12-mm1/drivers/dlm/recover.h
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/recover.h
+++ linux-2.6.12-mm1/drivers/dlm/recover.h
@@ -14,8 +14,13 @@
#ifndef __RECOVER_DOT_H__
#define __RECOVER_DOT_H__
-int dlm_recovery_stopped(struct dlm_ls *ls);
int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls));
+uint32_t dlm_recover_status(struct dlm_ls *ls);
+void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status);
+int dlm_recover_members_wait(struct dlm_ls *ls);
+int dlm_recover_directory_wait(struct dlm_ls *ls);
+int dlm_recover_locks_wait(struct dlm_ls *ls);
+int dlm_recover_done_wait(struct dlm_ls *ls);
int dlm_recover_masters(struct dlm_ls *ls);
int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc);
int dlm_recover_locks(struct dlm_ls *ls);
@@ -24,9 +29,6 @@ int dlm_create_root_list(struct dlm_ls *
void dlm_release_root_list(struct dlm_ls *ls);
void dlm_clear_toss_list(struct dlm_ls *ls);
void dlm_recover_rsbs(struct dlm_ls *ls);
-int dlm_recover_members_wait(struct dlm_ls *ls);
-int dlm_recover_directory_wait(struct dlm_ls *ls);
-int dlm_recover_locks_wait(struct dlm_ls *ls);
#endif /* __RECOVER_DOT_H__ */
Index: linux-2.6.12-mm1/drivers/dlm/recoverd.c
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/recoverd.c
+++ linux-2.6.12-mm1/drivers/dlm/recoverd.c
@@ -21,91 +21,30 @@
#include "lock.h"
#include "requestqueue.h"
-/*
- * next_move actions
- */
-
-#define DO_STOP (1)
-#define DO_START (2)
-#define DO_FINISH (3)
-#define DO_FINISH_STOP (4)
-#define DO_FINISH_START (5)
-static void set_start_done(struct dlm_ls *ls, int event_id)
-{
- spin_lock(&ls->ls_recover_lock);
- ls->ls_startdone = event_id;
- spin_unlock(&ls->ls_recover_lock);
-
- kobject_uevent(&ls->ls_kobj, KOBJ_CHANGE, NULL);
-}
+/* If the start for which we're re-enabling locking (seq) has been superseded
+ by a newer stop (ls_recover_seq), we need to leave locking disabled. */
-static int enable_locking(struct dlm_ls *ls, int event_id)
+static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
- int error = 0;
+ int error = -EINTR;
spin_lock(&ls->ls_recover_lock);
- if (ls->ls_last_stop < event_id) {
- set_bit(LSFL_LS_RUN, &ls->ls_flags);
+ if (ls->ls_recover_seq == seq) {
+ set_bit(LSFL_RUNNING, &ls->ls_flags);
up_write(&ls->ls_in_recovery);
- } else {
- error = -EINTR;
- log_debug(ls, "enable_locking: abort %d", event_id);
+ error = 0;
}
spin_unlock(&ls->ls_recover_lock);
return error;
}
-static int ls_first_start(struct dlm_ls *ls, struct dlm_recover *rv)
-{
- int error;
-
- log_debug(ls, "recover event %u (first)", rv->event_id);
-
- down(&ls->ls_recoverd_active);
-
- error = dlm_recover_members_first(ls, rv);
- if (error) {
- log_error(ls, "recover_members first failed %d", error);
- goto out;
- }
-
- error = dlm_recover_directory(ls);
- if (error) {
- log_error(ls, "recover_directory failed %d", error);
- goto out;
- }
-
- error = dlm_recover_directory_wait(ls);
- if (error) {
- log_error(ls, "recover_directory_wait failed %d", error);
- goto out;
- }
-
- log_debug(ls, "recover event %u done", rv->event_id);
- set_start_done(ls, rv->event_id);
-
- out:
- up(&ls->ls_recoverd_active);
- return error;
-}
-
-/*
- * We are given here a new group of nodes which are in the lockspace. We first
- * figure out the differences in ls membership from when we were last running.
- * If nodes from before are gone, then there will be some lock recovery to do.
- * If there are only nodes which have joined, then there's no lock recovery.
- *
- * Lockspace recovery for failed nodes must be completed before any nodes are
- * allowed to join or leave the lockspace.
- */
-
-static int ls_reconfig(struct dlm_ls *ls, struct dlm_recover *rv)
+static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
unsigned long start;
int error, neg = 0;
- log_debug(ls, "recover event %u", rv->event_id);
+ log_debug(ls, "recover %"PRIx64"", rv->seq);
down(&ls->ls_recoverd_active);
@@ -228,431 +167,61 @@ static int ls_reconfig(struct dlm_ls *ls
dlm_release_root_list(ls);
- log_debug(ls, "recover event %u done: %u ms", rv->event_id,
- jiffies_to_msecs(jiffies - start));
- set_start_done(ls, rv->event_id);
- up(&ls->ls_recoverd_active);
-
- return 0;
-
- fail:
- log_debug(ls, "recover event %d error %d", rv->event_id, error);
- up(&ls->ls_recoverd_active);
- return error;
-}
-
-/*
- * Between calls to this routine for a ls, there can be multiple stop/start
- * events where every start but the latest is cancelled by stops. There can
- * only be a single finish because every finish requires us to call start_done.
- * A single finish event could be followed by multiple stop/start events. This
- * routine takes any combination of events and boils them down to one course of
- * action.
- */
-
-static int next_move(struct dlm_ls *ls, struct dlm_recover **rv_out,
- int *finish_out)
-{
- LIST_HEAD(events);
- unsigned int cmd = 0, stop, start, finish;
- unsigned int last_stop, last_start, last_finish;
- struct dlm_recover *rv = NULL, *start_rv = NULL;
-
- spin_lock(&ls->ls_recover_lock);
-
- stop = test_and_clear_bit(LSFL_LS_STOP, &ls->ls_flags) ? 1 : 0;
- start = test_and_clear_bit(LSFL_LS_START, &ls->ls_flags) ? 1 : 0;
- finish = test_and_clear_bit(LSFL_LS_FINISH, &ls->ls_flags) ? 1 : 0;
-
- last_stop = ls->ls_last_stop;
- last_start = ls->ls_last_start;
- last_finish = ls->ls_last_finish;
-
- while (!list_empty(&ls->ls_recover)) {
- rv = list_entry(ls->ls_recover.next, struct dlm_recover, list);
- list_del(&rv->list);
- list_add_tail(&rv->list, &events);
+ dlm_set_recover_status(ls, DLM_RS_DONE);
+ error = dlm_recover_done_wait(ls);
+ if (error) {
+ log_error(ls, "recover_done_wait failed %d", error);
+ goto fail;
}
- /*
- * There are two cases where we need to adjust these event values:
- * 1. - we get a first start
- * - we get a stop
- * - we process the start + stop here and notice this special case
- *
- * 2. - we get a first start
- * - we process the start
- * - we get a stop
- * - we process the stop here and notice this special case
- *
- * In both cases, the first start we received was aborted by a
- * stop before we received a finish. last_finish being zero is the
- * indication that this is the "first" start, i.e. we've not yet
- * finished a start; if we had, last_finish would be non-zero.
- * Part of the problem arises from the fact that when we initially
- * get start/stop/start, we may get the same event id for both starts
- * (since the first was cancelled).
- *
- * In both cases, last_start and last_stop will be equal.
- * In both cases, finish=0.
- * In the first case start=1 && stop=1.
- * In the second case start=0 && stop=1.
- *
- * In both cases, we need to make adjustments to values so:
- * - we process the current event (now) as a normal stop
- * - the next start we receive will be processed normally
- * (taking into account the assertions below)
- *
- * In the first case, dlm_ls_start() will have printed the
- * "repeated start" warning.
- *
- * In the first case we need to get rid of the recover event struct.
- *
- * - set stop=1, start=0, finish=0 for case 4 below
- * - last_stop and last_start must be set equal per the case 4 assert
- * - ls_last_stop = 0 so the next start will be larger
- * - ls_last_start = 0 not really necessary (avoids dlm_ls_start print)
- */
+ dlm_clear_members_gone(ls);
- if (!last_finish && (last_start == last_stop)) {
- log_debug(ls, "move reset %u,%u,%u ids %u,%u,%u", stop,
- start, finish, last_stop, last_start, last_finish);
- stop = 1;
- start = 0;
- finish = 0;
- last_stop = 0;
- last_start = 0;
- ls->ls_last_stop = 0;
- ls->ls_last_start = 0;
-
- while (!list_empty(&events)) {
- rv = list_entry(events.next, struct dlm_recover, list);
- list_del(&rv->list);
- kfree(rv->nodeids);
- kfree(rv);
- }
+ error = enable_locking(ls, rv->seq);
+ if (error) {
+ log_error(ls, "enable_locking failed %d", error);
+ goto fail;
}
- spin_unlock(&ls->ls_recover_lock);
- log_debug(ls, "move flags %u,%u,%u ids %u,%u,%u", stop, start, finish,
- last_stop, last_start, last_finish);
-
- /*
- * Toss start events which have since been cancelled.
- */
+ error = dlm_process_requestqueue(ls);
+ if (error) {
+ log_error(ls, "process_requestqueue failed %d", error);
+ goto fail;
+ }
- while (!list_empty(&events)) {
- DLM_ASSERT(start,);
- rv = list_entry(events.next, struct dlm_recover, list);
- list_del(&rv->list);
-
- if (rv->event_id <= last_stop) {
- log_debug(ls, "move skip event %u", rv->event_id);
- kfree(rv->nodeids);
- kfree(rv);
- rv = NULL;
- } else {
- log_debug(ls, "move use event %u", rv->event_id);
- DLM_ASSERT(!start_rv,);
- start_rv = rv;
- }
+ error = dlm_recover_waiters_post(ls);
+ if (error) {
+ log_error(ls, "recover_waiters_post failed %d", error);
+ goto fail;
}
- /*
- * Eight possible combinations of events.
- */
+ dlm_grant_after_purge(ls);
- /* 0 */
- if (!stop && !start && !finish) {
- DLM_ASSERT(!start_rv,);
- cmd = 0;
- goto out;
- }
-
- /* 1 */
- if (!stop && !start && finish) {
- DLM_ASSERT(!start_rv,);
- DLM_ASSERT(last_start > last_stop,);
- DLM_ASSERT(last_finish == last_start,);
- cmd = DO_FINISH;
- *finish_out = last_finish;
- goto out;
- }
-
- /* 2 */
- if (!stop && start && !finish) {
- DLM_ASSERT(start_rv,);
- DLM_ASSERT(last_start > last_stop,);
- cmd = DO_START;
- *rv_out = start_rv;
- goto out;
- }
-
- /* 3 */
- if (!stop && start && finish) {
- DLM_ASSERT(0, printk("finish and start with no stop\n"););
- }
-
- /* 4 */
- if (stop && !start && !finish) {
- DLM_ASSERT(!start_rv,);
- DLM_ASSERT(last_start == last_stop,);
- cmd = DO_STOP;
- goto out;
- }
-
- /* 5 */
- if (stop && !start && finish) {
- DLM_ASSERT(!start_rv,);
- DLM_ASSERT(last_finish == last_start,);
- DLM_ASSERT(last_stop == last_start,);
- cmd = DO_FINISH_STOP;
- *finish_out = last_finish;
- goto out;
- }
-
- /* 6 */
- if (stop && start && !finish) {
- if (start_rv) {
- DLM_ASSERT(last_start > last_stop,);
- cmd = DO_START;
- *rv_out = start_rv;
- } else {
- DLM_ASSERT(last_stop == last_start,);
- cmd = DO_STOP;
- }
- goto out;
- }
+ dlm_astd_wake();
- /* 7 */
- if (stop && start && finish) {
- if (start_rv) {
- DLM_ASSERT(last_start > last_stop,);
- DLM_ASSERT(last_start > last_finish,);
- cmd = DO_FINISH_START;
- *finish_out = last_finish;
- *rv_out = start_rv;
- } else {
- DLM_ASSERT(last_start == last_stop,);
- DLM_ASSERT(last_start > last_finish,);
- cmd = DO_FINISH_STOP;
- *finish_out = last_finish;
- }
- goto out;
- }
+ log_debug(ls, "recover %"PRIx64" done: %u ms", rv->seq,
+ jiffies_to_msecs(jiffies - start));
+ up(&ls->ls_recoverd_active);
- out:
- return cmd;
-}
+ return 0;
-/*
- * This function decides what to do given every combination of current
- * lockspace state and next lockspace state.
- */
+ fail:
+ log_debug(ls, "recover %"PRIx64" error %d", rv->seq, error);
+ up(&ls->ls_recoverd_active);
+ return error;
+}
static void do_ls_recovery(struct dlm_ls *ls)
{
struct dlm_recover *rv = NULL;
- int error, cur_state, next_state = 0, do_now, finish_event = 0;
-
- do_now = next_move(ls, &rv, &finish_event);
- if (!do_now)
- goto out;
-
- cur_state = ls->ls_state;
- next_state = 0;
-
- DLM_ASSERT(!test_bit(LSFL_LS_RUN, &ls->ls_flags),
- log_error(ls, "curstate=%d donow=%d", cur_state, do_now););
-
- /*
- * LSST_CLEAR - we're not in any recovery state. We can get a stop or
- * a stop and start which equates with a START.
- */
- if (cur_state == LSST_CLEAR) {
- switch (do_now) {
- case DO_STOP:
- next_state = LSST_WAIT_START;
- break;
-
- case DO_START:
- error = ls_reconfig(ls, rv);
- if (error)
- next_state = LSST_WAIT_START;
- else
- next_state = LSST_RECONFIG_DONE;
- break;
-
- case DO_FINISH: /* invalid */
- case DO_FINISH_STOP: /* invalid */
- case DO_FINISH_START: /* invalid */
- default:
- DLM_ASSERT(0,);
- }
- goto out;
- }
-
- /*
- * LSST_WAIT_START - we're not running because of getting a stop or
- * failing a start. We wait in this state for another stop/start or
- * just the next start to begin another reconfig attempt.
- */
-
- if (cur_state == LSST_WAIT_START) {
- switch (do_now) {
- case DO_STOP:
- break;
-
- case DO_START:
- error = ls_reconfig(ls, rv);
- if (error)
- next_state = LSST_WAIT_START;
- else
- next_state = LSST_RECONFIG_DONE;
- break;
-
- case DO_FINISH: /* invalid */
- case DO_FINISH_STOP: /* invalid */
- case DO_FINISH_START: /* invalid */
- default:
- DLM_ASSERT(0,);
- }
- goto out;
- }
-
- /*
- * LSST_RECONFIG_DONE - we entered this state after successfully
- * completing ls_reconfig and calling set_start_done. We expect to get
- * a finish if everything goes ok. A finish could be followed by stop
- * or stop/start before we get here to check it. Or a finish may never
- * happen, only stop or stop/start.
- */
-
- if (cur_state == LSST_RECONFIG_DONE) {
- switch (do_now) {
- case DO_FINISH:
- dlm_clear_members_finish(ls, finish_event);
- next_state = LSST_CLEAR;
-
- error = enable_locking(ls, finish_event);
- if (error)
- break;
-
- error = dlm_process_requestqueue(ls);
- if (error)
- break;
-
- error = dlm_recover_waiters_post(ls);
- if (error)
- break;
-
- dlm_grant_after_purge(ls);
-
- dlm_astd_wake();
-
- log_debug(ls, "recover event %u finished", finish_event);
- break;
-
- case DO_STOP:
- next_state = LSST_WAIT_START;
- break;
-
- case DO_FINISH_STOP:
- dlm_clear_members_finish(ls, finish_event);
- next_state = LSST_WAIT_START;
- break;
-
- case DO_FINISH_START:
- dlm_clear_members_finish(ls, finish_event);
- /* fall into DO_START */
-
- case DO_START:
- error = ls_reconfig(ls, rv);
- if (error)
- next_state = LSST_WAIT_START;
- else
- next_state = LSST_RECONFIG_DONE;
- break;
-
- default:
- DLM_ASSERT(0,);
- }
- goto out;
- }
-
- /*
- * LSST_INIT - state after ls is created and before it has been
- * started. A start operation will cause the ls to be started for the
- * first time. A failed start will cause to just wait in INIT for
- * another stop/start.
- */
-
- if (cur_state == LSST_INIT) {
- switch (do_now) {
- case DO_START:
- error = ls_first_start(ls, rv);
- if (!error)
- next_state = LSST_INIT_DONE;
- break;
-
- case DO_STOP:
- break;
-
- case DO_FINISH: /* invalid */
- case DO_FINISH_STOP: /* invalid */
- case DO_FINISH_START: /* invalid */
- default:
- DLM_ASSERT(0,);
- }
- goto out;
- }
-
- /*
- * LSST_INIT_DONE - after the first start operation is completed
- * successfully and set_start_done() called. If there are no errors, a
- * finish will arrive next and we'll move to LSST_CLEAR.
- */
-
- if (cur_state == LSST_INIT_DONE) {
- switch (do_now) {
- case DO_STOP:
- case DO_FINISH_STOP:
- next_state = LSST_WAIT_START;
- break;
-
- case DO_START:
- case DO_FINISH_START:
- error = ls_reconfig(ls, rv);
- if (error)
- next_state = LSST_WAIT_START;
- else
- next_state = LSST_RECONFIG_DONE;
- break;
-
- case DO_FINISH:
- next_state = LSST_CLEAR;
-
- enable_locking(ls, finish_event);
-
- dlm_process_requestqueue(ls);
-
- dlm_astd_wake();
-
- log_debug(ls, "recover event %u finished", finish_event);
- break;
-
- default:
- DLM_ASSERT(0,);
- }
- goto out;
- }
-
- out:
- if (next_state)
- ls->ls_state = next_state;
+ spin_lock(&ls->ls_recover_lock);
+ rv = ls->ls_recover_args;
+ ls->ls_recover_args = NULL;
+ clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
+ spin_unlock(&ls->ls_recover_lock);
if (rv) {
+ ls_recover(ls, rv);
kfree(rv->nodeids);
kfree(rv);
}
Index: linux-2.6.12-mm1/drivers/dlm/requestqueue.c
===================================================================
--- linux-2.6.12-mm1.orig/drivers/dlm/requestqueue.c
+++ linux-2.6.12-mm1/drivers/dlm/requestqueue.c
@@ -79,8 +79,8 @@ int dlm_process_requestqueue(struct dlm_
list_del(&e->list);
kfree(e);
- if (!test_bit(LSFL_LS_RUN, &ls->ls_flags)) {
- log_debug(ls, "process_requestqueue abort ls_run");
+ if (dlm_locking_stopped(ls)) {
+ log_debug(ls, "process_requestqueue abort running");
up(&ls->ls_requestqueue_lock);
error = -EINTR;
break;
@@ -105,7 +105,7 @@ void dlm_wait_requestqueue(struct dlm_ls
down(&ls->ls_requestqueue_lock);
if (list_empty(&ls->ls_requestqueue))
break;
- if (!test_bit(LSFL_LS_RUN, &ls->ls_flags))
+ if (dlm_locking_stopped(ls))
break;
up(&ls->ls_requestqueue_lock);
schedule();
--
-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/
[Index of Archives]
[Kernel Newbies]
[Netfilter]
[Bugtraq]
[Photo]
[Gimp]
[Yosemite News]
[MIPS Linux]
[ARM Linux]
[Linux Security]
[Linux RAID]
[Video 4 Linux]
[Linux for the blind]
|
|