lock.c (b9d2f6ada0083bad46f37a1238fea718b575e0fa) lock.c (1151935182b40bbe398905850f6f7f4fbb262e06)
1// SPDX-License-Identifier: GPL-2.0-only
2/******************************************************************************
3*******************************************************************************
4**
5** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
6**
7**
8*******************************************************************************

--- 72 unchanged lines hidden (view full) ---

81static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static int send_remove(struct dlm_rsb *r);
86static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
1// SPDX-License-Identifier: GPL-2.0-only
2/******************************************************************************
3*******************************************************************************
4**
5** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
6**
7**
8*******************************************************************************

--- 72 unchanged lines hidden (view full) ---

81static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static int send_remove(struct dlm_rsb *r);
86static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 struct dlm_message *ms, bool local);
90static int receive_extralen(struct dlm_message *ms);
89 const struct dlm_message *ms, bool local);
90static int receive_extralen(const struct dlm_message *ms);
91static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92static void toss_rsb(struct kref *kref);
93
94/*
95 * Lock compatibilty matrix - thanks Steve
96 * UN = Unlocked state. Not really a state, used as a flag
97 * PD = Padding. Used to make the matrix a nice power of two in size
98 * Other states are the same as the VMS DLM.

--- 880 unchanged lines hidden (view full) ---

979 * recover masters, we are finding the new master for resources
980 * . dlm_recover_masters
981 * . recover_master
982 * . dlm_send_rcom_lookup
983 * . receive_rcom_lookup
984 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
985 */
986
91static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92static void toss_rsb(struct kref *kref);
93
94/*
95 * Lock compatibilty matrix - thanks Steve
96 * UN = Unlocked state. Not really a state, used as a flag
97 * PD = Padding. Used to make the matrix a nice power of two in size
98 * Other states are the same as the VMS DLM.

--- 880 unchanged lines hidden (view full) ---

979 * recover masters, we are finding the new master for resources
980 * . dlm_recover_masters
981 * . recover_master
982 * . dlm_send_rcom_lookup
983 * . receive_rcom_lookup
984 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
985 */
986
987int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
988 unsigned int flags, int *r_nodeid, int *result)
987int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
988 int len, unsigned int flags, int *r_nodeid, int *result)
989{
990 struct dlm_rsb *r = NULL;
991 uint32_t hash, b;
992 int our_nodeid = dlm_our_nodeid();
993 int dir_nodeid, error;
994
995 if (len > DLM_RESNAME_MAXLEN)
996 return -EINVAL;

--- 104 unchanged lines hidden (view full) ---

1101 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1102 if (r->res_hash == hash)
1103 dlm_dump_rsb(r);
1104 }
1105 spin_unlock(&ls->ls_rsbtbl[i].lock);
1106 }
1107}
1108
989{
990 struct dlm_rsb *r = NULL;
991 uint32_t hash, b;
992 int our_nodeid = dlm_our_nodeid();
993 int dir_nodeid, error;
994
995 if (len > DLM_RESNAME_MAXLEN)
996 return -EINVAL;

--- 104 unchanged lines hidden (view full) ---

1101 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1102 if (r->res_hash == hash)
1103 dlm_dump_rsb(r);
1104 }
1105 spin_unlock(&ls->ls_rsbtbl[i].lock);
1106 }
1107}
1108
1109void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1109void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1110{
1111 struct dlm_rsb *r = NULL;
1112 uint32_t hash, b;
1113 int error;
1114
1115 hash = jhash(name, len, 0);
1116 b = hash & (ls->ls_rsbtbl_size - 1);
1117

--- 336 unchanged lines hidden (view full) ---

1454}
1455
1456/* We clear the RESEND flag because we might be taking an lkb off the waiters
1457 list as part of process_requestqueue (e.g. a lookup that has an optimized
1458 request reply on the requestqueue) between dlm_recover_waiters_pre() which
1459 set RESEND and dlm_recover_waiters_post() */
1460
1461static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1110{
1111 struct dlm_rsb *r = NULL;
1112 uint32_t hash, b;
1113 int error;
1114
1115 hash = jhash(name, len, 0);
1116 b = hash & (ls->ls_rsbtbl_size - 1);
1117

--- 336 unchanged lines hidden (view full) ---

1454}
1455
1456/* We clear the RESEND flag because we might be taking an lkb off the waiters
1457 list as part of process_requestqueue (e.g. a lookup that has an optimized
1458 request reply on the requestqueue) between dlm_recover_waiters_pre() which
1459 set RESEND and dlm_recover_waiters_post() */
1460
1461static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1462 struct dlm_message *ms)
1462 const struct dlm_message *ms)
1463{
1464 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1465 int overlap_done = 0;
1466
1467 if (mstype == DLM_MSG_UNLOCK_REPLY &&
1468 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1469 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1470 overlap_done = 1;

--- 81 unchanged lines hidden (view full) ---

1552 error = _remove_from_waiters(lkb, mstype, NULL);
1553 mutex_unlock(&ls->ls_waiters_mutex);
1554 return error;
1555}
1556
1557/* Handles situations where we might be processing a "fake" or "local" reply in
1558 which we can't try to take waiters_mutex again. */
1559
1463{
1464 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1465 int overlap_done = 0;
1466
1467 if (mstype == DLM_MSG_UNLOCK_REPLY &&
1468 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1469 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1470 overlap_done = 1;

--- 81 unchanged lines hidden (view full) ---

1552 error = _remove_from_waiters(lkb, mstype, NULL);
1553 mutex_unlock(&ls->ls_waiters_mutex);
1554 return error;
1555}
1556
1557/* Handles situations where we might be processing a "fake" or "local" reply in
1558 which we can't try to take waiters_mutex again. */
1559
1560static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms,
1561 bool local)
1560static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1561 const struct dlm_message *ms, bool local)
1562{
1563 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1564 int error;
1565
1566 if (!local)
1567 mutex_lock(&ls->ls_waiters_mutex);
1568 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1569 if (!local)

--- 225 unchanged lines hidden (view full) ---

1795 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1796 r->res_lvbseq++;
1797 rsb_clear_flag(r, RSB_VALNOTVALID);
1798}
1799
1800/* lkb is process copy (pc) */
1801
1802static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1562{
1563 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1564 int error;
1565
1566 if (!local)
1567 mutex_lock(&ls->ls_waiters_mutex);
1568 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1569 if (!local)

--- 225 unchanged lines hidden (view full) ---

1795 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1796 r->res_lvbseq++;
1797 rsb_clear_flag(r, RSB_VALNOTVALID);
1798}
1799
1800/* lkb is process copy (pc) */
1801
1802static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1803 struct dlm_message *ms)
1803 const struct dlm_message *ms)
1804{
1805 int b;
1806
1807 if (!lkb->lkb_lvbptr)
1808 return;
1809
1810 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1811 return;

--- 90 unchanged lines hidden (view full) ---

1902
1903static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1904{
1905 set_lvb_lock(r, lkb);
1906 _grant_lock(r, lkb);
1907}
1908
1909static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1804{
1805 int b;
1806
1807 if (!lkb->lkb_lvbptr)
1808 return;
1809
1810 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1811 return;

--- 90 unchanged lines hidden (view full) ---

1902
1903static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1904{
1905 set_lvb_lock(r, lkb);
1906 _grant_lock(r, lkb);
1907}
1908
1909static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1910 struct dlm_message *ms)
1910 const struct dlm_message *ms)
1911{
1912 set_lvb_lock_pc(r, lkb, ms);
1913 _grant_lock(r, lkb);
1914}
1915
1916/* called by grant_pending_locks() which means an async grant message must
1917 be sent to the requesting node in addition to granting the lock if the
1918 lkb belongs to a remote node. */

