Asynchronously put ten rows of one cell each

APIs used in this operation

The first 6 APIs are defined in the header file mutations.h:

  • hb_put_create(): Creates a structure for the put operation and returns its handle.
  • hb_mutation_set_table(): Sets the name of the table for the put operation.

  • hb_mutation_set_bufferable(): Sets whether or not the RPC call for the put operation can be buffered on the client side.
  • hb_put_add_cell(): Adds a cell to the put structure. The row key of the cell must be the same as the row key of the put structure.
  • hb_mutation_send(): Queues the put operation for sending to the server. Mutations are not performed atomically and can be batched in a non-deterministic way on either the client side or the server side. Any buffer attached to a mutation object (put or delete) must not be altered until the callback has been received.

The last API is defined in the header file client.h:

  • hb_client_flush(): Flushes any buffered client-side write operations to the server. The callback is invoked after everything that was buffered at the time of the call is flushed. Invocation of the callback is a guarantee that all outstanding RPC calls are complete.

Sequence of steps in this code extract

  1. Create a row object named row_data.
  2. Create a put object.
  3. Specify the name of the table.
  4. Set whether or not the RPC call for the put operation can be buffered on the client side.
  5. Create cell data.
  6. Create a cell.
  7. Add the cell to the row.
  8. Queue the put.
  9. After following the steps above 10 times, flush the puts to the server.
  10. Wait for the RPC calls to complete.

Code

  // let's send a batch of 10 puts with single cell asynchronously
  outstanding_puts_count += num_puts;
  for (int i = 0; i < num_puts; ++i) {
    row_data_t *row_data = (row_data_t *) calloc(1, sizeof(row_data_t));
    row_data->key   = bytebuffer_printf("%s%02d", rowkey_prefix, i);
    hb_put_create(row_data->key->buffer, row_data->key->length, &put);
    hb_mutation_set_table(put, table_name, table_name_len);
    hb_mutation_set_durability(put, DURABILITY_SKIP_WAL);
    hb_mutation_set_bufferable(put, false);
 
    cell_data_t *cell_data = new_cell_data();
    row_data->first_cell = cell_data;
    cell_data->value = bytebuffer_printf("%s%02d", value_prefix, i);
 
    hb_cell_t *cell = (hb_cell_t*) calloc(1, sizeof(hb_cell_t));
    cell_data->hb_cell = cell;
 
    cell->row = row_data->key->buffer;
    cell->row_len = row_data->key->length;
    cell->family = FAMILIES[rand() % 2];
    cell->family_len = 1;
    cell->qualifier = column_a->buffer;
    cell->qualifier_len = column_a->length;
    cell->value = cell_data->value->buffer;
    cell->value_len = cell_data->value->length;
    cell->ts = HBASE_LATEST_TIMESTAMP;
  
    hb_put_add_cell(put, cell);
    HBASE_LOG_INFO("Sending row with row key : '%.*s'.",
                   cell->row_len, cell->row);
    hb_mutation_send(client, put, put_callback, row_data);
  }
  hb_client_flush(client, client_flush_callback, NULL);
  wait_for_flush();
 
  wait_for_puts(); // outside the loop, wait for 10 puts to complete