Friday, January 8, 2010

Multi-threaded replication appliers

Lately I've been working on a transaction reducer to be able to multi-thread the applier. The basic idea is to reduce the transaction so that every row is affected by at most one statement; when that is the case, we can have a thread pool do the actual persisting of the statements (of course it has some drawbacks as well, more about those later). This approach is particularly interesting for NoSQL appliers.

Reducing transactions
The transaction log in Drizzle contains a list of statements. Each statement contains a list of records, where each record describes what changed on one row on the master. An example transaction could look something like this:


transaction_context {
  server_id: 1
  transaction_id: 10
  start_timestamp: 1262812100381445
  end_timestamp: 1262812153799963
}
statement {
  type: INSERT
  start_timestamp: 1262812100381446
  end_timestamp: 1262812134317464
  insert_header {
    table_metadata {
      schema_name: "unittests"
      table_name: "test1"
    }
    field_metadata {
      type: INTEGER
      name: "id"
    }
    field_metadata {
      type: VARCHAR
      name: "test"
    }
    field_metadata {
      type: VARCHAR
      name: "ignored"
    }
  }
  insert_data {
    segment_id: 1
    end_segment: true
    record {
      insert_value: "78"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "79"
      insert_value: "a"
      insert_value: "b"
    }
...
    record {
      insert_value: "87"
      insert_value: "a"
      insert_value: "b"
    }
  }
}
statement {
  type: UPDATE
  start_timestamp: 1262812134317466
  end_timestamp: 1262812151669387
  update_header {
    table_metadata {
      schema_name: "unittests"
      table_name: "test1"
    }
    key_field_metadata {
      type: INTEGER
      name: "id"
    }
    set_field_metadata {
      type: VARCHAR
      name: "test"
    }
  }
  update_data {
    segment_id: 1
    end_segment: true
    record {
      key_value: "85"
      after_value: "test"
    }
  }
}
statement {
  type: DELETE
  start_timestamp: 1262812151669389
  end_timestamp: 1262812153799963
  delete_header {
    table_metadata {
      schema_name: "unittests"
      table_name: "test1"
    }
    key_field_metadata {
      type: INTEGER
      name: "id"
    }
  }
  delete_data {
    segment_id: 1
    end_segment: true
    record {
      key_value: "81"
    }
  }
}

Or in SQL:
BEGIN;
INSERT INTO unittests.test1 (id, test, ignored) VALUES (78, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (79, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (80, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (81, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (82, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (83, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (84, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (85, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (86, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (87, "a", "b");
UPDATE unittests.test1 SET test = "test" WHERE id = 85;
DELETE FROM unittests.test1 WHERE id = 81;
COMMIT;


That is: a number of inserted rows, one updated and one deleted. If we can make sure that every row is affected by at most one record in the transaction, then each record is completely independent of all the others, and we can have a pool of threads applying the transaction.

So, if I reduce the above transaction like this:

// create a reducer implementation for Drizzle's transaction messages
TransactionReducer reducer = new DrizzleTransactionReducer();
// read the next transaction from the transaction log
TransactionMessage.Transaction txn = getNextTransaction();
// reduce it so that each row is touched by at most one record
TransactionMessage.Transaction t = reducer.reduce(txn);

I get this transaction:


transaction_context {
  server_id: 1
  transaction_id: 10
  start_timestamp: 1262812100381445
  end_timestamp: 1262812153799963
}
statement {
  type: INSERT
  start_timestamp: 1262812100381446
  end_timestamp: 1262812134317464
  insert_header {
    table_metadata {
      schema_name: "unittests"
      table_name: "test1"
    }
    field_metadata {
      type: INTEGER
      name: "id"
    }
    field_metadata {
      type: VARCHAR
      name: "test"
    }
    field_metadata {
      type: VARCHAR
      name: "ignored"
    }
  }
  insert_data {
    segment_id: 1
    end_segment: true
    record {
      insert_value: "78"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "79"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "80"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "82"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "83"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "84"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "85"
      insert_value: "test"
      insert_value: "b"
    }
    record {
      insert_value: "86"
      insert_value: "a"
      insert_value: "b"
    }
    record {
      insert_value: "87"
      insert_value: "a"
      insert_value: "b"
    }
  }
}
Or, in SQL:
BEGIN;
INSERT INTO unittests.test1 (id, test, ignored) VALUES (78, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (79, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (80, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (82, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (83, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (84, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (85, "test", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (86, "a", "b");
INSERT INTO unittests.test1 (id, test, ignored) VALUES (87, "a", "b");
COMMIT;

That is, a list containing only inserts. This means we can apply the transaction using several threads. Another benefit is that the total number of records to apply is reduced, which means better performance.
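To make the threading idea concrete, here is a minimal sketch of a pool-based applier, assuming a simplified RowOperation/RowStore model rather than Drizzle's actual protobuf classes; only the java.util.concurrent types are real, everything else is illustrative.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for one reduced record (a single-row insert, update or delete).
interface RowOperation {
    void applyTo(RowStore store);
}

// Hypothetical stand-in for the backend: an SQL connection pool, a Cassandra client, ...
interface RowStore {
}

class ThreadPoolApplier {
    private final ExecutorService pool;
    private final RowStore store;

    ThreadPoolApplier(int threads, RowStore store) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.store = store;
    }

    // In a reduced transaction every record touches a distinct row, so the
    // records can be handed to the pool without any ordering between them.
    void apply(List<RowOperation> reducedTransaction) {
        for (final RowOperation op : reducedTransaction) {
            pool.submit(new Runnable() {
                public void run() {
                    op.applyTo(store);
                }
            });
        }
    }

    void shutdown() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}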

Updates and deletes that affect rows not inserted within the transaction are also reduced: for example, if such an external row is updated twice and then deleted within one transaction, only the delete statement will be executed.
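For the curious, here is a rough sketch of the bookkeeping a reducer like this can do, assuming rows are keyed by primary key; the ChangeType, RowChange and SimpleReducer names are made up for illustration, and the real DrizzleTransactionReducer operates on the protobuf messages shown above.

import java.util.LinkedHashMap;
import java.util.Map;

// Simplified single-row change; the real reducer works on Drizzle's protobuf
// statement/record messages, so this model is illustrative only.
enum ChangeType { INSERT, UPDATE, DELETE }

class RowChange {
    final ChangeType type;
    final String key;                   // primary key value
    final Map<String, String> values;   // column -> value (empty for a DELETE)

    RowChange(ChangeType type, String key, Map<String, String> values) {
        this.type = type;
        this.key = key;
        this.values = values;
    }
}

class SimpleReducer {
    // At most one pending change per primary key, kept in first-seen order.
    private final Map<String, RowChange> pending = new LinkedHashMap<String, RowChange>();

    void add(RowChange change) {
        RowChange earlier = pending.get(change.key);
        if (earlier == null) {
            pending.put(change.key, change);      // first change to this row in the transaction
            return;
        }
        switch (change.type) {
            case UPDATE:
                earlier.values.putAll(change.values);   // fold the new values into the earlier INSERT/UPDATE
                break;
            case DELETE:
                if (earlier.type == ChangeType.INSERT) {
                    pending.remove(change.key);         // inserted and deleted in the same transaction: drop both
                } else {
                    pending.put(change.key, change);    // earlier UPDATEs are superseded by the DELETE
                }
                break;
            case INSERT:
                // DELETE followed by a re-INSERT of the same key is trickier; a real
                // reducer would have to keep the ordering or turn it into a replace.
                pending.put(change.key, change);
                break;
        }
    }

    Map<String, RowChange> result() {
        return pending;
    }
}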

Drawbacks
There are, of course, some drawbacks. For example, when this is used for applying to a database, the applier will not be able to apply the transaction as a single transaction, since there is no way of sharing transaction context between several client threads.

Using it
To try it out, branch this repository: lp:~krummas/+junk/transactionreducer and look at the tests file. It should be straightforward to add more tests to see how it handles them.

It will also be available in RabbitReplication as a configuration option on the slave. The size of the thread pool and the number of client connections will also be configurable.

Performance
In theory it should be faster to apply fewer statements using more threads, and the time spent reducing the transaction should easily be less than the time spent doing network I/O and so on. I've done a few non-scientific benchmarks with a multi-threaded Cassandra applier, and applying reduced transactions with the multi-threaded applier takes approximately half the time. I will run proper benchmarks once everything is in place in RabbitReplication.

3 comments:

Lei said...

Does the transaction log contain overlapping transactions at all? If so, who is responsible for assembling DMLs before the reduction process in the applier can take place?

Marcus Eriksson said...

The transaction log does not contain overlapping transactions; it is a serial log of transactions, and a complete transaction is written to the log at COMMIT. So, when I read the log, I get an entire transaction and can reduce it and get the same end result as if it were unreduced.

Hope this answers your question.

David Shrewsbury said...

Update on overlapping transactions: Because of changes to support bulk loads (e.g., LOAD DATA), there may now be multiple Transaction messages, all sharing the same transaction_id. This had to be done because protobuf messages have a maximum size. And right now there is no guarantee that a multi-part Transaction message will not be overlapped with other Transaction messages, which means an applier will need to do some buffering until the end message is seen, then either COMMIT, or discard the entire transaction if it contains a ROLLBACK.