# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2019-2024 Andrew Rechnitzer
# Copyright (C) 2021-2025 Colin B. Macdonald
"""Tools for manipulating version maps."""
import csv
import json
import random
from pathlib import Path
# TODO: go through and fix all the places with str(q+1)
# TODO: there is some documentation of "param" below that should move elsewhere
def check_version_map(
vm: dict[int, dict[int | str, int]],
spec=None,
*,
legacy: bool = False,
required_papers: list[int] | None = None,
num_questions: int | None = None,
num_versions: int | None = None,
) -> None:
"""Correctness checks of a version maps.
Args:
vm: a dict-of-dicts describing versions. See the output
of :func:`plom.make_random_version_map`.
spec (plom.SpecVerifier/dict): a plom spec or the underlying
dict, see :func:`plom.SpecVerifier`.
Keyword Args:
legacy: True if this version map is for a legacy server, which
is more strict about contiguous range of papers for example.
required_papers: A list of paper_numbers that the qv map must have.
num_questions: if specified, we'll ensure each row has versions
for each.
num_versions: how versions we expect. If specified, we'll check
the data from the file against this value.
Returns:
None
Raises:
ValueError: with a message about what is wrong.
"""
if spec:
# if both spec and the kwargs are passed, ensure they are consistent,
# although callers will probably user one or the other.
if num_questions is None:
num_questions = spec["numberOfQuestions"]
else:
if spec["numberOfQuestions"] != num_questions:
raise ValueError(
f"spec and num_questions={num_questions} do not match, spec: {spec}"
)
if num_versions is None:
num_versions = spec["numberOfVersions"]
else:
if spec["numberOfVersions"] != num_versions:
raise ValueError(
f"spec and num_versions={num_versions} do not match, spec: {spec}"
)
rowlens = set()
for t, qd in vm.items():
if not isinstance(t, int):
raise ValueError(f'paper number key "{t}" ({type(t)}) is not an integer')
if not isinstance(qd, dict):
raise ValueError(f'row "{qd}" of version map should be a dict')
if num_questions is not None:
if "id" in qd.keys():
if len(qd) != num_questions + 1:
raise ValueError(
f"length of row {qd} does not match num questions {num_questions}"
)
else:
if len(qd) != num_questions:
raise ValueError(
f"length of row {qd} does not match num questions {num_questions}"
)
# even if no spec we can ensure all rows the same
rowlens.add(len(qd))
for q, v in qd.items():
if q == "id":
pass
elif not isinstance(q, int):
raise ValueError(f'question key "{q}" ({type(q)}) is not an integer')
if not isinstance(v, int):
raise ValueError(f'version "{v}" ({type(v)}) should be an integer')
if not v > 0:
raise ValueError(f'version "{v}" should be strictly positive')
if num_versions is not None:
if not v <= num_versions:
raise ValueError(
f'paper_number {t}: version "{v}" can be at most '
f" number of versions = {num_versions}"
)
if spec:
# TODO: unsure about this: maybe we should doc that we ignore "select"
# when custom version maps are used.
# TODO: revisit this when working on Issue #2261.
if spec["question"][str(q)]["select"] == "fix":
if not v == 1:
raise ValueError(
f'version "{v}" is not 1 but question is "fix" in spec {spec}'
)
if not len(rowlens) <= 1:
raise ValueError("Inconsistency in version map: not all rows had same length")
# check if required papers are all present
if required_papers:
missing_papers = [X for X in required_papers if X not in vm]
if missing_papers:
raise ValueError(
f"Map is missing required papers: {missing_papers}. These were likely prenamed papers"
)
if not legacy:
return
# remaining checks should matter only for legacy servers
if spec and not len(vm) == spec["numberToProduce"]:
raise ValueError(
f"Legacy server requires numberToProduce={spec['numberToProduce']}"
f" to match the number of rows {len(vm)} of the version map"
)
if vm.keys():
min_testnum = min(vm.keys())
max_testnum = max(vm.keys())
if not min_testnum == 1:
raise ValueError(f"test_number should start at 1: got {list(vm.keys())}")
if not set(vm.keys()) == set(range(min_testnum, max_testnum + 1)):
raise ValueError(f"No gaps allowed in test_num: got {list(vm.keys())}")
def make_random_version_map(
spec, *, seed: str | None = None
) -> dict[int, dict[int | str, int]]:
"""Build a random version map.
Args:
spec (plom.SpecVerifier/dict): A plom exam specification or the
underlying dict. See :func:`plom.SpecVerifier`. The most
important properties are the `numberToProduce`, the
`numberOfQuestions`, and the `select` of each question.
Keyword Args:
seed: to get a reproducible version map, we can seed the
pseudo-random number generator. Unknown how portable this
is between Python versions or OSes.
Returns:
A dict-of-dicts keyed by paper number (int) and then
question number (int, but indexed from 1 not 0). Values are
integers.
Raises:
KeyError: invalid question selection scheme in spec.
"""
if seed is not None:
random.seed(seed)
# we want to have nearly equal numbers of each version - issue #1470
# first make a list which cycles through versions
vlist = [(x % spec["numberOfVersions"]) + 1 for x in range(spec["numberToProduce"])]
# now assign a copy of this for each question, so qvlist[question][testnumber]=version
qvlist = [
random.sample(vlist, len(vlist)) for q in range(spec["numberOfQuestions"])
]
# we use the above when a question is shuffled, else we just use v=1.
vmap: dict[int, dict[int | str, int]] = {}
for t in range(1, spec["numberToProduce"] + 1):
vmap[t] = {}
for g in range(spec["numberOfQuestions"]): # runs from 0,1,2,...
gs = str(g + 1) # now a str and 1,2,3,...
if spec["question"][gs]["select"] == "fix":
# there is only one version so all are version 1
vmap[t][g + 1] = 1
elif spec["question"][gs]["select"] == "shuffle":
# version selected randomly in [1, 2, ..., #versions]
# the below is purely random, so uneven distribution of versions
# vmap[t][g + 1] = random.randint(1, spec["numberOfVersions"])
# replace with more equal distribution of versions from qvlist above
vmap[t][g + 1] = qvlist[g][t - 1]
# offset by one due to indices starting from 0
# TODO: we may enable something like this later
# elif spec["question"][gs]["select"] == "param":
# # If caller does not provide a version, all are version 1.
# # Caller can provide a version to group their parameters by any
# # way they wish. Typically this would be be ease grading, e.g.,
# # * map negative parameters to v1 and positive to v2.
# # * map tuples (a,b) with common `b` value to same version.
# # In fact there is no significant difference between `param`
# # and `shuffle` when user data is provided. But clients or
# # other aspects of the software might behave differently.
# vmap[t][g + 1] = random.randint(1, spec["numberOfVersions"])
else:
raise KeyError(
'Invalid spec: question {} "select" of "{}" is unexpected'.format(
gs, spec["question"][gs]["select"]
)
)
return vmap
def undo_json_packing_of_version_map(
vermap_in: dict[str, dict[str, int]],
) -> dict[int, dict[int | str, int]]:
"""JSON must have string keys; undo such to int keys for version map.
Both the test number and the question number have likely been
converted to strings by an evil JSON: we build a new dict-of-dicts
with both converted explicitly to integers.
Note: sometimes the dict-of-dicts is key'd by page number instead
of question number. This same function can be used in that case.
"""
vmap = {}
for t, vers in vermap_in.items():
vmap[int(t)] = {(int(q) if q != "id" else q): v for q, v in vers.items()}
return vmap
def _version_map_from_json(
    f: Path,
    *,
    required_papers: list[int] | None = None,
    num_questions: int | None = None,
    num_versions: int | None = None,
) -> dict:
    """Load a version map from a JSON file and validate it.

    The string keys that JSON forces on the data are converted back to
    integers before the map is checked.

    Args:
        f: the JSON file to read, a dict of dicts.

    Keyword Args:
        required_papers: A list of paper_numbers that the qv map must have.
        num_questions: how many questions we expect, if specified.
        num_versions: how many versions we expect, if specified.

    Returns:
        The version map, keyed by paper number (`int`).

    Raises:
        ValueError: the version map failed validation.
    """
    with open(f) as stream:
        packed = json.load(stream)
    unpacked = undo_json_packing_of_version_map(packed)
    expectations = {
        "required_papers": required_papers,
        "num_questions": num_questions,
        "num_versions": num_versions,
    }
    check_version_map(unpacked, **expectations)
    return unpacked
