--- /dev/null
+
+- Run:
+By default, this resets offsets to the earliest offset in every partition of each input topic:
+java -jar build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic
+
+We can do the same for just one topic's partition:
+java -jar build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic --partition 0
+
+We can also choose an explicit offset for just one topic's partition:
+java -jar build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic --offset 24 --partition 0
+
+
+- Debug:
+java -jar -Djaxp.debug=1 -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic
+
+
+You must stop all consumers before running this command. If consumers are still running, the command sometimes works, but most of the time it does not :(
--- /dev/null
+group = theGroup
+version = theVersion
+description = theDescription
+
+project.ext {
+ // FATJAR: entry point written into the jar manifest's Main-Class attribute
+ mainClassName = 'de.example.kafka.tools.OffsetManagement'
+
+ // LOG4J2 dependencies
+ // NOTE(review): despite the name, slf4jVersion is the version of the
+ // log4j-slf4j-impl artifact (log4j2 release line), not of SLF4J itself.
+ slf4jVersion = '2.7'
+ log4jCoreVersion = '2.7'
+ jclOverSlf4jVersion = '1.7.22'
+
+ // Unit test dependencies
+ mockitoVersion = '2.4.1'
+ junitVersion = '4.12'
+}
+
+
+// Plugins: 'idea' was applied twice in the original; the duplicate is removed.
+apply plugin: 'java'
+apply plugin: 'idea'
+apply plugin: 'jacoco'
+apply plugin: 'eclipse'
+apply plugin: 'maven-publish'
+
+// Compile for Java 8 bytecode/source level.
+targetCompatibility = 1.8
+sourceCompatibility = 1.8
+
+
+// ***************** REPOSITORIES FOR DEPENDENCIES *****************
+repositories {
+ mavenCentral()
+ maven { url "https://repo.spring.io/release" }
+}
+
+
+// ***************** PLUGINS *****************
+// NOTE(review): this buildscript block declares repositories but no classpath
+// dependencies, so it currently has no effect; kept in case build plugins are
+// added later -- confirm before removing.
+buildscript {
+ repositories {
+ mavenCentral()
+ maven { url 'https://plugins.gradle.org/m2/' }
+ }
+}
+
+// ***************** DEPENDENCIES *****************
+// NOTE(review): 'compile'/'testCompile' are the Gradle 3.x configuration names
+// (superseded by 'implementation'/'api' in later Gradle versions).
+dependencies {
+ // 1/3 Required dependency for log4j 2 with slf4j: binding between log4j2 and slf4j
+ compile("org.apache.logging.log4j:log4j-slf4j-impl:${slf4jVersion}")
+ // 2/3 Required dependency for log4j 2 with slf4j: log4j 2 maven plugin (it is the log4j 2 implementation)
+ compile("org.apache.logging.log4j:log4j-core:${log4jCoreVersion}")
+ // 3/3 Required dependency for getting rid of commons logging. This is the BRIDGE (no binding) between Jakarta Commons Logging (used by Spring)
+ // and whatever I am using for logging (in this case I am using log4j 2) See: http://www.slf4j.org/legacy.html
+ // We need exclusions in every dependency using Jakarta Commons Logging (see Spring dependencies below)
+ compile("org.slf4j:jcl-over-slf4j:${jclOverSlf4jVersion}")
+
+ // Kafka core. The slf4j-log4j12 binding is excluded so it cannot clash with
+ // the log4j-slf4j-impl binding declared above (two SLF4J bindings on the
+ // classpath would make SLF4J pick one arbitrarily).
+ compile('org.apache.kafka:kafka_2.11:0.10.1.1') {
+ exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+ }
+ compile('javax.validation:validation-api:1.1.0.Final')
+
+
+ // Unit tests
+ testCompile("junit:junit:${junitVersion}")
+ testCompile("org.mockito:mockito-core:${mockitoVersion}")
+}
+
+
+
+// ***************** UNIT TESTS *****************
+test {
+
+ // explicitly include or exclude tests
+ // Integration tests (classes named *IntegrationShould) are excluded from the
+ // default `test` task run.
+ exclude '**/*IntegrationShould.class'
+
+ testLogging {
+ events "PASSED", "FAILED", "SKIPPED"
+ }
+
+ // JaCoCo coverage: append=false overwrites the exec file on every run instead
+ // of accumulating results across runs.
+ jacoco {
+ append = false
+ destinationFile = file("$buildDir/jacoco/jacoco.exec")
+ classDumpFile = file("$buildDir/jacoco/classpathdumps")
+ }
+}
+
+// ***************** JAVADOC *****************
+// Generates Javadoc for the main source set, resolving references against the
+// compile classpath.
+javadoc {
+ source = sourceSets.main.allJava
+ classpath = configurations.compile
+}
+
+// ***************** PUBLISH TO REPOSITORY *****************
+// Bundles the output of the javadoc task into its own jar (classifier 'javadoc').
+task javadocJar(type: Jar) {
+ classifier = 'javadoc'
+ from javadoc
+}
+
+// Bundles the main Java sources into their own jar (classifier 'sources').
+task sourceJar(type: Jar) {
+ classifier = 'sources'
+ from sourceSets.main.java
+}
+
+// Publishes the main artifact (jar, or war when the war plugin is applied) plus
+// the sources and javadoc jars to Artifactory via maven-publish.
+publishing {
+ publications {
+ mavenJava(MavenPublication) {
+ // Publishes war or jar file depending on whether we are using the war plugin.
+ if (plugins.withType(WarPlugin)) {
+ from components.web
+ } else {
+ from components.java
+ }
+
+ // Publishes jar with sources
+ artifact sourceJar {
+ classifier 'sources'
+ }
+ // Publishes jar with javadoc
+ artifact javadocJar {
+ classifier 'javadoc'
+ }
+
+ // By default, Maven scope will be runtime. We want scope compile :/
+ // Rewrites the generated POM: every dependency that is scoped 'runtime' AND
+ // present in the compile configuration gets its scope changed to 'compile'.
+ pom.withXml {
+ asNode().dependencies.'*'.findAll() {
+ it.scope.text() == 'runtime' && project.configurations.compile.allDependencies.find { dep ->
+ dep.name == it.artifactId.text()
+ }
+ }.each() {
+ it.scope*.value = 'compile'
+ }
+ }
+ }
+ }
+ // Target repository depends on the version: SNAPSHOT vs release.
+ // NOTE(review): plain http URLs -- switch to https if the Artifactory server supports it.
+ repositories {
+ maven {
+ credentials {
+ username project.artifactory_username
+ password project.artifactory_password
+ }
+
+ if(project.version.endsWith('-SNAPSHOT')) {
+ url 'http://artifactory/artifactory/libs-snapshot'
+ } else {
+ url 'http://artifactory/artifactory/libs-release'
+ }
+ }
+ }
+}
+
+// ***************** FATJAR / MANIFEST FILE *****************
+// Builds a self-contained ("fat") jar: the manifest declares the Main-Class and
+// every compile dependency is unpacked into the jar itself.
+jar {
+ manifest {
+ attributes('Main-Class': "$mainClassName",
+ 'Implementation-Title': 'Kafka: my tools',
+ 'Implementation-Version': theVersion,
+ 'Build-Time': new Date().format("yyyy-MM-dd'T'HH:mm:ssZ"),
+ 'Built-By': System.getProperty('user.name'),
+ 'Built-JDK': System.getProperty('java.version')
+ )
+ }
+
+ // Unpack each compile dependency (jars via zipTree, directories as-is) into
+ // this jar. NOTE(review): unpacking all dependencies can produce duplicate
+ // entries (e.g. META-INF files) -- verify the resulting jar if signing issues appear.
+ from {
+ configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
+ }
+}
--- /dev/null
+# Project coordinates consumed by build.gradle (group/version/description) and
+# settings.gradle (rootProject.name).
+theGroup=de.example.kafka.tools
+theDescription=Kafka my tools
+theVersion=0.1.0-SNAPSHOT
+theName=kafka-my-tools
--- /dev/null
+#Fri Feb 10 13:15:34 CET 2017
+# Gradle wrapper configuration: downloads the Gradle 3.1 binary-only ("-bin")
+# distribution into GRADLE_USER_HOME.
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-bin.zip
--- /dev/null
+#!/usr/bin/env bash
+
+##############################################################################
+##
+## Gradle start up script for UN*X
+##
+## NOTE(review): auto-generated Gradle wrapper script; prefer regenerating via
+## the Gradle `wrapper` task over editing by hand.
+##############################################################################
+
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`"/$link"
+ fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >/dev/null
+APP_HOME="`pwd -P`"
+cd "$SAVED" >/dev/null
+
+APP_NAME="Gradle"
+APP_BASE_NAME=`basename "$0"`
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS=""
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD="maximum"
+
+warn ( ) {
+ echo "$*"
+}
+
+die ( ) {
+ echo
+ echo "$*"
+ echo
+ exit 1
+}
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+nonstop=false
+case "`uname`" in
+ CYGWIN* )
+ cygwin=true
+ ;;
+ Darwin* )
+ darwin=true
+ ;;
+ MINGW* )
+ msys=true
+ ;;
+ NONSTOP* )
+ nonstop=true
+ ;;
+esac
+
+# The wrapper jar is the only entry on the classpath.
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ if [ ! -x "$JAVACMD" ] ; then
+ die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+else
+ JAVACMD="java"
+ which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+fi
+
+# Increase the maximum file descriptors if we can.
+if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
+ MAX_FD_LIMIT=`ulimit -H -n`
+ if [ $? -eq 0 ] ; then
+ if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
+ MAX_FD="$MAX_FD_LIMIT"
+ fi
+ ulimit -n $MAX_FD
+ if [ $? -ne 0 ] ; then
+ warn "Could not set maximum file descriptor limit: $MAX_FD"
+ fi
+ else
+ warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
+ fi
+fi
+
+# For Darwin, add options to specify how the application appears in the dock
+if $darwin; then
+ GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
+fi
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin ; then
+ APP_HOME=`cygpath --path --mixed "$APP_HOME"`
+ CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+ JAVACMD=`cygpath --unix "$JAVACMD"`
+
+ # We build the pattern for arguments to be converted via cygpath
+ ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
+ SEP=""
+ for dir in $ROOTDIRSRAW ; do
+ ROOTDIRS="$ROOTDIRS$SEP$dir"
+ SEP="|"
+ done
+ OURCYGPATTERN="(^($ROOTDIRS))"
+ # Add a user-defined pattern to the cygpath arguments
+ if [ "$GRADLE_CYGPATTERN" != "" ] ; then
+ OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
+ fi
+ # Now convert the arguments - kludge to limit ourselves to /bin/sh
+ i=0
+ for arg in "$@" ; do
+ CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
+ CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
+
+ if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
+ eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
+ else
+ eval `echo args$i`="\"$arg\""
+ fi
+ i=$((i+1))
+ done
+ case $i in
+ (0) set -- ;;
+ (1) set -- "$args0" ;;
+ (2) set -- "$args0" "$args1" ;;
+ (3) set -- "$args0" "$args1" "$args2" ;;
+ (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+ (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+ (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+ (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+ (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+ (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+ esac
+fi
+
+# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
+function splitJvmOpts() {
+ JVM_OPTS=("$@")
+}
+# NOTE(review): the eval deliberately relies on shell word-splitting of the
+# unquoted *_OPTS variables; options containing spaces must be quoted by the caller.
+eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
+JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
+
+# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
+if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then
+ cd "$(dirname "$0")"
+fi
+
+exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
--- /dev/null
+@if "%DEBUG%" == "" @echo off\r
+@rem ##########################################################################\r
+@rem\r
+@rem Gradle startup script for Windows\r
+@rem\r
+@rem NOTE(review): auto-generated Gradle wrapper script; prefer regenerating\r
+@rem via the Gradle `wrapper` task over editing by hand.\r
+@rem ##########################################################################\r
+\r
+@rem Set local scope for the variables with windows NT shell\r
+if "%OS%"=="Windows_NT" setlocal\r
+\r
+set DIRNAME=%~dp0\r
+if "%DIRNAME%" == "" set DIRNAME=.\r
+set APP_BASE_NAME=%~n0\r
+set APP_HOME=%DIRNAME%\r
+\r
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.\r
+set DEFAULT_JVM_OPTS=\r
+\r
+@rem Find java.exe\r
+if defined JAVA_HOME goto findJavaFromJavaHome\r
+\r
+set JAVA_EXE=java.exe\r
+%JAVA_EXE% -version >NUL 2>&1\r
+if "%ERRORLEVEL%" == "0" goto init\r
+\r
+echo.\r
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.\r
+echo.\r
+echo Please set the JAVA_HOME variable in your environment to match the\r
+echo location of your Java installation.\r
+\r
+goto fail\r
+\r
+:findJavaFromJavaHome\r
+set JAVA_HOME=%JAVA_HOME:"=%\r
+set JAVA_EXE=%JAVA_HOME%/bin/java.exe\r
+\r
+if exist "%JAVA_EXE%" goto init\r
+\r
+echo.\r
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%\r
+echo.\r
+echo Please set the JAVA_HOME variable in your environment to match the\r
+echo location of your Java installation.\r
+\r
+goto fail\r
+\r
+:init\r
+@rem Get command-line arguments, handling Windows variants\r
+\r
+if not "%OS%" == "Windows_NT" goto win9xME_args\r
+\r
+:win9xME_args\r
+@rem Slurp the command line arguments.\r
+set CMD_LINE_ARGS=\r
+set _SKIP=2\r
+\r
+:win9xME_args_slurp\r
+if "x%~1" == "x" goto execute\r
+\r
+set CMD_LINE_ARGS=%*\r
+\r
+:execute\r
+@rem Setup the command line\r
+\r
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar\r
+\r
+@rem Execute Gradle\r
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%\r
+\r
+:end\r
+@rem End local scope for the variables with windows NT shell\r
+if "%ERRORLEVEL%"=="0" goto mainEnd\r
+\r
+:fail\r
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of\r
+rem the _cmd.exe /c_ return code!\r
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1\r
+exit /b 1\r
+\r
+:mainEnd\r
+if "%OS%"=="Windows_NT" endlocal\r
+\r
+:omega\r
--- /dev/null
+// Project name comes from gradle.properties (theName)
+rootProject.name = theName
--- /dev/null
+package de.example.kafka.tools;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import kafka.api.OffsetRequest;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import joptsimple.OptionException;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import kafka.admin.AdminClient;
+import kafka.admin.TopicCommand;
+import kafka.utils.ZkUtils;
+
+
+public class OffsetManagement {
+ private static final int EXIT_CODE_SUCCESS = 0;
+ private static final int EXIT_CODE_ERROR = 1;
+
+ private static OptionSpec<String> bootstrapServerOption;
+ private static OptionSpec<String> zookeeperOption;
+ private static OptionSpec<String> applicationIdOption;
+ private static OptionSpec<String> inputTopicsOption;
+ private static OptionSpec<Long> offsetOption;
+ private static OptionSpec<Integer> partitionOption;
+
+
+ private OptionSet options = null;
+ private final Properties consumerConfig = new Properties();
+ private final List<String> allTopics = new LinkedList<>();
+
+ public int run(final String[] args) {
+ return run(args, new Properties());
+ }
+
+ public int run(final String[] args, final Properties config) {
+ consumerConfig.clear();
+ consumerConfig.putAll(config);
+
+ int exitCode = EXIT_CODE_SUCCESS;
+
+ AdminClient adminClient = null;
+ ZkUtils zkUtils = null;
+ try {
+ parseArguments(args);
+
+ adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
+ final String groupId = this.options.valueOf(applicationIdOption);
+
+ zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled());
+
+ allTopics.clear();
+ allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+
+ resetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
+ deleteInternalTopics(zkUtils);
+ } catch (final Throwable e) {
+ exitCode = EXIT_CODE_ERROR;
+ System.err.println("ERROR: " + e.getMessage());
+ } finally {
+ if (adminClient != null) {
+ adminClient.close();
+ }
+ if (zkUtils != null) {
+ zkUtils.close();
+ }
+ }
+
+ return exitCode;
+ }
+
+ private void parseArguments(final String[] args) throws IOException {
+ final OptionParser optionParser = new OptionParser();
+ applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)")
+ .withRequiredArg()
+ .ofType(String.class)
+ .describedAs("id")
+ .required();
+ bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
+ .withRequiredArg()
+ .ofType(String.class)
+ .defaultsTo("localhost:9092")
+ .describedAs("urls");
+ zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:POST")
+ .withRequiredArg()
+ .ofType(String.class)
+ .defaultsTo("localhost:2181")
+ .describedAs("url");
+ inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics")
+ .withRequiredArg()
+ .ofType(String.class)
+ .withValuesSeparatedBy(',')
+ .describedAs("list");
+ offsetOption = optionParser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end")
+ .withRequiredArg()
+ .describedAs("consume offset")
+ .ofType(Long.class)
+ .defaultsTo(OffsetRequest.EarliestTime());
+ partitionOption = optionParser.accepts("partition", "The partition number. All partitions by default.")
+ .withRequiredArg()
+ .describedAs("partition number")
+ .ofType(Integer.class)
+ .defaultsTo(Integer.MIN_VALUE);
+
+ try {
+ options = optionParser.parse(args);
+ } catch (final OptionException e) {
+ optionParser.printHelpOn(System.err);
+ throw e;
+ }
+ }
+
+ private void resetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+ final List<String> inputTopics = options.valuesOf(inputTopicsOption);
+
+ if (inputTopics.size() == 0) {
+ System.out.println("No input or intermediate topics specified. Skipping seek.");
+ return;
+ } else {
+ if (inputTopics.size() != 0) {
+ System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+ }
+ }
+
+ final Properties config = new Properties();
+ config.putAll(consumerConfig);
+ config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
+ config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(applicationIdOption));
+ config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ final Set<String> topicsToSubscribe = new HashSet<>(inputTopics.size());
+ for (final String topic : inputTopics) {
+ if (!allTopics.contains(topic)) {
+ System.err.println("Input topic " + topic + " not found. Skipping.");
+ } else {
+ topicsToSubscribe.add(topic);
+ }
+ }
+ for (final String topic : allTopics) {
+ if (isInternalTopic(topic)) {
+ topicsToSubscribe.add(topic);
+ }
+ }
+
+ try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+ client.subscribe(topicsToSubscribe);
+ client.poll(1);
+
+ final Set<TopicPartition> partitions = client.assignment();
+ final Set<TopicPartition> inputAndInternalTopicPartitions = new HashSet<>();
+
+ for (final TopicPartition p : partitions) {
+ final String topic = p.topic();
+ if (isInputTopic(topic) || isInternalTopic(topic)) {
+ inputAndInternalTopicPartitions.add(p);
+ } else {
+ System.err.println("Skipping invalid partition: " + p);
+ }
+ }
+
+ if (inputAndInternalTopicPartitions.size() > 0) {
+ client.seekToBeginning(inputAndInternalTopicPartitions);
+ }
+
+ Integer partition = options.valueOf(partitionOption);
+ if (partition == Integer.MIN_VALUE) {
+
+ for (final TopicPartition p : partitions) {
+ client.position(p);
+
+ Long offset = options.valueOf(offsetOption);
+ if (offset != OffsetRequest.EarliestTime()) {
+ client.seek(p, options.valueOf(offsetOption));
+ }
+ }
+
+ } else {
+ for (final TopicPartition p : partitions) {
+
+ if (partition == p.partition()) {
+ client.position(p);
+
+ Long offset = options.valueOf(offsetOption);
+ if (offset != OffsetRequest.EarliestTime()) {
+ client.seek(p, options.valueOf(offsetOption));
+ }
+ }
+
+ }
+ }
+ client.commitSync();
+
+
+ } catch (final RuntimeException e) {
+ System.err.println("ERROR: Resetting offsets failed.");
+ throw e;
+ }
+
+ System.out.println("Done.");
+ }
+
+ private boolean isInputTopic(final String topic) {
+ return options.valuesOf(inputTopicsOption).contains(topic);
+ }
+
+ private void deleteInternalTopics(final ZkUtils zkUtils) {
+ System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
+
+ for (final String topic : allTopics) {
+ if (isInternalTopic(topic)) {
+ final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
+ "--zookeeper", options.valueOf(zookeeperOption),
+ "--delete", "--topic", topic});
+ try {
+ TopicCommand.deleteTopic(zkUtils, commandOptions);
+ } catch (final RuntimeException e) {
+ System.err.println("ERROR: Deleting topic " + topic + " failed.");
+ throw e;
+ }
+ }
+ }
+
+ System.out.println("Done.");
+ }
+
+ private boolean isInternalTopic(final String topicName) {
+ return topicName.startsWith(options.valueOf(applicationIdOption) + "-")
+ && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
+ }
+
+ public static void main(final String[] args) {
+ System.exit(new OffsetManagement().run(args));
+ }
+
+}
+