diff options
Diffstat (limited to 'fs/ocfs2/cluster/heartbeat.c')
| -rw-r--r-- | fs/ocfs2/cluster/heartbeat.c | 1088 |
1 files changed, 477 insertions, 611 deletions
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c index 5c1c864e81cc..724350925aff 100644 --- a/fs/ocfs2/cluster/heartbeat.c +++ b/fs/ocfs2/cluster/heartbeat.c @@ -1,24 +1,9 @@ -/* -*- mode: c; c-basic-offset: 8; -*- - * vim: noexpandtab sw=8 ts=8 sts=0: - * +// SPDX-License-Identifier: GPL-2.0-or-later +/* * Copyright (C) 2004, 2005 Oracle. All rights reserved. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 021110-1307, USA. */ +#include "linux/kstrtox.h" #include <linux/kernel.h> #include <linux/sched.h> #include <linux/jiffies.h> @@ -35,7 +20,8 @@ #include <linux/time.h> #include <linux/debugfs.h> #include <linux/slab.h> - +#include <linux/bitmap.h> +#include <linux/ktime.h> #include "heartbeat.h" #include "tcp.h" #include "nodemanager.h" @@ -105,10 +91,6 @@ static struct o2hb_debug_buf *o2hb_db_failedregions; #define O2HB_DEBUG_REGION_PINNED "pinned" static struct dentry *o2hb_debug_dir; -static struct dentry *o2hb_debug_livenodes; -static struct dentry *o2hb_debug_liveregions; -static struct dentry *o2hb_debug_quorumregions; -static struct dentry *o2hb_debug_failedregions; static LIST_HEAD(o2hb_all_regions); @@ -118,21 +100,19 @@ static struct o2hb_callback { static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type); -#define O2HB_DEFAULT_BLOCK_BITS 9 - enum o2hb_heartbeat_modes { O2HB_HEARTBEAT_LOCAL = 0, O2HB_HEARTBEAT_GLOBAL, O2HB_HEARTBEAT_NUM_MODES, }; -char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = { - "local", /* O2HB_HEARTBEAT_LOCAL */ - "global", /* O2HB_HEARTBEAT_GLOBAL */ +static const char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = { + "local", /* O2HB_HEARTBEAT_LOCAL */ + "global", /* O2HB_HEARTBEAT_GLOBAL */ }; unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD; -unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL; +static unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL; /* * o2hb_dependent_users tracks the number of registered callbacks that depend @@ -140,7 +120,7 @@ unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL; * However only o2dlm depends on the heartbeat. It does not want the heartbeat * to stop while a dlm domain is still active. */ -unsigned int o2hb_dependent_users; +static unsigned int o2hb_dependent_users; /* * In global heartbeat mode, all regions are pinned if there are one or more @@ -218,7 +198,8 @@ struct o2hb_region { unsigned hr_unclean_stop:1, hr_aborted_start:1, hr_item_pinned:1, - hr_item_dropped:1; + hr_item_dropped:1, + hr_node_deleted:1; /* protected by the hr_callback_sem */ struct task_struct *hr_task; @@ -233,7 +214,7 @@ struct o2hb_region { unsigned int hr_num_pages; struct page **hr_slot_data; - struct block_device *hr_bdev; + struct file *hr_bdev_file; struct o2hb_disk_slot *hr_slots; /* live node map of this region */ @@ -241,10 +222,6 @@ struct o2hb_region { unsigned int hr_region_num; struct dentry *hr_debug_dir; - struct dentry *hr_debug_livenodes; - struct dentry *hr_debug_regnum; - struct dentry *hr_debug_elapsed_time; - struct dentry *hr_debug_pinned; struct o2hb_debug_buf *hr_db_livenodes; struct o2hb_debug_buf *hr_db_regnum; struct o2hb_debug_buf *hr_db_elapsed_time; @@ -259,8 +236,6 @@ struct o2hb_region { * (hr_steady_iterations == 0) within hr_unsteady_iterations */ atomic_t hr_unsteady_iterations; - char hr_dev_name[BDEVNAME_SIZE]; - unsigned int hr_timeout_ms; /* randomized as the region goes up and down so that a node @@ -270,48 +245,65 @@ struct o2hb_region { struct delayed_work hr_write_timeout_work; unsigned long hr_last_timeout_start; + /* negotiate timer, used to negotiate extending hb timeout. */ + struct delayed_work hr_nego_timeout_work; + unsigned long hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; + /* Used during o2hb_check_slot to hold a copy of the block * being checked because we temporarily have to zero out the * crc field. */ struct o2hb_disk_heartbeat_block *hr_tmp_block; + + /* Message key for negotiate timeout message. */ + unsigned int hr_key; + struct list_head hr_handler_list; + + /* last hb status, 0 for success, other value for error. */ + int hr_last_hb_status; }; +static inline struct block_device *reg_bdev(struct o2hb_region *reg) +{ + return reg->hr_bdev_file ? file_bdev(reg->hr_bdev_file) : NULL; +} + struct o2hb_bio_wait_ctxt { atomic_t wc_num_reqs; struct completion wc_io_complete; int wc_error; }; -static int o2hb_pop_count(void *map, int count) -{ - int i = -1, pop = 0; +#define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2) - while ((i = find_next_bit(map, count, i + 1)) < count) - pop++; - return pop; -} +enum { + O2HB_NEGO_TIMEOUT_MSG = 1, + O2HB_NEGO_APPROVE_MSG = 2, +}; + +struct o2hb_nego_msg { + u8 node_num; +}; static void o2hb_write_timeout(struct work_struct *work) { int failed, quorum; - unsigned long flags; struct o2hb_region *reg = container_of(work, struct o2hb_region, hr_write_timeout_work.work); - mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u " - "milliseconds\n", reg->hr_dev_name, + mlog(ML_ERROR, "Heartbeat write timeout to device %pg after %u " + "milliseconds\n", reg_bdev(reg), jiffies_to_msecs(jiffies - reg->hr_last_timeout_start)); if (o2hb_global_heartbeat_active()) { - spin_lock_irqsave(&o2hb_live_lock, flags); + spin_lock(&o2hb_live_lock); if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) set_bit(reg->hr_region_num, o2hb_failed_region_bitmap); - failed = o2hb_pop_count(&o2hb_failed_region_bitmap, + failed = bitmap_weight(o2hb_failed_region_bitmap, O2NM_MAX_REGIONS); - quorum = o2hb_pop_count(&o2hb_quorum_region_bitmap, + quorum = bitmap_weight(o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS); - spin_unlock_irqrestore(&o2hb_live_lock, flags); + spin_unlock(&o2hb_live_lock); mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n", quorum, failed); @@ -327,7 +319,7 @@ static void o2hb_write_timeout(struct work_struct *work) o2quo_disk_timeout(); } -static void o2hb_arm_write_timeout(struct o2hb_region *reg) +static void o2hb_arm_timeout(struct o2hb_region *reg) { /* Arm writeout only after thread reaches steady state */ if (atomic_read(®->hr_steady_iterations) != 0) @@ -342,14 +334,134 @@ static void o2hb_arm_write_timeout(struct o2hb_region *reg) spin_unlock(&o2hb_live_lock); } cancel_delayed_work(®->hr_write_timeout_work); - reg->hr_last_timeout_start = jiffies; schedule_delayed_work(®->hr_write_timeout_work, msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS)); + + cancel_delayed_work(®->hr_nego_timeout_work); + /* negotiate timeout must be less than write timeout. */ + schedule_delayed_work(®->hr_nego_timeout_work, + msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS)); + bitmap_zero(reg->hr_nego_node_bitmap, O2NM_MAX_NODES); } -static void o2hb_disarm_write_timeout(struct o2hb_region *reg) +static void o2hb_disarm_timeout(struct o2hb_region *reg) { cancel_delayed_work_sync(®->hr_write_timeout_work); + cancel_delayed_work_sync(®->hr_nego_timeout_work); +} + +static int o2hb_send_nego_msg(int key, int type, u8 target) +{ + struct o2hb_nego_msg msg; + int status, ret; + + msg.node_num = o2nm_this_node(); +again: + ret = o2net_send_message(type, key, &msg, sizeof(msg), + target, &status); + + if (ret == -EAGAIN || ret == -ENOMEM) { + msleep(100); + goto again; + } + + return ret; +} + +static void o2hb_nego_timeout(struct work_struct *work) +{ + unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; + int master_node, i, ret; + struct o2hb_region *reg; + + reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work); + /* don't negotiate timeout if last hb failed since it is very + * possible io failed. Should let write timeout fence self. + */ + if (reg->hr_last_hb_status) + return; + + o2hb_fill_node_map(live_node_bitmap, O2NM_MAX_NODES); + /* lowest node as master node to make negotiate decision. */ + master_node = find_first_bit(live_node_bitmap, O2NM_MAX_NODES); + + if (master_node == o2nm_this_node()) { + if (!test_bit(master_node, reg->hr_nego_node_bitmap)) { + printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%pg).\n", + o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, + config_item_name(®->hr_item), reg_bdev(reg)); + set_bit(master_node, reg->hr_nego_node_bitmap); + } + if (!bitmap_equal(reg->hr_nego_node_bitmap, live_node_bitmap, + O2NM_MAX_NODES)) { + /* check negotiate bitmap every second to do timeout + * approve decision. + */ + schedule_delayed_work(®->hr_nego_timeout_work, + msecs_to_jiffies(1000)); + + return; + } + + printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%pg) is down.\n", + config_item_name(®->hr_item), + reg_bdev(reg)); + /* approve negotiate timeout request. */ + o2hb_arm_timeout(reg); + + i = -1; + while ((i = find_next_bit(live_node_bitmap, + O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { + if (i == master_node) + continue; + + mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i); + ret = o2hb_send_nego_msg(reg->hr_key, + O2HB_NEGO_APPROVE_MSG, i); + if (ret) + mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n", + i, ret); + } + } else { + /* negotiate timeout with master node. */ + printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%pg), negotiate timeout with node %d.\n", + o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(®->hr_item), + reg_bdev(reg), master_node); + ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG, + master_node); + if (ret) + mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n", + master_node, ret); + } +} + +static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) +{ + struct o2hb_region *reg = data; + struct o2hb_nego_msg *nego_msg; + + nego_msg = (struct o2hb_nego_msg *)msg->buf; + printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%pg).\n", + nego_msg->node_num, config_item_name(®->hr_item), + reg_bdev(reg)); + if (nego_msg->node_num < O2NM_MAX_NODES) + set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap); + else + mlog(ML_ERROR, "got nego timeout message from bad node.\n"); + + return 0; +} + +static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) +{ + struct o2hb_region *reg = data; + + printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%pg).\n", + config_item_name(®->hr_item), reg_bdev(reg)); + o2hb_arm_timeout(reg); + return 0; } static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc) @@ -373,21 +485,19 @@ static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc, } } -static void o2hb_wait_on_io(struct o2hb_region *reg, - struct o2hb_bio_wait_ctxt *wc) +static void o2hb_wait_on_io(struct o2hb_bio_wait_ctxt *wc) { o2hb_bio_wait_dec(wc, 1); wait_for_completion(&wc->wc_io_complete); } -static void o2hb_bio_end_io(struct bio *bio, - int error) +static void o2hb_bio_end_io(struct bio *bio) { struct o2hb_bio_wait_ctxt *wc = bio->bi_private; - if (error) { - mlog(ML_ERROR, "IO Error %d\n", error); - wc->wc_error = error; + if (bio->bi_status) { + mlog(ML_ERROR, "IO Error %d\n", bio->bi_status); + wc->wc_error = blk_status_to_errno(bio->bi_status); } o2hb_bio_wait_dec(wc, 1); @@ -399,7 +509,7 @@ static void o2hb_bio_end_io(struct bio *bio, static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, struct o2hb_bio_wait_ctxt *wc, unsigned int *current_slot, - unsigned int max_slots) + unsigned int max_slots, blk_opf_t opf) { int len, current_page; unsigned int vec_len, vec_start; @@ -413,7 +523,7 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, * GFP_KERNEL that the local node can get fenced. It would be * nicest if we could pre-allocate these bios and avoid this * all together. */ - bio = bio_alloc(GFP_ATOMIC, 16); + bio = bio_alloc(reg_bdev(reg), 16, opf, GFP_ATOMIC); if (!bio) { mlog(ML_ERROR, "Could not alloc slots BIO!\n"); bio = ERR_PTR(-ENOMEM); @@ -421,18 +531,17 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, } /* Must put everything in 512 byte sectors for the bio... */ - bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9); - bio->bi_bdev = reg->hr_bdev; + bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9); bio->bi_private = wc; bio->bi_end_io = o2hb_bio_end_io; - vec_start = (cs << bits) % PAGE_CACHE_SIZE; + vec_start = (cs << bits) % PAGE_SIZE; while(cs < max_slots) { current_page = cs / spp; page = reg->hr_slot_data[current_page]; - vec_len = min(PAGE_CACHE_SIZE - vec_start, - (max_slots-cs) * (PAGE_CACHE_SIZE/spp) ); + vec_len = min(PAGE_SIZE - vec_start, + (max_slots-cs) * (PAGE_SIZE/spp) ); mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n", current_page, vec_len, vec_start); @@ -440,7 +549,7 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, len = bio_add_page(bio, page, vec_len, vec_start); if (len != vec_len) break; - cs += vec_len / (PAGE_CACHE_SIZE/spp); + cs += vec_len / (PAGE_SIZE/spp); vec_start = 0; } @@ -450,9 +559,10 @@ bail: } static int o2hb_read_slots(struct o2hb_region *reg, + unsigned int begin_slot, unsigned int max_slots) { - unsigned int current_slot=0; + unsigned int current_slot = begin_slot; int status; struct o2hb_bio_wait_ctxt wc; struct bio *bio; @@ -460,7 +570,8 @@ static int o2hb_read_slots(struct o2hb_region *reg, o2hb_bio_wait_init(&wc); while(current_slot < max_slots) { - bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots); + bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots, + REQ_OP_READ); if (IS_ERR(bio)) { status = PTR_ERR(bio); mlog_errno(status); @@ -468,13 +579,13 @@ static int o2hb_read_slots(struct o2hb_region *reg, } atomic_inc(&wc.wc_num_reqs); - submit_bio(READ, bio); + submit_bio(bio); } status = 0; bail_and_wait: - o2hb_wait_on_io(reg, &wc); + o2hb_wait_on_io(&wc); if (wc.wc_error && !status) status = wc.wc_error; @@ -492,7 +603,8 @@ static int o2hb_issue_node_write(struct o2hb_region *reg, slot = o2nm_this_node(); - bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1); + bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1, + REQ_OP_WRITE | REQ_SYNC); if (IS_ERR(bio)) { status = PTR_ERR(bio); mlog_errno(status); @@ -500,7 +612,7 @@ static int o2hb_issue_node_write(struct o2hb_region *reg, } atomic_inc(&write_wc->wc_num_reqs); - submit_bio(WRITE_SYNC, bio); + submit_bio(bio); status = 0; bail: @@ -582,8 +694,8 @@ static int o2hb_check_own_slot(struct o2hb_region *reg) else errstr = ERRSTR3; - mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), " - "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name, + mlog(ML_ERROR, "%s (%pg): expected(%u:0x%llx, 0x%llx), " + "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg_bdev(reg), slot->ds_node_num, (unsigned long long)slot->ds_last_generation, (unsigned long long)slot->ds_last_time, hb_block->hb_node, (unsigned long long)le64_to_cpu(hb_block->hb_generation), @@ -606,7 +718,7 @@ static inline void o2hb_prepare_block(struct o2hb_region *reg, hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block; memset(hb_block, 0, reg->hr_block_bytes); /* TODO: time stuff */ - cputime = CURRENT_TIME.tv_sec; + cputime = ktime_get_real_seconds(); if (!cputime) cputime = 1; @@ -628,11 +740,9 @@ static void o2hb_fire_callbacks(struct o2hb_callback *hbcall, struct o2nm_node *node, int idx) { - struct list_head *iter; struct o2hb_callback_func *f; - list_for_each(iter, &hbcall->list) { - f = list_entry(iter, struct o2hb_callback_func, hc_item); + list_for_each_entry(f, &hbcall->list, hc_item) { mlog(ML_HEARTBEAT, "calling funcs %p\n", f); (f->hc_func)(node, idx, f->hc_data); } @@ -641,16 +751,9 @@ static void o2hb_fire_callbacks(struct o2hb_callback *hbcall, /* Will run the list in order until we process the passed event */ static void o2hb_run_event_list(struct o2hb_node_event *queued_event) { - int empty; struct o2hb_callback *hbcall; struct o2hb_node_event *event; - spin_lock(&o2hb_live_lock); - empty = list_empty(&queued_event->hn_item); - spin_unlock(&o2hb_live_lock); - if (empty) - return; - /* Holding callback sem assures we don't alter the callback * lists when doing this, and serializes ourselves with other * processes wanting callbacks. */ @@ -709,6 +812,7 @@ static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot) struct o2hb_node_event event = { .hn_item = LIST_HEAD_INIT(event.hn_item), }; struct o2nm_node *node; + int queued = 0; node = o2nm_get_node_by_num(slot->ds_node_num); if (!node) @@ -726,11 +830,13 @@ static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot) o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, slot->ds_node_num); + queued = 1; } } spin_unlock(&o2hb_live_lock); - o2hb_run_event_list(&event); + if (queued) + o2hb_run_event_list(&event); o2nm_node_put(node); } @@ -758,12 +864,12 @@ static void o2hb_set_quorum_device(struct o2hb_region *reg) * live nodes heartbeat on it. In other words, the region has been * added to all nodes. */ - if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap, - sizeof(o2hb_live_node_bitmap))) + if (!bitmap_equal(reg->hr_live_node_bitmap, o2hb_live_node_bitmap, + O2NM_MAX_NODES)) goto unlock; - printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n", - config_item_name(®->hr_item), reg->hr_dev_name); + printk(KERN_NOTICE "o2hb: Region %s (%pg) is now a quorum device\n", + config_item_name(®->hr_item), reg_bdev(reg)); set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap); @@ -771,7 +877,7 @@ static void o2hb_set_quorum_device(struct o2hb_region *reg) * If global heartbeat active, unpin all regions if the * region count > CUT_OFF */ - if (o2hb_pop_count(&o2hb_quorum_region_bitmap, + if (bitmap_weight(o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF) o2hb_region_unpin(NULL); unlock: @@ -790,6 +896,7 @@ static int o2hb_check_slot(struct o2hb_region *reg, unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS; unsigned int slot_dead_ms; int tmp; + int queued = 0; memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes); @@ -820,8 +927,8 @@ static int o2hb_check_slot(struct o2hb_region *reg, /* The node is live but pushed out a bad crc. We * consider it a transient miss but don't populate any * other values as they may be junk. */ - mlog(ML_ERROR, "Node %d has written a bad crc to %s\n", - slot->ds_node_num, reg->hr_dev_name); + mlog(ML_ERROR, "Node %d has written a bad crc to %pg\n", + slot->ds_node_num, reg_bdev(reg)); o2hb_dump_slot(hb_block); slot->ds_equal_samples++; @@ -883,6 +990,7 @@ fire_callbacks: slot->ds_node_num); changed = 1; + queued = 1; } list_add_tail(&slot->ds_live_item, @@ -899,12 +1007,12 @@ fire_callbacks: slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms); if (slot_dead_ms && slot_dead_ms != dead_ms) { /* TODO: Perhaps we can fail the region here. */ - mlog(ML_ERROR, "Node %d on device %s has a dead count " + mlog(ML_ERROR, "Node %d on device %pg has a dead count " "of %u ms, but our count is %u ms.\n" "Please double check your configuration values " "for 'O2CB_HEARTBEAT_THRESHOLD'\n", - slot->ds_node_num, reg->hr_dev_name, slot_dead_ms, - dead_ms); + slot->ds_node_num, reg_bdev(reg), + slot_dead_ms, dead_ms); } goto out; } @@ -913,7 +1021,7 @@ fire_callbacks: if (list_empty(&slot->ds_live_item)) goto out; - /* live nodes only go dead after enough consequtive missed + /* live nodes only go dead after enough consecutive missed * samples.. reset the missed counter whenever we see * activity */ if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) { @@ -934,6 +1042,7 @@ fire_callbacks: node, slot->ds_node_num); changed = 1; + queued = 1; } /* We don't clear this because the node is still @@ -949,35 +1058,27 @@ fire_callbacks: out: spin_unlock(&o2hb_live_lock); - o2hb_run_event_list(&event); + if (queued) + o2hb_run_event_list(&event); if (node) o2nm_node_put(node); return changed; } -/* This could be faster if we just implmented a find_last_bit, but I - * don't think the circumstances warrant it. */ -static int o2hb_highest_node(unsigned long *nodes, - int numbits) +static int o2hb_highest_node(unsigned long *nodes, int numbits) { - int highest, node; - - highest = numbits; - node = -1; - while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) { - if (node >= numbits) - break; - - highest = node; - } + return find_last_bit(nodes, numbits); +} - return highest; +static int o2hb_lowest_node(unsigned long *nodes, int numbits) +{ + return find_first_bit(nodes, numbits); } static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) { - int i, ret, highest_node; + int i, ret, highest_node, lowest_node; int membership_change = 0, own_slot_ok = 0; unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; @@ -994,7 +1095,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) * If a node is not configured but is in the livemap, we still need * to read the slot so as to be able to remove it from the livemap. */ - o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap)); + o2hb_fill_node_map(live_node_bitmap, O2NM_MAX_NODES); i = -1; while ((i = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { @@ -1002,7 +1103,8 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) } highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES); - if (highest_node >= O2NM_MAX_NODES) { + lowest_node = o2hb_lowest_node(configured_nodes, O2NM_MAX_NODES); + if (highest_node >= O2NM_MAX_NODES || lowest_node >= O2NM_MAX_NODES) { mlog(ML_NOTICE, "o2hb: No configured nodes found!\n"); ret = -EINVAL; goto bail; @@ -1012,7 +1114,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) * yet. Of course, if the node definitions have holes in them * then we're reading an empty slot anyway... Consider this * best-effort. */ - ret = o2hb_read_slots(reg, highest_node + 1); + ret = o2hb_read_slots(reg, lowest_node, highest_node + 1); if (ret < 0) { mlog_errno(ret); goto bail; @@ -1043,13 +1145,13 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) * before we can go to steady state. This ensures that * people we find in our steady state have seen us. */ - o2hb_wait_on_io(reg, &write_wc); + o2hb_wait_on_io(&write_wc); if (write_wc.wc_error) { /* Do not re-arm the write timeout on I/O error - we * can't be sure that the new block ever made it to * disk */ - mlog(ML_ERROR, "Write error %d on device \"%s\"\n", - write_wc.wc_error, reg->hr_dev_name); + mlog(ML_ERROR, "Write error %d on device \"%pg\"\n", + write_wc.wc_error, reg_bdev(reg)); ret = write_wc.wc_error; goto bail; } @@ -1057,7 +1159,8 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) /* Skip disarming the timeout if own slot has stale/bad data */ if (own_slot_ok) { o2hb_set_quorum_device(reg); - o2hb_arm_write_timeout(reg); + o2hb_arm_timeout(reg); + reg->hr_last_timeout_start = jiffies; } bail: @@ -1072,9 +1175,9 @@ bail: if (atomic_read(®->hr_steady_iterations) != 0) { if (atomic_dec_and_test(®->hr_unsteady_iterations)) { printk(KERN_NOTICE "o2hb: Unable to stabilize " - "heartbeart on region %s (%s)\n", + "heartbeat on region %s (%pg)\n", config_item_name(®->hr_item), - reg->hr_dev_name); + reg_bdev(reg)); atomic_set(®->hr_steady_iterations, 0); reg->hr_aborted_start = 1; wake_up(&o2hb_steady_queue); @@ -1085,37 +1188,6 @@ bail: return ret; } -/* Subtract b from a, storing the result in a. a *must* have a larger - * value than b. */ -static void o2hb_tv_subtract(struct timeval *a, - struct timeval *b) -{ - /* just return 0 when a is after b */ - if (a->tv_sec < b->tv_sec || - (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) { - a->tv_sec = 0; - a->tv_usec = 0; - return; - } - - a->tv_sec -= b->tv_sec; - a->tv_usec -= b->tv_usec; - while ( a->tv_usec < 0 ) { - a->tv_sec--; - a->tv_usec += 1000000; - } -} - -static unsigned int o2hb_elapsed_msecs(struct timeval *start, - struct timeval *end) -{ - struct timeval res = *end; - - o2hb_tv_subtract(&res, start); - - return res.tv_sec * 1000 + res.tv_usec / 1000; -} - /* * we ride the region ref that the region dir holds. before the region * dir is removed and drops it ref it will wait to tear down this @@ -1126,15 +1198,21 @@ static int o2hb_thread(void *data) int i, ret; struct o2hb_region *reg = data; struct o2hb_bio_wait_ctxt write_wc; - struct timeval before_hb, after_hb; + ktime_t before_hb, after_hb; unsigned int elapsed_msec; mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n"); - set_user_nice(current, -20); + set_user_nice(current, MIN_NICE); /* Pin node */ - o2nm_depend_this_node(); + ret = o2nm_depend_this_node(); + if (ret) { + mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret); + reg->hr_node_deleted = 1; + wake_up(&o2hb_steady_queue); + return 0; + } while (!kthread_should_stop() && !reg->hr_unclean_stop && !reg->hr_aborted_start) { @@ -1143,18 +1221,19 @@ static int o2hb_thread(void *data) * hr_timeout_ms between disk writes. On busy systems * this should result in a heartbeat which is less * likely to time itself out. */ - do_gettimeofday(&before_hb); + before_hb = ktime_get_real(); ret = o2hb_do_disk_heartbeat(reg); + reg->hr_last_hb_status = ret; + + after_hb = ktime_get_real(); - do_gettimeofday(&after_hb); - elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb); + elapsed_msec = (unsigned int) + ktime_ms_delta(after_hb, before_hb); mlog(ML_HEARTBEAT, - "start = %lu.%lu, end = %lu.%lu, msec = %u\n", - before_hb.tv_sec, (unsigned long) before_hb.tv_usec, - after_hb.tv_sec, (unsigned long) after_hb.tv_usec, - elapsed_msec); + "start = %lld, end = %lld, msec = %u, ret = %d\n", + before_hb, after_hb, elapsed_msec, ret); if (!kthread_should_stop() && elapsed_msec < reg->hr_timeout_ms) { @@ -1164,7 +1243,7 @@ static int o2hb_thread(void *data) } } - o2hb_disarm_write_timeout(reg); + o2hb_disarm_timeout(reg); /* unclean stop is only used in very bad situation */ for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++) @@ -1179,7 +1258,7 @@ static int o2hb_thread(void *data) o2hb_prepare_block(reg, 0); ret = o2hb_issue_node_write(reg, &write_wc); if (ret == 0) - o2hb_wait_on_io(reg, &write_wc); + o2hb_wait_on_io(&write_wc); else mlog_errno(ret); } @@ -1229,7 +1308,7 @@ static int o2hb_debug_open(struct inode *inode, struct file *file) case O2HB_DB_TYPE_REGION_NUMBER: reg = (struct o2hb_region *)db->db_data; - out += snprintf(buf + out, PAGE_SIZE - out, "%d\n", + out += scnprintf(buf + out, PAGE_SIZE - out, "%d\n", reg->hr_region_num); goto done; @@ -1239,12 +1318,12 @@ static int o2hb_debug_open(struct inode *inode, struct file *file) /* If 0, it has never been set before */ if (lts) lts = jiffies_to_msecs(jiffies - lts); - out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts); + out += scnprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts); goto done; case O2HB_DB_TYPE_REGION_PINNED: reg = (struct o2hb_region *)db->db_data; - out += snprintf(buf + out, PAGE_SIZE - out, "%u\n", + out += scnprintf(buf + out, PAGE_SIZE - out, "%u\n", !!reg->hr_item_pinned); goto done; @@ -1253,8 +1332,8 @@ static int o2hb_debug_open(struct inode *inode, struct file *file) } while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len) - out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i); - out += snprintf(buf + out, PAGE_SIZE - out, "\n"); + out += scnprintf(buf + out, PAGE_SIZE - out, "%d ", i); + out += scnprintf(buf + out, PAGE_SIZE - out, "\n"); done: i_size_write(inode, out); @@ -1303,107 +1382,60 @@ static const struct file_operations o2hb_debug_fops = { void o2hb_exit(void) { + debugfs_remove_recursive(o2hb_debug_dir); kfree(o2hb_db_livenodes); kfree(o2hb_db_liveregions); kfree(o2hb_db_quorumregions); kfree(o2hb_db_failedregions); - debugfs_remove(o2hb_debug_failedregions); - debugfs_remove(o2hb_debug_quorumregions); - debugfs_remove(o2hb_debug_liveregions); - debugfs_remove(o2hb_debug_livenodes); - debugfs_remove(o2hb_debug_dir); } -static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir, - struct o2hb_debug_buf **db, int db_len, - int type, int size, int len, void *data) +static void o2hb_debug_create(const char *name, struct dentry *dir, + struct o2hb_debug_buf **db, int db_len, int type, + int size, int len, void *data) { *db = kmalloc(db_len, GFP_KERNEL); if (!*db) - return NULL; + return; (*db)->db_type = type; (*db)->db_size = size; (*db)->db_len = len; (*db)->db_data = data; - return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db, - &o2hb_debug_fops); + debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db, &o2hb_debug_fops); } -static int o2hb_debug_init(void) +static void o2hb_debug_init(void) { - int ret = -ENOMEM; - o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL); - if (!o2hb_debug_dir) { - mlog_errno(ret); - goto bail; - } - - o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES, - o2hb_debug_dir, - &o2hb_db_livenodes, - sizeof(*o2hb_db_livenodes), - O2HB_DB_TYPE_LIVENODES, - sizeof(o2hb_live_node_bitmap), - O2NM_MAX_NODES, - o2hb_live_node_bitmap); - if (!o2hb_debug_livenodes) { - mlog_errno(ret); - goto bail; - } - o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS, - o2hb_debug_dir, - &o2hb_db_liveregions, - sizeof(*o2hb_db_liveregions), - O2HB_DB_TYPE_LIVEREGIONS, - sizeof(o2hb_live_region_bitmap), - O2NM_MAX_REGIONS, - o2hb_live_region_bitmap); - if (!o2hb_debug_liveregions) { - mlog_errno(ret); - goto bail; - } + o2hb_debug_create(O2HB_DEBUG_LIVENODES, o2hb_debug_dir, + &o2hb_db_livenodes, sizeof(*o2hb_db_livenodes), + O2HB_DB_TYPE_LIVENODES, sizeof(o2hb_live_node_bitmap), + O2NM_MAX_NODES, o2hb_live_node_bitmap); - o2hb_debug_quorumregions = - o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS, - o2hb_debug_dir, - &o2hb_db_quorumregions, - sizeof(*o2hb_db_quorumregions), - O2HB_DB_TYPE_QUORUMREGIONS, - sizeof(o2hb_quorum_region_bitmap), - O2NM_MAX_REGIONS, - o2hb_quorum_region_bitmap); - if (!o2hb_debug_quorumregions) { - mlog_errno(ret); - goto bail; - } + o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS, o2hb_debug_dir, + &o2hb_db_liveregions, sizeof(*o2hb_db_liveregions), + O2HB_DB_TYPE_LIVEREGIONS, + sizeof(o2hb_live_region_bitmap), O2NM_MAX_REGIONS, + o2hb_live_region_bitmap); - o2hb_debug_failedregions = - o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS, - o2hb_debug_dir, - &o2hb_db_failedregions, - sizeof(*o2hb_db_failedregions), - O2HB_DB_TYPE_FAILEDREGIONS, - sizeof(o2hb_failed_region_bitmap), - O2NM_MAX_REGIONS, - o2hb_failed_region_bitmap); - if (!o2hb_debug_failedregions) { - mlog_errno(ret); - goto bail; - } + o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS, o2hb_debug_dir, + &o2hb_db_quorumregions, + sizeof(*o2hb_db_quorumregions), + O2HB_DB_TYPE_QUORUMREGIONS, + sizeof(o2hb_quorum_region_bitmap), O2NM_MAX_REGIONS, + o2hb_quorum_region_bitmap); - ret = 0; -bail: - if (ret) - o2hb_exit(); - - return ret; + o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS, o2hb_debug_dir, + &o2hb_db_failedregions, + sizeof(*o2hb_db_failedregions), + O2HB_DB_TYPE_FAILEDREGIONS, + sizeof(o2hb_failed_region_bitmap), O2NM_MAX_REGIONS, + o2hb_failed_region_bitmap); } -int o2hb_init(void) +void o2hb_init(void) { int i; @@ -1413,38 +1445,34 @@ int o2hb_init(void) for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++) INIT_LIST_HEAD(&o2hb_live_slots[i]); - INIT_LIST_HEAD(&o2hb_node_events); - - memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap)); - memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap)); - memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap)); - memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap)); - memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap)); + bitmap_zero(o2hb_live_node_bitmap, O2NM_MAX_NODES); + bitmap_zero(o2hb_region_bitmap, O2NM_MAX_REGIONS); + bitmap_zero(o2hb_live_region_bitmap, O2NM_MAX_REGIONS); + bitmap_zero(o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS); + bitmap_zero(o2hb_failed_region_bitmap, O2NM_MAX_REGIONS); o2hb_dependent_users = 0; - return o2hb_debug_init(); + o2hb_debug_init(); } /* if we're already in a callback then we're already serialized by the sem */ static void o2hb_fill_node_map_from_callback(unsigned long *map, - unsigned bytes) + unsigned int bits) { - BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long))); - - memcpy(map, &o2hb_live_node_bitmap, bytes); + bitmap_copy(map, o2hb_live_node_bitmap, bits); } /* * get a map of all nodes that are heartbeating in any regions */ -void o2hb_fill_node_map(unsigned long *map, unsigned bytes) +void o2hb_fill_node_map(unsigned long *map, unsigned int bits) { /* callers want to serialize this map and callbacks so that they * can trust that they don't miss nodes coming to the party */ down_read(&o2hb_callback_sem); spin_lock(&o2hb_live_lock); - o2hb_fill_node_map_from_callback(map, bytes); + o2hb_fill_node_map_from_callback(map, bits); spin_unlock(&o2hb_live_lock); up_read(&o2hb_callback_sem); } @@ -1469,7 +1497,7 @@ static void o2hb_region_release(struct config_item *item) struct page *page; struct o2hb_region *reg = to_o2hb_region(item); - mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name); + mlog(ML_HEARTBEAT, "hb region release (%pg)\n", reg_bdev(reg)); kfree(reg->hr_tmp_block); @@ -1482,38 +1510,37 @@ static void o2hb_region_release(struct config_item *item) kfree(reg->hr_slot_data); } - if (reg->hr_bdev) - blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE); + if (reg->hr_bdev_file) + fput(reg->hr_bdev_file); kfree(reg->hr_slots); - kfree(reg->hr_db_regnum); + debugfs_remove_recursive(reg->hr_debug_dir); kfree(reg->hr_db_livenodes); - debugfs_remove(reg->hr_debug_livenodes); - debugfs_remove(reg->hr_debug_regnum); - debugfs_remove(reg->hr_debug_elapsed_time); - debugfs_remove(reg->hr_debug_pinned); - debugfs_remove(reg->hr_debug_dir); + kfree(reg->hr_db_regnum); + kfree(reg->hr_db_elapsed_time); + kfree(reg->hr_db_pinned); spin_lock(&o2hb_live_lock); list_del(®->hr_all_item); spin_unlock(&o2hb_live_lock); + o2net_unregister_handler_list(®->hr_handler_list); kfree(reg); } static int o2hb_read_block_input(struct o2hb_region *reg, const char *page, - size_t count, unsigned long *ret_bytes, unsigned int *ret_bits) { unsigned long bytes; char *p = (char *)page; + int ret; - bytes = simple_strtoul(p, &p, 0); - if (!p || (*p && (*p != '\n'))) - return -EINVAL; + ret = kstrtoul(p, 0, &bytes); + if (ret) + return ret; /* Heartbeat and fs min / max block sizes are the same. */ if (bytes > 4096 || bytes < 512) @@ -1529,25 +1556,26 @@ static int o2hb_read_block_input(struct o2hb_region *reg, return 0; } -static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg, +static ssize_t o2hb_region_block_bytes_show(struct config_item *item, char *page) { - return sprintf(page, "%u\n", reg->hr_block_bytes); + return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes); } -static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg, +static ssize_t o2hb_region_block_bytes_store(struct config_item *item, const char *page, size_t count) { + struct o2hb_region *reg = to_o2hb_region(item); int status; unsigned long block_bytes; unsigned int block_bits; - if (reg->hr_bdev) + if (reg->hr_bdev_file) return -EINVAL; - status = o2hb_read_block_input(reg, page, count, - &block_bytes, &block_bits); + status = o2hb_read_block_input(reg, page, &block_bytes, + &block_bits); if (status) return status; @@ -1557,24 +1585,26 @@ static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg, return count; } -static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg, +static ssize_t o2hb_region_start_block_show(struct config_item *item, char *page) { - return sprintf(page, "%llu\n", reg->hr_start_block); + return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block); } -static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg, +static ssize_t o2hb_region_start_block_store(struct config_item *item, const char *page, size_t count) { + struct o2hb_region *reg = to_o2hb_region(item); unsigned long long tmp; char *p = (char *)page; + ssize_t ret; - if (reg->hr_bdev) + if (reg->hr_bdev_file) return -EINVAL; - tmp = simple_strtoull(p, &p, 0); - if (!p || (*p && (*p != '\n'))) + ret = kstrtoull(p, 0, &tmp); + if (ret) return -EINVAL; reg->hr_start_block = tmp; @@ -1582,25 +1612,26 @@ static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg, return count; } -static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg, - char *page) +static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page) { - return sprintf(page, "%d\n", reg->hr_blocks); + return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks); } -static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg, +static ssize_t o2hb_region_blocks_store(struct config_item *item, const char *page, size_t count) { + struct o2hb_region *reg = to_o2hb_region(item); unsigned long tmp; char *p = (char *)page; + int ret; - if (reg->hr_bdev) + if (reg->hr_bdev_file) return -EINVAL; - tmp = simple_strtoul(p, &p, 0); - if (!p || (*p && (*p != '\n'))) - return -EINVAL; + ret = kstrtoul(p, 0, &tmp); + if (ret) + return ret; if (tmp > O2NM_MAX_NODES || tmp == 0) return -ERANGE; @@ -1610,20 +1641,19 @@ static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg, return count; } -static ssize_t o2hb_region_dev_read(struct o2hb_region *reg, - char *page) +static ssize_t o2hb_region_dev_show(struct config_item *item, char *page) { unsigned int ret = 0; - if (reg->hr_bdev) - ret = sprintf(page, "%s\n", reg->hr_dev_name); + if (to_o2hb_region(item)->hr_bdev_file) + ret = sprintf(page, "%pg\n", reg_bdev(to_o2hb_region(item))); return ret; } static void o2hb_init_region_params(struct o2hb_region *reg) { - reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits; + reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits; reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS; mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n", @@ -1644,17 +1674,13 @@ static int o2hb_map_slot_data(struct o2hb_region *reg) struct o2hb_disk_slot *slot; reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL); - if (reg->hr_tmp_block == NULL) { - mlog_errno(-ENOMEM); + if (reg->hr_tmp_block == NULL) return -ENOMEM; - } reg->hr_slots = kcalloc(reg->hr_blocks, sizeof(struct o2hb_disk_slot), GFP_KERNEL); - if (reg->hr_slots == NULL) { - mlog_errno(-ENOMEM); + if (reg->hr_slots == NULL) return -ENOMEM; - } for(i = 0; i < reg->hr_blocks; i++) { slot = ®->hr_slots[i]; @@ -1670,17 +1696,13 @@ static int o2hb_map_slot_data(struct o2hb_region *reg) reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *), GFP_KERNEL); - if (!reg->hr_slot_data) { - mlog_errno(-ENOMEM); + if (!reg->hr_slot_data) return -ENOMEM; - } for(i = 0; i < reg->hr_num_pages; i++) { page = alloc_page(GFP_KERNEL); - if (!page) { - mlog_errno(-ENOMEM); + if (!page) return -ENOMEM; - } reg->hr_slot_data[i] = page; @@ -1711,11 +1733,9 @@ static int o2hb_populate_slot_data(struct o2hb_region *reg) struct o2hb_disk_slot *slot; struct o2hb_disk_heartbeat_block *hb_block; - ret = o2hb_read_slots(reg, reg->hr_blocks); - if (ret) { - mlog_errno(ret); + ret = o2hb_read_slots(reg, 0, reg->hr_blocks); + if (ret) goto out; - } /* We only want to get an idea of the values initially in each * slot, so we do no verification - o2hb_check_slot will @@ -1735,61 +1755,57 @@ out: return ret; } -/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */ -static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, +/* + * this is acting as commit; we set up all of hr_bdev_file and hr_task or + * nothing + */ +static ssize_t o2hb_region_dev_store(struct config_item *item, const char *page, size_t count) { + struct o2hb_region *reg = to_o2hb_region(item); struct task_struct *hb_task; long fd; int sectsize; char *p = (char *)page; - struct fd f; - struct inode *inode; ssize_t ret = -EINVAL; int live_threshold; - if (reg->hr_bdev) - goto out; + if (reg->hr_bdev_file) + return -EINVAL; /* We can't heartbeat without having had our node number * configured yet. */ if (o2nm_this_node() == O2NM_MAX_NODES) - goto out; + return -EINVAL; - fd = simple_strtol(p, &p, 0); - if (!p || (*p && (*p != '\n'))) - goto out; + ret = kstrtol(p, 0, &fd); + if (ret < 0) + return -EINVAL; if (fd < 0 || fd >= INT_MAX) - goto out; + return -EINVAL; - f = fdget(fd); - if (f.file == NULL) - goto out; + CLASS(fd, f)(fd); + if (fd_empty(f)) + return -EINVAL; if (reg->hr_blocks == 0 || reg->hr_start_block == 0 || reg->hr_block_bytes == 0) - goto out2; - - inode = igrab(f.file->f_mapping->host); - if (inode == NULL) - goto out2; + return -EINVAL; - if (!S_ISBLK(inode->i_mode)) - goto out3; + if (!S_ISBLK(fd_file(f)->f_mapping->host->i_mode)) + return -EINVAL; - reg->hr_bdev = I_BDEV(f.file->f_mapping->host); - ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL); - if (ret) { - reg->hr_bdev = NULL; - goto out3; + reg->hr_bdev_file = bdev_file_open_by_dev(fd_file(f)->f_mapping->host->i_rdev, + BLK_OPEN_WRITE | BLK_OPEN_READ, NULL, NULL); + if (IS_ERR(reg->hr_bdev_file)) { + ret = PTR_ERR(reg->hr_bdev_file); + reg->hr_bdev_file = NULL; + return ret; } - inode = NULL; - - bdevname(reg->hr_bdev, reg->hr_dev_name); - sectsize = bdev_logical_block_size(reg->hr_bdev); + sectsize = bdev_logical_block_size(reg_bdev(reg)); if (sectsize != reg->hr_block_bytes) { mlog(ML_ERROR, "blocksize %u incorrect for device, expected %d", @@ -1819,6 +1835,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, } INIT_DELAYED_WORK(®->hr_write_timeout_work, o2hb_write_timeout); + INIT_DELAYED_WORK(®->hr_nego_timeout_work, o2hb_nego_timeout); /* * A node is considered live after it has beat LIVE_THRESHOLD @@ -1831,14 +1848,14 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, live_threshold = O2HB_LIVE_THRESHOLD; if (o2hb_global_heartbeat_active()) { spin_lock(&o2hb_live_lock); - if (o2hb_pop_count(&o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1) + if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1) live_threshold <<= 1; spin_unlock(&o2hb_live_lock); } ++live_threshold; atomic_set(®->hr_steady_iterations, live_threshold); - /* unsteady_iterations is double the steady_iterations */ - atomic_set(®->hr_unsteady_iterations, (live_threshold << 1)); + /* unsteady_iterations is triple the steady_iterations */ + atomic_set(®->hr_unsteady_iterations, (live_threshold * 3)); hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s", reg->hr_item.ci_name); @@ -1853,7 +1870,8 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, spin_unlock(&o2hb_live_lock); ret = wait_event_interruptible(o2hb_steady_queue, - atomic_read(®->hr_steady_iterations) == 0); + atomic_read(®->hr_steady_iterations) == 0 || + reg->hr_node_deleted); if (ret) { atomic_set(®->hr_steady_iterations, 0); reg->hr_aborted_start = 1; @@ -1864,6 +1882,11 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, goto out3; } + if (reg->hr_node_deleted) { + ret = -EINVAL; + goto out3; + } + /* Ok, we were woken. Make sure it wasn't by drop_item() */ spin_lock(&o2hb_live_lock); hb_task = reg->hr_task; @@ -1877,26 +1900,20 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, ret = -EIO; if (hb_task && o2hb_global_heartbeat_active()) - printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n", - config_item_name(®->hr_item), reg->hr_dev_name); + printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%pg)\n", + config_item_name(®->hr_item), reg_bdev(reg)); out3: - iput(inode); -out2: - fdput(f); -out: if (ret < 0) { - if (reg->hr_bdev) { - blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE); - reg->hr_bdev = NULL; - } + fput(reg->hr_bdev_file); + reg->hr_bdev_file = NULL; } return ret; } -static ssize_t o2hb_region_pid_read(struct o2hb_region *reg, - char *page) +static ssize_t o2hb_region_pid_show(struct config_item *item, char *page) { + struct o2hb_region *reg = to_o2hb_region(item); pid_t pid = 0; spin_lock(&o2hb_live_lock); @@ -1910,95 +1927,26 @@ static ssize_t o2hb_region_pid_read(struct o2hb_region *reg, return sprintf(page, "%u\n", pid); } -struct o2hb_region_attribute { - struct configfs_attribute attr; - ssize_t (*show)(struct o2hb_region *, char *); - ssize_t (*store)(struct o2hb_region *, const char *, size_t); -}; - -static struct o2hb_region_attribute o2hb_region_attr_block_bytes = { - .attr = { .ca_owner = THIS_MODULE, - .ca_name = "block_bytes", - .ca_mode = S_IRUGO | S_IWUSR }, - .show = o2hb_region_block_bytes_read, - .store = o2hb_region_block_bytes_write, -}; - -static struct o2hb_region_attribute o2hb_region_attr_start_block = { - .attr = { .ca_owner = THIS_MODULE, - .ca_name = "start_block", - .ca_mode = S_IRUGO | S_IWUSR }, - .show = o2hb_region_start_block_read, - .store = o2hb_region_start_block_write, -}; - -static struct o2hb_region_attribute o2hb_region_attr_blocks = { - .attr = { .ca_owner = THIS_MODULE, - .ca_name = "blocks", - .ca_mode = S_IRUGO | S_IWUSR }, - .show = o2hb_region_blocks_read, - .store = o2hb_region_blocks_write, -}; - -static struct o2hb_region_attribute o2hb_region_attr_dev = { - .attr = { .ca_owner = THIS_MODULE, - .ca_name = "dev", - .ca_mode = S_IRUGO | S_IWUSR }, - .show = o2hb_region_dev_read, - .store = o2hb_region_dev_write, -}; - -static struct o2hb_region_attribute o2hb_region_attr_pid = { - .attr = { .ca_owner = THIS_MODULE, - .ca_name = "pid", - .ca_mode = S_IRUGO | S_IRUSR }, - .show = o2hb_region_pid_read, -}; +CONFIGFS_ATTR(o2hb_region_, block_bytes); +CONFIGFS_ATTR(o2hb_region_, start_block); +CONFIGFS_ATTR(o2hb_region_, blocks); +CONFIGFS_ATTR(o2hb_region_, dev); +CONFIGFS_ATTR_RO(o2hb_region_, pid); static struct configfs_attribute *o2hb_region_attrs[] = { - &o2hb_region_attr_block_bytes.attr, - &o2hb_region_attr_start_block.attr, - &o2hb_region_attr_blocks.attr, - &o2hb_region_attr_dev.attr, - &o2hb_region_attr_pid.attr, + &o2hb_region_attr_block_bytes, + &o2hb_region_attr_start_block, + &o2hb_region_attr_blocks, + &o2hb_region_attr_dev, + &o2hb_region_attr_pid, NULL, }; -static ssize_t o2hb_region_show(struct config_item *item, - struct configfs_attribute *attr, - char *page) -{ - struct o2hb_region *reg = to_o2hb_region(item); - struct o2hb_region_attribute *o2hb_region_attr = - container_of(attr, struct o2hb_region_attribute, attr); - ssize_t ret = 0; - - if (o2hb_region_attr->show) - ret = o2hb_region_attr->show(reg, page); - return ret; -} - -static ssize_t o2hb_region_store(struct config_item *item, - struct configfs_attribute *attr, - const char *page, size_t count) -{ - struct o2hb_region *reg = to_o2hb_region(item); - struct o2hb_region_attribute *o2hb_region_attr = - container_of(attr, struct o2hb_region_attribute, attr); - ssize_t ret = -EINVAL; - - if (o2hb_region_attr->store) - ret = o2hb_region_attr->store(reg, page, count); - return ret; -} - static struct configfs_item_operations o2hb_region_item_ops = { .release = o2hb_region_release, - .show_attribute = o2hb_region_show, - .store_attribute = o2hb_region_store, }; -static struct config_item_type o2hb_region_type = { +static const struct config_item_type o2hb_region_type = { .ct_item_ops = &o2hb_region_item_ops, .ct_attrs = o2hb_region_attrs, .ct_owner = THIS_MODULE, @@ -2018,69 +1966,33 @@ static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group : NULL; } -static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir) +static void o2hb_debug_region_init(struct o2hb_region *reg, + struct dentry *parent) { - int ret = -ENOMEM; + struct dentry *dir; - reg->hr_debug_dir = - debugfs_create_dir(config_item_name(®->hr_item), dir); - if (!reg->hr_debug_dir) { - mlog_errno(ret); - goto bail; - } + dir = debugfs_create_dir(config_item_name(®->hr_item), parent); + reg->hr_debug_dir = dir; - reg->hr_debug_livenodes = - o2hb_debug_create(O2HB_DEBUG_LIVENODES, - reg->hr_debug_dir, - &(reg->hr_db_livenodes), - sizeof(*(reg->hr_db_livenodes)), - O2HB_DB_TYPE_REGION_LIVENODES, - sizeof(reg->hr_live_node_bitmap), - O2NM_MAX_NODES, reg); - if (!reg->hr_debug_livenodes) { - mlog_errno(ret); - goto bail; - } + o2hb_debug_create(O2HB_DEBUG_LIVENODES, dir, &(reg->hr_db_livenodes), + sizeof(*(reg->hr_db_livenodes)), + O2HB_DB_TYPE_REGION_LIVENODES, + sizeof(reg->hr_live_node_bitmap), O2NM_MAX_NODES, + reg); - reg->hr_debug_regnum = - o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER, - reg->hr_debug_dir, - &(reg->hr_db_regnum), - sizeof(*(reg->hr_db_regnum)), - O2HB_DB_TYPE_REGION_NUMBER, - 0, O2NM_MAX_NODES, reg); - if (!reg->hr_debug_regnum) { - mlog_errno(ret); - goto bail; - } + o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER, dir, &(reg->hr_db_regnum), + sizeof(*(reg->hr_db_regnum)), + O2HB_DB_TYPE_REGION_NUMBER, 0, O2NM_MAX_NODES, reg); - reg->hr_debug_elapsed_time = - o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME, - reg->hr_debug_dir, - &(reg->hr_db_elapsed_time), - sizeof(*(reg->hr_db_elapsed_time)), - O2HB_DB_TYPE_REGION_ELAPSED_TIME, - 0, 0, reg); - if (!reg->hr_debug_elapsed_time) { - mlog_errno(ret); - goto bail; - } + o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME, dir, + &(reg->hr_db_elapsed_time), + sizeof(*(reg->hr_db_elapsed_time)), + O2HB_DB_TYPE_REGION_ELAPSED_TIME, 0, 0, reg); - reg->hr_debug_pinned = - o2hb_debug_create(O2HB_DEBUG_REGION_PINNED, - reg->hr_debug_dir, - &(reg->hr_db_pinned), - sizeof(*(reg->hr_db_pinned)), - O2HB_DB_TYPE_REGION_PINNED, - 0, 0, reg); - if (!reg->hr_debug_pinned) { - mlog_errno(ret); - goto bail; - } + o2hb_debug_create(O2HB_DEBUG_REGION_PINNED, dir, &(reg->hr_db_pinned), + sizeof(*(reg->hr_db_pinned)), + O2HB_DB_TYPE_REGION_PINNED, 0, 0, reg); - ret = 0; -bail: - return ret; } static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group, @@ -2115,13 +2027,39 @@ static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *g config_item_init_type_name(®->hr_item, name, &o2hb_region_type); - ret = o2hb_debug_region_init(reg, o2hb_debug_dir); - if (ret) { - config_item_put(®->hr_item); - goto free; - } + /* this is the same way to generate msg key as dlm, for local heartbeat, + * name is also the same, so make initial crc value different to avoid + * message key conflict. + */ + reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS, + name, strlen(name)); + INIT_LIST_HEAD(®->hr_handler_list); + ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key, + sizeof(struct o2hb_nego_msg), + o2hb_nego_timeout_handler, + reg, NULL, ®->hr_handler_list); + if (ret) + goto remove_item; + + ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key, + sizeof(struct o2hb_nego_msg), + o2hb_nego_approve_handler, + reg, NULL, ®->hr_handler_list); + if (ret) + goto unregister_handler; + + o2hb_debug_region_init(reg, o2hb_debug_dir); return ®->hr_item; + +unregister_handler: + o2net_unregister_handler_list(®->hr_handler_list); +remove_item: + spin_lock(&o2hb_live_lock); + list_del(®->hr_all_item); + if (o2hb_global_heartbeat_active()) + clear_bit(reg->hr_region_num, o2hb_region_bitmap); + spin_unlock(&o2hb_live_lock); free: kfree(reg); return ERR_PTR(ret); @@ -2152,10 +2090,10 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group, quorum_region = 1; clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap); spin_unlock(&o2hb_live_lock); - printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n", + printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%pg)\n", ((atomic_read(®->hr_steady_iterations) == 0) ? "stopped" : "start aborted"), config_item_name(item), - reg->hr_dev_name); + reg_bdev(reg)); } /* @@ -2182,7 +2120,7 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group, if (!o2hb_dependent_users) goto unlock; - if (o2hb_pop_count(&o2hb_quorum_region_bitmap, + if (bitmap_weight(o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF) o2hb_region_pin(NULL); @@ -2190,56 +2128,22 @@ unlock: spin_unlock(&o2hb_live_lock); } -struct o2hb_heartbeat_group_attribute { - struct configfs_attribute attr; - ssize_t (*show)(struct o2hb_heartbeat_group *, char *); - ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t); -}; - -static ssize_t o2hb_heartbeat_group_show(struct config_item *item, - struct configfs_attribute *attr, - char *page) -{ - struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item)); - struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr = - container_of(attr, struct o2hb_heartbeat_group_attribute, attr); - ssize_t ret = 0; - - if (o2hb_heartbeat_group_attr->show) - ret = o2hb_heartbeat_group_attr->show(reg, page); - return ret; -} - -static ssize_t o2hb_heartbeat_group_store(struct config_item *item, - struct configfs_attribute *attr, - const char *page, size_t count) -{ - struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item)); - struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr = - container_of(attr, struct o2hb_heartbeat_group_attribute, attr); - ssize_t ret = -EINVAL; - - if (o2hb_heartbeat_group_attr->store) - ret = o2hb_heartbeat_group_attr->store(reg, page, count); - return ret; -} - -static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group, - char *page) +static ssize_t o2hb_heartbeat_group_dead_threshold_show(struct config_item *item, + char *page) { return sprintf(page, "%u\n", o2hb_dead_threshold); } -static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group, - const char *page, - size_t count) +static ssize_t o2hb_heartbeat_group_dead_threshold_store(struct config_item *item, + const char *page, size_t count) { unsigned long tmp; char *p = (char *)page; + int ret; - tmp = simple_strtoul(p, &p, 10); - if (!p || (*p && (*p != '\n'))) - return -EINVAL; + ret = kstrtoul(p, 10, &tmp); + if (ret) + return ret; /* this will validate ranges for us. */ o2hb_dead_threshold_set((unsigned int) tmp); @@ -2247,17 +2151,15 @@ static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group return count; } -static -ssize_t o2hb_heartbeat_group_mode_show(struct o2hb_heartbeat_group *group, - char *page) +static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item, + char *page) { return sprintf(page, "%s\n", o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]); } -static -ssize_t o2hb_heartbeat_group_mode_store(struct o2hb_heartbeat_group *group, - const char *page, size_t count) +static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item, + const char *page, size_t count) { unsigned int i; int ret; @@ -2268,7 +2170,7 @@ ssize_t o2hb_heartbeat_group_mode_store(struct o2hb_heartbeat_group *group, return -EINVAL; for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) { - if (strnicmp(page, o2hb_heartbeat_mode_desc[i], len)) + if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len)) continue; ret = o2hb_global_heartbeat_mode_set(i); @@ -2282,41 +2184,22 @@ ssize_t o2hb_heartbeat_group_mode_store(struct o2hb_heartbeat_group *group, } -static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = { - .attr = { .ca_owner = THIS_MODULE, - .ca_name = "dead_threshold", - .ca_mode = S_IRUGO | S_IWUSR }, - .show = o2hb_heartbeat_group_threshold_show, - .store = o2hb_heartbeat_group_threshold_store, -}; - -static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_mode = { - .attr = { .ca_owner = THIS_MODULE, - .ca_name = "mode", - .ca_mode = S_IRUGO | S_IWUSR }, - .show = o2hb_heartbeat_group_mode_show, - .store = o2hb_heartbeat_group_mode_store, -}; +CONFIGFS_ATTR(o2hb_heartbeat_group_, dead_threshold); +CONFIGFS_ATTR(o2hb_heartbeat_group_, mode); static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = { - &o2hb_heartbeat_group_attr_threshold.attr, - &o2hb_heartbeat_group_attr_mode.attr, + &o2hb_heartbeat_group_attr_dead_threshold, + &o2hb_heartbeat_group_attr_mode, NULL, }; -static struct configfs_item_operations o2hb_heartbeat_group_item_ops = { - .show_attribute = o2hb_heartbeat_group_show, - .store_attribute = o2hb_heartbeat_group_store, -}; - static struct configfs_group_operations o2hb_heartbeat_group_group_ops = { .make_item = o2hb_heartbeat_group_make_item, .drop_item = o2hb_heartbeat_group_drop_item, }; -static struct config_item_type o2hb_heartbeat_group_type = { +static const struct config_item_type o2hb_heartbeat_group_type = { .ct_group_ops = &o2hb_heartbeat_group_group_ops, - .ct_item_ops = &o2hb_heartbeat_group_item_ops, .ct_attrs = o2hb_heartbeat_group_attrs, .ct_owner = THIS_MODULE, }; @@ -2482,7 +2365,7 @@ static int o2hb_region_inc_user(const char *region_uuid) if (o2hb_dependent_users > 1) goto unlock; - if (o2hb_pop_count(&o2hb_quorum_region_bitmap, + if (bitmap_weight(o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF) ret = o2hb_region_pin(NULL); @@ -2491,7 +2374,7 @@ unlock: return ret; } -void o2hb_region_dec_user(const char *region_uuid) +static void o2hb_region_dec_user(const char *region_uuid) { spin_lock(&o2hb_live_lock); @@ -2516,8 +2399,7 @@ unlock: int o2hb_register_callback(const char *region_uuid, struct o2hb_callback_func *hc) { - struct o2hb_callback_func *tmp; - struct list_head *iter; + struct o2hb_callback_func *f; struct o2hb_callback *hbcall; int ret; @@ -2540,10 +2422,9 @@ int o2hb_register_callback(const char *region_uuid, down_write(&o2hb_callback_sem); - list_for_each(iter, &hbcall->list) { - tmp = list_entry(iter, struct o2hb_callback_func, hc_item); - if (hc->hc_priority < tmp->hc_priority) { - list_add_tail(&hc->hc_item, iter); + list_for_each_entry(f, &hbcall->list, hc_item) { + if (hc->hc_priority < f->hc_priority) { + list_add_tail(&hc->hc_item, &f->hc_item); break; } } @@ -2582,11 +2463,13 @@ void o2hb_unregister_callback(const char *region_uuid, } EXPORT_SYMBOL_GPL(o2hb_unregister_callback); -int o2hb_check_node_heartbeating(u8 node_num) +int o2hb_check_node_heartbeating_no_sem(u8 node_num) { unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; - o2hb_fill_node_map(testing_map, sizeof(testing_map)); + spin_lock(&o2hb_live_lock); + o2hb_fill_node_map_from_callback(testing_map, O2NM_MAX_NODES); + spin_unlock(&o2hb_live_lock); if (!test_bit(node_num, testing_map)) { mlog(ML_HEARTBEAT, "node (%u) does not have heartbeating enabled.\n", @@ -2596,13 +2479,13 @@ int o2hb_check_node_heartbeating(u8 node_num) return 1; } -EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating); +EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem); int o2hb_check_node_heartbeating_from_callback(u8 node_num) { unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; - o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map)); + o2hb_fill_node_map_from_callback(testing_map, O2NM_MAX_NODES); if (!test_bit(node_num, testing_map)) { mlog(ML_HEARTBEAT, "node (%u) does not have heartbeating enabled.\n", @@ -2614,23 +2497,6 @@ int o2hb_check_node_heartbeating_from_callback(u8 node_num) } EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback); -/* Makes sure our local node is configured with a node number, and is - * heartbeating. */ -int o2hb_check_local_node_heartbeating(void) -{ - u8 node_num; - - /* if this node was set then we have networking */ - node_num = o2nm_this_node(); - if (node_num == O2NM_MAX_NODES) { - mlog(ML_HEARTBEAT, "this node has not been configured.\n"); - return 0; - } - - return o2hb_check_node_heartbeating(node_num); -} -EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating); - /* * this is just a hack until we get the plumbing which flips file systems * read only and drops the hb ref instead of killing the node dead. |
