zoukankan      html  css  js  c++  java
  • postgresql下一种对已有数据进行重新分表的plpgsql脚本

    背景:
      现有一个数据表task,目前已有数据位4000万左右,因为表太大,影响到数据库的操作性能,所以考虑对该task表中的数据进行重新分片。
    如果是在数据库设计的时候就考虑到这样的问题,可以采用postgresql的分区表,通过表继承以及创建一些trigger或者rules来实现这样的要求。但目前task表中已有大量的数据,所以直接多task表采用继承的方式来实现分片已经不太现实,另外insert到task表中时date_create已有值,但最终的表分区应该按照date_start,date_start只有在后期才会更新,在date_create时date_start为空;上层的应用已经成型,不可能更改代码。所以综合考虑下来采用传统的分区技术已经不太现实或者代价太高,无奈之下只能自己动手写一写plpgsql函数,通过crontab制定定时任务来对task表中的数据进行分块。
    具体代码如下:

    View Code
    --创建相对应的表
    CREATE OR REPLACE FUNCTION do_create_partition(table_name TEXT)
    RETURNS TEXT AS
    $BODY$
    DECLARE
    BEGIN
    RAISE NOTICE 'Begin to create patition ...';

    RAISE NOTICE 'partition table name = %', table_name;
    --创建分区
    /*
    EXECUTE 'CREATE TABLE '
    || table_name
    || '() INHERITS (task);';
    */
    EXECUTE 'CREATE TABLE '
    || table_name
    || '('
    || 'task_id integer NOT NULL,'
    || 'task_name text,'
    || 'task_type text,'
    || 'priority integer,'
    || 'expires integer,'
    || 'countdown integer,'
    || 'tube_name text,'
    || 'kwargs text,'
    || 'retries integer,'
    || 'status text,'
    || 'result text,'
    || 'date_start timestamp without time zone,'
    || 'date_done timestamp without time zone,'
    || 'traceback text,'
    || 'date_create timestamp without time zone,'
    || 'worker_id integer'
    || ')';
    --创建主键
    EXECUTE 'ALTER TABLE '
    || table_name
    || ' ADD CONSTRAINT '
    || table_name
    || '_task_id '
    ' PRIMARY KEY(task_id);';

    RAISE NOTICE '% created"', table_name;

    RETURN 'DONE';
    END
    $BODY$
    LANGUAGE 'plpgsql'

    --创建所需的分区表
    CREATE OR REPLACE FUNCTION create_partition_table()
    RETURNS TEXT AS
    $BODY$
    DECLARE
    p_table_name TEXT;
    now_dt TIMESTAMP;
    table_exist int;
    BEGIN
    RAISE NOTICE 'Begin to create partition table ...';
    --INSERT INTO task_20111018 VALUES (r.*)
    -- construct the table name
    --table_name := 'task_' || to_char(r.date_start, 'YYYYMMDD');
    --获取当前时间
    SELECT CURRENT_TIMESTAMP INTO now_dt;
    p_table_name := 'task_' || to_char(now_dt, 'YYYYMMDD');
    RAISE NOTICE 'partition table = %', p_table_name;
    --检测所需的分区表是否存在
    SELECT count(*) INTO table_exist FROM pg_tables WHERE schemaname='public' and tablename=p_table_name;
    IF table_exist = 0 THEN
    --创建当前分区表
    RAISE NOTICE 'CREATE current partition table...';
    PERFORM do_create_partition(p_table_name);
    ELSE
    --检测创建第二天的分区表
    now_dt := now_dt + interval '1 day';
    p_table_name := 'task_' || to_char(now_dt, 'YYYYMMDD');
    RAISE NOTICE 'partition table = %', p_table_name;
    --检测所需的分区表是否存在
    SELECT count(*) INTO table_exist FROM pg_tables WHERE schemaname='public' and tablename=p_table_name;
    IF table_exist = 0 THEN
    --创建当前分区表
    RAISE NOTICE 'CREATE next partition table...';
    PERFORM do_create_partition(p_table_name);
    END IF;
    END IF;
    --INSERT INTO table_name VALUES (r.*);
    RAISE NOTICE 'DO..."';

    RETURN 'DONE';
    END
    $BODY$
    LANGUAGE 'plpgsql'


    --将task中的已有数据根据date_start转移到相应的分区表中
    CREATE OR REPLACE FUNCTION slice_data()
    RETURNS TEXT AS
    $BODY$
    DECLARE
    r task%rowtype;
    table_name TEXT;
    table_exist int;
    date_format TEXT;
    BEGIN
    RAISE NOTICE 'Begin to slice data ...';
    --将date_start在今天之前的相关记录进行转储
    FOR r IN SELECT * FROM task WHERE status in ('SUCCESS', 'FAILURE' ) and date_start is not NULL and date_start < current_date::timestamp --limit 1000000
    LOOP
    --INSERT INTO task_20111018 VALUES (r.*)
    -- construct the table name
    table_name := 'task_' || to_char(r.date_start, 'YYYYMMDD');
    --table_name = 'task_20111018';
    RAISE NOTICE 'table_name = %', table_name;
    --INSERT INTO table_name VALUES (r.*);
    --检测所需的分区表是否存在
    SELECT count(*) INTO table_exist FROM pg_tables WHERE schemaname='public' and tablename=table_name;
    IF table_exist = 0 THEN
    --创建所需分区
    PERFORM do_create_partition(table_name);
    END IF;

    date_format := 'YYYY-MM-DD HH24:MI:SS';
    RAISE NOTICE 'do insert [task_id = %] ... ', r.task_id;
    EXECUTE 'INSERT INTO '
    || table_name
    || '(task_id, task_name, task_type, priority, expires, countdown, tube_name, kwargs, retries, status, result, date_start, date_done, traceback, date_create, worker_id)'
    || ' VALUES ('
    || r.task_id
    || ','
    || COALESCE(quote_literal(r.task_name), 'DEFAULT')
    || ','
    || COALESCE(quote_literal(r.task_type), 'DEFAULT')
    || ','
    || r.priority
    || ','
    || r.expires
    || ','
    || r.countdown
    || ','
    || COALESCE(quote_literal(r.tube_name), 'DEFAULT')
    || ','
    || COALESCE(quote_literal(r.kwargs), 'DEFAULT')
    || ','
    || r.retries
    || ','
    || COALESCE(quote_literal(r.status), 'DEFAULT')
    || ','
    || COALESCE(quote_literal(r.result), 'DEFAULT')
    || ','
    || COALESCE(quote_literal(to_char(r.date_start, date_format)), 'DEFAULT')
    || ','
    || COALESCE(quote_literal(to_char(r.date_done, date_format)), 'DEFAULT')
    || ','
    || COALESCE(quote_literal(r.traceback), 'DEFAULT')
    || ','
    || COALESCE(quote_literal(to_char(r.date_create, date_format)), 'DEFAULT')
    || ','
    || COALESCE(r.worker_id, 0)
    || ')';

    DELETE FROM task WHERE task_id = r.task_id;
    RAISE NOTICE 'delete old record';
    END LOOP;
    RETURN 'DONE';
    END
    $BODY$
    LANGUAGE 'plpgsql'

    可以在crontab中添加如下任务来实现对task已有数据的分块操作
    0 2 * * * /opt/pgsql9.1/bin/psql -d taskmanager -U postgres -c "select * slice_data();"

  • 相关阅读:
    RESTful
    Node.js Express 框架
    Node.js Web 模块
    Node.js Path 模块
    JavaFX输入并显示字符串
    JavaFX手眼协调小游戏(赋源代码)
    JavaFX作业七参考
    管理运筹学(Additional Simplex Algorithm)
    JavaFX15.4 ( 创建一个簡单的计算器 ) 编写一个程序完成加法、减法、乘法和除法操作
    JavaFX移动小球
  • 原文地址:https://www.cnblogs.com/Jerryshome/p/2352734.html
Copyright © 2011-2022 走看看