Storage Driver: Ceph Object Storage (RADOS)

This driver implements the storagedriver.StorageDriver interface and
uses Ceph Object Storage as storage backend.

Since RADOS is an object storage and no hierarchy notion, the
following convention is used to keep the filesystem notions stored in
this backend:

* All the objects data are stored with opaque UUID names prefixed
  (e.g. "blob:d3d232ff-ab3a-4046-9ab7-930228d4c164).
* All the hierarchy information are stored in rados omaps, where the
  omap object identifier is the virtual directory name, the keys in
  a specific are the relative filenames and the values the blob
  object identifier (or empty value for a sub directory).

  e.g. For the following hierarchy:

     /directory1
     /directory1/object1
     /directory1/object2
     /directory1/directory2/object3

  The omap "/directory1" will contains the following key / values:
    - "object1" "blob:d3d232ff-ab3a-4046-9ab7-930228d4c164"
    - "object2" "blob:db2e359d-4af0-4bfb-ba1d-d2fd029866a0"
    - "directory2" ""

  The omap "/directory1/directory2" will contains:
    - "object3" "blob:9ae2371c-81fc-4945-80ac-8bf7f566a5d9"

* The MOVE is implemented by changing the reference to a specific
  blob in its parent virtual directory omap.

This driver stripes rados objects to a fixed size (e.g. 4M). The idea
is to keep small objects (as done by RBD on the top of RADOS) that
will be easily synchronized accross OSDs. The information of the
original object (i.e total size of the chunks) is stored as a Xattr
in the first chunk object.

Signed-off-by: Vincent Giersch <vincent.giersch@ovh.net>
This commit is contained in:
Vincent Giersch 2015-04-23 16:13:52 +00:00
parent 36d4e365aa
commit 394eea0231
17 changed files with 2517 additions and 4 deletions

5
Godeps/Godeps.json generated
View File

@ -88,6 +88,11 @@
"ImportPath": "github.com/jlhawn/go-crypto",
"Rev": "cd738dde20f0b3782516181b0866c9bb9db47401"
},
{
"ImportPath": "github.com/noahdesu/go-ceph/rados",
"Comment": "v.0.3.0-29-gb15639c",
"Rev": "b15639c44c05368348355229070361395d9152ee"
},
{
"ImportPath": "github.com/yvasiyarov/go-metrics",
"Rev": "57bccd1ccd43f94bb17fdd8bf3007059b802f85e"

View File

@ -0,0 +1,300 @@
package rados
// #cgo LDFLAGS: -lrados
// #include <stdlib.h>
// #include <rados/librados.h>
import "C"
import "unsafe"
import "bytes"
// ClusterStat represents Ceph cluster statistics.
type ClusterStat struct {
Kb uint64
Kb_used uint64
Kb_avail uint64
Num_objects uint64
}
// Conn is a connection handle to a Ceph cluster.
type Conn struct {
cluster C.rados_t
}
// PingMonitor sends a ping to a monitor and returns the reply.
func (c *Conn) PingMonitor(id string) (string, error) {
c_id := C.CString(id)
defer C.free(unsafe.Pointer(c_id))
var strlen C.size_t
var strout *C.char
ret := C.rados_ping_monitor(c.cluster, c_id, &strout, &strlen)
defer C.rados_buffer_free(strout)
if ret == 0 {
reply := C.GoStringN(strout, (C.int)(strlen))
return reply, nil
} else {
return "", RadosError(int(ret))
}
}
// Connect establishes a connection to a RADOS cluster. It returns an error,
// if any.
func (c *Conn) Connect() error {
ret := C.rados_connect(c.cluster)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Shutdown disconnects from the cluster.
func (c *Conn) Shutdown() {
C.rados_shutdown(c.cluster)
}
// ReadConfigFile configures the connection using a Ceph configuration file.
func (c *Conn) ReadConfigFile(path string) error {
c_path := C.CString(path)
defer C.free(unsafe.Pointer(c_path))
ret := C.rados_conf_read_file(c.cluster, c_path)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// ReadDefaultConfigFile configures the connection using a Ceph configuration
// file located at default locations.
func (c *Conn) ReadDefaultConfigFile() error {
ret := C.rados_conf_read_file(c.cluster, nil)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
func (c *Conn) OpenIOContext(pool string) (*IOContext, error) {
c_pool := C.CString(pool)
defer C.free(unsafe.Pointer(c_pool))
ioctx := &IOContext{}
ret := C.rados_ioctx_create(c.cluster, c_pool, &ioctx.ioctx)
if ret == 0 {
return ioctx, nil
} else {
return nil, RadosError(int(ret))
}
}
// ListPools returns the names of all existing pools.
func (c *Conn) ListPools() (names []string, err error) {
buf := make([]byte, 4096)
for {
ret := int(C.rados_pool_list(c.cluster,
(*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
if ret < 0 {
return nil, RadosError(int(ret))
}
if ret > len(buf) {
buf = make([]byte, ret)
continue
}
tmp := bytes.SplitAfter(buf[:ret-1], []byte{0})
for _, s := range tmp {
if len(s) > 0 {
name := C.GoString((*C.char)(unsafe.Pointer(&s[0])))
names = append(names, name)
}
}
return names, nil
}
}
// SetConfigOption sets the value of the configuration option identified by
// the given name.
func (c *Conn) SetConfigOption(option, value string) error {
c_opt, c_val := C.CString(option), C.CString(value)
defer C.free(unsafe.Pointer(c_opt))
defer C.free(unsafe.Pointer(c_val))
ret := C.rados_conf_set(c.cluster, c_opt, c_val)
if ret < 0 {
return RadosError(int(ret))
} else {
return nil
}
}
// GetConfigOption returns the value of the Ceph configuration option
// identified by the given name.
func (c *Conn) GetConfigOption(name string) (value string, err error) {
buf := make([]byte, 4096)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_name))
ret := int(C.rados_conf_get(c.cluster, c_name,
(*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
// FIXME: ret may be -ENAMETOOLONG if the buffer is not large enough. We
// can handle this case, but we need a reliable way to test for
// -ENAMETOOLONG constant. Will the syscall/Errno stuff in Go help?
if ret == 0 {
value = C.GoString((*C.char)(unsafe.Pointer(&buf[0])))
return value, nil
} else {
return "", RadosError(ret)
}
}
// WaitForLatestOSDMap blocks the caller until the latest OSD map has been
// retrieved.
func (c *Conn) WaitForLatestOSDMap() error {
ret := C.rados_wait_for_latest_osdmap(c.cluster)
if ret < 0 {
return RadosError(int(ret))
} else {
return nil
}
}
// GetClusterStat returns statistics about the cluster associated with the
// connection.
func (c *Conn) GetClusterStats() (stat ClusterStat, err error) {
c_stat := C.struct_rados_cluster_stat_t{}
ret := C.rados_cluster_stat(c.cluster, &c_stat)
if ret < 0 {
return ClusterStat{}, RadosError(int(ret))
} else {
return ClusterStat{
Kb: uint64(c_stat.kb),
Kb_used: uint64(c_stat.kb_used),
Kb_avail: uint64(c_stat.kb_avail),
Num_objects: uint64(c_stat.num_objects),
}, nil
}
}
// ParseCmdLineArgs configures the connection from command line arguments.
func (c *Conn) ParseCmdLineArgs(args []string) error {
// add an empty element 0 -- Ceph treats the array as the actual contents
// of argv and skips the first element (the executable name)
argc := C.int(len(args) + 1)
argv := make([]*C.char, argc)
// make the first element a string just in case it is ever examined
argv[0] = C.CString("placeholder")
defer C.free(unsafe.Pointer(argv[0]))
for i, arg := range args {
argv[i+1] = C.CString(arg)
defer C.free(unsafe.Pointer(argv[i+1]))
}
ret := C.rados_conf_parse_argv(c.cluster, argc, &argv[0])
if ret < 0 {
return RadosError(int(ret))
} else {
return nil
}
}
// ParseDefaultConfigEnv configures the connection from the default Ceph
// environment variable(s).
func (c *Conn) ParseDefaultConfigEnv() error {
ret := C.rados_conf_parse_env(c.cluster, nil)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// GetFSID returns the fsid of the cluster as a hexadecimal string. The fsid
// is a unique identifier of an entire Ceph cluster.
func (c *Conn) GetFSID() (fsid string, err error) {
buf := make([]byte, 37)
ret := int(C.rados_cluster_fsid(c.cluster,
(*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
// FIXME: the success case isn't documented correctly in librados.h
if ret == 36 {
fsid = C.GoString((*C.char)(unsafe.Pointer(&buf[0])))
return fsid, nil
} else {
return "", RadosError(int(ret))
}
}
// GetInstanceID returns a globally unique identifier for the cluster
// connection instance.
func (c *Conn) GetInstanceID() uint64 {
// FIXME: are there any error cases for this?
return uint64(C.rados_get_instance_id(c.cluster))
}
// MakePool creates a new pool with default settings.
func (c *Conn) MakePool(name string) error {
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_name))
ret := int(C.rados_pool_create(c.cluster, c_name))
if ret == 0 {
return nil
} else {
return RadosError(ret)
}
}
// DeletePool deletes a pool and all the data inside the pool.
func (c *Conn) DeletePool(name string) error {
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_name))
ret := int(C.rados_pool_delete(c.cluster, c_name))
if ret == 0 {
return nil
} else {
return RadosError(ret)
}
}
// MonCommand sends a command to one of the monitors
func (c *Conn) MonCommand(args []byte) (buffer []byte, info string, err error) {
argv := make([]*C.char, len(args))
for i, _ := range args {
argv[i] = (*C.char)(unsafe.Pointer(&args[i]))
}
var (
outs, outbuf *C.char
outslen, outbuflen C.size_t
)
inbuf := C.CString("")
defer C.free(unsafe.Pointer(inbuf))
ret := C.rados_mon_command(c.cluster,
&argv[0], C.size_t(len(args)),
inbuf, // bulk input (e.g. crush map)
C.size_t(0), // length inbuf
&outbuf, // buffer
&outbuflen, // buffer length
&outs, // status string
&outslen)
if outslen > 0 {
info = C.GoStringN(outs, C.int(outslen))
C.free(unsafe.Pointer(outs))
}
if outbuflen > 0 {
buffer = C.GoBytes(unsafe.Pointer(outbuf), C.int(outbuflen))
C.free(unsafe.Pointer(outbuf))
}
if ret != 0 {
err = RadosError(int(ret))
return nil, info, err
}
return
}

View File

@ -0,0 +1,4 @@
/*
Set of wrappers around librados API.
*/
package rados

View File

@ -0,0 +1,547 @@
package rados
// #cgo LDFLAGS: -lrados
// #include <stdlib.h>
// #include <rados/librados.h>
import "C"
import "unsafe"
import "time"
// PoolStat represents Ceph pool statistics.
type PoolStat struct {
// space used in bytes
Num_bytes uint64
// space used in KB
Num_kb uint64
// number of objects in the pool
Num_objects uint64
// number of clones of objects
Num_object_clones uint64
// num_objects * num_replicas
Num_object_copies uint64
Num_objects_missing_on_primary uint64
// number of objects found on no OSDs
Num_objects_unfound uint64
// number of objects replicated fewer times than they should be
// (but found on at least one OSD)
Num_objects_degraded uint64
Num_rd uint64
Num_rd_kb uint64
Num_wr uint64
Num_wr_kb uint64
}
// ObjectStat represents an object stat information
type ObjectStat struct {
// current length in bytes
Size uint64
// last modification time
ModTime time.Time
}
// IOContext represents a context for performing I/O within a pool.
type IOContext struct {
ioctx C.rados_ioctx_t
}
// Pointer returns a uintptr representation of the IOContext.
func (ioctx *IOContext) Pointer() uintptr {
return uintptr(ioctx.ioctx)
}
// Write writes len(data) bytes to the object with key oid starting at byte
// offset offset. It returns an error, if any.
func (ioctx *IOContext) Write(oid string, data []byte, offset uint64) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_write(ioctx.ioctx, c_oid,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)),
(C.uint64_t)(offset))
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Read reads up to len(data) bytes from the object with key oid starting at byte
// offset offset. It returns the number of bytes read and an error, if any.
func (ioctx *IOContext) Read(oid string, data []byte, offset uint64) (int, error) {
if len(data) == 0 {
return 0, nil
}
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_read(
ioctx.ioctx,
c_oid,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)),
(C.uint64_t)(offset))
if ret >= 0 {
return int(ret), nil
} else {
return 0, RadosError(int(ret))
}
}
// Delete deletes the object with key oid. It returns an error, if any.
func (ioctx *IOContext) Delete(oid string) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_remove(ioctx.ioctx, c_oid)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Truncate resizes the object with key oid to size size. If the operation
// enlarges the object, the new area is logically filled with zeroes. If the
// operation shrinks the object, the excess data is removed. It returns an
// error, if any.
func (ioctx *IOContext) Truncate(oid string, size uint64) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_trunc(ioctx.ioctx, c_oid, (C.uint64_t)(size))
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Destroy informs librados that the I/O context is no longer in use.
// Resources associated with the context may not be freed immediately, and the
// context should not be used again after calling this method.
func (ioctx *IOContext) Destroy() {
C.rados_ioctx_destroy(ioctx.ioctx)
}
// Stat returns a set of statistics about the pool associated with this I/O
// context.
func (ioctx *IOContext) GetPoolStats() (stat PoolStat, err error) {
c_stat := C.struct_rados_pool_stat_t{}
ret := C.rados_ioctx_pool_stat(ioctx.ioctx, &c_stat)
if ret < 0 {
return PoolStat{}, RadosError(int(ret))
} else {
return PoolStat{
Num_bytes: uint64(c_stat.num_bytes),
Num_kb: uint64(c_stat.num_kb),
Num_objects: uint64(c_stat.num_objects),
Num_object_clones: uint64(c_stat.num_object_clones),
Num_object_copies: uint64(c_stat.num_object_copies),
Num_objects_missing_on_primary: uint64(c_stat.num_objects_missing_on_primary),
Num_objects_unfound: uint64(c_stat.num_objects_unfound),
Num_objects_degraded: uint64(c_stat.num_objects_degraded),
Num_rd: uint64(c_stat.num_rd),
Num_rd_kb: uint64(c_stat.num_rd_kb),
Num_wr: uint64(c_stat.num_wr),
Num_wr_kb: uint64(c_stat.num_wr_kb),
}, nil
}
}
// GetPoolName returns the name of the pool associated with the I/O context.
func (ioctx *IOContext) GetPoolName() (name string, err error) {
buf := make([]byte, 128)
for {
ret := C.rados_ioctx_get_pool_name(ioctx.ioctx,
(*C.char)(unsafe.Pointer(&buf[0])), C.unsigned(len(buf)))
if ret == -34 { // FIXME
buf = make([]byte, len(buf)*2)
continue
} else if ret < 0 {
return "", RadosError(ret)
}
name = C.GoStringN((*C.char)(unsafe.Pointer(&buf[0])), ret)
return name, nil
}
}
// ObjectListFunc is the type of the function called for each object visited
// by ListObjects.
type ObjectListFunc func(oid string)
// ListObjects lists all of the objects in the pool associated with the I/O
// context, and called the provided listFn function for each object, passing
// to the function the name of the object.
func (ioctx *IOContext) ListObjects(listFn ObjectListFunc) error {
var ctx C.rados_list_ctx_t
ret := C.rados_objects_list_open(ioctx.ioctx, &ctx)
if ret < 0 {
return RadosError(ret)
}
defer func() { C.rados_objects_list_close(ctx) }()
for {
var c_entry *C.char
ret := C.rados_objects_list_next(ctx, &c_entry, nil)
if ret == -2 { // FIXME
return nil
} else if ret < 0 {
return RadosError(ret)
}
listFn(C.GoString(c_entry))
}
panic("invalid state")
}
// Stat returns the size of the object and its last modification time
func (ioctx *IOContext) Stat(object string) (stat ObjectStat, err error) {
var c_psize C.uint64_t
var c_pmtime C.time_t
c_object := C.CString(object)
defer C.free(unsafe.Pointer(c_object))
ret := C.rados_stat(
ioctx.ioctx,
c_object,
&c_psize,
&c_pmtime)
if ret < 0 {
return ObjectStat{}, RadosError(int(ret))
} else {
return ObjectStat{
Size: uint64(c_psize),
ModTime: time.Unix(int64(c_pmtime), 0),
}, nil
}
}
// GetXattr gets an xattr with key `name`, it returns the length of
// the key read or an error if not successful
func (ioctx *IOContext) GetXattr(object string, name string, data []byte) (int, error) {
c_object := C.CString(object)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_object))
defer C.free(unsafe.Pointer(c_name))
ret := C.rados_getxattr(
ioctx.ioctx,
c_object,
c_name,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)))
if ret >= 0 {
return int(ret), nil
} else {
return 0, RadosError(int(ret))
}
}
// Sets an xattr for an object with key `name` with value as `data`
func (ioctx *IOContext) SetXattr(object string, name string, data []byte) error {
c_object := C.CString(object)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_object))
defer C.free(unsafe.Pointer(c_name))
ret := C.rados_setxattr(
ioctx.ioctx,
c_object,
c_name,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)))
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// function that lists all the xattrs for an object, since xattrs are
// a k-v pair, this function returns a map of k-v pairs on
// success, error code on failure
func (ioctx *IOContext) ListXattrs(oid string) (map[string][]byte, error) {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
var it C.rados_xattrs_iter_t
ret := C.rados_getxattrs(ioctx.ioctx, c_oid, &it)
if ret < 0 {
return nil, RadosError(ret)
}
defer func() { C.rados_getxattrs_end(it) }()
m := make(map[string][]byte)
for {
var c_name, c_val *C.char
var c_len C.size_t
defer C.free(unsafe.Pointer(c_name))
defer C.free(unsafe.Pointer(c_val))
ret := C.rados_getxattrs_next(it, &c_name, &c_val, &c_len)
if ret < 0 {
return nil, RadosError(int(ret))
}
// rados api returns a null name,val & 0-length upon
// end of iteration
if c_name == nil {
return m, nil // stop iteration
}
m[C.GoString(c_name)] = C.GoBytes(unsafe.Pointer(c_val), (C.int)(c_len))
}
}
// Remove an xattr with key `name` from object `oid`
func (ioctx *IOContext) RmXattr(oid string, name string) error {
c_oid := C.CString(oid)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_name))
ret := C.rados_rmxattr(
ioctx.ioctx,
c_oid,
c_name)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Append the map `pairs` to the omap `oid`
func (ioctx *IOContext) SetOmap(oid string, pairs map[string][]byte) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
var s C.size_t
var c *C.char
ptrSize := unsafe.Sizeof(c)
c_keys := C.malloc(C.size_t(len(pairs)) * C.size_t(ptrSize))
c_values := C.malloc(C.size_t(len(pairs)) * C.size_t(ptrSize))
c_lengths := C.malloc(C.size_t(len(pairs)) * C.size_t(unsafe.Sizeof(s)))
defer C.free(unsafe.Pointer(c_keys))
defer C.free(unsafe.Pointer(c_values))
defer C.free(unsafe.Pointer(c_lengths))
i := 0
for key, value := range pairs {
// key
c_key_ptr := (**C.char)(unsafe.Pointer(uintptr(c_keys) + uintptr(i) * ptrSize))
*c_key_ptr = C.CString(key)
defer C.free(unsafe.Pointer(*c_key_ptr))
// value and its length
c_value_ptr := (**C.char)(unsafe.Pointer(uintptr(c_values) + uintptr(i) * ptrSize))
var c_length C.size_t
if len(value) > 0 {
*c_value_ptr = (*C.char)(unsafe.Pointer(&value[0]))
c_length = C.size_t(len(value))
} else {
*c_value_ptr = nil
c_length = C.size_t(0)
}
c_length_ptr := (*C.size_t)(unsafe.Pointer(uintptr(c_lengths) + uintptr(i) * ptrSize))
*c_length_ptr = c_length
i++
}
op := C.rados_create_write_op()
C.rados_write_op_omap_set(
op,
(**C.char)(c_keys),
(**C.char)(c_values),
(*C.size_t)(c_lengths),
C.size_t(len(pairs)))
ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0)
C.rados_release_write_op(op)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// OmapListFunc is the type of the function called for each omap key
// visited by ListOmapValues
type OmapListFunc func(key string, value []byte)
// Iterate on a set of keys and their values from an omap
// `startAfter`: iterate only on the keys after this specified one
// `filterPrefix`: iterate only on the keys beginning with this prefix
// `maxReturn`: iterate no more than `maxReturn` key/value pairs
// `listFn`: the function called at each iteration
func (ioctx *IOContext) ListOmapValues(oid string, startAfter string, filterPrefix string, maxReturn int64, listFn OmapListFunc) error {
c_oid := C.CString(oid)
c_start_after := C.CString(startAfter)
c_filter_prefix := C.CString(filterPrefix)
c_max_return := C.uint64_t(maxReturn)
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_start_after))
defer C.free(unsafe.Pointer(c_filter_prefix))
op := C.rados_create_read_op()
var c_iter C.rados_omap_iter_t
var c_prval C.int
C.rados_read_op_omap_get_vals(
op,
c_start_after,
c_filter_prefix,
c_max_return,
&c_iter,
&c_prval,
)
ret := C.rados_read_op_operate(op, ioctx.ioctx, c_oid, 0)
if int(c_prval) != 0 {
return RadosError(int(c_prval))
} else if int(ret) != 0 {
return RadosError(int(ret))
}
for {
var c_key *C.char
var c_val *C.char
var c_len C.size_t
ret = C.rados_omap_get_next(c_iter, &c_key, &c_val, &c_len)
if int(ret) != 0 {
return RadosError(int(ret))
}
if c_key == nil {
break
}
listFn(C.GoString(c_key), C.GoBytes(unsafe.Pointer(c_val), C.int(c_len)))
}
C.rados_omap_get_end(c_iter)
C.rados_release_read_op(op)
return nil
}
// Fetch a set of keys and their values from an omap and returns then as a map
// `startAfter`: retrieve only the keys after this specified one
// `filterPrefix`: retrieve only the keys beginning with this prefix
// `maxReturn`: retrieve no more than `maxReturn` key/value pairs
func (ioctx *IOContext) GetOmapValues(oid string, startAfter string, filterPrefix string, maxReturn int64) (map[string][]byte, error) {
omap := map[string][]byte{}
err := ioctx.ListOmapValues(
oid, startAfter, filterPrefix, maxReturn,
func(key string, value []byte) {
omap[key] = value
},
)
return omap, err
}
// Fetch all the keys and their values from an omap and returns then as a map
// `startAfter`: retrieve only the keys after this specified one
// `filterPrefix`: retrieve only the keys beginning with this prefix
// `iteratorSize`: internal number of keys to fetch during a read operation
func (ioctx *IOContext) GetAllOmapValues(oid string, startAfter string, filterPrefix string, iteratorSize int64) (map[string][]byte, error) {
omap := map[string][]byte{}
omapSize := 0
for {
err := ioctx.ListOmapValues(
oid, startAfter, filterPrefix, iteratorSize,
func (key string, value []byte) {
omap[key] = value
startAfter = key
},
)
if err != nil {
return omap, err
}
// End of omap
if len(omap) == omapSize {
break
}
omapSize = len(omap)
}
return omap, nil
}
// Remove the specified `keys` from the omap `oid`
func (ioctx *IOContext) RmOmapKeys(oid string, keys []string) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
var c *C.char
ptrSize := unsafe.Sizeof(c)
c_keys := C.malloc(C.size_t(len(keys)) * C.size_t(ptrSize))
defer C.free(unsafe.Pointer(c_keys))
i := 0
for _, key := range keys {
c_key_ptr := (**C.char)(unsafe.Pointer(uintptr(c_keys) + uintptr(i) * ptrSize))
*c_key_ptr = C.CString(key)
defer C.free(unsafe.Pointer(*c_key_ptr))
i++
}
op := C.rados_create_write_op()
C.rados_write_op_omap_rm_keys(
op,
(**C.char)(c_keys),
C.size_t(len(keys)))
ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0)
C.rados_release_write_op(op)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Clear the omap `oid`
func (ioctx *IOContext) CleanOmap(oid string) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
op := C.rados_create_write_op()
C.rados_write_op_omap_clear(op)
ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0)
C.rados_release_write_op(op)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}