def _version_map_from_csv(
    f: Path,
    *,
    required_papers: list[int] | None = None,
    num_questions: int | None = None,
    num_versions: int | None = None,
) -> dict[int, dict[int | str, int]]:
    """Extract the version map from a csv file.

    Args:
        f: a csv file, must have a `paper_number` (or, on legacy,
            `test_number`) column and some `q{n}.version` columns.
            The number of such columns is generally autodetected
            unless ``num_questions`` kwarg is passed.
            Optionally, there can be an `id.version` column.
            This could be output of :func:`save_question_version_map`.

    Keyword Args:
        required_papers: A list of paper_numbers that the qv map must have.
        num_questions: how many questions we expect.  If specified, we'll
            check the data from the file against this value.
        num_versions: how many versions we expect.  If specified, we'll
            check the data from the file against this value.

    Returns:
        dict: keys are the paper numbers (`int`) and each value is a row
        of the version map: another dict with questions as question
        number (`int`) and value version (`int`).  If there was an
        `id.version` column in the input, there will be `"id"` keys
        (i.e., of type `str`).

    Raises:
        ValueError: values could not be converted to integers (including
            cells missing from a short row), duplicate paper numbers, or
            other errors in the version map.  Line numbers in the
            messages are line numbers of the file, counting the header.
        KeyError: wrong column header names.
    """
    qvmap: dict[int, dict[int | str, int]] = {}
    # The csv module asks for files to be opened with newline="" so that it
    # can do its own newline handling.
    with open(f, "r", newline="") as csvfile:
        reader = csv.DictReader(csvfile)
        fieldnames = reader.fieldnames
        if not fieldnames:
            raise ValueError("csv must have column names")
        has_id_column = "id.version" in fieldnames
        if num_questions is None:
            # in this case we have to autodetect: every column except the
            # paper number and the optional id.version is a question.
            N = len(fieldnames) - 1
            if has_id_column:
                N -= 1
        else:
            N = num_questions
        # It's called "test_number" on legacy and "paper_number" on webplom.
        if "paper_number" in fieldnames:
            paper_column = "paper_number"
        elif "test_number" in fieldnames:
            paper_column = "test_number"
        else:
            paper_column = None
        for row in reader:
            # line of the file just consumed, so messages match a text editor
            line = reader.line_num
            if paper_column is None:
                raise ValueError("Cannot find paper_number column")
            # DictReader pads short rows with None, and int(None) raises
            # TypeError: report that as the documented ValueError too.
            try:
                papernum = int(row[paper_column])
            except (TypeError, ValueError) as err:
                raise ValueError(f"In line {line}: {err}") from err
            if papernum in qvmap:
                raise ValueError(
                    f"In line {line} Duplicate paper number detected: {papernum}"
                )
            try:
                qvmap[papernum] = {
                    n: int(row[f"q{n}.version"]) for n in range(1, N + 1)
                }
            except KeyError as err:
                raise KeyError(f"Missing column header {err}") from err
            except (TypeError, ValueError) as err:
                raise ValueError(f"In line {line}: {err}") from err
            if has_id_column:  # id.version is optional
                try:
                    qvmap[papernum]["id"] = int(row["id.version"])
                except (TypeError, ValueError) as err:
                    raise ValueError(f"In line {line}: {err}") from err
    check_version_map(
        qvmap,
        required_papers=required_papers,
        num_questions=num_questions,
        num_versions=num_versions,
    )
    return qvmap
