Developing HPE Ezmeral Data Fabric Streams C#/.NET Applications
Describes general tasks for developing C#/.NET applications.
Before Your Begin
Confirm that your environment meets the following requirements:
- HPE Ezmeral Data Fabric cluster version 6.0.1 or greater.
- HPE Ezmeral Data Fabric Streams C Client (mapr-librdkafka 0.11.3) is installed and configured on the node. See Configuring the HPE Ezmeral Data Fabric Streams C Client.
- HPE Ezmeral Data Fabric Streams C#/.NET Client (mapr-streams-dotnet) is installed on the node.
- .NET SKD 4.5.x or 4.6.x
- .NET Core SDK 1.1
- nuget.exe
Create a Producer Application
In general, you want to create a producer that performs the following steps:
- Import the producer class.
- Define the producer and its configuration.
- Produce data.
- Wait for all messages to be sent to consumer.
In the following example code, three messages are produced to a topic named mytopic in a stream named my_stream.
class Producer
{
public static async void Produce()
{
string stream = "/my_stream";
string topicName = "mytopic";
var config = new Dictionary<string, object> { { "streams.producer.default.stream", stream } };
var messages = new string[] { "Msg1", "Msg2", "Msg3" };
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
foreach (var msg in messages)
{
var deliveryReport = await producer.ProduceAsync(topicName, null, msg);
Console.WriteLine($"Delivery report:{deliveryReport.TopicPartitionOffset}");
}
producer.Flush(TimeSpan.FromSeconds(1));
}
}
}
Create a Consumer Application
In general, you want to create a consumer that performs the following steps:
- Import the consumer class.
- Define the consumer and its configuration.
- Consume data.
- Wait for all messages to be consumed.
In following example code, the HPE Ezmeral Data Fabric Streams consumer
is subscribed to my_stream/mytopic
and it prints the content of each
message that it reads.
using Confluent.Kafka;
using Confluent.Kafka.Serialization
class Consumer
{
public static void Consume()
{
var stream = "/mystream";
var topic = "mytopic";
var config = new Dictionary<string, object>
{
{ "group.id", "simple-csharp-consumer" },
{ "streams.consumer.default.stream", stream }
};
bool running = true;
using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
var l = new List<TopicPartitionOffset> { new TopicPartitionOffset(topic, 0, 0) };
consumer.Assign(l);
// Raised on critical errors, e.g. connection failures.
consumer.OnError += (_, error) =>
{
Console.WriteLine($"Error: {error}");
running = false;
};
// Raised on deserialization errors or when a consumed message has an error != NoError.
consumer.OnConsumeError += (_, error) =>
{
Console.WriteLine($"Consume error: {error}");
running = false;
};
while (running)
{
Message<Ignore, string> msg;
if (consumer.Consume(out msg, TimeSpan.FromSeconds(10)))
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
}
}
}
}
}
Run the Example Applications
To run the sample producer and consumer applications:
- Create a stream named mystream.
- Create a folder application.
- Create a file named example.cs.
- Add producer example code into the example.cs file.
- Add consumer example code into the example.cs file.
- Add an entry point for your application:
class Demo { public static void Main(string[] args) { Producer.Produce(); Consumer.Consume(); } }
- Create a project file named example.csproj.
- Add the following dependency properties into the example.csproj file:
<?xml version="1.0" encoding="utf-8"?> <Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> <PropertyGroup> <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> <ProjectGuid>{99EDBA4B-D7DA-48BB-8D0C-AF4B12387935}</ProjectGuid> <OutputType>Exe</OutputType> <RuntimeIdentifiers>win10-x64</RuntimeIdentifiers> <RootNamespace>app</RootNamespace> <AssemblyName>app</AssemblyName> <TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion> <FileAlignment>512</FileAlignment> <AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects> </PropertyGroup> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> <PlatformTarget>AnyCPU</PlatformTarget> <DebugSymbols>true</DebugSymbols> <DebugType>full</DebugType> <Optimize>false</Optimize> <OutputPath>bin\Debug\</OutputPath> <DefineConstants>DEBUG;TRACE</DefineConstants> <ErrorReport>prompt</ErrorReport> <WarningLevel>4</WarningLevel> </PropertyGroup> <ItemGroup> <Compile Include="app.cs" /> </ItemGroup> <ItemGroup> <PackageReference Include="mapr-streams-dotnet" Version="0.11.3" /> </ItemGroup> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> </Project>
- Verify that you have completed the steps to configure the HPE Ezmeral Data Fabric Streams C client
or complete the steps now. See Configuring the HPE Ezmeral Data Fabric Streams C Client.
NOTEThe HPE Ezmeral Data Fabric Streams C#/.NET Client is dependent on the HPE Ezmeral Data Fabric Streams C Client. Therefore, the HPE Ezmeral Data Fabric Streams C Client must be configured before you can run the application.
- Open your project folder on the command line and
run:
dotnet run