--- 21 unchanged lines hidden (view full) ---

1940 log_print("munge_demoted %x invalid modes gr %d rq %d",
1941 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1942 return;
1943 }
1944
1945 lkb->lkb_grmode = DLM_LOCK_NL;
1946}
1947
1911{
1912 set_lvb_lock_pc(r, lkb, ms);
1913 _grant_lock(r, lkb);
1914}
1915
1916/* called by grant_pending_locks() which means an async grant message must
1917 be sent to the requesting node in addition to granting the lock if the
1918 lkb belongs to a remote node. */

--- 21 unchanged lines hidden (view full) ---

1940 log_print("munge_demoted %x invalid modes gr %d rq %d",
1941 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1942 return;
1943 }
1944
1945 lkb->lkb_grmode = DLM_LOCK_NL;
1946}
1947
1948static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1948static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
1949{
1950 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
1951 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
1952 log_print("munge_altmode %x invalid reply type %d",
1953 lkb->lkb_id, le32_to_cpu(ms->m_type));
1954 return;
1955 }
1956

--- 1679 unchanged lines hidden (view full) ---

3636 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3637}
3638
3639static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3640{
3641 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3642}
3643
1949{
1950 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
1951 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
1952 log_print("munge_altmode %x invalid reply type %d",
1953 lkb->lkb_id, le32_to_cpu(ms->m_type));
1954 return;
1955 }
1956

--- 1679 unchanged lines hidden (view full) ---

3636 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3637}
3638
3639static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3640{
3641 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3642}
3643
3644static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3645 int ret_nodeid, int rv)
3644static int send_lookup_reply(struct dlm_ls *ls,
3645 const struct dlm_message *ms_in, int ret_nodeid,
3646 int rv)
3646{
3647 struct dlm_rsb *r = &ls->ls_local_rsb;
3648 struct dlm_message *ms;
3649 struct dlm_mhandle *mh;
3650 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3651
3652 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
3653 GFP_NOFS);

