Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6f3bfd45 authored by Ilya Dryomov's avatar Ilya Dryomov
Browse files

libceph: ceph_osds, ceph_pg_to_up_acting_osds()



Knowning just acting set isn't enough, we need to be able to record up
set as well to detect interval changes.  This means returning (up[],
up_len, up_primary, acting[], acting_len, acting_primary) and passing
it around.  Introduce and switch to ceph_osds to help with that.

Rename ceph_calc_pg_acting() to ceph_pg_to_up_acting_osds() and return
both up and acting sets from it.

Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent d9591f5e
Loading
Loading
Loading
Loading
+18 −3
Original line number Original line Diff line number Diff line
@@ -208,6 +208,20 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
					     struct ceph_osdmap *map);
					     struct ceph_osdmap *map);
extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
extern void ceph_osdmap_destroy(struct ceph_osdmap *map);


struct ceph_osds {
	int osds[CEPH_PG_MAX_SIZE];
	int size;
	int primary; /* id, NOT index */
};

static inline void ceph_osds_init(struct ceph_osds *set)
{
	set->size = 0;
	set->primary = -1;
}

void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src);

/* calculate mapping of a file extent to an object */
/* calculate mapping of a file extent to an object */
extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
					 u64 off, u64 len,
					 u64 off, u64 len,
@@ -218,9 +232,10 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
			      struct ceph_object_locator *oloc,
			      struct ceph_object_locator *oloc,
			      struct ceph_pg *raw_pgid);
			      struct ceph_pg *raw_pgid);


extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap,
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
			       struct ceph_pg pgid,
			       const struct ceph_pg *raw_pgid,
			       int *osds, int *primary);
			       struct ceph_osds *up,
			       struct ceph_osds *acting);
extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
				struct ceph_pg pgid);
				struct ceph_pg pgid);


+18 −18
Original line number Original line Diff line number Diff line
@@ -1358,8 +1358,7 @@ static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req, int force_resend)
			 struct ceph_osd_request *req, int force_resend)
{
{
	struct ceph_pg pgid;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	struct ceph_osds up, acting;
	int num, o;
	int err;
	int err;
	bool was_paused;
	bool was_paused;


@@ -1372,9 +1371,7 @@ static int __map_request(struct ceph_osd_client *osdc,
	}
	}
	req->r_pgid = pgid;
	req->r_pgid = pgid;


	num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o);
	ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting);
	if (num < 0)
		num = 0;


	was_paused = req->r_paused;
	was_paused = req->r_paused;
	req->r_paused = __req_should_be_paused(osdc, req);
	req->r_paused = __req_should_be_paused(osdc, req);
@@ -1382,21 +1379,23 @@ static int __map_request(struct ceph_osd_client *osdc,
		force_resend = 1;
		force_resend = 1;


	if ((!force_resend &&
	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_osd && req->r_osd->o_osd == acting.primary &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     req->r_num_pg_osds == acting.size &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	     memcmp(req->r_pg_osds, acting.osds,
	    (req->r_osd == NULL && o == -1) ||
		    acting.size * sizeof(acting.osds[0])) == 0) ||
	    (req->r_osd == NULL && acting.primary == -1) ||
	    req->r_paused)
	    req->r_paused)
		return 0;  /* no change */
		return 0;  /* no change */


	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
	     req->r_tid, pgid.pool, pgid.seed, o,
	     req->r_tid, pgid.pool, pgid.seed, acting.primary,
	     req->r_osd ? req->r_osd->o_osd : -1);
	     req->r_osd ? req->r_osd->o_osd : -1);


	/* record full pg acting set */
	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	memcpy(req->r_pg_osds, acting.osds,
	req->r_num_pg_osds = num;
	       acting.size * sizeof(acting.osds[0]));
	req->r_num_pg_osds = acting.size;


	if (req->r_osd) {
	if (req->r_osd) {
		__cancel_request(req);
		__cancel_request(req);
@@ -1405,21 +1404,22 @@ static int __map_request(struct ceph_osd_client *osdc,
		req->r_osd = NULL;
		req->r_osd = NULL;
	}
	}


	req->r_osd = lookup_osd(&osdc->osds, o);
	req->r_osd = lookup_osd(&osdc->osds, acting.primary);
	if (!req->r_osd && o >= 0) {
	if (!req->r_osd && acting.primary >= 0) {
		err = -ENOMEM;
		err = -ENOMEM;
		req->r_osd = create_osd(osdc, o);
		req->r_osd = create_osd(osdc, acting.primary);
		if (!req->r_osd) {
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
			goto out;
		}
		}


		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		dout("map_request osd %p is osd%d\n", req->r_osd,
		     acting.primary);
		insert_osd(&osdc->osds, req->r_osd);
		insert_osd(&osdc->osds, req->r_osd);


		ceph_con_open(&req->r_osd->o_con,
		ceph_con_open(&req->r_osd->o_con,
			      CEPH_ENTITY_TYPE_OSD, o,
			      CEPH_ENTITY_TYPE_OSD, acting.primary,
			      &osdc->osdmap->osd_addr[o]);
			      &osdc->osdmap->osd_addr[acting.primary]);
	}
	}


	__enqueue_request(req);
	__enqueue_request(req);
