Application Reset Tool
The application reset tool lets you reset an application and force it to reprocess its data from scratch. This can be useful for development and testing, or when fixing bugs.
Description
The application reset tool (ART) handles the Kafka Streams user topics (input, output, and intermediate topics) and internal topics differently when resetting the application.
The application reset tool does the following for each topic type:
- Input topics: Reset to the beginning of the topic. This means that it sets the application’s committed consumer offsets for all partitions to each partition’s earliest offset (for consumer group application.id).
- Intermediate topics: Skip to the end of the topic, i.e., set the application’s committed consumer offsets for all partitions to each partition’s logSize (for consumer group application.id).
- Internal topics: Delete the internal topic (this automatically deletes any committed offsets).
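The per-topic behavior listed above can be summarized in a small, self-contained model. This is a toy illustration only: `ResetSemanticsSketch`, `TopicType`, and `offsetAfterReset` are hypothetical names introduced here, the offsets are made-up values, and nothing in the sketch talks to a real cluster or reflects how the tool is implemented.

```java
import java.util.OptionalLong;

/** Toy model of the reset semantics; hypothetical names, no cluster access. */
public class ResetSemanticsSketch {

    enum TopicType { INPUT, INTERMEDIATE, INTERNAL }

    /**
     * Returns the committed offset of one partition after a reset, or an
     * empty value when the topic (and with it the committed offset) is deleted.
     */
    static OptionalLong offsetAfterReset(TopicType type, long earliestOffset, long logSize) {
        switch (type) {
            case INPUT:
                // Input topics are rewound to the earliest offset
                return OptionalLong.of(earliestOffset);
            case INTERMEDIATE:
                // Intermediate topics are skipped to the end of the log
                return OptionalLong.of(logSize);
            default:
                // Internal topics are deleted, so no committed offset remains
                return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        // Example partition: earliest offset 5, log size 120 (made-up values)
        for (TopicType type : TopicType.values()) {
            System.out.println(type + " -> " + offsetAfterReset(type, 5L, 120L));
        }
    }
}
```

Output topics do not appear in the model because the tool leaves them untouched.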
The application reset tool does not do the following:
- Reset output topics of an application. If any output (or intermediate) topics are consumed by downstream applications, it is your responsibility to adjust those downstream applications as appropriate when you reset the upstream application.
- Reset the local environment of your application instances. It is your responsibility to delete the local state on any machine on which an application instance was run.
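Because the tool leaves local state alone, you may need to remove it yourself on machines where an instance previously ran. The following is a minimal sketch, assuming the state lives under `<state.dir>/<application.id>` (the usual Kafka Streams layout) and that no application instance is running on the machine. `LocalStateCleaner` and `deleteLocalState` are hypothetical names, and both paths are supplied by you rather than read from any configuration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper that removes one application's local state directory. */
public class LocalStateCleaner {

    /** Recursively deletes appStateDir; returns false if it did not exist. */
    static boolean deleteLocalState(Path appStateDir) {
        if (!Files.exists(appStateDir)) {
            return false;
        }
        try {
            List<Path> toDelete;
            try (Stream<Path> paths = Files.walk(appStateDir)) {
                // Deepest paths first, so directories are empty when deleted
                toDelete = paths.sorted(Comparator.reverseOrder())
                                .collect(Collectors.toList());
            }
            for (Path path : toDelete) {
                Files.delete(path);
            }
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: LocalStateCleaner <state.dir> <application.id>");
            return;
        }
        // Assumed layout: <state.dir>/<application.id>
        Path appStateDir = Paths.get(args[0], args[1]);
        System.out.println(deleteLocalState(appStateDir)
                ? "deleted " + appStateDir
                : "nothing to delete at " + appStateDir);
    }
}
```

On a machine where the application will be restarted anyway, calling `cleanUp()` on the `KafkaStreams` instance before `start()` serves the same purpose.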
See Confluent Application Reset Tool for additional reference information.
Running the Application Reset Tool
Invoke the application reset tool from the command line:
/opt/mapr/bin/kafka-streams-application-reset.sh
The tool accepts the following parameters, including the --input-topics and --intermediate-topics parameters:
Option | Description |
---|---|
--application-id <String: id> | (Required) The Kafka Streams application ID (application.id). |
--default-stream | The default stream that is used when the topic name is specified but the stream name is not. |
--config-file <String: file name> | Property file containing configs to be passed to admin clients and embedded consumer. |
--dry-run | Display the actions that would be performed without executing the reset commands. |
--input-topics <String: list> | Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset. |
--intermediate-topics <String: list> | Comma-separated list of intermediate user topics (topics used in the through() method). For these topics, the tool will skip to the end. |
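If you drive resets from a JVM-based test harness, it may help to assemble the argument list programmatically. The sketch below is one possible approach, not part of the tool: `ResetCommandBuilder` and `buildResetCommand` are hypothetical names, and only options from the table above are used. It prints the command rather than executing it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper that assembles a reset-tool command line. */
public class ResetCommandBuilder {

    static List<String> buildResetCommand(String toolPath, String applicationId,
                                          List<String> inputTopics,
                                          List<String> intermediateTopics,
                                          boolean dryRun) {
        List<String> command = new ArrayList<>();
        command.add(toolPath);
        command.add("--application-id");
        command.add(applicationId);
        if (!inputTopics.isEmpty()) {
            // The tool expects a comma-separated list
            command.add("--input-topics");
            command.add(String.join(",", inputTopics));
        }
        if (!intermediateTopics.isEmpty()) {
            command.add("--intermediate-topics");
            command.add(String.join(",", intermediateTopics));
        }
        if (dryRun) {
            // Show the planned actions without performing them
            command.add("--dry-run");
        }
        return command;
    }

    public static void main(String[] args) {
        List<String> command = buildResetCommand(
                "/opt/mapr/bin/kafka-streams-application-reset.sh",
                "my-streams-app",
                Collections.singletonList("my-input-topic"),
                Collections.singletonList("rekeyed-topic"),
                true);
        // Print the command instead of running it; to execute it you could
        // pass the list to new ProcessBuilder(command).inheritIO().start().
        System.out.println(String.join(" ", command));
    }
}
```

Starting with `--dry-run` enabled lets you review the planned actions before committing to the reset.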
Resetting your Local Environments
Example
In this example you are developing and testing an application locally and you want to iteratively improve your application via run-reset-modify cycles.
package mapr.examples.streams;

import ...;

public class ResetDemo {

    public static void main(String[] args) throws Exception {
        // Kafka Streams configuration
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        // ...and so on...

        // Define the processing topology
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("my-input-topic")
            .selectKey(...)
            .through("rekeyed-topic")
            // Count records per key, backed by the "global-count" state store
            .groupByKey()
            .count(Materialized.as("global-count"))
            .toStream()
            .to("my-output-topic");

        KafkaStreams app = new KafkaStreams(builder.build(), streamsConfiguration);

        // Delete the application's local state.
        // Note: In a real application you'd call `cleanUp()` only under
        // certain conditions. See tip on `cleanUp()` below.
        app.cleanUp();
        app.start();

        // Note: In real applications you would register a shutdown hook
        // that would trigger the call to `app.close()` rather than
        // using the sleep-then-close example we show here.
        Thread.sleep(30 * 1000L);
        app.close();
    }
}
You can then perform run-reset-modify cycles as follows:
# Run your application
$ bin/kafka-run-class mapr.examples.streams.ResetDemo
# After stopping all application instances, reset the application
$ bin/kafka-streams-application-reset.sh --application-id my-streams-app \
    --input-topics my-input-topic \
    --intermediate-topics rekeyed-topic
# Now you can modify/recompile as needed and then re-run the application.
# You can also experiment, for example, with different input data without
# modifying the application.
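The comments in `ResetDemo` mention registering a shutdown hook instead of the sleep-then-close pattern. A minimal sketch of that idea follows, using only the JDK so it runs on its own: `ShutdownHookSketch` and `shutdownHook` are hypothetical names, and the printed message stands in for `app.close()`. With a real `KafkaStreams` instance you would pass `app::close` as the close action.

```java
/** Hypothetical sketch: close an application from a JVM shutdown hook. */
public class ShutdownHookSketch {

    /** Wraps the close action in a named thread suitable for addShutdownHook(). */
    static Thread shutdownHook(Runnable closeAction) {
        return new Thread(closeAction, "streams-shutdown-hook");
    }

    public static void main(String[] args) {
        // Stand-in for app::close on a KafkaStreams instance
        Runnable closeAction = () -> System.out.println("closing application");

        // The hook runs when the JVM shuts down, for example on Ctrl+C
        // or when the last non-daemon thread finishes.
        Runtime.getRuntime().addShutdownHook(shutdownHook(closeAction));

        // app.start() would go here; this stand-in simply returns, which
        // ends the JVM and triggers the hook.
    }
}
```

Stop every instance this way (or by any other clean shutdown) before running the reset tool, as the run-reset-modify cycle above requires.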