--- 8 unchanged lines hidden (view full) ---

3662 out:
3663 return error;
3664}
3665
3666/* which args we save from a received message depends heavily on the type
3667 of message, unlike the send side where we can safely send everything about
3668 the lkb for any type of message */
3669
3647{
3648 struct dlm_rsb *r = &ls->ls_local_rsb;
3649 struct dlm_message *ms;
3650 struct dlm_mhandle *mh;
3651 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3652
3653 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
3654 GFP_NOFS);

--- 8 unchanged lines hidden (view full) ---

3663 out:
3664 return error;
3665}
3666
3667/* which args we save from a received message depends heavily on the type
3668 of message, unlike the send side where we can safely send everything about
3669 the lkb for any type of message */
3670
3670static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3671static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3671{
3672 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3673 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3674 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3675}
3676
3672{
3673 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3674 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3675 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3676}
3677
3677static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
3678static void receive_flags_reply(struct dlm_lkb *lkb,
3679 const struct dlm_message *ms,
3678 bool local)
3679{
3680 if (local)
3681 return;
3682
3683 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3684 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3685}
3686
3680 bool local)
3681{
3682 if (local)
3683 return;
3684
3685 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3686 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3687}
3688
3687static int receive_extralen(struct dlm_message *ms)
3689static int receive_extralen(const struct dlm_message *ms)
3688{
3689 return (le16_to_cpu(ms->m_header.h_length) -
3690 sizeof(struct dlm_message));
3691}
3692
3693static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3690{
3691 return (le16_to_cpu(ms->m_header.h_length) -
3692 sizeof(struct dlm_message));
3693}
3694
3695static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3694 struct dlm_message *ms)
3696 const struct dlm_message *ms)
3695{
3696 int len;
3697
3698 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3699 if (!lkb->lkb_lvbptr)
3700 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3701 if (!lkb->lkb_lvbptr)
3702 return -ENOMEM;

--- 11 unchanged lines hidden (view full) ---

3714}
3715
3716static void fake_astfn(void *astparam)
3717{
3718 log_print("fake_astfn should not be called");
3719}
3720
3721static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3697{
3698 int len;
3699
3700 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3701 if (!lkb->lkb_lvbptr)
3702 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3703 if (!lkb->lkb_lvbptr)
3704 return -ENOMEM;

--- 11 unchanged lines hidden (view full) ---

3716}
3717
3718static void fake_astfn(void *astparam)
3719{
3720 log_print("fake_astfn should not be called");
3721}
3722
3723static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3722 struct dlm_message *ms)
3724 const struct dlm_message *ms)
3723{
3724 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3725 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3726 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3727 lkb->lkb_grmode = DLM_LOCK_IV;
3728 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3729
3730 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;

--- 5 unchanged lines hidden (view full) ---

3736 if (!lkb->lkb_lvbptr)
3737 return -ENOMEM;
3738 }
3739
3740 return 0;
3741}
3742
3743static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3725{
3726 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3727 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3728 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3729 lkb->lkb_grmode = DLM_LOCK_IV;
3730 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3731
3732 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;

--- 5 unchanged lines hidden (view full) ---

3738 if (!lkb->lkb_lvbptr)
3739 return -ENOMEM;
3740 }
3741
3742 return 0;
3743}
3744
3745static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3744 struct dlm_message *ms)
3746 const struct dlm_message *ms)
3745{
3746 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3747 return -EBUSY;
3748
3749 if (receive_lvb(ls, lkb, ms))
3750 return -ENOMEM;
3751
3752 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3753 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3754
3755 return 0;
3756}
3757
3758static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3747{
3748 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3749 return -EBUSY;
3750
3751 if (receive_lvb(ls, lkb, ms))
3752 return -ENOMEM;
3753
3754 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3755 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3756
3757 return 0;
3758}
3759
3760static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3759 struct dlm_message *ms)
3761 const struct dlm_message *ms)
3760{
3761 if (receive_lvb(ls, lkb, ms))
3762 return -ENOMEM;
3763 return 0;
3764}
3765
3766/* We fill in the local-lkb fields with the info that send_xxxx_reply()
3767 uses to send a reply and that the remote end uses to process the reply. */
3768
3762{
3763 if (receive_lvb(ls, lkb, ms))
3764 return -ENOMEM;
3765 return 0;
3766}
3767
3768/* We fill in the local-lkb fields with the info that send_xxxx_reply()
3769 uses to send a reply and that the remote end uses to process the reply. */
3770
3769static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3771static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3770{
3771 struct dlm_lkb *lkb = &ls->ls_local_lkb;
3772 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3773 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3774}
3775
3776/* This is called after the rsb is locked so that we can safely inspect
3777 fields in the lkb. */
3778
3772{
3773 struct dlm_lkb *lkb = &ls->ls_local_lkb;
3774 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3775 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3776}
3777
3778/* This is called after the rsb is locked so that we can safely inspect
3779 fields in the lkb. */
3780
3779static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3781static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3780{
3781 int from = le32_to_cpu(ms->m_header.h_nodeid);
3782 int error = 0;
3783
3784 /* currently mixing of user/kernel locks are not supported */
3785 if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3786 !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3787 log_error(lkb->lkb_resource->res_ls,

--- 35 unchanged lines hidden (view full) ---

3823 log_error(lkb->lkb_resource->res_ls,
3824 "ignore invalid message %d from %d %x %x %x %d",
3825 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3826 lkb->lkb_remid, dlm_iflags_val(lkb),
3827 lkb->lkb_nodeid);
3828 return error;
3829}
3830
3782{
3783 int from = le32_to_cpu(ms->m_header.h_nodeid);
3784 int error = 0;
3785
3786 /* currently mixing of user/kernel locks are not supported */
3787 if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3788 !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3789 log_error(lkb->lkb_resource->res_ls,

--- 35 unchanged lines hidden (view full) ---

3825 log_error(lkb->lkb_resource->res_ls,
3826 "ignore invalid message %d from %d %x %x %x %d",
3827 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3828 lkb->lkb_remid, dlm_iflags_val(lkb),
3829 lkb->lkb_nodeid);
3830 return error;
3831}
3832
3831static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3833static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3832{
3833 struct dlm_lkb *lkb;
3834 struct dlm_rsb *r;
3835 int from_nodeid;
3836 int error, namelen = 0;
3837
3838 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3839

--- 62 unchanged lines hidden (view full) ---

3902 le32_to_cpu(ms->m_lkid), from_nodeid, error);
3903 }
3904
3905 setup_local_lkb(ls, ms);
3906 send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3907 return error;
3908}
3909
3834{
3835 struct dlm_lkb *lkb;
3836 struct dlm_rsb *r;
3837 int from_nodeid;
3838 int error, namelen = 0;
3839
3840 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3841

--- 62 unchanged lines hidden (view full) ---

3904 le32_to_cpu(ms->m_lkid), from_nodeid, error);
3905 }
3906
3907 setup_local_lkb(ls, ms);
3908 send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3909 return error;
3910}
3911
3910static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3912static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
3911{
3912 struct dlm_lkb *lkb;
3913 struct dlm_rsb *r;
3914 int error, reply = 1;
3915
3916 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3917 if (error)
3918 goto fail;

--- 39 unchanged lines hidden (view full) ---

3958 return 0;
3959
3960 fail:
3961 setup_local_lkb(ls, ms);
3962 send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3963 return error;
3964}
3965
3913{
3914 struct dlm_lkb *lkb;
3915 struct dlm_rsb *r;
3916 int error, reply = 1;
3917
3918 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3919 if (error)
3920 goto fail;

--- 39 unchanged lines hidden (view full) ---

3960 return 0;
3961
3962 fail:
3963 setup_local_lkb(ls, ms);
3964 send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3965 return error;
3966}
3967
3966static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3968static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
3967{
3968 struct dlm_lkb *lkb;
3969 struct dlm_rsb *r;
3970 int error;
3971
3972 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3973 if (error)
3974 goto fail;

--- 35 unchanged lines hidden (view full) ---

4010 return 0;
4011
4012 fail:
4013 setup_local_lkb(ls, ms);
4014 send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4015 return error;
4016}
4017
3969{
3970 struct dlm_lkb *lkb;
3971 struct dlm_rsb *r;
3972 int error;
3973
3974 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3975 if (error)
3976 goto fail;

--- 35 unchanged lines hidden (view full) ---

4012 return 0;
4013
4014 fail:
4015 setup_local_lkb(ls, ms);
4016 send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4017 return error;
4018}
4019
4018static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4020static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4019{
4020 struct dlm_lkb *lkb;
4021 struct dlm_rsb *r;
4022 int error;
4023
4024 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4025 if (error)
4026 goto fail;

--- 19 unchanged lines hidden (view full) ---

4046 return 0;
4047
4048 fail:
4049 setup_local_lkb(ls, ms);
4050 send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4051 return error;
4052}
4053
4021{
4022 struct dlm_lkb *lkb;
4023 struct dlm_rsb *r;
4024 int error;
4025
4026 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4027 if (error)
4028 goto fail;

--- 19 unchanged lines hidden (view full) ---

4048 return 0;
4049
4050 fail:
4051 setup_local_lkb(ls, ms);
4052 send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4053 return error;
4054}
4055
4054static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4056static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4055{
4056 struct dlm_lkb *lkb;
4057 struct dlm_rsb *r;
4058 int error;
4059
4060 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4061 if (error)
4062 return error;

--- 14 unchanged lines hidden (view full) ---

4077 queue_cast(r, lkb, 0);
4078 out:
4079 unlock_rsb(r);
4080 put_rsb(r);
4081 dlm_put_lkb(lkb);
4082 return 0;
4083}
4084
4057{
4058 struct dlm_lkb *lkb;
4059 struct dlm_rsb *r;
4060 int error;
4061
4062 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4063 if (error)
4064 return error;

--- 14 unchanged lines hidden (view full) ---

4079 queue_cast(r, lkb, 0);
4080 out:
4081 unlock_rsb(r);
4082 put_rsb(r);
4083 dlm_put_lkb(lkb);
4084 return 0;
4085}
4086
4085static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4087static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4086{
4087 struct dlm_lkb *lkb;
4088 struct dlm_rsb *r;
4089 int error;
4090
4091 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4092 if (error)
4093 return error;

--- 11 unchanged lines hidden (view full) ---

4105 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4106 out:
4107 unlock_rsb(r);
4108 put_rsb(r);
4109 dlm_put_lkb(lkb);
4110 return 0;
4111}
4112
4088{
4089 struct dlm_lkb *lkb;
4090 struct dlm_rsb *r;
4091 int error;
4092
4093 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4094 if (error)
4095 return error;

--- 11 unchanged lines hidden (view full) ---

4107 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4108 out:
4109 unlock_rsb(r);
4110 put_rsb(r);
4111 dlm_put_lkb(lkb);
4112 return 0;
4113}
4114
4113static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4115static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4114{
4115 int len, error, ret_nodeid, from_nodeid, our_nodeid;
4116
4117 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4118 our_nodeid = dlm_our_nodeid();
4119
4120 len = receive_extralen(ms);
4121
4122 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4123 &ret_nodeid, NULL);
4124
4125 /* Optimization: we're master so treat lookup as a request */
4126 if (!error && ret_nodeid == our_nodeid) {
4127 receive_request(ls, ms);
4128 return;
4129 }
4130 send_lookup_reply(ls, ms, ret_nodeid, error);
4131}
4132
4116{
4117 int len, error, ret_nodeid, from_nodeid, our_nodeid;
4118
4119 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4120 our_nodeid = dlm_our_nodeid();
4121
4122 len = receive_extralen(ms);
4123
4124 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4125 &ret_nodeid, NULL);
4126
4127 /* Optimization: we're master so treat lookup as a request */
4128 if (!error && ret_nodeid == our_nodeid) {
4129 receive_request(ls, ms);
4130 return;
4131 }
4132 send_lookup_reply(ls, ms, ret_nodeid, error);
4133}
4134
4133static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4135static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4134{
4135 char name[DLM_RESNAME_MAXLEN+1];
4136 struct dlm_rsb *r;
4137 uint32_t hash, b;
4138 int rv, len, dir_nodeid, from_nodeid;
4139
4140 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4141

--- 71 unchanged lines hidden (view full) ---

4213 } else {
4214 log_error(ls, "receive_remove from %d rsb ref error",
4215 from_nodeid);
4216 dlm_print_rsb(r);
4217 spin_unlock(&ls->ls_rsbtbl[b].lock);
4218 }
4219}
4220
4136{
4137 char name[DLM_RESNAME_MAXLEN+1];
4138 struct dlm_rsb *r;
4139 uint32_t hash, b;
4140 int rv, len, dir_nodeid, from_nodeid;
4141
4142 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4143

--- 71 unchanged lines hidden (view full) ---

4215 } else {
4216 log_error(ls, "receive_remove from %d rsb ref error",
4217 from_nodeid);
4218 dlm_print_rsb(r);
4219 spin_unlock(&ls->ls_rsbtbl[b].lock);
4220 }
4221}
4222
4221static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4223static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4222{
4223 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4224}
4225
4224{
4225 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4226}
4227
4226static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4228static int receive_request_reply(struct dlm_ls *ls,
4229 const struct dlm_message *ms)
4227{
4228 struct dlm_lkb *lkb;
4229 struct dlm_rsb *r;
4230 int error, mstype, result;
4231 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4232
4233 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4234 if (error)

--- 105 unchanged lines hidden (view full) ---

4340 out:
4341 unlock_rsb(r);
4342 put_rsb(r);
4343 dlm_put_lkb(lkb);
4344 return 0;
4345}
4346
4347static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4230{
4231 struct dlm_lkb *lkb;
4232 struct dlm_rsb *r;
4233 int error, mstype, result;
4234 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4235
4236 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4237 if (error)

--- 105 unchanged lines hidden (view full) ---

4343 out:
4344 unlock_rsb(r);
4345 put_rsb(r);
4346 dlm_put_lkb(lkb);
4347 return 0;
4348}
4349
4350static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4348 struct dlm_message *ms, bool local)
4351 const struct dlm_message *ms, bool local)
4349{
4350 /* this is the value returned from do_convert() on the master */
4351 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4352 case -EAGAIN:
4353 /* convert would block (be queued) on remote master */
4354 queue_cast(r, lkb, -EAGAIN);
4355 break;
4356

--- 26 unchanged lines hidden (view full) ---

4383 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4384 le32_to_cpu(ms->m_lkid),
4385 from_dlm_errno(le32_to_cpu(ms->m_result)));
4386 dlm_print_rsb(r);
4387 dlm_print_lkb(lkb);
4388 }
4389}
4390
4352{
4353 /* this is the value returned from do_convert() on the master */
4354 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4355 case -EAGAIN:
4356 /* convert would block (be queued) on remote master */
4357 queue_cast(r, lkb, -EAGAIN);
4358 break;
4359

--- 26 unchanged lines hidden (view full) ---

4386 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4387 le32_to_cpu(ms->m_lkid),
4388 from_dlm_errno(le32_to_cpu(ms->m_result)));
4389 dlm_print_rsb(r);
4390 dlm_print_lkb(lkb);
4391 }
4392}
4393
4391static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
4392 bool local)
4394static void _receive_convert_reply(struct dlm_lkb *lkb,
4395 const struct dlm_message *ms, bool local)
4393{
4394 struct dlm_rsb *r = lkb->lkb_resource;
4395 int error;
4396
4397 hold_rsb(r);
4398 lock_rsb(r);
4399
4400 error = validate_message(lkb, ms);

--- 6 unchanged lines hidden (view full) ---

4407 goto out;
4408
4409 __receive_convert_reply(r, lkb, ms, local);
4410 out:
4411 unlock_rsb(r);
4412 put_rsb(r);
4413}
4414
4396{
4397 struct dlm_rsb *r = lkb->lkb_resource;
4398 int error;
4399
4400 hold_rsb(r);
4401 lock_rsb(r);
4402
4403 error = validate_message(lkb, ms);

--- 6 unchanged lines hidden (view full) ---

4410 goto out;
4411
4412 __receive_convert_reply(r, lkb, ms, local);
4413 out:
4414 unlock_rsb(r);
4415 put_rsb(r);
4416}
4417
4415static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4418static int receive_convert_reply(struct dlm_ls *ls,
4419 const struct dlm_message *ms)
4416{
4417 struct dlm_lkb *lkb;
4418 int error;
4419
4420 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4421 if (error)
4422 return error;
4423
4424 _receive_convert_reply(lkb, ms, false);
4425 dlm_put_lkb(lkb);
4426 return 0;
4427}
4428
4420{
4421 struct dlm_lkb *lkb;
4422 int error;
4423
4424 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4425 if (error)
4426 return error;
4427
4428 _receive_convert_reply(lkb, ms, false);
4429 dlm_put_lkb(lkb);
4430 return 0;
4431}
4432
4429static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
4430 bool local)
4433static void _receive_unlock_reply(struct dlm_lkb *lkb,
4434 const struct dlm_message *ms, bool local)
4431{
4432 struct dlm_rsb *r = lkb->lkb_resource;
4433 int error;
4434
4435 hold_rsb(r);
4436 lock_rsb(r);
4437
4438 error = validate_message(lkb, ms);

--- 19 unchanged lines hidden (view full) ---

4458 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4459 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4460 }
4461 out:
4462 unlock_rsb(r);
4463 put_rsb(r);
4464}
4465
4435{
4436 struct dlm_rsb *r = lkb->lkb_resource;
4437 int error;
4438
4439 hold_rsb(r);
4440 lock_rsb(r);
4441
4442 error = validate_message(lkb, ms);

--- 19 unchanged lines hidden (view full) ---

4462 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4463 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4464 }
4465 out:
4466 unlock_rsb(r);
4467 put_rsb(r);
4468}
4469
4466static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4470static int receive_unlock_reply(struct dlm_ls *ls,
4471 const struct dlm_message *ms)
4467{
4468 struct dlm_lkb *lkb;
4469 int error;
4470
4471 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4472 if (error)
4473 return error;
4474
4475 _receive_unlock_reply(lkb, ms, false);
4476 dlm_put_lkb(lkb);
4477 return 0;
4478}
4479
4472{
4473 struct dlm_lkb *lkb;
4474 int error;
4475
4476 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4477 if (error)
4478 return error;
4479
4480 _receive_unlock_reply(lkb, ms, false);
4481 dlm_put_lkb(lkb);
4482 return 0;
4483}
4484
4480static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
4481 bool local)
4485static void _receive_cancel_reply(struct dlm_lkb *lkb,
4486 const struct dlm_message *ms, bool local)
4482{
4483 struct dlm_rsb *r = lkb->lkb_resource;
4484 int error;
4485
4486 hold_rsb(r);
4487 lock_rsb(r);
4488
4489 error = validate_message(lkb, ms);

--- 20 unchanged lines hidden (view full) ---

4510 lkb->lkb_id,
4511 from_dlm_errno(le32_to_cpu(ms->m_result)));
4512 }
4513 out:
4514 unlock_rsb(r);
4515 put_rsb(r);
4516}
4517
4487{
4488 struct dlm_rsb *r = lkb->lkb_resource;
4489 int error;
4490
4491 hold_rsb(r);
4492 lock_rsb(r);
4493
4494 error = validate_message(lkb, ms);

--- 20 unchanged lines hidden (view full) ---

4515 lkb->lkb_id,
4516 from_dlm_errno(le32_to_cpu(ms->m_result)));
4517 }
4518 out:
4519 unlock_rsb(r);
4520 put_rsb(r);
4521}
4522
4518static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4523static int receive_cancel_reply(struct dlm_ls *ls,
4524 const struct dlm_message *ms)
4519{
4520 struct dlm_lkb *lkb;
4521 int error;
4522
4523 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4524 if (error)
4525 return error;
4526
4527 _receive_cancel_reply(lkb, ms, false);
4528 dlm_put_lkb(lkb);
4529 return 0;
4530}
4531
4525{
4526 struct dlm_lkb *lkb;
4527 int error;
4528
4529 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4530 if (error)
4531 return error;
4532
4533 _receive_cancel_reply(lkb, ms, false);
4534 dlm_put_lkb(lkb);
4535 return 0;
4536}
4537
4532static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4538static void receive_lookup_reply(struct dlm_ls *ls,
4539 const struct dlm_message *ms)
4533{
4534 struct dlm_lkb *lkb;
4535 struct dlm_rsb *r;
4536 int error, ret_nodeid;
4537 int do_lookup_list = 0;
4538
4539 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4540 if (error) {

--- 62 unchanged lines hidden (view full) ---

4603 if (do_lookup_list)
4604 process_lookup_list(r);
4605 out:
4606 unlock_rsb(r);
4607 put_rsb(r);
4608 dlm_put_lkb(lkb);
4609}
4610
4540{
4541 struct dlm_lkb *lkb;
4542 struct dlm_rsb *r;
4543 int error, ret_nodeid;
4544 int do_lookup_list = 0;
4545
4546 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4547 if (error) {

--- 62 unchanged lines hidden (view full) ---

4610 if (do_lookup_list)
4611 process_lookup_list(r);
4612 out:
4613 unlock_rsb(r);
4614 put_rsb(r);
4615 dlm_put_lkb(lkb);
4616}
4617
4611static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4618static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4612 uint32_t saved_seq)
4613{
4614 int error = 0, noent = 0;
4615
4616 if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4617 log_limit(ls, "receive %d from non-member %d %x %x %d",
4618 le32_to_cpu(ms->m_type),
4619 le32_to_cpu(ms->m_header.h_nodeid),

--- 119 unchanged lines hidden (view full) ---

4739/* If the lockspace is in recovery mode (locking stopped), then normal
4740 messages are saved on the requestqueue for processing after recovery is
4741 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4742 messages off the requestqueue before we process new ones. This occurs right
4743 after recovery completes when we transition from saving all messages on
4744 requestqueue, to processing all the saved messages, to processing new
4745 messages as they arrive. */
4746
4619 uint32_t saved_seq)
4620{
4621 int error = 0, noent = 0;
4622
4623 if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4624 log_limit(ls, "receive %d from non-member %d %x %x %d",
4625 le32_to_cpu(ms->m_type),
4626 le32_to_cpu(ms->m_header.h_nodeid),

--- 119 unchanged lines hidden (view full) ---

4746/* If the lockspace is in recovery mode (locking stopped), then normal
4747 messages are saved on the requestqueue for processing after recovery is
4748 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4749 messages off the requestqueue before we process new ones. This occurs right
4750 after recovery completes when we transition from saving all messages on
4751 requestqueue, to processing all the saved messages, to processing new
4752 messages as they arrive. */
4753
4747static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4754static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4748 int nodeid)
4749{
4750 if (dlm_locking_stopped(ls)) {
4751 /* If we were a member of this lockspace, left, and rejoined,
4752 other nodes may still be sending us messages from the
4753 lockspace generation before we left. */
4754 if (WARN_ON_ONCE(!ls->ls_generation)) {
4755 log_limit(ls, "receive %d from %d ignore old gen",

--- 6 unchanged lines hidden (view full) ---

4762 dlm_wait_requestqueue(ls);
4763 _receive_message(ls, ms, 0);
4764 }
4765}
4766
4767/* This is called by dlm_recoverd to process messages that were saved on
4768 the requestqueue. */
4769
4755 int nodeid)
4756{
4757 if (dlm_locking_stopped(ls)) {
4758 /* If we were a member of this lockspace, left, and rejoined,
4759 other nodes may still be sending us messages from the
4760 lockspace generation before we left. */
4761 if (WARN_ON_ONCE(!ls->ls_generation)) {
4762 log_limit(ls, "receive %d from %d ignore old gen",

--- 6 unchanged lines hidden (view full) ---

4769 dlm_wait_requestqueue(ls);
4770 _receive_message(ls, ms, 0);
4771 }
4772}
4773
4774/* This is called by dlm_recoverd to process messages that were saved on
4775 the requestqueue. */
4776
4770void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4777void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4771 uint32_t saved_seq)
4772{
4773 _receive_message(ls, ms, saved_seq);
4774}
4775
4776/* This is called by the midcomms layer when something is received for
4777 the lockspace. It could be either a MSG (normal message sent as part of
4778 standard locking activity) or an RCOM (recovery message sent as part of
4779 lockspace recovery). */
4780
4778 uint32_t saved_seq)
4779{
4780 _receive_message(ls, ms, saved_seq);
4781}
4782
4783/* This is called by the midcomms layer when something is received for
4784 the lockspace. It could be either a MSG (normal message sent as part of
4785 standard locking activity) or an RCOM (recovery message sent as part of
4786 lockspace recovery). */
4787
4781void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4788void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4782{
4789{
4783 struct dlm_header *hd = &p->header;
4790 const struct dlm_header *hd = &p->header;
4784 struct dlm_ls *ls;
4785 int type = 0;
4786
4787 switch (hd->h_cmd) {
4788 case DLM_MSG:
4789 type = le32_to_cpu(p->message.m_type);
4790 break;
4791 case DLM_RCOM:

--- 537 unchanged lines hidden (view full) ---

5329 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5330 if (lkb)
5331 return lkb;
5332 return NULL;
5333}
5334
5335/* needs at least dlm_rcom + rcom_lock */
5336static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4791 struct dlm_ls *ls;
4792 int type = 0;
4793
4794 switch (hd->h_cmd) {
4795 case DLM_MSG:
4796 type = le32_to_cpu(p->message.m_type);
4797 break;
4798 case DLM_RCOM:

--- 537 unchanged lines hidden (view full) ---

5336 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5337 if (lkb)
5338 return lkb;
5339 return NULL;
5340}
5341
5342/* needs at least dlm_rcom + rcom_lock */
5343static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5337 struct dlm_rsb *r, struct dlm_rcom *rc)
5344 struct dlm_rsb *r, const struct dlm_rcom *rc)
5338{
5339 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5340
5341 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5342 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5343 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5344 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5345 dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));

--- 33 unchanged lines hidden (view full) ---

5379
5380/* This lkb may have been recovered in a previous aborted recovery so we need
5381 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5382 If so we just send back a standard reply. If not, we create a new lkb with
5383 the given values and send back our lkid. We send back our lkid by sending
5384 back the rcom_lock struct we got but with the remid field filled in. */
5385
5386/* needs at least dlm_rcom + rcom_lock */
5345{
5346 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5347
5348 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5349 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5350 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5351 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5352 dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));

--- 33 unchanged lines hidden (view full) ---

5386
5387/* This lkb may have been recovered in a previous aborted recovery so we need
5388 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5389 If so we just send back a standard reply. If not, we create a new lkb with
5390 the given values and send back our lkid. We send back our lkid by sending
5391 back the rcom_lock struct we got but with the remid field filled in. */
5392
5393/* needs at least dlm_rcom + rcom_lock */
5387int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
5394int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5388 __le32 *rl_remid, __le32 *rl_result)
5389{
5390 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5391 struct dlm_rsb *r;
5392 struct dlm_lkb *lkb;
5393 uint32_t remid = 0;
5394 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5395 int error;

--- 67 unchanged lines hidden (view full) ---

5463 if (error && error != -EEXIST)
5464 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5465 from_nodeid, remid, error);
5466 *rl_result = cpu_to_le32(error);
5467 return error;
5468}
5469
5470/* needs at least dlm_rcom + rcom_lock */
5395 __le32 *rl_remid, __le32 *rl_result)
5396{
5397 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5398 struct dlm_rsb *r;
5399 struct dlm_lkb *lkb;
5400 uint32_t remid = 0;
5401 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5402 int error;

--- 67 unchanged lines hidden (view full) ---

5470 if (error && error != -EEXIST)
5471 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5472 from_nodeid, remid, error);
5473 *rl_result = cpu_to_le32(error);
5474 return error;
5475}
5476
5477/* needs at least dlm_rcom + rcom_lock */
5471int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
5478int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5472 uint64_t seq)
5473{
5474 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5475 struct dlm_rsb *r;
5476 struct dlm_lkb *lkb;
5477 uint32_t lkid, remid;
5478 int error, result;
5479

--- 667 unchanged lines hidden ---
5479 uint64_t seq)
5480{
5481 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5482 struct dlm_rsb *r;
5483 struct dlm_lkb *lkb;
5484 uint32_t lkid, remid;
5485 int error, result;
5486

--- 667 unchanged lines hidden ---