Identifying corrupted rows and recovering healthy rows

Matthew Gwillam

Abstract

This article is part of series:

This article provides functions for working with corrupted tables and demonstrates how to use them.

  • The first two functions find which rows are corrupted
    • 1] edb_check_table_row_corruption, 2] edb_check_table_row_corruption_parallel.
  • The last two functions find which rows are corrupted, and save the healthy rows to a new table, ready to be dumped
    • 3] edb_preserve_healthy_rows, 4] edb_preserve_healthy_rows_parallel.

If you just want to find out which rows are corrupted, run functions 1] or 2].

If you want to find out which rows are corrupted, and save the healthy rows, run functions 3] or 4].

Identifying corrupted rows

1] Identify corrupted rows single thread (small table)

This function should be used if the table is small, or only one CPU is available.

Create DDL:

CREATE TABLE edb_corrupted_rows(schemaname TEXT, tablename TEXT, t_ctid TID, sqlstate TEXT, sqlerrm TEXT);

CREATE OR REPLACE FUNCTION edb_check_table_row_corruption(schemaname TEXT, tablename TEXT)
RETURNS VOID AS $$
DECLARE
    rec RECORD;
    tmp RECORD;
    t_ctid TID;
    tmp_text TEXT;
BEGIN
    -- Collect the ctid of every row, then re-read each row individually.
    FOR rec IN EXECUTE 'SELECT ctid FROM '
        || quote_ident(schemaname) || '.' || quote_ident(tablename)
    LOOP
        t_ctid := rec.ctid;
        BEGIN
            EXECUTE 'SELECT * FROM '
                || quote_ident(schemaname) || '.' || quote_ident(tablename)
                || ' WHERE ctid = ''' || t_ctid || '''::tid'
                INTO STRICT tmp;
            -- Casting the row to text forces every column to be read.
            tmp_text := tmp::text;
        EXCEPTION WHEN OTHERS THEN
            -- No COMMIT here: transaction control is not allowed in a function called from SELECT.
            INSERT INTO edb_corrupted_rows VALUES(schemaname, tablename, t_ctid, SQLSTATE::text, SQLERRM::text);
        END;
    END LOOP;
END;
$$ LANGUAGE PLPGSQL;

Run the function:

select edb_check_table_row_corruption('<enter_schema>','<enter_table>');

Examine the corrupted rows:

select count(*) from edb_corrupted_rows;
select * from edb_corrupted_rows limit 10;
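
Beyond a plain count, it can help to see which error types were recorded and which blocks they fall in. The queries below are a minimal sketch, assuming the edb_corrupted_rows table created above has been populated. The tid-to-point cast is just one convenient way to pull the block number out of a ctid; the function itself does not rely on it.

```sql
-- Group the recorded failures by error code, with one sample message each.
SELECT sqlstate, count(*) AS affected_rows, min(sqlerrm) AS example_message
FROM edb_corrupted_rows
GROUP BY sqlstate
ORDER BY affected_rows DESC;

-- Count failures per block. A tid prints as '(block,offset)', which also
-- parses as a point, so element [0] of the point is the block number.
SELECT (t_ctid::text::point)[0]::bigint AS blockno, count(*) AS affected_rows
FROM edb_corrupted_rows
GROUP BY blockno
ORDER BY blockno;
```

Many failures concentrated in a few blocks usually points at damaged pages rather than isolated rows.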

2] Identify corrupted rows multi thread (large table)

This procedure should be used to reduce execution time when analysing large tables and multiple CPUs are available. Author: Jakub Wartak.

Create DDL:

CREATE TABLE edb_corrupted_rows_parallel(schemaname TEXT, tablename TEXT, t_ctid TID, sqlstate TEXT, sqlerrm TEXT);

CREATE OR REPLACE PROCEDURE edb_check_table_row_corruption_parallel(schemaname TEXT, tablename TEXT, worker int default 1, dop int default 1)  AS $$
DECLARE
    tmp RECORD;
    tmp_text TEXT;
    checkctid TEXT;
    s TEXT;
    totalblks BIGINT;
    blockno BIGINT;
    pct FLOAT;
    c INT:=0;
BEGIN
    IF (worker > dop OR worker = 0) THEN
        RAISE NOTICE 'wrong usage, worker needs to start at 1 and cannot be higher than dop';
        RETURN;
    END IF;

    SELECT pg_relation_size(schemaname || '.' || tablename)/current_setting('block_size')::integer INTO totalblks;
    SELECT (100.0/dop)::float INTO pct;
    RAISE NOTICE 'total % blocks found, will read % %% of them in this session', totalblks, pct;
    FOR blockno IN 0..totalblks
    LOOP
        BEGIN
            -- split the rows across many workers
            IF MOD(blockno, dop)+1 <> worker THEN
                CONTINUE;
            END IF;

            -- tid-scan/fetch whole DB page with all LP ids at once
            -- just scanning up to 292 lpids (as in the original non-parallel version)
            -- magic number equation: 8192 (blocksize) / (27 (sizeof HeapTupleHeaderData) + min 1 byte for data) =~ 292
            s := format($q$ SELECT * FROM %I.%I WHERE ctid = ANY (ARRAY(SELECT ('(%s, ' || s.i || ')')::tid FROM generate_series(0,292) AS s(i) )) $q$,
                schemaname, tablename, blockno);
            EXECUTE s INTO tmp;
            tmp_text := tmp::text;


        EXCEPTION WHEN OTHERS THEN
            RAISE NOTICE 'detected damaged blockno % because of % and %', blockno, SQLSTATE::text, SQLERRM::text;
            -- if it fails then pinpoint it individually/slow path
            FOR lpid IN 1..292 LOOP
                BEGIN
                    checkctid := '(' || blockno || ',' || lpid || ')';
                    s := format ($q$ SELECT * FROM %I.%I  WHERE ctid = '%s'::tid $q$, schemaname, tablename, checkctid);
                    --RAISE NOTICE 'running %', s;
                    EXECUTE s INTO tmp;
                EXCEPTION WHEN OTHERS THEN
                    --RAISE NOTICE 'failed checking %: %: % ', checkctid, SQLSTATE::text, SQLERRM::text;
                    INSERT INTO edb_corrupted_rows_parallel VALUES(schemaname, tablename, checkctid::tid, SQLSTATE::text, SQLERRM::text);
                END;
            END LOOP; -- for lpid
        END;

        c := c + 1;
        IF (c > 100) THEN
            c := 0;
            COMMIT;
        END IF;

    END LOOP;
END;
$$ LANGUAGE PLPGSQL;
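
The 292 used in the procedure comes from the equation in its comments. As a quick check, assuming the 27 + 1 bytes per row quoted there, the same arithmetic can be run against the server's actual block size:

```sql
-- Integer division of the block size by the per-row minimum quoted in the
-- procedure's comment (27 bytes of header plus at least 1 byte of data).
SELECT current_setting('block_size')::integer AS block_size,
       current_setting('block_size')::integer / (27 + 1) AS max_lpids;
```

With the default 8192-byte block size this gives 292. On a server built with a larger block size the result is higher, and the hard-coded 292 in the procedures would likely need to be raised to match.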

Usage: run one call per session. More sessions draw more bandwidth from storage and have a bigger impact on the system.

Example with 4 sessions (worker numbers 1 to 4, dop = 4).

Run the procedure:

CALL edb_check_table_row_corruption_parallel('<enter_schema>', '<enter_table>', 1, 4); -- in 1st session
CALL edb_check_table_row_corruption_parallel('<enter_schema>', '<enter_table>', 2, 4); -- in 2nd session
CALL edb_check_table_row_corruption_parallel('<enter_schema>', '<enter_table>', 3, 4); -- in 3rd session
CALL edb_check_table_row_corruption_parallel('<enter_schema>', '<enter_table>', 4, 4); -- in 4th session

As a rough guide, one such session can use 100-200MB/s of IO.
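
To see how blocks are divided between sessions, the expression MOD(blockno, dop)+1 from the procedure can be evaluated on its own. This is only an illustration, using the first 8 block numbers and dop = 4:

```sql
-- Each block number is assigned to exactly one worker in round-robin order.
SELECT blockno, MOD(blockno, 4) + 1 AS worker
FROM generate_series(0, 7) AS blockno;
```

Blocks 0 and 4 go to worker 1, blocks 1 and 5 to worker 2, and so on, so every worker number from 1 to dop must be run for the whole table to be covered.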

Examine the corrupted rows:

select count(*) from edb_corrupted_rows_parallel;
select * from edb_corrupted_rows_parallel limit 10;

Identify corrupted rows, and save healthy rows

The following two procedures (single thread and multi thread) go one step further: they identify the corrupted rows and save the non-corrupted rows. Author: Dilip Kumar.

3] Identify corrupted rows single thread and save healthy rows (small table)

This procedure should be used if the table is small, or only one CPU is available.

Create DDL:

CREATE TABLE edb_corrupted_rows(schemaname TEXT,
                                tablename TEXT,
                                t_ctid TID,
                                sqlstate TEXT,
                                sqlerrm TEXT);

CREATE TABLE edb_backup_healthy_rows as select * from "<enter_corrupted_table>" limit 0;
-- If `CREATE TABLE edb_backup_healthy_rows` fails, run \d+ "<enter_corrupted_table>", and re-create a copy manually

CREATE OR REPLACE PROCEDURE edb_preserve_healthy_rows(schemaname TEXT, tablename TEXT) AS $$
DECLARE
    rec RECORD;
    tmp_text TEXT;
    counter INT := 0;
BEGIN
    -- Generate every candidate tid: line pointers 0..292 for each block of the table.
    FOR rec IN EXECUTE format($q$
        SELECT '(' || b || ','|| generate_series(0,292) || ')' AS generated_tid
            FROM generate_series(0, pg_relation_size('%I.%I')/current_setting('block_size')::integer) b
    $q$, schemaname, tablename)
    LOOP
        BEGIN
            -- Copy the row at this tid, if any, into the backup table.
            tmp_text := 'INSERT INTO edb_backup_healthy_rows SELECT * FROM '
                    || quote_ident(schemaname) || '.' || quote_ident(tablename)
                    || ' WHERE ctid = ''' || rec.generated_tid || '''::tid';
            EXECUTE tmp_text;
            counter := counter + 1;
        EXCEPTION WHEN OTHERS THEN
            -- Reading or copying the row failed: record the tid and the error.
            INSERT INTO edb_corrupted_rows VALUES(schemaname, tablename, rec.generated_tid::tid, SQLSTATE::text, SQLERRM::text);
        END;
    END LOOP;
END;
$$ LANGUAGE PLPGSQL;

Run function

In psql:

CALL edb_preserve_healthy_rows('<enter_schema_name>', '<enter_table_name>');

OR

In nohup/screen:

If the table is large but only one CPU is available, a single-threaded run can take hours, and the remote shell connection may be killed due to inactivity. To prevent this, run the call under nohup or screen, whichever you prefer. The example below uses nohup:

echo "CALL edb_preserve_healthy_rows('<enter_schema_name>', '<enter_table_name>');" > call.sql

nohup edb-psql -h localhost -f call.sql -p 5444 -U enterprisedb -d edb &

Examine the corrupted rows:

select count(*) from edb_corrupted_rows;
select * from edb_corrupted_rows limit 10;

Examine content of healthy copy of corrupted table:

select count(*) from edb_backup_healthy_rows;
select * from edb_backup_healthy_rows limit 10;
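
As a rough sanity check on the recovery, the counts can be set against the planner's row estimate for the damaged table. This is a sketch only: reltuples is an estimate from the last VACUUM or ANALYZE (and can be -1 if the table was never analysed), and the corrupted count assumes edb_corrupted_rows holds entries for this table alone.

```sql
-- Compare the estimated row count with what was saved and what was rejected.
SELECT c.reltuples::bigint AS estimated_rows,
       (SELECT count(*) FROM edb_backup_healthy_rows) AS healthy_rows,
       (SELECT count(*) FROM edb_corrupted_rows) AS corrupted_rows
FROM pg_class c
WHERE c.oid = '<enter_schema_name>.<enter_table_name>'::regclass;
```

A healthy_rows figure far below estimated_rows, with few corrupted rows logged, is a hint to look more closely before relying on the copy.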

4] Identify corrupted rows multi thread and save healthy rows (large table)

Create DDL:

CREATE TABLE edb_corrupted_rows_parallel(schemaname TEXT,
                                tablename TEXT,
                                t_ctid TID,
                                sqlstate TEXT,
                                sqlerrm TEXT);

CREATE TABLE edb_backup_healthy_rows_parallel as select * from "<enter_corrupted_table>" limit 0;
-- If `CREATE TABLE edb_backup_healthy_rows_parallel` fails, run \d+ "<enter_corrupted_table>", and re-create a copy manually

CREATE OR REPLACE PROCEDURE edb_preserve_healthy_rows_parallel(schemaname TEXT, tablename TEXT, worker int default 1, dop int default 1)  AS $$
DECLARE
    tmp RECORD;
    tmp_text TEXT;
    insert_text TEXT;
    checkctid TEXT;
    s TEXT;
    s2 TEXT;
    totalblks BIGINT;
    blockno BIGINT;
    pct FLOAT;
    c INT:=0;
BEGIN
    IF (worker > dop OR worker = 0) THEN
        RAISE NOTICE 'wrong usage, worker needs to start at 1 and cannot be higher than dop';
        RETURN;
    END IF;
    SELECT pg_relation_size(schemaname || '.' || tablename)/current_setting('block_size')::integer INTO totalblks;
    SELECT (100.0/dop)::float INTO pct;
    RAISE NOTICE 'total % blocks found, will read % %% of them in this session', totalblks, pct;
    FOR blockno IN 0..totalblks
    LOOP
        BEGIN
            -- split the rows across many workers
            IF MOD(blockno, dop)+1 <> worker THEN
                CONTINUE;
            END IF;
            -- tid-scan/fetch whole DB page with all LP ids at once
            -- just scanning up to 292 lpids (as in the original non-parallel version)
            -- magic number equation: 8192 (blocksize) / (27 (sizeof HeapTupleHeaderData) + min 1 byte for data) =~ 292
            s := format($q$ SELECT * FROM %I.%I WHERE ctid = ANY (ARRAY(SELECT ('(%s, ' || s.i || ')')::tid FROM generate_series(0,292) AS s(i) )) $q$,
                schemaname, tablename, blockno);
            EXECUTE s INTO tmp;
            tmp_text := tmp::text;

            s2 := format($q$ INSERT INTO edb_backup_healthy_rows_parallel SELECT * FROM %I.%I WHERE ctid = ANY (ARRAY(SELECT ('(%s, ' || s.i || ')')::tid FROM generate_series(0,292) AS s(i) )) $q$,schemaname, tablename, blockno);
            EXECUTE s2;

        EXCEPTION WHEN OTHERS THEN
            RAISE NOTICE 'detected damaged blockno % because of % and %', blockno, SQLSTATE::text, SQLERRM::text;
            -- if it fails then pinpoint it individually/slow path
            FOR lpid IN 1..292 LOOP
                BEGIN
                    checkctid := '(' || blockno || ',' || lpid || ')';
                    s := format ($q$ SELECT * FROM %I.%I  WHERE ctid = '%s'::tid $q$, schemaname, tablename, checkctid);
                    --RAISE NOTICE 'running %', s;
                    EXECUTE s INTO tmp;

                    s2 := format ($q$ INSERT INTO edb_backup_healthy_rows_parallel SELECT * FROM %I.%I  WHERE ctid = '%s'::tid $q$, schemaname, tablename, checkctid);
                    EXECUTE s2;

                EXCEPTION WHEN OTHERS THEN
                    --RAISE NOTICE 'failed checking %: %: % ', checkctid, SQLSTATE::text, SQLERRM::text;
                    INSERT INTO edb_corrupted_rows_parallel VALUES(schemaname, tablename, checkctid::tid, SQLSTATE::text, SQLERRM::text);
                END;
            END LOOP; -- for lpid
        END;
        c := c + 1;
        IF (c > 100) THEN
            c := 0;
            COMMIT;
        END IF;
    END LOOP;
END;
$$ LANGUAGE PLPGSQL;

Usage: run one call per session (several SSH sessions or screen(1)). More sessions draw more bandwidth from storage and have a bigger impact on the system.

Example with 4 sessions.

For a very large table, running the procedure can take hours, and the remote shell connection may be killed due to inactivity. To prevent this, run the calls under nohup or screen, whichever you prefer. The example below uses nohup:

Run function

In nohup:

echo "CALL edb_preserve_healthy_rows_parallel('<enter_schema>', '<enter_table>', 1, 4);" > call1.sql
echo "CALL edb_preserve_healthy_rows_parallel('<enter_schema>', '<enter_table>', 2, 4);" > call2.sql
echo "CALL edb_preserve_healthy_rows_parallel('<enter_schema>', '<enter_table>', 3, 4);" > call3.sql
echo "CALL edb_preserve_healthy_rows_parallel('<enter_schema>', '<enter_table>', 4, 4);" > call4.sql

nohup edb-psql -h localhost -f call1.sql -p 5444 -U enterprisedb -d edb &
nohup edb-psql -h localhost -f call2.sql -p 5444 -U enterprisedb -d edb &
nohup edb-psql -h localhost -f call3.sql -p 5444 -U enterprisedb -d edb &
nohup edb-psql -h localhost -f call4.sql -p 5444 -U enterprisedb -d edb &

OR, if nohup/screen is not available, run each call in a separate psql session (the sessions may be killed due to inactivity):

CALL edb_preserve_healthy_rows_parallel('<enter_schema>', '<enter_table>', 1, 4); -- in 1st session
CALL edb_preserve_healthy_rows_parallel('<enter_schema>', '<enter_table>', 2, 4); -- in 2nd session
CALL edb_preserve_healthy_rows_parallel('<enter_schema>', '<enter_table>', 3, 4); -- in 3rd session
CALL edb_preserve_healthy_rows_parallel('<enter_schema>', '<enter_table>', 4, 4); -- in 4th session

Examine the corrupted rows:

select count(*) from edb_corrupted_rows_parallel;
select * from edb_corrupted_rows_parallel limit 10;

Examine content of healthy copy of corrupted table:

select count(*) from edb_backup_healthy_rows_parallel;
select * from edb_backup_healthy_rows_parallel limit 10;
