From: Gustavo Martin Morcuende Date: Mon, 27 Feb 2017 19:29:16 +0000 (+0100) Subject: Reset offsets in topics, command console tool X-Git-Url: https://git.gumartinm.name/?a=commitdiff_plain;p=JavaForFun Reset offsets in topics, command console tool --- diff --git a/Kafka/MyTools/README b/Kafka/MyTools/README new file mode 100644 index 0000000..294a335 --- /dev/null +++ b/Kafka/MyTools/README @@ -0,0 +1,17 @@ + +- Run: +By default reset offsets to the first value in every topic's partition: +java -jar build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic + +We can do the same for just one topic's partition: +java -jar build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic --partition 0 + +We choose offset for just one topic's partition: +java -jar build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic --offset 24 --partition 0 + + +- Debug: +java -jar -Djaxp.debug=1 -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic + + +You must stop consumers before running this command. If there are consumers running this command sometimes works, but the most of time it does not :( diff --git a/Kafka/MyTools/build.gradle b/Kafka/MyTools/build.gradle new file mode 100644 index 0000000..19e8260 --- /dev/null +++ b/Kafka/MyTools/build.gradle @@ -0,0 +1,170 @@ +group = theGroup +version = theVersion +description = theDescription + +project.ext { + // FATJAR + mainClassName = 'de.example.kafka.tools.OffsetManagement' + + // LOG4J2 dependencies + slf4jVersion = '2.7' + log4jCoreVersion = '2.7' + jclOverSlf4jVersion = '1.7.22' + + // Unit test dependencies + mockitoVersion = '2.4.1' + junitVersion = '4.12' +} + + +apply plugin: 'java' +apply plugin: 'idea' +apply plugin: 'jacoco' +apply plugin: 'eclipse' +apply plugin: 'idea' +apply plugin: 'maven-publish' + +targetCompatibility = 1.8 +sourceCompatibility = 1.8 + + +// ***************** REPOSITORIES FOR DEPENDENCIES ***************** +repositories { + mavenCentral() + maven { url "https://repo.spring.io/release" } +} + + +// ***************** PLUGINS ***************** +buildscript { + repositories { + mavenCentral() + maven { url 'https://plugins.gradle.org/m2/' } + } +} + +// ***************** DEPENDENCIES ***************** +dependencies { + // 1/3 Required dependency for log4j 2 with slf4j: binding between log4j2 and slf4j + compile("org.apache.logging.log4j:log4j-slf4j-impl:${slf4jVersion}") + // 2/3 Required dependency for log4j 2 with slf4j: log4j 2 maven plugin (it is the log4j 2 implementation) + compile("org.apache.logging.log4j:log4j-core:${log4jCoreVersion}") + // 3/3 Required dependency for getting rid of commons logging. This is the BRIDGE (no binding) between Jakarta Commons Logging (used by Spring) + // and whatever I am using for logging (in this case I am using log4j 2) See: http://www.slf4j.org/legacy.html + // We need exclusions in every dependency using Jakarta Commons Logging (see Spring dependencies below) + compile("org.slf4j:jcl-over-slf4j:${jclOverSlf4jVersion}") + + compile('org.apache.kafka:kafka_2.11:0.10.1.1') { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + } + compile('javax.validation:validation-api:1.1.0.Final') + + + // Unit tests + testCompile("junit:junit:${junitVersion}") + testCompile("org.mockito:mockito-core:${mockitoVersion}") +} + + + +// ***************** UNIT TESTS ***************** +test { + + // explicitly include or exclude tests + exclude '**/*IntegrationShould.class' + + testLogging { + events "PASSED", "FAILED", "SKIPPED" + } + + jacoco { + append = false + destinationFile = file("$buildDir/jacoco/jacoco.exec") + classDumpFile = file("$buildDir/jacoco/classpathdumps") + } +} + +// ***************** JAVADOC ***************** +javadoc { + source = sourceSets.main.allJava + classpath = configurations.compile +} + +// ***************** PUBLISH TO REPOSITORY ***************** +// Calls javadoc plugin and creates jar with the generated docs +task javadocJar(type: Jar) { + from javadoc + classifier 'javadoc' +} + + + +// Calls java plugin and creates jar with the sources +task sourceJar(type: Jar) { + from sourceSets.main.java + classifier 'sources' +} + +publishing { + publications { + mavenJava(MavenPublication) { + // Publishes war or jar file depending on whether we are using the war plugin. + if (plugins.withType(WarPlugin)) { + from components.web + } else { + from components.java + } + + // Publishes jar with sources + artifact sourceJar { + classifier 'sources' + } + // Publishes jar with javadoc + artifact javadocJar { + classifier 'javadoc' + } + + // By default, Maven scope will be runtime. We want scope compile :/ + pom.withXml { + asNode().dependencies.'*'.findAll() { + it.scope.text() == 'runtime' && project.configurations.compile.allDependencies.find { dep -> + dep.name == it.artifactId.text() + } + }.each() { + it.scope*.value = 'compile' + } + } + } + } + repositories { + maven { + credentials { + username project.artifactory_username + password project.artifactory_password + } + + if(project.version.endsWith('-SNAPSHOT')) { + url 'http://artifactory/artifactory/libs-snapshot' + } else { + url 'http://artifactory/artifactory/libs-release' + } + } + } +} + +// ***************** FATJAR / MANIFEST FILE ***************** +jar { + manifest { + attributes('Main-Class': "$mainClassName", + 'Implementation-Title': 'Kafka: my tools', + 'Implementation-Version': theVersion, + 'Build-Time': new Date().format("yyyy-MM-dd'T'HH:mm:ssZ"), + 'Built-By': System.getProperty('user.name'), + 'Built-JDK': System.getProperty('java.version') + ) + } + + from { + configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } + } +} diff --git a/Kafka/MyTools/gradle.properties b/Kafka/MyTools/gradle.properties new file mode 100644 index 0000000..a0222da --- /dev/null +++ b/Kafka/MyTools/gradle.properties @@ -0,0 +1,4 @@ +theGroup=de.example.kafka.tools +theDescription=Kafka my tools +theVersion=0.1.0-SNAPSHOT +theName=kafka-my-tools diff --git a/Kafka/MyTools/gradle/wrapper/gradle-wrapper.jar b/Kafka/MyTools/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..6ffa237 Binary files /dev/null and b/Kafka/MyTools/gradle/wrapper/gradle-wrapper.jar differ diff --git a/Kafka/MyTools/gradle/wrapper/gradle-wrapper.properties b/Kafka/MyTools/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..fdb1b7c --- /dev/null +++ b/Kafka/MyTools/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Fri Feb 10 13:15:34 CET 2017 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-bin.zip diff --git a/Kafka/MyTools/gradlew b/Kafka/MyTools/gradlew new file mode 100755 index 0000000..9aa616c --- /dev/null +++ b/Kafka/MyTools/gradlew @@ -0,0 +1,169 @@ +#!/usr/bin/env bash + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn ( ) { + echo "$*" +} + +die ( ) { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules +function splitJvmOpts() { + JVM_OPTS=("$@") +} +eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS +JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" diff --git a/Kafka/MyTools/gradlew.bat b/Kafka/MyTools/gradlew.bat new file mode 100644 index 0000000..e95643d --- /dev/null +++ b/Kafka/MyTools/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/Kafka/MyTools/settings.gradle b/Kafka/MyTools/settings.gradle new file mode 100644 index 0000000..a809ba9 --- /dev/null +++ b/Kafka/MyTools/settings.gradle @@ -0,0 +1 @@ +rootProject.name = theName diff --git a/Kafka/MyTools/src/main/java/de/example/kafka/tools/OffsetManagement.java b/Kafka/MyTools/src/main/java/de/example/kafka/tools/OffsetManagement.java new file mode 100644 index 0000000..2060e1b --- /dev/null +++ b/Kafka/MyTools/src/main/java/de/example/kafka/tools/OffsetManagement.java @@ -0,0 +1,250 @@ +package de.example.kafka.tools; + +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import kafka.api.OffsetRequest; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import joptsimple.OptionException; +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import kafka.admin.AdminClient; +import kafka.admin.TopicCommand; +import kafka.utils.ZkUtils; + + +public class OffsetManagement { + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_ERROR = 1; + + private static OptionSpec bootstrapServerOption; + private static OptionSpec zookeeperOption; + private static OptionSpec applicationIdOption; + private static OptionSpec inputTopicsOption; + private static OptionSpec offsetOption; + private static OptionSpec partitionOption; + + + private OptionSet options = null; + private final Properties consumerConfig = new Properties(); + private final List allTopics = new LinkedList<>(); + + public int run(final String[] args) { + return run(args, new Properties()); + } + + public int run(final String[] args, final Properties config) { + consumerConfig.clear(); + consumerConfig.putAll(config); + + int exitCode = EXIT_CODE_SUCCESS; + + AdminClient adminClient = null; + ZkUtils zkUtils = null; + try { + parseArguments(args); + + adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption)); + final String groupId = this.options.valueOf(applicationIdOption); + + zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption), + 30000, + 30000, + JaasUtils.isZkSecurityEnabled()); + + allTopics.clear(); + allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); + + resetInputAndInternalAndSeekToEndIntermediateTopicOffsets(); + deleteInternalTopics(zkUtils); + } catch (final Throwable e) { + exitCode = EXIT_CODE_ERROR; + System.err.println("ERROR: " + e.getMessage()); + } finally { + if (adminClient != null) { + adminClient.close(); + } + if (zkUtils != null) { + zkUtils.close(); + } + } + + return exitCode; + } + + private void parseArguments(final String[] args) throws IOException { + final OptionParser optionParser = new OptionParser(); + applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)") + .withRequiredArg() + .ofType(String.class) + .describedAs("id") + .required(); + bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2") + .withRequiredArg() + .ofType(String.class) + .defaultsTo("localhost:9092") + .describedAs("urls"); + zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:POST") + .withRequiredArg() + .ofType(String.class) + .defaultsTo("localhost:2181") + .describedAs("url"); + inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics") + .withRequiredArg() + .ofType(String.class) + .withValuesSeparatedBy(',') + .describedAs("list"); + offsetOption = optionParser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end") + .withRequiredArg() + .describedAs("consume offset") + .ofType(Long.class) + .defaultsTo(OffsetRequest.EarliestTime()); + partitionOption = optionParser.accepts("partition", "The partition number. All partitions by default.") + .withRequiredArg() + .describedAs("partition number") + .ofType(Integer.class) + .defaultsTo(Integer.MIN_VALUE); + + try { + options = optionParser.parse(args); + } catch (final OptionException e) { + optionParser.printHelpOn(System.err); + throw e; + } + } + + private void resetInputAndInternalAndSeekToEndIntermediateTopicOffsets() { + final List inputTopics = options.valuesOf(inputTopicsOption); + + if (inputTopics.size() == 0) { + System.out.println("No input or intermediate topics specified. Skipping seek."); + return; + } else { + if (inputTopics.size() != 0) { + System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics."); + } + } + + final Properties config = new Properties(); + config.putAll(consumerConfig); + config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption)); + config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(applicationIdOption)); + config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + final Set topicsToSubscribe = new HashSet<>(inputTopics.size()); + for (final String topic : inputTopics) { + if (!allTopics.contains(topic)) { + System.err.println("Input topic " + topic + " not found. Skipping."); + } else { + topicsToSubscribe.add(topic); + } + } + for (final String topic : allTopics) { + if (isInternalTopic(topic)) { + topicsToSubscribe.add(topic); + } + } + + try (final KafkaConsumer client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + client.subscribe(topicsToSubscribe); + client.poll(1); + + final Set partitions = client.assignment(); + final Set inputAndInternalTopicPartitions = new HashSet<>(); + + for (final TopicPartition p : partitions) { + final String topic = p.topic(); + if (isInputTopic(topic) || isInternalTopic(topic)) { + inputAndInternalTopicPartitions.add(p); + } else { + System.err.println("Skipping invalid partition: " + p); + } + } + + if (inputAndInternalTopicPartitions.size() > 0) { + client.seekToBeginning(inputAndInternalTopicPartitions); + } + + Integer partition = options.valueOf(partitionOption); + if (partition == Integer.MIN_VALUE) { + + for (final TopicPartition p : partitions) { + client.position(p); + + Long offset = options.valueOf(offsetOption); + if (offset != OffsetRequest.EarliestTime()) { + client.seek(p, options.valueOf(offsetOption)); + } + } + + } else { + for (final TopicPartition p : partitions) { + + if (partition == p.partition()) { + client.position(p); + + Long offset = options.valueOf(offsetOption); + if (offset != OffsetRequest.EarliestTime()) { + client.seek(p, options.valueOf(offsetOption)); + } + } + + } + } + client.commitSync(); + + + } catch (final RuntimeException e) { + System.err.println("ERROR: Resetting offsets failed."); + throw e; + } + + System.out.println("Done."); + } + + private boolean isInputTopic(final String topic) { + return options.valuesOf(inputTopicsOption).contains(topic); + } + + private void deleteInternalTopics(final ZkUtils zkUtils) { + System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption)); + + for (final String topic : allTopics) { + if (isInternalTopic(topic)) { + final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{ + "--zookeeper", options.valueOf(zookeeperOption), + "--delete", "--topic", topic}); + try { + TopicCommand.deleteTopic(zkUtils, commandOptions); + } catch (final RuntimeException e) { + System.err.println("ERROR: Deleting topic " + topic + " failed."); + throw e; + } + } + } + + System.out.println("Done."); + } + + private boolean isInternalTopic(final String topicName) { + return topicName.startsWith(options.valueOf(applicationIdOption) + "-") + && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition")); + } + + public static void main(final String[] args) { + System.exit(new OffsetManagement().run(args)); + } + +} +