+179 −125
Original line number Original line Diff line number Diff line
@@ -1474,6 +1474,38 @@ void ceph_oid_destroy(struct ceph_object_id *oid)
}
}
EXPORT_SYMBOL(ceph_oid_destroy);
EXPORT_SYMBOL(ceph_oid_destroy);


static bool osds_valid(const struct ceph_osds *set)
{
	/* non-empty set */
	if (set->size > 0 && set->primary >= 0)
		return true;

	/* empty can_shift_osds set */
	if (!set->size && set->primary == -1)
		return true;

	/* empty !can_shift_osds set - all NONE */
	if (set->size > 0 && set->primary == -1) {
		int i;

		for (i = 0; i < set->size; i++) {
			if (set->osds[i] != CRUSH_ITEM_NONE)
				break;
		}
		if (i == set->size)
			return true;
	}

	return false;
}

void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
{
	memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
	dest->size = src->size;
	dest->primary = src->primary;
}

/*
/*
 * calculate file layout from given offset, length.
 * calculate file layout from given offset, length.
 * fill in correct oid, logical length, and object extent
 * fill in correct oid, logical length, and object extent
@@ -1571,6 +1603,46 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
}
}
EXPORT_SYMBOL(ceph_object_locator_to_pg);
EXPORT_SYMBOL(ceph_object_locator_to_pg);


/*
 * Map a raw PG (full precision ps) into an actual PG.
 */
static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid,
			 struct ceph_pg *pgid)
{
	pgid->pool = raw_pgid->pool;
	pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
				     pi->pg_num_mask);
}

/*
 * Map a raw PG (full precision ps) into a placement ps (placement
 * seed).  Include pool id in that value so that different pools don't
 * use the same seeds.
 */
static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid)
{
	if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
		/* hash pool id and seed so that pool PGs do not overlap */
		return crush_hash32_2(CRUSH_HASH_RJENKINS1,
				      ceph_stable_mod(raw_pgid->seed,
						      pi->pgp_num,
						      pi->pgp_num_mask),
				      raw_pgid->pool);
	} else {
		/*
		 * legacy behavior: add ps and pool together.  this is
		 * not a great approach because the PGs from each pool
		 * will overlap on top of each other: 0.5 == 1.4 ==
		 * 2.3 == ...
		 */
		return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
				       pi->pgp_num_mask) +
		       (unsigned)raw_pgid->pool;
	}
}

static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
		    int *result, int result_max,
		    int *result, int result_max,
		    const __u32 *weight, int weight_max)
		    const __u32 *weight, int weight_max)
@@ -1588,84 +1660,92 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
}
}


/*
/*
 * Calculate raw (crush) set for given pgid.
 * Calculate raw set (CRUSH output) for given PG.  The result may
 * contain nonexistent OSDs.  ->primary is undefined for a raw set.
 *
 *
 * Return raw set length, or error.
 * Placement seed (CRUSH input) is returned through @ppps.
 */
 */