View File

@ -0,0 +1,54 @@
package rados
// #cgo LDFLAGS: -lrados
// #include <stdlib.h>
// #include <rados/librados.h>
import "C"
import (
"fmt"
"unsafe"
)
type RadosError int
func (e RadosError) Error() string {
return fmt.Sprintf("rados: ret=%d", e)
}
// Version returns the major, minor, and patch components of the version of
// the RADOS library linked against.
func Version() (int, int, int) {
var c_major, c_minor, c_patch C.int
C.rados_version(&c_major, &c_minor, &c_patch)
return int(c_major), int(c_minor), int(c_patch)
}
// NewConn creates a new connection object. It returns the connection and an
// error, if any.
func NewConn() (*Conn, error) {
conn := &Conn{}
ret := C.rados_create(&conn.cluster, nil)
if ret == 0 {
return conn, nil
} else {
return nil, RadosError(int(ret))
}
}
// NewConnWithUser creates a new connection object with a custom username.
// It returns the connection and an error, if any.
func NewConnWithUser(user string) (*Conn, error) {
c_user := C.CString(user)
defer C.free(unsafe.Pointer(c_user))
conn := &Conn{}
ret := C.rados_create(&conn.cluster, c_user)
if ret == 0 {
return conn, nil
} else {
return nil, RadosError(int(ret))
}
}

