Asynchronously put ten rows of one cell each
APIs used in this operation
The first 6 APIs are defined in the header file
mutations.h
:
-
hb_put_create()
: Creates a structure for the put operation and returns its handle. -
hb_mutation_set_table()
: Sets the name of the table for the put operation. -
hb_mutation_set_bufferable()
: Sets whether or not the RPC call for the put operation can be buffered on the client side. -
hb_put_add_cell()
: Adds a cell to the put structure. The row key of the cell must be the same as the row key of the put structure. -
hb_mutation_send()
: Queues the put operation for sending to the server. Mutations are not performed atomically and can be batched in a non-deterministic way on either the client side or the server side. Any buffer attached to a mutation object (put or delete) must not be altered until the callback has been received.
The last API is defined in the header file
client.h
:
-
hb_client_flush()
: Flushes any buffered client-side write operations to the server. The callback is invoked after everything that was buffered at the time of the call is flushed. Invocation of the callback is a guarantee that all outstanding RPC calls are complete.
Sequence of steps in this code extract
- Create a row object named row_data.
- Create a put object.
- Specify the name of the table.
- Set whether or not the RPC call for the put operation can be buffered on the client side.
- Create cell data.
- Create a cell.
- Add the cell to the row.
- Queue the put.
- After following the steps above 10 times, flush the puts to the server.
- Wait for the RPC calls to complete.
Code
// let's send a batch of 10 puts with single cell asynchronously
outstanding_puts_count += num_puts;
for (int i = 0; i < num_puts; ++i) {
row_data_t *row_data = (row_data_t *) calloc(1, sizeof(row_data_t));
row_data->key = bytebuffer_printf("%s%02d", rowkey_prefix, i);
hb_put_create(row_data->key->buffer, row_data->key->length, &put);
hb_mutation_set_table(put, table_name, table_name_len);
hb_mutation_set_durability(put, DURABILITY_SKIP_WAL);
hb_mutation_set_bufferable(put, false);
cell_data_t *cell_data = new_cell_data();
row_data->first_cell = cell_data;
cell_data->value = bytebuffer_printf("%s%02d", value_prefix, i);
hb_cell_t *cell = (hb_cell_t*) calloc(1, sizeof(hb_cell_t));
cell_data->hb_cell = cell;
cell->row = row_data->key->buffer;
cell->row_len = row_data->key->length;
cell->family = FAMILIES[rand() % 2];
cell->family_len = 1;
cell->qualifier = column_a->buffer;
cell->qualifier_len = column_a->length;
cell->value = cell_data->value->buffer;
cell->value_len = cell_data->value->length;
cell->ts = HBASE_LATEST_TIMESTAMP;
hb_put_add_cell(put, cell);
HBASE_LOG_INFO("Sending row with row key : '%.*s'.",
cell->row_len, cell->row);
hb_mutation_send(client, put, put_callback, row_data);
}
hb_client_flush(client, client_flush_callback, NULL);
wait_for_flush();
wait_for_puts(); // outside the loop, wait for 10 puts to complete