static int pg_to_raw_osds(struct ceph_osdmap *osdmap,
static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
			  struct ceph_pg_pool_info *pool,
			   struct ceph_pg_pool_info *pi,
			  struct ceph_pg pgid, u32 pps, int *osds)
			   const struct ceph_pg *raw_pgid,
			   struct ceph_osds *raw,
			   u32 *ppps)
{
{
	u32 pps = raw_pg_to_pps(pi, raw_pgid);
	int ruleno;
	int ruleno;
	int len;
	int len;


	/* crush */
	ceph_osds_init(raw);
	ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset,
	if (ppps)
				 pool->type, pool->size);
		*ppps = pps;

	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
				 pi->size);
	if (ruleno < 0) {
	if (ruleno < 0) {
		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
		       pgid.pool, pool->crush_ruleset, pool->type,
		       pi->id, pi->crush_ruleset, pi->type, pi->size);
		       pool->size);
		return;
		return -ENOENT;
	}
	}


	len = do_crush(osdmap, ruleno, pps, osds,
	len = do_crush(osdmap, ruleno, pps, raw->osds,
		       min_t(int, pool->size, CEPH_PG_MAX_SIZE),
		       min_t(int, pi->size, ARRAY_SIZE(raw->osds)),
		       osdmap->osd_weight, osdmap->max_osd);
		       osdmap->osd_weight, osdmap->max_osd);
	if (len < 0) {
	if (len < 0) {
		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
		       len, ruleno, pgid.pool, pool->crush_ruleset,
		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
		       pool->type, pool->size);
		       pi->size);
		return len;
		return;
	}
	}


	return len;
	raw->size = len;
}
}


/*
/*
 * Given raw set, calculate up set and up primary.
 * Given raw set, calculate up set and up primary.  By definition of an
 * up set, the result won't contain nonexistent or down OSDs.
 *
 *
 * Return up set length.  *primary is set to up primary osd id, or -1
 * This is done in-place - on return @set is the up set.  If it's
 * if up set is empty.
 * empty, ->primary will remain undefined.
 */
 */
static int raw_to_up_osds(struct ceph_osdmap *osdmap,
static void raw_to_up_osds(struct ceph_osdmap *osdmap,
			  struct ceph_pg_pool_info *pool,
			   struct ceph_pg_pool_info *pi,
			  int *osds, int len, int *primary)
			   struct ceph_osds *set)
{
{
	int up_primary = -1;
	int i;
	int i;


	if (ceph_can_shift_osds(pool)) {
	/* ->primary is undefined for a raw set */
	BUG_ON(set->primary != -1);

	if (ceph_can_shift_osds(pi)) {
		int removed = 0;
		int removed = 0;


		for (i = 0; i < len; i++) {
		/* shift left */
			if (ceph_osd_is_down(osdmap, osds[i])) {
		for (i = 0; i < set->size; i++) {
			if (ceph_osd_is_down(osdmap, set->osds[i])) {
				removed++;
				removed++;
				continue;
				continue;
			}
			}
			if (removed)
			if (removed)
				osds[i - removed] = osds[i];
				set->osds[i - removed] = set->osds[i];
		}
		}

		set->size -= removed;
		len -= removed;
		if (set->size > 0)
		if (len > 0)
			set->primary = set->osds[0];
			up_primary = osds[0];
	} else {
	} else {
		for (i = len - 1; i >= 0; i--) {
		/* set down/dne devices to NONE */
			if (ceph_osd_is_down(osdmap, osds[i]))
		for (i = set->size - 1; i >= 0; i--) {
				osds[i] = CRUSH_ITEM_NONE;
			if (ceph_osd_is_down(osdmap, set->osds[i]))
				set->osds[i] = CRUSH_ITEM_NONE;
			else
			else
				up_primary = osds[i];
				set->primary = set->osds[i];
		}
		}
	}
	}

	*primary = up_primary;
	return len;
}
}