View File

@ -0,0 +1,703 @@
package rados_test
import "testing"
//import "bytes"
import "github.com/noahdesu/go-ceph/rados"
import "github.com/stretchr/testify/assert"
import "os"
import "os/exec"
import "io"
import "io/ioutil"
import "time"
import "net"
import "fmt"
import "sort"
import "encoding/json"
func GetUUID() string {
out, _ := exec.Command("uuidgen").Output()
return string(out[:36])
}
func TestVersion(t *testing.T) {
var major, minor, patch = rados.Version()
assert.False(t, major < 0 || major > 1000, "invalid major")
assert.False(t, minor < 0 || minor > 1000, "invalid minor")
assert.False(t, patch < 0 || patch > 1000, "invalid patch")
}
func TestGetSetConfigOption(t *testing.T) {
conn, _ := rados.NewConn()
// rejects invalid options
err := conn.SetConfigOption("wefoijweojfiw", "welfkwjelkfj")
assert.Error(t, err, "Invalid option")
// verify SetConfigOption changes a values
log_file_val, err := conn.GetConfigOption("log_file")
assert.NotEqual(t, log_file_val, "/dev/null")
err = conn.SetConfigOption("log_file", "/dev/null")
assert.NoError(t, err, "Invalid option")
log_file_val, err = conn.GetConfigOption("log_file")
assert.Equal(t, log_file_val, "/dev/null")
}
func TestParseDefaultConfigEnv(t *testing.T) {
conn, _ := rados.NewConn()
log_file_val, _ := conn.GetConfigOption("log_file")
assert.NotEqual(t, log_file_val, "/dev/null")
err := os.Setenv("CEPH_ARGS", "--log-file /dev/null")
assert.NoError(t, err)
err = conn.ParseDefaultConfigEnv()
assert.NoError(t, err)
log_file_val, _ = conn.GetConfigOption("log_file")
assert.Equal(t, log_file_val, "/dev/null")
}
func TestParseCmdLineArgs(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
mon_host_val, _ := conn.GetConfigOption("mon_host")
assert.NotEqual(t, mon_host_val, "1.1.1.1")
args := []string{"--mon-host", "1.1.1.1"}
err := conn.ParseCmdLineArgs(args)
assert.NoError(t, err)
mon_host_val, _ = conn.GetConfigOption("mon_host")
assert.Equal(t, mon_host_val, "1.1.1.1")
}
func TestGetClusterStats(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
// grab current stats
prev_stat, err := conn.GetClusterStats()
fmt.Printf("prev_stat: %+v\n", prev_stat)
assert.NoError(t, err)
// make some changes to the cluster
buf := make([]byte, 1<<20)
for i := 0; i < 10; i++ {
objname := GetUUID()
pool.Write(objname, buf, 0)
}
// wait a while for the stats to change
for i := 0; i < 30; i++ {
stat, err := conn.GetClusterStats()
assert.NoError(t, err)
// wait for something to change
if stat == prev_stat {
fmt.Printf("curr_stat: %+v (trying again...)\n", stat)
time.Sleep(time.Second)
} else {
// success
fmt.Printf("curr_stat: %+v (change detected)\n", stat)
conn.Shutdown()
return
}
}
pool.Destroy()
conn.Shutdown()
t.Error("Cluster stats aren't changing")
}
func TestGetFSID(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
fsid, err := conn.GetFSID()
assert.NoError(t, err)
assert.NotEqual(t, fsid, "")
conn.Shutdown()
}
func TestGetInstanceID(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
id := conn.GetInstanceID()
assert.NotEqual(t, id, 0)
conn.Shutdown()
}
func TestMakeDeletePool(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
// get current list of pool
pools, err := conn.ListPools()
assert.NoError(t, err)
// check that new pool name is unique
new_name := GetUUID()
for _, poolname := range pools {
if new_name == poolname {
t.Error("Random pool name exists!")
return
}
}
// create pool
err = conn.MakePool(new_name)
assert.NoError(t, err)
// get updated list of pools
pools, err = conn.ListPools()
assert.NoError(t, err)
// verify that the new pool name exists
found := false
for _, poolname := range pools {
if new_name == poolname {
found = true
}
}
if !found {
t.Error("Cannot find newly created pool")
}
// delete the pool
err = conn.DeletePool(new_name)
assert.NoError(t, err)
// verify that it is gone
// get updated list of pools
pools, err = conn.ListPools()
assert.NoError(t, err)
// verify that the new pool name exists
found = false
for _, poolname := range pools {
if new_name == poolname {
found = true
}
}
if found {
t.Error("Deleted pool still exists")
}
conn.Shutdown()
}
func TestPingMonitor(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
// mon id that should work with vstart.sh
reply, err := conn.PingMonitor("a")
if err == nil {
assert.NotEqual(t, reply, "")
return
}
// mon id that should work with micro-osd.sh
reply, err = conn.PingMonitor("0")
if err == nil {
assert.NotEqual(t, reply, "")
return
}
// try to use a hostname as the monitor id
mon_addr, _ := conn.GetConfigOption("mon_host")
hosts, _ := net.LookupAddr(mon_addr)
for _, host := range hosts {
reply, err := conn.PingMonitor(host)
if err == nil {
assert.NotEqual(t, reply, "")
return
}
}
t.Error("Could not find a valid monitor id")
conn.Shutdown()
}
func TestReadConfigFile(t *testing.T) {
conn, _ := rados.NewConn()
// check current log_file value
log_file_val, err := conn.GetConfigOption("log_file")
assert.NoError(t, err)
assert.NotEqual(t, log_file_val, "/dev/null")
// create a temporary ceph.conf file that changes the log_file conf
// option.
file, err := ioutil.TempFile("/tmp", "go-rados")
assert.NoError(t, err)
_, err = io.WriteString(file, "[global]\nlog_file = /dev/null\n")
assert.NoError(t, err)
// parse the config file
err = conn.ReadConfigFile(file.Name())
assert.NoError(t, err)
// check current log_file value
log_file_val, err = conn.GetConfigOption("log_file")
assert.NoError(t, err)
assert.Equal(t, log_file_val, "/dev/null")
// cleanup
file.Close()
os.Remove(file.Name())
}
func TestWaitForLatestOSDMap(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
err := conn.WaitForLatestOSDMap()
assert.NoError(t, err)
conn.Shutdown()
}
func TestReadWrite(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
// make pool
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
bytes_in := []byte("input data")
err = pool.Write("obj", bytes_in, 0)
assert.NoError(t, err)
bytes_out := make([]byte, len(bytes_in))
n_out, err := pool.Read("obj", bytes_out, 0)
assert.Equal(t, n_out, len(bytes_in))
assert.Equal(t, bytes_in, bytes_out)
pool.Destroy()
}
func TestObjectStat(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
bytes_in := []byte("input data")
err = pool.Write("obj", bytes_in, 0)
assert.NoError(t, err)
stat, err := pool.Stat("obj")
assert.Equal(t, uint64(len(bytes_in)), stat.Size)
assert.NotNil(t, stat.ModTime)
pool.Destroy()
conn.Shutdown()
}
func TestGetPoolStats(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
// grab current stats
prev_stat, err := pool.GetPoolStats()
fmt.Printf("prev_stat: %+v\n", prev_stat)
assert.NoError(t, err)
// make some changes to the cluster
buf := make([]byte, 1<<20)
for i := 0; i < 10; i++ {
objname := GetUUID()
pool.Write(objname, buf, 0)
}
// wait a while for the stats to change
for i := 0; i < 30; i++ {
stat, err := pool.GetPoolStats()
assert.NoError(t, err)
// wait for something to change
if stat == prev_stat {
fmt.Printf("curr_stat: %+v (trying again...)\n", stat)
time.Sleep(time.Second)
} else {
// success
fmt.Printf("curr_stat: %+v (change detected)\n", stat)
conn.Shutdown()
return
}
}
pool.Destroy()
conn.Shutdown()
t.Error("Pool stats aren't changing")
}
func TestGetPoolName(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
ioctx, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
poolname_ret, err := ioctx.GetPoolName()
assert.NoError(t, err)
assert.Equal(t, poolname, poolname_ret)
ioctx.Destroy()
conn.Shutdown()
}
func TestMonCommand(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
command, err := json.Marshal(map[string]string{"prefix": "df", "format": "json"})
assert.NoError(t, err)
buf, info, err := conn.MonCommand(command)
assert.NoError(t, err)
assert.Equal(t, info, "")
var message map[string]interface{}
err = json.Unmarshal(buf, &message)
assert.NoError(t, err)
conn.Shutdown()
}
func TestObjectIterator(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
ioctx, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
objectList := []string{}
err = ioctx.ListObjects(func(oid string) {
objectList = append(objectList, oid)
})
assert.NoError(t, err)
assert.True(t, len(objectList) == 0)
createdList := []string{}
for i := 0; i < 200; i++ {
oid := GetUUID()
bytes_in := []byte("input data")
err = ioctx.Write(oid, bytes_in, 0)
assert.NoError(t, err)
createdList = append(createdList, oid)
}
assert.True(t, len(createdList) == 200)
err = ioctx.ListObjects(func(oid string) {
objectList = append(objectList, oid)
})
assert.NoError(t, err)
assert.Equal(t, len(objectList), len(createdList))
sort.Strings(objectList)
sort.Strings(createdList)
assert.Equal(t, objectList, createdList)
}
func TestNewConnWithUser(t *testing.T) {
_, err := rados.NewConnWithUser("admin")
assert.Equal(t, err, nil)
}
func TestReadWriteXattr(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
// make pool
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
bytes_in := []byte("input data")
err = pool.Write("obj", bytes_in, 0)
assert.NoError(t, err)
my_xattr_in := []byte("my_value")
err = pool.SetXattr("obj", "my_key", my_xattr_in)
assert.NoError(t, err)
my_xattr_out := make([]byte, len(my_xattr_in))
n_out, err := pool.GetXattr("obj", "my_key", my_xattr_out)
assert.Equal(t, n_out, len(my_xattr_in))
assert.Equal(t, my_xattr_in, my_xattr_out)
pool.Destroy()
}
func TestListXattrs(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
// make pool
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
bytes_in := []byte("input data")
err = pool.Write("obj", bytes_in, 0)
assert.NoError(t, err)
input_xattrs := make(map[string][]byte)
for i := 0; i < 200; i++ {
name := fmt.Sprintf("key_%d", i)
data := []byte(GetUUID())
err = pool.SetXattr("obj", name, data)
assert.NoError(t, err)
input_xattrs[name] = data
}
output_xattrs := make(map[string][]byte)
output_xattrs, err = pool.ListXattrs("obj")
assert.NoError(t, err)
assert.Equal(t, len(input_xattrs), len(output_xattrs))
assert.Equal(t, input_xattrs, output_xattrs)
pool.Destroy()
}
func TestRmXattr(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
bytes_in := []byte("input data")
err = pool.Write("obj", bytes_in, 0)
assert.NoError(t, err)
key := "key1"
val := []byte("val1")
err = pool.SetXattr("obj", key, val)
assert.NoError(t, err)
key = "key2"
val = []byte("val2")
err = pool.SetXattr("obj", key, val)
assert.NoError(t, err)
xattr_list := make(map[string][]byte)
xattr_list, err = pool.ListXattrs("obj")
assert.NoError(t, err)
assert.Equal(t, len(xattr_list), 2)
pool.RmXattr("obj", "key2")
xattr_list, err = pool.ListXattrs("obj")
assert.NoError(t, err)
assert.Equal(t, len(xattr_list), 1)
found := false
for key, _ = range xattr_list {
if key == "key2" {
found = true
}
}
if found {
t.Error("Deleted pool still exists")
}
pool.Destroy()
}
func TestReadWriteOmap(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
// Set
orig := map[string][]byte{
"key1": []byte("value1"),
"key2": []byte("value2"),
"prefixed-key3": []byte("value3"),
"empty": []byte(""),
}
err = pool.SetOmap("obj", orig)
assert.NoError(t, err)
// List
remaining := map[string][]byte{}
for k, v := range orig {
remaining[k] = v
}
err = pool.ListOmapValues("obj", "", "", 4, func(key string, value []byte) {
assert.Equal(t, remaining[key], value)
delete(remaining, key)
})
assert.NoError(t, err)
assert.Equal(t, 0, len(remaining))
// Get (with a fixed number of keys)
fetched, err := pool.GetOmapValues("obj", "", "", 4)
assert.NoError(t, err)
assert.Equal(t, orig, fetched)
// Get All (with an iterator size bigger than the map size)
fetched, err = pool.GetAllOmapValues("obj", "", "", 100)
assert.NoError(t, err)
assert.Equal(t, orig, fetched)
// Get All (with an iterator size smaller than the map size)
fetched, err = pool.GetAllOmapValues("obj", "", "", 1)
assert.NoError(t, err)
assert.Equal(t, orig, fetched)
// Remove
err = pool.RmOmapKeys("obj", []string{"key1", "prefixed-key3"})
assert.NoError(t, err)
fetched, err = pool.GetOmapValues("obj", "", "", 4)
assert.NoError(t, err)
assert.Equal(t, map[string][]byte{
"key2": []byte("value2"),
"empty": []byte(""),
}, fetched)
// Clear
err = pool.CleanOmap("obj")
assert.NoError(t, err)
fetched, err = pool.GetOmapValues("obj", "", "", 4)
assert.NoError(t, err)
assert.Equal(t, map[string][]byte{}, fetched)
pool.Destroy()
}
func TestReadFilterOmap(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
orig := map[string][]byte{
"key1": []byte("value1"),
"prefixed-key3": []byte("value3"),
"key2": []byte("value2"),
}
err = pool.SetOmap("obj", orig)
assert.NoError(t, err)
// filter by prefix
fetched, err := pool.GetOmapValues("obj", "", "prefixed", 4)
assert.NoError(t, err)
assert.Equal(t, map[string][]byte{
"prefixed-key3": []byte("value3"),
}, fetched)
// "start_after" a key
fetched, err = pool.GetOmapValues("obj", "key1", "", 4)
assert.NoError(t, err)
assert.Equal(t, map[string][]byte{
"prefixed-key3": []byte("value3"),
"key2": []byte("value2"),
}, fetched)
// maxReturn
fetched, err = pool.GetOmapValues("obj", "", "key", 1)
assert.NoError(t, err)
assert.Equal(t, map[string][]byte{
"key1": []byte("value1"),
}, fetched)
pool.Destroy()
}

