LucidDbHorizontalPartitioning

Introduction

LucidDB does not currently support DDL for automatic horizontal partitioning of tables. Although LucidDB's column-store architecture implements automatic vertical partitioning, horizontal partitioning is still a useful technique for scaling up to very large data volumes, so this page describes some manual approaches which can be used for very simple schemas.

Single Server

Let's start by examining a setup with a single LucidDB server. LucidDB does not currently parallelize query execution, so in this setup the main benefit is that loads can be parallelized; some amount of query optimization is possible as well.

Partition a logical table into two physical tables, using range partitioning on year:

create schema single;
create table single.httpd_log_2007(ip_address varchar(15), userid varchar(128), ts timestamp, request varchar(1024), result_status int, result_size int);
create table single.httpd_log_2008(ip_address varchar(15), userid varchar(128), ts timestamp, request varchar(1024), result_status int, result_size int);

Next, create a view which reconstructs the logical table:

create view single.httpd_log as 
select * from single.httpd_log_2007 where ts between timestamp '2007-01-01 00:00:00' and timestamp '2007-12-31 23:59:59'
union all 
select * from single.httpd_log_2008 where ts between timestamp '2008-01-01 00:00:00' and timestamp '2008-12-31 23:59:59';

Notes:

  • The view declares the range constraints, and as we'll see below, the optimizer can make use of this information. LucidDB will not actually do anything to enforce these constraints, so when loading the data, you have to be careful to honor them; a sanity-check query is sketched after this list.
  • The view uses UNION ALL rather than UNION since we don't need implicit duplicate removal of any kind.
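
Because nothing enforces the ranges, it can be worth running a quick sanity check after each load. The query below is just a sketch: it counts rows that landed in the 2007 partition but fall outside its declared range (such rows would be silently filtered out by the view):

-- should return 0 if the 2007 partition honors its declared range
select count(*) from single.httpd_log_2007
where ts not between timestamp '2007-01-01 00:00:00' and timestamp '2007-12-31 23:59:59';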

Now, load data; these INSERT statements can be executed in parallel since they target different tables (see LucidDbConcurrencyControl):

insert into single.httpd_log_2007 values ('127.0.0.1', 'bert', timestamp '2007-07-04 12:00:00', 'GET /apple_pie.gif', 200, 12345);
insert into single.httpd_log_2008 values ('127.0.0.1', 'ernie', timestamp '2008-10-10 12:00:00', 'GET /rubber_duckie.gif', 404, 47);
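
In practice, each partition would typically be bulk-loaded with INSERT ... SELECT from a staging source rather than with literal VALUES. The statements below are only a sketch; log_staging.stage.httpd_log_2007_raw and log_staging.stage.httpd_log_2008_raw are hypothetical source tables (for example, foreign tables over flat files) that are not defined in this example:

-- session 1: load the 2007 partition
insert into single.httpd_log_2007 (ip_address, userid, ts, request, result_status, result_size)
select ip_address, userid, ts, request, result_status, result_size
from log_staging.stage.httpd_log_2007_raw
where ts between timestamp '2007-01-01 00:00:00' and timestamp '2007-12-31 23:59:59';

-- session 2: load the 2008 partition concurrently
insert into single.httpd_log_2008 (ip_address, userid, ts, request, result_status, result_size)
select ip_address, userid, ts, request, result_status, result_size
from log_staging.stage.httpd_log_2008_raw
where ts between timestamp '2008-01-01 00:00:00' and timestamp '2008-12-31 23:59:59';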

Verify that the view is working correctly:

0: jdbc:luciddb:> select * from single.httpd_log;
+-------------+---------+------------------------+-------------------------+----------------+--------------+
| IP_ADDRESS  | USERID  |           TS           |         REQUEST         | RESULT_STATUS  | RESULT_SIZE  |
+-------------+---------+------------------------+-------------------------+----------------+--------------+
| 127.0.0.1   | bert    | 2007-07-04 12:00:00.0  | GET /apple_pie.gif      | 200            | 12345        |
| 127.0.0.1   | ernie   | 2008-10-10 12:00:00.0  | GET /rubber_duckie.gif  | 404            | 47           |
+-------------+---------+------------------------+-------------------------+----------------+--------------+
2 rows selected (0.091 seconds)
0: jdbc:luciddb:> select userid,ts from single.httpd_log where ts between timestamp '2008-10-01 00:00:00' and timestamp '2008-10-15 00:00:00';
+---------+------------------------+
| USERID  |           TS           |
+---------+------------------------+
| ernie   | 2008-10-10 12:00:00.0  |
+---------+------------------------+
1 row selected (0.519 seconds)

Now, let's see what the optimizer did with the second query (for background on EXPLAIN PLAN, see the LucidDB documentation):

0: jdbc:luciddb:> !set outputformat csv
0: jdbc:luciddb:> explain plan for select userid,ts from single.httpd_log where ts between timestamp '2008-10-01 00:00:00' and timestamp '2008-10-15 00:00:00';
'column0'
'FennelToIteratorConverter'
'  FennelMergeRel'
'    LcsRowScanRel(table=[[LOCALDB, SINGLE, HTTPD_LOG_2007]], projection=[[1, 2]], clustered indexes=[[SYS$CLUSTERED_INDEX$HTTPD_LOG_2007$TS, SYS$CLUSTERED_INDEX$HTTPD_LOG_2007$USERID]], residual columns=[[2]])'
'      FennelValuesRel(tuples=[[{ '(', null, ')', null }]])'
'    LcsRowScanRel(table=[[LOCALDB, SINGLE, HTTPD_LOG_2008]], projection=[[1, 2]], clustered indexes=[[SYS$CLUSTERED_INDEX$HTTPD_LOG_2008$TS, SYS$CLUSTERED_INDEX$HTTPD_LOG_2008$USERID]], residual columns=[[2]])'
'      FennelValuesRel(tuples=[[{ '[', 2008-10-01 00:00:00.0, ']', 2008-10-15 00:00:00.0 }]])'
6 rows selected (0.149 seconds)

The FennelMergeRel is the physical implementation of the UNION ALL. The interesting thing here is that the optimizer figured out that the scan over HTTPD_LOG_2007 is not going to turn up any records; you can see this in the first FennelValuesRel, whose parameters are all null. How did it do this? By intersecting the range declared in the view definition (between timestamp '2007-01-01 00:00:00' and timestamp '2007-12-31 23:59:59') with the condition supplied in the query (where ts between timestamp '2008-10-01 00:00:00' and timestamp '2008-10-15 00:00:00'); the intersection is empty, so the 2007 scan cannot produce any rows.

This optimization is helpful, but some minimal amount of scanning will still be done on HTTPD_LOG_2007. An improvement to the optimizer would allow it to eliminate this branch of the UNION ALL entirely, and then eliminate the UNION ALL itself since only a single input remains.

Note that if a bitmap index exists on the ts column in both tables, then the range partitioning adds little value in this setup. However, a bitmap index on a fine-grained datatype such as TIMESTAMP might not perform optimally, so an alternative would be to create a redundant coarse-grained DATE column and reference it in predicates instead, as sketched below.
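
A sketch of that alternative (the extra log_date column, the table name, and the index name are all illustrative; LucidDB's unclustered indexes are bitmap indexes):

create table single.httpd_log_2008_alt(
    ip_address varchar(15),
    userid varchar(128),
    ts timestamp,
    log_date date,    -- redundant, coarse-grained copy of ts
    request varchar(1024),
    result_status int,
    result_size int);
create index httpd_log_2008_alt_log_date_idx on single.httpd_log_2008_alt(log_date);

-- predicates then reference the coarse-grained column instead of ts
select userid, ts from single.httpd_log_2008_alt
where log_date between date '2008-10-01' and date '2008-10-15';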

Besides pushing down the filter through the UNION ALL, the optimizer was also smart enough to push down projection (the select list), so that we'll get the benefits of column-store: only the userid and ts data will be read from disk.

However, the optimizer is not yet smart enough to push down operations such as join and aggregation through UNION ALL, so this partitioning approach isn't suitable beyond a single table and simple search queries.
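
For example, a join like the hypothetical one below (single.users is an illustrative dimension table, not part of this schema) would be evaluated by merging both partitions first and then joining and aggregating, rather than by processing each partition independently:

select u.country, count(*) as hits
from single.httpd_log l
join single.users u on u.userid = l.userid
group by u.country;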

LucidDB provides snapshot versioning, so a query will see consistent results even if loads against the tables are executed while the query is running. However, if a query starts after the first table load is committed and before the second table load is committed, the query will see the intermediate state. A LucidDB enhancement which would address this would be a way to create a "label" similar to source control, and provide a way for a query to use that label as its snapshot; the label would only be advanced after all loads are committed.

By default, the LucidDB system parameter expectedConcurrentStatements is set to 4. If you try to execute more loads than this in parallel, you are likely to run into "out of scratch space" exceptions. See LucidDbMemoryManagement for more information.
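
If you genuinely need more concurrent loads, the parameter can be raised; the statement below is a sketch of the usual ALTER SYSTEM syntax, and the value 8 is arbitrary (review LucidDbMemoryManagement for the memory implications first):

alter system set "expectedConcurrentStatements" = 8;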

Distributed Servers

Now, let's suppose that you have two servers, srv2007 and srv2008 (each with its own private storage and LucidDB installation), and you'd like to distribute the table across them. You can do this using LucidDB's SQL/MED capabilities.

We'll make srv2008 the coordinator to which all queries will be sent. It will also store the 2008 data. srv2007 will only store the 2007 data; we won't query it directly. For queries which need to access the 2007 data, srv2008 will dispatch to srv2007.

On srv2007:

create schema data2007;
create table data2007.httpd_log_2007(ip_address varchar(15), userid varchar(128), ts timestamp, request varchar(1024), result_status int, result_size int);
insert into data2007.httpd_log_2007 values ('127.0.0.1', 'bert', timestamp '2007-07-04 12:00:00', 'GET /apple_pie.gif', 200, 12345);

On srv2008:

create schema data2008;
create table data2008.httpd_log_2008(ip_address varchar(15), userid varchar(128), ts timestamp, request varchar(1024), result_status int, result_size int);
insert into data2008.httpd_log_2008 values ('127.0.0.1', 'ernie', timestamp '2008-10-10 12:00:00', 'GET /rubber_duckie.gif', 404, 47);
create server srv2007_link
foreign data wrapper sys_jdbc
options(
    driver_class 'com.lucidera.jdbc.LucidDbRmiDriver',
    url 'jdbc:luciddb:rmi://srv2007',
    user_name 'sa');
create schema distributed;
create view distributed.httpd_log as 
select * from srv2007_link.data2007.httpd_log_2007 where ts between timestamp '2007-01-01 00:00:00' and timestamp '2007-12-31 23:59:59'
union all 
select * from data2008.httpd_log_2008 where ts between timestamp '2008-01-01 00:00:00' and timestamp '2008-12-31 23:59:59';
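
At any point after the server link is created, you can sanity-check it by querying the remote table directly through its three-part name, for example:

select count(*) from srv2007_link.data2007.httpd_log_2007;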

Verify that the view is working, and use EXPLAIN PLAN to see how it is implemented:

0: jdbc:luciddb:> explain plan for select * from distributed.httpd_log;
'column0'
'FennelToIteratorConverter'
'  FennelMergeRel'
'    IteratorToFennelConverter'
'      ResultSetToFarragoIteratorConverter'
'        MedJdbcQueryRel(foreignSql=[SELECT *'
'FROM "DATA2007"."HTTPD_LOG_2007"'
'WHERE "TS" >= TIMESTAMP '2007-01-01 00:00:00' AND "TS" <= TIMESTAMP '2007-12-31 23:59:59'])'
'    LcsRowScanRel(table=[[LOCALDB, DATA2008, HTTPD_LOG_2008]], projection=[*], clustered indexes=[[SYS$CLUSTERED_INDEX$HTTPD_LOG_2008$IP_ADDRESS, SYS$CLUSTERED_INDEX$HTTPD_LOG_2008$REQUEST, SYS$CLUSTERED_INDEX$HTTPD_LOG_2008$RESULT_SIZE, SYS$CLUSTERED_INDEX$HTTPD_LOG_2008$RESULT_STATUS, SYS$CLUSTERED_INDEX$HTTPD_LOG_2008$TS, SYS$CLUSTERED_INDEX$HTTPD_LOG_2008$USERID]], residual columns=[[2]])'
'      FennelValuesRel(tuples=[[{ '[', 2008-01-01 00:00:00.0, ']', 2008-12-31 23:59:59.0 }]])'
9 rows selected (0.173 seconds)

Again, the FennelMergeRel implements the UNION ALL, but this time, the first input is a remote query to srv2007. Note that we keep the range predicate definition on the coordinator side (rather than defining it on the remote side) so that once the UNION ALL branch elimination optimization mentioned above is implemented, we'll avoid issuing any remote query at all in cases where the optimizer can be sure that no remote data is needed.
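
For contrast, the rejected alternative would bake the predicate into a view on the remote side, something like the sketch below (executed on srv2007; httpd_log_2007_ranged is an illustrative name). The coordinator view would then select from srv2007_link.data2007.httpd_log_2007_ranged with no predicate of its own, so the coordinator's optimizer would never see the range and could not prune the remote branch:

-- on srv2007 (not what we did above)
create view data2007.httpd_log_2007_ranged as
select * from data2007.httpd_log_2007
where ts between timestamp '2007-01-01 00:00:00' and timestamp '2007-12-31 23:59:59';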

Unfortunately, we still won't get any query parallelism here. For that, the necessary enhancement is for the executor to be able to fire off all inputs of a FennelMergeRel in parallel so that all remote servers can process their partitions together.

Another beneficial optimization would be the ability to push down aggregation:

0: jdbc:luciddb:> explain plan for
. . . . . . . . > select userid, count(*) as hits
. . . . . . . . > from distributed.httpd_log
. . . . . . . . > group by userid;
'column0'
'FennelToIteratorConverter'
'  LhxAggRel(groupCount=[1], HITS=[COUNT()])'
'    FennelMergeRel'
'      FennelReshapeRel(projection=[[0]], outputRowType=[RecordType(VARCHAR(128) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" USERID) NOT NULL])'
'        IteratorToFennelConverter'
'          ResultSetToFarragoIteratorConverter'
'            MedJdbcQueryRel(foreignSql=[SELECT "USERID"'
'FROM "DATA2007"."HTTPD_LOG_2007"'
'WHERE "TS" >= TIMESTAMP '2007-01-01 00:00:00' AND "TS" <= TIMESTAMP '2007-12-31 23:59:59'])'
'      LcsRowScanRel(table=[[LOCALDB, DATA2008, HTTPD_LOG_2008]], projection=[[1]], clustered indexes=[[SYS$CLUSTERED_INDEX$HTTPD_LOG_2008$TS, SYS$CLUSTERED_INDEX$HTTPD_LOG_2008$USERID]], residual columns=[[2]])'
'        FennelValuesRel(tuples=[[{ '[', 2008-01-01 00:00:00.0, ']', 2008-12-31 23:59:59.0 }]])'
11 rows selected (0.451 seconds)

The plan above is poor because it will pull all data back to srv2008, merge it, and then aggregate it. Instead, we would like to execute the equivalent query:

select userid, sum(hits) as hits
from (
    select userid, count(*) as hits
    from data2007.httpd_log_2007        -- execute this part on srv2007
    group by userid
union all
    select userid, count(*) as hits
    from data2008.httpd_log_2008
    group by userid
) as partial_counts
group by userid;

Besides a relational algebra optimizer rule for pushing aggregation down through union, a SQL/MED JDBC optimizer rule is also needed for pushing the aggregate through to the remote server (currently only filters and projections are handled).