[docs]
def version_map_from_file(
    f: Path | str,
    *,
    required_papers: list[int] | None = None,
    num_questions: int | None = None,
    num_versions: int | None = None,
) -> dict[int, dict[int | str, int]]:
    """Extract the version map from a csv or json file.

    The file type is chosen from the (case-insensitive) extension.  A
    filename whose extension is neither ``.json`` nor ``.csv`` has
    ``.csv`` appended to it and is then read as csv.

    Args:
        f: If ``.csv`` file, must have a `test_number`
            column and some `q{n}.version` columns.  The number of such
            columns is autodetected.  If ``.json`` file, it's a dict of
            dicts.  Either case could, for example, be the output of
            :func:`save_question_version_map`.

    Keyword Args:
        required_papers: A list of paper_numbers that the qv map must have.
        num_questions: how many questions we expect.  If specified, we'll
            check the data from the file against this value.
        num_versions: how many versions we expect.  If specified, we'll
            check the data from the file against this value.

    Returns:
        keys are the paper numbers (`int`) and each value is a row
        of the version map: another dict with questions as question
        number (`int`) and value version (`int`).

    Raises:
        ValueError: values could not be converted to integers, or
            other errors in the version map.
        KeyError: wrong column header names.
    """
    f = Path(f)
    recognized = (".json", ".csv")
    if f.suffix.casefold() not in recognized:
        # unknown or missing extension: assume csv and say so in the name
        f = f.with_suffix(f.suffix + ".csv")
    kind = f.suffix.casefold()

    expectations = {
        "required_papers": required_papers,
        "num_questions": num_questions,
        "num_versions": num_versions,
    }
    if kind == ".json":
        return _version_map_from_json(f, **expectations)
    if kind == ".csv":
        return _version_map_from_csv(f, **expectations)
    raise NotImplementedError(f'Don\'t know how to import from "{f}"')
[docs]
def version_map_to_csv(
qvmap: dict[int, dict[int | str, int]], filename: Path, *, _legacy: bool = True
) -> None:
"""Output a csv of the question-version map.
Arguments:
qvmap: the question-version map, documented elsewhere.
filename: where to save.
Keyword Args:
_legacy: if True, we call the column "test_number" else "paper_number".
Currently the default is True but this is expected to change.
Raises:
ValueError: some rows have differing numbers of questions.
"""
# all rows should have same length: get than length or fail
(N,) = {len(v) for v in qvmap.values()}
if _legacy:
header = ["test_number"]
else:
header = ["paper_number"]
has_id_versions = False
# do rows generally have "id" is in the keys?
if any("id" in row for row in qvmap.values()):
has_id_versions = True
if has_id_versions:
N -= 1
header.append("id.version")
for q in range(1, N + 1):
header.append(f"q{q}.version")
with open(filename, "w") as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow(header)
# make sure the rows are ordered by paper num (Issue #3597)
for t, row in sorted(qvmap.items()):
output_row = [t]
if has_id_versions:
output_row.append(row["id"])
output_row.extend([row[q] for q in range(1, N + 1)])
csv_writer.writerow(output_row)