Reset offsets in topics, command console tool master
authorGustavo Martin Morcuende <gu.martinm@gmail.com>
Mon, 27 Feb 2017 19:29:16 +0000 (20:29 +0100)
committerGustavo Martin Morcuende <gu.martinm@gmail.com>
Mon, 27 Feb 2017 19:29:16 +0000 (20:29 +0100)
Kafka/MyTools/README [new file with mode: 0644]
Kafka/MyTools/build.gradle [new file with mode: 0644]
Kafka/MyTools/gradle.properties [new file with mode: 0644]
Kafka/MyTools/gradle/wrapper/gradle-wrapper.jar [new file with mode: 0644]
Kafka/MyTools/gradle/wrapper/gradle-wrapper.properties [new file with mode: 0644]
Kafka/MyTools/gradlew [new file with mode: 0755]
Kafka/MyTools/gradlew.bat [new file with mode: 0644]
Kafka/MyTools/settings.gradle [new file with mode: 0644]
Kafka/MyTools/src/main/java/de/example/kafka/tools/OffsetManagement.java [new file with mode: 0644]

diff --git a/Kafka/MyTools/README b/Kafka/MyTools/README
new file mode 100644 (file)
index 0000000..294a335
--- /dev/null
@@ -0,0 +1,17 @@
+
+- Run:
+By default reset offsets to the first value in every topic's partition:
+java -jar build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic
+
+We can do the same for just one topic's partition:
+java -jar build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic --partition 0
+
+We choose offset for just one topic's partition:
+java -jar build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic --offset 24 --partition 0
+
+
+- Debug:
+java -jar -Djaxp.debug=1 -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y build/libs/kafka-my-tools-0.1.0-SNAPSHOT.jar --bootstrap-servers kafka:9092,kafka:9093,kafka:9094 --zookeeper kafka:2181 --application-id mygroup --input-topics mytopic
+
+
+You must stop consumers before running this command. If there are consumers running this command sometimes works, but the most of time it does not :(
diff --git a/Kafka/MyTools/build.gradle b/Kafka/MyTools/build.gradle
new file mode 100644 (file)
index 0000000..19e8260
--- /dev/null
@@ -0,0 +1,170 @@
+group = theGroup
+version = theVersion
+description = theDescription
+
+project.ext {
+    // FATJAR
+    mainClassName = 'de.example.kafka.tools.OffsetManagement'
+
+    // LOG4J2 dependencies
+    slf4jVersion = '2.7'
+    log4jCoreVersion = '2.7'
+    jclOverSlf4jVersion = '1.7.22'
+
+    // Unit test dependencies
+    mockitoVersion = '2.4.1'
+    junitVersion = '4.12'
+}
+
+
+apply plugin: 'java'
+apply plugin: 'idea'
+apply plugin: 'jacoco'
+apply plugin: 'eclipse'
+apply plugin: 'idea'
+apply plugin: 'maven-publish'
+
+targetCompatibility = 1.8
+sourceCompatibility = 1.8
+
+
+// *****************   REPOSITORIES FOR DEPENDENCIES   *****************
+repositories {
+    mavenCentral()
+    maven { url "https://repo.spring.io/release" }
+}
+
+
+// *****************   PLUGINS   *****************
+buildscript {
+    repositories {
+        mavenCentral()
+        maven { url 'https://plugins.gradle.org/m2/' }
+    }
+}
+
+// *****************   DEPENDENCIES   *****************
+dependencies {
+    // 1/3 Required dependency for log4j 2 with slf4j: binding between log4j2 and slf4j
+    compile("org.apache.logging.log4j:log4j-slf4j-impl:${slf4jVersion}")
+    // 2/3 Required dependency for log4j 2 with slf4j: log4j 2 maven plugin (it is the log4j 2 implementation)
+    compile("org.apache.logging.log4j:log4j-core:${log4jCoreVersion}")
+    // 3/3 Required dependency for getting rid of commons logging. This is the BRIDGE (no binding) between Jakarta Commons Logging (used by Spring)
+    // and whatever I am using for logging (in this case I am using log4j 2) See: http://www.slf4j.org/legacy.html
+    // We need exclusions in every dependency using Jakarta Commons Logging (see Spring dependencies below)
+    compile("org.slf4j:jcl-over-slf4j:${jclOverSlf4jVersion}")
+
+    compile('org.apache.kafka:kafka_2.11:0.10.1.1') {
+        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+    }
+    compile('javax.validation:validation-api:1.1.0.Final')
+
+
+    // Unit tests
+    testCompile("junit:junit:${junitVersion}")
+    testCompile("org.mockito:mockito-core:${mockitoVersion}")
+}
+
+
+
+// *****************   UNIT TESTS *****************
+test {
+
+    // explicitly include or exclude tests
+    exclude '**/*IntegrationShould.class'
+
+    testLogging {
+        events "PASSED", "FAILED", "SKIPPED"
+    }
+
+    jacoco {
+        append = false
+        destinationFile = file("$buildDir/jacoco/jacoco.exec")
+        classDumpFile = file("$buildDir/jacoco/classpathdumps")
+    }
+}
+
+// *****************   JAVADOC   *****************
+javadoc {
+    source = sourceSets.main.allJava
+    classpath = configurations.compile
+}
+
+// *****************   PUBLISH TO REPOSITORY   *****************
+// Calls javadoc plugin and creates jar with the generated docs
+task javadocJar(type: Jar) {
+    from javadoc
+    classifier 'javadoc'
+}
+
+
+
+// Calls java plugin and creates jar with the sources
+task sourceJar(type: Jar) {
+    from sourceSets.main.java
+    classifier 'sources'
+}
+
+publishing {
+    publications {
+        mavenJava(MavenPublication) {
+            // Publishes war or jar file depending on whether we are using the war plugin.
+            if (plugins.withType(WarPlugin)) {
+                from components.web
+            } else {
+                from components.java
+            }
+
+            // Publishes jar with sources
+            artifact sourceJar {
+                classifier 'sources'
+            }
+            // Publishes jar with javadoc
+            artifact javadocJar {
+                classifier 'javadoc'
+            }
+
+            // By default, Maven scope will be runtime. We want scope compile :/
+            pom.withXml {
+                asNode().dependencies.'*'.findAll() {
+                    it.scope.text() == 'runtime' && project.configurations.compile.allDependencies.find { dep ->
+                        dep.name == it.artifactId.text()
+                    }
+                }.each() {
+                    it.scope*.value = 'compile'
+                }
+            }
+        }
+    }
+    repositories {
+        maven {
+            credentials {
+                username project.artifactory_username
+                password project.artifactory_password
+            }
+
+            if(project.version.endsWith('-SNAPSHOT')) {
+                url 'http://artifactory/artifactory/libs-snapshot'
+            } else {
+                url 'http://artifactory/artifactory/libs-release'
+            }
+        }
+    }
+}
+
+// *****************   FATJAR / MANIFEST FILE  *****************
+jar {
+    manifest {
+        attributes('Main-Class': "$mainClassName",
+            'Implementation-Title': 'Kafka: my tools',
+            'Implementation-Version': theVersion,
+            'Build-Time': new Date().format("yyyy-MM-dd'T'HH:mm:ssZ"),
+            'Built-By': System.getProperty('user.name'),
+            'Built-JDK': System.getProperty('java.version')
+        )
+    }
+
+    from {
+        configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
+    }
+}
diff --git a/Kafka/MyTools/gradle.properties b/Kafka/MyTools/gradle.properties
new file mode 100644 (file)
index 0000000..a0222da
--- /dev/null
@@ -0,0 +1,4 @@
+theGroup=de.example.kafka.tools
+theDescription=Kafka my tools
+theVersion=0.1.0-SNAPSHOT
+theName=kafka-my-tools
diff --git a/Kafka/MyTools/gradle/wrapper/gradle-wrapper.jar b/Kafka/MyTools/gradle/wrapper/gradle-wrapper.jar
new file mode 100644 (file)
index 0000000..6ffa237
Binary files /dev/null and b/Kafka/MyTools/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/Kafka/MyTools/gradle/wrapper/gradle-wrapper.properties b/Kafka/MyTools/gradle/wrapper/gradle-wrapper.properties
new file mode 100644 (file)
index 0000000..fdb1b7c
--- /dev/null
@@ -0,0 +1,6 @@
+#Fri Feb 10 13:15:34 CET 2017
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-bin.zip
diff --git a/Kafka/MyTools/gradlew b/Kafka/MyTools/gradlew
new file mode 100755 (executable)
index 0000000..9aa616c
--- /dev/null
@@ -0,0 +1,169 @@
+#!/usr/bin/env bash
+
+##############################################################################
+##
+##  Gradle start up script for UN*X
+##
+##############################################################################
+
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+        PRG="$link"
+    else
+        PRG=`dirname "$PRG"`"/$link"
+    fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >/dev/null
+APP_HOME="`pwd -P`"
+cd "$SAVED" >/dev/null
+
+APP_NAME="Gradle"
+APP_BASE_NAME=`basename "$0"`
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS=""
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD="maximum"
+
+warn ( ) {
+    echo "$*"
+}
+
+die ( ) {
+    echo
+    echo "$*"
+    echo
+    exit 1
+}
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+nonstop=false
+case "`uname`" in
+  CYGWIN* )
+    cygwin=true
+    ;;
+  Darwin* )
+    darwin=true
+    ;;
+  MINGW* )
+    msys=true
+    ;;
+  NONSTOP* )
+    nonstop=true
+    ;;
+esac
+
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+        # IBM's JDK on AIX uses strange locations for the executables
+        JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+        JAVACMD="$JAVA_HOME/bin/java"
+    fi
+    if [ ! -x "$JAVACMD" ] ; then
+        die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+    fi
+else
+    JAVACMD="java"
+    which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+fi
+
+# Increase the maximum file descriptors if we can.
+if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
+    MAX_FD_LIMIT=`ulimit -H -n`
+    if [ $? -eq 0 ] ; then
+        if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
+            MAX_FD="$MAX_FD_LIMIT"
+        fi
+        ulimit -n $MAX_FD
+        if [ $? -ne 0 ] ; then
+            warn "Could not set maximum file descriptor limit: $MAX_FD"
+        fi
+    else
+        warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
+    fi
+fi
+
+# For Darwin, add options to specify how the application appears in the dock
+if $darwin; then
+    GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
+fi
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin ; then
+    APP_HOME=`cygpath --path --mixed "$APP_HOME"`
+    CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+    JAVACMD=`cygpath --unix "$JAVACMD"`
+
+    # We build the pattern for arguments to be converted via cygpath
+    ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
+    SEP=""
+    for dir in $ROOTDIRSRAW ; do
+        ROOTDIRS="$ROOTDIRS$SEP$dir"
+        SEP="|"
+    done
+    OURCYGPATTERN="(^($ROOTDIRS))"
+    # Add a user-defined pattern to the cygpath arguments
+    if [ "$GRADLE_CYGPATTERN" != "" ] ; then
+        OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
+    fi
+    # Now convert the arguments - kludge to limit ourselves to /bin/sh
+    i=0
+    for arg in "$@" ; do
+        CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
+        CHECK2=`echo "$arg"|egrep -c "^-"`                                 ### Determine if an option
+
+        if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then                    ### Added a condition
+            eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
+        else
+            eval `echo args$i`="\"$arg\""
+        fi
+        i=$((i+1))
+    done
+    case $i in
+        (0) set -- ;;
+        (1) set -- "$args0" ;;
+        (2) set -- "$args0" "$args1" ;;
+        (3) set -- "$args0" "$args1" "$args2" ;;
+        (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+        (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+        (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+        (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+        (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+        (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+    esac
+fi
+
+# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
+function splitJvmOpts() {
+    JVM_OPTS=("$@")
+}
+eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
+JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
+
+# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
+if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then
+  cd "$(dirname "$0")"
+fi
+
+exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
diff --git a/Kafka/MyTools/gradlew.bat b/Kafka/MyTools/gradlew.bat
new file mode 100644 (file)
index 0000000..e95643d
--- /dev/null
@@ -0,0 +1,84 @@
+@if "%DEBUG%" == "" @echo off\r
+@rem ##########################################################################\r
+@rem\r
+@rem  Gradle startup script for Windows\r
+@rem\r
+@rem ##########################################################################\r
+\r
+@rem Set local scope for the variables with windows NT shell\r
+if "%OS%"=="Windows_NT" setlocal\r
+\r
+set DIRNAME=%~dp0\r
+if "%DIRNAME%" == "" set DIRNAME=.\r
+set APP_BASE_NAME=%~n0\r
+set APP_HOME=%DIRNAME%\r
+\r
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.\r
+set DEFAULT_JVM_OPTS=\r
+\r
+@rem Find java.exe\r
+if defined JAVA_HOME goto findJavaFromJavaHome\r
+\r
+set JAVA_EXE=java.exe\r
+%JAVA_EXE% -version >NUL 2>&1\r
+if "%ERRORLEVEL%" == "0" goto init\r
+\r
+echo.\r
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.\r
+echo.\r
+echo Please set the JAVA_HOME variable in your environment to match the\r
+echo location of your Java installation.\r
+\r
+goto fail\r
+\r
+:findJavaFromJavaHome\r
+set JAVA_HOME=%JAVA_HOME:"=%\r
+set JAVA_EXE=%JAVA_HOME%/bin/java.exe\r
+\r
+if exist "%JAVA_EXE%" goto init\r
+\r
+echo.\r
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%\r
+echo.\r
+echo Please set the JAVA_HOME variable in your environment to match the\r
+echo location of your Java installation.\r
+\r
+goto fail\r
+\r
+:init\r
+@rem Get command-line arguments, handling Windows variants\r
+\r
+if not "%OS%" == "Windows_NT" goto win9xME_args\r
+\r
+:win9xME_args\r
+@rem Slurp the command line arguments.\r
+set CMD_LINE_ARGS=\r
+set _SKIP=2\r
+\r
+:win9xME_args_slurp\r
+if "x%~1" == "x" goto execute\r
+\r
+set CMD_LINE_ARGS=%*\r
+\r
+:execute\r
+@rem Setup the command line\r
+\r
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar\r
+\r
+@rem Execute Gradle\r
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%\r
+\r
+:end\r
+@rem End local scope for the variables with windows NT shell\r
+if "%ERRORLEVEL%"=="0" goto mainEnd\r
+\r
+:fail\r
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of\r
+rem the _cmd.exe /c_ return code!\r
+if  not "" == "%GRADLE_EXIT_CONSOLE%" exit 1\r
+exit /b 1\r
+\r
+:mainEnd\r
+if "%OS%"=="Windows_NT" endlocal\r
+\r
+:omega\r
diff --git a/Kafka/MyTools/settings.gradle b/Kafka/MyTools/settings.gradle
new file mode 100644 (file)
index 0000000..a809ba9
--- /dev/null
@@ -0,0 +1 @@
+rootProject.name = theName
diff --git a/Kafka/MyTools/src/main/java/de/example/kafka/tools/OffsetManagement.java b/Kafka/MyTools/src/main/java/de/example/kafka/tools/OffsetManagement.java
new file mode 100644 (file)
index 0000000..2060e1b
--- /dev/null
@@ -0,0 +1,250 @@
+package de.example.kafka.tools;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import kafka.api.OffsetRequest;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import joptsimple.OptionException;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import kafka.admin.AdminClient;
+import kafka.admin.TopicCommand;
+import kafka.utils.ZkUtils;
+
+
+public class OffsetManagement {
+    private static final int EXIT_CODE_SUCCESS = 0;
+    private static final int EXIT_CODE_ERROR = 1;
+
+    private static OptionSpec<String> bootstrapServerOption;
+    private static OptionSpec<String> zookeeperOption;
+    private static OptionSpec<String> applicationIdOption;
+    private static OptionSpec<String> inputTopicsOption;
+    private static OptionSpec<Long> offsetOption;
+    private static OptionSpec<Integer> partitionOption;
+
+
+    private OptionSet options = null;
+    private final Properties consumerConfig = new Properties();
+    private final List<String> allTopics = new LinkedList<>();
+
+    public int run(final String[] args) {
+        return run(args, new Properties());
+    }
+
+    public int run(final String[] args, final Properties config) {
+        consumerConfig.clear();
+        consumerConfig.putAll(config);
+
+        int exitCode = EXIT_CODE_SUCCESS;
+
+        AdminClient adminClient = null;
+        ZkUtils zkUtils = null;
+        try {
+            parseArguments(args);
+
+            adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
+            final String groupId = this.options.valueOf(applicationIdOption);
+
+            zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
+                30000,
+                30000,
+                JaasUtils.isZkSecurityEnabled());
+
+            allTopics.clear();
+            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+
+            resetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
+            deleteInternalTopics(zkUtils);
+        } catch (final Throwable e) {
+            exitCode = EXIT_CODE_ERROR;
+            System.err.println("ERROR: " + e.getMessage());
+        } finally {
+            if (adminClient != null) {
+                adminClient.close();
+            }
+            if (zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+
+        return exitCode;
+    }
+
+    private void parseArguments(final String[] args) throws IOException {
+        final OptionParser optionParser = new OptionParser();
+        applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)")
+            .withRequiredArg()
+            .ofType(String.class)
+            .describedAs("id")
+            .required();
+        bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
+            .withRequiredArg()
+            .ofType(String.class)
+            .defaultsTo("localhost:9092")
+            .describedAs("urls");
+        zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:POST")
+            .withRequiredArg()
+            .ofType(String.class)
+            .defaultsTo("localhost:2181")
+            .describedAs("url");
+        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics")
+            .withRequiredArg()
+            .ofType(String.class)
+            .withValuesSeparatedBy(',')
+            .describedAs("list");
+        offsetOption = optionParser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end")
+                .withRequiredArg()
+                .describedAs("consume offset")
+                .ofType(Long.class)
+                .defaultsTo(OffsetRequest.EarliestTime());
+        partitionOption = optionParser.accepts("partition", "The partition number. All partitions by default.")
+                .withRequiredArg()
+                .describedAs("partition number")
+                .ofType(Integer.class)
+                .defaultsTo(Integer.MIN_VALUE);
+
+        try {
+            options = optionParser.parse(args);
+        } catch (final OptionException e) {
+            optionParser.printHelpOn(System.err);
+            throw e;
+        }
+    }
+
+    private void resetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+        final List<String> inputTopics = options.valuesOf(inputTopicsOption);
+
+        if (inputTopics.size() == 0) {
+            System.out.println("No input or intermediate topics specified. Skipping seek.");
+            return;
+        } else {
+            if (inputTopics.size() != 0) {
+                System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+            }
+        }
+
+        final Properties config = new Properties();
+        config.putAll(consumerConfig);
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(applicationIdOption));
+        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        final Set<String> topicsToSubscribe = new HashSet<>(inputTopics.size());
+        for (final String topic : inputTopics) {
+            if (!allTopics.contains(topic)) {
+                System.err.println("Input topic " + topic + " not found. Skipping.");
+            } else {
+                topicsToSubscribe.add(topic);
+            }
+        }
+        for (final String topic : allTopics) {
+            if (isInternalTopic(topic)) {
+                topicsToSubscribe.add(topic);
+            }
+        }
+
+        try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+            client.subscribe(topicsToSubscribe);
+            client.poll(1);
+
+            final Set<TopicPartition> partitions = client.assignment();
+            final Set<TopicPartition> inputAndInternalTopicPartitions = new HashSet<>();
+
+            for (final TopicPartition p : partitions) {
+                final String topic = p.topic();
+                if (isInputTopic(topic) || isInternalTopic(topic)) {
+                    inputAndInternalTopicPartitions.add(p);
+                } else {
+                    System.err.println("Skipping invalid partition: " + p);
+                }
+            }
+
+            if (inputAndInternalTopicPartitions.size() > 0) {
+                client.seekToBeginning(inputAndInternalTopicPartitions);
+            }
+
+            Integer partition = options.valueOf(partitionOption);
+            if (partition == Integer.MIN_VALUE) {
+               
+               for (final TopicPartition p : partitions) {
+                       client.position(p);
+                       
+                       Long offset = options.valueOf(offsetOption);
+                       if (offset != OffsetRequest.EarliestTime()) {
+                               client.seek(p, options.valueOf(offsetOption));
+                       }
+               }
+               
+            } else {
+               for (final TopicPartition p : partitions) {
+                       
+                       if (partition == p.partition()) {
+                               client.position(p);
+                               
+                               Long offset = options.valueOf(offsetOption);
+                               if (offset != OffsetRequest.EarliestTime()) {
+                                       client.seek(p, options.valueOf(offsetOption));
+                               }
+                       }
+                       
+               }
+            }
+            client.commitSync();
+            
+            
+        } catch (final RuntimeException e) {
+            System.err.println("ERROR: Resetting offsets failed.");
+            throw e;
+        }
+
+        System.out.println("Done.");
+    }
+
+    private boolean isInputTopic(final String topic) {
+        return options.valuesOf(inputTopicsOption).contains(topic);
+    }
+    
+    private void deleteInternalTopics(final ZkUtils zkUtils) {
+        System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
+
+        for (final String topic : allTopics) {
+            if (isInternalTopic(topic)) {
+                final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
+                    "--zookeeper", options.valueOf(zookeeperOption),
+                    "--delete", "--topic", topic});
+                try {
+                    TopicCommand.deleteTopic(zkUtils, commandOptions);
+                } catch (final RuntimeException e) {
+                    System.err.println("ERROR: Deleting topic " + topic + " failed.");
+                    throw e;
+                }
+            }
+        }
+
+        System.out.println("Done.");
+    }
+
+    private boolean isInternalTopic(final String topicName) {
+        return topicName.startsWith(options.valueOf(applicationIdOption) + "-")
+            && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
+    }
+
+    public static void main(final String[] args) {
+        System.exit(new OffsetManagement().run(args));
+    }
+
+}
+