rd_kafka_topic_t            *topic;
     rd_kafka_conf_t             *kafka_conf;
     rd_kafka_t                  *kafka;
-    int                          has_key;
-    uint32_t                     key;
+    char                        *key;
     char                        *prefix;
     char                        *postfix;
     char                         escape_char;
           user_data_t *ud)
 {
     int      status = 0;
-    uint32_t key;
+    void    *key;
+    size_t   keylen = 0;
     char     buffer[8192];
     size_t   bfree = sizeof(buffer);
     size_t   bfill = 0;
         return -1;
     }
 
-    /*
-     * We partition our stream by metric name
-     */
-    if (ctx->has_key)
-        key = ctx->key;
-    else
-        key = rand();
+    key = ctx->key;
+    if (key != NULL)
+        keylen = strlen (key);
 
     rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
                      RD_KAFKA_MSG_F_COPY, buffer, blen,
-                     &key, sizeof(key), NULL);
+                     key, keylen, NULL);
 
     return status;
 } /* }}} int kafka_write */
             }
 
         } else if (strcasecmp ("Key", child->key) == 0)  {
-            char *tmp_buf = NULL;
-            status = cf_util_get_string(child, &tmp_buf);
-            if (status != 0) {
-                WARNING("write_kafka plugin: invalid key supplied");
-                break;
-            }
-
-            if (strcasecmp(tmp_buf, "Random") != 0) {
-                tctx->has_key = 1;
-                tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
-            }
-            sfree(tmp_buf);
-
+            cf_util_get_string (child, &tctx->key);
         } else if (strcasecmp ("Format", child->key) == 0) {
             status = cf_util_get_string(child, &key);
             if (status != 0)