SQL/XML Dumps/Stubs, page logs, abstracts

How (not) to generate stubs, page logs, and abstract dumps

TL;DR: It's complicated. Too complicated.

And now, here's the long version.

Among the other items we dump are metadata for each page and revision (“stubs”), the logs of what actions have been taken on pages (moves, deletions, patrolling, page creation and so on), and abstracts, which are small snippets of each page taken from the introductory section of the content.

As we do for all dumps, we try to break these into small pieces, so that if any one piece fails, we can retry it a few times before giving up completely. Giving up would mean that we have to restart the job from the beginning, and for larger wikis, even though the output files are broken into pieces, each piece may still take days to generate.

Let’s see how we do this for page logs, the simplest of the cases. The basics of the code flow are the same for the other two jobs.

Generation of page logs

Let’s look at xmllogs.py ([1]).

 

#!/usr/bin/python3
'''
generate an xml dump via multiple runs of a php script instead of one
long run.

avoids memory leak issues, permits retries when a single run fails,
recovery if db servers go away in the middle of a run by retrying
the run.
'''

import os
import sys
import getopt
from dumps.wikidump import Config
from dumps.fileutils import FileUtils
from dumps.utils import MultiVersion

 

The import of gzippit_append below should tip us off that something ugly is coming our way.

 

 

from xmlstreams import gzippit_append, do_xml_stream


 

We’ll look at the dologsbackup method in a little bit. Let’s look at the main part of the script first.

The xmllogs.py script takes an optional start and end log id range, so that we can generate multiple output files in parallel: 6 output files from 6 parallel processes for the big wikis, and 27 output files from 27 processes for the huge wikis (enwiki and wikidatawiki). All other wikis get a single output file. The same approach is used for the stub and abstract dumps.
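The splitting into id ranges is decided by the caller (the dumps scheduling code), not by this script. A minimal sketch of how a log id range might be carved into equal pieces, purely for illustration and not taken from the actual scheduler, could look like this:

def split_id_range(max_id, pieces):
    """
    Carve the id range [1, max_id] into `pieces` contiguous (start, end)
    chunks, end exclusive, matching how xmllogs.py treats --end.
    Illustrative only; the real piece boundaries come from the dumps
    scheduling code.
    """
    size = max(1, max_id // pieces)
    ranges = []
    start = 1
    for i in range(pieces):
        end = max_id + 1 if i == pieces - 1 else start + size
        ranges.append((start, end))
        start = end
    return ranges

# e.g. split_id_range(1200000, 6) ->
# [(1, 200001), (200001, 400001), ..., (1000001, 1200001)]
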

Beyond that, the script takes the standard args: --verbose and --dryrun, which do what you would expect; the name of the wiki, which is of course required; the path to the dumps config file; and the full path of the output file.

 

 

def usage(message=None):
    """
    display a helpful usage message with
    an optional introductory message first
    """
    if message is not None:
        sys.stderr.write(message)
        sys.stderr.write("\n")
    usage_message = """
Usage: xmllogs.py --wiki wikidbname --outfile path
    [--start number] [--end number]
    [--config path[:overrides]] [--dryrun] [--verbose]

Options:

  --wiki    (-w):      wiki db name, e.g. enwiki
  --outfile (-o):      full path to xml logs dump that will be created

  --start   (-s):      starting log id to dump (default: 1)
  --end     (-e):      ending log id to dump, exclusive of this entry (default: dump all)

  --config  (-C):      path to wikidump configfile (default: "wikidump.conf" in current dir)
                       if followed by : and a name, this section name in the config file
                       will be used to override config settings in default sections
  --dryrun  (-d):      display the commands that would be run to produce the output but
                       don't actually run them
  --verbose (-v):      display progress and status messages while the job runs
"""
    sys.stderr.write(usage_message)
    sys.exit(1)


 

OK, we get all the args, we parse the config file and add in the specific settings for our wiki, and we call the one method of interest: dologsbackup(). We'll look at that next.

 

 

def main():
    'main entry point, does all the work'
    wiki = None
    output_file = None
    start = None
    end = None
    configfile = "wikidump.conf"
    dryrun = False
    verbose = False

    try:
        (options, remainder) = getopt.gnu_getopt(
            sys.argv[1:], "w:o:s:e:C:dhv",
            ["wiki=", "outfile=",
             "start=", "end=", "config=",
             "help", "dryrun", "verbose"])

    except getopt.GetoptError as err:
        usage("Unknown option specified: " + str(err))
    for (opt, val) in options:
        if opt in ["-w", "--wiki"]:
            wiki = val
        elif opt in ["-o", "--outfile"]:
            output_file = val
        elif opt in ["-s", "--start"]:
            start = val
        elif opt in ["-e", "--end"]:
            end = val
        elif opt in ["-C", "--config"]:
            configfile = val
        elif opt in ["-d", "--dryrun"]:
            dryrun = True
        elif opt in ["-v", "--verbose"]:
            verbose = True
        elif opt in ["-h", "--help"]:
            usage('Help for this script\n')
        else:
            usage("Unknown option specified: <%s>" % opt)

    if remainder:
        usage("Unknown option(s) specified: <%s>" % remainder[0])

    if wiki is None:
        usage("mandatory argument missing: --wiki")
    if output_file is None:
        usage("mandatory argument missing: --outfile")

    if start is not None:
        if not start.isdigit():
            usage("value for --start must be a number")
        else:
            start = int(start)

    if end is not None:
        if not end.isdigit():
            usage("value for --end must be a number")
        else:
            end = int(end) - 1

    wikiconf = Config(configfile)
    wikiconf.parse_conffile_per_project(wiki)
    dologsbackup(wiki, output_file, wikiconf, start, end, dryrun, verbose)


 

The obligatory “work around pylint whines” code ;-)

 

 

if __name__ == '__main__':
    main()

 

Now let's have a look at dologsbackup()

 

 

def dologsbackup(wikidb, outfile,
                 wikiconf, start, end, dryrun, verbose):
    '''
    do a logs xml dump one piece at a time, writing into uncompressed
    temporary files and shovelling those into gzip's stdin for the
    concatenated compressed output
    '''

 

Why use a dict (hash, associative array) for only one file type (logs)? Because when we generate stubs, that dump has three file types, and we want to reuse code wherever possible…
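For comparison, here is roughly what the stubs version of this dict would look like, with three entries instead of one (the file names here are made up; see xmlstubs.py for the real thing), before we get to the single-entry version used for logs below:

# illustrative only: the stubs job sets up three output streams instead of one,
# all driven by the same loop that follows
outfiles = {
    'articles': {'name': '/dumps/enwiki-stub-articles.xml.gz'},
    'current':  {'name': '/dumps/enwiki-stub-meta-current.xml.gz'},
    'history':  {'name': '/dumps/enwiki-stub-meta-history.xml.gz'},
}
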

 

 

    outfiles = {'logs': {'name': outfile}}
    for filetype in outfiles:

 

We write into a file in the dumps temp dir and tack _tmp onto the end of the name for good measure.

 

 

        outfiles[filetype]['temp'] = os.path.join(
            FileUtils.wiki_tempdir(wikidb, wikiconf.temp_dir),
            os.path.basename(outfiles[filetype]['name']) + "_tmp")

 

Dryrun? We won't compress at all, because we won't actually be running anything, heh.

 

 

        if dryrun:
            outfiles[filetype]['compr'] = [None, outfiles[filetype]['name']]
        else:

 

Yes we could be gzipping a file which ends in .bz2 or .txt, but that’s the caller’s problem for using a crap output file name :-P

 

 

            outfiles[filetype]['compr'] = [gzippit_append, outfiles[filetype]['name']]

 

This gets the path to the maintenance script to run, with the path to MWScript.php ([2]) prepended if needed (this is determined by the dumps config file).

 

 

    script_command = MultiVersion.mw_script_as_array(wikiconf, "dumpBackup.php")

 

We set up (part of) the command to be run. The --logs says to dump page logs rather than some other type of thing.

 

 

    command = [wikiconf.php] + script_command

    command.extend(["--wiki=%s" % wikidb,
                    "--logs", "--report=1000",
                    "--output=file:%s" % outfiles['logs']['temp']
                    ])
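
Put together, the command handed to do_xml_stream() ends up looking something like the following; the paths are made up for illustration, and MWScript.php only appears if the config says to use it.

# hypothetical assembled command, for illustration only
command = [
    '/usr/bin/php',
    '/srv/mediawiki/multiversion/MWScript.php',  # only if the config calls for MWScript
    'dumpBackup.php',
    '--wiki=enwiki',
    '--logs',
    '--report=1000',
    '--output=file:/mnt/dumpsdata/temp/e/enwiki/enwiki-logging.xml.gz_tmp',
]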

 

We write an xml header at the beginning of the file, with no other output; the header is gzipped all by itself. We write an xml footer at the end of the file, with no other output; it too is gzipped all by itself. The middle, which contains all of the data and no headers or footers, is a separate gzipped stream as well. When all of these are concatenated together, the result is a valid gzip file which gzip tools can decompress with no special parameters required.
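
That property of gzip is easy to check for yourself; here is a tiny standalone demo, unrelated to the dumps code, that writes three separate gzip streams into one file and reads them back as a single document:

import gzip

# write header, body and footer as three independent gzip members
with open('demo.xml.gz', 'wb') as out:
    for chunk in ('<mediawiki>\n', '  <logitem>...</logitem>\n', '</mediawiki>\n'):
        out.write(gzip.compress(chunk.encode('utf-8')))

# standard tools (zcat, or Python's gzip module) see one concatenated document
with gzip.open('demo.xml.gz', 'rt') as fin:
    print(fin.read(), end='')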

 

 

    do_xml_stream(wikidb, outfiles, command, wikiconf,
                  start, end, dryrun, 'log_id', 'logging',
                  50000, 100000, '</logitem>\n', verbose=verbose, header=True)
    do_xml_stream(wikidb, outfiles, command, wikiconf,
                  start, end, dryrun, 'log_id', 'logging',
                  50000, 100000, '</logitem>\n', verbose=verbose)
    do_xml_stream(wikidb, outfiles, command, wikiconf,
                  start, end, dryrun, 'log_id', 'logging',
                  50000, 100000, '</logitem>\n', verbose=verbose, footer=True)

Pretty simple, right? But now we get to look at xmlstreams.py ([3]) which is where do_xml_stream() is defined. Here we go!

Xmlstreams for stubs, page logs, abstracts

 

#!/usr/bin/python3
'''
generate an xml dump via multiple runs of a php script instead of one
long run.

avoids memory leak issues, permits retries when a single run fails,
recovery if db servers go away in the middle of a run by retrying
the run.
'''

import os
import sys
import time
import traceback
from subprocess import Popen, PIPE
from dumps.utils import DbServerInfo
from dumps.wikidump import Wiki


# fix all the error returns and make subroutines out of stuff
# current code puts together a command with a bunch of crap in it


 

Note that the table name and the name of its row id column are passed in, because we might be dumping info about pages or about page logs.
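
The page logs job, as we saw, passes 'log_id' and the 'logging' table; the stub and abstract jobs would pass the page table and its id column instead, presumably something like the call below (the batch sizes and end tag here are a guess, check xmlstubs.py and xmlabstracts.py for the real values):

# hypothetical call for a stubs-style dump iterating over page ids
do_xml_stream(wikidb, outfiles, command, wikiconf,
              start, end, dryrun, 'page_id', 'page',
              10000, 100000, '</page>\n', verbose=verbose)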

 

 

def do_xml_stream(wikidb, outfiles, command, wikiconf,
                  start, end, dryrun, id_field, table,
                  small_interval, max_interval, ends_with,
                  verbose=False, callback=None, header=False, footer=False):
    '''
    do an xml dump one piece at a time, writing into uncompressed
    temporary files and shovelling those into gzip's stdin for the
    concatenated compressed output

    if header is True, write only the header
    if footer is True, write only the footer
    '''

 

If no start or end is passed in, we'll dump the whole thing, from the first page log entry to the last one (hence the call to get_max_id to find out what the last one is). small_interval and max_interval bound the number of items to dump in one batch, so that a failed batch can be retried without having lost too much time on the failed run; not days, let's say.

The small_interval value is used for tiny wikis: it is basically a minimum batch size, below which it's silly to ask for smaller batches. The max_interval is the maximum number of rows we will dump in one batch, even on a giant wiki.
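
To put some made-up numbers on that, following the same arithmetic as the code below:

small_interval = 50000     # values passed in by xmllogs.py
max_interval = 100000

# tiny wiki, no range given, 300,000 log entries in total:
# end < 1,000,000, so interval = small_interval, i.e. 6 batches of 50,000

# big wiki piece covering log ids 1 to 7,500,000:
start, end = 1, 7500000
interval = int((end - start) / 50)       # 149,999
interval = min(interval, max_interval)   # capped at 100,000
# roughly 75 batches of 100,000 log entries each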

 

 

    if start is None:
        start = 1

    interval = None
    if end is None:
        end = get_max_id(wikiconf, wikidb, id_field, table)
        # if the whole wiki is small enough, take
        # arbitrary hopefully reasonable slices
        if start == 1 and end < 1000000:
            interval = small_interval

    if interval is None:
        # hope this is not too awful a guess
        interval = int((int(end) - int(start)) / 50)
        if interval == 0:
            interval = 1
        elif interval > max_interval:
            interval = max_interval

    interval_save = interval

 

Just dump the header all by itself if the header was requested.

    if header:
        # get just the header
        piece_command = [field for field in command]
        piece_command.extend(["--skip-footer", "--start=1", "--end=1"])
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'] = outfiles[filetype]['compr'][0](
                    outfiles[filetype]['compr'][1])

        do_xml_piece(piece_command, outfiles, wikiconf, dryrun=dryrun, verbose=verbose)
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'].stdin.close()
            for filetype in outfiles:
                outfiles[filetype]['process'].wait()

 

Just dump the footer if that's what's requested. Headers and footers are compressed streams all on their own; in practice they are gzipped, since that's always the compression type wanted for stubs, page logs and abstracts. We can then concatenate them with the body to make a well-formed gzip file that is easy to manipulate later, for example when recombining several piece files in a row by removing the headers and footers from all but the first and last file in the sequence.
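
One way such a recombine could be done, purely as a sketch and assuming the header ends with a closing siteinfo tag (as it does for stubs and page logs); this is not the actual recombine code in the dumps repo:

import gzip

def recombine(pieces, outpath):
    '''
    sketch only: merge several piece files (each header + body + footer)
    into one well-formed dump, keeping the header from the first piece
    and the footer from the last
    '''
    with gzip.open(outpath, 'wt') as out:
        for i, piece in enumerate(pieces):
            with gzip.open(piece, 'rt') as fin:
                in_header = (i > 0)   # skip the header on all but the first piece
                for line in fin:
                    if in_header:
                        if '</siteinfo>' in line:
                            in_header = False
                        continue
                    if i < len(pieces) - 1 and '</mediawiki>' in line:
                        continue      # drop the footer on all but the last piece
                    out.write(line)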

 
    elif footer:
        # get just the footer
        piece_command = [field for field in command]
        piece_command.extend(["--skip-header", "--start=1", "--end=1"])
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'] = outfiles[filetype]['compr'][0](
                    outfiles[filetype]['compr'][1])

        do_xml_piece(piece_command, outfiles, wikiconf, dryrun=dryrun, verbose=verbose)
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'].stdin.close()
            for filetype in outfiles:
                outfiles[filetype]['process'].wait()

 

Here we dump all the actual data ;-)

 

 

    else:
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'] = outfiles[filetype]['compr'][0](
                    outfiles[filetype]['compr'][1])

 

This callback stuff is only used for stubs and it’s complicated, so we’ll just skip right by it…

 

 

        if callback is not None:
            wiki = Wiki(wikiconf, wikidb)
            db_info = DbServerInfo(wiki, wikidb)

        upto = start
        while upto <= end:
            if callback is not None:
                interval = callback(upto, interval_save, wiki, db_info)

 

Here we will call do_xml_piece() passing in the actual maintenance script command.

 

 

            piece_command = [field for field in command]
            piece_command.append("--skip-header")
            piece_command.extend(["--start=%s" % str(upto)])
            piece_command.append("--skip-footer")
            if upto + interval <= end:
                piece_command.extend(["--end", str(upto + interval)])
            else:
                piece_command.extend(["--end", str(end + 1)])
            upto = upto + interval
            do_xml_piece(piece_command, outfiles, wikiconf, ends_with,
                         dryrun=dryrun, verbose=verbose)

 

Close up the output file and wait for all the processes to complete. We can wait for them in any order, since we can't move on to the next thing until they are all complete. That's it!

        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'].stdin.close()
            for filetype in outfiles:
                outfiles[filetype]['process'].wait()

    if dryrun:
        return


 

But of course now we have to look at do_xml_piece() which does the real work.

Although the docstring says we do three retries, the number of retries is actually a configuration setting. Whoops!

def do_xml_piece(command, outfiles, wikiconf, ends_with=None, dryrun=False, verbose=False):
    '''
    do one piece of a logs dump, output going uncompressed
    to a temporary file and then that file being shovelled
    into the compressor's stdin

    we do three retries with plenty of delay, in case
    the db server has issues or some other problem
    crops up
    '''

    if dryrun:
        sys.stderr.write("would run command: %s\n" % " ".join(command))
        return

    if verbose:
        sys.stderr.write("running command: %s\n" % " ".join(command))

    retries = 0
    maxretries = wikiconf.max_retries
    timeout = 60

 

Try the command until we get success or hit maxretries.

We wait a bit in between retries, doubling the wait each time, in case it's a db server issue or a network hiccup, something that might be remedied in a few minutes. All partial output files are removed on failure, just to keep cruft from accumulating on disk.

    while retries < maxretries:
        try:
            result = run_script(command, outfiles, ends_with)
        except Exception:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            sys.stderr.write(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
            result = False
        if result:
            break
        time.sleep(timeout)
        timeout = timeout * 2
        retries += 1
    if not result:
        sys.stderr.write("failed job after max retries\n")
        for filetype in outfiles:
            try:
                # don't bother to save partial files, cleanup everything
                outfiles[filetype]['compr'].stdin.close()
                os.unlink(outfiles[filetype]['temp'])
            except Exception:
                # files might not be there, we don't care
                pass
        sys.exit(1)

    errors = False
    for filetype in outfiles:
        try:

 

Cat (or gzcat) the temp file into the compressor process that writes the final output file. That's right: we write an uncompressed intermediate file out to disk, read it back, and feed it to the compressor.

Here stuff can go awry; if, for some reason, we can’t feed the temp file to the output compressor correctly, then all bets are off.

 

 

            catfile(outfiles[filetype]['temp'], outfiles[filetype]['process'])
        except Exception:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            sys.stderr.write(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
            errors = True
            try:
                # get rid of the final output file, it's crap now
                os.unlink(outfiles[filetype]['compr'][1])
            except Exception:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                sys.stderr.write(repr(traceback.format_exception(
                    exc_type, exc_value, exc_traceback)))

 

Clean up all the temp files, whether we won or lost…

 

 

    # get rid of all temp files, regardless
    for filetype in outfiles:
        try:
            os.unlink(outfiles[filetype]['temp'])
        except Exception:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            sys.stderr.write(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))

 

And if something unrecoverable broke, then we have to give the bad news to the caller.

 

 

    if errors:
        # consider ourselves screwed. the parent process can start over
        sys.exit(1)

 

The other fun things to look at here are the methods that handle the i/o for these temp files and the final output file. Not much to say here except that it’s all very gross.

I'm not even sure whether using a large argument like that to read() is the best thing to do over NFS.

Note again that we make use of gzip's "concat multiple gzip files together and get a valid gzip file as a result" property.

def catfile(inputfile, process):
    '''
    read a file, cat it as fast as possible to the
    stdin of the process passed, then go away
    '''
    with open(inputfile, "r") as fhandle:
        while True:
            content = fhandle.read(1048576)
            if not content:
                fhandle.close()
                break
            process.stdin.write(content.encode('utf-8'))


def gzippit_append(outfile):
    '''
    start a gzip process that reads from stdin
    and appends to the specified file
    '''
    process = Popen("gzip >> %s" % outfile, stdin=PIPE, shell=True, bufsize=-1)
    return process


def gzippit(outfile):
    '''
    start a gzip process that reads from stdin
    and writes to the specified file
    '''
    process = Popen("gzip > %s" % outfile, stdin=PIPE, shell=True, bufsize=-1)
    return process


def catit(outfile):
    '''
    start a cat process that reads from stdin
    and writes to the specified file
    '''
    process = Popen("cat > %s" % outfile, stdin=PIPE, shell=True, bufsize=-1)
    return process


 

We won't bother to delve into run_script(), which is straightforward enough if you just read the code.

There is a little sanity check on the output, to be sure it ends with the right xml tag, if that's a thing for the specific kind of file being produced.

def run_script(command, outfiles, shouldendwith=None):
    '''
    given a command
    returns True on success, False on failure
    '''
    failed = False
    process = Popen(command)
    # would be best for there to be a timeout for this eh?
    process.wait()
    retval = process.returncode
    if not retval:
        for filetype in outfiles:
            outfile = outfiles[filetype]['temp']
            if os.path.exists(outfile):
                # file could be empty (all pages in the range deleted)
                if os.path.getsize(outfile) > 0:
                    if shouldendwith is not None:
                        with open(outfile, 'rb') as outfd:
                            outfd.seek(len(shouldendwith) * -1, os.SEEK_END)
                            remainder = outfd.read().decode('utf-8')
                            outfd.close()
                            if remainder != shouldendwith:
                                os.unlink(outfile)
                                sys.stderr.write(
                                    "bad output saved to {ofile} from '{command}'\n".format(
                                        ofile=outfile, command=" ".join(command)))
                                failed = True
    else:
        sys.stderr.write("nonzero return {retval} from command '{command}'\n".format(
            retval=retval, command=" ".join(command)))
        failed = True

    if failed:
        return False
    return True


 

Same for get_max_id(), which is reproduced here just for completeness.

We do retry the select a few times on failure, in case the db server becomes temporarily unavailable, for example. No point in having to abort an entire dump because of a momentary glitch.

def get_max_id(wikiconf, wikidb, id_field, table):
    '''
    retrieve the largest id for this wiki from the db for specific table
    pass in name of id field, name of table
    '''
    wiki = Wiki(wikiconf, wikidb)

    db_info = DbServerInfo(wiki, wikidb)
    query = "select MAX(%s) from %s%s;" % (
        id_field, db_info.get_attr('db_table_prefix'), table)
    results = None
    retries = 0
    maxretries = wiki.config.max_retries
    end = 0
    results = db_info.run_sql_and_get_output(query)
    if results:
        lines = results.splitlines()
        if lines and lines[1]:
            if not lines[1].isdigit():
                return 0   # probably NULL or missing table
            end = int(lines[1])
            return end

    while results is None and retries < maxretries:
        retries = retries + 1
        time.sleep(5)
        results = db_info.run_sql_and_get_output(query)
        if not results:
            continue
        lines = results.splitlines()
        if lines and lines[1]:
            end = int(lines[1])
            break

    if not end:
        sys.stderr.write("failed to get max page id from db, exiting\n")
        sys.exit(1)
    else:
        return end


Yes, it is gross, I admit it

For the last bit of gross, note that xmllogs.py is called as a script from the xmljobs.py module. So we have:

worker.py -> potentially multiple copies of xmllogs.py -> multiple runs of dumpBackup.php for each copy, where “->” is the “why yes we are forking out” operator :-P