hdfs_write_revised.c
This application demonstrates how to write to files by using the APIs
hdfsWrite()
and hdfsPwrite(): hdfs_write_revised.c
Before running this application:
- Ensure that you have access to a cluster running the file system.
- Ensure that a text-based file that you have access to exists on the cluster. Note the path to the file and the size of the file in bytes.
- The content of the file will be deleted before the first write is performed by the application.
- Decide on the length in bytes of a string to write to the file.
To build and run it, download it from this page to a Data Fabric client or to a system with the
mapr-core
package installed. Then, modify the run.sh
script in Building and Running C Applications on file system
Clients to point to this sample application. Run the script and then run the
application.
The application includes these header files:
-
stdio.h
-
hdfs.h
-
errno.h
-
fcntl.h
The APIs are defined in hdfs.h
. The file fcntl.h
defines
the file-access flags.
The application performs the actions that are described in the following sections.
- Takes a filename, file size, and buffer size as input
When you launch the application, provide the path and name of the file, the size of the file, and the number of bytes to write.
hdfs_write <filename> <filesize> <buffersize>
- Sets an RPC timeout
hdfsSetRpcTimeout()
is specific to the libMapRClient
version of libhdfs
and takes a value that is specified in seconds. The default is 99 seconds. If you change this value, set it either to 0 (which eliminates timeouts) or to a value greater than 30.int err = hdfsSetRpcTimeout(30); if (err) { fprintf(stderr, "Failed to set rpc timeout!\n"); exit(-1); }
- Connects to a file system, using an API that is supported in the hadoop-2.x version of libhdfs
The application tries to connect to the first file system cluster that is specified in the
mapr-clusters.conf
file in the MAPR_HOME/conf
directory on the client. After connecting to the file system, the application returns a handle to the file system.hdfsFS fs = hdfsConnect("default", 0); if (!fs) { fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); exit(-1); }
- Stores the values of the arguments
The application stores the values of the arguments in a character array and in two variables of type
tSize
. This datatype is defined in hdfs.h
and is a fixed-width, signed 32-bit integer type for storing the size of data for read or write operations. const char* rfile = argv[1]; tSize fileSize = strtoul(argv[2], NULL, 10); tSize bufferSize = strtoul(argv[3], NULL, 10);
- Opens the file that you specified
The application opens the specified file, passing the following values to the
hdfsOpenFile()
function:- The handle to the file system
- The name of the file, which you supplied when you launched the application.
- A flag to indicate the mode in which to open the file. In this case, the flag is
O_WRONLY
. This flag creates the file if the file does not exist and truncates the file if the file does exist. If the file existed and you wanted to preserve the content of the file, you would specify O_WRONLY | O_APPEND
for the flag. These flags are defined in the header file fcntl.h.
- The default chunk size for the directory in which the file is either located or will be created. This value is specified by the 0 in the last parameter.
Although there are two other parameters in the
hdfsOpenFile()
function (the fourth and fifth), the libMapRClient
version of libhdfs
ignores them.hdfsFile writeFile = hdfsOpenFile(fs, rfile, O_WRONLY, 0, 0, 0); if (!writeFile) { fprintf(stderr, "Failed to open %s for writing!\n", rfile); exit(-2); }
- Creates a buffer of the size that you specified and populates the buffer
At this point, the application creates a string to populate the buffer. This is the data that the application will write.
char* buffer = malloc(sizeof(char) * bufferSize); if(buffer == NULL) { fprintf(stderr, "Failed to allocate memory!\n"); return -2; } int i; for (i=0; i<bufferSize; i++) { buffer[i] = 'a' + i%26; }
- Writes an entire file with hdfsWrite()
The application calls the function
writeLength()
:int ret = writeLength(fs, writeFile, buffer, bufferSize, fileSize); if (ret < 0) { goto done; }
This function writes the content of the buffer to the file, starting at offset 0.
int writeLength(hdfsFS fs, hdfsFile writeFile, char *buffer, tSize bufferSize, tSize writeSize) { tSize writeBytes = 0; tSize ret = 0; uint64_t totalWrite = 0; if (fs == NULL || writeFile == NULL || buffer == NULL) { return -1; } if (writeSize == 0) { return 0; } for (writeBytes=0; writeBytes<writeSize; writeBytes+=bufferSize) { ret = hdfsWrite(fs, writeFile, (void*)buffer, bufferSize); if (ret > 0) { totalWrite += ret; } else { fprintf(stderr, "hdfsWrite failed with error %d \n", errno); hdfsCloseFile(fs, writeFile); return -1; } } return 0; }
- Seeks an offset and writes from that offset with hdfsWrite()
The application next calls the function
writeAtOffset()
:tSize writeBytes = writeAtOffset(fs, writeFile, 0, buffer, bufferSize); if (writeBytes < 0) { goto done; }
This function writes the content of the buffer to the file, starting at the specified offset. If the file already exists, the file is first truncated to this offset before the write operation begins. In this case, the specified offset is 0.
The difference between this function and the previous function is that, before writing, it calls
hdfsSeek()
to move to the specified offset in the file.tSize writeAtOffset(hdfsFS fs, hdfsFile writeFile, tOffset offset, char *buffer, tSize bufferSize) { tSize ret = 0; if (fs == NULL || writeFile == NULL || buffer == NULL) { return -1; } ret = hdfsSeek(fs, writeFile, offset); if (!ret) { //hdfsWrite will return -1 if ret != number of bytes asked to //be written. ret = hdfsWrite(fs, writeFile, buffer, bufferSize); if (ret < 0) { fprintf(stderr, "hdfsWrite failed with error %d \n", errno); } } else { fprintf(stderr, "hdfsSeek failed with error %d \n", errno); } if (ret < 0) { //hdfsWrite does a flush in case of an error, explicit flush //is not required. hdfsCloseFile(fs, writeFile); } //Current offset within the file will be positioned at (offset + writeBytes)th byte. return ret; }
- Performs a positional write with hdfsPwrite()
The application next calls the function
positionalWrite()
:writeBytes = positionalWrite(fs, writeFile, 20, buffer, bufferSize); if (writeBytes < 0) { goto done; }
This function writes the content of the buffer to the file, starting at the offset that you specify.
tSize positionalWrite(hdfsFS fs, hdfsFile writeFile, tOffset offset, char *buffer, tSize bufferSize) { tSize writeBytes = 0; if (fs == NULL || writeFile == NULL || buffer == NULL) { return -1; } writeBytes = hdfsPwrite(fs, writeFile, offset, buffer, bufferSize); if (writeBytes < 0) { fprintf(stderr, "hdfsPwrite failed with error %d \n", errno); hdfsCloseFile(fs, writeFile); } //Current offset within the file will not be advanced if hdfsPwrite is used return writeBytes; }
- Closes the file
hdfsCloseFile(fs, writeFile);
- Frees the buffer
free(buffer);
- Disconnects from the file system
hdfsDisconnect(fs);
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "hdfs.h"
/*
 * Seeks writeFile to the given offset and then writes bufferSize bytes
 * from buffer with hdfsWrite().
 *
 * Returns -1 immediately (touching nothing) when fs, writeFile, or buffer
 * is NULL. Otherwise returns the result of hdfsWrite(), or the non-zero
 * result of hdfsSeek() when the seek itself fails. Whenever the value
 * being returned is negative, the file has already been closed here.
 */
tSize
writeAtOffset(hdfsFS fs, hdfsFile writeFile, tOffset offset,
        char *buffer, tSize bufferSize)
{
    if (!fs || !writeFile || !buffer) {
        return -1;
    }

    tSize status = hdfsSeek(fs, writeFile, offset);
    if (status != 0) {
        fprintf(stderr, "hdfsSeek failed with error %d \n", errno);
    } else {
        /* hdfsWrite reports -1 when it cannot write the number of bytes
         * that was asked for. */
        status = hdfsWrite(fs, writeFile, buffer, bufferSize);
        if (status < 0) {
            fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
        }
    }

    if (status < 0) {
        /* No explicit flush needed: hdfsWrite flushes on error. */
        hdfsCloseFile(fs, writeFile);
    }

    /* After a successful write the file position sits at
     * (offset + bytes written). */
    return status;
}
/*
 * Writes bufferSize bytes from buffer into writeFile at the given offset
 * using hdfsPwrite().
 *
 * Returns -1 immediately when fs, writeFile, or buffer is NULL. Otherwise
 * returns whatever hdfsPwrite() returned; when that value is negative an
 * error is printed and the file is closed before returning.
 */
tSize
positionalWrite(hdfsFS fs, hdfsFile writeFile, tOffset offset,
        char *buffer, tSize bufferSize)
{
    if (!fs || !writeFile || !buffer) {
        return -1;
    }

    /* A positional write does not advance the file's current offset. */
    tSize written = hdfsPwrite(fs, writeFile, offset, buffer, bufferSize);
    if (written >= 0) {
        return written;
    }

    fprintf(stderr, "hdfsPwrite failed with error %d \n", errno);
    hdfsCloseFile(fs, writeFile);
    return written;
}
/*
 * Writes exactly writeSize bytes to writeFile by repeatedly writing the
 * contents of buffer (bufferSize bytes long) with hdfsWrite(). The final
 * chunk is shortened so that the total never exceeds writeSize.
 *
 * Returns 0 on success, including when writeSize is zero or negative
 * (nothing is written). Returns -1 when fs, writeFile, or buffer is NULL
 * (the file is left untouched), when bufferSize is not positive, or when
 * hdfsWrite() fails; in the latter two cases the file is closed before
 * returning, so the caller must not close it again.
 */
int
writeLength(hdfsFS fs, hdfsFile writeFile, char *buffer, tSize bufferSize, tSize writeSize)
{
    tSize written = 0;

    if (fs == NULL || writeFile == NULL || buffer == NULL) {
        return -1;
    }
    if (writeSize <= 0) {
        return 0;
    }
    if (bufferSize <= 0) {
        /* A non-positive chunk size can never make progress. */
        fprintf(stderr, "writeLength: buffer size must be positive\n");
        hdfsCloseFile(fs, writeFile);
        return -1;
    }

    while (written < writeSize) {
        /* Clamp the last chunk so we do not overshoot writeSize. */
        tSize remaining = writeSize - written;
        tSize chunk = remaining < bufferSize ? remaining : bufferSize;

        tSize ret = hdfsWrite(fs, writeFile, (void*)buffer, chunk);
        if (ret <= 0) {
            fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
            hdfsCloseFile(fs, writeFile);
            return -1;
        }
        /* Advance by what was actually written, not by what was asked. */
        written += ret;
    }
    return 0;
}
/*
 * Parses text as a non-negative decimal byte count and stores it in *out.
 * Returns 0 on success; returns -1 when text is empty, contains trailing
 * characters, is negative, or exceeds INT32_MAX (tSize is treated as a
 * signed 32-bit type here).
 */
static int
parseSize(const char *text, tSize *out)
{
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (errno != 0 || end == text || *end != '\0') {
        return -1;
    }
    if (value < 0 || value > INT32_MAX) {
        return -1;
    }
    *out = (tSize)value;
    return 0;
}

/*
 * Usage: hdfs_write <filename> <filesize> <buffersize>
 *
 * Connects to the default cluster, opens <filename> for writing, fills a
 * buffer of <buffersize> bytes with a repeating a-z pattern, and then
 * exercises three write paths: a sequential write of <filesize> bytes, a
 * seek-and-write at offset 0, and a positional write at offset 20.
 *
 * Returns 0 only when every step succeeds; otherwise returns a negative
 * value (-1 for usage/setup errors, -2 for open or allocation failures,
 * -3 for a failed write or close).
 */
int
main(int argc, char **argv)
{
    if (argc != 4) {
        fprintf(stderr, "Usage: hdfs_write <filename> <filesize> <buffersize>\n");
        exit(-1);
    }

    int err = hdfsSetRpcTimeout(30);
    if (err) {
        fprintf(stderr, "Oops! Failed to set rpc timeout!\n");
        exit(-1);
    }

    hdfsFS fs = hdfsConnect("default", 0);
    if (!fs) {
        fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
        exit(-1);
    }

    const char* rfile = argv[1];
    tSize fileSize = 0;
    tSize bufferSize = 0;
    if (parseSize(argv[2], &fileSize) != 0 ||
        parseSize(argv[3], &bufferSize) != 0 ||
        bufferSize == 0) {
        fprintf(stderr, "Invalid size: <filesize> must be >= 0 and <buffersize> must be > 0\n");
        hdfsDisconnect(fs);
        return -1;
    }

    //O_WRONLY creates the file if the file doesn't exist.
    //O_WRONLY truncates the file if the file exists.
    //O_WRONLY | O_APPEND will preserve the contents of the file if the file exists.
    hdfsFile writeFile = hdfsOpenFile(fs, rfile, O_WRONLY, 0, 0, 0);
    if (!writeFile) {
        fprintf(stderr, "Failed to open %s for writing!\n", rfile);
        hdfsDisconnect(fs);
        return -2;
    }

    char* buffer = malloc((size_t)bufferSize);
    if (buffer == NULL) {
        fprintf(stderr, "Failed to allocate memory!\n");
        hdfsCloseFile(fs, writeFile);
        hdfsDisconnect(fs);
        return -2;
    }
    for (tSize i = 0; i < bufferSize; i++) {
        buffer[i] = (char)('a' + i % 26);
    }

    //Assume failure until every write and the final close have succeeded.
    int status = -3;

    //Each helper closes writeFile itself when it fails, so the error
    //paths below jump straight past the hdfsCloseFile() call.

    //Write entire file from the beginning
    if (writeLength(fs, writeFile, buffer, bufferSize, fileSize) < 0) {
        goto done;
    }

    //Write file at a particular offset
    //In this case, we are writing from offset 0
    if (writeAtOffset(fs, writeFile, 0, buffer, bufferSize) < 0) {
        goto done;
    }

    //Write file at a particular offset using positional write
    //In this case, write from offset 20
    if (positionalWrite(fs, writeFile, 20, buffer, bufferSize) < 0) {
        goto done;
    }

    if (hdfsCloseFile(fs, writeFile) != 0) {
        fprintf(stderr, "Failed to close %s!\n", rfile);
        goto done;
    }
    status = 0;

done:
    free(buffer);
    hdfsDisconnect(fs);
    return status;
}
/**
* vim: ts=4: sw=4: et:
*/