static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
static void apply_primary_affinity(struct ceph_osdmap *osdmap,
				   struct ceph_pg_pool_info *pool,
				   struct ceph_pg_pool_info *pi,
				   int *osds, int len, int *primary)
				   u32 pps,
				   struct ceph_osds *up)
{
{
	int i;
	int i;
	int pos = -1;
	int pos = -1;
@@ -1677,8 +1757,8 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
	if (!osdmap->osd_primary_affinity)
	if (!osdmap->osd_primary_affinity)
		return;
		return;


	for (i = 0; i < len; i++) {
	for (i = 0; i < up->size; i++) {
		int osd = osds[i];
		int osd = up->osds[i];


		if (osd != CRUSH_ITEM_NONE &&
		if (osd != CRUSH_ITEM_NONE &&
		    osdmap->osd_primary_affinity[osd] !=
		    osdmap->osd_primary_affinity[osd] !=
@@ -1686,7 +1766,7 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
			break;
			break;
		}
		}
	}
	}
	if (i == len)
	if (i == up->size)
		return;
		return;


	/*
	/*
@@ -1694,8 +1774,8 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
	 * osd into the hash/rng so that a proportional fraction of an
	 * osd into the hash/rng so that a proportional fraction of an
	 * osd's pgs get rejected as primary.
	 * osd's pgs get rejected as primary.
	 */
	 */
	for (i = 0; i < len; i++) {
	for (i = 0; i < up->size; i++) {
		int osd = osds[i];
		int osd = up->osds[i];
		u32 aff;
		u32 aff;


		if (osd == CRUSH_ITEM_NONE)
		if (osd == CRUSH_ITEM_NONE)
@@ -1720,123 +1800,99 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
	if (pos < 0)
	if (pos < 0)
		return;
		return;


	*primary = osds[pos];
	up->primary = up->osds[pos];


	if (ceph_can_shift_osds(pool) && pos > 0) {
	if (ceph_can_shift_osds(pi) && pos > 0) {
		/* move the new primary to the front */
		/* move the new primary to the front */
		for (i = pos; i > 0; i--)
		for (i = pos; i > 0; i--)
			osds[i] = osds[i - 1];
			up->osds[i] = up->osds[i - 1];
		osds[0] = *primary;
		up->osds[0] = up->primary;
	}
	}
}
}


/*
/*
 * Given up set, apply pg_temp and primary_temp mappings.
 * Get pg_temp and primary_temp mappings for given PG.
 *
 *
 * Return acting set length.  *primary is set to acting primary osd id,
 * Note that a PG may have none, only pg_temp, only primary_temp or
 * or -1 if acting set is empty.
 * both pg_temp and primary_temp mappings.  This means @temp isn't
 * always a valid OSD set on return: in the "only primary_temp" case,
 * @temp will have its ->primary >= 0 but ->size == 0.
 */
 */
static int apply_temps(struct ceph_osdmap *osdmap,
static void get_temp_osds(struct ceph_osdmap *osdmap,
		       struct ceph_pg_pool_info *pool, struct ceph_pg pgid,
			  struct ceph_pg_pool_info *pi,
		       int *osds, int len, int *primary)
			  const struct ceph_pg *raw_pgid,
			  struct ceph_osds *temp)
{
{
	struct ceph_pg pgid;
	struct ceph_pg_mapping *pg;
	struct ceph_pg_mapping *pg;
	int temp_len;
	int temp_primary;
	int i;
	int i;


	/* raw_pg -> pg */
	raw_pg_to_pg(pi, raw_pgid, &pgid);
	pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
	ceph_osds_init(temp);
				    pool->pg_num_mask);


	/* pg_temp? */
	/* pg_temp? */
	pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
	pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
	if (pg) {
	if (pg) {
		temp_len = 0;
		temp_primary = -1;

		for (i = 0; i < pg->pg_temp.len; i++) {
		for (i = 0; i < pg->pg_temp.len; i++) {
			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
				if (ceph_can_shift_osds(pool))
				if (ceph_can_shift_osds(pi))
					continue;
					continue;
				else

					osds[temp_len++] = CRUSH_ITEM_NONE;
				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
			} else {
			} else {
				osds[temp_len++] = pg->pg_temp.osds[i];
				temp->osds[temp->size++] = pg->pg_temp.osds[i];
			}
			}
		}
		}


		/* apply pg_temp's primary */
		/* apply pg_temp's primary */
		for (i = 0; i < temp_len; i++) {
		for (i = 0; i < temp->size; i++) {
			if (osds[i] != CRUSH_ITEM_NONE) {
			if (temp->osds[i] != CRUSH_ITEM_NONE) {
				temp_primary = osds[i];
				temp->primary = temp->osds[i];
				break;
				break;
			}
			}
		}
		}
	} else {
		temp_len = len;
		temp_primary = *primary;
	}
	}


