Sunday, January 24, 2010

how to handle 400 Billion rows in postgres

Well, currently I'm trying to optimize a denoising and replacement algorithm. This includes calculating ion traces over thousands of files, which have 20k peaks or 1 million ions each.
To make this more challenging, we don't do this once; we do it during every BinBase export, since this is the base algorithm for the Zero replacement.

In short, we do the same calculation over and over again, for example calculating the minimum intensity for 100 files between 15 and 20 seconds for ion 87.

So this shouts for a SQL database.
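
Such a lookup could look roughly like this (a sketch only; it assumes the iontrace and spectra tables described later in this post, retention times stored in seconds, and placeholder file ids):


-- minimum intensity of ion 87 between 15 and 20 seconds for a set of files
SELECT min(t.intensity)
  FROM iontrace t
  JOIN spectra s ON s.id = t.spectra_id
 WHERE t.ion = 87
   AND s.retentiontime BETWEEN 15 AND 20
   AND s.file_id IN (1, 2, 3);
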

Now how much data do we have?

  • 40k files
  • each file has around 20k spectra
  • each spectrum has up to 500 ions

which translates into 40,000 × 20,000 × 500, i.e.

400 000 000 000 rows

Performance Issues and how to make Postgres handle this amount of data

The first attempt was to just store all these files in a single table using Hibernate, which caused an OutOfMemoryError, surprise surprise.

So the next attempt was to use a stateless session, which worked quite well: up to 1 billion rows we had query times of under 20 ms, quite nice for a database with no tuning. But once we got a bit over 1 billion rows the query speed got worse and worse. In short, this was not an acceptable solution.
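
For reference, inserting through a stateless session looks roughly like this (a sketch; sessionFactory and the traces collection are illustrative, IonTrace is our mapped entity):


import org.hibernate.StatelessSession
import org.hibernate.Transaction

// a stateless session keeps no first-level cache, so bulk inserts
// do not pile up objects in memory like a normal session does
StatelessSession session = sessionFactory.openStatelessSession()
Transaction tx = session.beginTransaction()

traces.each { IonTrace trace ->
    session.insert(trace)
}

tx.commit()
session.close()
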

So we tried to use partitions with Postgres and defined one table for each ion, inheriting from a master table.
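
A single partition is created roughly like this (a sketch; the real tables were generated by a script, and each child table carries a CHECK constraint so the planner can later exclude partitions):


-- child table for ion 100, inherits all columns from the master table
CREATE TABLE iontrace_100 (
    CHECK (ion = 100)
) INHERITS (iontrace);

-- index on the partition, named like the ones visible in the query plans below
CREATE INDEX index_iontrace_100 ON iontrace_100 (ion);


The master table with its insert rules then looks like this: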


Table "public.iontrace"
Column | Type | Modifiers
------------+------------------+-----------
id | bigint | not null
intensity | double precision |
ion | integer |
spectra_id | bigint |
Indexes:
"iontrace_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
"fk4f93923dc3b62c9a" FOREIGN KEY (spectra_id) REFERENCES spectra(id)
Rules:
rule_index_iontrace_100 AS
ON INSERT TO iontrace
WHERE new.ion = 100 DO INSTEAD INSERT INTO iontrace_100 (id, intensity, ion, spectra_id)
VALUES (new.id, new.intensity, new.ion, new.spectra_id)
rule_index_iontrace_101 AS
ON INSERT TO iontrace
WHERE new.ion = 101 DO INSTEAD INSERT INTO iontrace_101 (id, intensity, ion, spectra_id)
VALUES (new.id, new.intensity, new.ion, new.spectra_id)
rule_index_iontrace_102 AS
ON INSERT TO iontrace
WHERE new.ion = 102 DO INSTEAD INSERT INTO iontrace_102 (id, intensity, ion, spectra_id)



This didn't work with Hibernate, however. It kept complaining about:


org.hibernate.StaleStateException: Batch update returned unexpected row count from update [0]; actual row count: 0; expected: 1


Because the DO INSTEAD rules redirect every insert into a partition, the insert on the master table reports zero affected rows, which Hibernate's batcher interprets as a failed update. So we had to write our own batcher, as was described here.

A bit later we rewrote the example to actually work, since the batchSize field of the super class is private.


import java.lang.reflect.Field
import java.sql.PreparedStatement
import java.sql.SQLException

import org.hibernate.HibernateException
import org.hibernate.jdbc.BatchingBatcher
import org.hibernate.jdbc.BatchingBatcherFactory

/**
 * hibernate partition batcher: executes the JDBC batch and resets the
 * private batchSize counter of the super class afterwards
 */
public class HibernatePartitionBatcher extends BatchingBatcher {

    public HibernatePartitionBatcher(org.hibernate.jdbc.ConnectionManager connectionManager, org.hibernate.Interceptor interceptor) {
        super(connectionManager, interceptor)
    }

    @Override
    protected void doExecuteBatch(PreparedStatement ps) throws SQLException, HibernateException {

        // use reflection to access the private field of the super class, ugly but necessary
        Field field = this.getClass().getSuperclass().getDeclaredField("batchSize")
        field.setAccessible(true)

        int value = field.getInt(this)

        if (value != 0) {
            try {
                ps.executeBatch()
            }
            catch (RuntimeException re) {
                throw re
            }
            finally {
                // reset the counter so the next batch starts from zero
                field.setInt(this, 0)
            }
        }
    }
}

/**
 * batcher factory which hands Hibernate our own batcher instead of the default one
 */
public class HibernatePartitionBatcherFactory extends BatchingBatcherFactory {

    public HibernatePartitionBatcherFactory() {
    }

    public org.hibernate.jdbc.Batcher createBatcher(org.hibernate.jdbc.ConnectionManager connectionManager, org.hibernate.Interceptor interceptor) {
        return new HibernatePartitionBatcher(connectionManager, interceptor)
    }
}



And to register it in our groovy script:


def hibProps = [
"hibernate.jdbc.factory_class": HibernatePartitionBatcherFactory.class.getName()
]



A first test with just a single sample reveals that the query scans all partitions, not just the one for the requested ion.


explain SELECT * from iontrace where ion = 105;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------
Result (cost=0.00..1163.25 rows=3374 width=28)
-> Append (cost=0.00..1163.25 rows=3374 width=28)
-> Seq Scan on iontrace (cost=0.00..10.46 rows=7 width=28)
Filter: (ion = 105)
-> Index Scan using index_iontrace_20 on iontrace_20 iontrace (cost=0.00..1.57 rows=7 width=28)
Index Cond: (ion = 105)
-> Index Scan using index_iontrace_21 on iontrace_21 iontrace (cost=0.00..1.57 rows=7 width=28)
Index Cond: (ion = 105)
-> Index Scan using index_iontrace_22 on iontrace_22 iontrace (cost=0.00..1.57 rows=7 width=28)
Index Cond: (ion = 105)
-> Index Scan using index_iontrace_23 on iontrace_23 iontrace (cost=0.00..1.57 rows=7 width=28)
Index Cond: (ion = 105)
-> Index Scan using index_iontrace_24 on iontrace_24 iontrace (cost=0.00..1.57 rows=7 width=28)
Index Cond: (ion = 105)
-> Index Scan using index_iontrace_25 on iontrace_25 iontrace (cost=0.00..1.57 rows=7 width=28)
Index Cond: (ion = 105)



To avoid this you have to enable a parameter in PostgreSQL:


SET constraint_exclusion = on;
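
SET only changes the setting for the current session; to enable it for every connection you can set the same parameter in postgresql.conf:


# postgresql.conf
constraint_exclusion = on
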


The new result shows that we now only work on the correct partition:


explain SELECT * from iontrace where ion = 105;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------
Result (cost=0.00..13.03 rows=14 width=28)
-> Append (cost=0.00..13.03 rows=14 width=28)
-> Seq Scan on iontrace (cost=0.00..10.46 rows=7 width=28)
Filter: (ion = 105)
-> Index Scan using index_iontrace_105 on iontrace_105 iontrace (cost=0.00..2.57 rows=7 width=28)
Index Cond: (ion = 105)



It still executes a sequential scan on the master table, but apart from that it only touches the partition that actually stores the data.

Now, to reduce the likelihood of a sequential scan over the master table we define an index on the ion column of every table (the script at the end of this post does this for you).
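
The index creation itself is straightforward (a sketch; the actual script loops over all partitions and emits one statement per table):


-- index on the master table, otherwise the planner keeps doing a sequential scan on it
CREATE INDEX index_iontrace_ion ON iontrace (ion);

-- and one index per partition, e.g. for ion 105
CREATE INDEX index_iontrace_105 ON iontrace_105 (ion);
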

the new query plan is


explain SELECT * from iontrace where ion = 105;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------
Result (cost=0.00..4.15 rows=14 width=28)
-> Append (cost=0.00..4.15 rows=14 width=28)
-> Index Scan using index_iontrace_ion on iontrace (cost=0.00..1.57 rows=7 width=28)
Index Cond: (ion = 105)
-> Index Scan using index_iontrace_105 on iontrace_105 iontrace (cost=0.00..2.57 rows=7 width=28)
Index Cond: (ion = 105)


The cost estimates are slightly different because by now more data has been generated in the database, sorry for that. But you can see that the query plan is much more efficient with the index.

The important point is that you need an index on ion for every partition and for the main 'iontrace' table; if you don't have an index on the main table you will keep getting sequential scans.

The next snag we encountered was the speed of the actual data inserts, since the check constraints seem to be rather slow and expensive.

First, this is a Hibernate insert into the main data table:


LOG: duration: 8.961 ms statement: EXECUTE [PREPARE: insert into IonTrace (intensity, ion, spectra_id, id) values ($1, $2, $3, $4)]
LOG: duration: 8.951 ms statement: EXECUTE [PREPARE: insert into IonTrace (intensity, ion, spectra_id, id) values ($1, $2, $3, $4)]
LOG: duration: 8.966 ms statement: EXECUTE [PREPARE: insert into IonTrace (intensity, ion, spectra_id, id) values ($1, $2, $3, $4)]


Second, this is plain old SQL inserted into the main iontrace table:


LOG: duration: 6.608 ms statement: EXECUTE [PREPARE: insert into iontrace(id, intensity, ion, spectra_id) values (nextval('SEQ_TRACE'),1418.0,237,700013)]
LOG: duration: 6.610 ms statement: EXECUTE [PREPARE: insert into iontrace(id, intensity, ion, spectra_id) values (nextval('SEQ_TRACE'),1373.0,238,700013)]
LOG: duration: 6.605 ms statement: EXECUTE [PREPARE: insert into iontrace(id, intensity, ion, spectra_id) values (nextval('SEQ_TRACE'),1383.0,239,700013)]


Third, this is plain old SQL inserted directly into each partition:


LOG: duration: 0.055 ms statement: EXECUTE [PREPARE: insert into iontrace_475(id, intensity, ion, spectra_id) values (nextval('SEQ_TRACE'),27.0,475,680203)]
LOG: duration: 0.058 ms statement: EXECUTE [PREPARE: insert into iontrace_489(id, intensity, ion, spectra_id) values (nextval('SEQ_TRACE'),13.0,489,680203)]
LOG: duration: 0.051 ms statement: EXECUTE [PREPARE: insert into iontrace_496(id, intensity, ion, spectra_id) values (nextval('SEQ_TRACE'),13.0,496,680203)]
LOG: duration: 0.138 ms statement: EXECUTE [PREPARE: insert into Spectra (file_id, retentionTime, id) values ($1, $2, $3)]


From this we can see that it makes the most sense to use plain SQL to insert the data directly into the related partitions and to avoid the checks during the insert phase.

For this we used the following approach:


//build the ion trace
Statement statement = session.connection().createStatement()

for (int mass = 0; mass < mz.length; mass++) {
    if (mass >= beginMass && mass <= endMass) {
        if (intensity[mass] > 0) {
            IonTrace trace = new IonTrace(ion: mz[mass], intensity: intensity[mass])
            trace.spectra = spectra

            statement.addBatch("insert into iontrace(id, intensity, ion, spectra_id) values (nextval('SEQ_TRACE'),${intensity[mass]},${(int)(mz[mass])},${spectra.id})")
        }
    }
}

statement.executeBatch()


which worked quite well. As a conclusion I removed the complete Hibernate layer from this application, since we had lost most of its benefits by now.

To further optimize the insert speed we replaced the Statements with PreparedStatements, which improved the insert speed by close to 50%.
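
A sketch of what the prepared statement version can look like (illustrative only; it caches one PreparedStatement per partition in a simple map and reuses it for every trace of that ion):


import java.sql.PreparedStatement

// cache one PreparedStatement per partition and reuse it for every insert of that ion
def statements = [:]

for (int mass = 0; mass < mz.length; mass++) {
    if (mass >= beginMass && mass <= endMass && intensity[mass] > 0) {
        int ion = (int) mz[mass]
        PreparedStatement ps = statements[ion]
        if (ps == null) {
            ps = session.connection().prepareStatement(
                    "insert into iontrace_${ion}(id, intensity, ion, spectra_id) " +
                    "values (nextval('seq_trace'), ?, ?, ?)")
            statements[ion] = ps
        }
        ps.setDouble(1, intensity[mass])
        ps.setInt(2, ion)
        ps.setLong(3, spectra.id)
        ps.addBatch()
    }
}

// flush the batch of every partition statement
statements.values().each { it.executeBatch() }


With that in place the log shows: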


LOG: duration: 0.035 ms statement: EXECUTE [PREPARE: insert into iontrace_127(id, intensity, ion, spectra_id) values (nextval('seq_trace'),$1,$2,$3)]
LOG: duration: 0.035 ms statement: EXECUTE [PREPARE: insert into iontrace_127(id, intensity, ion, spectra_id) values (nextval('seq_trace'),$1,$2,$3)]
LOG: duration: 0.035 ms statement: EXECUTE [PREPARE: insert into iontrace_127(id, intensity, ion, spectra_id) values (nextval('seq_trace'),$1,$2,$3)]



Storage

As you can imagine, this number of rows can take up quite some space; an initial estimate says it's roughly 20 TB with the current table schema.

That is 20 times more than the raw files occupy on the hard drive.

So what can you do to keep the storage space down? (a sketch of a slimmed-down table follows after the list)
  • use the smallest possible data types
  • only index what actually needs to be indexed
  • remove columns which are not necessary; for example we don't need a primary key on the ion trace table
  • estimate how much data you will realistically store; for example we won't store billions of spectra, so we can use integer instead of bigint
  • store intensities as integer and not as double
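
A slimmed-down trace table could look roughly like this (a sketch only, not necessarily the schema we settled on):


-- no surrogate primary key, smaller integer types, intensity as integer
CREATE TABLE iontrace (
    intensity  integer,
    ion        smallint,
    spectra_id integer
);
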

To estimate the required storage we used the following query, which extrapolates the current database size to 40k files, roughly 4 years of netcdf files.


netcdf-repository=# SELECT pg_size_pretty(pg_database_size('netcdf-repository')/(select count(*) from netcdf)* 40000) as "estimated database size";
estimated database size
-------------------------
16 TB
(1 row)


And a server for this would currently cost around $7,000.



Friday, January 22, 2010

Protecting JBoss

In the last couple of weeks we have released more and more of our BinBase tools to make it possible to actually work with the database, which means we have to protect our data better.

Since I had no time at work for this and JBoss doesn't provide a convenient way to do it, I decided to write a little tool which does this for me.

So I created yet another Google Code project, called 'jboss-ip-filter', which basically does nothing more than provide an interceptor that intercepts all method calls and checks whether the caller's IP is in a list of registered IP addresses.

Features
  • protect ejb3.x services
  • protect ejb2.x services
  • IPs can be defined as regular expressions to support subnets
Configuration/Installation

First you need to download the latest release and copy it into the JBoss library directory of your chosen configuration.

Afterwards you need to register the interceptor in the jboss configuration.

Example

vim /usr/local/jboss/server/all/conf/standardjboss.xml

Go to the part about the container configurations and register the interceptor in the first position for every ejb configuration you want to protect.

The name of the class is: 'com.blogspot.codingandmore.jboss.filter.SessionInterceptor'


<container-configuration>
  <container-name>Standard CMP 2.x EntityBean</container-name>
  <call-logging>false</call-logging>
  <invoker-proxy-binding-name>entity-unified-invoker</invoker-proxy-binding-name>
  <sync-on-commit-only>false</sync-on-commit-only>
  <insert-after-ejb-post-create>false</insert-after-ejb-post-create>
  <call-ejb-store-on-clean>true</call-ejb-store-on-clean>
  <container-interceptors>
    <interceptor>com.blogspot.codingandmore.jboss.filter.SessionInterceptor</interceptor>
    <interceptor>org.jboss.ejb.plugins.ProxyFactoryFinderInterceptor</interceptor>
    <interceptor>org.jboss.ejb.plugins.LogInterceptor</interceptor>
    <interceptor>org.jboss.ejb.plugins.SecurityInterceptor</interceptor>
    <interceptor>org.jboss.ejb.plugins.TxInterceptorCMT</interceptor>
    <interceptor>org.jboss.ejb.plugins.CallValidationInterceptor</interceptor>
    <interceptor>org.jboss.ejb.plugins.EntityCreationInterceptor</interceptor>
    <interceptor>org.jboss.ejb.plugins.EntityLockInterceptor</interceptor>
    <interceptor>org.jboss.ejb.plugins.EntityInstanceInterceptor</interceptor>
    <interceptor>org.jboss.ejb.plugins.EntityReentranceInterceptor</interceptor>
    <interceptor>org.jboss.resource.connectionmanager.CachedConnectionInterceptor</interceptor>
    <interceptor>org.jboss.ejb.plugins.EntitySynchronizationInterceptor</interceptor>
    <interceptor>org.jboss.ejb.plugins.cmp.jdbc.JDBCRelationInterceptor</interceptor>
  </container-interceptors>
  <instance-pool>org.jboss.ejb.plugins.EntityInstancePool</instance-pool>
  <instance-cache>org.jboss.ejb.plugins.InvalidableEntityInstanceCache</instance-cache>
  <persistence-manager>org.jboss.ejb.plugins.cmp.jdbc.JDBCStoreManager</persistence-manager>
  <locking-policy>org.jboss.ejb.plugins.lock.QueuedPessimisticEJBLock</locking-policy>
  <container-cache-conf>
    <cache-policy>org.jboss.ejb.plugins.LRUEnterpriseContextCachePolicy</cache-policy>
    <cache-policy-conf>
      <min-capacity>50</min-capacity>
      <max-capacity>1000000</max-capacity>
      <overager-period>300</overager-period>
      <max-bean-age>600</max-bean-age>
      <resizer-period>400</resizer-period>
      <max-cache-miss-period>60</max-cache-miss-period>
      <min-cache-miss-period>1</min-cache-miss-period>
      <cache-load-factor>0.75</cache-load-factor>
    </cache-policy-conf>
  </container-cache-conf>
  <container-pool-conf>
    <MaximumSize>100</MaximumSize>
  </container-pool-conf>
  <commit-option>B</commit-option>
</container-configuration>


After this is done you need to restart your server; after the next reboot it will generate a properties file in the start directory. In this file you configure the IP addresses that are allowed to connect.

For example, if you started the server in the bin directory, the file will be found there:


vim /usr/local/jboss/bin/ip-filter-config.properties


The IP address of the local host is always registered.

The following two lines allow the host '128.120.136.154' to connect but refuse connections to the EJBs from any other host.


128.120.136.154 = true
\b(?:\d{1,3}\.){3}\d{1,3}\b = false
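
Since the entries are regular expressions, a whole subnet can be allowed with a single line, for example (illustrative pattern for the 128.120.136.0/24 network):


128\.120\.136\.\d{1,3} = true
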


If you encounter any problems, please don't hesitate to contact me and I will try to help.