Monday, January 25, 2010

Replicating transactions directly to RabbitMQ

Previously, RabbitReplication tailed the transaction log written by Drizzle, and the Java application then sent the protobuf-serialized transactions to RabbitMQ. Now it is possible to skip the transaction log file entirely and send each transaction directly to the RabbitMQ server, without the extra step of storing it in a file first.

The code is available at https://code.launchpad.net/~krummas/drizzle/rabbitmq_log. To build it with RabbitMQ support you need to install librabbitmq, which is a bit tricky:

Installing librabbitmq

  1. Install Mercurial
  2. Clone the librabbitmq code: hg clone http://hg.rabbitmq.com/rabbitmq-c/
  3. Clone the rabbitmq codegen repo into a subdirectory called codegen in the rabbitmq-c directory:
    1. cd rabbitmq-c
    2. hg clone http://hg.rabbitmq.com/rabbitmq-codegen/ codegen
  4. Run autoreconf: autoreconf -i
  5. Run the configure script
  6. make
  7. make install
Build Drizzle
When librabbitmq is installed, build Drizzle like this:

  1. bzr branch lp:~krummas/drizzle/rabbitmq-log
  2. config/autorun.sh
  3. ./configure --with-rabbitmq-log-plugin
  4. make
  5. make install
and you are done!

Start Drizzle with RabbitMQ support
First, you can run drizzled with the --help flag to see the options available; they are all prefixed with --rabbitmq-log-XYZ.

The default values for the parameters make drizzled connect to localhost as "guest" and replicate to an exchange called ReplicationExchange. Start it like this to replicate changes to a RabbitMQ server on localhost:
$ sbin/drizzled --default-replicator-enable --rabbitmq-log-enable

The other available options are described in --help.
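
If you want to peek at what actually ends up on the wire, any AMQP client can bind a queue to the exchange and deserialize the messages. Here is a minimal sketch using the RabbitMQ Java client; the exchange type ("fanout") and the import of the protobuf-generated TransactionMessage class are my assumptions, so check the plugin source if the declaration does not match:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

// Minimal tap: bind a throwaway queue to ReplicationExchange and print the
// id of every replicated transaction. The exchange type and the (omitted)
// import of the generated TransactionMessage class are assumptions.
public class ReplicationTap {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // matches the drizzled defaults
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        channel.exchangeDeclare("ReplicationExchange", "fanout"); // assumption: fanout
        String queue = channel.queueDeclare().getQueue(); // server-named queue
        channel.queueBind(queue, "ReplicationExchange", "");

        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws java.io.IOException {
                // the body is a protobuf-serialized transaction
                TransactionMessage.Transaction txn =
                        TransactionMessage.Transaction.parseFrom(body);
                System.out.println("got transaction " +
                        txn.getTransactionContext().getTransactionId());
            }
        });
    }
}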

Saturday, January 16, 2010

Drizzle to Infinispan replication and a small code walkthrough

This post aims to explain how to build your own key/value-store applier in RabbitReplication by walking through the new Infinispan support as an example.

Infinispan
Infinispan is a "distributed, highly available data grid platform" that exposes a REST interface through which the data in Infinispan can be manipulated. For example, a simple HTTP PUT stores new data, and an HTTP DELETE does exactly what you expect. The data is stored under resources; RabbitReplication stores it under /<schema.table>/<key>. This means you can view the data in Infinispan using a browser.


When I implemented the Infinispan support, I simply dropped the .war file in the webapps directory of a Jetty installation and started it. Since Infinispan has a REST interface, the client library can be any HTTP client; I picked the Jersey REST client since it is incredibly easy to use.
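
To give a feeling for how little client code this takes, here is a small Jersey sketch; the base URL (host, port and context path of the deployed war) is an assumption you will need to adjust, but the /<schema.table>/<key> layout matches what RabbitReplication uses:

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;

// Store, read and delete one value over Infinispan's REST interface.
// The base URL is an assumption; adjust it to where the .war is deployed.
public class InfinispanRestDemo {
    public static void main(String[] args) {
        Client client = Client.create();
        WebResource row = client.resource(
                "http://localhost:8080/infinispan/rest/unittests.test1/85");

        row.type("text/plain").put("test");        // HTTP PUT stores the value
        System.out.println(row.get(String.class)); // HTTP GET reads it back
        row.delete();                              // HTTP DELETE removes it
    }
}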


Customizing RabbitReplication
RabbitReplication uses Guice internally for dependency injection, so to use another key/value store you need to create your own Module for configuration. I'll show an example below.

To add support for a new key/value store, you need to implement an interface, org.drizzle.replication.transformer.objectstores.KeyValueStoreClient (here). It is a quite straightforward add/get/remove interface; look at the Infinispan implementation (here) for an example. Note the @Inject annotation on the constructor: it tells Guice that the WebResource parameter should be injected when it constructs the object. You will need to put the rabbitreplication.jar file on your classpath when building your code.
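
A sketch of what such an implementation can look like; the add/get/remove method names and signatures below are my guesses from the description above, so check the actual interface for the real ones:

import com.google.inject.Inject;
import com.sun.jersey.api.client.WebResource;
import org.drizzle.replication.transformer.objectstores.KeyValueStoreClient;

// Sketch of a KeyValueStoreClient backed by Infinispan's REST interface.
// Method names and signatures are assumptions; the real interface may differ.
public class SketchInfinispanClient implements KeyValueStoreClient {
    private final WebResource resource;

    @Inject // Guice injects the WebResource when it constructs the object
    public SketchInfinispanClient(WebResource resource) {
        this.resource = resource;
    }

    public void add(String key, String value) {
        resource.path(key).type("text/plain").put(value); // HTTP PUT
    }

    public String get(String key) {
        return resource.path(key).get(String.class);      // HTTP GET
    }

    public void remove(String key) {
        resource.path(key).delete();                      // HTTP DELETE
    }
}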



Guice is configured in modules where you bind() an interface to an implementation, so to configure Guice to use a new key/value store, we need to bind() the KeyValueStoreClient interface to the new implementation. Look at the Infinispan module (here) for an example of how to do it; the method annotated with @Provides is called by Guice when it needs to create a KeyValueStoreClient.
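
A sketch of such a module, reusing the client sketch from above; the property key "infinispan.url" is hypothetical, see the note about @Named properties below:

import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import com.sun.jersey.api.client.Client;
import org.drizzle.replication.transformer.objectstores.KeyValueStoreClient;

// Sketch of a custom module; "infinispan.url" is a hypothetical property key.
public class SketchStoreModule extends AbstractModule {
    @Override
    protected void configure() {
        // nothing to bind directly; the @Provides method below does the work
    }

    @Provides // called by Guice whenever it needs a KeyValueStoreClient
    KeyValueStoreClient provideClient(@Named("infinispan.url") String url) {
        return new SketchInfinispanClient(Client.create().resource(url));
    }
}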

To tell RabbitReplication to use your new module, you simply edit your configuration file and set where the new Module is located, see this example. If you need to configure your new client, simply add the properties to the config file; Guice will bind every property in the file to @Named(...) strings. Check the Infinispan module provider method for an example of how to use it.

Now, build your code into a jar, drop the jar in the lib/ directory of RabbitReplication and start it like this: bin/start.sh config/someconf.properties. The config file should have custom_module set to the name of your new module.

Downloading and installing
The best way to use RabbitReplication is still to branch the code from Launchpad (lp:rabbitreplication) and then run ant in the base directory; this will create a .zip and a .tgz in the dist directory. You can also download the binaries here.

  • Unpack the distribution file
  • Copy a config file from .sample to .properties in the config directory and edit the file to your liking. objectslave.properties.sample is the sample you want to look at if you want to try out Infinispan.
  • Start RabbitReplication by executing bin/start.sh config/yourconf.properties
Look at the previous posts on RabbitReplication to find out how to start a master, etc.

Friday, January 8, 2010

Multi threaded replication appliers

Lately I've been working on a transaction reducer to be able to multi-thread the applier. The basic idea is to reduce the transaction so that each row is affected by exactly one record; when that is the case, we can have a thread pool do the actual persisting of the statements (of course it has some drawbacks as well, more about those later). This approach is particularly interesting for NoSQL appliers.

Reducing transactions
The transaction log in Drizzle contains a list of statements. Each statement contains a list of records, where each record holds information about what changed on one row on the master. An example transaction could look something like this:


transaction_context {
  server_id: 1
  transaction_id: 10
  start_timestamp: 1262812100381445
  end_timestamp: 1262812153799963
}
statement {
  type: INSERT
  start_timestamp: 1262812100381446
  end_timestamp: 1262812134317464
  insert_header {
    table_metadata {
      schema_name: "unittests"
      table_name: "test1"
    }
    field_metadata {
      type: INTEGER
      name: "id"
    }
    field_metadata {
      type: VARCHAR
      name: "test"
    }
    field_metadata {
      type: VARCHAR
      name: "ignored"
    }
  }
  insert_data {
    segment_id: 1
    end_segment: true
    record {
      insert_value: "78"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "79"
      insert_value: "a"
      insert_value: "b"
    }
...
    record {
      insert_value: "87"
      insert_value: "a"
      insert_value: "b"
    }
  }
}
statement {
  type: UPDATE
  start_timestamp: 1262812134317466
  end_timestamp: 1262812151669387
  update_header {
    table_metadata {
      schema_name: "unittests"
      table_name: "test1"
    }
    key_field_metadata {
      type: INTEGER
      name: "id"
    }
    set_field_metadata {
      type: VARCHAR
      name: "test"
    }
  }
  update_data {
    segment_id: 1
    end_segment: true
    record {
      key_value: "85"
      after_value: "test"
    }
  }
}
statement {
  type: DELETE
  start_timestamp: 1262812151669389
  end_timestamp: 1262812153799963
  delete_header {
    table_metadata {
      schema_name: "unittests"
      table_name: "test1"
    }
    key_field_metadata {
      type: INTEGER
      name: "id"
    }
  }
  delete_data {
    segment_id: 1
    end_segment: true
    record {
      key_value: "81"
    }
  }
}

Or in SQL:
BEGIN;
INSERT INTO unittests.test1 (id, test, ignored) VALUES (78, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (79, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (80, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (81, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (82, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (83, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (84, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (85, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (86, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (87, "a","b");
UPDATE unittests.test1 SET test = "test" WHERE id = 85;
DELETE FROM unittests.test1 WHERE id = 81;
COMMIT;


I.e. a number of inserted rows, one updated and one deleted. If we can make sure that every record in the transaction affects exactly one row, then each record is totally independent of all other records in the transaction, and we can have a pool of threads applying them.

So, if I reduce the above transaction like this:

TransactionReducer reducer = new DrizzleTransactionReducer();
TransactionMessage.Transaction txn = getNextTransaction(); // the transaction shown above
TransactionMessage.Transaction t = reducer.reduce(txn);    // merge records so each row appears once

I get this transaction:


transaction_context {
  server_id: 1
  transaction_id: 10
  start_timestamp: 1262812100381445
  end_timestamp: 1262812153799963
}
statement {
  type: INSERT
  start_timestamp: 1262812100381446
  end_timestamp: 1262812134317464
  insert_header {
    table_metadata {
      schema_name: "unittests"
      table_name: "test1"
    }
    field_metadata {
      type: INTEGER
      name: "id"
    }
    field_metadata {
      type: VARCHAR
      name: "test"
    }
    field_metadata {
      type: VARCHAR
      name: "ignored"
    }
  }
  insert_data {
    segment_id: 1
    end_segment: true
    record {
      insert_value: "78"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "79"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "80"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "82"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "83"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "84"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "85"
      insert_value: "test"
      insert_value: "b"
    }
    record {
      insert_value: "86"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "87"
      insert_value: "a"
      insert_value: "b"
    }
  }
}
Or, in SQL:
BEGIN;
INSERT INTO unittests.test1 (id, test, ignored) VALUES (78, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (79, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (80, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (82, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (83, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (84, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (85, "test","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (86, "a","b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (87, "a","b");
COMMIT;

I.e. a list of only inserts. This means we can apply the transaction using several threads. Another benefit is that we reduce the total number of records to apply, which means better performance.
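
As a sketch of what the multi-threaded apply can look like (Row and persist() are hypothetical stand-ins for the record data and the store-specific write; this is not the actual RabbitReplication applier):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch only: every row in a reduced transaction appears at most once,
// so the rows can be persisted in any order, on any thread.
public class ParallelApplier {
    private final ExecutorService pool = Executors.newFixedThreadPool(8);

    public void apply(List<Row> rows) throws InterruptedException {
        for (final Row row : rows) {
            pool.submit(new Runnable() {
                public void run() {
                    persist(row); // hypothetical: write to Cassandra, Infinispan, ...
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }

    private void persist(Row row) {
        // store-specific code goes here
    }

    // hypothetical holder for one reduced record
    static final class Row { String table; String key; String[] values; }
}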

Updates and deletes that affect rows inserted outside the transaction are also reduced: for example, if, within one transaction, one such external row is updated twice and then deleted, only the delete statement will be executed.
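
The reduction itself boils down to a handful of merge rules applied per primary key. A simplified sketch of those rules (hypothetical names, whole-row values instead of per-column set-fields; not the actual DrizzleTransactionReducer):

import java.util.LinkedHashMap;
import java.util.Map;

// Merge rules per key: INSERT+UPDATE collapses into an INSERT with the new
// values, INSERT+DELETE disappears entirely, and for rows inserted outside
// the transaction only the last UPDATE or the final DELETE survives.
public class ReducerSketch {
    enum Op { INSERT, UPDATE, DELETE }

    static final class Entry {
        Op op; String[] values;
        Entry(Op op, String[] values) { this.op = op; this.values = values; }
    }

    private final Map<String, Entry> rows = new LinkedHashMap<String, Entry>();

    void insert(String key, String[] values) {
        rows.put(key, new Entry(Op.INSERT, values));
    }

    void update(String key, String[] newValues) {
        Entry e = rows.get(key);
        if (e != null && e.op == Op.INSERT) {
            e.values = newValues; // INSERT + UPDATE -> INSERT with updated values
        } else {
            rows.put(key, new Entry(Op.UPDATE, newValues)); // keep only the last UPDATE
        }
    }

    void delete(String key) {
        Entry e = rows.get(key);
        if (e != null && e.op == Op.INSERT) {
            rows.remove(key); // INSERT + DELETE -> nothing at all
        } else {
            rows.put(key, new Entry(Op.DELETE, null)); // only the DELETE survives
        }
    }
}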

Drawbacks
There are, of course, some drawbacks. For example, if this is used when applying to a database, the applier will not be able to apply the transaction as a single transaction, since there is no way of sharing transaction context between several client threads.

Using it
To try it out, branch this repository: lp:~krummas/+junk/transactionreducer and look at the tests file. It should be straightforward to add more tests to see how it handles them.

It will also be available in RabbitReplication as a configuration option on the slave; the size of the thread pool and the number of client connections will be configurable as well.

Performance
In theory it should be faster to apply fewer statements using more threads, and the time spent reducing the transaction should easily be less than the time spent doing network I/O. I've done a few non-scientific benchmarks using a multi-threaded Cassandra applier, and applying reduced transactions with the multi-threaded applier takes approximately half the time. I will do some proper benchmarks when everything is in place in RabbitReplication.

Friday, January 1, 2010

2009 and looking forward to 2010

So, this is yet another 2009 retrospective with some goals for 2010. I'll do it in list form so someone might actually browse it:

2009:
  • Main event of 2009 was that I got kid number two, Teo.
  • Ran 1000K despite injuries
  • Started Drizzle JDBC.
  • Got excited about programming again and realized I need a new job
  • Got Drizzle JDBC to version 0.6; not many users yet, so I can't say much about the quality (or, looking at it from another angle, it is bug free! *cough*)
  • Went to JavaOne, great stuff, probably the last one.
  • Started a very long parental leave!
  • Started working a lot on replication-related stuff.
  • "Learned" Haskell and started looking at Erlang.
  • Read some great books: Java Concurrency in Practice, Real World Haskell, Effective Java (like every year) ...
2010 goals:
  • Continue my long, sweet parental leave (living in Sweden has its benefits)
  • Run 1500K
  • Get myself a new job, main requirements:
    • Most importantly, it has a high-paced startup feel to it; I want to build stuff, not have meetings about the stuff we could build
    • Uses open source products
    • Contributes stuff back to open source communities (or, best of all, has some open source products of their own)
  • Make RabbitReplication into a proper product
    • Create web page
    • Make regular releases with good documentation
    • Build support for more column/key-value stores
    • Document how to extend it
    • Clean up the code
  • Get Drizzle JDBC to "1.0" with someone using it in production (given Drizzle makes it there). I'm guessing that when Drizzle itself is production ready, the user (and bug) count will increase.
  • Learn at least one new programming language (Erlang, I'm looking at you), and build something with it.
  • Go to at least one conference
  • Blog more, November and December frequencies have been ok, mainly because I actually had time to build stuff worth blogging about.
  • Read more books (current reading queue: Erlang Programming, SICP, re-read Distributed Systems: Concepts and Design, Java Generics and Collections)
  • Invent a clock with 30 hrs a day to actually manage all the above goals