View File

@ -18,7 +18,7 @@ version/version.go:
${PREFIX}/bin/registry: version/version.go $(shell find . -type f -name '*.go')
@echo "+ $@"
@go build -o $@ ${GO_LDFLAGS} ./cmd/registry
@go build -tags "${DOCKER_BUILDTAGS}" -o $@ ${GO_LDFLAGS} ./cmd/registry
${PREFIX}/bin/registry-api-descriptor-template: version/version.go $(shell find . -type f -name '*.go')
@echo "+ $@"
@ -46,11 +46,11 @@ lint:
build:
@echo "+ $@"
@go build -v ${GO_LDFLAGS} ./...
@go build -tags "${DOCKER_BUILDTAGS}" -v ${GO_LDFLAGS} ./...
test:
@echo "+ $@"
@go test -test.short ./...
@go test -test.short -tags "${DOCKER_BUILDTAGS}" ./...
test-full:
@echo "+ $@"

View File

@ -3,6 +3,9 @@ machine:
pre:
# Install gvm
- bash < <(curl -s -S -L https://raw.githubusercontent.com/moovweb/gvm/1.0.22/binscripts/gvm-installer)
# Install ceph to test rados driver & create pool
- sudo -i ~/distribution/contrib/ceph/ci-setup.sh
- ceph osd pool create docker-distribution 1
post:
# Install many go versions
@ -18,8 +21,11 @@ machine:
BASE_OLD: ../../../$HOME/.gvm/pkgsets/old/global/$BASE_DIR
BASE_STABLE: ../../../$HOME/.gvm/pkgsets/stable/global/$BASE_DIR
# BASE_BLEED: ../../../$HOME/.gvm/pkgsets/bleed/global/$BASE_DIR
DOCKER_BUILDTAGS: "include_rados"
# Workaround Circle parsing dumb bugs and/or YAML wonkyness
CIRCLE_PAIN: "mode: set"
# Ceph config
RADOS_POOL: "docker-distribution"
hosts:
# Not used yet
@ -95,7 +101,7 @@ test:
- gvm use stable; go list ./... | xargs -L 1 -I{} rm -f $GOPATH/src/{}/coverage.out:
pwd: $BASE_STABLE
- gvm use stable; go list ./... | xargs -L 1 -I{} godep go test -test.short -coverprofile=$GOPATH/src/{}/coverage.out {}:
- gvm use stable; go list ./... | xargs -L 1 -I{} godep go test -tags "$DOCKER_BUILDTAGS" -test.short -coverprofile=$GOPATH/src/{}/coverage.out {}:
timeout: 600
pwd: $BASE_STABLE

5
cmd/registry/rados.go Normal file
View File

@ -0,0 +1,5 @@
// +build include_rados
package main
import _ "github.com/docker/distribution/registry/storage/driver/rados"

119
contrib/ceph/ci-setup.sh Executable file
View File

@ -0,0 +1,119 @@
#! /bin/bash
#
# Ceph cluster setup in Circle CI
#
set -x
set -e
set -u
NODE=$(hostname)
CEPHDIR=/tmp/ceph
mkdir cluster
pushd cluster
# Install
retries=0
until [ $retries -ge 5 ]; do
pip install ceph-deploy && break
retries=$[$retries+1]
sleep 30
done
retries=0
until [ $retries -ge 5 ]; do
ceph-deploy install --release hammer $NODE && break
retries=$[$retries+1]
sleep 30
done
retries=0
until [ $retries -ge 5 ]; do
ceph-deploy pkg --install librados-dev $NODE && break
retries=$[$retries+1]
sleep 30
done
echo $(ip route get 1 | awk '{print $NF;exit}') $(hostname) >> /etc/hosts
ssh-keygen -t rsa -f ~/.ssh/id_rsa -q -N ""
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh-keyscan $NODE >> ~/.ssh/known_hosts
ceph-deploy new $NODE
cat >> ceph.conf <<EOF
osd objectstore = memstore
memstore device bytes = 2147483648
osd data = $CEPHDIR
osd journal = $CEPHDIR/journal
osd crush chooseleaf type = 0
osd pool default size = 1
osd pool default min size = 1
osd scrub load threshold = 1000
debug_lockdep = 0/0
debug_context = 0/0
debug_crush = 0/0
debug_buffer = 0/0
debug_timer = 0/0
debug_filer = 0/0
debug_objecter = 0/0
debug_rados = 0/0
debug_rbd = 0/0
debug_journaler = 0/0
debug_objectcatcher = 0/0
debug_client = 0/0
debug_osd = 0/0
debug_optracker = 0/0
debug_objclass = 0/0
debug_filestore = 0/0
debug_journal = 0/0
debug_ms = 0/0
debug_monc = 0/0
debug_tp = 0/0
debug_auth = 0/0
debug_finisher = 0/0
debug_heartbeatmap = 0/0
debug_perfcounter = 0/0
debug_asok = 0/0
debug_throttle = 0/0
debug_mon = 0/0
debug_paxos = 0/0
debug_rgw = 0/0
osd_op_num_threads_per_shard = 1 //You may want to try with 1 as well
osd_op_num_shards = 5 //Depends on your cpu util
ms_nocrc = true
cephx_sign_messages = false
cephx_require_signatures = false
ms_dispatch_throttle_bytes = 0
throttler_perf_counter = false
[osd]
osd_client_message_size_cap = 0
osd_client_message_cap = 0
osd_enable_op_tracker = false
EOF
sed -i -r 's/mon_host =.*/mon_host = 127.0.0.1/' ceph.conf
sed -i -r 's/auth_cluster_required =.*/auth_cluster_required = none/' ceph.conf
sed -i -r 's/auth_service_required =.*/auth_service_required = none/' ceph.conf
sed -i -r 's/auth_client_required =.*/auth_client_required = none/' ceph.conf
# Setup monitor and keyrings
ceph-deploy mon create-initial $NODE
ceph-deploy admin $NODE
sudo chmod a+r /etc/ceph/ceph.client.admin.keyring
# Setup OSD
mkdir -p $CEPHDIR
OSD=$(ceph osd create)
ceph osd crush add osd.${OSD} 1 root=default host=$NODE
ceph-osd --id ${OSD} --mkjournal --mkfs
ceph-osd --id ${OSD}
# Status
ceph status
ceph health detail
ceph osd tree
popd

View File

@ -121,3 +121,15 @@ $ make
If that is successful, standard `go` commands, such as `go test` should work,
per package, without issue.
### Optional build tags
Optional [build tags](http://golang.org/pkg/go/build/) can be provided using
the environment variable `DOCKER_BUILDTAGS`.
To enable the [Ceph RADOS storage driver](storage-drivers/rados.md)
(librados-dev and librbd-dev will be required to build the bindings):
```sh
export DOCKER_BUILDTAGS='include_rados'
```

View File

@ -43,6 +43,10 @@ storage:
v4auth: true
chunksize: 5242880
rootdirectory: /s3/object/name/prefix
rados:
poolname: radospool
username: radosuser
chunksize: 4194304
cache:
blobdescriptor: redis
maintenance:
@ -261,6 +265,10 @@ storage:
v4auth: true
chunksize: 5242880
rootdirectory: /s3/object/name/prefix
rados:
poolname: radospool
username: radosuser
chunksize: 4194304
cache:
blobdescriptor: inmemory
maintenance:
@ -348,6 +356,51 @@ This storage backend uses Microsoft's Azure Storage platform.
</table>
### rados
This storage backend uses [Ceph Object Storage](http://ceph.com/docs/master/rados/).
<table>
<tr>
<th>Parameter</th>
<th>Required</th>
<th>Description</th>
</tr>
<tr>
<td>
<code>poolname</code>
</td>
<td>
yes
</td>
<td>
Ceph pool name.
</td>
</tr>
<tr>
<td>
<code>username</code>
</td>
<td>
no
</td>
<td>
Ceph cluster user to connect as (i.e. admin, not client.admin).
</td>
</tr>
<tr>
<td>
<code>chunksize</code>
</td>
<td>
no
</td>
<td>
Size of the written RADOS objects. Default value is 4MB (4194304).
</td>
</tr>
</table>
### S3

View File

@ -10,4 +10,5 @@
- ['registry/storage-drivers/azure.md', '**HIDDEN**' ]
- ['registry/storage-drivers/filesystem.md', '**HIDDEN**' ]
- ['registry/storage-drivers/inmemory.md', '**HIDDEN**' ]
- ['registry/storage-drivers/rados.md', '**HIDDEN**' ]
- ['registry/storage-drivers/s3.md','**HIDDEN**' ]

View File

@ -0,0 +1,37 @@
<!--GITHUB
page_title: Ceph RADOS storage driver
page_description: Explains how to use the Ceph RADOS storage driver
page_keywords: registry, service, driver, images, storage, ceph, rados
IGNORES-->
# Ceph RADOS storage driver
An implementation of the `storagedriver.StorageDriver` interface which uses
[Ceph RADOS Object Storage][rados] for storage backend.
## Parameters
The following parameters must be used to configure the storage driver
(case-sensitive):
* `poolname`: Name of the Ceph pool
* `username` *optional*: The user to connect as (i.e. admin, not client.admin)
* `chunksize` *optional*: Size of the written RADOS objects. Default value is
4MB (4194304).
This drivers loads the [Ceph client configuration][rados-config] from the
following regular paths (the first found is used):
* `$CEPH_CONF` (environment variable)
* `/etc/ceph/ceph.conf`
* `~/.ceph/config`
* `ceph.conf` (in the current working directory)
## Developing
To include this driver when building Docker Distribution, use the build tag
`include_rados`. Please see the [building documentation][building] for details.
[rados]: http://ceph.com/docs/master/rados/
[rados-config]: http://ceph.com/docs/master/rados/configuration/ceph-conf/
[building]: https://github.com/docker/distribution/blob/master/docs/building.md#optional-build-tags

View File

@ -16,6 +16,7 @@ This storage driver package comes bundled with several drivers:
- [filesystem](storage-drivers/filesystem.md): A local storage driver configured to use a directory tree in the local filesystem.
- [s3](storage-drivers/s3.md): A driver storing objects in an Amazon Simple Storage Solution (S3) bucket.
- [azure](storage-drivers/azure.md): A driver storing objects in [Microsoft Azure Blob Storage](http://azure.microsoft.com/en-us/services/storage/).
- [rados](storage-drivers/rados.md): A driver storing objects in a [Ceph Object Storage](http://ceph.com/docs/master/rados/) pool.
## Storage Driver API

View File

@ -0,0 +1,628 @@
package rados
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"path"
"strconv"
"code.google.com/p/go-uuid/uuid"
log "github.com/Sirupsen/logrus"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/noahdesu/go-ceph/rados"
)
const driverName = "rados"
// Prefix all the stored blob
const objectBlobPrefix = "blob:"
// Stripes objects size to 4M
const defaultChunkSize = 4 << 20
const defaultXattrTotalSizeName = "total-size"
// Max number of keys fetched from omap at each read operation
const defaultKeysFetched = 1
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
type DriverParameters struct {
poolname string
username string
chunksize uint64
}
func init() {
factory.Register(driverName, &radosDriverFactory{})
}
// radosDriverFactory implements the factory.StorageDriverFactory interface
type radosDriverFactory struct{}
func (factory *radosDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(parameters)
}
type driver struct {
Conn *rados.Conn
Ioctx *rados.IOContext
chunksize uint64
}
type baseEmbed struct {
base.Base
}
// Driver is a storagedriver.StorageDriver implementation backed by Ceph RADOS
// Objects are stored at absolute keys in the provided bucket.
type Driver struct {
baseEmbed
}
// FromParameters constructs a new Driver with a given parameters map
// Required parameters:
// - poolname: the ceph pool name
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
pool, ok := parameters["poolname"]
if !ok {
return nil, fmt.Errorf("No poolname parameter provided")
}
username, ok := parameters["username"]
if !ok {
username = ""
}
chunksize := uint64(defaultChunkSize)
chunksizeParam, ok := parameters["chunksize"]
if ok {
chunksize, ok = chunksizeParam.(uint64)
if !ok {
return nil, fmt.Errorf("The chunksize parameter should be a number")
}
}
params := DriverParameters{
fmt.Sprint(pool),
fmt.Sprint(username),
chunksize,
}
return New(params)
}
// New constructs a new Driver
func New(params DriverParameters) (*Driver, error) {
var conn *rados.Conn
var err error
if params.username != "" {
log.Infof("Opening connection to pool %s using user %s", params.poolname, params.username)
conn, err = rados.NewConnWithUser(params.username)
} else {
log.Infof("Opening connection to pool %s", params.poolname)
conn, err = rados.NewConn()
}
if err != nil {
return nil, err
}
err = conn.ReadDefaultConfigFile()
if err != nil {
return nil, err
}
err = conn.Connect()
if err != nil {
return nil, err
}
log.Infof("Connected")
ioctx, err := conn.OpenIOContext(params.poolname)
log.Infof("Connected to pool %s", params.poolname)
if err != nil {
return nil, err
}
d := &driver{
Ioctx: ioctx,
Conn: conn,
chunksize: params.chunksize,
}
return &Driver{
baseEmbed: baseEmbed{
Base: base.Base{
StorageDriver: d,
},
},
}, nil
}
// Implement the storagedriver.StorageDriver interface
func (d *driver) Name() string {
return driverName
}
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
rc, err := d.ReadStream(ctx, path, 0)
if err != nil {
return nil, err
}
defer rc.Close()
p, err := ioutil.ReadAll(rc)
if err != nil {
return nil, err
}
return p, nil
}
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
if _, err := d.WriteStream(ctx, path, 0, bytes.NewReader(contents)); err != nil {
return err
}
return nil
}
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
type readStreamReader struct {
driver *driver
oid string
size uint64
offset uint64
}
func (r *readStreamReader) Read(b []byte) (n int, err error) {
// Determine the part available to read
bufferOffset := uint64(0)
bufferSize := uint64(len(b))
// End of the object, read less than the buffer size
if bufferSize > r.size-r.offset {
bufferSize = r.size - r.offset
}
// Fill `b`
for bufferOffset < bufferSize {
// Get the offset in the object chunk
chunkedOid, chunkedOffset := r.driver.getChunkNameFromOffset(r.oid, r.offset)
// Determine the best size to read
bufferEndOffset := bufferSize
if bufferEndOffset-bufferOffset > r.driver.chunksize-chunkedOffset {
bufferEndOffset = bufferOffset + (r.driver.chunksize - chunkedOffset)
}
// Read the chunk
n, err = r.driver.Ioctx.Read(chunkedOid, b[bufferOffset:bufferEndOffset], chunkedOffset)
if err != nil {
return int(bufferOffset), err
}
bufferOffset += uint64(n)
r.offset += uint64(n)
}
// EOF if the offset is at the end of the object
if r.offset == r.size {
return int(bufferOffset), io.EOF
}
return int(bufferOffset), nil
}
func (r *readStreamReader) Close() error {
return nil
}
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
// get oid from filename
oid, err := d.getOid(path)
if err != nil {
return nil, err
}
// get object stat
stat, err := d.Stat(ctx, path)
if err != nil {
return nil, err
}
if offset > stat.Size() {
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
return &readStreamReader{
driver: d,
oid: oid,
size: uint64(stat.Size()),
offset: uint64(offset),
}, nil
}
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
buf := make([]byte, d.chunksize)
totalRead = 0
oid, err := d.getOid(path)
if err != nil {
switch err.(type) {
// Trying to write new object, generate new blob identifier for it
case storagedriver.PathNotFoundError:
oid = d.generateOid()
err = d.putOid(path, oid)
if err != nil {
return 0, err
}
default:
return 0, err
}
} else {
// Check total object size only for existing ones
totalSize, err := d.getXattrTotalSize(ctx, oid)
if err != nil {
return 0, err
}
// If offset if after the current object size, fill the gap with zeros
for totalSize < uint64(offset) {
sizeToWrite := d.chunksize
if totalSize-uint64(offset) < sizeToWrite {
sizeToWrite = totalSize - uint64(offset)
}
chunkName, chunkOffset := d.getChunkNameFromOffset(oid, uint64(totalSize))
err = d.Ioctx.Write(chunkName, buf[:sizeToWrite], uint64(chunkOffset))
if err != nil {
return totalRead, err
}
totalSize += sizeToWrite
}
}
// Writer
for {
// Align to chunk size
sizeRead := uint64(0)
sizeToRead := uint64(offset+totalRead) % d.chunksize
if sizeToRead == 0 {
sizeToRead = d.chunksize
}
// Read from `reader`
for sizeRead < sizeToRead {
nn, err := reader.Read(buf[sizeRead:sizeToRead])
sizeRead += uint64(nn)
if err != nil {
if err != io.EOF {
return totalRead, err
}
break
}
}
// End of file and nothing was read
if sizeRead == 0 {
break
}
// Write chunk object
chunkName, chunkOffset := d.getChunkNameFromOffset(oid, uint64(offset+totalRead))
err = d.Ioctx.Write(chunkName, buf[:sizeRead], uint64(chunkOffset))
if err != nil {
return totalRead, err
}
// Update total object size as xattr in the first chunk of the object
err = d.setXattrTotalSize(oid, uint64(offset+totalRead)+sizeRead)
if err != nil {
return totalRead, err
}
totalRead += int64(sizeRead)
// End of file
if sizeRead < sizeToRead {
break
}
}
return totalRead, nil
}
// Stat retrieves the FileInfo for the given path, including the current size
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
// get oid from filename
oid, err := d.getOid(path)
if err != nil {
return nil, err
}
// the path is a virtual directory?
if oid == "" {
return storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
Path: path,
Size: 0,
IsDir: true,
},
}, nil
}
// stat first chunk
stat, err := d.Ioctx.Stat(oid + "-0")
if err != nil {
return nil, err
}
// get total size of chunked object
totalSize, err := d.getXattrTotalSize(ctx, oid)
if err != nil {
return nil, err
}
return storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
Path: path,
Size: int64(totalSize),
ModTime: stat.ModTime,
},
}, nil
}
// List returns a list of the objects that are direct descendants of the given path.
func (d *driver) List(ctx context.Context, dirPath string) ([]string, error) {
files, err := d.listDirectoryOid(dirPath)
if err != nil {
return nil, err
}
keys := make([]string, 0, len(files))
for k := range files {
keys = append(keys, path.Join(dirPath, k))
}
return keys, nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
// Get oid
oid, err := d.getOid(sourcePath)
if err != nil {
return err
}
// Move reference
err = d.putOid(destPath, oid)
if err != nil {
return err
}
// Delete old reference
err = d.deleteOid(sourcePath)
if err != nil {
return err
}
return nil
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(ctx context.Context, objectPath string) error {
// Get oid
oid, err := d.getOid(objectPath)
if err != nil {
return err
}
// Deleting virtual directory
if oid == "" {
objects, err := d.listDirectoryOid(objectPath)
if err != nil {
return err
}
for object := range objects {
err = d.Delete(ctx, path.Join(objectPath, object))
if err != nil {
return err
}
}
} else {
// Delete object chunks
totalSize, err := d.getXattrTotalSize(ctx, oid)
if err != nil {
return err
}
for offset := uint64(0); offset < totalSize; offset += d.chunksize {
chunkName, _ := d.getChunkNameFromOffset(oid, offset)
err = d.Ioctx.Delete(chunkName)
if err != nil {
return err
}
}
// Delete reference
err = d.deleteOid(objectPath)
if err != nil {
return err
}
}
return nil
}
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
return "", storagedriver.ErrUnsupportedMethod
}
// Generate a blob identifier
func (d *driver) generateOid() string {
return objectBlobPrefix + uuid.New()
}
// Reference a object and its hierarchy
func (d *driver) putOid(objectPath string, oid string) error {
directory := path.Dir(objectPath)
base := path.Base(objectPath)
createParentReference := true
// After creating this reference, skip the parents referencing since the
// hierarchy already exists
if oid == "" {
firstReference, err := d.Ioctx.GetOmapValues(directory, "", "", 1)
if (err == nil) && (len(firstReference) > 0) {
createParentReference = false
}
}
oids := map[string][]byte{
base: []byte(oid),
}
// Reference object
err := d.Ioctx.SetOmap(directory, oids)
if err != nil {
return err
}
// Esure parent virtual directories
if createParentReference && directory != "/" {
return d.putOid(directory, "")
}
return nil
}
// Get the object identifier from an object name
func (d *driver) getOid(objectPath string) (string, error) {
directory := path.Dir(objectPath)
base := path.Base(objectPath)
files, err := d.Ioctx.GetOmapValues(directory, "", base, 1)
if (err != nil) || (files[base] == nil) {
return "", storagedriver.PathNotFoundError{Path: objectPath}
}
return string(files[base]), nil
}
// List the objects of a virtual directory
func (d *driver) listDirectoryOid(path string) (list map[string][]byte, err error) {
return d.Ioctx.GetAllOmapValues(path, "", "", defaultKeysFetched)
}
// Remove a file from the files hierarchy
func (d *driver) deleteOid(objectPath string) error {
// Remove object reference
directory := path.Dir(objectPath)
base := path.Base(objectPath)
err := d.Ioctx.RmOmapKeys(directory, []string{base})
if err != nil {
return err
}
// Remove virtual directory if empty (no more references)
firstReference, err := d.Ioctx.GetOmapValues(directory, "", "", 1)
if err != nil {
return err
}
if len(firstReference) == 0 {
// Delete omap
err := d.Ioctx.Delete(directory)
if err != nil {
return err
}
// Remove reference on parent omaps
if directory != "/" {
return d.deleteOid(directory)
}
}
return nil
}
// Takes an offset in an chunked object and return the chunk name and a new
// offset in this chunk object
func (d *driver) getChunkNameFromOffset(oid string, offset uint64) (string, uint64) {
chunkID := offset / d.chunksize
chunkedOid := oid + "-" + strconv.FormatInt(int64(chunkID), 10)
chunkedOffset := offset % d.chunksize
return chunkedOid, chunkedOffset
}
// Set the total size of a chunked object `oid`
func (d *driver) setXattrTotalSize(oid string, size uint64) error {
// Convert uint64 `size` to []byte
xattr := make([]byte, binary.MaxVarintLen64)
binary.LittleEndian.PutUint64(xattr, size)
// Save the total size as a xattr in the first chunk
return d.Ioctx.SetXattr(oid+"-0", defaultXattrTotalSizeName, xattr)
}
// Get the total size of the chunked object `oid` stored as xattr
func (d *driver) getXattrTotalSize(ctx context.Context, oid string) (uint64, error) {
// Fetch xattr as []byte
xattr := make([]byte, binary.MaxVarintLen64)
xattrLength, err := d.Ioctx.GetXattr(oid+"-0", defaultXattrTotalSizeName, xattr)
if err != nil {
return 0, err
}
if xattrLength != len(xattr) {
context.GetLogger(ctx).Errorf("object %s xattr length mismatch: %d != %d", oid, xattrLength, len(xattr))
return 0, storagedriver.PathNotFoundError{Path: oid}
}
// Convert []byte as uint64
totalSize := binary.LittleEndian.Uint64(xattr)
return totalSize, nil
}

View File

@ -0,0 +1,38 @@
package rados
import (
"os"
"testing"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/testsuites"
"gopkg.in/check.v1"
)
// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { check.TestingT(t) }
func init() {
poolname := os.Getenv("RADOS_POOL")
username := os.Getenv("RADOS_USER")
driverConstructor := func() (storagedriver.StorageDriver, error) {
parameters := DriverParameters{
poolname,
username,
defaultChunkSize,
}
return New(parameters)
}
skipCheck := func() string {
if poolname == "" {
return "RADOS_POOL must be set to run Rado tests"
}
return ""
}
testsuites.RegisterInProcessSuite(driverConstructor, skipCheck)
}