	/* primary_temp? */
	/* primary_temp? */
	pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
	pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
	if (pg)
	if (pg)
		temp_primary = pg->primary_temp.osd;
		temp->primary = pg->primary_temp.osd;

	*primary = temp_primary;
	return temp_len;
}
}


/*
/*
 * Calculate acting set for given pgid.
 * Map a PG to its acting set as well as its up set.
 *
 *
 * Return acting set length, or error.  *primary is set to acting
 * Acting set is used for data mapping purposes, while up set can be
 * primary osd id, or -1 if acting set is empty or on error.
 * recorded for detecting interval changes and deciding whether to
 * resend a request.
 */
 */
int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
			int *osds, int *primary)
			       const struct ceph_pg *raw_pgid,
			       struct ceph_osds *up,
			       struct ceph_osds *acting)
{
{
	struct ceph_pg_pool_info *pool;
	struct ceph_pg_pool_info *pi;
	u32 pps;
	u32 pps;
	int len;


	pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool);
	pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
	if (!pool) {
	if (!pi) {
		*primary = -1;
		ceph_osds_init(up);
		return -ENOENT;
		ceph_osds_init(acting);
		goto out;
	}
	}


	if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
		/* hash pool id and seed so that pool PGs do not overlap */
	raw_to_up_osds(osdmap, pi, up);
		pps = crush_hash32_2(CRUSH_HASH_RJENKINS1,
	apply_primary_affinity(osdmap, pi, pps, up);
				     ceph_stable_mod(pgid.seed, pool->pgp_num,
	get_temp_osds(osdmap, pi, raw_pgid, acting);
						     pool->pgp_num_mask),
	if (!acting->size) {
				     pgid.pool);
		memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
	} else {
		acting->size = up->size;
		/*
		if (acting->primary == -1)
		 * legacy behavior: add ps and pool together.  this is
			acting->primary = up->primary;
		 * not a great approach because the PGs from each pool
		 * will overlap on top of each other: 0.5 == 1.4 ==
		 * 2.3 == ...
		 */
		pps = ceph_stable_mod(pgid.seed, pool->pgp_num,
				      pool->pgp_num_mask) +
			(unsigned)pgid.pool;
	}
	}

out:
	len = pg_to_raw_osds(osdmap, pool, pgid, pps, osds);
	WARN_ON(!osds_valid(up) || !osds_valid(acting));
	if (len < 0) {
		*primary = -1;
		return len;
	}

	len = raw_to_up_osds(osdmap, pool, osds, len, primary);

	apply_primary_affinity(osdmap, pps, pool, osds, len, primary);

	len = apply_temps(osdmap, pool, pgid, osds, len, primary);

	return len;
}
}


/*
/*
@@ -1844,11 +1900,9 @@ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
 */
 */
int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
{
{
	int osds[CEPH_PG_MAX_SIZE];
	struct ceph_osds up, acting;
	int primary;

	ceph_calc_pg_acting(osdmap, pgid, osds, &primary);


	return primary;
	ceph_pg_to_up_acting_osds(osdmap, &pgid, &up, &acting);
	return acting.primary;
}
}
EXPORT_SYMBOL(ceph_calc_pg_primary);
EXPORT_SYMBOL(ceph_calc_pg_primary);